Skip to content

Commit

Permalink
Added Majordomo and Titanic pattern haXe examples
Browse files Browse the repository at this point in the history
Committed MD****.hx and TI*****.hx class files.   Tested on cpp and neko
targets on Mac64, Linux32 and WinXP
  • Loading branch information
rjsmith committed Aug 17, 2011
1 parent 11b735d commit 8eb633c
Show file tree
Hide file tree
Showing 9 changed files with 785 additions and 4 deletions.
4 changes: 2 additions & 2 deletions examples/Haxe/MDBroker.hx
Expand Up @@ -494,8 +494,8 @@ private class Broker {

// Set reply return address to client sender
msg.wrap(sender.duplicate());
if ( sender.size() >= 4 // Reserved for service name
&& sender.toString().indexOf("mmi.") == 0)
if ( serviceFrame.size() >= 4 // Reserved for service name
&& serviceFrame.toString().indexOf("mmi.") == 0)
internalService(serviceFrame, msg);
else
dispatchService(service, msg);
Expand Down
2 changes: 1 addition & 1 deletion examples/Haxe/MDCliAPI.hx
Expand Up @@ -90,7 +90,7 @@ class MDCliAPI
client.setsockopt(ZMQ_LINGER, 0);
client.connect(broker);
if (verbose)
log("I: connecting to broker at " + broker + "...");
log("I: client connecting to broker at " + broker + "...");
}

/**
Expand Down
172 changes: 172 additions & 0 deletions examples/Haxe/MDCliAPI2.hx
@@ -0,0 +1,172 @@
/**
* (c) 2011 Richard J Smith
*
* This file is part of ZGuide
*
* ZGuide is free software; you can redistribute it and/or modify it under
* the terms of the Lesser GNU General Public License as published by
* the Free Software Foundation; either version 3 of the License, or
* (at your option) any later version.
*
* ZGuide is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Lesser GNU General Public License for more details.
*
* You should have received a copy of the Lesser GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

package ;
import haxe.Stack;
import neko.Lib;
import org.zeromq.ZContext;
import org.zeromq.ZFrame;
import org.zeromq.ZMQ;
import org.zeromq.ZMQSocket;
import org.zeromq.ZMsg;
import org.zeromq.ZMQPoller;
import org.zeromq.ZMQException;

import MDP;

/**
* Majordomo Protocol Client API (async version)
* Implements the MDP/Worker spec at http://rfc.zeromq.org/spec:7
*/
class MDCliAPI2
{

/** Request timeout (in msec) */
public var timeout:Int;

// Private instance fields

/** Our context */
private var ctx:ZContext;

/** Connection string to reach broker */
private var broker:String;

/** Socket to broker */
private var client:ZMQSocket;

/** Print activity to stdout */
private var verbose:Bool;

/** Logger function used in verbose mode */
private var log:Dynamic->Void;


/**
* Constructor
* @param broker
* @param verbose
*/
public function new(broker:String, ?verbose:Bool=false, ?logger:Dynamic->Void) {
ctx = new ZContext();
this.broker = broker;
this.verbose = verbose;
this.timeout = 2500; // msecs
if (logger != null)
log = logger;
else
log = Lib.println;

connectToBroker();
}

/**
* Connect or reconnect to broker
*/
public function connectToBroker() {
if (client != null)
client.close();
client = ctx.createSocket(ZMQ_DEALER);
client.setsockopt(ZMQ_LINGER, 0);
client.connect(broker);
if (verbose)
log("I: connecting to broker at " + broker + "...");
}

/**
* Destructor
*/
public function destroy() {
ctx.destroy();
}

/**
* Send request to broker
* Takes ownership of request message and destroys it when sent.
* @param service
* @param request
* @return
*/
public function send(service:String, request:ZMsg) {
// Prefix request with MDP protocol frames
// Frame 0: empty (REQ socket emulation)
// Frame 1: "MDPCxy" (six bytes, MDP/Client)
// Frame 2: Service name (printable string)
request.push(ZFrame.newStringFrame(service));
request.push(ZFrame.newStringFrame(MDP.MDPC_CLIENT));
request.push(ZFrame.newStringFrame(""));
if (verbose) {
log("I: send request to '" + service + "' service:");
log(request.toString());
}
request.send(client);
}

/**
* Returns the reply message or NULL if there was no reply. Does not
* attempt to recover from a broker failure, this is not possible
* without storing all answered requests and re-sending them all...
* @return
*/
public function recv():ZMsg {
var poller = new ZMQPoller();

poller.registerSocket(client, ZMQ.ZMQ_POLLIN());
// Poll socket for a reply, with timeout
try {
var res = poller.poll(timeout * 1000);
} catch (e:ZMQException) {
if (!ZMQ.isInterrupted()) {
trace("ZMQException #:" + e.errNo + ", str:" + e.str());
trace (Stack.toString(Stack.exceptionStack()));
} else
log("W: interrupt received, killing client...");
ctx.destroy();
return null;
}
// If we got a reply, process it
if (poller.pollin(1)) {
// We got a reply from the server, must match sequence
var msg = ZMsg.recvMsg(client);
if (msg == null)
return null; // Interrupted
if (verbose)
log("I: received reply:" + msg.toString());
if (msg.size() < 3)
return null; // Don't try to handle errors
var empty = msg.pop(); // Empty frame
if (empty.size() != 0) {
return null; // Assert
}
empty.destroy();
var header = msg.pop();
if (!header.streq(MDP.MDPC_CLIENT)) {
return null; // Assert
}
header.destroy();
var reply_service = msg.pop();
reply_service.destroy();
return msg; // Success
} else {
if (verbose)
log("E: permanent error, abandoning");
}
return null;
}
}
38 changes: 38 additions & 0 deletions examples/Haxe/MDClient2.hx
@@ -0,0 +1,38 @@
package ;
import neko.Lib;
import neko.Sys;
import org.zeromq.ZMsg;

/**
* Majordomo Protocol client example (asynchronous)
* Uses the MDPCli API to hide all MDP aspects
*
* @author Richard J Smith
*/

class MDClient2
{

public static function main() {
var argArr = Sys.args();
var verbose = (argArr.length > 1 && argArr[argArr.length - 1] == "-v");

var session = new MDCliAPI2("tcp://localhost:5555", verbose);
for (i in 0 ... 100000) {
var request = new ZMsg();
request.pushString("Hello world: "+i);
session.send("echo", request);
}
var count = 0;
for (i in 0 ... 100000) {
var reply = session.recv();
if (reply != null)
reply.destroy();
else
break; // Interrupt or failure
count++;
}
Lib.println(count + " requests/replies processed");
session.destroy();
}
}
2 changes: 1 addition & 1 deletion examples/Haxe/MDWrkAPI.hx
Expand Up @@ -111,7 +111,7 @@ class MDWrkAPI
worker.setsockopt(ZMQ_LINGER, 0);
worker.connect(broker);
if (verbose)
log("I: connecting to broker at " + broker + "...");
log("I: worker connecting to broker at " + broker + "...");

sendToBroker(MDP.MDPW_READY, service);

Expand Down
34 changes: 34 additions & 0 deletions examples/Haxe/MMIEcho.hx
@@ -0,0 +1,34 @@
package ;

import neko.Lib;
import neko.Sys;
import org.zeromq.ZMsg;

/**
* MMI echo query example
*/
class MMIEcho
{

public static function main() {
var argArr = Sys.args();
var verbose = (argArr.length > 1 && argArr[argArr.length - 1] == "-v");

var session = new MDCliAPI("tcp://localhost:5555", verbose);

// This is the service we want to look up
var request = new ZMsg();
request.addString("echo");

// This is the service we send our request to
var reply = session.send("mmi.service", request);

if (reply != null) {
var replyCode = reply.first().toString();
Lib.println("Lookup echo service: " + replyCode);
} else
Lib.println("E: no response from broker, make sure it's running");

session.destroy();
}
}
12 changes: 12 additions & 0 deletions examples/Haxe/Run.hx
Expand Up @@ -86,6 +86,10 @@ class Run
Lib.println("45. MDClient");
Lib.println("46. MDWorker");
Lib.println("47. MDBroker");
Lib.println("48. MDClient2");
Lib.println("49. MMIEcho");
Lib.println("50. TIClient");
Lib.println("51. Titanic");

do {
Lib.print("Type number followed by Enter key, or q to quit: ");
Expand Down Expand Up @@ -189,6 +193,14 @@ class Run
MDWorker.main();
case 47:
MDBroker.main();
case 48:
MDClient2.main();
case 49:
MMIEcho.main();
case 50:
TIClient.main();
case 51:
Titanic.main();
default:
Lib.println ("Unknown program number ... exiting");
}
Expand Down

0 comments on commit 8eb633c

Please sign in to comment.