Skip to content

Commit

Permalink
Take include_aliases flag into account when restoring data stream ali…
Browse files Browse the repository at this point in the history
…ases and also rename write data stream alias during a restore. (#73653)

Backport #73588 and #73595 to 7.x branch.

This commit includes two changes:

* Also rename write data stream alias during a restore.
Rename during a restore should also rename write data stream in data stream alias.

* Take include_aliases flag into account when restoring data stream aliases
   Take RestoreSnapshotRequest#includeAliases() into account when restoring
   data stream aliases from a snapshot into a cluster.

Relates to #66163
  • Loading branch information
martijnvg committed Jun 2, 2021
1 parent b532f13 commit ad7fecb
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ public void restoreSnapshot(final RestoreSnapshotRequest request,

// Get data stream metadata for requested data streams
Tuple<Map<String, DataStream>, Map<String, DataStreamAlias>> result =
getDataStreamsToRestore(repository, snapshotId, snapshotInfo, globalMetadata, requestIndices);
getDataStreamsToRestore(repository, snapshotId, snapshotInfo, globalMetadata, requestIndices, request.includeAliases());
Map<String, DataStream> dataStreamsToRestore = result.v1();
Map<String, DataStreamAlias> dataStreamAliasesToRestore = result.v2();

Expand Down Expand Up @@ -524,7 +524,11 @@ restoreUUID, snapshot, overallState(RestoreInProgress.State.INIT, shards),
List<String> renamedDataStreams = alias.getDataStreams().stream()
.map(s -> s.replaceAll(request.renamePattern(), request.renameReplacement()))
.collect(Collectors.toList());
return new DataStreamAlias(alias.getName(), renamedDataStreams, alias.getWriteDataStream());
String writeDataStream = alias.getWriteDataStream();
if (writeDataStream != null) {
writeDataStream = writeDataStream.replaceAll(request.renamePattern(), request.renameReplacement());
}
return new DataStreamAlias(alias.getName(), renamedDataStreams, writeDataStream);
} else {
return alias;
}
Expand Down Expand Up @@ -813,7 +817,8 @@ private Tuple<Map<String, DataStream>, Map<String, DataStreamAlias>> getDataStre
SnapshotId snapshotId,
SnapshotInfo snapshotInfo,
Metadata globalMetadata,
List<String> requestIndices) {
List<String> requestIndices,
boolean includeAliases) {
Map<String, DataStream> dataStreams;
Map<String, DataStreamAlias> dataStreamAliases;
List<String> requestedDataStreams = filterIndices(snapshotInfo.dataStreams(), requestIndices.toArray(new String[]{}),
Expand All @@ -826,27 +831,30 @@ private Tuple<Map<String, DataStream>, Map<String, DataStreamAlias>> getDataStre
globalMetadata = repository.getSnapshotGlobalMetadata(snapshotId);
}
final Map<String, DataStream> dataStreamsInSnapshot = globalMetadata.dataStreams();
final Map<String, DataStreamAlias> dataStreamAliasesInSnapshot = globalMetadata.dataStreamAliases();
dataStreams = new HashMap<>(requestedDataStreams.size());
dataStreamAliases = new HashMap<>();
for (String requestedDataStream : requestedDataStreams) {
final DataStream dataStreamInSnapshot = dataStreamsInSnapshot.get(requestedDataStream);
assert dataStreamInSnapshot != null : "DataStream [" + requestedDataStream + "] not found in snapshot";
dataStreams.put(requestedDataStream, dataStreamInSnapshot);

}
for (DataStreamAlias alias : dataStreamAliasesInSnapshot.values()) {
List<String> intersectingDataStreams = alias.getDataStreams().stream()
.filter(requestedDataStreams::contains)
.collect(Collectors.toList());
String writeDateStream = alias.getWriteDataStream();
if (intersectingDataStreams.contains(writeDateStream) == false) {
writeDateStream = null;
}
if (intersectingDataStreams.isEmpty() == false) {
DataStreamAlias copy = new DataStreamAlias(alias.getName(), intersectingDataStreams, writeDateStream);
dataStreamAliases.put(alias.getName(), copy);
if (includeAliases) {
dataStreamAliases = new HashMap<>();
final Map<String, DataStreamAlias> dataStreamAliasesInSnapshot = globalMetadata.dataStreamAliases();
for (DataStreamAlias alias : dataStreamAliasesInSnapshot.values()) {
List<String> intersectingDataStreams = alias.getDataStreams().stream()
.filter(requestedDataStreams::contains)
.collect(Collectors.toList());
String writeDateStream = alias.getWriteDataStream();
if (intersectingDataStreams.contains(writeDateStream) == false) {
writeDateStream = null;
}
if (intersectingDataStreams.isEmpty() == false) {
DataStreamAlias copy = new DataStreamAlias(alias.getName(), intersectingDataStreams, writeDateStream);
dataStreamAliases.put(alias.getName(), copy);
}
}
} else {
dataStreamAliases = Collections.emptyMap();
}
}
return new Tuple<>(dataStreams, dataStreamAliases);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import java.util.stream.Collectors;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.anEmptyMap;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.containsString;
Expand All @@ -80,6 +81,7 @@ public class DataStreamsSnapshotsIT extends AbstractSnapshotIntegTestCase {
private String dsBackingIndexName;
private String otherDsBackingIndexName;
private String ds2BackingIndexName;
private String otherDs2BackingIndexName;
private String id;

@Override
Expand Down Expand Up @@ -111,6 +113,7 @@ public void setup() throws Exception {
otherDsBackingIndexName = getDataStreamResponse.getDataStreams().get(1).getDataStream().getIndices().get(0).getName();
// Will be used in some tests, to test renaming while restoring a snapshot:
ds2BackingIndexName = dsBackingIndexName.replace("-ds-", "-ds2-");
otherDs2BackingIndexName = otherDsBackingIndexName.replace("-other-ds-", "-other-ds2-");

IndexResponse indexResponse = client.prepareIndex("ds", "_doc")
.setOpType(DocWriteRequest.OpType.CREATE)
Expand Down Expand Up @@ -472,6 +475,46 @@ public void testSnapshotAndRestoreAll() throws Exception {
assertAcked(client().execute(DeleteDataStreamAction.INSTANCE, new DeleteDataStreamAction.Request(new String[] { "ds" })).get());
}

public void testSnapshotAndRestoreIncludeAliasesFalse() throws Exception {
CreateSnapshotRequest createSnapshotRequest = new CreateSnapshotRequest(REPO, SNAPSHOT);
createSnapshotRequest.waitForCompletion(true);
createSnapshotRequest.includeGlobalState(false);
CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().createSnapshot(createSnapshotRequest).actionGet();

RestStatus status = createSnapshotResponse.getSnapshotInfo().status();
assertEquals(RestStatus.OK, status);
assertThat(getSnapshot(REPO, SNAPSHOT).indices(), containsInAnyOrder(dsBackingIndexName, otherDsBackingIndexName));

assertAcked(client.execute(DeleteDataStreamAction.INSTANCE, new DeleteDataStreamAction.Request(new String[] { "*" })).get());
assertAcked(client.admin().indices().prepareDelete("*").setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED_HIDDEN));

RestoreSnapshotRequest restoreSnapshotRequest = new RestoreSnapshotRequest(REPO, SNAPSHOT);
restoreSnapshotRequest.waitForCompletion(true);
restoreSnapshotRequest.includeGlobalState(true);
restoreSnapshotRequest.includeAliases(false);
RestoreSnapshotResponse restoreSnapshotResponse = client.admin().cluster().restoreSnapshot(restoreSnapshotRequest).actionGet();
assertEquals(2, restoreSnapshotResponse.getRestoreInfo().successfulShards());

assertEquals(DOCUMENT_SOURCE, client.prepareGet(dsBackingIndexName, "_doc", id).get().getSourceAsMap());
SearchHit[] hits = client.prepareSearch("ds").get().getHits().getHits();
assertEquals(1, hits.length);
assertEquals(DOCUMENT_SOURCE, hits[0].getSourceAsMap());

GetDataStreamAction.Response ds = client.execute(
GetDataStreamAction.INSTANCE,
new GetDataStreamAction.Request(new String[] { "*" })
).get();
assertEquals(2, ds.getDataStreams().size());
assertEquals(1, ds.getDataStreams().get(0).getDataStream().getIndices().size());
assertEquals(dsBackingIndexName, ds.getDataStreams().get(0).getDataStream().getIndices().get(0).getName());
assertEquals(1, ds.getDataStreams().get(1).getDataStream().getIndices().size());
assertEquals(otherDsBackingIndexName, ds.getDataStreams().get(1).getDataStream().getIndices().get(0).getName());

GetAliasesResponse getAliasesResponse = client.admin().indices().getAliases(new GetAliasesRequest("*")).actionGet();
assertThat(getAliasesResponse.getDataStreamAliases(), anEmptyMap());
assertAcked(client().execute(DeleteDataStreamAction.INSTANCE, new DeleteDataStreamAction.Request(new String[] { "ds" })).get());
}

public void testRename() throws Exception {
CreateSnapshotResponse createSnapshotResponse = client.admin()
.cluster()
Expand Down Expand Up @@ -520,6 +563,48 @@ public void testRename() throws Exception {
assertThat(getAliasesResponse.getDataStreamAliases().get("other-ds").get(0).getWriteDataStream(), equalTo("other-ds"));
}

public void testRenameWriteDataStream() throws Exception {
CreateSnapshotResponse createSnapshotResponse = client.admin()
.cluster()
.prepareCreateSnapshot(REPO, SNAPSHOT)
.setWaitForCompletion(true)
.setIndices("other-ds")
.setIncludeGlobalState(false)
.get();

RestStatus status = createSnapshotResponse.getSnapshotInfo().status();
assertEquals(RestStatus.OK, status);

client.admin()
.cluster()
.prepareRestoreSnapshot(REPO, SNAPSHOT)
.setWaitForCompletion(true)
.setIndices("other-ds")
.setRenamePattern("other-ds")
.setRenameReplacement("other-ds2")
.get();

GetDataStreamAction.Response ds = client.execute(
GetDataStreamAction.INSTANCE,
new GetDataStreamAction.Request(new String[] { "other-ds2" })
).get();
assertEquals(1, ds.getDataStreams().size());
assertEquals(1, ds.getDataStreams().get(0).getDataStream().getIndices().size());
assertEquals(otherDs2BackingIndexName, ds.getDataStreams().get(0).getDataStream().getIndices().get(0).getName());

GetAliasesResponse getAliasesResponse = client.admin().indices().getAliases(new GetAliasesRequest("my-alias")).actionGet();
assertThat(getAliasesResponse.getDataStreamAliases().keySet(), containsInAnyOrder("ds", "other-ds", "other-ds2"));
assertThat(getAliasesResponse.getDataStreamAliases().get("other-ds2").size(), equalTo(1));
assertThat(getAliasesResponse.getDataStreamAliases().get("other-ds2").get(0).getName(), equalTo("my-alias"));
assertThat(getAliasesResponse.getDataStreamAliases().get("other-ds2").get(0).getWriteDataStream(), equalTo("other-ds2"));
assertThat(getAliasesResponse.getDataStreamAliases().get("ds").size(), equalTo(1));
assertThat(getAliasesResponse.getDataStreamAliases().get("ds").get(0).getName(), equalTo("my-alias"));
assertThat(getAliasesResponse.getDataStreamAliases().get("ds").get(0).getWriteDataStream(), equalTo("other-ds2"));
assertThat(getAliasesResponse.getDataStreamAliases().get("other-ds").size(), equalTo(1));
assertThat(getAliasesResponse.getDataStreamAliases().get("other-ds").get(0).getName(), equalTo("my-alias"));
assertThat(getAliasesResponse.getDataStreamAliases().get("other-ds").get(0).getWriteDataStream(), equalTo("other-ds2"));
}

public void testBackingIndexIsNotRenamedWhenRestoringDataStream() {
CreateSnapshotResponse createSnapshotResponse = client.admin()
.cluster()
Expand Down

0 comments on commit ad7fecb

Please sign in to comment.