From 7c82ff5c5cb1e5d4f99db0447e7e3c66a4e2f0d2 Mon Sep 17 00:00:00 2001 From: "Penn (Dapeng) Zhang" Date: Mon, 12 Aug 2019 12:57:29 -0700 Subject: [PATCH] factor out write path so that easy to replace with other implementation --- .../AsyncServletOutputStreamWriter.java | 240 ++++++++++++++++++ .../io/grpc/servlet/ServletServerStream.java | 193 ++------------ 2 files changed, 265 insertions(+), 168 deletions(-) create mode 100644 servlet/src/main/java/io/grpc/servlet/AsyncServletOutputStreamWriter.java diff --git a/servlet/src/main/java/io/grpc/servlet/AsyncServletOutputStreamWriter.java b/servlet/src/main/java/io/grpc/servlet/AsyncServletOutputStreamWriter.java new file mode 100644 index 000000000000..10faf82c8f30 --- /dev/null +++ b/servlet/src/main/java/io/grpc/servlet/AsyncServletOutputStreamWriter.java @@ -0,0 +1,240 @@ +/* + * Copyright 2019 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.servlet; + +import static io.grpc.servlet.ServletServerStream.toHexString; +import static java.util.logging.Level.FINE; +import static java.util.logging.Level.FINEST; + +import io.grpc.InternalLogId; +import io.grpc.Status; +import io.grpc.servlet.ServletServerStream.ServletTransportState; +import java.io.IOException; +import java.time.Duration; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.LockSupport; +import java.util.logging.Logger; +import javax.annotation.CheckReturnValue; +import javax.servlet.AsyncContext; +import javax.servlet.ServletOutputStream; + +/** + * Handles write actions from the container thread and the application thread. + */ +final class AsyncServletOutputStreamWriter { + + private static final Logger logger = + Logger.getLogger(AsyncServletOutputStreamWriter.class.getName()); + + /** + * Memory boundary for write actions. + * + *
+   * WriteState curState = writeState.get();  // mark a boundary
+   * doSomething();  // do something with in the boundary
+   * boolean successful = writeState.compareAndSet(curState, newState); // try to mark a boundary
+   * if (successful) {
+   *   // state has not changed since
+   *   return;
+   * } else {
+   *   // state is changed by another thread while doSomething(), need recompute
+   * }
+   * 
+ * + *

There will be two threads, the container thread (calling {@code onWritePossible()}) and the + * application thread (calling {@code runOrBufferActionItem()}) that ready and updates the + * writeState. Only onWritePossible() may turn readyAndEmpty from false to true, and only + * runOrBufferActionItem() may turn it from true to false. + */ + private final AtomicReference writeState = new AtomicReference<>(WriteState.DEFAULT); + + private final ServletOutputStream outputStream; + private final ServletTransportState transportState; + private final InternalLogId logId; + private final ActionItem flushAction; + private final ActionItem completeAction; + // SPSC queue would do + private final Queue writeChain = new ConcurrentLinkedQueue<>(); + private volatile Thread parkingThread; + + AsyncServletOutputStreamWriter( + AsyncContext asyncContext, + ServletOutputStream outputStream, + ServletTransportState transportState, + InternalLogId logId) { + this.outputStream = outputStream; + this.transportState = transportState; + this.logId = logId; + this.flushAction = () -> { + logger.log(FINEST, "[{0}] flushBuffer", logId); + asyncContext.getResponse().flushBuffer(); + }; + this.completeAction = () -> { + logger.log(FINE, "[{0}] call is completing", logId); + transportState.runOnTransportThread( + () -> { + transportState.complete(); + asyncContext.complete(); + logger.log(FINE, "[{0}] call completed", logId); + }); + }; + } + + /** Called from application thread. */ + void writeBytes(byte[] bytes, int numBytes) throws IOException { + runOrBufferActionItem( + // write bytes action + () -> { + outputStream.write(bytes, 0, numBytes); + transportState.runOnTransportThread(() -> transportState.onSentBytes(numBytes)); + if (logger.isLoggable(FINEST)) { + logger.log( + FINEST, + "[{0}] outbound data: length = {1}, bytes = {2}", + new Object[]{logId, numBytes, toHexString(bytes, numBytes)}); + } + }); + } + + /** Called from application thread. */ + void flush() throws IOException { + runOrBufferActionItem(flushAction); + } + + /** Called from application thread. */ + void complete() { + try { + runOrBufferActionItem(completeAction); + } catch (IOException e) { + // actually completeAction does not throw + throw Status.fromThrowable(e).asRuntimeException(); + } + } + + /** + * Called from the container thread {@link javax.servlet.WriteListener#onWritePossible()}. + */ + void onWritePossible() throws IOException { + logger.log( + FINEST, "[{0}] onWritePossible: ENTRY. The servlet output stream becomes ready", logId); + assureReadyAndEmptyFalse(); + while (outputStream.isReady()) { + WriteState curState = writeState.get(); + + ActionItem actionItem = writeChain.poll(); + if (actionItem != null) { + actionItem.run(); + continue; + } + + if (writeState.compareAndSet(curState, curState.withReadyAndEmpty(true))) { + // state has not changed since. + logger.log( + FINEST, + "[{0}] onWritePossible: EXIT. All data available now is sent out and the servlet output" + + " stream is still ready", + logId); + return; + } + // else, state changed by another thread (runOrBufferActionItem), need to drain the writeChain + // again + } + logger.log( + FINEST, "[{0}] onWritePossible: EXIT. The servlet output stream becomes not ready", logId); + } + + private void runOrBufferActionItem(ActionItem actionItem) throws IOException { + WriteState curState = writeState.get(); + if (curState.readyAndEmpty) { // write to the outputStream directly + actionItem.run(); + if (!outputStream.isReady()) { + logger.log(FINEST, "[{0}] the servlet output stream becomes not ready", logId); + boolean successful = writeState.compareAndSet(curState, curState.withReadyAndEmpty(false)); + assert successful; + LockSupport.unpark(parkingThread); + } + } else { // buffer to the writeChain + writeChain.offer(actionItem); + if (!writeState.compareAndSet(curState, curState.newItemBuffered())) { + // state changed by another thread (onWritePossible) + assert writeState.get().readyAndEmpty; + ActionItem lastItem = writeChain.poll(); + if (lastItem != null) { + assert lastItem == actionItem; + runOrBufferActionItem(lastItem); + } + } // state has not changed since + } + } + + private void assureReadyAndEmptyFalse() { + // readyAndEmpty should have been set to false already or right now + // It's very very unlikely readyAndEmpty is still true due to a race condition + while (writeState.get().readyAndEmpty) { + parkingThread = Thread.currentThread(); + LockSupport.parkNanos(Duration.ofSeconds(1).toNanos()); + } + parkingThread = null; + } + + @FunctionalInterface + interface ActionItem { + void run() throws IOException; + } + + private static final class WriteState { + + static final WriteState DEFAULT = new WriteState(false); + + /** + * The servlet output stream is ready and the writeChain is empty. + * + *

The event that readyAndEmpty turns from false to true: + * {@code onWritePossible()} exits while currently there is no more data to write, but the last + * check of {@link javax.servlet.ServletOutputStream#isReady()} is true. + * + *

The event that readyAndEmpty turns from false to true: + * {@code runOrBufferActionItem()} exits while either the action item is written directly to the + * servlet output stream and check of {@link javax.servlet.ServletOutputStream#isReady()} right + * after that is false, or the action item is buffered into the writeChain. + */ + final boolean readyAndEmpty; + + WriteState(boolean readyAndEmpty) { + this.readyAndEmpty = readyAndEmpty; + } + + /** + * Only {@code onWritePossible()} can set readyAndEmpty to true, and only {@code + * runOrBufferActionItem()} can set it to false. + */ + @CheckReturnValue + WriteState withReadyAndEmpty(boolean readyAndEmpty) { + return new WriteState(readyAndEmpty); + } + + /** + * Only {@code runOrBufferActionItem()} can call it, and will set readyAndEmpty to false. + */ + @CheckReturnValue + WriteState newItemBuffered() { + return new WriteState(false); + } + } +} diff --git a/servlet/src/main/java/io/grpc/servlet/ServletServerStream.java b/servlet/src/main/java/io/grpc/servlet/ServletServerStream.java index e1e72949ab40..fd3d68546bf3 100644 --- a/servlet/src/main/java/io/grpc/servlet/ServletServerStream.java +++ b/servlet/src/main/java/io/grpc/servlet/ServletServerStream.java @@ -43,16 +43,10 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.LockSupport; import java.util.function.Supplier; -import java.util.logging.Level; import java.util.logging.Logger; -import javax.annotation.CheckReturnValue; import javax.annotation.Nullable; import javax.servlet.AsyncContext; import javax.servlet.WriteListener; @@ -66,12 +60,10 @@ final class ServletServerStream extends AbstractServerStream { private final Sink sink = new Sink(); private final AsyncContext asyncCtx; private final HttpServletResponse resp; - private final AtomicReference writeState = new AtomicReference<>(WriteState.DEFAULT); - // SPSC queue would do - private final Queue writeChain = new ConcurrentLinkedQueue<>(); private final Attributes attributes; private final String authority; private final InternalLogId logId; + private final AsyncServletOutputStreamWriter writer; ServletServerStream( AsyncContext asyncCtx, @@ -89,6 +81,8 @@ final class ServletServerStream extends AbstractServerStream { this.asyncCtx = asyncCtx; this.resp = (HttpServletResponse) asyncCtx.getResponse(); resp.getOutputStream().setWriteListener(new GrpcWriteListener()); + this.writer = new AsyncServletOutputStreamWriter( + asyncCtx, resp.getOutputStream(), transportState, logId); } @Override @@ -137,34 +131,6 @@ private void writeHeadersToServletResponse(Metadata metadata) { } } - private void writeBufToServletResponse(ByteArrayWritableBuffer buffer) - throws IOException { - int numBytes = buffer.readableBytes(); - if (buffer == ByteArrayWritableBuffer.FLUSH) { - resp.flushBuffer(); - } else { - resp.getOutputStream().write(buffer.bytes, 0, numBytes); - transportState.runOnTransportThread(() -> transportState.onSentBytes(numBytes)); - - if (logger.isLoggable(Level.FINEST)) { - logger.log( - Level.FINEST, - "[{0}] outbound data: length = {1}, bytes = {2}", - new Object[] {logId, numBytes, toHexString(buffer.bytes, numBytes)}); - } - } - } - - private void callComplete() { - logger.log(FINE, "[{0}] call is completing", logId); - transportState.runOnTransportThread( - () -> { - transportState.complete(); - asyncCtx.complete(); - logger.log(FINE, "[{0}] call completed", logId); - }); - } - final class ServletTransportState extends TransportState { private final SerializingExecutor transportThreadExecutor = @@ -197,18 +163,12 @@ public void deframeFailed(Throwable cause) { private static final class ByteArrayWritableBuffer implements WritableBuffer { - static final ByteArrayWritableBuffer FLUSH = new ByteArrayWritableBuffer(new byte[0]); - private final int capacity; final byte[] bytes; private int index; ByteArrayWritableBuffer(int capacityHint) { - this(new byte[min(1024 * 1024, max(4096, capacityHint))]); - } - - ByteArrayWritableBuffer(byte[] bytes) { - this.bytes = bytes; + this.bytes = new byte[min(1024 * 1024, max(4096, capacityHint))]; this.capacity = bytes.length; } @@ -237,44 +197,6 @@ public int readableBytes() { public void release() {} } - private static final class WriteState { - - static final WriteState DEFAULT = new WriteState(false, false); - - /** - * {@link javax.servlet.WriteListener#onWritePossible()} exits because currently there is no - * more data to write, but the last check of {@link javax.servlet.ServletOutputStream#isReady()} - * is true. - */ - final boolean stillWritePossible; - - final boolean trailersSent; - - private WriteState(boolean stillWritePossible, boolean trailersSent) { - this.stillWritePossible = stillWritePossible; - this.trailersSent = trailersSent; - } - - @CheckReturnValue - WriteState withTrailersSent(boolean trailersSent) { - return new WriteState(stillWritePossible, trailersSent); - } - - /** - * Only {@link javax.servlet.WriteListener#onWritePossible()} can set it to true, and only - * {@link ServletServerStream.Sink#writeFrame} can set it to false; - */ - @CheckReturnValue - WriteState withStillWritePossible(boolean stillWritePossible) { - return new WriteState(stillWritePossible, trailersSent); - } - - @CheckReturnValue - WriteState newState() { - return new WriteState(stillWritePossible, trailersSent); - } - } - private final class GrpcWriteListener implements WriteListener { @Override @@ -295,71 +217,7 @@ public void onError(Throwable t) { @Override public void onWritePossible() throws IOException { - logger.log(FINEST, "[{0}] onWritePossible: ENTRY", logId); - - // stillWritePossible should have been set to false already or right now - // It's very very unlikely stillWritePossible is true due to a race condition - while (writeState.get().stillWritePossible) { - LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(100L)); - } - - boolean isReady; - while ((isReady = resp.getOutputStream().isReady())) { - WriteState curState = writeState.get(); - - ByteArrayWritableBuffer buffer = writeChain.poll(); - if (buffer != null) { - writeBufToServletResponse(buffer); - continue; - } - - if (writeState.compareAndSet(curState, curState.withStillWritePossible(true))) { - logger.log(FINEST, "[{0}] set stillWritePossible to true", logId); - // state has not changed since. It's possible a new entry is just enqueued into the - // writeChain, but this case is handled right after the enqueuing - break; - } // else state changed by another thread, need to drain the writeChain again - } - - if (isReady && writeState.get().trailersSent) { - callComplete(); - } - - logger.log(FINEST, "[{0}] onWritePossible: EXIT", logId); - } - } - - private void writeBuf(ByteArrayWritableBuffer byteBuffer) { - WriteState curState = writeState.get(); - if (curState.stillWritePossible) { // write to the outputStream directly - try { - writeBufToServletResponse(byteBuffer); - if (!resp.getOutputStream().isReady()) { - while (true) { - if (writeState.compareAndSet(curState, curState.withStillWritePossible(false))) { - logger.log(FINEST, "[{0}] set stillWritePossible to false", logId); - return; - } - curState = writeState.get(); - assert curState.stillWritePossible; - } - } - } catch (IOException ioe) { - logger.log(WARNING, String.format("[{%s}] Exception writing message", logId), ioe); - cancel(Status.fromThrowable(ioe)); - } - } else { // buffer to the writeChain - writeChain.offer(byteBuffer); - if (!writeState.compareAndSet(curState, curState.newState())) { - // state changed by another thread, need to check if stillWritePossible again - if (writeState.get().stillWritePossible) { - ByteArrayWritableBuffer bf = writeChain.poll(); - if (bf != null) { - assert bf == byteBuffer; - writeBuf(bf); - } - } - } + writer.onWritePossible(); } } @@ -370,7 +228,12 @@ private final class Sink implements AbstractServerStream.Sink { public void writeHeaders(Metadata headers) { writeHeadersToServletResponse(headers); resp.setTrailerFields(trailerSupplier); - writeBuf(ByteArrayWritableBuffer.FLUSH); + try { + writer.flush(); + } catch (IOException e) { + logger.log(WARNING, String.format("[{%s}] Exception when flushBuffer", logId), e); + cancel(Status.fromThrowable(e)); + } } @Override @@ -386,16 +249,21 @@ public void writeFrame(@Nullable WritableBuffer frame, boolean flush, int numMes new Object[]{logId, frame == null ? 0 : frame.readableBytes(), flush, numMessages}); } - if (frame != null) { - int numBytes = frame.readableBytes(); - if (numBytes > 0) { - onSendingBytes(numBytes); + try { + if (frame != null) { + int numBytes = frame.readableBytes(); + if (numBytes > 0) { + onSendingBytes(numBytes); + } + writer.writeBytes(((ByteArrayWritableBuffer) frame).bytes, frame.readableBytes()); } - writeBuf((ByteArrayWritableBuffer) frame); - } - if (flush) { - writeBuf(ByteArrayWritableBuffer.FLUSH); + if (flush) { + writer.flush(); + } + } catch (IOException e) { + logger.log(WARNING, String.format("[{%s}] Exception writing message", logId), e); + cancel(Status.fromThrowable(e)); } } @@ -419,18 +287,7 @@ public void writeTrailers(Metadata trailers, boolean headersSent, Status status) } } - while (true) { - WriteState curState = writeState.get(); - if (curState.stillWritePossible) { - // in non-error case, this condition means all messages are sent out - callComplete(); - break; - } // else, some messages are still in write queue - if (writeState.compareAndSet(curState, curState.withTrailersSent(true))) { - logger.log(FINEST, "[{0}] set withTrailersSent to true", logId); - break; - } - } + writer.complete(); } @Override