Skip to content

Commit

Permalink
470311 - Introduce a proxy-protocol module.
Browse files Browse the repository at this point in the history
Support for the PROXY protocol is now enabled via 2 new modules:
proxy-protocol and proxy-protocol-ssl, respectively for the HTTP
connector and the SSL connector.
  • Loading branch information
sbordet committed Aug 11, 2015
1 parent 8837291 commit aa684a5
Show file tree
Hide file tree
Showing 13 changed files with 326 additions and 99 deletions.
Expand Up @@ -261,7 +261,7 @@ public void connect(InetSocketAddress address, Session.Listener listener, Promis

public void connect(SslContextFactory sslContextFactory, InetSocketAddress address, Session.Listener listener, Promise<Session> promise)
{
connect(sslContextFactory, address, listener, promise, new HashMap<String, Object>());
connect(sslContextFactory, address, listener, promise, null);
}

public void connect(SslContextFactory sslContextFactory, InetSocketAddress address, Session.Listener listener, Promise<Session> promise, Map<String, Object> context)
Expand All @@ -271,15 +271,7 @@ public void connect(SslContextFactory sslContextFactory, InetSocketAddress addre
SocketChannel channel = SocketChannel.open();
configure(channel);
channel.configureBlocking(false);

context.put(HTTP2ClientConnectionFactory.CLIENT_CONTEXT_KEY, this);
context.put(HTTP2ClientConnectionFactory.SESSION_LISTENER_CONTEXT_KEY, listener);
context.put(HTTP2ClientConnectionFactory.SESSION_PROMISE_CONTEXT_KEY, promise);
if (sslContextFactory != null)
context.put(SslClientConnectionFactory.SSL_CONTEXT_FACTORY_CONTEXT_KEY, sslContextFactory);
context.put(SslClientConnectionFactory.SSL_PEER_HOST_CONTEXT_KEY, address.getHostString());
context.put(SslClientConnectionFactory.SSL_PEER_PORT_CONTEXT_KEY, address.getPort());

context = contextFrom(sslContextFactory, address, listener, promise, context);
if (channel.connect(address))
selector.accept(channel, context);
else
Expand All @@ -291,6 +283,36 @@ public void connect(SslContextFactory sslContextFactory, InetSocketAddress addre
}
}

public void accept(SslContextFactory sslContextFactory, SocketChannel channel, Session.Listener listener, Promise<Session> promise)
{
try
{
if (!channel.isConnected())
throw new IllegalStateException("SocketChannel must be connected");
channel.configureBlocking(false);
Map<String, Object> context = contextFrom(sslContextFactory, (InetSocketAddress)channel.getRemoteAddress(), listener, promise, null);
selector.accept(channel, context);
}
catch (Throwable x)
{
promise.failed(x);
}
}

private Map<String, Object> contextFrom(SslContextFactory sslContextFactory, InetSocketAddress address, Session.Listener listener, Promise<Session> promise, Map<String, Object> context)
{
if (context == null)
context = new HashMap<>();
context.put(HTTP2ClientConnectionFactory.CLIENT_CONTEXT_KEY, this);
context.put(HTTP2ClientConnectionFactory.SESSION_LISTENER_CONTEXT_KEY, listener);
context.put(HTTP2ClientConnectionFactory.SESSION_PROMISE_CONTEXT_KEY, promise);
if (sslContextFactory != null)
context.put(SslClientConnectionFactory.SSL_CONTEXT_FACTORY_CONTEXT_KEY, sslContextFactory);
context.put(SslClientConnectionFactory.SSL_PEER_HOST_CONTEXT_KEY, address.getHostString());
context.put(SslClientConnectionFactory.SSL_PEER_PORT_CONTEXT_KEY, address.getPort());
return context;
}

protected void configure(SocketChannel channel) throws IOException
{
channel.socket().setTcpNoDelay(true);
Expand Down
@@ -0,0 +1,121 @@
//
// ========================================================================
// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//

package org.eclipse.jetty.http2.client;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.HttpURI;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.server.HTTP2CServerConnectionFactory;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.ProxyConnectionFactory;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.util.FuturePromise;
import org.eclipse.jetty.util.Promise;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;

public class ProxyProtocolTest
{
private Server server;
private ServerConnector connector;
private HTTP2Client client;

public void startServer(Handler handler) throws Exception
{
server = new Server();
HttpConfiguration configuration = new HttpConfiguration();
connector = new ServerConnector(server, new ProxyConnectionFactory(), new HTTP2CServerConnectionFactory(configuration));
server.addConnector(connector);
server.setHandler(handler);

client = new HTTP2Client();
server.addBean(client, true);

server.start();
}

@After
public void dispose() throws Exception
{
if (server != null)
server.stop();
}

@Test
public void test_PROXY_GET() throws Exception
{
startServer(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
}
});

String request1 = "PROXY TCP4 1.2.3.4 5.6.7.8 1111 2222\r\n";
SocketChannel channel = SocketChannel.open();
channel.connect(new InetSocketAddress("localhost", connector.getLocalPort()));
channel.write(ByteBuffer.wrap(request1.getBytes(StandardCharsets.UTF_8)));

FuturePromise<Session> promise = new FuturePromise<>();
client.accept(null, channel, new Session.Listener.Adapter(), promise);
Session session = promise.get(5, TimeUnit.SECONDS);

HttpFields fields = new HttpFields();
String uri = "http://localhost:" + connector.getLocalPort() + "/";
MetaData.Request metaData = new MetaData.Request("GET", new HttpURI(uri), HttpVersion.HTTP_2, fields);
HeadersFrame frame = new HeadersFrame(1, metaData, null, true);
CountDownLatch latch = new CountDownLatch(1);
session.newStream(frame, new Promise.Adapter<>(), new Stream.Listener.Adapter()
{
@Override
public void onHeaders(Stream stream, HeadersFrame frame)
{
MetaData.Response response = (MetaData.Response)frame.getMetaData();
Assert.assertEquals(HttpStatus.OK_200, response.getStatus());
if (frame.isEndStream())
latch.countDown();
}
});
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
}
}
22 changes: 11 additions & 11 deletions jetty-io/src/main/java/org/eclipse/jetty/io/AbstractConnection.java
Expand Up @@ -38,7 +38,7 @@
public abstract class AbstractConnection implements Connection
{
private static final Logger LOG = Log.getLogger(AbstractConnection.class);

private final List<Listener> listeners = new CopyOnWriteArrayList<>();
private final long _created=System.currentTimeMillis();
private final EndPoint _endPoint;
Expand Down Expand Up @@ -109,7 +109,7 @@ public void run()
callback.failed(x);
}
}

/**
* <p>Utility method to be called to register read interest.</p>
* <p>After a call to this method, {@link #onFillable()} or {@link #onFillInterestedFailed(Throwable)}
Expand All @@ -122,12 +122,12 @@ public void fillInterested()
LOG.debug("fillInterested {}",this);
getEndPoint().fillInterested(_readCallback);
}

public boolean isFillInterested()
{
return ((AbstractEndPoint)getEndPoint()).getFillInterest().isInterested();
return getEndPoint().isFillInterested();
}

/**
* <p>Callback method invoked when the endpoint is ready to be read.</p>
* @see #fillInterested()
Expand All @@ -154,10 +154,10 @@ protected void onFillInterestedFailed(Throwable cause)
else
{
_endPoint.shutdownOutput();
fillInterested();
}
fillInterested();
}
}
}
}
}

/**
Expand Down Expand Up @@ -236,9 +236,9 @@ public String toString()
{
return String.format("%s@%x", getClass().getSimpleName(), hashCode());
}

private class ReadCallback implements Callback
{
{
@Override
public void succeeded()
{
Expand All @@ -250,7 +250,7 @@ public void failed(final Throwable x)
{
onFillInterestedFailed(x);
}

@Override
public String toString()
{
Expand Down
28 changes: 17 additions & 11 deletions jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java
Expand Up @@ -45,7 +45,7 @@ protected void needsFillInterest() throws IOException
AbstractEndPoint.this.needsFillInterest();
}
};

private final WriteFlusher _writeFlusher = new WriteFlusher(this)
{
@Override
Expand Down Expand Up @@ -79,7 +79,7 @@ public InetSocketAddress getRemoteAddress()
{
return _remote;
}

@Override
public Connection getConnection()
{
Expand Down Expand Up @@ -115,7 +115,7 @@ public void onClose()
_writeFlusher.onClose();
_fillInterest.onClose();
}

@Override
public void close()
{
Expand All @@ -129,6 +129,12 @@ public void fillInterested(Callback callback) throws IllegalStateException
_fillInterest.register(callback);
}

@Override
public boolean isFillInterested()
{
return _fillInterest.isInterested();
}

@Override
public void write(Callback callback, ByteBuffer... buffers) throws IllegalStateException
{
Expand Down Expand Up @@ -156,17 +162,17 @@ protected void onIdleExpired(TimeoutException timeout)
boolean input_shutdown=isInputShutdown();
boolean fillFailed = _fillInterest.onFail(timeout);
boolean writeFailed = _writeFlusher.onFail(timeout);

// If the endpoint is half closed and there was no fill/write handling, then close here.
// This handles the situation where the connection has completed its close handling
// This handles the situation where the connection has completed its close handling
// and the endpoint is half closed, but the other party does not complete the close.
// This perhaps should not check for half closed, however the servlet spec case allows
// for a dispatched servlet or suspended request to extend beyond the connections idle
// time. So if this test would always close an idle endpoint that is not handled, then
// for a dispatched servlet or suspended request to extend beyond the connections idle
// time. So if this test would always close an idle endpoint that is not handled, then
// we would need a mode to ignore timeouts for some HTTP states
if (isOpen() && (output_shutdown || input_shutdown) && !(fillFailed || writeFailed))
close();
else
else
LOG.debug("Ignored idle endpoint {}",this);
}

Expand All @@ -177,7 +183,7 @@ public void upgrade(Connection newConnection)

if (LOG.isDebugEnabled())
LOG.debug("{} upgradeing from {} to {}", this, old_connection, newConnection);

ByteBuffer prefilled = (old_connection instanceof Connection.UpgradeFrom)
?((Connection.UpgradeFrom)old_connection).onUpgradeFrom():null;
old_connection.onClose();
Expand All @@ -190,7 +196,7 @@ else if (BufferUtil.hasContent(prefilled))

newConnection.onOpen();
}

@Override
public String toString()
{
Expand All @@ -201,7 +207,7 @@ public String toString()
c=c.getSuperclass();
name=c.getSimpleName();
}

return String.format("%s@%x{%s<->%d,%s,%s,%s,%s,%s,%d/%d,%s}",
name,
hashCode(),
Expand Down
12 changes: 9 additions & 3 deletions jetty-io/src/main/java/org/eclipse/jetty/io/EndPoint.java
Expand Up @@ -32,9 +32,9 @@
/**
*
* A transport EndPoint
*
*
* <h3>Asynchronous Methods</h3>
* <p>The asynchronous scheduling methods of {@link EndPoint}
* <p>The asynchronous scheduling methods of {@link EndPoint}
* has been influenced by NIO.2 Futures and Completion
* handlers, but does not use those actual interfaces because they have
* some inefficiencies.</p>
Expand Down Expand Up @@ -170,7 +170,7 @@ public interface EndPoint extends Closeable
* are taken from the header/buffer position up until the buffer limit. The header/buffers position
* is updated to indicate how many bytes have been consumed.
* @param buffer the buffers to flush
* @return True IFF all the buffers have been consumed and the endpoint has flushed the data to its
* @return True IFF all the buffers have been consumed and the endpoint has flushed the data to its
* destination (ie is not buffering any data).
* @throws IOException If the endpoint is closed or output is shutdown.
*/
Expand Down Expand Up @@ -205,6 +205,12 @@ public interface EndPoint extends Closeable
*/
void fillInterested(Callback callback) throws ReadPendingException;

/**
* @return whether {@link #fillInterested(Callback)} has been called, but {@link #fill(ByteBuffer)} has not yet
* been called
*/
boolean isFillInterested();

/**
* <p>Writes the given buffers via {@link #flush(ByteBuffer...)} and invokes callback methods when either
* all the data has been flushed or an error occurs.</p>
Expand Down
4 changes: 0 additions & 4 deletions jetty-server/src/main/config/etc/jetty-http.xml
Expand Up @@ -26,10 +26,6 @@
<Arg name="selectors" type="int"><Property name="jetty.http.selectors" deprecated="http.selectors" default="-1"/></Arg>
<Arg name="factories">
<Array type="org.eclipse.jetty.server.ConnectionFactory">
<!-- uncomment to support proxy protocol
<Item>
<New class="org.eclipse.jetty.server.ProxyConnectionFactory"/>
</Item>-->
<Item>
<New class="org.eclipse.jetty.server.HttpConnectionFactory">
<Arg name="config"><Ref refid="httpConfig" /></Arg>
Expand Down
10 changes: 10 additions & 0 deletions jetty-server/src/main/config/etc/jetty-proxy-protocol-ssl.xml
@@ -0,0 +1,10 @@
<?xml version="1.0"?>
<!DOCTYPE Configure PUBLIC "-//Jetty//Configure//EN" "http://www.eclipse.org/jetty/configure_9_3.dtd">

<Configure id="sslConnector" class="org.eclipse.jetty.server.ServerConnector">
<Call name="addFirstConnectionFactory">
<Arg>
<New class="org.eclipse.jetty.server.ProxyConnectionFactory"/>
</Arg>
</Call>
</Configure>

0 comments on commit aa684a5

Please sign in to comment.