Skip to content

Commit

Permalink
Rename incoming and outgoing ports to frontend and backend in router
Browse files Browse the repository at this point in the history
- This refactors the `self.incoming` and `self.outgoing` sockets to
  `self.frontend` (client facing) and `self.backend` (manager facing). This
  change should make the purpose of these ports more clear for new people
  working in the code.

Resolves: #16
  • Loading branch information
com4 committed Mar 21, 2017
1 parent 93fa651 commit 89ed238
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 46 deletions.
55 changes: 29 additions & 26 deletions eventmq/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,15 @@ def __init__(self, *args, **kwargs):

self.poller = poller.Poller()

self.incoming = receiver.Receiver()
self.outgoing = receiver.Receiver()
#: Port clients connect on.
self.frontend = receiver.Receiver()
#: Port job managers connect on
self.backend = receiver.Receiver()
#: Port for administrative commands
self.administrative_socket = receiver.Receiver()

self.poller.register(self.incoming, poller.POLLIN)
self.poller.register(self.outgoing, poller.POLLIN)
self.poller.register(self.frontend, poller.POLLIN)
self.poller.register(self.backend, poller.POLLIN)
self.poller.register(self.administrative_socket, poller.POLLIN)

self.status = STATUS.ready
Expand Down Expand Up @@ -147,8 +150,8 @@ def start(self,
"""
self.status = STATUS.starting

self.incoming.listen(frontend_addr)
self.outgoing.listen(backend_addr)
self.frontend.listen(frontend_addr)
self.backend.listen(backend_addr)
self.administrative_socket.listen(administrative_addr)

self.status = STATUS.listening
Expand All @@ -169,12 +172,12 @@ def _start_event_loop(self):
now = monotonic()
events = self.poller.poll()

if events.get(self.incoming) == poller.POLLIN:
msg = self.incoming.recv_multipart()
if events.get(self.frontend) == poller.POLLIN:
msg = self.frontend.recv_multipart()
self.process_client_message(msg)

if events.get(self.outgoing) == poller.POLLIN:
msg = self.outgoing.recv_multipart()
if events.get(self.backend) == poller.POLLIN:
msg = self.backend.recv_multipart()
self.process_worker_message(msg)

if events.get(self.administrative_socket) == poller.POLLIN:
Expand Down Expand Up @@ -275,7 +278,7 @@ def send_workers_heartbeats(self):
self._meta['last_sent_heartbeat'] = monotonic()

for worker_id in self.workers:
self.send_heartbeat(self.outgoing, worker_id)
self.send_heartbeat(self.backend, worker_id)

def send_schedulers_heartbeats(self):
"""
Expand All @@ -284,7 +287,7 @@ def send_schedulers_heartbeats(self):
self._meta['last_sent_scheduler_heartbeat'] = monotonic()

for scheduler_id in self.schedulers:
self.send_heartbeat(self.incoming, scheduler_id)
self.send_heartbeat(self.frontend, scheduler_id)

def on_heartbeat(self, sender, msgid, msg):
"""
Expand Down Expand Up @@ -319,10 +322,10 @@ def on_inform(self, sender, msgid, msg):

if client_type == CLIENT_TYPE.worker:
self.add_worker(sender, queues)
self.send_ack(self.outgoing, sender, msgid)
self.send_ack(self.backend, sender, msgid)
elif client_type == CLIENT_TYPE.scheduler:
self.add_scheduler(sender)
self.send_ack(self.incoming, sender, msgid)
self.send_ack(self.frontend, sender, msgid)

def on_reply(self, sender, msgid, msg):
"""
Expand Down Expand Up @@ -353,10 +356,10 @@ def on_disconnect(self, msgid, msg):
# Remove schedulers and send them a kbye
logger.info("Router preparing to disconnect...")
for scheduler in self.schedulers:
self.send_kbye(self.incoming, scheduler)
self.send_kbye(self.frontend, scheduler)

self.schedulers.clear()
self.incoming.unbind(conf.FRONTEND_ADDR)
self.frontend.unbind(conf.FRONTEND_ADDR)

if len(self.waiting_messages) > 0:
logger.info("Router processing messages in queue.")
Expand All @@ -366,10 +369,10 @@ def on_disconnect(self, msgid, msg):
self.process_worker_message(msg)

for worker in self.workers.keys():
self.send_kbye(self.outgoing, worker)
self.send_kbye(self.backend, worker)

self.workers.clear()
self.outgoing.unbind(conf.BACKEND_ADDR)
self.backend.unbind(conf.BACKEND_ADDR)

# Loops event loops should check for this and break out
self.received_disconnect = True
Expand Down Expand Up @@ -400,7 +403,7 @@ def on_ready(self, sender, msgid, msg):
msg = self.waiting_messages[queue_name].peekleft()

try:
fwdmsg(self.outgoing, sender, msg)
fwdmsg(self.backend, sender, msg)
self.waiting_messages[queue_name].popleft()
except exceptions.PeerGoneAwayError:
# Cleanup a workerg that cannot be contacted, leaving the
Expand Down Expand Up @@ -494,12 +497,12 @@ def on_request(self, sender, msgid, msg, depth=1):

# Rebuild the message to be sent to the worker. fwdmsg will
# properly address the message.
fwdmsg(self.outgoing, worker_addr, ['', constants.PROTOCOL_VERSION,
'REQUEST', msgid, ] + msg)
fwdmsg(self.backend, worker_addr, ['', constants.PROTOCOL_VERSION,
'REQUEST', msgid, ] + msg)

self.workers[worker_addr]['available_slots'] -= 1
# Acknowledgment of the request being submitted to the client
sendmsg(self.incoming, sender, 'REPLY',
sendmsg(self.frontend, sender, 'REPLY',
(msgid,))
except exceptions.PeerGoneAwayError:
logger.debug(
Expand Down Expand Up @@ -760,7 +763,7 @@ def process_client_message(self, original_msg, depth=0):
try:
# Strips off the client id before forwarding because the
# scheduler isn't expecting it.
fwdmsg(self.incoming, scheduler_addr, original_msg[1:])
fwdmsg(self.frontend, scheduler_addr, original_msg[1:])

except exceptions.PeerGoneAwayError:
logger.debug("Scheduler {} has unexpectedly gone away. Trying "
Expand All @@ -777,7 +780,7 @@ def process_client_message(self, original_msg, depth=0):
try:
# Strips off the client id before forwarding because the
# scheduler isn't expecting it.
fwdmsg(self.incoming, scheduler_addr, original_msg[1:])
fwdmsg(self.frontend, scheduler_addr, original_msg[1:])

except exceptions.PeerGoneAwayError:
logger.debug("Scheduler {} has unexpectedly gone away."
Expand Down Expand Up @@ -902,8 +905,8 @@ def sighup_handler(self, signum, frame):
process receives a SIGHUP from the system.
"""
logger.info('Caught signame %s' % signum)
self.incoming.unbind(conf.FRONTEND_ADDR)
self.outgoing.unbind(conf.BACKEND_ADDR)
self.frontend.unbind(conf.FRONTEND_ADDR)
self.backend.unbind(conf.BACKEND_ADDR)
import_settings()
self.start(frontend_addr=conf.FRONTEND_ADDR,
backend_addr=conf.BACKEND_ADDR,
Expand Down
2 changes: 1 addition & 1 deletion eventmq/tests/test_receiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def setUp(self):
self.zcontext = zmq.Context.instance()

self.router = router.Router()
self.receiver = self.router.incoming
self.receiver = self.router.frontend
self.sender = sender.Sender()

def test_send_multipart_unicode(self):
Expand Down
36 changes: 18 additions & 18 deletions eventmq/tests/test_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,16 @@ class TestCase(unittest.TestCase):
def setUp(self):
self.router = router.Router(skip_signal=True)
self.router.zcontext = mock.Mock(spec=zmq.Context)
self.router.incoming = mock.Mock(spec=receiver.Receiver)
self.router.outgoing = mock.Mock(spec=receiver.Receiver)
self.router.frontend = mock.Mock(spec=receiver.Receiver)
self.router.backend = mock.Mock(spec=receiver.Receiver)

@mock.patch('eventmq.receiver.zmq.Socket.bind')
@mock.patch('eventmq.router.Router._start_event_loop')
def test_start(self, event_loop_mock, zsocket_bind_mock):
# Test default args
self.router.start()
self.router.incoming.listen.assert_called_with(conf.FRONTEND_ADDR)
self.router.outgoing.listen.assert_called_with(conf.BACKEND_ADDR)
self.router.frontend.listen.assert_called_with(conf.FRONTEND_ADDR)
self.router.backend.listen.assert_called_with(conf.BACKEND_ADDR)
self.assertEqual(self.router.status, constants.STATUS.listening)

# Test invalid args
Expand All @@ -55,7 +55,7 @@ def test_on_inform_worker(self, add_worker_mock, send_ack_mock):
sender_id, inform_msgid, [queues, 'worker'])

self.router.send_ack.assert_called_with(
self.router.outgoing, sender_id, inform_msgid)
self.router.backend, sender_id, inform_msgid)

self.router.add_worker.assert_called_with(
sender_id, [(32, 'top'), (23, 'drop'), (12, 'shop')])
Expand All @@ -75,7 +75,7 @@ def test_on_inform_worker_default_queue(self, add_worker_mock,
sender_id, inform_msgid, [queues, constants.CLIENT_TYPE.worker])

self.router.send_ack.assert_called_with(
self.router.outgoing, sender_id, inform_msgid)
self.router.backend, sender_id, inform_msgid)
self.router.add_worker.assert_called_with(
sender_id, [(10, 'default'), ])

Expand Down Expand Up @@ -146,10 +146,10 @@ def test_send_ack(self, generate_msgid_mock):

generate_msgid_mock.return_value = ack_msgid

self.router.send_ack(self.router.outgoing, sender_id, orig_msgid)
self.router.send_ack(self.router.backend, sender_id, orig_msgid)

# Verify that an ACK was sent for the INFORM
self.router.outgoing.send_multipart.assert_called_with(
self.router.backend.send_multipart.assert_called_with(
('ACK', ack_msgid, orig_msgid),
constants.PROTOCOL_VERSION, _recipient_id=sender_id)

Expand All @@ -168,9 +168,9 @@ def test_send_heartbeat(self, generate_msgid_mock):

generate_msgid_mock.return_value = msgid

self.router.send_heartbeat(self.router.incoming, recipient_id)
self.router.send_heartbeat(self.router.frontend, recipient_id)

self.router.incoming.send_multipart.assert_called_with(
self.router.frontend.send_multipart.assert_called_with(
('HEARTBEAT', msgid, str(ts)), constants.PROTOCOL_VERSION,
_recipient_id=recipient_id)

Expand All @@ -197,8 +197,8 @@ def test_send_worker_heartbeats(self, send_heartbeat_mock):
self.assertGreater(self.router._meta['last_sent_heartbeat'], 0)

send_heartbeat_mock.assert_has_calls(
[mock.call(self.router.outgoing, 'w1'),
mock.call(self.router.outgoing, 'w2')], any_order=True)
[mock.call(self.router.backend, 'w1'),
mock.call(self.router.backend, 'w2')], any_order=True)

@mock.patch('eventmq.router.Router.send_heartbeat')
def test_send_schedulers_heartbeats(self, send_hb_mock):
Expand All @@ -215,7 +215,7 @@ def test_send_schedulers_heartbeats(self, send_hb_mock):

self.assertGreater(
self.router._meta['last_sent_scheduler_heartbeat'], 0)
send_hb_mock.assert_called_with(self.router.incoming, scheduler_id)
send_hb_mock.assert_called_with(self.router.frontend, scheduler_id)

def test_on_disconnect(self):
self.assertFalse(self.router.received_disconnect)
Expand Down Expand Up @@ -245,7 +245,7 @@ def test_on_ready(self, requeue_worker_mock, fwdmsg_mock):

self.router.on_ready(worker_id, msgid, msg)

fwdmsg_mock.assert_called_with(self.router.outgoing, worker_id,
fwdmsg_mock.assert_called_with(self.router.backend, worker_id,
waiting_msg)

self.router.on_ready(worker_id, msgid + 'a', msg)
Expand Down Expand Up @@ -296,19 +296,19 @@ def test_on_ready_multpile_queues(self, requeue_worker_mock,
# Forward waiting_msg1
ready_msgid1 = 'ready23'
self.router.on_ready(worker1_id, ready_msgid1, ['READY', ready_msgid1])
fwdmsg_mock.assert_called_with(self.router.outgoing, worker1_id,
fwdmsg_mock.assert_called_with(self.router.backend, worker1_id,
waiting_msg1)

# Forward waiting_msg3 -- blu is a higher priority for worker2
ready_msgid3 = 'ready19'
self.router.on_ready(worker2_id, ready_msgid3, ['READY', ready_msgid3])
fwdmsg_mock.assert_called_with(self.router.outgoing, worker2_id,
fwdmsg_mock.assert_called_with(self.router.backend, worker2_id,
waiting_msg3)

# Forward waiting_msg2
ready_msgid2 = 'ready5'
self.router.on_ready(worker2_id, ready_msgid2, ['READY', ready_msgid2])
fwdmsg_mock.assert_called_with(self.router.outgoing, worker2_id,
fwdmsg_mock.assert_called_with(self.router.backend, worker2_id,
waiting_msg2)

# There should be no keys because the code checks for their existence
Expand Down Expand Up @@ -348,7 +348,7 @@ def test_on_request(self, fwdmsg_mock, get_worker_mock,

# Router accepts job for 1 available slot
self.router.on_request(client_id, msgid, msg)
fwdmsg_mock.assert_called_with(self.router.outgoing, worker_id,
fwdmsg_mock.assert_called_with(self.router.backend, worker_id,
['', constants.PROTOCOL_VERSION,
'REQUEST', msgid, ] + msg)
self.assertEqual(self.router.workers[worker_id]['available_slots'], 0)
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,6 @@
'bin/emq-jobmanager',
'bin/emq-router',
'bin/emq-scheduler',
'bin/emq-pubsub'
'bin/emq-publisher'
],
)

0 comments on commit 89ed238

Please sign in to comment.