Skip to content

Commit

Permalink
Streamline AsyncShardFetch#getNumberOfInFlightFetches (#96545)
Browse files Browse the repository at this point in the history
Avoids an O(#nodes) iteration by tracking the number of fetches directly.

Backport of #93632 to 7.17

Co-authored-by: luyuncheng <luyuncheng@bytedance.com>
  • Loading branch information
DaveCTurner and luyuncheng committed Jun 5, 2023
1 parent 72afc2e commit 6a755e3
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 19 deletions.
53 changes: 35 additions & 18 deletions server/src/main/java/org/elasticsearch/gateway/AsyncShardFetch.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public interface Lister<NodesResponse extends BaseNodesResponse<NodeResponse>, N
private final Set<String> nodesToIgnore = new HashSet<>();
private final AtomicLong round = new AtomicLong();
private boolean closed;
private volatile int fetchingCount;

@SuppressWarnings("unchecked")
protected AsyncShardFetch(
Expand All @@ -81,6 +82,7 @@ protected AsyncShardFetch(
this.type = type;
this.shardId = Objects.requireNonNull(shardId);
this.customDataPath = Objects.requireNonNull(customDataPath);
this.fetchingCount = 0;
this.action = (Lister<BaseNodesResponse<T>, T>) action;
}

Expand All @@ -92,14 +94,8 @@ public synchronized void close() {
/**
* Returns the number of async fetches that are currently ongoing.
*/
public synchronized int getNumberOfInFlightFetches() {
int count = 0;
for (NodeEntry<T> nodeEntry : cache.values()) {
if (nodeEntry.isFetching()) {
count++;
}
}
return count;
public int getNumberOfInFlightFetches() {
return fetchingCount;
}

/**
Expand All @@ -122,6 +118,7 @@ public synchronized FetchResult<T> fetchData(DiscoveryNodes nodes, Set<String> i
final long fetchingRound = round.incrementAndGet();
for (NodeEntry<T> nodeEntry : nodesToFetch) {
nodeEntry.markAsFetching(fetchingRound);
fetchingCount++;
}
DiscoveryNode[] discoNodesToFetch = nodesToFetch.stream()
.map(NodeEntry::getNodeId)
Expand All @@ -131,7 +128,7 @@ public synchronized FetchResult<T> fetchData(DiscoveryNodes nodes, Set<String> i
}

// if we are still fetching, return null to indicate it
if (hasAnyNodeFetching(cache)) {
if (hasAnyNodeFetching()) {
return new FetchResult<>(shardId, null, emptySet());
} else {
// nothing to fetch, yay, build the return value
Expand Down Expand Up @@ -209,6 +206,7 @@ protected synchronized void processAsyncFetch(List<T> responses, List<FailedNode
// if the entry is there, for the right fetching round and not marked as failed already, process it
logger.trace("{} marking {} as done for [{}], result is [{}]", shardId, nodeEntry.getNodeId(), type, response);
nodeEntry.doneFetching(response);
fetchingCount--;
}
}
}
Expand Down Expand Up @@ -236,6 +234,7 @@ protected synchronized void processAsyncFetch(List<T> responses, List<FailedNode
|| unwrappedCause instanceof ReceiveTimeoutTransportException
|| unwrappedCause instanceof ElasticsearchTimeoutException) {
nodeEntry.restartFetching();
fetchingCount--;
} else {
logger.warn(
() -> new ParameterizedMessage(
Expand All @@ -247,12 +246,14 @@ protected synchronized void processAsyncFetch(List<T> responses, List<FailedNode
failure
);
nodeEntry.doneFetching(failure.getCause());
fetchingCount--;
}
}
}
}
}
reroute(shardId, "post_response");
assert assertFetchingCountConsistent();
}

/**
Expand All @@ -264,7 +265,11 @@ protected synchronized void processAsyncFetch(List<T> responses, List<FailedNode
* Clear cache for node, ensuring next fetch will fetch a fresh copy.
*/
synchronized void clearCacheForNode(String nodeId) {
cache.remove(nodeId);
NodeEntry<T> nodeEntry = cache.remove(nodeId);
if (nodeEntry != null && nodeEntry.fetching) {
fetchingCount--;
}
assert assertFetchingCountConsistent();
}

/**
Expand All @@ -280,7 +285,18 @@ private void fillShardCacheWithDataNodes(Map<String, NodeEntry<T>> shardCache, D
}
}
// remove nodes that are no longer part of the data nodes set
shardCache.keySet().removeIf(nodeId -> nodes.nodeExists(nodeId) == false);
for (Iterator<Map.Entry<String, NodeEntry<T>>> iterator = shardCache.entrySet().iterator(); iterator.hasNext();) {
Map.Entry<String, NodeEntry<T>> entry = iterator.next();
String nodeId = entry.getKey();
NodeEntry<T> nodeEntry = entry.getValue();
if (nodes.nodeExists(nodeId) == false) {
if (nodeEntry.fetching) {
fetchingCount--;
}
iterator.remove();
}
}
assert assertFetchingCountConsistent();
}

/**
Expand All @@ -300,13 +316,8 @@ private List<NodeEntry<T>> findNodesToFetch(Map<String, NodeEntry<T>> shardCache
/**
* Are there any nodes that are fetching data?
*/
private boolean hasAnyNodeFetching(Map<String, NodeEntry<T>> shardCache) {
for (NodeEntry<T> nodeEntry : shardCache.values()) {
if (nodeEntry.isFetching()) {
return true;
}
}
return false;
private boolean hasAnyNodeFetching() {
return fetchingCount != 0;
}

/**
Expand Down Expand Up @@ -465,4 +476,10 @@ long getFetchingRound() {
return fetchingRound;
}
}

/**
 * Consistency check for the incrementally maintained {@code fetchingCount}: verifies that the calling thread holds
 * the monitor of this instance and that the counter equals the number of cache entries reporting
 * {@code isFetching()}.
 *
 * @return always {@code true}, so that callers can write {@code assert assertFetchingCountConsistent();} and skip
 *         the whole check when assertions are disabled
 */
private boolean assertFetchingCountConsistent() {
    assert Thread.holdsLock(this) : "fetchingCount must only be checked while holding the lock on this instance";
    // the detail message is only evaluated when the assertion fails
    assert fetchingCount == cache.values().stream().filter(NodeEntry::isFetching).count()
        : "tracked fetchingCount [" + fetchingCount + "] does not match the number of fetching entries in the cache";
    return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ public void testTwoNodesOnSetup() throws Exception {

// no fetched data, 2 requests still on going
AsyncShardFetch.FetchResult<Response> fetchData = test.fetchData(nodes, emptySet());
assertThat(test.getNumberOfInFlightFetches(), equalTo(2));
assertThat(fetchData.hasData(), equalTo(false));
assertThat(test.reroute.get(), equalTo(0));

Expand All @@ -215,6 +216,7 @@ public void testTwoNodesOnSetup() throws Exception {
// no more ongoing requests, we should fetch the data
assertThat(test.reroute.get(), equalTo(2));
fetchData = test.fetchData(nodes, emptySet());
assertThat(test.getNumberOfInFlightFetches(), equalTo(0));
assertThat(fetchData.hasData(), equalTo(true));
assertThat(fetchData.getData().size(), equalTo(2));
assertThat(fetchData.getData().get(node1), sameInstance(response1));
Expand All @@ -228,6 +230,7 @@ public void testTwoNodesOnSetupAndFailure() throws Exception {

// no fetched data, 2 requests still on going
AsyncShardFetch.FetchResult<Response> fetchData = test.fetchData(nodes, emptySet());
assertThat(test.getNumberOfInFlightFetches(), equalTo(2));
assertThat(fetchData.hasData(), equalTo(false));
assertThat(test.reroute.get(), equalTo(0));

Expand All @@ -236,12 +239,14 @@ public void testTwoNodesOnSetupAndFailure() throws Exception {
assertThat(test.reroute.get(), equalTo(1));
fetchData = test.fetchData(nodes, emptySet());
assertThat(fetchData.hasData(), equalTo(false));
assertThat(test.getNumberOfInFlightFetches(), equalTo(1));

// fire the second simulation, this should allow us to get the data
test.fireSimulationAndWait(node2.getId());
assertThat(test.reroute.get(), equalTo(2));
// since one of those failed, we should only have one entry
fetchData = test.fetchData(nodes, emptySet());
assertThat(test.getNumberOfInFlightFetches(), equalTo(0));
assertThat(fetchData.hasData(), equalTo(true));
assertThat(fetchData.getData().size(), equalTo(1));
assertThat(fetchData.getData().get(node1), sameInstance(response1));
Expand Down Expand Up @@ -286,6 +291,7 @@ public void testClearCache() throws Exception {

// no fetched data, request still on going
AsyncShardFetch.FetchResult<Response> fetchData = test.fetchData(nodes, emptySet());
assertThat(test.getNumberOfInFlightFetches(), equalTo(1));
assertThat(fetchData.hasData(), equalTo(false));
assertThat(test.reroute.get(), equalTo(0));

Expand All @@ -294,12 +300,14 @@ public void testClearCache() throws Exception {

// verify we get back right data from node
fetchData = test.fetchData(nodes, emptySet());
assertThat(test.getNumberOfInFlightFetches(), equalTo(0));
assertThat(fetchData.hasData(), equalTo(true));
assertThat(fetchData.getData().size(), equalTo(1));
assertThat(fetchData.getData().get(node1), sameInstance(response1));

// second fetch gets same data
fetchData = test.fetchData(nodes, emptySet());
assertThat(test.getNumberOfInFlightFetches(), equalTo(0));
assertThat(fetchData.hasData(), equalTo(true));
assertThat(fetchData.getData().size(), equalTo(1));
assertThat(fetchData.getData().get(node1), sameInstance(response1));
Expand All @@ -311,18 +319,56 @@ public void testClearCache() throws Exception {

// no fetched data, new request on going
fetchData = test.fetchData(nodes, emptySet());
assertThat(test.getNumberOfInFlightFetches(), equalTo(1));
assertThat(fetchData.hasData(), equalTo(false));

test.fireSimulationAndWait(node1.getId());
assertThat(test.reroute.get(), equalTo(2));

// verify we get new data back
fetchData = test.fetchData(nodes, emptySet());
assertThat(test.getNumberOfInFlightFetches(), equalTo(0));
assertThat(fetchData.hasData(), equalTo(true));
assertThat(fetchData.getData().size(), equalTo(1));
assertThat(fetchData.getData().get(node1), sameInstance(response1_2));
}

/**
 * Starts fetches for two nodes, then drops node1 from the node set passed to {@code fetchData} while its fetch is
 * still outstanding, and checks that {@code getNumberOfInFlightFetches()} and the returned data only reflect node2.
 */
public void testTwoNodesRemoveOne() throws Exception {
    DiscoveryNodes nodes = DiscoveryNodes.builder().add(node1).add(node2).build();
    test.addSimulation(node1.getId(), response1);
    test.addSimulation(node2.getId(), response2);

    // no fetched data, 2 requests still ongoing
    AsyncShardFetch.FetchResult<Response> fetchData = test.fetchData(nodes, emptySet());
    assertThat(test.getNumberOfInFlightFetches(), equalTo(2));
    assertThat(fetchData.hasData(), equalTo(false));
    assertThat(test.reroute.get(), equalTo(0));

    // remove node1, which is no longer part of the data nodes set
    DiscoveryNodes newNodes = DiscoveryNodes.builder().add(node2).build();
    fetchData = test.fetchData(newNodes, emptySet());
    assertThat(test.getNumberOfInFlightFetches(), equalTo(1));
    assertThat(fetchData.hasData(), equalTo(false));

    // fire the first response, although node1 has already been removed
    test.fireSimulationAndWait(node1.getId());
    // there is still another ongoing request, so no data
    assertThat(test.getNumberOfInFlightFetches(), equalTo(1));
    // keep querying with the reduced node set: node1 stays removed for the rest of this scenario
    // NOTE(review): the original passed the stale two-node set here, which presumably re-registers node1 - confirm
    fetchData = test.fetchData(newNodes, emptySet());
    assertThat(fetchData.hasData(), equalTo(false));

    // fire the second simulation, this should allow us to get the data
    test.fireSimulationAndWait(node2.getId());
    // no more ongoing requests, we should fetch the data
    assertThat(test.reroute.get(), equalTo(2));
    fetchData = test.fetchData(newNodes, emptySet());
    assertThat(test.getNumberOfInFlightFetches(), equalTo(0));
    assertThat(fetchData.hasData(), equalTo(true));
    // only node2 in the fetchData
    assertThat(fetchData.getData().size(), equalTo(1));
    assertThat(fetchData.getData().get(node2), sameInstance(response2));
}

public void testConcurrentRequestAndClearCache() throws Exception {
DiscoveryNodes nodes = DiscoveryNodes.builder().add(node1).build();
test.addSimulation(node1.getId(), response1);
Expand All @@ -333,7 +379,9 @@ public void testConcurrentRequestAndClearCache() throws Exception {
assertThat(test.reroute.get(), equalTo(0));

// clear cache while request is still on going, before it is processed
assertThat(test.getNumberOfInFlightFetches(), equalTo(1));
test.clearCacheForNode(node1.getId());
assertThat(test.getNumberOfInFlightFetches(), equalTo(0));

test.fireSimulationAndWait(node1.getId());
assertThat(test.reroute.get(), equalTo(1));
Expand All @@ -344,16 +392,17 @@ public void testConcurrentRequestAndClearCache() throws Exception {
// verify still no fetched data, request still on going
fetchData = test.fetchData(nodes, emptySet());
assertThat(fetchData.hasData(), equalTo(false));
assertThat(test.getNumberOfInFlightFetches(), equalTo(1));

test.fireSimulationAndWait(node1.getId());
assertThat(test.reroute.get(), equalTo(2));

// verify we get new data back
fetchData = test.fetchData(nodes, emptySet());
assertThat(fetchData.hasData(), equalTo(true));
assertThat(test.getNumberOfInFlightFetches(), equalTo(0));
assertThat(fetchData.getData().size(), equalTo(1));
assertThat(fetchData.getData().get(node1), sameInstance(response1_2));

}

static class TestFetch extends AsyncShardFetch<Response> {
Expand Down

0 comments on commit 6a755e3

Please sign in to comment.