From fab86d751f752e08e5b51d296c731589ea62205a Mon Sep 17 00:00:00 2001 From: Carl Mastrangelo Date: Tue, 3 Apr 2018 08:31:24 -0700 Subject: [PATCH 1/7] don't reschedule idle timer if it is already active --- .../io/grpc/internal/KeepAliveManager.java | 2 +- .../io/grpc/internal/ManagedChannelImpl.java | 131 +++++++++++++----- .../test/java/io/grpc/internal/FakeClock.java | 18 ++- .../ManagedChannelImplIdlenessTest.java | 2 + .../grpc/internal/ManagedChannelImplTest.java | 1 + 5 files changed, 112 insertions(+), 42 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/KeepAliveManager.java b/core/src/main/java/io/grpc/internal/KeepAliveManager.java index 7311ba27d68..a6babbe33e5 100644 --- a/core/src/main/java/io/grpc/internal/KeepAliveManager.java +++ b/core/src/main/java/io/grpc/internal/KeepAliveManager.java @@ -293,7 +293,7 @@ abstract static class Ticker { public abstract long read(); } - private static class SystemTicker extends Ticker { + static class SystemTicker extends Ticker { @Override public long read() { return System.nanoTime(); diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java index 9298accd80f..1b03a5d7ef5 100644 --- a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java +++ b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java @@ -67,6 +67,7 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -307,27 +308,13 @@ public LogId getLogId() { // Run from channelExecutor private class IdleModeTimer implements Runnable { - // Only mutated from channelExecutor - boolean cancelled; @Override public void run() { - if (cancelled) { - // Race detected: this task was scheduled on channelExecutor before cancelIdleTimer() - // could cancel the timer. - return; - } enterIdleMode(); } } - // Must be used from channelExecutor - @Nullable - private ScheduledFuture idleModeTimerFuture; - // Must be used from channelExecutor - @Nullable - private IdleModeTimer idleModeTimer; - // Must be called from channelExecutor private void shutdownNameResolverAndLoadBalancer(boolean verifyActive) { if (verifyActive) { @@ -360,7 +347,7 @@ void exitIdleMode() { if (inUseStateAggregator.isInUse()) { // Cancel the timer now, so that a racing due timer will not put Channel on idleness // when the caller of exitIdleMode() is about to use the returned loadBalancer. - cancelIdleTimer(); + cancelIdleTimer(false); } else { // exitIdleMode() may be called outside of inUseStateAggregator.handleNotInUse() while // isInUse() == false, in which case we still need to schedule the timer. @@ -395,14 +382,92 @@ private void enterIdleMode() { channelStateManager.gotoState(IDLE); } - // Must be run from channelExecutor - private void cancelIdleTimer() { - if (idleModeTimerFuture != null) { - idleModeTimerFuture.cancel(false); - idleModeTimer.cancelled = true; - idleModeTimerFuture = null; - idleModeTimer = null; + static final class Rescheduler { + private final Runnable runnable; + private final ChannelExecutor exec; + private final ScheduledExecutorService scheduler; + + private ScheduledFuture wakeUp; + + // state + private long runAt; + private boolean enabled; + + Rescheduler(Runnable r, ChannelExecutor exec, ScheduledExecutorService scheduler) { + this.runnable = r; + this.exec = exec; + this.scheduler = scheduler; + } + + // must be called from channel executor + void reschedule(long delay, TimeUnit timeUnit) { + long delayNanos = timeUnit.toNanos(delay); + long newRunAt = ticker.read() + delayNanos; + if (newRunAt - runAt < 0 || !enabled) { + enabled = true; + if (wakeUp != null) { + wakeUp.cancel(false); + } + wakeUp = scheduler.schedule(new FutureRunnable(), delayNanos, TimeUnit.NANOSECONDS); + } + runAt = newRunAt; + } + + // must be called from channel executor + void cancel(boolean permanent) { + enabled = false; + if (permanent && wakeUp != null) { + wakeUp.cancel(false); + wakeUp = null; + } + } + + private final class FutureRunnable implements Runnable { + + @Override + public void run() { + exec.executeLater(new ChannelFutureRunnable()); + exec.drain(); + } + } + + private final class ChannelFutureRunnable implements Runnable { + + @Override + public void run() { + if (!enabled) { + return; + } + long now = ticker.read(); + if (runAt - now > 0) { + wakeUp = scheduler.schedule(new FutureRunnable(), runAt - now, TimeUnit.NANOSECONDS); + } else { + enabled = false; + wakeUp = null; + runnable.run(); + } + } + } + + /** Time source representing nanoseconds since fixed but arbitrary point in time. */ + interface Ticker { + /** Returns the number of nanoseconds since this source's epoch. */ + long read(); } + + @VisibleForTesting + static Ticker ticker = new Ticker() { + + @Override + public long read() { + return System.nanoTime(); + } + }; + } + + // Must be run from channelExecutor + private void cancelIdleTimer(boolean permanent) { + idleTimer.cancel(permanent); } // Always run from channelExecutor @@ -410,16 +475,7 @@ private void rescheduleIdleTimer() { if (idleTimeoutMillis == IDLE_TIMEOUT_MILLIS_DISABLE) { return; } - cancelIdleTimer(); - idleModeTimer = new IdleModeTimer(); - idleModeTimerFuture = transportFactory.getScheduledExecutorService().schedule( - new LogExceptionRunnable(new Runnable() { - @Override - public void run() { - channelExecutor.executeLater(idleModeTimer).drain(); - } - }), - idleTimeoutMillis, TimeUnit.MILLISECONDS); + idleTimer.reschedule(idleTimeoutMillis, TimeUnit.MILLISECONDS); } // Run from channelExecutor @@ -537,6 +593,8 @@ ClientStream newSubstream(ClientStreamTracer.Factory tracerFactory, Metadata new } }; + private final Rescheduler idleTimer; + ManagedChannelImpl( AbstractManagedChannelImplBuilder builder, ClientTransportFactory clientTransportFactory, @@ -570,6 +628,7 @@ ClientStream newSubstream(ClientStreamTracer.Factory tracerFactory, Metadata new this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier"); if (builder.idleTimeoutMillis == IDLE_TIMEOUT_MILLIS_DISABLE) { this.idleTimeoutMillis = builder.idleTimeoutMillis; + } else { checkArgument( builder.idleTimeoutMillis @@ -577,6 +636,8 @@ ClientStream newSubstream(ClientStreamTracer.Factory tracerFactory, Metadata new "invalid idleTimeoutMillis %s", builder.idleTimeoutMillis); this.idleTimeoutMillis = builder.idleTimeoutMillis; } + idleTimer = new Rescheduler( + new IdleModeTimer(), channelExecutor, transportFactory.getScheduledExecutorService()); this.fullStreamDecompression = builder.fullStreamDecompression; this.decompressorRegistry = checkNotNull(builder.decompressorRegistry, "decompressorRegistry"); this.compressorRegistry = checkNotNull(builder.compressorRegistry, "compressorRegistry"); @@ -666,7 +727,7 @@ public void run() { channelExecutor.executeLater(new Runnable() { @Override public void run() { - cancelIdleTimer(); + cancelIdleTimer(/* permanent= */ true); } }).drain(); logger.log(Level.FINE, "[{0}] Shutting down", getLogId()); @@ -704,7 +765,7 @@ void panic(final Throwable t) { return; } panicMode = true; - cancelIdleTimer(); + cancelIdleTimer(/* permanent= */ true); shutdownNameResolverAndLoadBalancer(false); SubchannelPicker newPicker = new SubchannelPicker() { final PickResult panicPickResult = @@ -868,7 +929,7 @@ public void run() { if (shutdown.get() || lbHelper == null) { return; } - cancelIdleTimer(); + cancelIdleTimer(/* permanent= */ false); enterIdleMode(); } } diff --git a/core/src/test/java/io/grpc/internal/FakeClock.java b/core/src/test/java/io/grpc/internal/FakeClock.java index bb10263ebae..c764c958488 100644 --- a/core/src/test/java/io/grpc/internal/FakeClock.java +++ b/core/src/test/java/io/grpc/internal/FakeClock.java @@ -53,12 +53,14 @@ public boolean shouldAccept(Runnable command) { private final PriorityBlockingQueue tasks = new PriorityBlockingQueue(); - private final Ticker ticker = - new Ticker() { - @Override public long read() { - return currentTimeNanos; - } - }; + private final class FakeTicker extends Ticker implements ManagedChannelImpl.Rescheduler.Ticker { + @Override + public long read() { + return currentTimeNanos; + } + } + + private final FakeTicker ticker = new FakeTicker(); private final Supplier stopwatchSupplier = new Supplier() { @@ -67,6 +69,10 @@ public boolean shouldAccept(Runnable command) { } }; + public ManagedChannelImpl.Rescheduler.Ticker ticker() { + return ticker; + } + private long currentTimeNanos; public class ScheduledTask extends AbstractFuture implements ScheduledFuture { diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplIdlenessTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplIdlenessTest.java index ba04974efbd..e726afe63a9 100644 --- a/core/src/test/java/io/grpc/internal/ManagedChannelImplIdlenessTest.java +++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplIdlenessTest.java @@ -51,6 +51,7 @@ import io.grpc.NameResolver; import io.grpc.Status; import io.grpc.StringMarshaller; +import io.grpc.internal.ManagedChannelImpl.Rescheduler; import io.grpc.internal.TestUtils.MockClientTransportInfo; import java.net.SocketAddress; import java.net.URI; @@ -111,6 +112,7 @@ public class ManagedChannelImplIdlenessTest { @Before public void setUp() { MockitoAnnotations.initMocks(this); + Rescheduler.ticker = timer.ticker(); when(mockLoadBalancerFactory.newLoadBalancer(any(Helper.class))).thenReturn(mockLoadBalancer); when(mockNameResolver.getServiceAuthority()).thenReturn(AUTHORITY); when(mockNameResolverFactory diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java index 33abbde1604..f6ac5ae20a8 100644 --- a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java +++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java @@ -269,6 +269,7 @@ builder, mockTransportFactory, new FakeBackoffPolicyProvider(), @Before public void setUp() throws Exception { MockitoAnnotations.initMocks(this); + ManagedChannelImpl.Rescheduler.ticker = executor.ticker(); expectedUri = new URI(target); when(mockLoadBalancerFactory.newLoadBalancer(any(Helper.class))).thenReturn(mockLoadBalancer); transports = TestUtils.captureTransports(mockTransportFactory); From aa18a5bed6ec4593265f8bdd12c844f66b67fe49 Mon Sep 17 00:00:00 2001 From: Carl Mastrangelo Date: Tue, 3 Apr 2018 09:58:51 -0700 Subject: [PATCH 2/7] use wakeup rather than enabled --- core/src/main/java/io/grpc/internal/ManagedChannelImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java index 1b03a5d7ef5..99f3a52eb3c 100644 --- a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java +++ b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java @@ -403,7 +403,7 @@ static final class Rescheduler { void reschedule(long delay, TimeUnit timeUnit) { long delayNanos = timeUnit.toNanos(delay); long newRunAt = ticker.read() + delayNanos; - if (newRunAt - runAt < 0 || !enabled) { + if (newRunAt - runAt < 0 || wakeUp == null) { enabled = true; if (wakeUp != null) { wakeUp.cancel(false); From 1056b90b25afd79f3d16e81cbc21a1ead96354f9 Mon Sep 17 00:00:00 2001 From: Carl Mastrangelo Date: Wed, 4 Apr 2018 13:35:50 -0700 Subject: [PATCH 3/7] made tests pass --- .../io/grpc/internal/ManagedChannelImpl.java | 99 +++------------ .../java/io/grpc/internal/Rescheduler.java | 119 ++++++++++++++++++ .../test/java/io/grpc/internal/FakeClock.java | 30 ++--- .../ManagedChannelImplIdlenessTest.java | 9 +- .../grpc/internal/ManagedChannelImplTest.java | 2 +- 5 files changed, 152 insertions(+), 107 deletions(-) create mode 100644 core/src/main/java/io/grpc/internal/Rescheduler.java diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java index 99f3a52eb3c..a9fb34ade2d 100644 --- a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java +++ b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java @@ -67,7 +67,6 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -382,89 +381,6 @@ private void enterIdleMode() { channelStateManager.gotoState(IDLE); } - static final class Rescheduler { - private final Runnable runnable; - private final ChannelExecutor exec; - private final ScheduledExecutorService scheduler; - - private ScheduledFuture wakeUp; - - // state - private long runAt; - private boolean enabled; - - Rescheduler(Runnable r, ChannelExecutor exec, ScheduledExecutorService scheduler) { - this.runnable = r; - this.exec = exec; - this.scheduler = scheduler; - } - - // must be called from channel executor - void reschedule(long delay, TimeUnit timeUnit) { - long delayNanos = timeUnit.toNanos(delay); - long newRunAt = ticker.read() + delayNanos; - if (newRunAt - runAt < 0 || wakeUp == null) { - enabled = true; - if (wakeUp != null) { - wakeUp.cancel(false); - } - wakeUp = scheduler.schedule(new FutureRunnable(), delayNanos, TimeUnit.NANOSECONDS); - } - runAt = newRunAt; - } - - // must be called from channel executor - void cancel(boolean permanent) { - enabled = false; - if (permanent && wakeUp != null) { - wakeUp.cancel(false); - wakeUp = null; - } - } - - private final class FutureRunnable implements Runnable { - - @Override - public void run() { - exec.executeLater(new ChannelFutureRunnable()); - exec.drain(); - } - } - - private final class ChannelFutureRunnable implements Runnable { - - @Override - public void run() { - if (!enabled) { - return; - } - long now = ticker.read(); - if (runAt - now > 0) { - wakeUp = scheduler.schedule(new FutureRunnable(), runAt - now, TimeUnit.NANOSECONDS); - } else { - enabled = false; - wakeUp = null; - runnable.run(); - } - } - } - - /** Time source representing nanoseconds since fixed but arbitrary point in time. */ - interface Ticker { - /** Returns the number of nanoseconds since this source's epoch. */ - long read(); - } - - @VisibleForTesting - static Ticker ticker = new Ticker() { - - @Override - public long read() { - return System.nanoTime(); - } - }; - } - // Must be run from channelExecutor private void cancelIdleTimer(boolean permanent) { idleTimer.cancel(permanent); @@ -636,8 +552,21 @@ ClientStream newSubstream(ClientStreamTracer.Factory tracerFactory, Metadata new "invalid idleTimeoutMillis %s", builder.idleTimeoutMillis); this.idleTimeoutMillis = builder.idleTimeoutMillis; } + + final class AutoDrainChannelExecutor implements Executor { + + @Override + public void execute(Runnable command) { + channelExecutor.executeLater(command); + channelExecutor.drain(); + } + } + idleTimer = new Rescheduler( - new IdleModeTimer(), channelExecutor, transportFactory.getScheduledExecutorService()); + new IdleModeTimer(), + new AutoDrainChannelExecutor(), + transportFactory.getScheduledExecutorService(), + stopwatchSupplier.get()); this.fullStreamDecompression = builder.fullStreamDecompression; this.decompressorRegistry = checkNotNull(builder.decompressorRegistry, "decompressorRegistry"); this.compressorRegistry = checkNotNull(builder.compressorRegistry, "compressorRegistry"); diff --git a/core/src/main/java/io/grpc/internal/Rescheduler.java b/core/src/main/java/io/grpc/internal/Rescheduler.java new file mode 100644 index 00000000000..a6091ee2381 --- /dev/null +++ b/core/src/main/java/io/grpc/internal/Rescheduler.java @@ -0,0 +1,119 @@ +/* + * Copyright 2018, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.internal; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Stopwatch; +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +/** + * Reschedules a runnable lazily. + */ +final class Rescheduler { + + // deps + private final ScheduledExecutorService scheduler; + private final Executor serializingExecutor; + private final Runnable runnable; + + // state + private final Stopwatch stopwatch; + private long runAtNanos; + private boolean enabled; + private ScheduledFuture wakeUp; + + Rescheduler( + Runnable r, + Executor serializingExecutor, + ScheduledExecutorService scheduler, + Stopwatch stopwatch) { + this.runnable = r; + this.serializingExecutor = serializingExecutor; + this.scheduler = scheduler; + this.stopwatch = stopwatch; + stopwatch.start(); + } + + /* must be called from the {@link #serializingExecutor} originally passed in. */ + void reschedule(long delay, TimeUnit timeUnit) { + long delayNanos = timeUnit.toNanos(delay); + long newRunAtNanos = nanoTime() + delayNanos; + enabled = true; + if (newRunAtNanos - runAtNanos < 0 || wakeUp == null) { + if (wakeUp != null) { + wakeUp.cancel(false); + } + wakeUp = scheduler.schedule(new FutureRunnable(this), delayNanos, TimeUnit.NANOSECONDS); + } + runAtNanos = newRunAtNanos; + } + + // must be called from channel executor + void cancel(boolean permanent) { + enabled = false; + if (permanent && wakeUp != null) { + wakeUp.cancel(false); + wakeUp = null; + } + } + + private static final class FutureRunnable implements Runnable { + + private final Rescheduler rescheduler; + + FutureRunnable(Rescheduler rescheduler) { + this.rescheduler = rescheduler; + } + + @Override + public void run() { + rescheduler.serializingExecutor.execute(rescheduler.new ChannelFutureRunnable()); + } + } + + private final class ChannelFutureRunnable implements Runnable { + + @Override + public void run() { + if (!enabled) { + wakeUp = null; + return; + } + long now = nanoTime(); + if (runAtNanos - now > 0) { + wakeUp = scheduler.schedule( + new FutureRunnable(Rescheduler.this), runAtNanos - now, TimeUnit.NANOSECONDS); + } else { + enabled = false; + wakeUp = null; + runnable.run(); + } + } + } + + @VisibleForTesting + static boolean isEnabled(Runnable r) { + return ((FutureRunnable) r).rescheduler.enabled; + } + + private long nanoTime() { + return stopwatch.elapsed(TimeUnit.NANOSECONDS); + } +} diff --git a/core/src/test/java/io/grpc/internal/FakeClock.java b/core/src/test/java/io/grpc/internal/FakeClock.java index c764c958488..c45d8fddbc2 100644 --- a/core/src/test/java/io/grpc/internal/FakeClock.java +++ b/core/src/test/java/io/grpc/internal/FakeClock.java @@ -42,25 +42,23 @@ public final class FakeClock { private static final TaskFilter ACCEPT_ALL_FILTER = new TaskFilter() { - @Override - public boolean shouldAccept(Runnable command) { - return true; - } - }; + @Override + public boolean shouldAccept(Runnable command) { + return true; + } + }; private final ScheduledExecutorService scheduledExecutorService = new ScheduledExecutorImpl(); private final PriorityBlockingQueue tasks = new PriorityBlockingQueue(); - private final class FakeTicker extends Ticker implements ManagedChannelImpl.Rescheduler.Ticker { - @Override - public long read() { - return currentTimeNanos; - } - } - - private final FakeTicker ticker = new FakeTicker(); + private final Ticker ticker = + new Ticker() { + @Override public long read() { + return currentTimeNanos; + } + }; private final Supplier stopwatchSupplier = new Supplier() { @@ -69,10 +67,6 @@ public long read() { } }; - public ManagedChannelImpl.Rescheduler.Ticker ticker() { - return ticker; - } - private long currentTimeNanos; public class ScheduledTask extends AbstractFuture implements ScheduledFuture { @@ -323,4 +317,4 @@ public interface TaskFilter { */ boolean shouldAccept(Runnable runnable); } -} +} \ No newline at end of file diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplIdlenessTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplIdlenessTest.java index e726afe63a9..d2631e179de 100644 --- a/core/src/test/java/io/grpc/internal/ManagedChannelImplIdlenessTest.java +++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplIdlenessTest.java @@ -51,11 +51,12 @@ import io.grpc.NameResolver; import io.grpc.Status; import io.grpc.StringMarshaller; -import io.grpc.internal.ManagedChannelImpl.Rescheduler; +import io.grpc.internal.FakeClock.ScheduledTask; import io.grpc.internal.TestUtils.MockClientTransportInfo; import java.net.SocketAddress; import java.net.URI; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.concurrent.BlockingQueue; @@ -112,7 +113,6 @@ public class ManagedChannelImplIdlenessTest { @Before public void setUp() { MockitoAnnotations.initMocks(this); - Rescheduler.ticker = timer.ticker(); when(mockLoadBalancerFactory.newLoadBalancer(any(Helper.class))).thenReturn(mockLoadBalancer); when(mockNameResolver.getServiceAuthority()).thenReturn(AUTHORITY); when(mockNameResolverFactory @@ -165,7 +165,10 @@ builder, mockTransportFactory, new FakeBackoffPolicyProvider(), @After public void allPendingTasksAreRun() { - assertEquals(timer.getPendingTasks() + " should be empty", 0, timer.numPendingTasks()); + Collection pendingTimerTasks = timer.getPendingTasks(); + for (ScheduledTask a : pendingTimerTasks) { + assertFalse(Rescheduler.isEnabled(a.command)); + } assertEquals(executor.getPendingTasks() + " should be empty", 0, executor.numPendingTasks()); } diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java index f6ac5ae20a8..f5cb01bb01e 100644 --- a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java +++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java @@ -269,7 +269,6 @@ builder, mockTransportFactory, new FakeBackoffPolicyProvider(), @Before public void setUp() throws Exception { MockitoAnnotations.initMocks(this); - ManagedChannelImpl.Rescheduler.ticker = executor.ticker(); expectedUri = new URI(target); when(mockLoadBalancerFactory.newLoadBalancer(any(Helper.class))).thenReturn(mockLoadBalancer); transports = TestUtils.captureTransports(mockTransportFactory); @@ -330,6 +329,7 @@ public void shutdownWithNoTransportsEverCreated() { NO_INTERCEPTOR); verify(executorPool).getObject(); verify(executorPool, never()).returnObject(anyObject()); + verify(mockTransportFactory).getScheduledExecutorService(); verifyNoMoreInteractions(mockTransportFactory); channel.shutdown(); assertTrue(channel.isShutdown()); From 4ba2ebfeecf21e6c773553babcd03c6d9bda357c Mon Sep 17 00:00:00 2001 From: Carl Mastrangelo Date: Wed, 4 Apr 2018 13:53:39 -0700 Subject: [PATCH 4/7] ad dtetss --- .../io/grpc/internal/ReschedulerTest.java | 105 ++++++++++++++++++ 1 file changed, 105 insertions(+) create mode 100644 core/src/test/java/io/grpc/internal/ReschedulerTest.java diff --git a/core/src/test/java/io/grpc/internal/ReschedulerTest.java b/core/src/test/java/io/grpc/internal/ReschedulerTest.java new file mode 100644 index 00000000000..1b969f40564 --- /dev/null +++ b/core/src/test/java/io/grpc/internal/ReschedulerTest.java @@ -0,0 +1,105 @@ +/* + * Copyright 2018, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.internal; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link Rescheduler}. + */ +@RunWith(JUnit4.class) +public class ReschedulerTest { + + private final Runner runner = new Runner(); + private final Exec exec = new Exec(); + private final FakeClock scheduler = new FakeClock(); + private final Rescheduler rescheduler = new Rescheduler( + runner, + exec, + scheduler.getScheduledExecutorService(), + scheduler.getStopwatchSupplier().get()); + + @Test + public void runs() { + assertFalse(runner.ran); + rescheduler.reschedule(1, TimeUnit.NANOSECONDS); + assertFalse(runner.ran); + + scheduler.forwardNanos(1); + + assertTrue(runner.ran); + } + + @Test + public void cancels() { + assertFalse(runner.ran); + rescheduler.reschedule(1, TimeUnit.NANOSECONDS); + assertFalse(runner.ran); + rescheduler.cancel(/* permanent= */ false); + + scheduler.forwardNanos(1); + + assertFalse(runner.ran); + assertTrue(exec.executed); + } + + @Test + public void reschedules() { + assertFalse(runner.ran); + rescheduler.reschedule(1, TimeUnit.NANOSECONDS); + assertFalse(runner.ran); + assertFalse(exec.executed); + rescheduler.reschedule(50, TimeUnit.NANOSECONDS); + assertFalse(runner.ran); + assertFalse(exec.executed); + + scheduler.forwardNanos(1); + assertFalse(runner.ran); + assertTrue(exec.executed); + + scheduler.forwardNanos(50); + + assertTrue(runner.ran); + } + + private static final class Exec implements Executor { + boolean executed; + + @Override + public void execute(Runnable command) { + executed = true; + + command.run(); + } + } + + private static final class Runner implements Runnable { + boolean ran; + + @Override + public void run() { + ran = true; + } + } +} From 3d2d9728884fd45291df0d27b8ad8404a3944a64 Mon Sep 17 00:00:00 2001 From: Carl Mastrangelo Date: Wed, 4 Apr 2018 13:55:44 -0700 Subject: [PATCH 5/7] revert --- core/src/main/java/io/grpc/internal/KeepAliveManager.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/io/grpc/internal/KeepAliveManager.java b/core/src/main/java/io/grpc/internal/KeepAliveManager.java index a6babbe33e5..7311ba27d68 100644 --- a/core/src/main/java/io/grpc/internal/KeepAliveManager.java +++ b/core/src/main/java/io/grpc/internal/KeepAliveManager.java @@ -293,7 +293,7 @@ abstract static class Ticker { public abstract long read(); } - static class SystemTicker extends Ticker { + private static class SystemTicker extends Ticker { @Override public long read() { return System.nanoTime(); From 3a8571794c861f9a96c0cc0ed47cf6553e1a9456 Mon Sep 17 00:00:00 2001 From: Carl Mastrangelo Date: Wed, 4 Apr 2018 15:06:47 -0700 Subject: [PATCH 6/7] revert formatting changes --- core/src/test/java/io/grpc/internal/FakeClock.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/core/src/test/java/io/grpc/internal/FakeClock.java b/core/src/test/java/io/grpc/internal/FakeClock.java index c45d8fddbc2..bb10263ebae 100644 --- a/core/src/test/java/io/grpc/internal/FakeClock.java +++ b/core/src/test/java/io/grpc/internal/FakeClock.java @@ -42,11 +42,11 @@ public final class FakeClock { private static final TaskFilter ACCEPT_ALL_FILTER = new TaskFilter() { - @Override - public boolean shouldAccept(Runnable command) { - return true; - } - }; + @Override + public boolean shouldAccept(Runnable command) { + return true; + } + }; private final ScheduledExecutorService scheduledExecutorService = new ScheduledExecutorImpl(); @@ -317,4 +317,4 @@ public interface TaskFilter { */ boolean shouldAccept(Runnable runnable); } -} \ No newline at end of file +} From 626f41beaf778d2c5b0c3bb3626929f65fdabb6a Mon Sep 17 00:00:00 2001 From: Carl Mastrangelo Date: Wed, 4 Apr 2018 15:31:47 -0700 Subject: [PATCH 7/7] more tests --- .../io/grpc/internal/ReschedulerTest.java | 28 +++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/core/src/test/java/io/grpc/internal/ReschedulerTest.java b/core/src/test/java/io/grpc/internal/ReschedulerTest.java index 1b969f40564..4eef0677e3e 100644 --- a/core/src/test/java/io/grpc/internal/ReschedulerTest.java +++ b/core/src/test/java/io/grpc/internal/ReschedulerTest.java @@ -64,6 +64,19 @@ public void cancels() { assertTrue(exec.executed); } + @Test + public void cancelPermanently() { + assertFalse(runner.ran); + rescheduler.reschedule(1, TimeUnit.NANOSECONDS); + assertFalse(runner.ran); + rescheduler.cancel(/* permanent= */ true); + + scheduler.forwardNanos(1); + + assertFalse(runner.ran); + assertFalse(exec.executed); + } + @Test public void reschedules() { assertFalse(runner.ran); @@ -83,6 +96,21 @@ public void reschedules() { assertTrue(runner.ran); } + @Test + public void reschedulesShortDelay() { + assertFalse(runner.ran); + rescheduler.reschedule(50, TimeUnit.NANOSECONDS); + assertFalse(runner.ran); + assertFalse(exec.executed); + rescheduler.reschedule(1, TimeUnit.NANOSECONDS); + assertFalse(runner.ran); + assertFalse(exec.executed); + + scheduler.forwardNanos(1); + assertTrue(runner.ran); + assertTrue(exec.executed); + } + private static final class Exec implements Executor { boolean executed;