Skip to content

Commit

Permalink
Filter enrich policy index deletes to just the policy's associated in…
Browse files Browse the repository at this point in the history
…dices (#82568)
  • Loading branch information
joegallo committed Jan 14, 2022
1 parent eebbc7b commit 6bab477
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,32 @@ public static String getBaseName(String policyName) {
return ENRICH_INDEX_NAME_BASE + policyName;
}

/**
* Given a policy name and a timestamp, return the enrich index name that should be used.
*
* @param policyName the name of the policy
* @param nowTimestamp the current time
* @return an enrich index name
*/
public static String getIndexName(String policyName, long nowTimestamp) {
Objects.nonNull(policyName);
return EnrichPolicy.getBaseName(policyName) + "-" + nowTimestamp;
}

/**
* Tests whether the named policy is associated with the named index according to the naming
* pattern that exists between policy names and index names.
*
* @param policyName the policy name
* @param indexName the index name
* @return true if and only if the named policy is associated with the named index
*/
public static boolean isPolicyForIndex(String policyName, String indexName) {
Objects.nonNull(policyName);
Objects.nonNull(indexName);
return indexName.matches(EnrichPolicy.getBaseName(policyName) + "-" + "\\d+");
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(type);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ private XContentBuilder createEnrichMappingBuilder(CheckedFunction<XContentBuild

private void prepareAndCreateEnrichIndex(List<Map<String, Object>> mappings) {
long nowTimestamp = nowSupplier.getAsLong();
String enrichIndexName = EnrichPolicy.getBaseName(policyName) + "-" + nowTimestamp;
String enrichIndexName = EnrichPolicy.getIndexName(policyName, nowTimestamp);
Settings enrichIndexSettings = Settings.builder()
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", 0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

import static org.elasticsearch.xpack.core.ClientHelper.ENRICH_ORIGIN;

Expand Down Expand Up @@ -80,23 +81,24 @@ protected void masterOperation(
ClusterState state,
ActionListener<AcknowledgedResponse> listener
) throws Exception {
EnrichPolicy policy = EnrichStore.getPolicy(request.getName(), state); // ensure the policy exists first
final String policyName = request.getName();
final EnrichPolicy policy = EnrichStore.getPolicy(policyName, state); // ensure the policy exists first
if (policy == null) {
throw new ResourceNotFoundException("policy [{}] not found", request.getName());
throw new ResourceNotFoundException("policy [{}] not found", policyName);
}

enrichPolicyLocks.lockPolicy(request.getName());
enrichPolicyLocks.lockPolicy(policyName);
try {
List<PipelineConfiguration> pipelines = IngestService.getPipelines(state);
List<String> pipelinesWithProcessors = new ArrayList<>();
final List<PipelineConfiguration> pipelines = IngestService.getPipelines(state);
final List<String> pipelinesWithProcessors = new ArrayList<>();

for (PipelineConfiguration pipelineConfiguration : pipelines) {
List<AbstractEnrichProcessor> enrichProcessors = ingestService.getProcessorsInPipeline(
pipelineConfiguration.getId(),
AbstractEnrichProcessor.class
);
for (AbstractEnrichProcessor processor : enrichProcessors) {
if (processor.getPolicyName().equals(request.getName())) {
if (processor.getPolicyName().equals(policyName)) {
pipelinesWithProcessors.add(pipelineConfiguration.getId());
}
}
Expand All @@ -106,26 +108,30 @@ protected void masterOperation(
throw new ElasticsearchStatusException(
"Could not delete policy [{}] because a pipeline is referencing it {}",
RestStatus.CONFLICT,
request.getName(),
policyName,
pipelinesWithProcessors
);
}
} catch (Exception e) {
enrichPolicyLocks.releasePolicy(request.getName());
enrichPolicyLocks.releasePolicy(policyName);
listener.onFailure(e);
return;
}

GetIndexRequest indices = new GetIndexRequest().indices(EnrichPolicy.getBaseName(request.getName()) + "-*")
final GetIndexRequest indices = new GetIndexRequest().indices(EnrichPolicy.getBaseName(policyName) + "-*")
.indicesOptions(IndicesOptions.lenientExpand());

String[] concreteIndices = indexNameExpressionResolver.concreteIndexNamesWithSystemIndexAccess(state, indices);

deleteIndicesAndPolicy(concreteIndices, request.getName(), ActionListener.wrap((response) -> {
enrichPolicyLocks.releasePolicy(request.getName());
// the wildcard expansion could be too wide (e.g. in the case of a policy named policy-1 and another named policy-10),
// so we need to filter down to just the concrete indices that are actually indices for this policy
concreteIndices = Stream.of(concreteIndices).filter(i -> EnrichPolicy.isPolicyForIndex(policyName, i)).toArray(String[]::new);

deleteIndicesAndPolicy(concreteIndices, policyName, ActionListener.wrap((response) -> {
enrichPolicyLocks.releasePolicy(policyName);
listener.onResponse(response);
}, (exc) -> {
enrichPolicyLocks.releasePolicy(request.getName());
enrichPolicyLocks.releasePolicy(policyName);
listener.onFailure(exc);
}));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,4 +97,19 @@ public static void assertEqualPolicies(EnrichPolicy expectedInstance, EnrichPoli
assertThat(newInstance.getMatchField(), equalTo(expectedInstance.getMatchField()));
assertThat(newInstance.getEnrichFields(), equalTo(expectedInstance.getEnrichFields()));
}

public void testIsPolicyForIndex() {
String policy1 = "policy-1";
String policy2 = "policy-10"; // the first policy is a prefix of the second policy!

String index1 = EnrichPolicy.getIndexName(policy1, 1000);
String index2 = EnrichPolicy.getIndexName(policy2, 2000);

assertTrue(EnrichPolicy.isPolicyForIndex(policy1, index1));
assertTrue(EnrichPolicy.isPolicyForIndex(policy2, index2));

assertFalse(EnrichPolicy.isPolicyForIndex(policy1, index2));
assertFalse(EnrichPolicy.isPolicyForIndex(policy2, index1));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ public void cleanupPolicy() {

public void testDeletePolicyDoesNotExistUnlocksPolicy() throws InterruptedException {
String fakeId = "fake-id";
createIndex(EnrichPolicy.getBaseName(fakeId) + "-foo1");
createIndex(EnrichPolicy.getBaseName(fakeId) + "-foo2");
createIndex(EnrichPolicy.getIndexName(fakeId, 1001));
createIndex(EnrichPolicy.getIndexName(fakeId, 1002));

final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<Exception> reference = new AtomicReference<>();
Expand Down Expand Up @@ -127,13 +127,13 @@ public void testDeleteIsNotLocked() throws Exception {
assertAcked(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(settings));
}

createIndex(EnrichPolicy.getBaseName(name) + "-foo1");
createIndex(EnrichPolicy.getBaseName(name) + "-foo2");
createIndex(EnrichPolicy.getIndexName(name, 1001));
createIndex(EnrichPolicy.getIndexName(name, 1002));

client().admin()
.indices()
.prepareGetIndex()
.setIndices(EnrichPolicy.getBaseName(name) + "-foo1", EnrichPolicy.getBaseName(name) + "-foo2")
.setIndices(EnrichPolicy.getIndexName(name, 1001), EnrichPolicy.getIndexName(name, 1002))
.get();

final CountDownLatch latch = new CountDownLatch(1);
Expand All @@ -159,7 +159,7 @@ public void onFailure(final Exception e) {
() -> client().admin()
.indices()
.prepareGetIndex()
.setIndices(EnrichPolicy.getBaseName(name) + "-foo1", EnrichPolicy.getBaseName(name) + "-foo2")
.setIndices(EnrichPolicy.getIndexName(name, 1001), EnrichPolicy.getIndexName(name, 1001))
.get()
);

Expand All @@ -182,8 +182,8 @@ public void testDeleteLocked() throws InterruptedException {
AtomicReference<Exception> error = saveEnrichPolicy(name, policy, clusterService);
assertThat(error.get(), nullValue());

createIndex(EnrichPolicy.getBaseName(name) + "-foo1");
createIndex(EnrichPolicy.getBaseName(name) + "-foo2");
createIndex(EnrichPolicy.getIndexName(name, 1001));
createIndex(EnrichPolicy.getIndexName(name, 1002));

EnrichPolicyLocks enrichPolicyLocks = getInstanceFromNode(EnrichPolicyLocks.class);
assertFalse(enrichPolicyLocks.captureExecutionState().isAnyPolicyInFlight());
Expand Down Expand Up @@ -240,4 +240,50 @@ public void onFailure(final Exception e) {
assertNull(EnrichStore.getPolicy(name, clusterService.state()));
}
}

public void testDeletePolicyPrefixes() throws InterruptedException {
EnrichPolicy policy = randomEnrichPolicy(XContentType.JSON);
ClusterService clusterService = getInstanceFromNode(ClusterService.class);

String name = "my-policy";
String otherName = "my-policy-two"; // the first policy is a prefix of this one

final TransportDeleteEnrichPolicyAction transportAction = node().injector().getInstance(TransportDeleteEnrichPolicyAction.class);
AtomicReference<Exception> error;
error = saveEnrichPolicy(name, policy, clusterService);
assertThat(error.get(), nullValue());
error = saveEnrichPolicy(otherName, policy, clusterService);
assertThat(error.get(), nullValue());

// create an index for the *other* policy
createIndex(EnrichPolicy.getIndexName(otherName, 1001));

{
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<AcknowledgedResponse> reference = new AtomicReference<>();

transportAction.execute(null, new DeleteEnrichPolicyAction.Request(name), new ActionListener<AcknowledgedResponse>() {
@Override
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
reference.set(acknowledgedResponse);
latch.countDown();
}

public void onFailure(final Exception e) {
fail();
}
});
latch.await();
assertNotNull(reference.get());
assertTrue(reference.get().isAcknowledged());

assertNull(EnrichStore.getPolicy(name, clusterService.state()));

// deleting name policy should have no effect on the other policy
assertNotNull(EnrichStore.getPolicy(otherName, clusterService.state()));

// and the index associated with the other index should be unaffected
client().admin().indices().prepareGetIndex().setIndices(EnrichPolicy.getIndexName(otherName, 1001)).get();
}
}
}

0 comments on commit 6bab477

Please sign in to comment.