Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 142 additions & 0 deletions grpclb/src/main/java/io/grpc/grpclb/CachedSubchannelPool.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
/*
* Copyright 2018, gRPC Authors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.grpc.grpclb;

import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;

import com.google.common.annotations.VisibleForTesting;
import io.grpc.Attributes;
import io.grpc.EquivalentAddressGroup;
import io.grpc.LoadBalancer.Helper;
import io.grpc.LoadBalancer.Subchannel;
import java.util.HashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/**
* A {@link SubchannelPool} that keeps returned {@link Subchannel}s for a given time before it's
* shut down by the pool.
*/
final class CachedSubchannelPool implements SubchannelPool {
private final HashMap<EquivalentAddressGroup, CacheEntry> cache =
new HashMap<EquivalentAddressGroup, CacheEntry>();

private Helper helper;
private ScheduledExecutorService timerService;

@VisibleForTesting
static final long SHUTDOWN_TIMEOUT_MS = 10000;

@Override
public void init(Helper helper, ScheduledExecutorService timerService) {
this.helper = checkNotNull(helper, "helper");
this.timerService = checkNotNull(timerService, "timerService");
}

@Override
public Subchannel takeOrCreateSubchannel(
EquivalentAddressGroup eag, Attributes defaultAttributes) {
CacheEntry entry = cache.remove(eag);
Subchannel subchannel;
if (entry == null) {
subchannel = helper.createSubchannel(eag, defaultAttributes);
} else {
subchannel = entry.subchannel;
entry.shutdownTimer.cancel(false);
}
return subchannel;
}

@Override
public void returnSubchannel(Subchannel subchannel) {
CacheEntry prev = cache.get(subchannel.getAddresses());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Return immediately if prev != null && prev.subchannel == subchannel

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

if (prev != null) {
// Returning the same Subchannel twice has no effect.
// Returning a different Subchannel for an already cached EAG will cause the
// latter Subchannel to be shutdown immediately.
if (prev.subchannel != subchannel) {
subchannel.shutdown();
}
return;
}
final ShutdownSubchannelTask shutdownTask = new ShutdownSubchannelTask(subchannel);
ScheduledFuture<?> shutdownTimer =
timerService.schedule(
new ShutdownSubchannelScheduledTask(shutdownTask),
SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS);
shutdownTask.timer = shutdownTimer;
CacheEntry entry = new CacheEntry(subchannel, shutdownTimer);
cache.put(subchannel.getAddresses(), entry);
}

@Override
public void clear() {
for (CacheEntry entry : cache.values()) {
entry.shutdownTimer.cancel(false);
entry.subchannel.shutdown();
}
cache.clear();
}

@VisibleForTesting
final class ShutdownSubchannelScheduledTask implements Runnable {
private final ShutdownSubchannelTask task;

ShutdownSubchannelScheduledTask(ShutdownSubchannelTask task) {
this.task = checkNotNull(task, "task");
}

@Override
public void run() {
helper.runSerialized(task);
}
}

@VisibleForTesting
final class ShutdownSubchannelTask implements Runnable {
private final Subchannel subchannel;
private ScheduledFuture<?> timer;

private ShutdownSubchannelTask(Subchannel subchannel) {
this.subchannel = checkNotNull(subchannel, "subchannel");
}

// This runs in channelExecutor
@Override
public void run() {
// getSubchannel() may have cancelled the timer after the timer has expired but before this
// task is actually run in the channelExecutor.
if (!timer.isCancelled()) {
CacheEntry entry = cache.remove(subchannel.getAddresses());
checkState(entry.subchannel == subchannel, "Inconsistent state");
subchannel.shutdown();
}
}
}

private static class CacheEntry {
final Subchannel subchannel;
final ScheduledFuture<?> shutdownTimer;

CacheEntry(Subchannel subchannel, ScheduledFuture<?> shutdownTimer) {
this.subchannel = checkNotNull(subchannel, "subchannel");
this.shutdownTimer = checkNotNull(shutdownTimer, "shutdownTimer");
}
}
}
8 changes: 6 additions & 2 deletions grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ class GrpclbLoadBalancer extends LoadBalancer implements WithLogId {
private final LogId logId = LogId.allocate(getClass().getName());

private final Helper helper;
private final SubchannelPool subchannelPool;
private final Factory pickFirstBalancerFactory;
private final Factory roundRobinBalancerFactory;
private final ObjectPool<ScheduledExecutorService> timerServicePool;
Expand All @@ -67,7 +68,7 @@ class GrpclbLoadBalancer extends LoadBalancer implements WithLogId {
@Nullable
private GrpclbState grpclbState;

GrpclbLoadBalancer(Helper helper, Factory pickFirstBalancerFactory,
GrpclbLoadBalancer(Helper helper, SubchannelPool subchannelPool, Factory pickFirstBalancerFactory,
Factory roundRobinBalancerFactory, ObjectPool<ScheduledExecutorService> timerServicePool,
TimeProvider time) {
this.helper = checkNotNull(helper, "helper");
Expand All @@ -78,6 +79,8 @@ class GrpclbLoadBalancer extends LoadBalancer implements WithLogId {
this.timerServicePool = checkNotNull(timerServicePool, "timerServicePool");
this.timerService = checkNotNull(timerServicePool.getObject(), "timerService");
this.time = checkNotNull(time, "time provider");
this.subchannelPool = checkNotNull(subchannelPool, "subchannelPool");
this.subchannelPool.init(helper, timerService);
setLbPolicy(LbPolicy.GRPCLB);
}

Expand Down Expand Up @@ -159,7 +162,8 @@ private void setLbPolicy(LbPolicy newLbPolicy) {
"roundRobinBalancerFactory.newLoadBalancer()");
break;
case GRPCLB:
grpclbState = new GrpclbState(helper, time, timerService, logId);
grpclbState =
new GrpclbState(helper, subchannelPool, time, timerService, logId);
break;
default:
// Do nohting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public static GrpclbLoadBalancerFactory getInstance() {
@Override
public LoadBalancer newLoadBalancer(LoadBalancer.Helper helper) {
return new GrpclbLoadBalancer(
helper, PickFirstBalancerFactory.getInstance(),
helper, new CachedSubchannelPool(), PickFirstBalancerFactory.getInstance(),
RoundRobinLoadBalancerFactory.getInstance(),
// TODO(zhangkun83): balancer sends load reporting RPCs from it, which also involves
// channelExecutor thus may also run other tasks queued in the channelExecutor. If such
Expand Down
12 changes: 9 additions & 3 deletions grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ public String toString() {
private final LogId logId;
private final String serviceName;
private final Helper helper;
private final SubchannelPool subchannelPool;
private final TimeProvider time;
private final ScheduledExecutorService timerService;

Expand Down Expand Up @@ -128,10 +129,12 @@ public String toString() {

GrpclbState(
Helper helper,
SubchannelPool subchannelPool,
TimeProvider time,
ScheduledExecutorService timerService,
LogId logId) {
this.helper = checkNotNull(helper, "helper");
this.subchannelPool = checkNotNull(subchannelPool, "subchannelPool");
this.time = checkNotNull(time, "time provider");
this.timerService = checkNotNull(timerService, "timerService");
this.serviceName = checkNotNull(helper.getAuthority(), "helper returns null authority");
Expand Down Expand Up @@ -278,10 +281,13 @@ private void cancelFallbackTimer() {

void shutdown() {
shutdownLbComm();
// We close the subchannels through subchannelPool instead of helper just for convenience of
// testing.
for (Subchannel subchannel : subchannels.values()) {
subchannel.shutdown();
subchannelPool.returnSubchannel(subchannel);
}
subchannels = Collections.emptyMap();
subchannelPool.clear();
cancelFallbackTimer();
}

Expand Down Expand Up @@ -324,7 +330,7 @@ private void useRoundRobinLists(
new AtomicReference<ConnectivityStateInfo>(
ConnectivityStateInfo.forNonError(IDLE)))
.build();
subchannel = helper.createSubchannel(eag, subchannelAttrs);
subchannel = subchannelPool.takeOrCreateSubchannel(eag, subchannelAttrs);
subchannel.requestConnection();
}
newSubchannelMap.put(eag, subchannel);
Expand All @@ -343,7 +349,7 @@ private void useRoundRobinLists(
for (Entry<EquivalentAddressGroup, Subchannel> entry : subchannels.entrySet()) {
EquivalentAddressGroup eag = entry.getKey();
if (!newSubchannelMap.containsKey(eag)) {
entry.getValue().shutdown();
subchannelPool.returnSubchannel(entry.getValue());
}
}

Expand Down
55 changes: 55 additions & 0 deletions grpclb/src/main/java/io/grpc/grpclb/SubchannelPool.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright 2018, gRPC Authors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.grpc.grpclb;

import io.grpc.Attributes;
import io.grpc.EquivalentAddressGroup;
import io.grpc.LoadBalancer.Helper;
import io.grpc.LoadBalancer.Subchannel;
import java.util.concurrent.ScheduledExecutorService;
import javax.annotation.concurrent.NotThreadSafe;

/**
* Manages life-cycle of Subchannels for {@link GrpclbState}.
*
* <p>All methods are run from the ChannelExecutor that the helper uses.
*/
@NotThreadSafe
interface SubchannelPool {
/**
* Pass essential utilities.
*/
void init(Helper helper, ScheduledExecutorService timerService);

/**
* Takes a {@link Subchannel} from the pool for the given {@code eag} if there is one available.
* Otherwise, creates and returns a new {@code Subchannel} with the given {@code eag} and {@code
* defaultAttributes}.
*/
Subchannel takeOrCreateSubchannel(EquivalentAddressGroup eag, Attributes defaultAttributes);

/**
* Puts a {@link Subchannel} back to the pool. From this point the Subchannel is owned by the
* pool.
*/
void returnSubchannel(Subchannel subchannel);

/**
* Shuts down all subchannels in the pool immediately.
*/
void clear();
}
Loading