From b03ec06c639933bf48be01e52814cc74c4be2387 Mon Sep 17 00:00:00 2001 From: Richard Smith Date: Fri, 26 Aug 2011 15:46:19 +0100 Subject: [PATCH] Added Binary Star examples Added Binary Star and Binary Star Reactor examples. Also added Tripping example (sync vs synch test) --- examples/Haxe/BStar.hx | 368 +++++++++++++++++++++++++++++++++++++ examples/Haxe/BStarCli.hx | 87 +++++++++ examples/Haxe/BStarSrv.hx | 229 +++++++++++++++++++++++ examples/Haxe/BStarSrv2.hx | 57 ++++++ examples/Haxe/MDClient2.hx | 2 + examples/Haxe/MDWorker.hx | 4 +- examples/Haxe/MMIEcho.hx | 3 + examples/Haxe/PPWorker.hx | 2 + examples/Haxe/Run.hx | 29 ++- examples/Haxe/TIClient.hx | 3 + examples/Haxe/Titanic.hx | 2 + examples/Haxe/Tripping.hx | 125 +++++++++++++ 12 files changed, 902 insertions(+), 9 deletions(-) create mode 100644 examples/Haxe/BStar.hx create mode 100644 examples/Haxe/BStarCli.hx create mode 100644 examples/Haxe/BStarSrv.hx create mode 100644 examples/Haxe/BStarSrv2.hx create mode 100644 examples/Haxe/Tripping.hx diff --git a/examples/Haxe/BStar.hx b/examples/Haxe/BStar.hx new file mode 100644 index 000000000..e417f7a72 --- /dev/null +++ b/examples/Haxe/BStar.hx @@ -0,0 +1,368 @@ +/** + * (c) 2011 Richard J Smith + * + * Based on implementation of bstar in the ZeroMQ ZGuide: + * http://github.com/imatix/zguide/blob/master/examples/C/bstar.c + * + * This file is part of ZGuide + * + * ZGuide is free software; you can redistribute it and/or modify it under + * the terms of the Lesser GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * ZGuide is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Lesser GNU General Public License for more details. + * + * You should have received a copy of the Lesser GNU General Public License + * along with this program. If not, see . + */ + +package ; + +import haxe.io.Bytes; +import haxe.Stack; +import neko.Sys; +import neko.Lib; +import org.zeromq.ZContext; +import org.zeromq.ZLoop; +import org.zeromq.ZMQ; +import org.zeromq.ZMQPoller; +import org.zeromq.ZMQException; +import org.zeromq.ZMQSocket; +import org.zeromq.ZMsg; +import org.zeromq.ZSocket; + +// States we can be in at any time +private enum StateT { + STATE_PRIMARY; // Primary, waiting for peer to connect + STATE_BACKUP; // Backup, waiting for peer to connect + STATE_ACTIVE; // Active - accepting connections + STATE_PASSIVE; // Passive - not accepting connections +} + +private enum EventT { + PEER_PRIMARY; // HA peer is pending primary + PEER_BACKUP; // HA peer is pending backup + PEER_ACTIVE; // HA peer is active + PEER_PASSIVE; // HA peer is passive + CLIENT_REQUEST; // Client makes request +} + +/** + * Shortcut typedef for method signature of BStar reactor handler functions + */ +typedef HandlerFunctionType = ZLoop->ZMQSocket->Dynamic->Int; + +/** + * Binary Star Reactor + */ +class BStar +{ + + /** We send state information every this often + * If peer doesn't respond in two heartbeats, it is 'dead' + */ + private static inline var BSTAR_HEARTBEAT = 100; + + /** Our context */ + private var ctx:ZContext; + + /** Reactor loop */ + public var loop(default, null):ZLoop; + + /** State publisher socket */ + private var statePub:ZMQSocket; + + /** State subscriber socket */ + private var stateSub:ZMQSocket; + + /** Current state */ + public var state(default, null):StateT; + + /** Current event */ + public var event(default, null):EventT; + + /** When peer is considered 'dead' */ + public var peerExpiry:Float; + + /** Voting socket handler */ + private var voterFn:HandlerFunctionType; + + /** Arguments for voting handler */ + private var voterArgs:Dynamic; + + /** Master socket handler, called when become Master */ + private var masterFn:HandlerFunctionType; + + /** Arguments for Master handler */ + private var masterArgs:Dynamic; + + /** Slave socket handler, called when become Slave */ + private var slaveFn:HandlerFunctionType; + + /** Arguments for slave handler */ + private var slaveArgs:Dynamic; + + /** Print activity to stdout */ + public var verbose:Bool; + + /** Logger function used in verbose mode */ + private var log:Dynamic->Void; + + /** + * BStar Constructor + * @param isPrimary True if this instance is the primary instance, else false if slave + * @param local Network address to bind the statePub socket of this instance to + * @param remote Network address to connect the stateSub socket of this instance to + * @param ?verbose True to generate logging info + * @param ?logger Logger function + */ + public function new(isPrimary:Bool, local:String, remote:String, ?verbose:Bool = false, ?logger:Dynamic->Void) + { + // Initialise the binary star server + ctx = new ZContext(); + loop = new ZLoop(logger); + loop.verbose = verbose; + state = { if (isPrimary) STATE_PRIMARY; else STATE_BACKUP; }; + + // Create publisher for state going to peer + statePub = ctx.createSocket(ZMQ_PUB); + statePub.bind(local); + + // Create subscriber for state coming from peer + stateSub = ctx.createSocket(ZMQ_SUB); + stateSub.setsockopt(ZMQ_SUBSCRIBE, Bytes.ofString("")); + stateSub.connect(remote); + + // Set up basic reactor events + loop.registerTimer(BSTAR_HEARTBEAT, 0, sendState); + var item = { socket: stateSub, event: ZMQ.ZMQ_POLLIN() }; + loop.registerPoller(item, recvState); + + this.verbose = verbose; + if (logger != null) + log = logger; + else + log = Lib.println; + + } + + /** + * Destructor + * Cleans up internal ZLoop reactor object and ZContext objects. + */ + public function destroy() { + if (loop != null) + loop.destroy(); + if (ctx != null) + ctx.destroy(); + } + + /** + * Create socket, bind to local endpoint, and register as reader for + * voting. The socket will only be available if the Binary Star state + * machine allows it. Input on the socket will act as a "vote" in the + * Binary Star scheme. We require exactly one voter per bstar instance. + * + * @param endpoint Endpoint address + * @param type Socket Type to bind to endpoint + * @param handler Voter Handler method + * @param args Optional args to pass to Voter Handfler method when called. + * @return + */ + public function setVoter(endpoint:String, type:SocketType, handler:HandlerFunctionType, ?args:Dynamic):Bool { + // Hold actual handler + arg so we can call this later + var socket = ctx.createSocket(type); + socket.bind(endpoint); + voterFn = handler; + voterArgs = args; + return loop.registerPoller( { socket:socket, event:ZMQ.ZMQ_POLLIN() }, voterReady); + } + + /** + * Sets handler method called when instance becomes Master + * @param handler + * @param ?args + */ + public function setMaster(handler:HandlerFunctionType, ?args:Dynamic) { + if (masterFn == null) { + masterFn = handler; + masterArgs = args; + } + } + + /** + * Sets handler method called when instance becomes Slave + * @param handler + * @param ?args + */ + public function setSlave(handler:HandlerFunctionType, ?args:Dynamic) { + if (slaveFn == null) { + slaveFn = handler; + slaveArgs = args; + } + } + + + /** + * Executes finite state machine (apply event to this state) + * Returns true if there was an exception + * @return + */ + public function stateMachine():Bool + { + var exception = false; + switch (state) { + case STATE_PRIMARY: + // Primary server is waiting for peer to connect + // Accepts CLIENT_REQUEST events in this state + switch (event) { + case PEER_BACKUP: + if (verbose) + log("I: connected to backup (slave), ready as master"); + state = STATE_ACTIVE; + if (masterFn != null) + masterFn(loop, null, masterArgs); + case PEER_ACTIVE: + if (verbose) + log("I: connected to backup (master), ready as slave"); + state = STATE_PASSIVE; + if (slaveFn != null) + slaveFn(loop, null, slaveArgs); + case CLIENT_REQUEST: + if (verbose) + log("I: request from client, ready as master"); + state = STATE_ACTIVE; + if (masterFn != null) + masterFn(loop, null, masterArgs); + default: + } + case STATE_BACKUP: + // Backup server is waiting for peer to connect + // Rejects CLIENT_REQUEST events in this state + switch (event) { + case PEER_ACTIVE: + if (verbose) + log("I: connected to primary (master), ready as slave"); + state = STATE_PASSIVE; + if (slaveFn != null) + slaveFn(loop, null, slaveArgs); + case CLIENT_REQUEST: + exception = true; + default: + } + case STATE_ACTIVE: + // Server is active + // Accepts CLIENT_REQUEST events in this state + switch (event) { + case PEER_ACTIVE: + // Two masters would mean split-brain + log("E: fatal error - dual masters, aborting"); + exception = true; + default: + } + case STATE_PASSIVE: + // Server is passive + // CLIENT_REQUEST events can trigger failover if peer looks dead + switch (event) { + case PEER_PRIMARY: + // Peer is restarting - I become active, peer will go passive + if (verbose) + log("I: primary (slave) is restarting, ready as master"); + state = STATE_ACTIVE; + case PEER_BACKUP: + // Peer is restarting - become active, peer will go passive + if (verbose) + log("I: backup (slave) is restarting, ready as master"); + state = STATE_ACTIVE; + case PEER_PASSIVE: + // Two passives would mean cluster would be non-responsive + log("E: fatal error - dual slaves, aborting"); + exception = true; + case CLIENT_REQUEST: + // Peer becomes master if timeout as passed + // It's the client request that triggers the failover + if (Date.now().getTime() >= peerExpiry) { + // If peer is dead, switch to the active state + if (verbose) + log("I: failover successful, ready as master"); + state = STATE_ACTIVE; + } else { + if (verbose) + log("I: peer is active, so ignore connection"); + exception = true; + } + default: + } + } + return exception; + } + + /** + * Reactor event handler + * Publish our state to peer + * @param loop + * @param socket + * @param arg + * @return + */ + public function sendState(loop:ZLoop, socket:ZMQSocket):Int { + statePub.sendMsg(Bytes.ofString(Std.string(Type.enumIndex(state)))); + return 0; + } + + /** + * Reactor event handler + * Receive state from peer, execute finite state machine. + * @param loop + * @param socket + * @return + */ + public function recvState(loop:ZLoop, socket:ZMQSocket):Int { + var message = stateSub.recvMsg().toString(); + event = Type.createEnumIndex(EventT, Std.parseInt(message)); + peerExpiry = Date.now().getTime() + (2 * BSTAR_HEARTBEAT); + return { + if (stateMachine()) + -1; // Error, so exit + else + 0; + }; + } + + /** + * Application wants to speak to us, see if it's possible + * @param loop + * @param socket + * @return + */ + public function voterReady(loop:ZLoop, socket:ZMQSocket):Int { + // If server can accept input now, call application handler + event = CLIENT_REQUEST; + if (stateMachine()) { + // Destroy waiting message, no-one to read it + var msg = socket.recvMsg(); + } else { + if (verbose) + log("I: CLIENT REQUEST"); + voterFn(loop, socket, voterArgs); + } + return 0; + } + + /** + * Start the reactor, ends if a callback function returns -1, or the + * process receives SIGINT or SIGTERM + * @return 0 if interrupted or invalid, -1 if cancelled by handler + */ + public function start():Int { + if (voterFn != null && loop != null) + return loop.start(); + else + return 0; + } + +} \ No newline at end of file diff --git a/examples/Haxe/BStarCli.hx b/examples/Haxe/BStarCli.hx new file mode 100644 index 000000000..b4fb1d41a --- /dev/null +++ b/examples/Haxe/BStarCli.hx @@ -0,0 +1,87 @@ +package ; +import haxe.Stack; +import neko.Lib; +import neko.Sys; +import org.zeromq.ZContext; +import org.zeromq.ZMQ; +import org.zeromq.ZMQPoller; +import org.zeromq.ZMsg; +import org.zeromq.ZMQException; + +/** + * Binary Star Client + * @author Richard J Smith + * + * @see http://zguide.zeromq.org/page:all#Binary-Star-Implementation + */ + +class BStarCli +{ + + private static inline var REQUEST_TIMEOUT = 1000; // msecs + private static inline var SETTLE_DELAY = 2000; // Before failing over + + public static function main() + { + Lib.println("** BStarCli (see: http://zguide.zeromq.org/page:all#Binary-Star-Implementation)"); + + var ctx = new ZContext(); + var server = ["tcp://localhost:5001", "tcp://localhost:5002"]; + var server_nbr = 0; + + Lib.println("I: connecting to server at " + server[server_nbr]); + var client = ctx.createSocket(ZMQ_REQ); + client.connect(server[server_nbr]); + + var sequence = 0; + var poller = new ZMQPoller(); + poller.registerSocket(client, ZMQ.ZMQ_POLLIN()); + while (!ZMQ.isInterrupted()) { + // We send a request, then we work to get a reply + var request = Std.string(++sequence); + ZMsg.newStringMsg(request).send(client); + + var expectReply = true; + while (expectReply) { + // Poll socket for a reply, with timeout + try { + var res = poller.poll(REQUEST_TIMEOUT * 1000); // Convert timeout to microseconds + } catch (e:ZMQException) { + if (!ZMQ.isInterrupted()) { + trace("ZMQException #:" + e.errNo + ", str:" + e.str()); + trace (Stack.toString(Stack.exceptionStack())); + } else { + Lib.println("W: interrupt received, killing client..."); + } + ctx.destroy(); + return; + } + + if (poller.pollin(1)) { + // We got a reply from the server, must match sequence + var reply = client.recvMsg().toString(); + if (reply != null && Std.parseInt(reply) == sequence) { + Lib.println("I: server replied OK (" + reply + ")"); + expectReply = false; + Sys.sleep(1.0); // One request per second + } else + Lib.println("E: malformed reply from server: " + reply); + } else { + Lib.println("W: no response from server, failing over"); + // Old socket is confused; close it and open a new one + ctx.destroySocket(client); + server_nbr = (server_nbr + 1) % 2; + Sys.sleep(SETTLE_DELAY / 1000); + Lib.println("I: connecting to server at " + server[server_nbr]); + client = ctx.createSocket(ZMQ_REQ); + client.connect(server[server_nbr]); + + poller.unregisterAllSockets(); + poller.registerSocket(client, ZMQ.ZMQ_POLLIN()); + ZMsg.newStringMsg(request).send(client); + } + } + } + ctx.destroy(); + } +} \ No newline at end of file diff --git a/examples/Haxe/BStarSrv.hx b/examples/Haxe/BStarSrv.hx new file mode 100644 index 000000000..67f21b3b1 --- /dev/null +++ b/examples/Haxe/BStarSrv.hx @@ -0,0 +1,229 @@ +package ; +import haxe.io.Bytes; +import haxe.Stack; +import neko.Sys; +import neko.Lib; +import org.zeromq.ZContext; +import org.zeromq.ZMQ; +import org.zeromq.ZMQPoller; +import org.zeromq.ZMQException; +import org.zeromq.ZMsg; +import org.zeromq.ZSocket; + + +/** + * Binary Star Server + * @author Richard J Smith + * + * @see http://zguide.zeromq.org/page:all#Binary-Star-Implementation + */ + +class BStarSrv +{ + + private static inline var HEARTBEAT = 100; + + /** Current state */ + public var state:StateT; + + /** Current event */ + public var event:EventT; + + /** When peer is considered 'dead' */ + public var peerExpiry:Float; + + /** + * BStarSrv constructor + * @param state Initial state + */ + public function new(state:StateT) { + this.state = state; + } + + /** + * Main binary star server loop + */ + public function run() { + var ctx = new ZContext(); + var statePub = ctx.createSocket(ZMQ_PUB); + var stateSub = ctx.createSocket(ZMQ_SUB); + var frontend = ctx.createSocket(ZMQ_ROUTER); + + switch (state) { + case STATE_PRIMARY: + Lib.println("I: primary master, waiting for backup (slave)"); + frontend.bind("tcp://*:5001"); + statePub.bind("tcp://*:5003"); + stateSub.setsockopt(ZMQ_SUBSCRIBE, Bytes.ofString("")); + stateSub.connect("tcp://localhost:5004"); + case STATE_BACKUP: + Lib.println("I: backup slave, waiting for primary (master)"); + frontend.bind("tcp://*:5002"); + statePub.bind("tcp://*:5004"); + stateSub.setsockopt(ZMQ_SUBSCRIBE, Bytes.ofString("")); + stateSub.connect("tcp://localhost:5003"); + default: + ctx.destroy(); + return; + } + // Set timer for next outgoing state message + var sendStateAt = Date.now().getTime() + HEARTBEAT; + + var poller = new ZMQPoller(); + poller.registerSocket(frontend, ZMQ.ZMQ_POLLIN()); + poller.registerSocket(stateSub, ZMQ.ZMQ_POLLIN()); + + while (!ZMQ.isInterrupted()) { + var timeLeft = Std.int(sendStateAt - Date.now().getTime()); + if (timeLeft < 0) + timeLeft = 0; + try { + var res = poller.poll(timeLeft * 1000); // Convert timeout to microseconds + } catch (e:ZMQException) { + if (!ZMQ.isInterrupted()) { + trace("ZMQException #:" + e.errNo + ", str:" + e.str()); + trace (Stack.toString(Stack.exceptionStack())); + } else { + Lib.println("W: interrupt received, killing server..."); + } + ctx.destroy(); + return; + } + if (poller.pollin(1)) { + // Have a client request + var msg = ZMsg.recvMsg(frontend); + event = CLIENT_REQUEST; + if (!stateMachine()) + // Answer client by echoing request back + msg.send(frontend); // Pretend do some work and then reply + else + msg.destroy(); + } + if (poller.pollin(2)) { + // Have state from our peer, execute as event + var message = stateSub.recvMsg().toString(); + event = Type.createEnumIndex(EventT, Std.parseInt(message)); + if (stateMachine()) + break; // Error, so exit + peerExpiry = Date.now().getTime() + (2 * HEARTBEAT); + } + // If we timed-out, send state to peer + if (Date.now().getTime() >= sendStateAt) { + statePub.sendMsg(Bytes.ofString(Std.string(Type.enumIndex(state)))); + sendStateAt = Date.now().getTime() + HEARTBEAT; + } + } + ctx.destroy(); + } + + /** + * Executes finite state machine (apply event to this state) + * Returns true if there was an exception + * @return + */ + public function stateMachine():Bool + { + var exception = false; + switch (state) { + case STATE_PRIMARY: + // Primary server is waiting for peer to connect + // Accepts CLIENT_REQUEST events in this state + switch (event) { + case PEER_BACKUP: + Lib.println("I: connected to backup (slave), ready as master"); + state = STATE_ACTIVE; + case PEER_ACTIVE: + Lib.println("I: connected to backup (master), ready as slave"); + state = STATE_PASSIVE; + default: + } + case STATE_BACKUP: + // Backup server is waiting for peer to connect + // Rejects CLIENT_REQUEST events in this state + switch (event) { + case PEER_ACTIVE: + Lib.println("I: connected to primary (master), ready as slave"); + state = STATE_PASSIVE; + case CLIENT_REQUEST: + exception = true; + default: + } + case STATE_ACTIVE: + // Server is active + // Accepts CLIENT_REQUEST events in this state + switch (event) { + case PEER_ACTIVE: + // Two masters would mean split-brain + Lib.println("E: fatal error - dual masters, aborting"); + exception = true; + default: + } + case STATE_PASSIVE: + // Server is passive + // CLIENT_REQUEST events can trigger failover if peer looks dead + switch (event) { + case PEER_PRIMARY: + // Peer is restarting - become active, peer will go passive + Lib.println("I: primary (slave) is restarting, ready as master"); + state = STATE_ACTIVE; + case PEER_BACKUP: + // Peer is restarting - become active, peer will go passive + Lib.println("I: backup (slave) is restarting, ready as master"); + state = STATE_ACTIVE; + case PEER_PASSIVE: + // Two passives would mean cluster would be non-responsive + Lib.println("E: fatal error - dual slaves, aborting"); + exception = true; + case CLIENT_REQUEST: + // Peer becomes master if timeout as passed + // It's the client request that triggers the failover + if (Date.now().getTime() >= peerExpiry) { + // If peer is dead, switch to the active state + Lib.println("I: failover successful, ready as master"); + state = STATE_ACTIVE; + } else { + Lib.println("I: peer is active, so ignore connection"); + exception = true; + } + default: + } + } + return exception; + } + + public static function main() { + + Lib.println("** BStarSrv (see: http://zguide.zeromq.org/page:all#Binary-Star-Implementation)"); + + var state:StateT = null; + var argArr = Sys.args(); + if (argArr.length > 1 && argArr[argArr.length - 1] == "-p") { + state = STATE_PRIMARY; + } else if (argArr.length > 1 && argArr[argArr.length - 1] == "-b") { + state = STATE_BACKUP; + } else { + Lib.println("Usage: bstartsrv { -p | -b }"); + return; + } + + var bstarServer = new BStarSrv(state); + bstarServer.run(); + } + +} + +// States we can be in at any time +private enum StateT { + STATE_PRIMARY; // Primary, waiting for peer to connect + STATE_BACKUP; // Backup, waiting for peer to connect + STATE_ACTIVE; // Active - accepting connections + STATE_PASSIVE; // Passive - not accepting connections +} + +private enum EventT { + PEER_PRIMARY; // HA peer is pending primary + PEER_BACKUP; // HA peer is pending backup + PEER_ACTIVE; // HA peer is active + PEER_PASSIVE; // HA peer is passive + CLIENT_REQUEST; // Client makes request +} diff --git a/examples/Haxe/BStarSrv2.hx b/examples/Haxe/BStarSrv2.hx new file mode 100644 index 000000000..36f41c041 --- /dev/null +++ b/examples/Haxe/BStarSrv2.hx @@ -0,0 +1,57 @@ +package ; + +/** + * Binary Star server, using BStar reactor class + * @author Richard J Smith + * + * See: http://zguide.zeromq.org/page:all#Binary-Star-Reactor + */ + +import BStar; +import neko.Lib; +import neko.Sys; +import org.zeromq.ZLoop; +import org.zeromq.ZMQ; +import org.zeromq.ZMQSocket; +import org.zeromq.ZMsg; + +class BStarSrv2 +{ + + /** + * Echo service + * @param loop + * @param socket + * @return + */ + private static function echo(loop:ZLoop, socket:ZMQSocket, args:Dynamic):Int { + var msg = ZMsg.recvMsg(socket); + msg.send(socket); + return 0; + } + + public static function main() { + Lib.println("** BStarSrv2 (see: http://zguide.zeromq.org/page:all#Binary-Star-Reactor)"); + + var bstar:BStar = null; + + // Arguments can be either of: + // -p primary server, at tcp://localhost:5001 + // -b backup server, at tcp://localhost:5002 + var argArr = Sys.args(); + if (argArr.length > 1 && argArr[argArr.length - 1] == "-p") { + Lib.println("I: primary master, waiting for backup (slave)"); + bstar = new BStar(true, "tcp://*:5003", "tcp://localhost:5004", true, null); + bstar.setVoter("tcp://*:5001", ZMQ_ROUTER, echo); + } else if (argArr.length > 1 && argArr[argArr.length - 1] == "-b") { + Lib.println("I: backup slave, waiting for primary (master)"); + bstar = new BStar(false, "tcp://*:5004", "tcp://localhost:5003", true, null); + bstar.setVoter("tcp://*:5002", ZMQ_ROUTER, echo); + } else { + Lib.println("Usage: bstartsrv2 { -p | -b }"); + return; + } + bstar.start(); + bstar.destroy(); + } +} \ No newline at end of file diff --git a/examples/Haxe/MDClient2.hx b/examples/Haxe/MDClient2.hx index f55d5f32c..ce681287c 100644 --- a/examples/Haxe/MDClient2.hx +++ b/examples/Haxe/MDClient2.hx @@ -14,6 +14,8 @@ class MDClient2 { public static function main() { + Lib.println("** MDClient2 (see: http://zguide.zeromq.org/page:all#Asynchronous-Majordomo-Pattern)"); + var argArr = Sys.args(); var verbose = (argArr.length > 1 && argArr[argArr.length - 1] == "-v"); diff --git a/examples/Haxe/MDWorker.hx b/examples/Haxe/MDWorker.hx index ce002cf0a..baf4f9e6b 100644 --- a/examples/Haxe/MDWorker.hx +++ b/examples/Haxe/MDWorker.hx @@ -13,7 +13,9 @@ class MDWorker { public static function main() { - var argArr = Sys.args(); + Lib.println("** MDWorker (see: http://zguide.zeromq.org/page:all#Service-Oriented-Reliable-Queuing-Majordomo-Pattern)"); + + var argArr = Sys.args(); var verbose = (argArr.length > 1 && argArr[argArr.length - 1] == "-v"); var session = new MDWrkAPI("tcp://localhost:5555", "echo", verbose); diff --git a/examples/Haxe/MMIEcho.hx b/examples/Haxe/MMIEcho.hx index 957890359..2677b176e 100644 --- a/examples/Haxe/MMIEcho.hx +++ b/examples/Haxe/MMIEcho.hx @@ -11,6 +11,9 @@ class MMIEcho { public static function main() { + + Lib.println("** MMIEcho (see: http://zguide.zeromq.org/page:all#Service-Discovery)"); + var argArr = Sys.args(); var verbose = (argArr.length > 1 && argArr[argArr.length - 1] == "-v"); diff --git a/examples/Haxe/PPWorker.hx b/examples/Haxe/PPWorker.hx index 56238c33f..e69b73e82 100644 --- a/examples/Haxe/PPWorker.hx +++ b/examples/Haxe/PPWorker.hx @@ -46,6 +46,8 @@ class PPWorker } public static function main() { + Lib.println("** PPWorker (see: http://zguide.zeromq.org/page:all#Robust-Reliable-Queuing-Paranoid-Pirate-Pattern)"); + var ctx = new ZContext(); var worker = workerSocket(ctx); diff --git a/examples/Haxe/Run.hx b/examples/Haxe/Run.hx index 35d46c697..e95273d6b 100644 --- a/examples/Haxe/Run.hx +++ b/examples/Haxe/Run.hx @@ -86,10 +86,15 @@ class Run Lib.println("45. MDClient"); Lib.println("46. MDWorker"); Lib.println("47. MDBroker"); - Lib.println("48. MDClient2"); - Lib.println("49. MMIEcho"); - Lib.println("50. TIClient"); - Lib.println("51. Titanic"); + Lib.println("48. Tripping"); + Lib.println("49. MDClient2"); + Lib.println("50. MMIEcho"); + Lib.println("51. TIClient"); + Lib.println("52. Titanic"); + Lib.println(""); + Lib.println("53. BStarSrv"); + Lib.println("54. BStarCli"); + Lib.println("55. BStarSrv2"); do { Lib.print("Type number followed by Enter key, or q to quit: "); @@ -194,15 +199,23 @@ class Run case 47: MDBroker.main(); case 48: - MDClient2.main(); + Tripping.main(); case 49: - MMIEcho.main(); + MDClient2.main(); case 50: - TIClient.main(); + MMIEcho.main(); case 51: + TIClient.main(); + case 52: Titanic.main(); + case 53: + BStarSrv.main(); + case 54: + BStarCli.main(); + case 55: + BStarSrv2.main(); default: - Lib.println ("Unknown program number ... exiting"); + Lib.println ("Unknown program number ... exiting"); } } diff --git a/examples/Haxe/TIClient.hx b/examples/Haxe/TIClient.hx index 403a39d2f..14a17d649 100644 --- a/examples/Haxe/TIClient.hx +++ b/examples/Haxe/TIClient.hx @@ -28,6 +28,9 @@ class TIClient public static function main() { + + Lib.println("** TIClient (see: http://zguide.zeromq.org/page:all#Disconnected-Reliability-Titanic-Pattern)"); + var argArr = Sys.args(); var verbose = (argArr.length > 1 && argArr[argArr.length - 1] == "-v"); diff --git a/examples/Haxe/Titanic.hx b/examples/Haxe/Titanic.hx index c5668239c..057f4be5c 100644 --- a/examples/Haxe/Titanic.hx +++ b/examples/Haxe/Titanic.hx @@ -39,6 +39,8 @@ class Titanic * Main method */ public static function main() { + Lib.println("** Titanic (see: http://zguide.zeromq.org/page:all#Disconnected-Reliability-Titanic-Pattern)"); + var argArr = Sys.args(); var verbose = (argArr.length > 1 && argArr[argArr.length - 1] == "-v"); var log = Lib.println; diff --git a/examples/Haxe/Tripping.hx b/examples/Haxe/Tripping.hx new file mode 100644 index 000000000..1efbc25ae --- /dev/null +++ b/examples/Haxe/Tripping.hx @@ -0,0 +1,125 @@ +package ; + +import haxe.Stack; +import haxe.io.Bytes; +import neko.Lib; +import neko.Sys; +import org.zeromq.ZContext; +import org.zeromq.ZMQ; +import org.zeromq.ZMQPoller; +import org.zeromq.ZMQSocket; +import org.zeromq.ZMsg; +import org.zeromq.ZMQException; +import org.zeromq.ZThread; + +/** + * Round-trip demonstrator + * + * While this example runs in a single process, that is just to make + * it easier to start and stop the example. Client thread signals to + * main when it's ready. + * + * @author Richard J Smith + */ + +class Tripping +{ + + private static function clientTask(ctx:ZContext, pipe:ZMQSocket, ?args:Dynamic) { + var hello = Bytes.ofString("hello"); + var client = ctx.createSocket(ZMQ_DEALER); + client.setsockopt(ZMQ_IDENTITY, Bytes.ofString("C")); + client.connect("tcp://localhost:5555"); + + Lib.println("Setting up test..."); + Sys.sleep(0.1); // 100 msecs + + var start:Float = 0.0; + + Lib.println("Synchronous round-trip test..."); + start = Date.now().getTime(); + for (requests in 0 ... 10000) { + client.sendMsg(hello); + var reply = client.recvMsg(); + } + Lib.println(" " + Std.int((1000.0 * 10000.0) / (Date.now().getTime() - start)) + " calls/second"); + + Lib.println("Asynchronous round-trip test..."); + start = Date.now().getTime(); + for (requests in 0 ... 10000) + client.sendMsg(hello); + for (requests in 0 ... 10000) { + var reply = client.recvMsg(); + } + Lib.println(" " + Std.int((1000.0 * 10000.0) / (Date.now().getTime() - start)) + " calls/second"); + + pipe.sendMsg(Bytes.ofString("done")); + } + + private static function workerTask(?args:Dynamic) { + var ctx = new ZContext(); + var worker = ctx.createSocket(ZMQ_DEALER); + worker.setsockopt(ZMQ_IDENTITY, Bytes.ofString("W")); + worker.connect("tcp://localhost:5556"); + + while (true) { + var msg = ZMsg.recvMsg(worker); + msg.send(worker); + } + ctx.destroy(); + } + + private static function brokerTask(?args:Dynamic) { + // Prepare our contexts and sockets + var ctx = new ZContext(); + var frontend = ctx.createSocket(ZMQ_ROUTER); + var backend = ctx.createSocket(ZMQ_ROUTER); + frontend.bind("tcp://*:5555"); + backend.bind("tcp://*:5556"); + + // Initialise pollset + var poller = new ZMQPoller(); + poller.registerSocket(frontend, ZMQ.ZMQ_POLLIN()); + poller.registerSocket(backend, ZMQ.ZMQ_POLLIN()); + + while (true) { + try { + poller.poll(-1); + } catch (e:ZMQException) { + if (ZMQ.isInterrupted()) { + ctx.destroy(); + return; + } + trace("ZMQException #:" + e.errNo + ", str:" + e.str()); + trace (Stack.toString(Stack.exceptionStack())); + } + if (poller.pollin(1)) { + var msg = ZMsg.recvMsg(frontend); + var address = msg.pop(); + msg.pushString("W"); + msg.send(backend); + } + if (poller.pollin(2)) { + var msg = ZMsg.recvMsg(backend); + var address = msg.pop(); + msg.pushString("C"); + msg.send(frontend); + } + } + } + + public static function main() { + Lib.println("** Tripping (see: http://zguide.zeromq.org/page:all#Asynchronous-Majordomo-Pattern)"); + + // Create threads + var ctx = new ZContext(); + var client = ZThread.attach(ctx, clientTask, null); + ZThread.detach(workerTask, null); + ZThread.detach(brokerTask, null); + + // Wait for signal on client pipe + var signal = client.recvMsg(); + + ctx.destroy(); + } +} \ No newline at end of file