From d8397247a62a3da2084fafc6fc38b72e18f3c296 Mon Sep 17 00:00:00 2001 From: dapengzhang0 Date: Thu, 27 Jul 2017 10:36:58 -0700 Subject: [PATCH] core: fix the race between channel shutdown and clientCallImpl start --- .../java/io/grpc/internal/ClientCallImpl.java | 35 ++-- .../io/grpc/internal/ManagedChannelImpl.java | 190 +++++++++++++++++- .../java/io/grpc/internal/OobChannel.java | 12 +- .../io/grpc/internal/ClientCallImplTest.java | 53 +++-- 4 files changed, 242 insertions(+), 48 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/ClientCallImpl.java b/core/src/main/java/io/grpc/internal/ClientCallImpl.java index 564cd987aa3..dc443d813a5 100644 --- a/core/src/main/java/io/grpc/internal/ClientCallImpl.java +++ b/core/src/main/java/io/grpc/internal/ClientCallImpl.java @@ -48,8 +48,7 @@ import java.io.InputStream; import java.util.concurrent.CancellationException; import java.util.concurrent.Executor; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; @@ -65,7 +64,7 @@ final class ClientCallImpl extends ClientCall { private final MethodDescriptor method; private final Executor callExecutor; private final Context context; - private volatile ScheduledFuture deadlineCancellationFuture; + private volatile Future deadlineCancellationFuture; private final boolean unaryRequest; private final CallOptions callOptions; private ClientStream stream; @@ -74,14 +73,14 @@ final class ClientCallImpl extends ClientCall { private boolean halfCloseCalled; private final ClientTransportProvider clientTransportProvider; private final CancellationListener cancellationListener = new ContextCancellationListener(); - private ScheduledExecutorService deadlineCancellationExecutor; + private final DeadlineHandler deadlineHandler; private DecompressorRegistry decompressorRegistry = DecompressorRegistry.getDefaultInstance(); private CompressorRegistry compressorRegistry = CompressorRegistry.getDefaultInstance(); ClientCallImpl( MethodDescriptor method, Executor executor, CallOptions callOptions, ClientTransportProvider clientTransportProvider, - ScheduledExecutorService deadlineCancellationExecutor) { + DeadlineHandler deadlineHandler) { this.method = method; // If we know that the executor is a direct executor, we don't need to wrap it with a // SerializingExecutor. This is purely for performance reasons. @@ -95,7 +94,7 @@ final class ClientCallImpl extends ClientCall { || method.getType() == MethodType.SERVER_STREAMING; this.callOptions = callOptions; this.clientTransportProvider = clientTransportProvider; - this.deadlineCancellationExecutor = deadlineCancellationExecutor; + this.deadlineHandler = deadlineHandler; } private final class ContextCancellationListener implements CancellationListener { @@ -117,6 +116,17 @@ interface ClientTransportProvider { ClientTransport get(PickSubchannelArgs args); } + + /** + * Handler for RPC deadline. + */ + interface DeadlineHandler { + /** + * Schedules a task for deadline cancellation. + */ + Future scheduleDeadlineCancellation(Runnable command, Deadline deadline); + } + ClientCallImpl setDecompressorRegistry(DecompressorRegistry decompressorRegistry) { this.decompressorRegistry = decompressorRegistry; return this; @@ -234,9 +244,7 @@ public void runInContext() { context.addListener(cancellationListener, directExecutor()); if (effectiveDeadline != null // If the context has the effective deadline, we don't need to schedule an extra task. - && context.getDeadline() != effectiveDeadline - // If the channel has been terminated, we don't need to schedule an extra task. - && deadlineCancellationExecutor != null) { + && context.getDeadline() != effectiveDeadline) { deadlineCancellationFuture = startDeadlineTimer(effectiveDeadline); } if (cancelListenersShouldBeRemoved) { @@ -288,7 +296,7 @@ private static void logIfContextNarrowedTimeout(long effectiveTimeout, private void removeContextListenerAndCancelDeadlineFuture() { context.removeListener(cancellationListener); - ScheduledFuture f = deadlineCancellationFuture; + Future f = deadlineCancellationFuture; if (f != null) { f.cancel(false); } @@ -310,11 +318,10 @@ public void run() { } } - private ScheduledFuture startDeadlineTimer(Deadline deadline) { + private Future startDeadlineTimer(final Deadline deadline) { long remainingNanos = deadline.timeRemaining(TimeUnit.NANOSECONDS); - return deadlineCancellationExecutor.schedule( - new LogExceptionRunnable( - new DeadlineTimer(remainingNanos)), remainingNanos, TimeUnit.NANOSECONDS); + return deadlineHandler.scheduleDeadlineCancellation( + new LogExceptionRunnable(new DeadlineTimer(remainingNanos)), deadline); } @Nullable diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java index bcd681e8eaf..a3ae23074cc 100644 --- a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java +++ b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java @@ -35,6 +35,7 @@ import io.grpc.CompressorRegistry; import io.grpc.ConnectivityState; import io.grpc.ConnectivityStateInfo; +import io.grpc.Deadline; import io.grpc.DecompressorRegistry; import io.grpc.EquivalentAddressGroup; import io.grpc.LoadBalancer; @@ -46,6 +47,7 @@ import io.grpc.NameResolver; import io.grpc.Status; import io.grpc.internal.ClientCallImpl.ClientTransportProvider; +import io.grpc.internal.ClientCallImpl.DeadlineHandler; import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; @@ -54,9 +56,13 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; +import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Level; import java.util.logging.Logger; import java.util.regex.Pattern; @@ -90,6 +96,8 @@ public final class ManagedChannelImpl extends ManagedChannel implements WithLogI private final LoadBalancer.Factory loadBalancerFactory; private final ClientTransportFactory transportFactory; private final Executor executor; + private final Executor directFallbackExecutor = new DirectFallbackExecutor(); + private final DeadlineHandler deadlineHandler = new DeadlineHandlerImpl(); private final ObjectPool executorPool; private final ObjectPool oobExecutorPool; private final LogId logId = LogId.allocate(getClass().getName()); @@ -107,6 +115,9 @@ public final class ManagedChannelImpl extends ManagedChannel implements WithLogI private final BackoffPolicy.Provider backoffPolicyProvider; + private final ReadWriteLock shutdownReadWriteLock = new ReadWriteLock(); + private boolean shutdownWriteLockUsed; + /** * We delegate to this channel, so that we can have interceptors as necessary. If there aren't * any interceptors this will just be {@link RealChannel}. @@ -457,6 +468,30 @@ public ManagedChannelImpl shutdown() { return this; } + runShutdownTask(); + return this; + } + + private void runShutdownTask() { + checkState(shutdown.get(), "shutdown must be true"); + + if (shutdownReadWriteLock.tryToAcquireWriteLock()) { + if (!shutdownWriteLockUsed) { + shutdownWriteLockUsed = true; + shutdownReadWriteLock.releaseWriteLock(); + // Continue the following shutdown process. + } else { + shutdownReadWriteLock.releaseWriteLock(); + return; + } + } else { + // Either the read or the write lock is currently acquired. Quit here, and let one of the read + // lock acquisitions or the write lock acquisition take over the following shutdown process. + return; + } + + // The following is the shutdown process. It can be run only once. + // Put gotoState(SHUTDOWN) as early into the channelExecutor's queue as possible. // delayedTransport.shutdown() may also add some tasks into the queue. But some things inside // delayedTransport.shutdown() like setting delayedTransport.shutdown = true are not run in the @@ -478,7 +513,6 @@ public void run() { } }).drain(); log.log(Level.FINE, "[{0}] Shutting down", getLogId()); - return this; } /** @@ -535,18 +569,18 @@ public String authority() { private class RealChannel extends Channel { @Override - public ClientCall newCall(MethodDescriptor method, + public ClientCall newCall(MethodDescriptor method, CallOptions callOptions) { Executor executor = callOptions.getExecutor(); if (executor == null) { - executor = ManagedChannelImpl.this.executor; + executor = directFallbackExecutor; } return new ClientCallImpl( method, executor, callOptions, transportProvider, - terminated ? null : transportFactory.getScheduledExecutorService()) + terminated ? null : deadlineHandler) .setDecompressorRegistry(decompressorRegistry) .setCompressorRegistry(compressorRegistry); } @@ -558,6 +592,151 @@ public String authority() { } } + + /** + * Maintains a pair of associated locks. The read lock can be acquired many times as long as the + * write lock is not currently acquired. The write lock can be acquired only when no lock is + * currently acquired. Each acquisition of the locks must be released once and only once later. + * The lock is not reentrant. The lock is irrelevant to the threads in which you try to acquire or + * release. + */ + private static final class ReadWriteLock { + /** + * A positive number counts the times the read lock has been acquired, and Long.MIN_VALUE or a + * negative number close to Long.MIN_VALUE means a write lock is currently acquired. + */ + final AtomicLong readRef = new AtomicLong(); + + /** + * Try to acquire the read lock. Return true if succeeded; and false if not able to acquire it + * right now, meaning that the write lock is currently acquired. The method does not block. + */ + boolean tryToAcquireReadLock() { + return readRef.getAndIncrement() >= 0L; + } + + /** + * Release an acquisition of the read lock. + * + * @throws IllegalStateException if the read lock is not currently acquired. + */ + void releaseReadLock() { + checkState(readRef.getAndDecrement() > 0L); + } + + /** + * Try to acquire the write lock. Return true if succeeded; and false if not able to acquire it + * right now, meaning that either the read or the write lock is currently acquired. The method + * does not block. + */ + boolean tryToAcquireWriteLock() { + return readRef.compareAndSet(0L, Long.MIN_VALUE); + } + + /** + * Release an acquisition of the write lock. + * + * @throws IllegalStateException if the write lock is not currently acquired. + */ + void releaseWriteLock() { + checkState(readRef.getAndSet(0L) < 0L); + } + } + + private final class DirectFallbackExecutor implements Executor { + @Override + public void execute(final Runnable command) { + if (shutdown.get()) { + command.run(); + return; + } + + if (!shutdownReadWriteLock.tryToAcquireReadLock()) { + // The write lock is currently acquired, so shutdown is in progress. + command.run(); + return; + } + if (shutdownWriteLockUsed) { + // The write lock had been acquired and then released, so shutdown is in progress. + shutdownReadWriteLock.releaseReadLock(); + command.run(); + return; + } + + final boolean[] accepted = new boolean[1]; + try { + final Thread thread = Thread.currentThread(); + executor.execute(new Runnable() { + @Override + public void run() { + if (Thread.currentThread() == thread && !accepted[0]) { + // Just in case the executor is a direct one. + accepted[0] = true; + shutdownReadWriteLock.releaseReadLock(); + if (shutdown.get()) { + // May take over the shutdown process. + runShutdownTask(); + } + } + command.run(); + } + }); + } catch (RuntimeException e) { + // TODO(zdapeng): Handle exception properly + throw e; + } finally { + if (!accepted[0]) { + accepted[0] = true; + shutdownReadWriteLock.releaseReadLock(); + if (shutdown.get()) { + // May take over the shutdown process. + runShutdownTask(); + } + } + } + } + } + + private final class DeadlineHandlerImpl implements DeadlineHandler { + @Override + public Future scheduleDeadlineCancellation(Runnable command, Deadline deadline) { + if (shutdown.get()) { + return cancelledFuture(command); + } + + if (!shutdownReadWriteLock.tryToAcquireReadLock()) { + // The write lock is currently acquired, so shutdown is in progress. + command.run(); + return cancelledFuture(command); + } + if (shutdownWriteLockUsed) { + // The write lock had been acquired and then released, so shutdown is in progress. + shutdownReadWriteLock.releaseReadLock(); + return cancelledFuture(command); + } + + try { + return transportFactory.getScheduledExecutorService() + .schedule(command, deadline.timeRemaining(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS); + } catch (RejectedExecutionException e) { + // TODO(zdapeng): Handle Exception properly + throw e; + } finally { + shutdownReadWriteLock.releaseReadLock(); + if (shutdown.get()) { + // May take over the shutdown process. + runShutdownTask(); + } + } + } + } + + private static Future cancelledFuture(Runnable command) { + Future future = new FutureTask(command, null); + future.cancel(false); + return future; + } + /** * Terminate the channel if termination conditions are met. */ @@ -708,8 +887,7 @@ public ManagedChannel createOobChannel(EquivalentAddressGroup addressGroup, Stri // TODO(ejona): can we be even stricter? Like terminating? checkState(!terminated, "Channel is terminated"); final OobChannel oobChannel = new OobChannel( - authority, oobExecutorPool, transportFactory.getScheduledExecutorService(), - channelExecutor); + authority, oobExecutorPool, deadlineHandler, channelExecutor); final InternalSubchannel internalSubchannel = new InternalSubchannel( addressGroup, authority, userAgent, backoffPolicyProvider, transportFactory, transportFactory.getScheduledExecutorService(), stopwatchSupplier, channelExecutor, diff --git a/core/src/main/java/io/grpc/internal/OobChannel.java b/core/src/main/java/io/grpc/internal/OobChannel.java index 018a7e71fc0..97387c9121a 100644 --- a/core/src/main/java/io/grpc/internal/OobChannel.java +++ b/core/src/main/java/io/grpc/internal/OobChannel.java @@ -31,9 +31,9 @@ import io.grpc.MethodDescriptor; import io.grpc.Status; import io.grpc.internal.ClientCallImpl.ClientTransportProvider; +import io.grpc.internal.ClientCallImpl.DeadlineHandler; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; @@ -56,7 +56,7 @@ final class OobChannel extends ManagedChannel implements WithLogId { private final DelayedClientTransport delayedTransport; private final ObjectPool executorPool; private final Executor executor; - private final ScheduledExecutorService deadlineCancellationExecutor; + private final DeadlineHandler deadlineHandler; private final CountDownLatch terminatedLatch = new CountDownLatch(1); private volatile boolean shutdown; @@ -72,12 +72,12 @@ public ClientTransport get(PickSubchannelArgs args) { OobChannel( String authority, ObjectPool executorPool, - ScheduledExecutorService deadlineCancellationExecutor, ChannelExecutor channelExecutor) { + DeadlineHandler deadlineHandler, ChannelExecutor channelExecutor) { this.authority = checkNotNull(authority, "authority"); this.executorPool = checkNotNull(executorPool, "executorPool"); this.executor = checkNotNull(executorPool.getObject(), "executor"); - this.deadlineCancellationExecutor = checkNotNull( - deadlineCancellationExecutor, "deadlineCancellationExecutor"); + this.deadlineHandler = checkNotNull( + deadlineHandler, "deadlineCancellationExecutor"); this.delayedTransport = new DelayedClientTransport(executor, channelExecutor); this.delayedTransport.start(new ManagedClientTransport.Listener() { @Override @@ -153,7 +153,7 @@ public ClientCall newCall( MethodDescriptor methodDescriptor, CallOptions callOptions) { return new ClientCallImpl(methodDescriptor, callOptions.getExecutor() == null ? executor : callOptions.getExecutor(), - callOptions, transportProvider, deadlineCancellationExecutor); + callOptions, transportProvider, deadlineHandler); } @Override diff --git a/core/src/test/java/io/grpc/internal/ClientCallImplTest.java b/core/src/test/java/io/grpc/internal/ClientCallImplTest.java index abb04aa1bca..611104d4e58 100644 --- a/core/src/test/java/io/grpc/internal/ClientCallImplTest.java +++ b/core/src/test/java/io/grpc/internal/ClientCallImplTest.java @@ -55,6 +55,7 @@ import io.grpc.MethodDescriptor.MethodType; import io.grpc.Status; import io.grpc.internal.ClientCallImpl.ClientTransportProvider; +import io.grpc.internal.ClientCallImpl.DeadlineHandler; import io.grpc.internal.testing.SingleMessageProducer; import io.grpc.testing.TestMethodDescriptors; import java.io.ByteArrayInputStream; @@ -65,6 +66,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -89,6 +91,13 @@ public class ClientCallImplTest { private final FakeClock fakeClock = new FakeClock(); private final ScheduledExecutorService deadlineCancellationExecutor = fakeClock.getScheduledExecutorService(); + private final DeadlineHandler deadlineHandler = new DeadlineHandler() { + @Override + public Future scheduleDeadlineCancellation(Runnable command, Deadline deadline) { + return deadlineCancellationExecutor + .schedule(command, deadline.timeRemaining(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS); + } + }; private final DecompressorRegistry decompressorRegistry = DecompressorRegistry.getDefaultInstance().with(new Codec.Gzip(), true); private final MethodDescriptor method = MethodDescriptor.newBuilder() @@ -149,7 +158,7 @@ public void statusPropagatedFromStreamToCallListener() { executor, baseCallOptions, provider, - deadlineCancellationExecutor); + deadlineHandler); call.start(callListener, new Metadata()); verify(stream).start(listenerArgumentCaptor.capture()); final ClientStreamListener streamListener = listenerArgumentCaptor.getValue(); @@ -169,7 +178,7 @@ public void exceptionInOnMessageTakesPrecedenceOverServer() { executor, baseCallOptions, provider, - deadlineCancellationExecutor); + deadlineHandler); call.start(callListener, new Metadata()); verify(stream).start(listenerArgumentCaptor.capture()); final ClientStreamListener streamListener = listenerArgumentCaptor.getValue(); @@ -204,7 +213,7 @@ public void exceptionInOnHeadersTakesPrecedenceOverServer() { executor, baseCallOptions, provider, - deadlineCancellationExecutor); + deadlineHandler); call.start(callListener, new Metadata()); verify(stream).start(listenerArgumentCaptor.capture()); final ClientStreamListener streamListener = listenerArgumentCaptor.getValue(); @@ -237,7 +246,7 @@ public void exceptionInOnReadyTakesPrecedenceOverServer() { executor, baseCallOptions, provider, - deadlineCancellationExecutor); + deadlineHandler); call.start(callListener, new Metadata()); verify(stream).start(listenerArgumentCaptor.capture()); final ClientStreamListener streamListener = listenerArgumentCaptor.getValue(); @@ -269,7 +278,7 @@ public void advertisedEncodingsAreSent() { MoreExecutors.directExecutor(), baseCallOptions, provider, - deadlineCancellationExecutor) + deadlineHandler) .setDecompressorRegistry(decompressorRegistry); call.start(callListener, new Metadata()); @@ -291,7 +300,7 @@ public void authorityPropagatedToStream() { MoreExecutors.directExecutor(), baseCallOptions.withAuthority("overridden-authority"), provider, - deadlineCancellationExecutor) + deadlineHandler) .setDecompressorRegistry(decompressorRegistry); call.start(callListener, new Metadata()); @@ -306,7 +315,7 @@ public void callOptionsPropagatedToTransport() { MoreExecutors.directExecutor(), callOptions, provider, - deadlineCancellationExecutor) + deadlineHandler) .setDecompressorRegistry(decompressorRegistry); final Metadata metadata = new Metadata(); @@ -323,7 +332,7 @@ public void authorityNotPropagatedToStream() { // Don't provide an authority baseCallOptions, provider, - deadlineCancellationExecutor) + deadlineHandler) .setDecompressorRegistry(decompressorRegistry); call.start(callListener, new Metadata()); @@ -418,7 +427,7 @@ public void callerContextPropagatedToListener() throws Exception { new SerializingExecutor(Executors.newSingleThreadExecutor()), baseCallOptions, provider, - deadlineCancellationExecutor) + deadlineHandler) .setDecompressorRegistry(decompressorRegistry); Context.ROOT.attach(); @@ -491,7 +500,7 @@ public void contextCancellationCancelsStream() throws Exception { new SerializingExecutor(Executors.newSingleThreadExecutor()), baseCallOptions, provider, - deadlineCancellationExecutor) + deadlineHandler) .setDecompressorRegistry(decompressorRegistry); previous.attach(); @@ -519,7 +528,7 @@ public void contextAlreadyCancelledNotifiesImmediately() throws Exception { new SerializingExecutor(Executors.newSingleThreadExecutor()), baseCallOptions, provider, - deadlineCancellationExecutor) + deadlineHandler) .setDecompressorRegistry(decompressorRegistry); previous.attach(); @@ -562,7 +571,7 @@ public void deadlineExceededBeforeCallStarted() { new SerializingExecutor(Executors.newSingleThreadExecutor()), callOptions, provider, - deadlineCancellationExecutor) + deadlineHandler) .setDecompressorRegistry(decompressorRegistry); call.start(callListener, new Metadata()); verify(transport, times(0)) @@ -584,7 +593,7 @@ public void contextDeadlineShouldBePropagatedInMetadata() { MoreExecutors.directExecutor(), baseCallOptions, provider, - deadlineCancellationExecutor); + deadlineHandler); Metadata headers = new Metadata(); @@ -611,7 +620,7 @@ public void contextDeadlineShouldOverrideLargerMetadataTimeout() { MoreExecutors.directExecutor(), callOpts, provider, - deadlineCancellationExecutor); + deadlineHandler); Metadata headers = new Metadata(); @@ -638,7 +647,7 @@ public void contextDeadlineShouldNotOverrideSmallerMetadataTimeout() { MoreExecutors.directExecutor(), callOpts, provider, - deadlineCancellationExecutor); + deadlineHandler); Metadata headers = new Metadata(); @@ -663,7 +672,7 @@ public void expiredDeadlineCancelsStream_CallOptions() { MoreExecutors.directExecutor(), baseCallOptions.withDeadline(Deadline.after(1, TimeUnit.SECONDS)), provider, - deadlineCancellationExecutor); + deadlineHandler); call.start(callListener, new Metadata()); @@ -686,7 +695,7 @@ public void expiredDeadlineCancelsStream_Context() { MoreExecutors.directExecutor(), baseCallOptions, provider, - deadlineCancellationExecutor); + deadlineHandler); call.start(callListener, new Metadata()); @@ -705,7 +714,7 @@ public void streamCancelAbortsDeadlineTimer() { MoreExecutors.directExecutor(), baseCallOptions.withDeadline(Deadline.after(1, TimeUnit.SECONDS)), provider, - deadlineCancellationExecutor); + deadlineHandler); call.start(callListener, new Metadata()); call.cancel("canceled", null); @@ -728,7 +737,7 @@ public void timeoutShouldNotBeSet() { MoreExecutors.directExecutor(), baseCallOptions, provider, - deadlineCancellationExecutor); + deadlineHandler); Metadata headers = new Metadata(); @@ -744,7 +753,7 @@ public void cancelInOnMessageShouldInvokeStreamCancel() throws Exception { MoreExecutors.directExecutor(), baseCallOptions, provider, - deadlineCancellationExecutor); + deadlineHandler); final Exception cause = new Exception(); ClientCall.Listener callListener = new ClientCall.Listener() { @@ -780,7 +789,7 @@ public void startAddsMaxSize() { new SerializingExecutor(Executors.newSingleThreadExecutor()), callOptions, provider, - deadlineCancellationExecutor) + deadlineHandler) .setDecompressorRegistry(decompressorRegistry); call.start(callListener, new Metadata()); @@ -793,7 +802,7 @@ public void startAddsMaxSize() { public void getAttributes() { ClientCallImpl call = new ClientCallImpl( method, MoreExecutors.directExecutor(), baseCallOptions, provider, - deadlineCancellationExecutor); + deadlineHandler); Attributes attrs = Attributes.newBuilder().set(Key.of("fake key"), "fake value").build(); when(stream.getAttributes()).thenReturn(attrs);