Skip to content

Commit

Permalink
Add by worker counts
Browse files Browse the repository at this point in the history
  • Loading branch information
sideshowdave7 committed Jun 20, 2017
1 parent 482857c commit 80abb14
Showing 1 changed file with 11 additions and 1 deletion.
12 changes: 11 additions & 1 deletion eventmq/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ def __init__(self, *args, **kwargs):
# Includes REQUESTS in flight but not REQUESTS queued
self.processed_message_counts = {}

# Same as above but Key: Worker.uuid
self.processed_message_counts_by_worker = {}

#: Tracks the last time the scheduler queue was cleaned out of dead
#: schedulers
self._meta['last_scheduler_cleanup'] = 0
Expand Down Expand Up @@ -503,6 +506,11 @@ def on_request(self, sender, msgid, msg, depth=1):
else:
self.processed_message_counts[queue_name] += 1

if queue_name not in self.processed_message_counts_by_worker:
self.processed_message_counts_by_worker[worker_addr] = 1
else:
self.processed_message_counts_by_worker[worker_addr] += 1

fwdmsg(self.outgoing, worker_addr, ['', constants.PROTOCOL_VERSION,
'REQUEST', msgid, ] + msg)

Expand Down Expand Up @@ -885,8 +893,10 @@ def get_status(self):
(str) Serialized information about the current state of the router.
"""
return json.dumps({
'job_latencies': None, # Deprecated, to be removed from reporting
'job_latencies_count': len(self.job_latencies),
'processed_messages': self.processed_message_counts,
'processed_messages_by_worker':
self.processed_message_counts_by_worker,
'executed_functions': self.executed_functions,
'waiting_message_counts': [
'{}: {}'.
Expand Down

0 comments on commit 80abb14

Please sign in to comment.