Merge pull request #292 from miniway/master

Java chapter 2 and chapter 3 examples
commit 207903241ed223e805b05405de1e156f33e41f0d 2 parents 478408a + f749e65
@hintjens authored
Showing with 2,174 additions and 1,006 deletions.
  1. +1 −1  chapter2.txt
  2. +39 −0 examples/Java/ZHelper.java
  3. +142 −150 examples/Java/asyncsrv.java
  4. +2 −2 examples/Java/build
  5. +8 −16 examples/Java/hwclient.java
  6. +5 −13 examples/Java/hwserver.java
  7. +36 −0 examples/Java/identity.java
  8. +13 −19 examples/Java/interrupt.java
  9. +172 −174 examples/Java/lbbroker.java
  10. +149 −0 examples/Java/lbbroker2.java
  11. +181 −0 examples/Java/lbbroker3.java
  12. +20 −24 examples/Java/msgqueue.java
  13. +29 −29 examples/Java/mspoller.java
  14. +29 −29 examples/Java/msreader.java
  15. +66 −38 examples/Java/mtrelay.java
  16. +59 −58 examples/Java/mtserver.java
  17. +68 −0 examples/Java/peering1.java
  18. +244 −0 examples/Java/peering2.java
  19. +324 −0 examples/Java/peering3.java
  20. +18 −16 examples/Java/psenvpub.java
  21. +19 −16 examples/Java/psenvsub.java
  22. +52 −53 examples/Java/rrbroker.java
  23. +22 −24 examples/Java/rrclient.java
  24. +25 −35 examples/Java/rrworker.java
  25. +67 −69 examples/Java/rtdealer.java
  26. +92 −0 examples/Java/rtreq.java
  27. +10 −5 examples/Java/run
  28. +48 −53 examples/Java/syncpub.java
  29. +37 −40 examples/Java/syncsub.java
  30. +41 −45 examples/Java/tasksink2.java
  31. +2 −2 examples/Java/taskvent.java
  32. +51 −56 examples/Java/taskwork2.java
  33. +1 −1  examples/Java/wuclient.java
  34. +19 −36 examples/Java/wuproxy.java
  35. +1 −1  examples/Java/wuserver.java
  36. +1 −1  fragments/C/iothreads.c
  37. +2 −0  fragments/Java/README
  38. +3 −0  fragments/Java/binding.java
  39. +10 −0 fragments/Java/errorhandling.java
  40. +5 −0 fragments/Java/highreader.java
  41. +8 −0 fragments/Java/interrupt.java
  42. +2 −0  fragments/Java/iothreads.java
  43. +5 −0 fragments/Java/killsignal.java
  44. +14 −0 fragments/Java/lbreader.java
  45. +24 −0 fragments/Java/lowreader.java
  46. +2 −0  fragments/Java/polling.java
  47. +1 −0  fragments/Java/proxy.java
  48. +4 −0 fragments/Java/reactor.java
  49. +1 −0  fragments/Java/zerocopy.java
2  chapter2.txt
@@ -148,7 +148,7 @@ Since 0MQ/3.3, however, the {{ZMQ_ROUTER_RAW}} socket option lets you read and w
We said that 0MQ does I/O in a background thread. One I/O thread (for all sockets) is sufficient for all but the most extreme applications. When you create a new context it starts with one I/O thread. The general rule of thumb is to allow one I/O thread per gigabyte of data in or out per second. To raise the number of I/O threads, use the {{zmq_ctx_set[3]}} call //before// creating any sockets:
[[code type="fragment" name="iothreads"]]
-int io-threads = 4;
+int io_threads = 4;
void *context = zmq_ctx_new ();
zmq_ctx_set (context, ZMQ_IO_THREADS, io_threads);
assert (zmq_ctx_get (context, ZMQ_IO_THREADS) == io_threads);
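In the Java examples this commit adds, the I/O thread count is passed straight to the context constructor rather than set through zmq_ctx_set. A minimal sketch, assuming the org.zeromq.ZMQ API used throughout these examples (an illustration only, not the literal content of the fragments/Java/iothreads.java file added below):

import org.zeromq.ZMQ;

public class iothreads
{
    public static void main (String[] args)
    {
        // The constructor argument is the number of I/O threads for this
        // context, the Java equivalent of setting ZMQ_IO_THREADS before
        // creating any sockets
        ZMQ.Context context = ZMQ.context (4);
        //  ... create and use sockets here ...
        context.term ();
    }
}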
39 examples/Java/ZHelper.java
@@ -0,0 +1,39 @@
+import org.zeromq.ZMQ.Socket;
+
+import java.util.Random;
+
+public class ZHelper
+{
+ private static Random rand = new Random(System.currentTimeMillis ());
+
+ /**
+ * Receives all message parts from socket, prints neatly
+ */
+ public static void dump (Socket sock)
+ {
+ System.out.println("----------------------------------------");
+ while(true) {
+ byte [] msg = sock.recv (0);
+ boolean isText = true;
+ String data = "";
+ for (int i = 0; i< msg.length; i++) {
+ if (msg[i] < 32 || msg[i] > 127)
+ isText = false;
+ data += String.format ("%02X", msg[i]);
+ }
+ if (isText)
+ data = new String (msg);
+
+ System.out.println (String.format ("[%03d] %s", msg.length, data));
+ if (!sock.hasReceiveMore ())
+ break;
+ }
+ }
+
+ public static void setId (Socket sock)
+ {
+ String identity = String.format ("%04X-%04X", rand.nextInt (), rand.nextInt ());
+
+ sock.setIdentity (identity.getBytes ());
+ }
+}
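A brief usage sketch for these helpers (illustrative only, not a file in this commit; the zhelpertest class name is hypothetical): setId gives a socket a short printable identity before it connects, and dump prints each frame of the next message arriving on a sink socket, which the identity.java example below relies on.

import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Socket;

public class zhelpertest
{
    public static void main (String[] args)
    {
        Context context = ZMQ.context (1);
        Socket sink = context.socket (ZMQ.ROUTER);
        sink.bind ("inproc://zhelper.test");

        Socket client = context.socket (ZMQ.REQ);
        ZHelper.setId (client);     // printable identity, e.g. "00F3-71B2"
        client.connect ("inproc://zhelper.test");
        client.send ("Hello".getBytes (), 0);

        // Prints three frames: the identity, an empty delimiter, and "Hello"
        ZHelper.dump (sink);

        client.close ();
        sink.close ();
        context.term ();
    }
}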
292 examples/Java/asyncsrv.java
@@ -1,158 +1,150 @@
import java.util.Random;
+import org.zeromq.ZContext;
+import org.zeromq.ZFrame;
import org.zeromq.ZMQ;
-import org.zeromq.ZMQ.Context;
+import org.zeromq.ZMQ.Poller;
+import org.zeromq.ZMsg;
+import org.zeromq.ZMQ.PollItem;
import org.zeromq.ZMQ.Socket;
-/**
- * Asynchronous client-to-server (DEALER to ROUTER)
- *
- * This example contains clients and server in order to easily start and stop
- * the demo.
- *
- * They are working independently and communicate only by using the tcp
- * connections. They conceptually act as separate processes.
- *
- * @author Raphaël P. Barazzutti
- */
-
-// The client task connects to the server, then sends a request every second
-// while printing incoming messages as they arrive
-class ClientTask implements Runnable {
- final private int id;
-
- public ClientTask(int id) {
- this.id = id;
- }
-
- public void run() {
- Context context = ZMQ.context(1);
- Socket client = context.socket(ZMQ.DEALER);
- String identity = "worker-" + id;
- client.setIdentity(identity.getBytes());
- client.connect("tcp://localhost:5555");
- System.out.println("Client " + identity + " started");
- ZMQ.Poller poller = context.poller(1);
- poller.register(client, ZMQ.Poller.POLLIN);
- int requestNbr = 0;
- while (true)
- for (int i = 0; i < 100; i++) {
- poller.poll(10000);
- if (poller.pollin(0)) {
- byte msg[] = client.recv(0);
- System.out.println("Client " + identity + " received "
- + new String(msg));
- }
- System.out.println("Req #" + (++requestNbr) + " sent");
- client.send(("request " + requestNbr).getBytes(), 0);
- }
- }
-
-}
-
-// The server worker receives messages and replies by re-sending them a random
-// number of times (with random delays between replies)
-class ServerWorker implements Runnable {
- private Context context;
- final private int id;
-
- public ServerWorker(Context context, int id) {
- super();
- this.id = id;
- this.context = context;
- }
-
- public void run() {
- Random randomGenerator = new Random();
- Socket worker = context.socket(ZMQ.DEALER);
- worker.connect("inproc://backend");
- System.out.println("Server worker " + id + " started");
- while (true) {
- byte id[] = worker.recv(0);
- byte msg[] = worker.recv(0);
- System.out.println("Server worker " + id + " received "
- + new String(msg) + " from " + new String(id));
- // sending 0..4 replies
- for (int i = 0; i < randomGenerator.nextInt(5); i++) {
- try {
- // sleeping 1s or 1s/2 or 1s/3 .. 1s/9
- Thread.sleep(1000 / (1 + randomGenerator.nextInt(8)));
- worker.send(id, ZMQ.SNDMORE);
- worker.send(msg, 0);
- } catch (InterruptedException ie) {
- ie.printStackTrace();
- }
- }
-
- }
-
- }
-}
-
-// The server task uses a pool of workers to handle the messages coming from
-// clients.
//
-// The main server thread forwards messages between the front-end (connected to
-// clients) and the back-end (connected to workers)
+//Asynchronous client-to-server (DEALER to ROUTER)
//
-// The workers handle one request at a time, but a client might have its
-// messages handled by more than one worker
-class ServerTask implements Runnable {
- @Override
- public void run() {
- Context context = ZMQ.context(1);
- ZMQ.Socket frontend = context.socket(ZMQ.ROUTER);
- frontend.bind("tcp://*:5555");
-
- ZMQ.Socket backend = context.socket(ZMQ.DEALER);
- backend.bind("inproc://backend");
-
- for (int i = 0; i < 5; i++)
- (new Thread(new ServerWorker(context, i))).start();
-
- ZMQ.Poller poller = context.poller(2);
- poller.register(frontend, ZMQ.Poller.POLLIN);
- poller.register(backend, ZMQ.Poller.POLLIN);
-
- // It is possible to easily connect frontend to backend using a queue
- // device
- // ZMQQueue queue = new ZMQQueue(context, frontend, backend);
- // queue.run();
- //
- // Doing it manually gives a better understanding of the mechanisms
- // (it's a tuto) and might be useful in debugging
- while (true) {
- poller.poll();
- if (poller.pollin(0)) {
- byte[] id = frontend.recv(0);
- byte[] msg = frontend.recv(0);
- System.out.println("Server received " + new String(msg)
- + " id " + new String(id));
-
- backend.send(id, ZMQ.SNDMORE);
- backend.send(msg, 0);
- }
- if (poller.pollin(1)) {
- byte[] id = backend.recv(0);
- byte[] msg = backend.recv(0);
- System.out.println("Sending to frontend " + new String(msg)
- + " id " + new String(id));
- frontend.send(id, ZMQ.SNDMORE);
- frontend.send(msg, 0);
- }
- }
- }
-}
-
-// The main thread of the Java program, it starts three clients then starts a
-// server
-public class AsyncSrv {
- public static void main(String args[]) {
- // starting three clients
- for (int i = 0; i < 3; i++)
- (new Thread(new ClientTask(i))).start();
-
- // starting server
- new Thread(new ServerTask()).start();
- }
+//While this example runs in a single process, that is just to make
+//it easier to start and stop the example. Each task has its own
+//context and conceptually acts as a separate process.
+
+
+public class asyncsrv
+{
+ //---------------------------------------------------------------------
+ //This is our client task
+ //It connects to the server, and then sends a request once per second
+ //It collects responses as they arrive, and it prints them out. We will
+ //run several client tasks in parallel, each with a different random ID.
+
+ private static Random rand = new Random(System.nanoTime());
+
+ private static class client_task implements Runnable
+ {
+
+ public void run()
+ {
+ ZContext ctx = new ZContext();
+ Socket client = ctx.createSocket(ZMQ.DEALER);
+
+ // Set random identity to make tracing easier
+ String identity = String.format("%04X-%04X", rand.nextInt(), rand.nextInt());
+ client.setIdentity(identity.getBytes());
+ client.connect("tcp://localhost:5570");
+
+ PollItem[] items = new PollItem[] { new PollItem(client, Poller.POLLIN) };
+
+ int requestNbr = 0;
+ while (!Thread.currentThread().isInterrupted()) {
+ // Tick once per second, pulling in arriving messages
+ for (int centitick = 0; centitick < 100; centitick++) {
+ ZMQ.poll(items, 10);
+ if (items[0].isReadable()) {
+ ZMsg msg = ZMsg.recvMsg(client);
+ msg.getLast().print(identity);
+ msg.destroy();
+ }
+ }
+ client.send(String.format("request #%d", ++requestNbr), 0);
+ }
+ ctx.destroy();
+ }
+ }
+
+ //This is our server task.
+ //It uses the multithreaded server model to deal requests out to a pool
+ //of workers and route replies back to clients. One worker can handle
+ //one request at a time but one client can talk to multiple workers at
+ //once.
+
+ private static class server_task implements Runnable
+ {
+ public void run()
+ {
+ ZContext ctx = new ZContext();
+
+ // Frontend socket talks to clients over TCP
+ Socket frontend = ctx.createSocket(ZMQ.ROUTER);
+ frontend.bind("tcp://*:5570");
+
+ // Backend socket talks to workers over inproc
+ Socket backend = ctx.createSocket(ZMQ.DEALER);
+ backend.bind("inproc://backend");
+
+ // Launch pool of worker threads, precise number is not critical
+ for (int threadNbr = 0; threadNbr < 5; threadNbr++)
+ new Thread(new server_worker(ctx)).start();
+
+ // Connect backend to frontend via a proxy
+ ZMQ.proxy(frontend, backend, null);
+
+ ctx.destroy();
+ }
+ }
+
+ //Each worker task works on one request at a time and sends a random number
+ //of replies back, with random delays between replies:
+
+ private static class server_worker implements Runnable
+ {
+ private ZContext ctx;
+
+ public server_worker(ZContext ctx)
+ {
+ this.ctx = ctx;
+ }
+
+ public void run()
+ {
+ Socket worker = ctx.createSocket(ZMQ.DEALER);
+ worker.connect("inproc://backend");
+
+ while (!Thread.currentThread().isInterrupted()) {
+ // The DEALER socket gives us the address envelope and message
+ ZMsg msg = ZMsg.recvMsg(worker);
+ ZFrame address = msg.pop();
+ ZFrame content = msg.pop();
+ assert (content != null);
+ msg.destroy();
+
+ // Send 0..4 replies back
+ int replies = rand.nextInt(5);
+ for (int reply = 0; reply < replies; reply++) {
+ // Sleep for some fraction of a second
+ try {
+ Thread.sleep(rand.nextInt(1000) + 1);
+ } catch (InterruptedException e) {
+ }
+ address.send(worker, ZFrame.REUSE + ZFrame.MORE);
+ content.send(worker, ZFrame.REUSE);
+ }
+ address.destroy();
+ content.destroy();
+ }
+ ctx.destroy();
+ }
+ }
+
+ //The main thread simply starts several clients, and a server, and then
+ //waits for the server to finish.
+
+ public static void main(String[] args) throws Exception
+ {
+ ZContext ctx = new ZContext();
+ new Thread(new client_task()).start();
+ new Thread(new client_task()).start();
+ new Thread(new client_task()).start();
+ new Thread(new server_task()).start();
+
+ // Run for 5 seconds then quit
+ Thread.sleep(5 * 1000);
+ ctx.destroy();
+ }
}
4 examples/Java/build
@@ -9,7 +9,7 @@ VERSION="0.3.0-SNAPSHOT"
JEROMQ="jeromq-$VERSION.jar"
CLASSPATH=".:$JEROMQ"
-JZMQ_VERSION="2.1.0-SNAPSHOT" ## This has to be updated when the lastest version is updated
+JZMQ_VERSION="2.1.1-SNAPSHOT"
JZMQ="jzmq-$JZMQ_VERSION.jar"
JZMQ_CLASSPATH=".:$JZMQ"
@@ -31,7 +31,7 @@ if [ /$1/ = /all/ ]; then
elif [ /$1/ = /jzmq/ ]; then
rm -f *.class
echo "Building JZMQ Java examples..."
- for MAIN in `egrep -l "main \(" *.java`; do
+ for MAIN in `egrep -l "main ?\(" *.java`; do
echo "$MAIN"
javac -cp $JZMQ_CLASSPATH $MAIN
done
24 examples/Java/hwclient.java
@@ -9,29 +9,21 @@
public class hwclient{
public static void main (String[] args){
- // Prepare our context and socket
ZMQ.Context context = ZMQ.context(1);
- ZMQ.Socket socket = context.socket(ZMQ.REQ);
- socket.connect ("tcp://localhost:5555");
+ // Socket to talk to server
System.out.println("Connecting to hello world server");
- // Do 10 requests, waiting each time for a response
+ ZMQ.Socket socket = context.socket(ZMQ.REQ);
+ socket.connect ("tcp://localhost:5555");
+
for(int requestNbr = 0; requestNbr != 10; requestNbr++) {
- // Create a "Hello" message.
- // Ensure that the last byte of our "Hello" message is 0 because
- // our "Hello World" server is expecting a 0-terminated string:
- String requestString = "Hello" ;
- byte[] request = requestString.getBytes();
- // Send the message
- System.out.println("Sending request " + requestNbr );
- socket.send(request, 0);
+ String request = "Hello" ;
+ System.out.println("Sending Hello " + requestNbr );
+ socket.send(request.getBytes (), 0);
- // Get the reply.
byte[] reply = socket.recv(0);
- // When displaying reply as a String, omit the last byte because
- // our "Hello World" server has sent us a 0-terminated string:
- System.out.println("Received reply " + requestNbr + ": [" + new String(reply) + "]");
+ System.out.println("Received " + new String (reply) + " " + requestNbr);
}
socket.close();
18 examples/Java/hwserver.java
@@ -9,32 +9,24 @@
public class hwserver{
public static void main (String[] args) throws Exception{
- // Prepare our context and socket
ZMQ.Context context = ZMQ.context(1);
+
+ // Socket to talk to clients
ZMQ.Socket socket = context.socket(ZMQ.REP);
- System.out.println("Binding hello world server");
socket.bind ("tcp://*:5555");
while (!Thread.currentThread ().isInterrupted ()) {
- // Wait for next request from client
byte[] reply = socket.recv(0);
- System.out.println("Received " + reply.length );
System.out.println("Received " + ": [" + new String(reply) + "]");
- Thread.sleep(1000);
// Create a "Hello" message.
- // Ensure that the last byte of our "Hello" message is 0 because
- // our "Hello World" server is expecting a 0-terminated string:
- String requestString = "Hello" ;
- byte[] request = requestString.getBytes();
- //request[request.length-1]=0; //Sets the last byte to 0
+ String request = "world" ;
// Send the message
- System.out.println("Sending response " + requestString );
- socket.send(request, 0);
+ socket.send(request.getBytes (), 0);
- // Get the reply.
+ Thread.sleep(1000); // Do some 'work'
}
socket.close();
36 examples/Java/identity.java
@@ -0,0 +1,36 @@
+import org.zeromq.ZMQ;
+import org.zeromq.ZMQ.Context;
+import org.zeromq.ZMQ.Socket;
+
+/**
+ * Demonstrate identities as used by the request-reply pattern.
+ */
+public class identity {
+
+ public static void main (String[] args) throws InterruptedException {
+
+ Context context = ZMQ.context(1);
+ Socket sink = context.socket(ZMQ.ROUTER);
+ sink.bind("inproc://example");
+
+        // First allow 0MQ to set the identity, [00] + random 4 bytes
+ Socket anonymous = context.socket(ZMQ.REQ);
+
+ anonymous.connect("inproc://example");
+ anonymous.send ("ROUTER uses a generated UUID",0);
+ ZHelper.dump (sink);
+
+ // Then set the identity ourself
+ Socket identified = context.socket(ZMQ.REQ);
+ identified.setIdentity("Hello".getBytes ());
+ identified.connect ("inproc://example");
+ identified.send("ROUTER socket uses REQ's socket identity", 0);
+ ZHelper.dump (sink);
+
+ sink.close ();
+ anonymous.close ();
+ identified.close();
+ context.term();
+
+ }
+}
32 examples/Java/interrupt.java
@@ -1,17 +1,14 @@
/*
- *
- * Interrupt in Java
- * Shows how to handle Ctrl-C
- *
- * @author Vadim Shalts
- * @email vshalts@gmail.com
+*
+* Interrupt in Java
+* Shows how to handle Ctrl-C
*/
import org.zeromq.ZMQ;
import org.zeromq.ZMQException;
public class interrupt {
- public static void main(String[] args) {
+ public static void main (String[] args) {
// Prepare our context and socket
final ZMQ.Context context = ZMQ.context(1);
@@ -22,26 +19,23 @@ public void run() {
socket.bind("tcp://*:5555");
while (!Thread.currentThread().isInterrupted()) {
- try {
- socket.recv(0);
- } catch (ZMQException e) {
- // context destroyed, exit
- if (ZMQ.Error.ETERM.getCode() == e.getErrorCode()) {
- break;
- }
- throw e;
- }
+ try {
+ socket.recv (0);
+ } catch (ZMQException e) {
+ if (e.getErrorCode () == ZMQ.Error.ETERM.getCode ()) {
+ break;
+ }
+ }
}
socket.close();
- System.out.println("ZMQ socket shutdown complete");
}
};
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
- System.out.println("ShutdownHook called");
+ System.out.println("W: interrupt received, killing server...");
context.term();
try {
zmqThread.interrupt();
@@ -53,4 +47,4 @@ public void run() {
zmqThread.start();
}
-}
+}
346 examples/Java/lbbroker.java
@@ -1,187 +1,185 @@
-/**
- * @author Mariusz Ryndzionek
- * @email mryndzionek@gmail.com
- *
- * Least-recently used (LRU) queue device
- * Clients and workers are shown here in-process
- *
- * While this example runs in a single process, that is just to make
- * it easier to start and stop the example. Each thread has its own
- * context and conceptually acts as a separate process.
- *
- **/
-
import java.util.LinkedList;
import java.util.Queue;
-import java.util.Random;
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Poller;
import org.zeromq.ZMQ.Socket;
-class ClientThread extends Thread
-{
- public void run()
- {
- Context context = ZMQ.context(1);
-
- // Prepare our context and sockets
- Socket client = context.socket(ZMQ.REQ);
-
- // Initialize random number generator
- Random srandom = new Random(System.nanoTime());
- String id = String.format("%04x-%04x", srandom.nextInt(0x10000)+1,srandom.nextInt(0x10000)+1);
- client.setIdentity(id.getBytes());
-
- client.connect("ipc://frontend.ipc");
-
- // Send request, get reply
- client.send("HELLO".getBytes(), 0);
- String reply = new String(client.recv(0));
- System.out.println("Client: " + reply);
-
- }
-}
-
-class WorkerThread extends Thread
-{
- public void run()
- {
- Context context = ZMQ.context(1);
-
- // Prepare our context and sockets
- Socket worker = context.socket(ZMQ.REQ);
-
- // Initialize random number generator
- Random srandom = new Random(System.nanoTime());
- String id = String.format("%04x-%04x", srandom.nextInt(0x10000)+1,srandom.nextInt(0x10000)+1);
- worker.setIdentity(id.getBytes()); // Makes tracing easier
-
- worker.connect("ipc://backend.ipc");
-
- // Tell backend we're ready for work
- worker.send("READY".getBytes(), 0);
-
- while(true)
- {
- String address = new String(worker.recv(0));
- String empty = new String(worker.recv(0));
- assert empty.length()==0 | true;
-
- // Get request, send reply
- String request = new String(worker.recv(0));
- System.out.println("Worker: " + request);
-
- worker.send(address.getBytes(), ZMQ.SNDMORE);
- worker.send("".getBytes(), ZMQ.SNDMORE);
- worker.send("OK".getBytes(), 0);
- }
-
- }
-}
-
-public class lruqueue {
-
- public static void main(String[] args) {
- Context context = ZMQ.context(1);
-
- // Prepare our context and sockets
- Socket frontend = context.socket(ZMQ.ROUTER);
- Socket backend = context.socket(ZMQ.ROUTER);
- frontend.bind("ipc://frontend.ipc");
- backend.bind("ipc://backend.ipc");
-
- int client_nbr;
- for (client_nbr = 0; client_nbr < 10; client_nbr++)
- new ClientThread().start();
-
- int worker_nbr;
- for (worker_nbr = 0; worker_nbr < 3; worker_nbr++)
- new WorkerThread().start();
-
- // Logic of LRU loop
- // - Poll backend always, frontend only if 1+ worker ready
- // - If worker replies, queue worker as ready and forward reply
- // to client if necessary
- // - If client requests, pop next worker and send request to it
- //
- // A very simple queue structure with known max size
- Queue<String> worker_queue = new LinkedList<String>();
-
-
- while (!Thread.currentThread().isInterrupted()) {
-
- // Initialize poll set
- Poller items = context.poller(2);
-
- //  Always poll for worker activity on backend
- items.register(backend, Poller.POLLIN);
-
- //  Poll front-end only if we have available workers
- if(worker_queue.size()>0)
- items.register(frontend, Poller.POLLIN);
-
- items.poll();
-
- // Handle worker activity on backend
- if (items.pollin(0)) {
-
- // Queue worker address for LRU routing
- worker_queue.add(new String(backend.recv(0)));
-
- // Second frame is empty
- String empty = new String(backend.recv(0));
- assert empty.length()==0 | true;
-
- // Third frame is READY or else a client reply address
- String client_addr = new String(backend.recv(0));
-
- // If client reply, send rest back to frontend
- if (!client_addr.equals("READY")) {
-
- empty = new String(backend.recv(0));
- assert empty.length()==0 | true;
-
- String reply = new String(backend.recv(0));
- frontend.send(client_addr.getBytes(), ZMQ.SNDMORE);
- frontend.send("".getBytes(), ZMQ.SNDMORE);
- frontend.send(reply.getBytes(), 0);
-
- if (--client_nbr == 0)
- break;
- }
-
- }
-
- if (items.pollin(1)) {
- // Now get next client request, route to LRU worker
- // Client request is [address][empty][request]
- String client_addr = new String(frontend.recv(0));
-
- String empty = new String(frontend.recv(0));
- assert empty.length()==0 | true;
-
- String request = new String(frontend.recv(0));
-
- String worker_addr = worker_queue.poll();//worker_queue [0];
-
- backend.send(worker_addr.getBytes(), ZMQ.SNDMORE);
- backend.send("".getBytes(), ZMQ.SNDMORE);
- backend.send(client_addr.getBytes(), ZMQ.SNDMORE);
- backend.send("".getBytes(), ZMQ.SNDMORE);
- backend.send(request.getBytes(), 0);
-
- }
-
- }
+public class lbbroker {
+
+ private static final int NBR_CLIENTS = 10;
+ private static final int NBR_WORKERS = 3;
+
+ /**
+ * Basic request-reply client using REQ socket
+ */
+ private static class ClientTask extends Thread
+ {
+ public void run()
+ {
+ Context context = ZMQ.context(1);
+
+ // Prepare our context and sockets
+ Socket client = context.socket(ZMQ.REQ);
+ ZHelper.setId (client); // Set a printable identity
+
+ client.connect("ipc://frontend.ipc");
+
+ // Send request, get reply
+ client.send("HELLO");
+ String reply = client.recvStr ();
+ System.out.println("Client: " + reply);
+
+ client.close();
+ context.term();
+ }
+ }
+
+ /**
+ * While this example runs in a single process, that is just to make
+ * it easier to start and stop the example. Each thread has its own
+ * context and conceptually acts as a separate process.
+ * This is the worker task, using a REQ socket to do load-balancing.
+ */
+ private static class WorkerTask extends Thread
+ {
+ public void run()
+ {
+ Context context = ZMQ.context(1);
+ // Prepare our context and sockets
+ Socket worker = context.socket(ZMQ.REQ);
+ ZHelper.setId (worker); // Set a printable identity
+
+ worker.connect("ipc://backend.ipc");
+
+ // Tell backend we're ready for work
+ worker.send("READY");
+
+ while(!Thread.currentThread ().isInterrupted ())
+ {
+ String address = worker.recvStr ();
+ String empty = worker.recvStr ();
+ assert (empty.length() == 0);
+
+ // Get request, send reply
+ String request = worker.recvStr ();
+ System.out.println("Worker: " + request);
+
+ worker.sendMore (address);
+ worker.sendMore ("");
+ worker.send("OK");
+ }
+ worker.close ();
+ context.term ();
+ }
+ }
+
+ /**
+ * This is the main task. It starts the clients and workers, and then
+ * routes requests between the two layers. Workers signal READY when
+ * they start; after that we treat them as ready when they reply with
+ * a response back to a client. The load-balancing data structure is
+ * just a queue of next available workers.
+ */
+ public static void main (String[] args) {
+ Context context = ZMQ.context(1);
+ // Prepare our context and sockets
+ Socket frontend = context.socket(ZMQ.ROUTER);
+ Socket backend = context.socket(ZMQ.ROUTER);
+ frontend.bind("ipc://frontend.ipc");
+ backend.bind("ipc://backend.ipc");
+
+ int clientNbr;
+ for (clientNbr = 0; clientNbr < NBR_CLIENTS; clientNbr++)
+ new ClientTask().start();
+
+ for (int workerNbr = 0; workerNbr < NBR_WORKERS; workerNbr++)
+ new WorkerTask().start();
+
+ // Here is the main loop for the least-recently-used queue. It has two
+ // sockets; a frontend for clients and a backend for workers. It polls
+ // the backend in all cases, and polls the frontend only when there are
+ // one or more workers ready. This is a neat way to use 0MQ's own queues
+ // to hold messages we're not ready to process yet. When we get a client
+ // reply, we pop the next available worker, and send the request to it,
+ // including the originating client identity. When a worker replies, we
+ // re-queue that worker, and we forward the reply to the original client,
+ // using the reply envelope.
+
+ // Queue of available workers
+ Queue<String> workerQueue = new LinkedList<String>();
+
+ while (!Thread.currentThread().isInterrupted()) {
+
+ // Initialize poll set
+ Poller items = new Poller (2);
+
+ //  Always poll for worker activity on backend
+ items.register(backend, Poller.POLLIN);
+
+ //  Poll front-end only if we have available workers
+ if(workerQueue.size() > 0)
+ items.register(frontend, Poller.POLLIN);
+
+ if (items.poll() < 0)
+ break;
+
+ // Handle worker activity on backend
+ if (items.pollin(0)) {
+
+ // Queue worker address for LRU routing
+ workerQueue.add (backend.recvStr ());
+
+ // Second frame is empty
+ String empty = backend.recvStr ();
+ assert (empty.length() == 0);
+
+ // Third frame is READY or else a client reply address
+ String clientAddr = backend.recvStr ();
+
+ // If client reply, send rest back to frontend
+ if (!clientAddr.equals("READY")) {
+
+ empty = backend.recvStr ();
+ assert (empty.length() == 0);
+
+ String reply = backend.recvStr ();
+ frontend.sendMore(clientAddr);
+ frontend.sendMore("");
+ frontend.send(reply);
+
+ if (--clientNbr == 0)
+ break;
+ }
+
+ }
+
+ if (items.pollin(1)) {
+ // Now get next client request, route to LRU worker
+ // Client request is [address][empty][request]
+ String clientAddr = frontend.recvStr ();
+
+ String empty = frontend.recvStr ();
+ assert (empty.length() == 0);
+
+ String request = frontend.recvStr ();
+
+ String workerAddr = workerQueue.poll();
+
+ backend.sendMore (workerAddr);
+ backend.sendMore ("");
+ backend.sendMore (clientAddr );
+ backend.sendMore ("");
+ backend.send (request);
- frontend.close();
- backend.close();
- context.term();
+ }
+ }
- System.exit(0);
+ frontend.close();
+ backend.close();
+ context.term();
- }
+ }
}
149 examples/Java/lbbroker2.java
@@ -0,0 +1,149 @@
+import org.zeromq.*;
+import org.zeromq.ZMQ.Poller;
+import org.zeromq.ZMQ.Socket;
+import org.zeromq.ZThread.IDetachedRunnable;
+
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.Queue;
+
+public class lbbroker2
+{
+ private static final int NBR_CLIENTS = 10;
+ private static final int NBR_WORKERS = 3;
+ private static byte[] WORKER_READY = { '\001' };
+
+ /**
+ * Basic request-reply client using REQ socket
+ */
+ private static class ClientTask implements IDetachedRunnable
+ {
+ @Override
+ public void run (Object ... args)
+ {
+ ZContext context = new ZContext();
+
+ // Prepare our context and sockets
+ Socket client = context.createSocket (ZMQ.REQ);
+ ZHelper.setId (client); // Set a printable identity
+
+ client.connect("ipc://frontend.ipc");
+
+ // Send request, get reply
+ client.send("HELLO");
+ String reply = client.recvStr ();
+ System.out.println("Client: " + reply);
+
+ context.destroy ();
+ }
+ }
+
+ /**
+ * Worker using REQ socket to do load-balancing
+ */
+ private static class WorkerTask implements IDetachedRunnable
+ {
+ @Override
+ public void run (Object ... args)
+ {
+ ZContext context = new ZContext();
+
+ // Prepare our context and sockets
+ Socket worker = context.createSocket (ZMQ.REQ);
+ ZHelper.setId (worker); // Set a printable identity
+
+ worker.connect("ipc://backend.ipc");
+
+ // Tell backend we're ready for work
+ ZFrame frame = new ZFrame (WORKER_READY);
+ frame.send (worker, 0);
+
+ while(true)
+ {
+ ZMsg msg = ZMsg.recvMsg (worker);
+ if (msg == null)
+ break;
+
+ msg.getLast ().reset ("OK");
+ msg.send (worker);
+ }
+ context.destroy ();
+ }
+ }
+
+ /**
+ * This is the main task. This has the identical functionality to
+ * the previous lbbroker example but uses higher level classes to start child threads
+ * to hold the list of workers, and to read and send messages:
+ */
+ public static void main (String[] args) {
+ ZContext context = new ZContext();
+ // Prepare our context and sockets
+ Socket frontend = context.createSocket (ZMQ.ROUTER);
+ Socket backend = context.createSocket (ZMQ.ROUTER);
+ frontend.bind("ipc://frontend.ipc");
+ backend.bind("ipc://backend.ipc");
+
+ int clientNbr;
+ for (clientNbr = 0; clientNbr < NBR_CLIENTS; clientNbr++)
+ ZThread.start (new ClientTask ());
+
+ for (int workerNbr = 0; workerNbr < NBR_WORKERS; workerNbr++)
+ ZThread.start (new WorkerTask ());
+
+ // Queue of available workers
+ Queue<ZFrame> workerQueue = new LinkedList<ZFrame> ();
+
+ // Here is the main loop for the load-balancer. It works the same way
+ // as the previous example, but is a lot shorter because ZMsg class gives
+ // us an API that does more with fewer calls:
+
+ while (!Thread.currentThread().isInterrupted()) {
+
+ // Initialize poll set
+ Poller items = new Poller (2);
+
+ //  Always poll for worker activity on backend
+ items.register(backend, Poller.POLLIN);
+
+ //  Poll front-end only if we have available workers
+ if(workerQueue.size() > 0)
+ items.register(frontend, Poller.POLLIN);
+
+ if (items.poll() < 0)
+ break; // Interrupted
+
+ // Handle worker activity on backend
+ if (items.pollin(0)) {
+
+ ZMsg msg = ZMsg.recvMsg (backend);
+ if (msg == null)
+ break; // Interrupted
+
+ ZFrame identity = msg.unwrap ();
+ // Queue worker address for LRU routing
+ workerQueue.add (identity);
+
+ // Forward message to client if it's not a READY
+ ZFrame frame = msg.getFirst ();
+ if (Arrays.equals (frame.getData (), WORKER_READY))
+ msg.destroy ();
+ else
+ msg.send (frontend);
+ }
+
+ if (items.pollin(1)) {
+ // Get client request, route to first available worker
+ ZMsg msg = ZMsg.recvMsg (frontend);
+ if (msg != null) {
+ msg.wrap (workerQueue.poll ());
+ msg.send (backend);
+ }
+ }
+ }
+
+ context.destroy ();
+ }
+
+}
181 examples/Java/lbbroker3.java
@@ -0,0 +1,181 @@
+import org.zeromq.ZContext;
+import org.zeromq.ZFrame;
+import org.zeromq.ZMsg;
+import org.zeromq.ZLoop;
+import org.zeromq.ZMQ;
+import org.zeromq.ZMQ.PollItem;
+import org.zeromq.ZMQ.Socket;
+import org.zeromq.ZThread;
+
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.Queue;
+
+/**
+ * Load-balancing broker
+ * Demonstrates use of the ZLoop API and reactor style
+ *
+ * The client and worker tasks are identical from the previous example.
+ */
+public class lbbroker3
+{
+ private static final int NBR_CLIENTS = 10;
+ private static final int NBR_WORKERS = 3;
+ private static byte[] WORKER_READY = { '\001' };
+
+ /**
+ * Basic request-reply client using REQ socket
+ */
+ private static class ClientTask implements ZThread.IDetachedRunnable
+ {
+ @Override
+ public void run (Object ... args)
+ {
+ ZContext context = new ZContext();
+
+ // Prepare our context and sockets
+ Socket client = context.createSocket (ZMQ.REQ);
+ ZHelper.setId (client); // Set a printable identity
+
+ client.connect("ipc://frontend.ipc");
+
+ // Send request, get reply
+ client.send("HELLO");
+ String reply = client.recvStr ();
+ System.out.println("Client: " + reply);
+
+ context.destroy ();
+ }
+ }
+
+ /**
+ * Worker using REQ socket to do load-balancing
+ */
+ private static class WorkerTask implements ZThread.IDetachedRunnable
+ {
+ @Override
+ public void run (Object ... args)
+ {
+ ZContext context = new ZContext();
+
+ // Prepare our context and sockets
+ Socket worker = context.createSocket (ZMQ.REQ);
+ ZHelper.setId (worker); // Set a printable identity
+
+ worker.connect("ipc://backend.ipc");
+
+ // Tell backend we're ready for work
+ ZFrame frame = new ZFrame (WORKER_READY);
+ frame.send (worker, 0);
+
+ while(true)
+ {
+ ZMsg msg = ZMsg.recvMsg (worker);
+ if (msg == null)
+ break;
+
+ msg.getLast ().reset ("OK");
+ msg.send (worker);
+ }
+ context.destroy ();
+ }
+ }
+
+ //Our load-balancer structure, passed to reactor handlers
+ private static class LBBroker {
+ Socket frontend; // Listen to clients
+ Socket backend; // Listen to workers
+ Queue<ZFrame> workers; // List of ready workers
+ };
+
+ /**
+ * In the reactor design, each time a message arrives on a socket, the
+ * reactor passes it to a handler function. We have two handlers; one
+ * for the frontend, one for the backend:
+ */
+ private static class FrontendHandler implements ZLoop.IZLoopHandler {
+
+ @Override
+ public int handle(ZLoop loop, PollItem item, Object arg_) {
+
+ LBBroker arg = (LBBroker)arg_;
+ ZMsg msg = ZMsg.recvMsg (arg.frontend);
+ if (msg != null) {
+ msg.wrap(arg.workers.poll());
+ msg.send(arg.backend);
+
+ // Cancel reader on frontend if we went from 1 to 0 workers
+ if (arg.workers.size() == 0) {
+ loop.removePoller (new PollItem (arg.frontend, 0));
+ }
+ }
+ return 0;
+ }
+
+ }
+
+ private static class BackendHandler implements ZLoop.IZLoopHandler {
+
+ @Override
+ public int handle(ZLoop loop, PollItem item, Object arg_) {
+
+ LBBroker arg = (LBBroker)arg_;
+ ZMsg msg = ZMsg.recvMsg(arg.backend);
+ if (msg != null) {
+ ZFrame address = msg.unwrap();
+ // Queue worker address for load-balancing
+ arg.workers.add(address);
+
+ // Enable reader on frontend if we went from 0 to 1 workers
+ if (arg.workers.size() == 1) {
+ PollItem newItem = new PollItem (arg.frontend, ZMQ.Poller.POLLIN);
+ loop.addPoller (newItem, frontendHandler, arg);
+ }
+
+ // Forward message to client if it's not a READY
+ ZFrame frame = msg.getFirst();
+ if (Arrays.equals (frame.getData(), WORKER_READY))
+ msg.destroy();
+ else
+ msg.send(arg.frontend);
+ }
+ return 0;
+ }
+ }
+
+ private final static FrontendHandler frontendHandler = new FrontendHandler();
+ private final static BackendHandler backendHandler = new BackendHandler();
+
+ /**
+ * And the main task now sets-up child tasks, then starts its reactor.
+ * If you press Ctrl-C, the reactor exits and the main task shuts down.
+ */
+ public static void main (String[] args) {
+ ZContext context = new ZContext();
+ LBBroker arg = new LBBroker ();
+ // Prepare our context and sockets
+ arg.frontend = context.createSocket (ZMQ.ROUTER);
+ arg.backend = context.createSocket (ZMQ.ROUTER);
+ arg.frontend.bind("ipc://frontend.ipc");
+ arg.backend.bind("ipc://backend.ipc");
+
+ int clientNbr;
+ for (clientNbr = 0; clientNbr < NBR_CLIENTS; clientNbr++)
+ ZThread.start (new ClientTask ());
+
+ for (int workerNbr = 0; workerNbr < NBR_WORKERS; workerNbr++)
+ ZThread.start (new WorkerTask ());
+
+ // Queue of available workers
+ arg.workers = new LinkedList<ZFrame> ();
+
+ // Prepare reactor and fire it up
+ ZLoop reactor = new ZLoop ();
+ PollItem item = new PollItem (arg.backend, ZMQ.Poller.POLLIN);
+ reactor.addPoller (item, backendHandler, arg);
+ reactor.start ();
+
+ context.destroy ();
+ }
+
+}
44 examples/Java/msgqueue.java
@@ -1,35 +1,31 @@
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Socket;
-import org.zeromq.ZMQQueue;
/**
- * Simple message queuing broker
- * Same as request-reply broker but using QUEUE device.
- *
- * Christophe Huntzinger <chuntz@laposte.net>
- */
+* Simple message queuing broker
+* Same as request-reply broker but using QUEUE device.
+*/
public class msgqueue{
- public static void main (String[] args) {
- // Prepare our context and sockets
- Context context = ZMQ.context(1);
+ public static void main (String[] args) {
+ // Prepare our context and sockets
+ Context context = ZMQ.context(1);
- // Socket facing clients
- Socket frontend = context.socket(ZMQ.ROUTER);
- frontend.bind("tcp://*:5559");
+ // Socket facing clients
+ Socket frontend = context.socket(ZMQ.ROUTER);
+ frontend.bind("tcp://*:5559");
- // Socket facing services
- Socket backend = context.socket(ZMQ.DEALER);
- backend.bind("tcp://*:5560");
+ // Socket facing services
+ Socket backend = context.socket(ZMQ.DEALER);
+ backend.bind("tcp://*:5560");
- // Start built-in device
- ZMQQueue queue = new ZMQQueue(context, frontend, backend);
- // have fun!
-
- // We never get here but clean up anyhow
- frontend.close();
- backend.close();
- context.term();
- }
+ // Start the proxy
+ ZMQ.proxy (frontend, backend, null);
+
+ // We never get here but clean up anyhow
+ frontend.close();
+ backend.close();
+ context.term();
+ }
}
58 examples/Java/mspoller.java
@@ -4,39 +4,39 @@
// Reading from multiple sockets in Java
// This version uses ZMQ.Poller
//
-// Nicola Peduzzi <thenikso@gmail.com>
-//
public class mspoller {
- public static void main(String[] args) {
- ZMQ.Context context = ZMQ.context(1);
+ public static void main (String[] args) {
+ ZMQ.Context context = ZMQ.context(1);
- // Connect to task ventilator
- ZMQ.Socket receiver = context.socket(ZMQ.PULL);
- receiver.connect("tcp://localhost:5557");
+ // Connect to task ventilator
+ ZMQ.Socket receiver = context.socket(ZMQ.PULL);
+ receiver.connect("tcp://localhost:5557");
- // Connect to weather server
- ZMQ.Socket subscriber = context.socket(ZMQ.SUB);
- subscriber.connect("tcp://localhost:5556");
- subscriber.subscribe("10001 ".getBytes());
+ // Connect to weather server
+ ZMQ.Socket subscriber = context.socket(ZMQ.SUB);
+ subscriber.connect("tcp://localhost:5556");
+ subscriber.subscribe("10001 ".getBytes());
- // Initialize poll set
- ZMQ.Poller items = context.poller(2);
- items.register(receiver, ZMQ.Poller.POLLIN);
- items.register(subscriber, ZMQ.Poller.POLLIN);
+ // Initialize poll set
+ ZMQ.Poller items = new ZMQ.Poller (2);
+ items.register(receiver, ZMQ.Poller.POLLIN);
+ items.register(subscriber, ZMQ.Poller.POLLIN);
- // Process messages from both sockets
- while (true) {
- byte[] message;
- items.poll();
- if (items.pollin(0)) {
- message = receiver.recv(0);
- // Process task
- }
- if (items.pollin(1)) {
- message = subscriber.recv(0);
- // Process weather update
- }
- }
- }
+ // Process messages from both sockets
+ while (!Thread.currentThread ().isInterrupted ()) {
+ byte[] message;
+ items.poll();
+ if (items.pollin(0)) {
+ message = receiver.recv(0);
+ System.out.println("Process task");
+ }
+ if (items.pollin(1)) {
+ message = subscriber.recv(0);
+ System.out.println("Process weather update");
+ }
+ }
+ receiver.close ();
+ context.term ();
+ }
}
58 examples/Java/msreader.java
@@ -4,38 +4,38 @@
// Reading from multiple sockets in Java
// This version uses a simple recv loop
//
-// Nicola Peduzzi <thenikso@gmail.com>
-//
public class msreader {
- public static void main(String[] args) throws Exception {
- // Prepare our context and sockets
- ZMQ.Context context = ZMQ.context(1);
+ public static void main (String[] args) throws Exception {
+ // Prepare our context and sockets
+ ZMQ.Context context = ZMQ.context(1);
- // Connect to task ventilator
- ZMQ.Socket receiver = context.socket(ZMQ.PULL);
- receiver.connect("tcp://localhost:5557");
+ // Connect to task ventilator
+ ZMQ.Socket receiver = context.socket(ZMQ.PULL);
+ receiver.connect("tcp://localhost:5557");
- // Connect to weather server
- ZMQ.Socket subscriber = context.socket(ZMQ.SUB);
- subscriber.connect("tcp://localhost:5556");
- subscriber.subscribe("10001 ".getBytes());
+ // Connect to weather server
+ ZMQ.Socket subscriber = context.socket(ZMQ.SUB);
+ subscriber.connect("tcp://localhost:5556");
+ subscriber.subscribe("10001 ".getBytes());
- // Process messages from both sockets
- // We prioritize traffic from the task ventilator
- while (true) {
- // Process any waiting tasks
- byte[] task;
- while((task = receiver.recv(ZMQ.NOBLOCK)) != null) {
- // process task
- }
- // Process any waiting weather updates
- byte[] update;
- while ((update = subscriber.recv(ZMQ.NOBLOCK)) != null) {
- // process weather update
- }
- // No activity, so sleep for 1 msec
- Thread.sleep(1000000);
- }
- }
+ // Process messages from both sockets
+ // We prioritize traffic from the task ventilator
+ while (!Thread.currentThread ().isInterrupted ()) {
+ // Process any waiting tasks
+ byte[] task;
+ while((task = receiver.recv(ZMQ.DONTWAIT)) != null) {
+ System.out.println("process task");
+ }
+ // Process any waiting weather updates
+ byte[] update;
+ while ((update = subscriber.recv(ZMQ.DONTWAIT)) != null) {
+ System.out.println("process weather update");
+ }
+ // No activity, so sleep for 1 msec
+            Thread.sleep(1);
+ }
+ subscriber.close ();
+ context.term ();
+ }
}
104 examples/Java/mtrelay.java
@@ -1,51 +1,79 @@
-//
-// Multithreaded relay in Java
-//
-// Naveen Chawla <naveen.chwl@gmail.com>
-//
import org.zeromq.ZMQ;
+import org.zeromq.ZMQ.Context;
+import org.zeromq.ZMQ.Socket;
+/**
+ * Multithreaded relay
+ */
public class mtrelay{
- public static void main(String[] args) {
- final ZMQ.Context context = ZMQ.context(1);
+ private static class Step1 extends Thread
+ {
+ private Context context;
+
+ private Step1 (Context context)
+ {
+ this.context = context;
+ }
+
+ @Override
+ public void run(){
+ // Signal downstream to step 2
+ Socket xmitter = context.socket(ZMQ.PAIR);
+ xmitter.connect("inproc://step2");
+ System.out.println ("Step 1 ready, signaling step 2");
+ xmitter.send("READY", 0);
+ xmitter.close ();
+ }
+
+ }
+ private static class Step2 extends Thread
+ {
+ private Context context;
+
+ private Step2 (Context context)
+ {
+ this.context = context;
+ }
+
+ @Override
+ public void run(){
+ // Bind to inproc: endpoint, then start upstream thread
+ Socket receiver = context.socket(ZMQ.PAIR);
+ receiver.bind("inproc://step2");
+ Thread step1 = new Step1 (context);
+ step1.start();
+
+ // Wait for signal
+ receiver.recv(0);
+ receiver.close ();
+
+ // Connect to step3 and tell it we're ready
+ Socket xmitter = context.socket(ZMQ.PAIR);
+ xmitter.connect("inproc://step3");
+ xmitter.send("READY", 0);
+
+ xmitter.close ();
+ }
+
+ }
+ public static void main (String[] args) {
+
+ Context context = ZMQ.context(1);
+
// Bind to inproc: endpoint, then start upstream thread
- ZMQ.Socket receiver = context.socket(ZMQ.PAIR);
+ Socket receiver = context.socket(ZMQ.PAIR);
receiver.bind("inproc://step3");
// Step 2 relays the signal to step 3
- Thread step2 = new Thread(){
- public void run(){
- // Bind to inproc: endpoint, then start upstream thread
- ZMQ.Socket receiver = context.socket(ZMQ.PAIR);
- receiver.bind("inproc://step2");
- Thread step1 = new Thread(){
- public void run(){
- // Signal downstream to step 2
- ZMQ.Socket sender = context.socket(ZMQ.PAIR);
- sender.connect("inproc://step2");
- sender.send("".getBytes(),0);
- }
- };
-
- step1.start();
-
- // Wait for signal
- byte[] message;
- message=receiver.recv(0);
-
- // Signal downstream to step 3
- ZMQ.Socket sender = context.socket(ZMQ.PAIR);
- sender.connect("inproc://step3");
- sender.send(message,0);
- }
- };
+ Thread step2 = new Step2 (context);
step2.start();
// Wait for signal
- byte[] message;
- message = receiver.recv(0);
-
+ receiver.recv(0);
+ receiver.close ();
+
System.out.println ("Test successful!");
+ context.term ();
}
-}
+}
117 examples/Java/mtserver.java
@@ -1,62 +1,63 @@
-/*
- * Multithreaded Hello World server in Java
- *
- * @author Vadim Shalts
- * @email vshalts@gmail.com
- *
- */
-
import org.zeromq.ZMQ;
-import org.zeromq.ZMQQueue;
-
-class mtserver {
- static void main(String[] args) {
-
- final ZMQ.Context context = ZMQ.context(1);
-
- ZMQ.Socket clients = context.socket(ZMQ.ROUTER);
- clients.bind ("tcp://*:5555");
-
- ZMQ.Socket workers = context.socket(ZMQ.DEALER);
- workers.bind ("inproc://workers");
+import org.zeromq.ZMQ.Context;
+import org.zeromq.ZMQ.Socket;
- for(int thread_nbr = 0; thread_nbr < 5; thread_nbr++) {
- Thread worker_routine = new Thread() {
-
- @Override
- public void run() {
- ZMQ.Socket socket = context.socket(ZMQ.REP);
- socket.connect ("inproc://workers");
-
- while (true) {
-
- // Wait for next request from client (C string)
- byte[] request = socket.recv (0);
- System.out.println ("Received request: ["+new String(request,0,request.length-1)+"]");
-
- // Do some 'work'
- try {
- Thread.sleep (1000);
- } catch(InterruptedException e) {
- e.printStackTrace();
- }
-
- // Send reply back to client (C string)
- byte[] reply = "World ".getBytes();
- reply[reply.length-1] = 0; //Sets the last byte of the reply to 0
- socket.send(reply, 0);
- }
+/**
+ * Multithreaded Hello World server
+ */
+public class mtserver {
+
+ private static class Worker extends Thread
+ {
+ private Context context;
+
+ private Worker (Context context)
+ {
+ this.context = context;
+ }
+ @Override
+ public void run() {
+ ZMQ.Socket socket = context.socket(ZMQ.REP);
+ socket.connect ("inproc://workers");
+
+ while (true) {
+
+ // Wait for next request from client (C string)
+ String request = socket.recvStr (0);
+ System.out.println ( Thread.currentThread().getName() + " Received request: [" + request + "]");
+
+ // Do some 'work'
+ try {
+ Thread.sleep (1000);
+ } catch (InterruptedException e) {
+ }
+
+ // Send reply back to client (C string)
+ socket.send("world", 0);
}
- };
- worker_routine.start();
- }
- // Connect work threads to client threads via a queue
- ZMQQueue zMQQueue = new ZMQQueue(context,clients, workers);
- zMQQueue.run();
-
- // We never get here but clean up anyhow
- clients.close();
- workers.close();
- context.term();
- }
+ }
+ }
+
+ public static void main (String[] args) {
+
+ Context context = ZMQ.context(1);
+
+ Socket clients = context.socket(ZMQ.ROUTER);
+ clients.bind ("tcp://*:5555");
+
+ Socket workers = context.socket(ZMQ.DEALER);
+ workers.bind ("inproc://workers");
+
+ for(int thread_nbr = 0; thread_nbr < 5; thread_nbr++) {
+ Thread worker = new Worker (context);
+ worker.start();
+ }
+ // Connect work threads to client threads via a queue
+ ZMQ.proxy (clients, workers, null);
+
+ // We never get here but clean up anyhow
+ clients.close();
+ workers.close();
+ context.term();
+ }
}
68 examples/Java/peering1.java
@@ -0,0 +1,68 @@
+import java.util.Random;
+
+import org.zeromq.ZMQ.Poller;
+import org.zeromq.ZContext;
+import org.zeromq.ZMQ;
+import org.zeromq.ZMQ.PollItem;
+import org.zeromq.ZMQ.Socket;
+
+//Broker peering simulation (part 1)
+//Prototypes the state flow
+//
+
+public class peering1
+{
+
+ public static void main(String[] argv)
+ {
+ // First argument is this broker's name
+ // Other arguments are our peers' names
+ //
+ if (argv.length < 1) {
+ System.out.println("syntax: peering1 me {you}\n");
+ System.exit(-1);
+ }
+ String self = argv[0];
+ System.out.println(String.format("I: preparing broker at %s\n", self));
+ Random rand = new Random(System.nanoTime());
+
+ ZContext ctx = new ZContext();
+
+ // Bind state backend to endpoint
+ Socket statebe = ctx.createSocket(ZMQ.PUB);
+ statebe.bind(String.format("ipc://%s-state.ipc", self));
+
+ // Connect statefe to all peers
+ Socket statefe = ctx.createSocket(ZMQ.SUB);
+ statefe.subscribe("".getBytes());
+ int argn;
+ for (argn = 1; argn < argv.length; argn++) {
+ String peer = argv[argn];
+ System.out.printf("I: connecting to state backend at '%s'\n", peer);
+ statefe.connect(String.format("ipc://%s-state.ipc", peer));
+ }
+ // The main loop sends out status messages to peers, and collects
+ // status messages back from peers. The zmq_poll timeout defines
+ // our own heartbeat:
+
+ while (true) {
+ // Poll for activity, or 1 second timeout
+ PollItem items[] = {new PollItem(statefe, Poller.POLLIN)};
+ int rc = ZMQ.poll(items, 1000);
+ if (rc == -1)
+ break; // Interrupted
+
+ // Handle incoming status messages
+ if (items[0].isReadable()) {
+ String peer_name = new String(statefe.recv(0));
+ String available = new String(statefe.recv(0));
+ System.out.printf("%s - %s workers free\n", peer_name, available);
+ } else {
+ // Send random values for worker availability
+ statebe.send(self, ZMQ.SNDMORE);
+ statebe.send(String.format("%d", rand.nextInt(10)), 0);
+ }
+ }
+ ctx.destroy();
+ }
+}
244 examples/Java/peering2.java
@@ -0,0 +1,244 @@
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Random;
+
+import org.zeromq.ZContext;
+import org.zeromq.ZFrame;
+import org.zeromq.ZMQ;
+import org.zeromq.ZMQ.PollItem;
+import org.zeromq.ZMQ.Poller;
+import org.zeromq.ZMQ.Socket;
+import org.zeromq.ZMsg;
+
+// Broker peering simulation (part 2)
+// Prototypes the request-reply flow
+
+public class peering2
+{
+
+ private static final int NBR_CLIENTS = 10;
+ private static final int NBR_WORKERS = 3;
+ private static final String WORKER_READY = "\001"; // Signals worker is ready
+
+ // Our own name; in practice this would be configured per node
+ private static String self;
+
+ // The client task does a request-reply dialog using a standard
+ // synchronous REQ socket:
+ private static class client_task extends Thread
+ {
+ @Override
+ public void run()
+ {
+ ZContext ctx = new ZContext();
+ Socket client = ctx.createSocket(ZMQ.REQ);
+ client.connect(String.format("ipc://%s-localfe.ipc", self));
+
+ while (true) {
+ // Send request, get reply
+ client.send("HELLO", 0);
+ String reply = client.recvStr(0);
+ if (reply == null)
+ break; // Interrupted
+ System.out.printf("Client: %s\n", reply);
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ }
+ }
+ ctx.destroy();
+ }
+ }
+
+ // The worker task plugs into the LRU routing dialog using a REQ
+ // socket:
+
+ private static class worker_task extends Thread
+ {
+ @Override
+ public void run()
+ {
+ ZContext ctx = new ZContext();
+ Socket worker = ctx.createSocket(ZMQ.REQ);
+ worker.connect(String.format("ipc://%s-localbe.ipc", self));
+
+ // Tell broker we're ready for work
+ ZFrame frame = new ZFrame(WORKER_READY);
+ frame.send(worker, 0);
+
+ while (true) {
+ // Send request, get reply
+ ZMsg msg = ZMsg.recvMsg(worker, 0);
+ if (msg == null)
+ break; // Interrupted
+ msg.getLast().print("Worker: ");
+ msg.getLast().reset("OK");
+ msg.send(worker);
+
+ }
+ ctx.destroy();
+ }
+ }
+
+ // The main task begins by setting-up its frontend and backend sockets
+ // and then starting its client and worker tasks:
+ public static void main(String[] argv)
+ {
+ // First argument is this broker's name
+ // Other arguments are our peers' names
+ //
+ if (argv.length < 1) {
+ System.out.println("syntax: peering2 me {you}");
+ System.exit(-1);
+ }
+ self = argv[0];
+ System.out.printf("I: preparing broker at %s\n", self);
+ Random rand = new Random(System.nanoTime());
+
+ ZContext ctx = new ZContext();
+
+ // Bind cloud frontend to endpoint
+ Socket cloudfe = ctx.createSocket(ZMQ.ROUTER);
+ cloudfe.setIdentity(self.getBytes());
+ cloudfe.bind(String.format("ipc://%s-cloud.ipc", self));
+
+ // Connect cloud backend to all peers
+ Socket cloudbe = ctx.createSocket(ZMQ.ROUTER);
+ cloudbe.setIdentity(self.getBytes());
+ int argn;
+ for (argn = 1; argn < argv.length; argn++) {
+ String peer = argv[argn];
+            System.out.printf("I: connecting to cloud frontend at '%s'\n", peer);
+ cloudbe.connect(String.format("ipc://%s-cloud.ipc", peer));
+ }
+
+ // Prepare local frontend and backend
+ Socket localfe = ctx.createSocket(ZMQ.ROUTER);
+ localfe.bind(String.format("ipc://%s-localfe.ipc", self));
+ Socket localbe = ctx.createSocket(ZMQ.ROUTER);
+ localbe.bind(String.format("ipc://%s-localbe.ipc", self));
+
+ // Get user to tell us when we can start
+ System.out.println("Press Enter when all brokers are started: ");
+ try {
+ System.in.read();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+
+ // Start local workers
+ int worker_nbr;
+ for (worker_nbr = 0; worker_nbr < NBR_WORKERS; worker_nbr++)
+ new worker_task().start();
+
+ // Start local clients
+ int client_nbr;
+ for (client_nbr = 0; client_nbr < NBR_CLIENTS; client_nbr++)
+ new client_task().start();
+
+ // Here we handle the request-reply flow. We're using the LRU approach
+ // to poll workers at all times, and clients only when there are one or
+ // more workers available.
+
+ // Least recently used queue of available workers
+ int capacity = 0;
+ ArrayList<ZFrame> workers = new ArrayList<ZFrame>();
+
+ while (true) {
+ // First, route any waiting replies from workers
+ PollItem backends[] = {
+ new PollItem(localbe, Poller.POLLIN),
+ new PollItem(cloudbe, Poller.POLLIN)
+ };
+ // If we have no workers anyhow, wait indefinitely
+ int rc = ZMQ.poll(backends,
+ capacity > 0 ? 1000 : -1);
+ if (rc == -1)
+ break; // Interrupted
+ // Handle reply from local worker
+ ZMsg msg = null;
+ if (backends[0].isReadable()) {
+ msg = ZMsg.recvMsg(localbe);
+ if (msg == null)
+ break; // Interrupted
+ ZFrame address = msg.unwrap();
+ workers.add(address);
+ capacity++;
+
+ // If it's READY, don't route the message any further
+ ZFrame frame = msg.getFirst();
+ if (new String(frame.getData()).equals(WORKER_READY)) {
+ msg.destroy();
+ msg = null;
+ }
+ }
+ // Or handle reply from peer broker
+ else if (backends[1].isReadable()) {
+ msg = ZMsg.recvMsg(cloudbe);
+ if (msg == null)
+ break; // Interrupted
+ // We don't use peer broker address for anything
+ ZFrame address = msg.unwrap();
+ address.destroy();
+ }
+ // Route reply to cloud if it's addressed to a broker
+ for (argn = 1; msg != null && argn < argv.length; argn++) {
+ byte[] data = msg.getFirst().getData();
+ if (argv[argn].equals(new String(data))) {
+ msg.send(cloudfe);
+ msg = null;
+ }
+ }
+ // Route reply to client if we still need to
+ if (msg != null)
+ msg.send(localfe);
+
+ // Now we route as many client requests as we have worker capacity
+            // for. We may reroute requests from our local frontend, but not from
+            // the cloud frontend. We reroute randomly now, just to test things
+            // out. In the next version we'll do this properly by calculating
+            // cloud capacity:
+
+ while (capacity > 0) {
+ PollItem frontends[] = {
+ new PollItem(localfe, Poller.POLLIN),
+ new PollItem(cloudfe, Poller.POLLIN)
+ };
+ rc = ZMQ.poll(frontends, 0);
+ assert (rc >= 0);
+ int reroutable = 0;
+ // We'll do peer brokers first, to prevent starvation
+ if (frontends[1].isReadable()) {
+ msg = ZMsg.recvMsg(cloudfe);
+ reroutable = 0;
+ } else if (frontends[0].isReadable()) {
+ msg = ZMsg.recvMsg(localfe);
+ reroutable = 1;
+ } else
+ break; // No work, go back to backends
+
+ // If reroutable, send to cloud 20% of the time
+ // Here we'd normally use cloud status information
+ //
+ if (reroutable != 0 && argv.length > 1 && rand.nextInt(5) == 0) {
+ // Route to random broker peer
+ int random_peer = rand.nextInt(argv.length - 1) + 1;
+ msg.push(argv[random_peer]);
+ msg.send(cloudbe);
+ } else {
+ ZFrame frame = workers.remove(0);
+ msg.wrap(frame);
+ msg.send(localbe);
+ capacity--;
+ }
+ }
+ }
+ // When we're done, clean up properly
+ while (workers.size() > 0) {
+ ZFrame frame = workers.remove(0);
+ frame.destroy();
+ }
+
+ ctx.destroy();
+ }
+}
324 examples/Java/peering3.java
@@ -0,0 +1,324 @@
+import java.util.ArrayList;
+import java.util.Random;
+
+import org.zeromq.ZContext;
+import org.zeromq.ZFrame;
+import org.zeromq.ZMQ;
+import org.zeromq.ZMQ.PollItem;
+import org.zeromq.ZMQ.Poller;
+import org.zeromq.ZMQ.Socket;
+import org.zeromq.ZMsg;
+
+// Broker peering simulation (part 3)
+// Prototypes the full flow of status and tasks
+
+public class peering3
+{
+
+ private static final int NBR_CLIENTS = 10;
+ private static final int NBR_WORKERS = 5;
+ private static final String WORKER_READY = "\001"; // Signals worker is ready
+
+ // Our own name; in practice this would be configured per node
+ private static String self;
+
+ // This is the client task. It issues a burst of requests and then
+ // sleeps for a few seconds. This simulates sporadic activity; when
+ // a number of clients are active at once, the local workers should
+ // be overloaded. The client uses a REQ socket for requests and also
+ // pushes statistics to the monitor socket:
+ private static class client_task extends Thread
+ {
+ @Override
+ public void run()
+ {
+ ZContext ctx = new ZContext();
+ Socket client = ctx.createSocket(ZMQ.REQ);
+ client.connect(String.format("ipc://%s-localfe.ipc", self));
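+            // The monitor socket pushes printable status messages to the
+            // broker's PULL socket, which collects and prints them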
+ Socket monitor = ctx.createSocket(ZMQ.PUSH);
+ monitor.connect(String.format("ipc://%s-monitor.ipc", self));
+ Random rand = new Random(System.nanoTime());
+
+ while (true) {
+
+ try {
+ Thread.sleep(rand.nextInt(5) * 1000);
+ } catch (InterruptedException e1) {
+ }
+ int burst = rand.nextInt(15);
+
+ while (burst > 0) {
+ String taskId = String.format("%04X", rand.nextInt(10000));
+ // Send request, get reply
+ client.send(taskId, 0);
+
+ // Wait max ten seconds for a reply, then complain
+ PollItem pollSet[] = {new PollItem(client, Poller.POLLIN)};
+ int rc = ZMQ.poll(pollSet, 10 * 1000);
+ if (rc == -1)
+ break; // Interrupted
+
+ if (pollSet[0].isReadable()) {
+ String reply = client.recvStr(0);
+ if (reply == null)
+ break; // Interrupted
+ // Worker is supposed to answer us with our task id
+ assert (reply.equals(taskId));
+                        monitor.send(reply, 0);
+ } else {
+ monitor.send(
+ String.format("E: CLIENT EXIT - lost task %s", taskId), 0);
+ ctx.destroy();
+ return;
+ }
+ burst--;
+ }
+ }
+ }
+ }
+
+ // This is the worker task, which uses a REQ socket to plug into the LRU
+ // router. It's the same stub worker task you've seen in other examples:
+
+ private static class worker_task extends Thread
+ {
+ @Override
+ public void run()
+ {
+ Random rand = new Random(System.nanoTime());
+ ZContext ctx = new ZContext();
+ Socket worker = ctx.createSocket(ZMQ.REQ);
+ worker.connect(String.format("ipc://%s-localbe.ipc", self));
+
+ // Tell broker we're ready for work
+ ZFrame frame = new ZFrame(WORKER_READY);
+ frame.send(worker, 0);
+
+ while (true) {
+ // Send request, get reply
+ ZMsg msg = ZMsg.recvMsg(worker, 0);
+ if (msg == null)
+ break; // Interrupted
+
+                // Workers are busy for zero or one second
+ try {
+ Thread.sleep(rand.nextInt(2) * 1000);
+ } catch (InterruptedException e) {
+ }
+
+ msg.send(worker);
+ }
+ ctx.destroy();
+ }
+ }
+
+    // The main task begins by setting up all its sockets. The local frontend
+ // talks to clients, and our local backend talks to workers. The cloud
+ // frontend talks to peer brokers as if they were clients, and the cloud
+ // backend talks to peer brokers as if they were workers. The state
+ // backend publishes regular state messages, and the state frontend
+ // subscribes to all state backends to collect these messages. Finally,
+ // we use a PULL monitor socket to collect printable messages from tasks:
+ public static void main(String[] argv)
+ {
+ // First argument is this broker's name
+ // Other arguments are our peers' names
+ //
+ if (argv.length < 1) {
+ System.out.println("syntax: peering3 me {you}");
+ System.exit(-1);
+ }
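+        // For example, run "peering3 me you" in one shell and
+        // "peering3 you me" in another so the two brokers interconnect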
+ self = argv[0];
+ System.out.printf("I: preparing broker at %s\n", self);
+ Random rand = new Random(System.nanoTime());
+
+ ZContext ctx = new ZContext();
+
+ // Prepare local frontend and backend
+ Socket localfe = ctx.createSocket(ZMQ.ROUTER);
+ localfe.bind(String.format("ipc://%s-localfe.ipc", self));
+ Socket localbe = ctx.createSocket(ZMQ.ROUTER);
+ localbe.bind(String.format("ipc://%s-localbe.ipc", self));
+
+ // Bind cloud frontend to endpoint
+ Socket cloudfe = ctx.createSocket(ZMQ.ROUTER);
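+        // Use our name as the socket identity so that peer brokers' cloud
+        // backends can route requests to us by name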
+ cloudfe.setIdentity(self.getBytes());
+ cloudfe.bind(String.format("ipc://%s-cloud.ipc", self));
+
+ // Connect cloud backend to all peers
+ Socket cloudbe = ctx.createSocket(ZMQ.ROUTER);
+ cloudbe.setIdentity(self.getBytes());
+ int argn;
+ for (argn = 1; argn < argv.length; argn++) {
+ String peer = argv[argn];
+ System.out.printf("I: connecting to cloud forintend at '%s'\n", peer);
+ cloudbe.connect(String.format("ipc://%s-cloud.ipc", peer));
+ }
+
+ // Bind state backend to endpoint
+ Socket statebe = ctx.createSocket(ZMQ.PUB);
+ statebe.bind(String.format("ipc://%s-state.ipc", self));
+
+ // Connect statefe to all peers
+ Socket statefe = ctx.createSocket(ZMQ.SUB);
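+        // An empty subscription means we receive every state message
+        // our peers publish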
+ statefe.subscribe("".getBytes());
+ for (argn = 1; argn < argv.length; argn++) {
+ String peer = argv[argn];
+ System.out.printf("I: connecting to state backend at '%s'\n", peer);
+ statefe.connect(String.format("ipc://%s-state.ipc", peer));
+ }
+
+ // Prepare monitor socket
+ Socket monitor = ctx.createSocket(ZMQ.PULL);
+ monitor.bind(String.format("ipc://%s-monitor.ipc", self));
+
+ // Start local workers
+ int worker_nbr;
+ for (worker_nbr = 0; worker_nbr < NBR_WORKERS; worker_nbr++)
+ new worker_task().start();
+
+ // Start local clients
+ int client_nbr;
+ for (client_nbr = 0; client_nbr < NBR_CLIENTS; client_nbr++)
+ new client_task().start();
+
+ // Queue of available workers
+ int localCapacity = 0;
+ int cloudCapacity = 0;
+ ArrayList<ZFrame> workers = new ArrayList<ZFrame>();
+
+        // The main loop has two parts. First we poll workers and our two
+        // service sockets (statefe and monitor) in all cases. If we have no
+        // ready workers, there's no point in looking at incoming requests.
+        // These can remain on their internal 0MQ queues:
+
+ while (true) {
+ // First, route any waiting replies from workers
+ PollItem primary[] = {
+ new PollItem(localbe, Poller.POLLIN),
+ new PollItem(cloudbe, Poller.POLLIN),
+ new PollItem(statefe, Poller.POLLIN),
+ new PollItem(monitor, Poller.POLLIN)
+ };
+ // If we have no workers anyhow, wait indefinitely
+ int rc = ZMQ.poll(primary,
+ localCapacity > 0 ? 1000 : -1);
+ if (rc == -1)
+ break; // Interrupted
+
+ // Track if capacity changes during this iteration
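+            // so that we can broadcast our new capacity on statebe to
+            // peers whenever it changes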
+ int previous = localCapacity;
+
+ // Handle reply from local worker
+ ZMsg msg = null;
+ if (primary[0].isReadable()) {
+ msg = ZMsg.recvMsg(localbe);
+ if (msg == null)
+ break; // Interrupted
+ ZFrame address = msg.unwrap();
+ workers.add(address);
+ localCapacity++;
+
+ // If it's READY, don't route the message any further
+ ZFrame frame = msg.getFirst();
+ if (new String(frame.getData()).equals(WORKER_READY)) {
+ msg.destroy();
+ msg = null;
+ }
+ }
+ // Or handle reply from peer broker
+ else if (primary[1].isReadable()) {
+ msg = ZMsg.recvMsg(cloudbe);
+ if (msg == null)
+ break; // Interrupted
+ // We don't use peer broker address for anything
+ ZFrame address = msg.unwrap();
+ address.destroy();
+ }
+ // Route reply to cloud if it's addressed to a broker
+ for (argn = 1; msg != null && argn < argv.length; argn++) {
+ byte[] data = msg.getFirst().getData();
+ if (argv[argn].equals(new String(data))) {
+ msg.send(cloudfe);
+ msg = null;