Skip to content

Commit

Permalink
Added new IndicesLifecycle.Listener method that allows to listen for …
Browse files Browse the repository at this point in the history
…any IndexShardState internal change.

Closes elastic#4413
  • Loading branch information
javanna authored and brusic committed Jan 19, 2014
1 parent cc0855a commit e19db21
Show file tree
Hide file tree
Showing 5 changed files with 173 additions and 16 deletions.
Expand Up @@ -351,6 +351,7 @@ public synchronized IndexShard createShard(int sShardId) throws ElasticSearchExc

IndexShard indexShard = shardInjector.getInstance(IndexShard.class);

indicesLifecycle.indexShardStateChanged(indexShard, null, "shard created");
indicesLifecycle.afterIndexShardCreated(indexShard);

shards = newMapBuilder(shards).put(shardId.id(), indexShard).immutableMap();
Expand Down
Expand Up @@ -289,8 +289,7 @@ public InternalIndexShard routingEntry(ShardRouting newRouting) {
synchronized (mutex) {
// do the check under a mutex, so we make sure to only change to STARTED if in POST_RECOVERY
if (state == IndexShardState.POST_RECOVERY) {
logger.debug("state: [{}]->[{}], reason [global state is [{}]]", state, IndexShardState.STARTED, newRouting.state());
state = IndexShardState.STARTED;
changeState(IndexShardState.STARTED, "global state is [" + newRouting.state() + "]");
movedToStarted = true;
} else {
logger.debug("state [{}] not changed, not in POST_RECOVERY, global state is [{}]", state, newRouting.state());
Expand All @@ -314,7 +313,6 @@ public InternalIndexShard routingEntry(ShardRouting newRouting) {
public IndexShardState recovering(String reason) throws IndexShardStartedException,
IndexShardRelocatedException, IndexShardRecoveringException, IndexShardClosedException {
synchronized (mutex) {
IndexShardState returnValue = state;
if (state == IndexShardState.CLOSED) {
throw new IndexShardClosedException(shardId);
}
Expand All @@ -330,9 +328,7 @@ public IndexShardState recovering(String reason) throws IndexShardStartedExcepti
if (state == IndexShardState.POST_RECOVERY) {
throw new IndexShardRecoveringException(shardId);
}
logger.debug("state: [{}]->[{}], reason [{}]", state, IndexShardState.RECOVERING, reason);
state = IndexShardState.RECOVERING;
return returnValue;
return changeState(IndexShardState.RECOVERING, reason);
}
}

Expand All @@ -341,8 +337,7 @@ public InternalIndexShard relocated(String reason) throws IndexShardNotStartedEx
if (state != IndexShardState.STARTED) {
throw new IndexShardNotStartedException(shardId, state);
}
logger.debug("state: [{}]->[{}], reason [{}]", state, IndexShardState.RELOCATED, reason);
state = IndexShardState.RELOCATED;
changeState(IndexShardState.RELOCATED, reason);
}
return this;
}
Expand All @@ -352,6 +347,20 @@ public IndexShardState state() {
return state;
}

/**
* Changes the state of the current shard
* @param newState the new shard state
* @param reason the reason for the state change
* @return the previous shard state
*/
private IndexShardState changeState(IndexShardState newState, String reason) {
logger.debug("state: [{}]->[{}], reason [{}]", state, newState, reason);
IndexShardState previousState = state;
state = newState;
this.indicesLifecycle.indexShardStateChanged(this, previousState, reason);
return previousState;
}

@Override
public Engine.Create prepareCreate(SourceToParse source) throws ElasticSearchException {
long startTime = System.nanoTime();
Expand Down Expand Up @@ -653,10 +662,7 @@ public void close(String reason) {
mergeScheduleFuture = null;
}
}
if (logger.isDebugEnabled()) {
logger.debug("state: [{}]->[{}], reason [{}]", state, IndexShardState.CLOSED, reason);
}
state = IndexShardState.CLOSED;
changeState(IndexShardState.CLOSED, reason);
}
}

Expand All @@ -681,8 +687,7 @@ public InternalIndexShard postRecovery(String reason) throws IndexShardStartedEx
}
engine.start();
startScheduledTasksIfNeeded();
logger.debug("state: [{}]->[{}], reason [{}]", state, IndexShardState.POST_RECOVERY, reason);
state = IndexShardState.POST_RECOVERY;
changeState(IndexShardState.POST_RECOVERY, reason);
}
indicesLifecycle.afterIndexShardPostRecovery(this);
return this;
Expand Down Expand Up @@ -725,8 +730,7 @@ public void performRecoveryFinalization(boolean withFlush) throws ElasticSearchE
translog.clearUnreferenced();
engine.refresh(new Engine.Refresh("recovery_finalization").force(true));
synchronized (mutex) {
logger.debug("state: [{}]->[{}], reason [post recovery]", state, IndexShardState.POST_RECOVERY);
state = IndexShardState.POST_RECOVERY;
changeState(IndexShardState.POST_RECOVERY, "post recovery");
}
indicesLifecycle.afterIndexShardPostRecovery(this);
startScheduledTasksIfNeeded();
Expand Down
14 changes: 14 additions & 0 deletions src/main/java/org/elasticsearch/indices/IndicesLifecycle.java
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.common.Nullable;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.service.IndexShard;

Expand Down Expand Up @@ -132,6 +133,19 @@ public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexSh
public void afterIndexShardClosed(ShardId shardId) {

}

/**
* Called after a shard's {@link org.elasticsearch.index.shard.IndexShardState} changes.
* The order of concurrent events is preserved. The execution must be lightweight.
*
* @param indexShard the shard the new state was applied to
* @param previousState the previous index shard state if there was one, null otherwise
* @param currentState the new shard state
* @param reason the reason for the state change if there is one, null otherwise
*/
public void indexShardStateChanged(IndexShard indexShard, @Nullable IndexShardState previousState, IndexShardState currentState, @Nullable String reason) {

}
}

}
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.service.IndexShard;

Expand Down Expand Up @@ -160,4 +161,14 @@ public void afterIndexShardClosed(ShardId shardId) {
}
}
}

public void indexShardStateChanged(IndexShard indexShard, @Nullable IndexShardState previousState, @Nullable String reason) {
for (Listener listener : listeners) {
try {
listener.indexShardStateChanged(indexShard, previousState, indexShard.state(), reason);
} catch (Throwable t) {
logger.warn("{} failed to invoke index shard state changed callback", t, indexShard.shardId());
}
}
}
}
@@ -0,0 +1,127 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package org.elasticsearch.indices;

import com.google.common.collect.Maps;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.junit.Test;

import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;

import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
import static org.elasticsearch.cluster.routing.allocation.decider.DisableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_DISABLE_ALLOCATION;
import static org.elasticsearch.common.settings.ImmutableSettings.builder;
import static org.elasticsearch.index.shard.IndexShardState.*;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.notNullValue;

@ClusterScope(scope = Scope.TEST, numNodes = 0)
public class IndicesLifecycleListenerTests extends ElasticsearchIntegrationTest {

@Test
public void testIndexStateShardChanged() throws Throwable {

//start with a single node
String node1 = cluster().startNode();
IndexShardStateChangeListener stateChangeListenerNode1 = new IndexShardStateChangeListener();
//add a listener that keeps track of the shard state changes
cluster().getInstance(IndicesLifecycle.class, node1).addListener(stateChangeListenerNode1);

//create an index
assertAcked(client().admin().indices().prepareCreate("test")
.setSettings(SETTING_NUMBER_OF_SHARDS, 6, SETTING_NUMBER_OF_REPLICAS, 0));
ensureGreen();

//new shards got started
assertShardStatesMatch(stateChangeListenerNode1, 6, CREATED, RECOVERING, POST_RECOVERY, STARTED);


//add a node: 3 out of the 6 shards will be relocated to it
//disable allocation before starting a new node, as we need to register the listener first
assertAcked(client().admin().cluster().prepareUpdateSettings()
.setPersistentSettings(builder().put(CLUSTER_ROUTING_ALLOCATION_DISABLE_ALLOCATION, true)));
String node2 = cluster().startNode();
IndexShardStateChangeListener stateChangeListenerNode2 = new IndexShardStateChangeListener();
//add a listener that keeps track of the shard state changes
cluster().getInstance(IndicesLifecycle.class, node2).addListener(stateChangeListenerNode2);
//re-enable allocation
assertAcked(client().admin().cluster().prepareUpdateSettings()
.setPersistentSettings(builder().put(CLUSTER_ROUTING_ALLOCATION_DISABLE_ALLOCATION, false)));
ensureGreen();

//the 3 relocated shards get closed on the first node
assertShardStatesMatch(stateChangeListenerNode1, 3, CLOSED);
//the 3 relocated shards get created on the second node
assertShardStatesMatch(stateChangeListenerNode2, 3, CREATED, RECOVERING, POST_RECOVERY, STARTED);


//increase replicas from 0 to 1
assertAcked(client().admin().indices().prepareUpdateSettings("test").setSettings(builder().put(SETTING_NUMBER_OF_REPLICAS, 1)));
ensureGreen();

//3 replicas are allocated to the first node
assertShardStatesMatch(stateChangeListenerNode1, 3, CREATED, RECOVERING, POST_RECOVERY, STARTED);

//3 replicas are allocated to the second node
assertShardStatesMatch(stateChangeListenerNode2, 3, CREATED, RECOVERING, POST_RECOVERY, STARTED);


//close the index
assertAcked(client().admin().indices().prepareClose("test"));

assertShardStatesMatch(stateChangeListenerNode1, 6, CLOSED);
assertShardStatesMatch(stateChangeListenerNode2, 6, CLOSED);
}

private static void assertShardStatesMatch(IndexShardStateChangeListener stateChangeListener, int numShards, IndexShardState... shardStates) {
assertThat(stateChangeListener.shardStates.size(), equalTo(numShards));
for (List<IndexShardState> indexShardStates : stateChangeListener.shardStates.values()) {
assertThat(indexShardStates, notNullValue());
assertThat(indexShardStates.size(), equalTo(shardStates.length));
for (int i = 0; i < shardStates.length; i++) {
assertThat(indexShardStates.get(i), equalTo(shardStates[i]));
}
}
stateChangeListener.shardStates.clear();
}

private static class IndexShardStateChangeListener extends IndicesLifecycle.Listener {
//we keep track of all the states (ordered) a shard goes through
final ConcurrentMap<ShardId, List<IndexShardState>> shardStates = Maps.newConcurrentMap();

@Override
public void indexShardStateChanged(IndexShard indexShard, @Nullable IndexShardState previousState, IndexShardState newState, @Nullable String reason) {
List<IndexShardState> shardStates = this.shardStates.putIfAbsent(indexShard.shardId(),
new CopyOnWriteArrayList<IndexShardState>(new IndexShardState[]{newState}));
if (shardStates != null) {
shardStates.add(newState);
}
}
}
}

0 comments on commit e19db21

Please sign in to comment.