From b13c5296a8be4cf87371f96fcb6f285b99976179 Mon Sep 17 00:00:00 2001 From: Areek Zillur Date: Tue, 2 Jun 2015 12:31:04 -0400 Subject: [PATCH] When some shards are in unassigned state, it is useful to know the statuses of their copies in the cluster for shard recovery. This PR adds an API that reports the nodes that hold copies of unassigned shards for specific indices, the shard versions (which indicate how recent the copy is) and any exceptions encountered while trying to open the shard indices. The action backing the API is implemented as a master read operation, reading the list of unassigned shards for specified indicies from the cluster state and then fetching shard metadata from all the nodes in the cluster. closes #10952 --- .../elasticsearch/action/ActionModule.java | 3 + .../IndicesUnassigedShardsRequestBuilder.java | 31 +++ .../IndicesUnassigedShardsResponse.java | 166 ++++++++++++++++ .../shards/IndicesUnassignedShardsAction.java | 49 +++++ .../IndicesUnassignedShardsRequest.java | 88 +++++++++ ...ransportIndicesUnassignedShardsAction.java | 179 ++++++++++++++++++ .../client/IndicesAdminClient.java | 26 +++ .../org/elasticsearch/client/Requests.java | 10 + .../client/support/AbstractClient.java | 19 ++ ...ransportNodesListGatewayStartedShards.java | 42 +++- .../org/elasticsearch/index/store/Store.java | 17 +- .../rest/action/RestActionModule.java | 2 + .../RestIndicesUnassignedShardsAction.java | 62 ++++++ .../IndicesUnassignedShardsRequestTests.java | 115 +++++++++++ 14 files changed, 796 insertions(+), 13 deletions(-) create mode 100644 core/src/main/java/org/elasticsearch/action/admin/indices/shards/IndicesUnassigedShardsRequestBuilder.java create mode 100644 core/src/main/java/org/elasticsearch/action/admin/indices/shards/IndicesUnassigedShardsResponse.java create mode 100644 core/src/main/java/org/elasticsearch/action/admin/indices/shards/IndicesUnassignedShardsAction.java create mode 100644 core/src/main/java/org/elasticsearch/action/admin/indices/shards/IndicesUnassignedShardsRequest.java create mode 100644 core/src/main/java/org/elasticsearch/action/admin/indices/shards/TransportIndicesUnassignedShardsAction.java create mode 100644 core/src/main/java/org/elasticsearch/rest/action/admin/indices/shards/RestIndicesUnassignedShardsAction.java create mode 100644 core/src/test/java/org/elasticsearch/action/admin/indices/segments/IndicesUnassignedShardsRequestTests.java diff --git a/core/src/main/java/org/elasticsearch/action/ActionModule.java b/core/src/main/java/org/elasticsearch/action/ActionModule.java index ee48de4fc2342..50d25e5fa8597 100644 --- a/core/src/main/java/org/elasticsearch/action/ActionModule.java +++ b/core/src/main/java/org/elasticsearch/action/ActionModule.java @@ -95,6 +95,8 @@ import org.elasticsearch.action.admin.indices.recovery.TransportRecoveryAction; import org.elasticsearch.action.admin.indices.refresh.RefreshAction; import org.elasticsearch.action.admin.indices.refresh.TransportRefreshAction; +import org.elasticsearch.action.admin.indices.shards.IndicesUnassignedShardsAction; +import org.elasticsearch.action.admin.indices.shards.TransportIndicesUnassignedShardsAction; import org.elasticsearch.action.admin.indices.segments.IndicesSegmentsAction; import org.elasticsearch.action.admin.indices.segments.TransportIndicesSegmentsAction; import org.elasticsearch.action.admin.indices.settings.get.GetSettingsAction; @@ -239,6 +241,7 @@ protected void configure() { registerAction(IndicesStatsAction.INSTANCE, TransportIndicesStatsAction.class); registerAction(IndicesSegmentsAction.INSTANCE, TransportIndicesSegmentsAction.class); + registerAction(IndicesUnassignedShardsAction.INSTANCE, TransportIndicesUnassignedShardsAction.class); registerAction(CreateIndexAction.INSTANCE, TransportCreateIndexAction.class); registerAction(DeleteIndexAction.INSTANCE, TransportDeleteIndexAction.class); registerAction(GetIndexAction.INSTANCE, TransportGetIndexAction.class); diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/shards/IndicesUnassigedShardsRequestBuilder.java b/core/src/main/java/org/elasticsearch/action/admin/indices/shards/IndicesUnassigedShardsRequestBuilder.java new file mode 100644 index 0000000000000..c2712c6635d01 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/shards/IndicesUnassigedShardsRequestBuilder.java @@ -0,0 +1,31 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.indices.shards; + +import org.elasticsearch.action.Action; +import org.elasticsearch.action.support.master.MasterNodeReadOperationRequestBuilder; +import org.elasticsearch.client.ElasticsearchClient; + +public class IndicesUnassigedShardsRequestBuilder extends MasterNodeReadOperationRequestBuilder { + + public IndicesUnassigedShardsRequestBuilder(ElasticsearchClient client, Action action, String... indices) { + super(client, action, new IndicesUnassignedShardsRequest(indices)); + } +} diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/shards/IndicesUnassigedShardsResponse.java b/core/src/main/java/org/elasticsearch/action/admin/indices/shards/IndicesUnassigedShardsResponse.java new file mode 100644 index 0000000000000..ccf87aef4e15c --- /dev/null +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/shards/IndicesUnassigedShardsResponse.java @@ -0,0 +1,166 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.indices.shards; + +import com.carrotsearch.hppc.cursors.ObjectObjectCursor; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.common.collect.ImmutableOpenMap; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentBuilderString; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class IndicesUnassigedShardsResponse extends ActionResponse implements ToXContent { + + public static class ShardStatus { + + /** + * Id of the node the shard belongs to + */ + public final String nodeID; + + /** + * Version of the shard, the highest + * version of the replica shards are + * promoted to primary if needed + */ + public final long version; + + /** + * Any exception that occurred + * while trying to open the + * shard index + */ + public final String exception; + + ShardStatus(String nodeID, long version, String exception) { + this.nodeID = nodeID; + this.version = version; + this.exception = exception; + } + } + + private ImmutableOpenMap>> shardStatuses = ImmutableOpenMap.of(); + + public IndicesUnassigedShardsResponse(ImmutableOpenMap>> shardStatuses) { + this.shardStatuses = shardStatuses; + } + + IndicesUnassigedShardsResponse() { + } + + /** + * Returns {@link org.elasticsearch.action.admin.indices.shards.IndicesUnassigedShardsResponse.ShardStatus}s + * grouped by their index names and shard ids. + */ + public ImmutableOpenMap>> getShardStatuses() { + return shardStatuses; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + int size = in.readVInt(); + ImmutableOpenMap.Builder>> replicaMetaDataBuilder = ImmutableOpenMap.builder(); + for (int i = 0; i < size; i++) { + String index = in.readString(); + int indexEntries = in.readVInt(); + Map> shardEntries = new HashMap<>(indexEntries); + for (int shardCount = 0; shardCount < indexEntries; shardCount++) { + int shardID = in.readInt(); + int nodeEntries = in.readVInt(); + List dataList = new ArrayList<>(nodeEntries); + for (int nodeCount = 0; nodeCount < nodeEntries; nodeCount++) { + String nodeID = in.readString(); + long version = in.readLong(); + String exception = in.readOptionalString(); + dataList.add(new ShardStatus(nodeID, version, exception)); + } + shardEntries.put(shardID, dataList); + } + replicaMetaDataBuilder.put(index, shardEntries); + } + shardStatuses = replicaMetaDataBuilder.build(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeVInt(shardStatuses.size()); + for (ObjectObjectCursor>> cursor : shardStatuses) { + out.writeString(cursor.key); + out.writeVInt(cursor.value.size()); + for (Map.Entry> listEntry : cursor.value.entrySet()) { + out.writeInt(listEntry.getKey()); + out.writeVInt(listEntry.getValue().size()); + for (ShardStatus data : listEntry.getValue()) { + out.writeString(data.nodeID); + out.writeLong(data.version); + out.writeOptionalString(data.exception); + } + } + } + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(Fields.INDICES); + for (ObjectObjectCursor>> cursor : shardStatuses) { + builder.startObject(cursor.key); + + builder.startObject(Fields.SHARDS); + for (Map.Entry> listEntry : cursor.value.entrySet()) { + builder.startArray(String.valueOf(listEntry.getKey())); + + for (ShardStatus metaData : listEntry.getValue()) { + builder.startObject(); + builder.field(Fields.NODE, metaData.nodeID); + builder.field(Fields.VERSION, metaData.version); + if (metaData.exception != null) { + builder.field(Fields.EXCEPTION, metaData.exception); + } + builder.endObject(); + } + + builder.endArray(); + } + builder.endObject(); + + builder.endObject(); + } + builder.endObject(); + return builder; + } + + static final class Fields { + static final XContentBuilderString INDICES = new XContentBuilderString("indices"); + static final XContentBuilderString SHARDS = new XContentBuilderString("shards"); + static final XContentBuilderString NODE = new XContentBuilderString("node"); + static final XContentBuilderString VERSION = new XContentBuilderString("version"); + static final XContentBuilderString EXCEPTION = new XContentBuilderString("exception"); + } +} diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/shards/IndicesUnassignedShardsAction.java b/core/src/main/java/org/elasticsearch/action/admin/indices/shards/IndicesUnassignedShardsAction.java new file mode 100644 index 0000000000000..1e9f0c53b83ca --- /dev/null +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/shards/IndicesUnassignedShardsAction.java @@ -0,0 +1,49 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.indices.shards; + +import org.elasticsearch.action.Action; +import org.elasticsearch.client.ElasticsearchClient; + +/** + * Action for {@link TransportIndicesUnassignedShardsAction} + * + * Exposes information about where unassigned shards are located, how recent they are + * and reports if the shards can be opened + */ +public class IndicesUnassignedShardsAction extends Action { + + public static final IndicesUnassignedShardsAction INSTANCE = new IndicesUnassignedShardsAction(); + public static final String NAME = "indices:monitor/unassigned_shards"; + + private IndicesUnassignedShardsAction() { + super(NAME); + } + + @Override + public IndicesUnassigedShardsResponse newResponse() { + return new IndicesUnassigedShardsResponse(); + } + + @Override + public IndicesUnassigedShardsRequestBuilder newRequestBuilder(ElasticsearchClient client) { + return new IndicesUnassigedShardsRequestBuilder(client, this); + } +} diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/shards/IndicesUnassignedShardsRequest.java b/core/src/main/java/org/elasticsearch/action/admin/indices/shards/IndicesUnassignedShardsRequest.java new file mode 100644 index 0000000000000..a52f4975e17d3 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/shards/IndicesUnassignedShardsRequest.java @@ -0,0 +1,88 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.admin.indices.shards; + +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.IndicesRequest; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.master.MasterNodeReadRequest; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; + +public class IndicesUnassignedShardsRequest extends MasterNodeReadRequest implements IndicesRequest.Replaceable { + + private String[] indices = Strings.EMPTY_ARRAY; + private IndicesOptions indicesOptions = IndicesOptions.strictExpandOpenAndForbidClosed(); + + /** + * Create a request for unassigned shards info for indices + */ + public IndicesUnassignedShardsRequest(String... indices) { + this.indices = indices; + } + + public IndicesUnassignedShardsRequest(String index) { + this.indices = new String[]{index}; + } + + public IndicesUnassignedShardsRequest() { + } + + @Override + public String[] indices() { + return indices; + } + + @Override + public IndicesOptions indicesOptions() { + return indicesOptions; + } + + public void indicesOptions(IndicesOptions indicesOptions) { + this.indicesOptions = indicesOptions; + } + + @Override + public IndicesUnassignedShardsRequest indices(String... indices) { + this.indices = indices; + return this; + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeStringArrayNullable(indices); + indicesOptions.writeIndicesOptions(out); + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + indices = in.readStringArray(); + indicesOptions = IndicesOptions.readIndicesOptions(in); + } +} diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/shards/TransportIndicesUnassignedShardsAction.java b/core/src/main/java/org/elasticsearch/action/admin/indices/shards/TransportIndicesUnassignedShardsAction.java new file mode 100644 index 0000000000000..500238e0bd9ed --- /dev/null +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/shards/TransportIndicesUnassignedShardsAction.java @@ -0,0 +1,179 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.admin.indices.shards; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.FailedNodeException; +import org.elasticsearch.action.admin.indices.shards.IndicesUnassigedShardsResponse.ShardStatus; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.master.TransportMasterNodeReadAction; +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.MutableShardRouting; +import org.elasticsearch.common.collect.ImmutableOpenMap; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.gateway.AsyncShardFetch; +import org.elasticsearch.gateway.TransportNodesListGatewayStartedShards; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.util.*; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Transport action that reads the cluster state for any unassigned shards for specific indices + * and fetches the shard version, host node id and any exception while opening the shard index + * from all the nodes. + */ +public class TransportIndicesUnassignedShardsAction extends TransportMasterNodeReadAction { + + private final TransportNodesListGatewayStartedShards listShardsInfo; + + @Inject + public TransportIndicesUnassignedShardsAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool, ActionFilters actionFilters, + TransportNodesListGatewayStartedShards listShardsInfo) { + super(settings, IndicesUnassignedShardsAction.NAME, transportService, clusterService, threadPool, actionFilters, IndicesUnassignedShardsRequest.class); + this.listShardsInfo = listShardsInfo; + } + + @Override + protected String executor() { + return ThreadPool.Names.SAME; + } + + @Override + protected IndicesUnassigedShardsResponse newResponse() { + return new IndicesUnassigedShardsResponse(); + } + + @Override + protected void masterOperation(IndicesUnassignedShardsRequest request, ClusterState state, ActionListener listener) { + // collect all unassigned shards for the requested indices + DiscoveryNodes nodes = state.nodes(); + MetaData metaData = state.metaData(); + Set requestedIndices = new HashSet<>(); + requestedIndices.addAll(Arrays.asList(request.indices())); + List shardIdsToFetch = new ArrayList<>(state.routingNodes().unassigned().size()); + for (MutableShardRouting shard : state.routingNodes().unassigned()) { + if (requestedIndices.size() == 0 || requestedIndices.contains(shard.shardId().index().getName())) { + shardIdsToFetch.add(shard.shardId()); + } + } + + // async fetch shard status from all the nodes + if (shardIdsToFetch.size() > 0) { + AsyncShardFetches asyncShardFetches = new AsyncShardFetches(nodes, metaData, shardIdsToFetch.size(), listener); + asyncShardFetches.fetch(shardIdsToFetch); + } else { + listener.onResponse(new IndicesUnassigedShardsResponse()); + } + } + + @Override + protected ClusterBlockException checkBlock(IndicesUnassignedShardsRequest request, ClusterState state) { + return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_READ, request.indices()); + } + + private class AsyncShardFetches { + private final DiscoveryNodes nodes; + private final MetaData metaData; + private final int expectedOps; + private final ActionListener listener; + private final AtomicInteger opsCount; + private final Queue fetchResponses; + + AsyncShardFetches(DiscoveryNodes nodes, MetaData metaData, int expectedOps, ActionListener listener) { + this.nodes = nodes; + this.metaData = metaData; + this.listener = listener; + this.expectedOps = expectedOps; + this.opsCount = new AtomicInteger(0); + this.fetchResponses = new ConcurrentLinkedQueue<>(); + } + + void fetch(Collection shardIds) { + for (ShardId shardId : shardIds) { + InternalAsyncFetch fetch = new InternalAsyncFetch(logger, "monitor_shard_copies", shardId, listShardsInfo); + fetch.fetchData(nodes, metaData, Collections.emptySet()); + } + } + + private class InternalAsyncFetch extends AsyncShardFetch { + + InternalAsyncFetch(ESLogger logger, String type, ShardId shardId, TransportNodesListGatewayStartedShards action) { + super(logger, type, shardId, action); + } + + @Override + protected synchronized void processAsyncFetch(ShardId shardId, TransportNodesListGatewayStartedShards.NodeGatewayStartedShards[] responses, FailedNodeException[] failures) { + super.processAsyncFetch(shardId, responses, failures); + fetchResponses.add(new Response(shardId, responses, failures)); + if (opsCount.incrementAndGet() == expectedOps) { + finish(); + } + } + + void finish() { + ImmutableOpenMap.Builder>> resBuilder = ImmutableOpenMap.builder(); + while (!fetchResponses.isEmpty()) { + Response res = fetchResponses.remove(); + Map> shardMap = resBuilder.get(res.shardId.getIndex()); + if (shardMap == null) { + shardMap = new HashMap<>(); + } + java.util.List shardDataList = shardMap.get(res.shardId.id()); + if (shardDataList == null) { + shardDataList = new ArrayList<>(); + } + for (TransportNodesListGatewayStartedShards.NodeGatewayStartedShards response : res.responses) { + shardDataList.add(new ShardStatus(response.getNode().id(), response.version(), response.exception())); + } + shardMap.put(res.shardId.id(), shardDataList); + resBuilder.put(res.shardId.getIndex(), shardMap); + } + listener.onResponse(new IndicesUnassigedShardsResponse(resBuilder.build())); + } + + @Override + protected void reroute(ShardId shardId, String reason) { + // no-op + } + + public class Response { + private final ShardId shardId; + private final TransportNodesListGatewayStartedShards.NodeGatewayStartedShards[] responses; + private final FailedNodeException[] failures; + + public Response(ShardId shardId, TransportNodesListGatewayStartedShards.NodeGatewayStartedShards[] responses, FailedNodeException[] failures) { + this.shardId = shardId; + this.responses = responses; + this.failures = failures; + } + } + } + } +} diff --git a/core/src/main/java/org/elasticsearch/client/IndicesAdminClient.java b/core/src/main/java/org/elasticsearch/client/IndicesAdminClient.java index 05bcc56711f48..e3aa73bcd5e67 100644 --- a/core/src/main/java/org/elasticsearch/client/IndicesAdminClient.java +++ b/core/src/main/java/org/elasticsearch/client/IndicesAdminClient.java @@ -81,6 +81,9 @@ import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest; import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequestBuilder; import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsResponse; +import org.elasticsearch.action.admin.indices.shards.IndicesUnassigedShardsRequestBuilder; +import org.elasticsearch.action.admin.indices.shards.IndicesUnassigedShardsResponse; +import org.elasticsearch.action.admin.indices.shards.IndicesUnassignedShardsRequest; import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest; import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequestBuilder; import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; @@ -218,6 +221,29 @@ public interface IndicesAdminClient extends ElasticsearchClient { */ IndicesSegmentsRequestBuilder prepareSegments(String... indices); + /** + * The un-assigned shards info of one or more indices. + * + * @param request The indices un-assigned shards request + * @return The result future + * @see Requests#indicesUnassignedShardsRequest(String...) + */ + ActionFuture unassignedShards(IndicesUnassignedShardsRequest request); + + /** + * The un-assigned shards info of one or more indices. + * + * @param request The indices un-assigned shards request + * @param listener A listener to be notified with a result + * @see Requests#indicesUnassignedShardsRequest(String...) + */ + void unassignedShards(IndicesUnassignedShardsRequest request, ActionListener listener); + + /** + * The un-assigned shards of one or more indices. + */ + IndicesUnassigedShardsRequestBuilder prepareUnassignedShards(String... indices); + /** * Creates an index using an explicit request allowing to specify the settings of the index. * diff --git a/core/src/main/java/org/elasticsearch/client/Requests.java b/core/src/main/java/org/elasticsearch/client/Requests.java index 8a70c18b374ff..b87be9b93e75e 100644 --- a/core/src/main/java/org/elasticsearch/client/Requests.java +++ b/core/src/main/java/org/elasticsearch/client/Requests.java @@ -49,6 +49,7 @@ import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.admin.indices.segments.IndicesSegmentsRequest; import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest; +import org.elasticsearch.action.admin.indices.shards.IndicesUnassignedShardsRequest; import org.elasticsearch.action.admin.indices.upgrade.post.UpgradeRequest; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.count.CountRequest; @@ -184,6 +185,15 @@ public static IndicesSegmentsRequest indicesSegmentsRequest(String... indices) { return new IndicesSegmentsRequest(indices); } + /** + * Creates an indices un-assigned shards info request. + * @param indices The indices to query un-assigned shards of + * @return The indices un-assigned shards request + * @see org.elasticsearch.client.IndicesAdminClient#unassignedShards(IndicesUnassignedShardsRequest) + */ + public static IndicesUnassignedShardsRequest indicesUnassignedShardsRequest(String... indices) { + return new IndicesUnassignedShardsRequest(indices); + } /** * Creates an indices exists request. * diff --git a/core/src/main/java/org/elasticsearch/client/support/AbstractClient.java b/core/src/main/java/org/elasticsearch/client/support/AbstractClient.java index 761acd011d1f4..950e0545fbc38 100644 --- a/core/src/main/java/org/elasticsearch/client/support/AbstractClient.java +++ b/core/src/main/java/org/elasticsearch/client/support/AbstractClient.java @@ -176,6 +176,10 @@ import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest; import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequestBuilder; import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsResponse; +import org.elasticsearch.action.admin.indices.shards.IndicesUnassigedShardsRequestBuilder; +import org.elasticsearch.action.admin.indices.shards.IndicesUnassigedShardsResponse; +import org.elasticsearch.action.admin.indices.shards.IndicesUnassignedShardsAction; +import org.elasticsearch.action.admin.indices.shards.IndicesUnassignedShardsRequest; import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction; import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest; import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequestBuilder; @@ -1494,6 +1498,21 @@ public IndicesSegmentsRequestBuilder prepareSegments(String... indices) { return new IndicesSegmentsRequestBuilder(this, IndicesSegmentsAction.INSTANCE).setIndices(indices); } + @Override + public ActionFuture unassignedShards(IndicesUnassignedShardsRequest request) { + return execute(IndicesUnassignedShardsAction.INSTANCE, request); + } + + @Override + public void unassignedShards(IndicesUnassignedShardsRequest request, ActionListener listener) { + execute(IndicesUnassignedShardsAction.INSTANCE, request, listener); + } + + @Override + public IndicesUnassigedShardsRequestBuilder prepareUnassignedShards(String... indices) { + return new IndicesUnassigedShardsRequestBuilder(this, IndicesUnassignedShardsAction.INSTANCE, indices); + } + @Override public ActionFuture updateSettings(final UpdateSettingsRequest request) { return execute(UpdateSettingsAction.INSTANCE, request); diff --git a/core/src/main/java/org/elasticsearch/gateway/TransportNodesListGatewayStartedShards.java b/core/src/main/java/org/elasticsearch/gateway/TransportNodesListGatewayStartedShards.java index 8d9a7960f6ede..d2f2d630c34ee 100644 --- a/core/src/main/java/org/elasticsearch/gateway/TransportNodesListGatewayStartedShards.java +++ b/core/src/main/java/org/elasticsearch/gateway/TransportNodesListGatewayStartedShards.java @@ -21,6 +21,7 @@ import com.google.common.collect.Lists; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.FailedNodeException; import org.elasticsearch.action.support.ActionFilters; @@ -118,9 +119,18 @@ protected NodeGatewayStartedShards nodeOperation(NodeRequest request) { ShardStateMetaData shardStateMetaData = ShardStateMetaData.FORMAT.loadLatestState(logger, nodeEnv.availableShardPaths(request.shardId)); if (shardStateMetaData != null) { final IndexMetaData metaData = clusterService.state().metaData().index(shardId.index().name()); // it's a mystery why this is sometimes null - if (metaData != null && canOpenIndex(request.getShardId(), metaData) == false) { - logger.trace("{} can't open index for shard [{}]", shardId, shardStateMetaData); - return new NodeGatewayStartedShards(clusterService.localNode(), -1); + if (metaData != null) { + final boolean canOpenIndex; + try { + canOpenIndex = tryOpenIndex(request.getShardId(), metaData); + } catch (Exception ex) { + logger.trace("{} can't open index for shard [{}]", shardId, shardStateMetaData); + return new NodeGatewayStartedShards(clusterService.localNode(), -1, ExceptionsHelper.detailedMessage(ex)); + } + if (canOpenIndex == false) { + logger.trace("{} can't open index for shard [{}]", shardId, shardStateMetaData); + return new NodeGatewayStartedShards(clusterService.localNode(), -1); + } } // old shard metadata doesn't have the actual index UUID so we need to check if the actual uuid in the metadata // is equal to IndexMetaData.INDEX_UUID_NA_VALUE otherwise this shard doesn't belong to the requested index. @@ -139,16 +149,17 @@ protected NodeGatewayStartedShards nodeOperation(NodeRequest request) { } } - private boolean canOpenIndex(ShardId shardId, IndexMetaData metaData) throws IOException { - // try and see if we an list unallocated - if (metaData == null) { - return false; - } + private boolean tryOpenIndex(ShardId shardId, IndexMetaData metaData) throws IOException { final ShardPath shardPath = ShardPath.loadShardPath(logger, nodeEnv, shardId, metaData.settings()); if (shardPath == null) { return false; } - return Store.canOpenIndex(logger, shardPath.resolveIndex()); + try { + return Store.tryOpenIndex(shardPath.resolveIndex()); + } catch (Exception ex) { + logger.trace("Can't open index for path [{}]", ex, shardPath.resolveIndex()); + throw ex; + } } @Override @@ -269,29 +280,40 @@ public String getIndexUUID() { public static class NodeGatewayStartedShards extends BaseNodeResponse { private long version = -1; + private String exception = null; NodeGatewayStartedShards() { } - public NodeGatewayStartedShards(DiscoveryNode node, long version) { + this(node, version, null); + } + + public NodeGatewayStartedShards(DiscoveryNode node, long version, String exception) { super(node); this.version = version; + this.exception = exception; } public long version() { return this.version; } + public String exception() { + return this.exception; + } + @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); version = in.readLong(); + exception = in.readOptionalString(); } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeLong(version); + out.writeOptionalString(exception); } } } diff --git a/core/src/main/java/org/elasticsearch/index/store/Store.java b/core/src/main/java/org/elasticsearch/index/store/Store.java index 2617e95b418d0..ebd5b0967561b 100644 --- a/core/src/main/java/org/elasticsearch/index/store/Store.java +++ b/core/src/main/java/org/elasticsearch/index/store/Store.java @@ -386,13 +386,24 @@ public static MetadataSnapshot readMetadataSnapshot(Path indexLocation, ESLogger * corruption markers. */ public static boolean canOpenIndex(ESLogger logger, Path indexLocation) throws IOException { + try { + return tryOpenIndex(indexLocation); + } catch (Exception ex) { + logger.trace("Can't open index for path [{}]", ex, indexLocation); + return false; + } + } + + /** + * Returns true iff the given location contains an index + * and the index can be successfully opened. If the index can not + * be opened, an exception is thrown + */ + public static boolean tryOpenIndex(Path indexLocation) throws IOException { try (Directory dir = new SimpleFSDirectory(indexLocation)) { failIfCorrupted(dir, new ShardId("", 1)); Lucene.readSegmentInfos(dir); return true; - } catch (Exception ex) { - logger.trace("Can't open index for path [{}]", ex, indexLocation); - return false; } } diff --git a/core/src/main/java/org/elasticsearch/rest/action/RestActionModule.java b/core/src/main/java/org/elasticsearch/rest/action/RestActionModule.java index a2d09f4f80e1e..53d9707f84310 100644 --- a/core/src/main/java/org/elasticsearch/rest/action/RestActionModule.java +++ b/core/src/main/java/org/elasticsearch/rest/action/RestActionModule.java @@ -66,6 +66,7 @@ import org.elasticsearch.rest.action.admin.indices.optimize.RestOptimizeAction; import org.elasticsearch.rest.action.admin.indices.recovery.RestRecoveryAction; import org.elasticsearch.rest.action.admin.indices.refresh.RestRefreshAction; +import org.elasticsearch.rest.action.admin.indices.shards.RestIndicesUnassignedShardsAction; import org.elasticsearch.rest.action.admin.indices.segments.RestIndicesSegmentsAction; import org.elasticsearch.rest.action.admin.indices.settings.RestGetSettingsAction; import org.elasticsearch.rest.action.admin.indices.settings.RestUpdateSettingsAction; @@ -153,6 +154,7 @@ protected void configure() { bind(RestGetIndicesAction.class).asEagerSingleton(); bind(RestIndicesStatsAction.class).asEagerSingleton(); bind(RestIndicesSegmentsAction.class).asEagerSingleton(); + bind(RestIndicesUnassignedShardsAction.class).asEagerSingleton(); bind(RestGetAliasesAction.class).asEagerSingleton(); bind(RestAliasesExistAction.class).asEagerSingleton(); bind(RestIndexDeleteAliasesAction.class).asEagerSingleton(); diff --git a/core/src/main/java/org/elasticsearch/rest/action/admin/indices/shards/RestIndicesUnassignedShardsAction.java b/core/src/main/java/org/elasticsearch/rest/action/admin/indices/shards/RestIndicesUnassignedShardsAction.java new file mode 100644 index 0000000000000..7fd88cdf683e7 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/rest/action/admin/indices/shards/RestIndicesUnassignedShardsAction.java @@ -0,0 +1,62 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.rest.action.admin.indices.shards; + +import org.elasticsearch.action.admin.indices.shards.IndicesUnassigedShardsResponse; +import org.elasticsearch.action.admin.indices.shards.IndicesUnassignedShardsRequest; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.rest.*; +import org.elasticsearch.rest.action.support.RestBuilderListener; + +import static org.elasticsearch.rest.RestRequest.Method.GET; +import static org.elasticsearch.rest.RestStatus.OK; + +/** + * Rest action for {@link org.elasticsearch.action.admin.indices.shards.IndicesUnassignedShardsAction} + */ +public class RestIndicesUnassignedShardsAction extends BaseRestHandler { + + @Inject + public RestIndicesUnassignedShardsAction(Settings settings, RestController controller, Client client) { + super(settings, controller, client); + controller.registerHandler(GET, "/_unassigned_shards", this); + controller.registerHandler(GET, "/{index}/_unassigned_shards", this); + } + + @Override + public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) { + IndicesUnassignedShardsRequest indicesSegmentsRequest = new IndicesUnassignedShardsRequest(Strings.splitStringByCommaToArray(request.param("index"))); + indicesSegmentsRequest.indicesOptions(IndicesOptions.fromRequest(request, indicesSegmentsRequest.indicesOptions())); + client.admin().indices().unassignedShards(indicesSegmentsRequest, new RestBuilderListener(channel) { + @Override + public RestResponse buildResponse(IndicesUnassigedShardsResponse response, XContentBuilder builder) throws Exception { + builder.startObject(); + response.toXContent(builder, request); + builder.endObject(); + return new BytesRestResponse(OK, builder); + } + }); + } +} diff --git a/core/src/test/java/org/elasticsearch/action/admin/indices/segments/IndicesUnassignedShardsRequestTests.java b/core/src/test/java/org/elasticsearch/action/admin/indices/segments/IndicesUnassignedShardsRequestTests.java new file mode 100644 index 0000000000000..24092ab3f1bdd --- /dev/null +++ b/core/src/test/java/org/elasticsearch/action/admin/indices/segments/IndicesUnassignedShardsRequestTests.java @@ -0,0 +1,115 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.indices.segments; + +import org.elasticsearch.action.admin.indices.shards.IndicesUnassigedShardsResponse; +import org.elasticsearch.cluster.routing.MutableShardRouting; +import org.elasticsearch.cluster.routing.RoutingNodes; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.collect.ImmutableOpenMap; +import org.elasticsearch.common.xcontent.*; +import org.elasticsearch.test.ElasticsearchIntegrationTest; +import org.junit.Before; +import org.junit.Test; + +import java.util.List; +import java.util.Map; + +import static org.hamcrest.Matchers.equalTo; + +/* + TODO: test exception reporting when shard index can not be opened + */ +public class IndicesUnassignedShardsRequestTests extends ElasticsearchIntegrationTest { + + @Override + protected int numberOfReplicas() { + return 1; + } + + @Override + protected int numberOfShards() { + return 5; + } + + @Before + public void setupIndex() { + prepareCreate("test", 2); + int numDocs = scaledRandomIntBetween(100, 1000); + for (int j = 0; j < numDocs; ++j) { + String id = Integer.toString(j); + client().prepareIndex("test", "type1", id).setSource("text", "sometext").get(); + } + client().admin().indices().prepareFlush("test").get(); + } + + @Test + public void testBasic() { + ensureGreen(); + IndicesUnassigedShardsResponse rsp = client().admin().indices().prepareUnassignedShards("test").get(); + assertThat(rsp.getShardStatuses().size(), equalTo(0)); + } + + @Test + public void testUnassignedShards() throws Exception { + ensureGreen(); + // TODO: there has to be a better way of having unassigned shards + internalCluster().stopRandomDataNode(); + RoutingNodes.UnassignedShards unassigned = clusterService().state().routingNodes().unassigned(); + IndicesUnassigedShardsResponse response = client().admin().indices().prepareUnassignedShards("test").get(); + + ImmutableOpenMap>> statuses = response.getShardStatuses(); + for (MutableShardRouting shardRouting : unassigned) { + assertTrue(statuses.containsKey(shardRouting.getIndex())); + Map> listMap = statuses.get(shardRouting.getIndex()); + assertTrue(listMap.containsKey(shardRouting.id())); + } + ensureYellow(); + } + + @Test + public void testSerialization() throws Exception { + ensureGreen(); + // TODO: there has to be a better way of having unassigned shards + internalCluster().stopRandomDataNode(); + RoutingNodes.UnassignedShards unassigned = clusterService().state().routingNodes().unassigned(); + IndicesUnassigedShardsResponse response = client().admin().indices().prepareUnassignedShards("test").get(); + XContentBuilder contentBuilder = XContentFactory.jsonBuilder(); + contentBuilder.startObject(); + response.toXContent(contentBuilder, ToXContent.EMPTY_PARAMS); + contentBuilder.endObject(); + BytesReference bytes = contentBuilder.bytes(); + final XContentParser parser = XContentFactory.xContent(XContentType.JSON).createParser(bytes); + Map map = parser.mapAndClose(); + if (unassigned.size() > 0) { + Map indices = (Map) map.get("indices"); + assertTrue(indices.containsKey("test")); + Map shards = ((Map) ((Map) indices.get("test")).get("shards")); + int nUnassignedShardsForIndex = 0; + for (MutableShardRouting routing : unassigned) { + if (routing.getIndex().equals("test")) { + assertTrue(shards.containsKey(String.valueOf(routing.id()))); + nUnassignedShardsForIndex++; + } + } + assertThat(shards.size(), equalTo(nUnassignedShardsForIndex)); + } + } +}