Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

bugfix: supports multiple instances of the client (netty loop setup f…

…actored out)
  • Loading branch information...
commit 516f5078a8742fecf2c7f5269ce497be3867ce2f 1 parent 766c2c3
@anomalizer anomalizer authored
View
2  pom.xml
@@ -3,7 +3,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>inmobi.messaging.client</groupId>
<artifactId>scribe</artifactId>
- <version>0.0.4</version>
+ <version>0.0.5</version>
<repositories>
<repository>
<snapshots>
View
54 src/main/java/com/inmobi/messaging/netty/NettyEventCore.java
@@ -0,0 +1,54 @@
+package com.inmobi.messaging.netty;
+
+import org.jboss.netty.channel.socket.*;
+import org.jboss.netty.channel.socket.nio.*;
+
+import java.util.concurrent.*;
+
+class NettyEventCore {
+ private static NettyEventCore ourInstance = new NettyEventCore();
+
+ private ClientSocketChannelFactory factory = null;
+ private int leases = 0;
+
+ public static NettyEventCore getInstance() {
+ return ourInstance;
+ }
+
+ private NettyEventCore() {
+ }
+
+ /**
+ * Get a handle to the netty event loop
+ *
+ * The NIO handlers are setup lazily
+ * @return an NIO handler
+ */
+ public synchronized ClientSocketChannelFactory getFactory() {
+ if(factory == null) {
+ factory = new NioClientSocketChannelFactory(
+ Executors.newCachedThreadPool(),
+ Executors.newCachedThreadPool());
+ }
+ leases++;
+ return factory;
+ }
+
+ /**
+ * Application indicating that it no longer needs this
+ *
+ * In real world, assume that this will not be released,
+ * we are being nice and try and release it.
+ */
+ public synchronized void releaseFactory() {
+ if(factory != null) {
+ leases--;
+ if(leases == 0) {
+ factory.releaseExternalResources();
+ factory = null;
+ }
+ } else {
+ //WTF! releasing what you did not take
+ }
+ }
+}
View
208 src/main/java/com/inmobi/messaging/netty/ScribeNettyImpl.java
@@ -23,117 +23,109 @@
import com.inmobi.messaging.MessagePublisherMXBean;
public class ScribeNettyImpl implements MessagePublisher, MessagePublisherMXBean {
-
- // In a bigger netty world, someone the factory
- private static NioClientSocketChannelFactory factory;
-
- private static final Timer timer = new HashedWheelTimer();
-
- private final ClientBootstrap bootstrap;
- private volatile Channel ch = null;
-
- private final TimingAccumulator stats = new TimingAccumulator();
-
-
- private String host;
- private int port;
- private int backoff;
-
- private ChannelBuffer categoryAsByteStream;
-
- /**
- * This is meant to be a way for async callbacks to set the channel
- * on a successful connection
- *
- * Java does not have pointers to pointers. So have to resort to sending
- * in a wrapper object that knows to update our pointer
- */
- class ChannelSetter {
- public void setChannel(Channel ch) {
- Channel oldChannel = ScribeNettyImpl.this.ch;
- if(oldChannel != null && oldChannel.isOpen())
- oldChannel.close();
- ScribeNettyImpl.this.ch = ch;
- }
- public void connect() {bootstrap.connect(new InetSocketAddress(host, port));}
- }
-
- public ScribeNettyImpl(String hostname, int port, int timeoutSeconds,
- int backoffSeconds) {
- ExecutorService bossPool = Executors.newCachedThreadPool();
- ExecutorService workerPool = Executors.newCachedThreadPool();
- factory = new NioClientSocketChannelFactory(bossPool, workerPool);
-
- bootstrap = new ClientBootstrap(factory);
-
+ private static final Timer timer = new HashedWheelTimer();
+
+ private final ClientBootstrap bootstrap;
+ private volatile Channel ch = null;
+
+ private final TimingAccumulator stats = new TimingAccumulator();
+
+
+ private String host;
+ private int port;
+ private int backoff;
+
+ private ChannelBuffer categoryAsByteStream;
+
+ /**
+ * This is meant to be a way for async callbacks to set the channel
+ * on a successful connection
+ *
+ * Java does not have pointers to pointers. So have to resort to sending
+ * in a wrapper object that knows to update our pointer
+ */
+ class ChannelSetter {
+ public void setChannel(Channel ch) {
+ Channel oldChannel = ScribeNettyImpl.this.ch;
+ if(oldChannel != null && oldChannel.isOpen())
+ oldChannel.close();
+ ScribeNettyImpl.this.ch = ch;
+ }
+ public void connect() {bootstrap.connect(new InetSocketAddress(host, port));}
+ }
+
+ public ScribeNettyImpl(String hostname, int port, int timeoutSeconds,
+ int backoffSeconds) {
+ bootstrap = new ClientBootstrap(NettyEventCore.getInstance().getFactory());
+
ScribeHandler handler = new ScribeHandler(stats, new ChannelSetter(), backoffSeconds, timer);
ChannelPipelineFactory cfactory = new ScribePipelineFactory(handler, timeoutSeconds, timer);
-
+
bootstrap.setPipelineFactory(cfactory);
this.host = hostname;
- this.port = port;
- this.backoff = backoffSeconds;
-
- bootstrap.connect(new InetSocketAddress(host, port));
- }
-
- @Override
- public void publish(Message m) {
- stats.accumulateInvocation();
- if(ch != null ) {
- ScribeBites.publish(ch, m);
- } else {
- suggestReconnect();
- }
- }
-
- public void publish(byte stream[]) {
- stats.accumulateInvocation();
- if(ch != null ) {
- ScribeBites.publish(ch, categoryAsByteStream, stream);
- } else {
- suggestReconnect();
- }
- }
-
- public void publish(TBase thriftObject) {
- stats.accumulateInvocation();
- try {
- ScribeBites.publish(ch, categoryAsByteStream, thriftObject);
- } catch (TException e) {
- stats.accumulateOutcomeWithDelta(Outcome.UNHANDLED_FAILURE, 0);
- }
- }
-
- private void suggestReconnect() {
- stats.accumulateOutcomeWithDelta(Outcome.UNHANDLED_FAILURE, 0);
- //TODO: logic for triggering reconnect
- }
-
- @Override
- public MessagePublisherMXBean getInspector() {
- return this;
- }
-
- @Override
- public TimingAccumulator getStats() {
- return stats;
- }
-
- public void close() {
- if(ch != null) {
- ch.close().awaitUninterruptibly();
- }
- bootstrap.releaseExternalResources();
- }
-
- public synchronized boolean setFixedCategory(String c) {
- if(categoryAsByteStream != null)
- return false;
-
- categoryAsByteStream = ScribeBites.generateHeaderWithCategory(c);
-
- return true;
- }
+ this.port = port;
+ this.backoff = backoffSeconds;
+
+ bootstrap.connect(new InetSocketAddress(host, port));
+ }
+
+ @Override
+ public void publish(Message m) {
+ stats.accumulateInvocation();
+ if(ch != null ) {
+ ScribeBites.publish(ch, m);
+ } else {
+ suggestReconnect();
+ }
+ }
+
+ public void publish(byte stream[]) {
+ stats.accumulateInvocation();
+ if(ch != null ) {
+ ScribeBites.publish(ch, categoryAsByteStream, stream);
+ } else {
+ suggestReconnect();
+ }
+ }
+
+ public void publish(TBase thriftObject) {
+ stats.accumulateInvocation();
+ try {
+ ScribeBites.publish(ch, categoryAsByteStream, thriftObject);
+ } catch (TException e) {
+ stats.accumulateOutcomeWithDelta(Outcome.UNHANDLED_FAILURE, 0);
+ }
+ }
+
+ private void suggestReconnect() {
+ stats.accumulateOutcomeWithDelta(Outcome.UNHANDLED_FAILURE, 0);
+ //TODO: logic for triggering reconnect
+ }
+
+ @Override
+ public MessagePublisherMXBean getInspector() {
+ return this;
+ }
+
+ @Override
+ public TimingAccumulator getStats() {
+ return stats;
+ }
+
+ public void close() {
+ if(ch != null) {
+ ch.close().awaitUninterruptibly();
+ }
+ NettyEventCore.getInstance().releaseFactory();
+ }
+
+ public synchronized boolean setFixedCategory(String c) {
+ if(categoryAsByteStream != null)
+ return false;
+
+ categoryAsByteStream = ScribeBites.generateHeaderWithCategory(c);
+
+ return true;
+ }
}
Please sign in to comment.
Something went wrong with that request. Please try again.