Permalink
Browse files

Added a Netty implementation. Next: Remove nio package.

  • Loading branch information...
1 parent 2280e1b commit adb816260b1cd5a5d9fbca54ea53b8f18875c22e @aslakhellesoy committed Feb 12, 2011
View
@@ -1,19 +1,10 @@
#!/usr/bin/env node
var DNode = require('dnode');
-var sys = require('sys');
-
var m = process.ARGV[2];
var client = DNode.connect(6060, function (remote) {
-
- remote[m](function (x) {
- console.log(x);
- });
-// remote.sTimesTen(5, function (m) {
-// sys.puts(m); // 50, computation executed on the server
-// remote.timesTen(m, function (n) {
-// sys.puts(n); // 50 * 10 == 500
-// });
-// });
+ remote[m](function (x) {
+ console.log(x);
+ });
});
View
@@ -2,16 +2,13 @@
var DNode = require('dnode');
var sys = require('sys');
-// server-side:
var server = DNode({
-// timesTen : function (n,reply) { reply(n * 10) },
- moo : function (reply) {
- reply(100);
- server.close();
- },
- boo : function (n, reply) {
- reply(n+1);
- server.close();
- }
-// sTimesTen : DNode.sync(function (n) { return n * 10 }),
+ moo : function (reply) {
+ reply(100);
+ server.close();
+ },
+ boo : function (n, reply) {
+ reply(n+1);
+ server.close();
+ }
}).listen(6060);
@@ -1,14 +1,18 @@
package dnode;
import com.google.gson.*;
+import org.jboss.netty.channel.Channel;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
public class DNode {
private final DNodeObject instance;
private Map<String, Callback> callbacks = new HashMap<String, Callback>();
+ private List<Channel> connections = new ArrayList<Channel>();
public DNode(Object instance) {
this.instance = new DNodeObject(instance);
@@ -18,6 +22,7 @@ public void listen(Server server) throws IOException {
server.listen(this);
}
+ @Deprecated
public void handle(final Connection connection) {
try {
connection.send(methods());
@@ -67,7 +72,7 @@ private JsonElement toJson(Object o) {
public void emit(String event, Object... args) {
Callback callback = callbacks.get(event);
- if(callback != null) { // TODO: Loop over a list
+ if (callback != null) { // TODO: Loop over a list
callback.call(args);
}
}
@@ -98,4 +103,38 @@ private String responseString(JsonElement method, JsonArray arguments, JsonEleme
public void on(String event, Callback callback) {
callbacks.put(event, callback);
}
+
+ public void onConnection(Channel connection) {
+ connections.add(connection);
+ connection.write(methods() + "\n");
+ }
+
+ public void handleRequest(JsonObject req, Channel channel) {
+ JsonPrimitive method = req.getAsJsonPrimitive("method");
+ if (method.isString() && method.getAsString().equals("methods")) {
+ handleMethods(req.getAsJsonArray("arguments").get(0).getAsJsonObject());
+ } else {
+ handleInvocation(req, channel);
+ }
+ }
+
+ private void handleMethods(JsonObject arguments) {
+ }
+
+ private void handleInvocation(JsonObject invocation, final Channel channel) {
+ Callback callback = new Callback() {
+ @Override
+ public void call(Object... args) throws RuntimeException {
+ JsonArray jsonArgs = transform(args);
+ channel.write(responseString(0, jsonArgs, new JsonObject(), new JsonArray()) + "\n");
+ }
+ };
+ instance.invoke(invocation, callback);
+ }
+
+ public void closeAllConnections() {
+ for (Channel connection : connections) {
+ connection.close();
+ }
+ }
}
@@ -4,6 +4,7 @@
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonPrimitive;
+import org.jboss.netty.channel.Channel;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
@@ -43,9 +44,9 @@ public JsonElement getCallbacks() {
return callbacks;
}
- public void invoke(JsonObject invocationJson, Callback callback) {
+ public void invoke(JsonObject invocation, Callback callback) {
try {
- instance.getClass().getDeclaredMethods()[invocationJson.get("method").getAsInt()].invoke(instance, callback);
+ instance.getClass().getDeclaredMethods()[invocation.get("method").getAsInt()].invoke(instance, callback);
} catch (IllegalAccessException e) {
e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
} catch (InvocationTargetException e) {
@@ -4,6 +4,5 @@
public interface Server {
void listen(DNode dnode) throws IOException;
-
void shutdown() throws IOException;
}
@@ -0,0 +1,30 @@
+package dnode.netty;
+
+import dnode.DNode;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.handler.codec.frame.DelimiterBasedFrameDecoder;
+import org.jboss.netty.handler.codec.frame.Delimiters;
+import org.jboss.netty.handler.codec.string.StringDecoder;
+import org.jboss.netty.handler.codec.string.StringEncoder;
+
+import static org.jboss.netty.channel.Channels.pipeline;
+
+public class DNodePipelineFactory implements ChannelPipelineFactory {
+ private final DNode dnode;
+
+ public DNodePipelineFactory(DNode dnode) {
+ this.dnode = dnode;
+ }
+
+ @Override
+ public ChannelPipeline getPipeline() throws Exception {
+ ChannelPipeline pipeline = pipeline();
+ pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
+ pipeline.addLast("decoder", new StringDecoder());
+ pipeline.addLast("encoder", new StringEncoder());
+ pipeline.addLast("handler", new DNodeServerHandler(dnode));
+ return pipeline;
+ }
+}
@@ -0,0 +1,26 @@
+package dnode.netty;
+
+import com.google.gson.JsonElement;
+import com.google.gson.JsonParser;
+import dnode.DNode;
+import org.jboss.netty.channel.*;
+
+public class DNodeServerHandler extends SimpleChannelUpstreamHandler {
+ private final DNode dnode;
+
+ public DNodeServerHandler(DNode dnode) {
+ this.dnode = dnode;
+ }
+
+ @Override
+ public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
+ dnode.onConnection(e.getChannel());
+ }
+
+ @Override
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
+ String message = (String) e.getMessage();
+ JsonElement json = new JsonParser().parse(message);
+ dnode.handleRequest(json.getAsJsonObject(), e.getChannel());
+ }
+}
@@ -0,0 +1,42 @@
+package dnode.netty;
+
+import dnode.DNode;
+import dnode.Server;
+import org.jboss.netty.bootstrap.ServerBootstrap;
+import org.jboss.netty.channel.*;
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.Executors;
+
+public class NettyServer implements Server {
+ private final int port;
+ private Channel channel;
+
+ public NettyServer(int port) {
+ this.port = port;
+ }
+
+ @Override
+ public void listen(DNode dnode) throws IOException {
+ ChannelFactory factory = new NioServerSocketChannelFactory(
+ Executors.newCachedThreadPool(),
+ Executors.newCachedThreadPool());
+
+ ServerBootstrap bootstrap = new ServerBootstrap(factory);
+
+ bootstrap.setPipelineFactory(new DNodePipelineFactory(dnode));
+
+// bootstrap.setOption("child.tcpNoDelay", true);
+// bootstrap.setOption("child.keepAlive", true);
+
+ channel = bootstrap.bind(new InetSocketAddress(port));
+ dnode.emit("ready");
+ }
+
+ @Override
+ public void shutdown() throws IOException {
+ channel.close();
+ }
+}
@@ -1,38 +1,42 @@
package dnode;
-import dnode.nio.NIOServer;
-import dnode.webbit.WebbitServer;
+import dnode.netty.NettyServer;
import junit.framework.AssertionFailedError;
import org.junit.After;
import org.junit.Ignore;
import org.junit.Test;
import webbit.WebServer;
import webbit.WebServers;
-import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
-import java.net.URI;
+import java.io.Reader;
import static org.junit.Assert.assertEquals;
public class DNodeTest {
private DNode dNode;
- private Server server;
+ private final Server server = new NettyServer(6060);
+;
- public static class Mooer {
+ public class Mooer {
private final int moo;
+ public DNode dNode;
public Mooer(int moo) {
this.moo = moo;
}
- public void moo(Callback cb) {
+ public void moo(Callback cb) throws IOException {
cb.call(moo);
+ dNode.closeAllConnections();
+ server.shutdown();
}
- public void boo(Callback cb) {
+ public void boo(Callback cb) throws IOException {
cb.call(moo * 10);
+ dNode.closeAllConnections();
+ server.shutdown();
}
}
@@ -45,37 +49,41 @@ public void shutdownServer() throws IOException {
@Test
public void shouldTalk() throws IOException, InterruptedException {
- dNode = new DNode(new Mooer(100));
+ createDnode(100);
runServer(dNode);
assertEquals("100\n", runClient("moo"));
}
@Test
public void shouldUseDataInInstance() throws IOException, InterruptedException {
- dNode = new DNode(new Mooer(200));
+ createDnode(200);
runServer(dNode);
assertEquals("200\n", runClient("moo"));
}
+ private void createDnode(int moo) {
+ Mooer instance = new Mooer(moo);
+ dNode = new DNode(instance);
+ instance.dNode = dNode;
+ }
+
@Test
public void shouldCallRightMethod() throws IOException, InterruptedException {
- dNode = new DNode(new Mooer(300));
+ createDnode(300);
runServer(dNode);
assertEquals("3000\n", runClient("boo"));
}
@Test
@Ignore
public void shouldTalkUsingWebbit() throws IOException, InterruptedException {
- dNode = new DNode(new Mooer(100));
+ createDnode(100);
runWebbitServer(dNode);
// assertEquals("100\n", runClient("moo"));
// TODO: Run HTMLUnit here.
}
private void runServer(final DNode dNode) throws InterruptedException {
- server = new NIOServer(6060);
-
Thread thread = new Thread(new Runnable() {
public void run() {
try {
@@ -130,11 +138,11 @@ private String runClient(String method) throws IOException, InterruptedException
pb.redirectErrorStream(true);
Process client = pb.start();
- BufferedReader clientStdOut = new BufferedReader(new InputStreamReader(client.getInputStream(), "UTF-8"));
+ Reader clientStdOut = new InputStreamReader(client.getInputStream(), "UTF-8");
StringBuilder result = new StringBuilder();
- String line;
- while ((line = clientStdOut.readLine()) != null) {
- result.append(line).append("\n");
+ int c;
+ while ((c = clientStdOut.read()) != -1) {
+ result.append((char) c);
}
int exit = client.waitFor();
if (exit != 0)

0 comments on commit adb8162

Please sign in to comment.