Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify watcher indexing listener. #52627

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -97,20 +97,25 @@ void setConfiguration(Configuration configuration) {
*
* @param shardId The shard id object of the document being processed
* @param operation The index operation
* @return The index operation
* @param result The result of the operation
*/
@Override
public Engine.Index preIndex(ShardId shardId, Engine.Index operation) {
public void postIndex(ShardId shardId, Engine.Index operation, Engine.IndexResult result) {
if (isWatchDocument(shardId.getIndexName())) {
if (result.getResultType() == Engine.Result.Type.FAILURE) {
postIndex(shardId, operation, result.getFailure());
return;
}

ZonedDateTime now = Instant.ofEpochMilli(clock.millis()).atZone(ZoneOffset.UTC);
try {
Watch watch = parser.parseWithSecrets(operation.id(), true, operation.source(), now, XContentType.JSON,
operation.getIfSeqNo(), operation.getIfPrimaryTerm());
ShardAllocationConfiguration shardAllocationConfiguration = configuration.localShards.get(shardId);
if (shardAllocationConfiguration == null) {
logger.debug("no distributed watch execution info found for watch [{}] on shard [{}], got configuration for {}",
watch.id(), shardId, configuration.localShards.keySet());
return operation;
watch.id(), shardId, configuration.localShards.keySet());
return;
}

boolean shouldBeTriggered = shardAllocationConfiguration.shouldBeTriggered(watch.id());
Expand All @@ -128,32 +133,12 @@ public Engine.Index preIndex(ShardId shardId, Engine.Index operation) {
} catch (IOException e) {
throw new ElasticsearchParseException("Could not parse watch with id [{}]", e, operation.id());
}

}

return operation;
}

/**
* In case of a document related failure (for example version conflict), then clean up resources for a watch
* in the trigger service.
*/
@Override
public void postIndex(ShardId shardId, Engine.Index index, Engine.IndexResult result) {
if (result.getResultType() == Engine.Result.Type.FAILURE) {
assert result.getFailure() != null;
postIndex(shardId, index, result.getFailure());
}
}

/**
* In case of an engine related error, we have to ensure that the triggerservice does not leave anything behind
*
* TODO: If the configuration changes between preindex and postindex methods and we add a
* watch, that could not be indexed
* TODO: this watch might not be deleted from the triggerservice. Are we willing to accept this?
* TODO: This could be circumvented by using a threadlocal in preIndex(), that contains the
* watch and is cleared afterwards
* In case of an engine related error, we just log that we failed the add the watch to the trigger service.
* No need to interact with the trigger service.
*
* @param shardId The shard id object of the document being processed
* @param index The index operation
Expand All @@ -162,8 +147,7 @@ public void postIndex(ShardId shardId, Engine.Index index, Engine.IndexResult re
@Override
public void postIndex(ShardId shardId, Engine.Index index, Exception ex) {
if (isWatchDocument(shardId.getIndexName())) {
logger.debug(() -> new ParameterizedMessage("removing watch [{}] from trigger", index.id()), ex);
triggerService.remove(index.id());
logger.debug(() -> new ParameterizedMessage("failed to add watch [{}] to trigger service", index.id()), ex);
}
}

Expand Down
Expand Up @@ -113,18 +113,20 @@ public void testPreIndexCheckActive() throws Exception {
verifyZeroInteractions(parser);
}

public void testPreIndex() throws Exception {
public void testPostIndex() throws Exception {
when(operation.id()).thenReturn(randomAlphaOfLength(10));
when(operation.source()).thenReturn(BytesArray.EMPTY);
when(shardId.getIndexName()).thenReturn(Watch.INDEX);
List<Engine.Result.Type> types = new ArrayList<>(List.of(Engine.Result.Type.values()));
types.remove(Engine.Result.Type.FAILURE);
when(result.getResultType()).thenReturn(randomFrom(types));

boolean watchActive = randomBoolean();
boolean isNewWatch = randomBoolean();
Watch watch = mockWatch("_id", watchActive, isNewWatch);
when(parser.parseWithSecrets(anyObject(), eq(true), anyObject(), anyObject(), anyObject(), anyLong(), anyLong())).thenReturn(watch);

Engine.Index returnedOperation = listener.preIndex(shardId, operation);
assertThat(returnedOperation, is(operation));
listener.postIndex(shardId, operation, result);
ZonedDateTime now = DateUtils.nowWithMillisResolution(clock);
verify(parser).parseWithSecrets(eq(operation.id()), eq(true), eq(BytesArray.EMPTY), eq(now), anyObject(), anyLong(), anyLong());

Expand All @@ -139,12 +141,13 @@ public void testPreIndex() throws Exception {

// this test emulates an index with 10 shards, and ensures that triggering only happens on a
// single shard
public void testPreIndexWatchGetsOnlyTriggeredOnceAcrossAllShards() throws Exception {
public void testPostIndexWatchGetsOnlyTriggeredOnceAcrossAllShards() throws Exception {
String id = randomAlphaOfLength(10);
int totalShardCount = randomIntBetween(1, 10);
boolean watchActive = randomBoolean();
boolean isNewWatch = randomBoolean();
Watch watch = mockWatch(id, watchActive, isNewWatch);
when(result.getResultType()).thenReturn(Engine.Result.Type.SUCCESS);

when(shardId.getIndexName()).thenReturn(Watch.INDEX);
when(parser.parseWithSecrets(anyObject(), eq(true), anyObject(), anyObject(), anyObject(), anyLong(), anyLong())).thenReturn(watch);
Expand All @@ -154,7 +157,7 @@ public void testPreIndexWatchGetsOnlyTriggeredOnceAcrossAllShards() throws Excep
localShards.put(shardId, new ShardAllocationConfiguration(idx, totalShardCount, Collections.emptyList()));
Configuration configuration = new Configuration(Watch.INDEX, localShards);
listener.setConfiguration(configuration);
listener.preIndex(shardId, operation);
listener.postIndex(shardId, operation, result);
}

// no matter how many shards we had, this should have been only called once
Expand Down Expand Up @@ -186,16 +189,17 @@ private Watch mockWatch(String id, boolean active, boolean isNewWatch) {
return watch;
}

public void testPreIndexCheckParsingException() throws Exception {
public void testPostIndexCheckParsingException() throws Exception {
String id = randomAlphaOfLength(10);
when(operation.id()).thenReturn(id);
when(operation.source()).thenReturn(BytesArray.EMPTY);
when(shardId.getIndexName()).thenReturn(Watch.INDEX);
when(parser.parseWithSecrets(anyObject(), eq(true), anyObject(), anyObject(), anyObject(), anyLong(), anyLong()))
.thenThrow(new IOException("self thrown"));
when(result.getResultType()).thenReturn(Engine.Result.Type.SUCCESS);

ElasticsearchParseException exc = expectThrows(ElasticsearchParseException.class,
() -> listener.preIndex(shardId, operation));
() -> listener.postIndex(shardId, operation, result));
assertThat(exc.getMessage(), containsString("Could not parse watch"));
assertThat(exc.getMessage(), containsString(id));
}
Expand All @@ -206,19 +210,6 @@ public void testPostIndexRemoveTriggerOnDocumentRelatedException() throws Except
when(result.getFailure()).thenReturn(new RuntimeException());
when(shardId.getIndexName()).thenReturn(Watch.INDEX);

listener.postIndex(shardId, operation, result);
verify(triggerService).remove(eq("_id"));
}

public void testPostIndexRemoveTriggerOnDocumentRelatedException_ignoreOtherEngineResultTypes() throws Exception {
List<Engine.Result.Type> types = new ArrayList<>(List.of(Engine.Result.Type.values()));
types.remove(Engine.Result.Type.FAILURE);

when(operation.id()).thenReturn("_id");
when(result.getResultType()).thenReturn(randomFrom(types));
when(result.getFailure()).thenReturn(new RuntimeException());
when(shardId.getIndexName()).thenReturn(Watch.INDEX);

listener.postIndex(shardId, operation, result);
verifyZeroInteractions(triggerService);
}
Expand All @@ -238,7 +229,7 @@ public void testPostIndexRemoveTriggerOnEngineLevelException() throws Exception
when(shardId.getIndexName()).thenReturn(Watch.INDEX);

listener.postIndex(shardId, operation, new ElasticsearchParseException("whatever"));
verify(triggerService).remove(eq("_id"));
verifyZeroInteractions(triggerService);
}

public void testPostIndexRemoveTriggerOnEngineLevelException_ignoreNonWatcherDocument() throws Exception {
Expand Down