From e19db21d9c1c62a5fe62d0c5d540704ba21dc6c3 Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Wed, 11 Dec 2013 16:05:17 +0100 Subject: [PATCH] Added new IndicesLifecycle.Listener method that allows to listen for any IndexShardState internal change. Closes #4413 --- .../index/service/InternalIndexService.java | 1 + .../shard/service/InternalIndexShard.java | 36 ++--- .../indices/IndicesLifecycle.java | 14 ++ .../indices/InternalIndicesLifecycle.java | 11 ++ .../IndicesLifecycleListenerTests.java | 127 ++++++++++++++++++ 5 files changed, 173 insertions(+), 16 deletions(-) create mode 100644 src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerTests.java diff --git a/src/main/java/org/elasticsearch/index/service/InternalIndexService.java b/src/main/java/org/elasticsearch/index/service/InternalIndexService.java index 5e63e25e3629b..26dcb24ee28bb 100644 --- a/src/main/java/org/elasticsearch/index/service/InternalIndexService.java +++ b/src/main/java/org/elasticsearch/index/service/InternalIndexService.java @@ -351,6 +351,7 @@ public synchronized IndexShard createShard(int sShardId) throws ElasticSearchExc IndexShard indexShard = shardInjector.getInstance(IndexShard.class); + indicesLifecycle.indexShardStateChanged(indexShard, null, "shard created"); indicesLifecycle.afterIndexShardCreated(indexShard); shards = newMapBuilder(shards).put(shardId.id(), indexShard).immutableMap(); diff --git a/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java b/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java index 1eaf50f6d32d9..3e841c1775eea 100644 --- a/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java +++ b/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java @@ -289,8 +289,7 @@ public InternalIndexShard routingEntry(ShardRouting newRouting) { synchronized (mutex) { // do the check under a mutex, so we make sure to only change to STARTED if in POST_RECOVERY if (state == IndexShardState.POST_RECOVERY) { - logger.debug("state: [{}]->[{}], reason [global state is [{}]]", state, IndexShardState.STARTED, newRouting.state()); - state = IndexShardState.STARTED; + changeState(IndexShardState.STARTED, "global state is [" + newRouting.state() + "]"); movedToStarted = true; } else { logger.debug("state [{}] not changed, not in POST_RECOVERY, global state is [{}]", state, newRouting.state()); @@ -314,7 +313,6 @@ public InternalIndexShard routingEntry(ShardRouting newRouting) { public IndexShardState recovering(String reason) throws IndexShardStartedException, IndexShardRelocatedException, IndexShardRecoveringException, IndexShardClosedException { synchronized (mutex) { - IndexShardState returnValue = state; if (state == IndexShardState.CLOSED) { throw new IndexShardClosedException(shardId); } @@ -330,9 +328,7 @@ public IndexShardState recovering(String reason) throws IndexShardStartedExcepti if (state == IndexShardState.POST_RECOVERY) { throw new IndexShardRecoveringException(shardId); } - logger.debug("state: [{}]->[{}], reason [{}]", state, IndexShardState.RECOVERING, reason); - state = IndexShardState.RECOVERING; - return returnValue; + return changeState(IndexShardState.RECOVERING, reason); } } @@ -341,8 +337,7 @@ public InternalIndexShard relocated(String reason) throws IndexShardNotStartedEx if (state != IndexShardState.STARTED) { throw new IndexShardNotStartedException(shardId, state); } - logger.debug("state: [{}]->[{}], reason [{}]", state, IndexShardState.RELOCATED, reason); - state = IndexShardState.RELOCATED; + changeState(IndexShardState.RELOCATED, reason); } return this; } @@ -352,6 +347,20 @@ public IndexShardState state() { return state; } + /** + * Changes the state of the current shard + * @param newState the new shard state + * @param reason the reason for the state change + * @return the previous shard state + */ + private IndexShardState changeState(IndexShardState newState, String reason) { + logger.debug("state: [{}]->[{}], reason [{}]", state, newState, reason); + IndexShardState previousState = state; + state = newState; + this.indicesLifecycle.indexShardStateChanged(this, previousState, reason); + return previousState; + } + @Override public Engine.Create prepareCreate(SourceToParse source) throws ElasticSearchException { long startTime = System.nanoTime(); @@ -653,10 +662,7 @@ public void close(String reason) { mergeScheduleFuture = null; } } - if (logger.isDebugEnabled()) { - logger.debug("state: [{}]->[{}], reason [{}]", state, IndexShardState.CLOSED, reason); - } - state = IndexShardState.CLOSED; + changeState(IndexShardState.CLOSED, reason); } } @@ -681,8 +687,7 @@ public InternalIndexShard postRecovery(String reason) throws IndexShardStartedEx } engine.start(); startScheduledTasksIfNeeded(); - logger.debug("state: [{}]->[{}], reason [{}]", state, IndexShardState.POST_RECOVERY, reason); - state = IndexShardState.POST_RECOVERY; + changeState(IndexShardState.POST_RECOVERY, reason); } indicesLifecycle.afterIndexShardPostRecovery(this); return this; @@ -725,8 +730,7 @@ public void performRecoveryFinalization(boolean withFlush) throws ElasticSearchE translog.clearUnreferenced(); engine.refresh(new Engine.Refresh("recovery_finalization").force(true)); synchronized (mutex) { - logger.debug("state: [{}]->[{}], reason [post recovery]", state, IndexShardState.POST_RECOVERY); - state = IndexShardState.POST_RECOVERY; + changeState(IndexShardState.POST_RECOVERY, "post recovery"); } indicesLifecycle.afterIndexShardPostRecovery(this); startScheduledTasksIfNeeded(); diff --git a/src/main/java/org/elasticsearch/indices/IndicesLifecycle.java b/src/main/java/org/elasticsearch/indices/IndicesLifecycle.java index 9c95dd56133d6..978ac25a7777c 100644 --- a/src/main/java/org/elasticsearch/indices/IndicesLifecycle.java +++ b/src/main/java/org/elasticsearch/indices/IndicesLifecycle.java @@ -23,6 +23,7 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.index.Index; import org.elasticsearch.index.service.IndexService; +import org.elasticsearch.index.shard.IndexShardState; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.service.IndexShard; @@ -132,6 +133,19 @@ public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexSh public void afterIndexShardClosed(ShardId shardId) { } + + /** + * Called after a shard's {@link org.elasticsearch.index.shard.IndexShardState} changes. + * The order of concurrent events is preserved. The execution must be lightweight. + * + * @param indexShard the shard the new state was applied to + * @param previousState the previous index shard state if there was one, null otherwise + * @param currentState the new shard state + * @param reason the reason for the state change if there is one, null otherwise + */ + public void indexShardStateChanged(IndexShard indexShard, @Nullable IndexShardState previousState, IndexShardState currentState, @Nullable String reason) { + + } } } diff --git a/src/main/java/org/elasticsearch/indices/InternalIndicesLifecycle.java b/src/main/java/org/elasticsearch/indices/InternalIndicesLifecycle.java index b196f9ed1e28a..b933d87853b72 100644 --- a/src/main/java/org/elasticsearch/indices/InternalIndicesLifecycle.java +++ b/src/main/java/org/elasticsearch/indices/InternalIndicesLifecycle.java @@ -26,6 +26,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.Index; import org.elasticsearch.index.service.IndexService; +import org.elasticsearch.index.shard.IndexShardState; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.service.IndexShard; @@ -160,4 +161,14 @@ public void afterIndexShardClosed(ShardId shardId) { } } } + + public void indexShardStateChanged(IndexShard indexShard, @Nullable IndexShardState previousState, @Nullable String reason) { + for (Listener listener : listeners) { + try { + listener.indexShardStateChanged(indexShard, previousState, indexShard.state(), reason); + } catch (Throwable t) { + logger.warn("{} failed to invoke index shard state changed callback", t, indexShard.shardId()); + } + } + } } diff --git a/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerTests.java b/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerTests.java new file mode 100644 index 0000000000000..9db1d0b367b24 --- /dev/null +++ b/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerTests.java @@ -0,0 +1,127 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.elasticsearch.indices; + +import com.google.common.collect.Maps; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.index.shard.IndexShardState; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.shard.service.IndexShard; +import org.elasticsearch.test.ElasticsearchIntegrationTest; +import org.junit.Test; + +import java.util.List; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CopyOnWriteArrayList; + +import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; +import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS; +import static org.elasticsearch.cluster.routing.allocation.decider.DisableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_DISABLE_ALLOCATION; +import static org.elasticsearch.common.settings.ImmutableSettings.builder; +import static org.elasticsearch.index.shard.IndexShardState.*; +import static org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope; +import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.notNullValue; + +@ClusterScope(scope = Scope.TEST, numNodes = 0) +public class IndicesLifecycleListenerTests extends ElasticsearchIntegrationTest { + + @Test + public void testIndexStateShardChanged() throws Throwable { + + //start with a single node + String node1 = cluster().startNode(); + IndexShardStateChangeListener stateChangeListenerNode1 = new IndexShardStateChangeListener(); + //add a listener that keeps track of the shard state changes + cluster().getInstance(IndicesLifecycle.class, node1).addListener(stateChangeListenerNode1); + + //create an index + assertAcked(client().admin().indices().prepareCreate("test") + .setSettings(SETTING_NUMBER_OF_SHARDS, 6, SETTING_NUMBER_OF_REPLICAS, 0)); + ensureGreen(); + + //new shards got started + assertShardStatesMatch(stateChangeListenerNode1, 6, CREATED, RECOVERING, POST_RECOVERY, STARTED); + + + //add a node: 3 out of the 6 shards will be relocated to it + //disable allocation before starting a new node, as we need to register the listener first + assertAcked(client().admin().cluster().prepareUpdateSettings() + .setPersistentSettings(builder().put(CLUSTER_ROUTING_ALLOCATION_DISABLE_ALLOCATION, true))); + String node2 = cluster().startNode(); + IndexShardStateChangeListener stateChangeListenerNode2 = new IndexShardStateChangeListener(); + //add a listener that keeps track of the shard state changes + cluster().getInstance(IndicesLifecycle.class, node2).addListener(stateChangeListenerNode2); + //re-enable allocation + assertAcked(client().admin().cluster().prepareUpdateSettings() + .setPersistentSettings(builder().put(CLUSTER_ROUTING_ALLOCATION_DISABLE_ALLOCATION, false))); + ensureGreen(); + + //the 3 relocated shards get closed on the first node + assertShardStatesMatch(stateChangeListenerNode1, 3, CLOSED); + //the 3 relocated shards get created on the second node + assertShardStatesMatch(stateChangeListenerNode2, 3, CREATED, RECOVERING, POST_RECOVERY, STARTED); + + + //increase replicas from 0 to 1 + assertAcked(client().admin().indices().prepareUpdateSettings("test").setSettings(builder().put(SETTING_NUMBER_OF_REPLICAS, 1))); + ensureGreen(); + + //3 replicas are allocated to the first node + assertShardStatesMatch(stateChangeListenerNode1, 3, CREATED, RECOVERING, POST_RECOVERY, STARTED); + + //3 replicas are allocated to the second node + assertShardStatesMatch(stateChangeListenerNode2, 3, CREATED, RECOVERING, POST_RECOVERY, STARTED); + + + //close the index + assertAcked(client().admin().indices().prepareClose("test")); + + assertShardStatesMatch(stateChangeListenerNode1, 6, CLOSED); + assertShardStatesMatch(stateChangeListenerNode2, 6, CLOSED); + } + + private static void assertShardStatesMatch(IndexShardStateChangeListener stateChangeListener, int numShards, IndexShardState... shardStates) { + assertThat(stateChangeListener.shardStates.size(), equalTo(numShards)); + for (List indexShardStates : stateChangeListener.shardStates.values()) { + assertThat(indexShardStates, notNullValue()); + assertThat(indexShardStates.size(), equalTo(shardStates.length)); + for (int i = 0; i < shardStates.length; i++) { + assertThat(indexShardStates.get(i), equalTo(shardStates[i])); + } + } + stateChangeListener.shardStates.clear(); + } + + private static class IndexShardStateChangeListener extends IndicesLifecycle.Listener { + //we keep track of all the states (ordered) a shard goes through + final ConcurrentMap> shardStates = Maps.newConcurrentMap(); + + @Override + public void indexShardStateChanged(IndexShard indexShard, @Nullable IndexShardState previousState, IndexShardState newState, @Nullable String reason) { + List shardStates = this.shardStates.putIfAbsent(indexShard.shardId(), + new CopyOnWriteArrayList(new IndexShardState[]{newState})); + if (shardStates != null) { + shardStates.add(newState); + } + } + } +}