diff --git a/examples/Java/bstar.java b/examples/Java/bstar.java index cb8372cc1..8b583fd84 100644 --- a/examples/Java/bstar.java +++ b/examples/Java/bstar.java @@ -194,6 +194,7 @@ private void updatePeerExpiry() public int handle(ZLoop loop, PollItem item, Object arg) { bstar self = (bstar) arg; + System.out.println(String.format("sending %s %d", self.state, self.state.ordinal())); self.statepub.send(String.format("%d", self.state.ordinal())); return 0; } @@ -207,6 +208,9 @@ public int handle(ZLoop loop, PollItem item, Object arg) { bstar self = (bstar) arg; String state = item.getSocket().recvStr(); + + System.out.println(String.format("recv %s", state)); + if (state != null) { self.event = Event.values()[Integer.parseInt(state)]; self.updatePeerExpiry(); diff --git a/examples/Java/bstarsrv2.java b/examples/Java/bstarsrv2.java index ceb8715a7..e87c7ae8f 100644 --- a/examples/Java/bstarsrv2.java +++ b/examples/Java/bstarsrv2.java @@ -59,7 +59,7 @@ public static void main(String[] argv) { else if (argv.length == 1 && argv[0].equals("-b")) { System.out.printf("I: Backup passive, waiting for primary (active)\n"); - bs = new bstar(true, "tcp://*:5004", "tcp://localhost:5003"); + bs = new bstar(false, "tcp://*:5004", "tcp://localhost:5003"); bs.voter ("tcp://*:5002", ZMQ.ROUTER, Echo, null); } else { diff --git a/examples/Java/flcliapi.java b/examples/Java/flcliapi.java new file mode 100644 index 000000000..65cbbe85c --- /dev/null +++ b/examples/Java/flcliapi.java @@ -0,0 +1,301 @@ +import org.zeromq.ZContext; +import org.zeromq.ZMQ; +import org.zeromq.ZMQ.PollItem; +import org.zeromq.ZMQ.Socket; +import org.zeromq.ZMsg; +import org.zeromq.ZThread; +import org.zeromq.ZThread.IAttachedRunnable; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +// flcliapi class - Freelance Pattern agent class +// Implements the Freelance Protocol at http://rfc.zeromq.org/spec:10 +public class flcliapi +{ + // If not a single service replies within this time, give up + private static final int GLOBAL_TIMEOUT = 2500; + // PING interval for servers we think are alive + private static final int PING_INTERVAL = 2000; // msecs + // Server considered dead if silent for this long + private static final int SERVER_TTL = 6000; // msecs + + // .split API structure + // This API works in two halves, a common pattern for APIs that need to + // run in the background. One half is an frontend object our application + // creates and works with; the other half is a backend "agent" that runs + // in a background thread. The frontend talks to the backend over an + // inproc pipe socket: + + // Structure of our frontend class + private ZContext ctx; // Our context wrapper + private Socket pipe; // Pipe through to flcliapi agent + + public flcliapi() + { + ctx = new ZContext(); + FreelanceAgent agent = new FreelanceAgent(); + pipe = ZThread.fork(ctx, agent); + } + + public void destroy() + { + ctx.destroy(); + } + + // .split connect method + // To implement the connect method, the frontend object sends a multipart + // message to the backend agent. The first part is a string "CONNECT", and + // the second part is the endpoint. It waits 100msec for the connection to + // come up, which isn't pretty, but saves us from sending all requests to a + // single server, at startup time: + public void connect(String endpoint) + { + ZMsg msg = new ZMsg(); + msg.add("CONNECT"); + msg.add(endpoint); + + msg.send(pipe); + try { + Thread.sleep(100); // Allow connection to come up + } catch (InterruptedException e) { + } + } + + // .split request method + // To implement the request method, the frontend object sends a message + // to the backend, specifying a command "REQUEST" and the request message: + public ZMsg request(ZMsg request) + { + request.push("REQUEST"); + request.send(pipe); + ZMsg reply = ZMsg.recvMsg(pipe); + if (reply != null) { + String status = reply.popString(); + if (status.equals("FAILED")) + reply.destroy(); + } + return reply; + } + + + // .split backend agent + // Here we see the backend agent. It runs as an attached thread, talking + // to its parent over a pipe socket. It is a fairly complex piece of work + // so we'll break it down into pieces. First, the agent manages a set of + // servers, using our familiar class approach: + + // Simple class for one server we talk to + private static class Server + { + private String endpoint; // Server identity/endpoint + private boolean alive; // 1 if known to be alive + private long pingAt; // Next ping at this time + private long expires; // Expires at this time + + protected Server(String endpoint) + { + this.endpoint = endpoint; + alive = false; + pingAt = System.currentTimeMillis() + PING_INTERVAL; + expires = System.currentTimeMillis() + SERVER_TTL; + } + protected void destroy() + { + } + + private void ping(Socket socket) + { + if (System.currentTimeMillis() >= pingAt) { + ZMsg ping = new ZMsg(); + ping.add(endpoint); + ping.add("PING"); + ping.send(socket); + pingAt = System.currentTimeMillis() + PING_INTERVAL; + } + } + + private long tickless(long tickless) + { + if (tickless > pingAt) + return pingAt; + return -1; + } + } + + // .split backend agent class + // We build the agent as a class that's capable of processing messages + // coming in from its various sockets: + + // Simple class for one background agent + private static class Agent + { + private ZContext ctx; // Own context + private Socket pipe; // Socket to talk back to application + private Socket router; // Socket to talk to servers + private Map servers; // Servers we've connected to + private List actives; // Servers we know are alive + private int sequence; // Number of requests ever sent + private ZMsg request; // Current request if any + private ZMsg reply; // Current reply if any + private long expires; // Timeout for request/reply + + protected Agent(ZContext ctx, Socket pipe) + { + this.ctx = ctx; + this.pipe = pipe; + router = ctx.createSocket(ZMQ.ROUTER); + servers = new HashMap(); + actives = new ArrayList(); + } + + protected void destroy() + { + for(Server server: servers.values()) + server.destroy(); + } + + // .split control messages + // This method processes one message from our frontend class + // (it's going to be CONNECT or REQUEST): + + // Callback when we remove server from agent 'servers' hash table + private void controlMessage() + { + ZMsg msg = ZMsg.recvMsg(pipe); + String command = msg.popString(); + + if (command.equals("CONNECT")) { + String endpoint = msg.popString(); + System.out.printf("I: connecting to %s...\n", endpoint); + router.connect(endpoint); + Server server = new Server(endpoint); + servers.put(endpoint, server); + actives.add(server); + server.pingAt = System.currentTimeMillis() + PING_INTERVAL; + server.expires = System.currentTimeMillis() + SERVER_TTL; + } + else + if (command.equals("REQUEST")) { + assert (request == null); // Strict request-reply cycle + // Prefix request with sequence number and empty envelope + String sequenceText = String.format("%d", ++sequence); + msg.push(sequenceText); + // Take ownership of request message + request = msg; + msg = null; + // Request expires after global timeout + expires = System.currentTimeMillis() + GLOBAL_TIMEOUT; + } + if (msg != null) + msg.destroy(); + } + + // .split router messages + // This method processes one message from a connected + // server: + private void routerMessage() + { + ZMsg reply = ZMsg.recvMsg(router); + + // Frame 0 is server that replied + String endpoint = reply.popString(); + Server server = servers.get(endpoint); + assert (server != null); + if (!server.alive) { + actives.add(server); + server.alive = true; + } + server.pingAt = System.currentTimeMillis() + PING_INTERVAL; + server.expires = System.currentTimeMillis() + SERVER_TTL; + + // Frame 1 may be sequence number for reply + String sequenceStr = reply.popString(); + if (Integer.parseInt(sequenceStr) == sequence) { + reply.push("OK"); + reply.send(pipe); + request.destroy(); + request = null; + } + else + reply.destroy(); + + } + + } + // .split backend agent implementation + // Finally, here's the agent task itself, which polls its two sockets + // and processes incoming messages: + static private class FreelanceAgent implements IAttachedRunnable + { + + @Override + public void run(Object[] args, ZContext ctx, Socket pipe) + { + Agent agent = new Agent(ctx, pipe); + + PollItem[] items = { + new PollItem(agent.pipe, ZMQ.Poller.POLLIN), + new PollItem(agent.router, ZMQ.Poller.POLLIN) + }; + while (!Thread.currentThread().isInterrupted()) { + // Calculate tickless timer, up to 1 hour + long tickless = System.currentTimeMillis() + 1000 * 3600; + if (agent.request != null + && tickless > agent.expires) + tickless = agent.expires; + + for (Server server: agent.servers.values()) { + long newTickless = server.tickless(tickless); + if (newTickless > 0) + tickless = newTickless; + } + + int rc = ZMQ.poll(items, + (tickless - System.currentTimeMillis())); + if (rc == -1) + break; // Context has been shut down + + if (items[0].isReadable()) + agent.controlMessage(); + + if (items[1].isReadable()) + agent.routerMessage(); + + // If we're processing a request, dispatch to next server + if (agent.request != null) { + if (System.currentTimeMillis() >= agent.expires) { + // Request expired, kill it + agent.pipe.send("FAILED"); + agent.request.destroy(); + agent.request = null; + } + else { + // Find server to talk to, remove any expired ones + while (!agent.actives.isEmpty()) { + Server server = agent.actives.get(0); + if (System.currentTimeMillis() >= server.expires) { + agent.actives.remove(0); + server.alive = false; + } + else { + ZMsg request = agent.request.duplicate(); + request.push(server.endpoint); + request.send(agent.router); + break; + } + } + } + } + + // Disconnect and delete any expired servers + // Send heartbeats to idle servers if needed + for (Server server: agent.servers.values()) + server.ping(agent.router); + } + agent.destroy(); + } + } +} diff --git a/examples/Java/flclient1.java b/examples/Java/flclient1.java new file mode 100644 index 000000000..12df41f0a --- /dev/null +++ b/examples/Java/flclient1.java @@ -0,0 +1,79 @@ +import org.zeromq.ZContext; +import org.zeromq.ZMQ; +import org.zeromq.ZMQ.PollItem; +import org.zeromq.ZMQ.Socket; +import org.zeromq.ZMsg; + +// Freelance client - Model 1 +// Uses REQ socket to query one or more services +public class flclient1 +{ + private static final int REQUEST_TIMEOUT = 1000; + private static final int MAX_RETRIES = 3; // Before we abandon + + private static ZMsg tryRequest (ZContext ctx, String endpoint, ZMsg request) + { + System.out.printf("I: trying echo service at %s...\n", endpoint); + Socket client = ctx.createSocket(ZMQ.REQ); + client.connect(endpoint); + + // Send request, wait safely for reply + ZMsg msg = request.duplicate(); + msg.send(client); + PollItem[] items = { new PollItem(client, ZMQ.Poller.POLLIN) }; + ZMQ.poll(items, REQUEST_TIMEOUT); + ZMsg reply = null; + if (items[0].isReadable()) + reply = ZMsg.recvMsg(client); + + // Close socket in any case, we're done with it now + ctx.destroySocket(client); + return reply; + } + // .split client task + // The client uses a Lazy Pirate strategy if it only has one server to talk + // to. If it has two or more servers to talk to, it will try each server just + // once: + + public static void main (String[] argv) + { + ZContext ctx = new ZContext(); + ZMsg request = new ZMsg(); + request.add("Hello world"); + ZMsg reply = null; + + int endpoints = argv.length; + if (endpoints == 0) + System.out.printf ("I: syntax: flclient1 ...\n"); + else + if (endpoints == 1) { + // For one endpoint, we retry N times + int retries; + for (retries = 0; retries < MAX_RETRIES; retries++) { + String endpoint = argv [0]; + reply = tryRequest(ctx, endpoint, request); + if (reply != null) + break; // Successful + System.out.printf("W: no response from %s, retrying...\n", endpoint); + } + } + else { + // For multiple endpoints, try each at most once + int endpointNbr; + for (endpointNbr = 0; endpointNbr < endpoints; endpointNbr++) { + String endpoint = argv[endpointNbr]; + reply = tryRequest (ctx, endpoint, request); + if (reply != null) + break; // Successful + System.out.printf ("W: no response from %s\n", endpoint); + } + } + if (reply != null) { + System.out.printf ("Service is running OK\n"); + reply.destroy(); + } + request.destroy();; + ctx.destroy(); + } + +} diff --git a/examples/Java/flclient2.java b/examples/Java/flclient2.java new file mode 100644 index 000000000..2d3f1159a --- /dev/null +++ b/examples/Java/flclient2.java @@ -0,0 +1,136 @@ +/* + * flclient2.java + * + * ------------------------------------------------------------------------- + * Copyright (c) 2012-2013 InfiniLoop Corporation + * Copyright other contributors as noted in the AUTHORS file. + * + * This file is part of Zyni, an open-source message based application framework. + * + * This is free software; you can redistribute it and/or modify it under + * the terms of the GNU Lesser General Public License as published by the + * Free Software Foundation; either version 3 of the License, or (at your + * option) any later version. + * + * This software is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTA- + * BILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General + * Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program. If not, see http://www.gnu.org/licenses/. + * ========================================================================= + */ + + +import org.zeromq.ZContext; +import org.zeromq.ZMQ; +import org.zeromq.ZMQ.PollItem; +import org.zeromq.ZMQ.Socket; +import org.zeromq.ZMsg; + +// Freelance client - Model 2 +// Uses DEALER socket to blast one or more services +public class flclient2 +{ + // If not a single service replies within this time, give up + private static final int GLOBAL_TIMEOUT = 2500; + + // .split class implementation + // Here is the {{flclient}} class implementation. Each instance has a + // context, a DEALER socket it uses to talk to the servers, a counter + // of how many servers it's connected to, and a request sequence number: + private ZContext ctx; // Our context wrapper + private Socket socket; // DEALER socket talking to servers + private int servers; // How many servers we have connected to + private int sequence; // Number of requests ever sent + + public flclient2() + { + ctx = new ZContext(); + socket = ctx.createSocket(ZMQ.DEALER); + } + + public void destroy() + { + ctx.destroy(); + } + + private void connect(String endpoint) + { + socket.connect(endpoint); + servers++; + } + + private ZMsg request(ZMsg request) + { + // Prefix request with sequence number and empty envelope + String sequenceText = String.format("%d", ++sequence); + request.push(sequenceText); + request.push(""); + + // Blast the request to all connected servers + int server; + for (server = 0; server < servers; server++) { + ZMsg msg = request.duplicate(); + msg.send(socket); + } + // Wait for a matching reply to arrive from anywhere + // Since we can poll several times, calculate each one + ZMsg reply = null; + long endtime = System.currentTimeMillis() + GLOBAL_TIMEOUT; + while (System.currentTimeMillis() < endtime) { + PollItem[] items = { new PollItem(socket, ZMQ.Poller.POLLIN) }; + ZMQ.poll(items, endtime - System.currentTimeMillis()); + if (items[0].isReadable()) { + // Reply is [empty][sequence][OK] + reply = ZMsg.recvMsg(socket); + assert (reply.size() == 3); + reply.pop(); + String sequenceStr = reply.popString(); + int sequenceNbr = Integer.parseInt(sequenceStr); + if (sequenceNbr == sequence) + break; + reply.destroy(); + } + } + request.destroy(); + return reply; + + } + + public static void main (String[] argv) + { + if (argv.length == 0) { + System.out.printf ("I: syntax: flclient2 ...\n"); + System.exit(0); + } + + // Create new freelance client object + flclient2 client = new flclient2(); + + // Connect to each endpoint + int argn; + for (argn = 0; argn < argv.length; argn++) + client.connect(argv[argn]); + + // Send a bunch of name resolution 'requests', measure time + int requests = 10000; + long start = System.currentTimeMillis(); + while (requests-- > 0) { + ZMsg request = new ZMsg(); + request.add("random name"); + ZMsg reply = client.request(request); + if (reply == null) { + System.out.printf("E: name service not available, aborting\n"); + break; + } + reply.destroy(); + } + System.out.printf ("Average round trip cost: %d usec\n", + (int) (System.currentTimeMillis() - start) / 10); + + client.destroy(); + } + +} diff --git a/examples/Java/flclient3.java b/examples/Java/flclient3.java new file mode 100644 index 000000000..def0ed094 --- /dev/null +++ b/examples/Java/flclient3.java @@ -0,0 +1,40 @@ +import org.zeromq.ZContext; +import org.zeromq.ZMQ; +import org.zeromq.ZMQ.PollItem; +import org.zeromq.ZMQ.Socket; +import org.zeromq.ZMsg; + +// Freelance client - Model 3 +// Uses flcliapi class to encapsulate Freelance pattern +public class flclient3 +{ + public static void main (String[] argv) + { + // Create new freelance client object + flcliapi client = new flcliapi(); + + // Connect to several endpoints + client.connect("tcp://localhost:5555"); + client.connect("tcp://localhost:5556"); + client.connect("tcp://localhost:5557"); + + // Send a bunch of name resolution 'requests', measure time + int requests = 10000; + long start = System.currentTimeMillis(); + while (requests-- > 0) { + ZMsg request = new ZMsg(); + request.add("random name"); + ZMsg reply = client.request(request); + if (reply == null) { + System.out.printf("E: name service not available, aborting\n"); + break; + } + reply.destroy(); + } + System.out.printf("Average round trip cost: %d usec\n", + (int) (System.currentTimeMillis() - start) / 10); + + client.destroy(); + } + +} diff --git a/examples/Java/flserver1.java b/examples/Java/flserver1.java new file mode 100644 index 000000000..9bd83d2c8 --- /dev/null +++ b/examples/Java/flserver1.java @@ -0,0 +1,32 @@ +import org.zeromq.ZContext; +import org.zeromq.ZMQ; +import org.zeromq.ZMQ.Socket; +import org.zeromq.ZMsg; + +// Freelance server - Model 1 +// Trivial echo service +public class flserver1 +{ + public static void main(String[] args) + { + if (args.length < 1) { + System.out.printf("I: syntax: flserver1 \n"); + System.exit(0); + } + ZContext ctx = new ZContext(); + Socket server = ctx.createSocket(ZMQ.REP); + server.bind(args[0]); + + System.out.printf ("I: echo service is ready at %s\n", args[0]); + while (true) { + ZMsg msg = ZMsg.recvMsg(server); + if (msg == null) + break; // Interrupted + msg.send(server); + } + if (Thread.currentThread().isInterrupted()) + System.out.printf ("W: interrupted\n"); + + ctx.destroy(); + } +} diff --git a/examples/Java/flserver2.java b/examples/Java/flserver2.java new file mode 100644 index 000000000..9f19df019 --- /dev/null +++ b/examples/Java/flserver2.java @@ -0,0 +1,43 @@ +import org.zeromq.ZContext; +import org.zeromq.ZFrame; +import org.zeromq.ZMQ; +import org.zeromq.ZMQ.Socket; +import org.zeromq.ZMsg; + +// Freelance server - Model 2 +// Does some work, replies OK, with message sequencing +public class flserver2 +{ + public static void main(String[] args) + { + if (args.length < 1) { + System.out.printf("I: syntax: flserver2 \n"); + System.exit(0); + } + ZContext ctx = new ZContext(); + Socket server = ctx.createSocket(ZMQ.REP); + server.bind(args[0]); + + System.out.printf ("I: echo service is ready at %s\n", args[0]); + while (true) { + ZMsg request = ZMsg.recvMsg(server); + if (request == null) + break; // Interrupted + + // Fail nastily if run against wrong client + assert (request.size() == 2); + + ZFrame identity = request.pop(); + request.destroy(); + + ZMsg reply = new ZMsg(); + reply.add(identity); + reply.add("OK"); + reply.send(server); + } + if (Thread.currentThread().isInterrupted()) + System.out.printf ("W: interrupted\n"); + + ctx.destroy(); + } +} diff --git a/examples/Java/flserver3.java b/examples/Java/flserver3.java new file mode 100644 index 000000000..e6971541f --- /dev/null +++ b/examples/Java/flserver3.java @@ -0,0 +1,55 @@ +import org.zeromq.ZContext; +import org.zeromq.ZFrame; +import org.zeromq.ZMQ; +import org.zeromq.ZMQ.Socket; +import org.zeromq.ZMsg; + +// Freelance server - Model 3 +// Uses an ROUTER/ROUTER socket but just one thread +public class flserver3 +{ + public static void main(String[] args) + { + boolean verbose = (args.length > 0 && args[0].equals("-v")); + + ZContext ctx = new ZContext(); + // Prepare server socket with predictable identity + String bindEndpoint = "tcp://*:5555"; + String connectEndpoint = "tcp://localhost:5555"; + Socket server = ctx.createSocket(ZMQ.ROUTER); + server.setIdentity(connectEndpoint.getBytes()); + server.bind(bindEndpoint); + System.out.printf ("I: service is ready at %s\n", bindEndpoint); + + while (!Thread.currentThread().isInterrupted()) { + ZMsg request = ZMsg.recvMsg(server); + if (verbose && request != null) + request.dump(System.out); + + if (request == null) + break; // Interrupted + + // Frame 0: identity of client + // Frame 1: PING, or client control frame + // Frame 2: request body + ZFrame identity = request.pop(); + ZFrame control = request.pop(); + ZMsg reply = new ZMsg(); + if (control.equals("PING")) + reply.add("PONG"); + else { + reply.add(control); + reply.add("OK"); + } + request.destroy(); + reply.push(identity); + if (verbose && reply != null) + reply.dump(System.out); + reply.send(server); + } + if (Thread.currentThread().isInterrupted()) + System.out.printf ("W: interrupted\n"); + + ctx.destroy(); + } +}