Skip to content

Commit

Permalink
Remove some blocking in CcrRepository (#87016)
Browse files Browse the repository at this point in the history
Today `CcrRepository#getRepositoryData` blocks the calling thread
pending receipt of the full metadata of the remote cluster. It would be
preferable if it didn't get the full cluster metadata at all, but we can
at least remove the blocking wait here.
  • Loading branch information
DaveCTurner committed May 31, 2022
1 parent 61d51e5 commit 40b7c6c
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 58 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/87016.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 87016
summary: Remove blocking in `CcrRespository#getRepositoryData`
area: CCR
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.action.support.ListenerTimeouts;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.ThreadedActionListener;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
Expand All @@ -36,7 +37,6 @@
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -176,32 +176,30 @@ public void getSnapshotInfo(GetSnapshotInfoContext context) {
final List<SnapshotId> snapshotIds = context.snapshotIds();
assert snapshotIds.size() == 1 && SNAPSHOT_ID.equals(snapshotIds.iterator().next())
: "RemoteClusterRepository only supports " + SNAPSHOT_ID + " as the SnapshotId but saw " + snapshotIds;
Client remoteClient = getRemoteClusterClient();
ClusterStateResponse response = remoteClient.admin()
.cluster()
.prepareState()
.clear()
.setMetadata(true)
.setNodes(true)
.get(ccrSettings.getRecoveryActionTimeout());
Metadata responseMetadata = response.getState().metadata();
ImmutableOpenMap<String, IndexMetadata> indicesMap = responseMetadata.indices();
List<String> indices = new ArrayList<>(indicesMap.keySet());

// fork to the snapshot meta pool because the context expects to run on it and asserts that it does
threadPool.executor(ThreadPool.Names.SNAPSHOT_META)
.execute(
() -> context.onResponse(
new SnapshotInfo(
try {
getRemoteClusterClient().admin()
.cluster()
.prepareState()
.clear()
.setMetadata(true)
.setNodes(true)
// fork to the snapshot meta pool because the context expects to run on it and asserts that it does
.execute(new ThreadedActionListener<>(logger, threadPool, ThreadPool.Names.SNAPSHOT_META, context.map(response -> {
Metadata responseMetadata = response.getState().metadata();
Map<String, IndexMetadata> indicesMap = responseMetadata.indices();
List<String> indices = new ArrayList<>(indicesMap.keySet());
return new SnapshotInfo(
new Snapshot(this.metadata.name(), SNAPSHOT_ID),
indices,
new ArrayList<>(responseMetadata.dataStreams().keySet()),
Collections.emptyList(),
response.getState().getNodes().getMaxNodeVersion(),
SnapshotState.SUCCESS
)
)
);
);
}), false));
} catch (Exception e) {
context.onFailure(e);
}
}

@Override
Expand Down Expand Up @@ -259,44 +257,42 @@ public IndexMetadata getSnapshotIndexMetaData(RepositoryData repositoryData, Sna

@Override
public void getRepositoryData(ActionListener<RepositoryData> listener) {
ActionListener.completeWith(listener, () -> {
Client remoteClient = getRemoteClusterClient();
ClusterStateResponse response = remoteClient.admin()
.cluster()
.prepareState()
.clear()
.setMetadata(true)
.get(ccrSettings.getRecoveryActionTimeout());
Metadata remoteMetadata = response.getState().getMetadata();

Map<String, SnapshotId> copiedSnapshotIds = new HashMap<>();
Map<String, RepositoryData.SnapshotDetails> snapshotsDetails = Maps.newMapWithExpectedSize(copiedSnapshotIds.size());
Map<IndexId, List<SnapshotId>> indexSnapshots = Maps.newMapWithExpectedSize(copiedSnapshotIds.size());

ImmutableOpenMap<String, IndexMetadata> remoteIndices = remoteMetadata.getIndices();
for (String indexName : remoteMetadata.getConcreteAllIndices()) {
// Both the Snapshot name and UUID are set to _latest_
SnapshotId snapshotId = new SnapshotId(LATEST, LATEST);
copiedSnapshotIds.put(indexName, snapshotId);
final long nowMillis = threadPool.absoluteTimeInMillis();
snapshotsDetails.put(
indexName,
new RepositoryData.SnapshotDetails(SnapshotState.SUCCESS, Version.CURRENT, nowMillis, nowMillis, "")
try {
getRemoteClusterClient().admin().cluster().prepareState().clear().setMetadata(true).execute(listener.map(response -> {
final Metadata remoteMetadata = response.getState().getMetadata();
final String[] concreteAllIndices = remoteMetadata.getConcreteAllIndices();
final Map<String, SnapshotId> copiedSnapshotIds = Maps.newMapWithExpectedSize(concreteAllIndices.length);
final Map<String, RepositoryData.SnapshotDetails> snapshotsDetails = Maps.newMapWithExpectedSize(concreteAllIndices.length);
final Map<IndexId, List<SnapshotId>> indexSnapshots = Maps.newMapWithExpectedSize(concreteAllIndices.length);
final Map<String, IndexMetadata> remoteIndices = remoteMetadata.getIndices();
for (String indexName : concreteAllIndices) {
// Both the Snapshot name and UUID are set to _latest_
final SnapshotId snapshotId = new SnapshotId(LATEST, LATEST);
copiedSnapshotIds.put(indexName, snapshotId);
final long nowMillis = threadPool.absoluteTimeInMillis();
snapshotsDetails.put(
indexName,
new RepositoryData.SnapshotDetails(SnapshotState.SUCCESS, Version.CURRENT, nowMillis, nowMillis, "")
);
indexSnapshots.put(
new IndexId(indexName, remoteIndices.get(indexName).getIndex().getUUID()),
Collections.singletonList(snapshotId)
);
}
return new RepositoryData(
MISSING_UUID,
1,
copiedSnapshotIds,
snapshotsDetails,
indexSnapshots,
ShardGenerations.EMPTY,
IndexMetaDataGenerations.EMPTY,
MISSING_UUID
);
Index index = remoteIndices.get(indexName).getIndex();
indexSnapshots.put(new IndexId(indexName, index.getUUID()), Collections.singletonList(snapshotId));
}
return new RepositoryData(
MISSING_UUID,
1,
copiedSnapshotIds,
snapshotsDetails,
indexSnapshots,
ShardGenerations.EMPTY,
IndexMetaDataGenerations.EMPTY,
MISSING_UUID
);
});
}));
} catch (Exception e) {
listener.onFailure(e);
}
}

@Override
Expand Down

0 comments on commit 40b7c6c

Please sign in to comment.