Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

grpclb: keep track of state updates for cached Subchannels. #5441

Merged
merged 1 commit into from
Mar 7, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
36 changes: 30 additions & 6 deletions grpclb/src/main/java/io/grpc/grpclb/CachedSubchannelPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@

import com.google.common.annotations.VisibleForTesting;
import io.grpc.Attributes;
import io.grpc.ConnectivityStateInfo;
import io.grpc.EquivalentAddressGroup;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer.Helper;
import io.grpc.LoadBalancer.Subchannel;
import io.grpc.SynchronizationContext.ScheduledHandle;
Expand All @@ -37,31 +39,51 @@ final class CachedSubchannelPool implements SubchannelPool {
new HashMap<>();

private Helper helper;
private LoadBalancer lb;

@VisibleForTesting
static final long SHUTDOWN_TIMEOUT_MS = 10000;

@Override
public void init(Helper helper) {
public void init(Helper helper, LoadBalancer lb) {
this.helper = checkNotNull(helper, "helper");
this.lb = checkNotNull(lb, "lb");
}

@Override
public Subchannel takeOrCreateSubchannel(
EquivalentAddressGroup eag, Attributes defaultAttributes) {
CacheEntry entry = cache.remove(eag);
Subchannel subchannel;
final CacheEntry entry = cache.remove(eag);
final Subchannel subchannel;
if (entry == null) {
subchannel = helper.createSubchannel(eag, defaultAttributes);
} else {
subchannel = entry.subchannel;
entry.shutdownTimer.cancel();
// Bring the balancer up to date with the Subchannel's latest state, in case that state
// changed while the Subchannel was sitting in the cache.
helper.getSynchronizationContext().execute(new Runnable() {
@Override
public void run() {
lb.handleSubchannelState(subchannel, entry.state);
}
});
}
return subchannel;
}

@Override
public void returnSubchannel(Subchannel subchannel) {
public void handleSubchannelState(Subchannel subchannel, ConnectivityStateInfo newStateInfo) {
CacheEntry cached = cache.get(subchannel.getAddresses());
if (cached == null || cached.subchannel != subchannel) {
// Given subchannel is not cached. Not our responsibility.
return;
}
cached.state = newStateInfo;
}

@Override
public void returnSubchannel(Subchannel subchannel, ConnectivityStateInfo lastKnownState) {
CacheEntry prev = cache.get(subchannel.getAddresses());
if (prev != null) {
// Returning the same Subchannel twice has no effect.
Expand All @@ -77,7 +99,7 @@ public void returnSubchannel(Subchannel subchannel) {
helper.getSynchronizationContext().schedule(
shutdownTask, SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS,
helper.getScheduledExecutorService());
CacheEntry entry = new CacheEntry(subchannel, shutdownTimer);
CacheEntry entry = new CacheEntry(subchannel, shutdownTimer, lastKnownState);
cache.put(subchannel.getAddresses(), entry);
}

Expand Down Expand Up @@ -110,10 +132,12 @@ public void run() {
private static class CacheEntry {
final Subchannel subchannel;
final ScheduledHandle shutdownTimer;
ConnectivityStateInfo state;

CacheEntry(Subchannel subchannel, ScheduledHandle shutdownTimer) {
CacheEntry(Subchannel subchannel, ScheduledHandle shutdownTimer, ConnectivityStateInfo state) {
this.subchannel = checkNotNull(subchannel, "subchannel");
this.shutdownTimer = checkNotNull(shutdownTimer, "shutdownTimer");
this.state = checkNotNull(state, "state");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ class GrpclbLoadBalancer extends LoadBalancer {
this.time = checkNotNull(time, "time provider");
this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider");
this.subchannelPool = checkNotNull(subchannelPool, "subchannelPool");
this.subchannelPool.init(helper);
this.subchannelPool.init(helper, this);
recreateStates();
checkNotNull(grpclbState, "grpclbState");
}
Expand Down
21 changes: 16 additions & 5 deletions grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,8 @@ static enum Mode {
this.mode = checkNotNull(mode, "mode");
this.helper = checkNotNull(helper, "helper");
this.syncContext = checkNotNull(helper.getSynchronizationContext(), "syncContext");
this.subchannelPool = checkNotNull(subchannelPool, "subchannelPool");
this.subchannelPool =
mode == Mode.ROUND_ROBIN ? checkNotNull(subchannelPool, "subchannelPool") : null;
this.time = checkNotNull(time, "time provider");
this.timerService = checkNotNull(helper.getScheduledExecutorService(), "timerService");
this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider");
Expand All @@ -168,7 +169,13 @@ static enum Mode {
}

void handleSubchannelState(Subchannel subchannel, ConnectivityStateInfo newState) {
if (newState.getState() == SHUTDOWN || !subchannels.values().contains(subchannel)) {
if (newState.getState() == SHUTDOWN) {
return;
}
if (!subchannels.values().contains(subchannel)) {
if (subchannelPool != null ) {
subchannelPool.handleSubchannelState(subchannel, newState);
}
return;
}
if (mode == Mode.ROUND_ROBIN && newState.getState() == IDLE) {
Expand Down Expand Up @@ -311,8 +318,9 @@ void shutdown() {
// We close the subchannels through subchannelPool instead of helper just for convenience of
// testing.
for (Subchannel subchannel : subchannels.values()) {
subchannelPool.returnSubchannel(subchannel);
returnSubchannelToPool(subchannel);
}
subchannelPool.clear();
break;
case PICK_FIRST:
checkState(subchannels.size() == 1, "Excessive Subchannels: %s", subchannels);
Expand All @@ -322,7 +330,6 @@ void shutdown() {
throw new AssertionError("Missing case for " + mode);
}
subchannels = Collections.emptyMap();
subchannelPool.clear();
cancelFallbackTimer();
cancelLbRpcRetryTimer();
}
Expand All @@ -335,6 +342,10 @@ void propagateError(Status status) {
}
}

/**
 * Hands {@code subchannel} back to {@code subchannelPool}, passing along the state currently
 * stored under the Subchannel's {@code STATE_INFO} attribute as its last known state.
 */
private void returnSubchannelToPool(Subchannel subchannel) {
ConnectivityStateInfo lastKnownState = subchannel.getAttributes().get(STATE_INFO).get();
subchannelPool.returnSubchannel(subchannel, lastKnownState);
}

@VisibleForTesting
@Nullable
GrpclbClientLoadRecorder getLoadRecorder() {
Expand Down Expand Up @@ -383,7 +394,7 @@ private void useRoundRobinLists(
for (Entry<List<EquivalentAddressGroup>, Subchannel> entry : subchannels.entrySet()) {
List<EquivalentAddressGroup> eagList = entry.getKey();
if (!newSubchannelMap.containsKey(eagList)) {
subchannelPool.returnSubchannel(entry.getValue());
returnSubchannelToPool(entry.getValue());
}
}
subchannels = Collections.unmodifiableMap(newSubchannelMap);
Expand Down
16 changes: 12 additions & 4 deletions grpclb/src/main/java/io/grpc/grpclb/SubchannelPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
package io.grpc.grpclb;

import io.grpc.Attributes;
import io.grpc.ConnectivityStateInfo;
import io.grpc.EquivalentAddressGroup;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer.Helper;
import io.grpc.LoadBalancer.Subchannel;
import javax.annotation.concurrent.NotThreadSafe;
Expand All @@ -30,9 +32,9 @@
@NotThreadSafe
interface SubchannelPool {
/**
* Pass essential utilities.
* Pass essential utilities and the balancer that's using this pool.
*/
void init(Helper helper);
void init(Helper helper, LoadBalancer lb);

/**
* Takes a {@link Subchannel} from the pool for the given {@code eag} if there is one available.
Expand All @@ -41,11 +43,17 @@ interface SubchannelPool {
*/
Subchannel takeOrCreateSubchannel(EquivalentAddressGroup eag, Attributes defaultAttributes);

/**
* Gets notified about a state change of a Subchannel that is possibly cached in this pool. Does
* nothing if this pool doesn't own this Subchannel.
*/
void handleSubchannelState(Subchannel subchannel, ConnectivityStateInfo newStateInfo);

/**
* Puts a {@link Subchannel} back to the pool. From this point the Subchannel is owned by the
* pool.
* pool, and the caller should stop referencing this Subchannel.
*/
void returnSubchannel(Subchannel subchannel);
void returnSubchannel(Subchannel subchannel, ConnectivityStateInfo lastKnownState);

/**
* Shuts down all subchannels in the pool immediately.
Expand Down