Permalink
Browse files

chapter3 java examples

  • Loading branch information...
1 parent cb368e0 commit f749e65e467b54193b17f4c096848051a75b3731 @miniway miniway committed Feb 14, 2013
View
@@ -0,0 +1,39 @@
+import org.zeromq.ZMQ.Socket;
+
+import java.util.Random;
+
+public class ZHelper
+{
+ private static Random rand = new Random(System.currentTimeMillis ());
+
+ /**
+ * Receives all message parts from socket, prints neatly
+ */
+ public static void dump (Socket sock)
+ {
+ System.out.println("----------------------------------------");
+ while(true) {
+ byte [] msg = sock.recv (0);
+ boolean isText = true;
+ String data = "";
+ for (int i = 0; i< msg.length; i++) {
+ if (msg[i] < 32 || msg[i] > 127)
+ isText = false;
+ data += String.format ("%02X", msg[i]);
+ }
+ if (isText)
+ data = new String (msg);
+
+ System.out.println (String.format ("[%03d] %s", msg.length, data));
+ if (!sock.hasReceiveMore ())
+ break;
+ }
+ }
+
+ public static void setId (Socket sock)
+ {
+ String identity = String.format ("%04X-%04X", rand.nextInt (), rand.nextInt ());
+
+ sock.setIdentity (identity.getBytes ());
+ }
+}
View
@@ -1,158 +1,150 @@
import java.util.Random;
+import org.zeromq.ZContext;
+import org.zeromq.ZFrame;
import org.zeromq.ZMQ;
-import org.zeromq.ZMQ.Context;
+import org.zeromq.ZMQ.Poller;
+import org.zeromq.ZMsg;
+import org.zeromq.ZMQ.PollItem;
import org.zeromq.ZMQ.Socket;
-/**
- * Asynchronous client-to-server (DEALER to ROUTER)
- *
- * This example contains clients and server in order to easily start and stop
- * the demo.
- *
- * They are working independently and communicate only by using the tcp
- * connections. They conceptually act as separate processes.
- *
- * @author Raphaël P. Barazzutti
- */
-
-// The client task connects to the server, then sends a request every second
-// while printing incoming messages as they arrive
-class ClientTask implements Runnable {
- final private int id;
-
- public ClientTask(int id) {
- this.id = id;
- }
-
- public void run() {
- Context context = ZMQ.context(1);
- Socket client = context.socket(ZMQ.DEALER);
- String identity = "worker-" + id;
- client.setIdentity(identity.getBytes());
- client.connect("tcp://localhost:5555");
- System.out.println("Client " + identity + " started");
- ZMQ.Poller poller = context.poller(1);
- poller.register(client, ZMQ.Poller.POLLIN);
- int requestNbr = 0;
- while (true)
- for (int i = 0; i < 100; i++) {
- poller.poll(10000);
- if (poller.pollin(0)) {
- byte msg[] = client.recv(0);
- System.out.println("Client " + identity + " received "
- + new String(msg));
- }
- System.out.println("Req #" + (++requestNbr) + " sent");
- client.send(("request " + requestNbr).getBytes(), 0);
- }
- }
-
-}
-
-// The server worker receives messages and replies by re-sending them a random
-// number of times (with random delays between replies)
-class ServerWorker implements Runnable {
- private Context context;
- final private int id;
-
- public ServerWorker(Context context, int id) {
- super();
- this.id = id;
- this.context = context;
- }
-
- public void run() {
- Random randomGenerator = new Random();
- Socket worker = context.socket(ZMQ.DEALER);
- worker.connect("inproc://backend");
- System.out.println("Server worker " + id + " started");
- while (true) {
- byte id[] = worker.recv(0);
- byte msg[] = worker.recv(0);
- System.out.println("Server worker " + id + " received "
- + new String(msg) + " from " + new String(id));
- // sending 0..4 replies
- for (int i = 0; i < randomGenerator.nextInt(5); i++) {
- try {
- // sleeping 1s or 1s/2 or 1s/3 .. 1s/9
- Thread.sleep(1000 / (1 + randomGenerator.nextInt(8)));
- worker.send(id, ZMQ.SNDMORE);
- worker.send(msg, 0);
- } catch (InterruptedException ie) {
- ie.printStackTrace();
- }
- }
-
- }
-
- }
-}
-
-// The server task uses a pool of workers to handle the messages coming from
-// clients.
//
-// The main server thread forwards messages between the front-end (connected to
-// clients) and the back-end (connected to workers)
+//Asynchronous client-to-server (DEALER to ROUTER)
//
-// The workers handle one request at a time, but a client might have its
-// messages handled by more than one worker
-class ServerTask implements Runnable {
- @Override
- public void run() {
- Context context = ZMQ.context(1);
- ZMQ.Socket frontend = context.socket(ZMQ.ROUTER);
- frontend.bind("tcp://*:5555");
-
- ZMQ.Socket backend = context.socket(ZMQ.DEALER);
- backend.bind("inproc://backend");
-
- for (int i = 0; i < 5; i++)
- (new Thread(new ServerWorker(context, i))).start();
-
- ZMQ.Poller poller = context.poller(2);
- poller.register(frontend, ZMQ.Poller.POLLIN);
- poller.register(backend, ZMQ.Poller.POLLIN);
-
- // It is possible to easily connect frontend to backend using a queue
- // device
- // ZMQQueue queue = new ZMQQueue(context, frontend, backend);
- // queue.run();
- //
- // Doing it manually gives a better understanding of the mechanisms
- // (it's a tuto) and might be useful in debugging
- while (true) {
- poller.poll();
- if (poller.pollin(0)) {
- byte[] id = frontend.recv(0);
- byte[] msg = frontend.recv(0);
- System.out.println("Server received " + new String(msg)
- + " id " + new String(id));
-
- backend.send(id, ZMQ.SNDMORE);
- backend.send(msg, 0);
- }
- if (poller.pollin(1)) {
- byte[] id = backend.recv(0);
- byte[] msg = backend.recv(0);
- System.out.println("Sending to frontend " + new String(msg)
- + " id " + new String(id));
- frontend.send(id, ZMQ.SNDMORE);
- frontend.send(msg, 0);
- }
- }
- }
-}
-
-// The main thread of the Java program, it starts three clients then starts a
-// server
-public class AsyncSrv {
- public static void main(String args[]) {
- // starting three clients
- for (int i = 0; i < 3; i++)
- (new Thread(new ClientTask(i))).start();
-
- // starting server
- new Thread(new ServerTask()).start();
- }
+//While this example runs in a single process, that is just to make
+//it easier to start and stop the example. Each task has its own
+//context and conceptually acts as a separate process.
+
+
+public class asyncsrv
+{
+ //---------------------------------------------------------------------
+ //This is our client task
+ //It connects to the server, and then sends a request once per second
+ //It collects responses as they arrive, and it prints them out. We will
+ //run several client tasks in parallel, each with a different random ID.
+
+ private static Random rand = new Random(System.nanoTime());
+
+ private static class client_task implements Runnable
+ {
+
+ public void run()
+ {
+ ZContext ctx = new ZContext();
+ Socket client = ctx.createSocket(ZMQ.DEALER);
+
+ // Set random identity to make tracing easier
+ String identity = String.format("%04X-%04X", rand.nextInt(), rand.nextInt());
+ client.setIdentity(identity.getBytes());
+ client.connect("tcp://localhost:5570");
+
+ PollItem[] items = new PollItem[] { new PollItem(client, Poller.POLLIN) };
+
+ int requestNbr = 0;
+ while (!Thread.currentThread().isInterrupted()) {
+ // Tick once per second, pulling in arriving messages
+ for (int centitick = 0; centitick < 100; centitick++) {
+ ZMQ.poll(items, 10);
+ if (items[0].isReadable()) {
+ ZMsg msg = ZMsg.recvMsg(client);
+ msg.getLast().print(identity);
+ msg.destroy();
+ }
+ }
+ client.send(String.format("request #%d", ++requestNbr), 0);
+ }
+ ctx.destroy();
+ }
+ }
+
+ //This is our server task.
+ //It uses the multithreaded server model to deal requests out to a pool
+ //of workers and route replies back to clients. One worker can handle
+ //one request at a time but one client can talk to multiple workers at
+ //once.
+
+ private static class server_task implements Runnable
+ {
+ public void run()
+ {
+ ZContext ctx = new ZContext();
+
+ // Frontend socket talks to clients over TCP
+ Socket frontend = ctx.createSocket(ZMQ.ROUTER);
+ frontend.bind("tcp://*:5570");
+
+ // Backend socket talks to workers over inproc
+ Socket backend = ctx.createSocket(ZMQ.DEALER);
+ backend.bind("inproc://backend");
+
+ // Launch pool of worker threads, precise number is not critical
+ for (int threadNbr = 0; threadNbr < 5; threadNbr++)
+ new Thread(new server_worker(ctx)).start();
+
+ // Connect backend to frontend via a proxy
+ ZMQ.proxy(frontend, backend, null);
+
+ ctx.destroy();
+ }
+ }
+
+ //Each worker task works on one request at a time and sends a random number
+ //of replies back, with random delays between replies:
+
+ private static class server_worker implements Runnable
+ {
+ private ZContext ctx;
+
+ public server_worker(ZContext ctx)
+ {
+ this.ctx = ctx;
+ }
+
+ public void run()
+ {
+ Socket worker = ctx.createSocket(ZMQ.DEALER);
+ worker.connect("inproc://backend");
+
+ while (!Thread.currentThread().isInterrupted()) {
+ // The DEALER socket gives us the address envelope and message
+ ZMsg msg = ZMsg.recvMsg(worker);
+ ZFrame address = msg.pop();
+ ZFrame content = msg.pop();
+ assert (content != null);
+ msg.destroy();
+
+ // Send 0..4 replies back
+ int replies = rand.nextInt(5);
+ for (int reply = 0; reply < replies; reply++) {
+ // Sleep for some fraction of a second
+ try {
+ Thread.sleep(rand.nextInt(1000) + 1);
+ } catch (InterruptedException e) {
+ }
+ address.send(worker, ZFrame.REUSE + ZFrame.MORE);
+ content.send(worker, ZFrame.REUSE);
+ }
+ address.destroy();
+ content.destroy();
+ }
+ ctx.destroy();
+ }
+ }
+
+ //The main thread simply starts several clients, and a server, and then
+ //waits for the server to finish.
+
+ public static void main(String[] args) throws Exception
+ {
+ ZContext ctx = new ZContext();
+ new Thread(new client_task()).start();
+ new Thread(new client_task()).start();
+ new Thread(new client_task()).start();
+ new Thread(new server_task()).start();
+
+ // Run for 5 seconds then quit
+ Thread.sleep(5 * 1000);
+ ctx.destroy();
+ }
}
View
@@ -9,7 +9,7 @@ VERSION="0.3.0-SNAPSHOT"
JEROMQ="jeromq-$VERSION.jar"
CLASSPATH=".:$JEROMQ"
-JZMQ_VERSION="2.1.0-SNAPSHOT"
+JZMQ_VERSION="2.1.1-SNAPSHOT"
JZMQ="jzmq-$JZMQ_VERSION.jar"
JZMQ_CLASSPATH=".:$JZMQ"
@@ -19,7 +19,7 @@ if [ ! -f $JEROMQ ]; then
fi
if [ ! -f $JZMQ ]; then
wget -O $JZMQ --no-check-certificate \
- "http://amuraru.github.com/zeromq-maven/repository/releases/org/zeromq/jzmq/$JZMQ_VERSION/$JZMQ"
+ "https://oss.sonatype.org/service/local/artifact/maven/redirect?r=snapshots&g=org.zeromq&a=jzmq&v=$JZMQ_VERSION&e=jar"
fi
if [ /$1/ = /all/ ]; then
@@ -31,7 +31,7 @@ if [ /$1/ = /all/ ]; then
elif [ /$1/ = /jzmq/ ]; then
rm -f *.class
echo "Building JZMQ Java examples..."
- for MAIN in `egrep -l "main \(" *.java`; do
+ for MAIN in `egrep -l "main ?\(" *.java`; do
echo "$MAIN"
javac -cp $JZMQ_CLASSPATH $MAIN
done
Oops, something went wrong.

0 comments on commit f749e65

Please sign in to comment.