Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ingest cluster state listeners #46650

Merged
merged 3 commits into from
Sep 18, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions server/src/main/java/org/elasticsearch/ingest/IngestService.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
Expand All @@ -84,6 +85,7 @@ public class IngestService implements ClusterStateApplier {
private volatile Map<String, PipelineHolder> pipelines = Map.of();
private final ThreadPool threadPool;
private final IngestMetric totalMetrics = new IngestMetric();
private final List<Consumer<ClusterState>> ingestClusterStateListeners = new CopyOnWriteArrayList<>();

public IngestService(ClusterService clusterService, ThreadPool threadPool,
Environment env, ScriptService scriptService, AnalysisRegistry analysisRegistry,
Expand Down Expand Up @@ -385,6 +387,17 @@ public IngestStats stats() {
return statsBuilder.build();
}

/**
* Adds a listener that gets invoked with the current cluster state before processor factories
* get invoked.
*
* This is useful for components that are used by ingest processors, so that they have the opportunity to update
* before these components get used by the ingest processor factory.
*/
public void addIngestClusterStateListener(Consumer<ClusterState> listener) {
ingestClusterStateListeners.add(listener);
}

//package private for testing
static String getProcessorName(Processor processor){
// conditionals are implemented as wrappers around the real processor, so get the real processor for the correct type for the name
Expand Down Expand Up @@ -457,6 +470,12 @@ public void applyClusterState(final ClusterChangedEvent event) {
return;
}

// Publish cluster state to components that are used by processor factories before letting
// processor factories create new processor instances.
// (Note that this needs to be done also in the case when there is no change to ingest metadata, because in the case
// when only the part of the cluster state that a component is interested in, is updated.)
ingestClusterStateListeners.forEach(consumer -> consumer.accept(state));

IngestMetadata newIngestMetadata = state.getMetaData().custom(IngestMetadata.TYPE);
if (newIngestMetadata == null) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.LongSupplier;
Expand Down Expand Up @@ -1117,6 +1118,44 @@ public String getTag() {
verify(dropHandler, times(1)).accept(indexRequest);
}

public void testIngestClusterStateListeners_orderOfExecution() {
final AtomicInteger counter = new AtomicInteger(0);

// Ingest cluster state listener state should be invoked first:
Consumer<ClusterState> ingestClusterStateListener = clusterState -> {
assertThat(counter.compareAndSet(0, 1), is(true));
};

// Processor factory should be invoked secondly after ingest cluster state listener:
IngestPlugin testPlugin = new IngestPlugin() {
@Override
public Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters) {
return Collections.singletonMap("test", (factories, tag, config) -> {
assertThat(counter.compareAndSet(1, 2), is(true));
return new FakeProcessor("test", tag, ingestDocument -> {});
});
}
};

// Create ingest service:
ThreadPool tp = mock(ThreadPool.class);
Client client = mock(Client.class);
IngestService ingestService =
new IngestService(mock(ClusterService.class), tp, null, null, null, List.of(testPlugin), client);
ingestService.addIngestClusterStateListener(ingestClusterStateListener);

// Create pipeline and apply the resulting cluster state, which should update the counter in the right order:
PutPipelineRequest putRequest = new PutPipelineRequest("_id",
new BytesArray("{\"processors\": [{\"test\" : {}}]}"), XContentType.JSON);
ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty
ClusterState previousClusterState = clusterState;
clusterState = IngestService.innerPut(putRequest, clusterState);
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));

// Sanity check that counter has been updated twice:
assertThat(counter.get(), equalTo(2));
}

private IngestDocument eqIndexTypeId(final Map<String, Object> source) {
return argThat(new IngestDocumentMatcher("_index", "_type", "_id", source));
}
Expand Down