Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bulk cluster state updates on index deletion #11258

Merged
merged 1 commit into from
Oct 27, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.elasticsearch.cluster.metadata.MetaDataDeleteIndexService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.node.settings.NodeSettingsService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
Expand Down Expand Up @@ -77,42 +76,22 @@ protected ClusterBlockException checkBlock(DeleteIndexRequest request, ClusterSt

@Override
protected void masterOperation(final DeleteIndexRequest request, final ClusterState state, final ActionListener<DeleteIndexResponse> listener) {
String[] concreteIndices = indexNameExpressionResolver.concreteIndices(state, request);
final String[] concreteIndices = indexNameExpressionResolver.concreteIndices(state, request);
if (concreteIndices.length == 0) {
listener.onResponse(new DeleteIndexResponse(true));
return;
}
// TODO: this API should be improved, currently, if one delete index failed, we send a failure, we should send a response array that includes all the indices that were deleted
final CountDown count = new CountDown(concreteIndices.length);
for (final String index : concreteIndices) {
deleteIndexService.deleteIndex(new MetaDataDeleteIndexService.Request(index).timeout(request.timeout()).masterTimeout(request.masterNodeTimeout()), new MetaDataDeleteIndexService.Listener() {
deleteIndexService.deleteIndices(new MetaDataDeleteIndexService.Request(concreteIndices).timeout(request.timeout()).masterTimeout(request.masterNodeTimeout()), new MetaDataDeleteIndexService.Listener() {

private volatile Throwable lastFailure;
private volatile boolean ack = true;
@Override
public void onResponse(MetaDataDeleteIndexService.Response response) {
listener.onResponse(new DeleteIndexResponse(response.acknowledged()));
}

@Override
public void onResponse(MetaDataDeleteIndexService.Response response) {
if (!response.acknowledged()) {
ack = false;
}
if (count.countDown()) {
if (lastFailure != null) {
listener.onFailure(lastFailure);
} else {
listener.onResponse(new DeleteIndexResponse(ack));
}
}
}

@Override
public void onFailure(Throwable t) {
logger.debug("[{}] failed to delete index", t, index);
lastFailure = t;
if (count.countDown()) {
listener.onFailure(t);
}
}
});
}
@Override
public void onFailure(Throwable t) {
listener.onFailure(t);
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

Expand All @@ -66,9 +66,11 @@ public MetaDataDeleteIndexService(Settings settings, ThreadPool threadPool, Clus
this.nodeIndexDeletedAction = nodeIndexDeletedAction;
}

public void deleteIndex(final Request request, final Listener userListener) {
public void deleteIndices(final Request request, final Listener userListener) {
Collection<String> indices = Arrays.asList(request.indices);
final DeleteIndexListener listener = new DeleteIndexListener(userListener);
clusterService.submitStateUpdateTask("delete-index [" + request.index + "]", Priority.URGENT, new ClusterStateUpdateTask() {

clusterService.submitStateUpdateTask("delete-index " + indices, Priority.URGENT, new ClusterStateUpdateTask() {

@Override
public TimeValue timeout() {
Expand All @@ -82,34 +84,32 @@ public void onFailure(String source, Throwable t) {

@Override
public ClusterState execute(final ClusterState currentState) {
if (!currentState.metaData().hasConcreteIndex(request.index)) {
throw new IndexNotFoundException(request.index);
}

logger.info("[{}] deleting index", request.index);

RoutingTable.Builder routingTableBuilder = RoutingTable.builder(currentState.routingTable());
routingTableBuilder.remove(request.index);
MetaData.Builder metaDataBuilder = MetaData.builder(currentState.metaData());
ClusterBlocks.Builder clusterBlocksBuilder = ClusterBlocks.builder().blocks(currentState.blocks());

MetaData newMetaData = MetaData.builder(currentState.metaData())
.remove(request.index)
.build();

RoutingAllocation.Result routingResult = allocationService.reroute(
ClusterState.builder(currentState).routingTable(routingTableBuilder.build()).metaData(newMetaData).build());
for (final String index: indices) {
if (!currentState.metaData().hasConcreteIndex(index)) {
throw new IndexNotFoundException(index);
}

ClusterBlocks blocks = ClusterBlocks.builder().blocks(currentState.blocks()).removeIndexBlocks(request.index).build();
logger.debug("[{}] deleting index", index);

routingTableBuilder.remove(index);
clusterBlocksBuilder.removeIndexBlocks(index);
metaDataBuilder.remove(index);
}
// wait for events from all nodes that it has been removed from their respective metadata...
int count = currentState.nodes().size();
// add the notifications that the store was deleted from *data* nodes
count += currentState.nodes().dataNodes().size();
final AtomicInteger counter = new AtomicInteger(count);
final AtomicInteger counter = new AtomicInteger(count * indices.size());

// this listener will be notified once we get back a notification based on the cluster state change below.
final NodeIndexDeletedAction.Listener nodeIndexDeleteListener = new NodeIndexDeletedAction.Listener() {
@Override
public void onNodeIndexDeleted(String index, String nodeId) {
if (index.equals(request.index)) {
public void onNodeIndexDeleted(String deleted, String nodeId) {
if (indices.contains(deleted)) {
if (counter.decrementAndGet() == 0) {
listener.onResponse(new Response(true));
nodeIndexDeletedAction.remove(this);
Expand All @@ -118,8 +118,8 @@ public void onNodeIndexDeleted(String index, String nodeId) {
}

@Override
public void onNodeIndexStoreDeleted(String index, String nodeId) {
if (index.equals(request.index)) {
public void onNodeIndexStoreDeleted(String deleted, String nodeId) {
if (indices.contains(deleted)) {
if (counter.decrementAndGet() == 0) {
listener.onResponse(new Response(true));
nodeIndexDeletedAction.remove(this);
Expand All @@ -128,15 +128,15 @@ public void onNodeIndexStoreDeleted(String index, String nodeId) {
}
};
nodeIndexDeletedAction.add(nodeIndexDeleteListener);

listener.future = threadPool.schedule(request.timeout, ThreadPool.Names.SAME, new Runnable() {
@Override
public void run() {
listener.onResponse(new Response(false));
nodeIndexDeletedAction.remove(nodeIndexDeleteListener);
}
listener.future = threadPool.schedule(request.timeout, ThreadPool.Names.SAME, () -> {
listener.onResponse(new Response(false));
nodeIndexDeletedAction.remove(nodeIndexDeleteListener);
});

MetaData newMetaData = metaDataBuilder.build();
ClusterBlocks blocks = clusterBlocksBuilder.build();
RoutingAllocation.Result routingResult = allocationService.reroute(
ClusterState.builder(currentState).routingTable(routingTableBuilder.build()).metaData(newMetaData).build());
return ClusterState.builder(currentState).routingResult(routingResult).metaData(newMetaData).blocks(blocks).build();
}

Expand Down Expand Up @@ -173,7 +173,6 @@ public void onFailure(Throwable t) {
}
}


public interface Listener {

void onResponse(Response response);
Expand All @@ -183,13 +182,13 @@ public interface Listener {

public static class Request {

final String index;
final String[] indices;

TimeValue timeout = TimeValue.timeValueSeconds(10);
TimeValue masterTimeout = MasterNodeRequest.DEFAULT_MASTER_NODE_TIMEOUT;

public Request(String index) {
this.index = index;
public Request(String[] indices) {
this.indices = indices;
}

public Request timeout(TimeValue timeout) {
Expand Down