Skip to content

Commit

Permalink
Synchronize request/response/connection on connection
Browse files Browse the repository at this point in the history
  • Loading branch information
vietj committed Mar 25, 2016
1 parent 5ce3cd6 commit a2dac30
Show file tree
Hide file tree
Showing 13 changed files with 669 additions and 506 deletions.
5 changes: 5 additions & 0 deletions src/main/java/io/vertx/core/http/impl/ClientConnection.java
Expand Up @@ -265,6 +265,11 @@ int getOutstandingRequestCount() {
return requests.size(); return requests.size();
} }


@Override
public void checkDrained() {
handleInterestedOpsChanged();
}

@Override @Override
public synchronized void handleInterestedOpsChanged() { public synchronized void handleInterestedOpsChanged() {
if (!isNotWritable()) { if (!isNotWritable()) {
Expand Down
28 changes: 18 additions & 10 deletions src/main/java/io/vertx/core/http/impl/Http2ClientConnection.java
Expand Up @@ -80,7 +80,7 @@ void onStreamClosed(Http2Stream nettyStream) {
http2Pool.recycle(Http2ClientConnection.this); http2Pool.recycle(Http2ClientConnection.this);
} }


HttpClientStream createStream() { synchronized HttpClientStream createStream() {
try { try {
Http2Connection conn = handler.connection(); Http2Connection conn = handler.connection();
Http2Stream stream = conn.local().createStream(conn.local().incrementAndGetNextStreamId(), false); Http2Stream stream = conn.local().createStream(conn.local().incrementAndGetNextStreamId(), false);
Expand All @@ -104,7 +104,7 @@ public boolean isValid() {
} }


@Override @Override
public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding, boolean endOfStream) throws Http2Exception { public synchronized void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding, boolean endOfStream) throws Http2Exception {
Http2ClientStream stream = (Http2ClientStream) streams.get(streamId); Http2ClientStream stream = (Http2ClientStream) streams.get(streamId);
if (stream != null) { if (stream != null) {
context.executeFromIO(() -> { context.executeFromIO(() -> {
Expand All @@ -114,7 +114,7 @@ public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers
} }


@Override @Override
public void onPushPromiseRead(ChannelHandlerContext ctx, int streamId, int promisedStreamId, Http2Headers headers, int padding) throws Http2Exception { public synchronized void onPushPromiseRead(ChannelHandlerContext ctx, int streamId, int promisedStreamId, Http2Headers headers, int padding) throws Http2Exception {
Http2ClientStream stream = (Http2ClientStream) streams.get(streamId); Http2ClientStream stream = (Http2ClientStream) streams.get(streamId);
if (stream != null) { if (stream != null) {
context.executeFromIO(() -> { context.executeFromIO(() -> {
Expand Down Expand Up @@ -175,13 +175,14 @@ void handleData(Buffer buf) {


@Override @Override
void handleReset(long errorCode) { void handleReset(long errorCode) {
if (!responseEnded) { if (responseEnded) {
responseEnded = true; return;
if (conn.metrics.isEnabled()) { }
conn.metrics.requestReset(request.metric()); responseEnded = true;
} if (conn.metrics.isEnabled()) {
handleException(new StreamResetException(errorCode)); conn.metrics.requestReset(request.metric());
} }
handleException(new StreamResetException(errorCode));
} }


@Override @Override
Expand All @@ -196,7 +197,14 @@ void handleClose() {
} }


@Override @Override
public void handleInterestedOpsChanged() { public void checkDrained() {
synchronized (conn) {
handleInterestedOpsChanged();
}
}

@Override
void handleInterestedOpsChanged() {
if (request instanceof HttpClientRequestImpl && !isNotWritable()) { if (request instanceof HttpClientRequestImpl && !isNotWritable()) {
if (!isNotWritable()) { if (!isNotWritable()) {
((HttpClientRequestImpl) request).handleDrained(); ((HttpClientRequestImpl) request).handleDrained();
Expand Down
87 changes: 46 additions & 41 deletions src/main/java/io/vertx/core/http/impl/Http2ConnectionBase.java
Expand Up @@ -99,41 +99,44 @@ public ContextImpl getContext() {


@Override @Override
protected void handleInterestedOpsChanged() { protected void handleInterestedOpsChanged() {
// Handled by HTTP/2
} }


boolean isSsl() { boolean isSsl() {
return channel.pipeline().get(SslHandler.class) != null; return channel.pipeline().get(SslHandler.class) != null;
} }


void onConnectionError(Throwable cause) { synchronized void onConnectionError(Throwable cause) {
for (VertxHttp2Stream stream : streams.values()) { synchronized (this) {
context.executeFromIO(() -> { for (VertxHttp2Stream stream : streams.values()) {
stream.handleException(cause); context.runOnContext(v -> {
}); synchronized (Http2ConnectionBase.this) {
} stream.handleException(cause);
Handler<Throwable> handler = exceptionHandler; }
if (handler != null) { });
context.executeFromIO(() -> { }
Handler<Throwable> handler = exceptionHandler;
if (handler != null) {
handler.handle(cause); handler.handle(cause);
}); }
} }
} }


void onStreamError(int streamId, Throwable cause) { synchronized void onStreamError(int streamId, Throwable cause) {
VertxHttp2Stream stream = streams.get(streamId); VertxHttp2Stream stream = streams.get(streamId);
if (stream != null) { if (stream != null) {
stream.handleException(cause); stream.handleException(cause);
} }
} }


void onStreamwritabilityChanged(Http2Stream s) { synchronized void onStreamwritabilityChanged(Http2Stream s) {
VertxHttp2Stream stream = streams.get(s.id()); VertxHttp2Stream stream = streams.get(s.id());
if (stream != null) { if (stream != null) {
context.executeFromIO(stream::onWritabilityChanged); context.executeFromIO(stream::onWritabilityChanged);
} }
} }


void onStreamClosed(Http2Stream stream) { synchronized void onStreamClosed(Http2Stream stream) {
checkShutdownHandler(); checkShutdownHandler();
VertxHttp2Stream removed = streams.remove(stream.id()); VertxHttp2Stream removed = streams.remove(stream.id());
if (removed != null) { if (removed != null) {
Expand All @@ -146,7 +149,7 @@ void onStreamClosed(Http2Stream stream) {
void onGoAwaySent(int lastStreamId, long errorCode, ByteBuf debugData) { void onGoAwaySent(int lastStreamId, long errorCode, ByteBuf debugData) {
} }


void onGoAwayReceived(int lastStreamId, long errorCode, ByteBuf debugData) { synchronized void onGoAwayReceived(int lastStreamId, long errorCode, ByteBuf debugData) {
Handler<GoAway> handler = goAwayHandler; Handler<GoAway> handler = goAwayHandler;
if (handler != null) { if (handler != null) {
Buffer buffer = Buffer.buffer(debugData); Buffer buffer = Buffer.buffer(debugData);
Expand All @@ -170,16 +173,16 @@ public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers
} }


@Override @Override
public void onSettingsAckRead(ChannelHandlerContext ctx) { public synchronized void onSettingsAckRead(ChannelHandlerContext ctx) {
Runnable handler = updateSettingsHandler.poll(); Runnable handler = updateSettingsHandler.poll();
if (handler != null) { if (handler != null) {
// No need to run on a particular context it shall be done by the handler already // No need to run on a particular context it shall be done by the handler instead
handler.run(); handler.run();
} }
} }


@Override @Override
public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) { public synchronized void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) {
Handler<io.vertx.core.http.Http2Settings> handler = clientSettingsHandler; Handler<io.vertx.core.http.Http2Settings> handler = clientSettingsHandler;
if (handler != null) { if (handler != null) {
context.executeFromIO(() -> { context.executeFromIO(() -> {
Expand Down Expand Up @@ -210,7 +213,7 @@ public void onWindowUpdateRead(ChannelHandlerContext ctx, int streamId, int wind
} }


@Override @Override
public void onUnknownFrame(ChannelHandlerContext ctx, byte frameType, int streamId, public synchronized void onUnknownFrame(ChannelHandlerContext ctx, byte frameType, int streamId,
Http2Flags flags, ByteBuf payload) { Http2Flags flags, ByteBuf payload) {
VertxHttp2Stream req = streams.get(streamId); VertxHttp2Stream req = streams.get(streamId);
if (req != null) { if (req != null) {
Expand All @@ -222,7 +225,7 @@ public void onUnknownFrame(ChannelHandlerContext ctx, byte frameType, int stream
} }


@Override @Override
public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode) { public synchronized void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode) {
VertxHttp2Stream req = streams.get(streamId); VertxHttp2Stream req = streams.get(streamId);
if (req != null) { if (req != null) {
context.executeFromIO(() -> { context.executeFromIO(() -> {
Expand All @@ -232,13 +235,13 @@ public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorC
} }


@Override @Override
public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endOfStream) { public synchronized int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endOfStream) {
int[] consumed = { padding }; int[] consumed = { padding };
VertxHttp2Stream req = streams.get(streamId); VertxHttp2Stream req = streams.get(streamId);
if (req != null) { if (req != null) {
Buffer buff = Buffer.buffer(data.copy()); Buffer buff = Buffer.buffer(data.copy());
context.executeFromIO(() -> { context.executeFromIO(() -> {
int len = data.readableBytes(); int len = buff.length();
if (req.onDataRead(buff)) { if (req.onDataRead(buff)) {
consumed[0] += len; consumed[0] += len;
} }
Expand All @@ -251,7 +254,7 @@ public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int
} }


@Override @Override
public HttpConnection goAway(long errorCode, int lastStreamId, Buffer debugData) { public synchronized HttpConnection goAway(long errorCode, int lastStreamId, Buffer debugData) {
if (errorCode < 0) { if (errorCode < 0) {
throw new IllegalArgumentException(); throw new IllegalArgumentException();
} }
Expand All @@ -263,13 +266,13 @@ public HttpConnection goAway(long errorCode, int lastStreamId, Buffer debugData)
} }


@Override @Override
public HttpConnection goAwayHandler(Handler<GoAway> handler) { public synchronized HttpConnection goAwayHandler(Handler<GoAway> handler) {
goAwayHandler = handler; goAwayHandler = handler;
return this; return this;
} }


@Override @Override
public HttpConnection shutdownHandler(Handler<Void> handler) { public synchronized HttpConnection shutdownHandler(Handler<Void> handler) {
shutdownHandler = handler; shutdownHandler = handler;
return this; return this;
} }
Expand All @@ -287,7 +290,7 @@ public HttpConnection shutdown() {
return shutdown(null); return shutdown(null);
} }


private HttpConnection shutdown(Long timeout) { private synchronized HttpConnection shutdown(Long timeout) {
if (!shuttingdown) { if (!shuttingdown) {
shuttingdown = true; shuttingdown = true;
if (timeout != null) { if (timeout != null) {
Expand All @@ -299,7 +302,7 @@ private HttpConnection shutdown(Long timeout) {
} }


@Override @Override
public HttpConnection closeHandler(Handler<Void> handler) { public synchronized HttpConnection closeHandler(Handler<Void> handler) {
closeHandler = handler; closeHandler = handler;
return this; return this;
} }
Expand All @@ -310,18 +313,18 @@ public void close() {
} }


@Override @Override
public HttpConnection remoteSettingsHandler(Handler<io.vertx.core.http.Http2Settings> handler) { public synchronized HttpConnection remoteSettingsHandler(Handler<io.vertx.core.http.Http2Settings> handler) {
clientSettingsHandler = handler; clientSettingsHandler = handler;
return this; return this;
} }


@Override @Override
public Handler<io.vertx.core.http.Http2Settings> remoteSettingsHandler() { public synchronized Handler<io.vertx.core.http.Http2Settings> remoteSettingsHandler() {
return clientSettingsHandler; return clientSettingsHandler;
} }


@Override @Override
public io.vertx.core.http.Http2Settings remoteSettings() { public synchronized io.vertx.core.http.Http2Settings remoteSettings() {
io.vertx.core.http.Http2Settings a = new io.vertx.core.http.Http2Settings(); io.vertx.core.http.Http2Settings a = new io.vertx.core.http.Http2Settings();
a.setPushEnabled(handler.connection().remote().allowPushTo()); a.setPushEnabled(handler.connection().remote().allowPushTo());
a.setMaxConcurrentStreams((long) handler.connection().local().maxActiveStreams()); a.setMaxConcurrentStreams((long) handler.connection().local().maxActiveStreams());
Expand All @@ -333,7 +336,7 @@ public io.vertx.core.http.Http2Settings remoteSettings() {
} }


@Override @Override
public io.vertx.core.http.Http2Settings settings() { public synchronized io.vertx.core.http.Http2Settings settings() {
return HttpUtils.toVertxSettings(serverSettings); return HttpUtils.toVertxSettings(serverSettings);
} }


Expand All @@ -349,7 +352,7 @@ public HttpConnection updateSettings(io.vertx.core.http.Http2Settings settings,
return this; return this;
} }


protected void updateSettings(Http2Settings settingsUpdate, Handler<AsyncResult<Void>> completionHandler) { protected synchronized void updateSettings(Http2Settings settingsUpdate, Handler<AsyncResult<Void>> completionHandler) {
Context completionContext = completionHandler != null ? context.owner().getOrCreateContext() : null; Context completionContext = completionHandler != null ? context.owner().getOrCreateContext() : null;
Http2Settings current = handler.decoder().localSettings(); Http2Settings current = handler.decoder().localSettings();
for (Map.Entry<Character, Long> entry : current.entrySet()) { for (Map.Entry<Character, Long> entry : current.entrySet()) {
Expand All @@ -360,14 +363,16 @@ protected void updateSettings(Http2Settings settingsUpdate, Handler<AsyncResult<
} }
handler.writeSettings(settingsUpdate).addListener(fut -> { handler.writeSettings(settingsUpdate).addListener(fut -> {
if (fut.isSuccess()) { if (fut.isSuccess()) {
updateSettingsHandler.add(() -> { synchronized (Http2ConnectionBase.this) {
serverSettings.putAll(settingsUpdate); updateSettingsHandler.add(() -> {
if (completionHandler != null) { serverSettings.putAll(settingsUpdate);
completionContext.runOnContext(v -> { if (completionHandler != null) {
completionHandler.handle(Future.succeededFuture()); completionContext.runOnContext(v -> {
}); completionHandler.handle(Future.succeededFuture());
} });
}); }
});
}
} else { } else {
if (completionHandler != null) { if (completionHandler != null) {
completionContext.runOnContext(v -> { completionContext.runOnContext(v -> {
Expand All @@ -379,13 +384,13 @@ protected void updateSettings(Http2Settings settingsUpdate, Handler<AsyncResult<
} }


@Override @Override
public HttpConnection exceptionHandler(Handler<Throwable> handler) { public synchronized HttpConnection exceptionHandler(Handler<Throwable> handler) {
exceptionHandler = handler; exceptionHandler = handler;
return this; return this;
} }


@Override @Override
public Handler<Throwable> exceptionHandler() { public synchronized Handler<Throwable> exceptionHandler() {
return exceptionHandler; return exceptionHandler;
} }


Expand Down

0 comments on commit a2dac30

Please sign in to comment.