Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -577,12 +577,12 @@ private static TokenizerFactory parseTokenizerFactory(AnalyzeRequest request, An
return tokenizerFactory;
}

private static IndexSettings getNaIndexSettings(Settings settings) {
/**
 * Builds {@link IndexSettings} for the reserved {@code _na_} index so that
 * analysis components can be constructed without a concrete backing index.
 *
 * @param settings the settings to attach to the synthetic index metadata
 * @return index settings backed by metadata under the {@code _na_} UUID
 */
public static IndexSettings getNaIndexSettings(Settings settings) {
    final IndexMetaData naMetaData =
            IndexMetaData.builder(IndexMetaData.INDEX_UUID_NA_VALUE).settings(settings).build();
    return new IndexSettings(naMetaData, Settings.EMPTY);
}

private static Settings getAnonymousSettings(Settings providerSetting) {
public static Settings getAnonymousSettings(Settings providerSetting) {
return Settings.builder().put(providerSetting)
// for _na_
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ protected void doRun() throws Exception {
}
listener.onResponse(new SimulatePipelineResponse(request.getPipeline().getId(), request.isVerbose(), responses));
}

@Override
// Releases the pipeline's resources once execution has finished.
// NOTE(review): assumes this hook runs on both the success (doRun) and
// failure paths of the surrounding runnable — confirm against the
// enclosing AbstractRunnable contract, which is not visible here.
public void onAfter() {
request.getPipeline().close();
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,17 @@ static Parsed parseWithPipelineId(String pipelineId, Map<String, Object> config,
static Parsed parse(Map<String, Object> config, boolean verbose, PipelineStore pipelineStore) throws Exception {
Map<String, Object> pipelineConfig = ConfigurationUtils.readMap(null, null, config, Fields.PIPELINE);
Pipeline pipeline = PIPELINE_FACTORY.create(SIMULATED_PIPELINE_ID, pipelineConfig, pipelineStore.getProcessorFactories());
List<IngestDocument> ingestDocumentList = parseDocs(config);
return new Parsed(pipeline, ingestDocumentList, verbose);
try {
List<IngestDocument> ingestDocumentList = parseDocs(config);
return new Parsed(pipeline, ingestDocumentList, verbose);
} catch (Exception ex) {
try {
pipeline.close();
} catch (Exception cex) {
ex.addSuppressed(cex);
}
throw ex;
}
}

private static List<IngestDocument> parseDocs(Map<String, Object> config) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

package org.elasticsearch.action.ingest;

import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
Expand Down Expand Up @@ -50,18 +49,23 @@ public SimulatePipelineTransportAction(Settings settings, ThreadPool threadPool,
protected void doExecute(SimulatePipelineRequest request, ActionListener<SimulatePipelineResponse> listener) {
final Map<String, Object> source = XContentHelper.convertToMap(request.getSource(), false).v2();

final SimulatePipelineRequest.Parsed simulateRequest;
SimulatePipelineRequest.Parsed simulateRequest = null;
try {
if (request.getId() != null) {
simulateRequest = SimulatePipelineRequest.parseWithPipelineId(request.getId(), source, request.isVerbose(), pipelineStore);
} else {
simulateRequest = SimulatePipelineRequest.parse(source, request.isVerbose(), pipelineStore);
}
executionService.execute(simulateRequest, listener);
} catch (Exception e) {
try {
if (simulateRequest != null) {
simulateRequest.getPipeline().close();
}
} catch (Exception cex) {
e.addSuppressed(cex);
}
listener.onFailure(e);
return;
}

executionService.execute(simulateRequest, listener);
}
}
10 changes: 10 additions & 0 deletions core/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -164,4 +164,14 @@ private ElasticsearchException newCompoundProcessorException(Exception e, String

return exception;
}

/**
 * Releases resources held by every processor in this compound processor,
 * covering both the primary chain and the on-failure chain.
 *
 * Closing is best-effort: a failure while closing one processor must not
 * prevent the remaining processors from being closed (the original code
 * aborted on the first exception, leaking every processor after it).
 * The first exception encountered is rethrown after all processors have
 * been attempted, with later exceptions attached as suppressed.
 */
@Override
public void close() {
    RuntimeException firstFailure = null;
    for (Processor processor : processors) {
        try {
            processor.close();
        } catch (RuntimeException e) {
            if (firstFailure == null) {
                firstFailure = e;
            } else {
                firstFailure.addSuppressed(e);
            }
        }
    }
    for (Processor processor : onFailureProcessors) {
        try {
            processor.close();
        } catch (RuntimeException e) {
            if (firstFailure == null) {
                firstFailure = e;
            } else {
                firstFailure.addSuppressed(e);
            }
        }
    }
    if (firstFailure != null) {
        throw firstFailure;
    }
}
}
68 changes: 44 additions & 24 deletions core/src/main/java/org/elasticsearch/ingest/ConfigurationUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.ingest;

import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.ExceptionsHelper;
Expand All @@ -29,6 +30,8 @@
import java.util.List;
import java.util.Map;

import static org.elasticsearch.ingest.Processor.closeWhileCatchingExceptions;

public final class ConfigurationUtils {

public static final String TAG_KEY = "tag";
Expand Down Expand Up @@ -240,15 +243,22 @@ public static ElasticsearchException newConfigurationException(String processorT
public static List<Processor> readProcessorConfigs(List<Map<String, Map<String, Object>>> processorConfigs,
Map<String, Processor.Factory> processorFactories) throws Exception {
List<Processor> processors = new ArrayList<>();
if (processorConfigs != null) {
for (Map<String, Map<String, Object>> processorConfigWithKey : processorConfigs) {
for (Map.Entry<String, Map<String, Object>> entry : processorConfigWithKey.entrySet()) {
processors.add(readProcessor(processorFactories, entry.getKey(), entry.getValue()));
boolean success = false;
try {
if (processorConfigs != null) {
for (Map<String, Map<String, Object>> processorConfigWithKey : processorConfigs) {
for (Map.Entry<String, Map<String, Object>> entry : processorConfigWithKey.entrySet()) {
processors.add(readProcessor(processorFactories, entry.getKey(), entry.getValue()));
}
}
}
success = true;
return processors;
} finally {
if (success == false) {
IOUtils.closeWhileHandlingException(processors);
}
}

return processors;
}

public static TemplateService.Template compileTemplate(String processorType, String processorTag, String propertyName,
Expand Down Expand Up @@ -281,27 +291,37 @@ public static Processor readProcessor(Map<String, Processor.Factory> processorFa
List<Map<String, Map<String, Object>>> onFailureProcessorConfigs =
ConfigurationUtils.readOptionalList(null, null, config, Pipeline.ON_FAILURE_KEY);

List<Processor> onFailureProcessors = readProcessorConfigs(onFailureProcessorConfigs, processorFactories);
String tag = ConfigurationUtils.readOptionalStringProperty(null, null, config, TAG_KEY);

if (onFailureProcessorConfigs != null && onFailureProcessors.isEmpty()) {
throw newConfigurationException(type, tag, Pipeline.ON_FAILURE_KEY,
"processors list cannot be empty");
}

List<Processor> onFailureProcessors = null;
Processor processor = null;
try {
Processor processor = factory.create(processorFactories, tag, config);
if (config.isEmpty() == false) {
throw new ElasticsearchParseException("processor [{}] doesn't support one or more provided configuration parameters {}",
type, Arrays.toString(config.keySet().toArray()));
onFailureProcessors = readProcessorConfigs(onFailureProcessorConfigs, processorFactories);
String tag = ConfigurationUtils.readOptionalStringProperty(null, null, config, TAG_KEY);

if (onFailureProcessorConfigs != null && onFailureProcessors.isEmpty()) {
throw newConfigurationException(type, tag, Pipeline.ON_FAILURE_KEY,
"processors list cannot be empty");
}
if (onFailureProcessors.size() > 0 || ignoreFailure) {
return new CompoundProcessor(ignoreFailure, Collections.singletonList(processor), onFailureProcessors);
} else {
return processor;

try {
processor = factory.create(processorFactories, tag, config);
if (config.isEmpty() == false) {
throw new ElasticsearchParseException("processor [{}] doesn't support one or more provided configuration " +
"parameters {}", type, Arrays.toString(config.keySet().toArray()));
}
if (onFailureProcessors.size() > 0 || ignoreFailure) {
CompoundProcessor compoundProcessor =
new CompoundProcessor(ignoreFailure, Collections.singletonList(processor), onFailureProcessors);
return compoundProcessor;
} else {
return processor;
}
} catch (Exception e) {
throw newConfigurationException(type, tag, null, e);
}
} catch (Exception e) {
throw newConfigurationException(type, tag, null, e);
} catch (Exception ex) {
Processor.closeWhileCatchingExceptions(processor, ex);
closeWhileCatchingExceptions(onFailureProcessors, ex);
throw ex;
}
}
throw new ElasticsearchParseException("No processor type exists with name [" + type + "]");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.index.analysis.AnalysisRegistry;
import org.elasticsearch.plugins.IngestPlugin;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.threadpool.ThreadPool;
Expand All @@ -40,10 +41,12 @@ public class IngestService {
private final PipelineExecutionService pipelineExecutionService;

public IngestService(Settings settings, ThreadPool threadPool,
Environment env, ScriptService scriptService, List<IngestPlugin> ingestPlugins) {
Environment env, ScriptService scriptService, AnalysisRegistry analysisRegistry,
List<IngestPlugin> ingestPlugins) {

final TemplateService templateService = new InternalTemplateService(scriptService);
Processor.Parameters parameters = new Processor.Parameters(env, scriptService, templateService,
threadPool.getThreadContext());
analysisRegistry, threadPool.getThreadContext());
Map<String, Processor.Factory> processorFactories = new HashMap<>();
for (IngestPlugin ingestPlugin : ingestPlugins) {
Map<String, Processor.Factory> newProcessors = ingestPlugin.getProcessors(parameters);
Expand Down
41 changes: 29 additions & 12 deletions core/src/main/java/org/elasticsearch/ingest/Pipeline.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,19 @@
package org.elasticsearch.ingest;

import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.common.lease.Releasable;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import static org.elasticsearch.ingest.Processor.closeWhileCatchingExceptions;

/**
* A pipeline is a list of {@link Processor} instances grouped under a unique id.
*/
public final class Pipeline {
public final class Pipeline implements Releasable {

static final String DESCRIPTION_KEY = "description";
static final String PROCESSORS_KEY = "processors";
Expand Down Expand Up @@ -96,25 +99,39 @@ public List<Processor> flattenAllProcessors() {
return compoundProcessor.flattenProcessors();
}

@Override
// Releasable hook: delegates to the compound processor, which is expected
// to propagate close() to every processor (primary and on-failure chains)
// contained in this pipeline.
public void close() {
compoundProcessor.close();
}

public static final class Factory {

public Pipeline create(String id, Map<String, Object> config, Map<String, Processor.Factory> processorFactories) throws Exception {
String description = ConfigurationUtils.readOptionalStringProperty(null, null, config, DESCRIPTION_KEY);
List<Map<String, Map<String, Object>>> processorConfigs = ConfigurationUtils.readList(null, null, config, PROCESSORS_KEY);
List<Processor> processors = ConfigurationUtils.readProcessorConfigs(processorConfigs, processorFactories);
List<Map<String, Map<String, Object>>> onFailureProcessorConfigs =
List<Processor> processors = null;
List<Processor> onFailureProcessors = null;
try {
processors = ConfigurationUtils.readProcessorConfigs(processorConfigs, processorFactories);
List<Map<String, Map<String, Object>>> onFailureProcessorConfigs =
ConfigurationUtils.readOptionalList(null, null, config, ON_FAILURE_KEY);
List<Processor> onFailureProcessors = ConfigurationUtils.readProcessorConfigs(onFailureProcessorConfigs, processorFactories);
if (config.isEmpty() == false) {
throw new ElasticsearchParseException("pipeline [" + id +
onFailureProcessors = ConfigurationUtils.readProcessorConfigs(onFailureProcessorConfigs, processorFactories);
if (config.isEmpty() == false) {
throw new ElasticsearchParseException("pipeline [" + id +
"] doesn't support one or more provided configuration parameters " + Arrays.toString(config.keySet().toArray()));
}
if (onFailureProcessorConfigs != null && onFailureProcessors.isEmpty()) {
throw new ElasticsearchParseException("pipeline [" + id + "] cannot have an empty on_failure option defined");
}
CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.unmodifiableList(processors),
}
if (onFailureProcessorConfigs != null && onFailureProcessors.isEmpty()) {
throw new ElasticsearchParseException("pipeline [" + id + "] cannot have an empty on_failure option defined");
}
CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.unmodifiableList(processors),
Collections.unmodifiableList(onFailureProcessors));
return new Pipeline(id, description, compoundProcessor);
Pipeline pipeline = new Pipeline(id, description, compoundProcessor);
return pipeline;
} catch (Exception ex) {
closeWhileCatchingExceptions(processors, ex);
closeWhileCatchingExceptions(onFailureProcessors, ex);
throw ex;
}
}

}
Expand Down
59 changes: 36 additions & 23 deletions core/src/main/java/org/elasticsearch/ingest/PipelineStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,6 @@

package org.elasticsearch.ingest;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ResourceNotFoundException;
Expand All @@ -45,6 +38,15 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentHelper;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

import static org.elasticsearch.ingest.Processor.closeWhileCatchingExceptions;

public class PipelineStore extends AbstractComponent implements ClusterStateListener {

private final Pipeline.Factory factory = new Pipeline.Factory();
Expand Down Expand Up @@ -74,16 +76,26 @@ void innerUpdatePipelines(ClusterState previousState, ClusterState state) {
}

Map<String, Pipeline> pipelines = new HashMap<>();
for (PipelineConfiguration pipeline : ingestMetadata.getPipelines().values()) {
try {
pipelines.put(pipeline.getId(), factory.create(pipeline.getId(), pipeline.getConfigAsMap(), processorFactories));
} catch (ElasticsearchParseException e) {
throw e;
} catch (Exception e) {
throw new ElasticsearchParseException("Error updating pipeline with id [" + pipeline.getId() + "]", e);
Map<String, Pipeline> oldPipelines = this.pipelines;
try {
for (PipelineConfiguration pipeline : ingestMetadata.getPipelines().values()) {
try {
pipelines.put(pipeline.getId(), factory.create(pipeline.getId(), pipeline.getConfigAsMap(), processorFactories));
} catch (ElasticsearchParseException e) {
throw e;
} catch (Exception e) {
throw new ElasticsearchParseException("Error updating pipeline with id [" + pipeline.getId() + "]", e);
}
}
this.pipelines = Collections.unmodifiableMap(pipelines);;
} catch (Exception ex){
closeWhileCatchingExceptions(pipelines.values(), ex);
throw ex;
}

for(Pipeline pipeline : oldPipelines.values()) {
pipeline.close();
}
this.pipelines = Collections.unmodifiableMap(pipelines);
}

/**
Expand Down Expand Up @@ -152,17 +164,18 @@ void validatePipeline(Map<DiscoveryNode, IngestInfo> ingestInfos, PutPipelineReq
}

Map<String, Object> pipelineConfig = XContentHelper.convertToMap(request.getSource(), false).v2();
Pipeline pipeline = factory.create(request.getId(), pipelineConfig, processorFactories);
List<IllegalArgumentException> exceptions = new ArrayList<>();
for (Processor processor : pipeline.flattenAllProcessors()) {
for (Map.Entry<DiscoveryNode, IngestInfo> entry : ingestInfos.entrySet()) {
if (entry.getValue().containsProcessor(processor.getType()) == false) {
String message = "Processor type [" + processor.getType() + "] is not installed on node [" + entry.getKey() + "]";
exceptions.add(new IllegalArgumentException(message));
try (Pipeline pipeline = factory.create(request.getId(), pipelineConfig, processorFactories)) {
List<IllegalArgumentException> exceptions = new ArrayList<>();
for (Processor processor : pipeline.flattenAllProcessors()) {
for (Map.Entry<DiscoveryNode, IngestInfo> entry : ingestInfos.entrySet()) {
if (entry.getValue().containsProcessor(processor.getType()) == false) {
String message = "Processor type [" + processor.getType() + "] is not installed on node [" + entry.getKey() + "]";
exceptions.add(new IllegalArgumentException(message));
}
}
}
ExceptionsHelper.rethrowAndSuppress(exceptions);
}
ExceptionsHelper.rethrowAndSuppress(exceptions);
}

ClusterState innerPut(PutPipelineRequest request, ClusterState currentState) {
Expand Down
Loading