Skip to content

Commit

Permalink
util: Pass an AtomicInteger to RR's ReadyPicker
Browse files Browse the repository at this point in the history
We already do this for WRR. Notably, we are no longer trying to avoid
the modulus each pick. It was of questionable value, and removing it is
necessary to continue sharing the same integer when the list size
changes.

The change means we can implement a stronger isEquivalentTo() by
comparing the AtomicInteger references. It is strong enough that the
operation aligns with normal equals(). Using equals() instead of
isEquivalentTo() also made more obvious an equals() optimization that
uses the hashCode() before the more expensive HashSet creation; equals()
should now be very fast except when they are (very likely) equal.
  • Loading branch information
ejona86 committed Nov 27, 2023
1 parent 43e0637 commit dca89b2
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 110 deletions.
117 changes: 47 additions & 70 deletions util/src/main/java/io/grpc/util/RoundRobinLoadBalancer.java
Expand Up @@ -24,44 +24,38 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import io.grpc.ConnectivityState;
import io.grpc.EquivalentAddressGroup;
import io.grpc.Internal;
import io.grpc.LoadBalancer;
import io.grpc.NameResolver;
import io.grpc.Status;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import javax.annotation.Nonnull;
import java.util.concurrent.atomic.AtomicInteger;

/**
* A {@link LoadBalancer} that provides round-robin load-balancing over the {@link
* EquivalentAddressGroup}s from the {@link NameResolver}.
*/
@Internal
public class RoundRobinLoadBalancer extends MultiChildLoadBalancer {
private final Random random;
protected RoundRobinPicker currentPicker = new EmptyPicker(EMPTY_OK);
private final AtomicInteger sequence = new AtomicInteger(new Random().nextInt());
protected SubchannelPicker currentPicker = new EmptyPicker();

public RoundRobinLoadBalancer(Helper helper) {
super(helper);
this.random = new Random();
}

@Override
protected SubchannelPicker getSubchannelPicker(Map<Object, SubchannelPicker> childPickers) {
throw new UnsupportedOperationException(); // local updateOverallBalancingState doesn't use this
}

private static final Status EMPTY_OK = Status.OK.withDescription("no subchannels ready");

/**
* Updates picker with the list of active subchannels (state == READY).
*/
Expand All @@ -82,7 +76,7 @@ protected void updateOverallBalancingState() {
}

if (isConnecting) {
updateBalancingState(CONNECTING, new EmptyPicker(Status.OK));
updateBalancingState(CONNECTING, new EmptyPicker());
} else {
updateBalancingState(TRANSIENT_FAILURE, createReadyPicker(getChildLbStates()));
}
Expand All @@ -91,45 +85,45 @@ protected void updateOverallBalancingState() {
}
}

private void updateBalancingState(ConnectivityState state, RoundRobinPicker picker) {
if (state != currentConnectivityState || !picker.isEquivalentTo(currentPicker)) {
private void updateBalancingState(ConnectivityState state, SubchannelPicker picker) {
if (state != currentConnectivityState || !picker.equals(currentPicker)) {
getHelper().updateBalancingState(state, picker);
currentConnectivityState = state;
currentPicker = picker;
}
}

protected RoundRobinPicker createReadyPicker(Collection<ChildLbState> children) {
// initialize the Picker to a random start index to ensure that a high frequency of Picker
// churn does not skew subchannel selection.
int startIndex = random.nextInt(children.size());

protected SubchannelPicker createReadyPicker(Collection<ChildLbState> children) {
List<SubchannelPicker> pickerList = new ArrayList<>();
for (ChildLbState child : children) {
SubchannelPicker picker = child.getCurrentPicker();
pickerList.add(picker);
}

return new ReadyPicker(pickerList, startIndex);
}

public abstract static class RoundRobinPicker extends SubchannelPicker {
public abstract boolean isEquivalentTo(RoundRobinPicker picker);
return new ReadyPicker(pickerList, sequence);
}

@VisibleForTesting
static class ReadyPicker extends RoundRobinPicker {
private static final AtomicIntegerFieldUpdater<ReadyPicker> indexUpdater =
AtomicIntegerFieldUpdater.newUpdater(ReadyPicker.class, "index");

static class ReadyPicker extends SubchannelPicker {
private final List<SubchannelPicker> subchannelPickers; // non-empty
@SuppressWarnings("unused")
private volatile int index;
private final AtomicInteger index;
private final int hashCode;

public ReadyPicker(List<SubchannelPicker> list, int startIndex) {
public ReadyPicker(List<SubchannelPicker> list, AtomicInteger index) {
checkArgument(!list.isEmpty(), "empty list");
this.subchannelPickers = list;
this.index = startIndex - 1;
this.index = Preconditions.checkNotNull(index, "index");

// Every created picker is checked for equality in updateBalancingState() at least once.
// Pre-compute the hash so it can be checked cheaply. Using the hash in equals() makes it very
// fast except when the pickers are (very likely) equal.
//
// For equality we treat children as a set; use hash code as defined by Set
int sum = 0;
for (SubchannelPicker picker : subchannelPickers) {
sum += picker.hashCode();
}
this.hashCode = sum;
}

@Override
Expand All @@ -145,14 +139,8 @@ public String toString() {
}

private int nextIndex() {
int size = subchannelPickers.size();
int i = indexUpdater.incrementAndGet(this);
if (i >= size) {
int oldi = i;
i %= size;
indexUpdater.compareAndSet(this, oldi, i);
}
return i;
int i = index.getAndIncrement() & Integer.MAX_VALUE;
return i % subchannelPickers.size();
}

@VisibleForTesting
Expand All @@ -161,53 +149,42 @@ List<SubchannelPicker> getSubchannelPickers() {
}

@Override
public boolean isEquivalentTo(RoundRobinPicker picker) {
if (!(picker instanceof ReadyPicker)) {
public int hashCode() {
return hashCode;
}

@Override
public boolean equals(Object o) {
if (!(o instanceof ReadyPicker)) {
return false;
}
ReadyPicker other = (ReadyPicker) picker;
ReadyPicker other = (ReadyPicker) o;
if (other == this) {
return true;
}
// the lists cannot contain duplicate subchannels
return other == this
|| (subchannelPickers.size() == other.subchannelPickers.size() && new HashSet<>(
subchannelPickers).containsAll(other.subchannelPickers));
return hashCode == other.hashCode
&& index == other.index
&& subchannelPickers.size() == other.subchannelPickers.size()
&& new HashSet<>(subchannelPickers).containsAll(other.subchannelPickers);
}
}

@VisibleForTesting
static final class EmptyPicker extends RoundRobinPicker {

private final Status status;

EmptyPicker(@Nonnull Status status) {
this.status = Preconditions.checkNotNull(status, "status");
}

static final class EmptyPicker extends SubchannelPicker {
@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
return status.isOk() ? PickResult.withNoResult() : PickResult.withError(status);
return PickResult.withNoResult();
}

@Override
public boolean isEquivalentTo(RoundRobinPicker picker) {
return picker instanceof EmptyPicker && (Objects.equal(status, ((EmptyPicker) picker).status)
|| (status.isOk() && ((EmptyPicker) picker).status.isOk()));
public int hashCode() {
return getClass().hashCode();
}

@Override
public String toString() {
return MoreObjects.toStringHelper(EmptyPicker.class).add("status", status).toString();
}
}

/**
* A lighter weight Reference than AtomicReference.
*/
@VisibleForTesting
static final class Ref<T> {
T value;

Ref(T value) {
this.value = value;
public boolean equals(Object o) {
return o instanceof EmptyPicker;
}
}
}
56 changes: 26 additions & 30 deletions util/src/test/java/io/grpc/util/RoundRobinLoadBalancerTest.java
Expand Up @@ -23,9 +23,7 @@
import static io.grpc.ConnectivityState.SHUTDOWN;
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.AdditionalAnswers.delegatesTo;
import static org.mockito.ArgumentMatchers.any;
Expand Down Expand Up @@ -64,6 +62,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
Expand Down Expand Up @@ -376,22 +375,20 @@ public void pickerRoundRobin() throws Exception {
TestUtils.pickerOf(subchannel), TestUtils.pickerOf(subchannel1),
TestUtils.pickerOf(subchannel2));

ReadyPicker picker = new ReadyPicker(Collections.unmodifiableList(pickers),
0 /* startIndex */);
AtomicInteger seq = new AtomicInteger(0);
ReadyPicker picker = new ReadyPicker(Collections.unmodifiableList(pickers), seq);

assertEquals(subchannel, picker.pickSubchannel(mockArgs).getSubchannel());
assertEquals(subchannel1, picker.pickSubchannel(mockArgs).getSubchannel());
assertEquals(subchannel2, picker.pickSubchannel(mockArgs).getSubchannel());
assertEquals(subchannel, picker.pickSubchannel(mockArgs).getSubchannel());
}

@Test
public void pickerEmptyList() throws Exception {
SubchannelPicker picker = new EmptyPicker(Status.UNKNOWN);

assertNull(picker.pickSubchannel(mockArgs).getSubchannel());
assertEquals(Status.UNKNOWN,
picker.pickSubchannel(mockArgs).getStatus());
seq.set(Integer.MAX_VALUE);
assertEquals(subchannel1, picker.pickSubchannel(mockArgs).getSubchannel());
assertEquals(subchannel, picker.pickSubchannel(mockArgs).getSubchannel());
assertEquals(subchannel1, picker.pickSubchannel(mockArgs).getSubchannel());
assertEquals(subchannel2, picker.pickSubchannel(mockArgs).getSubchannel());
assertEquals(subchannel, picker.pickSubchannel(mockArgs).getSubchannel());
}

@Test
Expand Down Expand Up @@ -481,36 +478,35 @@ public void subchannelStateIsolation() throws Exception {
public void readyPicker_emptyList() {
// ready picker list must be non-empty
try {
new ReadyPicker(Collections.emptyList(), 0);
new ReadyPicker(Collections.emptyList(), new AtomicInteger(0));
fail();
} catch (IllegalArgumentException expected) {
}
}

@Test
public void internalPickerComparisons() {
EmptyPicker emptyOk1 = new EmptyPicker(Status.OK);
EmptyPicker emptyOk2 = new EmptyPicker(Status.OK.withDescription("different OK"));
EmptyPicker emptyErr = new EmptyPicker(Status.UNKNOWN.withDescription(\\_(ツ)_//¯"));
SubchannelPicker empty1 = new EmptyPicker();
SubchannelPicker empty2 = new EmptyPicker();

AtomicInteger seq = new AtomicInteger(0);
acceptAddresses(servers, Attributes.EMPTY); // create subchannels
Iterator<Subchannel> subchannelIterator = subchannels.values().iterator();
SubchannelPicker sc1 = TestUtils.pickerOf(subchannelIterator.next());
SubchannelPicker sc2 = TestUtils.pickerOf(subchannelIterator.next());
ReadyPicker ready1 = new ReadyPicker(Arrays.asList(sc1, sc2), 0);
ReadyPicker ready2 = new ReadyPicker(Arrays.asList(sc1), 0);
ReadyPicker ready3 = new ReadyPicker(Arrays.asList(sc2, sc1), 1);
ReadyPicker ready4 = new ReadyPicker(Arrays.asList(sc1, sc2), 1);
ReadyPicker ready5 = new ReadyPicker(Arrays.asList(sc2, sc1), 0);

assertTrue(emptyOk1.isEquivalentTo(emptyOk2));
assertFalse(emptyOk1.isEquivalentTo(emptyErr));
assertFalse(ready1.isEquivalentTo(ready2));
assertTrue(ready1.isEquivalentTo(ready3));
assertTrue(ready3.isEquivalentTo(ready4));
assertTrue(ready4.isEquivalentTo(ready5));
assertFalse(emptyOk1.isEquivalentTo(ready1));
assertFalse(ready1.isEquivalentTo(emptyOk1));
SubchannelPicker ready1 = new ReadyPicker(Arrays.asList(sc1, sc2), seq);
SubchannelPicker ready2 = new ReadyPicker(Arrays.asList(sc1), seq);
SubchannelPicker ready3 = new ReadyPicker(Arrays.asList(sc2, sc1), seq);
SubchannelPicker ready4 = new ReadyPicker(Arrays.asList(sc1, sc2), seq);
SubchannelPicker ready5 = new ReadyPicker(Arrays.asList(sc2, sc1), new AtomicInteger(0));

assertThat(empty1).isEqualTo(empty2);
assertThat(ready1).isNotEqualTo(ready2);
assertThat(ready1).isEqualTo(ready3);
assertThat(ready3).isEqualTo(ready4);
assertThat(ready4).isNotEqualTo(ready5);
assertThat(empty1).isNotEqualTo(ready1);
assertThat(ready1).isNotEqualTo(empty1);
}

@Test
Expand Down

0 comments on commit dca89b2

Please sign in to comment.