Write shard state metadata as soon as shard is created / initializing #16625

Merged
@@ -187,12 +187,14 @@ protected NodeShardsResult buildAllocationIdBasedNodeShardsResult(ShardRouting s
}

if (nodeShardState.storeException() == null) {
if (allocationId == null && nodeShardState.legacyVersion() != ShardStateMetaData.NO_VERSION) {
// old shard with no allocation id, assign dummy value so that it gets added below in case of matchAnyShard
allocationId = "_n/a_";
if (allocationId == null && nodeShardState.legacyVersion() == ShardStateMetaData.NO_VERSION) {
logger.trace("[{}] on node [{}] has no shard state information", shard, nodeShardState.getNode());
} else if (allocationId != null) {
assert nodeShardState.legacyVersion() == ShardStateMetaData.NO_VERSION : "Allocation id and legacy version cannot be both present";
logger.trace("[{}] on node [{}] has allocation id [{}]", shard, nodeShardState.getNode(), allocationId);
} else {
logger.trace("[{}] on node [{}] has no allocation id, out-dated shard (shard state version: [{}])", shard, nodeShardState.getNode(), nodeShardState.legacyVersion());
}

logger.trace("[{}] on node [{}] has allocation id [{}] of shard", shard, nodeShardState.getNode(), allocationId);
} else {
logger.trace("[{}] on node [{}] has allocation id [{}] but the store can not be opened, treating as no allocation id", nodeShardState.storeException(), shard, nodeShardState.getNode(), allocationId);
allocationId = null;
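
The new branching distinguishes three on-disk states for a shard copy: no state information at all, a copy carrying an allocation id (written by a 5.x node), and an out-dated legacy copy carrying only a version. A minimal standalone sketch of that classification, assuming a hypothetical NodeState holder in place of the real per-node shard state response:

```java
// Hypothetical sketch; NodeState and NO_VERSION stand in for the real
// per-node shard state response and ShardStateMetaData.NO_VERSION.
public class AllocationIdClassification {
    static final long NO_VERSION = -1L;

    static class NodeState {
        final String allocationId;  // non-null only for copies written by a 5.x node
        final long legacyVersion;   // set only for pre-5.x copies

        NodeState(String allocationId, long legacyVersion) {
            this.allocationId = allocationId;
            this.legacyVersion = legacyVersion;
        }
    }

    static String classify(NodeState state) {
        if (state.allocationId == null && state.legacyVersion == NO_VERSION) {
            return "no shard state information";
        } else if (state.allocationId != null) {
            // allocation id and legacy version are mutually exclusive on disk
            assert state.legacyVersion == NO_VERSION;
            return "allocation id [" + state.allocationId + "]";
        } else {
            return "out-dated shard, legacy version [" + state.legacyVersion + "]";
        }
    }

    public static void main(String[] args) {
        System.out.println(classify(new NodeState(null, NO_VERSION)));     // no state
        System.out.println(classify(new NodeState("zvXz4Q", NO_VERSION))); // 5.x copy
        System.out.println(classify(new NodeState(null, 42)));             // legacy copy
    }
}
```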
@@ -299,9 +301,20 @@ NodeShardsResult buildVersionBasedNodeShardsResult(ShardRouting shard, boolean m
continue;
}

// no version means it does not exist, which is what the API returns, and what we expect
if (nodeShardState.storeException() == null) {
logger.trace("[{}] on node [{}] has version [{}] of shard", shard, nodeShardState.getNode(), version);
if (version == ShardStateMetaData.NO_VERSION && nodeShardState.allocationId() == null) {
logger.trace("[{}] on node [{}] has no shard state information", shard, nodeShardState.getNode());
} else if (version != ShardStateMetaData.NO_VERSION) {
assert nodeShardState.allocationId() == null : "Allocation id and legacy version cannot be both present";
logger.trace("[{}] on node [{}] has version [{}] of shard", shard, nodeShardState.getNode(), version);
} else {
// shard was already selected in a 5.x cluster as primary for recovery, was initialized (and wrote a new state file) but
// did not make it to STARTED state before the cluster crashed (otherwise list of active allocation ids would be
// non-empty and allocation id - based allocation mode would be chosen).
// Prefer this shard copy again.
version = Long.MAX_VALUE;
logger.trace("[{}] on node [{}] has allocation id [{}]", shard, nodeShardState.getNode(), nodeShardState.allocationId());
}
} else {
// when there is a store exception, we disregard the reported version and assign it as no version (same as shard does not exist)
logger.trace("[{}] on node [{}] has version [{}] but the store can not be opened, treating no version", nodeShardState.storeException(), shard, nodeShardState.getNode(), version);
@@ -215,7 +215,7 @@ final class BitSetProducerWarmer implements IndexWarmer.Listener {

@Override
public IndexWarmer.TerminationHandle warmNewReaders(final IndexShard indexShard, final Engine.Searcher searcher) {
if (indexSettings.getIndex().equals(indexShard.getIndexSettings().getIndex()) == false) {
if (indexSettings.getIndex().equals(indexShard.indexSettings().getIndex()) == false) {
// this is from a different index
return TerminationHandle.NO_WAIT;
}
150 changes: 64 additions & 86 deletions core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -153,7 +153,6 @@ public class IndexShard extends AbstractIndexShardComponent {
private final EngineConfig engineConfig;
private final TranslogConfig translogConfig;
private final IndexEventListener indexEventListener;
private final IndexSettings idxSettings;

/** How many bytes we are currently moving to disk, via either IndexWriter.flush or refresh. IndexingMemoryController polls this
* across all shards to decide if throttling is necessary because moving bytes to disk is falling behind vs incoming documents
@@ -205,7 +204,6 @@ public IndexShard(ShardId shardId, IndexSettings indexSettings, ShardPath path,
IndexEventListener indexEventListener, IndexSearcherWrapper indexSearcherWrapper, NodeServicesProvider provider, SearchSlowLog slowLog, Engine.Warmer warmer, IndexingOperationListener... listeners) {
super(shardId, indexSettings);
final Settings settings = indexSettings.getSettings();
this.idxSettings = indexSettings;
this.codecService = new CodecService(mapperService, logger);
this.warmer = warmer;
this.deletionPolicy = new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy());
@@ -248,18 +246,14 @@ public IndexShard(ShardId shardId, IndexSettings indexSettings, ShardPath path,
this.engineConfig = newEngineConfig(translogConfig, cachingPolicy);
this.suspendableRefContainer = new SuspendableRefContainer();
this.searcherWrapper = indexSearcherWrapper;
QueryShardContext queryShardContext = new QueryShardContext(idxSettings, indexCache.bitsetFilterCache(), indexFieldDataService, mapperService, similarityService, provider.getScriptService(), provider.getIndicesQueriesRegistry());
QueryShardContext queryShardContext = new QueryShardContext(indexSettings, indexCache.bitsetFilterCache(), indexFieldDataService, mapperService, similarityService, provider.getScriptService(), provider.getIndicesQueriesRegistry());
this.percolatorQueriesRegistry = new PercolatorQueriesRegistry(shardId, indexSettings, queryShardContext);
}

public Store store() {
return this.store;
}

public IndexSettings getIndexSettings() {
return idxSettings;
}

/** returns true if this shard supports indexing (i.e., write) operations. */
public boolean canIndex() {
return true;
@@ -319,66 +313,64 @@ public QueryCachingPolicy getQueryCachingPolicy() {
* unless explicitly disabled.
*
* @throws IndexShardRelocatedException if shard is marked as relocated and relocation aborted
* @throws IOException if shard state could not be persisted
*/
public void updateRoutingEntry(final ShardRouting newRouting, final boolean persistState) {
public void updateRoutingEntry(final ShardRouting newRouting, final boolean persistState) throws IOException {
Contributor: I agree with the change, but remind me why you needed it?

Contributor Author: Before, we would just ignore the exception if shard state metadata could not be written. Now we propagate the exception upwards (where the shard is failed).

Contributor: OK. So it's not strictly needed, just an improvement. Wanted to make sure I didn't miss anything.
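
A hypothetical sketch of what "propagate the exception upwards (where the shard is failed)" looks like from the caller's side; the Shard interface and failShard helper are illustrative stand-ins, not the actual cluster-state-handling code:

```java
import java.io.IOException;

public class RoutingUpdateHandler {
    interface Shard {
        void updateRoutingEntry(Object newRouting, boolean persistState) throws IOException;
    }

    void applyRouting(Shard shard, Object newRouting) {
        try {
            shard.updateRoutingEntry(newRouting, true);
        } catch (IOException e) {
            // previously the write failure was swallowed inside persistMetadata;
            // now it surfaces here, where the shard copy can be failed
            failShard("failed to persist shard state for " + newRouting, e);
        }
    }

    void failShard(String reason, Exception cause) {
        // in the real system this notifies the master so the shard copy can be
        // reallocated elsewhere; here we only record the failure
        System.err.println("failing shard: " + reason + " (" + cause + ")");
    }

    public static void main(String[] args) {
        // a stub shard whose state write always fails
        new RoutingUpdateHandler().applyRouting((routing, persist) -> {
            throw new IOException("disk full");
        }, "shard [test][0]");
    }
}
```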

final ShardRouting currentRouting = this.shardRouting;
if (!newRouting.shardId().equals(shardId())) {
throw new IllegalArgumentException("Trying to set a routing entry with shardId [" + newRouting.shardId() + "] on a shard with shardId [" + shardId() + "]");
}
if ((currentRouting == null || newRouting.isSameAllocation(currentRouting)) == false) {
throw new IllegalArgumentException("Trying to set a routing entry with a different allocation. Current " + currentRouting + ", new " + newRouting);
}
try {
if (currentRouting != null) {
if (!newRouting.primary() && currentRouting.primary()) {
logger.warn("suspect illegal state: trying to move shard from primary mode to replica mode");
}
// if its the same routing, return
if (currentRouting.equals(newRouting)) {
return;
}
if (currentRouting != null) {
if (!newRouting.primary() && currentRouting.primary()) {
logger.warn("suspect illegal state: trying to move shard from primary mode to replica mode");
}
// if its the same routing, return
if (currentRouting.equals(newRouting)) {
return;
}
}

if (state == IndexShardState.POST_RECOVERY) {
// if the state is started or relocating (cause it might move right away from started to relocating)
// then move to STARTED
if (newRouting.state() == ShardRoutingState.STARTED || newRouting.state() == ShardRoutingState.RELOCATING) {
// we want to refresh *before* we move to internal STARTED state
try {
getEngine().refresh("cluster_state_started");
} catch (Throwable t) {
logger.debug("failed to refresh due to move to cluster wide started", t);
}
if (state == IndexShardState.POST_RECOVERY) {
// if the state is started or relocating (cause it might move right away from started to relocating)
// then move to STARTED
if (newRouting.state() == ShardRoutingState.STARTED || newRouting.state() == ShardRoutingState.RELOCATING) {
// we want to refresh *before* we move to internal STARTED state
try {
getEngine().refresh("cluster_state_started");
} catch (Throwable t) {
logger.debug("failed to refresh due to move to cluster wide started", t);
}

boolean movedToStarted = false;
synchronized (mutex) {
// do the check under a mutex, so we make sure to only change to STARTED if in POST_RECOVERY
if (state == IndexShardState.POST_RECOVERY) {
changeState(IndexShardState.STARTED, "global state is [" + newRouting.state() + "]");
movedToStarted = true;
} else {
logger.debug("state [{}] not changed, not in POST_RECOVERY, global state is [{}]", state, newRouting.state());
}
}
if (movedToStarted) {
indexEventListener.afterIndexShardStarted(this);
boolean movedToStarted = false;
synchronized (mutex) {
// do the check under a mutex, so we make sure to only change to STARTED if in POST_RECOVERY
if (state == IndexShardState.POST_RECOVERY) {
changeState(IndexShardState.STARTED, "global state is [" + newRouting.state() + "]");
movedToStarted = true;
} else {
logger.debug("state [{}] not changed, not in POST_RECOVERY, global state is [{}]", state, newRouting.state());
}
}
if (movedToStarted) {
indexEventListener.afterIndexShardStarted(this);
}
}
}

if (state == IndexShardState.RELOCATED &&
(newRouting.relocating() == false || newRouting.equalsIgnoringMetaData(currentRouting) == false)) {
// if the shard is marked as RELOCATED we have to fail when any changes in shard routing occur (e.g. due to recovery
// failure / cancellation). The reason is that at the moment we cannot safely move back to STARTED without risking two
// active primaries.
throw new IndexShardRelocatedException(shardId(), "Shard is marked as relocated, cannot safely move to state " + newRouting.state());
}
this.shardRouting = newRouting;
indexEventListener.shardRoutingChanged(this, currentRouting, newRouting);
} finally {
if (persistState) {
persistMetadata(newRouting, currentRouting);
}
if (state == IndexShardState.RELOCATED &&
(newRouting.relocating() == false || newRouting.equalsIgnoringMetaData(currentRouting) == false)) {
// if the shard is marked as RELOCATED we have to fail when any changes in shard routing occur (e.g. due to recovery
// failure / cancellation). The reason is that at the moment we cannot safely move back to STARTED without risking two
// active primaries.
throw new IndexShardRelocatedException(shardId(), "Shard is marked as relocated, cannot safely move to state " + newRouting.state());
}
this.shardRouting = newRouting;
indexEventListener.shardRoutingChanged(this, currentRouting, newRouting);
if (persistState) {
persistMetadata(newRouting, currentRouting);
}
}

@@ -733,7 +725,7 @@ public org.apache.lucene.util.Version minimumCompatibleVersion() {
luceneVersion = segment.getVersion();
}
}
return luceneVersion == null ? idxSettings.getIndexVersionCreated().luceneVersion : luceneVersion;
return luceneVersion == null ? indexSettings.getIndexVersionCreated().luceneVersion : luceneVersion;
}

/**
@@ -1046,18 +1038,6 @@ public void checkIdle(long inactiveTimeNS) {
}
}

/**
* Deletes the shards metadata state. This method can only be executed if the shard is not active.
*
* @throws IOException if the delete fails
*/
public void deleteShardState() throws IOException {
if (this.routingEntry() != null && this.routingEntry().active()) {
throw new IllegalStateException("Can't delete shard state on an active shard");
}
MetaDataStateFormat.deleteMetaState(shardPath().getDataPath());
}

public boolean isActive() {
return active.get();
}
@@ -1070,7 +1050,7 @@ public boolean recoverFromStore(DiscoveryNode localNode) {
// we are the first primary, recover from the gateway
// if its post api allocation, the index should exists
assert shardRouting.primary() : "recover from store only makes sense if the shard is a primary shard";
boolean shouldExist = shardRouting.allocatedPostIndexCreate(idxSettings.getIndexMetaData());
boolean shouldExist = shardRouting.allocatedPostIndexCreate(indexSettings.getIndexMetaData());

StoreRecovery storeRecovery = new StoreRecovery(shardId, logger);
return storeRecovery.recoverFromStore(this, shouldExist, localNode);
@@ -1344,27 +1324,25 @@ public boolean allowsPrimaryPromotion() {
}

// pkg private for testing
void persistMetadata(ShardRouting newRouting, ShardRouting currentRouting) {
void persistMetadata(ShardRouting newRouting, @Nullable ShardRouting currentRouting) throws IOException {
assert newRouting != null : "newRouting must not be null";
if (newRouting.active()) {
try {
final String writeReason;
if (currentRouting == null) {
writeReason = "freshly started, allocation id [" + newRouting.allocationId() + "]";
} else if (currentRouting.equals(newRouting) == false) {
writeReason = "routing changed from " + currentRouting + " to " + newRouting;
} else {
logger.trace("{} skip writing shard state, has been written before", shardId);
return;
}
final ShardStateMetaData newShardStateMetadata = new ShardStateMetaData(newRouting.primary(), getIndexUUID(), newRouting.allocationId());
logger.trace("{} writing shard state, reason [{}]", shardId, writeReason);
ShardStateMetaData.FORMAT.write(newShardStateMetadata, newShardStateMetadata.legacyVersion, shardPath().getShardStatePath());
} catch (IOException e) { // this is how we used to handle it.... :(
logger.warn("failed to write shard state", e);
// we failed to write the shard state, we will try and write
// it next time...

// only persist metadata if routing information that is persisted in shard state metadata actually changed
if (currentRouting == null
|| currentRouting.primary() != newRouting.primary()
|| currentRouting.allocationId().equals(newRouting.allocationId()) == false) {
assert currentRouting == null || currentRouting.isSameAllocation(newRouting);
final String writeReason;
if (currentRouting == null) {
writeReason = "initial state with allocation id [" + newRouting.allocationId() + "]";
} else {
writeReason = "routing changed from " + currentRouting + " to " + newRouting;
}
logger.trace("{} writing shard state, reason [{}]", shardId, writeReason);
final ShardStateMetaData newShardStateMetadata = new ShardStateMetaData(newRouting.primary(), getIndexUUID(), newRouting.allocationId());
ShardStateMetaData.FORMAT.write(newShardStateMetadata, newShardStateMetadata.legacyVersion, shardPath().getShardStatePath());
} else {
logger.trace("{} skip writing shard state, has been written before", shardId);
}
}
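
The rewritten method now persists only when a field that actually ends up in ShardStateMetaData changes: the primary flag or the allocation id. A standalone sketch of that decision, with a hypothetical Routing class standing in for ShardRouting:

```java
// Hypothetical sketch of the skip condition in persistMetadata; Routing
// stands in for ShardRouting and assumes a non-null allocation id.
public class PersistDecision {
    static class Routing {
        final boolean primary;
        final String allocationId;

        Routing(boolean primary, String allocationId) {
            this.primary = primary;
            this.allocationId = allocationId;
        }
    }

    static boolean shouldPersist(Routing current, Routing updated) {
        return current == null
            || current.primary != updated.primary
            || current.allocationId.equals(updated.allocationId) == false;
    }

    public static void main(String[] args) {
        Routing replica = new Routing(false, "zvXz4Q");
        System.out.println(shouldPersist(null, replica));                        // true: first write
        System.out.println(shouldPersist(replica, new Routing(true, "zvXz4Q"))); // true: promoted to primary
        System.out.println(shouldPersist(replica, new Routing(false, "zvXz4Q")));// false: skip the write
    }
}
```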

@@ -1396,7 +1374,7 @@ public int recoveryFromSnapshot(Engine engine, Translog.Snapshot snapshot) throw
return new EngineConfig(shardId,
threadPool, indexSettings, warmer, store, deletionPolicy, indexSettings.getMergePolicy(),
mapperService.indexAnalyzer(), similarityService.similarity(mapperService), codecService, shardEventListener, translogRecoveryPerformer, indexCache.query(), cachingPolicy, translogConfig,
idxSettings.getSettings().getAsTime(IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING, IndexingMemoryController.SHARD_DEFAULT_INACTIVE_TIME));
indexSettings.getSettings().getAsTime(IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING, IndexingMemoryController.SHARD_DEFAULT_INACTIVE_TIME));
}

public Releasable acquirePrimaryOperationLock() {
@@ -51,12 +51,12 @@ public ShadowIndexShard(ShardId shardId, IndexSettings indexSettings, ShardPath

/**
* In addition to the regular accounting done in
* {@link IndexShard#updateRoutingEntry(org.elasticsearch.cluster.routing.ShardRouting, boolean)},
* {@link IndexShard#updateRoutingEntry(ShardRouting, boolean)},
* if this shadow replica needs to be promoted to a primary, the shard is
* failed in order to allow a new primary to be re-allocated.
*/
@Override
public void updateRoutingEntry(ShardRouting newRouting, boolean persistState) {
public void updateRoutingEntry(ShardRouting newRouting, boolean persistState) throws IOException {
if (newRouting.primary() == true) {// becoming a primary
throw new IllegalStateException("can't promote shard to primary");
}
@@ -904,7 +904,7 @@ public boolean canCache(ShardSearchRequest request, SearchContext context) {
if (!CACHEABLE_SEARCH_TYPES.contains(context.searchType())) {
return false;
}
IndexSettings settings = context.indexShard().getIndexSettings();
IndexSettings settings = context.indexShard().indexSettings();
// if not explicitly set in the request, use the index setting, if not, use the request
if (request.requestCache() == null) {
if (settings.getValue(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING) == false) {