Skip to content

Commit

Permalink
Automatic index creation can still cause "index missing" failures, cl…
Browse files Browse the repository at this point in the history
…oses elastic#1199.
  • Loading branch information
kimchy committed Aug 3, 2011
1 parent 18a7e41 commit 32fae6a
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 90 deletions.
Expand Up @@ -20,6 +20,10 @@
package org.elasticsearch.cluster;

import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.collect.Lists;

import java.util.List;

/**
* @author kimchy (shay.banon)
Expand Down Expand Up @@ -70,6 +74,50 @@ public boolean indexRoutingTableChanged(String index) {
return true;
}

/**
* Returns the indices created in this event
*/
public List<String> indicesCreated() {
if (previousState == null) {
return Lists.newArrayList(state.metaData().indices().keySet());
}
if (!metaDataChanged()) {
return ImmutableList.of();
}
List<String> created = null;
for (String index : state.metaData().indices().keySet()) {
if (!previousState.metaData().hasIndex(index)) {
if (created == null) {
created = Lists.newArrayList();
}
created.add(index);
}
}
return created == null ? ImmutableList.<String>of() : created;
}

/**
* Returns the indices deleted in this event
*/
public List<String> indicesDeleted() {
if (previousState == null) {
return ImmutableList.of();
}
if (!metaDataChanged()) {
return ImmutableList.of();
}
List<String> deleted = null;
for (String index : previousState.metaData().indices().keySet()) {
if (!state.metaData().hasIndex(index)) {
if (deleted == null) {
deleted = Lists.newArrayList();
}
deleted.add(index);
}
}
return deleted == null ? ImmutableList.<String>of() : deleted;
}

public boolean metaDataChanged() {
return state.metaData() != previousState.metaData();
}
Expand Down
Expand Up @@ -27,9 +27,7 @@
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.ShardsAllocation;
import org.elasticsearch.common.Strings;
Expand Down Expand Up @@ -63,7 +61,11 @@
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.*;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -118,8 +120,6 @@ public void createIndex(final Request request, final Listener userListener) {


clusterService.submitStateUpdateTask("create-index [" + request.index + "], cause [" + request.cause + "]", new ProcessedClusterStateUpdateTask() {
final Set<String> allocatedNodes = Sets.newHashSet();

@Override public ClusterState execute(ClusterState currentState) {
try {
try {
Expand Down Expand Up @@ -271,43 +271,28 @@ public void createIndex(final Request request, final Listener userListener) {
updatedState = newClusterStateBuilder().state(updatedState).routingResult(routingResult).build();
}

// initialize the counter only for nodes the shards are allocated to
if (updatedState.routingTable().hasIndex(request.index)) {
for (IndexShardRoutingTable indexShardRoutingTable : updatedState.routingTable().index(request.index)) {
for (ShardRouting shardRouting : indexShardRoutingTable) {
// if we have a routing for this shard on a node, and its not the master node (since we already created
// an index on it), then add it
if (shardRouting.currentNodeId() != null && !updatedState.nodes().localNodeId().equals(shardRouting.currentNodeId())) {
allocatedNodes.add(shardRouting.currentNodeId());
}
}
}
}

if (!allocatedNodes.isEmpty()) {
final AtomicInteger counter = new AtomicInteger(allocatedNodes.size());
// we wait for events from all nodes that the index has been added to the metadata
final AtomicInteger counter = new AtomicInteger(currentState.nodes().size());

final NodeIndexCreatedAction.Listener nodeIndexCreatedListener = new NodeIndexCreatedAction.Listener() {
@Override public void onNodeIndexCreated(String index, String nodeId) {
if (index.equals(request.index)) {
if (counter.decrementAndGet() == 0) {
listener.onResponse(new Response(true, indexMetaData));
nodeIndexCreatedAction.remove(this);
}
final NodeIndexCreatedAction.Listener nodeIndexCreatedListener = new NodeIndexCreatedAction.Listener() {
@Override public void onNodeIndexCreated(String index, String nodeId) {
if (index.equals(request.index)) {
if (counter.decrementAndGet() == 0) {
listener.onResponse(new Response(true, indexMetaData));
nodeIndexCreatedAction.remove(this);
}
}
};

nodeIndexCreatedAction.add(nodeIndexCreatedListener);
}
};

listener.future = threadPool.schedule(request.timeout, ThreadPool.Names.SAME, new Runnable() {
@Override public void run() {
listener.onResponse(new Response(false, indexMetaData));
nodeIndexCreatedAction.remove(nodeIndexCreatedListener);
}
});
}
nodeIndexCreatedAction.add(nodeIndexCreatedListener);

listener.future = threadPool.schedule(request.timeout, ThreadPool.Names.SAME, new Runnable() {
@Override public void run() {
listener.onResponse(new Response(false, indexMetaData));
nodeIndexCreatedAction.remove(nodeIndexCreatedListener);
}
});

return updatedState;
} catch (Exception e) {
Expand All @@ -318,9 +303,6 @@ public void createIndex(final Request request, final Listener userListener) {
}

@Override public void clusterStateProcessed(ClusterState clusterState) {
if (allocatedNodes.isEmpty()) {
listener.onResponse(new Response(true, clusterState.metaData().index(request.index)));
}
}
});
}
Expand Down
Expand Up @@ -24,12 +24,9 @@
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.action.index.NodeIndexDeletedAction;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.ShardsAllocation;
import org.elasticsearch.common.collect.Sets;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
Expand All @@ -38,7 +35,6 @@
import org.elasticsearch.indices.IndexMissingException;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.Set;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -93,46 +89,26 @@ public void deleteIndex(final Request request, final Listener userListener) {

ClusterBlocks blocks = ClusterBlocks.builder().blocks(currentState.blocks()).removeIndexBlocks(request.index).build();

// initialize the counter only for nodes the shards are allocated to
Set<String> allocatedNodes = Sets.newHashSet();
if (currentState.routingTable().hasIndex(request.index)) {
for (IndexShardRoutingTable indexShardRoutingTable : currentState.routingTable().index(request.index)) {
for (ShardRouting shardRouting : indexShardRoutingTable) {
if (shardRouting.currentNodeId() != null) {
allocatedNodes.add(shardRouting.currentNodeId());
}
if (shardRouting.relocatingNodeId() != null) {
allocatedNodes.add(shardRouting.relocatingNodeId());
}
}
}
}
final AtomicInteger counter = new AtomicInteger(currentState.nodes().size());

if (allocatedNodes.isEmpty()) {
// no nodes allocated, don't wait for a response
listener.onResponse(new Response(true));
} else {
final AtomicInteger counter = new AtomicInteger(allocatedNodes.size());

final NodeIndexDeletedAction.Listener nodeIndexDeleteListener = new NodeIndexDeletedAction.Listener() {
@Override public void onNodeIndexDeleted(String index, String nodeId) {
if (index.equals(request.index)) {
if (counter.decrementAndGet() == 0) {
listener.onResponse(new Response(true));
nodeIndexDeletedAction.remove(this);
}
final NodeIndexDeletedAction.Listener nodeIndexDeleteListener = new NodeIndexDeletedAction.Listener() {
@Override public void onNodeIndexDeleted(String index, String nodeId) {
if (index.equals(request.index)) {
if (counter.decrementAndGet() == 0) {
listener.onResponse(new Response(true));
nodeIndexDeletedAction.remove(this);
}
}
};
nodeIndexDeletedAction.add(nodeIndexDeleteListener);
}
};
nodeIndexDeletedAction.add(nodeIndexDeleteListener);

listener.future = threadPool.schedule(request.timeout, ThreadPool.Names.SAME, new Runnable() {
@Override public void run() {
listener.onResponse(new Response(false));
nodeIndexDeletedAction.remove(nodeIndexDeleteListener);
}
});
}
listener.future = threadPool.schedule(request.timeout, ThreadPool.Names.SAME, new Runnable() {
@Override public void run() {
listener.onResponse(new Response(false));
nodeIndexDeletedAction.remove(nodeIndexDeleteListener);
}
});

return newClusterStateBuilder().state(currentState).routingResult(routingResult).metaData(newMetaData).blocks(blocks).build();
} catch (Exception e) {
Expand Down
Expand Up @@ -180,6 +180,24 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
applyDeletedShards(event);
applyCleanedIndices(event);
applySettings(event);
sendIndexLifecycleEvents(event);
}
}

private void sendIndexLifecycleEvents(final ClusterChangedEvent event) {
for (String index : event.indicesCreated()) {
try {
nodeIndexCreatedAction.nodeIndexCreated(index, event.state().nodes().localNodeId());
} catch (Exception e) {
logger.debug("failed to send to master index {} created event", index);
}
}
for (String index : event.indicesDeleted()) {
try {
nodeIndexDeletedAction.nodeIndexDeleted(index, event.state().nodes().localNodeId());
} catch (Exception e) {
logger.debug("failed to send to master index {} deleted event", index);
}
}
}

Expand Down Expand Up @@ -223,11 +241,6 @@ private void applyDeletedIndices(final ClusterChangedEvent event) {
}
try {
indicesService.deleteIndex(index, "deleting index");
threadPool.cached().execute(new Runnable() {
@Override public void run() {
nodeIndexDeletedAction.nodeIndexDeleted(index, event.state().nodes().localNodeId());
}
});
} catch (Exception e) {
logger.warn("failed to delete index", e);
}
Expand Down Expand Up @@ -292,11 +305,6 @@ private void applyNewIndices(final ClusterChangedEvent event) {
logger.debug("[{}] creating index", indexMetaData.index());
}
indicesService.createIndex(indexMetaData.index(), indexMetaData.settings(), event.state().nodes().localNode().id());
threadPool.cached().execute(new Runnable() {
@Override public void run() {
nodeIndexCreatedAction.nodeIndexCreated(indexMetaData.index(), event.state().nodes().localNodeId());
}
});
}
}
}
Expand Down

0 comments on commit 32fae6a

Please sign in to comment.