In [1]:
import zmq
import logging
import pickle
import time
import binascii
from binascii import hexlify


In [2]:
class MDP:
    W_WORKER = b"MDPW01"
    W_READY = b'0x01'
    W_REQUEST = b"0x02"
    W_REPLY = b"0x03"
    W_HEARTBEAT = b"0x04"
    W_DISCONNECT = b"0x05"

In [3]:
def dump(msg_or_socket):
    """Receives all message parts from socket, printing each frame neatly"""
    if isinstance(msg_or_socket, zmq.Socket):
        # it's a socket, call on current message
        msg = msg_or_socket.recv_multipart()
    else:
        msg = msg_or_socket
    print("----------------------------------------")
    for part in msg:
        print("[%03d]" % len(part), end=' ')
        is_text = True
        try:
            print(part.decode('ascii'))
        except UnicodeDecodeError:
            print(r"0x%s" % (binascii.hexlify(part).decode('ascii')))

In [4]:
class Service(object):
    """a single Service"""
    name = None # Service name
    requests = None # List of client requests
    waiting = None # List of waiting workers

    def __init__(self, name):
        self.name = name
        self.requests = []
        self.waiting = []

In [5]:
class Worker(object):
    """a Worker, idle or active"""
    identity = None # hex Identity of worker
    address = None # Address to route to
    service = None # Owning service, if known
    expiry = None # expires at this point, unless heartbeat
    
    def __init__(self, identity, address, lifetime):
        self.identity = identity
        self.address = address
        self.expiry = time.time() + 1e-3*lifetime

In [6]:
class MajorDomoBroker(object):
    """
    Majordomo Protocol broker
    A minimal implementation of http:#rfc.zeromq.org/spec:7 and spec:8
    """

    # We'd normally pull these from config data
    INTERNAL_SERVICE_PREFIX = b"mmi."
    HEARTBEAT_LIVENESS = 3 # 3-5 is reasonable
    HEARTBEAT_INTERVAL = 2500 # msecs
    HEARTBEAT_EXPIRY = HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS

    # ---------------------------------------------------------------------

    ctx = None # Our context
    socket = None # Socket for clients & workers
    poller = None # our Poller

    heartbeat_at = None# When to send HEARTBEAT
    services = None # known services
    workers = None # known workers
    waiting = None # idle workers

    verbose = False # Print activity to stdout

    # ---------------------------------------------------------------------


    def __init__(self, verbose=False):
        """Initialize broker state."""
        self.verbose = verbose
        self.services = {}
        self.workers = {} # This might be a WorkerQueue, for each identity we store a WorkerQueue
        self.waiting = []
        self.heartbeat_at = time.time() + 1e-3*self.HEARTBEAT_INTERVAL
        self.ctx = zmq.Context()
        self.socket = self.ctx.socket(zmq.ROUTER)
        self.socket.linger = 0
        self.poller = zmq.Poller()
        self.poller.register(self.socket, zmq.POLLIN)
        logging.basicConfig(format="%(asctime)s %(message)s",
                            datefmt="%Y-%m-%d %H:%M:%S",
                            level=logging.INFO)



    # ---------------------------------------------------------------------

    def mediate(self):
        """Main broker work happens here"""
        while True:

            # TODO: Pull messages from the queue stash
            # For a given message and service send
            check_for_new_pools(credentials, node)
            for item in queue_stash.get_all():
                msg = serialize(item)
                worker = self.waiting_workers.pop()
                msg = [client, b'', MDP.W_REQUEST, worker.service.name] + msg
                # Grab the list of worker from worker waiting 
                # Process the message and continue
                worker.socket.send_multipart(msg)

            try:
                items = self.poller.poll(self.HEARTBEAT_INTERVAL)
            except KeyboardInterrupt:
                break # Interrupted
            if items:
                msg = self.socket.recv_multipart()
                if self.verbose:
                    print("I: received message:")
                    # logging.info("I: received message:")
                    dump(msg)

                # msg = ["<address of worker>", b"", "MDPW01", "command", "service", "actual_msg_bytes"]

                sender = msg.pop(0)
                empty = msg.pop(0)
                assert empty == b''
                header = msg.pop(0)

                if (MDP.W_WORKER == header):
                    self.process_worker(sender, msg)
                else:
                    print("E: invalid message:")
                    # logging.error("E: invalid message:")
                    dump(msg)

            self.purge_workers()
            self.send_heartbeats()

    def destroy(self):
        """Disconnect all workers, destroy context."""
        while self.workers:
            self.delete_worker(self.workers.values()[0], True)
        self.ctx.destroy(0)

    def process_worker(self, sender, msg):
        """Process message sent to us by a worker."""
        assert len(msg) >= 1 # At least, command

        # msg = ["command", "service", "actual_msg_bytes"]

        command = msg.pop(0)

        worker_ready = hexlify(sender) in self.workers

        # TODO: Require worker for given service
        worker = self.require_worker(sender)

        if (MDP.W_READY == command):
            assert len(msg) >= 1 # At least, a service name
            service = msg.pop(0)
            # Not first command in session or Reserved service name
            if (worker_ready or service.startswith(self.INTERNAL_SERVICE_PREFIX)):
                self.delete_worker(worker, True)
            else:
                # Attach worker to service and mark as idle
                worker.service = self.require_service(service)
                self.worker_waiting(worker)
        
        elif (MDP.W_HEARTBEAT == command):
            if (worker_ready):
                worker.expiry = time.time() + 1e-3*self.HEARTBEAT_EXPIRY
            else:
                self.delete_worker(worker, True)

        elif (MDP.W_DISCONNECT == command):
            self.delete_worker(worker, False)
        else:
            print("E: invalid message:")
            # logging.error("E: invalid message:")
            dump(msg)

    def delete_worker(self, worker, disconnect):
        """Deletes worker from all data structures, and deletes worker."""
        assert worker is not None
        if disconnect:
            self.send_to_worker(worker, MDP.W_DISCONNECT, None, None)

        if worker.service is not None:
            worker.service.waiting.remove(worker)
        self.workers.pop(worker.identity)

    def require_worker(self, address):
        """Finds the worker (creates if necessary)."""
        assert (address is not None)
        identity = hexlify(address)
        # Instead of getting a worker, we get a WorkerQueue
        # Otherwise if doesn't exist then add a WorkerQueue and a Worker to it
        # From the WorkerQueue get the next worker
        worker = self.workers.get(identity)
        if (worker is None):
            worker = Worker(identity, address, self.HEARTBEAT_EXPIRY)
            self.workers[identity] = worker
            if self.verbose:
                print("I: registering new worker: %s", identity)
                # logging.info("I: registering new worker: %s", identity)

        return worker

    def require_service(self, name):
        """Locates the service (creates if necessary)."""
        assert (name is not None)
        service = self.services.get(name)
        if (service is None):
            service = Service(name)
            self.services[name] = service

        return service

    def bind(self, endpoint):
        """Bind broker to endpoint, can call this multiple times.

        We use a single socket for both clients and workers.
        """
        self.socket.bind(endpoint)
        print("I: MDP broker/0.1.1 is active at %s", endpoint)
        # logging.info("I: MDP broker/0.1.1 is active at %s", endpoint)

    def service_internal(self, service, msg):
        """Handle internal service according to 8/MMI specification"""
        returncode = b"501"
        if b"mmi.service" == service:
            name = msg[-1]
            returncode = b"200" if name in self.services else b"404"
        msg[-1] = returncode

        # insert the protocol header and service name after the routing envelope ([client, ''])
        msg = msg[:2] + [MDP.C_CLIENT, service] + msg[2:]
        self.socket.send_multipart(msg)

    def send_heartbeats(self):
        """Send heartbeats to idle workers if it's time"""
        if (time.time() > self.heartbeat_at):
            for worker in self.waiting:
                self.send_to_worker(worker, MDP.W_HEARTBEAT, None, None)

            self.heartbeat_at = time.time() + 1e-3*self.HEARTBEAT_INTERVAL

    def purge_workers(self):
        """Look for & kill expired workers.

        Workers are oldest to most recent, so we stop at the first alive worker.
        """
        while self.waiting:
            w = self.waiting[0]
            if w.expiry < time.time():
                print("I: deleting expired worker: %s", w.identity)
                # logging.info("I: deleting expired worker: %s", w.identity)
                self.delete_worker(w,False)
                self.waiting.pop(0)
            else:
                break

    def worker_waiting(self, worker):
        """This worker is now waiting for work."""
        # Queue to broker and service waiting lists
        self.waiting.append(worker)
        worker.service.waiting.append(worker)
        worker.expiry = time.time() + 1e-3*self.HEARTBEAT_EXPIRY
        self.dispatch(worker.service, None)

    def dispatch(self, service, msg):
        """Dispatch requests to waiting workers as possible"""
        assert (service is not None)
        if msg is not None:# Queue message if any
            service.requests.append(msg)
        self.purge_workers()
        while service.waiting and service.requests:
            msg = service.requests.pop(0)
            worker = service.waiting.pop(0)
            self.waiting.remove(worker)
            self.send_to_worker(worker, MDP.W_REQUEST, None, msg)

    def send_to_worker(self, worker, command, option, msg=None):
        """Send message to worker.

        If message is provided, sends that message.
        """

        if msg is None:
            msg = []
        elif not isinstance(msg, list):
            msg = [msg]

        # Stack routing and protocol envelopes to start of message
        # and routing envelope
        if option is not None:
            msg = [option] + msg
        msg = [worker.address, b'', MDP.W_WORKER, command] + msg

        if self.verbose:
            print("I: sending %r to worker", command)
            # logging.info("I: sending %r to worker", command)
            dump(msg)

        self.socket.send_multipart(msg)

In [7]:
verbose = True
broker = MajorDomoBroker(verbose)

In [8]:
broker.bind("tcp://*:5555")

I: MDP broker/0.1.1 is active at %s tcp://*:5555


In [9]:
broker.mediate()

I: received message:
----------------------------------------
[005] 0x00bd1b58ba
[000] 
[006] MDPW01
[004] 0x01
[004] echo
I: registering new worker: %s b'00bd1b58ba'
I: sending %r to worker b'0x04'
----------------------------------------
[005] 0x00bd1b58ba
[000] 
[006] MDPW01
[004] 0x04
I: sending %r to worker b'0x04'
----------------------------------------
[005] 0x00bd1b58ba
[000] 
[006] MDPW01
[004] 0x04
I: deleting expired worker: %s b'00bd1b58ba'
I: received message:
----------------------------------------
[005] 0x00bd1b58ba
[000] 
[006] MDPW01
[004] 0x04
I: registering new worker: %s b'00bd1b58ba'
I: sending %r to worker b'0x05'
----------------------------------------
[005] 0x00bd1b58ba
[000] 
[006] MDPW01
[004] 0x05
I: received message:
----------------------------------------
[005] 0x00bd1b58bb
[000] 
[006] MDPW01
[004] 0x01
[004] echo
I: registering new worker: %s b'00bd1b58bb'
I: sending %r to worker b'0x04'
----------------------------------------
[005] 0x00bd1b58bb
[000

In [15]:
worker = list(broker.workers.values())

In [18]:
worker[0].__dict__

{'identity': b'00bd1b58bb',
 'address': b'\x00\xbd\x1bX\xbb',
 'expiry': 1702404882.7638535,
 'service': <__main__.Service at 0x7f02e4302ac0>}

In [20]:
broker.delete_worker(worker[0], True)

I: sending %r to worker b'0x05'
----------------------------------------
[005] 0x00bd1b58bb
[000] 
[006] MDPW01
[004] 0x05


In [21]:
broker.destroy()

In [24]:
worker[0].service.__dict__

{'name': b'echo', 'requests': [], 'waiting': []}