Write state also on data nodes if not master eligible #9952

Closed

6 participants

@brwe
Contributor
brwe commented Mar 2, 2015

When a node was a data-only node, the index state was not written.
If such a node connected to a master that did not have the index
in its cluster state, for example because the master was restarted and
its data folder was lost, the indices were not imported as dangling
but deleted instead.
This commit makes sure that index state is also written on data nodes
if they have at least one shard of the index allocated.

I am a little lost with this. I found that the index can still be deleted
from a data node even if the state was written, when the node gets a new cluster state from a
master that does not have the index, for example because the master was restarted without its data folder. This happens
if the data node does not get the initial cluster state from the new master but a later one, and state
persistence is not disabled.
I avoid this now by this: https://github.com/elasticsearch/elasticsearch/pull/9952/files#diff-f0f71bedb3d7e6f1cec54e8dddf5c3d3R109
but am worried about side effects this might have. Any feedback appreciated.

closes #8823
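
For readers following the thread, the rule stated in the description (a master-eligible node persists every index state, a data-only node persists only the states of indices that have at least one shard allocated on it) can be sketched roughly as below. This is a minimal sketch under assumptions: `IndexStateWriteSketch`, `Shard` and `indicesToPersist` are hypothetical names made up for illustration and are not Elasticsearch classes or the code in this PR.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified model; none of these types are Elasticsearch classes.
final class IndexStateWriteSketch {

    /** One shard copy of an index assigned to a node (stand-in for a shard routing entry). */
    static final class Shard {
        final String index;
        final String nodeId;

        Shard(String index, String nodeId) {
            this.index = index;
            this.nodeId = nodeId;
        }
    }

    /**
     * Returns the indices whose state the local node should persist: all indices on a
     * master-eligible node, only indices with at least one local shard on a data-only node.
     */
    static Set<String> indicesToPersist(boolean masterEligible, String localNodeId,
                                        Set<String> allIndices, List<Shard> routing) {
        if (masterEligible) {
            return new LinkedHashSet<>(allIndices);
        }
        Set<String> result = new LinkedHashSet<>();
        for (Shard shard : routing) {
            // Only shards assigned to this node count, and only for known indices.
            if (localNodeId.equals(shard.nodeId) && allIndices.contains(shard.index)) {
                result.add(shard.index);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Set<String> indices = new LinkedHashSet<>(Arrays.asList("logs", "metrics"));
        List<Shard> routing = Arrays.asList(new Shard("logs", "data-1"), new Shard("metrics", "data-2"));
        System.out.println(indicesToPersist(false, "data-1", indices, routing));
        System.out.println(indicesToPersist(true, "master-1", indices, routing));
    }
}
```

With this rule a data-only node that holds a shard of `logs` keeps a copy of that index state on disk, so the index can be offered as dangling if a master comes back without it.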

@kimchy kimchy commented on an outdated diff Mar 3, 2015
.../java/org/elasticsearch/gateway/GatewayMetaState.java
@@ -219,6 +217,22 @@ public void clusterChanged(ClusterChangedEvent event) {
// check and write changes in indices
for (IndexMetaData indexMetaData : newMetaData) {
+
+ boolean shardsAllocatedOnThisNodeInLastClusterState = true;
+ if (isDataOnlyNode(state)) {
+ boolean shardsCurrentlyAllocatedOnThisNode = shardsAllocatedOnLocalNode(state, indexMetaData);
+ shardsAllocatedOnThisNodeInLastClusterState = shardsAllocatedOnLocalNode(event.previousState(), indexMetaData);
+
+ if (shardsCurrentlyAllocatedOnThisNode == false) {
+ // remove the index state for this index if it is only a data node
+ // only delete if the last shard was removed
+ if (shardsAllocatedOnThisNodeInLastClusterState) {
+ removeIndexState(indexMetaData);
@kimchy
kimchy Mar 3, 2015 Member

I am not sure we need this removal logic. We already have that in IndicesClusterStateService#applyCleanedIndices, where an index is removed if no shards are around any more. It would be nice to have it in a single place?

@kimchy
kimchy Mar 4, 2015 Member

Ohh, I think I see what happens: in IndicesClusterStateService we just remove the index and do not delete it from the file system, which is good. Only in IndicesStore do we actually delete the shard content, once the shard is allocated on all the other nodes. I think we need to add logic there so that if there are no more shards around, we also delete the index itself (which will delete its metadata).

@kimchy kimchy and 1 other commented on an outdated diff Mar 3, 2015
.../java/org/elasticsearch/gateway/GatewayMetaState.java
@@ -219,6 +217,22 @@ public void clusterChanged(ClusterChangedEvent event) {
// check and write changes in indices
for (IndexMetaData indexMetaData : newMetaData) {
+
+ boolean shardsAllocatedOnThisNodeInLastClusterState = true;
+ if (isDataOnlyNode(state)) {
@kimchy
kimchy Mar 3, 2015 Member

I think this logic can be simplified a lot if we don't need to deal with deletes? That continue block at the end is confusing to me?

@s1monw
s1monw Mar 4, 2015 Contributor

I don't think we need to check this every time. We can just assign a boolean in the ctor for whether we need to write the state or not, and whether we need to write all states or not, and centralize the decision. Then we can also remove the isDataOnlyNode method.

@s1monw s1monw commented on an outdated diff Mar 4, 2015
...va/org/elasticsearch/gateway/MetaDataStateFormat.java
final DirectoryStream.Filter<Path> filter = new DirectoryStream.Filter<Path>() {
@Override
public boolean accept(Path entry) throws IOException {
final String entryFileName = entry.getFileName().toString();
return Files.isRegularFile(entry)
&& entryFileName.startsWith(prefix) // only state files
- && currentStateFile.equals(entryFileName) == false; // keep the current state file around
+ && entryFileName.equals(currentStateFile) == false; // keep the current state file around
@s1monw
s1monw Mar 4, 2015 Contributor

why is this change needed?

@s1monw s1monw commented on an outdated diff Mar 4, 2015
.../java/org/elasticsearch/gateway/GatewayMetaState.java
@@ -219,6 +217,22 @@ public void clusterChanged(ClusterChangedEvent event) {
// check and write changes in indices
for (IndexMetaData indexMetaData : newMetaData) {
+
+ boolean shardsAllocatedOnThisNodeInLastClusterState = true;
+ if (isDataOnlyNode(state)) {
+ boolean shardsCurrentlyAllocatedOnThisNode = shardsAllocatedOnLocalNode(state, indexMetaData);
@s1monw
s1monw Mar 4, 2015 Contributor

can we replace this entire block with just this:

 if (indicesService.hasIndex(indexMetaData.index())) {

}

and drop the entire delete logic here. I think we can just fix the writing here. If we don't wipe the stuff correctly, that's a different issue.

@s1monw s1monw and 2 others commented on an outdated diff Mar 4, 2015
.../java/org/elasticsearch/gateway/GatewayMetaState.java
+ for (IntObjectCursor it : indexRoutingTable.shards()) {
+ IndexShardRoutingTable shardRoutingTable = (IndexShardRoutingTable) it.value;
+ for (ShardRouting shardRouting : shardRoutingTable.shards()) {
+ if (shardRouting.currentNodeId() != null && shardRouting.currentNodeId().equals(state.nodes().localNode().getId())) {
+ shardsAllocatedOnThisNode = true;
+ }
+ }
+ }
+ return shardsAllocatedOnThisNode;
+ }
+
+ protected boolean isDataOnlyNode(ClusterState state) {
+ return ((state.nodes().localNode().masterNode() == false) && (state.nodes().localNode().dataNode() == true));
+ }
+
+ private void removeIndexState(IndexMetaData indexMetaData) {
@s1monw
s1monw Mar 4, 2015 Contributor

this method must go away. We do index cleaning in IndicesService once indices are deleted. If it's buggy please open a different issue

@kimchy
kimchy Mar 4, 2015 Member

@s1monw see my relevant comment above, I don't think we need this removal logic at all in this class? IndicesClusterStateService#applyCleanedIndices already removes an index if no shards are allocated on a node.

@brwe
brwe Mar 4, 2015 Contributor

I think it currently does not; it only removes shards, and the folder and metadata remain even if shards are allocated away. But I can change it to do this, which would indeed simplify the code.

@s1monw
s1monw Mar 4, 2015 Contributor

yeah lets remove it and open a new issue if needed.

@s1monw s1monw and 1 other commented on an outdated diff Mar 4, 2015
.../java/org/elasticsearch/test/InternalTestCluster.java
@@ -569,7 +569,12 @@ private NodeAndClient buildNode(int nodeId, long seed, Settings settings, Versio
assert Thread.holdsLock(this);
ensureOpen();
settings = getSettings(nodeId, seed, settings);
- String name = buildNodeName(nodeId);
+ String name;
+ if (settings.get("node.name") != null) {
@s1monw
s1monw Mar 4, 2015 Contributor

can we prevent this. I think we have to lock this naming thing down here and it should be owned by the test cluster

@brwe
brwe Mar 4, 2015 Contributor

It does make it easy to read the test and the logs if we have full control over naming. It is not often needed I guess, but for the tests above I found it incredibly helpful. What is the harm in allowing custom names?

@s1monw
s1monw Mar 4, 2015 Contributor

yes I really want this to be predictable and deterministic. we have to prevent arbitrary naming here IMO

@s1monw s1monw commented on an outdated diff Mar 4, 2015
...va/org/elasticsearch/cluster/ClusterChangedEvent.java
@@ -165,4 +169,16 @@ public boolean nodesAdded() {
public boolean nodesChanged() {
return nodesRemoved() || nodesAdded();
}
+
+ public boolean newMaster() {
@s1monw
s1monw Mar 4, 2015 Contributor

can this be a private method?

@s1monw s1monw commented on an outdated diff Mar 4, 2015
...va/org/elasticsearch/cluster/ClusterChangedEvent.java
@@ -105,6 +106,9 @@ public boolean indexRoutingTableChanged(String index) {
* Returns the indices deleted in this event
*/
public List<String> indicesDeleted() {
+ if (newMaster()) {
+ return ImmutableList.of();
+ }
if (previousState == null) {
@s1monw
s1monw Mar 4, 2015 Contributor

can this be if (previousState == null || newMaster())?
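
A rough sketch of what the combined guard would look like, assuming a simplified event that only tracks index names. The names `DeletedIndicesSketch` and `indicesDeleted` below are illustrative stand-ins, and how `newMaster` is computed is an assumption since that code is not shown in this excerpt:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for the event logic; sets of index names replace cluster states.
final class DeletedIndicesSketch {

    /**
     * Indices present in the previous state but missing from the current one.
     * Nothing is reported as deleted when there is no previous state or when
     * the state comes from a newly elected master (the workaround discussed here).
     */
    static List<String> indicesDeleted(Set<String> previousIndices, Set<String> currentIndices,
                                       boolean newMaster) {
        if (previousIndices == null || newMaster) {
            return Collections.emptyList();
        }
        List<String> deleted = new ArrayList<>();
        for (String index : previousIndices) {
            if (!currentIndices.contains(index)) {
                deleted.add(index);
            }
        }
        return deleted;
    }
}
```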

@s1monw
Contributor
s1monw commented Mar 4, 2015

I left some comments...

@s1monw s1monw self-assigned this Mar 4, 2015
@s1monw s1monw added the review label Mar 4, 2015
@brwe
Contributor
brwe commented Mar 4, 2015

Made a pr for the deletion of index folders here: #9985 Should be easy to remove all the additional deletion code from this pr.

@brwe
Contributor
brwe commented Mar 5, 2015

#9985 is merged, I rebased on latest master and changed the code accordingly. I wanted to remove the change in ClusterStateEvent also because I was unable to reproduce the failures I'd seen before without it. But now I found that without the change the tests only pass on my Linux machine but fail every 10 iterations or so on my mac so something is still fishy. I'll try to come up with a detailed failure analysis tomorrow.

@brwe
Contributor
brwe commented Mar 6, 2015

I think I know what is going on now: the fresh master with the empty cluster state (rarely) does not send the first cluster state, due to a race condition in the lifecycles of DiscoveryService and its member Discovery. In DiscoveryService.doStart() the Discovery is started, but the lifecycle for DiscoveryService is marked as started only after that. This is why, when the first cluster state reaches DiscoveryService.publish, the lifecycle might or might not have started.

I added a commit d69f2cf where I removed the ClusterStateEvent change and added an artificial delay to the DiscoveryService.doStart() so that the tests fail reliably just so you can check if you want.

I would suggest we remove the ClusterStateEvent workaround and open another issue for this because this behavior is not a result of this pull request.
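
To make the ordering problem concrete, here is a single-threaded sketch of the worst case. It is an illustration under assumptions, not the actual DiscoveryService code: `OuterService` and its methods are hypothetical, and the real behaviour is timing dependent, whereas here the inner component is forced to publish during its own start.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of an outer service that wraps an inner component.
final class LifecycleOrderSketch {

    static final class OuterService {
        private boolean started = false;
        final List<String> published = new ArrayList<>();
        final List<String> dropped = new ArrayList<>();

        /** Publishes only once the outer lifecycle is marked started; otherwise the state is lost. */
        void publish(String state) {
            if (started) {
                published.add(state);
            } else {
                dropped.add(state);
            }
        }

        /** The order described above: inner component first, outer flag afterwards. */
        void startInnerFirst(Runnable innerStart) {
            innerStart.run(); // the inner component may already publish here
            started = true;   // too late for anything published during innerStart
        }

        /** Reversed order, shown only for contrast. */
        void startOuterFirst(Runnable innerStart) {
            started = true;
            innerStart.run();
        }
    }

    public static void main(String[] args) {
        OuterService racy = new OuterService();
        racy.startInnerFirst(() -> racy.publish("first cluster state"));
        System.out.println("dropped: " + racy.dropped);

        OuterService ordered = new OuterService();
        ordered.startOuterFirst(() -> ordered.publish("first cluster state"));
        System.out.println("published: " + ordered.published);
    }
}
```

The reversed order is not proposed as the fix here; it only shows that the first publish is lost solely because of when the flag flips.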

@s1monw
Contributor
s1monw commented Mar 6, 2015

I agree with your idea of opening a new issue for the ClusterStateEvent problem

@brwe
Contributor
brwe commented Mar 10, 2015

Chatted with @s1monw and now rewrote it so that the selection of what to write is not done in GatewayMetaState anymore. I tried to do it similar to #10016. It is still a little raw, but it would be great if you could let me know if this is the right direction.

@s1monw s1monw commented on an outdated diff Mar 12, 2015
.../java/org/elasticsearch/gateway/GatewayMetaState.java
@@ -90,6 +94,26 @@ public MetaData loadMetaState() throws Exception {
return metaStateService.loadFullState();
}
+ public static class IndexMetaWriteInfo {
+ IndexMetaData newMetaData;
@s1monw
s1monw Mar 12, 2015 Contributor

can they all be package private and final please? just like a struct

@s1monw
s1monw Mar 12, 2015 Contributor

also you might wanna put this to the end of the file

@s1monw s1monw commented on an outdated diff Mar 12, 2015
.../java/org/elasticsearch/gateway/GatewayMetaState.java
- }
- } else {
- currentIndexMetaData = currentMetaData.index(indexMetaData.index());
- }
- if (currentIndexMetaData == null) {
- writeReason = "freshly created";
- } else if (currentIndexMetaData.version() != indexMetaData.version()) {
- writeReason = "version changed from [" + currentIndexMetaData.version() + "] to [" + indexMetaData.version() + "]";
- }
-
- // we update the writeReason only if we really need to write it
- if (writeReason == null) {
- continue;
- }
+ Iterable<IndexMetaWriteInfo> writeInfo = new ArrayList<>();
+ if (isDataOnlyNode(event.state())) {
@s1monw
s1monw Mar 12, 2015 Contributor

can we maybe do this like this:

final Iterable<IndexMetaWriteInfo> writeInfo;
if (a) {

} else if (b) {

} else {
  writeInfo = Collections.emptyList();
}
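
Filled in as compilable Java, the suggested shape might look like the sketch below. The branch conditions and the `String` element type are placeholders chosen for illustration (the real code would use `IndexMetaWriteInfo` and the node-role checks), so treat the names as assumptions:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Illustrative only: strings stand in for IndexMetaWriteInfo entries.
final class WriteInfoSelectionSketch {

    static Iterable<String> select(boolean dataOnlyNode, boolean masterEligibleNode,
                                   List<String> forDataNode, List<String> forMaster) {
        // 'final' without an initializer: the compiler rejects any path that
        // leaves the variable unassigned or assigns it more than once.
        final Iterable<String> writeInfo;
        if (dataOnlyNode) {
            writeInfo = forDataNode;
        } else if (masterEligibleNode) {
            writeInfo = forMaster;
        } else {
            writeInfo = Collections.emptyList();
        }
        return writeInfo;
    }

    public static void main(String[] args) {
        List<String> dataInfo = Arrays.asList("index-with-local-shard");
        List<String> masterInfo = Arrays.asList("index-a", "index-b");
        System.out.println(select(true, false, dataInfo, masterInfo));
        System.out.println(select(false, false, dataInfo, masterInfo));
    }
}
```

Compared with starting from `new ArrayList<>()` and overwriting it, this avoids allocating a list that is thrown away and makes the "neither role" case explicit.
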
@s1monw s1monw commented on an outdated diff Mar 12, 2015
...in/java/org/elasticsearch/gateway/IndexMetaState.java
+
+ }
+ return indicesToWrite.values();
+ }
+
+ protected IndexMetaData maybeLoadIndexState(MetaData currentMetaData, IndexMetaData indexMetaData) {
+ IndexMetaData currentIndexMetaData = null;
+ if (currentMetaData != null) {
+ currentIndexMetaData = currentMetaData.index(indexMetaData.index());
+ } else {
+ try {
+ currentIndexMetaData = metaStateService.loadIndexState(indexMetaData.index());
+ } catch (IOException e) {
+ logger.debug("failed to load index state ", e);
+ }
+ } return currentIndexMetaData;
@s1monw
s1monw Mar 12, 2015 Contributor

extra newline here after }

@s1monw s1monw commented on an outdated diff Mar 12, 2015
...in/java/org/elasticsearch/gateway/IndexMetaState.java
+ indicesToWrite.put(shardRouting.index(),
+ new GatewayMetaState.IndexMetaWriteInfo(indexMetaData, currentIndexMetaData,
+ writeReason));
+ }
+ }
+ return indicesToWrite.values();
+ }
+ public Iterable<GatewayMetaState.IndexMetaWriteInfo> getIndicesToWriteMasterNode(ClusterChangedEvent event, MetaData currentMetaData) {
+ Map<String, GatewayMetaState.IndexMetaWriteInfo> indicesToWrite = new HashMap<>();
+ MetaData newMetaData = event.state().metaData();
+ // iterate over all indices but only write if ...
+ for (IndexMetaData indexMetaData : newMetaData) {
+ String writeReason = null;
+ IndexMetaData currentIndexMetaData = maybeLoadIndexState(currentMetaData, indexMetaData);
+ // ... new index or state persistence was disabled?
+ if ( currentIndexMetaData == null) {
@s1monw
s1monw Mar 12, 2015 Contributor

extra space after (

@s1monw s1monw commented on an outdated diff Mar 12, 2015
...in/java/org/elasticsearch/gateway/IndexMetaState.java
+ }
+ public Iterable<GatewayMetaState.IndexMetaWriteInfo> getIndicesToWriteMasterNode(ClusterChangedEvent event, MetaData currentMetaData) {
+ Map<String, GatewayMetaState.IndexMetaWriteInfo> indicesToWrite = new HashMap<>();
+ MetaData newMetaData = event.state().metaData();
+ // iterate over all indices but only write if ...
+ for (IndexMetaData indexMetaData : newMetaData) {
+ String writeReason = null;
+ IndexMetaData currentIndexMetaData = maybeLoadIndexState(currentMetaData, indexMetaData);
+ // ... new index or state persistence was disabled?
+ if ( currentIndexMetaData == null) {
+ writeReason = "freshly created";
+ // ... version changed
+ } else if (currentIndexMetaData.version() != indexMetaData.version()) {
+ writeReason = "version changed from [" + currentIndexMetaData.version() + "] to [" + indexMetaData.version() + "]";
+ }
+ if (writeReason!=null) {
@s1monw
s1monw Mar 12, 2015 Contributor

spaces needed before and after !=

@s1monw s1monw commented on an outdated diff Mar 12, 2015
...in/java/org/elasticsearch/gateway/IndexMetaState.java
+
+ private final NodeEnvironment nodeEnv;
+ private final MetaStateService metaStateService;
+
+
+ @Inject
+ public IndexMetaState(Settings settings, NodeEnvironment nodeEnv, MetaStateService metaStateService) {
+ super(settings);
+ this.nodeEnv = nodeEnv;
+ this.metaStateService = metaStateService;
+ }
+
+ public Iterable<GatewayMetaState.IndexMetaWriteInfo> getIndicesToWriteDataOnlyNode(ClusterChangedEvent event, MetaData currentMetaData) {
+ Map<String, GatewayMetaState.IndexMetaWriteInfo> indicesToWrite = new HashMap<>();
+ RoutingNode thisNode = event.state().getRoutingNodes().node(event.state().nodes().localNodeId());
+ if ( thisNode == null) {
@s1monw
s1monw Mar 12, 2015 Contributor

space after (?

@s1monw s1monw commented on an outdated diff Mar 12, 2015
...in/java/org/elasticsearch/gateway/IndexMetaState.java
+ * them into the cluster.
+ */
+public class IndexMetaState extends AbstractComponent {
+
+ private final NodeEnvironment nodeEnv;
+ private final MetaStateService metaStateService;
+
+
+ @Inject
+ public IndexMetaState(Settings settings, NodeEnvironment nodeEnv, MetaStateService metaStateService) {
+ super(settings);
+ this.nodeEnv = nodeEnv;
+ this.metaStateService = metaStateService;
+ }
+
+ public Iterable<GatewayMetaState.IndexMetaWriteInfo> getIndicesToWriteDataOnlyNode(ClusterChangedEvent event, MetaData currentMetaData) {
@s1monw
s1monw Mar 12, 2015 Contributor

can we have some javadoc here? also the name is pretty long maybe we can rename to something like filterStatesOnMaster / filterStateOnDataNode

@s1monw s1monw and 1 other commented on an outdated diff Mar 12, 2015
...in/java/org/elasticsearch/gateway/IndexMetaState.java
+import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
+import org.elasticsearch.env.NodeEnvironment;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * The dangling indices state is responsible for finding new dangling indices (indices that have
+ * their state written on disk, but don't exists in the metadata of the cluster), and importing
+ * them into the cluster.
+ */
+public class IndexMetaState extends AbstractComponent {
@s1monw
s1monw Mar 12, 2015 Contributor

I think this file can just be merged into GatewayMetaState. I don't think those two methods justify a new component?

@brwe
brwe Mar 15, 2015 Contributor

I thought it might be easier to test? I could write a single node test instead but this way it seemed easier to me. I have no strong feelings about it though so you decide.

@s1monw s1monw commented on an outdated diff Mar 12, 2015
.../org/elasticsearch/gateway/GatewayMetaStateTests.java
+ if (masterEligible) {
+ indices = indexMetaState.getIndicesToWriteMasterNode(event, inMemoryMetaData).iterator();
+ } else {
+ indices = indexMetaState.getIndicesToWriteDataOnlyNode(event, inMemoryMetaData).iterator();
+ }
+ if (expectMetaData) {
+ assertThat(indices.hasNext(), equalTo(true));
+ assertThat(indices.next().getNewMetaData().index(), equalTo("test"));
+ assertThat(indices.hasNext(), equalTo(false));
+ } else {
+ assertThat(indices.hasNext(), equalTo(false));
+ }
+ }
+
+ @Test
+ public void testAllCombinations() throws Exception {
@s1monw
s1monw Mar 12, 2015 Contributor

Can we have a separate test for each of these, named after what it is doing? That would be very helpful I think! I am ok with duplicating code here and there, but these unit tests should be more readable.

@s1monw
Contributor
s1monw commented Mar 12, 2015

I left a bunch of comments

@brwe brwe commented on the diff Mar 17, 2015
.../java/org/elasticsearch/gateway/GatewayMetaState.java
@@ -114,33 +113,19 @@ public void clusterChanged(ClusterChangedEvent event) {
}
}
- // check and write changes in indices
- for (IndexMetaData indexMetaData : newMetaData) {
- String writeReason = null;
- IndexMetaData currentIndexMetaData;
- if (currentMetaData == null) {
- // a new event..., check from the state stored
- try {
- currentIndexMetaData = metaStateService.loadIndexState(indexMetaData.index());
@brwe
brwe Mar 17, 2015 Contributor

Could we skip this loading from disk? I made the change so that we still load (see maybeLoadIndexState() below) if the meta data is not in memory but fail to understand why we do it. Is it an optimization?

@s1monw
s1monw Mar 18, 2015 Contributor

as far as I am concerned we could just skip it since we write the new one anyway. Yet, we might have a new meta on disk but that would be a bug too no? so I think we can drop it?

@brwe
brwe Mar 18, 2015 Contributor

Ok, removed this check now.

@brwe brwe and 1 other commented on an outdated diff Mar 17, 2015
.../java/org/elasticsearch/gateway/GatewayMetaState.java
+ public static Iterable<GatewayMetaState.IndexMetaWriteInfo> filterStateOnDataNode(ClusterChangedEvent event, MetaData currentMetaData, MetaStateService metaStateService, ESLogger logger) {
+ Map<String, GatewayMetaState.IndexMetaWriteInfo> indicesToWrite = new HashMap<>();
+ RoutingNode thisNode = event.state().getRoutingNodes().node(event.state().nodes().localNodeId());
+ if (thisNode == null) {
+ // this needs some other handling
+ return indicesToWrite.values();
+ }
+ // iterate over all shards allocated on this node in the new cluster state but only write if ...
+ for (MutableShardRouting shardRouting : thisNode) {
+ IndexMetaData indexMetaData = event.state().metaData().index(shardRouting.index());
+ IndexMetaData currentIndexMetaData = maybeLoadIndexState(currentMetaData, indexMetaData, metaStateService, logger);
+ String writeReason = null;
+ // ... state persistence was disabled or index was newly created
+ if (currentIndexMetaData == null) {
+ writeReason = "freshly created";
+ // ... new shard is allocated on node (we could optimize here and make sure only written once and not for each shard per index -> do later)
@brwe
brwe Mar 17, 2015 Contributor

Not sure if we should check here if we wrote the state already.

@s1monw
s1monw Mar 18, 2015 Contributor

I think if you move the version check above this we only do it once?

@brwe
brwe Mar 18, 2015 Contributor

cannot do that. version is incremented when mapping changes or setting etc, not when relocated. We need to somehow check for initializing.

@brwe
brwe Mar 19, 2015 Contributor

see below. check now if shard was allocated on node in previous cluster state and if not do not write.

@brwe
Contributor
brwe commented Mar 17, 2015

Addressed all comments. I am unsure about two things, left comments above about it: We did check on disk if we have the state already before writing in case the in memory state is null. However, this should be a rare event so I am unsure if we need this optimization. In addition, I made it so that whenever a shard is initializing on a node we write the meta state of this index and perform no check if we wrote before already. Should I check instead?

@s1monw s1monw commented on an outdated diff Mar 18, 2015
.../java/org/elasticsearch/gateway/GatewayMetaState.java
@@ -154,6 +139,14 @@ public void clusterChanged(ClusterChangedEvent event) {
}
}
+ protected boolean isDataOnlyNode(ClusterState state) {
+ return ((isMasterEligibleNode(state) == false) && (state.nodes().localNode().dataNode() == true));
+ }
+
+ protected boolean isMasterEligibleNode(ClusterState state) {
+ return state.nodes().localNode().masterNode() == true;
@s1monw
s1monw Mar 18, 2015 Contributor

the == true is obsolete

@s1monw s1monw commented on an outdated diff Mar 18, 2015
.../java/org/elasticsearch/gateway/GatewayMetaState.java
@@ -154,6 +139,14 @@ public void clusterChanged(ClusterChangedEvent event) {
}
}
+ protected boolean isDataOnlyNode(ClusterState state) {
+ return ((isMasterEligibleNode(state) == false) && (state.nodes().localNode().dataNode() == true));
@s1monw
s1monw Mar 18, 2015 Contributor

the == true is obsolete

@s1monw s1monw and 1 other commented on an outdated diff Mar 18, 2015
.../java/org/elasticsearch/gateway/GatewayMetaState.java
+ writeReason));
+ }
+
+ }
+ return indicesToWrite.values();
+ }
+
+ protected static IndexMetaData maybeLoadIndexState(MetaData currentMetaData, IndexMetaData indexMetaData, MetaStateService metaStateService, ESLogger logger) {
+ IndexMetaData currentIndexMetaData = null;
+ if (currentMetaData != null) {
+ currentIndexMetaData = currentMetaData.index(indexMetaData.index());
+ } else {
+ try {
+ currentIndexMetaData = metaStateService.loadIndexState(indexMetaData.index());
+ } catch (IOException e) {
+ logger.debug("failed to load index state ", e);
@s1monw
s1monw Mar 18, 2015 Contributor

can we put a TODO here to maybe bubble this Exception up in the future it seems risky to just swallow it....

@brwe
brwe Mar 18, 2015 Contributor

if I remove loading the state this method will go away anyway

@s1monw s1monw commented on an outdated diff Mar 18, 2015
.../java/org/elasticsearch/gateway/GatewayMetaState.java
logger.warn("Settings [{}] and [{}] are deprecated. Index settings from your old indices have been updated to record the fact that they "
+ "used some custom routing logic, you can now remove these settings from your `elasticsearch.yml` file", DEPRECATED_SETTING_ROUTING_HASH_FUNCTION, DEPRECATED_SETTING_ROUTING_USE_TYPE);
}
}
+
+ /**
+ * Loads the current meta state for each index in the new cluster state and checks if it has to be persisted.
+ * Each index state that should be written to disk will be returned. This is only run for data only nodes.
+ * It will return only the states for indices that actually have a shard allocated on the current node.
+ *
+ * @param event the cluster state event from which we figure out what is new in each index and should potentially be written
+ * @param currentMetaData the current index state in memory.
+ * @return iterable over all indices states that should be written to disk
+ */
+ public static Iterable<GatewayMetaState.IndexMetaWriteInfo> filterStateOnDataNode(ClusterChangedEvent event, MetaData currentMetaData, MetaStateService metaStateService, ESLogger logger) {
@s1monw
s1monw Mar 18, 2015 Contributor

since we only use the event().state() can we just pass the state to it instead?

@s1monw s1monw commented on an outdated diff Mar 18, 2015
.../java/org/elasticsearch/gateway/GatewayMetaState.java
+ writeReason));
+ }
+ }
+ return indicesToWrite.values();
+ }
+
+ /**
+ * Loads the current meta state for each index in the new cluster state and checks if it has to be persisted.
+ * Each index state that is part of the new cluster state will be considered even if this node has no shard of the
+ * index allocated on it. This is only run for master nodes.
+ *
+ * @param event the cluster state event from which we figure out what is new in each index and should potentially be written
+ * @param currentMetaData the current index state in memory.
+ * @return iterable over all indices states that should be written to disk
+ */
+ public static Iterable<GatewayMetaState.IndexMetaWriteInfo> filterStatesOnMaster(ClusterChangedEvent event, MetaData currentMetaData, MetaStateService metaStateService, ESLogger logger) {
@s1monw
s1monw Mar 18, 2015 Contributor

since we only use the event().state() can we just pass the state to it instead?

@s1monw
Contributor
s1monw commented Mar 18, 2015

I like this a lot - left some minor comments

@brwe
Contributor
brwe commented Mar 18, 2015

Addressed all comments except for the version check thing above. We cannot use the index version to ensure that the state is written, because the version is not necessarily updated for the index metadata if shards are relocated. But shards should not initialize too often on a node, so it might be ok if we write the same state each time we see an initializing shard?

@brwe
Contributor
brwe commented Mar 19, 2015

Chatted with @s1monw who reminded me that shards can be in initializing state for a while and while they are we would always write, so that is not a good idea. Will now instead check in the event if the last state had shards allocated and if not then write if the new one has.

@brwe
Contributor
brwe commented Mar 19, 2015

Pushed a new commit to address this. We now check if a shard was already present in the last cluster state and, if so, do not write.
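
The resulting decision on a data node, as far as it can be read from the diff excerpts in this thread, can be sketched like this. `WriteReasonSketch` and its parameters are hypothetical simplifications (a nullable stored version replaces the in-memory index metadata, a boolean replaces the routing lookup):

```java
// Hypothetical condensation of the write decision for one index on a data-only node.
final class WriteReasonSketch {

    /**
     * Returns why the index state should be written, or null if no write is needed.
     *
     * @param storedVersion            version of the state held in memory, null if none
     * @param newVersion               version of the index metadata in the new cluster state
     * @param indexHadShardsHereBefore whether the previous cluster state already had a
     *                                 shard of this index on the local node
     */
    static String writeReason(Long storedVersion, long newVersion, boolean indexHadShardsHereBefore) {
        if (storedVersion == null) {
            return "freshly created";
        }
        if (!indexHadShardsHereBefore) {
            // Relocation does not bump the metadata version, so this needs its own check.
            return "newly allocated on node";
        }
        if (storedVersion.longValue() != newVersion) {
            return "version changed from [" + storedVersion + "] to [" + newVersion + "]";
        }
        return null;
    }
}
```

A shard that stays in the initializing state across several cluster states triggers one write on the first state in which it appears, not one write per state.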

@s1monw
Contributor
s1monw commented Mar 19, 2015

it LGTM I think @bleskes should take one more look

@brwe brwe assigned bleskes and unassigned s1monw Mar 26, 2015
@bleskes bleskes commented on the diff Mar 30, 2015
.../java/org/elasticsearch/gateway/GatewayMetaState.java
@@ -72,7 +71,7 @@ public GatewayMetaState(Settings settings, NodeEnvironment nodeEnv, MetaStateSer
if (DiscoveryNode.masterNode(settings) || DiscoveryNode.dataNode(settings)) {
nodeEnv.ensureAtomicMoveSupported();
}
- if (DiscoveryNode.masterNode(settings)) {
+ if (DiscoveryNode.masterNode(settings) || DiscoveryNode.dataNode(settings)) {
@bleskes
bleskes Mar 30, 2015 Member

Wondering until when we need the BWC below? (not saying remove now)

@brwe
brwe Mar 30, 2015 Contributor

The pre20Upgrade() we should still do because this change is going to 1.x as well. But we probably need not check on data nodes for pre 019 state. Would it harm to keep this check or should I explicitly only check pre 019 on master nodes and pre 20 on master and data nodes?

@bleskes bleskes commented on an outdated diff Mar 30, 2015
.../java/org/elasticsearch/gateway/GatewayMetaState.java
+ * Each index state that should be written to disk will be returned. This is only run for data only nodes.
+ * It will return only the states for indices that actually have a shard allocated on the current node.
+ *
+ * @param event the cluster state event from which we figure out what is new in each index and should potentially be written
+ * @param currentMetaData the current index state in memory.
+ * @return iterable over all indices states that should be written to disk
+ */
+ public static Iterable<GatewayMetaState.IndexMetaWriteInfo> filterStateOnDataNode(ClusterChangedEvent event, MetaData currentMetaData) {
+ ClusterState newState = event.state();
+ ClusterState previousState = event.previousState();
+
+ Map<String, GatewayMetaState.IndexMetaWriteInfo> indicesToWrite = new HashMap<>();
+ RoutingNode newRoutingNode = newState.getRoutingNodes().node(newState.nodes().localNodeId());
+ RoutingNode previousRoutingNode = previousState.getRoutingNodes().node(newState.nodes().localNodeId());
+ if (newRoutingNode == null) {
+ // this needs some other handling
@bleskes
bleskes Mar 30, 2015 Member

I think this is correct and we need an exception here. It means the routing node is not consistent or we have gotten here while the current node is not part of the CS, which is also broken.

@bleskes bleskes commented on an outdated diff Mar 30, 2015
.../java/org/elasticsearch/gateway/GatewayMetaState.java
+ * @param event the cluster state event from which we figure out what is new in each index and should potentially be written
+ * @param currentMetaData the current index state in memory.
+ * @return iterable over all indices states that should be written to disk
+ */
+ public static Iterable<GatewayMetaState.IndexMetaWriteInfo> filterStateOnDataNode(ClusterChangedEvent event, MetaData currentMetaData) {
+ ClusterState newState = event.state();
+ ClusterState previousState = event.previousState();
+
+ Map<String, GatewayMetaState.IndexMetaWriteInfo> indicesToWrite = new HashMap<>();
+ RoutingNode newRoutingNode = newState.getRoutingNodes().node(newState.nodes().localNodeId());
+ RoutingNode previousRoutingNode = previousState.getRoutingNodes().node(newState.nodes().localNodeId());
+ if (newRoutingNode == null) {
+ // this needs some other handling
+ return indicesToWrite.values();
+ }
+ // iterate over all shards allocated on this node in the new cluster state but only write if ...
@bleskes
bleskes Mar 30, 2015 Member

only write if ... , what?

@bleskes
bleskes Mar 30, 2015 Member

Oh. I see. Nevermind :)

@bleskes bleskes commented on an outdated diff Mar 30, 2015
.../java/org/elasticsearch/gateway/GatewayMetaState.java
+ /**
+ * Loads the current meta state for each index in the new cluster state and checks if it has to be persisted.
+ * Each index state that should be written to disk will be returned. This is only run for data only nodes.
+ * It will return only the states for indices that actually have a shard allocated on the current node.
+ *
+ * @param event the cluster state event from which we figure out what is new in each index and should potentially be written
+ * @param currentMetaData the current index state in memory.
+ * @return iterable over all indices states that should be written to disk
+ */
+ public static Iterable<GatewayMetaState.IndexMetaWriteInfo> filterStateOnDataNode(ClusterChangedEvent event, MetaData currentMetaData) {
+ ClusterState newState = event.state();
+ ClusterState previousState = event.previousState();
+
+ Map<String, GatewayMetaState.IndexMetaWriteInfo> indicesToWrite = new HashMap<>();
+ RoutingNode newRoutingNode = newState.getRoutingNodes().node(newState.nodes().localNodeId());
+ RoutingNode previousRoutingNode = previousState.getRoutingNodes().node(newState.nodes().localNodeId());
@bleskes
bleskes Mar 30, 2015 Member

This one should go away. I see why it's here (because we only cache the currentMetaData, which doesn't tell us anything about allocation), but now we have two ways of deciding what to do: one based on the last written state and the other based on the last CS in the event. This is an issue. Can we cache a set of last written indices next to currentMetaData and use that?

@bleskes bleskes commented on an outdated diff Mar 30, 2015
.../java/org/elasticsearch/gateway/GatewayMetaState.java
+ writeReason = "newly allocated on node";
+ // ... version changed
+ } else if (indexMetaData.version() != currentIndexMetaData.version()) {
+ writeReason = "version changed from [" + currentIndexMetaData.version() + "] to [" + indexMetaData.version() + "]";
+ }
+ if (writeReason != null) {
+ indicesToWrite.put(shardRouting.index(),
+ new GatewayMetaState.IndexMetaWriteInfo(indexMetaData, currentIndexMetaData,
+ writeReason));
+ }
+ }
+ return indicesToWrite.values();
+ }
+
+ public static boolean shardsOfThisIndexExistedOnNodeAlready(RoutingNode previousRoutingNode, MutableShardRouting shardRouting) {
+ return previousRoutingNode.shardsWithState(shardRouting.index(), ShardRoutingState.INITIALIZING, ShardRoutingState.STARTED).size() != 0;
@bleskes
bleskes Mar 30, 2015 Member

This is tricky as it misses ShardRoutingState#RELOCATING. Can we add a method that just returns all the shards of an index? Or alternatively, if we follow my previous suggestion of caching written indices, just check whether the new index is missing from the written indices set.
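
To illustrate the concern, here is a minimal, self-contained sketch. It does not use the Elasticsearch classes: `State`, `Shard`, `hasShardWithState` and `hasAnyShard` are hypothetical stand-ins, and only the state names mirror `ShardRoutingState`. It shows how a check filtered on INITIALIZING and STARTED reports "no shard" for an index whose only shard on the node is RELOCATING, while a state-agnostic check does not.

```java
import java.util.Arrays;
import java.util.List;

public class RelocatingShardCheck {
    // Local stand-in; only the names mirror ShardRoutingState.
    public enum State { UNASSIGNED, INITIALIZING, STARTED, RELOCATING }

    // Hypothetical, simplified shard entry: index name plus routing state.
    public static final class Shard {
        public final String index;
        public final State state;

        public Shard(String index, State state) {
            this.index = index;
            this.state = state;
        }
    }

    // State-filtered check, analogous in spirit to the one in the diff.
    public static boolean hasShardWithState(List<Shard> shardsOnNode, String index, State... states) {
        List<State> wanted = Arrays.asList(states);
        for (Shard shard : shardsOnNode) {
            if (shard.index.equals(index) && wanted.contains(shard.state)) {
                return true;
            }
        }
        return false;
    }

    // State-agnostic check: any shard of the index on this node counts.
    public static boolean hasAnyShard(List<Shard> shardsOnNode, String index) {
        for (Shard shard : shardsOnNode) {
            if (shard.index.equals(index)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        List<Shard> node = Arrays.asList(new Shard("logs", State.RELOCATING));
        // The filtered check misses the relocating shard; the unfiltered one finds it.
        System.out.println(hasShardWithState(node, "logs", State.INITIALIZING, State.STARTED));
        System.out.println(hasAnyShard(node, "logs"));
    }
}
```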

@bleskes bleskes commented on an outdated diff Mar 30, 2015
.../java/org/elasticsearch/gateway/GatewayMetaState.java
+ }
+
+ /**
+ * Loads the current meta state for each index in the new cluster state and checks if it has to be persisted.
+ * Each index state that is part of the new cluster state will be considered even if this node has no shard of the
+ * index allocated on it. This is only run for master nodes.
+ *
+ * @param state the cluster state from which we figure out what is new in each index and should potentially be written
+ * @param currentMetaData the current index state in memory.
+ * @return iterable over all indices states that should be written to disk
+ */
+ public static Iterable<GatewayMetaState.IndexMetaWriteInfo> filterStatesOnMaster(ClusterState state, MetaData currentMetaData) {
+ Map<String, GatewayMetaState.IndexMetaWriteInfo> indicesToWrite = new HashMap<>();
+ MetaData newMetaData = state.metaData();
+ // iterate over all indices but only write if ...
+ for (IndexMetaData indexMetaData : newMetaData) {
@bleskes
bleskes Mar 30, 2015 Member

I think we can merge this method body with the other one, no? The only difference is which indices we begin with (all of them or just those allocated to this node).

@bleskes bleskes and 1 other commented on an outdated diff Mar 30, 2015
.../java/org/elasticsearch/gateway/GatewayMetaState.java
- } else {
- currentIndexMetaData = currentMetaData.index(indexMetaData.index());
- }
- if (currentIndexMetaData == null) {
- writeReason = "freshly created";
- } else if (currentIndexMetaData.version() != indexMetaData.version()) {
- writeReason = "version changed from [" + currentIndexMetaData.version() + "] to [" + indexMetaData.version() + "]";
- }
-
- // we update the writeReason only if we really need to write it
- if (writeReason == null) {
- continue;
- }
+ Iterable<IndexMetaWriteInfo> writeInfo;
+ if (isDataOnlyNode(event.state())) {
+ writeInfo = filterStateOnDataNode(event, currentMetaData);
@bleskes
bleskes Mar 30, 2015 Member

I know this was already there, but can we rename currentMetaData to previouslyWrittenMetaData? I think it will be easier to understand what's what here.

@brwe
brwe Mar 30, 2015 Contributor

Agree, the name is confusing. But the meta data is not always written, so maybe call it previousMetaData?

@bleskes
Member
bleskes commented Mar 30, 2015

Looks good in general. Left some comments that I think will simplify things.

@brwe
Contributor
brwe commented Apr 1, 2015

Implemented all suggestions.
@bleskes thanks for the tip with the indices list caching, it simplifies things indeed! Please take another look.

@bleskes bleskes commented on an outdated diff Apr 7, 2015
.../java/org/elasticsearch/gateway/GatewayMetaState.java
@@ -57,7 +57,9 @@
private final DanglingIndicesState danglingIndicesState;
@Nullable
- private volatile MetaData currentMetaData;
+ private volatile MetaData previousMetaData;
+
+ private final Set<String> previouslyWrittenIndices = Collections.synchronizedSet(new HashSet<String>());
@bleskes
bleskes Apr 7, 2015 Member

can we change this to volatile and use an ImmutableSet when we assign it (the members shouldn't be changed)
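
A minimal sketch of that pattern, under stated assumptions: `WrittenIndicesHolder`, `update` and `current` are hypothetical names, and `Collections.unmodifiableSet` over a private copy stands in for Guava's `ImmutableSet` so the example needs only the JDK. The field is reassigned as a whole and its members are never changed in place.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

public class WrittenIndicesHolder {
    // Replaced wholesale on every update; never mutated in place.
    private volatile Set<String> previouslyWrittenIndices = Collections.emptySet();

    // Publishes a fresh, unmodifiable snapshot of the indices written so far.
    public void update(Set<String> writtenNow) {
        previouslyWrittenIndices = Collections.unmodifiableSet(new HashSet<>(writtenNow));
    }

    // Readers always see a complete snapshot, never a half-updated set.
    public Set<String> current() {
        return previouslyWrittenIndices;
    }
}
```

Because readers only ever dereference the volatile field, they need no locking; a writer builds the next snapshot off to the side and publishes it with a single assignment.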

@bleskes bleskes commented on an outdated diff Apr 28, 2015
.../java/org/elasticsearch/gateway/GatewayMetaState.java
}
}
+ public static Set<String> getRelevantIndices(ClusterState state, Set<String> previouslyWrittenIndices) {
@bleskes
bleskes Apr 28, 2015 Member

can we add some java docs about what relevant indices mean?

@bleskes bleskes commented on an outdated diff Apr 28, 2015
.../java/org/elasticsearch/gateway/GatewayMetaState.java
+ }
+ }
+ return indicesToWrite;
+ }
+
+ public static Set<String> getRelevantIndicesOnDataOnlyNode(ClusterState state, Set<String> previouslyWrittenIndices) {
+ RoutingNode newRoutingNode = state.getRoutingNodes().node(state.nodes().localNodeId());
+ if (newRoutingNode == null) {
+ throw new ElasticsearchIllegalStateException("cluster state does not contain this node - cannot write index meta state");
+ }
+ Set<String> indices = new HashSet<>();
+ for (MutableShardRouting routing : newRoutingNode) {
+ indices.add(routing.index());
+ }
+ // we have to check the meta data also: closed indices will not appear in the routing table, but we must still write the state if we have it written on disk previously
+ for (ObjectCursor<String> index : state.metaData().indices().keys()) {
@bleskes
bleskes Apr 28, 2015 Member

We can iterate on state.metaData() itself. That will make this loop slightly simpler.
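
As a rough illustration of the simpler loop, the sketch below uses hypothetical stand-ins (`IndexMeta`, `MetaDataStandIn`) rather than the real `MetaData` and `IndexMetaData` classes. It assumes only what the diff shows: the metadata object can be iterated directly, and closed indices are kept when their state was written before.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

public class RelevantIndicesSketch {
    // Hypothetical, simplified per-index metadata.
    public static final class IndexMeta {
        public final String name;
        public final boolean closed;

        public IndexMeta(String name, boolean closed) {
            this.name = name;
            this.closed = closed;
        }
    }

    // Stand-in for a metadata container that is itself iterable over its indices.
    public static final class MetaDataStandIn implements Iterable<IndexMeta> {
        private final List<IndexMeta> indices = new ArrayList<>();

        public MetaDataStandIn add(IndexMeta indexMeta) {
            indices.add(indexMeta);
            return this;
        }

        @Override
        public Iterator<IndexMeta> iterator() {
            return indices.iterator();
        }
    }

    // Indices with a shard on this node, plus closed indices whose state was written before.
    public static Set<String> relevantIndicesOnDataOnlyNode(Set<String> indicesWithShardsOnNode,
                                                            MetaDataStandIn metaData,
                                                            Set<String> previouslyWrittenIndices) {
        Set<String> relevant = new HashSet<>(indicesWithShardsOnNode);
        // Iterate the metadata directly instead of going through a key set and a lookup.
        for (IndexMeta indexMeta : metaData) {
            if (indexMeta.closed && previouslyWrittenIndices.contains(indexMeta.name)) {
                relevant.add(indexMeta.name);
            }
        }
        return relevant;
    }
}
```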

@bleskes bleskes commented on an outdated diff Apr 28, 2015
.../java/org/elasticsearch/gateway/GatewayMetaState.java
+ indices.add(routing.index());
+ }
+ // we have to check the meta data also: closed indices will not appear in the routing table, but we must still write the state if we have it written on disk previously
+ for (ObjectCursor<String> index : state.metaData().indices().keys()) {
+ if (previouslyWrittenIndices.contains(index.value) && state.metaData().getIndices().get(index.value).state().equals(IndexMetaData.State.CLOSE)) {
+ indices.add(index.value);
+ }
+ }
+ return indices;
+ }
+
+ public static Set<String> getRelevantIndicesForMasterEligibleNode(ClusterState state) {
+ Set<String> relevantIndices;
+ relevantIndices = new HashSet<>();
+ // we have to iterate over the metadata to make sure we also capture closed indices
+ for (ObjectCursor<String> index : state.metaData().indices().keys()) {
@bleskes
bleskes Apr 28, 2015 Member

Same here, no need to use indices.keys().

@bleskes bleskes commented on an outdated diff Apr 28, 2015
.../java/org/elasticsearch/gateway/GatewayMetaState.java
logger.warn("Settings [{}] and [{}] are deprecated. Index settings from your old indices have been updated to record the fact that they "
+ "used some custom routing logic, you can now remove these settings from your `elasticsearch.yml` file", DEPRECATED_SETTING_ROUTING_HASH_FUNCTION, DEPRECATED_SETTING_ROUTING_USE_TYPE);
}
}
+
+ /**
+ * Loads the current meta state for each index in the new cluster state and checks if it has to be persisted.
+ * Each index state that should be written to disk will be returned. This is only run for data only nodes.
+ * It will return only the states for indices that actually have a shard allocated on the current node.
+ *
+ * @param previouslyWrittenIndices A list of indices for which the state was already written before
+ * @param potentiallyUnwrittenIndices The list of indices for which state should potentially be written
+ * @param previousMetaData The last meta data we know of. meta data for all indices in previouslyWrittenIndices list is persisted now
+ * @param newMetaData The new metadata
+ * @return iterable over all indices states that should be written to disk
+ */
+ public static Iterable<GatewayMetaState.IndexMetaWriteInfo> filterStates(Set<String> previouslyWrittenIndices, Set<String> potentiallyUnwrittenIndices, MetaData previousMetaData, MetaData newMetaData) {
@bleskes
bleskes Apr 28, 2015 Member

can we call this resolveStatesToBeWritten?
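
For readers following the thread, the decision this method makes can be sketched roughly as follows. This is not the PR's implementation: metadata is reduced to a hypothetical map from index name to version, the result is a map from index name to write reason, and the reason strings are borrowed from the diff hunks above purely for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

public class ResolveStatesSketch {
    // Returns, for each index whose state should be written, the reason for writing it.
    public static Map<String, String> resolveStatesToBeWritten(Set<String> previouslyWrittenIndices,
                                                               Set<String> potentiallyUnwrittenIndices,
                                                               Map<String, Long> previousVersions,
                                                               Map<String, Long> newVersions) {
        Map<String, String> reasons = new LinkedHashMap<>();
        for (String index : potentiallyUnwrittenIndices) {
            Long newVersion = newVersions.get(index);
            if (newVersion == null) {
                continue; // not part of the new metadata, nothing to write
            }
            Long previousVersion = previousVersions.get(index);
            if (!previouslyWrittenIndices.contains(index) || previousVersion == null) {
                // Never written by this node (or unknown before): write it now.
                reasons.put(index, "freshly created");
            } else if (!previousVersion.equals(newVersion)) {
                reasons.put(index, "version changed from [" + previousVersion + "] to [" + newVersion + "]");
            }
            // Otherwise the state on disk is already current and no write is needed.
        }
        return reasons;
    }
}
```

With this shape, the master-eligible and data-only cases differ only in the set passed as `potentiallyUnwrittenIndices`.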

@bleskes
Member
bleskes commented Apr 28, 2015

Left some very minor comments. Feel free to push without another review. LGTM!

brwe added some commits Mar 2, 2015
@brwe brwe Write state also on data nodes if not master eligible
When a node was a data node only then the index state was not written.
In case this node connected to a master that did not have the index
in the cluster state, for example because a master was restarted and
the data folder was lost, then the indices were not imported as dangling
but instead deleted.
This commit makes sure that index state for data nodes is also written
if they have at least one shard of this index allocated.

closes #8823
647eb22
@brwe brwe folder is deleted now, no need to clean up metadata 0f5afcd
@brwe brwe Revert changes to InternalTestCluster aa66bb9
@brwe brwe add artificial delay to start in DiscoveryService and remove ClusterCha…
…ngeEvent workaround

When a node starts up and elects itself as master then the first cluster
state might not be sent to the other nodes because
- discovery starts
- node elects itself and applies cluster state 1 locally
- does not send it to the other nodes because the lifecycle is not started
  yet in DiscoveryService.publish() (that is only set after doStart() is called)

As a result the data node only receives the second cluster state. State
persistence is not disabled. If the cluster state does not contain the index
(for example because the master was started with empty data folder and
does not contain old cluster state) then the data node interprets the fact that the index
is missing from the new cluster state as a delete and so deletes all shards.
dd28a5d
@brwe brwe unit test for data only node write and refactor similar to #10016 a3a9831
@brwe brwe remove artificial delay 39359a6
@brwe brwe format a72a297
@brwe brwe make protected and final and move to end of file c680f84
@brwe brwe empty list if not master or data d15e732
@brwe brwe rename c461fe6
@brwe brwe cleanup 18cf4e1
@brwe brwe add java docs 4e25695
@brwe brwe move to GatewayMetaState 8652750
@brwe brwe cleanup tests a little 8b58004
@brwe brwe more cleanup a792ad6
@brwe brwe cleanup a4baa80
@brwe brwe remove check state on disk. 9c1df62
@brwe brwe check version first in order to avoid writing several times for each …
…initializing index
c394971
@brwe brwe remove superfluous == true/false e7994b3
@brwe brwe pass ClusterState instead of ClusterStateEvent bf906fe
@brwe brwe Revert "check version first in order to avoid writing several times f…
…or each initializing index"

This reverts commit 1a45c8b.
32dc21a
@brwe brwe only write if the shard is actually new and there were no shards allo…
…cated previously
82505cf
@brwe brwe better name for in memory meta data aeedb29
@brwe brwe maintain list of indices that we wrote
also, deal with closed indices also on data nodes
92d1f40
@brwe brwe add comments ab90dbe
@brwe brwe exception if cluster state inconsistent 5770828
@brwe brwe check on disk if there is a shard written already for a closed index f88e821
@brwe brwe Set -> ImmutableSet 7c44299
@brwe brwe simplify iteration 5cb39b8
@brwe brwe rename 7569a51
@brwe brwe added a commit that referenced this pull request Apr 29, 2015
@brwe brwe Write state also on data nodes if not master eligible
When a node was a data node only then the index state was not written.
In case this node connected to a master that did not have the index
in the cluster state, for example because a master was restarted and
the data folder was lost, then the indices were not imported as dangling
but instead deleted.
This commit makes sure that index state for data nodes is also written
if they have at least one shard of this index allocated.

closes #8823
closes #9952
c3a1729
@brwe brwe added a commit that closed this pull request Apr 29, 2015
@brwe brwe Write state also on data nodes if not master eligible
When a node was a data node only then the index state was not written.
In case this node connected to a master that did not have the index
in the cluster state, for example because a master was restarted and
the data folder was lost, then the indices were not imported as dangling
but instead deleted.
This commit makes sure that index state for data nodes is also written
if they have at least one shard of this index allocated.

closes #8823
closes #9952
4088dd3
@brwe brwe closed this in 4088dd3 Apr 29, 2015
@kevinkluge kevinkluge removed the review label Apr 29, 2015
@brwe
Contributor
brwe commented Apr 29, 2015

need to investigate #10017 before we can push

@brwe brwe reopened this Apr 29, 2015
@brwe
Contributor
brwe commented Apr 30, 2015

The reason why the tests failed on CI is the same one I described in the beginning (#9952 (comment)): a data node receives a new cluster state from a master that does not have the index in its state, but the data node missed the earlier state with a no-master block, so state persistence was not disabled. The fact that an index is not in the cluster state is then interpreted as a delete command. This can happen here for the reasons described in #10017, but there might be other reasons as well. I now think we should not delete indices at all if the cluster state that would cause a deletion comes from a new master.
I added a new commit for this but need someone to confirm that this is actually the right solution.
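
A minimal sketch of the proposed guard, under loud assumptions: `NewMasterDeletionGuard`, `hasNewMaster` and `indicesToDelete` are hypothetical names, node ids are plain strings, and "new master" is taken to mean that the master id differs from the one in the previous state or that there is no previous master at all. The actual commit may define this differently.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

public class NewMasterDeletionGuard {
    // Assumption: a missing previous master id also counts as a new master.
    public static boolean hasNewMaster(String previousMasterId, String currentMasterId) {
        return previousMasterId == null || !previousMasterId.equals(currentMasterId);
    }

    // Local indices that are absent from the new cluster state, unless the state comes from a new master.
    public static Set<String> indicesToDelete(Set<String> localIndices,
                                              Set<String> indicesInNewState,
                                              String previousMasterId,
                                              String currentMasterId) {
        if (hasNewMaster(previousMasterId, currentMasterId)) {
            // A new master may simply not know about the index; treat nothing as deleted.
            return Collections.emptySet();
        }
        Set<String> toDelete = new HashSet<>(localIndices);
        toDelete.removeAll(indicesInNewState);
        return toDelete;
    }
}
```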

@s1monw

can we name this hasNewMaster and document it?

@s1monw

can we say newMaster() || previousState == null

@s1monw
Contributor
s1monw commented May 4, 2015

I added a new commit for this but need someone to confirm that this is actually the right solution.

+1 to the solution

@brwe
Contributor
brwe commented May 4, 2015

Chatted with @kimchy and we decided to push as is and add a //norelease comment and open an issue because the short term fix for the problem (#9952 (comment)) is not very elegant.
Added another commit to address the latest comments.

@s1monw
Contributor
s1monw commented May 5, 2015

LGTM

@brwe brwe added a commit that closed this pull request May 5, 2015
@brwe brwe Write state also on data nodes if not master eligible
When a node was a data node only then the index state was not written.
In case this node connected to a master that did not have the index
in the cluster state, for example because a master was restarted and
the data folder was lost, then the indices were not imported as dangling
but instead deleted.
This commit makes sure that index state for data nodes is also written
if they have at least one shard of this index allocated.

closes #8823
closes #9952
3cda9b2
@brwe brwe closed this in 3cda9b2 May 5, 2015