In [None]:
import logging

import zmq
from zmq.auth.thread import ThreadAuthenticator


class TasksQueue(object):
    """Outgoing task queue from the executor to the Interchange.

    Wraps a single ZMQ socket in one of two roles:

    * ``'server'``: a ROUTER socket bound to ``tcp://*:<port>`` with CURVE
      security enabled and a ``ThreadAuthenticator`` running on the context.
    * ``'client'``: a DEALER socket connected to ``tcp://<address>:<port>``.

    NOTE(review): the client role does not configure any CURVE keys, so it
    presumably cannot talk to a server created by this class -- confirm
    whether client-side key setup is still to be added.
    """

    # Secret certificate used by the server role when none is supplied.
    DEFAULT_SECRET_KEY_FILE = '/tmp/server.key_secret'

    _SUPPORTED_MODES = ('client', 'server')
    _log = logging.getLogger(__name__)

    def __init__(self, address, port=55001, mode='client', secret_key_file=None):
        """
        Parameters
        ----------

        address: str
           Address to connect to (only used in 'client' mode)
        port: int
           Port to bind ('server' mode) or connect to ('client' mode)
        mode: string
           Either 'client' or 'server'
        secret_key_file: str
           Path of the CURVE secret certificate loaded in 'server' mode.
           Defaults to ``DEFAULT_SECRET_KEY_FILE``.

        Raises
        ------

        ValueError if ``mode`` is not one of the supported modes.
        """
        # Validate before allocating any ZMQ resources so that a bad argument
        # cannot leave an unterminated context behind.
        if mode not in self._SUPPORTED_MODES:
            raise ValueError("Only two modes are supported: client, server")

        self.mode = mode
        self.port = port
        self.auth = None
        self.zmq_socket = None
        self._secret_key_file = secret_key_file or self.DEFAULT_SECRET_KEY_FILE
        self.context = zmq.Context()

        try:
            if self.mode == 'server':
                print("Configuring server")
                self.zmq_socket = self.context.socket(zmq.ROUTER)
                self.zmq_socket.set_hwm(0)
                print("Setting up auth-server")
                # CURVE options must be applied to the socket before bind().
                self.setup_server_auth()
                self.zmq_socket.bind("tcp://*:{}".format(port))
            else:
                self.zmq_socket = self.context.socket(zmq.DEALER)
                self.zmq_socket.connect("tcp://{}:{}".format(address, port))

            self.poller = zmq.Poller()
            self.poller.register(self.zmq_socket, zmq.POLLOUT)
        except Exception:
            # Setup failed half-way (e.g. missing certificate, port in use):
            # release the authenticator thread, socket and context.
            self.close()
            raise
        print("Initialized")

    def setup_server_auth(self):
        """Start the authenticator and turn the socket into a CURVE server.

        Raises
        ------

        ValueError if the certificate file does not contain a secret key.
        """
        self.auth = ThreadAuthenticator(context=self.context)
        # Start the authenticator thread first: older pyzmq releases forward
        # configuration calls to that thread and fail if it is not running.
        self.auth.start()
        self.auth.allow("127.0.0.1")
        self.auth.configure_curve(domain="*", location=zmq.auth.CURVE_ALLOW_ANY)

        # Configure the listening socket
        public_key, secret_key = zmq.auth.load_certificate(self._secret_key_file)
        if secret_key is None:
            raise ValueError(
                "No secret key found in certificate: {}".format(self._secret_key_file)
            )
        self.zmq_socket.setsockopt(zmq.CURVE_PUBLICKEY, public_key)
        self.zmq_socket.setsockopt(zmq.CURVE_SECRETKEY, secret_key)
        self.zmq_socket.setsockopt(zmq.CURVE_SERVER, True)
        print("Done")

    def put(self, message, max_timeout=1000):
        """ This function needs to be fast at the same time aware of the possibility of
        ZMQ pipes overflowing.

        The timeout increases slowly if contention is detected on ZMQ pipes.
        We could set copy=False and get slightly better latency but this results
        in ZMQ sockets reaching a broken state once there are ~10k tasks in flight.
        This issue can be magnified if the serialized buffer itself is larger.

        NOTE(review): on a ROUTER socket the first frame of an outgoing message
        is taken as the destination identity -- confirm that the single-frame
        send below is what is intended for the server role.

        Parameters
        ----------

        message : py object
             Python object to send
        max_timeout : int
             Max timeout in milliseconds that we will wait for before raising an exception

        Raises
        ------

        zmq.error.Again if the send failed.

        """
        assert self.mode == 'server', "put is valid only on the server"
        timeout_ms = 0
        current_wait = 0
        self._log.info("Putting task into queue")
        while current_wait < max_timeout:
            socks = dict(self.poller.poll(timeout=timeout_ms))
            if socks.get(self.zmq_socket) == zmq.POLLOUT:
                # The copy option adds latency but reduces the risk of ZMQ overflow
                self.zmq_socket.send_pyobj(message, copy=True)
                return
            # Pipe is full: back off by one more millisecond on every retry.
            timeout_ms += 1
            self._log.debug("Not sending due to full zmq pipe, timeout: %s ms", timeout_ms)
            current_wait += timeout_ms

        # Send has failed.
        self._log.debug("Remote side has been unresponsive for %s", current_wait)
        raise zmq.error.Again()

    def close(self):
        """Stop the authenticator (if any), close the socket and terminate the context."""
        # The authenticator owns sockets on this context; it has to be stopped
        # first or context.term() would wait on them.
        if self.auth is not None:
            self.auth.stop()
            self.auth = None
        if self.zmq_socket is not None:
            self.zmq_socket.close()
            self.zmq_socket = None
        self.context.term()


In [None]:
# Build a queue in the server role on the class's default port.
tq = TasksQueue(address='127.0.0.1', mode='server')

In [1]:
import os
import zmq
from zmq.auth.thread import ThreadAuthenticator

In [3]:
# Private ZMQ context shared by the authenticator, server and client sockets
# created in the cells below.
ctx = zmq.Context() # alternative: zmq.Context.instance() for the process-wide shared context

In [None]:
# Launch the authenticator thread on this context, then configure it.
auth = ThreadAuthenticator(context=ctx)
auth.start()
# Whitelist the local machine.
auth.allow('127.0.0.1')
# CURVE policy: accept any client key, in every domain.
auth.configure_curve(location=zmq.auth.CURVE_ALLOW_ANY, domain='*')

secret_keys_dir = '/tmp'

# The ROUTER socket acts as the CURVE server, so it carries its own key pair.
server = ctx.socket(zmq.ROUTER)
server_secret_file = os.path.join(secret_keys_dir, "server.key_secret")
server_public, server_secret = zmq.auth.load_certificate(server_secret_file)
server.setsockopt(zmq.CURVE_SECRETKEY, server_secret)
server.setsockopt(zmq.CURVE_PUBLICKEY, server_public)
# Security options have to be in place before the socket is bound.
server.setsockopt(zmq.CURVE_SERVER, True)
server.bind('tcp://*:9000')

In [4]:
import uuid

# DEALER socket with a fixed identity.
client = ctx.socket(zmq.DEALER)
# NOTE(review): uid is generated here but the identity below is hard-coded
# to b'01' -- confirm which one is meant to be used.
uid = str(uuid.uuid4())
client.identity = b'01'

# A CURVE connection involves two certificates: the client's own key pair,
# and the server's public key, which the client must know in advance.
client_secret_file = os.path.join('/tmp', "client.key_secret")
client_public, client_secret = zmq.auth.load_certificate(client_secret_file)
client.setsockopt(zmq.CURVE_SECRETKEY, client_secret)
client.setsockopt(zmq.CURVE_PUBLICKEY, client_public)

server_public_file = os.path.join('/tmp', "server.key")
server_public, _ = zmq.auth.load_certificate(server_public_file)
client.setsockopt(zmq.CURVE_SERVERKEY, server_public)


In [5]:
# Connect the DEALER client to the local endpoint on port 9000, the port the
# ROUTER server cell binds.
client.connect('tcp://127.0.0.1:9000')

In [6]:
# Send a one-frame test message from the client.
client.send_multipart([b'fooo'])

In [None]:
# Receive one multipart message on the ROUTER socket: the first frame is
# unpacked as the sender identity, the remaining frames as the payload.
source, *message = server.recv_multipart()
print(f"Got {message} from {source}")