Skip to content

Commit

Permalink
grpclb: keep track of state updates for cached Subchannels. (#5441)
Browse files Browse the repository at this point in the history
The problem: GrpclbState tracks Subchannels' states as a mutable
attribute in Subchannel.getAttributes(). However, GrpclbState only
updates this attribute for the Subchannels it's managing. For those
cached in SubchannelPool, their state attributes are stale. When they
are given back to GrpclbState, IDLE state is assumed. As a result, if
a Subchannel is READY when it's reclaimed from the pool, it will not
be picked.

To fix that, this change expands the SubchannelPool interface to handle
Subchannel state updates, which GrpclbState will call. SubchannelPool
saves the latest state and delivers it when the Subchannel is returned to
GrpclbState by scheduling a call to handleSubchannelState() in the
SynchronizationContext, so that GrpclbState will take the latest state
as if it were just reported from the Channel.
  • Loading branch information
zhangkun83 committed Mar 7, 2019
1 parent 6b0325c commit 034675e
Show file tree
Hide file tree
Showing 6 changed files with 176 additions and 44 deletions.
36 changes: 30 additions & 6 deletions grpclb/src/main/java/io/grpc/grpclb/CachedSubchannelPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@

import com.google.common.annotations.VisibleForTesting;
import io.grpc.Attributes;
import io.grpc.ConnectivityStateInfo;
import io.grpc.EquivalentAddressGroup;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer.Helper;
import io.grpc.LoadBalancer.Subchannel;
import io.grpc.SynchronizationContext.ScheduledHandle;
Expand All @@ -37,31 +39,51 @@ final class CachedSubchannelPool implements SubchannelPool {
new HashMap<>();

private Helper helper;
private LoadBalancer lb;

@VisibleForTesting
static final long SHUTDOWN_TIMEOUT_MS = 10000;

@Override
public void init(Helper helper) {
public void init(Helper helper, LoadBalancer lb) {
this.helper = checkNotNull(helper, "helper");
this.lb = checkNotNull(lb, "lb");
}

@Override
public Subchannel takeOrCreateSubchannel(
EquivalentAddressGroup eag, Attributes defaultAttributes) {
CacheEntry entry = cache.remove(eag);
Subchannel subchannel;
final CacheEntry entry = cache.remove(eag);
final Subchannel subchannel;
if (entry == null) {
subchannel = helper.createSubchannel(eag, defaultAttributes);
} else {
subchannel = entry.subchannel;
entry.shutdownTimer.cancel();
// Make the balancer up-to-date with the latest state in case it has changed while it's
// in the cache.
helper.getSynchronizationContext().execute(new Runnable() {
@Override
public void run() {
lb.handleSubchannelState(subchannel, entry.state);
}
});
}
return subchannel;
}

@Override
public void returnSubchannel(Subchannel subchannel) {
public void handleSubchannelState(Subchannel subchannel, ConnectivityStateInfo newStateInfo) {
CacheEntry cached = cache.get(subchannel.getAddresses());
if (cached == null || cached.subchannel != subchannel) {
// Given subchannel is not cached. Not our responsibility.
return;
}
cached.state = newStateInfo;
}

@Override
public void returnSubchannel(Subchannel subchannel, ConnectivityStateInfo lastKnownState) {
CacheEntry prev = cache.get(subchannel.getAddresses());
if (prev != null) {
// Returning the same Subchannel twice has no effect.
Expand All @@ -77,7 +99,7 @@ public void returnSubchannel(Subchannel subchannel) {
helper.getSynchronizationContext().schedule(
shutdownTask, SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS,
helper.getScheduledExecutorService());
CacheEntry entry = new CacheEntry(subchannel, shutdownTimer);
CacheEntry entry = new CacheEntry(subchannel, shutdownTimer, lastKnownState);
cache.put(subchannel.getAddresses(), entry);
}

Expand Down Expand Up @@ -110,10 +132,12 @@ public void run() {
private static class CacheEntry {
final Subchannel subchannel;
final ScheduledHandle shutdownTimer;
ConnectivityStateInfo state;

CacheEntry(Subchannel subchannel, ScheduledHandle shutdownTimer) {
CacheEntry(Subchannel subchannel, ScheduledHandle shutdownTimer, ConnectivityStateInfo state) {
this.subchannel = checkNotNull(subchannel, "subchannel");
this.shutdownTimer = checkNotNull(shutdownTimer, "shutdownTimer");
this.state = checkNotNull(state, "state");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ class GrpclbLoadBalancer extends LoadBalancer {
this.time = checkNotNull(time, "time provider");
this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider");
this.subchannelPool = checkNotNull(subchannelPool, "subchannelPool");
this.subchannelPool.init(helper);
this.subchannelPool.init(helper, this);
recreateStates();
checkNotNull(grpclbState, "grpclbState");
}
Expand Down
21 changes: 16 additions & 5 deletions grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,8 @@ static enum Mode {
this.mode = checkNotNull(mode, "mode");
this.helper = checkNotNull(helper, "helper");
this.syncContext = checkNotNull(helper.getSynchronizationContext(), "syncContext");
this.subchannelPool = checkNotNull(subchannelPool, "subchannelPool");
this.subchannelPool =
mode == Mode.ROUND_ROBIN ? checkNotNull(subchannelPool, "subchannelPool") : null;
this.time = checkNotNull(time, "time provider");
this.timerService = checkNotNull(helper.getScheduledExecutorService(), "timerService");
this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider");
Expand All @@ -168,7 +169,13 @@ static enum Mode {
}

void handleSubchannelState(Subchannel subchannel, ConnectivityStateInfo newState) {
if (newState.getState() == SHUTDOWN || !subchannels.values().contains(subchannel)) {
if (newState.getState() == SHUTDOWN) {
return;
}
if (!subchannels.values().contains(subchannel)) {
if (subchannelPool != null ) {
subchannelPool.handleSubchannelState(subchannel, newState);
}
return;
}
if (mode == Mode.ROUND_ROBIN && newState.getState() == IDLE) {
Expand Down Expand Up @@ -311,8 +318,9 @@ void shutdown() {
// We close the subchannels through subchannelPool instead of helper just for convenience of
// testing.
for (Subchannel subchannel : subchannels.values()) {
subchannelPool.returnSubchannel(subchannel);
returnSubchannelToPool(subchannel);
}
subchannelPool.clear();
break;
case PICK_FIRST:
checkState(subchannels.size() == 1, "Excessive Subchannels: %s", subchannels);
Expand All @@ -322,7 +330,6 @@ void shutdown() {
throw new AssertionError("Missing case for " + mode);
}
subchannels = Collections.emptyMap();
subchannelPool.clear();
cancelFallbackTimer();
cancelLbRpcRetryTimer();
}
Expand All @@ -335,6 +342,10 @@ void propagateError(Status status) {
}
}

private void returnSubchannelToPool(Subchannel subchannel) {
subchannelPool.returnSubchannel(subchannel, subchannel.getAttributes().get(STATE_INFO).get());
}

@VisibleForTesting
@Nullable
GrpclbClientLoadRecorder getLoadRecorder() {
Expand Down Expand Up @@ -383,7 +394,7 @@ private void useRoundRobinLists(
for (Entry<List<EquivalentAddressGroup>, Subchannel> entry : subchannels.entrySet()) {
List<EquivalentAddressGroup> eagList = entry.getKey();
if (!newSubchannelMap.containsKey(eagList)) {
subchannelPool.returnSubchannel(entry.getValue());
returnSubchannelToPool(entry.getValue());
}
}
subchannels = Collections.unmodifiableMap(newSubchannelMap);
Expand Down
16 changes: 12 additions & 4 deletions grpclb/src/main/java/io/grpc/grpclb/SubchannelPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
package io.grpc.grpclb;

import io.grpc.Attributes;
import io.grpc.ConnectivityStateInfo;
import io.grpc.EquivalentAddressGroup;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer.Helper;
import io.grpc.LoadBalancer.Subchannel;
import javax.annotation.concurrent.NotThreadSafe;
Expand All @@ -30,9 +32,9 @@
@NotThreadSafe
interface SubchannelPool {
/**
* Pass essential utilities.
* Pass essential utilities and the balancer that's using this pool.
*/
void init(Helper helper);
void init(Helper helper, LoadBalancer lb);

/**
* Takes a {@link Subchannel} from the pool for the given {@code eag} if there is one available.
Expand All @@ -41,11 +43,17 @@ interface SubchannelPool {
*/
Subchannel takeOrCreateSubchannel(EquivalentAddressGroup eag, Attributes defaultAttributes);

/**
* Gets notified about a state change of a Subchannel that is possibly cached in this pool. Does
* nothing if this pool doesn't own this Subchannel.
*/
void handleSubchannelState(Subchannel subchannel, ConnectivityStateInfo newStateInfo);

/**
* Puts a {@link Subchannel} back to the pool. From this point the Subchannel is owned by the
* pool.
* pool, and the caller should stop referencing this Subchannel.
*/
void returnSubchannel(Subchannel subchannel);
void returnSubchannel(Subchannel subchannel, ConnectivityStateInfo lastKnownState);

/**
* Shuts down all subchannels in the pool immediately.
Expand Down

0 comments on commit 034675e

Please sign in to comment.