From 1ac393effabace7cd13de41ad1273820fa24f891 Mon Sep 17 00:00:00 2001 From: Igor Motov Date: Tue, 3 Apr 2012 08:52:56 -0400 Subject: [PATCH] add plugin mechanism for handling source storage and retrieval --- .../action/update/TransportUpdateAction.java | 12 +- .../index/get/ShardGetService.java | 18 +- .../index/mapper/DocumentMapperParser.java | 15 +- .../elasticsearch/index/mapper/Mapper.java | 10 +- .../index/mapper/MapperService.java | 6 +- .../mapper/internal/SourceFieldMapper.java | 125 +++++-- .../index/source/ExternalSourceProvider.java | 69 ++++ .../source/ExternalSourceProviderFactory.java | 29 ++ .../source/ExternalSourceProviderModule.java | 130 +++++++ .../source/ExternalSourceProviderService.java | 77 ++++ ...xternalSourceProviderSettingsRequired.java | 35 ++ .../source/ParsingExternalSourceProvider.java | 71 ++++ .../indices/InternalIndicesService.java | 2 + .../search/fetch/FetchPhase.java | 11 +- .../search/internal/InternalSearchHit.java | 6 +- .../search/lookup/SearchLookup.java | 2 +- .../search/lookup/SourceLookup.java | 24 +- .../CustomSourceMappingIntegrationTests.java | 345 ++++++++++++++++++ .../source/IdBasedExternalSourceProvider.java | 62 ++++ ...nsformingExternalSourceProviderParser.java | 96 +++++ .../aliases/IndexAliasesServiceTests.java | 2 + .../test/unit/index/mapper/MapperTests.java | 35 +- .../source/CustomSourceMappingTests.java | 65 ++++ .../source/TestExternalSourceProvider.java | 70 ++++ .../percolator/PercolatorExecutorTests.java | 2 + .../query/SimpleIndexQueryParserTests.java | 2 + .../guice/IndexQueryParserModuleTests.java | 2 + .../plugin/IndexQueryParserPlugin2Tests.java | 2 + .../plugin/IndexQueryParserPluginTests.java | 2 + 29 files changed, 1266 insertions(+), 61 deletions(-) create mode 100644 src/main/java/org/elasticsearch/index/source/ExternalSourceProvider.java create mode 100644 src/main/java/org/elasticsearch/index/source/ExternalSourceProviderFactory.java create mode 100644 
src/main/java/org/elasticsearch/index/source/ExternalSourceProviderModule.java create mode 100644 src/main/java/org/elasticsearch/index/source/ExternalSourceProviderService.java create mode 100644 src/main/java/org/elasticsearch/index/source/ExternalSourceProviderSettingsRequired.java create mode 100644 src/main/java/org/elasticsearch/index/source/ParsingExternalSourceProvider.java create mode 100644 src/test/java/org/elasticsearch/test/integration/indices/source/CustomSourceMappingIntegrationTests.java create mode 100644 src/test/java/org/elasticsearch/test/integration/indices/source/IdBasedExternalSourceProvider.java create mode 100644 src/test/java/org/elasticsearch/test/integration/indices/source/TransformingExternalSourceProviderParser.java create mode 100644 src/test/java/org/elasticsearch/test/unit/index/mapper/source/CustomSourceMappingTests.java create mode 100644 src/test/java/org/elasticsearch/test/unit/index/mapper/source/TestExternalSourceProvider.java diff --git a/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java b/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java index 480ca31e06603..f5b966911a539 100644 --- a/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java +++ b/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java @@ -52,6 +52,7 @@ import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.index.get.GetField; import org.elasticsearch.index.get.GetResult; +import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.internal.ParentFieldMapper; import org.elasticsearch.index.mapper.internal.RoutingFieldMapper; import org.elasticsearch.index.mapper.internal.SourceFieldMapper; @@ -158,6 +159,7 @@ protected void shardOperation(final UpdateRequest request, final ActionListener< protected void shardOperation(final UpdateRequest request, final ActionListener listener, final int retryCount) throws 
ElasticSearchException { IndexService indexService = indicesService.indexServiceSafe(request.index()); IndexShard indexShard = indexService.shardSafe(request.shardId()); + final MapperService mapperService = indexService.mapperService(); long getDate = System.currentTimeMillis(); final GetResult getResult = indexShard.getService().get(request.type(), request.id(), @@ -231,7 +233,7 @@ protected void shardOperation(final UpdateRequest request, final ActionListener< public void onResponse(IndexResponse response) { UpdateResponse update = new UpdateResponse(response.index(), response.type(), response.id(), response.version()); update.matches(response.matches()); - update.getResult(extractGetResult(request, response.version(), updatedSourceAsMap, updateSourceContentType, updateSourceBytes)); + update.getResult(extractGetResult(request, response.version(), updatedSourceAsMap, updateSourceContentType, updateSourceBytes, mapperService)); listener.onResponse(update); } @@ -260,7 +262,7 @@ public void run() { @Override public void onResponse(DeleteResponse response) { UpdateResponse update = new UpdateResponse(response.index(), response.type(), response.id(), response.version()); - update.getResult(extractGetResult(request, response.version(), updatedSourceAsMap, updateSourceContentType, null)); + update.getResult(extractGetResult(request, response.version(), updatedSourceAsMap, updateSourceContentType, null, mapperService)); listener.onResponse(update); } @@ -283,7 +285,7 @@ public void run() { }); } else if ("none".equals(operation)) { UpdateResponse update = new UpdateResponse(getResult.index(), getResult.type(), getResult.id(), getResult.version()); - update.getResult(extractGetResult(request, getResult.version(), updatedSourceAsMap, updateSourceContentType, null)); + update.getResult(extractGetResult(request, getResult.version(), updatedSourceAsMap, updateSourceContentType, null, mapperService)); listener.onResponse(update); } else { logger.warn("Used update operation 
[{}] for script [{}], doing nothing...", operation, request.script); @@ -292,14 +294,14 @@ public void run() { } @Nullable - protected GetResult extractGetResult(final UpdateRequest request, long version, final Map source, XContentType sourceContentType, @Nullable final BytesHolder sourceAsBytes) { + protected GetResult extractGetResult(final UpdateRequest request, long version, final Map source, XContentType sourceContentType, @Nullable final BytesHolder sourceAsBytes, MapperService mapperService) { if (request.fields() == null || request.fields().length == 0) { return null; } boolean sourceRequested = false; Map fields = null; if (request.fields() != null && request.fields().length > 0) { - SourceLookup sourceLookup = new SourceLookup(); + SourceLookup sourceLookup = new SourceLookup(mapperService); sourceLookup.setNextSource(source); for (String field : request.fields()) { if (field.equals("_source")) { diff --git a/src/main/java/org/elasticsearch/index/get/ShardGetService.java b/src/main/java/org/elasticsearch/index/get/ShardGetService.java index ecc58f52ad702..5e322ee2657ed 100644 --- a/src/main/java/org/elasticsearch/index/get/ShardGetService.java +++ b/src/main/java/org/elasticsearch/index/get/ShardGetService.java @@ -140,7 +140,7 @@ public GetResult innerGet(String type, String id, String[] gFields, boolean real // break between having loaded it from translog (so we only have _source), and having a document to load if (get.docIdAndVersion() != null) { Map fields = null; - byte[] source = null; + BytesHolder source = null; UidField.DocIdAndVersion docIdAndVersion = get.docIdAndVersion(); ResetFieldSelector fieldSelector = buildFieldSelectors(docMapper, gFields); if (fieldSelector != null) { @@ -151,7 +151,7 @@ public GetResult innerGet(String type, String id, String[] gFields, boolean real } catch (IOException e) { throw new ElasticSearchException("Failed to get type [" + type + "] and id [" + id + "]", e); } - source = extractSource(doc, docMapper); + 
source = extractSource(type, id, doc, docMapper); for (Object oField : doc.getFields()) { Fieldable field = (Fieldable) oField; @@ -198,6 +198,9 @@ public GetResult innerGet(String type, String id, String[] gFields, boolean real SearchScript searchScript = scriptService.search(searchLookup, "mvel", field, null); searchScript.setNextReader(docIdAndVersion.reader); searchScript.setNextDocId(docIdAndVersion.docId); + if(source != null) { + searchLookup.source().setNextSource(source.bytes(), source.offset(), source.length()); + } try { value = searchScript.run(); @@ -214,6 +217,9 @@ public GetResult innerGet(String type, String id, String[] gFields, boolean real searchLookup = new SearchLookup(mapperService, indexCache.fieldData()); searchLookup.setNextReader(docIdAndVersion.reader); searchLookup.setNextDocId(docIdAndVersion.docId); + if(source != null) { + searchLookup.source().setNextSource(source.bytes(), source.offset(), source.length()); + } } value = searchLookup.source().extractValue(field); } @@ -233,7 +239,7 @@ public GetResult innerGet(String type, String id, String[] gFields, boolean real } } - return new GetResult(shardId.index().name(), type, id, get.version(), get.exists(), source == null ? 
null : new BytesHolder(source), fields); + return new GetResult(shardId.index().name(), type, id, get.version(), get.exists(), source, fields); } else { Translog.Source source = get.source(); @@ -355,11 +361,11 @@ private static ResetFieldSelector buildFieldSelectors(DocumentMapper docMapper, return fieldSelector; } - private static byte[] extractSource(Document doc, DocumentMapper documentMapper) { - byte[] source = null; + private static BytesHolder extractSource(String type, String id, Document doc, DocumentMapper documentMapper) { + BytesHolder source = null; Fieldable sourceField = doc.getFieldable(documentMapper.sourceMapper().names().indexName()); if (sourceField != null) { - source = documentMapper.sourceMapper().nativeValue(sourceField); + source = documentMapper.sourceMapper().extractSource(type, id, sourceField); doc.removeField(documentMapper.sourceMapper().names().indexName()); } return source; diff --git a/src/main/java/org/elasticsearch/index/mapper/DocumentMapperParser.java b/src/main/java/org/elasticsearch/index/mapper/DocumentMapperParser.java index 4991a15547020..337dba0e191e4 100644 --- a/src/main/java/org/elasticsearch/index/mapper/DocumentMapperParser.java +++ b/src/main/java/org/elasticsearch/index/mapper/DocumentMapperParser.java @@ -42,6 +42,7 @@ import org.elasticsearch.index.mapper.object.ObjectMapper; import org.elasticsearch.index.mapper.object.RootObjectMapper; import org.elasticsearch.index.settings.IndexSettings; +import org.elasticsearch.index.source.ExternalSourceProviderService; import java.io.IOException; import java.util.Map; @@ -59,16 +60,20 @@ public class DocumentMapperParser extends AbstractIndexComponent { private final Object typeParsersMutex = new Object(); + final ExternalSourceProviderService externalSourceProviderService; + private volatile ImmutableMap typeParsers; private volatile ImmutableMap rootTypeParsers; - public DocumentMapperParser(Index index, AnalysisService analysisService) { - this(index, 
ImmutableSettings.Builder.EMPTY_SETTINGS, analysisService); + public DocumentMapperParser(Index index, AnalysisService analysisService, ExternalSourceProviderService externalSourceProviderService) { + this(index, ImmutableSettings.Builder.EMPTY_SETTINGS, analysisService, externalSourceProviderService); } - public DocumentMapperParser(Index index, @IndexSettings Settings indexSettings, AnalysisService analysisService) { + public DocumentMapperParser(Index index, @IndexSettings Settings indexSettings, AnalysisService analysisService, + ExternalSourceProviderService externalSourceProviderService) { super(index, indexSettings); this.analysisService = analysisService; + this.externalSourceProviderService = externalSourceProviderService; typeParsers = new MapBuilder() .put(ByteFieldMapper.CONTENT_TYPE, new ByteFieldMapper.TypeParser()) .put(ShortFieldMapper.CONTENT_TYPE, new ShortFieldMapper.TypeParser()) @@ -123,7 +128,7 @@ public void putRootTypeParser(String type, Mapper.TypeParser typeParser) { } public Mapper.TypeParser.ParserContext parserContext() { - return new Mapper.TypeParser.ParserContext(analysisService, typeParsers); + return new Mapper.TypeParser.ParserContext(analysisService, typeParsers, externalSourceProviderService); } public DocumentMapper parse(String source) throws MapperParsingException { @@ -157,7 +162,7 @@ public DocumentMapper parse(@Nullable String type, String source, String default } } - Mapper.TypeParser.ParserContext parserContext = new Mapper.TypeParser.ParserContext(analysisService, typeParsers); + Mapper.TypeParser.ParserContext parserContext = new Mapper.TypeParser.ParserContext(analysisService, typeParsers, externalSourceProviderService); DocumentMapper.Builder docBuilder = doc(index.name(), indexSettings, (RootObjectMapper.Builder) rootObjectTypeParser.parse(type, mapping, parserContext)); diff --git a/src/main/java/org/elasticsearch/index/mapper/Mapper.java b/src/main/java/org/elasticsearch/index/mapper/Mapper.java index 
e891f30797b46..b8bdec942d466 100644 --- a/src/main/java/org/elasticsearch/index/mapper/Mapper.java +++ b/src/main/java/org/elasticsearch/index/mapper/Mapper.java @@ -25,6 +25,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.index.analysis.AnalysisService; +import org.elasticsearch.index.source.ExternalSourceProviderService; import java.io.IOException; import java.util.Map; @@ -80,9 +81,12 @@ public static class ParserContext { private final ImmutableMap typeParsers; - public ParserContext(AnalysisService analysisService, ImmutableMap typeParsers) { + private final ExternalSourceProviderService externalSourceProviderService; + + public ParserContext(AnalysisService analysisService, ImmutableMap typeParsers, ExternalSourceProviderService externalSourceProviderService) { this.analysisService = analysisService; this.typeParsers = typeParsers; + this.externalSourceProviderService = externalSourceProviderService; } public AnalysisService analysisService() { @@ -92,6 +96,10 @@ public AnalysisService analysisService() { public TypeParser typeParser(String type) { return typeParsers.get(Strings.toUnderscoreCase(type)); } + + public ExternalSourceProviderService sourceProviderService() { + return externalSourceProviderService; + } } Mapper.Builder parse(String name, Map node, ParserContext parserContext) throws MapperParsingException; diff --git a/src/main/java/org/elasticsearch/index/mapper/MapperService.java b/src/main/java/org/elasticsearch/index/mapper/MapperService.java index 006e126476829..b84dada07975d 100644 --- a/src/main/java/org/elasticsearch/index/mapper/MapperService.java +++ b/src/main/java/org/elasticsearch/index/mapper/MapperService.java @@ -48,6 +48,7 @@ import org.elasticsearch.index.mapper.object.ObjectMapper; import org.elasticsearch.index.search.nested.NonNestedDocsFilter; import org.elasticsearch.index.settings.IndexSettings; +import 
org.elasticsearch.index.source.ExternalSourceProviderService; import org.elasticsearch.indices.InvalidTypeNameException; import org.elasticsearch.indices.TypeMissingException; @@ -97,10 +98,11 @@ public class MapperService extends AbstractIndexComponent implements Iterable { @@ -82,6 +85,8 @@ public static class Builder extends Mapper.Builder { private String[] includes = Defaults.INCLUDES; private String[] excludes = Defaults.EXCLUDES; + + private ExternalSourceProvider externalSourceProvider = Defaults.EXTERNAL_SOURCE_PROVIDER; public Builder() { super(Defaults.NAME); @@ -116,10 +121,15 @@ public Builder excludes(String[] excludes) { this.excludes = excludes; return this; } + + public Builder provider(ExternalSourceProvider externalSourceProvider) { + this.externalSourceProvider = externalSourceProvider; + return this; + } @Override public SourceFieldMapper build(BuilderContext context) { - return new SourceFieldMapper(name, enabled, format, compress, compressThreshold, includes, excludes); + return new SourceFieldMapper(name, enabled, format, compress, compressThreshold, includes, excludes, externalSourceProvider); } } @@ -159,6 +169,13 @@ public Mapper.Builder parse(String name, Map node, ParserContext excludes[i] = values.get(i).toString(); } builder.excludes(excludes); + } else if ("provider".equals(fieldName)) { + String sourceProviderType = nodeStringValue(fieldNode, null); + ExternalSourceProvider externalSourceProvider = parserContext.sourceProviderService().externalSourceProvider(sourceProviderType); + if (externalSourceProvider == null) { + throw new MapperParsingException("Unable to find external source provider with name [" + sourceProviderType + "]"); + } + builder.provider(externalSourceProvider); } } return builder; @@ -179,12 +196,14 @@ public Mapper.Builder parse(String name, Map node, ParserContext private String format; private XContentType formatContentType; + + private ExternalSourceProvider externalSourceProvider; public SourceFieldMapper() 
{ - this(Defaults.NAME, Defaults.ENABLED, Defaults.FORMAT, null, -1, Defaults.INCLUDES, Defaults.EXCLUDES); + this(Defaults.NAME, Defaults.ENABLED, Defaults.FORMAT, null, -1, Defaults.INCLUDES, Defaults.EXCLUDES, Defaults.EXTERNAL_SOURCE_PROVIDER); } - protected SourceFieldMapper(String name, boolean enabled, String format, Boolean compress, long compressThreshold, String[] includes, String[] excludes) { + protected SourceFieldMapper(String name, boolean enabled, String format, Boolean compress, long compressThreshold, String[] includes, String[] excludes, ExternalSourceProvider externalSourceProvider) { super(new Names(name, name, name, name), Defaults.INDEX, Defaults.STORE, Defaults.TERM_VECTOR, Defaults.BOOST, Defaults.OMIT_NORMS, Defaults.OMIT_TERM_FREQ_AND_POSITIONS, Lucene.KEYWORD_ANALYZER, Lucene.KEYWORD_ANALYZER); this.enabled = enabled; @@ -194,6 +213,7 @@ protected SourceFieldMapper(String name, boolean enabled, String format, Boolean this.excludes = excludes; this.format = format; this.formatContentType = format == null ? null : XContentType.fromRestContentType(format); + this.externalSourceProvider = externalSourceProvider; } public boolean enabled() { @@ -242,31 +262,66 @@ protected Field parseCreateField(ParseContext context) throws IOException { int dataOffset = context.sourceOffset(); int dataLength = context.sourceLength(); - boolean filtered = includes.length > 0 || excludes.length > 0; - if (filtered) { - // we don't update the context source if we filter, we want to keep it as is... 
+ boolean updateContext = true; - Tuple> mapTuple = XContentHelper.convertToMap(data, dataOffset, dataLength, true); - Map filteredSource = XContentMapValues.filter(mapTuple.v2(), includes, excludes); - CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry(); - StreamOutput streamOutput; - if (compress != null && compress && (compressThreshold == -1 || dataLength > compressThreshold)) { - streamOutput = cachedEntry.cachedLZFBytes(); + if (externalSourceProvider != null ) { + BytesHolder dehydratedSource = externalSourceProvider.dehydrateSource(context.type(), context.id(), data, dataOffset, dataLength); + if (dehydratedSource == null) { + // Source shouldn't be stored + return null; } else { - streamOutput = cachedEntry.cachedBytes(); + data = dehydratedSource.bytes(); + dataOffset = dehydratedSource.offset(); + dataLength = dehydratedSource.length(); } - XContentType contentType = formatContentType; - if (contentType == null) { - contentType = mapTuple.v1(); + if (dataLength == 0) { + // No more processing is required + return new Field(names().indexName(), data, dataOffset, dataLength); } - XContentBuilder builder = XContentFactory.contentBuilder(contentType, streamOutput).map(filteredSource); - builder.close(); + // Don't update context source + updateContext = false; + } - data = cachedEntry.bytes().copiedByteArray(); - dataOffset = 0; - dataLength = data.length; + boolean filtered = includes.length > 0 || excludes.length > 0; + if (filtered) { + // we don't update the context source if we filter, we want to keep it as is... 
+ if (includes.length > 0 || excludes.length > 0) { + Tuple> mapTuple = XContentHelper.convertToMap(data, dataOffset, dataLength, true); + Map filteredSource = XContentMapValues.filter(mapTuple.v2(), includes, excludes); + CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry(); + try { + StreamOutput streamOutput; + if (compress != null && compress && (compressThreshold == -1 || dataLength > compressThreshold)) { + streamOutput = cachedEntry.cachedLZFBytes(); + } else { + streamOutput = cachedEntry.cachedBytes(); + } + XContentType contentType = formatContentType; + if (contentType == null) { + contentType = mapTuple.v1(); + } + XContentBuilder builder = XContentFactory.contentBuilder(contentType, streamOutput).map(filteredSource); + builder.close(); - CachedStreamOutput.pushEntry(cachedEntry); + data = cachedEntry.bytes().copiedByteArray(); + dataOffset = 0; + dataLength = data.length; + } finally { + CachedStreamOutput.pushEntry(cachedEntry); + } + } else if (compress != null && compress && (compressThreshold == -1 || dataLength > compressThreshold) && !LZF.isCompressed(data, dataOffset, dataLength)) { + CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry(); + try { + LZFStreamOutput streamOutput = cachedEntry.cachedLZFBytes(); + streamOutput.writeBytes(data, dataOffset, dataLength); + streamOutput.flush(); + data = cachedEntry.bytes().copiedByteArray(); + dataOffset = 0; + dataLength = data.length; + } finally { + CachedStreamOutput.pushEntry(cachedEntry); + } + } } else if (compress != null && compress && !LZF.isCompressed(data, dataOffset, dataLength)) { if (compressThreshold == -1 || dataLength > compressThreshold) { CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry(); @@ -287,7 +342,9 @@ protected Field parseCreateField(ParseContext context) throws IOException { dataOffset = 0; dataLength = data.length; // update the data in the context, so it can be compressed and stored compressed outside... 
- context.source(data, dataOffset, dataLength); + if (updateContext) { + context.source(data, dataOffset, dataLength); + } } finally { CachedStreamOutput.pushEntry(cachedEntry); } @@ -311,7 +368,9 @@ protected Field parseCreateField(ParseContext context) throws IOException { dataOffset = 0; dataLength = data.length; // update the data in the context, so we store it in the translog in this format - context.source(data, dataOffset, dataLength); + if (updateContext) { + context.source(data, dataOffset, dataLength); + } } finally { CachedStreamOutput.pushEntry(cachedEntry); } @@ -330,7 +389,9 @@ protected Field parseCreateField(ParseContext context) throws IOException { dataOffset = 0; dataLength = data.length; // update the data in the context, so we store it in the translog in this format - context.source(data, dataOffset, dataLength); + if (updateContext) { + context.source(data, dataOffset, dataLength); + } } finally { CachedStreamOutput.pushEntry(cachedEntry); } @@ -345,8 +406,13 @@ public byte[] value(Document document) { return field == null ? 
null : value(field); } - public byte[] nativeValue(Fieldable field) { - return field.getBinaryValue(); + public BytesHolder extractSource(String type, String id, Fieldable fieldable) { + if (externalSourceProvider != null) { + return externalSourceProvider.rehydrateSource(type, id, fieldable.getBinaryValue(), + fieldable.getBinaryOffset(), fieldable.getBinaryLength()); + } else { + return new BytesHolder(fieldable.getBinaryValue(), fieldable.getBinaryOffset(), fieldable.getBinaryLength()); + } } @Override @@ -388,7 +454,7 @@ protected String contentType() { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { // all are defaults, no need to write it at all - if (enabled == Defaults.ENABLED && compress == null && compressThreshold == -1 && includes.length == 0 && excludes.length == 0) { + if (enabled == Defaults.ENABLED && compress == null && compressThreshold == -1 && includes.length == 0 && excludes.length == 0 && externalSourceProvider == Defaults.EXTERNAL_SOURCE_PROVIDER) { return builder; } builder.startObject(contentType()); @@ -410,6 +476,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (excludes.length > 0) { builder.field("excludes", excludes); } + if (externalSourceProvider != null) { + builder.field("provider", externalSourceProvider.name()); + } builder.endObject(); return builder; } diff --git a/src/main/java/org/elasticsearch/index/source/ExternalSourceProvider.java b/src/main/java/org/elasticsearch/index/source/ExternalSourceProvider.java new file mode 100644 index 0000000000000..13547e7e819a4 --- /dev/null +++ b/src/main/java/org/elasticsearch/index/source/ExternalSourceProvider.java @@ -0,0 +1,69 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.source; + +import org.elasticsearch.common.BytesHolder; + +import java.io.IOException; + +/** + * Extension point that controls how a document source is stored (dehydrated) and restored (rehydrated). + */ +public interface ExternalSourceProvider { + + /** + * Name of the source provider + * @return name of the source provider + */ + String name(); + + /** + * Dehydrates source + * + * Implementations of this method should remove everything from the source that is + * not necessary to restore the source later. If source can be restored based on type + * and id, this method should return an empty (zero-length) BytesHolder. + * + * @param type record type + * @param id record id + * @param source original source + * @param sourceOffset original source offset + * @param sourceLength original source length + * @return dehydrated source, or null if no source should be stored + * @throws IOException exception + */ + BytesHolder dehydrateSource(String type, String id, byte[] source, int sourceOffset, int sourceLength) throws IOException; + + /** + * Rehydrates source + * + * Implementations of this method should restore the source based on type, id and dehydrated + * source. 
+ * + * @param type record type + * @param id record id + * @param source dehydrated source + * @param sourceOffset dehydrated source offset + * @param sourceLength dehydrated source length + * @return rehydrated source + */ + BytesHolder rehydrateSource(String type, String id, byte[] source, int sourceOffset, int sourceLength); + +} diff --git a/src/main/java/org/elasticsearch/index/source/ExternalSourceProviderFactory.java b/src/main/java/org/elasticsearch/index/source/ExternalSourceProviderFactory.java new file mode 100644 index 0000000000000..69831e04e7dd1 --- /dev/null +++ b/src/main/java/org/elasticsearch/index/source/ExternalSourceProviderFactory.java @@ -0,0 +1,29 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.index.source; + +import org.elasticsearch.common.settings.Settings; + +/** + * + */ +public interface ExternalSourceProviderFactory { + ExternalSourceProvider create(String name, Settings settings); +} diff --git a/src/main/java/org/elasticsearch/index/source/ExternalSourceProviderModule.java b/src/main/java/org/elasticsearch/index/source/ExternalSourceProviderModule.java new file mode 100644 index 0000000000000..4854669910d45 --- /dev/null +++ b/src/main/java/org/elasticsearch/index/source/ExternalSourceProviderModule.java @@ -0,0 +1,130 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.index.source; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.inject.AbstractModule; +import org.elasticsearch.common.inject.Scopes; +import org.elasticsearch.common.inject.assistedinject.FactoryProvider; +import org.elasticsearch.common.inject.multibindings.MapBinder; +import org.elasticsearch.common.settings.NoClassSettingsException; +import org.elasticsearch.common.settings.Settings; + +import java.util.LinkedList; +import java.util.Map; + +/** + * + */ +public class ExternalSourceProviderModule extends AbstractModule { + + public static class ExternalSourceProviderBinderProcessor { + + public void processSourceProviders(ExternalSourceProviderBindings externalSourceProviderBindings) { + + } + + public static class ExternalSourceProviderBindings { + private final Map> sourceProviders = Maps.newHashMap(); + + public ExternalSourceProviderBindings() { + } + + public void processExternalSourceProvider(String name, Class sourceProvider) { + sourceProviders.put(name, sourceProvider); + } + } + } + + private final Settings settings; + + private final LinkedList processorExternals = Lists.newLinkedList(); + + public ExternalSourceProviderModule(Settings settings) { + this.settings = settings; + processorExternals.add(new DefaultProcessorExternal()); + } + + @Override + protected void configure() { + bind(ExternalSourceProviderService.class).asEagerSingleton(); + MapBinder providerBinder + = MapBinder.newMapBinder(binder(), String.class, ExternalSourceProviderFactory.class); + + // initial default bindings + ExternalSourceProviderBinderProcessor.ExternalSourceProviderBindings externalSourceProviderBindings = new ExternalSourceProviderBinderProcessor.ExternalSourceProviderBindings(); + for (ExternalSourceProviderBinderProcessor processorExternal : processorExternals) { + 
processorExternal.processSourceProviders(externalSourceProviderBindings); + } + + Map providersSettings = settings.getGroups(ExternalSourceProviderService.Defaults.SOURCE_PROVIDER_PREFIX); + for (Map.Entry entry : providersSettings.entrySet()) { + String providerName = entry.getKey(); + Settings providerSettings = entry.getValue(); + Class type = null; + try { + type = providerSettings.getAsClass("type", null, "org.elasticsearch.index.source.", "ExternalSourceProvider"); + } catch (NoClassSettingsException e) { + // no class found, see if the type is registered in the bindings under a binding name + if (providerSettings.get("type") != null) { + type = externalSourceProviderBindings.sourceProviders.get(Strings.toUnderscoreCase(providerSettings.get("type"))); + if (type == null) { + type = externalSourceProviderBindings.sourceProviders.get(Strings.toCamelCase(providerSettings.get("type"))); + } + } + } + if (type == null) { + throw new IllegalArgumentException("External Source Provider [" + providerName + "] must be provided with a type"); + } + providerBinder.addBinding(providerName).toProvider(FactoryProvider.newFactory(ExternalSourceProviderFactory.class, type)).in(Scopes.SINGLETON); + } + + // go over the source providers in the bindings and register the ones that are not configured + for (Map.Entry> entry : externalSourceProviderBindings.sourceProviders.entrySet()) { + String providerName = entry.getKey(); + Class type = entry.getValue(); + // we don't want to re-register one that already exists + if (providersSettings.containsKey(providerName)) { + continue; + } + // if the provider requires settings, don't register it: a default registration has no settings 
+ if (type.getAnnotation(ExternalSourceProviderSettingsRequired.class) != null) { + continue; + } + providerBinder.addBinding(providerName).toProvider(FactoryProvider.newFactory(ExternalSourceProviderFactory.class, type)).in(Scopes.SINGLETON); + } + + } + + public ExternalSourceProviderModule addProcessor(ExternalSourceProviderBinderProcessor processorExternal) { + processorExternals.addFirst(processorExternal); + return this; + } + + private static class DefaultProcessorExternal extends ExternalSourceProviderBinderProcessor { + + @Override public void processSourceProviders(ExternalSourceProviderBindings externalSourceProviderBindings) { + // No built-in providers for now + // externalSourceProviderBindings.processExternalSourceProvider("default", DefaultSourceFilterParser.class); + } + } +} diff --git a/src/main/java/org/elasticsearch/index/source/ExternalSourceProviderService.java b/src/main/java/org/elasticsearch/index/source/ExternalSourceProviderService.java new file mode 100644 index 0000000000000..76a62b96cbf70 --- /dev/null +++ b/src/main/java/org/elasticsearch/index/source/ExternalSourceProviderService.java @@ -0,0 +1,77 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.index.source; + +import com.google.common.collect.ImmutableMap; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.AbstractIndexComponent; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.settings.IndexSettings; + +import java.util.Map; + +import static com.google.common.collect.Maps.newHashMap; + +/** + * Index component that creates an {@link ExternalSourceProvider} from every injected factory (using its settings group, or empty settings) and looks providers up by name or camel-cased name. + */ +public class ExternalSourceProviderService extends AbstractIndexComponent { + + public static class Defaults { + public static final String SOURCE_PROVIDER_PREFIX = "index.source.provider"; + } + + private final Map externalSourceProviders; + + @Inject + public ExternalSourceProviderService(Index index, @IndexSettings Settings indexSettings, + @Nullable Map externalSourceProviderFactories) { + super(index, indexSettings); + + Map providerMap = newHashMap(); + if (externalSourceProviderFactories != null) { + Map providersSettings = indexSettings.getGroups(Defaults.SOURCE_PROVIDER_PREFIX); + for (Map.Entry entry : externalSourceProviderFactories.entrySet()) { + String providerName = entry.getKey(); + ExternalSourceProviderFactory externalSourceProviderFactory = entry.getValue(); + + Settings providerSettings = providersSettings.get(providerName); + if (providerSettings == null) { + providerSettings = ImmutableSettings.Builder.EMPTY_SETTINGS; + } + + ExternalSourceProvider provider = externalSourceProviderFactory.create(providerName, providerSettings); + providerMap.put(providerName, provider); + providerMap.put(Strings.toCamelCase(providerName), provider); + } + } + this.externalSourceProviders = ImmutableMap.copyOf(providerMap); + } + + + public ExternalSourceProvider externalSourceProvider(String name) { + return externalSourceProviders.get(name); + } + +} diff --git
a/src/main/java/org/elasticsearch/index/source/ExternalSourceProviderSettingsRequired.java b/src/main/java/org/elasticsearch/index/source/ExternalSourceProviderSettingsRequired.java new file mode 100644 index 0000000000000..d31515519ae6a --- /dev/null +++ b/src/main/java/org/elasticsearch/index/source/ExternalSourceProviderSettingsRequired.java @@ -0,0 +1,35 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * + */ +package org.elasticsearch.index.source; + +import java.lang.annotation.*; + +/** + * A marker annotation on {@link ExternalSourceProvider} which will cause the provider to only be created + * when explicit settings are provided. 
+ */ +@Target({ElementType.TYPE}) +@Retention(RetentionPolicy.RUNTIME) +@Documented +public @interface ExternalSourceProviderSettingsRequired { +} \ No newline at end of file diff --git a/src/main/java/org/elasticsearch/index/source/ParsingExternalSourceProvider.java b/src/main/java/org/elasticsearch/index/source/ParsingExternalSourceProvider.java new file mode 100644 index 0000000000000..3ddc3ee85ea62 --- /dev/null +++ b/src/main/java/org/elasticsearch/index/source/ParsingExternalSourceProvider.java @@ -0,0 +1,71 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.index.source; + +import org.elasticsearch.common.BytesHolder; +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.io.stream.CachedStreamOutput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.common.xcontent.XContentType; + +import java.io.IOException; +import java.util.Map; + +/** + * Base provider that parses the source bytes into a map, passes it to the map-based {@code dehydrateSource} of the subclass and serializes the returned map back into bytes; a {@code null} map yields {@code null}. + */ +public abstract class ParsingExternalSourceProvider implements ExternalSourceProvider { + + private String name; + + protected ParsingExternalSourceProvider(String name) { + this.name = name; + } + + @Override + public String name() { + return name; + } + + @Override + public BytesHolder dehydrateSource(String type, String id, byte[] source, int sourceOffset, int sourceLength) throws IOException { + Tuple> mapTuple = XContentHelper.convertToMap(source, sourceOffset, sourceLength, true); + Map parsedSource = dehydrateSource(type, id, mapTuple.v2()); + if(parsedSource != null) { + CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry(); + try { + StreamOutput streamOutput = cachedEntry.cachedBytes(); + XContentBuilder builder = XContentFactory.contentBuilder(mapTuple.v1(), streamOutput).map(parsedSource); + builder.close(); + return new BytesHolder(cachedEntry.bytes().copiedByteArray()); + } finally { + CachedStreamOutput.pushEntry(cachedEntry); + } + } else { + return null; + } + } + + protected abstract Map dehydrateSource(String type, String id, Map source) throws IOException; + +} diff --git a/src/main/java/org/elasticsearch/indices/InternalIndicesService.java b/src/main/java/org/elasticsearch/indices/InternalIndicesService.java index 603f74a5e7933..797b7ad5a9c54 100644 --- a/src/main/java/org/elasticsearch/indices/InternalIndicesService.java +++
b/src/main/java/org/elasticsearch/indices/InternalIndicesService.java @@ -62,6 +62,7 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.service.IndexShard; import org.elasticsearch.index.similarity.SimilarityModule; +import org.elasticsearch.index.source.ExternalSourceProviderModule; import org.elasticsearch.index.store.IndexStoreModule; import org.elasticsearch.index.store.StoreStats; import org.elasticsearch.indices.analysis.IndicesAnalysisService; @@ -280,6 +281,7 @@ public synchronized IndexService createIndex(String sIndexName, Settings setting modules.add(new IndexGatewayModule(indexSettings, injector.getInstance(Gateway.class))); modules.add(new IndexModule(indexSettings)); modules.add(new PercolatorModule()); + modules.add(new ExternalSourceProviderModule(indexSettings)); Injector indexInjector; try { diff --git a/src/main/java/org/elasticsearch/search/fetch/FetchPhase.java b/src/main/java/org/elasticsearch/search/fetch/FetchPhase.java index 3caeaf4a53a48..3e9f9b8c14741 100644 --- a/src/main/java/org/elasticsearch/search/fetch/FetchPhase.java +++ b/src/main/java/org/elasticsearch/search/fetch/FetchPhase.java @@ -24,6 +24,7 @@ import org.apache.lucene.document.Document; import org.apache.lucene.document.Fieldable; import org.apache.lucene.index.IndexReader; +import org.elasticsearch.common.BytesHolder; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.lucene.document.ResetFieldSelector; @@ -164,7 +165,7 @@ public void execute(SearchContext context) { throw new TypeMissingException(new Index(context.shardTarget().index()), uid.type(), "failed to find type loaded for doc [" + uid.id() + "]"); } - byte[] source = extractSource(doc, documentMapper); + BytesHolder source = extractSource(uid.type(), uid.id(), doc, documentMapper); // get the version @@ -222,7 +223,7 @@ public void execute(SearchContext context) { context.lookup().setNextReader(subReader); 
context.lookup().setNextDocId(subDoc); if (source != null) { - context.lookup().source().setNextSource(source, 0, source.length); + context.lookup().source().setNextSource(source.bytes(), source.offset(), source.length()); } if (extractFieldNames != null) { for (String extractFieldName : extractFieldNames) { @@ -260,12 +261,12 @@ public void execute(SearchContext context) { context.fetchResult().hits(new InternalSearchHits(hits, context.queryResult().topDocs().totalHits, context.queryResult().topDocs().getMaxScore())); } - private byte[] extractSource(Document doc, DocumentMapper documentMapper) { + private BytesHolder extractSource(String type, String id, Document doc, DocumentMapper documentMapper) { Fieldable sourceField = doc.getFieldable(SourceFieldMapper.NAME); if (sourceField != null) { - return documentMapper.sourceMapper().nativeValue(sourceField); + return documentMapper.sourceMapper().extractSource(type, id, sourceField); } - return null; + return null; } private Uid extractUid(SearchContext context, Document doc) { diff --git a/src/main/java/org/elasticsearch/search/internal/InternalSearchHit.java b/src/main/java/org/elasticsearch/search/internal/InternalSearchHit.java index 489a4b6a9e3e7..0d095a0bacfef 100644 --- a/src/main/java/org/elasticsearch/search/internal/InternalSearchHit.java +++ b/src/main/java/org/elasticsearch/search/internal/InternalSearchHit.java @@ -89,10 +89,14 @@ private InternalSearchHit() { } public InternalSearchHit(int docId, String id, String type, byte[] source, Map fields) { + this(docId, id, type, source == null ? null : new BytesHolder(source), fields); + } + + public InternalSearchHit(int docId, String id, String type, BytesHolder source, Map fields) { this.docId = docId; this.id = id; this.type = type; - this.source = source == null ? 
null : new BytesHolder(source); + this.source = source; this.fields = fields; } diff --git a/src/main/java/org/elasticsearch/search/lookup/SearchLookup.java b/src/main/java/org/elasticsearch/search/lookup/SearchLookup.java index 2c07e507f4051..31b8fd8080131 100644 --- a/src/main/java/org/elasticsearch/search/lookup/SearchLookup.java +++ b/src/main/java/org/elasticsearch/search/lookup/SearchLookup.java @@ -40,7 +40,7 @@ public class SearchLookup { public SearchLookup(MapperService mapperService, FieldDataCache fieldDataCache) { docMap = new DocLookup(mapperService, fieldDataCache); - sourceLookup = new SourceLookup(); + sourceLookup = new SourceLookup(mapperService); fieldsLookup = new FieldsLookup(mapperService); asMap = ImmutableMap.of("doc", docMap, "_doc", docMap, "_source", sourceLookup, "_fields", fieldsLookup); } diff --git a/src/main/java/org/elasticsearch/search/lookup/SourceLookup.java b/src/main/java/org/elasticsearch/search/lookup/SourceLookup.java index 54e4637887707..99c7b2ac2db90 100644 --- a/src/main/java/org/elasticsearch/search/lookup/SourceLookup.java +++ b/src/main/java/org/elasticsearch/search/lookup/SourceLookup.java @@ -24,10 +24,15 @@ import org.apache.lucene.document.Fieldable; import org.apache.lucene.index.IndexReader; import org.elasticsearch.ElasticSearchParseException; +import org.elasticsearch.common.BytesHolder; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.support.XContentMapValues; +import org.elasticsearch.index.mapper.DocumentMapper; +import org.elasticsearch.index.mapper.MapperService; +import org.elasticsearch.index.mapper.Uid; import org.elasticsearch.index.mapper.internal.SourceFieldMapper; -import org.elasticsearch.index.mapper.internal.SourceFieldSelector; +import org.elasticsearch.index.mapper.internal.UidFieldMapper; +import org.elasticsearch.index.mapper.selector.UidAndSourceFieldSelector; import java.util.Collection; import java.util.List; @@ -48,6 +53,12 @@ public 
class SourceLookup implements Map { private int sourceAsBytesOffset; private int sourceAsBytesLength; private Map source; + + private MapperService mapperService; + + public SourceLookup(MapperService mapperService) { + this.mapperService = mapperService; + } public Map source() { return source; @@ -62,12 +73,15 @@ private Map loadSourceIfNeeded() { return source; } try { - Document doc = reader.document(docId, SourceFieldSelector.INSTANCE); + Document doc = reader.document(docId, new UidAndSourceFieldSelector()); Fieldable sourceField = doc.getFieldable(SourceFieldMapper.NAME); if (sourceField == null) { source = ImmutableMap.of(); } else { - this.source = sourceAsMap(sourceField.getBinaryValue(), sourceField.getBinaryOffset(), sourceField.getBinaryLength()); + Uid uid = Uid.createUid(doc.get(UidFieldMapper.NAME)); + DocumentMapper documentMapper = mapperService.documentMapper(uid.type()); + BytesHolder extractedSource = documentMapper.sourceMapper().extractSource(uid.type(), uid.id(), sourceField); + this.source = sourceAsMap(extractedSource); } } catch (Exception e) { throw new ElasticSearchParseException("failed to parse / load source", e); @@ -75,6 +89,10 @@ private Map loadSourceIfNeeded() { return this.source; } + public static Map sourceAsMap(BytesHolder bytesHolder) throws ElasticSearchParseException { + return XContentHelper.convertToMap(bytesHolder.bytes(), bytesHolder.offset(), bytesHolder.length(), false).v2(); + } + public static Map sourceAsMap(byte[] bytes, int offset, int length) throws ElasticSearchParseException { return XContentHelper.convertToMap(bytes, offset, length, false).v2(); } diff --git a/src/test/java/org/elasticsearch/test/integration/indices/source/CustomSourceMappingIntegrationTests.java b/src/test/java/org/elasticsearch/test/integration/indices/source/CustomSourceMappingIntegrationTests.java new file mode 100644 index 0000000000000..a504dfdb82f41 --- /dev/null +++ 
b/src/test/java/org/elasticsearch/test/integration/indices/source/CustomSourceMappingIntegrationTests.java @@ -0,0 +1,345 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.test.integration.indices.source; + +import org.elasticsearch.ElasticSearchException; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; +import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; +import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.cluster.metadata.MappingMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.search.SearchHits; +import org.elasticsearch.test.integration.AbstractNodesTests; +import org.elasticsearch.test.unit.index.mapper.source.TestExternalSourceProvider; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.Test; + +import java.io.IOException; + +import static org.elasticsearch.client.Requests.createIndexRequest; +import static org.elasticsearch.client.Requests.deleteIndexRequest; +import static 
org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; +import static org.elasticsearch.index.query.QueryBuilders.queryString; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; + +/** + * + */ +public class CustomSourceMappingIntegrationTests extends AbstractNodesTests { + + + protected Settings indexSettings(int numOfReplicas, int numOfShards) { + return settingsBuilder() + .put("index.number_of_replicas", numOfReplicas) + .put("index.number_of_shards", numOfShards) + .build(); + + } + + protected void deleteIndex(String index) { + try { + client("node1").admin().indices().delete(deleteIndexRequest(index)).actionGet(); + } catch (ElasticSearchException ex) { + // Ignore + } + } + + + protected void createIndex(String index, Settings indexSettings) throws IOException { + + logger.info("Creating index test"); + client("node1").admin().indices().create(createIndexRequest(index) + .settings( + settingsBuilder() + .put(indexSettings) + .put("index.source.provider.test.type", TestExternalSourceProvider.class.getName()) + ) + .mapping("type1", XContentFactory.jsonBuilder().startObject().startObject("type") + .startObject("_source") + .field("provider", "test") + .endObject() + .endObject().endObject().string())).actionGet(); + } + + @AfterMethod + public void closeNodes() { + closeAllNodes(); + } + + @Test + public void testSearchWithCustomSource() throws Exception { + startNode("node1"); + deleteIndex("test"); + createIndex("test", indexSettings(0, 1)); + + for (int i = 0; i < 10; i++) { + client("node1").prepareIndex("test", "type1", Integer.toString(i)).setSource(jsonBuilder().startObject() + .field("value", "test" + i) + .endObject()).execute().actionGet(); + } + + ClusterHealthResponse clusterHealth = 
client("node1").admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet(); + assertThat(clusterHealth.timedOut(), equalTo(false)); + client("node1").admin().indices().prepareRefresh().execute().actionGet(); + + SearchResponse searchResponse = client("node1").prepareSearch().setQuery(matchAllQuery()).execute().actionGet(); + SearchHits hits = searchResponse.hits(); + assertThat(hits.totalHits(), equalTo(10l)); + for (int i = 0; i < 10; i++) { + assertThat(hits.getAt(i).sourceAsString(), equalTo("--type1--" + hits.getAt(i).id() + "--{\"dehydrated\":true}--")); + } + } + + @Test + public void testGetWithCustomSource() throws Exception { + startNode("node1"); + deleteIndex("test"); + createIndex("test", indexSettings(0, 1)); + + for (int i = 0; i < 10; i++) { + client("node1").prepareIndex("test", "type1", Integer.toString(i)).setSource(jsonBuilder().startObject() + .field("value", "test" + i) + .endObject()).execute().actionGet(); + } + + ClusterHealthResponse clusterHealth = client("node1").admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet(); + assertThat(clusterHealth.timedOut(), equalTo(false)); + client("node1").admin().indices().prepareFlush().execute().actionGet(); + + for (int i = 0; i < 10; i++) { + GetResponse getResponse = client("node1").prepareGet("test", "type1", Integer.toString(i)).execute().actionGet(); + assertThat(getResponse.sourceAsString(), equalTo("--type1--" + getResponse.id() + "--{\"dehydrated\":true}--")); + } + } + + @Test + public void testReplication() throws Exception { + startNode("node1"); + startNode("node2"); + deleteIndex("test"); + createIndex("test", indexSettings(1, 4)); + + client("node1").prepareIndex("test", "type1", "1").setSource(jsonBuilder().startObject() + .field("value", "test1") + .endObject()).execute().actionGet(); + + ClusterHealthResponse clusterHealth = client("node1").admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet(); + 
assertThat(clusterHealth.timedOut(), equalTo(false)); + client("node1").admin().indices().prepareFlush().execute().actionGet(); + for (int i = 0; i < 10; i++) { + GetResponse getResponse = client("node2").prepareGet("test", "type1", "1").execute().actionGet(); + assertThat(getResponse.sourceAsString(), equalTo("--type1--1--{\"dehydrated\":true}--")); + } + } + + @Test + public void testMappingSettings() throws Exception { + startNode("node1"); + startNode("node2"); + deleteIndex("test"); + createIndex("test", indexSettings(1, 4)); + + ClusterStateResponse clusterStateResponse = client("node2").admin().cluster().prepareState() + .setFilterAll().setFilterIndices("test").setFilterMetaData(false) + .execute().actionGet(); + + MetaData metaData = clusterStateResponse.state().metaData(); + MappingMetaData mappingMetaData = metaData.getIndices().get("test").getMappings().get("type1"); + String mapping = mappingMetaData.source().string(); + assertThat(mapping, containsString("\"provider\":\"test\"")); + + } + + @Test + public void testIdBasedSourceProvider() throws Exception { + startNode("node1"); + try { + client("node1").admin().indices().delete(deleteIndexRequest("test")).actionGet(); + } catch (ElasticSearchException ex) { + // Ignore + } + logger.info("Creating index test"); + client("node1").admin().indices().create(createIndexRequest("test") + .settings( + settingsBuilder() + .put("index.number_of_replicas", 0) + .put("index.number_of_shards", 1) + .put("index.source.provider.idbased.type", IdBasedExternalSourceProvider.class.getName()) + .put("index.source.provider.idbased.source_pattern", "{\"_id\":\"%2$s\", \"_type\":\"%1$s\", " + + "\"body\":\"This record has id %2$s and type %1$s\", \"generated\":true}") + ) + .mapping("type1", XContentFactory.jsonBuilder().startObject().startObject("type") + .startObject("_source") + .field("provider", "idbased") + .endObject() + .startObject("properties") + + .startObject("body") + .field("type", "string") + .field("store", 
"no") + .endObject() + + .endObject() + .endObject().endObject().string())).actionGet(); + + ClusterHealthResponse clusterHealth = client("node1").admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet(); + assertThat(clusterHealth.timedOut(), equalTo(false)); + + for (int i = 0; i < 10; i++) { + client("node1").prepareIndex("test", "type1", Integer.toString(i)).setSource(jsonBuilder().startObject() + .field("_id", i) + .field("body", "This record has id " + i + " and type type1") + .endObject()).execute().actionGet(); + } + + // Test get before flush + GetResponse getResponse = client("node1").prepareGet("test", "type1", "1") + .setFields("body", "_source") + .setRealtime(true) + .execute().actionGet(); + assertThat(getResponse.sourceAsString(), containsString("This record has id 1 and type type1")); + assertThat((String) getResponse.field("body").value(), equalTo("This record has id 1 and type type1")); + + client("node1").admin().indices().prepareFlush().execute().actionGet(); + + SearchResponse searchResponse = client("node1").prepareSearch() + .setQuery(queryString("body:\"has id 3\"")) + .addField("generated") + .addField("body") + .addHighlightedField("body", 100, 1) + .setHighlighterPreTags("!--") + .setHighlighterPostTags("--!") + .addScriptField("script_body", "'(' + _source.body + ')'") + .execute().actionGet(); + SearchHits hits = searchResponse.hits(); + // Check that body field was indexed + assertThat(hits.totalHits(), equalTo(1l)); + // Check that generated sources is accessible + assertThat(hits.getAt(0).field("generated").value(), equalTo(true)); + String[] fragments = hits.getAt(0).highlightFields().get("body").fragments(); + assertThat(fragments.length, equalTo(1)); + assertThat(fragments[0], containsString("!--has--! !--id--! 
!--3--!")); + assertThat(hits.getAt(0).field("script_body").value(), equalTo("(This record has id 3 and type type1)")); + + // Test get after flush + getResponse = client("node1").prepareGet("test", "type1", "2") + .setFields("body", "_source", "generated", "'[' + _source.body + ']'") + .execute().actionGet(); + assertThat(getResponse.sourceAsString(), containsString("This record has id 2 and type type1")); + assertThat((String) getResponse.field("body").value(), equalTo("This record has id 2 and type type1")); + assertThat((String) getResponse.field("'[' + _source.body + ']'").value(), equalTo("[This record has id 2 and type type1]")); + assertThat((Boolean) getResponse.field("generated").value(), equalTo(true)); + } + + @Test + public void testTransformingSourceProvider() throws Exception { + startNode("node1"); + try { + client("node1").admin().indices().delete(deleteIndexRequest("test")).actionGet(); + } catch (ElasticSearchException ex) { + // Ignore + } + logger.info("Creating index test"); + client("node1").admin().indices().create(createIndexRequest("test") + .settings( + settingsBuilder() + .put("index.number_of_replicas", 0) + .put("index.number_of_shards", 1) + .put("index.source.provider.transforming.type", TransformingExternalSourceProviderParser.class.getName()) + ) + .mapping("type1", XContentFactory.jsonBuilder().startObject().startObject("type") + .startObject("_source") + .field("provider", "transforming") + .endObject() + .startObject("properties") + + .startObject("body") + .field("type", "string") + .field("store", "no") + .endObject() + + .endObject() + .endObject().endObject().string())).actionGet(); + + ClusterHealthResponse clusterHealth = client("node1").admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet(); + assertThat(clusterHealth.timedOut(), equalTo(false)); + + for (int i = 0; i < 10; i++) { + client("node1").prepareIndex("test", "type1", Integer.toString(i)).setSource(jsonBuilder().startObject() + 
.field("_id", i) + .field("body", "Content of a file file" + i + ".txt") + .field("file", "file" + i + ".txt") + .endObject()).execute().actionGet(); + } + + // Test get before flush + GetResponse getResponse = client("node1").prepareGet("test", "type1", "1") + .setFields("body", "_source") + .setRealtime(true) + .execute().actionGet(); + assertThat(getResponse.sourceAsString(), containsString("Content of a file file1.txt")); + assertThat((String) getResponse.field("body").value(), equalTo("Content of a file file1.txt")); + + client("node1").admin().indices().prepareFlush().execute().actionGet(); + + SearchResponse searchResponse = client("node1").prepareSearch() + .setQuery(queryString("body:\"file file3.txt\"")) + .addField("generated") + .addHighlightedField("body", 100, 1) + .setHighlighterPreTags("!--") + .setHighlighterPostTags("--!") + .addScriptField("script_body", "'(' + _source.body + ')'") + .execute().actionGet(); + SearchHits hits = searchResponse.hits(); + // Check that body field was indexed + assertThat(hits.totalHits(), equalTo(1l)); + // Check that generated sources is accessible + assertThat(hits.getAt(0).field("generated").value(), equalTo(true)); + String[] fragments = hits.getAt(0).highlightFields().get("body").fragments(); + assertThat(fragments.length, equalTo(1)); + assertThat(fragments[0], containsString("!--file--! 
!--file3--!.!--txt--!")); + assertThat(hits.getAt(0).field("script_body").value(), equalTo("(Content of a file file3.txt)")); + + // Test get after flush + getResponse = client("node1").prepareGet("test", "type1", "2") + .setFields("body", "_source", "generated", "'[' + _source.body + ']'") + .execute().actionGet(); + assertThat(getResponse.sourceAsString(), containsString("Content of a file file2.txt")); + assertThat((String) getResponse.field("body").value(), equalTo("Content of a file file2.txt")); + assertThat((String) getResponse.field("'[' + _source.body + ']'").value(), equalTo("[Content of a file file2.txt]")); + assertThat((Boolean) getResponse.field("generated").value(), equalTo(true)); + + // Test get after flush without requesting source + getResponse = client("node1").prepareGet("test", "type1", "2") + .setFields("body", "generated", "'[' + _source.body + ']'") + .execute().actionGet(); + assertThat((String) getResponse.field("body").value(), equalTo("Content of a file file2.txt")); + assertThat((String) getResponse.field("'[' + _source.body + ']'").value(), equalTo("[Content of a file file2.txt]")); + assertThat((Boolean) getResponse.field("generated").value(), equalTo(true)); + + } +} diff --git a/src/test/java/org/elasticsearch/test/integration/indices/source/IdBasedExternalSourceProvider.java b/src/test/java/org/elasticsearch/test/integration/indices/source/IdBasedExternalSourceProvider.java new file mode 100644 index 0000000000000..715db72f9ac3d --- /dev/null +++ b/src/test/java/org/elasticsearch/test/integration/indices/source/IdBasedExternalSourceProvider.java @@ -0,0 +1,62 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.test.integration.indices.source; + +import org.elasticsearch.common.BytesHolder; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.inject.assistedinject.Assisted; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.source.ParsingExternalSourceProvider; + +import java.io.IOException; +import java.util.Map; + +import static java.util.Collections.emptyMap; + +/** + * Emulates a source provider that can rehydrate the source from the document type and id alone: dehydrateSource keeps no fields and rehydrateSource formats the "source_pattern" setting with the type and id. + */ +public class IdBasedExternalSourceProvider extends ParsingExternalSourceProvider { + + public static class Defaults { + public static final String NAME = "idbased"; + public static final String SOURCE_PATTERN = "{\"_id\":\"%2$s\", \"_type\":\"%1$s\", \"body\":\"This record has id %2$s and type %1$s\"}"; // %1$s = type, %2$s = id (argument order used by rehydrateSource) + } + + private final String sourcePattern; + + @Inject + public IdBasedExternalSourceProvider(@Assisted String name, @Assisted Settings settings) { + super(name); + this.sourcePattern = settings.get("source_pattern", Defaults.SOURCE_PATTERN); // falls back to the default pattern when the setting is absent + } + + @Override + public Map dehydrateSource(String type, String id, Map source) throws IOException { + return emptyMap(); // type and id are enough to rebuild the source, so no field of the original is kept + } + + @Override + public BytesHolder rehydrateSource(String type, String id, byte[] source, int sourceOffset, int sourceLength) { + byte[] buffer = String.format(sourcePattern, type,
id).getBytes(java.nio.charset.Charset.forName("UTF-8")); // the stored bytes are ignored; encode explicitly as UTF-8 instead of the platform default charset + return new BytesHolder(buffer); + } + +} diff --git a/src/test/java/org/elasticsearch/test/integration/indices/source/TransformingExternalSourceProviderParser.java b/src/test/java/org/elasticsearch/test/integration/indices/source/TransformingExternalSourceProviderParser.java new file mode 100644 index 0000000000000..c559fc7480d0e --- /dev/null +++ b/src/test/java/org/elasticsearch/test/integration/indices/source/TransformingExternalSourceProviderParser.java @@ -0,0 +1,96 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.elasticsearch.test.integration.indices.source; + +import com.google.common.collect.Maps; +import org.elasticsearch.common.BytesHolder; +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.inject.assistedinject.Assisted; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.source.ParsingExternalSourceProvider; + +import java.io.IOException; +import java.util.Map; + +import static java.util.Collections.emptyMap; + +/** + * Emulates a source provider that needs the content of one field (the "path_field" setting, "file" by default) in order to restore the source: dehydrateSource keeps only that field and rehydrateSource regenerates the full source from it. + */ +public class TransformingExternalSourceProviderParser extends ParsingExternalSourceProvider { + + public static class Defaults { + public static final String NAME = "transform"; + public static final String SOURCE_PATTERN = "{\"_id\":\"%1$s\", \"_type\":\"%2$s\", \"body\":\"Content of a file %3$s\", " + + "\"file\":\"%3$s\", \"generated\":true}"; // %1$s = id, %2$s = type, %3$s = path field value (argument order used by loadFile) + public static final String PATH_FIELD = "file"; + } + + private final String sourcePattern; + + private final String pathField; + + + @Inject + public TransformingExternalSourceProviderParser(@Assisted String name, @Assisted Settings settings) { + super(name); + this.sourcePattern = settings.get("source_pattern", Defaults.SOURCE_PATTERN); + this.pathField = settings.get("path_field", Defaults.PATH_FIELD); + } + + @Override + public Map dehydrateSource(String type, String id, Map source) throws IOException { + Object pathFieldObject = source.get(pathField); + if (pathFieldObject instanceof String) { // instanceof is already false for null + // Keep only the path field - it is all that is needed to restore the source later + Map dehydratedMap = Maps.newHashMap(); + dehydratedMap.put(pathField, pathFieldObject); + return dehydratedMap; + } else { + // Path field is missing or not a string - don't store source at all + return
emptyMap(); + } + } + + @Override + public BytesHolder rehydrateSource(String type, String id, byte[] source, int sourceOffset, int sourceLength) { + Tuple<XContentType, Map<String, Object>> mapTuple = XContentHelper.convertToMap(source, sourceOffset, sourceLength, true); + Map<String, Object> sourceMap = mapTuple.v2(); + Object pathFieldObject = sourceMap.get(pathField); + if (pathFieldObject instanceof String) { // instanceof is already false for null + // Load source from the path + return loadFile(id, type, (String) pathFieldObject); + } else { + // Path field is missing or not a string - don't load source + return null; + } + } + + private BytesHolder loadFile(String id, String type, String path) { + // Emulate loading source from the path; encode explicitly as UTF-8 instead of the platform default charset + byte[] buffer = String.format(sourcePattern, id, type, path).getBytes(java.nio.charset.Charset.forName("UTF-8")); + return new BytesHolder(buffer); + } + +} + diff --git a/src/test/java/org/elasticsearch/test/unit/index/aliases/IndexAliasesServiceTests.java b/src/test/java/org/elasticsearch/test/unit/index/aliases/IndexAliasesServiceTests.java index 0b41403acfbfc..e3e9095f309f6 100644 --- a/src/test/java/org/elasticsearch/test/unit/index/aliases/IndexAliasesServiceTests.java +++ b/src/test/java/org/elasticsearch/test/unit/index/aliases/IndexAliasesServiceTests.java @@ -41,6 +41,7 @@ import org.elasticsearch.index.query.IndexQueryParserService; import org.elasticsearch.index.settings.IndexSettingsModule; import org.elasticsearch.index.similarity.SimilarityModule; +import org.elasticsearch.index.source.ExternalSourceProviderModule; import org.elasticsearch.indices.InvalidAliasNameException; import org.elasticsearch.script.ScriptModule; import org.testng.annotations.Test; @@ -71,6 +72,7 @@ public static IndexQueryParserService newIndexQueryParserService() { new SettingsModule(ImmutableSettings.Builder.EMPTY_SETTINGS), new IndexEngineModule(ImmutableSettings.Builder.EMPTY_SETTINGS), new IndexCacheModule(ImmutableSettings.Builder.EMPTY_SETTINGS), + new ExternalSourceProviderModule(ImmutableSettings.Builder.EMPTY_SETTINGS), new AbstractModule()
{ @Override protected void configure() { diff --git a/src/test/java/org/elasticsearch/test/unit/index/mapper/MapperTests.java b/src/test/java/org/elasticsearch/test/unit/index/mapper/MapperTests.java index bd642ef69c48f..26bebb21b39b2 100644 --- a/src/test/java/org/elasticsearch/test/unit/index/mapper/MapperTests.java +++ b/src/test/java/org/elasticsearch/test/unit/index/mapper/MapperTests.java @@ -22,6 +22,7 @@ import org.elasticsearch.common.inject.Injector; import org.elasticsearch.common.inject.ModulesBuilder; import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.SettingsModule; import org.elasticsearch.env.Environment; import org.elasticsearch.env.EnvironmentModule; @@ -32,6 +33,8 @@ import org.elasticsearch.index.mapper.DocumentMapperParser; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.settings.IndexSettingsModule; +import org.elasticsearch.index.source.ExternalSourceProviderService; +import org.elasticsearch.index.source.ExternalSourceProviderModule; import org.elasticsearch.indices.analysis.IndicesAnalysisModule; import org.elasticsearch.indices.analysis.IndicesAnalysisService; @@ -41,20 +44,44 @@ public class MapperTests { public static DocumentMapperParser newParser() { - return new DocumentMapperParser(new Index("test"), newAnalysisService()); + return new DocumentMapperParser(new Index("test"), newAnalysisService(), newSourceProviderService()); + } + + public static DocumentMapperParser newParser(Settings indexSettings) { + return new DocumentMapperParser(new Index("test"), indexSettings, newAnalysisService(indexSettings), newSourceProviderService(indexSettings)); } public static MapperService newMapperService() { - return new MapperService(new Index("test"), ImmutableSettings.Builder.EMPTY_SETTINGS, new Environment(), newAnalysisService()); + return new MapperService(new Index("test"), 
ImmutableSettings.Builder.EMPTY_SETTINGS, new Environment(), + newAnalysisService(), newSourceProviderService()); } + public static AnalysisService newAnalysisService() { + return newAnalysisService(ImmutableSettings.Builder.EMPTY_SETTINGS); + } + + public static AnalysisService newAnalysisService(Settings indexSettings) { Injector parentInjector = new ModulesBuilder().add(new SettingsModule(ImmutableSettings.Builder.EMPTY_SETTINGS), new EnvironmentModule(new Environment(ImmutableSettings.Builder.EMPTY_SETTINGS)), new IndicesAnalysisModule()).createInjector(); Injector injector = new ModulesBuilder().add( - new IndexSettingsModule(new Index("test"), ImmutableSettings.Builder.EMPTY_SETTINGS), + new IndexSettingsModule(new Index("test"), indexSettings), new IndexNameModule(new Index("test")), - new AnalysisModule(ImmutableSettings.Builder.EMPTY_SETTINGS, parentInjector.getInstance(IndicesAnalysisService.class))).createChildInjector(parentInjector); + new AnalysisModule(indexSettings, parentInjector.getInstance(IndicesAnalysisService.class))).createChildInjector(parentInjector); return injector.getInstance(AnalysisService.class); } + + public static ExternalSourceProviderService newSourceProviderService() { + return newSourceProviderService(ImmutableSettings.Builder.EMPTY_SETTINGS); + } + + public static ExternalSourceProviderService newSourceProviderService(Settings indexSettings) { + Injector parentInjector = new ModulesBuilder().add(new SettingsModule(ImmutableSettings.Builder.EMPTY_SETTINGS), new EnvironmentModule(new Environment(ImmutableSettings.Builder.EMPTY_SETTINGS))).createInjector(); + Injector injector = new ModulesBuilder().add( + new IndexSettingsModule(new Index("test"), indexSettings), + new IndexNameModule(new Index("test")), + new ExternalSourceProviderModule(indexSettings)).createChildInjector(parentInjector); + + return injector.getInstance(ExternalSourceProviderService.class); + } } diff --git 
a/src/test/java/org/elasticsearch/test/unit/index/mapper/source/CustomSourceMappingTests.java b/src/test/java/org/elasticsearch/test/unit/index/mapper/source/CustomSourceMappingTests.java new file mode 100644 index 0000000000000..5c66d3de18de6 --- /dev/null +++ b/src/test/java/org/elasticsearch/test/unit/index/mapper/source/CustomSourceMappingTests.java @@ -0,0 +1,65 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.test.unit.index.mapper.source; + +import org.apache.lucene.document.Document; +import org.elasticsearch.common.BytesHolder; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.index.mapper.DocumentMapper; +import org.elasticsearch.index.mapper.ParsedDocument; +import org.elasticsearch.test.unit.index.mapper.MapperTests; +import org.testng.annotations.Test; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; + +/** + * Checks that a "_source" mapping naming a custom "provider" puts the dehydrated source into the parsed document and that the source mapper hands back the rehydrated form. + */ +public class CustomSourceMappingTests { + + private Settings defaultSettings() { + return ImmutableSettings.settingsBuilder().put("index.source.provider.test.type", TestExternalSourceProvider.class.getName()).build(); // registers the provider under the name "test" + } + + @Test + public void testNoFormat() throws Exception { + String mapping = XContentFactory.jsonBuilder().startObject().startObject("type") + .startObject("_source") + .field("provider", "test") + .endObject() + .endObject().endObject().string(); + + DocumentMapper documentMapper = MapperTests.newParser(defaultSettings()).parse(mapping); + ParsedDocument doc = documentMapper.parse("type", "1", XContentFactory.jsonBuilder().startObject() + .field("field", "value") + .endObject().copiedBytes()); + + assertThat(doc.docs().size(), equalTo(1)); + Document document = doc.docs().get(0); + assertThat(new String(document.getBinaryValue("_source"), java.nio.charset.Charset.forName("UTF-8")), equalTo("{\"dehydrated\":true}")); // decode explicitly as UTF-8 instead of the platform default charset + BytesHolder rehydratedSource = documentMapper.sourceMapper().extractSource("type", "1", document.getFieldable("_source")); + String rehydratedSourceString = new String(rehydratedSource.bytes(), rehydratedSource.offset(), rehydratedSource.length(), java.nio.charset.Charset.forName("UTF-8")); + assertThat(rehydratedSourceString, equalTo("--type--1--{\"dehydrated\":true}--")); + } + +} diff --git
a/src/test/java/org/elasticsearch/test/unit/index/mapper/source/TestExternalSourceProvider.java b/src/test/java/org/elasticsearch/test/unit/index/mapper/source/TestExternalSourceProvider.java new file mode 100644 index 0000000000000..6a41261c03efe --- /dev/null +++ b/src/test/java/org/elasticsearch/test/unit/index/mapper/source/TestExternalSourceProvider.java @@ -0,0 +1,70 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.test.unit.index.mapper.source; + +import org.elasticsearch.common.BytesHolder; +import org.elasticsearch.common.io.stream.CachedStreamOutput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.index.source.ExternalSourceProvider; + +import java.io.IOException; +import java.util.Map; + +import static com.google.common.collect.Maps.newHashMap; + +/** + * Source provider stub for mapper tests: dehydrateSource ignores its input and always yields the JSON map {"dehydrated":true}; rehydrateSource wraps type, id and the stored bytes as --type--id--stored--. + */ +public class TestExternalSourceProvider implements ExternalSourceProvider { + + @Override + public String name() { + return "test"; + } + + @Override + public BytesHolder dehydrateSource(String type, String id, byte[] source, int sourceOffset, int sourceLength) throws IOException { + Map dehydratedSource = newHashMap(); + dehydratedSource.put("dehydrated", true); + CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry(); + try { + StreamOutput streamOutput = cachedEntry.cachedBytes(); + XContentBuilder builder = XContentFactory.jsonBuilder(streamOutput).map(dehydratedSource); + builder.close(); + return new BytesHolder(cachedEntry.bytes().copiedByteArray()); // copy the bytes out before the entry is pushed back in the finally block + } finally { + CachedStreamOutput.pushEntry(cachedEntry); + } + } + + @Override public BytesHolder rehydrateSource(String type, String id, byte[] source, int sourceOffset, int sourceLength) { + StringBuilder builder = new StringBuilder(); + builder.append("--"); + builder.append(type); + builder.append("--"); + builder.append(id); + builder.append("--"); + builder.append(new String(source, sourceOffset, sourceLength, java.nio.charset.Charset.forName("UTF-8"))); // decode explicitly as UTF-8 instead of the platform default charset + builder.append("--"); + return new BytesHolder(builder.toString().getBytes(java.nio.charset.Charset.forName("UTF-8"))); + } +} \ No newline at end of file diff --git a/src/test/java/org/elasticsearch/test/unit/index/percolator/PercolatorExecutorTests.java b/src/test/java/org/elasticsearch/test/unit/index/percolator/PercolatorExecutorTests.java index d132ed556ce96..c228b85098991 100644
--- a/src/test/java/org/elasticsearch/test/unit/index/percolator/PercolatorExecutorTests.java +++ b/src/test/java/org/elasticsearch/test/unit/index/percolator/PercolatorExecutorTests.java @@ -39,6 +39,7 @@ import org.elasticsearch.index.query.IndexQueryParserModule; import org.elasticsearch.index.settings.IndexSettingsModule; import org.elasticsearch.index.similarity.SimilarityModule; +import org.elasticsearch.index.source.ExternalSourceProviderModule; import org.elasticsearch.indices.query.IndicesQueriesModule; import org.elasticsearch.script.ScriptModule; import org.elasticsearch.threadpool.ThreadPoolModule; @@ -76,6 +77,7 @@ public void buildPercolatorService() { new SimilarityModule(settings), new IndexQueryParserModule(settings), new IndexNameModule(index), + new ExternalSourceProviderModule(settings), new AbstractModule() { @Override protected void configure() { diff --git a/src/test/java/org/elasticsearch/test/unit/index/query/SimpleIndexQueryParserTests.java b/src/test/java/org/elasticsearch/test/unit/index/query/SimpleIndexQueryParserTests.java index a12c31e9aee68..d0037ed357355 100644 --- a/src/test/java/org/elasticsearch/test/unit/index/query/SimpleIndexQueryParserTests.java +++ b/src/test/java/org/elasticsearch/test/unit/index/query/SimpleIndexQueryParserTests.java @@ -50,6 +50,7 @@ import org.elasticsearch.index.search.geo.InMemoryGeoBoundingBoxFilter; import org.elasticsearch.index.settings.IndexSettingsModule; import org.elasticsearch.index.similarity.SimilarityModule; +import org.elasticsearch.index.source.ExternalSourceProviderModule; import org.elasticsearch.indices.query.IndicesQueriesModule; import org.elasticsearch.script.ScriptModule; import org.elasticsearch.threadpool.ThreadPoolModule; @@ -94,6 +95,7 @@ public void setupQueryParser() throws IOException { new SimilarityModule(settings), new IndexQueryParserModule(settings), new IndexNameModule(index), + new ExternalSourceProviderModule(settings), new AbstractModule() { @Override protected 
void configure() { diff --git a/src/test/java/org/elasticsearch/test/unit/index/query/guice/IndexQueryParserModuleTests.java b/src/test/java/org/elasticsearch/test/unit/index/query/guice/IndexQueryParserModuleTests.java index d8ca138e14571..a2f6ab8c2f4fe 100644 --- a/src/test/java/org/elasticsearch/test/unit/index/query/guice/IndexQueryParserModuleTests.java +++ b/src/test/java/org/elasticsearch/test/unit/index/query/guice/IndexQueryParserModuleTests.java @@ -35,6 +35,7 @@ import org.elasticsearch.index.query.IndexQueryParserService; import org.elasticsearch.index.settings.IndexSettingsModule; import org.elasticsearch.index.similarity.SimilarityModule; +import org.elasticsearch.index.source.ExternalSourceProviderModule; import org.elasticsearch.indices.query.IndicesQueriesModule; import org.elasticsearch.script.ScriptModule; import org.elasticsearch.threadpool.ThreadPoolModule; @@ -72,6 +73,7 @@ public void testCustomInjection() { new SimilarityModule(settings), new IndexQueryParserModule(settings), new IndexNameModule(index), + new ExternalSourceProviderModule(settings), new AbstractModule() { @Override protected void configure() { diff --git a/src/test/java/org/elasticsearch/test/unit/index/query/plugin/IndexQueryParserPlugin2Tests.java b/src/test/java/org/elasticsearch/test/unit/index/query/plugin/IndexQueryParserPlugin2Tests.java index 6f98b2fd16bcc..149a59f34bb39 100644 --- a/src/test/java/org/elasticsearch/test/unit/index/query/plugin/IndexQueryParserPlugin2Tests.java +++ b/src/test/java/org/elasticsearch/test/unit/index/query/plugin/IndexQueryParserPlugin2Tests.java @@ -36,6 +36,7 @@ import org.elasticsearch.index.query.IndexQueryParserService; import org.elasticsearch.index.settings.IndexSettingsModule; import org.elasticsearch.index.similarity.SimilarityModule; +import org.elasticsearch.index.source.ExternalSourceProviderModule; import org.elasticsearch.indices.query.IndicesQueriesModule; import org.elasticsearch.script.ScriptModule; import 
org.elasticsearch.threadpool.ThreadPoolModule; @@ -70,6 +71,7 @@ public void testCustomInjection() { new SimilarityModule(settings), queryParserModule, new IndexNameModule(index), + new ExternalSourceProviderModule(settings), new AbstractModule() { @Override protected void configure() { diff --git a/src/test/java/org/elasticsearch/test/unit/index/query/plugin/IndexQueryParserPluginTests.java b/src/test/java/org/elasticsearch/test/unit/index/query/plugin/IndexQueryParserPluginTests.java index 94621cc81e1d0..3a44533de5cea 100644 --- a/src/test/java/org/elasticsearch/test/unit/index/query/plugin/IndexQueryParserPluginTests.java +++ b/src/test/java/org/elasticsearch/test/unit/index/query/plugin/IndexQueryParserPluginTests.java @@ -36,6 +36,7 @@ import org.elasticsearch.index.query.IndexQueryParserService; import org.elasticsearch.index.settings.IndexSettingsModule; import org.elasticsearch.index.similarity.SimilarityModule; +import org.elasticsearch.index.source.ExternalSourceProviderModule; import org.elasticsearch.indices.query.IndicesQueriesModule; import org.elasticsearch.script.ScriptModule; import org.elasticsearch.threadpool.ThreadPoolModule; @@ -79,6 +80,7 @@ public void processXContentFilterParsers(XContentFilterParsersBindings bindings) new SimilarityModule(settings), queryParserModule, new IndexNameModule(index), + new ExternalSourceProviderModule(settings), new AbstractModule() { @Override protected void configure() {