Skip to content

Commit

Permalink
Delay the end handler notification when an stream end handler is call…
Browse files Browse the repository at this point in the history
…ed - see #2852
  • Loading branch information
vietj committed Feb 25, 2019
1 parent 90455f8 commit 24434e9
Show file tree
Hide file tree
Showing 17 changed files with 229 additions and 76 deletions.
34 changes: 22 additions & 12 deletions src/main/java/io/vertx/core/file/impl/AsyncFileImpl.java
Expand Up @@ -69,6 +69,7 @@ public class AsyncFileImpl implements AsyncFile {
private int lwm = maxWrites / 2; private int lwm = maxWrites / 2;
private int readBufferSize = DEFAULT_READ_BUFFER_SIZE; private int readBufferSize = DEFAULT_READ_BUFFER_SIZE;
private InboundBuffer<Buffer> queue; private InboundBuffer<Buffer> queue;
private Handler<Buffer> handler;
private Handler<Void> endHandler; private Handler<Void> endHandler;
private long readPos; private long readPos;
private long readLength = Long.MAX_VALUE; private long readLength = Long.MAX_VALUE;
Expand Down Expand Up @@ -102,7 +103,13 @@ public class AsyncFileImpl implements AsyncFile {
} }
this.context = context; this.context = context;
this.queue = new InboundBuffer<>(context, 0); this.queue = new InboundBuffer<>(context, 0);

queue.handler(buff -> {
if (buff.length() > 0) {
handleBuffer(buff);
} else {
handleEnd();
}
});
queue.drainHandler(v -> { queue.drainHandler(v -> {
doRead(); doRead();
}); });
Expand Down Expand Up @@ -235,7 +242,7 @@ public synchronized AsyncFile handler(Handler<Buffer> handler) {
if (closed) { if (closed) {
return this; return this;
} }
queue.handler(handler); this.handler = handler;
if (handler != null) { if (handler != null) {
doRead(); doRead();
} else { } else {
Expand Down Expand Up @@ -346,24 +353,27 @@ private synchronized void doRead(ByteBuffer bb) {
doRead(buff, 0, bb, readPos, ar -> { doRead(buff, 0, bb, readPos, ar -> {
if (ar.succeeded()) { if (ar.succeeded()) {
Buffer buffer = ar.result(); Buffer buffer = ar.result();
if (buffer.length() == 0) { readPos += buffer.length();
// Empty buffer represents end of file readLength -= buffer.length();
handleEnd(); // Empty buffer represents end of file
} else { if (queue.write(buffer) && buffer.length() > 0) {
readPos += buffer.length(); doRead(bb);
readLength -= buffer.length();
if (queue.write(buffer)) {
doRead(bb);
}
} }
} else { } else {
handleException(ar.cause()); handleException(ar.cause());
} }
}); });
} }


private synchronized void handleBuffer(Buffer buff) {
if (handler != null) {
checkContext();
handler.handle(buff);
}
}

private synchronized void handleEnd() { private synchronized void handleEnd() {
queue.handler(null); handler = null;
if (endHandler != null) { if (endHandler != null) {
checkContext(); checkContext();
endHandler.handle(null); endHandler.handle(null);
Expand Down
Expand Up @@ -348,12 +348,9 @@ public void doPause() {


@Override @Override
public void doFetch(long amount) { public void doFetch(long amount) {
queue.fetch(amount); if (!queue.fetch(amount)) {
} response.handleEnd(trailers);

}
@Override
public void doResume() {
queue.resume();
} }


@Override @Override
Expand Down Expand Up @@ -489,7 +486,7 @@ private boolean endResponse(LastHttpContent trailer) {
} }
} }
trailers = new HeadersAdaptor(trailer.trailingHeaders()); trailers = new HeadersAdaptor(trailer.trailingHeaders());
if (queue.isEmpty()) { if (queue.isEmpty() && !queue.isPaused()) {
response.handleEnd(trailers); response.handleEnd(trailers);
} }
responseEnded = true; responseEnded = true;
Expand Down
Expand Up @@ -225,19 +225,22 @@ public HttpServerRequest handler(Handler<Buffer> handler) {


@Override @Override
public HttpServerRequest pause() { public HttpServerRequest pause() {
doPause(); synchronized (conn) {
checkEnded();
doPause();
}
return this; return this;
} }


@Override @Override
public HttpServerRequest resume() { public HttpServerRequest resume() {
doResume(); return fetch(Long.MAX_VALUE);
return this;
} }


@Override @Override
public HttpServerRequest fetch(long amount) { public HttpServerRequest fetch(long amount) {
synchronized (conn) { synchronized (conn) {
checkEnded();
doFetch(amount); doFetch(amount);
} }
return this; return this;
Expand Down
Expand Up @@ -215,11 +215,6 @@ public void doPause() {
stream.doPause(); stream.doPause();
} }


@Override
public void doResume() {
stream.doResume();
}

@Override @Override
public void doFetch(long amount) { public void doFetch(long amount) {
stream.doFetch(amount); stream.doFetch(amount);
Expand Down
64 changes: 46 additions & 18 deletions src/main/java/io/vertx/core/http/impl/HttpClientResponseImpl.java
Expand Up @@ -132,26 +132,41 @@ public List<String> cookies() {
} }
} }


private void checkEnded() {
if (trailers != null) {
throw new IllegalStateException();
}
}

@Override @Override
public HttpClientResponse handler(Handler<Buffer> dataHandler) { public HttpClientResponse handler(Handler<Buffer> handle) {
synchronized (conn) { synchronized (conn) {
this.dataHandler = dataHandler; if (handle != null) {
checkEnded();
}
dataHandler = handle;
return this; return this;
} }
} }


@Override @Override
public HttpClientResponse endHandler(Handler<Void> endHandler) { public HttpClientResponse endHandler(Handler<Void> handler) {
synchronized (conn) { synchronized (conn) {
this.endHandler = endHandler; if (handler != null) {
checkEnded();
}
endHandler = handler;
return this; return this;
} }
} }


@Override @Override
public HttpClientResponse exceptionHandler(Handler<Throwable> exceptionHandler) { public HttpClientResponse exceptionHandler(Handler<Throwable> handler) {
synchronized (conn) { synchronized (conn) {
this.exceptionHandler = exceptionHandler; if (handler != null) {
checkEnded();
}
exceptionHandler = handler;
return this; return this;
} }
} }
Expand All @@ -164,8 +179,7 @@ public HttpClientResponse pause() {


@Override @Override
public HttpClientResponse resume() { public HttpClientResponse resume() {
stream.doResume(); return fetch(Long.MAX_VALUE);
return this;
} }


@Override @Override
Expand All @@ -175,16 +189,24 @@ public HttpClientResponse fetch(long amount) {
} }


@Override @Override
public HttpClientResponse bodyHandler(final Handler<Buffer> bodyHandler) { public HttpClientResponse bodyHandler(final Handler<Buffer> handler) {
BodyHandler handler = new BodyHandler(); if (handler != null) {
handler(handler); BodyHandler bodyHandler = new BodyHandler();
endHandler(v -> handler.notifyHandler(bodyHandler)); handler(bodyHandler);
endHandler(v -> bodyHandler.notifyHandler(handler));
} else {
handler(null);
endHandler(null);
}
return this; return this;
} }


@Override @Override
public HttpClientResponse customFrameHandler(Handler<HttpFrame> handler) { public HttpClientResponse customFrameHandler(Handler<HttpFrame> handler) {
synchronized (conn) { synchronized (conn) {
if (endHandler != null) {
checkEnded();
}
customFrameHandler = handler; customFrameHandler = handler;
return this; return this;
} }
Expand Down Expand Up @@ -217,16 +239,19 @@ void handleChunk(Buffer data) {
} }


void handleEnd(MultiMap trailers) { void handleEnd(MultiMap trailers) {
Handler<Void> handler;
synchronized (conn) { synchronized (conn) {
stream.reportBytesRead(bytesRead); stream.reportBytesRead(bytesRead);
bytesRead = 0; bytesRead = 0;
this.trailers = trailers; this.trailers = trailers;
if (endHandler != null) { handler = endHandler;
try { endHandler = null;
endHandler.handle(null); }
} catch (Throwable t) { if (handler != null) {
handleException(t); try {
} handler.handle(null);
} catch (Throwable t) {
handleException(t);
} }
} }
} }
Expand Down Expand Up @@ -277,6 +302,9 @@ void notifyHandler(Handler<Buffer> bodyHandler) {
@Override @Override
public HttpClientResponse streamPriorityHandler(Handler<StreamPriority> handler) { public HttpClientResponse streamPriorityHandler(Handler<StreamPriority> handler) {
synchronized (conn) { synchronized (conn) {
if (handler != null) {
checkEnded();
}
priorityHandler = handler; priorityHandler = handler;
} }
return this; return this;
Expand Down
Expand Up @@ -49,7 +49,6 @@ interface HttpClientStream {
void doSetWriteQueueMaxSize(int size); void doSetWriteQueueMaxSize(int size);
boolean isNotWritable(); boolean isNotWritable();
void doPause(); void doPause();
void doResume();
void doFetch(long amount); void doFetch(long amount);


void reset(long code); void reset(long code);
Expand Down
12 changes: 9 additions & 3 deletions src/main/java/io/vertx/core/http/impl/HttpServerRequestImpl.java
Expand Up @@ -161,7 +161,7 @@ void handlePipelined() {
boolean end = ended; boolean end = ended;
ended = false; ended = false;
handleBegin(); handleBegin();
if (pending != null && pending.size() > 0) { if (pending != null && pending.isPaused()) {
pending.resume(); pending.resume();
} }
if (end) { if (end) {
Expand Down Expand Up @@ -340,7 +340,13 @@ public HttpServerRequest fetch(long amount) {
public HttpServerRequest resume() { public HttpServerRequest resume() {
synchronized (conn) { synchronized (conn) {
if (!isEnded()) { if (!isEnded()) {
pendingQueue().resume(); if (ended) {
if (!pending.resume()) {
doEnd();
}
} else if (pending != null) {
pending.resume();
}
} }
return this; return this;
} }
Expand Down Expand Up @@ -478,7 +484,7 @@ public SocketAddress localAddress() {
@Override @Override
public boolean isEnded() { public boolean isEnded() {
synchronized (conn) { synchronized (conn) {
return ended && (pending == null || pending.isEmpty()); return ended && (pending == null || (!pending.isPaused() && pending.isEmpty()));
} }
} }


Expand Down
Expand Up @@ -151,8 +151,7 @@ public NetSocket pause() {


@Override @Override
public NetSocket resume() { public NetSocket resume() {
doResume(); return fetch(Long.MAX_VALUE);
return this;
} }


@Override @Override
Expand Down
14 changes: 7 additions & 7 deletions src/main/java/io/vertx/core/http/impl/VertxHttp2Stream.java
Expand Up @@ -36,7 +36,7 @@ abstract class VertxHttp2Stream<C extends Http2ConnectionBase> {
protected final ChannelHandlerContext handlerContext; protected final ChannelHandlerContext handlerContext;
protected final Http2Stream stream; protected final Http2Stream stream;


private InboundBuffer<Buffer> pending; private final InboundBuffer<Buffer> pending;
private int pendingBytes; private int pendingBytes;
private MultiMap trailers; private MultiMap trailers;
private boolean writable; private boolean writable;
Expand Down Expand Up @@ -95,7 +95,7 @@ void onEnd() {
void onEnd(MultiMap map) { void onEnd(MultiMap map) {
synchronized (conn) { synchronized (conn) {
trailers = map; trailers = map;
if (pending.isEmpty()) { if (pending.isEmpty() && !pending.isPaused()) {
handleEnd(trailers); handleEnd(trailers);
} }
} }
Expand All @@ -109,12 +109,12 @@ public void doPause() {
pending.pause(); pending.pause();
} }


public void doResume() {
pending.resume();
}

public void doFetch(long amount) { public void doFetch(long amount) {
pending.fetch(amount); if (!pending.fetch(amount)) {
if (trailers != null) {
handleEnd(trailers);
}
}
} }


boolean isNotWritable() { boolean isNotWritable() {
Expand Down
16 changes: 13 additions & 3 deletions src/main/java/io/vertx/core/http/impl/WebSocketImplBase.java
Expand Up @@ -21,6 +21,7 @@
import io.vertx.core.http.WebSocketFrame; import io.vertx.core.http.WebSocketFrame;
import io.vertx.core.http.impl.ws.WebSocketFrameImpl; import io.vertx.core.http.impl.ws.WebSocketFrameImpl;
import io.vertx.core.http.impl.ws.WebSocketFrameInternal; import io.vertx.core.http.impl.ws.WebSocketFrameInternal;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.VertxInternal; import io.vertx.core.impl.VertxInternal;
import io.vertx.core.net.SocketAddress; import io.vertx.core.net.SocketAddress;
import io.vertx.core.streams.impl.InboundBuffer; import io.vertx.core.streams.impl.InboundBuffer;
Expand Down Expand Up @@ -444,7 +445,7 @@ void handleClosed() {
Handler<Void> endHandler; Handler<Void> endHandler;
Handler<Void> closeHandler; Handler<Void> closeHandler;
synchronized (conn) { synchronized (conn) {
endHandler = this.endHandler; endHandler = pending.isPaused() ? null : this.endHandler;
closeHandler = this.closeHandler; closeHandler = this.closeHandler;
closed = true; closed = true;
binaryHandlerRegistration = null; binaryHandlerRegistration = null;
Expand Down Expand Up @@ -548,8 +549,17 @@ public S pause() {


@Override @Override
public S resume() { public S resume() {
if (!isClosed()) { synchronized (this) {
pending.resume(); if (isClosed()) {
Handler<Void> handler = endHandler;
endHandler = null;
if (handler != null) {
ContextInternal ctx = conn.getContext();
ctx.runOnContext(v -> handler.handle(null));
}
} else {
pending.resume();
}
} }
return (S) this; return (S) this;
} }
Expand Down

0 comments on commit 24434e9

Please sign in to comment.