Commit
Internal: Wait for required mappings to be available on the replica before indexing.

Due to timing issues, mappings that are required to index a document might not
be available on the replica at indexing time. In that case the replica starts
listening to cluster state changes and re-parses the document until no dynamic
mappings updates are generated.
jpountz committed Apr 24, 2015
1 parent c4f7385 commit 3e5b8a2
Showing 12 changed files with 337 additions and 85 deletions.
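
The commit message above describes a retry loop driven by cluster state changes. The sketch below is purely illustrative and not part of the commit: class and helper names such as ReplicaRetrySketch, indexOnReplica, sendSuccess and sendFailure are hypothetical stand-ins, while ClusterStateObserver, AbstractRunnable and the listener callbacks are used the same way as in the AsyncReplicaAction added further down in TransportShardReplicationOperationAction.

import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;

// Illustrative sketch of the retry-on-replica pattern introduced by this commit.
abstract class ReplicaRetrySketch extends AbstractRunnable {

    // Stand-in for the RetryOnReplicaException that the commit adds to
    // TransportShardReplicationOperationAction.
    static class RetryOnReplicaException extends RuntimeException {
        RetryOnReplicaException(String msg) {
            super(msg);
        }
    }

    // A null timeout means the observer (and the cluster state listener it
    // registers) is never removed automatically: failing the replica is
    // something we want to avoid at all costs.
    private final ClusterStateObserver observer;

    ReplicaRetrySketch(ClusterService clusterService, ESLogger logger) {
        this.observer = new ClusterStateObserver(clusterService, null, logger);
    }

    @Override
    protected void doRun() throws Exception {
        // Parse and index the document; the real code throws
        // RetryOnReplicaException when parsing still produces a dynamic
        // mapping update that the replica has not received yet.
        indexOnReplica();
        sendSuccess();
    }

    @Override
    public void onFailure(Throwable t) {
        if (t instanceof RetryOnReplicaException) {
            // Wait for the next cluster state, which should carry the missing
            // mappings, then re-run the whole operation so the document is
            // re-parsed against the updated mappings.
            observer.waitForNextChange(new ClusterStateObserver.Listener() {
                @Override
                public void onNewClusterState(ClusterState state) {
                    run();
                }

                @Override
                public void onClusterServiceClose() {
                    sendFailure(new IllegalStateException("cluster service closed, cannot retry"));
                }

                @Override
                public void onTimeout(TimeValue timeout) {
                    // unreachable: the observer was created without a timeout
                }
            });
        } else {
            sendFailure(t);
        }
    }

    // Hypothetical hooks standing in for the real parsing, engine and transport plumbing.
    abstract void indexOnReplica() throws Exception;
    abstract void sendSuccess();
    abstract void sendFailure(Throwable t);
}

The real AsyncReplicaAction re-schedules itself on a thread pool executor instead of calling run() inline, but the control flow is the same.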
@@ -23,15 +23,18 @@
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.bootstrap.Elasticsearch;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.rest.RestStatus;

import java.io.IOException;
import java.util.Collections;

/**
* Base class for write action responses.
@@ -156,6 +159,11 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
return builder;
}

@Override
public String toString() {
return Strings.toString(this);
}

public static ShardInfo readShardInfo(StreamInput in) throws IOException {
ShardInfo shardInfo = new ShardInfo();
shardInfo.readFrom(in);
@@ -291,6 +291,8 @@ protected Tuple<BulkShardResponse, BulkShardRequest> shardOperationOnPrimary(Clu

}
}
} else {
throw new ElasticsearchIllegalStateException("Unexpected index operation: " + item.request());
}

assert item.getPrimaryResponse() != null;
@@ -532,7 +534,7 @@ private UpdateResult shardUpdateOperation(ClusterState clusterState, BulkShardRe


@Override
protected void shardOperationOnReplica(ShardId shardId, BulkShardRequest request) throws Exception {
protected void shardOperationOnReplica(ShardId shardId, BulkShardRequest request) {
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
IndexShard indexShard = indexService.shardSafe(shardId.id());
for (int i = 0; i < request.items().length; i++) {
@@ -548,28 +550,18 @@ protected void shardOperationOnReplica(ShardId shardId, BulkShardRequest request

if (indexRequest.opType() == IndexRequest.OpType.INDEX) {
Engine.Index index = indexShard.prepareIndex(sourceToParse, indexRequest.version(), indexRequest.versionType(), Engine.Operation.Origin.REPLICA, request.canHaveDuplicates() || indexRequest.canHaveDuplicates());
if (index.parsedDoc().dynamicMappingsUpdate() != null) {
if (indexService.index().name().equals(RiverIndexName.Conf.indexName(settings))) {
// mappings updates on the _river are not validated synchronously so we can't
// assume they are here when indexing on a replica
indexService.mapperService().merge(indexRequest.type(), new CompressedString(index.parsedDoc().dynamicMappingsUpdate().toBytes()), true);
} else {
throw new ElasticsearchIllegalStateException("Index operations on replicas should not trigger dynamic mappings updates: [" + index.parsedDoc().dynamicMappingsUpdate() + "]");
}
Mapping update = index.parsedDoc().dynamicMappingsUpdate();
if (update != null) {
throw new RetryOnReplicaException(shardId, "Mappings are not available on the replica yet, triggered update: " + update);
}
indexShard.index(index);
} else {
Engine.Create create = indexShard.prepareCreate(sourceToParse,
indexRequest.version(), indexRequest.versionType(),
Engine.Operation.Origin.REPLICA, request.canHaveDuplicates() || indexRequest.canHaveDuplicates(), indexRequest.autoGeneratedId());
if (create.parsedDoc().dynamicMappingsUpdate() != null) {
if (indexService.index().name().equals(RiverIndexName.Conf.indexName(settings))) {
// mappings updates on the _river are not validated synchronously so we can't
// assume they are here when indexing on a replica
indexService.mapperService().merge(indexRequest.type(), new CompressedString(create.parsedDoc().dynamicMappingsUpdate().toBytes()), true);
} else {
throw new ElasticsearchIllegalStateException("Index operations on replicas should not trigger dynamic mappings updates: [" + create.parsedDoc().dynamicMappingsUpdate() + "]");
}
Mapping update = create.parsedDoc().dynamicMappingsUpdate();
if (update != null) {
throw new RetryOnReplicaException(shardId, "Mappings are not available on the replica yet, triggered update: " + update);
}
indexShard.create(create);
}
@@ -592,6 +584,8 @@ protected void shardOperationOnReplica(ShardId shardId, BulkShardRequest request
throw e;
}
}
} else {
throw new ElasticsearchIllegalStateException("Unexpected index operation: " + item.request());
}
}

13 changes: 13 additions & 0 deletions src/main/java/org/elasticsearch/action/index/IndexResponse.java
@@ -105,4 +105,17 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeLong(version);
out.writeBoolean(created);
}

@Override
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append("IndexResponse[");
builder.append("index=").append(index);
builder.append(",type=").append(type);
builder.append(",id=").append(id);
builder.append(",version=").append(version);
builder.append(",created=").append(created);
builder.append(",shards=").append(getShardInfo());
return builder.append("]").toString();
}
}
@@ -19,7 +19,6 @@

package org.elasticsearch.action.index;

import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.RoutingMissingException;
@@ -54,8 +53,6 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;

/**
* Performs the index operation.
* <p/>
@@ -73,6 +70,8 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
private final TransportCreateIndexAction createIndexAction;
private final MappingUpdatedAction mappingUpdatedAction;

private final ClusterService clusterService;

@Inject
public TransportIndexAction(Settings settings, TransportService transportService, ClusterService clusterService,
IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction,
@@ -83,6 +82,7 @@ public TransportIndexAction(Settings settings, TransportService transportService
this.mappingUpdatedAction = mappingUpdatedAction;
this.autoCreateIndex = new AutoCreateIndex(settings);
this.allowIdGeneration = settings.getAsBoolean("action.allow_id_generation", true);
this.clusterService = clusterService;
}

@Override
@@ -201,6 +201,7 @@ protected Tuple<IndexResponse, IndexRequest> shardOperationOnPrimary(ClusterStat
version = index.version();
created = index.created();
} else {
assert request.opType() == IndexRequest.OpType.CREATE : request.opType();
Engine.Create create = indexShard.prepareCreate(sourceToParse,
request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates(), request.autoGeneratedId());
Mapping update = create.parsedDoc().dynamicMappingsUpdate();
Expand Down Expand Up @@ -244,34 +245,24 @@ protected Tuple<IndexResponse, IndexRequest> shardOperationOnPrimary(ClusterStat
}

@Override
protected void shardOperationOnReplica(ShardId shardId, IndexRequest request) throws IOException {
protected void shardOperationOnReplica(ShardId shardId, IndexRequest request) {
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
IndexShard indexShard = indexService.shardSafe(shardId.id());
SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.REPLICA, request.source()).type(request.type()).id(request.id())
.routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl());
if (request.opType() == IndexRequest.OpType.INDEX) {
Engine.Index index = indexShard.prepareIndex(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.REPLICA, request.canHaveDuplicates());
if (index.parsedDoc().dynamicMappingsUpdate() != null) {
if (indexService.index().name().equals(RiverIndexName.Conf.indexName(settings))) {
// mappings updates on the _river are not validated synchronously so we can't
// assume they are here when indexing on a replica
indexService.mapperService().merge(request.type(), new CompressedString(index.parsedDoc().dynamicMappingsUpdate().toBytes()), true);
} else {
throw new ElasticsearchIllegalStateException("Index operations on replicas should not trigger dynamic mappings updates: [" + index.parsedDoc().dynamicMappingsUpdate() + "]");
}
Mapping update = index.parsedDoc().dynamicMappingsUpdate();
if (update != null) {
throw new RetryOnReplicaException(shardId, "Mappings are not available on the replica yet, triggered update: " + update);
}
indexShard.index(index);
} else {
Engine.Create create = indexShard.prepareCreate(sourceToParse,
request.version(), request.versionType(), Engine.Operation.Origin.REPLICA, request.canHaveDuplicates(), request.autoGeneratedId());
if (create.parsedDoc().dynamicMappingsUpdate() != null) {
if (indexService.index().name().equals(RiverIndexName.Conf.indexName(settings))) {
// mappings updates on the _river are not validated synchronously so we can't
// assume they are here when indexing on a replica
indexService.mapperService().merge(request.type(), new CompressedString(create.parsedDoc().dynamicMappingsUpdate().toBytes()), true);
} else {
throw new ElasticsearchIllegalStateException("Index operations on replicas should not trigger dynamic mappings updates: [" + create.parsedDoc().dynamicMappingsUpdate() + "]");
}
assert request.opType() == IndexRequest.OpType.CREATE : request.opType();
Engine.Create create = indexShard.prepareCreate(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.REPLICA, request.canHaveDuplicates(), request.autoGeneratedId());
Mapping update = create.parsedDoc().dynamicMappingsUpdate();
if (update != null) {
throw new RetryOnReplicaException(shardId, "Mappings are not available on the replica yet, triggered update: " + update);
}
indexShard.create(create);
}
@@ -21,10 +21,11 @@

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.*;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionWriteResponse;
import org.elasticsearch.action.UnavailableShardsException;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.cluster.ClusterService;
@@ -35,11 +36,13 @@
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
@@ -48,12 +51,21 @@
import org.elasticsearch.index.engine.DocumentAlreadyExistsException;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
import org.elasticsearch.transport.BaseTransportResponseHandler;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.EmptyTransportResponseHandler;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.Map;
@@ -112,7 +124,7 @@ protected void doExecute(Request request, ActionListener<Response> listener) {
*/
protected abstract Tuple<Response, ReplicaRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) throws Throwable;

protected abstract void shardOperationOnReplica(ShardId shardId, ReplicaRequest shardRequest) throws Exception;
protected abstract void shardOperationOnReplica(ShardId shardId, ReplicaRequest shardRequest);

protected abstract ShardIterator shards(ClusterState clusterState, InternalRequest request) throws ElasticsearchException;

@@ -203,12 +215,77 @@ public void onFailure(Throwable e) {
class ReplicaOperationTransportHandler implements TransportRequestHandler<ReplicaRequest> {
@Override
public void messageReceived(final ReplicaRequest request, final TransportChannel channel) throws Exception {
new AsyncReplicaAction(request, channel).run();
}
}

protected static class RetryOnReplicaException extends IndexShardException {

public RetryOnReplicaException(ShardId shardId, String msg) {
super(shardId, msg);
}

public RetryOnReplicaException(ShardId shardId, String msg, Throwable cause) {
super(shardId, msg, cause);
}
}

private final class AsyncReplicaAction extends AbstractRunnable {
private final ReplicaRequest request;
private final TransportChannel channel;
// important: we pass null as a timeout as failing a replica is
// something we want to avoid at all costs
private final ClusterStateObserver observer = new ClusterStateObserver(clusterService, null, logger);


AsyncReplicaAction(ReplicaRequest request, TransportChannel channel) {
this.request = request;
this.channel = channel;
}

@Override
public void onFailure(Throwable t) {
if (t instanceof RetryOnReplicaException) {
logger.trace("Retrying operation on replica", t);
observer.waitForNextChange(new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) {
threadPool.executor(executor).execute(AsyncReplicaAction.this);
}

@Override
public void onClusterServiceClose() {
responseWithFailure(new NodeClosedException(clusterService.localNode()));
}

@Override
public void onTimeout(TimeValue timeout) {
throw new AssertionError("Cannot happen: there is not timeout");
}
});
} else {
try {
failReplicaIfNeeded(request.internalShardId.getIndex(), request.internalShardId.id(), t);
} catch (Throwable unexpected) {
logger.error("{} unexpected error while failing replica", request.internalShardId.id(), unexpected);
} finally {
responseWithFailure(t);
}
}
}

try {
shardOperationOnReplica(request.internalShardId, request);
} catch (Throwable t) {
failReplicaIfNeeded(request.internalShardId.getIndex(), request.internalShardId.id(), t);
throw t;
}
protected void responseWithFailure(Throwable t) {
try {
channel.sendResponse(t);
} catch (IOException responseException) {
logger.warn("failed to send error message back to client for action [" + transportReplicaAction + "]", responseException);
logger.warn("actual Exception", t);
}
}

@Override
protected void doRun() throws Exception {
shardOperationOnReplica(request.internalShardId, request);
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
}
5 changes: 4 additions & 1 deletion src/main/java/org/elasticsearch/cluster/ClusterService.java
@@ -24,6 +24,7 @@
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.OperationRouting;
import org.elasticsearch.cluster.service.PendingClusterTask;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.unit.TimeValue;
@@ -95,8 +96,10 @@ public interface ClusterService extends LifecycleComponent<ClusterService> {
* Adds a cluster state listener that will timeout after the provided timeout,
* and is executed after the clusterstate has been successfully applied ie. is
* in state {@link org.elasticsearch.cluster.ClusterState.ClusterStateStatus#APPLIED}
* NOTE: a {@code null} timeout means that the listener will never be removed
* automatically
*/
void add(TimeValue timeout, TimeoutClusterStateListener listener);
void add(@Nullable TimeValue timeout, TimeoutClusterStateListener listener);

/**
* Submits a task that will update the cluster state.
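
The relaxed contract documented above (a null timeout keeps the listener registered until it is removed explicitly) is what lets the replica wait for mappings indefinitely. Below is a minimal usage sketch, assuming the 1.x TimeoutClusterStateListener callbacks (postAdded, onClose, onTimeout, clusterChanged); treat the listener body as illustrative rather than a definitive reference.

import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.TimeoutClusterStateListener;
import org.elasticsearch.common.unit.TimeValue;

public final class NeverExpiringListenerSketch {

    public static void listenUntilRemoved(ClusterService clusterService) {
        // Passing a null timeout: under the clarified contract the listener is
        // never removed automatically, so it keeps firing for applied cluster
        // states until it is removed explicitly.
        clusterService.add((TimeValue) null, new TimeoutClusterStateListener() {
            @Override
            public void postAdded() {
                // listener is registered and will see subsequent cluster states
            }

            @Override
            public void onClose() {
                // cluster service is shutting down
            }

            @Override
            public void onTimeout(TimeValue timeout) {
                // never invoked when the timeout is null
            }

            @Override
            public void clusterChanged(ClusterChangedEvent event) {
                // react to the new cluster state, e.g. retry a pending replica operation
            }
        });
    }
}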
