Simplify write failure handling #19105

Merged

Commits (29)
1b1f484
Distinguish primary and replica request types in TransportWriteAction
areek Oct 13, 2016
415fdee
Distinguish environment failures from transient operation failures fo…
areek Oct 14, 2016
71dc417
Handle transient write failure in transport replication action
areek Oct 14, 2016
1bdeada
Generify index shard method to execute engine write operation
areek Oct 14, 2016
26f5118
remove declaring unchecked exception due to engine write operations
areek Oct 15, 2016
e195f7d
Documentation for handling engine write failures
areek Oct 15, 2016
63c0728
Simplify TransportWriteAction request handling
areek Oct 21, 2016
dac9856
Merge branch 'master' into enhancement/replicate_primary_write_failures
areek Oct 21, 2016
4396348
fix internal engine unit tests
areek Oct 21, 2016
7c11a2b
cleanup and improve documentation for TWA
areek Oct 21, 2016
1587a77
Revert "Generify index shard method to execute engine write operation"
areek Oct 25, 2016
1aee578
add operation result as a parameter to postIndex/delete in indexing o…
areek Oct 25, 2016
168946a
Improve documentation for handling write operation failure
areek Oct 25, 2016
bb78548
cleanup indexing operation listener
areek Oct 25, 2016
64a897e
add setters for translog location and took in engine operation result
areek Oct 25, 2016
1ad1e27
fix wildcard import
areek Oct 25, 2016
7a6f56a
fix tests
areek Oct 25, 2016
c237263
fix computing took for write operation result
areek Oct 25, 2016
65832b9
Revert "cleanup indexing operation listener"
areek Oct 26, 2016
a3fcfe8
add constructor overloads for primary result
areek Oct 26, 2016
fa3ee6b
Incorporate feedback
areek Oct 26, 2016
7fb44a3
add tests
areek Oct 27, 2016
947a17e
cleanup operation listener handling of failure in results
areek Oct 27, 2016
2f883fc
Rethrow original exception when it fails the engine during write oper…
areek Oct 27, 2016
eafd3df
Merge branch 'master' into enhancement/replicate_primary_write_failures
areek Oct 31, 2016
02ecff1
incorporate feedback
areek Nov 1, 2016
603d506
Merge branch 'master' into enhancement/replicate_primary_write_failures
areek Nov 1, 2016
cf3e2d1
documentation and minor fixes for engine level index/delete operations
areek Nov 1, 2016
ee0b273
add back index and delete engine failure exceptions as deprecated for…
areek Nov 1, 2016
ElasticsearchException.java

@@ -19,7 +19,6 @@

package org.elasticsearch;

import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.support.replication.ReplicationOperation;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.common.io.stream.StreamInput;
@@ -487,8 +486,7 @@ enum ElasticsearchExceptionHandle {
org.elasticsearch.index.shard.TranslogRecoveryPerformer.BatchOperationException::new, 26),
SNAPSHOT_CREATION_EXCEPTION(org.elasticsearch.snapshots.SnapshotCreationException.class,
org.elasticsearch.snapshots.SnapshotCreationException::new, 27),
DELETE_FAILED_ENGINE_EXCEPTION(org.elasticsearch.index.engine.DeleteFailedEngineException.class,
org.elasticsearch.index.engine.DeleteFailedEngineException::new, 28),
// 28 was DeleteFailedEngineException
DOCUMENT_MISSING_EXCEPTION(org.elasticsearch.index.engine.DocumentMissingException.class,
bleskes (Contributor) commented on Sep 6, 2016:

can you leave a comment as to where ordinal 28 went?

areek (Contributor, author) replied:

I added a comment for documentation.

org.elasticsearch.index.engine.DocumentMissingException::new, 29),
SNAPSHOT_EXCEPTION(org.elasticsearch.snapshots.SnapshotException.class,
@@ -581,8 +579,7 @@ enum ElasticsearchExceptionHandle {
org.elasticsearch.action.TimestampParsingException::new, 78),
ROUTING_MISSING_EXCEPTION(org.elasticsearch.action.RoutingMissingException.class,
org.elasticsearch.action.RoutingMissingException::new, 79),
INDEX_FAILED_ENGINE_EXCEPTION(org.elasticsearch.index.engine.IndexFailedEngineException.class,
org.elasticsearch.index.engine.IndexFailedEngineException::new, 80),
// 80 used to be for IndexFailedEngineException, removed in 6.0
bleskes (Contributor) commented:

we need a bwc here, no? or is it somewhere else?

areek (Contributor, author) replied:
@bleskes we need bwc here if we care about rolling restarts from 5.x. I am not sure how to add bwc for this (i.e. might be missing something). We could simply add the exceptions back (but not use them anywhere) so they can be serialized/deserialized when a 6.0 node is acting as a coordinating node?

Another question: The index/delete operation failures were communicated as exceptions, so do we even need a bwc for these failures for serialization/deserialization or can we just rely on generic exception serialization/deserialization like we currently do for persistent engine failures during index/delete operations?

INDEX_SHARD_RESTORE_FAILED_EXCEPTION(org.elasticsearch.index.snapshots.IndexShardRestoreFailedException.class,
org.elasticsearch.index.snapshots.IndexShardRestoreFailedException::new, 81),
REPOSITORY_EXCEPTION(org.elasticsearch.repositories.RepositoryException.class,
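On the backwards-compatibility question in the thread above: the last commit in the series ("add back index and delete engine failure exceptions as deprecated for…") suggests the first option was taken. A minimal, hedged sketch of a serialization-only deprecated exception, assuming the class only needs to round-trip over the wire (the real deprecated classes likely keep their original fields, and they also need their old ids, 28 and 80, re-registered in ElasticsearchExceptionHandle):

import java.io.IOException;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.io.stream.StreamInput;

/**
 * Hedged sketch: kept purely so a newer coordinating node can still deserialize and
 * reserialize this exception when talking to 5.x nodes; nothing throws it anymore.
 * The real deprecated class may carry extra state such as the document type and id.
 */
@Deprecated
public class DeleteFailedEngineExceptionSketch extends ElasticsearchException {

    public DeleteFailedEngineExceptionSketch(StreamInput in) throws IOException {
        super(in); // wire deserialization constructor, mirroring ElasticsearchException(StreamInput)
    }
}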

Large diffs are not rendered by default.
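The collapsed file above is, going by the commit messages, most of the TransportWriteAction rework. From the call sites in TransportDeleteAction and TransportIndexAction below, the new result carriers look roughly like this hedged sketch; the constructor and field shapes are inferred from usage rather than copied from the actual file, which also wires up refresh/fsync handling and response listeners:

import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.translog.Translog;

/**
 * Hedged sketch of the result carriers constructed by the delete and index actions below.
 * The real WritePrimaryResult and WriteReplicaResult are inner classes of TransportWriteAction.
 */
class WriteResultsSketch {

    static class PrimaryResultSketch<Request, Response> {
        final Request replicaRequest;        // what (if anything) gets sent on to the replicas
        final Response finalResponse;        // null when the operation failed on the primary
        final Translog.Location location;    // null on failure; otherwise drives refresh/fsync decisions
        final Exception operationFailure;    // document-level failure captured instead of thrown
        final IndexShard primary;

        PrimaryResultSketch(Request replicaRequest, Response finalResponse,
                            Translog.Location location, Exception operationFailure, IndexShard primary) {
            this.replicaRequest = replicaRequest;
            this.finalResponse = finalResponse;
            this.location = location;
            this.operationFailure = operationFailure;
            this.primary = primary;
        }
    }

    static class ReplicaResultSketch<Request> {
        final Request request;
        final Translog.Location location;
        final Exception operationFailure;
        final IndexShard replica;

        ReplicaResultSketch(Request request, Translog.Location location,
                            Exception operationFailure, IndexShard replica) {
            this.request = request;
            this.location = location;
            this.operationFailure = operationFailure;
            this.replica = replica;
        }
    }
}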

TransportDeleteAction.java

@@ -39,7 +39,6 @@
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog.Location;
import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.tasks.Task;
@@ -49,7 +48,7 @@
/**
* Performs the delete operation.
*/
public class TransportDeleteAction extends TransportWriteAction<DeleteRequest, DeleteResponse> {
public class TransportDeleteAction extends TransportWriteAction<DeleteRequest, DeleteRequest, DeleteResponse> {

private final AutoCreateIndex autoCreateIndex;
private final TransportCreateIndexAction createIndexAction;
@@ -61,7 +60,7 @@ public TransportDeleteAction(Settings settings, TransportService transportServic
IndexNameExpressionResolver indexNameExpressionResolver,
AutoCreateIndex autoCreateIndex) {
super(settings, DeleteAction.NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
indexNameExpressionResolver, DeleteRequest::new, ThreadPool.Names.INDEX);
indexNameExpressionResolver, DeleteRequest::new, DeleteRequest::new, ThreadPool.Names.INDEX);
this.createIndexAction = createIndexAction;
this.autoCreateIndex = autoCreateIndex;
}
@@ -70,7 +69,11 @@ public TransportDeleteAction(Settings settings, TransportService transportServic
protected void doExecute(Task task, final DeleteRequest request, final ActionListener<DeleteResponse> listener) {
ClusterState state = clusterService.state();
if (autoCreateIndex.shouldAutoCreate(request.index(), state)) {
createIndexAction.execute(task, new CreateIndexRequest().index(request.index()).cause("auto(delete api)").masterNodeTimeout(request.timeout()), new ActionListener<CreateIndexResponse>() {
CreateIndexRequest createIndexRequest = new CreateIndexRequest()
.index(request.index())
.cause("auto(delete api)")
.masterNodeTimeout(request.timeout());
createIndexAction.execute(task, createIndexRequest, new ActionListener<CreateIndexResponse>() {
@Override
public void onResponse(CreateIndexResponse result) {
innerExecute(task, request, listener);
@@ -119,30 +122,33 @@ protected DeleteResponse newResponseInstance() {
}

@Override
protected WriteResult<DeleteResponse> onPrimaryShard(DeleteRequest request, IndexShard indexShard) {
return executeDeleteRequestOnPrimary(request, indexShard);
protected WritePrimaryResult shardOperationOnPrimary(DeleteRequest request, IndexShard primary) throws Exception {
final Engine.DeleteResult result = executeDeleteRequestOnPrimary(request, primary);
final DeleteResponse response = result.hasFailure() ? null :
new DeleteResponse(primary.shardId(), request.type(), request.id(), result.getVersion(), result.isFound());
return new WritePrimaryResult(request, response, result.getTranslogLocation(), result.getFailure(), primary);
}

@Override
protected Location onReplicaShard(DeleteRequest request, IndexShard indexShard) {
return executeDeleteRequestOnReplica(request, indexShard).getTranslogLocation();
protected WriteReplicaResult shardOperationOnReplica(DeleteRequest request, IndexShard replica) throws Exception {
final Engine.DeleteResult result = executeDeleteRequestOnReplica(request, replica);
return new WriteReplicaResult(request, result.getTranslogLocation(), result.getFailure(), replica);
}

public static WriteResult<DeleteResponse> executeDeleteRequestOnPrimary(DeleteRequest request, IndexShard indexShard) {
Engine.Delete delete = indexShard.prepareDeleteOnPrimary(request.type(), request.id(), request.version(), request.versionType());
indexShard.delete(delete);
// update the request with the version so it will go to the replicas
request.versionType(delete.versionType().versionTypeForReplicationAndRecovery());
request.version(delete.version());

assert request.versionType().validateVersionForWrites(request.version());
DeleteResponse response = new DeleteResponse(indexShard.shardId(), request.type(), request.id(), delete.version(), delete.found());
return new WriteResult<>(response, delete.getTranslogLocation());
public static Engine.DeleteResult executeDeleteRequestOnPrimary(DeleteRequest request, IndexShard primary) {
Engine.Delete delete = primary.prepareDeleteOnPrimary(request.type(), request.id(), request.version(), request.versionType());
Engine.DeleteResult result = primary.delete(delete);
if (result.hasFailure() == false) {
// update the request with the version so it will go to the replicas
request.versionType(delete.versionType().versionTypeForReplicationAndRecovery());
Contributor commented:

I can't wait for this to go away :)

request.version(result.getVersion());
assert request.versionType().validateVersionForWrites(request.version());
}
return result;
}

public static Engine.Delete executeDeleteRequestOnReplica(DeleteRequest request, IndexShard indexShard) {
Engine.Delete delete = indexShard.prepareDeleteOnReplica(request.type(), request.id(), request.version(), request.versionType());
indexShard.delete(delete);
return delete;
public static Engine.DeleteResult executeDeleteRequestOnReplica(DeleteRequest request, IndexShard replica) {
Engine.Delete delete = replica.prepareDeleteOnReplica(request.type(), request.id(), request.version(), request.versionType());
return replica.delete(delete);
}
}
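A short, hedged usage sketch of the contract these helpers now follow (the wrapper class, method, and logger below are made up for illustration; the Engine and TransportDeleteAction calls come from the diff): document-level failures are carried inside the result instead of thrown, so callers branch on hasFailure() rather than catching engine exceptions, and only environment failures still propagate.

import org.apache.logging.log4j.Logger;

import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.delete.TransportDeleteAction;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.translog.Translog;

class DeleteResultHandlingSketch {

    // Hedged sketch of how a caller consumes the new Engine.DeleteResult, mirroring
    // shardOperationOnPrimary above; this class is illustrative and not part of the PR.
    static DeleteResponse handle(DeleteRequest request, IndexShard primary, Logger logger) {
        Engine.DeleteResult result = TransportDeleteAction.executeDeleteRequestOnPrimary(request, primary);
        if (result.hasFailure()) {
            // document-level failure (e.g. a version conflict): captured in the result and
            // reported back to the client; the shard is not failed for it
            logger.debug("delete on primary failed", result.getFailure());
            return null; // WritePrimaryResult carries the failure instead of a response
        }
        // success: the translog location feeds the post-write refresh/fsync handling
        Translog.Location location = result.getTranslogLocation();
        logger.trace("delete succeeded, translog location {}", location);
        return new DeleteResponse(primary.shardId(), request.type(), request.id(),
            result.getVersion(), result.isFound());
    }
}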
TransportIndexAction.java

@@ -39,11 +39,11 @@
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.mapper.MapperParsingException;
import org.elasticsearch.index.mapper.Mapping;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog.Location;
import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.tasks.Task;
@@ -60,7 +60,7 @@
* <li><b>allowIdGeneration</b>: If the id is not set, should it be generated? Defaults to <tt>true</tt>.
* </ul>
*/
public class TransportIndexAction extends TransportWriteAction<IndexRequest, IndexResponse> {
public class TransportIndexAction extends TransportWriteAction<IndexRequest, IndexRequest, IndexResponse> {

private final AutoCreateIndex autoCreateIndex;
private final boolean allowIdGeneration;
@@ -76,7 +76,7 @@ public TransportIndexAction(Settings settings, TransportService transportService
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
AutoCreateIndex autoCreateIndex) {
super(settings, IndexAction.NAME, transportService, clusterService, indicesService, threadPool, shardStateAction,
actionFilters, indexNameExpressionResolver, IndexRequest::new, ThreadPool.Names.INDEX);
actionFilters, indexNameExpressionResolver, IndexRequest::new, IndexRequest::new, ThreadPool.Names.INDEX);
this.mappingUpdatedAction = mappingUpdatedAction;
this.createIndexAction = createIndexAction;
this.autoCreateIndex = autoCreateIndex;
@@ -140,65 +140,88 @@ protected IndexResponse newResponseInstance() {
}

@Override
protected WriteResult<IndexResponse> onPrimaryShard(IndexRequest request, IndexShard indexShard) throws Exception {
return executeIndexRequestOnPrimary(request, indexShard, mappingUpdatedAction);
protected WritePrimaryResult shardOperationOnPrimary(IndexRequest request, IndexShard primary) throws Exception {
final Engine.IndexResult indexResult = executeIndexRequestOnPrimary(request, primary, mappingUpdatedAction);
final IndexResponse response = indexResult.hasFailure() ? null :
new IndexResponse(primary.shardId(), request.type(), request.id(), indexResult.getVersion(),
indexResult.isCreated());
return new WritePrimaryResult(request, response, indexResult.getTranslogLocation(), indexResult.getFailure(), primary);
}

@Override
protected Location onReplicaShard(IndexRequest request, IndexShard indexShard) {
return executeIndexRequestOnReplica(request, indexShard).getTranslogLocation();
protected WriteReplicaResult shardOperationOnReplica(IndexRequest request, IndexShard replica) throws Exception {
final Engine.IndexResult indexResult = executeIndexRequestOnReplica(request, replica);
return new WriteReplicaResult(request, indexResult.getTranslogLocation(), indexResult.getFailure(), replica);
}

/**
* Execute the given {@link IndexRequest} on a replica shard, throwing a
* {@link RetryOnReplicaException} if the operation needs to be re-tried.
*/
public static Engine.Index executeIndexRequestOnReplica(IndexRequest request, IndexShard indexShard) {
final ShardId shardId = indexShard.shardId();
public static Engine.IndexResult executeIndexRequestOnReplica(IndexRequest request, IndexShard replica) {
final ShardId shardId = replica.shardId();
SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.REPLICA, shardId.getIndexName(), request.type(), request.id(), request.source())
.routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl());

final Engine.Index operation = indexShard.prepareIndexOnReplica(sourceToParse, request.version(), request.versionType(), request.getAutoGeneratedTimestamp(), request.isRetry());
final Engine.Index operation;
try {
operation = replica.prepareIndexOnReplica(sourceToParse, request.version(), request.versionType(), request.getAutoGeneratedTimestamp(), request.isRetry());
} catch (MapperParsingException e) {
return new Engine.IndexResult(e, request.version());
}
Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
if (update != null) {
throw new RetryOnReplicaException(shardId, "Mappings are not available on the replica yet, triggered update: " + update);
}
indexShard.index(operation);
return operation;
return replica.index(operation);
}

/** Utility method to prepare an index operation on primary shards */
public static Engine.Index prepareIndexOperationOnPrimary(IndexRequest request, IndexShard indexShard) {
static Engine.Index prepareIndexOperationOnPrimary(IndexRequest request, IndexShard primary) {
SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.PRIMARY, request.index(), request.type(), request.id(), request.source())
.routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl());
return indexShard.prepareIndexOnPrimary(sourceToParse, request.version(), request.versionType(), request.getAutoGeneratedTimestamp(), request.isRetry());
return primary.prepareIndexOnPrimary(sourceToParse, request.version(), request.versionType(), request.getAutoGeneratedTimestamp(), request.isRetry());
}

public static WriteResult<IndexResponse> executeIndexRequestOnPrimary(IndexRequest request, IndexShard indexShard,
public static Engine.IndexResult executeIndexRequestOnPrimary(IndexRequest request, IndexShard primary,
MappingUpdatedAction mappingUpdatedAction) throws Exception {
Engine.Index operation = prepareIndexOperationOnPrimary(request, indexShard);
Engine.Index operation;
try {
operation = prepareIndexOperationOnPrimary(request, primary);
} catch (MapperParsingException | IllegalArgumentException e) {
Contributor commented:

same question regarding IAE?

return new Engine.IndexResult(e, request.version());
}
Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
final ShardId shardId = indexShard.shardId();
final ShardId shardId = primary.shardId();
if (update != null) {
mappingUpdatedAction.updateMappingOnMaster(shardId.getIndex(), request.type(), update);
operation = prepareIndexOperationOnPrimary(request, indexShard);
// can throw timeout exception when updating mappings or ISE for attempting to update default mappings
// which are bubbled up
try {
mappingUpdatedAction.updateMappingOnMaster(shardId.getIndex(), request.type(), update);
} catch (IllegalArgumentException e) {
// throws IAE on conflicts merging dynamic mappings
return new Engine.IndexResult(e, request.version());
}
try {
operation = prepareIndexOperationOnPrimary(request, primary);
} catch (MapperParsingException | IllegalArgumentException e) {
return new Engine.IndexResult(e, request.version());
}
update = operation.parsedDoc().dynamicMappingsUpdate();
if (update != null) {
throw new ReplicationOperation.RetryOnPrimaryException(shardId,
"Dynamic mappings are not available on the node that holds the primary yet");
}
}
indexShard.index(operation);

// update the version on request so it will happen on the replicas
final long version = operation.version();
request.version(version);
request.versionType(request.versionType().versionTypeForReplicationAndRecovery());

assert request.versionType().validateVersionForWrites(request.version());

IndexResponse response = new IndexResponse(shardId, request.type(), request.id(), request.version(), operation.isCreated());
return new WriteResult<>(response, operation.getTranslogLocation());
Engine.IndexResult result = primary.index(operation);
if (result.hasFailure() == false) {
// update the version on request so it will happen on the replicas
final long version = result.getVersion();
request.version(version);
request.versionType(request.versionType().versionTypeForReplicationAndRecovery());
assert request.versionType().validateVersionForWrites(request.version());
}
return result;
}
}

ReplicationOperation.java

@@ -32,6 +32,7 @@
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.index.engine.VersionConflictEngineException;
@@ -112,21 +113,23 @@ public void execute() throws Exception {
pendingActions.incrementAndGet();
primaryResult = primary.perform(request);
final ReplicaRequest replicaRequest = primaryResult.replicaRequest();
assert replicaRequest.primaryTerm() > 0 : "replicaRequest doesn't have a primary term";
if (logger.isTraceEnabled()) {
logger.trace("[{}] op [{}] completed on primary for request [{}]", primaryId, opType, request);
}
if (replicaRequest != null) {
assert replicaRequest.primaryTerm() > 0 : "replicaRequest doesn't have a primary term";
if (logger.isTraceEnabled()) {
logger.trace("[{}] op [{}] completed on primary for request [{}]", primaryId, opType, request);
}

// we have to get a new state after successfully indexing into the primary in order to honour recovery semantics.
// we have to make sure that every operation indexed into the primary after recovery start will also be replicated
// to the recovery target. If we use an old cluster state, we may miss a relocation that has started since then.
ClusterState clusterState = clusterStateSupplier.get();
final List<ShardRouting> shards = getShards(primaryId, clusterState);
Set<String> inSyncAllocationIds = getInSyncAllocationIds(primaryId, clusterState);
// we have to get a new state after successfully indexing into the primary in order to honour recovery semantics.
// we have to make sure that every operation indexed into the primary after recovery start will also be replicated
// to the recovery target. If we use an old cluster state, we may miss a relocation that has started since then.
ClusterState clusterState = clusterStateSupplier.get();
final List<ShardRouting> shards = getShards(primaryId, clusterState);
Set<String> inSyncAllocationIds = getInSyncAllocationIds(primaryId, clusterState);

markUnavailableShardsAsStale(replicaRequest, inSyncAllocationIds, shards);
markUnavailableShardsAsStale(replicaRequest, inSyncAllocationIds, shards);

performOnReplicas(replicaRequest, shards);
performOnReplicas(replicaRequest, shards);
}

successfulShards.incrementAndGet();
decPendingAndFinishIfNeeded();
@@ -419,7 +422,11 @@ public RetryOnPrimaryException(StreamInput in) throws IOException {

public interface PrimaryResult<R extends ReplicationRequest<R>> {

R replicaRequest();
/**
* @return null if no operation needs to be sent to a replica
* (for example when the operation failed on the primary due to a parsing exception)
*/
@Nullable R replicaRequest();

void setShardInfo(ReplicationResponse.ShardInfo shardInfo);
}
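To illustrate the relaxed contract, a hedged sketch of an implementation that returns no replica request after a primary-side document failure (the class and fields are invented for the example; the real implementations are the PrimaryResult classes in TransportReplicationAction and TransportWriteAction):

import org.elasticsearch.action.support.replication.ReplicationOperation;
import org.elasticsearch.action.support.replication.ReplicationRequest;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.common.Nullable;

// Hedged sketch of a PrimaryResult honouring the new @Nullable contract; illustrative only.
class NullableReplicaRequestSketch<R extends ReplicationRequest<R>> implements ReplicationOperation.PrimaryResult<R> {

    private final R replicaRequest;          // null when the primary op produced nothing to replicate
    private final Exception primaryFailure;  // e.g. a MapperParsingException surfaced as a document failure

    NullableReplicaRequestSketch(R replicaRequest, Exception primaryFailure) {
        this.replicaRequest = replicaRequest;
        this.primaryFailure = primaryFailure;
    }

    @Override
    @Nullable
    public R replicaRequest() {
        // ReplicationOperation.execute() skips markUnavailableShardsAsStale and performOnReplicas
        // entirely when this returns null (see the null check in the diff above)
        return replicaRequest;
    }

    @Override
    public void setShardInfo(ReplicationResponse.ShardInfo shardInfo) {
        // illustrative no-op; a real result would attach the shard info to the client response
    }
}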