-
Notifications
You must be signed in to change notification settings - Fork 1.6k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* first synchronous version * tripping benchmark * mdcliapi2/mdclient2 async client * mmiecho/titanic/ticlient
- Loading branch information
Showing
10 changed files
with
904 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
"""Majordomo Protocol definitions""" | ||
# This is the version of MDP/Client we implement | ||
C_CLIENT = "MDPC01" | ||
|
||
# This is the version of MDP/Worker we implement | ||
W_WORKER = "MDPW01" | ||
|
||
# MDP/Server commands, as strings | ||
W_READY = "\001" | ||
W_REQUEST = "\002" | ||
W_REPLY = "\003" | ||
W_HEARTBEAT = "\004" | ||
W_DISCONNECT = "\005" | ||
|
||
commands = [None, "READY", "REQUEST", "REPLY", "HEARTBEAT", "DISCONNECT"] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,309 @@ | ||
""" | ||
Majordomo Protocol broker | ||
A minimal implementation of http:#rfc.zeromq.org/spec:7 and spec:8 | ||
Author: Min RK <benjaminrk@gmail.com> | ||
Based on Java example by Arkadiusz Orzechowski | ||
""" | ||
|
||
import logging | ||
import sys | ||
import time | ||
from binascii import hexlify | ||
|
||
import zmq | ||
|
||
# local | ||
import MDP | ||
from zhelpers import dump | ||
|
||
class Service(object): | ||
"""a single Service""" | ||
name = None # Service name | ||
requests = None # List of client requests | ||
waiting = None # List of waiting workers | ||
|
||
def __init__(self, name): | ||
self.name = name | ||
self.requests = [] | ||
self.waiting = [] | ||
|
||
class Worker(object): | ||
"""a Worker, idle or active""" | ||
identity = None # hex Identity of worker | ||
address = None # Address to route to | ||
service = None # Owning service, if known | ||
expiry = None # expires at this point, unless heartbeat | ||
|
||
def __init__(self, identity, address, lifetime): | ||
self.identity = identity | ||
self.address = address | ||
self.expiry = time.time() + 1e-3*lifetime | ||
|
||
class MajorDomoBroker(object): | ||
""" | ||
Majordomo Protocol broker | ||
A minimal implementation of http:#rfc.zeromq.org/spec:7 and spec:8 | ||
""" | ||
|
||
# We'd normally pull these from config data | ||
INTERNAL_SERVICE_PREFIX = "mmi." | ||
HEARTBEAT_LIVENESS = 3 # 3-5 is reasonable | ||
HEARTBEAT_INTERVAL = 2500 # msecs | ||
HEARTBEAT_EXPIRY = HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS | ||
|
||
# --------------------------------------------------------------------- | ||
|
||
ctx = None # Our context | ||
socket = None # Socket for clients & workers | ||
poller = None # our Poller | ||
|
||
heartbeat_at = None# When to send HEARTBEAT | ||
services = None # known services | ||
workers = None # known workers | ||
waiting = None # idle workers | ||
|
||
verbose = False # Print activity to stdout | ||
|
||
# --------------------------------------------------------------------- | ||
|
||
|
||
def __init__(self, verbose=False): | ||
"""Initialize broker state.""" | ||
self.verbose = verbose | ||
self.services = {} | ||
self.workers = {} | ||
self.waiting = [] | ||
self.heartbeat_at = time.time() + 1e-3*self.HEARTBEAT_INTERVAL | ||
self.ctx = zmq.Context() | ||
self.socket = self.ctx.socket(zmq.ROUTER) | ||
self.socket.linger = 0 | ||
self.poller = zmq.Poller() | ||
self.poller.register(self.socket, zmq.POLLIN) | ||
logging.basicConfig(format="%(asctime)s %(message)s", datefmt="%Y-%m-%d %H:%M:%S", | ||
level=logging.INFO) | ||
|
||
|
||
|
||
# --------------------------------------------------------------------- | ||
|
||
def mediate(self): | ||
"""Main broker work happens here""" | ||
while True: | ||
try: | ||
items = self.poller.poll(self.HEARTBEAT_INTERVAL) | ||
except KeyboardInterrupt: | ||
break # Interrupted | ||
if items: | ||
msg = self.socket.recv_multipart() | ||
if self.verbose: | ||
logging.info("I: received message:") | ||
dump(msg) | ||
|
||
sender = msg.pop(0) | ||
empty = msg.pop(0) | ||
assert empty == '' | ||
header = msg.pop(0) | ||
|
||
if (MDP.C_CLIENT == header): | ||
self.process_client(sender, msg) | ||
elif (MDP.W_WORKER == header): | ||
self.process_worker(sender, msg) | ||
else: | ||
logging.error("E: invalid message:") | ||
dump(msg) | ||
|
||
self.purge_workers() | ||
self.send_heartbeats() | ||
|
||
def destroy(self): | ||
"""Disconnect all workers, destroy context.""" | ||
while self.workers: | ||
self.delete_worker(self.workers[0], True) | ||
self.ctx.destroy(0) | ||
|
||
|
||
def process_client(self, sender, msg): | ||
"""Process a request coming from a client.""" | ||
assert len(msg) >= 2 # Service name + body | ||
service = msg.pop(0) | ||
# Set reply return address to client sender | ||
msg = [sender,''] + msg | ||
if service.startswith(self.INTERNAL_SERVICE_PREFIX): | ||
self.service_internal(service, msg) | ||
else: | ||
self.dispatch(self.require_service(service), msg) | ||
|
||
|
||
def process_worker(self, sender, msg): | ||
"""Process message sent to us by a worker.""" | ||
assert len(msg) >= 1 # At least, command | ||
|
||
command = msg.pop(0) | ||
|
||
worker_ready = hexlify(sender) in self.workers | ||
|
||
worker = self.require_worker(sender) | ||
|
||
if (MDP.W_READY == command): | ||
# Not first command in session or Reserved service name | ||
if (worker_ready or sender.startswith(self.INTERNAL_SERVICE_PREFIX)): | ||
self.delete_worker(worker, True) | ||
else: | ||
# Attach worker to service and mark as idle | ||
service = msg.pop(0) | ||
worker.service = self.require_service(service) | ||
self.worker_waiting(worker) | ||
|
||
elif (MDP.W_REPLY == command): | ||
if (worker_ready): | ||
# Remove & save client return envelope and insert the | ||
# protocol header and service name, then rewrap envelope. | ||
client = msg.pop(0) | ||
empty = msg.pop(0) # ? | ||
msg = [client, '', MDP.C_CLIENT, worker.service.name] + msg | ||
self.socket.send_multipart(msg) | ||
self.worker_waiting(worker) | ||
else: | ||
self.delete_worker(worker, True) | ||
|
||
elif (MDP.W_HEARTBEAT == command): | ||
if (worker_ready): | ||
worker.expiry = time.time() + 1e-3*self.HEARTBEAT_EXPIRY | ||
else: | ||
self.delete_worker(worker, True) | ||
|
||
elif (MDP.W_DISCONNECT == command): | ||
self.delete_worker(worker, False) | ||
else: | ||
logging.error("E: invalid message:") | ||
dump(msg) | ||
|
||
def delete_worker(self, worker, disconnect): | ||
"""Deletes worker from all data structures, and deletes worker.""" | ||
assert worker is not None | ||
if disconnect: | ||
self.send_to_worker(worker, MDP.W_DISCONNECT, None, None) | ||
|
||
if worker.service is not None: | ||
worker.service.waiting.remove(worker) | ||
self.workers.pop(worker.identity) | ||
|
||
def require_worker(self, address): | ||
"""Finds the worker (creates if necessary).""" | ||
assert (address is not None) | ||
identity = hexlify(address) | ||
worker = self.workers.get(identity) | ||
if (worker is None): | ||
worker = Worker(identity, address, self.HEARTBEAT_EXPIRY) | ||
self.workers[identity] = worker | ||
if self.verbose: | ||
logging.info("I: registering new worker: %s", identity) | ||
|
||
return worker | ||
|
||
def require_service(self, name): | ||
"""Locates the service (creates if necessary).""" | ||
assert (name is not None) | ||
service = self.services.get(name) | ||
if (service is None): | ||
service = Service(name) | ||
self.services[name] = service | ||
|
||
return service | ||
|
||
def bind(self, endpoint): | ||
"""Bind broker to endpoint, can call this multiple times. | ||
We use a single socket for both clients and workers. | ||
""" | ||
self.socket.bind(endpoint) | ||
logging.info("I: MDP broker/0.1.1 is active at %s", endpoint) | ||
|
||
def service_internal(self, service, msg): | ||
"""Handle internal service according to 8/MMI specification""" | ||
returncode = "501" | ||
if "mmi.service" == service: | ||
name = msg[-1] | ||
returncode = "200" if name in self.services else "400" | ||
msg[-1] = returncode | ||
|
||
# insert the protocol header and service name after the routing envelope ([client, '']) | ||
msg = msg[:2] + [MDP.C_CLIENT, service] + msg[2:] | ||
self.socket.send_multipart(msg) | ||
|
||
def send_heartbeats(self): | ||
"""Send heartbeats to idle workers if it's time""" | ||
if (time.time() > self.heartbeat_at): | ||
for worker in self.waiting: | ||
self.send_to_worker(worker, MDP.W_HEARTBEAT, None, None) | ||
|
||
self.heartbeat_at = time.time() + 1e-3*self.HEARTBEAT_INTERVAL | ||
|
||
def purge_workers(self): | ||
"""Look for & kill expired workers. | ||
Workers are oldest to most recent, so we stop at the first alive worker. | ||
""" | ||
while self.waiting: | ||
w = self.waiting[0] | ||
if w.expiry < time.time(): | ||
logging.info("I: deleting expired worker: %s", w.identity) | ||
self.delete_worker(w,False) | ||
self.waiting.pop(0) | ||
else: | ||
break | ||
|
||
def worker_waiting(self, worker): | ||
"""This worker is now waiting for work.""" | ||
# Queue to broker and service waiting lists | ||
self.waiting.append(worker) | ||
worker.service.waiting.append(worker) | ||
worker.expiry = time.time() + 1e-3*self.HEARTBEAT_EXPIRY | ||
self.dispatch(worker.service, None) | ||
|
||
def dispatch(self, service, msg): | ||
"""Dispatch requests to waiting workers as possible""" | ||
assert (service is not None) | ||
if msg is not None:# Queue message if any | ||
service.requests.append(msg) | ||
self.purge_workers() | ||
while service.waiting and service.requests: | ||
msg = service.requests.pop(0) | ||
worker = service.waiting.pop(0) | ||
self.waiting.remove(worker) | ||
self.send_to_worker(worker, MDP.W_REQUEST, None, msg) | ||
|
||
def send_to_worker(self, worker, command, option, msg=None): | ||
"""Send message to worker. | ||
If message is provided, sends that message. | ||
""" | ||
|
||
if msg is None: | ||
msg = [] | ||
elif not isinstance(msg, list): | ||
msg = [msg] | ||
|
||
# Stack routing and protocol envelopes to start of message | ||
# and routing envelope | ||
if option is not None: | ||
msg = [option] + msg | ||
msg = [worker.address, '', MDP.W_WORKER, command] + msg | ||
|
||
if self.verbose: | ||
logging.info("I: sending %r to worker", command) | ||
dump(msg) | ||
|
||
self.socket.send_multipart(msg) | ||
|
||
|
||
def main(): | ||
"""create and start new broker""" | ||
verbose = '-v' in sys.argv | ||
broker = MajorDomoBroker(verbose) | ||
broker.bind("tcp://*:5555") | ||
broker.mediate() | ||
|
||
if __name__ == '__main__': | ||
main() |
Oops, something went wrong.