Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Convert upgrade action to broadcast by node #13205

Merged
merged 1 commit into from
Aug 31, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -68,7 +68,7 @@ protected EmptyResult readShardResult(StreamInput in) throws IOException {
}

@Override
protected ClearIndicesCacheResponse newResponse(ClearIndicesCacheRequest request, int totalShards, int successfulShards, int failedShards, List<EmptyResult> responses, List<ShardOperationFailedException> shardFailures) {
protected ClearIndicesCacheResponse newResponse(ClearIndicesCacheRequest request, int totalShards, int successfulShards, int failedShards, List<EmptyResult> responses, List<ShardOperationFailedException> shardFailures, ClusterState clusterState) {
return new ClearIndicesCacheResponse(totalShards, successfulShards, failedShards, shardFailures);
}

Expand Down
Expand Up @@ -62,7 +62,7 @@ protected EmptyResult readShardResult(StreamInput in) throws IOException {
}

@Override
protected OptimizeResponse newResponse(OptimizeRequest request, int totalShards, int successfulShards, int failedShards, List<EmptyResult> responses, List<ShardOperationFailedException> shardFailures) {
protected OptimizeResponse newResponse(OptimizeRequest request, int totalShards, int successfulShards, int failedShards, List<EmptyResult> responses, List<ShardOperationFailedException> shardFailures, ClusterState clusterState) {
return new OptimizeResponse(totalShards, successfulShards, failedShards, shardFailures);
}

Expand Down
Expand Up @@ -69,7 +69,7 @@ protected RecoveryState readShardResult(StreamInput in) throws IOException {


@Override
protected RecoveryResponse newResponse(RecoveryRequest request, int totalShards, int successfulShards, int failedShards, List<RecoveryState> responses, List<ShardOperationFailedException> shardFailures) {
protected RecoveryResponse newResponse(RecoveryRequest request, int totalShards, int successfulShards, int failedShards, List<RecoveryState> responses, List<ShardOperationFailedException> shardFailures, ClusterState clusterState) {
Map<String, List<RecoveryState>> shardResponses = Maps.newHashMap();
for (RecoveryState recoveryState : responses) {
if (recoveryState == null) {
Expand Down
Expand Up @@ -80,7 +80,7 @@ protected ShardSegments readShardResult(StreamInput in) throws IOException {
}

@Override
protected IndicesSegmentResponse newResponse(IndicesSegmentsRequest request, int totalShards, int successfulShards, int failedShards, List<ShardSegments> results, List<ShardOperationFailedException> shardFailures) {
protected IndicesSegmentResponse newResponse(IndicesSegmentsRequest request, int totalShards, int successfulShards, int failedShards, List<ShardSegments> results, List<ShardOperationFailedException> shardFailures, ClusterState clusterState) {
return new IndicesSegmentResponse(results.toArray(new ShardSegments[results.size()]), totalShards, successfulShards, failedShards, shardFailures);
}

Expand Down
Expand Up @@ -81,7 +81,7 @@ protected ShardStats readShardResult(StreamInput in) throws IOException {
}

@Override
protected IndicesStatsResponse newResponse(IndicesStatsRequest request, int totalShards, int successfulShards, int failedShards, List<ShardStats> responses, List<ShardOperationFailedException> shardFailures) {
protected IndicesStatsResponse newResponse(IndicesStatsRequest request, int totalShards, int successfulShards, int failedShards, List<ShardStats> responses, List<ShardOperationFailedException> shardFailures, ClusterState clusterState) {
return new IndicesStatsResponse(responses.toArray(new ShardStats[responses.size()]), totalShards, successfulShards, failedShards, shardFailures);
}

Expand Down
Expand Up @@ -82,7 +82,7 @@ protected ShardUpgradeStatus readShardResult(StreamInput in) throws IOException
}

@Override
protected UpgradeStatusResponse newResponse(UpgradeStatusRequest request, int totalShards, int successfulShards, int failedShards, List<ShardUpgradeStatus> responses, List<ShardOperationFailedException> shardFailures) {
protected UpgradeStatusResponse newResponse(UpgradeStatusRequest request, int totalShards, int successfulShards, int failedShards, List<ShardUpgradeStatus> responses, List<ShardOperationFailedException> shardFailures, ClusterState clusterState) {
return new UpgradeStatusResponse(responses.toArray(new ShardUpgradeStatus[responses.size()]), totalShards, successfulShards, failedShards, shardFailures);
}

Expand Down
Expand Up @@ -20,9 +20,9 @@
package org.elasticsearch.action.admin.indices.upgrade.post;

import org.elasticsearch.Version;
import org.elasticsearch.action.support.broadcast.BroadcastShardResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.index.shard.ShardId;

import java.io.IOException;
Expand All @@ -31,7 +31,9 @@
/**
*
*/
class ShardUpgradeResponse extends BroadcastShardResponse {
class ShardUpgradeResult implements Streamable {

private ShardId shardId;

private org.apache.lucene.util.Version oldestLuceneSegment;

Expand All @@ -40,16 +42,20 @@ class ShardUpgradeResponse extends BroadcastShardResponse {
private boolean primary;


ShardUpgradeResponse() {
ShardUpgradeResult() {
}

ShardUpgradeResponse(ShardId shardId, boolean primary, Version upgradeVersion, org.apache.lucene.util.Version oldestLuceneSegment) {
super(shardId);
ShardUpgradeResult(ShardId shardId, boolean primary, Version upgradeVersion, org.apache.lucene.util.Version oldestLuceneSegment) {
this.shardId = shardId;
this.primary = primary;
this.upgradeVersion = upgradeVersion;
this.oldestLuceneSegment = oldestLuceneSegment;
}

public ShardId getShardId() {
return shardId;
}

public org.apache.lucene.util.Version oldestLuceneSegment() {
return this.oldestLuceneSegment;
}
Expand All @@ -65,7 +71,7 @@ public boolean primary() {

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
shardId = ShardId.readShardId(in);
primary = in.readBoolean();
upgradeVersion = Version.readVersion(in);
try {
Expand All @@ -78,10 +84,9 @@ public void readFrom(StreamInput in) throws IOException {

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
shardId.writeTo(out);
out.writeBoolean(primary);
Version.writeVersion(upgradeVersion, out);
out.writeString(oldestLuceneSegment.toString());
}

}
Expand Up @@ -24,40 +24,38 @@
import org.elasticsearch.action.PrimaryMissingActionException;
import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
import org.elasticsearch.action.support.broadcast.BroadcastShardOperationFailedException;
import org.elasticsearch.action.support.broadcast.TransportBroadcastAction;
import org.elasticsearch.action.support.broadcast.node.TransportBroadcastByNodeAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardsIterator;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.util.ArrayList;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReferenceArray;

import static com.google.common.collect.Maps.newHashMap;
import static com.google.common.collect.Sets.newHashSet;

/**
* Upgrade index/indices action.
*/
public class TransportUpgradeAction extends TransportBroadcastAction<UpgradeRequest, UpgradeResponse, ShardUpgradeRequest, ShardUpgradeResponse> {
public class TransportUpgradeAction extends TransportBroadcastByNodeAction<UpgradeRequest, UpgradeResponse, ShardUpgradeResult> {

private final IndicesService indicesService;

Expand All @@ -67,56 +65,40 @@ public class TransportUpgradeAction extends TransportBroadcastAction<UpgradeRequ
public TransportUpgradeAction(Settings settings, ThreadPool threadPool, ClusterService clusterService,
TransportService transportService, IndicesService indicesService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, TransportUpgradeSettingsAction upgradeSettingsAction) {
super(settings, UpgradeAction.NAME, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver,
UpgradeRequest.class, ShardUpgradeRequest.class, ThreadPool.Names.OPTIMIZE);
super(settings, UpgradeAction.NAME, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver, UpgradeRequest.class, ThreadPool.Names.OPTIMIZE);
this.indicesService = indicesService;
this.upgradeSettingsAction = upgradeSettingsAction;
}

@Override
protected UpgradeResponse newResponse(UpgradeRequest request, AtomicReferenceArray shardsResponses, ClusterState clusterState) {
int successfulShards = 0;
int failedShards = 0;
List<ShardOperationFailedException> shardFailures = null;
protected UpgradeResponse newResponse(UpgradeRequest request, int totalShards, int successfulShards, int failedShards, List<ShardUpgradeResult> shardUpgradeResults, List<ShardOperationFailedException> shardFailures, ClusterState clusterState) {
Map<String, Integer> successfulPrimaryShards = newHashMap();
Map<String, Tuple<Version, org.apache.lucene.util.Version>> versions = newHashMap();
for (int i = 0; i < shardsResponses.length(); i++) {
Object shardResponse = shardsResponses.get(i);
if (shardResponse == null) {
// a non active shard, ignore...
} else if (shardResponse instanceof BroadcastShardOperationFailedException) {
failedShards++;
if (shardFailures == null) {
shardFailures = new ArrayList<>();
}
shardFailures.add(new DefaultShardOperationFailedException((BroadcastShardOperationFailedException) shardResponse));
for (ShardUpgradeResult result : shardUpgradeResults) {
successfulShards++;
String index = result.getShardId().getIndex();
if (result.primary()) {
Integer count = successfulPrimaryShards.get(index);
successfulPrimaryShards.put(index, count == null ? 1 : count + 1);
}
Tuple<Version, org.apache.lucene.util.Version> versionTuple = versions.get(index);
if (versionTuple == null) {
versions.put(index, new Tuple<>(result.upgradeVersion(), result.oldestLuceneSegment()));
} else {
successfulShards++;
ShardUpgradeResponse shardUpgradeResponse = (ShardUpgradeResponse) shardResponse;
String index = shardUpgradeResponse.getIndex();
if (shardUpgradeResponse.primary()) {
Integer count = successfulPrimaryShards.get(index);
successfulPrimaryShards.put(index, count == null ? 1 : count + 1);
// We already have versions for this index - let's see if we need to update them based on the current shard
Version version = versionTuple.v1();
org.apache.lucene.util.Version luceneVersion = versionTuple.v2();
// For the metadata we are interested in the _latest_ Elasticsearch version that was processing the metadata
// Since we rewrite the mapping during upgrade the metadata is always rewritten by the latest version
if (result.upgradeVersion().after(versionTuple.v1())) {
version = result.upgradeVersion();
}
Tuple<Version, org.apache.lucene.util.Version> versionTuple = versions.get(index);
if (versionTuple == null) {
versions.put(index, new Tuple<>(shardUpgradeResponse.upgradeVersion(), shardUpgradeResponse.oldestLuceneSegment()));
} else {
// We already have versions for this index - let's see if we need to update them based on the current shard
Version version = versionTuple.v1();
org.apache.lucene.util.Version luceneVersion = versionTuple.v2();
// For the metadata we are interested in the _latest_ elasticsearch version that was processing the metadata
// Since we rewrite the mapping during upgrade the metadata is always rewritten by the latest version
if (shardUpgradeResponse.upgradeVersion().after(versionTuple.v1())) {
version = shardUpgradeResponse.upgradeVersion();
}
// For the lucene version we are interested in the _oldest_ lucene version since it determines the
// oldest version that we need to support
if (shardUpgradeResponse.oldestLuceneSegment().onOrAfter(versionTuple.v2()) == false) {
luceneVersion = shardUpgradeResponse.oldestLuceneSegment();
}
versions.put(index, new Tuple<>(version, luceneVersion));
// For the lucene version we are interested in the _oldest_ lucene version since it determines the
// oldest version that we need to support
if (result.oldestLuceneSegment().onOrAfter(versionTuple.v2()) == false) {
luceneVersion = result.oldestLuceneSegment();
}
versions.put(index, new Tuple<>(version, luceneVersion));
}
}
Map<String, Tuple<org.elasticsearch.Version, String>> updatedVersions = newHashMap();
Expand All @@ -133,33 +115,37 @@ protected UpgradeResponse newResponse(UpgradeRequest request, AtomicReferenceArr
}
}

return new UpgradeResponse(updatedVersions, shardsResponses.length(), successfulShards, failedShards, shardFailures);
return new UpgradeResponse(updatedVersions, totalShards, successfulShards, failedShards, shardFailures);
}

@Override
protected ShardUpgradeRequest newShardRequest(int numShards, ShardRouting shard, UpgradeRequest request) {
return new ShardUpgradeRequest(shard.shardId(), request);
protected ShardUpgradeResult shardOperation(UpgradeRequest request, ShardRouting shardRouting) {
IndexShard indexShard = indicesService.indexServiceSafe(shardRouting.shardId().getIndex()).shardSafe(shardRouting.shardId().id());
org.apache.lucene.util.Version oldestLuceneSegment = indexShard.upgrade(request);
// We are using the current version of Elasticsearch as upgrade version since we update mapping to match the current version
return new ShardUpgradeResult(shardRouting.shardId(), indexShard.routingEntry().primary(), Version.CURRENT, oldestLuceneSegment);
}

@Override
protected ShardUpgradeResponse newShardResponse() {
return new ShardUpgradeResponse();
protected ShardUpgradeResult readShardResult(StreamInput in) throws IOException {
ShardUpgradeResult result = new ShardUpgradeResult();
result.readFrom(in);
return result;
}

@Override
protected ShardUpgradeResponse shardOperation(ShardUpgradeRequest request) {
IndexShard indexShard = indicesService.indexServiceSafe(request.shardId().getIndex()).shardSafe(request.shardId().id());
org.apache.lucene.util.Version oldestLuceneSegment = indexShard.upgrade(request.upgradeRequest());
// We are using the current version of elasticsearch as upgrade version since we update mapping to match the current version
return new ShardUpgradeResponse(request.shardId(), indexShard.routingEntry().primary(), Version.CURRENT, oldestLuceneSegment);
protected UpgradeRequest readRequestFrom(StreamInput in) throws IOException {
UpgradeRequest request = new UpgradeRequest();
request.readFrom(in);
return request;
}

/**
* The upgrade request works against *all* shards.
*/
@Override
protected GroupShardsIterator shards(ClusterState clusterState, UpgradeRequest request, String[] concreteIndices) {
GroupShardsIterator iterator = clusterState.routingTable().allActiveShardsGrouped(concreteIndices, true);
protected ShardsIterator shards(ClusterState clusterState, UpgradeRequest request, String[] concreteIndices) {
ShardsIterator iterator = clusterState.routingTable().allShards(concreteIndices);
Set<String> indicesWithMissingPrimaries = indicesWithMissingPrimaries(clusterState, concreteIndices);
if (indicesWithMissingPrimaries.isEmpty()) {
return iterator;
Expand Down Expand Up @@ -231,5 +217,4 @@ public void onFailure(Throwable e) {
}
});
}

}