Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanup LocalGatewayShardsState #8852

Merged
merged 1 commit into from Dec 11, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -53,6 +53,7 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
final ImmutableList<ShardRouting> shards;
final ImmutableList<ShardRouting> activeShards;
final ImmutableList<ShardRouting> assignedShards;
final boolean allShardsStarted;

/**
* The initializing list, including ones that are initializing on a target node because of relocation.
Expand All @@ -73,7 +74,7 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
ImmutableList.Builder<ShardRouting> activeShards = ImmutableList.builder();
ImmutableList.Builder<ShardRouting> assignedShards = ImmutableList.builder();
ImmutableList.Builder<ShardRouting> allInitializingShards = ImmutableList.builder();

boolean allShardsStarted = true;
for (ShardRouting shard : shards) {
if (shard.primary()) {
primary = shard;
Expand All @@ -93,7 +94,11 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
if (shard.assignedToNode()) {
assignedShards.add(shard);
}
if (shard.state() != ShardRoutingState.STARTED) {
allShardsStarted = false;
}
}
this.allShardsStarted = allShardsStarted;

this.primary = primary;
if (primary != null) {
Expand Down Expand Up @@ -240,22 +245,6 @@ public ImmutableList<ShardRouting> getAssignedShards() {
return this.assignedShards;
}

/**
* Returns the number of shards in a specific state
*
* @param state state of the shards to count
* @return number of shards in <code>state</code>
*/
public int countWithState(ShardRoutingState state) {
int count = 0;
for (ShardRouting shard : this) {
if (state == shard.state()) {
count++;
}
}
return count;
}

public ShardIterator shardsRandomIt() {
return new PlainShardIterator(shardId, shuffler.shuffle(shards));
}
Expand All @@ -268,18 +257,6 @@ public ShardIterator shardsIt(int seed) {
return new PlainShardIterator(shardId, shuffler.shuffle(shards, seed));
}

public ShardIterator activeShardsRandomIt() {
return new PlainShardIterator(shardId, shuffler.shuffle(activeShards));
}

public ShardIterator activeShardsIt() {
return new PlainShardIterator(shardId, activeShards);
}

public ShardIterator activeShardsIt(int seed) {
return new PlainShardIterator(shardId, shuffler.shuffle(activeShards, seed));
}

/**
* Returns an iterator over active and initializing shards. Making sure though that
* its random within the active shards, and initializing shards are the last to iterate through.
Expand All @@ -302,18 +279,6 @@ public ShardIterator activeInitializingShardsIt(int seed) {
return new PlainShardIterator(shardId, ordered);
}

public ShardIterator assignedShardsRandomIt() {
return new PlainShardIterator(shardId, shuffler.shuffle(assignedShards));
}

public ShardIterator assignedShardsIt() {
return new PlainShardIterator(shardId, assignedShards);
}

public ShardIterator assignedShardsIt(int seed) {
return new PlainShardIterator(shardId, shuffler.shuffle(assignedShards, seed));
}

/**
* Returns an iterator only on the primary shard.
*/
Expand Down Expand Up @@ -382,6 +347,13 @@ public ShardIterator preferNodeActiveInitializingShardsIt(String nodeId) {
return new PlainShardIterator(shardId, ordered);
}

/**
* Returns <code>true</code> iff all shards in the routing table are started otherwise <code>false</code>
*/
public boolean allShardsStarted() {
return allShardsStarted;
}

static class AttributesKey {

final String[] attributes;
Expand Down
Expand Up @@ -50,6 +50,8 @@ public MutableShardRouting(String index, int shardId, String currentNodeId,
super(index, shardId, currentNodeId, relocatingNodeId, restoreSource, primary, state, version);
}



/**
* Assign this shard to a node.
*
Expand Down
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.*;
Expand All @@ -39,7 +40,6 @@

import java.io.*;
import java.nio.file.*;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
Expand All @@ -61,7 +61,9 @@ public class LocalGatewayShardsState extends AbstractComponent implements Cluste
public LocalGatewayShardsState(Settings settings, NodeEnvironment nodeEnv, TransportNodesListGatewayStartedShards listGatewayStartedShards) throws Exception {
super(settings);
this.nodeEnv = nodeEnv;
listGatewayStartedShards.initGateway(this);
if (listGatewayStartedShards != null) { // for testing
listGatewayStartedShards.initGateway(this);
}
if (DiscoveryNode.dataNode(settings)) {
try {
ensureNoPre019State();
Expand All @@ -81,80 +83,87 @@ public ShardStateInfo loadShardInfo(ShardId shardId) throws Exception {

@Override
public void clusterChanged(ClusterChangedEvent event) {
if (event.state().blocks().disableStatePersistence()) {
return;
}

if (!event.state().nodes().localNode().dataNode()) {
return;
}

if (!event.routingTableChanged()) {
return;
}

Map<ShardId, ShardStateInfo> newState = Maps.newHashMap();
newState.putAll(this.currentState);

final ClusterState state = event.state();
if (state.blocks().disableStatePersistence() == false
&& state.nodes().localNode().dataNode()
&& event.routingTableChanged()) {
// now, add all the ones that are active and on this node
RoutingNode routingNode = state.readOnlyRoutingNodes().node(state.nodes().localNodeId());
final Map<ShardId, ShardStateInfo> newState;
if (routingNode != null) {
newState = persistRoutingNodeState(routingNode);
} else {
newState = Maps.newHashMap();
}

// remove from the current state all the shards that are completely started somewhere, we won't need them anymore
// and if they are still here, we will add them in the next phase
// Also note, this works well when closing an index, since a closed index will have no routing shards entries
// so they won't get removed (we want to keep the fact that those shards are allocated on this node if needed)
for (IndexRoutingTable indexRoutingTable : event.state().routingTable()) {
for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) {
if (indexShardRoutingTable.countWithState(ShardRoutingState.STARTED) == indexShardRoutingTable.size()) {
newState.remove(indexShardRoutingTable.shardId());
// preserve all shards that:
// * are not already in the new map AND
// * belong to an active index AND
// * used to be on this node but are not yet completely stated on any other node
// since these shards are NOT active on this node the won't need to be written above - we just preserve these
// in this map until they are fully started anywhere else or are re-assigned and we need to update the state
final RoutingTable indexRoutingTables = state.routingTable();
for (Map.Entry<ShardId, ShardStateInfo> entry : this.currentState.entrySet()) {
ShardId shardId = entry.getKey();
if (newState.containsKey(shardId) == false) { // this shard used to be here
String indexName = shardId.index().getName();
if (state.metaData().hasIndex(indexName)) { // it's index is not deleted
IndexRoutingTable index = indexRoutingTables.index(indexName);
if (index != null && index.shard(shardId.id()).allShardsStarted() == false) {
// not all shards are active on another node so we put it back until they are active
newState.put(shardId, entry.getValue());
}
}
}
}
this.currentState = newState;
}
// remove deleted indices from the started shards
for (ShardId shardId : currentState.keySet()) {
if (!event.state().metaData().hasIndex(shardId.index().name())) {
newState.remove(shardId);
}
}
// now, add all the ones that are active and on this node
RoutingNode routingNode = event.state().readOnlyRoutingNodes().node(event.state().nodes().localNodeId());
if (routingNode != null) {
// our node is not in play yet...
for (MutableShardRouting shardRouting : routingNode) {
if (shardRouting.active()) {
newState.put(shardRouting.shardId(), new ShardStateInfo(shardRouting.version(), shardRouting.primary()));
}

Map<ShardId, ShardStateInfo> persistRoutingNodeState(RoutingNode routingNode) {
final Map<ShardId, ShardStateInfo> newState = Maps.newHashMap();
for (MutableShardRouting shardRouting : routingNode) {
if (shardRouting.active()) {
ShardId shardId = shardRouting.shardId();
ShardStateInfo shardStateInfo = new ShardStateInfo(shardRouting.version(), shardRouting.primary());
final ShardStateInfo previous = currentState.get(shardId);
if(maybeWriteShardState(shardId, shardStateInfo, previous) ) {
newState.put(shardId, shardStateInfo);
} else if (previous != null) {
currentState.put(shardId, previous);
}
}
}
return newState;
}

// go over the write started shards if needed
for (Iterator<Map.Entry<ShardId, ShardStateInfo>> it = newState.entrySet().iterator(); it.hasNext(); ) {
Map.Entry<ShardId, ShardStateInfo> entry = it.next();
ShardId shardId = entry.getKey();
ShardStateInfo shardStateInfo = entry.getValue();

String writeReason = null;
ShardStateInfo currentShardStateInfo = currentState.get(shardId);
if (currentShardStateInfo == null) {
writeReason = "freshly started, version [" + shardStateInfo.version + "]";
} else if (currentShardStateInfo.version != shardStateInfo.version) {
writeReason = "version changed from [" + currentShardStateInfo.version + "] to [" + shardStateInfo.version + "]";
}
Map<ShardId, ShardStateInfo> getCurrentState() {
return currentState;
}

// we update the write reason if we really need to write a new one...
if (writeReason == null) {
continue;
}
boolean maybeWriteShardState(ShardId shardId, ShardStateInfo shardStateInfo, ShardStateInfo previousState) {
final String writeReason;
if (previousState == null) {
writeReason = "freshly started, version [" + shardStateInfo.version + "]";
} else if (previousState.version < shardStateInfo.version) {
writeReason = "version changed from [" + previousState.version + "] to [" + shardStateInfo.version + "]";
} else {
logger.trace("skip writing shard state - has been written before shardID: " + shardId + " previous version: [" + previousState.version + "] current version [" + shardStateInfo.version + "]");
assert previousState.version <= shardStateInfo.version : "version should not go backwards for shardID: " + shardId + " previous version: [" + previousState.version + "] current version [" + shardStateInfo.version + "]";
return previousState.version == shardStateInfo.version;
}

try {
writeShardState(writeReason, shardId, shardStateInfo, currentShardStateInfo);
} catch (Exception e) {
// we failed to write the shard state, remove it from our builder, we will try and write
// it next time...
it.remove();
}
try {
writeShardState(writeReason, shardId, shardStateInfo, previousState);
} catch (Exception e) {
logger.warn("failed to write shard state for shard " + shardId, e);
// we failed to write the shard state, we will try and write
// it next time...
}
this.currentState = newState;
return true;
}


private Map<ShardId, ShardStateInfo> loadShardsStateInfo() throws Exception {
Set<ShardId> shardIds = nodeEnv.findAllShardIds();
long highestVersion = -1;
Expand Down
Expand Up @@ -23,7 +23,7 @@

/**
*/
public class ShardStateInfo {
public final class ShardStateInfo {

public final long version;

Expand All @@ -35,4 +35,32 @@ public ShardStateInfo(long version, Boolean primary) {
this.version = version;
this.primary = primary;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

ShardStateInfo that = (ShardStateInfo) o;

if (version != that.version) return false;
if (primary != null ? !primary.equals(that.primary) : that.primary != null) return false;

return true;
}

@Override
public int hashCode() {
int result = (int) (version ^ (version >>> 32));
result = 31 * result + (primary != null ? primary.hashCode() : 0);
return result;
}

@Override
public String toString() {
return "ShardStateInfo{" +
"version=" + version +
", primary=" + primary +
'}';
}
}
14 changes: 0 additions & 14 deletions src/test/java/org/elasticsearch/env/NodeEnvironmentTests.java
Expand Up @@ -283,18 +283,4 @@ public void run() {
}
env.close();
}

public NodeEnvironment newNodeEnvironment() throws IOException {
return newNodeEnvironment(ImmutableSettings.EMPTY);
}

public NodeEnvironment newNodeEnvironment(Settings settings) throws IOException {
Settings build = ImmutableSettings.builder()
.put(settings)
.put("path.home", newTempDirPath().toAbsolutePath())
.putArray("path.data", tmpPaths()).build();
return new NodeEnvironment(build, new Environment(build));
}


}