Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 25 additions & 35 deletions core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -307,27 +307,13 @@ public LogId getLogId() {

// Run from channelExecutor
private class IdleModeTimer implements Runnable {
// Only mutated from channelExecutor
boolean cancelled;

@Override
public void run() {
if (cancelled) {
// Race detected: this task was scheduled on channelExecutor before cancelIdleTimer()
// could cancel the timer.
return;
}
enterIdleMode();
}
}

// Must be used from channelExecutor
@Nullable
private ScheduledFuture<?> idleModeTimerFuture;
// Must be used from channelExecutor
@Nullable
private IdleModeTimer idleModeTimer;

// Must be called from channelExecutor
private void shutdownNameResolverAndLoadBalancer(boolean verifyActive) {
if (verifyActive) {
Expand Down Expand Up @@ -360,7 +346,7 @@ void exitIdleMode() {
if (inUseStateAggregator.isInUse()) {
// Cancel the timer now, so that a racing due timer will not put Channel on idleness
// when the caller of exitIdleMode() is about to use the returned loadBalancer.
cancelIdleTimer();
cancelIdleTimer(false);
} else {
// exitIdleMode() may be called outside of inUseStateAggregator.handleNotInUse() while
// isInUse() == false, in which case we still need to schedule the timer.
Expand Down Expand Up @@ -396,30 +382,16 @@ private void enterIdleMode() {
}

// Must be run from channelExecutor
private void cancelIdleTimer() {
if (idleModeTimerFuture != null) {
idleModeTimerFuture.cancel(false);
idleModeTimer.cancelled = true;
idleModeTimerFuture = null;
idleModeTimer = null;
}
private void cancelIdleTimer(boolean permanent) {
idleTimer.cancel(permanent);
}

// Always run from channelExecutor
private void rescheduleIdleTimer() {
if (idleTimeoutMillis == IDLE_TIMEOUT_MILLIS_DISABLE) {
return;
}
cancelIdleTimer();
idleModeTimer = new IdleModeTimer();
idleModeTimerFuture = transportFactory.getScheduledExecutorService().schedule(
new LogExceptionRunnable(new Runnable() {
@Override
public void run() {
channelExecutor.executeLater(idleModeTimer).drain();
}
}),
idleTimeoutMillis, TimeUnit.MILLISECONDS);
idleTimer.reschedule(idleTimeoutMillis, TimeUnit.MILLISECONDS);
}

// Run from channelExecutor
Expand Down Expand Up @@ -537,6 +509,8 @@ ClientStream newSubstream(ClientStreamTracer.Factory tracerFactory, Metadata new
}
};

private final Rescheduler idleTimer;

ManagedChannelImpl(
AbstractManagedChannelImplBuilder<?> builder,
ClientTransportFactory clientTransportFactory,
Expand Down Expand Up @@ -570,13 +544,29 @@ ClientStream newSubstream(ClientStreamTracer.Factory tracerFactory, Metadata new
this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier");
if (builder.idleTimeoutMillis == IDLE_TIMEOUT_MILLIS_DISABLE) {
this.idleTimeoutMillis = builder.idleTimeoutMillis;

} else {
checkArgument(
builder.idleTimeoutMillis
>= AbstractManagedChannelImplBuilder.IDLE_MODE_MIN_TIMEOUT_MILLIS,
"invalid idleTimeoutMillis %s", builder.idleTimeoutMillis);
this.idleTimeoutMillis = builder.idleTimeoutMillis;
}

final class AutoDrainChannelExecutor implements Executor {

@Override
public void execute(Runnable command) {
channelExecutor.executeLater(command);
channelExecutor.drain();
}
}

idleTimer = new Rescheduler(
new IdleModeTimer(),
new AutoDrainChannelExecutor(),
transportFactory.getScheduledExecutorService(),
stopwatchSupplier.get());
this.fullStreamDecompression = builder.fullStreamDecompression;
this.decompressorRegistry = checkNotNull(builder.decompressorRegistry, "decompressorRegistry");
this.compressorRegistry = checkNotNull(builder.compressorRegistry, "compressorRegistry");
Expand Down Expand Up @@ -666,7 +656,7 @@ public void run() {
channelExecutor.executeLater(new Runnable() {
@Override
public void run() {
cancelIdleTimer();
cancelIdleTimer(/* permanent= */ true);
}
}).drain();
logger.log(Level.FINE, "[{0}] Shutting down", getLogId());
Expand Down Expand Up @@ -704,7 +694,7 @@ void panic(final Throwable t) {
return;
}
panicMode = true;
cancelIdleTimer();
cancelIdleTimer(/* permanent= */ true);
shutdownNameResolverAndLoadBalancer(false);
SubchannelPicker newPicker = new SubchannelPicker() {
final PickResult panicPickResult =
Expand Down Expand Up @@ -868,7 +858,7 @@ public void run() {
if (shutdown.get() || lbHelper == null) {
return;
}
cancelIdleTimer();
cancelIdleTimer(/* permanent= */ false);
enterIdleMode();
}
}
Expand Down
119 changes: 119 additions & 0 deletions core/src/main/java/io/grpc/internal/Rescheduler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
* Copyright 2018, gRPC Authors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.grpc.internal;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/**
* Reschedules a runnable lazily.
*/
final class Rescheduler {

// deps
private final ScheduledExecutorService scheduler;
private final Executor serializingExecutor;
private final Runnable runnable;

// state
private final Stopwatch stopwatch;
private long runAtNanos;
private boolean enabled;
private ScheduledFuture<?> wakeUp;

Rescheduler(
Runnable r,
Executor serializingExecutor,
ScheduledExecutorService scheduler,
Stopwatch stopwatch) {
this.runnable = r;
this.serializingExecutor = serializingExecutor;
this.scheduler = scheduler;
this.stopwatch = stopwatch;
stopwatch.start();
}

/* must be called from the {@link #serializingExecutor} originally passed in. */
void reschedule(long delay, TimeUnit timeUnit) {
long delayNanos = timeUnit.toNanos(delay);
long newRunAtNanos = nanoTime() + delayNanos;
enabled = true;
if (newRunAtNanos - runAtNanos < 0 || wakeUp == null) {
if (wakeUp != null) {
wakeUp.cancel(false);
}
wakeUp = scheduler.schedule(new FutureRunnable(this), delayNanos, TimeUnit.NANOSECONDS);
}
runAtNanos = newRunAtNanos;
}

// must be called from channel executor
void cancel(boolean permanent) {
enabled = false;
if (permanent && wakeUp != null) {
wakeUp.cancel(false);
wakeUp = null;
}
}

private static final class FutureRunnable implements Runnable {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Why is this static?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In ManagedChannelImplIdlenessTest.java there is a test that checks there are no more scheduled tasks in the scheduler. It fails because the tasks are still left over, but now disabled. I wanted to make it so the test scans the tasks and checks to see they are Rescheduler Runnables, and that they are disabled.

There needs to be a reference from the Runnable back to the Rescheduler, so this became static and took an explicit reference to the outer class. This is so isEnabled below works.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I see. You could have added another method to FutureRunnable to do something like return Rescheduler.this, but that's close to the same amount of effort. Makes sense.


private final Rescheduler rescheduler;

FutureRunnable(Rescheduler rescheduler) {
this.rescheduler = rescheduler;
}

@Override
public void run() {
rescheduler.serializingExecutor.execute(rescheduler.new ChannelFutureRunnable());
}
}

private final class ChannelFutureRunnable implements Runnable {

@Override
public void run() {
if (!enabled) {
wakeUp = null;
return;
}
long now = nanoTime();
if (runAtNanos - now > 0) {
wakeUp = scheduler.schedule(
new FutureRunnable(Rescheduler.this), runAtNanos - now, TimeUnit.NANOSECONDS);
} else {
enabled = false;
wakeUp = null;
runnable.run();
}
}
}

@VisibleForTesting
static boolean isEnabled(Runnable r) {
return ((FutureRunnable) r).rescheduler.enabled;
}

private long nanoTime() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see what you did there. 😄

return stopwatch.elapsed(TimeUnit.NANOSECONDS);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,12 @@
import io.grpc.NameResolver;
import io.grpc.Status;
import io.grpc.StringMarshaller;
import io.grpc.internal.FakeClock.ScheduledTask;
import io.grpc.internal.TestUtils.MockClientTransportInfo;
import java.net.SocketAddress;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
Expand Down Expand Up @@ -163,7 +165,10 @@ builder, mockTransportFactory, new FakeBackoffPolicyProvider(),

@After
public void allPendingTasksAreRun() {
assertEquals(timer.getPendingTasks() + " should be empty", 0, timer.numPendingTasks());
Collection<ScheduledTask> pendingTimerTasks = timer.getPendingTasks();
for (ScheduledTask a : pendingTimerTasks) {
assertFalse(Rescheduler.isEnabled(a.command));
}
assertEquals(executor.getPendingTasks() + " should be empty", 0, executor.numPendingTasks());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,7 @@ public void shutdownWithNoTransportsEverCreated() {
NO_INTERCEPTOR);
verify(executorPool).getObject();
verify(executorPool, never()).returnObject(anyObject());
verify(mockTransportFactory).getScheduledExecutorService();
verifyNoMoreInteractions(mockTransportFactory);
channel.shutdown();
assertTrue(channel.isShutdown());
Expand Down
Loading