diff --git a/core/src/main/java/org/elasticsearch/action/ActionModule.java b/core/src/main/java/org/elasticsearch/action/ActionModule.java index ee48de4fc2342..50d25e5fa8597 100644 --- a/core/src/main/java/org/elasticsearch/action/ActionModule.java +++ b/core/src/main/java/org/elasticsearch/action/ActionModule.java @@ -95,6 +95,8 @@ import org.elasticsearch.action.admin.indices.recovery.TransportRecoveryAction; import org.elasticsearch.action.admin.indices.refresh.RefreshAction; import org.elasticsearch.action.admin.indices.refresh.TransportRefreshAction; +import org.elasticsearch.action.admin.indices.shards.IndicesUnassignedShardsAction; +import org.elasticsearch.action.admin.indices.shards.TransportIndicesUnassignedShardsAction; import org.elasticsearch.action.admin.indices.segments.IndicesSegmentsAction; import org.elasticsearch.action.admin.indices.segments.TransportIndicesSegmentsAction; import org.elasticsearch.action.admin.indices.settings.get.GetSettingsAction; @@ -239,6 +241,7 @@ protected void configure() { registerAction(IndicesStatsAction.INSTANCE, TransportIndicesStatsAction.class); registerAction(IndicesSegmentsAction.INSTANCE, TransportIndicesSegmentsAction.class); + registerAction(IndicesUnassignedShardsAction.INSTANCE, TransportIndicesUnassignedShardsAction.class); registerAction(CreateIndexAction.INSTANCE, TransportCreateIndexAction.class); registerAction(DeleteIndexAction.INSTANCE, TransportDeleteIndexAction.class); registerAction(GetIndexAction.INSTANCE, TransportGetIndexAction.class); diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/shards/IndicesUnassigedShardsRequestBuilder.java b/core/src/main/java/org/elasticsearch/action/admin/indices/shards/IndicesUnassigedShardsRequestBuilder.java new file mode 100644 index 0000000000000..c2712c6635d01 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/shards/IndicesUnassigedShardsRequestBuilder.java @@ -0,0 +1,31 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.indices.shards; + +import org.elasticsearch.action.Action; +import org.elasticsearch.action.support.master.MasterNodeReadOperationRequestBuilder; +import org.elasticsearch.client.ElasticsearchClient; + +public class IndicesUnassigedShardsRequestBuilder extends MasterNodeReadOperationRequestBuilder { + + public IndicesUnassigedShardsRequestBuilder(ElasticsearchClient client, Action action, String... indices) { + super(client, action, new IndicesUnassignedShardsRequest(indices)); + } +} diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/shards/IndicesUnassigedShardsResponse.java b/core/src/main/java/org/elasticsearch/action/admin/indices/shards/IndicesUnassigedShardsResponse.java new file mode 100644 index 0000000000000..ccf87aef4e15c --- /dev/null +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/shards/IndicesUnassigedShardsResponse.java @@ -0,0 +1,166 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.indices.shards; + +import com.carrotsearch.hppc.cursors.ObjectObjectCursor; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.common.collect.ImmutableOpenMap; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentBuilderString; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class IndicesUnassigedShardsResponse extends ActionResponse implements ToXContent { + + public static class ShardStatus { + + /** + * Id of the node the shard belongs to + */ + public final String nodeID; + + /** + * Version of the shard, the highest + * version of the replica shards are + * promoted to primary if needed + */ + public final long version; + + /** + * Any exception that occurred + * while trying to open the + * shard index + */ + public final String exception; + + ShardStatus(String nodeID, long version, String exception) { + this.nodeID = nodeID; + this.version = version; + this.exception = exception; + } + } + + private ImmutableOpenMap>> shardStatuses = ImmutableOpenMap.of(); + + public IndicesUnassigedShardsResponse(ImmutableOpenMap>> shardStatuses) { + this.shardStatuses = shardStatuses; + } + + IndicesUnassigedShardsResponse() { + } + + /** + * Returns {@link org.elasticsearch.action.admin.indices.shards.IndicesUnassigedShardsResponse.ShardStatus}s + * grouped by their index names and shard ids. + */ + public ImmutableOpenMap>> getShardStatuses() { + return shardStatuses; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + int size = in.readVInt(); + ImmutableOpenMap.Builder>> replicaMetaDataBuilder = ImmutableOpenMap.builder(); + for (int i = 0; i < size; i++) { + String index = in.readString(); + int indexEntries = in.readVInt(); + Map> shardEntries = new HashMap<>(indexEntries); + for (int shardCount = 0; shardCount < indexEntries; shardCount++) { + int shardID = in.readInt(); + int nodeEntries = in.readVInt(); + List dataList = new ArrayList<>(nodeEntries); + for (int nodeCount = 0; nodeCount < nodeEntries; nodeCount++) { + String nodeID = in.readString(); + long version = in.readLong(); + String exception = in.readOptionalString(); + dataList.add(new ShardStatus(nodeID, version, exception)); + } + shardEntries.put(shardID, dataList); + } + replicaMetaDataBuilder.put(index, shardEntries); + } + shardStatuses = replicaMetaDataBuilder.build(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeVInt(shardStatuses.size()); + for (ObjectObjectCursor>> cursor : shardStatuses) { + out.writeString(cursor.key); + out.writeVInt(cursor.value.size()); + for (Map.Entry> listEntry : cursor.value.entrySet()) { + out.writeInt(listEntry.getKey()); + out.writeVInt(listEntry.getValue().size()); + for (ShardStatus data : listEntry.getValue()) { + out.writeString(data.nodeID); + out.writeLong(data.version); + out.writeOptionalString(data.exception); + } + } + } + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(Fields.INDICES); + for (ObjectObjectCursor>> cursor : shardStatuses) { + builder.startObject(cursor.key); + + builder.startObject(Fields.SHARDS); + for (Map.Entry> listEntry : cursor.value.entrySet()) { + builder.startArray(String.valueOf(listEntry.getKey())); + + for (ShardStatus metaData : listEntry.getValue()) { + builder.startObject(); + builder.field(Fields.NODE, metaData.nodeID); + builder.field(Fields.VERSION, metaData.version); + if (metaData.exception != null) { + builder.field(Fields.EXCEPTION, metaData.exception); + } + builder.endObject(); + } + + builder.endArray(); + } + builder.endObject(); + + builder.endObject(); + } + builder.endObject(); + return builder; + } + + static final class Fields { + static final XContentBuilderString INDICES = new XContentBuilderString("indices"); + static final XContentBuilderString SHARDS = new XContentBuilderString("shards"); + static final XContentBuilderString NODE = new XContentBuilderString("node"); + static final XContentBuilderString VERSION = new XContentBuilderString("version"); + static final XContentBuilderString EXCEPTION = new XContentBuilderString("exception"); + } +} diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/shards/IndicesUnassignedShardsAction.java b/core/src/main/java/org/elasticsearch/action/admin/indices/shards/IndicesUnassignedShardsAction.java new file mode 100644 index 0000000000000..1e9f0c53b83ca --- /dev/null +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/shards/IndicesUnassignedShardsAction.java @@ -0,0 +1,49 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.indices.shards; + +import org.elasticsearch.action.Action; +import org.elasticsearch.client.ElasticsearchClient; + +/** + * Action for {@link TransportIndicesUnassignedShardsAction} + * + * Exposes information about where unassigned shards are located, how recent they are + * and reports if the shards can be opened + */ +public class IndicesUnassignedShardsAction extends Action { + + public static final IndicesUnassignedShardsAction INSTANCE = new IndicesUnassignedShardsAction(); + public static final String NAME = "indices:monitor/unassigned_shards"; + + private IndicesUnassignedShardsAction() { + super(NAME); + } + + @Override + public IndicesUnassigedShardsResponse newResponse() { + return new IndicesUnassigedShardsResponse(); + } + + @Override + public IndicesUnassigedShardsRequestBuilder newRequestBuilder(ElasticsearchClient client) { + return new IndicesUnassigedShardsRequestBuilder(client, this); + } +} diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/shards/IndicesUnassignedShardsRequest.java b/core/src/main/java/org/elasticsearch/action/admin/indices/shards/IndicesUnassignedShardsRequest.java new file mode 100644 index 0000000000000..a52f4975e17d3 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/shards/IndicesUnassignedShardsRequest.java @@ -0,0 +1,88 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.admin.indices.shards; + +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.IndicesRequest; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.master.MasterNodeReadRequest; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; + +public class IndicesUnassignedShardsRequest extends MasterNodeReadRequest implements IndicesRequest.Replaceable { + + private String[] indices = Strings.EMPTY_ARRAY; + private IndicesOptions indicesOptions = IndicesOptions.strictExpandOpenAndForbidClosed(); + + /** + * Create a request for unassigned shards info for indices + */ + public IndicesUnassignedShardsRequest(String... indices) { + this.indices = indices; + } + + public IndicesUnassignedShardsRequest(String index) { + this.indices = new String[]{index}; + } + + public IndicesUnassignedShardsRequest() { + } + + @Override + public String[] indices() { + return indices; + } + + @Override + public IndicesOptions indicesOptions() { + return indicesOptions; + } + + public void indicesOptions(IndicesOptions indicesOptions) { + this.indicesOptions = indicesOptions; + } + + @Override + public IndicesUnassignedShardsRequest indices(String... indices) { + this.indices = indices; + return this; + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeStringArrayNullable(indices); + indicesOptions.writeIndicesOptions(out); + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + indices = in.readStringArray(); + indicesOptions = IndicesOptions.readIndicesOptions(in); + } +} diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/shards/TransportIndicesUnassignedShardsAction.java b/core/src/main/java/org/elasticsearch/action/admin/indices/shards/TransportIndicesUnassignedShardsAction.java new file mode 100644 index 0000000000000..500238e0bd9ed --- /dev/null +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/shards/TransportIndicesUnassignedShardsAction.java @@ -0,0 +1,179 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.admin.indices.shards; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.FailedNodeException; +import org.elasticsearch.action.admin.indices.shards.IndicesUnassigedShardsResponse.ShardStatus; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.master.TransportMasterNodeReadAction; +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.MutableShardRouting; +import org.elasticsearch.common.collect.ImmutableOpenMap; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.gateway.AsyncShardFetch; +import org.elasticsearch.gateway.TransportNodesListGatewayStartedShards; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.util.*; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Transport action that reads the cluster state for any unassigned shards for specific indices + * and fetches the shard version, host node id and any exception while opening the shard index + * from all the nodes. + */ +public class TransportIndicesUnassignedShardsAction extends TransportMasterNodeReadAction { + + private final TransportNodesListGatewayStartedShards listShardsInfo; + + @Inject + public TransportIndicesUnassignedShardsAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool, ActionFilters actionFilters, + TransportNodesListGatewayStartedShards listShardsInfo) { + super(settings, IndicesUnassignedShardsAction.NAME, transportService, clusterService, threadPool, actionFilters, IndicesUnassignedShardsRequest.class); + this.listShardsInfo = listShardsInfo; + } + + @Override + protected String executor() { + return ThreadPool.Names.SAME; + } + + @Override + protected IndicesUnassigedShardsResponse newResponse() { + return new IndicesUnassigedShardsResponse(); + } + + @Override + protected void masterOperation(IndicesUnassignedShardsRequest request, ClusterState state, ActionListener listener) { + // collect all unassigned shards for the requested indices + DiscoveryNodes nodes = state.nodes(); + MetaData metaData = state.metaData(); + Set requestedIndices = new HashSet<>(); + requestedIndices.addAll(Arrays.asList(request.indices())); + List shardIdsToFetch = new ArrayList<>(state.routingNodes().unassigned().size()); + for (MutableShardRouting shard : state.routingNodes().unassigned()) { + if (requestedIndices.size() == 0 || requestedIndices.contains(shard.shardId().index().getName())) { + shardIdsToFetch.add(shard.shardId()); + } + } + + // async fetch shard status from all the nodes + if (shardIdsToFetch.size() > 0) { + AsyncShardFetches asyncShardFetches = new AsyncShardFetches(nodes, metaData, shardIdsToFetch.size(), listener); + asyncShardFetches.fetch(shardIdsToFetch); + } else { + listener.onResponse(new IndicesUnassigedShardsResponse()); + } + } + + @Override + protected ClusterBlockException checkBlock(IndicesUnassignedShardsRequest request, ClusterState state) { + return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_READ, request.indices()); + } + + private class AsyncShardFetches { + private final DiscoveryNodes nodes; + private final MetaData metaData; + private final int expectedOps; + private final ActionListener listener; + private final AtomicInteger opsCount; + private final Queue fetchResponses; + + AsyncShardFetches(DiscoveryNodes nodes, MetaData metaData, int expectedOps, ActionListener listener) { + this.nodes = nodes; + this.metaData = metaData; + this.listener = listener; + this.expectedOps = expectedOps; + this.opsCount = new AtomicInteger(0); + this.fetchResponses = new ConcurrentLinkedQueue<>(); + } + + void fetch(Collection shardIds) { + for (ShardId shardId : shardIds) { + InternalAsyncFetch fetch = new InternalAsyncFetch(logger, "monitor_shard_copies", shardId, listShardsInfo); + fetch.fetchData(nodes, metaData, Collections.emptySet()); + } + } + + private class InternalAsyncFetch extends AsyncShardFetch { + + InternalAsyncFetch(ESLogger logger, String type, ShardId shardId, TransportNodesListGatewayStartedShards action) { + super(logger, type, shardId, action); + } + + @Override + protected synchronized void processAsyncFetch(ShardId shardId, TransportNodesListGatewayStartedShards.NodeGatewayStartedShards[] responses, FailedNodeException[] failures) { + super.processAsyncFetch(shardId, responses, failures); + fetchResponses.add(new Response(shardId, responses, failures)); + if (opsCount.incrementAndGet() == expectedOps) { + finish(); + } + } + + void finish() { + ImmutableOpenMap.Builder>> resBuilder = ImmutableOpenMap.builder(); + while (!fetchResponses.isEmpty()) { + Response res = fetchResponses.remove(); + Map> shardMap = resBuilder.get(res.shardId.getIndex()); + if (shardMap == null) { + shardMap = new HashMap<>(); + } + java.util.List shardDataList = shardMap.get(res.shardId.id()); + if (shardDataList == null) { + shardDataList = new ArrayList<>(); + } + for (TransportNodesListGatewayStartedShards.NodeGatewayStartedShards response : res.responses) { + shardDataList.add(new ShardStatus(response.getNode().id(), response.version(), response.exception())); + } + shardMap.put(res.shardId.id(), shardDataList); + resBuilder.put(res.shardId.getIndex(), shardMap); + } + listener.onResponse(new IndicesUnassigedShardsResponse(resBuilder.build())); + } + + @Override + protected void reroute(ShardId shardId, String reason) { + // no-op + } + + public class Response { + private final ShardId shardId; + private final TransportNodesListGatewayStartedShards.NodeGatewayStartedShards[] responses; + private final FailedNodeException[] failures; + + public Response(ShardId shardId, TransportNodesListGatewayStartedShards.NodeGatewayStartedShards[] responses, FailedNodeException[] failures) { + this.shardId = shardId; + this.responses = responses; + this.failures = failures; + } + } + } + } +} diff --git a/core/src/main/java/org/elasticsearch/client/IndicesAdminClient.java b/core/src/main/java/org/elasticsearch/client/IndicesAdminClient.java index 05bcc56711f48..e3aa73bcd5e67 100644 --- a/core/src/main/java/org/elasticsearch/client/IndicesAdminClient.java +++ b/core/src/main/java/org/elasticsearch/client/IndicesAdminClient.java @@ -81,6 +81,9 @@ import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest; import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequestBuilder; import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsResponse; +import org.elasticsearch.action.admin.indices.shards.IndicesUnassigedShardsRequestBuilder; +import org.elasticsearch.action.admin.indices.shards.IndicesUnassigedShardsResponse; +import org.elasticsearch.action.admin.indices.shards.IndicesUnassignedShardsRequest; import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest; import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequestBuilder; import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; @@ -218,6 +221,29 @@ public interface IndicesAdminClient extends ElasticsearchClient { */ IndicesSegmentsRequestBuilder prepareSegments(String... indices); + /** + * The un-assigned shards info of one or more indices. + * + * @param request The indices un-assigned shards request + * @return The result future + * @see Requests#indicesUnassignedShardsRequest(String...) + */ + ActionFuture unassignedShards(IndicesUnassignedShardsRequest request); + + /** + * The un-assigned shards info of one or more indices. + * + * @param request The indices un-assigned shards request + * @param listener A listener to be notified with a result + * @see Requests#indicesUnassignedShardsRequest(String...) + */ + void unassignedShards(IndicesUnassignedShardsRequest request, ActionListener listener); + + /** + * The un-assigned shards of one or more indices. + */ + IndicesUnassigedShardsRequestBuilder prepareUnassignedShards(String... indices); + /** * Creates an index using an explicit request allowing to specify the settings of the index. * diff --git a/core/src/main/java/org/elasticsearch/client/Requests.java b/core/src/main/java/org/elasticsearch/client/Requests.java index 8a70c18b374ff..b87be9b93e75e 100644 --- a/core/src/main/java/org/elasticsearch/client/Requests.java +++ b/core/src/main/java/org/elasticsearch/client/Requests.java @@ -49,6 +49,7 @@ import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.admin.indices.segments.IndicesSegmentsRequest; import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest; +import org.elasticsearch.action.admin.indices.shards.IndicesUnassignedShardsRequest; import org.elasticsearch.action.admin.indices.upgrade.post.UpgradeRequest; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.count.CountRequest; @@ -184,6 +185,15 @@ public static IndicesSegmentsRequest indicesSegmentsRequest(String... indices) { return new IndicesSegmentsRequest(indices); } + /** + * Creates an indices un-assigned shards info request. + * @param indices The indices to query un-assigned shards of + * @return The indices un-assigned shards request + * @see org.elasticsearch.client.IndicesAdminClient#unassignedShards(IndicesUnassignedShardsRequest) + */ + public static IndicesUnassignedShardsRequest indicesUnassignedShardsRequest(String... indices) { + return new IndicesUnassignedShardsRequest(indices); + } /** * Creates an indices exists request. * diff --git a/core/src/main/java/org/elasticsearch/client/support/AbstractClient.java b/core/src/main/java/org/elasticsearch/client/support/AbstractClient.java index 761acd011d1f4..950e0545fbc38 100644 --- a/core/src/main/java/org/elasticsearch/client/support/AbstractClient.java +++ b/core/src/main/java/org/elasticsearch/client/support/AbstractClient.java @@ -176,6 +176,10 @@ import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest; import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequestBuilder; import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsResponse; +import org.elasticsearch.action.admin.indices.shards.IndicesUnassigedShardsRequestBuilder; +import org.elasticsearch.action.admin.indices.shards.IndicesUnassigedShardsResponse; +import org.elasticsearch.action.admin.indices.shards.IndicesUnassignedShardsAction; +import org.elasticsearch.action.admin.indices.shards.IndicesUnassignedShardsRequest; import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction; import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest; import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequestBuilder; @@ -1494,6 +1498,21 @@ public IndicesSegmentsRequestBuilder prepareSegments(String... indices) { return new IndicesSegmentsRequestBuilder(this, IndicesSegmentsAction.INSTANCE).setIndices(indices); } + @Override + public ActionFuture unassignedShards(IndicesUnassignedShardsRequest request) { + return execute(IndicesUnassignedShardsAction.INSTANCE, request); + } + + @Override + public void unassignedShards(IndicesUnassignedShardsRequest request, ActionListener listener) { + execute(IndicesUnassignedShardsAction.INSTANCE, request, listener); + } + + @Override + public IndicesUnassigedShardsRequestBuilder prepareUnassignedShards(String... indices) { + return new IndicesUnassigedShardsRequestBuilder(this, IndicesUnassignedShardsAction.INSTANCE, indices); + } + @Override public ActionFuture updateSettings(final UpdateSettingsRequest request) { return execute(UpdateSettingsAction.INSTANCE, request); diff --git a/core/src/main/java/org/elasticsearch/gateway/TransportNodesListGatewayStartedShards.java b/core/src/main/java/org/elasticsearch/gateway/TransportNodesListGatewayStartedShards.java index 8d9a7960f6ede..d2f2d630c34ee 100644 --- a/core/src/main/java/org/elasticsearch/gateway/TransportNodesListGatewayStartedShards.java +++ b/core/src/main/java/org/elasticsearch/gateway/TransportNodesListGatewayStartedShards.java @@ -21,6 +21,7 @@ import com.google.common.collect.Lists; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.FailedNodeException; import org.elasticsearch.action.support.ActionFilters; @@ -118,9 +119,18 @@ protected NodeGatewayStartedShards nodeOperation(NodeRequest request) { ShardStateMetaData shardStateMetaData = ShardStateMetaData.FORMAT.loadLatestState(logger, nodeEnv.availableShardPaths(request.shardId)); if (shardStateMetaData != null) { final IndexMetaData metaData = clusterService.state().metaData().index(shardId.index().name()); // it's a mystery why this is sometimes null - if (metaData != null && canOpenIndex(request.getShardId(), metaData) == false) { - logger.trace("{} can't open index for shard [{}]", shardId, shardStateMetaData); - return new NodeGatewayStartedShards(clusterService.localNode(), -1); + if (metaData != null) { + final boolean canOpenIndex; + try { + canOpenIndex = tryOpenIndex(request.getShardId(), metaData); + } catch (Exception ex) { + logger.trace("{} can't open index for shard [{}]", shardId, shardStateMetaData); + return new NodeGatewayStartedShards(clusterService.localNode(), -1, ExceptionsHelper.detailedMessage(ex)); + } + if (canOpenIndex == false) { + logger.trace("{} can't open index for shard [{}]", shardId, shardStateMetaData); + return new NodeGatewayStartedShards(clusterService.localNode(), -1); + } } // old shard metadata doesn't have the actual index UUID so we need to check if the actual uuid in the metadata // is equal to IndexMetaData.INDEX_UUID_NA_VALUE otherwise this shard doesn't belong to the requested index. @@ -139,16 +149,17 @@ protected NodeGatewayStartedShards nodeOperation(NodeRequest request) { } } - private boolean canOpenIndex(ShardId shardId, IndexMetaData metaData) throws IOException { - // try and see if we an list unallocated - if (metaData == null) { - return false; - } + private boolean tryOpenIndex(ShardId shardId, IndexMetaData metaData) throws IOException { final ShardPath shardPath = ShardPath.loadShardPath(logger, nodeEnv, shardId, metaData.settings()); if (shardPath == null) { return false; } - return Store.canOpenIndex(logger, shardPath.resolveIndex()); + try { + return Store.tryOpenIndex(shardPath.resolveIndex()); + } catch (Exception ex) { + logger.trace("Can't open index for path [{}]", ex, shardPath.resolveIndex()); + throw ex; + } } @Override @@ -269,29 +280,40 @@ public String getIndexUUID() { public static class NodeGatewayStartedShards extends BaseNodeResponse { private long version = -1; + private String exception = null; NodeGatewayStartedShards() { } - public NodeGatewayStartedShards(DiscoveryNode node, long version) { + this(node, version, null); + } + + public NodeGatewayStartedShards(DiscoveryNode node, long version, String exception) { super(node); this.version = version; + this.exception = exception; } public long version() { return this.version; } + public String exception() { + return this.exception; + } + @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); version = in.readLong(); + exception = in.readOptionalString(); } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeLong(version); + out.writeOptionalString(exception); } } } diff --git a/core/src/main/java/org/elasticsearch/index/store/Store.java b/core/src/main/java/org/elasticsearch/index/store/Store.java index 2617e95b418d0..ebd5b0967561b 100644 --- a/core/src/main/java/org/elasticsearch/index/store/Store.java +++ b/core/src/main/java/org/elasticsearch/index/store/Store.java @@ -386,13 +386,24 @@ public static MetadataSnapshot readMetadataSnapshot(Path indexLocation, ESLogger * corruption markers. */ public static boolean canOpenIndex(ESLogger logger, Path indexLocation) throws IOException { + try { + return tryOpenIndex(indexLocation); + } catch (Exception ex) { + logger.trace("Can't open index for path [{}]", ex, indexLocation); + return false; + } + } + + /** + * Returns true iff the given location contains an index + * and the index can be successfully opened. If the index can not + * be opened, an exception is thrown + */ + public static boolean tryOpenIndex(Path indexLocation) throws IOException { try (Directory dir = new SimpleFSDirectory(indexLocation)) { failIfCorrupted(dir, new ShardId("", 1)); Lucene.readSegmentInfos(dir); return true; - } catch (Exception ex) { - logger.trace("Can't open index for path [{}]", ex, indexLocation); - return false; } } diff --git a/core/src/main/java/org/elasticsearch/rest/action/RestActionModule.java b/core/src/main/java/org/elasticsearch/rest/action/RestActionModule.java index a2d09f4f80e1e..53d9707f84310 100644 --- a/core/src/main/java/org/elasticsearch/rest/action/RestActionModule.java +++ b/core/src/main/java/org/elasticsearch/rest/action/RestActionModule.java @@ -66,6 +66,7 @@ import org.elasticsearch.rest.action.admin.indices.optimize.RestOptimizeAction; import org.elasticsearch.rest.action.admin.indices.recovery.RestRecoveryAction; import org.elasticsearch.rest.action.admin.indices.refresh.RestRefreshAction; +import org.elasticsearch.rest.action.admin.indices.shards.RestIndicesUnassignedShardsAction; import org.elasticsearch.rest.action.admin.indices.segments.RestIndicesSegmentsAction; import org.elasticsearch.rest.action.admin.indices.settings.RestGetSettingsAction; import org.elasticsearch.rest.action.admin.indices.settings.RestUpdateSettingsAction; @@ -153,6 +154,7 @@ protected void configure() { bind(RestGetIndicesAction.class).asEagerSingleton(); bind(RestIndicesStatsAction.class).asEagerSingleton(); bind(RestIndicesSegmentsAction.class).asEagerSingleton(); + bind(RestIndicesUnassignedShardsAction.class).asEagerSingleton(); bind(RestGetAliasesAction.class).asEagerSingleton(); bind(RestAliasesExistAction.class).asEagerSingleton(); bind(RestIndexDeleteAliasesAction.class).asEagerSingleton(); diff --git a/core/src/main/java/org/elasticsearch/rest/action/admin/indices/shards/RestIndicesUnassignedShardsAction.java b/core/src/main/java/org/elasticsearch/rest/action/admin/indices/shards/RestIndicesUnassignedShardsAction.java new file mode 100644 index 0000000000000..7fd88cdf683e7 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/rest/action/admin/indices/shards/RestIndicesUnassignedShardsAction.java @@ -0,0 +1,62 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.rest.action.admin.indices.shards; + +import org.elasticsearch.action.admin.indices.shards.IndicesUnassigedShardsResponse; +import org.elasticsearch.action.admin.indices.shards.IndicesUnassignedShardsRequest; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.rest.*; +import org.elasticsearch.rest.action.support.RestBuilderListener; + +import static org.elasticsearch.rest.RestRequest.Method.GET; +import static org.elasticsearch.rest.RestStatus.OK; + +/** + * Rest action for {@link org.elasticsearch.action.admin.indices.shards.IndicesUnassignedShardsAction} + */ +public class RestIndicesUnassignedShardsAction extends BaseRestHandler { + + @Inject + public RestIndicesUnassignedShardsAction(Settings settings, RestController controller, Client client) { + super(settings, controller, client); + controller.registerHandler(GET, "/_unassigned_shards", this); + controller.registerHandler(GET, "/{index}/_unassigned_shards", this); + } + + @Override + public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) { + IndicesUnassignedShardsRequest indicesSegmentsRequest = new IndicesUnassignedShardsRequest(Strings.splitStringByCommaToArray(request.param("index"))); + indicesSegmentsRequest.indicesOptions(IndicesOptions.fromRequest(request, indicesSegmentsRequest.indicesOptions())); + client.admin().indices().unassignedShards(indicesSegmentsRequest, new RestBuilderListener(channel) { + @Override + public RestResponse buildResponse(IndicesUnassigedShardsResponse response, XContentBuilder builder) throws Exception { + builder.startObject(); + response.toXContent(builder, request); + builder.endObject(); + return new BytesRestResponse(OK, builder); + } + }); + } +} diff --git a/core/src/test/java/org/elasticsearch/action/admin/indices/segments/IndicesUnassignedShardsRequestTests.java b/core/src/test/java/org/elasticsearch/action/admin/indices/segments/IndicesUnassignedShardsRequestTests.java new file mode 100644 index 0000000000000..24092ab3f1bdd --- /dev/null +++ b/core/src/test/java/org/elasticsearch/action/admin/indices/segments/IndicesUnassignedShardsRequestTests.java @@ -0,0 +1,115 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.indices.segments; + +import org.elasticsearch.action.admin.indices.shards.IndicesUnassigedShardsResponse; +import org.elasticsearch.cluster.routing.MutableShardRouting; +import org.elasticsearch.cluster.routing.RoutingNodes; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.collect.ImmutableOpenMap; +import org.elasticsearch.common.xcontent.*; +import org.elasticsearch.test.ElasticsearchIntegrationTest; +import org.junit.Before; +import org.junit.Test; + +import java.util.List; +import java.util.Map; + +import static org.hamcrest.Matchers.equalTo; + +/* + TODO: test exception reporting when shard index can not be opened + */ +public class IndicesUnassignedShardsRequestTests extends ElasticsearchIntegrationTest { + + @Override + protected int numberOfReplicas() { + return 1; + } + + @Override + protected int numberOfShards() { + return 5; + } + + @Before + public void setupIndex() { + prepareCreate("test", 2); + int numDocs = scaledRandomIntBetween(100, 1000); + for (int j = 0; j < numDocs; ++j) { + String id = Integer.toString(j); + client().prepareIndex("test", "type1", id).setSource("text", "sometext").get(); + } + client().admin().indices().prepareFlush("test").get(); + } + + @Test + public void testBasic() { + ensureGreen(); + IndicesUnassigedShardsResponse rsp = client().admin().indices().prepareUnassignedShards("test").get(); + assertThat(rsp.getShardStatuses().size(), equalTo(0)); + } + + @Test + public void testUnassignedShards() throws Exception { + ensureGreen(); + // TODO: there has to be a better way of having unassigned shards + internalCluster().stopRandomDataNode(); + RoutingNodes.UnassignedShards unassigned = clusterService().state().routingNodes().unassigned(); + IndicesUnassigedShardsResponse response = client().admin().indices().prepareUnassignedShards("test").get(); + + ImmutableOpenMap>> statuses = response.getShardStatuses(); + for (MutableShardRouting shardRouting : unassigned) { + assertTrue(statuses.containsKey(shardRouting.getIndex())); + Map> listMap = statuses.get(shardRouting.getIndex()); + assertTrue(listMap.containsKey(shardRouting.id())); + } + ensureYellow(); + } + + @Test + public void testSerialization() throws Exception { + ensureGreen(); + // TODO: there has to be a better way of having unassigned shards + internalCluster().stopRandomDataNode(); + RoutingNodes.UnassignedShards unassigned = clusterService().state().routingNodes().unassigned(); + IndicesUnassigedShardsResponse response = client().admin().indices().prepareUnassignedShards("test").get(); + XContentBuilder contentBuilder = XContentFactory.jsonBuilder(); + contentBuilder.startObject(); + response.toXContent(contentBuilder, ToXContent.EMPTY_PARAMS); + contentBuilder.endObject(); + BytesReference bytes = contentBuilder.bytes(); + final XContentParser parser = XContentFactory.xContent(XContentType.JSON).createParser(bytes); + Map map = parser.mapAndClose(); + if (unassigned.size() > 0) { + Map indices = (Map) map.get("indices"); + assertTrue(indices.containsKey("test")); + Map shards = ((Map) ((Map) indices.get("test")).get("shards")); + int nUnassignedShardsForIndex = 0; + for (MutableShardRouting routing : unassigned) { + if (routing.getIndex().equals("test")) { + assertTrue(shards.containsKey(String.valueOf(routing.id()))); + nUnassignedShardsForIndex++; + } + } + assertThat(shards.size(), equalTo(nUnassignedShardsForIndex)); + } + } +}