Skip to content

Commit

Permalink
Track Repository Gen. in BlobStoreRepository (#48944) (#49116)
Browse files Browse the repository at this point in the history
This is intended as a stop-gap solution/improvement to #38941 that
prevents repo modifications without an intermittent master failover
from causing inconsistent (outdated due to inconsistent listing of index-N blobs)
`RepositoryData` to be written.

Tracking the latest repository generation will move to the cluster state in a
separate pull request. This is intended as a low-risk change to be backported as
far as possible and motived by the recently increased chance of #38941
causing trouble via SLM (see #47520).

Closes #47834
Closes #49048
  • Loading branch information
original-brownbear committed Nov 15, 2019
1 parent a370008 commit fc505aa
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand Down Expand Up @@ -372,7 +373,7 @@ public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, boolea
} else {
try {
final Map<String, BlobMetaData> rootBlobs = blobContainer().listBlobs();
final RepositoryData repositoryData = getRepositoryData(latestGeneration(rootBlobs.keySet()));
final RepositoryData repositoryData = safeRepositoryData(repositoryStateId, rootBlobs);
// Cache the indices that were found before writing out the new index-N blob so that a stuck master will never
// delete an index that was created by another master node after writing this index-N blob.
final Map<String, BlobContainer> foundIndices = blobStore().blobContainer(indicesPath()).children();
Expand All @@ -383,6 +384,30 @@ public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, boolea
}
}

/**
* Loads {@link RepositoryData} ensuring that it is consistent with the given {@code rootBlobs} as well of the assumed generation.
*
* @param repositoryStateId Expected repository generation
* @param rootBlobs Blobs at the repository root
* @return RepositoryData
*/
private RepositoryData safeRepositoryData(long repositoryStateId, Map<String, BlobMetaData> rootBlobs) {
final long generation = latestGeneration(rootBlobs.keySet());
final long genToLoad = latestKnownRepoGen.updateAndGet(known -> Math.max(known, repositoryStateId));
if (genToLoad > generation) {
// It's always a possibility to not see the latest index-N in the listing here on an eventually consistent blob store, just
// debug log it. Any blobs leaked as a result of an inconsistent listing here will be cleaned up in a subsequent cleanup or
// snapshot delete run anyway.
logger.debug("Determined repository's generation from its contents to [" + generation + "] but " +
"current generation is at least [" + genToLoad + "]");
}
if (genToLoad != repositoryStateId) {
throw new RepositoryException(metadata.name(), "concurrent modification of the index-N file, expected current generation [" +
repositoryStateId + "], actual current generation [" + genToLoad + "]");
}
return getRepositoryData(genToLoad);
}

/**
* After updating the {@link RepositoryData} each of the shards directories is individually first moved to the next shard generation
* and then has all now unreferenced blobs in it deleted.
Expand Down Expand Up @@ -610,14 +635,8 @@ public void cleanup(long repositoryStateId, boolean writeShardGens, ActionListen
if (isReadOnly()) {
throw new RepositoryException(metadata.name(), "cannot run cleanup on readonly repository");
}
final RepositoryData repositoryData = getRepositoryData();
if (repositoryData.getGenId() != repositoryStateId) {
// Check that we are working on the expected repository version before gathering the data to clean up
throw new RepositoryException(metadata.name(), "concurrent modification of the repository before cleanup started, " +
"expected current generation [" + repositoryStateId + "], actual current generation ["
+ repositoryData.getGenId() + "]");
}
Map<String, BlobMetaData> rootBlobs = blobContainer().listBlobs();
final RepositoryData repositoryData = safeRepositoryData(repositoryStateId, rootBlobs);
final Map<String, BlobContainer> foundIndices = blobStore().blobContainer(indicesPath()).children();
final Set<String> survivingIndexIds =
repositoryData.getIndices().values().stream().map(IndexId::getId).collect(Collectors.toSet());
Expand Down Expand Up @@ -903,12 +922,36 @@ public void endVerification(String seed) {
}
}

// Tracks the latest known repository generation in a best-effort way to detect inconsistent listing of root level index-N blobs
// and concurrent modifications.
// Protected for use in MockEventuallyConsistentRepository
protected final AtomicLong latestKnownRepoGen = new AtomicLong(RepositoryData.EMPTY_REPO_GEN);

@Override
public RepositoryData getRepositoryData() {
try {
return getRepositoryData(latestIndexBlobId());
} catch (IOException ioe) {
throw new RepositoryException(metadata.name(), "Could not determine repository generation from root blobs", ioe);
// Retry loading RepositoryData in a loop in case we run into concurrent modifications of the repository.
while (true) {
final long generation;
try {
generation = latestIndexBlobId();
} catch (IOException ioe) {
throw new RepositoryException(metadata.name(), "Could not determine repository generation from root blobs", ioe);
}
final long genToLoad = latestKnownRepoGen.updateAndGet(known -> Math.max(known, generation));
if (genToLoad > generation) {
logger.info("Determined repository generation [" + generation
+ "] from repository contents but correct generation must be at least [" + genToLoad + "]");
}
try {
return getRepositoryData(genToLoad);
} catch (RepositoryException e) {
if (genToLoad != latestKnownRepoGen.get()) {
logger.warn("Failed to load repository data generation [" + genToLoad +
"] because a concurrent operation moved the current generation to [" + latestKnownRepoGen.get() + "]", e);
continue;
}
throw e;
}
}
}

Expand All @@ -926,6 +969,12 @@ private RepositoryData getRepositoryData(long indexGen) {
return RepositoryData.snapshotsFromXContent(parser, indexGen);
}
} catch (IOException ioe) {
// If we fail to load the generation we tracked in latestKnownRepoGen we reset it.
// This is done as a fail-safe in case a user manually deletes the contents of the repository in which case subsequent
// operations must start from the EMPTY_REPO_GEN again
if (latestKnownRepoGen.compareAndSet(indexGen, RepositoryData.EMPTY_REPO_GEN)) {
logger.warn("Resetting repository generation tracker because we failed to read generation [" + indexGen + "]", ioe);
}
throw new RepositoryException(metadata.name(), "could not read repository data from index blob", ioe);
}
}
Expand All @@ -951,11 +1000,21 @@ protected void writeIndexGen(final RepositoryData repositoryData, final long exp
"] - possibly due to simultaneous snapshot deletion requests");
}
final long newGen = currentGen + 1;
if (latestKnownRepoGen.get() >= newGen) {
throw new IllegalArgumentException(
"Tried writing generation [" + newGen + "] but repository is at least at generation [" + newGen + "] already");
}
// write the index file
final String indexBlob = INDEX_FILE_PREFIX + Long.toString(newGen);
logger.debug("Repository [{}] writing new index generational blob [{}]", metadata.name(), indexBlob);
writeAtomic(indexBlob,
BytesReference.bytes(repositoryData.snapshotsToXContent(XContentFactory.jsonBuilder(), writeShardGens)), true);
final long latestKnownGen = latestKnownRepoGen.updateAndGet(known -> Math.max(known, newGen));
if (newGen < latestKnownGen) {
// Don't mess up the index.latest blob
throw new IllegalStateException(
"Wrote generation [" + newGen + "] but latest known repo gen concurrently changed to [" + latestKnownGen + "]");
}
// write the current generation to the index-latest file
final BytesReference genBytes;
try (BytesStreamOutput bStream = new BytesStreamOutput()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ public void testConcurrentSnapshotCreateAndDelete() {
final StepListener<CreateSnapshotResponse> createAnotherSnapshotResponseStepListener = new StepListener<>();

continueOrDie(deleteSnapshotStepListener, acknowledgedResponse -> masterNode.client.admin().cluster()
.prepareCreateSnapshot(repoName, snapshotName).execute(createAnotherSnapshotResponseStepListener));
.prepareCreateSnapshot(repoName, snapshotName).setWaitForCompletion(true).execute(createAnotherSnapshotResponseStepListener));
continueOrDie(createAnotherSnapshotResponseStepListener, createSnapshotResponse ->
assertEquals(createSnapshotResponse.getSnapshotInfo().state(), SnapshotState.SUCCESS));

Expand Down Expand Up @@ -1146,7 +1146,7 @@ protected void assertSnapshotOrGenericThread() {
} else {
return metaData -> {
final Repository repository = new MockEventuallyConsistentRepository(
metaData, xContentRegistry(), deterministicTaskQueue.getThreadPool(), blobStoreContext);
metaData, xContentRegistry(), deterministicTaskQueue.getThreadPool(), blobStoreContext, random());
repository.start();
return repository;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,11 @@
import java.io.InputStream;
import java.nio.file.NoSuchFileException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
Expand All @@ -63,18 +65,22 @@
*/
public class MockEventuallyConsistentRepository extends BlobStoreRepository {

private final Random random;

private final Context context;

private final NamedXContentRegistry namedXContentRegistry;

public MockEventuallyConsistentRepository(
RepositoryMetaData metadata,
NamedXContentRegistry namedXContentRegistry,
ThreadPool threadPool,
Context context) {
super(metadata,false, namedXContentRegistry, threadPool);
final RepositoryMetaData metadata,
final NamedXContentRegistry namedXContentRegistry,
final ThreadPool threadPool,
final Context context,
final Random random) {
super(metadata, false, namedXContentRegistry, threadPool);
this.context = context;
this.namedXContentRegistry = namedXContentRegistry;
this.random = random;
}

// Filters out all actions that are super-seeded by subsequent actions
Expand Down Expand Up @@ -111,6 +117,9 @@ public BlobPath basePath() {
*/
public static final class Context {

// Eventual consistency is only simulated as long as this flag is false
private boolean consistent;

private final List<BlobStoreAction> actions = new ArrayList<>();

/**
Expand All @@ -121,6 +130,7 @@ public void forceConsistent() {
final List<BlobStoreAction> consistentActions = consistentView(actions);
actions.clear();
actions.addAll(consistentActions);
consistent = true;
}
}
}
Expand Down Expand Up @@ -244,14 +254,14 @@ public Map<String, BlobMetaData> listBlobs() {
ensureNotClosed();
final String thisPath = path.buildAsString();
synchronized (context.actions) {
return consistentView(context.actions).stream()
return maybeMissLatestIndexN(consistentView(context.actions).stream()
.filter(
action -> action.path.startsWith(thisPath) && action.path.substring(thisPath.length()).indexOf('/') == -1
&& action.operation == Operation.PUT)
.collect(
Collectors.toMap(
action -> action.path.substring(thisPath.length()),
action -> new PlainBlobMetaData(action.path.substring(thisPath.length()), action.data.length)));
action -> new PlainBlobMetaData(action.path.substring(thisPath.length()), action.data.length))));
}
}

Expand All @@ -272,9 +282,21 @@ public Map<String, BlobContainer> children() {

@Override
public Map<String, BlobMetaData> listBlobsByPrefix(String blobNamePrefix) {
return
listBlobs().entrySet().stream().filter(entry -> entry.getKey().startsWith(blobNamePrefix)).collect(
Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
return maybeMissLatestIndexN(
listBlobs().entrySet().stream().filter(entry -> entry.getKey().startsWith(blobNamePrefix))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
}

// Randomly filter out the latest /index-N blob from a listing to test that tracking of it in latestKnownRepoGen
// overrides an inconsistent listing
private Map<String, BlobMetaData> maybeMissLatestIndexN(Map<String, BlobMetaData> listing) {
// Only filter out latest index-N at the repo root and only as long as we're not in a forced consistent state
if (path.parent() == null && context.consistent == false && random.nextBoolean()) {
final Map<String, BlobMetaData> filtered = new HashMap<>(listing);
filtered.remove(BlobStoreRepository.INDEX_FILE_PREFIX + latestKnownRepoGen.get());
return Collections.unmodifiableMap(filtered);
}
return listing;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public void testReadAfterWriteConsistently() throws IOException {
MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context();
try (BlobStoreRepository repository = new MockEventuallyConsistentRepository(
new RepositoryMetaData("testRepo", "mockEventuallyConsistent", Settings.EMPTY),
xContentRegistry(), mock(ThreadPool.class), blobStoreContext)) {
xContentRegistry(), mock(ThreadPool.class), blobStoreContext, random())) {
repository.start();
final BlobContainer blobContainer = repository.blobStore().blobContainer(repository.basePath());
final String blobName = randomAlphaOfLength(10);
Expand All @@ -70,7 +70,7 @@ public void testReadAfterWriteAfterReadThrows() throws IOException {
MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context();
try (BlobStoreRepository repository = new MockEventuallyConsistentRepository(
new RepositoryMetaData("testRepo", "mockEventuallyConsistent", Settings.EMPTY),
xContentRegistry(), mock(ThreadPool.class), blobStoreContext)) {
xContentRegistry(), mock(ThreadPool.class), blobStoreContext, random())) {
repository.start();
final BlobContainer blobContainer = repository.blobStore().blobContainer(repository.basePath());
final String blobName = randomAlphaOfLength(10);
Expand All @@ -86,7 +86,7 @@ public void testReadAfterDeleteAfterWriteThrows() throws IOException {
MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context();
try (BlobStoreRepository repository = new MockEventuallyConsistentRepository(
new RepositoryMetaData("testRepo", "mockEventuallyConsistent", Settings.EMPTY),
xContentRegistry(), mock(ThreadPool.class), blobStoreContext)) {
xContentRegistry(), mock(ThreadPool.class), blobStoreContext, random())) {
repository.start();
final BlobContainer blobContainer = repository.blobStore().blobContainer(repository.basePath());
final String blobName = randomAlphaOfLength(10);
Expand All @@ -104,7 +104,7 @@ public void testOverwriteRandomBlobFails() throws IOException {
MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context();
try (BlobStoreRepository repository = new MockEventuallyConsistentRepository(
new RepositoryMetaData("testRepo", "mockEventuallyConsistent", Settings.EMPTY),
xContentRegistry(), mock(ThreadPool.class), blobStoreContext)) {
xContentRegistry(), mock(ThreadPool.class), blobStoreContext, random())) {
repository.start();
final BlobContainer container = repository.blobStore().blobContainer(repository.basePath());
final String blobName = randomAlphaOfLength(10);
Expand All @@ -121,7 +121,7 @@ public void testOverwriteShardSnapBlobFails() throws IOException {
MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context();
try (BlobStoreRepository repository = new MockEventuallyConsistentRepository(
new RepositoryMetaData("testRepo", "mockEventuallyConsistent", Settings.EMPTY),
xContentRegistry(), mock(ThreadPool.class), blobStoreContext)) {
xContentRegistry(), mock(ThreadPool.class), blobStoreContext, random())) {
repository.start();
final BlobContainer container =
repository.blobStore().blobContainer(repository.basePath().add("indices").add("someindex").add("0"));
Expand All @@ -143,7 +143,7 @@ public void testOverwriteSnapshotInfoBlob() {
new ThreadPool.Info(ThreadPool.Names.SNAPSHOT, ThreadPool.ThreadPoolType.FIXED, randomIntBetween(1, 10)));
try (BlobStoreRepository repository = new MockEventuallyConsistentRepository(
new RepositoryMetaData("testRepo", "mockEventuallyConsistent", Settings.EMPTY),
xContentRegistry(), threadPool, blobStoreContext)) {
xContentRegistry(), threadPool, blobStoreContext, random())) {
repository.start();

// We create a snap- blob for snapshot "foo" in the first generation
Expand Down

0 comments on commit fc505aa

Please sign in to comment.