Skip to content

Commit

Permalink
rls: Fix a local and remote race
Browse files Browse the repository at this point in the history
The local race passes `rlsPicker` to the channel before
CachingRlsLbClient has finished constructing. `RlsPicker` can use
several of the fields that are not yet initialized. This does not seem to
be happening in practice, because it appears that it would break things
very loudly (e.g., NPE).

The remote race seems incredibly hard to hit, because it requires an RPC
to complete before the pending data tracking the RPC is added to a map.
But if a system is at 100% CPU utilization, maybe it can be hit. If
it is hit, all RPCs needing the impacted cache entry will forever be
buffered.
  • Loading branch information
ejona86 committed Mar 11, 2024
1 parent a1515f9 commit eb17e8c
Showing 1 changed file with 13 additions and 13 deletions.
26 changes: 13 additions & 13 deletions rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,6 @@ private CachingRlsLbClient(Builder builder) {
rlsChannelBuilder.disableServiceConfigLookUp();
}
rlsChannel = rlsChannelBuilder.build();
helper.updateBalancingState(ConnectivityState.CONNECTING, rlsPicker);
rlsStub = RouteLookupServiceGrpc.newStub(rlsChannel);
childLbResolvedAddressFactory =
checkNotNull(builder.resolvedAddressFactory, "resolvedAddressFactory");
Expand Down Expand Up @@ -285,7 +284,11 @@ private CachedRouteLookupResponse handleNewRequest(RouteLookupRequest request) {
ListenableFuture<RouteLookupResponse> asyncCall = asyncRlsCall(request);
if (!asyncCall.isDone()) {
pendingEntry = new PendingCacheEntry(request, asyncCall);
// Add the entry to the map before adding the Listener, because the listener removes the
// entry from the map
pendingCallCache.put(request, pendingEntry);
// Beware that the listener can run immediately on the current thread
asyncCall.addListener(pendingEntry::handleDoneFuture, synchronizationContext);
return CachedRouteLookupResponse.pendingResponse(pendingEntry);
} else {
// async call returned finished future is most likely throttled
Expand Down Expand Up @@ -462,17 +465,9 @@ final class PendingCacheEntry {
this.request = checkNotNull(request, "request");
this.pendingCall = pendingCall;
this.backoffPolicy = backoffPolicy == null ? backoffProvider.get() : backoffPolicy;
pendingCall.addListener(
new Runnable() {
@Override
public void run() {
handleDoneFuture();
}
},
synchronizationContext);
}

private void handleDoneFuture() {
void handleDoneFuture() {
synchronized (lock) {
pendingCallCache.remove(request);
if (pendingCall.isCancelled()) {
Expand Down Expand Up @@ -589,7 +584,9 @@ void maybeRefresh() {
}
final ListenableFuture<RouteLookupResponse> asyncCall = asyncRlsCall(request);
if (!asyncCall.isDone()) {
pendingCallCache.put(request, new PendingCacheEntry(request, asyncCall));
PendingCacheEntry pendingEntry = new PendingCacheEntry(request, asyncCall);
pendingCallCache.put(request, pendingEntry);
asyncCall.addListener(pendingEntry::handleDoneFuture, synchronizationContext);
} else {
// async call returned finished future is most likely throttled
try {
Expand Down Expand Up @@ -727,9 +724,10 @@ private void transitionToPending() {
}
ListenableFuture<RouteLookupResponse> call = asyncRlsCall(request);
if (!call.isDone()) {
linkedHashLruCache.invalidate(request);
PendingCacheEntry pendingEntry = new PendingCacheEntry(request, call, backoffPolicy);
pendingCallCache.put(request, pendingEntry);
linkedHashLruCache.invalidate(request);
call.addListener(pendingEntry::handleDoneFuture, synchronizationContext);
} else {
try {
RouteLookupResponse response = call.get();
Expand Down Expand Up @@ -837,7 +835,9 @@ Builder setBackoffProvider(BackoffPolicy.Provider provider) {
}

CachingRlsLbClient build() {
return new CachingRlsLbClient(this);
CachingRlsLbClient client = new CachingRlsLbClient(this);
helper.updateBalancingState(ConnectivityState.CONNECTING, client.rlsPicker);
return client;
}
}

Expand Down

0 comments on commit eb17e8c

Please sign in to comment.