Fold EngineDiskUtils into Store, for better lock semantics (#29156)
#28245 introduced the utility class `EngineDiskUtils` with a set of methods to prepare/change
translog and lucene commit points. That util class bundled everything needed to create an
empty shard, bootstrap a shard from a lucene index that was just restored, etc.

In order to safely do these manipulations, the util methods acquired the IndexWriter's lock. That
would sometimes fail due to concurrent shard store fetching or other short-lived activities that
require the files not to change while they are being read.

Since there is no way to wait on the index writer lock, the `Store` class has other locks to make
sure that once we try to acquire the IW lock, it will succeed. To sidestep this waiting problem, this
PR folds `EngineDiskUtils` into `Store`. Sadly this comes with a price: the store class doesn't and
shouldn't know about the translog. As such, the logic is slightly less tight and callers have to do
the translog manipulations on their own, as in the sketch below.
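
A minimal sketch of the new caller-side contract, condensed from the `internalRecoverFromStore` changes in this commit (the empty-shard branch; `indexShard` and `shardId` are assumed to be in scope):

```java
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;

// The store now only prepares the lucene index; the caller creates the empty
// translog and bakes its UUID back into the lucene commit point.
final Store store = indexShard.store();
store.createEmpty(); // empty lucene commit with a fresh history uuid
final String translogUUID = Translog.createEmptyTranslog(
    indexShard.shardPath().resolveTranslog(), // where this shard keeps its translog
    SequenceNumbers.NO_OPS_PERFORMED,         // no operations have been indexed yet
    shardId);
store.associateIndexWithNewTranslog(translogUUID); // point the commit at the new translog
```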
bleskes committed Mar 26, 2018
1 parent a9392f6 commit f5d4550
Showing 14 changed files with 293 additions and 388 deletions.
2 changes: 1 addition & 1 deletion docs/reference/indices/flush.asciidoc
@@ -93,7 +93,7 @@ which returns something similar to:
{
"commit" : {
"id" : "3M3zkw2GHMo2Y4h4/KFKCg==",
"generation" : 3,
"generation" : 4,
"user_data" : {
"translog_uuid" : "hnOG3xFcTDeoI_kvvvOdNA",
"history_uuid" : "XP7KDJGiS1a2fHYiFL5TXQ",
@@ -24,7 +24,6 @@
import org.apache.lucene.search.Sort;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.util.Accountable;
-import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexMetaData;
@@ -40,6 +39,7 @@
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
+import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.env.ShardLock;
import org.elasticsearch.env.ShardLockObtainFailedException;

This file was deleted.

@@ -40,13 +40,13 @@
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.Index;
-import org.elasticsearch.index.engine.EngineDiskUtils;
import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.engine.InternalEngine;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.snapshots.IndexShardRestoreFailedException;
import org.elasticsearch.index.store.Store;
+import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.Repository;
@@ -390,7 +390,11 @@ private void internalRecoverFromStore(IndexShard indexShard) throws IndexShardRe
recoveryState.getIndex().updateVersion(version);
if (recoveryState.getRecoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS) {
assert indexShouldExists;
-EngineDiskUtils.bootstrapNewHistoryFromLuceneIndex(store.directory(), indexShard.shardPath().resolveTranslog(), shardId);
+store.bootstrapNewHistory();
+final SegmentInfos segmentInfos = store.readLastCommittedSegmentsInfo();
+final long maxSeqNo = Long.parseLong(segmentInfos.userData.get(SequenceNumbers.MAX_SEQ_NO));
+final String translogUUID = Translog.createEmptyTranslog(indexShard.shardPath().resolveTranslog(), maxSeqNo, shardId);
+store.associateIndexWithNewTranslog(translogUUID);
} else if (indexShouldExists) {
// since we recover from local, just fill the files and size
try {
@@ -402,7 +406,10 @@ private void internalRecoverFromStore(IndexShard indexShard) throws IndexShardRe
logger.debug("failed to list file details", e);
}
} else {
-EngineDiskUtils.createEmpty(store.directory(), indexShard.shardPath().resolveTranslog(), shardId);
+store.createEmpty();
+final String translogUUID = Translog.createEmptyTranslog(indexShard.shardPath().resolveTranslog(),
+    SequenceNumbers.NO_OPS_PERFORMED, shardId);
+store.associateIndexWithNewTranslog(translogUUID);
}
indexShard.openEngineAndRecoverFromTranslog();
indexShard.getEngine().fillSeqNoGaps(indexShard.getPrimaryTerm());
@@ -445,8 +452,12 @@ private void restore(final IndexShard indexShard, final Repository repository, f
}
final IndexId indexId = repository.getRepositoryData().resolveIndexId(indexName);
repository.restoreShard(indexShard, restoreSource.snapshot().getSnapshotId(), restoreSource.version(), indexId, snapshotShardId, indexShard.recoveryState());
-EngineDiskUtils.bootstrapNewHistoryFromLuceneIndex(indexShard.store().directory(), indexShard.shardPath().resolveTranslog(),
-    shardId);
+final Store store = indexShard.store();
+store.bootstrapNewHistory();
+final SegmentInfos segmentInfos = store.readLastCommittedSegmentsInfo();
+final long maxSeqNo = Long.parseLong(segmentInfos.userData.get(SequenceNumbers.MAX_SEQ_NO));
+final String translogUUID = Translog.createEmptyTranslog(indexShard.shardPath().resolveTranslog(), maxSeqNo, shardId);
+store.associateIndexWithNewTranslog(translogUUID);
assert indexShard.shardRouting.primary() : "only primary shards can recover from store";
indexShard.openEngineAndRecoverFromTranslog();
indexShard.getEngine().fillSeqNoGaps(indexShard.getPrimaryTerm());
104 changes: 102 additions & 2 deletions server/src/main/java/org/elasticsearch/index/store/Store.java
@@ -30,6 +30,8 @@
import org.apache.lucene.index.IndexFormatTooOldException;
import org.apache.lucene.index.IndexNotFoundException;
import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.store.AlreadyClosedException;
@@ -46,7 +48,6 @@
import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefBuilder;
-import org.elasticsearch.core.internal.io.IOUtils;
import org.apache.lucene.util.Version;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
@@ -69,11 +70,13 @@
import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
import org.elasticsearch.common.util.concurrent.RefCounted;
import org.elasticsearch.common.util.iterable.Iterables;
+import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.env.ShardLock;
import org.elasticsearch.env.ShardLockObtainFailedException;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.Engine;
+import org.elasticsearch.index.engine.InternalEngine;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.IndexShard;
@@ -155,7 +158,8 @@ public Store(ShardId shardId, IndexSettings indexSettings, DirectoryService dire
this(shardId, indexSettings, directoryService, shardLock, OnClose.EMPTY);
}

-public Store(ShardId shardId, IndexSettings indexSettings, DirectoryService directoryService, ShardLock shardLock, OnClose onClose) throws IOException {
+public Store(ShardId shardId, IndexSettings indexSettings, DirectoryService directoryService, ShardLock shardLock,
+             OnClose onClose) throws IOException {
super(shardId, indexSettings);
final Settings settings = indexSettings.getSettings();
this.directory = new StoreDirectory(directoryService.newDirectory(), Loggers.getLogger("index.store.deletes", settings, shardId));
@@ -1454,4 +1458,100 @@ private static long estimateSize(Directory directory) throws IOException {
}
}

/**
 * creates an empty lucene index. Any existing data will be deleted; callers are responsible
 * for creating a matching empty translog and associating it via {@link #associateIndexWithNewTranslog}.
 */
public void createEmpty() throws IOException {
metadataLock.writeLock().lock();
try (IndexWriter writer = newIndexWriter(IndexWriterConfig.OpenMode.CREATE, directory)) {
final Map<String, String> map = new HashMap<>();
map.put(Engine.HISTORY_UUID_KEY, UUIDs.randomBase64UUID());
map.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(SequenceNumbers.NO_OPS_PERFORMED));
map.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(SequenceNumbers.NO_OPS_PERFORMED));
map.put(InternalEngine.MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, "-1");
updateCommitData(writer, map);
} finally {
metadataLock.writeLock().unlock();
}
}


/**
 * Marks an existing lucene index with a new history uuid.
 * This is used to make sure no existing shard will recover from this index using ops-based recovery.
 */
public void bootstrapNewHistory() throws IOException {
metadataLock.writeLock().lock();
try (IndexWriter writer = newIndexWriter(IndexWriterConfig.OpenMode.APPEND, directory)) {
final Map<String, String> userData = getUserData(writer);
final long maxSeqNo = Long.parseLong(userData.get(SequenceNumbers.MAX_SEQ_NO));
final Map<String, String> map = new HashMap<>();
map.put(Engine.HISTORY_UUID_KEY, UUIDs.randomBase64UUID());
map.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(maxSeqNo));
updateCommitData(writer, map);
} finally {
metadataLock.writeLock().unlock();
}
}

/**
 * Force bakes the given translog UUID (and a reset translog generation) into the lucene
 * index as recovery information. This is used when recovering from a snapshot or during
 * peer file-based recovery, where a new empty translog is created and the existing lucene
 * index needs to be changed to use it.
 */
public void associateIndexWithNewTranslog(final String translogUUID) throws IOException {
metadataLock.writeLock().lock();
try (IndexWriter writer = newIndexWriter(IndexWriterConfig.OpenMode.APPEND, directory)) {
if (translogUUID.equals(getUserData(writer).get(Translog.TRANSLOG_UUID_KEY))) {
throw new IllegalArgumentException("a new translog uuid can't be equal to existing one. got [" + translogUUID + "]");
}
final Map<String, String> map = new HashMap<>();
map.put(Translog.TRANSLOG_GENERATION_KEY, "1");
map.put(Translog.TRANSLOG_UUID_KEY, translogUUID);
updateCommitData(writer, map);
} finally {
metadataLock.writeLock().unlock();
}
}


/**
* Checks that the Lucene index contains a history uuid marker. If not, a new one is generated and committed.
*/
public void ensureIndexHasHistoryUUID() throws IOException {
metadataLock.writeLock().lock();
try (IndexWriter writer = newIndexWriter(IndexWriterConfig.OpenMode.APPEND, directory)) {
final Map<String, String> userData = getUserData(writer);
if (userData.containsKey(Engine.HISTORY_UUID_KEY) == false) {
updateCommitData(writer, Collections.singletonMap(Engine.HISTORY_UUID_KEY, UUIDs.randomBase64UUID()));
}
} finally {
metadataLock.writeLock().unlock();
}
}

private void updateCommitData(IndexWriter writer, Map<String, String> keysToUpdate) throws IOException {
final Map<String, String> userData = getUserData(writer);
userData.putAll(keysToUpdate);
writer.setLiveCommitData(userData.entrySet());
writer.commit();
}

private Map<String, String> getUserData(IndexWriter writer) {
final Map<String, String> userData = new HashMap<>();
writer.getLiveCommitData().forEach(e -> userData.put(e.getKey(), e.getValue()));
return userData;
}

private IndexWriter newIndexWriter(IndexWriterConfig.OpenMode openMode, final Directory dir) throws IOException {
IndexWriterConfig iwc = new IndexWriterConfig(null)
.setCommitOnClose(false)
// we don't want merges to happen here - we call maybe merge on the engine
// later once we have started it up; otherwise we would need to wait for it here.
// we also don't specify a codec here - merges done later through the engine will use the engine's settings for this index
.setMergePolicy(NoMergePolicy.INSTANCE)
.setOpenMode(openMode);
return new IndexWriter(dir, iwc);
}

}
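
For completeness, a condensed sketch of the companion pattern for an existing lucene index, as used in the `internalRecoverFromStore` and `restore` hunks above (again, `indexShard` and `shardId` are assumed to be in scope):

```java
import org.apache.lucene.index.SegmentInfos;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;

// Bootstrap a new history on top of an existing index (e.g. after a snapshot restore),
// then create a fresh translog that starts at the index's max seq no.
final Store store = indexShard.store();
store.bootstrapNewHistory(); // new history uuid; local checkpoint advanced to max seq no
final SegmentInfos segmentInfos = store.readLastCommittedSegmentsInfo();
final long maxSeqNo = Long.parseLong(segmentInfos.userData.get(SequenceNumbers.MAX_SEQ_NO));
final String translogUUID = Translog.createEmptyTranslog(
    indexShard.shardPath().resolveTranslog(), maxSeqNo, shardId);
store.associateIndexWithNewTranslog(translogUUID); // throws if the uuid equals the old one
```

Note the locking design: each of the new `Store` methods takes the metadata write lock before opening an `IndexWriter`, which is what guarantees that acquiring the IW lock succeeds even while shard store fetching or other short-lived readers are active.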