Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion buildSrc/src/main/resources/checkstyle_suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,6 @@
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]gateway[/\\]PrimaryShardAllocator.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]gateway[/\\]ReplicaShardAllocator.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]gateway[/\\]TransportNodesListGatewayMetaState.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]gateway[/\\]TransportNodesListGatewayStartedShards.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]http[/\\]HttpTransportSettings.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]http[/\\]netty[/\\]HttpRequestHandler.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]http[/\\]netty[/\\]NettyHttpChannel.java" checks="LineLength" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.health.ClusterShardHealth;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
Expand Down Expand Up @@ -110,7 +109,7 @@ protected void masterOperation(IndicesShardStoresRequest request, ClusterState s
// we could fetch all shard store info from every node once (nNodes requests)
// we have to implement a TransportNodesAction instead of using TransportNodesListGatewayStartedShards
// for fetching shard stores info, that operates on a list of shards instead of a single shard
new AsyncShardStoresInfoFetches(state.nodes(), routingNodes, state.metaData(), shardIdsToFetch, listener).start();
new AsyncShardStoresInfoFetches(state.nodes(), routingNodes, shardIdsToFetch, listener).start();
}

@Override
Expand All @@ -121,16 +120,14 @@ protected ClusterBlockException checkBlock(IndicesShardStoresRequest request, Cl
private class AsyncShardStoresInfoFetches {
private final DiscoveryNodes nodes;
private final RoutingNodes routingNodes;
private final MetaData metaData;
private final Set<ShardId> shardIds;
private final ActionListener<IndicesShardStoresResponse> listener;
private CountDown expectedOps;
private final Queue<InternalAsyncFetch.Response> fetchResponses;

AsyncShardStoresInfoFetches(DiscoveryNodes nodes, RoutingNodes routingNodes, MetaData metaData, Set<ShardId> shardIds, ActionListener<IndicesShardStoresResponse> listener) {
AsyncShardStoresInfoFetches(DiscoveryNodes nodes, RoutingNodes routingNodes, Set<ShardId> shardIds, ActionListener<IndicesShardStoresResponse> listener) {
this.nodes = nodes;
this.routingNodes = routingNodes;
this.metaData = metaData;
this.shardIds = shardIds;
this.listener = listener;
this.fetchResponses = new ConcurrentLinkedQueue<>();
Expand All @@ -143,7 +140,7 @@ void start() {
} else {
for (ShardId shardId : shardIds) {
InternalAsyncFetch fetch = new InternalAsyncFetch(logger, "shard_stores", shardId, listShardStoresInfo);
fetch.fetchData(nodes, metaData, Collections.<String>emptySet());
fetch.fetchData(nodes, Collections.<String>emptySet());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.nodes.BaseNodeResponse;
import org.elasticsearch.action.support.nodes.BaseNodesResponse;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
Expand Down Expand Up @@ -60,7 +58,7 @@ public abstract class AsyncShardFetch<T extends BaseNodeResponse> implements Rel
* An action that lists the relevant shard data that needs to be fetched.
*/
public interface List<NodesResponse extends BaseNodesResponse<NodeResponse>, NodeResponse extends BaseNodeResponse> {
void list(ShardId shardId, IndexMetaData indexMetaData, String[] nodesIds, ActionListener<NodesResponse> listener);
void list(ShardId shardId, String[] nodesIds, ActionListener<NodesResponse> listener);
}

protected final ESLogger logger;
Expand Down Expand Up @@ -104,7 +102,7 @@ public synchronized int getNumberOfInFlightFetches() {
* The ignoreNodes are nodes that are supposed to be ignored for this round, since fetching is async, we need
* to keep them around and make sure we add them back when all the responses are fetched and returned.
*/
public synchronized FetchResult<T> fetchData(DiscoveryNodes nodes, MetaData metaData, Set<String> ignoreNodes) {
public synchronized FetchResult<T> fetchData(DiscoveryNodes nodes, Set<String> ignoreNodes) {
if (closed) {
throw new IllegalStateException(shardId + ": can't fetch data on closed async fetch");
}
Expand All @@ -121,7 +119,7 @@ public synchronized FetchResult<T> fetchData(DiscoveryNodes nodes, MetaData meta
for (NodeEntry<T> nodeEntry : nodesToFetch) {
nodesIds[index++] = nodeEntry.getNodeId();
}
asyncFetch(shardId, nodesIds, metaData);
asyncFetch(shardId, nodesIds);
}

// if we are still fetching, return null to indicate it
Expand Down Expand Up @@ -268,10 +266,9 @@ private boolean hasAnyNodeFetching(Map<String, NodeEntry<T>> shardCache) {
* Async fetches data for the provided shard with the set of nodes that need to be fetched from.
*/
// visible for testing
void asyncFetch(final ShardId shardId, final String[] nodesIds, final MetaData metaData) {
IndexMetaData indexMetaData = metaData.getIndexSafe(shardId.getIndex());
void asyncFetch(final ShardId shardId, final String[] nodesIds) {
logger.trace("{} fetching [{}] from {}", shardId, type, nodesIds);
action.list(shardId, indexMetaData, nodesIds, new ActionListener<BaseNodesResponse<T>>() {
action.list(shardId, nodesIds, new ActionListener<BaseNodesResponse<T>>() {
@Override
public void onResponse(BaseNodesResponse<T> response) {
processAsyncFetch(shardId, response.getNodes(), response.failures());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ protected AsyncShardFetch.FetchResult<TransportNodesListGatewayStartedShards.Nod
asyncFetchStarted.put(shard.shardId(), fetch);
}
AsyncShardFetch.FetchResult<TransportNodesListGatewayStartedShards.NodeGatewayStartedShards> shardState =
fetch.fetchData(allocation.nodes(), allocation.metaData(), allocation.getIgnoreNodes(shard.shardId()));
fetch.fetchData(allocation.nodes(), allocation.getIgnoreNodes(shard.shardId()));

if (shardState.hasData() == true) {
shardState.processAllocation(allocation);
Expand All @@ -179,7 +179,7 @@ protected AsyncShardFetch.FetchResult<TransportNodesListShardStoreMetaData.NodeS
asyncFetchStore.put(shard.shardId(), fetch);
}
AsyncShardFetch.FetchResult<TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData> shardStores =
fetch.fetchData(allocation.nodes(), allocation.metaData(), allocation.getIgnoreNodes(shard.shardId()));
fetch.fetchData(allocation.nodes(), allocation.getIgnoreNodes(shard.shardId()));
if (shardStores.hasData() == true) {
shardStores.processAllocation(allocation);
}
Expand Down
Loading