Skip to content

Commit

Permalink
Remove generic connection message encoding/decoding and instead let e…
Browse files Browse the repository at this point in the history
…ach connection handle it when necessary - fixes #2599
  • Loading branch information
vietj committed Aug 23, 2018
1 parent 651ea7c commit d5e4110
Show file tree
Hide file tree
Showing 11 changed files with 72 additions and 142 deletions.
Expand Up @@ -33,17 +33,4 @@ final class DatagramServerHandler extends VertxHandler<DatagramSocketImpl.Connec
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
setConnection(socket.createConnection(ctx));
}

@Override
protected Object decode(Object msg, ByteBufAllocator allocator) throws Exception {
if (msg instanceof DatagramPacket) {
DatagramPacket packet = (DatagramPacket) msg;
ByteBuf content = packet.content();
if (content.isDirect()) {
content = safeBuffer(content, allocator);
}
return new DatagramPacketImpl(packet.sender(), Buffer.buffer(content));
}
return msg;
}
}
Expand Up @@ -11,6 +11,7 @@

package io.vertx.core.datagram.impl;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOption;
Expand All @@ -25,6 +26,7 @@
import io.vertx.core.buffer.Buffer;
import io.vertx.core.datagram.DatagramSocket;
import io.vertx.core.datagram.DatagramSocketOptions;
import io.vertx.core.http.impl.VertxHttpHandler;
import io.vertx.core.impl.Arguments;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.VertxInternal;
Expand Down Expand Up @@ -392,7 +394,14 @@ protected void handleClosed() {
}

public void handleMessage(Object msg) {
handlePacket((io.vertx.core.datagram.DatagramPacket) msg);
if (msg instanceof DatagramPacket) {
DatagramPacket packet = (DatagramPacket) msg;
ByteBuf content = packet.content();
if (content.isDirect()) {
content = VertxHttpHandler.safeBuffer(content, chctx.alloc());
}
handlePacket(new DatagramPacketImpl(packet.sender(), Buffer.buffer(content)));
}
}

void handlePacket(io.vertx.core.datagram.DatagramPacket packet) {
Expand Down
Expand Up @@ -18,6 +18,7 @@
import io.netty.handler.codec.http.*;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.extensions.WebSocketClientExtensionHandler;
import io.netty.handler.codec.http.websocketx.extensions.WebSocketClientExtensionHandshaker;
import io.netty.handler.codec.http.websocketx.extensions.compression.DeflateFrameClientExtensionHandshaker;
Expand Down Expand Up @@ -537,8 +538,8 @@ public void handleMessage(Object msg) {
} else if (msg instanceof HttpObject) {
HttpObject obj = (HttpObject) msg;
handleHttpMessage(obj);
} else if (msg instanceof WebSocketFrameInternal) {
WebSocketFrameInternal frame = (WebSocketFrameInternal) msg;
} else if (msg instanceof WebSocketFrame) {
WebSocketFrameInternal frame = decodeFrame((WebSocketFrame) msg);
switch (frame.type()) {
case BINARY:
case CONTINUATION:
Expand Down Expand Up @@ -568,13 +569,13 @@ public void handleMessage(Object msg) {
}
}

void handleHttpMessage(HttpObject obj) {
private void handleHttpMessage(HttpObject obj) {
if (obj instanceof HttpResponse) {
handleResponseBegin((HttpResponse) obj);
} else if (obj instanceof HttpContent) {
HttpContent chunk = (HttpContent) obj;
if (chunk.content().isReadable()) {
Buffer buff = Buffer.buffer(chunk.content().slice());
Buffer buff = Buffer.buffer(VertxHttpHandler.safeBuffer(chunk.content(), chctx.alloc()));
handleResponseChunk(buff);
}
if (chunk instanceof LastHttpContent) {
Expand Down
37 changes: 25 additions & 12 deletions src/main/java/io/vertx/core/http/impl/Http1xConnectionBase.java
Expand Up @@ -34,7 +34,7 @@
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.net.impl.ConnectionBase;

import static io.vertx.core.http.impl.Http2ConnectionBase.safeBuffer;
import static io.vertx.core.net.impl.VertxHandler.safeBuffer;

/**
* @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
Expand All @@ -45,16 +45,7 @@ abstract class Http1xConnectionBase extends ConnectionBase implements io.vertx.c
super(vertx, chctx, context);
}

@Override
protected Object encode(Object obj) {
if (obj instanceof WebSocketFrameInternal) {
return encodeFrame(obj);
}
return obj;
}

private WebSocketFrame encodeFrame(Object obj) {
WebSocketFrameImpl frame = (WebSocketFrameImpl) obj;
WebSocketFrame encodeFrame(WebSocketFrameImpl frame) {
ByteBuf buf = frame.getBinaryData();
if (buf != Unpooled.EMPTY_BUFFER) {
buf = safeBuffer(buf, chctx.alloc());
Expand All @@ -73,8 +64,30 @@ private WebSocketFrame encodeFrame(Object obj) {
case PING:
return new PingWebSocketFrame(buf);
default:
throw new IllegalStateException("Unsupported websocket msg " + obj);
throw new IllegalStateException("Unsupported websocket msg " + frame);
}
}

WebSocketFrameInternal decodeFrame(WebSocketFrame msg) {
ByteBuf payload = safeBuffer(msg, chctx.alloc());
boolean isFinal = msg.isFinalFragment();
FrameType frameType;
if (msg instanceof BinaryWebSocketFrame) {
frameType = FrameType.BINARY;
} else if (msg instanceof CloseWebSocketFrame) {
frameType = FrameType.CLOSE;
} else if (msg instanceof PingWebSocketFrame) {
frameType = FrameType.PING;
} else if (msg instanceof PongWebSocketFrame) {
frameType = FrameType.PONG;
} else if (msg instanceof TextWebSocketFrame) {
frameType = FrameType.TEXT;
} else if (msg instanceof ContinuationWebSocketFrame) {
frameType = FrameType.CONTINUATION;
} else {
throw new IllegalStateException("Unsupported websocket msg " + msg);
}
return new WebSocketFrameImpl(frameType, payload, isFinal);
}

abstract public void closeWithPayload(ByteBuf byteBuf);
Expand Down
Expand Up @@ -27,7 +27,6 @@
import io.vertx.core.http.HttpConnection;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.http.WebSocketFrame;
import io.vertx.core.http.impl.ws.WebSocketFrameInternal;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.VertxInternal;
Expand Down Expand Up @@ -149,7 +148,7 @@ private void handleContent(Object msg) {
handleError(content);
return;
}
Buffer buffer = Buffer.buffer(content.content());
Buffer buffer = Buffer.buffer(VertxHttpHandler.safeBuffer(content.content(), chctx.alloc()));
if (METRICS_ENABLED) {
reportBytesRead(buffer);
}
Expand Down Expand Up @@ -188,8 +187,8 @@ private void handleNext(HttpServerRequestImpl request) {
}

private void handleOther(Object msg) {
if (msg instanceof WebSocketFrameInternal) {
WebSocketFrameInternal frame = (WebSocketFrameInternal) msg;
if (msg instanceof WebSocketFrame) {
WebSocketFrameInternal frame = decodeFrame((WebSocketFrame) msg);
switch (frame.type()) {
case PING:
// Echo back the content of the PING frame as PONG frame as specified in RFC 6455 Section 5.5.2
Expand Down Expand Up @@ -530,7 +529,7 @@ private long getBytes(Object obj) {
} else if (obj instanceof HttpContent) {
return ((HttpContent) obj).content().readableBytes();
} else if (obj instanceof WebSocketFrame) {
return ((WebSocketFrameInternal) obj).length();
return ((WebSocketFrame) obj).content().readableBytes();
} else if (obj instanceof FileRegion) {
return ((FileRegion) obj).count();
} else if (obj instanceof ChunkedFile) {
Expand Down
Expand Up @@ -11,10 +11,7 @@

package io.vertx.core.http.impl;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.buffer.*;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http2.Http2Connection;
import io.netty.handler.codec.http2.Http2Exception;
Expand Down
52 changes: 0 additions & 52 deletions src/main/java/io/vertx/core/http/impl/VertxHttpHandler.java
Expand Up @@ -11,63 +11,11 @@

package io.vertx.core.http.impl;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufHolder;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http.DefaultHttpContent;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.codec.http.websocketx.*;
import io.vertx.core.http.impl.ws.WebSocketFrameImpl;
import io.vertx.core.net.impl.ConnectionBase;
import io.vertx.core.net.impl.VertxHandler;

/**
* @author <a href="mailto:nmaurer@redhat.com">Norman Maurer</a>
*/
public abstract class VertxHttpHandler<C extends ConnectionBase> extends VertxHandler<C> {

private static ByteBuf safeBuffer(ByteBufHolder holder, ByteBufAllocator allocator) {
return safeBuffer(holder.content(), allocator);
}

@Override
protected Object decode(Object msg, ByteBufAllocator allocator) throws Exception {
if (msg instanceof HttpContent) {
HttpContent content = (HttpContent) msg;
ByteBuf buf = content.content();
if (buf != Unpooled.EMPTY_BUFFER && buf.isDirect()) {
ByteBuf newBuf = safeBuffer(content, allocator);
if (msg instanceof LastHttpContent) {
LastHttpContent last = (LastHttpContent) msg;
return new AssembledLastHttpContent(newBuf, last.trailingHeaders(), last.decoderResult());
} else {
return new DefaultHttpContent(newBuf);
}
}
} else if (msg instanceof WebSocketFrame) {
ByteBuf payload = safeBuffer((WebSocketFrame) msg, allocator);
boolean isFinal = ((WebSocketFrame) msg).isFinalFragment();
FrameType frameType;
if (msg instanceof BinaryWebSocketFrame) {
frameType = FrameType.BINARY;
} else if (msg instanceof CloseWebSocketFrame) {
frameType = FrameType.CLOSE;
} else if (msg instanceof PingWebSocketFrame) {
frameType = FrameType.PING;
} else if (msg instanceof PongWebSocketFrame) {
frameType = FrameType.PONG;
} else if (msg instanceof TextWebSocketFrame) {
frameType = FrameType.TEXT;
} else if (msg instanceof ContinuationWebSocketFrame) {
frameType = FrameType.CONTINUATION;
} else {
throw new IllegalStateException("Unsupported websocket msg " + msg);
}
return new WebSocketFrameImpl(frameType, payload, isFinal);
}
return msg;
}

}
Expand Up @@ -259,7 +259,7 @@ public S writeFrame(WebSocketFrame frame) {
synchronized (conn) {
checkClosed();
conn.reportBytesWritten(((WebSocketFrameInternal)frame).length());
conn.writeToChannel(frame);
conn.writeToChannel(conn.encodeFrame((WebSocketFrameImpl) frame));
}
return (S) this;
}
Expand Down
11 changes: 0 additions & 11 deletions src/main/java/io/vertx/core/net/impl/ConnectionBase.java
Expand Up @@ -78,16 +78,6 @@ public void fail(Throwable error) {
handler().fail(error);
}

/**
* Encode to message before writing to the channel
*
* @param obj the object to encode
* @return the encoded message
*/
protected Object encode(Object obj) {
return obj;
}

public VertxHandler handler() {
return (VertxHandler) chctx.handler();
}
Expand All @@ -108,7 +98,6 @@ protected synchronized final void endReadAndFlush() {
}

private void write(Object msg, ChannelPromise promise) {
msg = encode(msg);
if (read || writeInProgress > 0) {
needsFlush = true;
chctx.write(msg, promise);
Expand Down
60 changes: 26 additions & 34 deletions src/main/java/io/vertx/core/net/impl/VertxHandler.java
Expand Up @@ -11,10 +11,7 @@

package io.vertx.core.net.impl;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.buffer.*;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
Expand All @@ -28,6 +25,30 @@
*/
public abstract class VertxHandler<C extends ConnectionBase> extends ChannelDuplexHandler {

public static ByteBuf safeBuffer(ByteBufHolder holder, ByteBufAllocator allocator) {
return safeBuffer(holder.content(), allocator);
}

public static ByteBuf safeBuffer(ByteBuf buf, ByteBufAllocator allocator) {
if (buf == Unpooled.EMPTY_BUFFER) {
return buf;
}
if (buf.isDirect() || buf instanceof CompositeByteBuf) {
try {
if (buf.isReadable()) {
ByteBuf buffer = allocator.heapBuffer(buf.readableBytes());
buffer.writeBytes(buf);
return buffer;
} else {
return Unpooled.EMPTY_BUFFER;
}
} finally {
buf.release();
}
}
return buf;
}

private static final Handler<Object> NULL_HANDLER = m -> { };

private C conn;
Expand Down Expand Up @@ -87,26 +108,6 @@ public C getConnection() {
return conn;
}

public static ByteBuf safeBuffer(ByteBuf buf, ByteBufAllocator allocator) {
if (buf == Unpooled.EMPTY_BUFFER) {
return buf;
}
if (buf.isDirect() || buf instanceof CompositeByteBuf) {
try {
if (buf.isReadable()) {
ByteBuf buffer = allocator.heapBuffer(buf.readableBytes());
buffer.writeBytes(buf);
return buffer;
} else {
return Unpooled.EMPTY_BUFFER;
}
} finally {
buf.release();
}
}
return buf;
}

@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
C conn = getConnection();
Expand Down Expand Up @@ -152,9 +153,8 @@ public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {

@Override
public void channelRead(ChannelHandlerContext chctx, Object msg) throws Exception {
Object message = decode(msg, chctx.alloc());
ContextInternal ctx = conn.getContext();
ctx.executeFromIO(message, messageHandler);
ctx.executeFromIO(msg, messageHandler);
}

@Override
Expand All @@ -164,12 +164,4 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
}
ctx.fireUserEventTriggered(evt);
}

/**
* Decode the message before passing it to the channel.
*
* @param msg the message to decode
* @return the decoded message
*/
protected abstract Object decode(Object msg, ByteBufAllocator allocator) throws Exception;
}
5 changes: 0 additions & 5 deletions src/main/java/io/vertx/core/net/impl/VertxNetHandler.java
Expand Up @@ -35,9 +35,4 @@ public VertxNetHandler(NetSocketImpl conn) {
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
setConnection(connectionFactory.apply(ctx));
}

@Override
protected Object decode(Object msg, ByteBufAllocator allocator) throws Exception {
return msg;
}
}

0 comments on commit d5e4110

Please sign in to comment.