Skip to content

Commit

Permalink
Merge remote-tracking branch 'dakrone/explain-add-fetch-in-progress'
Browse files Browse the repository at this point in the history
  • Loading branch information
dakrone committed May 23, 2016
2 parents 31e4c16 + 8040ed0 commit bfce901
Show file tree
Hide file tree
Showing 7 changed files with 120 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,18 @@ public final class ClusterAllocationExplanation implements ToXContent, Writeable

private final ShardId shard;
private final boolean primary;
private final boolean hasPendingAsyncFetch;
private final String assignedNodeId;
private final UnassignedInfo unassignedInfo;
private final long remainingDelayMillis;
private final Map<DiscoveryNode, NodeExplanation> nodeExplanations;

public ClusterAllocationExplanation(ShardId shard, boolean primary, @Nullable String assignedNodeId, long remainingDelayMillis,
@Nullable UnassignedInfo unassignedInfo, Map<DiscoveryNode, NodeExplanation> nodeExplanations) {
@Nullable UnassignedInfo unassignedInfo, boolean hasPendingAsyncFetch,
Map<DiscoveryNode, NodeExplanation> nodeExplanations) {
this.shard = shard;
this.primary = primary;
this.hasPendingAsyncFetch = hasPendingAsyncFetch;
this.assignedNodeId = assignedNodeId;
this.unassignedInfo = unassignedInfo;
this.remainingDelayMillis = remainingDelayMillis;
Expand All @@ -60,6 +63,7 @@ public ClusterAllocationExplanation(ShardId shard, boolean primary, @Nullable St
public ClusterAllocationExplanation(StreamInput in) throws IOException {
this.shard = ShardId.readShardId(in);
this.primary = in.readBoolean();
this.hasPendingAsyncFetch = in.readBoolean();
this.assignedNodeId = in.readOptionalString();
this.unassignedInfo = in.readOptionalWriteable(UnassignedInfo::new);
this.remainingDelayMillis = in.readVLong();
Expand All @@ -77,6 +81,7 @@ public ClusterAllocationExplanation(StreamInput in) throws IOException {
public void writeTo(StreamOutput out) throws IOException {
this.getShard().writeTo(out);
out.writeBoolean(this.isPrimary());
out.writeBoolean(this.isStillFetchingShardData());
out.writeOptionalString(this.getAssignedNodeId());
out.writeOptionalWriteable(this.getUnassignedInfo());
out.writeVLong(remainingDelayMillis);
Expand All @@ -97,6 +102,11 @@ public boolean isPrimary() {
return this.primary;
}

/** Return turn if shard data is still being fetched for the allocation */
public boolean isStillFetchingShardData() {
return this.hasPendingAsyncFetch;
}

/** Return turn if the shard is assigned to a node */
public boolean isAssigned() {
return this.assignedNodeId != null;
Expand Down Expand Up @@ -138,6 +148,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (assignedNodeId != null) {
builder.field("assigned_node_id", this.assignedNodeId);
}
builder.field("shard_state_fetch_pending", this.hasPendingAsyncFetch);
// If we have unassigned info, show that
if (unassignedInfo != null) {
unassignedInfo.toXContent(builder, params);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.gateway.GatewayAllocator;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

Expand All @@ -69,19 +70,22 @@ public class TransportClusterAllocationExplainAction
private final AllocationDeciders allocationDeciders;
private final ShardsAllocator shardAllocator;
private final TransportIndicesShardStoresAction shardStoresAction;
private final GatewayAllocator gatewayAllocator;

@Inject
public TransportClusterAllocationExplainAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
ClusterInfoService clusterInfoService, AllocationDeciders allocationDeciders,
ShardsAllocator shardAllocator, TransportIndicesShardStoresAction shardStoresAction) {
ShardsAllocator shardAllocator, TransportIndicesShardStoresAction shardStoresAction,
GatewayAllocator gatewayAllocator) {
super(settings, ClusterAllocationExplainAction.NAME, transportService, clusterService, threadPool, actionFilters,
indexNameExpressionResolver, ClusterAllocationExplainRequest::new);
this.clusterInfoService = clusterInfoService;
this.allocationDeciders = allocationDeciders;
this.shardAllocator = shardAllocator;
this.shardStoresAction = shardStoresAction;
this.gatewayAllocator = gatewayAllocator;
}

@Override
Expand Down Expand Up @@ -130,7 +134,8 @@ public static NodeExplanation calculateNodeExplanation(ShardRouting shard,
Float nodeWeight,
IndicesShardStoresResponse.StoreStatus storeStatus,
String assignedNodeId,
Set<String> activeAllocationIds) {
Set<String> activeAllocationIds,
boolean hasPendingAsyncFetch) {
final ClusterAllocationExplanation.FinalDecision finalDecision;
final ClusterAllocationExplanation.StoreCopy storeCopy;
final String finalExplanation;
Expand Down Expand Up @@ -161,6 +166,19 @@ public static NodeExplanation calculateNodeExplanation(ShardRouting shard,
if (node.getId().equals(assignedNodeId)) {
finalDecision = ClusterAllocationExplanation.FinalDecision.ALREADY_ASSIGNED;
finalExplanation = "the shard is already assigned to this node";
} else if (hasPendingAsyncFetch &&
shard.primary() == false &&
shard.unassigned() &&
shard.allocatedPostIndexCreate(indexMetaData) &&
nodeDecision.type() != Decision.Type.YES) {
finalExplanation = "the shard cannot be assigned because allocation deciders return a " + nodeDecision.type().name() +
" decision and the shard's state is still being fetched";
finalDecision = ClusterAllocationExplanation.FinalDecision.NO;
} else if (hasPendingAsyncFetch &&
shard.unassigned() &&
shard.allocatedPostIndexCreate(indexMetaData)) {
finalExplanation = "the shard's state is still being fetched so it cannot be allocated";
finalDecision = ClusterAllocationExplanation.FinalDecision.NO;
} else if (shard.primary() && shard.unassigned() && shard.allocatedPostIndexCreate(indexMetaData) &&
storeCopy == ClusterAllocationExplanation.StoreCopy.STALE) {
finalExplanation = "the copy of the shard is stale, allocation ids do not match";
Expand All @@ -180,6 +198,7 @@ public static NodeExplanation calculateNodeExplanation(ShardRouting shard,
finalDecision = ClusterAllocationExplanation.FinalDecision.NO;
finalExplanation = "the shard cannot be assigned because one or more allocation decider returns a 'NO' decision";
} else {
// TODO: handle throttling decision better here
finalDecision = ClusterAllocationExplanation.FinalDecision.YES;
if (storeCopy == ClusterAllocationExplanation.StoreCopy.AVAILABLE) {
finalExplanation = "the shard can be assigned and the node contains a valid copy of the shard data";
Expand All @@ -198,7 +217,8 @@ public static NodeExplanation calculateNodeExplanation(ShardRouting shard,
*/
public static ClusterAllocationExplanation explainShard(ShardRouting shard, RoutingAllocation allocation, RoutingNodes routingNodes,
boolean includeYesDecisions, ShardsAllocator shardAllocator,
List<IndicesShardStoresResponse.StoreStatus> shardStores) {
List<IndicesShardStoresResponse.StoreStatus> shardStores,
GatewayAllocator gatewayAllocator) {
// don't short circuit deciders, we want a full explanation
allocation.debugDecision(true);
// get the existing unassigned info if available
Expand Down Expand Up @@ -238,11 +258,12 @@ public static ClusterAllocationExplanation explainShard(ShardRouting shard, Rout
Float weight = weights.get(node);
IndicesShardStoresResponse.StoreStatus storeStatus = nodeToStatus.get(node);
NodeExplanation nodeExplanation = calculateNodeExplanation(shard, indexMetaData, node, decision, weight,
storeStatus, shard.currentNodeId(), indexMetaData.activeAllocationIds(shard.getId()));
storeStatus, shard.currentNodeId(), indexMetaData.activeAllocationIds(shard.getId()),
allocation.hasPendingAsyncFetch());
explanations.put(node, nodeExplanation);
}
return new ClusterAllocationExplanation(shard.shardId(), shard.primary(),
shard.currentNodeId(), remainingDelayMillis, ui, explanations);
return new ClusterAllocationExplanation(shard.shardId(), shard.primary(), shard.currentNodeId(),
remainingDelayMillis, ui, gatewayAllocator.hasFetchPending(shard.shardId(), shard.primary()), explanations);
}

@Override
Expand Down Expand Up @@ -297,7 +318,7 @@ public void onResponse(IndicesShardStoresResponse shardStoreResponse) {
shardStoreResponse.getStoreStatuses().get(shardRouting.getIndexName());
List<IndicesShardStoresResponse.StoreStatus> shardStoreStatus = shardStatuses.get(shardRouting.id());
ClusterAllocationExplanation cae = explainShard(shardRouting, allocation, routingNodes,
request.includeYesDecisions(), shardAllocator, shardStoreStatus);
request.includeYesDecisions(), shardAllocator, shardStoreStatus, gatewayAllocator);
listener.onResponse(new ClusterAllocationExplainResponse(cae));
}

Expand Down
18 changes: 18 additions & 0 deletions core/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,24 @@ public GatewayAllocator(Settings settings, final TransportNodesListGatewayStarte
this.replicaShardAllocator = new InternalReplicaShardAllocator(settings, storeAction);
}

/**
* Returns true if the given shard has an async fetch pending
*/
public boolean hasFetchPending(ShardId shardId, boolean primary) {
if (primary) {
AsyncShardFetch<TransportNodesListGatewayStartedShards.NodeGatewayStartedShards> fetch = asyncFetchStarted.get(shardId);
if (fetch != null) {
return fetch.getNumberOfInFlightFetches() > 0;
}
} else {
AsyncShardFetch<TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData> fetch = asyncFetchStore.get(shardId);
if (fetch != null) {
return fetch.getNumberOfInFlightFetches() > 0;
}
}
return false;
}

public void setReallocation(final ClusterService clusterService, final RoutingService routingService) {
this.routingService = routingService;
clusterService.add(new ClusterStateListener() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ public void testUnassignedShards() throws Exception {
assertThat(cae.getShard().getIndexName(), equalTo("only-foo"));
assertFalse(cae.isPrimary());
assertFalse(cae.isAssigned());
assertFalse(cae.isStillFetchingShardData());
assertThat(UnassignedInfo.Reason.INDEX_CREATED, equalTo(cae.getUnassignedInfo().getReason()));
assertThat("expecting no remaining delay: " + cae.getRemainingDelayMillis(), cae.getRemainingDelayMillis(), equalTo(0L));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public void testShardExplain() throws Exception {
assertEquals(0, cae.getShard().getId());
assertEquals(false, cae.isPrimary());
assertNull(cae.getAssignedNodeId());
assertFalse(cae.isStillFetchingShardData());
assertNotNull(cae.getUnassignedInfo());
NodeExplanation explanation = cae.getNodeExplanations().values().iterator().next();
ClusterAllocationExplanation.FinalDecision fd = explanation.getFinalDecision();
Expand All @@ -68,6 +69,7 @@ public void testShardExplain() throws Exception {
assertEquals("test", cae.getShard().getIndexName());
assertEquals(0, cae.getShard().getId());
assertEquals(true, cae.isPrimary());
assertFalse(cae.isStillFetchingShardData());
assertNotNull("shard should have assigned node id", cae.getAssignedNodeId());
assertNull("assigned shard should not have unassigned info", cae.getUnassignedInfo());
explanation = cae.getNodeExplanations().values().iterator().next();
Expand Down
Loading

0 comments on commit bfce901

Please sign in to comment.