Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public class AsyncRequestContext<Request, Response, RequestType, NodeLocation> {
* <p>All kinds of AsyncHandler will remove its targetNode from the nodeLocationMap only if its
* corresponding RPC request success
*/
private final Map<Integer, NodeLocation> nodeLocationMap;
private final ConcurrentHashMap<Integer, NodeLocation> nodeLocationMap;

/**
* Map key: The indices(targetNode's ID) of asynchronous RPC requests.
Expand Down Expand Up @@ -88,7 +88,7 @@ public void putNodeLocation(final int requestId, final NodeLocation nodeLocation
/** Constructor for null requests. */
public AsyncRequestContext(RequestType requestType, Map<Integer, NodeLocation> nodeLocationMap) {
this.requestType = requestType;
this.nodeLocationMap = nodeLocationMap;
this.nodeLocationMap = new ConcurrentHashMap<>(nodeLocationMap);
this.requestMap = new ConcurrentHashMap<>();
this.responseMap = new ConcurrentHashMap<>();
}
Expand All @@ -97,7 +97,7 @@ public AsyncRequestContext(RequestType requestType, Map<Integer, NodeLocation> n
public AsyncRequestContext(
RequestType requestType, Request request, Map<Integer, NodeLocation> nodeLocationMap) {
this.requestType = requestType;
this.nodeLocationMap = nodeLocationMap;
this.nodeLocationMap = new ConcurrentHashMap<>(nodeLocationMap);
this.requestMap = new ConcurrentHashMap<>();
this.nodeLocationMap.keySet().forEach(nodeId -> this.requestMap.put(nodeId, request));
this.responseMap = new ConcurrentHashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,10 +128,12 @@ public void sendAsyncRequest(
requestContext.resetCountDownLatch();

// Send requests to all targetNodes
for (final int requestId : requestContext.getRequestIndices()) {
final NodeLocation targetNode = requestContext.getNodeLocation(requestId);
sendAsyncRequest(requestContext, requestId, targetNode, retry);
}
final int finalRetry = retry;
requestContext
.getNodeLocationMap()
.forEach(
(requestId, nodeLocation) ->
sendAsyncRequest(requestContext, requestId, nodeLocation, finalRetry));

// Wait for this batch of asynchronous RPC requests finish
try {
Expand Down