Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 1 addition & 6 deletions grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -138,12 +138,7 @@ public void handleResolvedAddressGroups(
delegate.handleResolvedAddressGroups(newBackendServers, attributes);
break;
case GRPCLB:
if (newLbAddressGroups.isEmpty()) {
grpclbState.propagateError(Status.UNAVAILABLE.withDescription(
"NameResolver returned no LB address while asking for GRPCLB"));
} else {
grpclbState.updateAddresses(newLbAddressGroups, newBackendServers);
}
grpclbState.handleAddresses(newLbAddressGroups, newBackendServers);
break;
default:
// Do nothing
Expand Down
138 changes: 108 additions & 30 deletions grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java
Original file line number Diff line number Diff line change
Expand Up @@ -101,12 +101,14 @@ public String toString() {
private static final Attributes.Key<AtomicReference<ConnectivityStateInfo>> STATE_INFO =
Attributes.Key.of("io.grpc.grpclb.GrpclbLoadBalancer.stateInfo");

// Once set, never go back to null.
// Reset to null when the timer expires or is cancelled.
@Nullable
private ScheduledFuture<?> fallbackTimer;
private FallbackModeTask fallbackTimer;
private List<EquivalentAddressGroup> fallbackBackendList = Collections.emptyList();
private boolean fallbackTimerExpired;
private boolean receivedServerListFromBalancer;
private boolean usingFallbackBackends;
// True if the current balancer has returned a server list. Reset to false when the connection
// to the balancer is lost.
private boolean balancerWorking;

@Nullable
private ManagedChannel lbCommChannel;
Expand Down Expand Up @@ -141,14 +143,21 @@ void handleSubchannelState(Subchannel subchannel, ConnectivityStateInfo newState
subchannel.requestConnection();
}
subchannel.getAttributes().get(STATE_INFO).set(newState);
maybeStartFallbackTimer();
maybeUpdatePicker();
}

/**
* Set the new addresses of the balancer and backends, and create connection if not yet connected.
* Handle new addresses of the balancer and backends from the resolver, and create connection if
* not yet connected.
*/
void updateAddresses(
void handleAddresses(
List<LbAddressGroup> newLbAddressGroups, List<EquivalentAddressGroup> newBackendServers) {
if (newLbAddressGroups.isEmpty()) {
propagateError(Status.UNAVAILABLE.withDescription(
"NameResolver returned no LB address while asking for GRPCLB"));
return;
}
LbAddressGroup newLbAddressGroup = flattenLbAddressGroups(newLbAddressGroups);
startLbComm(newLbAddressGroup);
// Avoid creating a new RPC just because the addresses were updated, as it can cause a
Expand All @@ -158,30 +167,59 @@ void updateAddresses(
if (lbStream == null) {
startLbRpc();
}
// If we don't receive server list from the balancer within the timeout, we round-robin on
// the backend list from the resolver (aka fallback), until the balancer returns a server list.
fallbackBackendList = newBackendServers;
if (fallbackTimer == null) {
fallbackTimer =
timerService.schedule(new FallbackModeTask(), FALLBACK_TIMEOUT_MS, TimeUnit.MILLISECONDS);
} else {
maybeUseFallbackBackends();
maybeStartFallbackTimer();
if (usingFallbackBackends) {
// Populate the new fallback backends to round-robin list.
useFallbackBackends();
}
maybeUpdatePicker();
}

private void maybeUseFallbackBackends() {
// Only use fallback backends after fallback timer expired and before receiving server list from
// the balancer.
if (receivedServerListFromBalancer || !fallbackTimerExpired) {
/**
 * Arms the fallback timer unless one is already pending or there is no reason to fall back.
 *
 * <p>Nothing is scheduled when the fallback backend list is empty, when the balancer is
 * working, when the fallback backends are already in use, or when at least one subchannel is
 * READY.
 */
private void maybeStartFallbackTimer() {
  boolean timerPending = fallbackTimer != null;
  if (timerPending
      || fallbackBackendList.isEmpty()
      || balancerWorking
      || usingFallbackBackends) {
    return;
  }
  // Every subchannel is inspected; a single READY one means not all connections are lost.
  boolean anyReady = false;
  for (Subchannel candidate : subchannels.values()) {
    anyReady |= candidate.getAttributes().get(STATE_INFO).get().getState() == READY;
  }
  if (anyReady) {
    return;
  }
  logger.log(Level.FINE, "[{0}] Starting fallback timer.", new Object[] {logId});
  fallbackTimer = new FallbackModeTask();
  fallbackTimer.schedule();
}

/**
 * Populates the round-robin lists with the fallback backends.
 *
 * <p>Marks fallback mode as active ({@code usingFallbackBackends}), logs the fallback list, and
 * builds one entry per address group in {@code fallbackBackendList}: a {@code null} drop entry
 * and a {@code BackendAddressGroup} created with a {@code null} second argument. The lists are
 * handed on with a {@code null} load recorder.
 */
private void useFallbackBackends() {
usingFallbackBackends = true;
logger.log(Level.INFO, "[{0}] Using fallback: {1}", new Object[] {logId, fallbackBackendList});

// The two lists are filled in lock-step: one element each per fallback address group.
List<DropEntry> newDropList = new ArrayList<DropEntry>();
List<BackendAddressGroup> newBackendAddrList = new ArrayList<BackendAddressGroup>();
for (EquivalentAddressGroup eag : fallbackBackendList) {
newDropList.add(null);
newBackendAddrList.add(new BackendAddressGroup(eag, null));
}
updateRoundRobinLists(newDropList, newBackendAddrList, null);
useRoundRobinLists(newDropList, newBackendAddrList, null);
}

private void shutdownLbComm() {
Expand Down Expand Up @@ -231,12 +269,20 @@ private void startLbRpc() {
}
}

/**
 * Cancels the pending fallback timer, if there is one, and clears the reference to it.
 */
private void cancelFallbackTimer() {
  if (fallbackTimer == null) {
    // No timer is pending; nothing to cancel.
    return;
  }
  fallbackTimer.cancel();
  fallbackTimer = null;
}

/**
 * Releases everything this state object holds: calls {@code shutdownLbComm()}, shuts down every
 * subchannel, replaces the subchannel map with an empty one, and cancels the fallback timer.
 */
void shutdown() {
shutdownLbComm();
for (Subchannel subchannel : subchannels.values()) {
subchannel.shutdown();
}
// Drop the references to the subchannels that were just shut down.
subchannels = Collections.emptyMap();
// Cancel any pending fallback timer and clear the reference to it.
cancelFallbackTimer();
}

void propagateError(Status status) {
Expand All @@ -257,7 +303,10 @@ GrpclbClientLoadRecorder getLoadRecorder() {
return lbStream.loadRecorder;
}

private void updateRoundRobinLists(
/**
* Populate the round-robin lists with the given values.
*/
private void useRoundRobinLists(
List<DropEntry> newDropList, List<BackendAddressGroup> newBackendAddrList,
@Nullable GrpclbClientLoadRecorder loadRecorder) {
HashMap<EquivalentAddressGroup, Subchannel> newSubchannelMap =
Expand Down Expand Up @@ -301,21 +350,39 @@ private void updateRoundRobinLists(
subchannels = Collections.unmodifiableMap(newSubchannelMap);
dropList = Collections.unmodifiableList(newDropList);
backendList = Collections.unmodifiableList(newBackendList);
maybeUpdatePicker();
}

/**
 * Timer task for entering fallback mode. It is scheduled on {@code timerService} with a delay of
 * {@code FALLBACK_TIMEOUT_MS} milliseconds and, when it fires, performs its work through
 * {@code helper.runSerialized}.
 */
@VisibleForTesting
class FallbackModeTask implements Runnable {
// Set by schedule(); null until then.
private ScheduledFuture<?> scheduledFuture;
// If the scheduledFuture is cancelled after the task has made it into the ChannelExecutor, the
// task will be started anyway. Use this boolean to signal that the task should not be run.
private boolean cancelled;

/**
 * Invoked when the timer fires. The state changes are not made here directly but inside a
 * runnable passed to {@code helper.runSerialized}.
 */
@Override
public void run() {
helper.runSerialized(new Runnable() {
@Override
public void run() {
fallbackTimerExpired = true;
maybeUseFallbackBackends();
if (!cancelled) {
// This task must still be the registered timer; clear it before switching to fallback.
checkState(fallbackTimer == FallbackModeTask.this, "fallback timer mismatch");
fallbackTimer = null;
useFallbackBackends();
maybeUpdatePicker();
}
}
});
}

/**
 * Cancels the scheduled future (passing {@code false}) and marks this task as cancelled.
 * Dereferences {@code scheduledFuture}, so {@link #schedule()} must have been called first.
 */
void cancel() {
scheduledFuture.cancel(false);
cancelled = true;
}

/**
 * Schedules this task on {@code timerService} to run after {@code FALLBACK_TIMEOUT_MS}
 * milliseconds.
 *
 * @throws IllegalStateException if this task has already been scheduled
 */
void schedule() {
checkState(scheduledFuture == null, "FallbackModeTask already scheduled");
scheduledFuture = timerService.schedule(this, FALLBACK_TIMEOUT_MS, TimeUnit.MILLISECONDS);
}
}

@VisibleForTesting
Expand Down Expand Up @@ -443,10 +510,7 @@ private void handleResponse(LoadBalanceResponse response) {
return;
}

receivedServerListFromBalancer = true;
if (fallbackTimer != null) {
fallbackTimer.cancel(false);
}
balancerWorking = true;
// TODO(zhangkun83): handle delegate from initialResponse
ServerList serverList = response.getServerList();
List<DropEntry> newDropList = new ArrayList<DropEntry>();
Expand All @@ -470,16 +534,23 @@ private void handleResponse(LoadBalanceResponse response) {
newBackendAddrList.add(new BackendAddressGroup(eag, token));
}
}
updateRoundRobinLists(newDropList, newBackendAddrList, loadRecorder);
// Stop using fallback backends as soon as a new server list is received from the balancer.
usingFallbackBackends = false;
cancelFallbackTimer();
useRoundRobinLists(newDropList, newBackendAddrList, loadRecorder);
maybeUpdatePicker();
}

private void handleStreamClosed(Status status) {
private void handleStreamClosed(Status error) {
checkArgument(!error.isOk(), "unexpected OK status");
if (closed) {
return;
}
closed = true;
cleanUp();
propagateError(status);
propagateError(error);
balancerWorking = false;
maybeStartFallbackTimer();
startLbRpc();
}

Expand Down Expand Up @@ -705,7 +776,7 @@ public boolean equals(Object other) {

@VisibleForTesting
static final class ErrorEntry implements RoundRobinEntry {
private final PickResult result;
final PickResult result;

ErrorEntry(Status status) {
result = PickResult.withError(status);
Expand All @@ -728,6 +799,13 @@ public boolean equals(Object other) {
}
return Objects.equal(result, ((ErrorEntry) other).result);
}

/**
 * Returns a debug string that names this entry's class and shows its {@code result} field.
 */
@Override
public String toString() {
  MoreObjects.ToStringHelper description = MoreObjects.toStringHelper(this);
  description.add("result", result);
  return description.toString();
}
}

@VisibleForTesting
Expand Down
Loading