Skip to content

Commit

Permalink
Make Http1xPool and ClientHandler not anymore inner classes of Connec…
Browse files Browse the repository at this point in the history
…tionManager like Http2Pool
  • Loading branch information
vietj committed Apr 6, 2016
1 parent ed26767 commit 6df887e
Show file tree
Hide file tree
Showing 4 changed files with 276 additions and 212 deletions.
9 changes: 4 additions & 5 deletions src/main/java/io/vertx/core/http/impl/ClientConnection.java
Expand Up @@ -31,7 +31,6 @@
import io.vertx.core.http.HttpVersion; import io.vertx.core.http.HttpVersion;
import io.vertx.core.http.impl.ws.WebSocketFrameInternal; import io.vertx.core.http.impl.ws.WebSocketFrameInternal;
import io.vertx.core.impl.ContextImpl; import io.vertx.core.impl.ContextImpl;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.logging.Logger; import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory; import io.vertx.core.logging.LoggerFactory;
import io.vertx.core.net.NetSocket; import io.vertx.core.net.NetSocket;
Expand Down Expand Up @@ -71,7 +70,7 @@ class ClientConnection extends ConnectionBase implements HttpClientConnection, H
private final boolean ssl; private final boolean ssl;
private final String host; private final String host;
private final int port; private final int port;
private final ConnectionManager.Http1xPool pool; private final Http1xPool pool;
// Requests can be pipelined so we need a queue to keep track of requests // Requests can be pipelined so we need a queue to keep track of requests
private final Queue<HttpClientRequestImpl> requests = new ArrayDeque<>(); private final Queue<HttpClientRequestImpl> requests = new ArrayDeque<>();
private final Handler<Throwable> exceptionHandler; private final Handler<Throwable> exceptionHandler;
Expand All @@ -88,9 +87,9 @@ class ClientConnection extends ConnectionBase implements HttpClientConnection, H
private boolean paused; private boolean paused;
private Buffer pausedChunk; private Buffer pausedChunk;


ClientConnection(HttpVersion version, VertxInternal vertx, HttpClientImpl client, Handler<Throwable> exceptionHandler, Channel channel, boolean ssl, String host, ClientConnection(HttpVersion version, HttpClientImpl client, Handler<Throwable> exceptionHandler, Channel channel, boolean ssl, String host,
int port, ContextImpl context, ConnectionManager.Http1xPool pool, HttpClientMetrics metrics) { int port, ContextImpl context, Http1xPool pool, HttpClientMetrics metrics) {
super(vertx, channel, context, metrics); super(client.getVertx(), channel, context, metrics);
this.client = client; this.client = client;
this.ssl = ssl; this.ssl = ssl;
this.host = host; this.host = host;
Expand Down
102 changes: 102 additions & 0 deletions src/main/java/io/vertx/core/http/impl/ClientHandler.java
@@ -0,0 +1,102 @@
/*
* Copyright (c) 2011-2013 The original author or authors
* ------------------------------------------------------
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Apache License v2.0 which accompanies this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* The Apache License v2.0 is available at
* http://www.opensource.org/licenses/apache2.0.php
*
* You may elect to redistribute this code under either of these licenses.
*/

package io.vertx.core.http.impl;

import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.LastHttpContent;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.impl.ws.WebSocketFrameImpl;
import io.vertx.core.http.impl.ws.WebSocketFrameInternal;
import io.vertx.core.impl.ContextImpl;

import java.util.Map;

/**
* @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
*/
class ClientHandler extends VertxHttpHandler<ClientConnection> {
private boolean closeFrameSent;
private ContextImpl context;

public ClientHandler(ContextImpl context, Map<Channel, ClientConnection> connectionMap) {
super(connectionMap);
this.context = context;
}

@Override
protected ContextImpl getContext(ClientConnection connection) {
return context;
}

@Override
protected void doMessageReceived(ClientConnection conn, ChannelHandlerContext ctx, Object msg) {
if (conn == null) {
return;
}
boolean valid = false;
if (msg instanceof HttpResponse) {
HttpResponse response = (HttpResponse) msg;
conn.handleResponse(response);
valid = true;
}
if (msg instanceof HttpContent) {
HttpContent chunk = (HttpContent) msg;
if (chunk.content().isReadable()) {
Buffer buff = Buffer.buffer(chunk.content().slice());
conn.handleResponseChunk(buff);
}
if (chunk instanceof LastHttpContent) {
conn.handleResponseEnd((LastHttpContent) chunk);
}
valid = true;
} else if (msg instanceof WebSocketFrameInternal) {
WebSocketFrameInternal frame = (WebSocketFrameInternal) msg;
switch (frame.type()) {
case BINARY:
case CONTINUATION:
case TEXT:
conn.handleWsFrame(frame);
break;
case PING:
// Echo back the content of the PING frame as PONG frame as specified in RFC 6455 Section 5.5.2
ctx.writeAndFlush(new WebSocketFrameImpl(FrameType.PONG, frame.getBinaryData()));
break;
case PONG:
// Just ignore it
break;
case CLOSE:
if (!closeFrameSent) {
// Echo back close frame and close the connection once it was written.
// This is specified in the WebSockets RFC 6455 Section 5.4.1
ctx.writeAndFlush(frame).addListener(ChannelFutureListener.CLOSE);
closeFrameSent = true;
}
break;
default:
throw new IllegalStateException("Invalid type: " + frame.type());
}
valid = true;
}
if (!valid) {
throw new IllegalStateException("Invalid object " + msg);
}
}
}
209 changes: 2 additions & 207 deletions src/main/java/io/vertx/core/http/impl/ConnectionManager.java
Expand Up @@ -19,7 +19,6 @@
import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelInitializer;
Expand All @@ -29,24 +28,17 @@
import io.netty.handler.codec.http.DefaultFullHttpRequest; import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.HttpClientCodec; import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpClientUpgradeHandler; import io.netty.handler.codec.http.HttpClientUpgradeHandler;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpContentDecompressor; import io.netty.handler.codec.http.HttpContentDecompressor;
import io.netty.handler.codec.http.HttpMethod; import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.codec.http2.Http2Exception; import io.netty.handler.codec.http2.Http2Exception;
import io.netty.handler.ssl.ApplicationProtocolNames; import io.netty.handler.ssl.ApplicationProtocolNames;
import io.netty.handler.ssl.ApplicationProtocolNegotiationHandler; import io.netty.handler.ssl.ApplicationProtocolNegotiationHandler;
import io.netty.handler.ssl.SslHandler; import io.netty.handler.ssl.SslHandler;
import io.netty.handler.timeout.IdleStateHandler; import io.netty.handler.timeout.IdleStateHandler;
import io.vertx.core.Context;
import io.vertx.core.Handler; import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.ConnectionPoolTooBusyException; import io.vertx.core.http.ConnectionPoolTooBusyException;
import io.vertx.core.http.HttpClientOptions; import io.vertx.core.http.HttpClientOptions;
import io.vertx.core.http.HttpVersion; import io.vertx.core.http.HttpVersion;
import io.vertx.core.http.impl.ws.WebSocketFrameImpl;
import io.vertx.core.http.impl.ws.WebSocketFrameInternal;
import io.vertx.core.impl.ContextImpl; import io.vertx.core.impl.ContextImpl;
import io.vertx.core.impl.VertxInternal; import io.vertx.core.impl.VertxInternal;
import io.vertx.core.logging.Logger; import io.vertx.core.logging.Logger;
Expand All @@ -57,10 +49,8 @@
import javax.net.ssl.SSLHandshakeException; import javax.net.ssl.SSLHandshakeException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.ArrayDeque; import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Map; import java.util.Map;
import java.util.Queue; import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;


/** /**
Expand Down Expand Up @@ -201,7 +191,7 @@ public class ConnQueue {
if (version == HttpVersion.HTTP_2) { if (version == HttpVersion.HTTP_2) {
pool = new Http2Pool(this, client, mgr.connectionMap); pool = new Http2Pool(this, client, mgr.connectionMap);
} else { } else {
pool = new Http1xPool(this, version); pool = new Http1xPool(client, options, this, mgr.connectionMap, version);
} }
} }


Expand Down Expand Up @@ -380,7 +370,7 @@ private void handshakeFailure(ContextImpl context, Channel ch, Throwable cause,
private void fallbackToHttp1x(Channel ch, ContextImpl context, HttpVersion fallbackVersion, int port, String host, Waiter waiter) { private void fallbackToHttp1x(Channel ch, ContextImpl context, HttpVersion fallbackVersion, int port, String host, Waiter waiter) {
// change the pool to Http1xPool // change the pool to Http1xPool
synchronized (this) { synchronized (this) {
pool = new Http1xPool(this, fallbackVersion); pool = new Http1xPool(client, options, this, mgr.connectionMap, fallbackVersion);
} }
applyHttp1xConnectionOptions(ch.pipeline(), context); applyHttp1xConnectionOptions(ch.pipeline(), context);
http1xConnected(fallbackVersion, context, port, host, ch, waiter); http1xConnected(fallbackVersion, context, port, host, ch, waiter);
Expand Down Expand Up @@ -504,199 +494,4 @@ void deliverStream(C conn, Waiter waiter) {
} }
} }
} }

public class Http1xPool extends Pool<ClientConnection> {

private final HttpVersion version;
private final Set<ClientConnection> allConnections = new HashSet<>();
private final Queue<ClientConnection> availableConnections = new ArrayDeque<>();

public Http1xPool(ConnQueue queue, HttpVersion version) {
super(queue, client.getOptions().getMaxPoolSize());
this.version = version;
}

@Override
HttpVersion version() {
// Correct this
return version;
}

public boolean getConnection(Waiter waiter) {
ClientConnection conn = availableConnections.poll();
if (conn != null && conn.isValid()) {
ContextImpl context = waiter.context;
if (context == null) {
context = conn.getContext();
} else if (context != conn.getContext()) {
log.warn("Reusing a connection with a different context: an HttpClient is probably shared between different Verticles");
}
context.runOnContext(v -> deliverStream(conn, waiter));
return true;
} else {
return false;
}
}

@Override
HttpClientStream createStream(ClientConnection conn) {
return conn;
}

// Called when the request has ended
void recycle(ClientConnection conn) {
synchronized (queue) {
if (pipelining) {
// Maybe the connection can be reused
Waiter waiter = queue.getNextWaiter();
if (waiter != null) {
Context context = waiter.context;
if (context == null) {
context = conn.getContext();
}
context.runOnContext(v -> deliverStream(conn, waiter));
}
}
}
}

// Called when the response has ended
public synchronized void responseEnded(ClientConnection conn, boolean close) {
synchronized (queue) {
if ((pipelining || keepAlive) && !close) {
if (conn.getCurrentRequest() == null) {
Waiter waiter = queue.getNextWaiter();
if (waiter != null) {
Context context = waiter.context;
if (context == null) {
context = conn.getContext();
}
context.runOnContext(v -> deliverStream(conn, waiter));
} else if (conn.getOutstandingRequestCount() == 0) {
// Return to set of available from here to not return it several times
availableConnections.add(conn);
}
}
} else {
// Close it now
conn.close();
}
}
}

private void createConn(HttpVersion version, ContextImpl context, int port, String host, Channel ch, Waiter waiter) {
ClientConnection conn = new ClientConnection(version, vertx, client, waiter::handleFailure, ch,
options.isSsl(), host, port, context, this, client.metrics);
conn.closeHandler(v -> {
// The connection has been closed - tell the pool about it, this allows the pool to create more
// connections. Note the pool doesn't actually remove the connection, when the next person to get a connection
// gets the closed on, they will check if it's closed and if so get another one.
connectionClosed(conn);
});
synchronized (queue) {
allConnections.add(conn);
}
queue.mgr.connectionMap.put(ch, conn);
deliverStream(conn, waiter);
}

// Called if the connection is actually closed, OR the connection attempt failed - in the latter case
// conn will be null
public synchronized void connectionClosed(ClientConnection conn) {
synchronized (queue) {
allConnections.remove(conn);
availableConnections.remove(conn);
queue.connectionClosed();
}
}

void closeAllConnections() {
Set<ClientConnection> copy;
synchronized (this) {
copy = new HashSet<>(allConnections);
allConnections.clear();
}
// Close outside sync block to avoid deadlock
for (ClientConnection conn: copy) {
try {
conn.close();
} catch (Throwable t) {
log.error("Failed to close connection", t);
}
}
}

void removeChannel(Channel channel) {
queue.mgr.connectionMap.remove(channel);
}
}

private class ClientHandler extends VertxHttpHandler<ClientConnection> {
private boolean closeFrameSent;
private ContextImpl context;

public ClientHandler(ContextImpl context, Map<Channel, ClientConnection> connectionMap) {
super(connectionMap);
this.context = context;
}

@Override
protected ContextImpl getContext(ClientConnection connection) {
return context;
}

@Override
protected void doMessageReceived(ClientConnection conn, ChannelHandlerContext ctx, Object msg) {
if (conn == null) {
return;
}
boolean valid = false;
if (msg instanceof HttpResponse) {
HttpResponse response = (HttpResponse) msg;
conn.handleResponse(response);
valid = true;
}
if (msg instanceof HttpContent) {
HttpContent chunk = (HttpContent) msg;
if (chunk.content().isReadable()) {
Buffer buff = Buffer.buffer(chunk.content().slice());
conn.handleResponseChunk(buff);
}
if (chunk instanceof LastHttpContent) {
conn.handleResponseEnd((LastHttpContent)chunk);
}
valid = true;
} else if (msg instanceof WebSocketFrameInternal) {
WebSocketFrameInternal frame = (WebSocketFrameInternal) msg;
switch (frame.type()) {
case BINARY:
case CONTINUATION:
case TEXT:
conn.handleWsFrame(frame);
break;
case PING:
// Echo back the content of the PING frame as PONG frame as specified in RFC 6455 Section 5.5.2
ctx.writeAndFlush(new WebSocketFrameImpl(FrameType.PONG, frame.getBinaryData()));
break;
case PONG:
// Just ignore it
break;
case CLOSE:
if (!closeFrameSent) {
// Echo back close frame and close the connection once it was written.
// This is specified in the WebSockets RFC 6455 Section 5.4.1
ctx.writeAndFlush(frame).addListener(ChannelFutureListener.CLOSE);
closeFrameSent = true;
}
break;
default:
throw new IllegalStateException("Invalid type: " + frame.type());
}
valid = true;
}
if (!valid) {
throw new IllegalStateException("Invalid object " + msg);
}
}
}
} }

0 comments on commit 6df887e

Please sign in to comment.