Skip to content

Commit

Permalink
Set an newly created IndexShard's ShardRouting before exposing it to …
Browse files Browse the repository at this point in the history
…operations

The work for elastic#10708 requires tighter integration with the current shard routing of a shard. As such, we need to make sure it is set before the IndexService exposes the shard to external operations.
  • Loading branch information
bleskes committed Nov 22, 2015
1 parent 6b2f3a9 commit fe2218e
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 17 deletions.
7 changes: 4 additions & 3 deletions core/src/main/java/org/elasticsearch/index/IndexService.java
Expand Up @@ -216,18 +216,18 @@ private long getAvgShardSizeInBytes() throws IOException {
}
}

public synchronized IndexShard createShard(int sShardId, ShardRouting routing) throws IOException {
public synchronized IndexShard createShard(ShardRouting routing) throws IOException {
final boolean primary = routing.primary();
/*
* TODO: we execute this in parallel but it's a synced method. Yet, we might
* be able to serialize the execution via the cluster state in the future. for now we just
* keep it synced.
*/
if (closed.get()) {
throw new IllegalStateException("Can't create shard [" + index().name() + "][" + sShardId + "], closed");
throw new IllegalStateException("Can't create shard " + routing.shardId() + ", closed");
}
final Settings indexSettings = this.indexSettings.getSettings();
final ShardId shardId = new ShardId(index(), sShardId);
final ShardId shardId = routing.shardId();
boolean success = false;
Store store = null;
IndexShard indexShard = null;
Expand Down Expand Up @@ -285,6 +285,7 @@ public synchronized IndexShard createShard(int sShardId, ShardRouting routing) t

eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
eventListener.afterIndexShardCreated(indexShard);
indexShard.updateRoutingEntry(routing, true);
shards = newMapBuilder(shards).put(shardId.id(), indexShard).immutableMap();
success = true;
return indexShard;
Expand Down
Expand Up @@ -625,8 +625,7 @@ private void applyInitializingShard(final ClusterState state, final IndexMetaDat
if (logger.isDebugEnabled()) {
logger.debug("[{}][{}] creating shard", shardRouting.index(), shardId);
}
IndexShard indexShard = indexService.createShard(shardId, shardRouting);
indexShard.updateRoutingEntry(shardRouting, state.blocks().disableStatePersistence() == false);
IndexShard indexShard = indexService.createShard(shardRouting);
indexShard.addShardFailureCallback(failedShardHandler);
} catch (IndexShardAlreadyExistsException e) {
// ignore this, the method call can happen several times
Expand Down
Expand Up @@ -20,7 +20,10 @@

import org.apache.lucene.document.Field;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.index.*;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.TopDocs;
Expand Down Expand Up @@ -77,18 +80,13 @@
import org.elasticsearch.index.translog.TranslogConfig;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.test.DummyShardLock;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.test.IndexSettingsModule;
import org.elasticsearch.test.FieldMaskingReader;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.test.*;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
Expand Down Expand Up @@ -755,7 +753,7 @@ public void testRecoverFromStore() throws IOException {
ShardRouting routing = new ShardRouting(shard.routingEntry());
test.removeShard(0, "b/c simon says so");
ShardRoutingHelper.reinit(routing);
IndexShard newShard = test.createShard(0, routing);
IndexShard newShard = test.createShard(routing);
newShard.updateRoutingEntry(routing, false);
DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, Version.CURRENT);
newShard.markAsRecovering("store", new RecoveryState(newShard.shardId(), routing.primary(), RecoveryState.Type.STORE, localNode, localNode));
Expand Down Expand Up @@ -787,7 +785,7 @@ public void testFailIfIndexNotPresentInRecoverFromStore() throws IOException {
Lucene.cleanLuceneIndex(store.directory());
store.decRef();
ShardRoutingHelper.reinit(routing);
IndexShard newShard = test.createShard(0, routing);
IndexShard newShard = test.createShard(routing);
newShard.updateRoutingEntry(routing, false);
newShard.markAsRecovering("store", new RecoveryState(newShard.shardId(), routing.primary(), RecoveryState.Type.STORE, localNode, localNode));
try {
Expand All @@ -807,7 +805,7 @@ public void testFailIfIndexNotPresentInRecoverFromStore() throws IOException {
// OK!
}
test.removeShard(0, "I broken it");
newShard = test.createShard(0, routing);
newShard = test.createShard(routing);
newShard.updateRoutingEntry(routing, false);
newShard.markAsRecovering("store", new RecoveryState(newShard.shardId(), routing.primary(), RecoveryState.Type.STORE, localNode, localNode));
assertTrue("recover even if there is nothing to recover", newShard.recoverFromStore(localNode));
Expand Down Expand Up @@ -840,7 +838,7 @@ public void testRestoreShard() throws IOException {
ShardRoutingHelper.reinit(routing);
routing = ShardRoutingHelper.newWithRestoreSource(routing, new RestoreSource(new SnapshotId("foo", "bar"), Version.CURRENT, "test"));
test_target.removeShard(0, "just do it man!");
final IndexShard test_target_shard = test_target.createShard(0, routing);
final IndexShard test_target_shard = test_target.createShard(routing);
Store sourceStore = test_shard.store();
Store targetStore = test_target_shard.store();

Expand Down
Expand Up @@ -97,7 +97,7 @@ public void afterIndexShardDeleted(ShardId shardId, Settings indexSettings) {
String nodeId = newRouting.currentNodeId();
ShardRoutingHelper.moveToUnassigned(newRouting, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "boom"));
ShardRoutingHelper.initialize(newRouting, nodeId);
IndexShard shard = index.createShard(0, newRouting);
IndexShard shard = index.createShard(newRouting);
shard.updateRoutingEntry(newRouting, true);
final DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, Version.CURRENT);
shard.markAsRecovering("store", new RecoveryState(shard.shardId(), newRouting.primary(), RecoveryState.Type.SNAPSHOT, newRouting.restoreSource(), localNode));
Expand Down

0 comments on commit fe2218e

Please sign in to comment.