Skip to content

Commit

Permalink
Load Balancer abstraction.
Browse files Browse the repository at this point in the history
Refactoring the `ConnectionProvider` interface to make it simple.
Additionally provided a load balancer abstraction to provide higher level load balancing semantics later.
  • Loading branch information
NiteshKant committed Jan 8, 2016
1 parent a8713a6 commit 3d25c1e
Show file tree
Hide file tree
Showing 107 changed files with 3,415 additions and 2,318 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
*/
public enum HandlerNames {

SslHandler("ssl-handler"),
SslConnectionEmissionHandler("ssl-connection-emitter"),
WireLogging("wire-logging-handler"),
PrimitiveConverter("primitive-converter"),
ClientReadTimeoutHandler("client-read-timeout-handler"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
*
* Lazy subscriptions are allowed on {@link Connection#getInput()} if and only if the channel is configured to
* not read data automatically (i.e. {@link ChannelOption#AUTO_READ} is set to {@code false}). Otherwise,
* if {@link Connection#getInput()} is subscribed lazily, the subscriber always recieves an error. The content
* if {@link Connection#getInput()} is subscribed lazily, the subscriber always receives an error. The content
* in this case is disposed upon reading.
*
* @param <R> Type read from the connection held by this handler.
Expand Down Expand Up @@ -77,7 +77,7 @@ public abstract class AbstractConnectionToChannelBridge<R, W> extends Backpressu

protected ConnectionEventListener eventListener;
protected EventPublisher eventPublisher;
private Subscriber<? super Connection<R, W>> newConnectionSub;
private Subscriber<? super Channel> newChannelSub;
private ReadProducer<R> readProducer;
private boolean raiseErrorOnInputSubscription;
private boolean connectionEmitted;
Expand Down Expand Up @@ -105,13 +105,6 @@ protected AbstractConnectionToChannelBridge(String thisHandlerName,
this.eventPublisherAttributeKey = eventPublisherAttributeKey;
}

protected AbstractConnectionToChannelBridge(String thisHandlerName, Subscriber<? super Connection<R, W>> connSub,
AttributeKey<ConnectionEventListener> eventListenerAttributeKey,
AttributeKey<EventPublisher> eventPublisherAttributeKey) {
this(thisHandlerName, eventListenerAttributeKey, eventPublisherAttributeKey);
newConnectionSub = connSub;
}

@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
if (null == eventListener && null == eventPublisher) {
Expand All @@ -136,6 +129,15 @@ public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
super.handlerAdded(ctx);
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
if (!connectionEmitted && isValidToEmit(newChannelSub)) {
emitNewConnection(ctx.channel());
connectionEmitted = true;
}
super.channelInactive(ctx);
}

@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {

Expand All @@ -151,29 +153,29 @@ public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof EmitConnectionEvent) {
if (!connectionEmitted) {
createNewConnection(ctx.channel());
emitNewConnection(ctx.channel());
connectionEmitted = true;
}
} else if (evt instanceof ConnectionCreationFailedEvent) {
if (isValidToEmit(newConnectionSub)) {
newConnectionSub.onError(((ConnectionCreationFailedEvent)evt).getThrowable());
if (isValidToEmit(newChannelSub)) {
newChannelSub.onError(((ConnectionCreationFailedEvent)evt).getThrowable());
}
} else if (evt instanceof ConnectionSubscriberEvent) {
} else if (evt instanceof ChannelSubscriberEvent) {
@SuppressWarnings("unchecked")
final ConnectionSubscriberEvent<R, W> connectionSubscriberEvent = (ConnectionSubscriberEvent<R, W>) evt;
final ChannelSubscriberEvent<R, W> channelSubscriberEvent = (ChannelSubscriberEvent<R, W>) evt;

newConnectionSubscriber(connectionSubscriberEvent);
newConnectionSubscriber(channelSubscriberEvent);
} else if (evt instanceof ConnectionInputSubscriberEvent) {
@SuppressWarnings("unchecked")
ConnectionInputSubscriberEvent<R, W> event = (ConnectionInputSubscriberEvent<R, W>) evt;

newConnectionInputSubscriber(ctx.channel(), event.getSubscriber(), event.getConnection(), false);
newConnectionInputSubscriber(ctx.channel(), event.getSubscriber(), false);
} else if (evt instanceof ConnectionInputSubscriberResetEvent) {
resetConnectionInputSubscriber();
} else if (evt instanceof ConnectionInputSubscriberReplaceEvent) {
@SuppressWarnings("unchecked")
ConnectionInputSubscriberReplaceEvent<R, W> event = (ConnectionInputSubscriberReplaceEvent<R, W>) evt;
replaceConnectionInputSubscriber(event);
replaceConnectionInputSubscriber(ctx.channel(), event);
}

super.userEventTriggered(ctx, evt);
Expand Down Expand Up @@ -205,8 +207,8 @@ public boolean shouldReadMore(ChannelHandlerContext ctx) {

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
if (!connectionEmitted && isValidToEmit(newConnectionSub)) {
newConnectionSub.onError(cause);
if (!connectionEmitted && isValidToEmit(newChannelSub)) {
newChannelSub.onError(cause);
} else if (isValidToEmitToReadSubscriber(readProducer)) {
readProducer.sendOnError(cause);
} else {
Expand All @@ -228,34 +230,38 @@ protected boolean connectionInputSubscriberExists(Channel channel) {
return null != readProducer && null != readProducer.subscriber && !readProducer.subscriber.isUnsubscribed();
}

protected void onNewReadSubscriber(Connection<R, W> connection, Subscriber<? super R> subscriber) {
protected void onNewReadSubscriber(Subscriber<? super R> subscriber) {
// NOOP
}

protected final void checkEagerSubscriptionIfConfigured(Connection<R, W> connection, Channel channel) {
protected final void checkEagerSubscriptionIfConfigured(Channel channel) {
if (channel.config().isAutoRead() && null == readProducer) {
// If the channel is set to auto-read and there is no eager subscription then, we should raise errors
// when a subscriber arrives.
raiseErrorOnInputSubscription = true;
final Subscriber<? super R> discardAll = ConnectionInputSubscriberEvent.discardAllInput(connection)
final Subscriber<? super R> discardAll = ConnectionInputSubscriberEvent.discardAllInput()
.getSubscriber();
final ReadProducer<R> producer = new ReadProducer<>(discardAll, channel);
discardAll.setProducer(producer);
readProducer = producer;
}
}

protected final Subscriber<? super Connection<R, W>> getNewConnectionSub() {
return newConnectionSub;
protected final Subscriber<? super Channel> getNewChannelSub() {
return newChannelSub;
}

private void createNewConnection(Channel channel) {
if (isValidToEmit(newConnectionSub)) {
Connection<R, W> connection = ConnectionImpl.create(channel, eventListener, eventPublisher);
newConnectionSub.onNext(connection);
connectionEmitted = true;
checkEagerSubscriptionIfConfigured(connection, channel);
newConnectionSub.onCompleted();
private void emitNewConnection(Channel channel) {
if (isValidToEmit(newChannelSub)) {
try {
newChannelSub.onNext(channel);
connectionEmitted = true;
checkEagerSubscriptionIfConfigured(channel);
newChannelSub.onCompleted();
} catch (Exception e) {
logger.error("Error emitting a new connection. Closing this channel.", e);
channel.close();
}
} else {
channel.close(); // Closing the connection if not sent to a subscriber.
}
Expand All @@ -271,39 +277,39 @@ private void resetConnectionInputSubscriber() {
}

private void newConnectionInputSubscriber(final Channel channel, final Subscriber<? super R> subscriber,
final Connection<R, W> connection, boolean replace) {
boolean replace) {
final Subscriber<? super R> connInputSub = null == readProducer ? null : readProducer.subscriber;
if (isValidToEmit(connInputSub)) {
if (!replace) {
/*Allow only once concurrent input subscriber but allow concatenated subscribers*/
subscriber.onError(ONLY_ONE_CONN_INPUT_SUB_ALLOWED);
} else {
setNewReadProducer(channel, subscriber, connection);
setNewReadProducer(channel, subscriber);
connInputSub.onCompleted();
}
} else if (raiseErrorOnInputSubscription) {
subscriber.onError(LAZY_CONN_INPUT_SUB);
} else {
setNewReadProducer(channel, subscriber, connection);
setNewReadProducer(channel, subscriber);
}
}

private void setNewReadProducer(Channel channel, Subscriber<? super R> subscriber, Connection<R, W> connection) {
private void setNewReadProducer(Channel channel, Subscriber<? super R> subscriber) {
final ReadProducer<R> producer = new ReadProducer<>(subscriber, channel);
subscriber.setProducer(producer);
onNewReadSubscriber(connection, subscriber);
onNewReadSubscriber(subscriber);
readProducer = producer;
}

private void replaceConnectionInputSubscriber(ConnectionInputSubscriberReplaceEvent<R, W> event) {
private void replaceConnectionInputSubscriber(Channel channel, ConnectionInputSubscriberReplaceEvent<R, W> event) {
ConnectionInputSubscriberEvent<R, W> newSubEvent = event.getNewSubEvent();
newConnectionInputSubscriber(newSubEvent.getConnection().unsafeNettyChannel(), newSubEvent.getSubscriber(),
newSubEvent.getConnection(), true);
newConnectionInputSubscriber(channel, newSubEvent.getSubscriber(),
true);
}

private void newConnectionSubscriber(ConnectionSubscriberEvent<R, W> event) {
if (null == newConnectionSub) {
newConnectionSub = event.getSubscriber();
private void newConnectionSubscriber(ChannelSubscriberEvent<R, W> event) {
if (null == newChannelSub) {
newChannelSub = event.getSubscriber();
} else {
event.getSubscriber().onError(ONLY_ONE_CONN_SUB_ALLOWED);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public boolean acceptOutboundMessage(Object msg) throws Exception {
}

@Override
public abstract Observable<RR> getInput();
public abstract ContentSource<RR> getInput();

@Override
public Observable<Void> write(Observable<WW> msgs) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright 2015 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package io.reactivex.netty.channel;

import io.netty.channel.Channel;
import rx.Subscriber;

/**
* An event to communicate the subscriber of a new channel created by {@link AbstractConnectionToChannelBridge}.
*
* <h2>Connection reuse</h2>
*
* For cases, where the {@link Connection} is pooled, reuse should be indicated explicitly via
* {@link ConnectionInputSubscriberResetEvent}. There can be multiple {@link ConnectionInputSubscriberResetEvent}s
* sent to the same channel and hence the same instance of {@link AbstractConnectionToChannelBridge}.
*
* @param <R> Type read from the connection held by the event.
* @param <W> Type written to the connection held by the event.
*/
public class ChannelSubscriberEvent<R, W> {

private final Subscriber<? super Channel> subscriber;

public ChannelSubscriberEvent(Subscriber<? super Channel> subscriber) {
this.subscriber = subscriber;
}

public Subscriber<? super Channel> getSubscriber() {
return subscriber;
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015 Netflix, Inc.
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -22,6 +22,7 @@
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.util.AttributeKey;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.EventExecutorGroup;
import rx.Observable;
Expand All @@ -44,6 +45,8 @@
*/
public abstract class Connection<R, W> implements ChannelOperations<W> {

public static final AttributeKey<Connection> CONNECTION_ATTRIBUTE_KEY = AttributeKey.valueOf("rx-netty-conn-attr");

private final Channel nettyChannel;

protected final MarkAwarePipeline markAwarePipeline;
Expand Down Expand Up @@ -74,7 +77,7 @@ public ContentSource<R> getInput() {
return new ContentSource<>(nettyChannel, new Func1<Subscriber<? super R>, Object>() {
@Override
public Object call(Subscriber<? super R> subscriber) {
return new ConnectionInputSubscriberEvent<>(subscriber, Connection.this);
return new ConnectionInputSubscriberEvent<>(subscriber);
}
});
}
Expand Down Expand Up @@ -278,11 +281,13 @@ public Channel unsafeNettyChannel() {
* constructed completely. IOW, "this" escapes from the constructor if the listener is added in the constructor.
*/
protected void connectCloseToChannelClose() {
nettyChannel.closeFuture().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
close(false); // Close this connection when the channel is closed.
}
});
nettyChannel.closeFuture()
.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
close(false); // Close this connection when the channel is closed.
}
});
nettyChannel.attr(CONNECTION_ATTRIBUTE_KEY).set(this);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

/**
* An event to indicate to {@link AbstractConnectionToChannelBridge} that the subscriber as published by
* {@link ConnectionSubscriberEvent} should be informed of a connection creation failure, instead of a new connection.
* {@link ChannelSubscriberEvent} should be informed of a connection creation failure, instead of a new connection.
*
* <h2>Why do we need this?</h2>
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.netty.channel.FileRegion;
import io.netty.util.concurrent.EventExecutorGroup;
import io.reactivex.netty.channel.events.ConnectionEventListener;
import io.reactivex.netty.events.EventAttributeKeys;
import io.reactivex.netty.events.EventPublisher;
import rx.Observable;
import rx.functions.Action1;
Expand Down Expand Up @@ -132,9 +133,21 @@ public Observable<Void> closeListener() {
return delegate.closeListener();
}

public static <R, W> ConnectionImpl<R, W> create(Channel nettyChannel, ConnectionEventListener eventListener,
EventPublisher eventPublisher) {
final ConnectionImpl<R, W> toReturn = new ConnectionImpl<>(nettyChannel, eventListener, eventPublisher);
public static <R, W> ConnectionImpl<R, W> fromChannel(Channel nettyChannel) {
EventPublisher ep = nettyChannel.attr(EventAttributeKeys.EVENT_PUBLISHER).get();
if (null == ep) {
throw new IllegalArgumentException("No event publisher set in the channel.");
}

ConnectionEventListener l = null;
if (ep.publishingEnabled()) {
l = nettyChannel.attr(EventAttributeKeys.CONNECTION_EVENT_LISTENER).get();
if (null == l) {
throw new IllegalArgumentException("No event listener set in the channel.");
}
}

final ConnectionImpl<R, W> toReturn = new ConnectionImpl<>(nettyChannel, l, ep);
toReturn.connectCloseToChannelClose();
return toReturn;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,28 +36,19 @@
public final class ConnectionInputSubscriberEvent<R, W> {

private final Subscriber<? super R> subscriber;
private final Connection<R, W> connection;

public ConnectionInputSubscriberEvent(Subscriber<? super R> subscriber, final Connection<R, W> connection) {
public ConnectionInputSubscriberEvent(Subscriber<? super R> subscriber) {
if (null == subscriber) {
throw new NullPointerException("Subscriber can not be null");
}
if (null == connection) {
throw new NullPointerException("Connection can not be null");
}
this.subscriber = subscriber;
this.connection = connection;
}

public Subscriber<? super R> getSubscriber() {
return subscriber;
}

public Connection<R, W> getConnection() {
return connection;
}

public static <II, OO> ConnectionInputSubscriberEvent<II, OO> discardAllInput(Connection<II, OO> connection) {
public static <II, OO> ConnectionInputSubscriberEvent<II, OO> discardAllInput() {
return new ConnectionInputSubscriberEvent<>(Subscribers.create(new Action1<II>() {
@Override
public void call(II msg) {
Expand All @@ -68,6 +59,6 @@ public void call(II msg) {
public void call(Throwable throwable) {
// Empty as we are discarding input anyways.
}
}), connection);
}));
}
}
Loading

0 comments on commit 3d25c1e

Please sign in to comment.