diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/analyze/TransportAnalyzeAction.java b/core/src/main/java/org/elasticsearch/action/admin/indices/analyze/TransportAnalyzeAction.java index f035bc0f4b7d1..41b533d21f15e 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/analyze/TransportAnalyzeAction.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/analyze/TransportAnalyzeAction.java @@ -577,12 +577,12 @@ private static TokenizerFactory parseTokenizerFactory(AnalyzeRequest request, An return tokenizerFactory; } - private static IndexSettings getNaIndexSettings(Settings settings) { + public static IndexSettings getNaIndexSettings(Settings settings) { IndexMetaData metaData = IndexMetaData.builder(IndexMetaData.INDEX_UUID_NA_VALUE).settings(settings).build(); return new IndexSettings(metaData, Settings.EMPTY); } - private static Settings getAnonymousSettings(Settings providerSetting) { + public static Settings getAnonymousSettings(Settings providerSetting) { return Settings.builder().put(providerSetting) // for _na_ .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) diff --git a/core/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java b/core/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java index db7397ba1f86b..fcfb1cef34313 100644 --- a/core/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java +++ b/core/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java @@ -71,6 +71,11 @@ protected void doRun() throws Exception { } listener.onResponse(new SimulatePipelineResponse(request.getPipeline().getId(), request.isVerbose(), responses)); } + + @Override + public void onAfter() { + request.getPipeline().close(); + } }); } } diff --git a/core/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineRequest.java b/core/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineRequest.java index a63f7a30dbeac..31cf0f659ce8d 100644 --- a/core/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineRequest.java +++ b/core/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineRequest.java @@ -142,8 +142,17 @@ static Parsed parseWithPipelineId(String pipelineId, Map config, static Parsed parse(Map config, boolean verbose, PipelineStore pipelineStore) throws Exception { Map pipelineConfig = ConfigurationUtils.readMap(null, null, config, Fields.PIPELINE); Pipeline pipeline = PIPELINE_FACTORY.create(SIMULATED_PIPELINE_ID, pipelineConfig, pipelineStore.getProcessorFactories()); - List ingestDocumentList = parseDocs(config); - return new Parsed(pipeline, ingestDocumentList, verbose); + try { + List ingestDocumentList = parseDocs(config); + return new Parsed(pipeline, ingestDocumentList, verbose); + } catch (Exception ex) { + try { + pipeline.close(); + } catch (Exception cex) { + ex.addSuppressed(cex); + } + throw ex; + } } private static List parseDocs(Map config) { diff --git a/core/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineTransportAction.java b/core/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineTransportAction.java index 4f9a219c8ad9e..c31f03053d438 100644 --- a/core/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineTransportAction.java +++ b/core/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineTransportAction.java @@ -19,7 +19,6 @@ package org.elasticsearch.action.ingest; -import org.elasticsearch.ElasticsearchParseException; import 
org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; @@ -50,18 +49,23 @@ public SimulatePipelineTransportAction(Settings settings, ThreadPool threadPool, protected void doExecute(SimulatePipelineRequest request, ActionListener listener) { final Map source = XContentHelper.convertToMap(request.getSource(), false).v2(); - final SimulatePipelineRequest.Parsed simulateRequest; + SimulatePipelineRequest.Parsed simulateRequest = null; try { if (request.getId() != null) { simulateRequest = SimulatePipelineRequest.parseWithPipelineId(request.getId(), source, request.isVerbose(), pipelineStore); } else { simulateRequest = SimulatePipelineRequest.parse(source, request.isVerbose(), pipelineStore); } + executionService.execute(simulateRequest, listener); } catch (Exception e) { + try { + if (simulateRequest != null) { + simulateRequest.getPipeline().close(); + } + } catch (Exception cex) { + e.addSuppressed(cex); + } listener.onFailure(e); - return; } - - executionService.execute(simulateRequest, listener); } } diff --git a/core/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java b/core/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java index 3ab7c078cd7ad..ad41962369f55 100644 --- a/core/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java +++ b/core/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java @@ -164,4 +164,14 @@ private ElasticsearchException newCompoundProcessorException(Exception e, String return exception; } + + @Override + public void close() { + for (Processor processor : processors) { + processor.close(); + } + for (Processor processor : onFailureProcessors) { + processor.close(); + } + } } diff --git a/core/src/main/java/org/elasticsearch/ingest/ConfigurationUtils.java b/core/src/main/java/org/elasticsearch/ingest/ConfigurationUtils.java index 3f725c43b25a9..8d30fe4e4cc06 100644 --- a/core/src/main/java/org/elasticsearch/ingest/ConfigurationUtils.java +++ b/core/src/main/java/org/elasticsearch/ingest/ConfigurationUtils.java @@ -19,6 +19,7 @@ package org.elasticsearch.ingest; +import org.apache.lucene.util.IOUtils; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.ExceptionsHelper; @@ -29,6 +30,8 @@ import java.util.List; import java.util.Map; +import static org.elasticsearch.ingest.Processor.closeWhileCatchingExceptions; + public final class ConfigurationUtils { public static final String TAG_KEY = "tag"; @@ -240,15 +243,22 @@ public static ElasticsearchException newConfigurationException(String processorT public static List readProcessorConfigs(List>> processorConfigs, Map processorFactories) throws Exception { List processors = new ArrayList<>(); - if (processorConfigs != null) { - for (Map> processorConfigWithKey : processorConfigs) { - for (Map.Entry> entry : processorConfigWithKey.entrySet()) { - processors.add(readProcessor(processorFactories, entry.getKey(), entry.getValue())); + boolean success = false; + try { + if (processorConfigs != null) { + for (Map> processorConfigWithKey : processorConfigs) { + for (Map.Entry> entry : processorConfigWithKey.entrySet()) { + processors.add(readProcessor(processorFactories, entry.getKey(), entry.getValue())); + } } } + success = true; + return processors; + } finally { + if (success == false) { + IOUtils.closeWhileHandlingException(processors); + } } - - return processors; } public static TemplateService.Template 
compileTemplate(String processorType, String processorTag, String propertyName, @@ -281,27 +291,37 @@ public static Processor readProcessor(Map processorFa List>> onFailureProcessorConfigs = ConfigurationUtils.readOptionalList(null, null, config, Pipeline.ON_FAILURE_KEY); - List onFailureProcessors = readProcessorConfigs(onFailureProcessorConfigs, processorFactories); - String tag = ConfigurationUtils.readOptionalStringProperty(null, null, config, TAG_KEY); - - if (onFailureProcessorConfigs != null && onFailureProcessors.isEmpty()) { - throw newConfigurationException(type, tag, Pipeline.ON_FAILURE_KEY, - "processors list cannot be empty"); - } - + List onFailureProcessors = null; + Processor processor = null; try { - Processor processor = factory.create(processorFactories, tag, config); - if (config.isEmpty() == false) { - throw new ElasticsearchParseException("processor [{}] doesn't support one or more provided configuration parameters {}", - type, Arrays.toString(config.keySet().toArray())); + onFailureProcessors = readProcessorConfigs(onFailureProcessorConfigs, processorFactories); + String tag = ConfigurationUtils.readOptionalStringProperty(null, null, config, TAG_KEY); + + if (onFailureProcessorConfigs != null && onFailureProcessors.isEmpty()) { + throw newConfigurationException(type, tag, Pipeline.ON_FAILURE_KEY, + "processors list cannot be empty"); } - if (onFailureProcessors.size() > 0 || ignoreFailure) { - return new CompoundProcessor(ignoreFailure, Collections.singletonList(processor), onFailureProcessors); - } else { - return processor; + + try { + processor = factory.create(processorFactories, tag, config); + if (config.isEmpty() == false) { + throw new ElasticsearchParseException("processor [{}] doesn't support one or more provided configuration " + + "parameters {}", type, Arrays.toString(config.keySet().toArray())); + } + if (onFailureProcessors.size() > 0 || ignoreFailure) { + CompoundProcessor compoundProcessor = + new CompoundProcessor(ignoreFailure, Collections.singletonList(processor), onFailureProcessors); + return compoundProcessor; + } else { + return processor; + } + } catch (Exception e) { + throw newConfigurationException(type, tag, null, e); } - } catch (Exception e) { - throw newConfigurationException(type, tag, null, e); + } catch (Exception ex) { + Processor.closeWhileCatchingExceptions(processor, ex); + closeWhileCatchingExceptions(onFailureProcessors, ex); + throw ex; } } throw new ElasticsearchParseException("No processor type exists with name [" + type + "]"); diff --git a/core/src/main/java/org/elasticsearch/ingest/IngestService.java b/core/src/main/java/org/elasticsearch/ingest/IngestService.java index 07e2aa1fe5160..5249ed7a7dc84 100644 --- a/core/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/core/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -27,6 +27,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.env.Environment; +import org.elasticsearch.index.analysis.AnalysisRegistry; import org.elasticsearch.plugins.IngestPlugin; import org.elasticsearch.script.ScriptService; import org.elasticsearch.threadpool.ThreadPool; @@ -40,10 +41,12 @@ public class IngestService { private final PipelineExecutionService pipelineExecutionService; public IngestService(Settings settings, ThreadPool threadPool, - Environment env, ScriptService scriptService, List ingestPlugins) { + Environment env, ScriptService scriptService, AnalysisRegistry analysisRegistry, + List ingestPlugins) { + final TemplateService 
templateService = new InternalTemplateService(scriptService); Processor.Parameters parameters = new Processor.Parameters(env, scriptService, templateService, - threadPool.getThreadContext()); + analysisRegistry, threadPool.getThreadContext()); Map processorFactories = new HashMap<>(); for (IngestPlugin ingestPlugin : ingestPlugins) { Map newProcessors = ingestPlugin.getProcessors(parameters); diff --git a/core/src/main/java/org/elasticsearch/ingest/Pipeline.java b/core/src/main/java/org/elasticsearch/ingest/Pipeline.java index e29d206543ca4..6d81e8c4128c1 100644 --- a/core/src/main/java/org/elasticsearch/ingest/Pipeline.java +++ b/core/src/main/java/org/elasticsearch/ingest/Pipeline.java @@ -20,16 +20,19 @@ package org.elasticsearch.ingest; import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.common.lease.Releasable; import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; +import static org.elasticsearch.ingest.Processor.closeWhileCatchingExceptions; + /** * A pipeline is a list of {@link Processor} instances grouped under a unique id. */ -public final class Pipeline { +public final class Pipeline implements Releasable { static final String DESCRIPTION_KEY = "description"; static final String PROCESSORS_KEY = "processors"; @@ -96,25 +99,39 @@ public List flattenAllProcessors() { return compoundProcessor.flattenProcessors(); } + @Override + public void close() { + compoundProcessor.close(); + } + public static final class Factory { public Pipeline create(String id, Map config, Map processorFactories) throws Exception { String description = ConfigurationUtils.readOptionalStringProperty(null, null, config, DESCRIPTION_KEY); List>> processorConfigs = ConfigurationUtils.readList(null, null, config, PROCESSORS_KEY); - List processors = ConfigurationUtils.readProcessorConfigs(processorConfigs, processorFactories); - List>> onFailureProcessorConfigs = + List processors = null; + List onFailureProcessors = null; + try { + processors = ConfigurationUtils.readProcessorConfigs(processorConfigs, processorFactories); + List>> onFailureProcessorConfigs = ConfigurationUtils.readOptionalList(null, null, config, ON_FAILURE_KEY); - List onFailureProcessors = ConfigurationUtils.readProcessorConfigs(onFailureProcessorConfigs, processorFactories); - if (config.isEmpty() == false) { - throw new ElasticsearchParseException("pipeline [" + id + + onFailureProcessors = ConfigurationUtils.readProcessorConfigs(onFailureProcessorConfigs, processorFactories); + if (config.isEmpty() == false) { + throw new ElasticsearchParseException("pipeline [" + id + "] doesn't support one or more provided configuration parameters " + Arrays.toString(config.keySet().toArray())); - } - if (onFailureProcessorConfigs != null && onFailureProcessors.isEmpty()) { - throw new ElasticsearchParseException("pipeline [" + id + "] cannot have an empty on_failure option defined"); - } - CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.unmodifiableList(processors), + } + if (onFailureProcessorConfigs != null && onFailureProcessors.isEmpty()) { + throw new ElasticsearchParseException("pipeline [" + id + "] cannot have an empty on_failure option defined"); + } + CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.unmodifiableList(processors), Collections.unmodifiableList(onFailureProcessors)); - return new Pipeline(id, description, compoundProcessor); + Pipeline pipeline = new Pipeline(id, description, compoundProcessor); + 
return pipeline; + } catch (Exception ex) { + closeWhileCatchingExceptions(processors, ex); + closeWhileCatchingExceptions(onFailureProcessors, ex); + throw ex; + } } } diff --git a/core/src/main/java/org/elasticsearch/ingest/PipelineStore.java b/core/src/main/java/org/elasticsearch/ingest/PipelineStore.java index 94850674e755c..169875822b9d6 100644 --- a/core/src/main/java/org/elasticsearch/ingest/PipelineStore.java +++ b/core/src/main/java/org/elasticsearch/ingest/PipelineStore.java @@ -19,13 +19,6 @@ package org.elasticsearch.ingest; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; - import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ResourceNotFoundException; @@ -45,6 +38,15 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentHelper; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +import static org.elasticsearch.ingest.Processor.closeWhileCatchingExceptions; + public class PipelineStore extends AbstractComponent implements ClusterStateListener { private final Pipeline.Factory factory = new Pipeline.Factory(); @@ -74,16 +76,26 @@ void innerUpdatePipelines(ClusterState previousState, ClusterState state) { } Map pipelines = new HashMap<>(); - for (PipelineConfiguration pipeline : ingestMetadata.getPipelines().values()) { - try { - pipelines.put(pipeline.getId(), factory.create(pipeline.getId(), pipeline.getConfigAsMap(), processorFactories)); - } catch (ElasticsearchParseException e) { - throw e; - } catch (Exception e) { - throw new ElasticsearchParseException("Error updating pipeline with id [" + pipeline.getId() + "]", e); + Map oldPipelines = this.pipelines; + try { + for (PipelineConfiguration pipeline : ingestMetadata.getPipelines().values()) { + try { + pipelines.put(pipeline.getId(), factory.create(pipeline.getId(), pipeline.getConfigAsMap(), processorFactories)); + } catch (ElasticsearchParseException e) { + throw e; + } catch (Exception e) { + throw new ElasticsearchParseException("Error updating pipeline with id [" + pipeline.getId() + "]", e); + } } + this.pipelines = Collections.unmodifiableMap(pipelines);; + } catch (Exception ex){ + closeWhileCatchingExceptions(pipelines.values(), ex); + throw ex; + } + + for(Pipeline pipeline : oldPipelines.values()) { + pipeline.close(); } - this.pipelines = Collections.unmodifiableMap(pipelines); } /** @@ -152,17 +164,18 @@ void validatePipeline(Map ingestInfos, PutPipelineReq } Map pipelineConfig = XContentHelper.convertToMap(request.getSource(), false).v2(); - Pipeline pipeline = factory.create(request.getId(), pipelineConfig, processorFactories); - List exceptions = new ArrayList<>(); - for (Processor processor : pipeline.flattenAllProcessors()) { - for (Map.Entry entry : ingestInfos.entrySet()) { - if (entry.getValue().containsProcessor(processor.getType()) == false) { - String message = "Processor type [" + processor.getType() + "] is not installed on node [" + entry.getKey() + "]"; - exceptions.add(new IllegalArgumentException(message)); + try (Pipeline pipeline = factory.create(request.getId(), pipelineConfig, processorFactories)) { + List exceptions = new ArrayList<>(); + for (Processor processor : pipeline.flattenAllProcessors()) { + for (Map.Entry entry : ingestInfos.entrySet()) { + 
if (entry.getValue().containsProcessor(processor.getType()) == false) { + String message = "Processor type [" + processor.getType() + "] is not installed on node [" + entry.getKey() + "]"; + exceptions.add(new IllegalArgumentException(message)); + } } } + ExceptionsHelper.rethrowAndSuppress(exceptions); } - ExceptionsHelper.rethrowAndSuppress(exceptions); } ClusterState innerPut(PutPipelineRequest request, ClusterState currentState) { diff --git a/core/src/main/java/org/elasticsearch/ingest/Processor.java b/core/src/main/java/org/elasticsearch/ingest/Processor.java index ef1cd882d2222..672cb87b51374 100644 --- a/core/src/main/java/org/elasticsearch/ingest/Processor.java +++ b/core/src/main/java/org/elasticsearch/ingest/Processor.java @@ -19,17 +19,20 @@ package org.elasticsearch.ingest; +import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.env.Environment; +import org.elasticsearch.index.analysis.AnalysisRegistry; import org.elasticsearch.script.ScriptService; +import java.util.Collection; import java.util.Map; /** * A processor implementation may modify the data belonging to a document. * Whether changes are made and what exactly is modified is up to the implementation. */ -public interface Processor { +public interface Processor extends Releasable { /** * Introspect and potentially modify the incoming data. @@ -46,6 +49,14 @@ public interface Processor { */ String getTag(); + /** + * Close pipeline + */ + @Override + default void close() { + + } + /** * A factory that knows how to construct a processor based on a map of maps. */ @@ -86,6 +97,11 @@ class Parameters { */ public final TemplateService templateService; + /** + * Provide analyzer support + */ + public final AnalysisRegistry analysisRegistry; + /** * Allows processors to read headers set by {@link org.elasticsearch.action.support.ActionFilter} * instances that have run prior to in ingest. @@ -93,12 +109,32 @@ class Parameters { public final ThreadContext threadContext; public Parameters(Environment env, ScriptService scriptService, TemplateService templateService, - ThreadContext threadContext) { + AnalysisRegistry analysisRegistry, ThreadContext threadContext) { this.env = env; this.scriptService = scriptService; this.templateService = templateService; this.threadContext = threadContext; + this.analysisRegistry = analysisRegistry; } + } + /** + * Closes all processors in case of an exception. If the close operation fails, it adds the exception to the ex as a suppressed + * exception. 
+ */ + static void closeWhileCatchingExceptions(Collection processors, Exception ex) { + if (processors != null) { + for(Releasable processor : processors) { + closeWhileCatchingExceptions(processor, ex); + } + } + } + + static void closeWhileCatchingExceptions(Releasable processor, Exception ex) { + try { + processor.close(); + } catch (Exception cex) { + ex.addSuppressed(cex); + } } } diff --git a/core/src/main/java/org/elasticsearch/node/Node.java b/core/src/main/java/org/elasticsearch/node/Node.java index 523e6faefb165..e5c61947ae77d 100644 --- a/core/src/main/java/org/elasticsearch/node/Node.java +++ b/core/src/main/java/org/elasticsearch/node/Node.java @@ -311,7 +311,7 @@ protected Node(final Environment environment, Collection final TribeService tribeService = new TribeService(settings, clusterService, nodeEnvironment.nodeId()); resourcesToClose.add(tribeService); final IngestService ingestService = new IngestService(settings, threadPool, this.environment, - scriptModule.getScriptService(), pluginsService.filterPlugins(IngestPlugin.class)); + scriptModule.getScriptService(), analysisModule.getAnalysisRegistry(), pluginsService.filterPlugins(IngestPlugin.class)); ModulesBuilder modules = new ModulesBuilder(); // plugin modules must be added here, before others or we can get crazy injection errors... diff --git a/core/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java b/core/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java index 08cde7e04d806..3a842a4690afa 100644 --- a/core/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java +++ b/core/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java @@ -39,7 +39,7 @@ public Map getProcessors(Processor.Parameters paramet public void testIngestPlugin() { ThreadPool tp = Mockito.mock(ThreadPool.class); - IngestService ingestService = new IngestService(Settings.EMPTY, tp, null, null, Collections.singletonList(DUMMY_PLUGIN)); + IngestService ingestService = new IngestService(Settings.EMPTY, tp, null, null, null, Collections.singletonList(DUMMY_PLUGIN)); Map factories = ingestService.getPipelineStore().getProcessorFactories(); assertTrue(factories.containsKey("foo")); assertEquals(1, factories.size()); @@ -48,7 +48,7 @@ public void testIngestPlugin() { public void testIngestPluginDuplicate() { ThreadPool tp = Mockito.mock(ThreadPool.class); IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> - new IngestService(Settings.EMPTY, tp, null, null, Arrays.asList(DUMMY_PLUGIN, DUMMY_PLUGIN)) + new IngestService(Settings.EMPTY, tp, null, null, null, Arrays.asList(DUMMY_PLUGIN, DUMMY_PLUGIN)) ); assertTrue(e.getMessage(), e.getMessage().contains("already registered")); } diff --git a/docs/reference/ingest/ingest-node.asciidoc b/docs/reference/ingest/ingest-node.asciidoc index f0c0e9f6c13d8..84c7846c20ee4 100644 --- a/docs/reference/ingest/ingest-node.asciidoc +++ b/docs/reference/ingest/ingest-node.asciidoc @@ -660,6 +660,61 @@ A node will not start if either of these plugins are not available. The <> can be used to fetch ingest usage statistics, globally and on a per pipeline basis. Useful to find out which pipelines are used the most or spent the most time on preprocessing. +[[analyzer-processor]] +=== Analyzer Processor +Splits a field into an array of tokens using an analyzer. Only works on string fields. 
+
+[[analyzer-options]]
+.Analyzer Options
+[options="header"]
+|======
+| Name | Required | Default | Description
+| `field` | yes | - | The field to be analyzed
+| `target_field` | no | `field` | The field to assign the analyzed tokens to; by default `field` is updated in place
+| `analyzer` | no | - | The name of the analyzer to use
+| `tokenizer` | no | - | The tokenizer to apply
+| `char_filter` | no | - | An array of char filters to apply
+| `token_filter` | no | - | An array of token filters to apply
+|======
+
+[NOTE]
+Either the `analyzer` or the `tokenizer` parameter has to be specified, but they cannot be used at the same time.
+
+An existing, globally defined analyzer can be referred to by name using the `analyzer` parameter.
+
+[source,js]
+--------------------------------------------------
+{
+  "analyzer": {
+    "field": "my_field",
+    "analyzer": "standard"
+  }
+}
+--------------------------------------------------
+
+Alternatively, a custom analyzer can be built by specifying a `tokenizer`, and optionally a list of token filters in the `filter`
+(or `token_filter`) parameter and char filters in the `char_filter` parameter. All components can be specified either by name or
+defined inline, using the <> syntax.
+
+[source,js]
+--------------------------------------------------
+{
+  "analyzer": {
+    "field": "my_field",
+    "char_filter": ["html_strip"],
+    "tokenizer": "keyword",
+    "filter": [
+      "lowercase",
+      {
+        "type": "stop",
+        "stopwords": ["a", "the", "this"]
+      }
+    ]
+  }
+}
+--------------------------------------------------
+
+
 [[append-procesesor]]
 === Append Processor
 Appends one or more values to an existing array if the field already exists and it is an array.
diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/AnalyzerProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/AnalyzerProcessor.java
new file mode 100644
index 0000000000000..1d3062aedb1ce
--- /dev/null
+++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/AnalyzerProcessor.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.ingest.common;
+
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.analysis.TokenStream;
+import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.ElasticsearchParseException;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentFactory;
+import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.env.Environment;
+import org.elasticsearch.index.analysis.AnalysisRegistry;
+import org.elasticsearch.index.analysis.CharFilterFactory;
+import org.elasticsearch.index.analysis.CustomAnalyzer;
+import org.elasticsearch.index.analysis.TokenFilterFactory;
+import org.elasticsearch.index.analysis.TokenizerFactory;
+import org.elasticsearch.indices.analysis.AnalysisModule;
+import org.elasticsearch.ingest.AbstractProcessor;
+import org.elasticsearch.ingest.ConfigurationUtils;
+import org.elasticsearch.ingest.IngestDocument;
+import org.elasticsearch.ingest.Processor;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+import static org.elasticsearch.action.admin.indices.analyze.TransportAnalyzeAction.getAnonymousSettings;
+import static org.elasticsearch.action.admin.indices.analyze.TransportAnalyzeAction.getNaIndexSettings;
+
+/**
+ * Processor that analyzes the content of a field with a configured analyzer and stores the resulting
+ * tokens as an array in the target field.
+ * Throws an exception if the field is null or holds a type other than a string or a list of strings.
+ */
+public final class AnalyzerProcessor extends AbstractProcessor {
+
+    public static final String TYPE = "analyzer";
+
+    private final String targetField;
+
+    private final String field;
+
+    private final Analyzer analyzer;
+
+    private final boolean customAnalyzer;
+
+    AnalyzerProcessor(String tag, String field, String targetField, Analyzer analyzer, boolean customAnalyzer) {
+        super(tag);
+        this.field = field;
+        this.targetField = targetField;
+        this.analyzer = analyzer;
+        this.customAnalyzer = customAnalyzer;
+    }
+
+    String getField() {
+        return field;
+    }
+
+    String getTargetField() {
+        return targetField;
+    }
+
+    Analyzer getAnalyzer() {
+        return analyzer;
+    }
+
+    @Override
+    public void execute(IngestDocument document) {
+        Object oldVal = document.getFieldValue(field, Object.class);
+        if (oldVal == null) {
+            throw new IllegalArgumentException("field [" + field + "] is null, cannot be analyzed.");
+        }
+        List<String> splitList = new ArrayList<>();
+        if (oldVal instanceof String) {
+            split(splitList, (String) oldVal);
+        } else if (oldVal instanceof ArrayList) {
+            for (Object obj : (ArrayList<?>) oldVal) {
+                split(splitList, obj.toString());
+            }
+        } else {
+            throw new IllegalArgumentException("field [" + field + "] has type [" + oldVal.getClass().getName() +
+                "] and cannot be analyzed");
+        }
+        document.setFieldValue(targetField, splitList);
+    }
+
+    private void split(List<String> splitList, String val) {
+        try (TokenStream stream = analyzer.tokenStream(field, val)) {
+            stream.reset();
+            CharTermAttribute term = stream.addAttribute(CharTermAttribute.class);
+            while (stream.incrementToken()) {
+                splitList.add(term.toString());
+            }
+            stream.end();
+        } catch (IOException e) {
+            throw new ElasticsearchException("failed to analyze field [" + field + "]", e);
+        }
+    }
+
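+    // Only analyzers built inline by this processor are closed here; named analyzers resolved from the
+    // AnalysisRegistry are shared instances owned by the registry and must not be closed.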
+ @Override + public void close() { + if (customAnalyzer) { + analyzer.close(); + } + } + + @Override + public String getType() { + return TYPE; + } + + public static class Factory implements Processor.Factory { + + private final AnalysisRegistry analysisRegistry; + private final Environment environment; + + public Factory(Environment environment, AnalysisRegistry analysisRegistry) { + this.environment = environment; + this.analysisRegistry = analysisRegistry; + } + + @Override + public AnalyzerProcessor create(Map registry, String processorTag, + Map config) throws Exception { + String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field"); + String targetField = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "target_field", field); + + final Analyzer analyzer; + boolean closeAnalyzer = false; + if (config.containsKey("analyzer")) { + if (config.containsKey("filter") || config.containsKey("token_filter") || + config.containsKey("char_filter") || config.containsKey("tokenizer")) { + throw new ElasticsearchParseException("custom tokenizer or filters cannot be specified if a named analyzer is used"); + } + String analyzerName = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "analyzer"); + analyzer = analysisRegistry.getAnalyzer(analyzerName); + if (analyzer == null) { + throw new IllegalArgumentException("Unknown analyzer [" + analyzerName + "]"); + } + } else if (config.containsKey("tokenizer")) { + Object tokenizer = ConfigurationUtils.readObject(TYPE, processorTag, config, "tokenizer"); + TokenizerFactory tokenizerFactory = parseDefinition(tokenizer, analysisRegistry::getTokenizerProvider, "tokenizer", -1); + + List filters = ConfigurationUtils.readOptionalList(TYPE, processorTag, config, "filter"); + if (filters == null) { + filters = ConfigurationUtils.readOptionalList(TYPE, processorTag, config, "token_filter"); + } + TokenFilterFactory[] tokenFilterFactories = parseTokenFilterFactories(filters); + + List charFilters = ConfigurationUtils.readOptionalList(TYPE, processorTag, config, "char_filter"); + CharFilterFactory[] charFilterFactories = parseCharFilterFactories(charFilters); + + analyzer = new CustomAnalyzer(tokenizerFactory, charFilterFactories, tokenFilterFactories); + closeAnalyzer = true; + } else { + throw new ElasticsearchParseException("missing analyzer definition"); + } + return new AnalyzerProcessor(processorTag, field, targetField, analyzer, closeAnalyzer); + } + + TokenFilterFactory[] parseTokenFilterFactories(List filterDefinitions) throws IOException { + if (filterDefinitions != null && filterDefinitions.size() > 0) { + TokenFilterFactory[] tokenFilterFactories = new TokenFilterFactory[filterDefinitions.size()]; + for (int i = 0; i < filterDefinitions.size(); i++) { + tokenFilterFactories[i] = parseDefinition(filterDefinitions.get(i), analysisRegistry::getTokenFilterProvider, + "token_filter", i); + } + return tokenFilterFactories; + } else { + return new TokenFilterFactory[0]; + } + } + + CharFilterFactory[] parseCharFilterFactories(List filterDefinitions) throws IOException { + if (filterDefinitions != null && filterDefinitions.size() > 0) { + CharFilterFactory[] charFilterFactories = new CharFilterFactory[filterDefinitions.size()]; + for (int i = 0; i < filterDefinitions.size(); i++) { + charFilterFactories[i] = parseDefinition(filterDefinitions.get(i), analysisRegistry::getCharFilterProvider, + "char_filter", i); + } + return charFilterFactories; + } else { + return new CharFilterFactory[0]; + } + } 
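+
+        /**
+         * Resolves an analysis component (tokenizer, token filter, or char filter) either by the name of a globally
+         * registered component, or from an inline map definition whose "type" entry names the provider. Inline
+         * definitions are built against anonymous "_na_" index settings, mirroring the _analyze API. The index
+         * {@code i} is used to derive a unique anonymous name and is negative for the tokenizer.
+         */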
+        private <T> T parseDefinition(Object nameOrDefinition, Function<String, AnalysisModule.AnalysisProvider<T>> factory,
+                                      String type, int i) throws IOException {
+            AnalysisModule.AnalysisProvider<T> filterFactoryProvider;
+            if (nameOrDefinition instanceof String) {
+                String charFilterName = (String) nameOrDefinition;
+                filterFactoryProvider = factory.apply(charFilterName);
+                if (filterFactoryProvider == null) {
+                    throw new IllegalArgumentException("failed to find global " + type + " [" + charFilterName + "]");
+                }
+                return filterFactoryProvider.get(environment, charFilterName);
+            } else if (nameOrDefinition instanceof Map) {
+                @SuppressWarnings("unchecked") Map<String, ?> map = (Map<String, ?>) nameOrDefinition;
+                XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON);
+                builder.map(map);
+                Settings settings = Settings.builder().loadFromSource(builder.string()).build();
+                settings = getAnonymousSettings(settings);
+                String filterTypeName = settings.get("type");
+                if (filterTypeName == null) {
+                    throw new IllegalArgumentException("Missing [type] setting for anonymous " + type + ": " + map);
+                }
+                AnalysisModule.AnalysisProvider<T> charFilterFactoryFactory = factory.apply(filterTypeName);
+                if (charFilterFactoryFactory == null) {
+                    throw new IllegalArgumentException("failed to find global " + type + " [" + filterTypeName + "]");
+                }
+                // Need to set anonymous "name" of analysis component
+                String anonymousName = "_anonymous_" + type + (i < 0 ? "_" : "_[" + i + "]");
+                return charFilterFactoryFactory.get(getNaIndexSettings(settings), environment, anonymousName, settings);
+            } else {
+                throw new IllegalArgumentException("failed to find or create " + type + " for [" + nameOrDefinition + "]");
+            }
+        }
+
+    }
+}
diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java
index c89f6164de7a0..8ab00137b41d2 100644
--- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java
+++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java
@@ -19,6 +19,10 @@ package org.elasticsearch.ingest.common;
+import org.elasticsearch.ingest.Processor;
+import org.elasticsearch.plugins.IngestPlugin;
+import org.elasticsearch.plugins.Plugin;
+
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStream;
@@ -28,10 +32,6 @@ import java.util.HashMap;
 import java.util.Map;
-import org.elasticsearch.ingest.Processor;
-import org.elasticsearch.plugins.IngestPlugin;
-import org.elasticsearch.plugins.Plugin;
-
 public class IngestCommonPlugin extends Plugin implements IngestPlugin {
 
     private final Map builtinPatterns;
@@ -61,6 +61,7 @@ public Map getProcessors(Processor.Parameters paramet
         processors.put(SortProcessor.TYPE, new SortProcessor.Factory());
         processors.put(GrokProcessor.TYPE, new GrokProcessor.Factory(builtinPatterns));
         processors.put(ScriptProcessor.TYPE, new ScriptProcessor.Factory(parameters.scriptService));
+        processors.put(AnalyzerProcessor.TYPE, new AnalyzerProcessor.Factory(parameters.env, parameters.analysisRegistry));
         return Collections.unmodifiableMap(processors);
     }
diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/AnalyzerProcessorFactoryTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/AnalyzerProcessorFactoryTests.java
new file mode 100644
index 0000000000000..506cd001cd811
--- /dev/null
+++
b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/AnalyzerProcessorFactoryTests.java @@ -0,0 +1,266 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.ingest.common; + +import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.env.Environment; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.analysis.AnalysisRegistry; +import org.elasticsearch.index.analysis.CustomAnalyzer; +import org.elasticsearch.index.analysis.MappingCharFilterFactory; +import org.elasticsearch.index.analysis.NamedAnalyzer; +import org.elasticsearch.indices.analysis.AnalysisModule; +import org.elasticsearch.test.ESTestCase; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import static java.util.Collections.emptyMap; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.instanceOf; + +public class AnalyzerProcessorFactoryTests extends ESTestCase { + + private final Settings settings = Settings.builder() + .put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()) + .build(); + + Environment environment = new Environment(settings); + + private static AnalysisModule.AnalysisProvider requriesAnalysisSettings(AnalysisModule.AnalysisProvider provider) { + return new AnalysisModule.AnalysisProvider() { + @Override + public T get(IndexSettings indexSettings, Environment environment, String name, Settings settings) throws IOException { + return provider.get(indexSettings, environment, name, settings); + } + @Override + public boolean requiresAnalysisSettings() { + return true; + } + }; + } + + private final AnalysisRegistry analysisRegistry = new AnalysisRegistry(environment, + Collections.singletonMap("mapping", requriesAnalysisSettings(MappingCharFilterFactory::new)), + emptyMap(), + emptyMap(), + emptyMap()); + + public void testCreate() throws Exception { + AnalyzerProcessor.Factory factory = new AnalyzerProcessor.Factory(environment, analysisRegistry); + + Map config = new HashMap<>(); + config.put("field", "field1"); + config.put("target_field", "field2"); + config.put("analyzer", "standard"); + String processorTag = randomAsciiOfLength(10); + + try(AnalyzerProcessor analyzerProcessor = factory.create(null, processorTag, config)) { + assertThat(analyzerProcessor.getTag(), equalTo(processorTag)); + assertThat(analyzerProcessor.getField(), equalTo("field1")); + assertThat(analyzerProcessor.getTargetField(), equalTo("field2")); + assertThat(analyzerProcessor.getAnalyzer(), instanceOf(NamedAnalyzer.class)); + assertThat(((NamedAnalyzer)analyzerProcessor.getAnalyzer()).name(), 
equalTo("standard")); + } + } + + public void testCreateWithMinimalCustomAnalyzer() throws Exception { + AnalyzerProcessor.Factory factory = new AnalyzerProcessor.Factory(environment, analysisRegistry); + + Map config = new HashMap<>(); + config.put("field", "field1"); + config.put("tokenizer", "whitespace"); + + String processorTag = randomAsciiOfLength(10); + try (AnalyzerProcessor analyzerProcessor = factory.create(null, processorTag, config)) { + assertThat(analyzerProcessor.getTag(), equalTo(processorTag)); + assertThat(analyzerProcessor.getField(), equalTo("field1")); + assertThat(analyzerProcessor.getTargetField(), equalTo("field1")); + assertThat(analyzerProcessor.getAnalyzer(), instanceOf(CustomAnalyzer.class)); + + CustomAnalyzer customAnalyzer = (CustomAnalyzer) analyzerProcessor.getAnalyzer(); + assertThat(customAnalyzer.charFilters().length, equalTo(0)); + assertThat(customAnalyzer.tokenFilters().length, equalTo(0)); + } + } + + + public void testCreateWithCustomAnalyzer() throws Exception { + AnalyzerProcessor.Factory factory = new AnalyzerProcessor.Factory(environment, analysisRegistry); + + Map config = new HashMap<>(); + config.put("field", "field1"); + config.put("tokenizer", "keyword"); + + Map truncateConfig = new HashMap<>(); + truncateConfig.put("type", "truncate"); + truncateConfig.put("length", 2); + config.put("filter", Arrays.asList("lowercase", truncateConfig)); + + Map charConfig = new HashMap<>(); + charConfig.put("type", "mapping"); + charConfig.put("mappings", Collections.singletonList("! => ?")); + config.put("char_filter", Collections.singletonList(charConfig)); + + String processorTag = randomAsciiOfLength(10); + try (AnalyzerProcessor analyzerProcessor = factory.create(null, processorTag, config)) { + assertThat(analyzerProcessor.getTag(), equalTo(processorTag)); + assertThat(analyzerProcessor.getField(), equalTo("field1")); + assertThat(analyzerProcessor.getTargetField(), equalTo("field1")); + assertThat(analyzerProcessor.getAnalyzer(), instanceOf(CustomAnalyzer.class)); + + CustomAnalyzer customAnalyzer = (CustomAnalyzer) analyzerProcessor.getAnalyzer(); + assertThat(customAnalyzer.charFilters().length, equalTo(1)); + assertThat(customAnalyzer.tokenFilters().length, equalTo(2)); + } + } + + public void testCreateNoFieldPresent() throws Exception { + AnalyzerProcessor.Factory factory = new AnalyzerProcessor.Factory(environment, analysisRegistry); + + Map config = new HashMap<>(); + config.put("analyzer", "standard"); + String processorTag = randomAsciiOfLength(10); + try { + factory.create(null, processorTag, config); + fail("factory create should have failed"); + } catch (ElasticsearchParseException e) { + assertThat(e.getMessage(), equalTo("[field] required property is missing")); + } + } + + public void testCreateNoAnalyzerPresent() throws Exception { + AnalyzerProcessor.Factory factory = new AnalyzerProcessor.Factory(environment, analysisRegistry); + + Map config = new HashMap<>(); + config.put("field", "field1"); + String processorTag = randomAsciiOfLength(10); + try { + factory.create(null, processorTag, config); + fail("factory create should have failed"); + } catch (ElasticsearchParseException e) { + assertThat(e.getMessage(), equalTo("missing analyzer definition")); + } + } + + public void testCreateWithUnknownAnalyzer() throws Exception { + AnalyzerProcessor.Factory factory = new AnalyzerProcessor.Factory(environment, analysisRegistry); + + Map config = new HashMap<>(); + config.put("field", "field1"); + config.put("analyzer", 
"unknown_analyzer"); + String processorTag = randomAsciiOfLength(10); + try { + factory.create(null, processorTag, config); + fail("factory create should have failed"); + } catch (IllegalArgumentException e) { + assertThat(e.getMessage(), equalTo("Unknown analyzer [unknown_analyzer]")); + } + } + + public void testCreateWithUnknownTokenizer() throws Exception { + AnalyzerProcessor.Factory factory = new AnalyzerProcessor.Factory(environment, analysisRegistry); + + Map config = new HashMap<>(); + config.put("field", "field1"); + config.put("tokenizer", "unknown"); + config.put("filter", Collections.singletonList("lowercase")); + config.put("char_filter", Collections.singletonList("html_strip")); + + String processorTag = randomAsciiOfLength(10); + try { + factory.create(null, processorTag, config); + fail("factory create should have failed"); + } catch (IllegalArgumentException e) { + assertThat(e.getMessage(), equalTo("failed to find global tokenizer [unknown]")); + } + } + + public void testCreateWithBothAnalyzerAndTokenizer() throws Exception { + AnalyzerProcessor.Factory factory = new AnalyzerProcessor.Factory(environment, analysisRegistry); + + Map config = new HashMap<>(); + config.put("field", "field1"); + config.put("analyzer", "standard"); + config.put("tokenizer", "keyword"); + + String processorTag = randomAsciiOfLength(10); + try { + factory.create(null, processorTag, config); + fail("factory create should have failed"); + } catch (ElasticsearchParseException e) { + assertThat(e.getMessage(), equalTo("custom tokenizer or filters cannot be specified if a named analyzer is used")); + } + } + + public void testCreateWithUnknownTokenFilter() throws Exception { + AnalyzerProcessor.Factory factory = new AnalyzerProcessor.Factory(environment, analysisRegistry); + + Map config = new HashMap<>(); + config.put("field", "field1"); + config.put("tokenizer", "keyword"); + config.put("filter", Arrays.asList("lowercase", Collections.singletonMap("type", "unknown"))); + + String processorTag = randomAsciiOfLength(10); + try { + factory.create(null, processorTag, config); + fail("factory create should have failed"); + } catch (IllegalArgumentException e) { + assertThat(e.getMessage(), equalTo("failed to find global token_filter [unknown]")); + } + } + + public void testCreateWithCharFilterWithoutType() throws Exception { + AnalyzerProcessor.Factory factory = new AnalyzerProcessor.Factory(environment, analysisRegistry); + + Map config = new HashMap<>(); + config.put("field", "field1"); + config.put("tokenizer", "keyword"); + config.put("char_filter", Arrays.asList("html_strip", Collections.singletonMap("no_type", "missing"))); + + String processorTag = randomAsciiOfLength(10); + try { + factory.create(null, processorTag, config); + fail("factory create should have failed"); + } catch (IllegalArgumentException e) { + assertThat(e.getMessage(), equalTo("Missing [type] setting for anonymous char_filter: {no_type=missing}")); + } + } + + public void testCreateWithNonStringTokenizerName() throws Exception { + AnalyzerProcessor.Factory factory = new AnalyzerProcessor.Factory(environment, analysisRegistry); + + Map config = new HashMap<>(); + config.put("field", "field1"); + config.put("tokenizer", 42); + + String processorTag = randomAsciiOfLength(10); + try { + factory.create(null, processorTag, config); + fail("factory create should have failed"); + } catch (IllegalArgumentException e) { + assertThat(e.getMessage(), equalTo("failed to find or create tokenizer for [42]")); + } + } +} diff --git 
a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/AnalyzerProcessorTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/AnalyzerProcessorTests.java new file mode 100644 index 0000000000000..a567d08016f55 --- /dev/null +++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/AnalyzerProcessorTests.java @@ -0,0 +1,140 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.ingest.common; + +import org.apache.lucene.analysis.Analyzer; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.env.Environment; +import org.elasticsearch.index.analysis.AnalysisRegistry; +import org.elasticsearch.ingest.IngestDocument; +import org.elasticsearch.ingest.Processor; +import org.elasticsearch.ingest.RandomDocumentPicks; +import org.elasticsearch.test.ESTestCase; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static java.util.Collections.emptyMap; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; + +public class AnalyzerProcessorTests extends ESTestCase { + + + private final Settings settings = Settings.builder() + .put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()) + .build(); + + private final AnalysisRegistry analysisRegistry = new AnalysisRegistry(new Environment(settings), + emptyMap(), + emptyMap(), + emptyMap(), + emptyMap()); + + private Analyzer getAnalyzer(String name) throws IOException { + return analysisRegistry.getAnalyzer(name); + } + + public void testAnalysis() throws Exception { + IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random()); + String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, "This is a test."); + try (Processor processor = new AnalyzerProcessor(randomAsciiOfLength(10), fieldName, fieldName, getAnalyzer("standard"), false)) { + processor.execute(ingestDocument); + } + assertThat(ingestDocument.getFieldValue(fieldName, List.class), equalTo(Arrays.asList("this", "is", "a", "test"))); + } + + public void testAnalysisMultiValue() throws Exception { + IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random()); + List multiValueField = new ArrayList<>(); + multiValueField.add("This is"); + multiValueField.add("a test."); + String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, multiValueField); + try (Processor processor = new AnalyzerProcessor(randomAsciiOfLength(10), fieldName, fieldName, getAnalyzer("standard"), false)) { + processor.execute(ingestDocument); + } + 
assertThat(ingestDocument.getFieldValue(fieldName, List.class), equalTo(Arrays.asList("this", "is", "a", "test"))); + } + + public void testAnalysisWithTargetField() throws Exception { + IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random()); + String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, "This is a test."); + String targetFieldName = randomAsciiOfLength(10); + try (Processor processor = new AnalyzerProcessor(randomAsciiOfLength(10), fieldName, targetFieldName, getAnalyzer("standard"), + false)) { + processor.execute(ingestDocument); + } + assertThat(ingestDocument.getFieldValue(targetFieldName, List.class), equalTo(Arrays.asList("this", "is", "a", "test"))); + } + + public void testAnalysisFieldNotFound() throws Exception { + IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>()); + String fieldName = RandomDocumentPicks.randomFieldName(random()); + try (Processor processor = new AnalyzerProcessor(randomAsciiOfLength(10), fieldName, fieldName, getAnalyzer("standard"), false)) { + processor.execute(ingestDocument); + fail("analyzer processor should have failed"); + } catch (IllegalArgumentException e) { + assertThat(e.getMessage(), containsString("not present as part of path [" + fieldName + "]")); + } + } + + public void testAnalysisNullValue() throws Exception { + IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.singletonMap("field", null)); + try (Processor processor = new AnalyzerProcessor(randomAsciiOfLength(10), "field", "field", getAnalyzer("standard"), false)) { + processor.execute(ingestDocument); + fail("analyzer processor should have failed"); + } catch (IllegalArgumentException e) { + assertThat(e.getMessage(), equalTo("field [field] is null, cannot be analyzed.")); + } + } + + public void testAnalysisNonStringValue() throws Exception { + IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>()); + String fieldName = RandomDocumentPicks.randomFieldName(random()); + ingestDocument.setFieldValue(fieldName, randomInt()); + try (Processor processor = new AnalyzerProcessor(randomAsciiOfLength(10), fieldName, fieldName, getAnalyzer("standard"), false)) { + processor.execute(ingestDocument); + fail("analyzer processor should have failed"); + } catch (IllegalArgumentException e) { + assertThat(e.getMessage(), equalTo("field [" + fieldName + "] has type [java.lang.Integer] and cannot be analyzed")); + } + } + + public void testAnalysisAppendable() throws Exception { + Map source = new HashMap<>(); + source.put("text", "This is a test."); + IngestDocument ingestDocument = new IngestDocument(source, new HashMap<>()); + try (Processor processor = new AnalyzerProcessor(randomAsciiOfLength(10), "text", "text", getAnalyzer("standard"), false)) { + processor.execute(ingestDocument); + } + @SuppressWarnings("unchecked") + List flags = (List) ingestDocument.getFieldValue("text", List.class); + assertThat(flags, equalTo(Arrays.asList("this", "is", "a", "test"))); + ingestDocument.appendFieldValue("text", "and this"); + assertThat(ingestDocument.getFieldValue("text", List.class), equalTo(Arrays.asList("this", "is", "a", "test", "and this"))); + } + + +} diff --git a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/10_basic.yaml b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/10_basic.yaml index 14f58369dfacf..9394cd54d1c04 100644 --- 
a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/10_basic.yaml +++ b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/10_basic.yaml @@ -9,21 +9,22 @@ nodes.info: {} - match: { nodes.$master.modules.0.name: ingest-common } - - match: { nodes.$master.ingest.processors.0.type: append } - - match: { nodes.$master.ingest.processors.1.type: convert } - - match: { nodes.$master.ingest.processors.2.type: date } - - match: { nodes.$master.ingest.processors.3.type: date_index_name } - - match: { nodes.$master.ingest.processors.4.type: fail } - - match: { nodes.$master.ingest.processors.5.type: foreach } - - match: { nodes.$master.ingest.processors.6.type: grok } - - match: { nodes.$master.ingest.processors.7.type: gsub } - - match: { nodes.$master.ingest.processors.8.type: join } - - match: { nodes.$master.ingest.processors.9.type: lowercase } - - match: { nodes.$master.ingest.processors.10.type: remove } - - match: { nodes.$master.ingest.processors.11.type: rename } - - match: { nodes.$master.ingest.processors.12.type: script } - - match: { nodes.$master.ingest.processors.13.type: set } - - match: { nodes.$master.ingest.processors.14.type: sort } - - match: { nodes.$master.ingest.processors.15.type: split } - - match: { nodes.$master.ingest.processors.16.type: trim } - - match: { nodes.$master.ingest.processors.17.type: uppercase } + - match: { nodes.$master.ingest.processors.0.type: analyzer } + - match: { nodes.$master.ingest.processors.1.type: append } + - match: { nodes.$master.ingest.processors.2.type: convert } + - match: { nodes.$master.ingest.processors.3.type: date } + - match: { nodes.$master.ingest.processors.4.type: date_index_name } + - match: { nodes.$master.ingest.processors.5.type: fail } + - match: { nodes.$master.ingest.processors.6.type: foreach } + - match: { nodes.$master.ingest.processors.7.type: grok } + - match: { nodes.$master.ingest.processors.8.type: gsub } + - match: { nodes.$master.ingest.processors.9.type: join } + - match: { nodes.$master.ingest.processors.10.type: lowercase } + - match: { nodes.$master.ingest.processors.11.type: remove } + - match: { nodes.$master.ingest.processors.12.type: rename } + - match: { nodes.$master.ingest.processors.13.type: script } + - match: { nodes.$master.ingest.processors.14.type: set } + - match: { nodes.$master.ingest.processors.15.type: sort } + - match: { nodes.$master.ingest.processors.16.type: split } + - match: { nodes.$master.ingest.processors.17.type: trim } + - match: { nodes.$master.ingest.processors.18.type: uppercase } diff --git a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/140_analyzer.yaml b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/140_analyzer.yaml new file mode 100644 index 0000000000000..0dd05943a518f --- /dev/null +++ b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/140_analyzer.yaml @@ -0,0 +1,138 @@ +--- +teardown: + - do: + ingest.delete_pipeline: + id: "my_pipeline" + ignore: 404 + +--- +"Test Analyzer Pipeline": + - do: + ingest.put_pipeline: + id: "my_pipeline" + body: > + { + "description": "_description", + "processors": [ + { + "analyzer" : { + "field" : "field1", + "analyzer" : "standard" + } + } + ] + } + - match: { acknowledged: true } + + - do: + index: + index: test + type: test + id: 1 + pipeline: "my_pipeline" + body: {field1: "Foo, bar and baz!"} + + - do: + get: + index: test + type: test + id: 1 + - match: { _source.field1: ["foo", "bar", "and", "baz"] } + +--- +"Test Analyzer Pipeline With 
Custom Analyzer": + - do: + ingest.put_pipeline: + id: "my_pipeline" + body: > + { + "description": "_description", + "processors": [ + { + "analyzer" : { + "field" : "field1", + "tokenizer" : "keyword", + "filter" : ["lowercase"] + } + } + ] + } + - match: { acknowledged: true } + + - do: + index: + index: test + type: test + id: 1 + pipeline: "my_pipeline" + body: {field1: "Foo, bar and baz!"} + + - do: + get: + index: test + type: test + id: 1 + - match: { _source.field1: ["foo, bar and baz!"] } + + - do: + ingest.put_pipeline: + id: "my_pipeline" + body: > + { + "description": "_description", + "processors": [ + { + "analyzer" : { + "field" : "field1", + "tokenizer" : "standard", + "token_filter" : [ + "lowercase", + {"type": "stop", "stopwords": ["boo"]} + ], + "char_filter": [ + {"type": "mapping", "mappings": ["F => B"]} + ] + } + } + ] + } + - match: { acknowledged: true } + + - do: + index: + index: test + type: test + id: 2 + pipeline: "my_pipeline" + body: {field1: "Foo, bar and baz!"} + + - do: + get: + index: test + type: test + id: 2 + - match: { _source.field1: ["bar", "and", "baz"] } + +--- +"Test putting analyzer pipeline with invalid analyzer": + - do: + catch: request + ingest.put_pipeline: + id: "bad_pipeline" + body: > + { + "description": "_description", + "processors": [ + { + "analyzer" : { + "field" : "field1", + "analyzer" : "unknown_analyzer" + } + } + ] + } + + - match: { error.root_cause.0.type: "exception" } + - match: { error.root_cause.0.header.processor_type: "analyzer" } + - match: { error.caused_by.type: "illegal_argument_exception" } + - match: { error.caused_by.reason: "Unknown analyzer [unknown_analyzer]" }