Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
213 changes: 123 additions & 90 deletions core/src/main/java/io/grpc/util/RoundRobinLoadBalancerFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;

import io.grpc.Attributes;
import io.grpc.ConnectivityState;
import io.grpc.ConnectivityStateInfo;
Expand All @@ -42,12 +45,10 @@
import io.grpc.internal.ServiceConfigUtil;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.Random;
import java.util.Set;
Expand Down Expand Up @@ -113,6 +114,9 @@ static final class RoundRobinLoadBalancer extends LoadBalancer {
new HashMap<EquivalentAddressGroup, Subchannel>();
private final Random random;

private ConnectivityState currentState;
private RoundRobinPicker currentPicker = new EmptyPicker(EMPTY_OK);

@Nullable
private StickinessState stickinessState;

Expand Down Expand Up @@ -176,50 +180,92 @@ public void handleResolvedAddressGroups(
// Shutdown subchannels for removed addresses.
for (EquivalentAddressGroup addressGroup : removedAddrs) {
Subchannel subchannel = subchannels.remove(addressGroup);
subchannel.shutdown();
shutdownSubchannel(subchannel);
}

updateBalancingState(getAggregatedState(), getAggregatedError());
updateBalancingState();
}

@Override
public void handleNameResolutionError(Status error) {
updateBalancingState(TRANSIENT_FAILURE, error);
// ready pickers aren't affected by status changes
updateBalancingState(TRANSIENT_FAILURE,
currentPicker instanceof ReadyPicker ? currentPicker : new EmptyPicker(error));
}

@Override
public void handleSubchannelState(Subchannel subchannel, ConnectivityStateInfo stateInfo) {
if (stateInfo.getState() == SHUTDOWN && stickinessState != null) {
stickinessState.remove(subchannel);
}
if (subchannels.get(subchannel.getAddresses()) != subchannel) {
return;
}
if (stateInfo.getState() == SHUTDOWN && stickinessState != null) {
stickinessState.remove(subchannel);
}
if (stateInfo.getState() == IDLE) {
subchannel.requestConnection();
}
getSubchannelStateInfoRef(subchannel).value = stateInfo;
updateBalancingState(getAggregatedState(), getAggregatedError());
updateBalancingState();
}

private void shutdownSubchannel(Subchannel subchannel) {
subchannel.shutdown();
getSubchannelStateInfoRef(subchannel).value =
ConnectivityStateInfo.forNonError(SHUTDOWN);
if (stickinessState != null) {
stickinessState.remove(subchannel);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This branch is not covered.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've extended a unit test to cover this case - name resolver removal of a sticky subchannel.

}
}

@Override
public void shutdown() {
for (Subchannel subchannel : getSubchannels()) {
subchannel.shutdown();
shutdownSubchannel(subchannel);
}
}

private static final Status EMPTY_OK = Status.OK.withDescription("no subchannels ready");

/**
* Updates picker with the list of active subchannels (state == READY).
*/
private void updateBalancingState(ConnectivityState state, Status error) {
@SuppressWarnings("ReferenceEquality")
private void updateBalancingState() {
List<Subchannel> activeList = filterNonFailingSubchannels(getSubchannels());
// initialize the Picker to a random start index to ensure that a high frequency of Picker
// churn does not skew subchannel selection.
int startIndex = activeList.isEmpty() ? 0 : random.nextInt(activeList.size());
helper.updateBalancingState(
state,
new Picker(activeList, error, startIndex, stickinessState));
if (activeList.isEmpty()) {
// No READY subchannels, determine aggregate state and error status
boolean isConnecting = false;
Status aggStatus = EMPTY_OK;
for (Subchannel subchannel : getSubchannels()) {
ConnectivityStateInfo stateInfo = getSubchannelStateInfoRef(subchannel).value;
// This subchannel IDLE is not because of channel IDLE_TIMEOUT,
// in which case LB is already shutdown.
// RRLB will request connection immediately on subchannel IDLE.
if (stateInfo.getState() == CONNECTING || stateInfo.getState() == IDLE) {
isConnecting = true;
}
if (aggStatus == EMPTY_OK || !aggStatus.isOk()) {
aggStatus = stateInfo.getStatus();
}
}
updateBalancingState(isConnecting ? CONNECTING : TRANSIENT_FAILURE,
// If all subchannels are TRANSIENT_FAILURE, return the Status associated with
// an arbitrary subchannel, otherwise return OK.
new EmptyPicker(aggStatus));
} else {
// initialize the Picker to a random start index to ensure that a high frequency of Picker
// churn does not skew subchannel selection.
int startIndex = random.nextInt(activeList.size());
updateBalancingState(READY, new ReadyPicker(activeList, startIndex, stickinessState));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You never call getAggregatedState() when the state is READY. This shadows the return READY branch in that method (noticed by coveralls). Since getAggregatedState() is only called once now, probably better to move it into updateBalancingState().

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've now combined/collapsed both the getAggregatedState() and getAggregateError() methods into updateBalancingState().

}
}

private void updateBalancingState(ConnectivityState state, RoundRobinPicker picker) {
if (state != currentState || !picker.isEquivalentTo(currentPicker)) {
helper.updateBalancingState(state, picker);
currentState = state;
currentPicker = picker;
}
}

/**
Expand All @@ -229,7 +275,7 @@ private static List<Subchannel> filterNonFailingSubchannels(
Collection<Subchannel> subchannels) {
List<Subchannel> readySubchannels = new ArrayList<Subchannel>(subchannels.size());
for (Subchannel subchannel : subchannels) {
if (getSubchannelStateInfoRef(subchannel).value.getState() == READY) {
if (isReady(subchannel)) {
readySubchannels.add(subchannel);
}
}
Expand All @@ -248,43 +294,6 @@ private static Set<EquivalentAddressGroup> stripAttrs(List<EquivalentAddressGrou
return addrs;
}

/**
* If all subchannels are TRANSIENT_FAILURE, return the Status associated with an arbitrary
* subchannel otherwise, return null.
*/
@Nullable
private Status getAggregatedError() {
Status status = null;
for (Subchannel subchannel : getSubchannels()) {
ConnectivityStateInfo stateInfo = getSubchannelStateInfoRef(subchannel).value;
if (stateInfo.getState() != TRANSIENT_FAILURE) {
return null;
}
status = stateInfo.getStatus();
}
return status;
}

private ConnectivityState getAggregatedState() {
Set<ConnectivityState> states = EnumSet.noneOf(ConnectivityState.class);
for (Subchannel subchannel : getSubchannels()) {
states.add(getSubchannelStateInfoRef(subchannel).value.getState());
}
if (states.contains(READY)) {
return READY;
}
if (states.contains(CONNECTING)) {
return CONNECTING;
}
if (states.contains(IDLE)) {
// This subchannel IDLE is not because of channel IDLE_TIMEOUT, in which case LB is already
// shutdown.
// RRLB will request connection immediately on subchannel IDLE.
return CONNECTING;
}
return TRANSIENT_FAILURE;
}

@VisibleForTesting
Collection<Subchannel> getSubchannels() {
return subchannels.values();
Expand All @@ -294,6 +303,11 @@ private static Ref<ConnectivityStateInfo> getSubchannelStateInfoRef(
Subchannel subchannel) {
return checkNotNull(subchannel.getAttributes().get(STATE_INFO), "STATE_INFO");
}

// package-private to avoid synthetic access
static boolean isReady(Subchannel subchannel) {
return getSubchannelStateInfoRef(subchannel).value.getState() == READY;
}

private static <T> Set<T> setsDifference(Set<T> a, Set<T> b) {
Set<T> aCopy = new HashSet<T>(a);
Expand All @@ -312,7 +326,8 @@ Map<String, Ref<Subchannel>> getStickinessMapForTest() {
* Holds stickiness related states: The stickiness key, a registry mapping stickiness values to
* the associated Subchannel Ref, and a map from Subchannel to Subchannel Ref.
*/
private static final class StickinessState {
@VisibleForTesting
static final class StickinessState {
static final int MAX_ENTRIES = 1000;

final Key<String> key;
Expand All @@ -332,7 +347,7 @@ private static final class StickinessState {
*/
@Nonnull
Subchannel maybeRegister(
String stickinessValue, @Nonnull Subchannel subchannel, List<Subchannel> rrList) {
String stickinessValue, @Nonnull Subchannel subchannel) {
final Ref<Subchannel> newSubchannelRef = subchannel.getAttributes().get(STICKY_REF);
while (true) {
Ref<Subchannel> existingSubchannelRef =
Expand All @@ -344,7 +359,7 @@ Subchannel maybeRegister(
} else {
// existing entry
Subchannel existingSubchannel = existingSubchannelRef.value;
if (existingSubchannel != null && rrList.contains(existingSubchannel)) {
if (existingSubchannel != null && isReady(existingSubchannel)) {
return existingSubchannel;
}
}
Expand Down Expand Up @@ -384,59 +399,49 @@ Subchannel getSubchannel(String stickinessValue) {
}
}
}

// Only subclasses are ReadyPicker or EmptyPicker
private abstract static class RoundRobinPicker extends SubchannelPicker {
abstract boolean isEquivalentTo(RoundRobinPicker picker);
}

@VisibleForTesting
static final class Picker extends SubchannelPicker {
private static final AtomicIntegerFieldUpdater<Picker> indexUpdater =
AtomicIntegerFieldUpdater.newUpdater(Picker.class, "index");
static final class ReadyPicker extends RoundRobinPicker {
private static final AtomicIntegerFieldUpdater<ReadyPicker> indexUpdater =
AtomicIntegerFieldUpdater.newUpdater(ReadyPicker.class, "index");

@Nullable
private final Status status;
private final List<Subchannel> list;
private final List<Subchannel> list; // non-empty
@Nullable
private final RoundRobinLoadBalancer.StickinessState stickinessState;
@SuppressWarnings("unused")
private volatile int index;

Picker(
List<Subchannel> list, @Nullable Status status, int startIndex,
ReadyPicker(List<Subchannel> list, int startIndex,
@Nullable RoundRobinLoadBalancer.StickinessState stickinessState) {
Preconditions.checkArgument(!list.isEmpty(), "empty list");
this.list = list;
this.status = status;
this.stickinessState = stickinessState;
this.index = startIndex - 1;
}

@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
if (list.size() > 0) {
Subchannel subchannel = null;
if (stickinessState != null) {
String stickinessValue = args.getHeaders().get(stickinessState.key);
if (stickinessValue != null) {
subchannel = stickinessState.getSubchannel(stickinessValue);
if (subchannel == null || !list.contains(subchannel)) {
subchannel = stickinessState.maybeRegister(stickinessValue, nextSubchannel(), list);
}
Subchannel subchannel = null;
if (stickinessState != null) {
String stickinessValue = args.getHeaders().get(stickinessState.key);
if (stickinessValue != null) {
subchannel = stickinessState.getSubchannel(stickinessValue);
if (subchannel == null || !RoundRobinLoadBalancer.isReady(subchannel)) {
subchannel = stickinessState.maybeRegister(stickinessValue, nextSubchannel());
}
}

return PickResult.withSubchannel(subchannel != null ? subchannel : nextSubchannel());
}

if (status != null) {
return PickResult.withError(status);
}

return PickResult.withNoResult();
return PickResult.withSubchannel(subchannel != null ? subchannel : nextSubchannel());
}

private Subchannel nextSubchannel() {
int size = list.size();
if (size == 0) {
throw new NoSuchElementException();
}

int i = indexUpdater.incrementAndGet(this);
if (i >= size) {
int oldi = i;
Expand All @@ -451,9 +456,37 @@ List<Subchannel> getList() {
return list;
}

@VisibleForTesting
Status getStatus() {
return status;
@Override
boolean isEquivalentTo(RoundRobinPicker picker) {
if (!(picker instanceof ReadyPicker)) {
return false;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not covered.

}
ReadyPicker other = (ReadyPicker) picker;
// the lists cannot contain duplicate subchannels
return other == this || (stickinessState == other.stickinessState
&& list.size() == other.list.size()
&& new HashSet<Subchannel>(list).containsAll(other.list));
}
}

@VisibleForTesting
static final class EmptyPicker extends RoundRobinPicker {

private final Status status;

EmptyPicker(@Nonnull Status status) {
this.status = Preconditions.checkNotNull(status, "status");
}

@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
return status.isOk() ? PickResult.withNoResult() : PickResult.withError(status);
}

@Override
boolean isEquivalentTo(RoundRobinPicker picker) {
return picker instanceof EmptyPicker && (Objects.equal(status, ((EmptyPicker) picker).status)
|| (status.isOk() && ((EmptyPicker) picker).status.isOk()));
}
}
}
Loading