From 7df7e8012aed9b6916c4ff6deda7c5c31e6862da Mon Sep 17 00:00:00 2001 From: Jihun Cho Date: Wed, 15 Aug 2018 10:37:26 -0700 Subject: [PATCH 01/14] Implement flush coalescing in OkHttp. Implementation is skipping flush task if another flush is queued. --- .../java/io/grpc/okhttp/AsyncFrameWriter.java | 11 +- .../io/grpc/okhttp/AsyncFrameWriterTest.java | 154 ++++++++++++++++++ 2 files changed, 164 insertions(+), 1 deletion(-) create mode 100644 okhttp/src/test/java/io/grpc/okhttp/AsyncFrameWriterTest.java diff --git a/okhttp/src/main/java/io/grpc/okhttp/AsyncFrameWriter.java b/okhttp/src/main/java/io/grpc/okhttp/AsyncFrameWriter.java index 4c9aed597a1..83c4ac88682 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/AsyncFrameWriter.java +++ b/okhttp/src/main/java/io/grpc/okhttp/AsyncFrameWriter.java @@ -25,6 +25,7 @@ import java.io.IOException; import java.net.Socket; import java.util.List; +import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Level; import java.util.logging.Logger; import okio.Buffer; @@ -37,6 +38,7 @@ class AsyncFrameWriter implements FrameWriter { // just waiting on each other. private final SerializingExecutor executor; private final OkHttpClientTransport transport; + private final AtomicLong flushCounter = new AtomicLong(); public AsyncFrameWriter(OkHttpClientTransport transport, SerializingExecutor executor) { this.transport = transport; @@ -89,10 +91,17 @@ public void doRun() throws IOException { @Override public void flush() { + // keep track of version of flushes to skip flush if another flush task is queued. + final long flushCount = flushCounter.incrementAndGet(); + executor.execute(new WriteRunnable() { @Override public void doRun() throws IOException { - frameWriter.flush(); + // There can be a flush starvation if there are continuous flood of flush is queued, this + // is not an issue with OkHttp since it flushes if the buffer is full. + if (flushCounter.get() == flushCount) { + frameWriter.flush(); + } } }); } diff --git a/okhttp/src/test/java/io/grpc/okhttp/AsyncFrameWriterTest.java b/okhttp/src/test/java/io/grpc/okhttp/AsyncFrameWriterTest.java new file mode 100644 index 00000000000..f385cfdfbe1 --- /dev/null +++ b/okhttp/src/test/java/io/grpc/okhttp/AsyncFrameWriterTest.java @@ -0,0 +1,154 @@ +/* + * Copyright 2015 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.okhttp; + +import static com.google.common.truth.Truth.assertThat; +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.internal.verification.VerificationModeFactory.times; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import io.grpc.internal.SerializingExecutor; +import io.grpc.okhttp.internal.framed.FrameWriter; +import java.io.IOException; +import java.net.Socket; +import java.util.ArrayList; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Executor; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Answers; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class AsyncFrameWriterTest { + + @Mock private OkHttpClientTransport transport; + @Mock private Socket socket; + @Mock(answer = Answers.CALLS_REAL_METHODS) + private PartiallyTrackingFrameWriter frameWriter; + + private QueueingExecutor queueingExecutor; + private AsyncFrameWriter asyncFrameWriter; + + @Before + public void setUp() throws Exception { + queueingExecutor = new QueueingExecutor(); + asyncFrameWriter = + spy(new AsyncFrameWriter(transport, new SerializingExecutor(queueingExecutor))); + asyncFrameWriter.becomeConnected(frameWriter, socket); + } + + @After + public void tearDown() throws Exception { + asyncFrameWriter.close(); + } + + @Test + public void noCoalesceRequired() throws IOException { + asyncFrameWriter.ping(true, 0, 1); + asyncFrameWriter.flush(); + queueingExecutor.runAll(); + + verify(frameWriter, times(1)).ping(anyBoolean(), anyInt(), anyInt()); + verify(asyncFrameWriter, times(1)).flush(); + verify(frameWriter, times(1)).flush(); + } + + @Test + public void flushCoalescing_shouldNotMergeTwoDistinctFlushes() throws IOException { + asyncFrameWriter.ping(true, 0, 1); + asyncFrameWriter.flush(); + queueingExecutor.runAll(); + + asyncFrameWriter.ping(true, 0, 2); + asyncFrameWriter.flush(); + queueingExecutor.runAll(); + + verify(frameWriter, times(2)).ping(anyBoolean(), anyInt(), anyInt()); + verify(asyncFrameWriter, times(2)).flush(); + verify(frameWriter, times(2)).flush(); + } + + @Test + public void flushCoalescing_shouldMergeTwoQueuedFlushes() throws IOException { + asyncFrameWriter.ping(true, 0, 1); + asyncFrameWriter.flush(); + asyncFrameWriter.ping(true, 0, 2); + asyncFrameWriter.flush(); + + queueingExecutor.runAll(); + + verify(frameWriter, times(2)).ping(anyBoolean(), anyInt(), anyInt()); + verify(asyncFrameWriter, times(2)).flush(); + verify(frameWriter, times(1)).flush(); + assertThat(Iterables.getLast(PartiallyTrackingFrameWriter.getAllMethodsInOrder())) + .isSameAs(PartiallyTrackingFrameWriter.Method.FLUSH); + } + + /** + * Executor queues incoming runnables instead of running it. Runnables can be invoked via {@code + * runAll} in serial order. + */ + private static class QueueingExecutor implements Executor { + + private final Queue runnables = new ConcurrentLinkedQueue(); + + @Override + public void execute(Runnable command) { + runnables.add(command); + } + + public void runAll() { + Runnable r; + while ((r = runnables.poll()) != null) { + r.run(); + } + } + } + + private abstract static class PartiallyTrackingFrameWriter implements FrameWriter { + + private static final List methods = new ArrayList(); + + static ImmutableList getAllMethodsInOrder() { + return ImmutableList.copyOf(methods); + } + + @Override + public void ping(boolean ack, int payload1, int payload2) throws IOException { + methods.add(Method.PING); + } + + @Override + public void flush() throws IOException { + methods.add(Method.FLUSH); + } + + enum Method { + PING, FLUSH + } + } +} From fd2cda8abf01d0a7f67d7098f9fcc3958446afa1 Mon Sep 17 00:00:00 2001 From: Jihun Cho Date: Wed, 15 Aug 2018 11:56:11 -0700 Subject: [PATCH 02/14] remove unnecessary verify --- okhttp/src/test/java/io/grpc/okhttp/AsyncFrameWriterTest.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/okhttp/src/test/java/io/grpc/okhttp/AsyncFrameWriterTest.java b/okhttp/src/test/java/io/grpc/okhttp/AsyncFrameWriterTest.java index f385cfdfbe1..5a0b779dd3f 100644 --- a/okhttp/src/test/java/io/grpc/okhttp/AsyncFrameWriterTest.java +++ b/okhttp/src/test/java/io/grpc/okhttp/AsyncFrameWriterTest.java @@ -73,7 +73,6 @@ public void noCoalesceRequired() throws IOException { queueingExecutor.runAll(); verify(frameWriter, times(1)).ping(anyBoolean(), anyInt(), anyInt()); - verify(asyncFrameWriter, times(1)).flush(); verify(frameWriter, times(1)).flush(); } @@ -88,7 +87,6 @@ public void flushCoalescing_shouldNotMergeTwoDistinctFlushes() throws IOExceptio queueingExecutor.runAll(); verify(frameWriter, times(2)).ping(anyBoolean(), anyInt(), anyInt()); - verify(asyncFrameWriter, times(2)).flush(); verify(frameWriter, times(2)).flush(); } @@ -102,7 +100,6 @@ public void flushCoalescing_shouldMergeTwoQueuedFlushes() throws IOException { queueingExecutor.runAll(); verify(frameWriter, times(2)).ping(anyBoolean(), anyInt(), anyInt()); - verify(asyncFrameWriter, times(2)).flush(); verify(frameWriter, times(1)).flush(); assertThat(Iterables.getLast(PartiallyTrackingFrameWriter.getAllMethodsInOrder())) .isSameAs(PartiallyTrackingFrameWriter.Method.FLUSH); From 857c478830d8fd3e74c544058c02efff4c6d80b8 Mon Sep 17 00:00:00 2001 From: Jihun Cho Date: Wed, 15 Aug 2018 11:57:38 -0700 Subject: [PATCH 03/14] fix copywrite year --- okhttp/src/test/java/io/grpc/okhttp/AsyncFrameWriterTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/okhttp/src/test/java/io/grpc/okhttp/AsyncFrameWriterTest.java b/okhttp/src/test/java/io/grpc/okhttp/AsyncFrameWriterTest.java index 5a0b779dd3f..5c39cb123d6 100644 --- a/okhttp/src/test/java/io/grpc/okhttp/AsyncFrameWriterTest.java +++ b/okhttp/src/test/java/io/grpc/okhttp/AsyncFrameWriterTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2015 The gRPC Authors + * Copyright 2018 The gRPC Authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. From 5e181b816cab9a6b7b21f7030504ee8ff677f212 Mon Sep 17 00:00:00 2001 From: Jihun Cho Date: Wed, 15 Aug 2018 12:03:01 -0700 Subject: [PATCH 04/14] change the message name to be clear that it returns called method. --- okhttp/src/test/java/io/grpc/okhttp/AsyncFrameWriterTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/okhttp/src/test/java/io/grpc/okhttp/AsyncFrameWriterTest.java b/okhttp/src/test/java/io/grpc/okhttp/AsyncFrameWriterTest.java index 5c39cb123d6..813a87871a9 100644 --- a/okhttp/src/test/java/io/grpc/okhttp/AsyncFrameWriterTest.java +++ b/okhttp/src/test/java/io/grpc/okhttp/AsyncFrameWriterTest.java @@ -101,7 +101,7 @@ public void flushCoalescing_shouldMergeTwoQueuedFlushes() throws IOException { verify(frameWriter, times(2)).ping(anyBoolean(), anyInt(), anyInt()); verify(frameWriter, times(1)).flush(); - assertThat(Iterables.getLast(PartiallyTrackingFrameWriter.getAllMethodsInOrder())) + assertThat(Iterables.getLast(PartiallyTrackingFrameWriter.getAllInvokedMethodsInOrder())) .isSameAs(PartiallyTrackingFrameWriter.Method.FLUSH); } @@ -130,7 +130,7 @@ private abstract static class PartiallyTrackingFrameWriter implements FrameWrite private static final List methods = new ArrayList(); - static ImmutableList getAllMethodsInOrder() { + static ImmutableList getAllInvokedMethodsInOrder() { return ImmutableList.copyOf(methods); } From 8a8a4a534cea8a401cf622acd5c1b9acac812244 Mon Sep 17 00:00:00 2001 From: Jihun Cho Date: Wed, 15 Aug 2018 12:52:24 -0700 Subject: [PATCH 05/14] remove spy, add teardown for the partial mock --- .../io/grpc/okhttp/AsyncFrameWriterTest.java | 27 ++++++++++++------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/okhttp/src/test/java/io/grpc/okhttp/AsyncFrameWriterTest.java b/okhttp/src/test/java/io/grpc/okhttp/AsyncFrameWriterTest.java index 813a87871a9..fd08c0d1693 100644 --- a/okhttp/src/test/java/io/grpc/okhttp/AsyncFrameWriterTest.java +++ b/okhttp/src/test/java/io/grpc/okhttp/AsyncFrameWriterTest.java @@ -19,8 +19,8 @@ import static com.google.common.truth.Truth.assertThat; import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.anyInt; -import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import static org.mockito.internal.verification.VerificationModeFactory.times; import com.google.common.collect.ImmutableList; @@ -28,6 +28,8 @@ import io.grpc.internal.SerializingExecutor; import io.grpc.okhttp.internal.framed.FrameWriter; import java.io.IOException; +import java.io.OutputStream; +import java.io.PipedOutputStream; import java.net.Socket; import java.util.ArrayList; import java.util.List; @@ -52,18 +54,21 @@ public class AsyncFrameWriterTest { private QueueingExecutor queueingExecutor; private AsyncFrameWriter asyncFrameWriter; + private OutputStream outputStream; @Before public void setUp() throws Exception { queueingExecutor = new QueueingExecutor(); - asyncFrameWriter = - spy(new AsyncFrameWriter(transport, new SerializingExecutor(queueingExecutor))); + asyncFrameWriter = new AsyncFrameWriter(transport, new SerializingExecutor(queueingExecutor)); asyncFrameWriter.becomeConnected(frameWriter, socket); + outputStream = new PipedOutputStream(); + when(socket.getOutputStream()).thenReturn(outputStream); } @After public void tearDown() throws Exception { asyncFrameWriter.close(); + PartiallyTrackingFrameWriter.clearMethodHistory(); } @Test @@ -101,7 +106,7 @@ public void flushCoalescing_shouldMergeTwoQueuedFlushes() throws IOException { verify(frameWriter, times(2)).ping(anyBoolean(), anyInt(), anyInt()); verify(frameWriter, times(1)).flush(); - assertThat(Iterables.getLast(PartiallyTrackingFrameWriter.getAllInvokedMethodsInOrder())) + assertThat(Iterables.getLast(PartiallyTrackingFrameWriter.getAllInvokedMethodsHistory())) .isSameAs(PartiallyTrackingFrameWriter.Method.FLUSH); } @@ -128,20 +133,24 @@ public void runAll() { private abstract static class PartiallyTrackingFrameWriter implements FrameWriter { - private static final List methods = new ArrayList(); + private static final List methodHistory = new ArrayList(); - static ImmutableList getAllInvokedMethodsInOrder() { - return ImmutableList.copyOf(methods); + static void clearMethodHistory() { + methodHistory.clear(); + } + + static ImmutableList getAllInvokedMethodsHistory() { + return ImmutableList.copyOf(methodHistory); } @Override public void ping(boolean ack, int payload1, int payload2) throws IOException { - methods.add(Method.PING); + methodHistory.add(Method.PING); } @Override public void flush() throws IOException { - methods.add(Method.FLUSH); + methodHistory.add(Method.FLUSH); } enum Method { From 26ef4990b161344d192900a280d36183e8d02f47 Mon Sep 17 00:00:00 2001 From: Jihun Cho Date: Wed, 15 Aug 2018 14:34:15 -0700 Subject: [PATCH 06/14] using mockito inorder --- .../io/grpc/okhttp/AsyncFrameWriterTest.java | 52 +++---------------- 1 file changed, 6 insertions(+), 46 deletions(-) diff --git a/okhttp/src/test/java/io/grpc/okhttp/AsyncFrameWriterTest.java b/okhttp/src/test/java/io/grpc/okhttp/AsyncFrameWriterTest.java index fd08c0d1693..f0846753220 100644 --- a/okhttp/src/test/java/io/grpc/okhttp/AsyncFrameWriterTest.java +++ b/okhttp/src/test/java/io/grpc/okhttp/AsyncFrameWriterTest.java @@ -16,23 +16,16 @@ package io.grpc.okhttp; -import static com.google.common.truth.Truth.assertThat; import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.anyInt; +import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; import static org.mockito.internal.verification.VerificationModeFactory.times; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; import io.grpc.internal.SerializingExecutor; import io.grpc.okhttp.internal.framed.FrameWriter; import java.io.IOException; -import java.io.OutputStream; -import java.io.PipedOutputStream; import java.net.Socket; -import java.util.ArrayList; -import java.util.List; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executor; @@ -40,7 +33,7 @@ import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; -import org.mockito.Answers; +import org.mockito.InOrder; import org.mockito.Mock; import org.mockito.runners.MockitoJUnitRunner; @@ -49,26 +42,21 @@ public class AsyncFrameWriterTest { @Mock private OkHttpClientTransport transport; @Mock private Socket socket; - @Mock(answer = Answers.CALLS_REAL_METHODS) - private PartiallyTrackingFrameWriter frameWriter; + @Mock private FrameWriter frameWriter; private QueueingExecutor queueingExecutor; private AsyncFrameWriter asyncFrameWriter; - private OutputStream outputStream; @Before public void setUp() throws Exception { queueingExecutor = new QueueingExecutor(); asyncFrameWriter = new AsyncFrameWriter(transport, new SerializingExecutor(queueingExecutor)); asyncFrameWriter.becomeConnected(frameWriter, socket); - outputStream = new PipedOutputStream(); - when(socket.getOutputStream()).thenReturn(outputStream); } @After public void tearDown() throws Exception { asyncFrameWriter.close(); - PartiallyTrackingFrameWriter.clearMethodHistory(); } @Test @@ -104,10 +92,9 @@ public void flushCoalescing_shouldMergeTwoQueuedFlushes() throws IOException { queueingExecutor.runAll(); - verify(frameWriter, times(2)).ping(anyBoolean(), anyInt(), anyInt()); - verify(frameWriter, times(1)).flush(); - assertThat(Iterables.getLast(PartiallyTrackingFrameWriter.getAllInvokedMethodsHistory())) - .isSameAs(PartiallyTrackingFrameWriter.Method.FLUSH); + InOrder inOrder = inOrder(frameWriter); + inOrder.verify(frameWriter, times(2)).ping(anyBoolean(), anyInt(), anyInt()); + inOrder.verify(frameWriter).flush(); } /** @@ -130,31 +117,4 @@ public void runAll() { } } } - - private abstract static class PartiallyTrackingFrameWriter implements FrameWriter { - - private static final List methodHistory = new ArrayList(); - - static void clearMethodHistory() { - methodHistory.clear(); - } - - static ImmutableList getAllInvokedMethodsHistory() { - return ImmutableList.copyOf(methodHistory); - } - - @Override - public void ping(boolean ack, int payload1, int payload2) throws IOException { - methodHistory.add(Method.PING); - } - - @Override - public void flush() throws IOException { - methodHistory.add(Method.FLUSH); - } - - enum Method { - PING, FLUSH - } - } } From 42590dc2a08be1cd41054bc6877aca89b87f016a Mon Sep 17 00:00:00 2001 From: Jihun Cho Date: Wed, 15 Aug 2018 15:26:10 -0700 Subject: [PATCH 07/14] fix javadoc --- okhttp/src/test/java/io/grpc/okhttp/AsyncFrameWriterTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/okhttp/src/test/java/io/grpc/okhttp/AsyncFrameWriterTest.java b/okhttp/src/test/java/io/grpc/okhttp/AsyncFrameWriterTest.java index f0846753220..e83bf7f32d4 100644 --- a/okhttp/src/test/java/io/grpc/okhttp/AsyncFrameWriterTest.java +++ b/okhttp/src/test/java/io/grpc/okhttp/AsyncFrameWriterTest.java @@ -98,8 +98,8 @@ public void flushCoalescing_shouldMergeTwoQueuedFlushes() throws IOException { } /** - * Executor queues incoming runnables instead of running it. Runnables can be invoked via {@code - * runAll} in serial order. + * Executor queues incoming runnables instead of running it. Runnables can be invoked via {@link + * QueueingExecutor#runAll} in serial order. */ private static class QueueingExecutor implements Executor { From db4a260dd580e630f841f5f2600967b4ba05b4f9 Mon Sep 17 00:00:00 2001 From: Jihun Cho Date: Thu, 16 Aug 2018 14:24:24 -0700 Subject: [PATCH 08/14] Make OkHttpClientTransport extends new interface TransportExceptionHandler which makes AsyncFrameWriter accept the interface rather than a transport. --- .../okhttp/TransportExceptionHandler.java | 24 +++++++++++++++++++ 1 file changed, 24 insertions(+) create mode 100644 okhttp/src/main/java/io/grpc/okhttp/TransportExceptionHandler.java diff --git a/okhttp/src/main/java/io/grpc/okhttp/TransportExceptionHandler.java b/okhttp/src/main/java/io/grpc/okhttp/TransportExceptionHandler.java new file mode 100644 index 00000000000..043d4314833 --- /dev/null +++ b/okhttp/src/main/java/io/grpc/okhttp/TransportExceptionHandler.java @@ -0,0 +1,24 @@ +/* + * Copyright 2018 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.okhttp; + +/** A class that handles transport exception. */ +interface TransportExceptionHandler { + + /** Handles exception. */ + void onException(Throwable throwable); +} From 0b90430c184f60adf28b152dbdf577b38b894e69 Mon Sep 17 00:00:00 2001 From: Jihun Cho Date: Thu, 16 Aug 2018 17:51:52 -0700 Subject: [PATCH 09/14] move TransportExceptionHandler to inner class, add other changes From 55edfe2a467e349957f7575d13eff76ea09fdfe9 Mon Sep 17 00:00:00 2001 From: Jihun Cho Date: Thu, 16 Aug 2018 17:55:42 -0700 Subject: [PATCH 10/14] Move TransportExceptionHandler to inner class of AsyncFrameWriter. From ee35e4d95b01a0c7f412917581fca8f75b8eab4e Mon Sep 17 00:00:00 2001 From: Jihun Cho Date: Thu, 16 Aug 2018 17:57:56 -0700 Subject: [PATCH 11/14] Make TransportExceptionHandler inner interface --- .../java/io/grpc/okhttp/AsyncFrameWriter.java | 18 +++++++++---- .../io/grpc/okhttp/OkHttpClientTransport.java | 6 +++-- .../okhttp/TransportExceptionHandler.java | 24 ----------------- .../io/grpc/okhttp/AsyncFrameWriterTest.java | 27 +++++++++++++++---- 4 files changed, 39 insertions(+), 36 deletions(-) delete mode 100644 okhttp/src/main/java/io/grpc/okhttp/TransportExceptionHandler.java diff --git a/okhttp/src/main/java/io/grpc/okhttp/AsyncFrameWriter.java b/okhttp/src/main/java/io/grpc/okhttp/AsyncFrameWriter.java index 83c4ac88682..3049c5bd64f 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/AsyncFrameWriter.java +++ b/okhttp/src/main/java/io/grpc/okhttp/AsyncFrameWriter.java @@ -37,11 +37,12 @@ class AsyncFrameWriter implements FrameWriter { // Although writes are thread-safe, we serialize them to prevent consuming many Threads that are // just waiting on each other. private final SerializingExecutor executor; - private final OkHttpClientTransport transport; + private final TransportExceptionHandler transportExceptionHandler; private final AtomicLong flushCounter = new AtomicLong(); - public AsyncFrameWriter(OkHttpClientTransport transport, SerializingExecutor executor) { - this.transport = transport; + public AsyncFrameWriter( + TransportExceptionHandler transportExceptionHandler, SerializingExecutor executor) { + this.transportExceptionHandler = transportExceptionHandler; this.executor = executor; } @@ -228,9 +229,9 @@ public final void run() { } doRun(); } catch (RuntimeException e) { - transport.onException(e); + transportExceptionHandler.onException(e); } catch (Exception e) { - transport.onException(e); + transportExceptionHandler.onException(e); } } @@ -242,4 +243,11 @@ public int maxDataLength() { return frameWriter == null ? 0x4000 /* 16384, the minimum required by the HTTP/2 spec */ : frameWriter.maxDataLength(); } + + /** A class that handles transport exception. */ + interface TransportExceptionHandler { + + /** Handles exception. */ + void onException(Throwable throwable); + } } diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java index 6a3c0881ec3..27c60a80975 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java +++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java @@ -55,6 +55,7 @@ import io.grpc.internal.SharedResourceHolder; import io.grpc.internal.StatsTraceContext; import io.grpc.internal.TransportTracer; +import io.grpc.okhttp.AsyncFrameWriter.TransportExceptionHandler; import io.grpc.okhttp.internal.ConnectionSpec; import io.grpc.okhttp.internal.framed.ErrorCode; import io.grpc.okhttp.internal.framed.FrameReader; @@ -98,7 +99,7 @@ /** * A okhttp-based {@link ConnectionClientTransport} implementation. */ -class OkHttpClientTransport implements ConnectionClientTransport { +class OkHttpClientTransport implements ConnectionClientTransport, TransportExceptionHandler { private static final Map ERROR_CODE_TO_STATUS = buildErrorCodeToStatusMap(); private static final Logger log = Logger.getLogger(OkHttpClientTransport.class.getName()); private static final OkHttpClientStream[] EMPTY_STREAM_ARRAY = new OkHttpClientStream[0]; @@ -728,7 +729,8 @@ int getPendingStreamSize() { /** * Finish all active streams due to an IOException, then close the transport. */ - void onException(Throwable failureCause) { + @Override + public void onException(Throwable failureCause) { Preconditions.checkNotNull(failureCause, "failureCause"); Status status = Status.UNAVAILABLE.withCause(failureCause); startGoAway(0, ErrorCode.INTERNAL_ERROR, status); diff --git a/okhttp/src/main/java/io/grpc/okhttp/TransportExceptionHandler.java b/okhttp/src/main/java/io/grpc/okhttp/TransportExceptionHandler.java deleted file mode 100644 index 043d4314833..00000000000 --- a/okhttp/src/main/java/io/grpc/okhttp/TransportExceptionHandler.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Copyright 2018 The gRPC Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.grpc.okhttp; - -/** A class that handles transport exception. */ -interface TransportExceptionHandler { - - /** Handles exception. */ - void onException(Throwable throwable); -} diff --git a/okhttp/src/test/java/io/grpc/okhttp/AsyncFrameWriterTest.java b/okhttp/src/test/java/io/grpc/okhttp/AsyncFrameWriterTest.java index e83bf7f32d4..779b7224d05 100644 --- a/okhttp/src/test/java/io/grpc/okhttp/AsyncFrameWriterTest.java +++ b/okhttp/src/test/java/io/grpc/okhttp/AsyncFrameWriterTest.java @@ -22,7 +22,9 @@ import static org.mockito.Mockito.verify; import static org.mockito.internal.verification.VerificationModeFactory.times; +import io.grpc.Status; import io.grpc.internal.SerializingExecutor; +import io.grpc.okhttp.AsyncFrameWriter.TransportExceptionHandler; import io.grpc.okhttp.internal.framed.FrameWriter; import java.io.IOException; import java.net.Socket; @@ -40,23 +42,25 @@ @RunWith(MockitoJUnitRunner.class) public class AsyncFrameWriterTest { - @Mock private OkHttpClientTransport transport; @Mock private Socket socket; @Mock private FrameWriter frameWriter; - private QueueingExecutor queueingExecutor; + private QueueingExecutor queueingExecutor = new QueueingExecutor(); + private TransportExceptionHandler transportExceptionHandler = + new EscalatingTransportErrorHandler(); + private AsyncFrameWriter asyncFrameWriter; @Before public void setUp() throws Exception { - queueingExecutor = new QueueingExecutor(); - asyncFrameWriter = new AsyncFrameWriter(transport, new SerializingExecutor(queueingExecutor)); + asyncFrameWriter = + new AsyncFrameWriter(transportExceptionHandler, new SerializingExecutor(queueingExecutor)); asyncFrameWriter.becomeConnected(frameWriter, socket); } @After public void tearDown() throws Exception { - asyncFrameWriter.close(); + queueingExecutor.clear(); } @Test @@ -116,5 +120,18 @@ public void runAll() { r.run(); } } + + public void clear() { + runnables.clear(); + } + } + + /** Rethrows as Internal error. */ + private static class EscalatingTransportErrorHandler implements TransportExceptionHandler { + + @Override + public void onException(Throwable throwable) { + throw Status.INTERNAL.asRuntimeException(); + } } } From fc235bf88037664b4ca975898835b051b5e8b9c8 Mon Sep 17 00:00:00 2001 From: Jihun Cho Date: Fri, 17 Aug 2018 09:59:38 -0700 Subject: [PATCH 12/14] clean up code by addressing comments --- .../io/grpc/okhttp/AsyncFrameWriterTest.java | 19 +++---------------- 1 file changed, 3 insertions(+), 16 deletions(-) diff --git a/okhttp/src/test/java/io/grpc/okhttp/AsyncFrameWriterTest.java b/okhttp/src/test/java/io/grpc/okhttp/AsyncFrameWriterTest.java index 779b7224d05..5ba286b4e39 100644 --- a/okhttp/src/test/java/io/grpc/okhttp/AsyncFrameWriterTest.java +++ b/okhttp/src/test/java/io/grpc/okhttp/AsyncFrameWriterTest.java @@ -22,7 +22,6 @@ import static org.mockito.Mockito.verify; import static org.mockito.internal.verification.VerificationModeFactory.times; -import io.grpc.Status; import io.grpc.internal.SerializingExecutor; import io.grpc.okhttp.AsyncFrameWriter.TransportExceptionHandler; import io.grpc.okhttp.internal.framed.FrameWriter; @@ -31,7 +30,6 @@ import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executor; -import org.junit.After; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -48,21 +46,14 @@ public class AsyncFrameWriterTest { private QueueingExecutor queueingExecutor = new QueueingExecutor(); private TransportExceptionHandler transportExceptionHandler = new EscalatingTransportErrorHandler(); - - private AsyncFrameWriter asyncFrameWriter; + private AsyncFrameWriter asyncFrameWriter = + new AsyncFrameWriter(transportExceptionHandler, new SerializingExecutor(queueingExecutor)); @Before public void setUp() throws Exception { - asyncFrameWriter = - new AsyncFrameWriter(transportExceptionHandler, new SerializingExecutor(queueingExecutor)); asyncFrameWriter.becomeConnected(frameWriter, socket); } - @After - public void tearDown() throws Exception { - queueingExecutor.clear(); - } - @Test public void noCoalesceRequired() throws IOException { asyncFrameWriter.ping(true, 0, 1); @@ -120,10 +111,6 @@ public void runAll() { r.run(); } } - - public void clear() { - runnables.clear(); - } } /** Rethrows as Internal error. */ @@ -131,7 +118,7 @@ private static class EscalatingTransportErrorHandler implements TransportExcepti @Override public void onException(Throwable throwable) { - throw Status.INTERNAL.asRuntimeException(); + throw new AssertionError(throwable); } } } From 9bf590000333cdf0b12d0361e5b5957e3c329f5d Mon Sep 17 00:00:00 2001 From: Jihun Cho Date: Fri, 17 Aug 2018 10:23:48 -0700 Subject: [PATCH 13/14] fix javadoc --- .../src/test/java/io/grpc/okhttp/AsyncFrameWriterTest.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/okhttp/src/test/java/io/grpc/okhttp/AsyncFrameWriterTest.java b/okhttp/src/test/java/io/grpc/okhttp/AsyncFrameWriterTest.java index 5ba286b4e39..1638448bd95 100644 --- a/okhttp/src/test/java/io/grpc/okhttp/AsyncFrameWriterTest.java +++ b/okhttp/src/test/java/io/grpc/okhttp/AsyncFrameWriterTest.java @@ -111,9 +111,13 @@ public void runAll() { r.run(); } } + + public void clear() { + runnables.clear(); + } } - /** Rethrows as Internal error. */ + /** Rethrows as Assertion error. */ private static class EscalatingTransportErrorHandler implements TransportExceptionHandler { @Override From 574db490194976e7baa8d9caf5ea46fac7ea2f40 Mon Sep 17 00:00:00 2001 From: Jihun Cho Date: Fri, 17 Aug 2018 10:25:21 -0700 Subject: [PATCH 14/14] remove unused method --- okhttp/src/test/java/io/grpc/okhttp/AsyncFrameWriterTest.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/okhttp/src/test/java/io/grpc/okhttp/AsyncFrameWriterTest.java b/okhttp/src/test/java/io/grpc/okhttp/AsyncFrameWriterTest.java index 1638448bd95..479a35a78b5 100644 --- a/okhttp/src/test/java/io/grpc/okhttp/AsyncFrameWriterTest.java +++ b/okhttp/src/test/java/io/grpc/okhttp/AsyncFrameWriterTest.java @@ -111,10 +111,6 @@ public void runAll() { r.run(); } } - - public void clear() { - runnables.clear(); - } } /** Rethrows as Assertion error. */