Skip to content

Commit

Permalink
netty: back port of #6167 (converts proxy handler) (#6172)
Browse files Browse the repository at this point in the history
  • Loading branch information
creamsoup committed Sep 20, 2019
1 parent 9482db6 commit f5dc78d
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 276 deletions.
14 changes: 0 additions & 14 deletions netty/src/main/java/io/grpc/netty/InternalProtocolNegotiators.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,20 +39,6 @@ public static ChannelLogger negotiationLogger(ChannelHandlerContext ctx) {
return ProtocolNegotiators.negotiationLogger(ctx);
}

/**
* Buffers all writes until either {@link #writeBufferedAndRemove(ChannelHandlerContext)} or
* {@link #fail(ChannelHandlerContext, Throwable)} is called. This handler allows us to
* write to a {@link io.netty.channel.Channel} before we are allowed to write to it officially
* i.e. before it's active or the TLS Handshake is complete.
*/
public abstract static class AbstractBufferingHandler
extends ProtocolNegotiators.AbstractBufferingHandler {

protected AbstractBufferingHandler(ChannelHandler... handlers) {
super(handlers);
}
}

/**
* Returns a {@link ProtocolNegotiator} that ensures the pipeline is set up so that TLS will
* be negotiated, the {@code handler} is added and writes to the {@link io.netty.channel.Channel}
Expand Down
282 changes: 46 additions & 236 deletions netty/src/main/java/io/grpc/netty/ProtocolNegotiators.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,12 @@
import io.grpc.internal.GrpcAttributes;
import io.grpc.internal.GrpcUtil;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.DefaultHttpRequest;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpClientUpgradeHandler;
Expand All @@ -52,7 +49,6 @@
import io.netty.handler.codec.http2.Http2ClientUpgradeCodec;
import io.netty.handler.proxy.HttpProxyHandler;
import io.netty.handler.proxy.ProxyConnectionEvent;
import io.netty.handler.proxy.ProxyHandler;
import io.netty.handler.ssl.OpenSsl;
import io.netty.handler.ssl.OpenSslEngine;
import io.netty.handler.ssl.SslContext;
Expand All @@ -61,12 +57,9 @@
import io.netty.util.AsciiString;
import io.netty.util.Attribute;
import io.netty.util.AttributeMap;
import io.netty.util.ReferenceCountUtil;
import java.net.SocketAddress;
import java.net.URI;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Queue;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -232,20 +225,16 @@ private void fail(ChannelHandlerContext ctx, Throwable exception) {
public static ProtocolNegotiator httpProxy(final SocketAddress proxyAddress,
final @Nullable String proxyUsername, final @Nullable String proxyPassword,
final ProtocolNegotiator negotiator) {
checkNotNull(negotiator, "negotiator");
checkNotNull(proxyAddress, "proxyAddress");
final AsciiString scheme = negotiator.scheme();
Preconditions.checkNotNull(proxyAddress, "proxyAddress");
Preconditions.checkNotNull(negotiator, "negotiator");
class ProxyNegotiator implements ProtocolNegotiator {
@Override
public ChannelHandler newHandler(GrpcHttp2ConnectionHandler http2Handler) {
HttpProxyHandler proxyHandler;
if (proxyUsername == null || proxyPassword == null) {
proxyHandler = new HttpProxyHandler(proxyAddress);
} else {
proxyHandler = new HttpProxyHandler(proxyAddress, proxyUsername, proxyPassword);
}
return new BufferUntilProxyTunnelledHandler(
proxyHandler, negotiator.newHandler(http2Handler));
ChannelHandler protocolNegotiationHandler = negotiator.newHandler(http2Handler);
ChannelHandler ppnh = new ProxyProtocolNegotiationHandler(
proxyAddress, proxyUsername, proxyPassword, protocolNegotiationHandler);
return ppnh;
}

@Override
Expand All @@ -265,34 +254,57 @@ public void close() {
}

/**
* Buffers all writes until the HTTP CONNECT tunnel is established.
* A Proxy handler follows self replacing protocol negotiation pattern. Upon successful proxy
* connection, this handler will install {@code next} handler which should be a handler from
* other type of {@link ProtocolNegotiator} to continue negotiating protocol using proxy.
*/
static final class BufferUntilProxyTunnelledHandler extends AbstractBufferingHandler {
static final class ProxyProtocolNegotiationHandler extends ChannelDuplexHandler {

private final SocketAddress address;
@Nullable private final String userName;
@Nullable private final String password;
private final ChannelHandler next;

private ProtocolNegotiationEvent pne;

public BufferUntilProxyTunnelledHandler(ProxyHandler proxyHandler, ChannelHandler handler) {
super(proxyHandler, handler);
public ProxyProtocolNegotiationHandler(
SocketAddress address,
@Nullable String userName,
@Nullable String password,
ChannelHandler next) {
this.address = checkNotNull(address, "address");
this.userName = userName;
this.password = password;
this.next = checkNotNull(next, "next");
}

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof ProxyConnectionEvent) {
writeBufferedAndRemove(ctx);
if (evt instanceof ProtocolNegotiationEvent) {
checkState(pne == null, "pre-existing negotiation: %s < %s", pne, evt);
this.pne = (ProtocolNegotiationEvent) evt;
protocolNegotiationEventTriggered(ctx);
} else if (evt instanceof ProxyConnectionEvent) {
fireProtocolNegotiationEvent(ctx);
} else {
super.userEventTriggered(ctx, evt);
}
super.userEventTriggered(ctx, evt);
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
fail(ctx, unavailableException("Connection broken while trying to CONNECT through proxy"));
super.channelInactive(ctx);
private void protocolNegotiationEventTriggered(ChannelHandlerContext ctx) {
HttpProxyHandler nettyProxyHandler;
if (userName == null || password == null) {
nettyProxyHandler = new HttpProxyHandler(address);
} else {
nettyProxyHandler = new HttpProxyHandler(address, userName, password);
}
ctx.pipeline().addBefore(ctx.name(), /* newName= */ null, nettyProxyHandler);
}

@Override
public void close(ChannelHandlerContext ctx, ChannelPromise future) throws Exception {
if (ctx.channel().isActive()) { // This may be a notification that the socket was closed
fail(ctx, unavailableException("Channel closed while trying to CONNECT through proxy"));
}
super.close(ctx, future);
private void fireProtocolNegotiationEvent(ChannelHandlerContext ctx) {
checkState(pne != null, "previous protocol negotiation event hasn't triggered");
ctx.pipeline().replace(ctx.name(), /* newName= */ null, next);
ctx.fireUserEventTriggered(pne);
}
}

Expand Down Expand Up @@ -573,208 +585,6 @@ static void logSslEngineDetails(Level level, ChannelHandlerContext ctx, String m
log.log(level, builder.toString(), t);
}

/**
* Buffers all writes until either {@link #writeBufferedAndRemove(ChannelHandlerContext)} or
* {@link #fail(ChannelHandlerContext, Throwable)} is called. This handler allows us to
* write to a {@link io.netty.channel.Channel} before we are allowed to write to it officially
* i.e. before it's active or the TLS Handshake is complete.
*/
public abstract static class AbstractBufferingHandler extends ChannelDuplexHandler {

private ChannelHandler[] handlers;
private Queue<ChannelWrite> bufferedWrites = new ArrayDeque<>();
private boolean writing;
private boolean flushRequested;
private Throwable failCause;

/**
* @param handlers the ChannelHandlers are added to the pipeline on channelRegistered and
* before this handler.
*/
protected AbstractBufferingHandler(ChannelHandler... handlers) {
this.handlers = handlers;
}

/**
* When this channel is registered, we will add all the ChannelHandlers passed into our
* constructor to the pipeline.
*/
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
/**
* This check is necessary as a channel may be registered with different event loops during it
* lifetime and we only want to configure it once.
*/
if (handlers != null && handlers.length > 0) {
for (ChannelHandler handler : handlers) {
ctx.pipeline().addBefore(ctx.name(), null, handler);
}
ChannelHandler handler0 = handlers[0];
ChannelHandlerContext handler0Ctx = ctx.pipeline().context(handlers[0]);
handlers = null;
if (handler0Ctx != null) { // The handler may have removed itself immediately
if (handler0 instanceof ChannelInboundHandler) {
((ChannelInboundHandler) handler0).channelRegistered(handler0Ctx);
} else {
handler0Ctx.fireChannelRegistered();
}
}
} else {
super.channelRegistered(ctx);
}
}

/**
* Do not rely on channel handlers to propagate exceptions to us.
* {@link NettyClientHandler} is an example of a class that does not propagate exceptions.
* Add a listener to the connect future directly and do appropriate error handling.
*/
@Override
public void connect(final ChannelHandlerContext ctx, SocketAddress remoteAddress,
SocketAddress localAddress, ChannelPromise promise) throws Exception {
super.connect(ctx, remoteAddress, localAddress, promise);
promise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
fail(ctx, future.cause());
}
}
});
}

/**
* If we encounter an exception, then notify all buffered writes that we failed.
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
fail(ctx, cause);
}

/**
* If this channel becomes inactive, then notify all buffered writes that we failed.
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
fail(ctx, unavailableException("Connection broken while performing protocol negotiation"));
super.channelInactive(ctx);
}

/**
* Buffers the write until either {@link #writeBufferedAndRemove(ChannelHandlerContext)} is
* called, or we have somehow failed. If we have already failed in the past, then the write
* will fail immediately.
*/
@Override
@SuppressWarnings("FutureReturnValueIgnored")
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
throws Exception {
/**
* This check handles a race condition between Channel.write (in the calling thread) and the
* removal of this handler (in the event loop thread).
* The problem occurs in e.g. this sequence:
* 1) [caller thread] The write method identifies the context for this handler
* 2) [event loop] This handler removes itself from the pipeline
* 3) [caller thread] The write method delegates to the invoker to call the write method in
* the event loop thread. When this happens, we identify that this handler has been
* removed with "bufferedWrites == null".
*/
if (failCause != null) {
promise.setFailure(failCause);
ReferenceCountUtil.release(msg);
} else if (bufferedWrites == null) {
super.write(ctx, msg, promise);
} else {
bufferedWrites.add(new ChannelWrite(msg, promise));
}
}

/**
* Calls to this method will not trigger an immediate flush. The flush will be deferred until
* {@link #writeBufferedAndRemove(ChannelHandlerContext)}.
*/
@Override
public void flush(ChannelHandlerContext ctx) {
/**
* Swallowing any flushes is not only an optimization but also required
* for the SslHandler to work correctly. If the SslHandler receives multiple
* flushes while the handshake is still ongoing, then the handshake "randomly"
* times out. Not sure at this point why this is happening. Doing a single flush
* seems to work but multiple flushes don't ...
*/
if (bufferedWrites == null) {
ctx.flush();
} else {
flushRequested = true;
}
}

/**
* If we are still performing protocol negotiation, then this will propagate failures to all
* buffered writes.
*/
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise future) throws Exception {
if (ctx.channel().isActive()) { // This may be a notification that the socket was closed
fail(ctx, unavailableException("Channel closed while performing protocol negotiation"));
}
super.close(ctx, future);
}

/**
* Propagate failures to all buffered writes.
*/
@SuppressWarnings("FutureReturnValueIgnored")
protected final void fail(ChannelHandlerContext ctx, Throwable cause) {
if (failCause == null) {
failCause = cause;
}
if (bufferedWrites != null) {
while (!bufferedWrites.isEmpty()) {
ChannelWrite write = bufferedWrites.poll();
write.promise.setFailure(cause);
ReferenceCountUtil.release(write.msg);
}
bufferedWrites = null;
}

ctx.fireExceptionCaught(cause);
}

@SuppressWarnings("FutureReturnValueIgnored")
protected final void writeBufferedAndRemove(ChannelHandlerContext ctx) {
if (!ctx.channel().isActive() || writing) {
return;
}
// Make sure that method can't be reentered, so that the ordering
// in the queue can't be messed up.
writing = true;
while (!bufferedWrites.isEmpty()) {
ChannelWrite write = bufferedWrites.poll();
ctx.write(write.msg, write.promise);
}
assert bufferedWrites.isEmpty();
bufferedWrites = null;
if (flushRequested) {
ctx.flush();
}
// Removal has to happen last as the above writes will likely trigger
// new writes that have to be added to the end of queue in order to not
// mess up the ordering.
ctx.pipeline().remove(this);
}

private static class ChannelWrite {
Object msg;
ChannelPromise promise;

ChannelWrite(Object msg, ChannelPromise promise) {
this.msg = msg;
this.promise = promise;
}
}
}

/**
* Adapts a {@link ProtocolNegotiationEvent} to the {@link GrpcHttp2ConnectionHandler}.
*/
Expand Down

0 comments on commit f5dc78d

Please sign in to comment.