Skip to content

Commit

Permalink
Added Binary Star examples
Browse files Browse the repository at this point in the history
Added Binary Star and Binary Star Reactor examples. Also added Tripping
example (sync vs synch test)
  • Loading branch information
rjsmith committed Aug 26, 2011
1 parent 8eb633c commit b03ec06
Show file tree
Hide file tree
Showing 12 changed files with 902 additions and 9 deletions.
368 changes: 368 additions & 0 deletions examples/Haxe/BStar.hx
@@ -0,0 +1,368 @@
/**
* (c) 2011 Richard J Smith
*
* Based on implementation of bstar in the ZeroMQ ZGuide:
* http://github.com/imatix/zguide/blob/master/examples/C/bstar.c
*
* This file is part of ZGuide
*
* ZGuide is free software; you can redistribute it and/or modify it under
* the terms of the Lesser GNU General Public License as published by
* the Free Software Foundation; either version 3 of the License, or
* (at your option) any later version.
*
* ZGuide is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Lesser GNU General Public License for more details.
*
* You should have received a copy of the Lesser GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

package ;

import haxe.io.Bytes;
import haxe.Stack;
import neko.Sys;
import neko.Lib;
import org.zeromq.ZContext;
import org.zeromq.ZLoop;
import org.zeromq.ZMQ;
import org.zeromq.ZMQPoller;
import org.zeromq.ZMQException;
import org.zeromq.ZMQSocket;
import org.zeromq.ZMsg;
import org.zeromq.ZSocket;

// States we can be in at any time
private enum StateT {
STATE_PRIMARY; // Primary, waiting for peer to connect
STATE_BACKUP; // Backup, waiting for peer to connect
STATE_ACTIVE; // Active - accepting connections
STATE_PASSIVE; // Passive - not accepting connections
}

private enum EventT {
PEER_PRIMARY; // HA peer is pending primary
PEER_BACKUP; // HA peer is pending backup
PEER_ACTIVE; // HA peer is active
PEER_PASSIVE; // HA peer is passive
CLIENT_REQUEST; // Client makes request
}

/**
* Shortcut typedef for method signature of BStar reactor handler functions
*/
typedef HandlerFunctionType = ZLoop->ZMQSocket->Dynamic->Int;

/**
* Binary Star Reactor
*/
class BStar
{

/** We send state information every this often
* If peer doesn't respond in two heartbeats, it is 'dead'
*/
private static inline var BSTAR_HEARTBEAT = 100;

/** Our context */
private var ctx:ZContext;

/** Reactor loop */
public var loop(default, null):ZLoop;

/** State publisher socket */
private var statePub:ZMQSocket;

/** State subscriber socket */
private var stateSub:ZMQSocket;

/** Current state */
public var state(default, null):StateT;

/** Current event */
public var event(default, null):EventT;

/** When peer is considered 'dead' */
public var peerExpiry:Float;

/** Voting socket handler */
private var voterFn:HandlerFunctionType;

/** Arguments for voting handler */
private var voterArgs:Dynamic;

/** Master socket handler, called when become Master */
private var masterFn:HandlerFunctionType;

/** Arguments for Master handler */
private var masterArgs:Dynamic;

/** Slave socket handler, called when become Slave */
private var slaveFn:HandlerFunctionType;

/** Arguments for slave handler */
private var slaveArgs:Dynamic;

/** Print activity to stdout */
public var verbose:Bool;

/** Logger function used in verbose mode */
private var log:Dynamic->Void;

/**
* BStar Constructor
* @param isPrimary True if this instance is the primary instance, else false if slave
* @param local Network address to bind the statePub socket of this instance to
* @param remote Network address to connect the stateSub socket of this instance to
* @param ?verbose True to generate logging info
* @param ?logger Logger function
*/
public function new(isPrimary:Bool, local:String, remote:String, ?verbose:Bool = false, ?logger:Dynamic->Void)
{
// Initialise the binary star server
ctx = new ZContext();
loop = new ZLoop(logger);
loop.verbose = verbose;
state = { if (isPrimary) STATE_PRIMARY; else STATE_BACKUP; };

// Create publisher for state going to peer
statePub = ctx.createSocket(ZMQ_PUB);
statePub.bind(local);

// Create subscriber for state coming from peer
stateSub = ctx.createSocket(ZMQ_SUB);
stateSub.setsockopt(ZMQ_SUBSCRIBE, Bytes.ofString(""));
stateSub.connect(remote);

// Set up basic reactor events
loop.registerTimer(BSTAR_HEARTBEAT, 0, sendState);
var item = { socket: stateSub, event: ZMQ.ZMQ_POLLIN() };
loop.registerPoller(item, recvState);

this.verbose = verbose;
if (logger != null)
log = logger;
else
log = Lib.println;

}

/**
* Destructor
* Cleans up internal ZLoop reactor object and ZContext objects.
*/
public function destroy() {
if (loop != null)
loop.destroy();
if (ctx != null)
ctx.destroy();
}

/**
* Create socket, bind to local endpoint, and register as reader for
* voting. The socket will only be available if the Binary Star state
* machine allows it. Input on the socket will act as a "vote" in the
* Binary Star scheme. We require exactly one voter per bstar instance.
*
* @param endpoint Endpoint address
* @param type Socket Type to bind to endpoint
* @param handler Voter Handler method
* @param args Optional args to pass to Voter Handfler method when called.
* @return
*/
public function setVoter(endpoint:String, type:SocketType, handler:HandlerFunctionType, ?args:Dynamic):Bool {
// Hold actual handler + arg so we can call this later
var socket = ctx.createSocket(type);
socket.bind(endpoint);
voterFn = handler;
voterArgs = args;
return loop.registerPoller( { socket:socket, event:ZMQ.ZMQ_POLLIN() }, voterReady);
}

/**
* Sets handler method called when instance becomes Master
* @param handler
* @param ?args
*/
public function setMaster(handler:HandlerFunctionType, ?args:Dynamic) {
if (masterFn == null) {
masterFn = handler;
masterArgs = args;
}
}

/**
* Sets handler method called when instance becomes Slave
* @param handler
* @param ?args
*/
public function setSlave(handler:HandlerFunctionType, ?args:Dynamic) {
if (slaveFn == null) {
slaveFn = handler;
slaveArgs = args;
}
}


/**
* Executes finite state machine (apply event to this state)
* Returns true if there was an exception
* @return
*/
public function stateMachine():Bool
{
var exception = false;
switch (state) {
case STATE_PRIMARY:
// Primary server is waiting for peer to connect
// Accepts CLIENT_REQUEST events in this state
switch (event) {
case PEER_BACKUP:
if (verbose)
log("I: connected to backup (slave), ready as master");
state = STATE_ACTIVE;
if (masterFn != null)
masterFn(loop, null, masterArgs);
case PEER_ACTIVE:
if (verbose)
log("I: connected to backup (master), ready as slave");
state = STATE_PASSIVE;
if (slaveFn != null)
slaveFn(loop, null, slaveArgs);
case CLIENT_REQUEST:
if (verbose)
log("I: request from client, ready as master");
state = STATE_ACTIVE;
if (masterFn != null)
masterFn(loop, null, masterArgs);
default:
}
case STATE_BACKUP:
// Backup server is waiting for peer to connect
// Rejects CLIENT_REQUEST events in this state
switch (event) {
case PEER_ACTIVE:
if (verbose)
log("I: connected to primary (master), ready as slave");
state = STATE_PASSIVE;
if (slaveFn != null)
slaveFn(loop, null, slaveArgs);
case CLIENT_REQUEST:
exception = true;
default:
}
case STATE_ACTIVE:
// Server is active
// Accepts CLIENT_REQUEST events in this state
switch (event) {
case PEER_ACTIVE:
// Two masters would mean split-brain
log("E: fatal error - dual masters, aborting");
exception = true;
default:
}
case STATE_PASSIVE:
// Server is passive
// CLIENT_REQUEST events can trigger failover if peer looks dead
switch (event) {
case PEER_PRIMARY:
// Peer is restarting - I become active, peer will go passive
if (verbose)
log("I: primary (slave) is restarting, ready as master");
state = STATE_ACTIVE;
case PEER_BACKUP:
// Peer is restarting - become active, peer will go passive
if (verbose)
log("I: backup (slave) is restarting, ready as master");
state = STATE_ACTIVE;
case PEER_PASSIVE:
// Two passives would mean cluster would be non-responsive
log("E: fatal error - dual slaves, aborting");
exception = true;
case CLIENT_REQUEST:
// Peer becomes master if timeout as passed
// It's the client request that triggers the failover
if (Date.now().getTime() >= peerExpiry) {
// If peer is dead, switch to the active state
if (verbose)
log("I: failover successful, ready as master");
state = STATE_ACTIVE;
} else {
if (verbose)
log("I: peer is active, so ignore connection");
exception = true;
}
default:
}
}
return exception;
}

/**
* Reactor event handler
* Publish our state to peer
* @param loop
* @param socket
* @param arg
* @return
*/
public function sendState(loop:ZLoop, socket:ZMQSocket):Int {
statePub.sendMsg(Bytes.ofString(Std.string(Type.enumIndex(state))));
return 0;
}

/**
* Reactor event handler
* Receive state from peer, execute finite state machine.
* @param loop
* @param socket
* @return
*/
public function recvState(loop:ZLoop, socket:ZMQSocket):Int {
var message = stateSub.recvMsg().toString();
event = Type.createEnumIndex(EventT, Std.parseInt(message));
peerExpiry = Date.now().getTime() + (2 * BSTAR_HEARTBEAT);
return {
if (stateMachine())
-1; // Error, so exit
else
0;
};
}

/**
* Application wants to speak to us, see if it's possible
* @param loop
* @param socket
* @return
*/
public function voterReady(loop:ZLoop, socket:ZMQSocket):Int {
// If server can accept input now, call application handler
event = CLIENT_REQUEST;
if (stateMachine()) {
// Destroy waiting message, no-one to read it
var msg = socket.recvMsg();
} else {
if (verbose)
log("I: CLIENT REQUEST");
voterFn(loop, socket, voterArgs);
}
return 0;
}

/**
* Start the reactor, ends if a callback function returns -1, or the
* process receives SIGINT or SIGTERM
* @return 0 if interrupted or invalid, -1 if cancelled by handler
*/
public function start():Int {
if (voterFn != null && loop != null)
return loop.start();
else
return 0;
}

}

0 comments on commit b03ec06

Please sign in to comment.