Skip to content

Commit

Permalink
Improve Partial Snapshot Rollover Behavior (#69364) (#69370)
Browse files Browse the repository at this point in the history
Using new reconciliation functionality to not needlessly drop rolling over
data streams from the final snapshot.

closes #68536
  • Loading branch information
original-brownbear committed Feb 23, 2021
1 parent 293436c commit b41784f
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -177,15 +178,19 @@ public DataStream promoteDataStream() {
* stream definitions that do not reference backing indices not contained in the snapshot.
*
* @param indicesInSnapshot List of indices in the snapshot
* @return Reconciled {@link DataStream} instance
* @return Reconciled {@link DataStream} instance or {@code null} if no reconciled version of this data stream could be built from the
* given indices
*/
public DataStream snapshot(List<String> indicesInSnapshot) {
@Nullable
public DataStream snapshot(Collection<String> indicesInSnapshot) {
// do not include indices not available in the snapshot
List<Index> reconciledIndices = new ArrayList<>(this.indices);
reconciledIndices.removeIf(x -> indicesInSnapshot.contains(x.getName()) == false);
if (reconciledIndices.removeIf(x -> indicesInSnapshot.contains(x.getName()) == false) == false) {
return this;
}

if (reconciledIndices.size() == 0) {
throw new IllegalArgumentException("cannot reconcile data stream without at least one backing index");
return null;
}

return new DataStream(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1122,8 +1122,9 @@ private static Metadata metadataForSnapshot(SnapshotsInProgress.Entry snapshot,
break;
}
}
if (missingIndex == false) {
dataStreams.put(dataStreamName, dataStream);
final DataStream reconciled = missingIndex ? dataStream.snapshot(indicesInSnapshot) : dataStream;
if (reconciled != null) {
dataStreams.put(dataStreamName, reconciled);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.util.stream.Collectors;

import static org.elasticsearch.cluster.DataStreamTestHelper.createTimestampField;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.everyItem;
import static org.hamcrest.Matchers.hasItems;
Expand Down Expand Up @@ -217,12 +216,7 @@ public void testSnapshotWithAllBackingIndicesRemoved() {
preSnapshotDataStream.isReplicated()
);

IllegalArgumentException e = expectThrows(
IllegalArgumentException.class,
() -> postSnapshotDataStream.snapshot(
preSnapshotDataStream.getIndices().stream().map(Index::getName).collect(Collectors.toList())
)
);
assertThat(e.getMessage(), containsString("cannot reconcile data stream without at least one backing index"));
assertNull(postSnapshotDataStream.snapshot(
preSnapshotDataStream.getIndices().stream().map(Index::getName).collect(Collectors.toList())));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasItems;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;

@ESIntegTestCase.ClusterScope(transportClientRatio = 0)
Expand Down Expand Up @@ -546,22 +548,64 @@ public void testSnapshotDSDuringRollover() throws Exception {
unblockAllDataNodes(repoName);
final SnapshotInfo snapshotInfo = assertSuccessful(snapshotFuture);

if (snapshotInfo.dataStreams().contains("ds")) {
assertAcked(client().execute(DeleteDataStreamAction.INSTANCE, new DeleteDataStreamAction.Request(new String[] { "ds" })).get());
assertThat(snapshotInfo.dataStreams(), hasItems("ds"));
assertAcked(client().execute(DeleteDataStreamAction.INSTANCE, new DeleteDataStreamAction.Request(new String[] { "ds" })).get());

RestoreInfo restoreSnapshotResponse = client().admin()
.cluster()
.prepareRestoreSnapshot(repoName, snapshotName)
.setWaitForCompletion(true)
.setIndices("ds")
.get()
.getRestoreInfo();
RestoreInfo restoreSnapshotResponse = client().admin()
.cluster()
.prepareRestoreSnapshot(repoName, snapshotName)
.setWaitForCompletion(true)
.setIndices("ds")
.get()
.getRestoreInfo();

assertEquals(restoreSnapshotResponse.successfulShards(), restoreSnapshotResponse.totalShards());
assertEquals(restoreSnapshotResponse.failedShards(), 0);
assertFalse(partial);
} else {
assertTrue(partial);
}
assertEquals(restoreSnapshotResponse.successfulShards(), restoreSnapshotResponse.totalShards());
assertEquals(restoreSnapshotResponse.failedShards(), 0);
}

public void testSnapshotDSDuringRolloverAndDeleteOldIndex() throws Exception {
// repository consistency check requires at least one snapshot per registered repository
createFullSnapshot(REPO, "snap-so-repo-checks-pass");
final String repoName = "mock-repo";
createRepository(repoName, "mock");
blockAllDataNodes(repoName);
final String snapshotName = "ds-snap";
final ActionFuture<CreateSnapshotResponse> snapshotFuture = client().admin()
.cluster()
.prepareCreateSnapshot(repoName, snapshotName)
.setWaitForCompletion(true)
.setPartial(true)
.setIncludeGlobalState(randomBoolean())
.execute();
waitForBlockOnAnyDataNode(repoName);
awaitNumberOfSnapshotsInProgress(1);
final RolloverResponse rolloverResponse = client().admin().indices().rolloverIndex(new RolloverRequest("ds", null)).get();
assertTrue(rolloverResponse.isRolledOver());

logger.info("--> deleting former write index");
assertAcked(client().admin().indices().prepareDelete(rolloverResponse.getOldIndex()));

unblockAllDataNodes(repoName);
final SnapshotInfo snapshotInfo = assertSuccessful(snapshotFuture);

assertThat(
"snapshot should not contain 'ds' since none of its indices existed both at the start and at the end of the snapshot",
snapshotInfo.dataStreams(),
not(hasItems("ds"))
);
assertAcked(
client().execute(DeleteDataStreamAction.INSTANCE, new DeleteDataStreamAction.Request(new String[] { "other-ds" })).get()
);

RestoreInfo restoreSnapshotResponse = client().admin()
.cluster()
.prepareRestoreSnapshot(repoName, snapshotName)
.setWaitForCompletion(true)
.setIndices("other-ds")
.get()
.getRestoreInfo();

assertEquals(restoreSnapshotResponse.successfulShards(), restoreSnapshotResponse.totalShards());
assertEquals(restoreSnapshotResponse.failedShards(), 0);
}
}

0 comments on commit b41784f

Please sign in to comment.