Skip to content

Commit

Permalink
Fixed client connection level events (#527)
Browse files Browse the repository at this point in the history
#### Problem

Since the pipelines for client are setup after the connection is established, callbacks like `connect` are not send to the handlers. This was the reason for missing the connection related events for clients.
`PooledConnection` acquire events were not published because the events were getting published before the event listeners were setup.

#### Modification

This change moves the work done for client on `ChannelHandler.connet()` to `ChannelActivityBufferingHandler` which is added on channel creation.

Now, publishing events for pool acquire are published using the event publisher configured on the channel.
  • Loading branch information
NiteshKant committed Jul 5, 2016
1 parent 7918e93 commit 1249344
Show file tree
Hide file tree
Showing 26 changed files with 364 additions and 223 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,16 @@
package io.reactivex.netty.client;

import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.util.AttributeKey;
import io.reactivex.netty.channel.AbstractConnectionToChannelBridge;
import io.reactivex.netty.channel.ChannelSubscriberEvent;
import io.reactivex.netty.channel.Connection;
import io.reactivex.netty.channel.ConnectionCreationFailedEvent;
import io.reactivex.netty.channel.ConnectionInputSubscriberResetEvent;
import io.reactivex.netty.channel.EmitConnectionEvent;
import io.reactivex.netty.client.events.ClientEventListener;
import io.reactivex.netty.client.pool.PooledConnection;
import io.reactivex.netty.events.Clock;
import io.reactivex.netty.events.EventAttributeKeys;
import io.reactivex.netty.events.EventPublisher;
import io.reactivex.netty.internal.ExecuteInEventloopAction;
Expand All @@ -41,10 +36,6 @@
import rx.observers.SafeSubscriber;
import rx.subscriptions.Subscriptions;

import java.net.SocketAddress;

import static java.util.concurrent.TimeUnit.*;

/**
* An implementation of {@link AbstractConnectionToChannelBridge} for clients.
*
Expand Down Expand Up @@ -72,7 +63,6 @@ public class ClientConnectionToChannelBridge<R, W> extends AbstractConnectionToC
private EventPublisher eventPublisher;
private ClientEventListener eventListener;
private final boolean isSecure;
private long connectStartTimeNanos;
private Channel channel;

private ClientConnectionToChannelBridge(boolean isSecure) {
Expand Down Expand Up @@ -119,38 +109,7 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
ConnectionReuseEvent<R, W> event = (ConnectionReuseEvent<R, W>) evt;

newConnectionReuseEvent(ctx.channel(), event);
} else if (evt instanceof ConnectionCreationFailedEvent) {
ConnectionCreationFailedEvent failedEvent = (ConnectionCreationFailedEvent) evt;
onConnectFailedEvent(failedEvent);
}
}

@SuppressWarnings("unchecked")
@Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress,
ChannelPromise promise) throws Exception {

connectStartTimeNanos = Clock.newStartTimeNanos();

if (eventPublisher.publishingEnabled()) {
eventListener.onConnectStart();
promise.addListener(new ChannelFutureListener() {
@SuppressWarnings("unchecked")
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (eventPublisher.publishingEnabled()) {
long endTimeNanos = Clock.onEndNanos(connectStartTimeNanos);
if (!future.isSuccess()) {
eventListener.onConnectFailed(endTimeNanos, NANOSECONDS, future.cause());
} else {
eventListener.onConnectSuccess(endTimeNanos, NANOSECONDS);
}
}
}
});
}

super.connect(ctx, remoteAddress, localAddress, promise);
}

@Override
Expand All @@ -164,7 +123,7 @@ protected void onNewReadSubscriber(Subscriber<? super R> subscriber) {
@Override
public void run() {
if (!connectionInputSubscriberExists(channel)) {
Connection connection = channel.attr(Connection.CONNECTION_ATTRIBUTE_KEY).get();
Connection<?, ?> connection = channel.attr(Connection.CONNECTION_ATTRIBUTE_KEY).get();
if (null != connection) {
connection.closeNow();
}
Expand All @@ -183,13 +142,6 @@ private void newConnectionReuseEvent(Channel channel, final ConnectionReuseEvent
}
}

@SuppressWarnings("unchecked")
private void onConnectFailedEvent(ConnectionCreationFailedEvent event) {
if (eventPublisher.publishingEnabled()) {
eventListener.onConnectFailed(connectStartTimeNanos, NANOSECONDS, event.getThrowable());
}
}

public static <R, W> ClientConnectionToChannelBridge<R, W> addToPipeline(ChannelPipeline pipeline,
boolean isSecure) {
ClientConnectionToChannelBridge<R, W> toAdd = new ClientConnectionToChannelBridge<>(isSecure);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,16 @@

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.AbstractChannel;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
Expand All @@ -36,9 +38,11 @@
import io.netty.util.concurrent.EventExecutorGroup;
import io.reactivex.netty.RxNetty;
import io.reactivex.netty.channel.ChannelSubscriberEvent;
import io.reactivex.netty.channel.ConnectionCreationFailedEvent;
import io.reactivex.netty.channel.DetachedChannelPipeline;
import io.reactivex.netty.channel.WriteTransformer;
import io.reactivex.netty.client.events.ClientEventListener;
import io.reactivex.netty.events.Clock;
import io.reactivex.netty.events.EventPublisher;
import io.reactivex.netty.events.EventSource;
import io.reactivex.netty.ssl.DefaultSslCodec;
Expand All @@ -51,11 +55,13 @@
import rx.functions.Func1;

import javax.net.ssl.SSLEngine;
import java.net.SocketAddress;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;

import static io.reactivex.netty.HandlerNames.*;
import static java.util.concurrent.TimeUnit.NANOSECONDS;


/**
Expand Down Expand Up @@ -249,7 +255,7 @@ public ChannelHandler call() {
});
}

public Bootstrap newBootstrap() {
public Bootstrap newBootstrap(final EventPublisher eventPublisher, final ClientEventListener eventListener) {
final Bootstrap nettyBootstrap = new Bootstrap().group(eventLoopGroup)
.channel(channelClass)
.option(ChannelOption.AUTO_READ, false);// by default do not read content unless asked.
Expand All @@ -265,7 +271,7 @@ public Bootstrap newBootstrap() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(ClientChannelActiveBufferingHandler.getName(),
new ChannelActivityBufferingHandler());
new ChannelActivityBufferingHandler(eventPublisher, eventListener));
}
});
return nettyBootstrap;
Expand Down Expand Up @@ -361,7 +367,7 @@ public static Class<? extends Channel> defaultSocketChannelClass() {
* before the pipeline is configured.
* This handler buffers, the channel events till the time, a subscriber appears for channel establishment.
*/
private static class ChannelActivityBufferingHandler extends ChannelInboundHandlerAdapter {
private static class ChannelActivityBufferingHandler extends ChannelDuplexHandler {

private enum State {
Initialized,
Expand All @@ -377,6 +383,42 @@ private enum State {
* Unregistered state will hide the active/inactive state, hence this is a different flag.
*/
private boolean unregistered;
private long connectStartTimeNanos;
private final EventPublisher eventPublisher;
private final ClientEventListener eventListener;

private ChannelActivityBufferingHandler(EventPublisher eventPublisher, ClientEventListener eventListener) {
this.eventPublisher = eventPublisher;
this.eventListener = eventListener;
}

@SuppressWarnings("unchecked")
@Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress,
ChannelPromise promise) throws Exception {

connectStartTimeNanos = Clock.newStartTimeNanos();

if (eventPublisher.publishingEnabled()) {
eventListener.onConnectStart();
promise.addListener(new ChannelFutureListener() {
@SuppressWarnings("unchecked")
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (eventPublisher.publishingEnabled()) {
long endTimeNanos = Clock.onEndNanos(connectStartTimeNanos);
if (!future.isSuccess()) {
eventListener.onConnectFailed(endTimeNanos, NANOSECONDS, future.cause());
} else {
eventListener.onConnectSuccess(endTimeNanos, NANOSECONDS);
}
}
}
});
}

super.connect(ctx, remoteAddress, localAddress, promise);
}

@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
Expand Down Expand Up @@ -444,9 +486,20 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
if (unregistered) {
pipeline.fireChannelUnregistered();
}
} else if (evt instanceof ConnectionCreationFailedEvent) {
ConnectionCreationFailedEvent failedEvent = (ConnectionCreationFailedEvent) evt;
onConnectFailedEvent(failedEvent);
super.userEventTriggered(ctx, evt);
} else {
super.userEventTriggered(ctx, evt);
}
}

@SuppressWarnings("unchecked")
private void onConnectFailedEvent(ConnectionCreationFailedEvent event) {
if (eventPublisher.publishingEnabled()) {
eventListener.onConnectFailed(connectStartTimeNanos, NANOSECONDS, event.getThrowable());
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,10 @@ public HostConnector(Host host, ConnectionProvider<W, R> connectionProvider,

public HostConnector(HostConnector<W, R> source, ConnectionProvider<W, R> connectionProvider) {
this.connectionProvider = connectionProvider;
this.host = source.host;
this.eventSource = source.eventSource;
this.clientPublisher = source.clientPublisher;
this.publisher = source.publisher;
host = source.host;
eventSource = source.eventSource;
clientPublisher = source.clientPublisher;
publisher = source.publisher;
}

public Host getHost() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,12 +101,12 @@ public void call() {

private PooledConnection(PooledConnection<?, ?> toCopy, Connection<R, W> unpooledDelegate) {
super(unpooledDelegate);
this.owner = toCopy.owner;
owner = toCopy.owner;
this.unpooledDelegate = unpooledDelegate;
this.lastReturnToPoolTimeMillis = toCopy.lastReturnToPoolTimeMillis;
this.releasedAtLeastOnce = toCopy.releasedAtLeastOnce;
this.maxIdleTimeMillis = toCopy.maxIdleTimeMillis;
this.releaseObservable = toCopy.releaseObservable;
lastReturnToPoolTimeMillis = toCopy.lastReturnToPoolTimeMillis;
releasedAtLeastOnce = toCopy.releasedAtLeastOnce;
maxIdleTimeMillis = toCopy.maxIdleTimeMillis;
releaseObservable = toCopy.releaseObservable;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,9 @@ public void call() {
idleConnCleanupSubscription.unsubscribe();
}
})
.onErrorResumeNext(new Func1<Throwable, Observable<? extends Void>>() {
.onErrorResumeNext(new Func1<Throwable, Observable<Void>>() {
@Override
public Observable<? extends Void> call(Throwable throwable) {
public Observable<Void> call(Throwable throwable) {
logger.error("Error listening to Host close notifications. Shutting down the pool.",
throwable);
return Observable.empty();
Expand Down Expand Up @@ -281,34 +281,39 @@ private class ConnectMetricsOperator implements Operator<Connection<R, W>, Poole

@Override
public Subscriber<? super PooledConnection<R, W>> call(final Subscriber<? super Connection<R, W>> o) {
final long startTimeNanos = isEventPublishingEnabled() ? Clock.newStartTimeNanos() : -1;

if (isEventPublishingEnabled()) {
hostConnector.getClientPublisher().onPoolAcquireStart();
}
final long startTimeNanos = Clock.newStartTimeNanos();

return new Subscriber<PooledConnection<R, W>>(o) {

private volatile boolean publishingEnabled;
private volatile ClientEventListener eventListener;

@Override
public void onCompleted() {
if (isEventPublishingEnabled()) {
hostConnector.getClientPublisher()
.onPoolAcquireSuccess(Clock.onEndNanos(startTimeNanos), NANOSECONDS);
if (publishingEnabled) {
eventListener.onPoolAcquireStart();
eventListener.onPoolAcquireSuccess(Clock.onEndNanos(startTimeNanos), NANOSECONDS);
}
o.onCompleted();
}

@Override
public void onError(Throwable e) {
if (isEventPublishingEnabled()) {
if (publishingEnabled) {
/*Error means no connection was received, as it always every gets at most one connection*/
hostConnector.getClientPublisher()
.onPoolAcquireFailed(Clock.onEndNanos(startTimeNanos), NANOSECONDS, e);
eventListener.onPoolAcquireStart();
eventListener.onPoolAcquireFailed(Clock.onEndNanos(startTimeNanos), NANOSECONDS, e);
}
o.onError(e);
}

@Override
public void onNext(PooledConnection<R, W> c) {
EventPublisher eventPublisher = c.unsafeNettyChannel().attr(EVENT_PUBLISHER).get();
if (eventPublisher.publishingEnabled()) {
publishingEnabled = true;
eventListener = c.unsafeNettyChannel().attr(CLIENT_EVENT_LISTENER).get();
}
o.onNext(c);
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,15 @@
import io.netty.channel.embedded.EmbeddedChannel;
import io.reactivex.netty.channel.BackpressureManagingHandler.RequestReadIfRequiredEvent;
import io.reactivex.netty.channel.events.ConnectionEventListener;
import io.reactivex.netty.test.util.MockEventPublisher;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExternalResource;
import org.junit.runner.Description;
import org.junit.runners.model.Statement;
import rx.observers.TestSubscriber;

import static io.reactivex.netty.test.util.DisabledEventPublisher.*;
import static io.reactivex.netty.test.util.MockEventPublisher.*;
import static org.hamcrest.Matchers.*;
import static org.junit.Assert.*;

Expand Down Expand Up @@ -200,7 +201,7 @@ public void evaluate() throws Throwable {
ctx = channel.pipeline().firstContext();
handler = new AbstractConnectionToChannelBridge<String, String>("foo",
new ConnectionEventListener() { },
DISABLED_EVENT_PUBLISHER) { };
disabled()) { };
base.evaluate();
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.netty.channel.FileRegion;
import io.netty.channel.embedded.EmbeddedChannel;
import io.reactivex.netty.test.util.FlushSelector;
import io.reactivex.netty.test.util.MockEventPublisher;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExternalResource;
Expand All @@ -39,7 +40,7 @@
import java.util.ArrayList;
import java.util.List;

import static io.reactivex.netty.test.util.DisabledEventPublisher.*;
import static io.reactivex.netty.test.util.MockEventPublisher.*;
import static org.hamcrest.MatcherAssert.*;
import static org.hamcrest.Matchers.*;

Expand Down Expand Up @@ -275,7 +276,7 @@ public void evaluate() throws Throwable {
writeObservableSubscribers = new ArrayList<>();
/*Since, the appropriate handler is not added to the pipeline that handles O<> writes.*/
channel = new EmbeddedChannel(new HandleObservableWrite(writeObservableSubscribers));
channelOperations = new DefaultChannelOperations<>(channel, null, DISABLED_EVENT_PUBLISHER);
channelOperations = new DefaultChannelOperations<>(channel, null, disabled());
base.evaluate();
}
};
Expand Down
Loading

0 comments on commit 1249344

Please sign in to comment.