Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Nifty : Thrift server on Netty

Summary:
- moved repo to github
- added NettyConfigBuilder hook for configuring socket options
- added exception handling in NiftyDispatcher

Test Plan: mvn clean test

Reviewers: martint, dain, srash, andrewcox, davejwatson

Reviewed By: dain

Differential Revision: https://phabricator.fb.com/D472425

Revert Plan: N/A

Task ID: 888209
  • Loading branch information...
commit eaa1210c92f55b8dcb54b3b0d61e624f8585629d 1 parent aa70e44
Jax Law authored
Showing with 811 additions and 76 deletions.
  1. +4 −0 .arcconfig
  2. +52 −0 nifty-client/pom.xml
  3. +28 −0 nifty-client/src/main/java/com/facebook/nifty/client/NettyClientConfigBuilder.java
  4. +152 −0 nifty-client/src/main/java/com/facebook/nifty/client/NiftyClient.java
  5. +117 −0 nifty-client/src/main/java/com/facebook/nifty/client/TNiftyAsyncClientTransport.java
  6. +18 −0 nifty-client/src/main/java/com/facebook/nifty/client/TNiftyClientAdapter.java
  7. +14 −0 nifty-client/src/main/java/com/facebook/nifty/client/TNiftyClientListener.java
  8. +61 −0 nifty-client/src/main/java/com/facebook/nifty/client/TNiftyClientTransport.java
  9. +53 −0 nifty-client/src/main/java/com/facebook/nifty/client/TNiftyReadOnlyTransport.java
  10. +26 −0 nifty-client/src/main/java/com/facebook/nifty/client/Test.java
  11. +4 −2 nifty-core-asf/src/main/java/com/facebook/nifty/core/NettyServerTransport.java
  12. +2 −3 nifty-core-asf/src/main/java/com/facebook/nifty/core/NiftyBootstrap.java
  13. +10 −6 nifty-core-asf/src/main/java/com/facebook/nifty/core/NiftyDispatcher.java
  14. +4 −2 nifty-core-fbcode/src/main/java/com/facebook/nifty/core/NettyServerTransport.java
  15. +2 −3 nifty-core-fbcode/src/main/java/com/facebook/nifty/core/NiftyBootstrap.java
  16. +10 −5 nifty-core-fbcode/src/main/java/com/facebook/nifty/core/NiftyDispatcher.java
  17. +0 −1  nifty-core/src/main/java/com/facebook/nifty/core/ChannelStatistics.java
  18. +39 −0 nifty-core/src/main/java/com/facebook/nifty/core/NettyConfigBuilder.java
  19. +67 −0 nifty-core/src/main/java/com/facebook/nifty/core/NettyConfigBuilderBase.java
  20. +0 −1  nifty-core/src/main/java/com/facebook/nifty/core/NettyThriftDecoder.java
  21. +6 −13 nifty-core/src/main/java/com/facebook/nifty/core/TNiftyTransport.java
  22. +0 −1  nifty-core/src/main/java/com/facebook/nifty/core/ThriftServerDef.java
  23. +0 −1  nifty-core/src/main/java/com/facebook/nifty/core/ThriftServerDefBuilder.java
  24. +28 −5 nifty-core/src/main/java/com/facebook/nifty/guice/NiftyModule.java
  25. +0 −5 nifty-core/src/test/java/com/facebook/nifty/core/TestCodec.java
  26. +51 −0 nifty-core/src/test/java/com/facebook/nifty/core/TestNettyConfigBuilder.java
  27. +0 −5 nifty-core/src/test/java/com/facebook/nifty/core/TestServerDefBuilder.java
  28. +20 −0 nifty-examples/nifty-asf-server/pom.xml
  29. +15 −1 nifty-examples/nifty-asf-server/src/main/java/com/facebook/nifty/server/Plain.java
  30. +17 −6 nifty-examples/nifty-asf-server/src/test/java/com/facebook/nifty/server/TestPlainServer.java
  31. +1 −1  nifty-examples/nifty-fbcode-server/src/main/java/com/facebook/nifty/server/Main.java
  32. +3 −6 nifty-examples/nifty-fbcode-server/src/test/java/com/facebook/nifty/server/TestNettyServerTransport.java
  33. +3 −6 ...xamples/nifty-fbcode-server/src/test/java/com/facebook/nifty/server/TestNettyServerWithHeaderTransport.java
  34. +4 −3 pom.xml
View
4 .arcconfig
@@ -0,0 +1,4 @@
+{
+ "project_id" : "nifty",
+ "conduit_uri" : "https://phabricator.fb.com/api/"
+}
View
52 nifty-client/pom.xml
@@ -0,0 +1,52 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
+ http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>com.facebook.nifty</groupId>
+ <artifactId>nifty-parent</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <groupId>com.facebook.nifty</groupId>
+ <artifactId>nifty-client</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+ <packaging>jar</packaging>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>com.facebook.nifty</groupId>
+ <artifactId>nifty-core</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.thrift</groupId>
+ <artifactId>libthrift</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.easymock</groupId>
+ <artifactId>easymock</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.testng</groupId>
+ <artifactId>testng</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+
+ </dependencies>
+</project>
View
28 nifty-client/src/main/java/com/facebook/nifty/client/NettyClientConfigBuilder.java
@@ -0,0 +1,28 @@
+package com.facebook.nifty.client;
+
+import com.facebook.nifty.core.NettyConfigBuilderBase;
+import com.google.inject.Inject;
+import org.jboss.netty.channel.socket.nio.NioSocketChannelConfig;
+
+import java.lang.reflect.Proxy;
+
+/*
+* Hooks for configuring various parts of Netty.
+*/
+public class NettyClientConfigBuilder extends NettyConfigBuilderBase {
+
+ private final NioSocketChannelConfig socketChannelConfig = (NioSocketChannelConfig)
+ Proxy.newProxyInstance(
+ getClass().getClassLoader(),
+ new Class<?>[]{NioSocketChannelConfig.class},
+ new Magic("")
+ );
+
+ @Inject
+ public NettyClientConfigBuilder() {
+ }
+
+ public NioSocketChannelConfig getSocketChannelConfig() {
+ return socketChannelConfig;
+ }
+}
View
152 nifty-client/src/main/java/com/facebook/nifty/client/NiftyClient.java
@@ -0,0 +1,152 @@
+package com.facebook.nifty.client;
+
+import com.google.common.util.concurrent.AbstractFuture;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.thrift.transport.TTransportException;
+import org.jboss.netty.bootstrap.ClientBootstrap;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
+import org.jboss.netty.handler.codec.frame.LengthFieldPrepender;
+
+import java.io.File;
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+public class NiftyClient {
+
+ private static final int DEFAULT_MAX_FRAME_SIZE = 1048576;
+ private final NettyClientConfigBuilder configBuilder;
+ private final ExecutorService boss;
+ private final ExecutorService worker;
+ private final int maxFrameSize;
+ private NioClientSocketChannelFactory channelFactory;
+
+ public NiftyClient() {
+ this(DEFAULT_MAX_FRAME_SIZE);
+ }
+
+ public NiftyClient(int maxFrameSize) {
+ this(new NettyClientConfigBuilder(),
+ Executors.newCachedThreadPool(),
+ Executors.newCachedThreadPool(),
+ maxFrameSize
+ );
+ }
+
+ public NiftyClient(
+ NettyClientConfigBuilder configBuilder,
+ ExecutorService boss,
+ ExecutorService worker,
+ int maxFrameSize
+ ) {
+ this.configBuilder = configBuilder;
+ this.boss = boss;
+ this.worker = worker;
+ this.maxFrameSize = maxFrameSize;
+ this.channelFactory = new NioClientSocketChannelFactory(boss, worker);
+ }
+
+ public ListenableFuture<TNiftyAsyncClientTransport> connectAsync(InetSocketAddress addr) {
+ ClientBootstrap bootstrap = new ClientBootstrap(channelFactory);
+ bootstrap.setOptions(configBuilder.getOptions());
+ bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
+ @Override
+ public ChannelPipeline getPipeline() throws Exception {
+ ChannelPipeline cp = Channels.pipeline();
+ cp.addLast("frameEncoder", new LengthFieldPrepender(4));
+ cp.addLast(
+ "frameDecoder", new LengthFieldBasedFrameDecoder(maxFrameSize, 0, 4, 0, 4)
+ );
+ return cp;
+ }
+ });
+ return new TNiftyFuture(bootstrap.connect(addr));
+ }
+
+ // trying to mirror the synchronous nature of TSocket as much as possible here.
+ public TNiftyClientTransport connectSync(InetSocketAddress addr)
+ throws TTransportException {
+ ClientBootstrap bootstrap = new ClientBootstrap(channelFactory);
+ bootstrap.setOptions(configBuilder.getOptions());
+ bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
+ @Override
+ public ChannelPipeline getPipeline() throws Exception {
+ ChannelPipeline cp = Channels.pipeline();
+ cp.addLast("frameEncoder", new LengthFieldPrepender(4));
+ cp.addLast(
+ "frameDecoder", new LengthFieldBasedFrameDecoder(maxFrameSize, 0, 4, 0, 4)
+ );
+ return cp;
+ }
+ });
+ ChannelFuture f = bootstrap.connect(addr);
+ final CountDownLatch latch = new CountDownLatch(1);
+ final Channel channel[] = new Channel[1];
+ final Throwable throwable[] = new Throwable[1];
+ f.addListener(new ChannelFutureListener() {
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ if (future.isSuccess()) {
+ channel[0] = future.getChannel();
+ } else {
+ throwable[0] = future.getCause();
+ }
+ latch.countDown();
+ }
+ });
+ try {
+ latch.await(
+ f.getChannel().getConfig().getConnectTimeoutMillis(),
+ TimeUnit.MILLISECONDS
+ );
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ if (throwable[0] != null) {
+ throw new TTransportException(throwable[0]);
+ }
+ if (channel[0] != null) {
+ TNiftyClientTransport transport = new TNiftyClientTransport(channel[0]);
+ channel[0].getPipeline().addLast("thrift", transport);
+ return transport;
+ }
+ throw new TTransportException("unknown connect error");
+ }
+
+ private static class TNiftyFuture
+ extends AbstractFuture<TNiftyAsyncClientTransport> {
+ private TNiftyFuture(ChannelFuture channelFuture) {
+ channelFuture.addListener(new ChannelFutureListener() {
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ if (future.isSuccess()) {
+ set(new TNiftyAsyncClientTransport(future.getChannel()));
+ } else if (future.isCancelled()) {
+ cancel(true);
+ } else {
+ setException(future.getCause());
+ }
+ }
+ });
+ }
+ }
+
+ public void shutdown() {
+ boss.shutdownNow();
+ worker.shutdownNow();
+ }
+
+
+
+}
View
117 nifty-client/src/main/java/com/facebook/nifty/client/TNiftyAsyncClientTransport.java
@@ -0,0 +1,117 @@
+package com.facebook.nifty.client;
+
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelDownstreamHandler;
+import org.jboss.netty.channel.ChannelEvent;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelUpstreamHandler;
+import org.jboss.netty.channel.MessageEvent;
+
+import javax.annotation.concurrent.NotThreadSafe;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * This already has a built in TFramedTransport. No need to wrap.
+ */
+@NotThreadSafe
+public class TNiftyAsyncClientTransport extends TTransport implements ChannelUpstreamHandler, ChannelDownstreamHandler {
+ private static final int DEFAULT_BUFFER_SIZE = 1024;
+ // this is largely a guess. there shouldn't really be more than 2 write buffers at any given time.
+ private static final int MAX_BUFFERS_IN_QUEUE = 3;
+ private final Channel channel;
+ private final Queue<ChannelBuffer> writeBuffers;
+ private volatile TNiftyClientListener listener;
+
+ public TNiftyAsyncClientTransport(Channel channel) {
+ this.channel = channel;
+ this.writeBuffers = new ConcurrentLinkedQueue<ChannelBuffer>();
+ }
+
+ public void setListener(TNiftyClientListener listener) {
+ this.listener = listener;
+ }
+
+ @Override
+ public boolean isOpen() {
+ return channel.isOpen();
+ }
+
+ @Override
+ public void open() throws TTransportException {
+ // no-op
+ }
+
+ @Override
+ public void close() {
+ channel.close();
+ }
+
+ @Override
+ public int read(byte[] bytes, int offset, int length) throws TTransportException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void write(byte[] bytes, int offset, int length) throws TTransportException {
+ getWriteBuffer().writeBytes(bytes, offset, length);
+ }
+
+ @Override
+ public void flush() throws TTransportException {
+ // all these is to re-use the write buffer. We can only clear
+ // and re-use a write buffer when the write operation completes,
+ // which is an async operation in netty. the future listener
+ // down here will be invoked by Netty I/O thread.
+ if (!writeBuffers.isEmpty()) {
+ final ChannelBuffer channelBuffer = writeBuffers.remove();
+ channel.write(channelBuffer).addListener(new ChannelFutureListener() {
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ if (future.isSuccess()) {
+ channelBuffer.clear();
+ if (writeBuffers.size() < MAX_BUFFERS_IN_QUEUE) {
+ writeBuffers.add(channelBuffer);
+ }
+ }
+ }
+ });
+ }
+ }
+
+ @Override
+ public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
+ if (e instanceof MessageEvent) {
+ messageReceived(ctx, (MessageEvent) e);
+ }
+ ctx.sendUpstream(e);
+ // for all other stuff we drop it on the floor
+ }
+
+ private void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
+ if (e.getMessage() instanceof ChannelBuffer) {
+ if (listener != null) {
+ listener.onFrameRead(ctx.getChannel(), (ChannelBuffer) e.getMessage());
+ }
+ }
+ // drop it
+ }
+
+ @Override
+ public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
+ ctx.sendDownstream(e);
+ }
+
+ public ChannelBuffer getWriteBuffer() {
+ if (writeBuffers.isEmpty()) {
+ writeBuffers.add(ChannelBuffers.dynamicBuffer(DEFAULT_BUFFER_SIZE));
+ }
+ return writeBuffers.peek();
+ }
+}
View
18 nifty-client/src/main/java/com/facebook/nifty/client/TNiftyClientAdapter.java
@@ -0,0 +1,18 @@
+package com.facebook.nifty.client;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.Channel;
+
+public abstract class TNiftyClientAdapter implements TNiftyClientListener {
+ @Override
+ public void onFrameRead(Channel channel, ChannelBuffer buffer) {
+ onInput(new TNiftyReadOnlyTransport(channel, buffer));
+ }
+
+ /**
+ * called when a frame is ready to be read.
+ * @param tNiftyReadOnlyTransport a one-time-use transport for the frame
+ */
+ public abstract void onInput(TNiftyReadOnlyTransport tNiftyReadOnlyTransport);
+
+}
View
14 nifty-client/src/main/java/com/facebook/nifty/client/TNiftyClientListener.java
@@ -0,0 +1,14 @@
+package com.facebook.nifty.client;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.Channel;
+
+public interface TNiftyClientListener {
+ /**
+ * Called when a full frame as defined in TFramedTransport is available.
+ *
+ * @param channel the channel
+ * @param buffer the payload of the frame, without the leading 4-bytes length header
+ */
+ void onFrameRead(Channel channel, ChannelBuffer buffer) ;
+}
View
61 nifty-client/src/main/java/com/facebook/nifty/client/TNiftyClientTransport.java
@@ -0,0 +1,61 @@
+package com.facebook.nifty.client;
+
+import org.apache.thrift.transport.TTransportException;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.Channel;
+
+/**
+ * Netty Equivalent to a TFrameTransport over a TSocket.
+ *
+ * This is just for a proof-of-concept to show that it can be done.
+ *
+ * You should just use a TSocket for sync client.
+ *
+ * This already has a built in TFramedTransport. No need to wrap.
+ */
+public class TNiftyClientTransport extends TNiftyAsyncClientTransport {
+
+ private final ChannelBuffer readBuffer;
+
+ public TNiftyClientTransport(Channel channel) {
+ super(channel);
+ this.readBuffer = ChannelBuffers.dynamicBuffer(256);
+ setListener(new TNiftyClientListener() {
+ @Override
+ public void onFrameRead(Channel c, ChannelBuffer buffer) {
+ transferReadBuffer(buffer);
+ }
+ });
+ }
+
+ // yeah, mimicking sync with async is just horrible
+ @Override
+ public int read(byte[] bytes, int offset, int length) throws TTransportException {
+ while (true) {
+ synchronized (readBuffer) {
+ int bytesAvailable = readBuffer.readableBytes();
+ if (bytesAvailable > 0) {
+ int begin = readBuffer.readerIndex();
+ readBuffer.readBytes(bytes, offset, Math.min(bytesAvailable, length));
+ int end = readBuffer.readerIndex();
+ return end - begin;
+ }
+ try {
+ readBuffer.wait();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+ }
+
+ // yeah, mimicking sync with async is just horrible
+ void transferReadBuffer(ChannelBuffer incoming) {
+ synchronized (readBuffer) {
+ readBuffer.discardReadBytes();
+ readBuffer.writeBytes(incoming);
+ readBuffer.notify();
+ }
+ }
+}
View
53 nifty-client/src/main/java/com/facebook/nifty/client/TNiftyReadOnlyTransport.java
@@ -0,0 +1,53 @@
+package com.facebook.nifty.client;
+
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.Channel;
+
+/**
+ * Wraps incoming channel buffer into TTransport.
+ *
+ */
+public class TNiftyReadOnlyTransport extends TTransport {
+ private final Channel channel;
+ private final ChannelBuffer in;
+
+ public TNiftyReadOnlyTransport(Channel channel, ChannelBuffer in) {
+ this.channel = channel;
+ this.in = in ;
+ }
+
+ @Override
+ public boolean isOpen() {
+ return channel.isOpen();
+ }
+
+ @Override
+ public void open() throws TTransportException {
+ // no-op
+ }
+
+ @Override
+ public void close() {
+ // no-op
+ channel.close();
+ }
+
+ @Override
+ public int read(byte[] bytes, int offset, int length) throws TTransportException {
+ int _read = Math.min(in.readableBytes(), length);
+ in.readBytes(bytes, offset, _read);
+ return _read;
+ }
+
+ @Override
+ public void write(byte[] bytes, int offset, int length) throws TTransportException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void flush() throws TTransportException {
+ throw new UnsupportedOperationException();
+ }
+}
View
26 nifty-client/src/main/java/com/facebook/nifty/client/Test.java
@@ -0,0 +1,26 @@
+package com.facebook.nifty.client;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.util.Arrays;
+import java.util.Comparator;
+
+public class Test {
+ public static void main(String[] args) {
+ File[] files = new File("/Users/jaxlaw").listFiles(new FileFilter() {
+ @Override
+ public boolean accept(File pathname) {
+ return pathname.getName().startsWith("screenshot-");
+ }
+ });
+ Arrays.sort(files, new Comparator<File>() {
+ @Override
+ public int compare(File o1, File o2) {
+ return o2.getName().compareTo(o1.getName());
+ }
+ });
+ for (File file : files) {
+ System.out.println(file.getName());
+ }
+ }
+}
View
6 nifty-core-asf/src/main/java/com/facebook/nifty/core/NettyServerTransport.java
@@ -22,7 +22,6 @@
* A core channel the decode framed Thrift message, dispatches to the TProcessor given
* and then encode message back to Thrift frame.
*
- * @author jaxlaw
*/
public class NettyServerTransport {
private static final Logger log = LoggerFactory.getLogger(NettyServerTransport.class);
@@ -32,10 +31,12 @@
private ServerBootstrap bootstrap;
private Channel serverChannel;
private final ThriftServerDef def;
+ private final NettyConfigBuilder configBuilder;
@Inject
- public NettyServerTransport(final ThriftServerDef def) {
+ public NettyServerTransport(final ThriftServerDef def, NettyConfigBuilder configBuilder) {
this.def = def;
+ this.configBuilder = configBuilder;
this.port = def.getServerPort();
if (def.isHeaderTransport()) {
throw new UnsupportedOperationException("ASF version does not support THeaderTransport !");
@@ -65,6 +66,7 @@ public void start(ExecutorService bossExecutor, ExecutorService workerExecutor)
workerExecutor
)
);
+ bootstrap.setOptions(configBuilder.getOptions());
bootstrap.setPipelineFactory(pipelineFactory);
log.info("starting transport {}:{}", def.getName(), port);
serverChannel = bootstrap.bind(new InetSocketAddress(port));
View
5 nifty-core-asf/src/main/java/com/facebook/nifty/core/NiftyBootstrap.java
@@ -17,7 +17,6 @@
/**
* A lifecycle object that manages starting up and shutting down multiple core channels.
*
- * @author jaxlaw
*/
public class NiftyBootstrap {
private static final Logger log = LoggerFactory.getLogger(NiftyBootstrap.class);
@@ -33,11 +32,11 @@
* @param thriftServerDefs
*/
@Inject
- public NiftyBootstrap(Set<ThriftServerDef> thriftServerDefs) {
+ public NiftyBootstrap(Set<ThriftServerDef> thriftServerDefs, NettyConfigBuilder configBuilder) {
this.thriftServerDefs = thriftServerDefs;
this.transports = new ArrayList<NettyServerTransport>();
for (ThriftServerDef thriftServerDef : thriftServerDefs) {
- transports.add(new NettyServerTransport(thriftServerDef));
+ transports.add(new NettyServerTransport(thriftServerDef, configBuilder));
}
}
View
16 nifty-core-asf/src/main/java/com/facebook/nifty/core/NiftyDispatcher.java
@@ -17,7 +17,6 @@
/**
* Dispatch TNiftyTransport to the TProcessor and write output back.
*
- * @author jaxlaw
*/
public class NiftyDispatcher extends SimpleChannelUpstreamHandler {
@@ -52,9 +51,9 @@ public void run() {
outProtocol
);
} catch (TException e1) {
- log.error("Exception during thrift message dispatch", e1);
+ log.error("Exception while invoking!", e1);
+ closeChannel(ctx);
}
-
}
}
);
@@ -65,8 +64,13 @@ public void run() {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
- // most of the time this is just socket close by client
- // thrift protocol does not define any connection shutdown
- log.debug("Exception caught in dispatcher : ", e.getCause());
+ // Any out of band exception are caught here and we tear down the socket
+ closeChannel(ctx);
+ }
+
+ private void closeChannel(ChannelHandlerContext ctx) {
+ if (ctx.getChannel().isOpen()) {
+ ctx.getChannel().close();
+ }
}
}
View
6 nifty-core-fbcode/src/main/java/com/facebook/nifty/core/NettyServerTransport.java
@@ -25,7 +25,6 @@
* A core channel the decode framed Thrift message, dispatches to the TProcessor given
* and then encode message back to Thrift frame.
*
- * @author jaxlaw
*/
public class NettyServerTransport {
private static final Logger log = LoggerFactory.getLogger(NettyServerTransport.class);
@@ -35,10 +34,12 @@
private ServerBootstrap bootstrap;
private Channel serverChannel;
private final ThriftServerDef def;
+ private final NettyConfigBuilder configBuilder;
@Inject
- public NettyServerTransport(final ThriftServerDef def) {
+ public NettyServerTransport(final ThriftServerDef def, NettyConfigBuilder configBuilder) {
this.def = def;
+ this.configBuilder = configBuilder;
this.port = def.getServerPort();
if (def.isHeaderTransport()) {
this.pipelineFactory = new ChannelPipelineFactory() {
@@ -87,6 +88,7 @@ public void start(ExecutorService bossExecutor, ExecutorService workerExecutor)
workerExecutor
)
);
+ bootstrap.setOptions(configBuilder.getOptions());
bootstrap.setPipelineFactory(pipelineFactory);
log.info("starting transport {}:{}", def.getName(), port);
serverChannel = bootstrap.bind(new InetSocketAddress(port));
View
5 nifty-core-fbcode/src/main/java/com/facebook/nifty/core/NiftyBootstrap.java
@@ -17,7 +17,6 @@
/**
* A lifecycle object that manages starting up and shutting down multiple core channels.
*
- * @author jaxlaw
*/
public class NiftyBootstrap {
private static final Logger log = LoggerFactory.getLogger(NiftyBootstrap.class);
@@ -33,11 +32,11 @@
* @param thriftServerDefs
*/
@Inject
- public NiftyBootstrap(Set<ThriftServerDef> thriftServerDefs) {
+ public NiftyBootstrap(Set<ThriftServerDef> thriftServerDefs, NettyConfigBuilder configBuilder) {
this.thriftServerDefs = thriftServerDefs;
this.transports = new ArrayList<NettyServerTransport>();
for (ThriftServerDef thriftServerDef : thriftServerDefs) {
- transports.add(new NettyServerTransport(thriftServerDef));
+ transports.add(new NettyServerTransport(thriftServerDef, configBuilder));
}
}
View
15 nifty-core-fbcode/src/main/java/com/facebook/nifty/core/NiftyDispatcher.java
@@ -20,7 +20,6 @@
/**
* Dispatch TNiftyTransport to the TProcessor and write output back.
*
- * @author jaxlaw
*/
public class NiftyDispatcher extends SimpleChannelUpstreamHandler {
@@ -61,7 +60,8 @@ public InetAddress getPeerAddress() {
}
);
} catch (TException e1) {
- log.error("Exception during thrift message dispatch", e1);
+ log.error("Exception while invoking!", e1);
+ closeChannel(ctx);
}
}
@@ -74,8 +74,13 @@ public InetAddress getPeerAddress() {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
- // most of the time this is just socket close by client
- // thrift protocol does not define any connection shutdown
- log.debug("Exception caught in dispatcher : ", e.getCause());
+ // Any out of band exception are caught here and we tear down the socket
+ closeChannel(ctx);
+ }
+
+ private void closeChannel(ChannelHandlerContext ctx) {
+ if (ctx.getChannel().isOpen()) {
+ ctx.getChannel().close();
+ }
}
}
View
1  nifty-core/src/main/java/com/facebook/nifty/core/ChannelStatistics.java
@@ -15,7 +15,6 @@
/**
* Counters for number of channels open, generic traffic stats and maybe cleanup logic here.
*
- * @author jaxlaw
*/
public class ChannelStatistics extends SimpleChannelHandler {
View
39 nifty-core/src/main/java/com/facebook/nifty/core/NettyConfigBuilder.java
@@ -0,0 +1,39 @@
+package com.facebook.nifty.core;
+
+import com.google.inject.Inject;
+import org.jboss.netty.channel.socket.ServerSocketChannelConfig;
+import org.jboss.netty.channel.socket.nio.NioSocketChannelConfig;
+
+import java.lang.reflect.Proxy;
+
+/*
+* Hooks for configuring various parts of Netty.
+*/
+public class NettyConfigBuilder extends NettyConfigBuilderBase {
+
+ private final NioSocketChannelConfig socketChannelConfig = (NioSocketChannelConfig)
+ Proxy.newProxyInstance(
+ getClass().getClassLoader(),
+ new Class<?>[]{NioSocketChannelConfig.class},
+ new Magic("child.")
+ );
+ private final ServerSocketChannelConfig serverSocketChannelConfig = (ServerSocketChannelConfig)
+ Proxy.newProxyInstance(
+ getClass().getClassLoader(),
+ new Class<?>[]{ServerSocketChannelConfig.class},
+ new Magic("")
+ );
+
+ @Inject
+ public NettyConfigBuilder() {
+ }
+
+
+ public NioSocketChannelConfig getSocketChannelConfig() {
+ return socketChannelConfig;
+ }
+
+ public ServerSocketChannelConfig getServerSocketChannelConfig() {
+ return serverSocketChannelConfig;
+ }
+}
View
67 nifty-core/src/main/java/com/facebook/nifty/core/NettyConfigBuilderBase.java
@@ -0,0 +1,67 @@
+package com.facebook.nifty.core;
+
+import com.google.inject.Inject;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/*
+* Hooks for configuring various parts of Netty.
+*/
+public abstract class NettyConfigBuilderBase {
+
+ private final Map<String, Object> options = new HashMap<String, Object>();
+
+ @Inject
+ public NettyConfigBuilderBase() {
+ }
+
+ public Map<String, Object> getOptions() {
+ return Collections.unmodifiableMap(options);
+ }
+
+ // Magic alert ! Content of this class is considered ugly and magical.
+ // For all intents and purposes this is to create a Map with the correct
+ // key and value pairs for Netty's Bootstrap to consume.
+ //
+ // sadly Netty does not define any constant strings whatsoever for the proper key to
+ // use and it's all based on standard java bean attributes.
+ //
+ // A ChannelConfig impl in netty is also tied with a socket, but since all
+ // these configs are interfaces we can do a bit of magic hacking here.
+
+ protected class Magic implements InvocationHandler {
+ private final String prefix;
+
+ public Magic(String prefix) {
+ this.prefix = prefix;
+ }
+
+ @Override
+ public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+ // we are only interested in setters with single arg
+ if (proxy != null) {
+ if (method.getName().equals("toString")) {
+ return "this is a magic proxy";
+ } else if (method.getName().equals("equals")) {
+ return Boolean.FALSE;
+ } else if (method.getName().equals("hashCode")) {
+ return 0;
+ }
+ }
+ // we don't support multi-arg setters
+ if (method.getName().startsWith("set") && args.length == 1) {
+ String attributeName = method.getName().substring(3);
+ // camelCase it
+ attributeName = attributeName.substring(0, 1).toLowerCase() + attributeName.substring(1);
+ // now this is our key
+ options.put(prefix + attributeName, args[0]);
+ return null;
+ }
+ throw new UnsupportedOperationException();
+ }
+ }
+}
View
1  nifty-core/src/main/java/com/facebook/nifty/core/NettyThriftDecoder.java
@@ -9,7 +9,6 @@
/**
* Converts ChannelBuffer into TNiftyTransport.
*
- * @author jaxlaw
*/
public class NettyThriftDecoder extends OneToOneDecoder {
@Override
View
19 nifty-core/src/main/java/com/facebook/nifty/core/TNiftyTransport.java
@@ -8,18 +8,16 @@
/**
* Wraps incoming channel buffer into TTransport and provides a output buffer.
- *
- * @author jaxlaw
*/
public class TNiftyTransport extends TTransport {
private final Channel channel;
- private final ChannelBuffer buf;
+ private final ChannelBuffer in;
private final ChannelBuffer out;
private static final int DEFAULT_OUTPUT_BUFFER_SIZE = 1024;
- public TNiftyTransport(Channel channel, ChannelBuffer buf) {
+ public TNiftyTransport(Channel channel, ChannelBuffer in) {
this.channel = channel;
- this.buf = buf;
+ this.in = in;
this.out = ChannelBuffers.dynamicBuffer(DEFAULT_OUTPUT_BUFFER_SIZE);
}
@@ -41,14 +39,9 @@ public void close() {
@Override
public int read(byte[] bytes, int offset, int length) throws TTransportException {
- int remaining = buf.readableBytes();
- if (length > remaining) {
- buf.readBytes(bytes, offset, remaining);
- return remaining;
- } else {
- buf.readBytes(bytes, offset, length);
- return length;
- }
+ int _read = Math.min(in.readableBytes(), length);
+ in.readBytes(bytes, offset, _read);
+ return _read;
}
@Override
View
1  nifty-core/src/main/java/com/facebook/nifty/core/ThriftServerDef.java
@@ -8,7 +8,6 @@
/**
* Descriptor for a Thrift Server. This defines a listener port that Nifty need to start a Thrift endpoint.
*
- * @author jaxlaw
*/
public class ThriftServerDef {
private final int serverPort;
View
1  nifty-core/src/main/java/com/facebook/nifty/core/ThriftServerDefBuilder.java
@@ -23,7 +23,6 @@
* <p/>
* </code>
*
- * @author jaxlaw
*/
public class ThriftServerDefBuilder {
private static final AtomicInteger ID = new AtomicInteger(1);
View
33 nifty-core/src/main/java/com/facebook/nifty/guice/NiftyModule.java
@@ -1,22 +1,39 @@
package com.facebook.nifty.guice;
+import com.facebook.nifty.core.NettyConfigBuilder;
import com.facebook.nifty.core.ThriftServerDef;
import com.google.inject.AbstractModule;
import com.google.inject.multibindings.Multibinder;
import javax.inject.Provider;
-/**
- * Author @jaxlaw
- * Date: 4/22/12
- * Time: 10:25 PM
- */
public abstract class NiftyModule extends AbstractModule {
+
+ private boolean configBound = false;
+
@Override
protected void configure() {
configureNifty();
}
+ public NiftyModule withDefaultNettyConfig() {
+ if (!configBound) {
+ binder().bind(NettyConfigBuilder.class).toInstance(new NettyConfigBuilder());
+ configBound = true;
+ return this;
+ }
+ throw iae();
+ }
+
+ public NiftyModule withNettyConfig(Class<? extends Provider<NettyConfigBuilder>> provider) {
+ if (!configBound) {
+ binder().bind(NettyConfigBuilder.class).toProvider(provider);
+ configBound = true;
+ return this;
+ }
+ throw iae();
+ }
+
/**
* User of Nifty via guice should override this method and use the little DSL defined here.
*/
@@ -71,4 +88,10 @@ public void to(com.google.inject.Key<? extends ThriftServerDef> key) {
.addBinding().to(key).asEagerSingleton();
}
}
+
+ private IllegalStateException iae() {
+ return new IllegalStateException(
+ "config already bound ! call useDefaultNettyConfig or withNettyConfig only once."
+ );
+ }
}
View
5 nifty-core/src/test/java/com/facebook/nifty/core/TestCodec.java
@@ -10,11 +10,6 @@
import org.jboss.netty.channel.ChannelHandlerContext;
import org.testng.annotations.Test;
-/**
- * Author @jaxlaw
- * Date: 4/20/12
- * Time: 2:38 PM
- */
public class TestCodec {
@Test(groups = "fast")
public void testDecoder() {
View
51 nifty-core/src/test/java/com/facebook/nifty/core/TestNettyConfigBuilder.java
@@ -0,0 +1,51 @@
+package com.facebook.nifty.core;
+
+import org.jboss.netty.bootstrap.ServerBootstrap;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.socket.ServerSocketChannelConfig;
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+import org.testng.Assert;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+
+public class TestNettyConfigBuilder {
+
+ private int port;
+
+ @BeforeTest(alwaysRun = true)
+ public void setup() {
+ try {
+ ServerSocket s = new ServerSocket();
+ s.bind(new InetSocketAddress(0));
+ port = s.getLocalPort();
+ s.close();
+ } catch (IOException e) {
+ port = 8080;
+ }
+ }
+
+ @Test
+ public void testNettyConfigBuilder() {
+ NettyConfigBuilder configBuilder = new NettyConfigBuilder();
+
+ configBuilder.getServerSocketChannelConfig().setReceiveBufferSize(10000);
+ configBuilder.getServerSocketChannelConfig().setBacklog(1000);
+ configBuilder.getServerSocketChannelConfig().setReuseAddress(true);
+
+ ServerBootstrap bootstrap = new ServerBootstrap(new NioServerSocketChannelFactory());
+ bootstrap.setOptions(configBuilder.getOptions());
+ bootstrap.setPipelineFactory(Channels.pipelineFactory(Channels.pipeline()));
+ Channel serverChannel = bootstrap.bind(new InetSocketAddress(port));
+
+ Assert.assertEquals(((ServerSocketChannelConfig)serverChannel.getConfig()).getReceiveBufferSize(), 10000);
+ Assert.assertEquals(((ServerSocketChannelConfig)serverChannel.getConfig()).getBacklog(), 1000);
+ Assert.assertTrue(((ServerSocketChannelConfig)serverChannel.getConfig()).isReuseAddress());
+
+
+ }
+}
View
5 nifty-core/src/test/java/com/facebook/nifty/core/TestServerDefBuilder.java
@@ -5,11 +5,6 @@
import org.testng.Assert;
import org.testng.annotations.Test;
-/**
- * Author @jaxlaw
- * Date: 4/20/12
- * Time: 3:51 PM
- */
public class TestServerDefBuilder {
@Test(groups = "fast")
View
20 nifty-examples/nifty-asf-server/pom.xml
@@ -24,6 +24,11 @@
</dependency>
<dependency>
<groupId>com.facebook.nifty</groupId>
+ <artifactId>nifty-client</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>com.facebook.nifty</groupId>
<artifactId>nifty-core-asf</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
@@ -44,6 +49,21 @@
<artifactId>libthrift</artifactId>
</dependency>
<dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>1.6.4</version>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <version>1.6.4</version>
+ </dependency>
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ <version>1.2.13</version>
+ </dependency>
+ <dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
<scope>test</scope>
View
16 nifty-examples/nifty-asf-server/src/main/java/com/facebook/nifty/server/Plain.java
@@ -1,5 +1,6 @@
package com.facebook.nifty.server;
+import com.facebook.nifty.core.NettyConfigBuilder;
import com.facebook.nifty.core.NiftyBootstrap;
import com.facebook.nifty.core.ThriftServerDefBuilder;
import com.facebook.nifty.guice.NiftyModule;
@@ -12,6 +13,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.inject.Provider;
import java.util.List;
/**
@@ -46,7 +48,7 @@ public ResultCode Log(List<LogEntry> messages) throws TException {
.build()
);
}
- }
+ }.withNettyConfig(NettyConfigProvider.class)
)
.getInstance(NiftyBootstrap.class);
bootstrap.start();
@@ -60,4 +62,16 @@ public void run() {
);
}
+ public static class NettyConfigProvider implements Provider<NettyConfigBuilder> {
+
+ @Override
+ public NettyConfigBuilder get() {
+ NettyConfigBuilder nettyConfigBuilder = new NettyConfigBuilder();
+ nettyConfigBuilder.getSocketChannelConfig().setTcpNoDelay(true);
+ nettyConfigBuilder.getSocketChannelConfig().setConnectTimeoutMillis(5000);
+ nettyConfigBuilder.getSocketChannelConfig().setTcpNoDelay(true);
+ return nettyConfigBuilder;
+ }
+ }
+
}
View
23 nifty-examples/nifty-asf-server/src/test/java/com/facebook/nifty/server/TestPlainServer.java
@@ -1,5 +1,6 @@
package com.facebook.nifty.server;
+import com.facebook.nifty.client.NiftyClient;
import com.facebook.nifty.core.NiftyBootstrap;
import com.facebook.nifty.core.ThriftServerDefBuilder;
import com.facebook.nifty.guice.NiftyModule;
@@ -25,11 +26,6 @@
import java.util.Arrays;
import java.util.List;
-/**
- * Author @jaxlaw
- * Date: 4/24/12
- * Time: 3:56 PM
- */
public class TestPlainServer {
private static final Logger log = LoggerFactory.getLogger(TestPlainServer.class);
@@ -65,7 +61,7 @@ protected void configureNifty() {
@Override
public ResultCode Log(List<LogEntry> messages) throws TException {
for (LogEntry message : messages) {
- log.info("%s: %s", message.getCategory(), message.getMessage());
+ log.info("{}: {}", message.getCategory(), message.getMessage());
}
return ResultCode.OK;
}
@@ -88,6 +84,16 @@ public void testMethodCalls() throws Exception {
client.Log(Arrays.asList(new LogEntry("hello", "world")));
}
+ @Test(groups = "fast")
+ public void testMethodCallsWithNiftyClient() throws Exception {
+ scribe.Client client = makeNiftyClient();
+ int max = (int) (Math.random() * 100);
+ for (int i = 0; i < max; i++) {
+ client.Log(Arrays.asList(new LogEntry("hello", "world " + i)));
+ }
+ }
+
+
private scribe.Client makeClient() throws TTransportException {
TSocket socket = new TSocket("localhost", port);
socket.open();
@@ -95,6 +101,11 @@ public void testMethodCalls() throws Exception {
return new scribe.Client(tp);
}
+ private scribe.Client makeNiftyClient() throws TTransportException {
+ TBinaryProtocol tp = new TBinaryProtocol((new NiftyClient().connectSync(new InetSocketAddress("localhost", port))));
+ return new scribe.Client(tp);
+ }
+
@AfterTest(alwaysRun = true)
public void teardown() throws InterruptedException {
View
2  nifty-examples/nifty-fbcode-server/src/main/java/com/facebook/nifty/server/Main.java
@@ -49,7 +49,7 @@ public void configure() {
protected void configureNifty() {
bind().toProvider(ExampleThriftServerProvider.class);
}
- }
+ }.withDefaultNettyConfig()
)
.getInstance(LifeCycleManager.class)
.start();
View
9 nifty-examples/nifty-fbcode-server/src/test/java/com/facebook/nifty/server/TestNettyServerTransport.java
@@ -3,6 +3,7 @@
import com.facebook.fb303.FacebookBase;
import com.facebook.fb303.FacebookService;
import com.facebook.fb303.fb_status;
+import com.facebook.nifty.core.NettyConfigBuilder;
import com.facebook.nifty.core.NettyServerTransport;
import com.facebook.nifty.core.ThriftServerDefBuilder;
import junit.framework.Assert;
@@ -20,11 +21,6 @@
import java.net.ServerSocket;
import java.util.concurrent.Executors;
-/**
- * Author @jaxlaw
- * Date: 4/20/12
- * Time: 3:54 PM
- */
public class TestNettyServerTransport {
public static final String VERSION = "1.0";
@@ -62,7 +58,8 @@ public int getStatus() {
}
)
)
- .build()
+ .build(),
+ new NettyConfigBuilder()
);
transport.start(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
}
View
9 ...ples/nifty-fbcode-server/src/test/java/com/facebook/nifty/server/TestNettyServerWithHeaderTransport.java
@@ -3,6 +3,7 @@
import com.facebook.fb303.FacebookBase;
import com.facebook.fb303.FacebookService;
import com.facebook.fb303.fb_status;
+import com.facebook.nifty.core.NettyConfigBuilder;
import com.facebook.nifty.core.NettyServerTransport;
import com.facebook.nifty.core.ThriftServerDefBuilder;
import junit.framework.Assert;
@@ -23,11 +24,6 @@
import java.net.ServerSocket;
import java.util.concurrent.Executors;
-/**
- * Author @jaxlaw
- * Date: 4/20/12
- * Time: 3:54 PM
- */
public class TestNettyServerWithHeaderTransport {
public static final String VERSION = "1.0";
@@ -66,7 +62,8 @@ public int getStatus() {
)
)
.usingHeaderTransport()
- .build()
+ .build(),
+ new NettyConfigBuilder()
);
transport.start(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
}
View
7 pom.xml
@@ -13,6 +13,7 @@
<module>nifty-core</module>
<module>nifty-core-asf</module>
<module>nifty-core-fbcode</module>
+ <module>nifty-client</module>
<module>nifty-examples</module>
</modules>
@@ -129,7 +130,7 @@
<version>3.0.0</version>
</requireMavenVersion>
<requireJavaVersion>
- <version>1.6</version>
+ <version>1.7</version>
</requireJavaVersion>
</rules>
</configuration>
@@ -179,8 +180,8 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
- <source>1.6</source>
- <target>1.6</target>
+ <source>1.7</source>
+ <target>1.7</target>
</configuration>
</plugin>
Please sign in to comment.
Something went wrong with that request. Please try again.