-
Notifications
You must be signed in to change notification settings - Fork 3.8k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
xds: implement a global map for holding circuit breaker request count…
…ers (#7588) Circuit breakers should be applied to clusters in the global scope. However, the LB hierarchy might cause the LB policy (currently EDS, but cluster_impl in the future) that applies circuit breaking to be duplicated. Also, for multi-channel cases, the circuit breaking threshold should still be shared across channels in the process. This change creates a global map for accessing circuit breaking atomics that used to count the number of outstanding requests per global cluster basis. Atomics in the global map are held by WeakReferences so LB policies/Pickers/StreamTracers do not need to worry about counter's lifecycle and refcount.
- Loading branch information
Showing
4 changed files
with
185 additions
and
5 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
100 changes: 100 additions & 0 deletions
100
xds/src/main/java/io/grpc/xds/SharedCallCounterMap.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,100 @@ | ||
/* | ||
* Copyright 2020 The gRPC Authors | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package io.grpc.xds; | ||
|
||
import static com.google.common.base.Preconditions.checkNotNull; | ||
|
||
import com.google.common.annotations.VisibleForTesting; | ||
import io.grpc.xds.EdsLoadBalancer2.CallCounterProvider; | ||
import java.lang.ref.ReferenceQueue; | ||
import java.lang.ref.WeakReference; | ||
import java.util.HashMap; | ||
import java.util.Map; | ||
import java.util.concurrent.atomic.AtomicLong; | ||
import javax.annotation.Nullable; | ||
import javax.annotation.concurrent.ThreadSafe; | ||
|
||
/** | ||
* The global map for holding circuit breaker atomic counters. | ||
*/ | ||
@ThreadSafe | ||
final class SharedCallCounterMap implements CallCounterProvider { | ||
|
||
private final ReferenceQueue<AtomicLong> refQueue = new ReferenceQueue<>(); | ||
private final Map<String, Map<String, CounterReference>> counters; | ||
|
||
private SharedCallCounterMap() { | ||
this(new HashMap<String, Map<String, CounterReference>>()); | ||
} | ||
|
||
@VisibleForTesting | ||
SharedCallCounterMap(Map<String, Map<String, CounterReference>> counters) { | ||
this.counters = checkNotNull(counters, "counters"); | ||
} | ||
|
||
static SharedCallCounterMap getInstance() { | ||
return SharedCallCounterMapHolder.instance; | ||
} | ||
|
||
@Override | ||
public synchronized AtomicLong getOrCreate(String cluster, @Nullable String edsServiceName) { | ||
Map<String, CounterReference> clusterCounters = counters.get(cluster); | ||
if (clusterCounters == null) { | ||
clusterCounters = new HashMap<>(); | ||
counters.put(cluster, clusterCounters); | ||
} | ||
CounterReference ref = clusterCounters.get(edsServiceName); | ||
AtomicLong counter; | ||
if (ref == null || (counter = ref.get()) == null) { | ||
counter = new AtomicLong(); | ||
ref = new CounterReference(counter, refQueue, cluster, edsServiceName); | ||
clusterCounters.put(edsServiceName, ref); | ||
} | ||
cleanQueue(); | ||
return counter; | ||
} | ||
|
||
@VisibleForTesting | ||
void cleanQueue() { | ||
CounterReference ref; | ||
while ((ref = (CounterReference) refQueue.poll()) != null) { | ||
Map<String, CounterReference> clusterCounter = counters.get(ref.cluster); | ||
clusterCounter.remove(ref.edsServiceName); | ||
if (clusterCounter.isEmpty()) { | ||
counters.remove(ref.cluster); | ||
} | ||
} | ||
} | ||
|
||
@VisibleForTesting | ||
static final class CounterReference extends WeakReference<AtomicLong> { | ||
private final String cluster; | ||
@Nullable | ||
private final String edsServiceName; | ||
|
||
CounterReference(AtomicLong counter, ReferenceQueue<AtomicLong> refQueue, String cluster, | ||
@Nullable String edsServiceName) { | ||
super(counter, refQueue); | ||
this.cluster = cluster; | ||
this.edsServiceName = edsServiceName; | ||
} | ||
} | ||
|
||
private static final class SharedCallCounterMapHolder { | ||
private static final SharedCallCounterMap instance = new SharedCallCounterMap(); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
59 changes: 59 additions & 0 deletions
59
xds/src/test/java/io/grpc/xds/SharedCallCounterMapTest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,59 @@ | ||
/* | ||
* Copyright 2019 The gRPC Authors | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package io.grpc.xds; | ||
|
||
import static com.google.common.truth.Truth.assertThat; | ||
|
||
import com.google.common.testing.GcFinalization; | ||
import io.grpc.xds.SharedCallCounterMap.CounterReference; | ||
import java.util.HashMap; | ||
import java.util.Map; | ||
import java.util.concurrent.atomic.AtomicLong; | ||
import org.junit.Test; | ||
import org.junit.runner.RunWith; | ||
import org.junit.runners.JUnit4; | ||
|
||
/** | ||
* Tests for {@link SharedCallCounterMap}. | ||
*/ | ||
@RunWith(JUnit4.class) | ||
public class SharedCallCounterMapTest { | ||
|
||
private static final String CLUSTER = "cluster-foo.googleapis.com"; | ||
private static final String EDS_SERVICE_NAME = null; | ||
|
||
private final Map<String, Map<String, CounterReference>> counters = new HashMap<>(); | ||
private final SharedCallCounterMap map = new SharedCallCounterMap(counters); | ||
|
||
@Test | ||
public void sharedCounterInstance() { | ||
AtomicLong counter1 = map.getOrCreate(CLUSTER, EDS_SERVICE_NAME); | ||
AtomicLong counter2 = map.getOrCreate(CLUSTER, EDS_SERVICE_NAME); | ||
assertThat(counter2).isSameInstanceAs(counter1); | ||
} | ||
|
||
@Test | ||
public void autoCleanUp() { | ||
@SuppressWarnings("UnusedVariable") | ||
AtomicLong counter = map.getOrCreate(CLUSTER, EDS_SERVICE_NAME); | ||
CounterReference ref = counters.get(CLUSTER).get(EDS_SERVICE_NAME); | ||
counter = null; | ||
GcFinalization.awaitClear(ref); | ||
map.cleanQueue(); | ||
assertThat(counters).isEmpty(); | ||
} | ||
} |