Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify dynamic mappings updates. #10720

Merged
merged 1 commit into from Apr 23, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -52,6 +52,7 @@
import org.elasticsearch.index.engine.DocumentAlreadyExistsException;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.Mapping;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.shard.IndexShard;
Expand Down Expand Up @@ -352,23 +353,6 @@ <T extends ActionWriteResponse> T response() {

}

private void applyMappingUpdate(IndexService indexService, String type, Mapping update) throws Throwable {
// HACK: Rivers seem to have something specific that triggers potential
// deadlocks when doing concurrent indexing. So for now they keep the
// old behaviour of updating mappings locally first and then
// asynchronously notifying the master
// this can go away when rivers are removed
final String indexName = indexService.index().name();
final String indexUUID = indexService.indexUUID();
if (indexName.equals(RiverIndexName.Conf.indexName(settings))) {
indexService.mapperService().merge(type, new CompressedString(update.toBytes()), true);
mappingUpdatedAction.updateMappingOnMaster(indexName, indexUUID, type, update, null);
} else {
mappingUpdatedAction.updateMappingOnMasterSynchronously(indexName, indexUUID, type, update);
indexService.mapperService().merge(type, new CompressedString(update.toBytes()), true);
}
}

private WriteResult shardIndexOperation(BulkShardRequest request, IndexRequest indexRequest, ClusterState clusterState,
IndexShard indexShard, IndexService indexService, boolean processed) throws Throwable {

Expand All @@ -392,20 +376,54 @@ private WriteResult shardIndexOperation(BulkShardRequest request, IndexRequest i
Engine.IndexingOperation op;
if (indexRequest.opType() == IndexRequest.OpType.INDEX) {
Engine.Index index = indexShard.prepareIndex(sourceToParse, indexRequest.version(), indexRequest.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates() || indexRequest.canHaveDuplicates());
if (index.parsedDoc().dynamicMappingsUpdate() != null) {
applyMappingUpdate(indexService, indexRequest.type(), index.parsedDoc().dynamicMappingsUpdate());
Mapping update = index.parsedDoc().dynamicMappingsUpdate();
if (update != null) {
final String indexName = indexService.index().name();
if (indexName.equals(RiverIndexName.Conf.indexName(settings))) {
// With rivers, we have a chicken and egg problem if indexing
// the _meta document triggers a mapping update. Because we would
// like to validate the mapping update first, but on the other
// hand putting the mapping would start the river, which expects
// to find a _meta document
// So we have no choice but to index first and send mappings afterwards
MapperService mapperService = indexService.mapperService();
mapperService.merge(indexRequest.type(), new CompressedString(update.toBytes()), true);
indexShard.index(index);
mappingUpdatedAction.updateMappingOnMasterAsynchronously(indexName, indexRequest.type(), update);
} else {
mappingUpdatedAction.updateMappingOnMasterSynchronously(indexName, indexRequest.type(), update);
indexShard.index(index);
}
} else {
indexShard.index(index);
}
indexShard.index(index);
version = index.version();
op = index;
created = index.created();
} else {
Engine.Create create = indexShard.prepareCreate(sourceToParse, indexRequest.version(), indexRequest.versionType(), Engine.Operation.Origin.PRIMARY,
request.canHaveDuplicates() || indexRequest.canHaveDuplicates(), indexRequest.autoGeneratedId());
if (create.parsedDoc().dynamicMappingsUpdate() != null) {
applyMappingUpdate(indexService, indexRequest.type(), create.parsedDoc().dynamicMappingsUpdate());
Mapping update = create.parsedDoc().dynamicMappingsUpdate();
if (update != null) {
final String indexName = indexService.index().name();
if (indexName.equals(RiverIndexName.Conf.indexName(settings))) {
// With rivers, we have a chicken and egg problem if indexing
// the _meta document triggers a mapping update. Because we would
// like to validate the mapping update first, but on the other
// hand putting the mapping would start the river, which expects
// to find a _meta document
// So we have no choice but to index first and send mappings afterwards
MapperService mapperService = indexService.mapperService();
mapperService.merge(indexRequest.type(), new CompressedString(update.toBytes()), true);
indexShard.create(create);
mappingUpdatedAction.updateMappingOnMasterAsynchronously(indexName, indexRequest.type(), update);
} else {
mappingUpdatedAction.updateMappingOnMasterSynchronously(indexName, indexRequest.type(), update);
indexShard.create(create);
}
} else {
indexShard.create(create);
}
indexShard.create(create);
version = create.version();
op = create;
created = true;
Expand Down Expand Up @@ -528,8 +546,9 @@ private UpdateResult shardUpdateOperation(ClusterState clusterState, BulkShardRe


@Override
protected void shardOperationOnReplica(ReplicaOperationRequest shardRequest) {
IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.shardId.getIndex()).shardSafe(shardRequest.shardId.id());
protected void shardOperationOnReplica(ReplicaOperationRequest shardRequest) throws Exception {
IndexService indexService = indicesService.indexServiceSafe(shardRequest.shardId.getIndex());
IndexShard indexShard = indexService.shardSafe(shardRequest.shardId.id());
final BulkShardRequest request = shardRequest.request;
for (int i = 0; i < request.items().length; i++) {
BulkItemRequest item = request.items()[i];
Expand All @@ -544,11 +563,29 @@ protected void shardOperationOnReplica(ReplicaOperationRequest shardRequest) {

if (indexRequest.opType() == IndexRequest.OpType.INDEX) {
Engine.Index index = indexShard.prepareIndex(sourceToParse, indexRequest.version(), indexRequest.versionType(), Engine.Operation.Origin.REPLICA, request.canHaveDuplicates() || indexRequest.canHaveDuplicates());
if (index.parsedDoc().dynamicMappingsUpdate() != null) {
if (indexService.index().name().equals(RiverIndexName.Conf.indexName(settings))) {
// mappings updates on the _river are not validated synchronously so we can't
// assume they are here when indexing on a replica
indexService.mapperService().merge(indexRequest.type(), new CompressedString(index.parsedDoc().dynamicMappingsUpdate().toBytes()), true);
} else {
throw new ElasticsearchIllegalStateException("Index operations on replicas should not trigger dynamic mappings updates: [" + index.parsedDoc().dynamicMappingsUpdate() + "]");
}
}
indexShard.index(index);
} else {
Engine.Create create = indexShard.prepareCreate(sourceToParse,
indexRequest.version(), indexRequest.versionType(),
Engine.Operation.Origin.REPLICA, request.canHaveDuplicates() || indexRequest.canHaveDuplicates(), indexRequest.autoGeneratedId());
if (create.parsedDoc().dynamicMappingsUpdate() != null) {
if (indexService.index().name().equals(RiverIndexName.Conf.indexName(settings))) {
// mappings updates on the _river are not validated synchronously so we can't
// assume they are here when indexing on a replica
indexService.mapperService().merge(indexRequest.type(), new CompressedString(create.parsedDoc().dynamicMappingsUpdate().toBytes()), true);
} else {
throw new ElasticsearchIllegalStateException("Index operations on replicas should not trigger dynamic mappings updates: [" + create.parsedDoc().dynamicMappingsUpdate() + "]");
}
}
indexShard.create(create);
}
} catch (Throwable e) {
Expand Down
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.action.index;

import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.RoutingMissingException;
Expand All @@ -42,6 +43,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.Mapping;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.shard.IndexShard;
Expand All @@ -51,6 +53,8 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;

/**
* Performs the index operation.
* <p/>
Expand Down Expand Up @@ -167,23 +171,6 @@ protected ShardIterator shards(ClusterState clusterState, InternalRequest reques
.indexShards(clusterService.state(), request.concreteIndex(), request.request().type(), request.request().id(), request.request().routing());
}

private void applyMappingUpdate(IndexService indexService, String type, Mapping update) throws Throwable {
// HACK: Rivers seem to have something specific that triggers potential
// deadlocks when doing concurrent indexing. So for now they keep the
// old behaviour of updating mappings locally first and then
// asynchronously notifying the master
// this can go away when rivers are removed
final String indexName = indexService.index().name();
final String indexUUID = indexService.indexUUID();
if (indexName.equals(RiverIndexName.Conf.indexName(settings))) {
indexService.mapperService().merge(type, new CompressedString(update.toBytes()), true);
mappingUpdatedAction.updateMappingOnMaster(indexName, indexUUID, type, update, null);
} else {
mappingUpdatedAction.updateMappingOnMasterSynchronously(indexName, indexUUID, type, update);
indexService.mapperService().merge(type, new CompressedString(update.toBytes()), true);
}
}

@Override
protected Tuple<IndexResponse, IndexRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) throws Throwable {
final IndexRequest request = shardRequest.request;
Expand All @@ -206,19 +193,53 @@ protected Tuple<IndexResponse, IndexRequest> shardOperationOnPrimary(ClusterStat

if (request.opType() == IndexRequest.OpType.INDEX) {
Engine.Index index = indexShard.prepareIndex(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates());
if (index.parsedDoc().dynamicMappingsUpdate() != null) {
applyMappingUpdate(indexService, request.type(), index.parsedDoc().dynamicMappingsUpdate());
Mapping update = index.parsedDoc().dynamicMappingsUpdate();
if (update != null) {
final String indexName = indexService.index().name();
if (indexName.equals(RiverIndexName.Conf.indexName(settings))) {
// With rivers, we have a chicken and egg problem if indexing
// the _meta document triggers a mapping update. Because we would
// like to validate the mapping update first, but on the other
// hand putting the mapping would start the river, which expects
// to find a _meta document
// So we have no choice but to index first and send mappings afterwards
MapperService mapperService = indexService.mapperService();
mapperService.merge(request.type(), new CompressedString(update.toBytes()), true);
indexShard.index(index);
mappingUpdatedAction.updateMappingOnMasterAsynchronously(indexName, request.type(), update);
} else {
mappingUpdatedAction.updateMappingOnMasterSynchronously(indexName, request.type(), update);
indexShard.index(index);
}
} else {
indexShard.index(index);
}
indexShard.index(index);
version = index.version();
created = index.created();
} else {
Engine.Create create = indexShard.prepareCreate(sourceToParse,
request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates(), request.autoGeneratedId());
if (create.parsedDoc().dynamicMappingsUpdate() != null) {
applyMappingUpdate(indexService, request.type(), create.parsedDoc().dynamicMappingsUpdate());
Mapping update = create.parsedDoc().dynamicMappingsUpdate();
if (update != null) {
final String indexName = indexService.index().name();
if (indexName.equals(RiverIndexName.Conf.indexName(settings))) {
// With rivers, we have a chicken and egg problem if indexing
// the _meta document triggers a mapping update. Because we would
// like to validate the mapping update first, but on the other
// hand putting the mapping would start the river, which expects
// to find a _meta document
// So we have no choice but to index first and send mappings afterwards
MapperService mapperService = indexService.mapperService();
mapperService.merge(request.type(), new CompressedString(update.toBytes()), true);
indexShard.create(create);
mappingUpdatedAction.updateMappingOnMasterAsynchronously(indexName, request.type(), update);
} else {
mappingUpdatedAction.updateMappingOnMasterSynchronously(indexName, request.type(), update);
indexShard.create(create);
}
} else {
indexShard.create(create);
}
indexShard.create(create);
version = create.version();
created = true;
}
Expand All @@ -239,17 +260,36 @@ protected Tuple<IndexResponse, IndexRequest> shardOperationOnPrimary(ClusterStat
}

@Override
protected void shardOperationOnReplica(ReplicaOperationRequest shardRequest) {
IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.shardId.getIndex()).shardSafe(shardRequest.shardId.id());
protected void shardOperationOnReplica(ReplicaOperationRequest shardRequest) throws IOException {
IndexService indexService = indicesService.indexServiceSafe(shardRequest.shardId.getIndex());
IndexShard indexShard = indexService.shardSafe(shardRequest.shardId.id());
IndexRequest request = shardRequest.request;
SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.REPLICA, request.source()).type(request.type()).id(request.id())
.routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl());
if (request.opType() == IndexRequest.OpType.INDEX) {
Engine.Index index = indexShard.prepareIndex(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.REPLICA, request.canHaveDuplicates());
if (index.parsedDoc().dynamicMappingsUpdate() != null) {
if (indexService.index().name().equals(RiverIndexName.Conf.indexName(settings))) {
// mappings updates on the _river are not validated synchronously so we can't
// assume they are here when indexing on a replica
indexService.mapperService().merge(request.type(), new CompressedString(index.parsedDoc().dynamicMappingsUpdate().toBytes()), true);
} else {
throw new ElasticsearchIllegalStateException("Index operations on replicas should not trigger dynamic mappings updates: [" + index.parsedDoc().dynamicMappingsUpdate() + "]");
}
}
indexShard.index(index);
} else {
Engine.Create create = indexShard.prepareCreate(sourceToParse,
request.version(), request.versionType(), Engine.Operation.Origin.REPLICA, request.canHaveDuplicates(), request.autoGeneratedId());
if (create.parsedDoc().dynamicMappingsUpdate() != null) {
if (indexService.index().name().equals(RiverIndexName.Conf.indexName(settings))) {
// mappings updates on the _river are not validated synchronously so we can't
// assume they are here when indexing on a replica
indexService.mapperService().merge(request.type(), new CompressedString(create.parsedDoc().dynamicMappingsUpdate().toBytes()), true);
} else {
throw new ElasticsearchIllegalStateException("Index operations on replicas should not trigger dynamic mappings updates: [" + create.parsedDoc().dynamicMappingsUpdate() + "]");
}
}
indexShard.create(create);
}
if (request.refresh()) {
Expand Down
Expand Up @@ -117,7 +117,7 @@ protected void doExecute(Request request, ActionListener<Response> listener) {
*/
protected abstract Tuple<Response, ReplicaRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) throws Throwable;

protected abstract void shardOperationOnReplica(ReplicaOperationRequest shardRequest);
protected abstract void shardOperationOnReplica(ReplicaOperationRequest shardRequest) throws Exception;

protected abstract ShardIterator shards(ClusterState clusterState, InternalRequest request) throws ElasticsearchException;

Expand Down