Skip to content
Browse files

Merge pull request #312 from miniway/master

chapter 4 java examples
  • Loading branch information...
2 parents 4f79d32 + 1195b88 commit 796a19dffe3df4ab3d52d5d71f3ed123e514bf3e @hintjens hintjens committed
View
4 examples/Java/MDP.java
@@ -1,6 +1,4 @@
/**
- * (c) 2011 Arkadiusz Orzechowski
- *
* This file is part of ZGuide
*
* ZGuide is free software; you can redistribute it and/or modify it under
@@ -22,8 +20,6 @@
/**
* Majordomo Protocol definitions, Java version
- *
- * @author Arkadiusz Orzechowski <aorzecho@gmail.com>
*/
public enum MDP {
View
40 examples/Java/ZHelper.java
@@ -1,11 +1,16 @@
+import org.zeromq.ZMQ;
+import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Socket;
+import java.math.BigInteger;
+import java.util.Arrays;
+import java.util.List;
import java.util.Random;
public class ZHelper
{
private static Random rand = new Random(System.currentTimeMillis ());
-
+
/**
* Receives all message parts from socket, prints neatly
*/
@@ -36,21 +41,20 @@ public static void setId (Socket sock)
sock.setIdentity (identity.getBytes ());
}
-
-
- public static List<Socket> buildZPipe(Context ctx) {
- Socket socket1 = ctx.socket(ZMQ.PAIR);
- socket1.setLinger(0);
- socket1.setHWM(1);
-
- Socket socket2 = ctx.socket(ZMQ.PAIR);
- socket2.setLinger(0);
- socket2.setHWM(1);
-
- String iface = "inproc://" + new BigInteger(130, rand).toString(32);
- socket1.bind(iface);
- socket2.connect(iface);
-
- return Arrays.asList(socket1, socket2);
- }
+
+ public static List<Socket> buildZPipe(Context ctx) {
+ Socket socket1 = ctx.socket(ZMQ.PAIR);
+ socket1.setLinger(0);
+ socket1.setHWM(1);
+
+ Socket socket2 = ctx.socket(ZMQ.PAIR);
+ socket2.setLinger(0);
+ socket2.setHWM(1);
+
+ String iface = "inproc://" + new BigInteger(130, rand).toString(32);
+ socket1.bind(iface);
+ socket2.connect(iface);
+
+ return Arrays.asList(socket1, socket2);
+ }
}
View
32 examples/Java/asyncsrv.java
@@ -1,12 +1,12 @@
-import java.util.Random;
-
import org.zeromq.ZContext;
import org.zeromq.ZFrame;
import org.zeromq.ZMQ;
-import org.zeromq.ZMQ.Poller;
-import org.zeromq.ZMsg;
import org.zeromq.ZMQ.PollItem;
import org.zeromq.ZMQ.Socket;
+import org.zeromq.ZMsg;
+import org.zeromq.ZMQ.Poller;
+
+import java.util.Random;
//
//Asynchronous client-to-server (DEALER to ROUTER)
@@ -26,11 +26,9 @@
private static Random rand = new Random(System.nanoTime());
- private static class client_task implements Runnable
- {
+ private static class client_task implements Runnable {
- public void run()
- {
+ public void run() {
ZContext ctx = new ZContext();
Socket client = ctx.createSocket(ZMQ.DEALER);
@@ -64,10 +62,8 @@ public void run()
//one request at a time but one client can talk to multiple workers at
//once.
- private static class server_task implements Runnable
- {
- public void run()
- {
+ private static class server_task implements Runnable {
+ public void run() {
ZContext ctx = new ZContext();
// Frontend socket talks to clients over TCP
@@ -92,17 +88,14 @@ public void run()
//Each worker task works on one request at a time and sends a random number
//of replies back, with random delays between replies:
- private static class server_worker implements Runnable
- {
+ private static class server_worker implements Runnable {
private ZContext ctx;
- public server_worker(ZContext ctx)
- {
+ public server_worker(ZContext ctx) {
this.ctx = ctx;
}
- public void run()
- {
+ public void run() {
Socket worker = ctx.createSocket(ZMQ.DEALER);
worker.connect("inproc://backend");
@@ -135,8 +128,7 @@ public void run()
//The main thread simply starts several clients, and a server, and then
//waits for the server to finish.
- public static void main(String[] args) throws Exception
- {
+ public static void main(String[] args) throws Exception {
ZContext ctx = new ZContext();
new Thread(new client_task()).start();
new Thread(new client_task()).start();
View
327 examples/Java/bstar.java
@@ -0,0 +1,327 @@
+/*
+ * bstar.java
+ *
+ * -------------------------------------------------------------------------
+ * Copyright (c) 2012-2013 InfiniLoop Corporation
+ * Copyright other contributors as noted in the AUTHORS file.
+ *
+ * This file is part of Zyni, an open-source message based application framework.
+ *
+ * This is free software; you can redistribute it and/or modify it under
+ * the terms of the GNU Lesser General Public License as published by the
+ * Free Software Foundation; either version 3 of the License, or (at your
+ * option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTA-
+ * BILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General
+ * Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program. If not, see http://www.gnu.org/licenses/.
+ * =========================================================================
+ */
+
+
+import org.zeromq.ZContext;
+import org.zeromq.ZLoop;
+import org.zeromq.ZLoop.IZLoopHandler;
+import org.zeromq.ZMQ;
+import org.zeromq.ZMQ.PollItem;
+import org.zeromq.ZMQ.Socket;
+import org.zeromq.ZMsg;
+
+// bstar class - Binary Star reactor
+public class bstar
+{
+ // States we can be in at any point in time
+ enum State {
+ STATE_PRIMARY, // Primary, waiting for peer to connect
+ STATE_BACKUP, // Backup, waiting for peer to connect
+ STATE_ACTIVE, // Active - accepting connections
+ STATE_PASSIVE // Passive - not accepting connections
+ }
+
+ // Events, which start with the states our peer can be in
+ enum Event {
+ PEER_PRIMARY, // HA peer is pending primary
+ PEER_BACKUP, // HA peer is pending backup
+ PEER_ACTIVE, // HA peer is active
+ PEER_PASSIVE, // HA peer is passive
+ CLIENT_REQUEST // Client makes request
+ }
+
+ private ZContext ctx; // Our private context
+ private ZLoop loop; // Reactor loop
+ private Socket statepub; // State publisher
+ private Socket statesub; // State subscriber
+ private State state; // Current state
+ private Event event; // Current event
+ private long peerExpiry; // When peer is considered 'dead'
+ private ZLoop.IZLoopHandler voterFn; // Voting socket handler
+ private Object voterArg; // Arguments for voting handler
+ private ZLoop.IZLoopHandler activeFn; // Call when become active
+ private Object activeArg; // Arguments for handler
+ private ZLoop.IZLoopHandler passiveFn; // Call when become passive
+ private Object passiveArg; // Arguments for handler
+
+ // The finite-state machine is the same as in the proof-of-concept server.
+ // To understand this reactor in detail, first read the ZLoop class.
+ // .skip
+
+ // We send state information this often
+ // If peer doesn't respond in two heartbeats, it is 'dead'
+ private final static int BSTAR_HEARTBEAT = 1000; // In msecs
+
+ // Binary Star finite state machine (applies event to state)
+ // Returns false if there was an exception, true if event was valid.
+
+ private boolean execute()
+ {
+ boolean rc = true;
+
+ // Primary server is waiting for peer to connect
+ // Accepts CLIENT_REQUEST events in this state
+ if (state == State.STATE_PRIMARY) {
+ if (event == Event.PEER_BACKUP) {
+ System.out.printf ("I: connected to backup (passive), ready active\n");
+ state = State.STATE_ACTIVE;
+ if (activeFn != null)
+ activeFn.handle(loop, null, activeArg);
+ }
+ else
+ if (event == Event.PEER_ACTIVE) {
+ System.out.printf ("I: connected to backup (active), ready passive\n");
+ state = State.STATE_PASSIVE;
+ if (passiveFn != null)
+ passiveFn.handle(loop, null, passiveArg);
+ }
+ else
+ if (event == Event.CLIENT_REQUEST) {
+ // Allow client requests to turn us into the active if we've
+ // waited sufficiently long to believe the backup is not
+ // currently acting as active (i.e., after a failover)
+ assert (peerExpiry > 0);
+ if (System.currentTimeMillis() >= peerExpiry) {
+ System.out.printf ("I: request from client, ready as active\n");
+ state = State.STATE_ACTIVE;
+ if (activeFn != null)
+ activeFn.handle(loop, null, activeArg);
+ } else
+ // Don't respond to clients yet - it's possible we're
+ // performing a failback and the backup is currently active
+ rc = false;
+ }
+ }
+ else
+ if (state == State.STATE_BACKUP) {
+ if (event == Event.PEER_ACTIVE) {
+ System.out.printf ("I: connected to primary (active), ready passive\n");
+ state = State.STATE_PASSIVE;
+ if (passiveFn != null)
+ passiveFn.handle(loop, null, passiveArg);
+ }
+ else
+ // Reject client connections when acting as backup
+ if (event == Event.CLIENT_REQUEST)
+ rc = false;
+ }
+ else
+ // .split active and passive states
+ // These are the ACTIVE and PASSIVE states:
+ if (state == State.STATE_ACTIVE) {
+ if (event == Event.PEER_ACTIVE) {
+ // Two actives would mean split-brain
+ System.out.printf ("E: fatal error - dual actives, aborting\n");
+ rc = false;
+ }
+ }
+ else
+ // Server is passive
+ // CLIENT_REQUEST events can trigger failover if peer looks dead
+ if (state == State.STATE_PASSIVE) {
+ if (event == Event.PEER_PRIMARY) {
+ // Peer is restarting - become active, peer will go passive
+ System.out.printf ("I: primary (passive) is restarting, ready active\n");
+ state = State.STATE_ACTIVE;
+ }
+ else
+ if (event == Event.PEER_BACKUP) {
+ // Peer is restarting - become active, peer will go passive
+ System.out.printf ("I: backup (passive) is restarting, ready active\n");
+ state = State.STATE_ACTIVE;
+ }
+ else
+ if (event == Event.PEER_PASSIVE) {
+ // Two passives would mean cluster would be non-responsive
+ System.out.printf ("E: fatal error - dual passives, aborting\n");
+ rc = false;
+ }
+ else
+ if (event == Event.CLIENT_REQUEST) {
+ // Peer becomes active if timeout has passed
+ // It's the client request that triggers the failover
+ assert (peerExpiry > 0);
+ if (System.currentTimeMillis () >= peerExpiry) {
+ // If peer is dead, switch to the active state
+ System.out.printf ("I: failover successful, ready active\n");
+ state = State.STATE_ACTIVE;
+ }
+ else
+ // If peer is alive, reject connections
+ rc = false;
+
+ // Call state change handler if necessary
+ if (state == State.STATE_ACTIVE && activeFn != null)
+ activeFn.handle(loop, null, activeArg);
+ }
+ }
+ return rc;
+ }
+
+ private void updatePeerExpiry()
+ {
+ peerExpiry = System.currentTimeMillis() + 2 * BSTAR_HEARTBEAT;
+ }
+
+
+ // Reactor event handlers...
+
+ // Publish our state to peer
+ private static IZLoopHandler SendState = new IZLoopHandler () {
+
+ @Override
+ public int handle(ZLoop loop, PollItem item, Object arg)
+ {
+ bstar self = (bstar) arg;
+ self.statepub.send(String.format("%d", self.state.ordinal()));
+ return 0;
+ }
+ };
+
+ // Receive state from peer, execute finite state machine
+ private static IZLoopHandler RecvState = new IZLoopHandler () {
+
+ @Override
+ public int handle(ZLoop loop, PollItem item, Object arg)
+ {
+ bstar self = (bstar) arg;
+ String state = item.getSocket().recvStr();
+ if (state != null) {
+ self.event = Event.values()[Integer.parseInt(state)];
+ self.updatePeerExpiry();
+ }
+ return self.execute() ? 0 : -1;
+ }
+ };
+
+ // Application wants to speak to us, see if it's possible
+ private static IZLoopHandler VoterReady = new IZLoopHandler () {
+
+ @Override
+ public int handle(ZLoop loop, PollItem item, Object arg)
+ {
+ bstar self = (bstar) arg;
+ // If server can accept input now, call appl handler
+ self.event = Event.CLIENT_REQUEST;
+ if (self.execute())
+ self.voterFn.handle(loop, item, self.voterArg);
+ else {
+ // Destroy waiting message, no-one to read it
+ ZMsg msg = ZMsg.recvMsg(item.getSocket());
+ msg.destroy();
+ }
+ return 0;
+ }
+ };
+
+ // .until
+ // .split constructor
+ // This is the constructor for our {{bstar}} class. We have to tell it
+ // whether we're primary or backup server, as well as our local and
+ // remote endpoints to bind and connect to:
+ public bstar(boolean primary, String local, String remote) {
+ // Initialize the Binary Star
+ ctx = new ZContext();
+ loop = new ZLoop();
+ state = primary? State.STATE_PRIMARY: State.STATE_BACKUP;
+
+ // Create publisher for state going to peer
+ statepub = ctx.createSocket(ZMQ.PUB);
+ statepub.bind(local);
+
+ // Create subscriber for state coming from peer
+ statesub = ctx.createSocket(ZMQ.SUB);
+ statesub.subscribe("".getBytes());
+ statesub.connect(remote);
+
+ // Set-up basic reactor events
+ loop.addTimer(BSTAR_HEARTBEAT, 0, SendState, this);
+ PollItem poller = new PollItem(statesub, ZMQ.Poller.POLLIN);
+ loop.addPoller(poller, RecvState, this);
+ }
+
+ // .split destructor
+ // The destructor shuts down the bstar reactor:
+ public void destroy()
+ {
+ loop.destroy();
+ ctx.destroy();
+ }
+
+ // .split zloop method
+ // This method returns the underlying zloop reactor, so we can add
+ // additional timers and readers:
+ public ZLoop zloop()
+ {
+ return loop;
+ }
+
+ // .split voter method
+ // This method registers a client voter socket. Messages received
+ // on this socket provide the CLIENT_REQUEST events for the Binary Star
+ // FSM and are passed to the provided application handler. We require
+ // exactly one voter per {{bstar}} instance:
+ public int voter(String endpoint, int type, IZLoopHandler handler, Object arg)
+ {
+ // Hold actual handler+arg so we can call this later
+ Socket socket = ctx.createSocket(type);
+ socket.bind(endpoint);
+ voterFn = handler;
+ voterArg = arg;
+ PollItem poller = new PollItem(socket, ZMQ.Poller.POLLIN);
+ return loop.addPoller(poller, VoterReady, this);
+ }
+
+ // .split register state-change handlers
+ // Register handlers to be called each time there's a state change:
+ public void newAction(IZLoopHandler handler, Object arg)
+ {
+ activeFn = handler;
+ activeArg = arg;
+ }
+
+ public void newPassive(IZLoopHandler handler, Object arg)
+ {
+ passiveFn = handler;
+ passiveArg = arg;
+ }
+
+ // .split enable/disable tracing
+ // Enable/disable verbose tracing, for debugging:
+ public void setVerbose(boolean verbose)
+ {
+ loop.verbose(verbose);
+ }
+
+ // .split start the reactor
+ // Finally, start the configured reactor. It will end if any handler
+    // returns -1 to the reactor, or if the process receives an interrupt
+
+ public int start()
+ {
+ assert (voterFn != null);
+ updatePeerExpiry();
+ return loop.start();
+ }
+}
View
76 examples/Java/bstarcli.java
@@ -0,0 +1,76 @@
+import org.zeromq.ZContext;
+import org.zeromq.ZMQ;
+import org.zeromq.ZMQ.PollItem;
+import org.zeromq.ZMQ.Socket;
+
+// Binary Star client proof-of-concept implementation. This client does no
+// real work; it just demonstrates the Binary Star failover model.
+public class bstarcli
+{
+ private static final long REQUEST_TIMEOUT = 1000; // msecs
+ private static final long SETTLE_DELAY = 2000; // Before failing over
+
+ public static void main(String[] argv) throws Exception
+ {
+ ZContext ctx = new ZContext();
+
+ String[] server = { "tcp://localhost:5001", "tcp://localhost:5002" };
+ int serverNbr = 0;
+
+ System.out.printf ("I: connecting to server at %s...\n", server [serverNbr]);
+ Socket client = ctx.createSocket(ZMQ.REQ);
+ client.connect(server[serverNbr]);
+
+ int sequence = 0;
+ while (!Thread.currentThread().isInterrupted()) {
+ // We send a request, then we work to get a reply
+ String request = String.format("%d", ++sequence);
+ client.send(request);
+
+ boolean expectReply = true;
+ while (expectReply) {
+ // Poll socket for a reply, with timeout
+ PollItem items [] = { new PollItem(client, ZMQ.Poller.POLLIN) };
+ int rc = ZMQ.poll(items, 1, REQUEST_TIMEOUT);
+ if (rc == -1)
+ break; // Interrupted
+
+ // .split main body of client
+ // We use a Lazy Pirate strategy in the client. If there's no
+ // reply within our timeout, we close the socket and try again.
+ // In Binary Star, it's the client vote that decides which
+ // server is primary; the client must therefore try to connect
+ // to each server in turn:
+
+ if (items[0].isReadable()) {
+ // We got a reply from the server, must match sequence
+ String reply = client.recvStr();
+ if (Integer.parseInt(reply) == sequence) {
+ System.out.printf ("I: server replied OK (%s)\n", reply);
+ expectReply = false;
+ Thread.sleep(1000); // One request per second
+ }
+ else
+ System.out.printf ("E: bad reply from server: %s\n", reply);
+ }
+ else {
+ System.out.printf ("W: no response from server, failing over\n");
+
+ // Old socket is confused; close it and open a new one
+ ctx.destroySocket(client);
+ serverNbr = (serverNbr + 1) % 2;
+ Thread.sleep(SETTLE_DELAY);
+ System.out.printf("I: connecting to server at %s...\n",
+ server[serverNbr]);
+ client = ctx.createSocket(ZMQ.REQ);
+ client.connect(server[serverNbr]);
+
+ // Send request again, on new socket
+ client.send(request);
+ }
+ }
+ }
+ ctx.destroy();
+
+ }
+}
View
206 examples/Java/bstarsrv.java
@@ -0,0 +1,206 @@
+import org.zeromq.ZContext;
+import org.zeromq.ZMQ;
+import org.zeromq.ZMQ.PollItem;
+import org.zeromq.ZMQ.Socket;
+import org.zeromq.ZMsg;
+
+// Binary Star server proof-of-concept implementation. This server does no
+// real work; it just demonstrates the Binary Star failover model.
+public class bstarsrv
+{
+ // States we can be in at any point in time
+ enum State {
+ STATE_PRIMARY, // Primary, waiting for peer to connect
+ STATE_BACKUP, // Backup, waiting for peer to connect
+ STATE_ACTIVE, // Active - accepting connections
+ STATE_PASSIVE // Passive - not accepting connections
+ }
+
+ // Events, which start with the states our peer can be in
+ enum Event {
+ PEER_PRIMARY, // HA peer is pending primary
+ PEER_BACKUP, // HA peer is pending backup
+ PEER_ACTIVE, // HA peer is active
+ PEER_PASSIVE, // HA peer is passive
+ CLIENT_REQUEST // Client makes request
+ }
+
+ // Our finite state machine
+ private State state; // Current state
+ private Event event; // Current event
+ private long peerExpiry; // When peer is considered 'dead'
+
+ // We send state information this often
+ // If peer doesn't respond in two heartbeats, it is 'dead'
+ private final static long HEARTBEAT = 1000; // In msecs
+
+ // .split Binary Star state machine
+ // The heart of the Binary Star design is its finite-state machine (FSM).
+ // The FSM runs one event at a time. We apply an event to the current state,
+ // which checks if the event is accepted, and if so, sets a new state:
+
+ private boolean stateMachine()
+ {
+ boolean exception = false;
+
+ // These are the PRIMARY and BACKUP states; we're waiting to become
+ // ACTIVE or PASSIVE depending on events we get from our peer:
+ if (state == State.STATE_PRIMARY) {
+ if (event == Event.PEER_BACKUP) {
+ System.out.printf ("I: connected to backup (passive), ready active\n");
+ state = State.STATE_ACTIVE;
+ }
+ else
+ if (event == Event.PEER_ACTIVE) {
+ System.out.printf ("I: connected to backup (active), ready passive\n");
+ state = State.STATE_PASSIVE;
+ }
+ // Accept client connections
+ }
+ else
+ if (state == State.STATE_BACKUP) {
+ if (event == Event.PEER_ACTIVE) {
+ System.out.printf ("I: connected to primary (active), ready passive\n");
+ state = State.STATE_PASSIVE;
+ }
+ else
+ // Reject client connections when acting as backup
+ if (event == Event.CLIENT_REQUEST)
+ exception = true;
+ }
+ else
+ // .split active and passive states
+ // These are the ACTIVE and PASSIVE states:
+ if (state == State.STATE_ACTIVE) {
+ if (event == Event.PEER_ACTIVE) {
+ // Two actives would mean split-brain
+ System.out.printf ("E: fatal error - dual actives, aborting\n");
+ exception = true;
+ }
+ }
+ else
+ // Server is passive
+ // CLIENT_REQUEST events can trigger failover if peer looks dead
+ if (state == State.STATE_PASSIVE) {
+ if (event == Event.PEER_PRIMARY) {
+ // Peer is restarting - become active, peer will go passive
+ System.out.printf ("I: primary (passive) is restarting, ready active\n");
+ state = State.STATE_ACTIVE;
+ }
+ else
+ if (event == Event.PEER_BACKUP) {
+ // Peer is restarting - become active, peer will go passive
+ System.out.printf ("I: backup (passive) is restarting, ready active\n");
+ state = State.STATE_ACTIVE;
+ }
+ else
+ if (event == Event.PEER_PASSIVE) {
+ // Two passives would mean cluster would be non-responsive
+ System.out.printf ("E: fatal error - dual passives, aborting\n");
+ exception = true;
+ }
+ else
+ if (event == Event.CLIENT_REQUEST) {
+ // Peer becomes active if timeout has passed
+ // It's the client request that triggers the failover
+ assert (peerExpiry > 0);
+ if (System.currentTimeMillis () >= peerExpiry) {
+ // If peer is dead, switch to the active state
+ System.out.printf ("I: failover successful, ready active\n");
+ state = State.STATE_ACTIVE;
+ }
+ else
+ // If peer is alive, reject connections
+ exception = true;
+ }
+ }
+ return exception;
+ }
+
+ // .split main task
+ // This is our main task. First we bind/connect our sockets with our
+ // peer and make sure we will get state messages correctly. We use
+ // three sockets; one to publish state, one to subscribe to state, and
+ // one for client requests/replies:
+
+ public static void main(String[] argv) {
+ // Arguments can be either of:
+ // -p primary server, at tcp://localhost:5001
+ // -b backup server, at tcp://localhost:5002
+ ZContext ctx = new ZContext();
+ Socket statepub = ctx.createSocket(ZMQ.PUB);
+ Socket statesub = ctx.createSocket(ZMQ.SUB);
+ statesub.subscribe("".getBytes());
+ Socket frontend = ctx.createSocket(ZMQ.ROUTER);
+ bstarsrv fsm = new bstarsrv();
+
+ if (argv.length == 1 && argv[0].equals("-p")) {
+ System.out.printf("I: Primary active, waiting for backup (passive)\n");
+ frontend.bind("tcp://*:5001");
+ statepub.bind("tcp://*:5003");
+ statesub.connect("tcp://localhost:5004");
+ fsm.state = State.STATE_PRIMARY;
+ }
+ else
+ if (argv.length == 1 && argv[0].equals("-b")) {
+ System.out.printf("I: Backup passive, waiting for primary (active)\n");
+ frontend.bind("tcp://*:5002");
+ statepub.bind("tcp://*:5004");
+ statesub.connect("tcp://localhost:5003");
+ fsm.state = State.STATE_BACKUP;
+ }
+ else {
+ System.out.printf("Usage: bstarsrv { -p | -b }\n");
+ ctx.destroy();
+ System.exit(0);
+ }
+ // .split handling socket input
+ // We now process events on our two input sockets, and process these
+ // events one at a time via our finite-state machine. Our "work" for
+ // a client request is simply to echo it back:
+
+ // Set timer for next outgoing state message
+ long sendStateAt = System.currentTimeMillis() + HEARTBEAT;
+ while (!Thread.currentThread().isInterrupted()) {
+ PollItem[] items = {
+ new PollItem(frontend, ZMQ.Poller.POLLIN),
+ new PollItem(statesub, ZMQ.Poller.POLLIN),
+ };
+ int timeLeft = (int) ((sendStateAt - System.currentTimeMillis()));
+ if (timeLeft < 0)
+ timeLeft = 0;
+ int rc = ZMQ.poll(items, 2, timeLeft);
+ if (rc == -1)
+ break; // Context has been shut down
+
+ if (items[0].isReadable()) {
+ // Have a client request
+ ZMsg msg = ZMsg.recvMsg(frontend);
+ fsm.event = Event.CLIENT_REQUEST;
+ if (fsm.stateMachine() == false)
+ // Answer client by echoing request back
+ msg.send(frontend);
+ else
+ msg.destroy();
+ }
+ if (items[1].isReadable()) {
+ // Have state from our peer, execute as event
+ String message = statesub.recvStr();
+ fsm.event = Event.values()[Integer.parseInt(message)];
+ if (fsm.stateMachine())
+ break; // Error, so exit
+ fsm.peerExpiry = System.currentTimeMillis() + 2 * HEARTBEAT;
+ }
+ // If we timed out, send state to peer
+ if (System.currentTimeMillis() >= sendStateAt) {
+ statepub.send(String.valueOf(fsm.state.ordinal()));
+ sendStateAt = System.currentTimeMillis() + HEARTBEAT;
+ }
+ }
+ if (Thread.currentThread().isInterrupted())
+ System.out.printf ("W: interrupted\n");
+
+ // Shutdown sockets and context
+ ctx.destroy();
+ }
+}
View
72 examples/Java/bstarsrv2.java
@@ -0,0 +1,72 @@
+/*
+ * bstarsrv2.java
+ *
+ * -------------------------------------------------------------------------
+ * Copyright (c) 2012-2013 InfiniLoop Corporation
+ * Copyright other contributors as noted in the AUTHORS file.
+ *
+ * This file is part of Zyni, an open-source message based application framework.
+ *
+ * This is free software; you can redistribute it and/or modify it under
+ * the terms of the GNU Lesser General Public License as published by the
+ * Free Software Foundation; either version 3 of the License, or (at your
+ * option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTA-
+ * BILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General
+ * Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program. If not, see http://www.gnu.org/licenses/.
+ * =========================================================================
+ */
+
+
+import org.zeromq.ZContext;
+import org.zeromq.ZLoop;
+import org.zeromq.ZLoop.IZLoopHandler;
+import org.zeromq.ZMQ;
+import org.zeromq.ZMQ.PollItem;
+import org.zeromq.ZMQ.Socket;
+import org.zeromq.ZMsg;
+
+// Binary Star server, using bstar reactor
+public class bstarsrv2
+{
+ private static IZLoopHandler Echo = new IZLoopHandler()
+ {
+ @Override
+ public int handle(ZLoop loop, PollItem item, Object arg)
+ {
+ ZMsg msg = ZMsg.recvMsg(item.getSocket());
+ msg.send(item.getSocket());
+ return 0;
+ }
+ };
+
+ public static void main(String[] argv) {
+ // Arguments can be either of:
+ // -p primary server, at tcp://localhost:5001
+ // -b backup server, at tcp://localhost:5002
+ bstar bs = null;
+
+ if (argv.length == 1 && argv[0].equals("-p")) {
+ System.out.printf("I: Primary active, waiting for backup (passive)\n");
+ bs = new bstar(true, "tcp://*:5003", "tcp://localhost:5004");
+ bs.voter ("tcp://*:5001", ZMQ.ROUTER, Echo, null);
+ }
+ else
+ if (argv.length == 1 && argv[0].equals("-b")) {
+ System.out.printf("I: Backup passive, waiting for primary (active)\n");
+ bs = new bstar(true, "tcp://*:5004", "tcp://localhost:5003");
+ bs.voter ("tcp://*:5002", ZMQ.ROUTER, Echo, null);
+ }
+ else {
+ System.out.printf("Usage: bstarsrv { -p | -b }\n");
+ System.exit(0);
+ }
+ bs.start();
+ bs.destroy();
+ }
+}
View
156 examples/Java/lpclient.java
@@ -1,78 +1,78 @@
-/**
- * Lazy Pirate client
- * Use zmq_poll to do a safe request-reply
- * To run, start lpserver and then randomly kill/restart it
- *
- * @author Arkadiusz Orzechowski <aorzecho@gmail.com>
- */
-import org.zeromq.ZMQ;
-
-public class lpclient {
- public static int REQUEST_TIMEOUT = 2500; // msecs, (> 1000!)
- public static int REQUEST_RETRIES = 3; // before we abandon
- public static String SERVER_ENDPOINT = "tcp://localhost:5555";
-
- public static void main(String[] args) throws InterruptedException {
- ZMQ.Context context = ZMQ.context(1);
- ZMQ.Socket client = context.socket(ZMQ.REQ);
- System.out.println("Connecting to server...");
- client.connect(SERVER_ENDPOINT);
-
- int sequence = 0;
- int retriesLeft = REQUEST_RETRIES;
- while (retriesLeft > 0) {
- // We send a request, then we work to get a reply
- String requestString = ++sequence + " ";
- byte[] request = requestString.getBytes();
- request[request.length - 1] = 0; // Sets the last byte to 0
- client.send(request, 0);
-
- boolean expectReply = true;
- while (expectReply) {
- // Poll socket for a reply, with timeout
- ZMQ.Poller items = context.poller();
- items.register(client, ZMQ.Poller.POLLIN);
- long rc = items.poll(REQUEST_TIMEOUT * 1000);
- if (items.pollin(0)) {
- final byte[] reply = client.recv(0);
- final String replyString = new String(reply).trim();
- int replySequence = -1;
- try {
- replySequence = Integer.parseInt(replyString);
- } catch (Exception ignoreItNow) {
- }
- if (replySequence == sequence) {
- System.out.printf("I: server replied OK (%d)\n",
- replySequence);
- retriesLeft = REQUEST_RETRIES;
- expectReply = false;
- } else {
- System.out.printf(
- "E: malformed reply from server: (%s)\n",
- replyString);
- }
- } else {
- System.out
- .println("W: no response from server, retrying...");
- // Old socket is confused; close it and open a new one
- client.setLinger(0); // drop pending messages immediately
- // on close
- client.close();
- items.unregister(client);
- if (--retriesLeft == 0) {
- System.out
- .println("E: server seems to be offline, abandoning");
- break;
- }
- System.out.println("I: reconnecting to server...");
- client = context.socket(ZMQ.REQ);
- client.connect(SERVER_ENDPOINT);
- // Send request again, on new socket
- client.send(request, 0);
- }
- }
- }
- client.close();
- context.term();
- }
-}
+import org.zeromq.ZContext;
+import org.zeromq.ZMQ;
+import org.zeromq.ZMQ.PollItem;
+import org.zeromq.ZMQ.Poller;
+import org.zeromq.ZMQ.Socket;
+
+//
+// Lazy Pirate client
+// Use zmq_poll to do a safe request-reply
+// To run, start lpserver and then randomly kill/restart it
+//
+
+public class lpclient
+{
+
+ private final static int REQUEST_TIMEOUT = 2500; // msecs, (> 1000!)
+ private final static int REQUEST_RETRIES = 3; // Before we abandon
+ private final static String SERVER_ENDPOINT = "tcp://localhost:5555";
+
+ public static void main(String[] argv)
+ {
+ ZContext ctx = new ZContext();
+ System.out.println("I: connecting to server");
+ Socket client = ctx.createSocket(ZMQ.REQ);
+ assert (client != null);
+ client.connect(SERVER_ENDPOINT);
+
+ int sequence = 0;
+ int retriesLeft = REQUEST_RETRIES;
+ while (retriesLeft > 0 && !Thread.currentThread().isInterrupted()) {
+ // We send a request, then we work to get a reply
+ String request = String.format("%d", ++sequence);
+ client.send(request);
+
+ int expect_reply = 1;
+ while (expect_reply > 0) {
+ // Poll socket for a reply, with timeout
+ PollItem items[] = {new PollItem(client, Poller.POLLIN)};
+ int rc = ZMQ.poll(items, REQUEST_TIMEOUT);
+ if (rc == -1)
+ break; // Interrupted
+
+ // Here we process a server reply and exit our loop if the
+            // reply is valid. If we didn't get a reply we close the client
+ // socket and resend the request. We try a number of times
+ // before finally abandoning:
+
+ if (items[0].isReadable()) {
+ // We got a reply from the server, must match sequence
+ String reply = client.recvStr();
+ if (reply == null)
+ break; // Interrupted
+ if (Integer.parseInt(reply) == sequence) {
+ System.out.printf("I: server replied OK (%s)\n", reply);
+ retriesLeft = REQUEST_RETRIES;
+ expect_reply = 0;
+ } else
+ System.out.printf("E: malformed reply from server: %s\n",
+ reply);
+
+ } else if (--retriesLeft == 0) {
+ System.out.println("E: server seems to be offline, abandoning\n");
+ break;
+ } else {
+ System.out.println("W: no response from server, retrying\n");
+ // Old socket is confused; close it and open a new one
+ ctx.destroySocket(client);
+ System.out.println("I: reconnecting to server\n");
+ client = ctx.createSocket(ZMQ.REQ);
+ client.connect(SERVER_ENDPOINT);
+ // Send request again, on new socket
+ client.send(request);
+ }
+ }
+ }
+ ctx.destroy();
+ }
+}
View
66 examples/Java/lpserver.java
@@ -1,48 +1,44 @@
-/**
- * Lazy Pirate server
- * Binds REQ socket to tcp://*:5555
- * Like hwserver except:
- * - echoes request as-is
- * - randomly runs slowly, or exits to simulate a crash.
- *
- * @author Arkadiusz Orzechowski <aorzecho@gmail.com>
- */
-import org.zeromq.ZMQ;
import java.util.Random;
-public class lpserver {
- public static void main(String[] args) {
- // Prepare our context and socket
- ZMQ.Context context = ZMQ.context(1);
- ZMQ.Socket server = context.socket(ZMQ.REP);
+import org.zeromq.ZMQ;
+import org.zeromq.ZMQ.Context;
+import org.zeromq.ZMQ.Socket;
+
+//
+// Lazy Pirate server
+// Binds REQ socket to tcp://*:5555
+// Like hwserver except:
+// - echoes request as-is
+// - randomly runs slowly, or exits to simulate a crash.
+//
+public class lpserver
+{
+
+ public static void main(String[] argv) throws Exception
+ {
+ Random rand = new Random(System.nanoTime());
+
+ Context context = ZMQ.context(1);
+ Socket server = context.socket(ZMQ.REP);
server.bind("tcp://*:5555");
- Random rand = new Random();
int cycles = 0;
while (true) {
- byte[] request = server.recv(0);
+ String request = server.recvStr();
cycles++;
- try {
- // Simulate various problems, after a few cycles
- if (cycles > 3 && rand.nextInt(4) == 0) {
- System.out.println("I: simulating a crash");
- break;
- } else if (cycles > 3 && rand.nextInt(4) == 0) {
- System.out.println("I: simulating CPU overload");
- Thread.sleep(2000);
- }
-
- System.out.printf("I: normal request (%s)\n", new String(
- request).trim());
- // Do some 'work'
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
+ // Simulate various problems, after a few cycles
+ if (cycles > 3 && rand.nextInt(3) == 0) {
+ System.out.println("I: simulating a crash");
+ break;
+ } else if (cycles > 3 && rand.nextInt(3) == 0) {
+ System.out.println("I: simulating CPU overload");
+ Thread.sleep(2000);
}
- server.send(request, 0);
+ System.out.printf("I: normal request (%s)\n", request);
+ Thread.sleep(1000); // Do some heavy work
+ server.send(request);
}
- // cleanup
server.close();
context.term();
}
View
46 examples/Java/mdbroker.java
@@ -1,21 +1,21 @@
/**
- * (c) 2011 Arkadiusz Orzechowski
- *
- * This file is part of ZGuide
- *
- * ZGuide is free software; you can redistribute it and/or modify it under
- * the terms of the Lesser GNU General Public License as published by
- * the Free Software Foundation; either version 3 of the License, or
- * (at your option) any later version.
- *
- * ZGuide is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Lesser GNU General Public License for more details.
- *
- * You should have received a copy of the Lesser GNU General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- */
+* (c) 2011 Arkadiusz Orzechowski
+*
+* This file is part of ZGuide
+*
+* ZGuide is free software; you can redistribute it and/or modify it under
+* the terms of the Lesser GNU General Public License as published by
+* the Free Software Foundation; either version 3 of the License, or
+* (at your option) any later version.
+*
+* ZGuide is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* Lesser GNU General Public License for more details.
+*
+* You should have received a copy of the Lesser GNU General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Formatter;
@@ -28,11 +28,9 @@
import org.zeromq.ZMsg;
/**
- * Majordomo Protocol broker
- * A minimal implementation of http://rfc.zeromq.org/spec:7 and spec:8
- *
- * @author Arkadiusz Orzechowski <aorzecho@gmail.com>
- */
+* Majordomo Protocol broker
+* A minimal implementation of http://rfc.zeromq.org/spec:7 and spec:8
+*/
public class mdbroker {
// We'd normally pull these from config data
@@ -121,9 +119,9 @@ public mdbroker(boolean verbose) {
*/
public void mediate() {
while (!Thread.currentThread().isInterrupted()) {
- ZMQ.Poller items = ctx.getContext().poller();
+ ZMQ.Poller items = new ZMQ.Poller(1);
items.register(socket, ZMQ.Poller.POLLIN);
- if (items.poll(HEARTBEAT_INTERVAL * 1000) == -1)
+ if (items.poll(HEARTBEAT_INTERVAL) == -1)
break; // Interrupted
if (items.pollin(0)) {
ZMsg msg = ZMsg.recvMsg(socket);
View
51 examples/Java/mdcliapi.java
@@ -1,21 +1,19 @@
/**
- * (c) 2011 Arkadiusz Orzechowski
- *
- * This file is part of ZGuide
- *
- * ZGuide is free software; you can redistribute it and/or modify it under
- * the terms of the Lesser GNU General Public License as published by
- * the Free Software Foundation; either version 3 of the License, or
- * (at your option) any later version.
- *
- * ZGuide is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Lesser GNU General Public License for more details.
- *
- * You should have received a copy of the Lesser GNU General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- */
+* This file is part of ZGuide
+*
+* ZGuide is free software; you can redistribute it and/or modify it under
+* the terms of the Lesser GNU General Public License as published by
+* the Free Software Foundation; either version 3 of the License, or
+* (at your option) any later version.
+*
+* ZGuide is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* Lesser GNU General Public License for more details.
+*
+* You should have received a copy of the Lesser GNU General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
import java.util.Formatter;
import org.zeromq.ZContext;
@@ -24,11 +22,10 @@
import org.zeromq.ZMsg;
/**
- * Majordomo Protocol Client API, Java version Implements the MDP/Worker spec at
- * http://rfc.zeromq.org/spec:7.
- *
- * @author Arkadiusz Orzechowski <aorzecho@gmail.com>
- */
+* Majordomo Protocol Client API, Java version Implements the MDP/Worker spec at
+* http://rfc.zeromq.org/spec:7.
+*
+*/
public class mdcliapi {
private String broker;
@@ -72,14 +69,14 @@ void reconnectToBroker() {
client = ctx.createSocket(ZMQ.REQ);
client.connect(broker);
if (verbose)
- log.format("I: connecting to broker at %s...\n", broker);
+ log.format("I: connecting to broker at %s\n", broker);
}
/**
* Send request to broker and get reply by hook or crook Takes ownership of
* request message and destroys it when sent. Returns the reply message or
* NULL if there was no reply.
- *
+ *
* @param service
* @param request
* @return
@@ -100,9 +97,9 @@ public ZMsg send(String service, ZMsg request) {
request.duplicate().send(client);
// Poll socket for a reply, with timeout
- ZMQ.Poller items = ctx.getContext().poller();
+ ZMQ.Poller items = new ZMQ.Poller(1);
items.register(client, ZMQ.Poller.POLLIN);
- if (items.poll(timeout * 1000) == -1)
+ if (items.poll(timeout) == -1)
break; // Interrupted
if (items.pollin(0)) {
@@ -130,7 +127,7 @@ public ZMsg send(String service, ZMsg request) {
log.format("W: permanent error, abandoning\n");
break;
}
- log.format("W: no reply, reconnecting...\n");
+ log.format("W: no reply, reconnecting\n");
reconnectToBroker();
}
}
View
8 examples/Java/mdcliapi2.java
@@ -1,7 +1,4 @@
/**
- * (c) 2011 Arkadiusz Orzechowski
- *
- * This file is part of ZGuide
*
* ZGuide is free software; you can redistribute it and/or modify it under
* the terms of the Lesser GNU General Public License as published by
@@ -16,6 +13,7 @@
* You should have received a copy of the Lesser GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
+
import java.util.Formatter;
import org.zeromq.ZContext;
@@ -26,8 +24,6 @@
/**
* Majordomo Protocol Client API, asynchronous Java version. Implements the
* MDP/Worker spec at http://rfc.zeromq.org/spec:7.
- *
- * @author Arkadiusz Orzechowski <aorzecho@gmail.com>
*/
public class mdcliapi2 {
@@ -75,7 +71,7 @@ public ZMsg recv() {
ZMsg reply = null;
// Poll socket for a reply, with timeout
- ZMQ.Poller items = ctx.getContext().poller();
+ ZMQ.Poller items = new ZMQ.Poller(1);
items.register(client, ZMQ.Poller.POLLIN);
if (items.poll(timeout * 1000) == -1)
return null; // Interrupted
View
7 examples/Java/mdclient.java
@@ -1,11 +1,8 @@
import org.zeromq.ZMsg;
/**
- * Majordomo Protocol client example. Uses the mdcli API to hide all MDP aspects
- *
- * @author Arkadiusz Orzechowski <aorzecho@gmail.com>
- *
- */
+* Majordomo Protocol client example. Uses the mdcli API to hide all MDP aspects
+*/
public class mdclient {
public static void main(String[] args) {
View
3 examples/Java/mdclient2.java
@@ -4,9 +4,8 @@
* Majordomo Protocol client example, asynchronous. Uses the mdcli API to hide
* all MDP aspects
*
- * @author Arkadiusz Orzechowski <aorzecho@gmail.com>
- *
*/
+
public class mdclient2 {
public static void main(String[] args) {
View
10 examples/Java/mdworker.java
@@ -1,11 +1,9 @@
import org.zeromq.ZMsg;
/**
- * Majordomo Protocol worker example. Uses the mdwrk API to hide all MDP aspects
- *
- * @author Arkadiusz Orzechowski <aorzecho@gmail.com>
- *
- */
+* Majordomo Protocol worker example. Uses the mdwrk API to hide all MDP aspects
+*
+*/
public class mdworker {
/**
@@ -20,7 +18,7 @@ public static void main(String[] args) {
ZMsg request = workerSession.receive(reply);
if (request == null)
break; //Interrupted
- reply = request; // Echo is complex... :-)
+ reply = request; // Echo is complex :-)
}
workerSession.destroy();
}
View
54 examples/Java/mdwrkapi.java
@@ -1,21 +1,19 @@
/**
- * (c) 2011 Arkadiusz Orzechowski
- *
- * This file is part of ZGuide
- *
- * ZGuide is free software; you can redistribute it and/or modify it under
- * the terms of the Lesser GNU General Public License as published by
- * the Free Software Foundation; either version 3 of the License, or
- * (at your option) any later version.
- *
- * ZGuide is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Lesser GNU General Public License for more details.
- *
- * You should have received a copy of the Lesser GNU General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- */
+* This file is part of ZGuide
+*
+* ZGuide is free software; you can redistribute it and/or modify it under
+* the terms of the Lesser GNU General Public License as published by
+* the Free Software Foundation; either version 3 of the License, or
+* (at your option) any later version.
+*
+* ZGuide is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* Lesser GNU General Public License for more details.
+*
+* You should have received a copy of the Lesser GNU General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
import java.util.Formatter;
import org.zeromq.ZContext;
@@ -24,11 +22,9 @@
import org.zeromq.ZMsg;
/**
- * Majordomo Protocol Client API, Java version Implements the MDP/Worker spec at
- * http://rfc.zeromq.org/spec:7.
- *
- * @author Arkadiusz Orzechowski <aorzecho@gmail.com>
- */
+* Majordomo Protocol Client API, Java version Implements the MDP/Worker spec at
+* http://rfc.zeromq.org/spec:7.
+*/
public class mdwrkapi {
private static final int HEARTBEAT_LIVENESS = 3; // 3-5 is reasonable
@@ -65,7 +61,7 @@ public mdwrkapi(String broker, String service, boolean verbose) {
/**
* Send message to broker If no msg is provided, creates one internally
- *
+ *
* @param command
* @param option
* @param msg
@@ -98,7 +94,7 @@ void reconnectToBroker() {
worker = ctx.createSocket(ZMQ.DEALER);
worker.connect(broker);
if (verbose)
- log.format("I: connecting to broker at %s...\n", broker);
+ log.format("I: connecting to broker at %s\n", broker);
// Register service with broker
sendToBroker(MDP.W_READY, service, null);
@@ -127,9 +123,9 @@ public ZMsg receive(ZMsg reply) {
while (!Thread.currentThread().isInterrupted()) {
// Poll socket for a reply, with timeout
- ZMQ.Poller items = ctx.getContext().poller();
+ ZMQ.Poller items = new ZMQ.Poller(1);
items.register(worker, ZMQ.Poller.POLLIN);
- if (items.poll(timeout * 1000) == -1)
+ if (items.poll(timeout) == -1)
break; // Interrupted
if (items.pollin(0)) {
@@ -155,7 +151,7 @@ public ZMsg receive(ZMsg reply) {
ZFrame command = msg.pop();
if (MDP.W_REQUEST.frameEquals(command)) {
// We should pop and save as many addresses as there are
- // up to a null part, but for now, just save one...
+ // up to a null part, but for now, just save one
replyTo = msg.unwrap();
command.destroy();
return msg; // We have a request to process
@@ -171,7 +167,7 @@ public ZMsg receive(ZMsg reply) {
msg.destroy();
} else if (--liveness == 0) {
if (verbose)
- log.format("W: disconnected from broker - retrying...\n");
+ log.format("W: disconnected from broker - retrying\n");
try {
Thread.sleep(reconnect);
} catch (InterruptedException e) {
@@ -190,7 +186,7 @@ public ZMsg receive(ZMsg reply) {
}
if (Thread.currentThread().isInterrupted())
- log.format("W: interrupt received, killing worker...\n");
+ log.format("W: interrupt received, killing worker\n");
return null;
}
View
253 examples/Java/ppqueue.java
@@ -1,174 +1,159 @@
-/**
- * Paranoid Pirate queue
- *
- * @author Arkadiusz Orzechowski <aorzecho@gmail.com>
- */
-import java.util.ArrayDeque;
-import java.util.Arrays;
-import java.util.Deque;
+import java.util.ArrayList;
+import java.util.Iterator;
import org.zeromq.ZContext;
import org.zeromq.ZFrame;
import org.zeromq.ZMQ;
+import org.zeromq.ZMQ.PollItem;
import org.zeromq.ZMQ.Socket;
import org.zeromq.ZMsg;
-public class ppqueue {
+//
+// Paranoid Pirate queue
+//
- private static final int HEARTBEAT_LIVENESS = 3; // 3-5 is reasonable
- private static final int HEARTBEAT_INTERVAL = 1000; // msecs
+public class ppqueue {
- private static final byte[] PPP_READY = { 1 }; // Signals worker is ready
- private static final byte[] PPP_HEARTBEAT = { 2 }; // Signals worker
- // heartbeat
+ private final static int HEARTBEAT_LIVENESS = 3; // 3-5 is reasonable
+ private final static int HEARTBEAT_INTERVAL = 1000; // msecs
- /**
- * Keeps worker's address and expiry time.
- */
+ // Paranoid Pirate Protocol constants
+ private final static String PPP_READY = "\001"; // Signals worker is ready
+ private final static String PPP_HEARTBEAT = "\002"; // Signals worker heartbeat
+
+ // Here we define the worker class; a structure and a set of functions that
+ // as constructor, destructor, and methods on worker objects:
+
private static class Worker {
- ZFrame address;
- long expiry;
-
- public Worker(ZFrame address) {
+ ZFrame address; // Address of worker
+ String identity; // Printable identity
+ long expiry; // Expires at this time
+
+ protected Worker(ZFrame address) {
this.address = address;
- this.expiry = System.currentTimeMillis() + HEARTBEAT_INTERVAL
- * HEARTBEAT_LIVENESS;
- }
-
- @Override
- public int hashCode() {
- return Arrays.hashCode(address.getData());
- }
-
- @Override
- public boolean equals(Object obj) {
- if (!(obj instanceof Worker))
- return false;
- Worker other = (Worker) obj;
- return Arrays.equals(address.getData(), other.address.getData());
+ identity = new String(address.getData());
+ expiry = System.currentTimeMillis() + HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS;
}
- }
-
- private static class WorkersPool {
- private Deque<Worker> workers = new ArrayDeque<Worker>();
- private static final ZFrame heartbeatFrame = new ZFrame(PPP_HEARTBEAT);
- private long heartbeatAt = System.currentTimeMillis()
- + HEARTBEAT_INTERVAL;
-
- /**
- * Worker is ready, remove if on list and move to end
- */
- public synchronized void workerReady(Worker worker) {
- if (workers.remove(worker)) {
- System.out.printf("I: %s is alive, waiting\n",
- worker.address.toString());
- } else {
- System.out.printf("I: %s is now ready to work\n",
- worker.address.toString());
+ // The ready method puts a worker to the end of the ready list:
+ protected void ready(ArrayList<Worker> workers) {
+ Iterator<Worker> it = workers.iterator();
+ while (it.hasNext()) {
+ Worker worker = it.next();
+ if (identity.equals(worker.identity)) {
+ it.remove();
+ break;
+ }
}
- workers.offerLast(worker);
+ workers.add(this);
}
-
- /**
- * Return next available worker address
- */
- public synchronized ZFrame next() {
- return workers.pollFirst().address;
+
+ // The next method returns the next available worker address:
+ protected static ZFrame next(ArrayList<Worker> workers) {
+ Worker worker = workers.remove(0);
+ assert (worker != null);
+ ZFrame frame = worker.address;
+ return frame;
}
- /**
- * Send heartbeats to idle workers if it's time
- */
- public synchronized void sendHeartbeats(Socket backend) {
- // Send heartbeats to idle workers if it's time
- if (System.currentTimeMillis() >= heartbeatAt) {
- for (Worker worker : workers) {
- worker.address.sendAndKeep(backend, ZMQ.SNDMORE);
- heartbeatFrame.sendAndKeep(backend);
+ // The purge method looks for and kills expired workers. We hold workers
+ // from oldest to most recent, so we stop at the first alive worker:
+ protected static void purge(ArrayList<Worker> workers) {
+ Iterator<Worker> it = workers.iterator();
+ while (it.hasNext()) {
+ Worker worker = it.next();
+ if (System.currentTimeMillis() < worker.expiry) {
+ break;
}
- heartbeatAt = System.currentTimeMillis() + HEARTBEAT_INTERVAL;
+ it.remove();
}
}
+ };
- /**
- * Look for & kill expired workers. Workers are oldest to most recent,
- * so we stop at the first alive worker.
- */
- public synchronized void purge() {
- for (Worker w = workers.peekFirst(); w != null
- && w.expiry < System.currentTimeMillis(); w = workers
- .peekFirst()) {
- workers.pollFirst().address.destroy();
- }
- }
-
- public boolean isEmpty() {
- return workers.isEmpty();
- }
-
- public synchronized void close() {
- for (Worker worker : workers)
- worker.address.destroy();
- }
- }
-
+ // The main task is an LRU queue with heartbeating on workers so we can
+ // detect crashed or blocked worker tasks:
public static void main(String[] args) {
- // Prepare our context and sockets
- ZContext context = new ZContext();
- ZMQ.Socket frontend = context.createSocket(ZMQ.ROUTER);
- ZMQ.Socket backend = context.createSocket(ZMQ.ROUTER);
- frontend.bind("tcp://*:5555"); // For clients
- backend.bind("tcp://*:5556"); // For workers
- WorkersPool workers = new WorkersPool();
-
- while (!Thread.currentThread().isInterrupted()) {
- ZMQ.Poller items = context.getContext().poller();
- items.register(backend, ZMQ.Poller.POLLIN);
-
- if (!workers.isEmpty()) // poll frontend only if there are
- // registered workers
- items.register(frontend, ZMQ.Poller.POLLIN);
-
- items.poll();
- if (items.pollin(0)) {
- // receive whole message (all ZFrames) at once
- ZMsg msg = ZMsg.recvMsg(backend);
+ ZContext ctx = new ZContext ();
+ Socket frontend = ctx.createSocket(ZMQ.ROUTER);
+ Socket backend = ctx.createSocket(ZMQ.ROUTER);
+ frontend.bind( "tcp://*:5555"); // For clients
+ backend.bind( "tcp://*:5556"); // For workers
+
+ // List of available workers
+ ArrayList<Worker> workers = new ArrayList<Worker> ();
+
+ // Send out heartbeats at regular intervals
+ long heartbeat_at = System.currentTimeMillis() + HEARTBEAT_INTERVAL;
+
+ while (true) {
+ PollItem items [] = {
+ new PollItem( backend, ZMQ.Poller.POLLIN ),
+ new PollItem( frontend, ZMQ.Poller.POLLIN )
+ };
+ // Poll frontend only if we have available workers
+ int rc = ZMQ.poll (items, workers.size() > 0 ? 2:1,
+ HEARTBEAT_INTERVAL );
+ if (rc == -1)
+ break; // Interrupted
+
+ // Handle worker activity on backend
+ if (items [0].isReadable()) {
+ // Use worker address for LRU routing
+ ZMsg msg = ZMsg.recvMsg (backend);
if (msg == null)
- break; // Interrupted
+ break; // Interrupted
- // Any sign of life from worker means it's ready
+ // Any sign of life from worker means it's ready
ZFrame address = msg.unwrap();
- workers.workerReady(new Worker(address));
+ Worker worker = new Worker(address);
+ worker.ready(workers);
- // Validate control message, or return reply to client
+ // Validate control message, or return reply to client
if (msg.size() == 1) {
ZFrame frame = msg.getFirst();
- if (!(Arrays.equals(frame.getData(), PPP_HEARTBEAT) || Arrays
- .equals(frame.getData(), PPP_READY))) {
- System.out.printf("E: invalid message from worker "
- + msg.toString());
+ String data = new String(frame.getData());
+ if (!data.equals(PPP_READY)
+ && !data.equals( PPP_HEARTBEAT)) {
+ System.out.println ("E: invalid message from worker");
+ msg.dump(System.out);
}
msg.destroy();
- } else
+ }
+ else
msg.send(frontend);
}
- if (items.pollin(1)) {
- // Now get next client request, route to next worker
- ZMsg msg = ZMsg.recvMsg(frontend);
+ if (items [1].isReadable()) {
+ // Now get next client request, route to next worker
+ ZMsg msg = ZMsg.recvMsg (frontend);
if (msg == null)
- break; // Interrupted
- msg.push(workers.next());
- msg.send(backend);
+ break; // Interrupted
+ msg.push(Worker.next(workers));
+ msg.send( backend);
}
- workers.sendHeartbeats(backend);
- workers.purge();
-
+ // We handle heartbeating after any socket activity. First we send
+ // heartbeats to any idle workers if it's time. Then we purge any
+ // dead workers:
+
+ if (System.currentTimeMillis() >= heartbeat_at) {
+ for (Worker worker: workers) {
+
+ worker.address.send(backend,
+ ZFrame.REUSE + ZFrame.MORE);
+ ZFrame frame = new ZFrame (PPP_HEARTBEAT);
+ frame.send(backend, 0);
+ }
+ heartbeat_at = System.currentTimeMillis() + HEARTBEAT_INTERVAL;
+ }
+ Worker.purge (workers);
}
- // When we're done, clean up properly
- workers.close();
- context.destroy();
+ // When we're done, clean up properly
+ while ( workers.size() > 0) {
+ Worker worker = workers.remove(0);
+ }
+ workers.clear();
+ ctx.destroy();
}
+
}
View
205 examples/Java/ppworker.java
@@ -1,135 +1,152 @@
-/**
- * Paranoid Pirate worker
- *
- * @author Arkadiusz Orzechowski <aorzecho@gmail.com>
- */
-import java.util.Arrays;
import java.util.Random;
import org.zeromq.ZContext;
import org.zeromq.ZFrame;
import org.zeromq.ZMQ;
+import org.zeromq.ZMQ.PollItem;
+import org.zeromq.ZMQ.Socket;
import org.zeromq.ZMsg;
+//
+// Paranoid Pirate worker
+//
public class ppworker {
- private static final int HEARTBEAT_LIVENESS = 3; // 3-5 is reasonable
- private static final int HEARTBEAT_INTERVAL = 1000; // msecs
- private static final int INTERVAL_INIT = 1000; // Initial reconnect
- private static final int INTERVAL_MAX = 32000; // After exponential backoff
+ private final static int HEARTBEAT_LIVENESS = 3 ; // 3-5 is reasonable
+ private final static int HEARTBEAT_INTERVAL = 1000; // msecs
+ private final static int INTERVAL_INIT = 1000; // Initial reconnect
+ private final static int INTERVAL_MAX = 32000; // After exponential backoff
+
+ // Paranoid Pirate Protocol constants
+ private final static String PPP_READY = "\001" ; // Signals worker is ready
+ private final static String PPP_HEARTBEAT = "\002" ; // Signals worker heartbeat
+
+ // Helper function that returns a new configured socket
+ // connected to the Paranoid Pirate queue
+
+ private static Socket worker_socket(ZContext ctx) {
+ Socket worker = ctx.createSocket(ZMQ.DEALER);
+ worker.connect( "tcp://localhost:5556");
+
+ // Tell queue we're ready for work
+ System.out.println ("I: worker ready\n");
+ ZFrame frame = new ZFrame (PPP_READY);
+ frame.send( worker, 0);
- // Paranoid Pirate Protocol constants
- private static final byte[] PPP_READY = { 1 }; // Signals worker is ready
- private static final byte[] PPP_HEARTBEAT = { 2 }; // Signals worker
- // heartbeat
-
- private static final ZFrame heartbeatFrame = new ZFrame(PPP_HEARTBEAT);
-
- private static ZMQ.Socket connectWorker(ZContext context) {
- ZMQ.Socket worker = context.createSocket(ZMQ.DEALER);
- // Set random identity to make tracing easier
- Random rand = new Random();
- String id = String.format("%04x-%04x", rand.nextInt(0x10001),
- rand.nextInt(0x10001));
- worker.setIdentity(id.getBytes());
- worker.connect("tcp://localhost:5556");
-
- // Tell the queue we're ready for work
- System.out.printf("I: worker ready\n");
- worker.send(PPP_READY, 0);
return worker;
}
-
- /**
- * Do the job, simulate problems if cycle>5
- */
- private static boolean doTheWork(int cycle) {
- final Random rand = new Random();
- try {
- if (cycle > 3 && rand.nextInt(6) == 0) {
- System.out.printf("I: simulating a crash\n");
- return false;
- } else if (cycle > 3 && rand.nextInt(6) == 0) {
- System.out.printf("I: simulating CPU overload\n");
- Thread.sleep(3000);
- }
- System.out.printf("I: normal reply\n");
- // Do some 'work'
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- return true;
- }
-
+
+ // We have a single task, which implements the worker side of the
+ // Paranoid Pirate Protocol (PPP). The interesting parts here are
+ // the heartbeating, which lets the worker detect if the queue has
+ // died, and vice-versa:
+
public static void main(String[] args) {
- // Prepare our context and socket
- ZContext context = new ZContext();
- ZMQ.Socket worker = connectWorker(context);
+ ZContext ctx = new ZContext ();
+ Socket worker = worker_socket (ctx);
- // If liveness hits zero, queue is considered disconnected
+ // If liveness hits zero, queue is considered disconnected
int liveness = HEARTBEAT_LIVENESS;
int interval = INTERVAL_INIT;
- // Send out heartbeats at regular intervals
- long heartbeatAt = System.currentTimeMillis() + HEARTBEAT_INTERVAL;
+ // Send out heartbeats at regular intervals
+ long heartbeat_at = System.currentTimeMillis() + HEARTBEAT_INTERVAL;
+ Random rand = new Random(System.nanoTime());
int cycles = 0;
- while (!Thread.currentThread().isInterrupted()) {
-
- ZMQ.Poller items = context.getContext().poller();
- items.register(worker, ZMQ.Poller.POLLIN);
-
- if (items.poll(HEARTBEAT_INTERVAL * 1000) == -1)
- break; // Interrupted
-
- if (items.pollin(0)) {
+ while (true) {
+ PollItem items [] = { new PollItem( worker, ZMQ.Poller.POLLIN ) };
+ int rc = ZMQ.poll (items, HEARTBEAT_INTERVAL );
+ if (rc == -1)
+ break; // Interrupted
+
+ if (items [0].isReadable()) {
+ // Get message
+ // - 3-part envelope + content -> request
+ // - 1-part HEARTBEAT -> heartbeat
ZMsg msg = ZMsg.recvMsg(worker);
if (msg == null)
- break; // Interrupted
-
- if (msg.size() == 3) { // serving a client request
- if (!doTheWork(cycles++))
- break; // crashed
+ break; // Interrupted
+
+                // To test the robustness of the queue implementation we
+ // simulate various typical problems, such as the worker
+ // crashing, or running very slowly. We do this after a few
+ // cycles so that the architecture can get up and running
+ // first:
+ if (msg.size() == 3) {
+ cycles++;
+ if (cycles > 3 && rand.nextInt(5) == 0) {
+ System.out.println ("I: simulating a crash\n");
+ msg.destroy();
+ msg = null;
+ break;
+ }
+ else
+ if (cycles > 3 && rand.nextInt (5) == 0) {
+ System.out.println ("I: simulating CPU overload\n");
+ try {
+ Thread.sleep (3000);
+ } catch (InterruptedException e) {
+ break;
+ }
+ }
+ System.out.println ("I: normal reply\n");
+ msg.send( worker);
liveness = HEARTBEAT_LIVENESS;
- msg.send(worker);
- } else if (msg.size() == 1) { // heartbeat
+ try {
+ Thread.sleep (1000);
+ } catch (InterruptedException e) {
+ break;
+ } // Do some heavy work
+ }
+ else
+ // When we get a heartbeat message from the queue, it means the
+ // queue was (recently) alive, so reset our liveness indicator:
+ if (msg.size() == 1) {
ZFrame frame = msg.getFirst();
- if (Arrays.equals(frame.getData(), PPP_HEARTBEAT)) {
+ if (PPP_HEARTBEAT.equals(new String(frame.getData())))
liveness = HEARTBEAT_LIVENESS;
- } else {
- System.out.printf("E: invalid message (%s)\n",
- frame.toString());
+ else {
+ System.out.println ("E: invalid message\n");
+ msg.dump(System.out);
}
- frame.destroy();
- } else {
- System.out.printf("E: invalid message (%s)\n",
- msg.toString());
+ msg.destroy();
+ }
+ else {
+ System.out.println ("E: invalid message\n");
+ msg.dump(System.out);
}
interval = INTERVAL_INIT;
- } else if (--liveness == 0) {
- System.out.printf("W: heartbeat failure, can't reach queue\n");
- System.out.printf("W: reconnecting in %d msec...\n", interval);
+ }
+ else
+ // If the queue hasn't sent us heartbeats in a while, destroy the
+ // socket and reconnect. This is the simplest most brutal way of
+            // discarding any messages we might have sent in the meantime:
+ if (--liveness == 0) {
+ System.out.println ("W: heartbeat failure, can't reach queue\n");
+            System.out.println (String.format("W: reconnecting in %d msec\n", interval));
try {
Thread.sleep(interval);
} catch (InterruptedException e) {
e.printStackTrace();
}
+
if (interval < INTERVAL_MAX)
interval *= 2;
- context.destroySocket(worker);
- worker = connectWorker(context);
+ ctx.destroySocket(worker);
+ worker = worker_socket (ctx);
liveness = HEARTBEAT_LIVENESS;
}
- // Send heartbeat to queue if it's time
- if (System.currentTimeMillis() > heartbeatAt) {
- heartbeatAt = System.currentTimeMillis() + HEARTBEAT_INTERVAL;
- System.out.printf("I: worker heartbeat\n");
- heartbeatFrame.sendAndKeep(worker);
+
+ // Send heartbeat to queue if it's time
+ if (System.currentTimeMillis() > heartbeat_at) {
+ heartbeat_at = System.currentTimeMillis() + HEARTBEAT_INTERVAL;
+ System.out.println ("I: worker heartbeat\n");
+ ZFrame frame = new ZFrame (PPP_HEARTBEAT);
+ frame.send(worker, 0);
}
}
- // cleanup
- context.destroy();
+ ctx.destroy();
}
}
View
128 examples/Java/spqueue.java
@@ -1,77 +1,75 @@
-/**
- * Simple Pirate queue
- * This is identical to the LRU pattern, with no reliability mechanisms
- * at all. It depends on the client for recovery. Runs forever.
- *
- * @author Arkadiusz Orzechowski <aorzecho@gmail.com>
- */
+import java.util.ArrayList;
+
+import org.zeromq.ZContext;
+import org.zeromq.ZFrame;
import org.zeromq.ZMQ;
-import org.zeromq.ZMQQueue;
-import java.util.ArrayDeque; // change to LinkedList for java<1.6
-import java.util.Queue;
-import java.util.Arrays;
+import org.zeromq.ZMQ.PollItem;
+import org.zeromq.ZMQ.Poller;
+import org.zeromq.ZMQ.Socket;
+import org.zeromq.ZMsg;
+//
+// Simple Pirate queue
+// This is identical to the LRU pattern, with no reliability mechanisms
+// at all. It depends on the client for recovery. Runs forever.
+//
public class spqueue {
- private static final byte[] LRU_READY = { 1 }; // Signals worker is ready
- private static final byte[] EMPTY = new byte[0];
-
- public static void main (String[] args) {
- // Prepare our context and sockets
- ZMQ.Context context = ZMQ.context(1);
- ZMQ.Socket frontend = context.socket(ZMQ.ROUTER);
- ZMQ.Socket backend = context.socket(ZMQ.ROUTER);
- frontend.bind("tcp://*:5555"); // For clients
- backend.bind("tcp://*:5556"); // For workers
-
- Queue<String> workers = new ArrayDeque<String>();
- while (!Thread.currentThread().isInterrupted()) {
- ZMQ.Poller items = context.poller();
- items.register(backend, ZMQ.Poller.POLLIN);
- if (!workers.isEmpty())
- items.register(frontend, ZMQ.Poller.POLLIN);
+ private final static String WORKER_READY = "\001"; // Signals worker is ready
+ public static void main(String[] args) {
+ ZContext ctx = new ZContext ();
+ Socket frontend = ctx.createSocket(ZMQ.ROUTER);
+ Socket backend = ctx.createSocket(ZMQ.ROUTER);
+ frontend.bind("tcp://*:5555"); // For clients
+ backend.bind("tcp://*:5556"); // For workers
- items.poll();
- if (items.pollin(0)) {
- // First frame is worker address, put it on the queue
- String workerAddr = new String(backend.recv(0));
- workers.add(workerAddr);
- // Second frame is empty
- byte[] empty = backend.recv(0);
- assert empty.length==0 | true;
- // Forward message to client if it's not a LRU_READY
- byte[] clientAddr = backend.recv(0);
- if (Arrays.equals(LRU_READY,clientAddr)){
- System.out.printf ("I: welcome new worker: %s\n",
- workerAddr);
- } else {
- empty = backend.recv(0);
- assert empty.length==0 | true;
- frontend.send(clientAddr, ZMQ.SNDMORE);
- frontend.send(EMPTY, ZMQ.SNDMORE);
- frontend.send(backend.recv(0), 0);
- }
- }
- if (items.pollin(1)) {
- // Get client request, route to first available worker
- byte[] clientAddr = frontend.recv(0);
- byte[] empty = frontend.recv(0);
- assert empty.length==0 | true;
+ // Queue of available workers
+ ArrayList<ZFrame> workers = new ArrayList<ZFrame> ();
+
+ // The body of this example is exactly the same as lruqueue2.
+ while (true) {
+ PollItem items [] = {
+ new PollItem( backend, Poller.POLLIN ),
+ new PollItem( frontend, Poller.POLLIN )
+ };
+ int rc = ZMQ.poll (items, workers.size() > 0 ? 2 : 1, -1);
- String worker = workers.poll();
+ // Poll frontend only if we have available workers
+ if (rc == -1)
+ break; // Interrupted
- backend.send(worker.getBytes(), ZMQ.SNDMORE);
- backend.send(EMPTY, ZMQ.SNDMORE);
- backend.send(clientAddr, ZMQ.SNDMORE);
- backend.send(EMPTY, ZMQ.SNDMORE);
- backend.send(frontend.recv(0), 0);
- }
+ // Handle worker activity on backend
+ if (items [0].isReadable()) {
+ // Use worker address for LRU routing
+ ZMsg msg = ZMsg.recvMsg(backend);
+ if (msg == null)
+ break; // Interrupted
+ ZFrame address = msg.unwrap();
+ workers.add( address);
+ // Forward message to client if it's not a READY
+ ZFrame frame = msg.getFirst();
+ if (new String(frame.getData()).equals(WORKER_READY))
+ msg.destroy();
+ else
+ msg.send(frontend);
+ }
+ if (items [1].isReadable()) {
+ // Get client request, route to first available worker
+ ZMsg msg = ZMsg.recvMsg (frontend);
+ if (msg != null) {
+ msg.wrap (workers.remove(0));
+ msg.send(backend);
+ }
+ }
}
-
// When we're done, clean up properly
- frontend.close();
- backend.close();
- context.term();
+ while (workers.size()>0) {
+ ZFrame frame = workers.remove(0);
+ frame.destroy();
+ }
+ workers.clear();
+ ctx.destroy();
}
+
}
View
93 examples/Java/spworker.java
@@ -1,63 +1,58 @@
-/**
- * Simple Pirate worker
- * Connects REQ socket to tcp://*:5556
- * Implements worker part of LRU queueing
- *
- * @author Arkadiusz Orzechowski <aorzecho@gmail.com>
- */
-import org.zeromq.ZMQ;
import java.util.Random;
-public class spworker {
+import org.zeromq.ZContext;
+import org.zeromq.ZFrame;
+import org.zeromq.ZMQ;
+import org.zeromq.ZMQ.Socket;
+import org.zeromq.ZMsg;
+
+//
+// Simple Pirate worker
+// Connects REQ socket to tcp://*:5556
+// Implements worker part of load-balancing queueing
+//
+public class spworker