Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds: implement a global map for holding circuit breaker request counters #7588

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 16 additions & 4 deletions xds/src/main/java/io/grpc/xds/EdsLoadBalancer2.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,22 +71,25 @@ final class EdsLoadBalancer2 extends LoadBalancer {
private final SynchronizationContext syncContext;
private final LoadBalancerRegistry lbRegistry;
private final ThreadSafeRandom random;
private final CallCounterProvider callCounterProvider;
private final GracefulSwitchLoadBalancer switchingLoadBalancer;
private ObjectPool<XdsClient> xdsClientPool;
private XdsClient xdsClient;
private String cluster;
private EdsLbState edsLbState;

EdsLoadBalancer2(LoadBalancer.Helper helper) {
this(helper, LoadBalancerRegistry.getDefaultRegistry(), ThreadSafeRandomImpl.instance);
this(helper, LoadBalancerRegistry.getDefaultRegistry(), ThreadSafeRandomImpl.instance,
SharedCallCounterMap.getInstance());
}

@VisibleForTesting
EdsLoadBalancer2(
LoadBalancer.Helper helper, LoadBalancerRegistry lbRegistry, ThreadSafeRandom random) {
EdsLoadBalancer2(LoadBalancer.Helper helper, LoadBalancerRegistry lbRegistry,
ThreadSafeRandom random, CallCounterProvider callCounterProvider) {
this.lbRegistry = checkNotNull(lbRegistry, "lbRegistry");
this.random = checkNotNull(random, "random");
syncContext = checkNotNull(helper, "helper").getSynchronizationContext();
this.callCounterProvider = checkNotNull(callCounterProvider, "callCounterProvider");
switchingLoadBalancer = new GracefulSwitchLoadBalancer(helper);
InternalLogId logId = InternalLogId.allocate("eds-lb", helper.getAuthority());
logger = XdsLogger.withLogId(logId);
Expand Down Expand Up @@ -159,7 +162,7 @@ public LoadBalancer newLoadBalancer(Helper helper) {
}

private final class ChildLbState extends LoadBalancer implements EdsResourceWatcher {
private final AtomicLong requestCount = new AtomicLong();
private final AtomicLong requestCount;
@Nullable
private final LoadStatsStore loadStatsStore;
private final RequestLimitingLbHelper lbHelper;
Expand All @@ -174,6 +177,7 @@ private final class ChildLbState extends LoadBalancer implements EdsResourceWatc
private LoadBalancer lb;

private ChildLbState(Helper helper) {
requestCount = callCounterProvider.getOrCreate(cluster, edsServiceName);
if (lrsServerName != null) {
loadStatsStore = xdsClient.addClientStats(cluster, edsServiceName);
} else {
Expand Down Expand Up @@ -487,6 +491,14 @@ public void streamClosed(Status status) {
}
}

/**
* Provides the counter for aggregating outstanding requests per cluster:eds_service_name.
*/
// Introduced for testing.
interface CallCounterProvider {
AtomicLong getOrCreate(String cluster, @Nullable String edsServiceName);
}

@VisibleForTesting
static PriorityLbConfig generatePriorityLbConfig(
String cluster, String edsServiceName, String lrsServerName,
Expand Down
100 changes: 100 additions & 0 deletions xds/src/main/java/io/grpc/xds/SharedCallCounterMap.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Copyright 2020 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.grpc.xds;

import static com.google.common.base.Preconditions.checkNotNull;

import com.google.common.annotations.VisibleForTesting;
import io.grpc.xds.EdsLoadBalancer2.CallCounterProvider;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;

/**
* The global map for holding circuit breaker atomic counters.
*/
@ThreadSafe
final class SharedCallCounterMap implements CallCounterProvider {

private final ReferenceQueue<AtomicLong> refQueue = new ReferenceQueue<>();
private final Map<String, Map<String, CounterReference>> counters;

private SharedCallCounterMap() {
this(new HashMap<String, Map<String, CounterReference>>());
}

@VisibleForTesting
SharedCallCounterMap(Map<String, Map<String, CounterReference>> counters) {
this.counters = checkNotNull(counters, "counters");
}

static SharedCallCounterMap getInstance() {
return SharedCallCounterMapHolder.instance;
}

@Override
public synchronized AtomicLong getOrCreate(String cluster, @Nullable String edsServiceName) {
Map<String, CounterReference> clusterCounters = counters.get(cluster);
if (clusterCounters == null) {
clusterCounters = new HashMap<>();
counters.put(cluster, clusterCounters);
}
CounterReference ref = clusterCounters.get(edsServiceName);
AtomicLong counter;
if (ref == null || (counter = ref.get()) == null) {
counter = new AtomicLong();
ref = new CounterReference(counter, refQueue, cluster, edsServiceName);
clusterCounters.put(edsServiceName, ref);
}
Comment on lines +61 to +66
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about avoiding assignment inside if-condition?

AtomicLong counter = null;
if (ref != null) {
  counter = ref.get();
}
if (counter == null) {
  counter = new AtomicLong();
  ref = new CounterReference(counter, refQueue, cluster, edsServiceName);
  clusterCounters.put(edsServiceName, ref);
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why choose something that is more verbose? 😄

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because assignment inside if-condition is less readable.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, that should not be considered as a readability issue, that's what the language feature is, just like all the one liners in Python.

cleanQueue();
return counter;
}

@VisibleForTesting
void cleanQueue() {
CounterReference ref;
while ((ref = (CounterReference) refQueue.poll()) != null) {
Map<String, CounterReference> clusterCounter = counters.get(ref.cluster);
clusterCounter.remove(ref.edsServiceName);
if (clusterCounter.isEmpty()) {
counters.remove(ref.cluster);
}
}
}

@VisibleForTesting
static final class CounterReference extends WeakReference<AtomicLong> {
private final String cluster;
@Nullable
private final String edsServiceName;

CounterReference(AtomicLong counter, ReferenceQueue<AtomicLong> refQueue, String cluster,
@Nullable String edsServiceName) {
super(counter, refQueue);
this.cluster = cluster;
this.edsServiceName = edsServiceName;
}
}

private static final class SharedCallCounterMapHolder {
private static final SharedCallCounterMap instance = new SharedCallCounterMap();
}
}
11 changes: 10 additions & 1 deletion xds/src/test/java/io/grpc/xds/EdsLoadBalancer2Test.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import io.grpc.internal.FakeClock;
import io.grpc.internal.ObjectPool;
import io.grpc.internal.ServiceConfigUtil.PolicySelection;
import io.grpc.xds.EdsLoadBalancer2.CallCounterProvider;
import io.grpc.xds.EdsLoadBalancerProvider.EdsConfig;
import io.grpc.xds.EnvoyProtoData.ClusterStats;
import io.grpc.xds.EnvoyProtoData.DropOverload;
Expand All @@ -69,6 +70,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.junit.After;
Expand Down Expand Up @@ -135,9 +137,16 @@ public XdsClient returnObject(Object object) {
public void setUp() {
MockitoAnnotations.initMocks(this);

CallCounterProvider callCounterProvider = new CallCounterProvider() {
@Override
public AtomicLong getOrCreate(String cluster, @Nullable String edsServiceName) {
return new AtomicLong();
}
};

registry.register(new FakeLoadBalancerProvider(PRIORITY_POLICY_NAME));
registry.register(new FakeLoadBalancerProvider(LRS_POLICY_NAME));
loadBalancer = new EdsLoadBalancer2(helper, registry, mockRandom);
loadBalancer = new EdsLoadBalancer2(helper, registry, mockRandom, callCounterProvider);
loadBalancer.handleResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(Collections.<EquivalentAddressGroup>emptyList())
Expand Down
59 changes: 59 additions & 0 deletions xds/src/test/java/io/grpc/xds/SharedCallCounterMapTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright 2019 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.grpc.xds;

import static com.google.common.truth.Truth.assertThat;

import com.google.common.testing.GcFinalization;
import io.grpc.xds.SharedCallCounterMap.CounterReference;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/**
* Tests for {@link SharedCallCounterMap}.
*/
@RunWith(JUnit4.class)
public class SharedCallCounterMapTest {

private static final String CLUSTER = "cluster-foo.googleapis.com";
private static final String EDS_SERVICE_NAME = null;

private final Map<String, Map<String, CounterReference>> counters = new HashMap<>();
private final SharedCallCounterMap map = new SharedCallCounterMap(counters);

@Test
public void sharedCounterInstance() {
AtomicLong counter1 = map.getOrCreate(CLUSTER, EDS_SERVICE_NAME);
AtomicLong counter2 = map.getOrCreate(CLUSTER, EDS_SERVICE_NAME);
assertThat(counter2).isSameInstanceAs(counter1);
}

@Test
public void autoCleanUp() {
@SuppressWarnings("UnusedVariable")
AtomicLong counter = map.getOrCreate(CLUSTER, EDS_SERVICE_NAME);
CounterReference ref = counters.get(CLUSTER).get(EDS_SERVICE_NAME);
counter = null;
GcFinalization.awaitClear(ref);
map.cleanQueue();
assertThat(counters).isEmpty();
}
}