Skip to content

Commit

Permalink
Expose Engine.Searcher provider to ingest plugins. (#41010)
Browse files Browse the repository at this point in the history
Relates to #32789
  • Loading branch information
martijnvg committed Apr 24, 2019
1 parent 38e6dcd commit a61ec11
Show file tree
Hide file tree
Showing 5 changed files with 322 additions and 9 deletions.
36 changes: 34 additions & 2 deletions server/src/main/java/org/elasticsearch/ingest/IngestService.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,13 @@
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.ingest.DeletePipelineRequest;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateApplier;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
Expand All @@ -45,8 +47,12 @@
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.env.Environment;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.analysis.AnalysisRegistry;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.plugins.IngestPlugin;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -86,17 +92,43 @@ public class IngestService implements ClusterStateApplier {

public IngestService(ClusterService clusterService, ThreadPool threadPool,
Environment env, ScriptService scriptService, AnalysisRegistry analysisRegistry,
List<IngestPlugin> ingestPlugins) {
List<IngestPlugin> ingestPlugins, IndicesService indicesService) {
this.clusterService = clusterService;
this.scriptService = scriptService;
final IndexNameExpressionResolver resolver = new IndexNameExpressionResolver();
this.processorFactories = processorFactories(
ingestPlugins,
new Processor.Parameters(
env, scriptService, analysisRegistry,
threadPool.getThreadContext(), threadPool::relativeTimeInMillis,
(delay, command) -> threadPool.schedule(
command, TimeValue.timeValueMillis(delay), ThreadPool.Names.GENERIC
), this
), this, indexExpression -> {
ClusterState state = clusterService.state();
Index[] resolvedIndices = resolver.concreteIndices(state, IndicesOptions.STRICT_EXPAND_OPEN, indexExpression);
if (resolvedIndices.length != 1) {
throw new IllegalStateException("expression [" + indexExpression + "] can only point to a single concrete index");
}
Index index = resolvedIndices[0];

// check if indexExpression matches with an alias that has a filter
// There is no guarantee that alias filters are applied, so fail if this is the case.
Set<String> indicesAndAliases = resolver.resolveExpressions(state, indexExpression);
String[] aliasesWithFilter = resolver.filteringAliases(state, index.getName(), indicesAndAliases);
if (aliasesWithFilter != null && aliasesWithFilter.length > 0) {
throw new IllegalStateException("expression [" + indexExpression + "] points an alias with a filter");
}

IndexService indexService = indicesService.indexServiceSafe(index);
int numShards = indexService.getMetaData().getNumberOfShards();
if (numShards != 1) {
throw new IllegalStateException("index [" + index.getName() + "] must have 1 shard, but has " + numShards +
" shards");
}

IndexShard indexShard = indexService.getShard(0);
return indexShard.acquireSearcher("ingest");
}
)
);
this.threadPool = threadPool;
Expand Down
12 changes: 11 additions & 1 deletion server/src/main/java/org/elasticsearch/ingest/Processor.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.env.Environment;
import org.elasticsearch.index.analysis.AnalysisRegistry;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.threadpool.Scheduler;

import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.LongSupplier;

/**
Expand Down Expand Up @@ -110,16 +112,24 @@ class Parameters {
*/
public final BiFunction<Long, Runnable, Scheduler.ScheduledCancellable> scheduler;

/**
* Provides access to an engine searcher of a locally allocated index specified for the provided index.
*
* The locally allocated index must be have a single primary shard.
*/
public final Function<String, Engine.Searcher> localShardSearcher;

public Parameters(Environment env, ScriptService scriptService, AnalysisRegistry analysisRegistry, ThreadContext threadContext,
LongSupplier relativeTimeSupplier, BiFunction<Long, Runnable, Scheduler.ScheduledCancellable> scheduler,
IngestService ingestService) {
IngestService ingestService, Function<String, Engine.Searcher> localShardSearcher) {
this.env = env;
this.scriptService = scriptService;
this.threadContext = threadContext;
this.analysisRegistry = analysisRegistry;
this.relativeTimeSupplier = relativeTimeSupplier;
this.scheduler = scheduler;
this.ingestService = ingestService;
this.localShardSearcher = localShardSearcher;
}

}
Expand Down
6 changes: 4 additions & 2 deletions server/src/main/java/org/elasticsearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -348,8 +348,6 @@ protected Node(
final ClusterService clusterService = new ClusterService(settings, settingsModule.getClusterSettings(), threadPool);
clusterService.addStateApplier(scriptModule.getScriptService());
resourcesToClose.add(clusterService);
final IngestService ingestService = new IngestService(clusterService, threadPool, this.environment,
scriptModule.getScriptService(), analysisModule.getAnalysisRegistry(), pluginsService.filterPlugins(IngestPlugin.class));
final DiskThresholdMonitor listener = new DiskThresholdMonitor(settings, clusterService::state,
clusterService.getClusterSettings(), client);
final ClusterInfoService clusterInfoService = newClusterInfoService(settings, clusterService, threadPool, client,
Expand Down Expand Up @@ -419,6 +417,10 @@ protected Node(
threadPool, settingsModule.getIndexScopedSettings(), circuitBreakerService, bigArrays,
scriptModule.getScriptService(), client, metaStateService, engineFactoryProviders, indexStoreFactories);

final IngestService ingestService = new IngestService(clusterService, threadPool, this.environment,
scriptModule.getScriptService(), analysisModule.getAnalysisRegistry(),
pluginsService.filterPlugins(IngestPlugin.class), indicesService);

final AliasValidator aliasValidator = new AliasValidator();

final MetaDataCreateIndexService metaDataCreateIndexService = new MetaDataCreateIndexService(
Expand Down
Loading

0 comments on commit a61ec11

Please sign in to comment.