Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 23 additions & 6 deletions okhttp/src/main/java/io/grpc/okhttp/AsyncFrameWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.io.IOException;
import java.net.Socket;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
import okio.Buffer;
Expand All @@ -36,10 +37,12 @@ class AsyncFrameWriter implements FrameWriter {
// Although writes are thread-safe, we serialize them to prevent consuming many Threads that are
// just waiting on each other.
private final SerializingExecutor executor;
private final OkHttpClientTransport transport;
private final TransportExceptionHandler transportExceptionHandler;
private final AtomicLong flushCounter = new AtomicLong();

public AsyncFrameWriter(OkHttpClientTransport transport, SerializingExecutor executor) {
this.transport = transport;
public AsyncFrameWriter(
TransportExceptionHandler transportExceptionHandler, SerializingExecutor executor) {
this.transportExceptionHandler = transportExceptionHandler;
this.executor = executor;
}

Expand Down Expand Up @@ -89,10 +92,17 @@ public void doRun() throws IOException {

@Override
public void flush() {
// keep track of version of flushes to skip flush if another flush task is queued.
final long flushCount = flushCounter.incrementAndGet();

executor.execute(new WriteRunnable() {
@Override
public void doRun() throws IOException {
frameWriter.flush();
// There can be a flush starvation if there are continuous flood of flush is queued, this
// is not an issue with OkHttp since it flushes if the buffer is full.
if (flushCounter.get() == flushCount) {
frameWriter.flush();
}
}
});
}
Expand Down Expand Up @@ -219,9 +229,9 @@ public final void run() {
}
doRun();
} catch (RuntimeException e) {
transport.onException(e);
transportExceptionHandler.onException(e);
} catch (Exception e) {
transport.onException(e);
transportExceptionHandler.onException(e);
}
}

Expand All @@ -233,4 +243,11 @@ public int maxDataLength() {
return frameWriter == null ? 0x4000 /* 16384, the minimum required by the HTTP/2 spec */
: frameWriter.maxDataLength();
}

/** A class that handles transport exception. */
interface TransportExceptionHandler {

/** Handles exception. */
void onException(Throwable throwable);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import io.grpc.internal.SharedResourceHolder;
import io.grpc.internal.StatsTraceContext;
import io.grpc.internal.TransportTracer;
import io.grpc.okhttp.AsyncFrameWriter.TransportExceptionHandler;
import io.grpc.okhttp.internal.ConnectionSpec;
import io.grpc.okhttp.internal.framed.ErrorCode;
import io.grpc.okhttp.internal.framed.FrameReader;
Expand Down Expand Up @@ -98,7 +99,7 @@
/**
* A okhttp-based {@link ConnectionClientTransport} implementation.
*/
class OkHttpClientTransport implements ConnectionClientTransport {
class OkHttpClientTransport implements ConnectionClientTransport, TransportExceptionHandler {
private static final Map<ErrorCode, Status> ERROR_CODE_TO_STATUS = buildErrorCodeToStatusMap();
private static final Logger log = Logger.getLogger(OkHttpClientTransport.class.getName());
private static final OkHttpClientStream[] EMPTY_STREAM_ARRAY = new OkHttpClientStream[0];
Expand Down Expand Up @@ -728,7 +729,8 @@ int getPendingStreamSize() {
/**
* Finish all active streams due to an IOException, then close the transport.
*/
void onException(Throwable failureCause) {
@Override
public void onException(Throwable failureCause) {
Preconditions.checkNotNull(failureCause, "failureCause");
Status status = Status.UNAVAILABLE.withCause(failureCause);
startGoAway(0, ErrorCode.INTERNAL_ERROR, status);
Expand Down
124 changes: 124 additions & 0 deletions okhttp/src/test/java/io/grpc/okhttp/AsyncFrameWriterTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
* Copyright 2018 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.grpc.okhttp;

import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.verify;
import static org.mockito.internal.verification.VerificationModeFactory.times;

import io.grpc.internal.SerializingExecutor;
import io.grpc.okhttp.AsyncFrameWriter.TransportExceptionHandler;
import io.grpc.okhttp.internal.framed.FrameWriter;
import java.io.IOException;
import java.net.Socket;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;

@RunWith(MockitoJUnitRunner.class)
public class AsyncFrameWriterTest {

@Mock private Socket socket;
@Mock private FrameWriter frameWriter;

private QueueingExecutor queueingExecutor = new QueueingExecutor();
private TransportExceptionHandler transportExceptionHandler =
new EscalatingTransportErrorHandler();
private AsyncFrameWriter asyncFrameWriter =
new AsyncFrameWriter(transportExceptionHandler, new SerializingExecutor(queueingExecutor));

@Before
public void setUp() throws Exception {
asyncFrameWriter.becomeConnected(frameWriter, socket);
}

@Test
public void noCoalesceRequired() throws IOException {
asyncFrameWriter.ping(true, 0, 1);
asyncFrameWriter.flush();
queueingExecutor.runAll();

verify(frameWriter, times(1)).ping(anyBoolean(), anyInt(), anyInt());
verify(frameWriter, times(1)).flush();
}

@Test
public void flushCoalescing_shouldNotMergeTwoDistinctFlushes() throws IOException {
asyncFrameWriter.ping(true, 0, 1);
asyncFrameWriter.flush();
queueingExecutor.runAll();

asyncFrameWriter.ping(true, 0, 2);
asyncFrameWriter.flush();
queueingExecutor.runAll();

verify(frameWriter, times(2)).ping(anyBoolean(), anyInt(), anyInt());
verify(frameWriter, times(2)).flush();
}

@Test
public void flushCoalescing_shouldMergeTwoQueuedFlushes() throws IOException {
asyncFrameWriter.ping(true, 0, 1);
asyncFrameWriter.flush();
asyncFrameWriter.ping(true, 0, 2);
asyncFrameWriter.flush();

queueingExecutor.runAll();

InOrder inOrder = inOrder(frameWriter);
inOrder.verify(frameWriter, times(2)).ping(anyBoolean(), anyInt(), anyInt());
inOrder.verify(frameWriter).flush();
}

/**
* Executor queues incoming runnables instead of running it. Runnables can be invoked via {@link
* QueueingExecutor#runAll} in serial order.
*/
private static class QueueingExecutor implements Executor {

private final Queue<Runnable> runnables = new ConcurrentLinkedQueue<Runnable>();

@Override
public void execute(Runnable command) {
runnables.add(command);
}

public void runAll() {
Runnable r;
while ((r = runnables.poll()) != null) {
r.run();
}
}
}

/** Rethrows as Assertion error. */
private static class EscalatingTransportErrorHandler implements TransportExceptionHandler {

@Override
public void onException(Throwable throwable) {
throw new AssertionError(throwable);
}
}
}