Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Index creation waits for write consistency shards #18759

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -30,22 +30,31 @@
*/
public class CreateIndexResponse extends AcknowledgedResponse {

// Flag carried alongside the acknowledged state of the response.
// NOTE(review): presumably true when the write-consistency number of shard copies
// became available before the response was sent — confirm against IndexCreationResponseWaiter.
private boolean writeConsistencyShardsAvailable;

// No-argument constructor; the flag keeps its default value of false.
protected CreateIndexResponse() {
}

// NOTE(review): the two consecutive signatures below are the removed (single-argument) and
// added (two-argument) forms of the constructor as rendered by the diff view.
// The body passes `acknowledged` to the superclass and stores the flag.
protected CreateIndexResponse(boolean acknowledged) {
protected CreateIndexResponse(boolean acknowledged, boolean writeConsistencyShardsAvailable) {
super(acknowledged);
this.writeConsistencyShardsAvailable = writeConsistencyShardsAvailable;
}

/**
 * Reads this response from the stream: superclass state, then the acknowledged state,
 * then one boolean for {@code writeConsistencyShardsAvailable}.
 * The read order mirrors the write order in {@link #writeTo(StreamOutput)}.
 */
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
readAcknowledged(in);
writeConsistencyShardsAvailable = in.readBoolean();
}

/**
 * Writes this response to the stream: superclass state, then the acknowledged state,
 * then one boolean for {@code writeConsistencyShardsAvailable}.
 */
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
writeAcknowledged(out);
out.writeBoolean(writeConsistencyShardsAvailable);
}

/**
 * @return the value of the {@code writeConsistencyShardsAvailable} flag
 */
public boolean isWriteConsistencyShardsAvailable() {
return writeConsistencyShardsAvailable;
}
}
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexCreationResponseWaiter;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService;
import org.elasticsearch.cluster.service.ClusterService;
Expand All @@ -41,13 +42,22 @@
public class TransportCreateIndexAction extends TransportMasterNodeAction<CreateIndexRequest, CreateIndexResponse> {

private final MetaDataCreateIndexService createIndexService;
private final IndexCreationResponseWaiter<CreateIndexResponse> responseWaiter;

/**
 * Injected constructor. Forwards the transport wiring to the superclass, keeps a reference to
 * the index creation service, and builds the response waiter used once the cluster state
 * update has been acknowledged.
 */
@Inject
public TransportCreateIndexAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, MetaDataCreateIndexService createIndexService,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
// NOTE(review): the two super(...) calls below are the removed and added forms of the same
// call as rendered by the diff view; the arguments are identical, only the wrapping differs.
super(settings, CreateIndexAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, CreateIndexRequest::new);
super(settings, CreateIndexAction.NAME, transportService, clusterService, threadPool, actionFilters,
indexNameExpressionResolver, CreateIndexRequest::new);
this.createIndexService = createIndexService;
// Anonymous subclass that supplies the concrete response type: the waiter calls newResponse(...)
// and gets back a CreateIndexResponse built from the two flags.
this.responseWaiter =
new IndexCreationResponseWaiter<CreateIndexResponse>(settings, clusterService, threadPool.getThreadContext()) {
@Override
protected CreateIndexResponse newResponse(boolean acknowledged, boolean writeConsistencyShardsAvailable) {
return new CreateIndexResponse(acknowledged, writeConsistencyShardsAvailable);
}
};
}

@Override
Expand All @@ -67,14 +77,17 @@ protected ClusterBlockException checkBlock(CreateIndexRequest request, ClusterSt
}

@Override
protected void masterOperation(final CreateIndexRequest request, final ClusterState state, final ActionListener<CreateIndexResponse> listener) {
protected void masterOperation(final CreateIndexRequest request,
final ClusterState state,
final ActionListener<CreateIndexResponse> listener) {
String cause = request.cause();
if (cause.length() == 0) {
cause = "api";
}

final String indexName = indexNameExpressionResolver.resolveDateMathExpression(request.index());
final CreateIndexClusterStateUpdateRequest updateRequest = new CreateIndexClusterStateUpdateRequest(request, cause, indexName, request.updateAllTypes())
final CreateIndexClusterStateUpdateRequest updateRequest =
new CreateIndexClusterStateUpdateRequest(request, cause, indexName, request.updateAllTypes())
.ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout())
.settings(request.settings()).mappings(request.mappings())
.aliases(request.aliases()).customs(request.customs());
Expand All @@ -83,7 +96,10 @@ protected void masterOperation(final CreateIndexRequest request, final ClusterSt

@Override
public void onResponse(ClusterStateUpdateResponse response) {
listener.onResponse(new CreateIndexResponse(response.isAcknowledged()));
// the cluster state that includes the newly created index has been acknowledged,
// now wait for the write consistency number of shards to be allocated before returning,
// as that is when indexing operations can take place on the newly created index
responseWaiter.waitOnShards(indexName, response, listener, request.timeout());
}

@Override
Expand Down
Expand Up @@ -25,7 +25,7 @@ public final class ShrinkResponse extends CreateIndexResponse {
// Package-private no-argument constructor.
ShrinkResponse() {
}

// NOTE(review): the first signature/super pair below is the removed single-argument form and the
// second pair is the added two-argument form, as rendered by the diff view.
// The added form forwards both flags to the CreateIndexResponse constructor.
ShrinkResponse(boolean acknowledged) {
super(acknowledged);
ShrinkResponse(boolean acknowledged, boolean writeConsistencyShardsAvailable) {
super(acknowledged, writeConsistencyShardsAvailable);
}
}
Expand Up @@ -31,38 +31,26 @@
import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexCreationResponseWaiter;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.shard.DocsStats;
import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

/**
* Main class to initiate shrinking an index into a new index with a single shard
*/
public class TransportShrinkAction extends TransportMasterNodeAction<ShrinkRequest, ShrinkResponse> {

private final MetaDataCreateIndexService createIndexService;
private final Client client;
private final IndexCreationResponseWaiter<ShrinkResponse> responseWaiter;

@Inject
public TransportShrinkAction(Settings settings, TransportService transportService, ClusterService clusterService,
Expand All @@ -72,6 +60,12 @@ public TransportShrinkAction(Settings settings, TransportService transportServic
ShrinkRequest::new);
this.createIndexService = createIndexService;
this.client = client;
this.responseWaiter = new IndexCreationResponseWaiter<ShrinkResponse>(settings, clusterService, threadPool.getThreadContext()) {
@Override
protected ShrinkResponse newResponse(boolean acknowledged, boolean writeConsistencyShardsAvailable) {
return new ShrinkResponse(acknowledged, writeConsistencyShardsAvailable);
}
};
}

@Override
Expand Down Expand Up @@ -102,7 +96,8 @@ public void onResponse(IndicesStatsResponse indicesStatsResponse) {
createIndexService.createIndex(updateRequest, new ActionListener<ClusterStateUpdateResponse>() {
@Override
public void onResponse(ClusterStateUpdateResponse response) {
listener.onResponse(new ShrinkResponse(response.isAcknowledged()));
// wait for necessary shards to become available before returning the response
responseWaiter.waitOnShards(updateRequest.index(), response, listener, shrinkRequest.timeout());
}

@Override
Expand Down
Expand Up @@ -32,6 +32,7 @@
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.rest.RestStatus;
Expand All @@ -42,13 +43,21 @@
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;

public class ReplicationOperation<Request extends ReplicationRequest<Request>, ReplicaRequest extends ReplicationRequest<ReplicaRequest>,
Response extends ReplicationResponse> {

/**
 * Node-scoped setting registered under the key {@code action.write_consistency}.
 * Its default is the lower-cased name of {@link WriteConsistencyLevel#QUORUM}, and values are
 * parsed with {@code WriteConsistencyLevel::fromString}.
 */
public static final Setting<WriteConsistencyLevel> WRITE_CONSISTENCY_LEVEL_SETTING =
new Setting<>("action.write_consistency",
WriteConsistencyLevel.QUORUM.name().toLowerCase(Locale.ROOT),
WriteConsistencyLevel::fromString,
Setting.Property.NodeScope);

final private ESLogger logger;
final private Request request;
final private Supplier<ClusterState> clusterStateSupplier;
Expand Down Expand Up @@ -183,38 +192,50 @@ String checkWriteConsistency() {
// Checks whether the primary's shard currently has enough active copies for the request's
// write consistency level. Returns null when the check passes, otherwise a human-readable
// reason; the same condition is also logged at trace level.
final ShardId shardId = primary.routingEntry().shardId();
final ClusterState state = clusterStateSupplier.get();
final WriteConsistencyLevel consistencyLevel = request.consistencyLevel();
final IndexRoutingTable indexRoutingTable = state.getRoutingTable().index(shardId.getIndexName());
if (indexRoutingTable == null) {
    // The index has no routing table in the observed cluster state: report it as a failed check.
    // The log wording now matches the returned message ("not found", previously "not founded").
    logger.trace("[{}] not enough active copies to meet write consistency of [{}] (index [{}] not found in cluster state), " +
        "scheduling a retry. op [{}], request [{}]", shardId, consistencyLevel, shardId.getIndexName(), opType, request);
    return "Not enough active copies to meet write consistency of [" + consistencyLevel + "] (index [" +
        shardId.getIndexName() + "] not found in cluster state).";
}
final IndexShardRoutingTable shardRoutingTable = indexRoutingTable.shard(shardId.getId());
// The static overload returns (active, required) only when there are too few active copies.
Optional<Tuple<Integer, Integer>> failureCheck = checkWriteConsistency(consistencyLevel, shardRoutingTable);
if (failureCheck.isPresent()) {
    final int sizeActive = failureCheck.get().v1();
    final int requiredNumber = failureCheck.get().v2();
    logger.trace("[{}] not enough active copies to meet write consistency of [{}] (have {}, needed {}), scheduling a retry." +
        " op [{}], request [{}]", shardId, consistencyLevel, sizeActive, requiredNumber, opType, request);
    return "Not enough active copies to meet write consistency of [" + consistencyLevel + "] (have " + sizeActive +
        ", needed " + requiredNumber + ").";
} else {
    return null;
}
}

/**
 * Compares the number of active copies of a shard with the number required by the given
 * write consistency level.
 * <p>
 * Required copies, as computed below: {@code (size / 2) + 1} for QUORUM when the shard has more
 * than 2 copies, {@code size} for ALL, and 1 otherwise. A {@code null} shard routing table is
 * treated as 0 active copies with 1 required.
 * <p>
 * NOTE(review): this span is a diff rendering that interleaves removed and added lines (for
 * example the {@code indexRoutingTable} lookup, the {@code logger.trace} call, and the
 * String / {@code null} returns belong to the removed version) — confirm against the merged file.
 *
 * @return a tuple of (active copies, required copies) when there are fewer active copies than
 *         required, otherwise an empty Optional
 */
public static Optional<Tuple<Integer, Integer>>
checkWriteConsistency(final WriteConsistencyLevel consistencyLevel, final IndexShardRoutingTable shardRoutingTable) {
final int sizeActive;
final int requiredNumber;
IndexRoutingTable indexRoutingTable = state.getRoutingTable().index(shardId.getIndexName());
if (indexRoutingTable != null) {
IndexShardRoutingTable shardRoutingTable = indexRoutingTable.shard(shardId.getId());
if (shardRoutingTable != null) {
sizeActive = shardRoutingTable.activeShards().size();
if (consistencyLevel == WriteConsistencyLevel.QUORUM && shardRoutingTable.getSize() > 2) {
// a majority quorum only makes sense with more than 2 shard copies; otherwise it's 1 shard with 1 replica,
// and the required number is 1 (assigned in the final else branch)
requiredNumber = (shardRoutingTable.getSize() / 2) + 1;
} else if (consistencyLevel == WriteConsistencyLevel.ALL) {
requiredNumber = shardRoutingTable.getSize();
} else {
requiredNumber = 1;
}
if (shardRoutingTable != null) {
sizeActive = shardRoutingTable.activeShards().size();
if (consistencyLevel == WriteConsistencyLevel.QUORUM && shardRoutingTable.getSize() > 2) {
// a majority quorum only makes sense with more than 2 shard copies; otherwise it's 1 shard with 1 replica,
// and the required number is 1 (assigned in the final else branch)
requiredNumber = (shardRoutingTable.getSize() / 2) + 1;
} else if (consistencyLevel == WriteConsistencyLevel.ALL) {
requiredNumber = shardRoutingTable.getSize();
} else {
sizeActive = 0;
requiredNumber = 1;
}
} else {
sizeActive = 0;
requiredNumber = 1;
}

// too few active copies: report both numbers so the caller can build its message
if (sizeActive < requiredNumber) {
logger.trace("[{}] not enough active copies to meet write consistency of [{}] (have {}, needed {}), scheduling a retry." +
" op [{}], request [{}]", shardId, consistencyLevel, sizeActive, requiredNumber, opType, request);
return "Not enough active copies to meet write consistency of [" + consistencyLevel + "] (have " + sizeActive + ", needed "
+ requiredNumber + ").";
return Optional.of(Tuple.tuple(sizeActive, requiredNumber));
} else {
return null;
return Optional.empty();
}
}

Expand Down
Expand Up @@ -73,6 +73,8 @@
import java.util.function.Consumer;
import java.util.function.Supplier;

import static org.elasticsearch.action.support.replication.ReplicationOperation.WRITE_CONSISTENCY_LEVEL_SETTING;

/**
* Base class for requests that should be executed on a primary copy followed by replica copies.
* Subclasses can resolve the target shard and provide implementation for primary and replica operations.
Expand Down Expand Up @@ -119,7 +121,7 @@ protected TransportReplicationAction(Settings settings, String actionName, Trans

this.transportOptions = transportOptions();

this.defaultWriteConsistencyLevel = WriteConsistencyLevel.fromString(settings.get("action.write_consistency", "quorum"));
this.defaultWriteConsistencyLevel = WRITE_CONSISTENCY_LEVEL_SETTING.get(settings);

this.replicasProxy = new ReplicasProxy();
}
Expand Down