Skip to content

Commit

Permalink
Fixes #2672 - Max local stream count exceeded for Http2 Client (#2693)
Browse files Browse the repository at this point in the history
* Fixes #2672 - Max local stream count exceeded for HttpClient with HTTP/2 transport.

Now waiting for the server preface to arrive to the client before making
the connection available to the ConnectionPool.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
  • Loading branch information
sbordet committed Jul 16, 2018
1 parent 9fc7e90 commit 980282e
Show file tree
Hide file tree
Showing 4 changed files with 275 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public Connection newConnection(EndPoint endPoint, Map<String, Object> context)
Generator generator = new Generator(byteBufferPool);
FlowControlStrategy flowControl = client.getFlowControlStrategyFactory().newFlowControlStrategy();
HTTP2ClientSession session = new HTTP2ClientSession(scheduler, endPoint, generator, listener, flowControl);

Parser parser = new Parser(byteBufferPool, session, 4096, 8192);

HTTP2ClientConnection connection = new HTTP2ClientConnection(client, byteBufferPool, executor, endPoint,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.ClosedChannelException;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicMarkableReference;

import org.eclipse.jetty.alpn.client.ALPNClientConnectionFactory;
import org.eclipse.jetty.client.AbstractHttpClientTransport;
Expand Down Expand Up @@ -135,7 +138,12 @@ public void connect(InetSocketAddress address, Map<String, Object> context)
if (HttpScheme.HTTPS.is(destination.getScheme()))
sslContextFactory = httpClient.getSslContextFactory();

client.connect(sslContextFactory, address, listenerPromise, listenerPromise, context);
connect(sslContextFactory, address, listenerPromise, listenerPromise, context);
}

protected void connect(SslContextFactory sslContextFactory, InetSocketAddress address, Session.Listener listener, Promise<Session> promise, Map<String, Object> context)
{
getHTTP2Client().connect(sslContextFactory, address, listener, promise, context);
}

@Override
Expand Down Expand Up @@ -164,8 +172,8 @@ protected void onClose(HttpConnectionOverHTTP2 connection, GoAwayFrame frame)

private class SessionListenerPromise extends Session.Listener.Adapter implements Promise<Session>
{
private final AtomicMarkableReference<HttpConnectionOverHTTP2> connection = new AtomicMarkableReference<>(null, false);
private final Map<String, Object> context;
private HttpConnectionOverHTTP2 connection;

private SessionListenerPromise(Map<String, Object> context)
{
Expand All @@ -175,14 +183,15 @@ private SessionListenerPromise(Map<String, Object> context)
@Override
public void succeeded(Session session)
{
connection = newHttpConnection(destination(), session);
promise().succeeded(connection);
// This method is invoked when the client preface
// is sent, but we want to succeed the nested
// promise when the server preface is received.
}

@Override
public void failed(Throwable failure)
{
promise().failed(failure);
failConnectionPromise(failure);
}

private HttpDestinationOverHTTP2 destination()
Expand All @@ -191,7 +200,7 @@ private HttpDestinationOverHTTP2 destination()
}

@SuppressWarnings("unchecked")
private Promise<Connection> promise()
private Promise<Connection> connectionPromise()
{
return (Promise<Connection>)context.get(HTTP_CONNECTION_PROMISE_CONTEXT_KEY);
}
Expand All @@ -202,26 +211,55 @@ public void onSettings(Session session, SettingsFrame frame)
Map<Integer, Integer> settings = frame.getSettings();
if (settings.containsKey(SettingsFrame.MAX_CONCURRENT_STREAMS))
destination().setMaxRequestsPerConnection(settings.get(SettingsFrame.MAX_CONCURRENT_STREAMS));
if (!connection.isMarked())
onServerPreface(session);
}

private void onServerPreface(Session session)
{
HttpConnectionOverHTTP2 connection = newHttpConnection(destination(), session);
if (this.connection.compareAndSet(null, connection, false, true))
connectionPromise().succeeded(connection);
}

@Override
public void onClose(Session session, GoAwayFrame frame)
{
HttpClientTransportOverHTTP2.this.onClose(connection, frame);
if (failConnectionPromise(new ClosedChannelException()))
return;
HttpConnectionOverHTTP2 connection = this.connection.getReference();
if (connection != null)
HttpClientTransportOverHTTP2.this.onClose(connection, frame);
}

@Override
public boolean onIdleTimeout(Session session)
{
return connection.onIdleTimeout(((HTTP2Session)session).getEndPoint().getIdleTimeout());
long idleTimeout = ((HTTP2Session)session).getEndPoint().getIdleTimeout();
if (failConnectionPromise(new TimeoutException("Idle timeout expired: " + idleTimeout + " ms")))
return true;
HttpConnectionOverHTTP2 connection = this.connection.getReference();
if (connection != null)
return connection.onIdleTimeout(idleTimeout);
return true;
}

@Override
public void onFailure(Session session, Throwable failure)
{
HttpConnectionOverHTTP2 c = connection;
if (c != null)
c.close(failure);
if (failConnectionPromise(failure))
return;
HttpConnectionOverHTTP2 connection = this.connection.getReference();
if (connection != null)
connection.close(failure);
}

private boolean failConnectionPromise(Throwable failure)
{
boolean result = connection.compareAndSet(null, null, false, true);
if (result)
connectionPromise().failed(failure);
return result;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -477,22 +477,33 @@ protected HttpConnectionOverHTTP2 newHttpConnection(HttpDestination destination,
ServerParser parser = new ServerParser(byteBufferPool, new ServerParser.Listener.Adapter()
{
@Override
public void onHeaders(HeadersFrame request)
public void onPreface()
{
// Server's preface.
generator.control(lease, new SettingsFrame(new HashMap<>(), false));
// Reply to client's SETTINGS.
generator.control(lease, new SettingsFrame(new HashMap<>(), true));
writeFrames();
}

@Override
public void onHeaders(HeadersFrame request)
{
// Response.
MetaData.Response metaData = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields());
HeadersFrame response = new HeadersFrame(request.getStreamId(), metaData, null, true);
generator.control(lease, response);
writeFrames();
}

private void writeFrames()
{
try
{
// Write the frames.
for (ByteBuffer buffer : lease.getByteBuffers())
output.write(BufferUtil.toArray(buffer));
lease.recycle();
}
catch (Throwable x)
{
Expand Down
Loading

0 comments on commit 980282e

Please sign in to comment.