Skip to content

Commit

Permalink
Refactor to split between HttpClientConnection/HttpClientStream inter…
Browse files Browse the repository at this point in the history
…faces
  • Loading branch information
vietj committed Mar 13, 2016
1 parent d8e4ea9 commit de8abc8
Show file tree
Hide file tree
Showing 10 changed files with 95 additions and 64 deletions.
9 changes: 5 additions & 4 deletions src/main/java/io/vertx/core/http/impl/ClientConnection.java
Expand Up @@ -63,7 +63,7 @@
* *
* @author <a href="http://tfox.org">Tim Fox</a> * @author <a href="http://tfox.org">Tim Fox</a>
*/ */
class ClientConnection extends ConnectionBase implements HttpClientStream { class ClientConnection extends ConnectionBase implements HttpClientConnection, HttpClientStream {


private static final Logger log = LoggerFactory.getLogger(ClientConnection.class); private static final Logger log = LoggerFactory.getLogger(ClientConnection.class);


Expand Down Expand Up @@ -428,12 +428,13 @@ protected synchronized void handleException(Throwable e) {
} }
} }


public synchronized void beginRequest(HttpClientRequestImpl req) { public synchronized HttpClientStream beginRequest(HttpClientRequestImpl req) {
if (currentRequest != null) { if (currentRequest != null) {
throw new IllegalStateException("Connection is already writing a request"); throw new IllegalStateException("Connection is already writing a request");
} }
this.currentRequest = req; this.currentRequest = req;
this.requests.add(req); this.requests.add(req);
return this;
} }


public synchronized void endRequest() { public synchronized void endRequest() {
Expand Down Expand Up @@ -500,7 +501,7 @@ public void channelRead(ChannelHandlerContext chctx, Object msg) throws Exceptio
} }


@Override @Override
public HttpConnection connection() { public HttpClientConnection connection() {
return null; return this;
} }
} }
6 changes: 3 additions & 3 deletions src/main/java/io/vertx/core/http/impl/ConnectionManager.java
Expand Up @@ -352,14 +352,14 @@ static abstract class Pool {


abstract void closeAllConnections(); abstract void closeAllConnections();


abstract void recycle(HttpClientStream stream); abstract void recycle(HttpClientConnection stream);


/** /**
* Handle the connection if the waiter is not cancelled, otherwise recycle the connection. * Handle the connection if the waiter is not cancelled, otherwise recycle the connection.
* *
* @param conn the connection * @param conn the connection
*/ */
void deliverConnection(HttpClientStream conn, Waiter waiter) { void deliverConnection(HttpClientConnection conn, Waiter waiter) {
if (conn.isClosed()) { if (conn.isClosed()) {
// The connection has been closed - closed connections can be in the pool // The connection has been closed - closed connections can be in the pool
// Get another connection - Note that we DO NOT call connectionClosed() on the pool at this point // Get another connection - Note that we DO NOT call connectionClosed() on the pool at this point
Expand Down Expand Up @@ -398,7 +398,7 @@ public boolean getConnection(Waiter waiter) {
} }
} }


void recycle(HttpClientStream stream) { void recycle(HttpClientConnection stream) {
recycle((ClientConnection) stream); recycle((ClientConnection) stream);
} }


Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/vertx/core/http/impl/Http2Pool.java
Expand Up @@ -75,7 +75,7 @@ void createConn(ChannelHandlerContext handlerCtx, ContextImpl context, int port,
} }


@Override @Override
void recycle(HttpClientStream stream) { void recycle(HttpClientConnection stream) {
// todo // todo
} }


Expand Down
38 changes: 38 additions & 0 deletions src/main/java/io/vertx/core/http/impl/HttpClientConnection.java
@@ -0,0 +1,38 @@
/*
* Copyright (c) 2011-2013 The original author or authors
* ------------------------------------------------------
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Apache License v2.0 which accompanies this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* The Apache License v2.0 is available at
* http://www.opensource.org/licenses/apache2.0.php
*
* You may elect to redistribute this code under either of these licenses.
*/

package io.vertx.core.http.impl;

import io.vertx.core.Context;
import io.vertx.core.net.NetSocket;

/**
* @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
*/
interface HttpClientConnection {

Context getContext();

HttpClientStream beginRequest(HttpClientRequestImpl request);

// Try to remove that ?
void reportBytesWritten(long numberOfBytes);
void reportBytesRead(long s);

NetSocket createNetSocket();

boolean isClosed();
}
2 changes: 1 addition & 1 deletion src/main/java/io/vertx/core/http/impl/HttpClientImpl.java
Expand Up @@ -644,7 +644,7 @@ void getConnection(int port, String host, Handler<ClientConnection> handler, Han
ContextImpl context) { ContextImpl context) {
connectionManager.getConnection(port, host, new Waiter(null, context) { connectionManager.getConnection(port, host, new Waiter(null, context) {
@Override @Override
void handleSuccess(HttpClientStream conn) { void handleSuccess(HttpClientConnection conn) {
// Use some variance for this // Use some variance for this
handler.handle((ClientConnection) conn); handler.handle((ClientConnection) conn);
} }
Expand Down
26 changes: 12 additions & 14 deletions src/main/java/io/vertx/core/http/impl/HttpClientRequestImpl.java
Expand Up @@ -359,7 +359,7 @@ public HttpConnection connection() {
if (conn == null) { if (conn == null) {
throw new IllegalStateException("Not yet connected"); throw new IllegalStateException("Not yet connected");
} }
return conn.connection(); return (HttpConnection) conn.connection();
} }
} }


Expand Down Expand Up @@ -543,7 +543,7 @@ void handleFailure(Throwable failure) {
} }


@Override @Override
void handleSuccess(HttpClientStream conn) { void handleSuccess(HttpClientConnection conn) {
connected(conn); connected(conn);
} }


Expand All @@ -563,22 +563,20 @@ boolean isCancelled() {
} }
} }


private synchronized void connected(HttpClientStream conn) { private synchronized void connected(HttpClientConnection s) {


conn.beginRequest(this); conn = s.beginRequest(this);


if (conn instanceof ClientConnection) { if (this.conn instanceof ClientConnection) {
ClientConnection http1Conn = (ClientConnection) conn; ClientConnection http1Conn = (ClientConnection) this.conn;
this.metric = client.httpClientMetrics().requestBegin(http1Conn.metric(), http1Conn.localAddress(), http1Conn.remoteAddress(), this); this.metric = client.httpClientMetrics().requestBegin(http1Conn.metric(), http1Conn.localAddress(), http1Conn.remoteAddress(), this);
} }


this.conn = conn;

// If anything was written or the request ended before we got the connection, then // If anything was written or the request ended before we got the connection, then
// we need to write it now // we need to write it now


if (pendingMaxSize != -1) { if (pendingMaxSize != -1) {
conn.doSetWriteQueueMaxSize(pendingMaxSize); this.conn.doSetWriteQueueMaxSize(pendingMaxSize);
} }


if (pendingChunks != null) { if (pendingChunks != null) {
Expand All @@ -589,10 +587,10 @@ private synchronized void connected(HttpClientStream conn) {
// we also need to write the head so optimize this and write all out in once // we also need to write the head so optimize this and write all out in once
writeHeadWithContent(pending, true); writeHeadWithContent(pending, true);


conn.reportBytesWritten(written); s.reportBytesWritten(written);


if (respHandler != null) { if (respHandler != null) {
conn.endRequest(); this.conn.endRequest();
} }
} else { } else {
writeHeadWithContent(pending, false); writeHeadWithContent(pending, false);
Expand All @@ -602,10 +600,10 @@ private synchronized void connected(HttpClientStream conn) {
// we also need to write the head so optimize this and write all out in once // we also need to write the head so optimize this and write all out in once
writeHeadWithContent(Unpooled.EMPTY_BUFFER, true); writeHeadWithContent(Unpooled.EMPTY_BUFFER, true);


conn.reportBytesWritten(written); s.reportBytesWritten(written);


if (respHandler != null) { if (respHandler != null) {
conn.endRequest(); this.conn.endRequest();
} }
} else { } else {
if (writeHead) { if (writeHead) {
Expand Down Expand Up @@ -678,7 +676,7 @@ private void write(ByteBuf buff, boolean end) {
conn.writeBuffer(buff, end); conn.writeBuffer(buff, end);
} }
if (end) { if (end) {
conn.reportBytesWritten(written); conn.connection().reportBytesWritten(written);


if (respHandler != null) { if (respHandler != null) {
conn.endRequest(); conn.endRequest();
Expand Down
Expand Up @@ -20,7 +20,6 @@
import io.vertx.core.Handler; import io.vertx.core.Handler;
import io.vertx.core.MultiMap; import io.vertx.core.MultiMap;
import io.vertx.core.buffer.Buffer; import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpClientRequest;
import io.vertx.core.http.HttpClientResponse; import io.vertx.core.http.HttpClientResponse;
import io.vertx.core.http.HttpHeaders; import io.vertx.core.http.HttpHeaders;
import io.vertx.core.http.HttpVersion; import io.vertx.core.http.HttpVersion;
Expand Down Expand Up @@ -228,7 +227,7 @@ void handleChunk(Buffer data) {


void handleEnd(MultiMap trailers) { void handleEnd(MultiMap trailers) {
synchronized (conn) { synchronized (conn) {
conn.reportBytesRead(bytesRead); conn.connection().reportBytesRead(bytesRead);
bytesRead = 0; bytesRead = 0;
request.reportResponseEnd(this); request.reportResponseEnd(this);
if (paused) { if (paused) {
Expand Down Expand Up @@ -259,7 +258,7 @@ void handleException(Throwable e) {
public NetSocket netSocket() { public NetSocket netSocket() {
synchronized (conn) { synchronized (conn) {
if (netSocket == null) { if (netSocket == null) {
netSocket = conn.createNetSocket(); netSocket = conn.connection().createNetSocket();
} }
return netSocket; return netSocket;
} }
Expand Down
15 changes: 2 additions & 13 deletions src/main/java/io/vertx/core/http/impl/HttpClientStream.java
Expand Up @@ -19,24 +19,20 @@
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.vertx.core.Context; import io.vertx.core.Context;
import io.vertx.core.MultiMap; import io.vertx.core.MultiMap;
import io.vertx.core.http.HttpConnection;
import io.vertx.core.http.HttpMethod; import io.vertx.core.http.HttpMethod;
import io.vertx.core.net.NetSocket;


/** /**
* @author <a href="mailto:julien@julienviet.com">Julien Viet</a> * @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
*/ */
interface HttpClientStream { interface HttpClientStream {


HttpClientConnection connection();
Context getContext(); Context getContext();


void writeHead(HttpMethod method, String uri, MultiMap headers, String hostHeader, boolean chunked); void writeHead(HttpMethod method, String uri, MultiMap headers, String hostHeader, boolean chunked);
void writeHeadWithContent(HttpMethod method, String uri, MultiMap headers, String hostHeader, boolean chunked, ByteBuf buf, boolean end); void writeHeadWithContent(HttpMethod method, String uri, MultiMap headers, String hostHeader, boolean chunked, ByteBuf buf, boolean end);
void writeBuffer(ByteBuf buf, boolean end); void writeBuffer(ByteBuf buf, boolean end);


void beginRequest(HttpClientRequestImpl request);
void endRequest();

void doSetWriteQueueMaxSize(int size); void doSetWriteQueueMaxSize(int size);
boolean isNotWritable(); boolean isNotWritable();
void handleInterestedOpsChanged(); void handleInterestedOpsChanged();
Expand All @@ -45,12 +41,5 @@ interface HttpClientStream {


void reset(long code); void reset(long code);


// Try to remove that ? void endRequest();
void reportBytesWritten(long numberOfBytes);
void reportBytesRead(long s);

NetSocket createNetSocket();
HttpConnection connection();

boolean isClosed();
} }
54 changes: 30 additions & 24 deletions src/main/java/io/vertx/core/http/impl/VertxHttp2ClientHandler.java
Expand Up @@ -46,7 +46,7 @@
/** /**
* @author <a href="mailto:julien@julienviet.com">Julien Viet</a> * @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
*/ */
class VertxHttp2ClientHandler extends VertxHttp2ConnectionHandler { class VertxHttp2ClientHandler extends VertxHttp2ConnectionHandler implements HttpClientConnection {


final Http2Pool http2Pool; final Http2Pool http2Pool;
final ChannelHandlerContext handlerCtx; final ChannelHandlerContext handlerCtx;
Expand Down Expand Up @@ -85,22 +85,46 @@ public HttpConnection exceptionHandler(Handler<Throwable> handler) {
} }


void handle(Waiter waiter) { void handle(Waiter waiter) {
Http2ClientStream stream = createStream(waiter.req); waiter.handleSuccess(this);
waiter.handleSuccess(stream);
} }


Http2ClientStream createStream(HttpClientRequestBase req) { @Override
public Context getContext() {
return context;
}

@Override
public HttpClientStream beginRequest(HttpClientRequestImpl request) {
try { try {
Http2Connection conn = connection(); Http2Connection conn = connection();
Http2Stream stream = conn.local().createStream(conn.local().incrementAndGetNextStreamId(), false); Http2Stream stream = conn.local().createStream(conn.local().incrementAndGetNextStreamId(), false);
Http2ClientStream clientStream = new Http2ClientStream(this, req, stream); Http2ClientStream clientStream = new Http2ClientStream(this, request, stream);
streams.put(clientStream.stream.id(), clientStream); streams.put(clientStream.stream.id(), clientStream);
return clientStream; return clientStream;
} catch (Http2Exception e) { } catch (Http2Exception e) {
throw new UnsupportedOperationException("handle me gracefully", e); throw new UnsupportedOperationException("handle me gracefully", e);
} }
} }


@Override
public void reportBytesWritten(long numberOfBytes) {
}

@Override
public void reportBytesRead(long s) {
}

@Override
public NetSocket createNetSocket() {
return null;
}

@Override
public boolean isClosed() {
// Todo
return false;
}

@Override @Override
public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endOfStream) throws Http2Exception { public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endOfStream) throws Http2Exception {
Http2ClientStream stream = streams.get(streamId); Http2ClientStream stream = streams.get(streamId);
Expand Down Expand Up @@ -276,9 +300,6 @@ public void handleInterestedOpsChanged() {
((HttpClientRequestImpl)req).handleDrained(); ((HttpClientRequestImpl)req).handleDrained();
} }
@Override @Override
public void beginRequest(HttpClientRequestImpl request) {
}
@Override
public void endRequest() { public void endRequest() {
} }
@Override @Override
Expand Down Expand Up @@ -309,26 +330,11 @@ public void reset(long code) {
encoder.writeRstStream(handlerCtx, stream.id(), code, handlerCtx.newPromise()); encoder.writeRstStream(handlerCtx, stream.id(), code, handlerCtx.newPromise());
handlerCtx.flush(); handlerCtx.flush();
} }
@Override
public void reportBytesWritten(long numberOfBytes) {
}
@Override
public void reportBytesRead(long s) {
}
@Override
public NetSocket createNetSocket() {
throw new UnsupportedOperationException();
}


@Override @Override
public HttpConnection connection() { public HttpClientConnection connection() {
return handler; return handler;
} }


@Override
public boolean isClosed() {
// Todo
return false;
}
} }
} }
2 changes: 1 addition & 1 deletion src/main/java/io/vertx/core/http/impl/Waiter.java
Expand Up @@ -43,7 +43,7 @@ public Waiter(HttpClientRequestImpl req, ContextImpl context) {
* *
* @param conn the connection * @param conn the connection
*/ */
abstract void handleSuccess(HttpClientStream conn); abstract void handleSuccess(HttpClientConnection conn);


/** /**
* @return true if the waiter has been cancelled * @return true if the waiter has been cancelled
Expand Down

0 comments on commit de8abc8

Please sign in to comment.