Skip to content

Commit

Permalink
Remove the ContextTask since we can use Runnable in practice instead
Browse files Browse the repository at this point in the history
  • Loading branch information
vietj committed Apr 5, 2018
1 parent ee7d694 commit e82072c
Show file tree
Hide file tree
Showing 15 changed files with 16 additions and 39 deletions.
Expand Up @@ -35,7 +35,7 @@ public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
} }


@Override @Override
protected void handleMessage(final DatagramSocketImpl.Connection server, final Object msg) throws Exception { protected void handleMessage(final DatagramSocketImpl.Connection server, final Object msg) {
server.handlePacket((io.vertx.core.datagram.DatagramPacket) msg); server.handlePacket((io.vertx.core.datagram.DatagramPacket) msg);
} }


Expand Down
Expand Up @@ -321,7 +321,7 @@ public void channelRead(ChannelHandlerContext chctx, Object msg) throws Exceptio
super.channelRead(chctx, msg); super.channelRead(chctx, msg);
} }
@Override @Override
protected void handleMessage(NetSocketImpl connection, Object msg) throws Exception { protected void handleMessage(NetSocketImpl connection, Object msg) {
ByteBuf buf = (ByteBuf) msg; ByteBuf buf = (ByteBuf) msg;
connection.handleMessageReceived(buf); connection.handleMessageReceived(buf);
} }
Expand Down
Expand Up @@ -89,7 +89,7 @@ public void channelInactive(ChannelHandlerContext chctx) throws Exception {
} }


@Override @Override
protected void handleMessage(Http1xClientConnection conn, Object msg) throws Exception { protected void handleMessage(Http1xClientConnection conn, Object msg) {
if (msg instanceof HttpObject) { if (msg instanceof HttpObject) {
HttpObject obj = (HttpObject) msg; HttpObject obj = (HttpObject) msg;
DecoderResult result = obj.decoderResult(); DecoderResult result = obj.decoderResult();
Expand Down
Expand Up @@ -267,7 +267,7 @@ public void channelRead(ChannelHandlerContext chctx, Object msg) throws Exceptio
} }


@Override @Override
protected void handleMessage(NetSocketImpl connection, Object msg) throws Exception { protected void handleMessage(NetSocketImpl connection, Object msg) {
ByteBuf buf = (ByteBuf) msg; ByteBuf buf = (ByteBuf) msg;
connection.handleMessageReceived(buf); connection.handleMessageReceived(buf);
} }
Expand Down
Expand Up @@ -75,7 +75,7 @@ public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
} }


@Override @Override
protected void handleMessage(Http1xServerConnection conn, Object msg) throws Exception { protected void handleMessage(Http1xServerConnection conn, Object msg) {
conn.handleMessage(msg); conn.handleMessage(msg);
} }


Expand Down
Expand Up @@ -151,7 +151,7 @@ static class Http2ClientStream extends VertxHttp2Stream<Http2ClientConnection> i
private boolean requestEnded; private boolean requestEnded;
private boolean responseEnded; private boolean responseEnded;


public Http2ClientStream(Http2ClientConnection conn, HttpClientRequestBase request, Http2Stream stream, boolean writable) throws Http2Exception { public Http2ClientStream(Http2ClientConnection conn, HttpClientRequestBase request, Http2Stream stream, boolean writable) {
super(conn, stream, writable); super(conn, stream, writable);
this.request = request; this.request = request;
} }
Expand Down
Expand Up @@ -11,7 +11,6 @@


package io.vertx.core.http.impl; package io.vertx.core.http.impl;


import io.netty.handler.codec.http2.Http2Exception;
import io.netty.handler.codec.http2.Http2Stream; import io.netty.handler.codec.http2.Http2Stream;
import io.vertx.codegen.annotations.Nullable; import io.vertx.codegen.annotations.Nullable;
import io.vertx.core.Handler; import io.vertx.core.Handler;
Expand Down Expand Up @@ -44,7 +43,7 @@ public HttpClientRequestPushPromise(
String uri, String uri,
String host, String host,
int port, int port,
MultiMap headers) throws Http2Exception { MultiMap headers) {
super(client, ssl, method, host, port, uri); super(client, ssl, method, host, port, uri);
this.conn = conn; this.conn = conn;
this.stream = new Http2ClientConnection.Http2ClientStream(conn, this, stream, false); this.stream = new Http2ClientConnection.Http2ClientStream(conn, this, stream, false);
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/vertx/core/http/impl/HttpServerImpl.java
Expand Up @@ -645,7 +645,7 @@ public ServerHandlerWithWebSockets(SSLHelper sslHelper, HttpServerOptions option
} }


@Override @Override
protected void handleMessage(Http1xServerConnection conn, Object msg) throws Exception { protected void handleMessage(Http1xServerConnection conn, Object msg) {
Channel ch = conn.channel(); Channel ch = conn.channel();
if (msg instanceof HttpRequest) { if (msg instanceof HttpRequest) {
final HttpRequest request = (HttpRequest) msg; final HttpRequest request = (HttpRequest) msg;
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/io/vertx/core/impl/ContextImpl.java
Expand Up @@ -174,7 +174,7 @@ static boolean isOnVertxThread(boolean worker) {
// This is called to execute code where the origin is IO (from Netty probably). // This is called to execute code where the origin is IO (from Netty probably).
// In such a case we should already be on an event loop thread (as Netty manages the event loops) // In such a case we should already be on an event loop thread (as Netty manages the event loops)
// but check this anyway, then execute directly // but check this anyway, then execute directly
public void executeFromIO(ContextTask task) { public void executeFromIO(Runnable task) {
if (THREAD_CHECKS) { if (THREAD_CHECKS) {
checkCorrectThread(); checkCorrectThread();
} }
Expand Down Expand Up @@ -298,7 +298,7 @@ public synchronized ConcurrentMap<Object, Object> contextData() {
return contextData; return contextData;
} }


protected Runnable wrapTask(ContextTask cTask, Handler<Void> hTask, boolean checkThread, PoolMetrics metrics) { protected Runnable wrapTask(Runnable cTask, Handler<Void> hTask, boolean checkThread, PoolMetrics metrics) {
Object metric = metrics != null ? metrics.submitted() : null; Object metric = metrics != null ? metrics.submitted() : null;
return () -> { return () -> {
Thread th = Thread.currentThread(); Thread th = Thread.currentThread();
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/vertx/core/impl/ContextInternal.java
Expand Up @@ -79,7 +79,7 @@ static boolean isOnVertxThread() {
* *
* @param task the task to execute * @param task the task to execute
*/ */
void executeFromIO(ContextTask task); void executeFromIO(Runnable task);


/** /**
* @return the {@link ConcurrentMap} used to store context data * @return the {@link ConcurrentMap} used to store context data
Expand Down
21 changes: 0 additions & 21 deletions src/main/java/io/vertx/core/impl/ContextTask.java

This file was deleted.

2 changes: 1 addition & 1 deletion src/main/java/io/vertx/core/impl/WorkerContext.java
Expand Up @@ -47,7 +47,7 @@ protected void checkCorrectThread() {
// In the case of a worker context, the IO will always be provided on an event loop thread, not a worker thread // In the case of a worker context, the IO will always be provided on an event loop thread, not a worker thread
// so we need to execute it on the worker thread // so we need to execute it on the worker thread
@Override @Override
public void executeFromIO(ContextTask task) { public void executeFromIO(Runnable task) {
orderedTasks.execute(wrapTask(task, null, true, workerPool.metrics()), workerPool.executor()); orderedTasks.execute(wrapTask(task, null, true, workerPool.metrics()), workerPool.executor());
} }


Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/vertx/core/net/impl/NetClientImpl.java
Expand Up @@ -235,7 +235,7 @@ private void connected(ContextInternal context, Channel ch, Handler<AsyncResult<


VertxNetHandler handler = new VertxNetHandler(ctx -> new NetSocketImpl(vertx, ctx, remoteAddress, context, sslHelper, metrics)) { VertxNetHandler handler = new VertxNetHandler(ctx -> new NetSocketImpl(vertx, ctx, remoteAddress, context, sslHelper, metrics)) {
@Override @Override
protected void handleMessage(NetSocketImpl connection, Object msg) throws Exception { protected void handleMessage(NetSocketImpl connection, Object msg) {
connection.handleMessageReceived(msg);; connection.handleMessageReceived(msg);;
} }
}; };
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/vertx/core/net/impl/NetServerImpl.java
Expand Up @@ -436,7 +436,7 @@ private void connected(HandlerHolder<Handlers> handler, Channel ch) {


VertxNetHandler nh = new VertxNetHandler(ctx -> new NetSocketImpl(vertx, ctx, handler.context, sslHelper, metrics)) { VertxNetHandler nh = new VertxNetHandler(ctx -> new NetSocketImpl(vertx, ctx, handler.context, sslHelper, metrics)) {
@Override @Override
protected void handleMessage(NetSocketImpl connection, Object msg) throws Exception { protected void handleMessage(NetSocketImpl connection, Object msg) {
connection.handleMessageReceived(msg); connection.handleMessageReceived(msg);
} }
}; };
Expand Down
5 changes: 2 additions & 3 deletions src/main/java/io/vertx/core/net/impl/VertxHandler.java
Expand Up @@ -22,15 +22,14 @@
import io.netty.handler.timeout.IdleStateEvent; import io.netty.handler.timeout.IdleStateEvent;
import io.vertx.core.Handler; import io.vertx.core.Handler;
import io.vertx.core.impl.ContextInternal; import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.ContextTask;


/** /**
* @author <a href="mailto:nmaurer@redhat.com">Norman Maurer</a> * @author <a href="mailto:nmaurer@redhat.com">Norman Maurer</a>
*/ */
public abstract class VertxHandler<C extends ConnectionBase> extends ChannelDuplexHandler { public abstract class VertxHandler<C extends ConnectionBase> extends ChannelDuplexHandler {


private C conn; private C conn;
private ContextTask endReadAndFlush; private Runnable endReadAndFlush;
private Handler<C> addHandler; private Handler<C> addHandler;
private Handler<C> removeHandler; private Handler<C> removeHandler;


Expand Down Expand Up @@ -155,7 +154,7 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
ctx.fireUserEventTriggered(evt); ctx.fireUserEventTriggered(evt);
} }


protected abstract void handleMessage(C connection, Object msg) throws Exception; protected abstract void handleMessage(C connection, Object msg);


/** /**
* Decode the message before passing it to the channel * Decode the message before passing it to the channel
Expand Down

0 comments on commit e82072c

Please sign in to comment.