Refactor k/v iteration in scheduler loop
sideshowdave7 committed May 2, 2017
1 parent b7a137e commit eda45f0
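
For orientation, a minimal sketch (not eventmq code) of the key/value iteration pattern this commit standardizes on: future.utils.iteritems yields (key, value) pairs lazily on Python 2 and simply delegates to dict.items() on Python 3, so the scheduler loops below run unchanged under either interpreter. The dictionary contents here are placeholders, not the scheduler's real job entries.

    from future.utils import iteritems

    # Placeholder mapping; the scheduler keys its entries by a message hash.
    jobs = {'hash-a': ['queue-default'], 'hash-b': ['queue-email']}

    for job_hash, job in iteritems(jobs):
        print(job_hash, job[0])
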
Showing 1 changed file with 67 additions and 39 deletions.
106 changes: 67 additions & 39 deletions eventmq/scheduler.py
@@ -17,6 +17,8 @@
=============================
Handles cron and other scheduled tasks
"""
from future.utils import iteritems

from hashlib import sha1 as emq_hash
import json
from json import dumps as serialize
@@ -47,6 +49,8 @@ class Scheduler(HeartbeatMixin, EMQPService):
"""
Keeper of time, master of schedules
"""
# TODO: Remove dependency on redis, make the backing store a generic
# interface
SERVICE_TYPE = constants.CLIENT_TYPE.scheduler

def __init__(self, override_settings=None, skip_signal=False, *args,
@@ -126,6 +130,9 @@ def load_jobs(self):
def _start_event_loop(self):
"""
Starts the actual event loop. Usually called by :meth:`Scheduler.start`
This loop is responsible for sending REQUESTs for scheduled jobs whose
next scheduled time has arrived or passed.
"""
while True:
if self.received_disconnect:
@@ -139,79 +146,100 @@ def _start_event_loop(self):
msg = self.frontend.recv_multipart()
self.process_message(msg)

# TODO: distribute me!
for hash_, cron in self.cron_jobs.items():
# If the time is now, or passed
if cron[0] <= ts_now:
msg = cron[1]
queue = cron[3]
for hash_, cron in iteritems(self.cron_jobs):
# 0 = the next timestamp at which this job should be executed
next_monotonic = cron[0]
# 1 = the serialized job message to send
job_message = cron[1]
# 2 = the croniter iterator for this job
interval_iterator = cron[2]
# 3 = the queue to execute the job in
queue = cron[3]

# If the time is now, or passed
if next_monotonic <= ts_now:
# Run the msg
logger.debug("Time is: %s; Schedule is: %s - Running %s"
% (ts_now, cron[0], msg))
% (ts_now, next_monotonic, job_message))

self.send_request(msg, queue=queue)
self.send_request(job_message, queue=queue)

# Update the next time to run
cron[0] = next(cron[2])
cron[0] = next_monotonic = next(interval_iterator)
logger.debug("Next execution will be in %ss" %
seconds_until(cron[0]))
seconds_until(next_monotonic))

cancel_jobs = []
for k, v in self.interval_jobs.iteritems():
# TODO: Refactor this entire loop to be readable by humankind
# The schedule time has elapsed
if v[0] <= m_now:
msg = v[1]
queue = v[3]

logger.debug("Time is: %s; Schedule is: %s - Running %s"
% (ts_now, v[0], msg))
# Iterate all interval style jobs and update their state,
# send REQUESTs if necessary
for job_hash, job in iteritems(self.interval_jobs):
# the next (monotonic) timestamp at which this job should be executed
next_monotonic = job[0]
# the serialized job message to send
job_message = job[1]
# the interval iter for this job
interval_iterator = job[2]
# the queue to execute the job in
queue = job[3]
# run_count: number of times remaining to execute this job
run_count = job[4]

if next_monotonic <= m_now:
# The schedule time has elapsed

# v[4] is the current remaining run_count
if v[4] != INFINITE_RUN_COUNT:
# If run_count was 0, we cancel the job
if v[4] <= 0:
cancel_jobs.append(k)
logger.debug("Time is: %s; Schedule is: %s - Running %s"
% (ts_now, next_monotonic, job_message))

# Only do run_count processing if it's set to anything
# besides the default of INFINITE
if run_count != INFINITE_RUN_COUNT:
# If run_count was <= 0, we cancel the job
if run_count <= 0:
cancel_jobs.append(job_hash)
else:
# Decrement run_count
v[4] -= 1
run_count -= 1
# Keep the in-memory entry in sync with the decrement
job[4] = run_count
# Persist the change to redis
try:
message = deserialize(self.redis_server.get(k))
message = deserialize(
self.redis_server.get(job_hash))
new_headers = []
for header in message[1].split(','):
if 'run_count:' in header:
new_headers.append(
'run_count:{}'.format(v[4]))
'run_count:{}'.format(run_count))
else:
new_headers.append(header)
message[1] = ",".join(new_headers)
self.redis_server.set(k, serialize(message))
self.redis_server.set(
job_hash, serialize(message))
except Exception as e:
logger.warning(
'Unable to update key in redis '
'server: {}'.format(e))
# Perform the request since run_count still > 0
self.send_request(msg, queue=queue)
v[0] = next(v[2])
self.send_request(job_message, queue=queue)
# Write the new run time back onto the stored entry
job[0] = next(interval_iterator)
else:
# Scheduled job is set to run infinitely
# Send job and update next schedule time
self.send_request(msg, queue=queue)
v[0] = next(v[2])
self.send_request(job_message, queue=queue)
# Write the new run time back onto the stored entry
job[0] = next(interval_iterator)

# Cancel and remove jobs where run_count has reached 0,
# and persist that to redis
for job_hash in cancel_jobs:
try:
logger.debug('Cancelling job due to run_count: {}'
.format(k))
self.redis_server.delete(k)
self.redis_server.lrem('interval_jobs', 0, k)
.format(job_hash))
self.redis_server.delete(job_hash)
self.redis_server.lrem('interval_jobs', 0, job_hash)
except Exception as e:
logger.warning(
'Unable to update key in redis '
'server: {}'.format(e))
del self.interval_jobs[k]
del self.interval_jobs[job_hash]

if not self.maybe_send_heartbeat(events):
break
@@ -234,19 +262,19 @@ def redis_server(self):
else:
return self._redis_server

def send_request(self, jobmsg, queue=None):
def send_request(self, job_message, queue=None):
"""
Send a request message to the broker
Args:
jobmsg: The message to send to the broker
job_message: The message to send to the broker
queue: The name of the queue to use
Returns:
str: ID of the message
"""
jobmsg = json.loads(jobmsg)
msgid = send_request(self.frontend, jobmsg, queue=queue,
job_message = json.loads(job_message)
msgid = send_request(self.frontend, job_message, queue=queue,
reply_requested=True)

return msgid
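
The event-loop hunk above documents each job entry as a five-slot list: index 0 is the next run time, 1 the serialized job message, 2 the cron/interval iterator, 3 the target queue, and 4 the remaining run count. A hedged sketch of the check-and-advance cycle those loops perform, with a plain generator standing in for the croniter/interval iterator, time.time() standing in for the scheduler's monotonic clock, and a stub in place of send_request (all assumptions made for illustration only):

    import json
    import time

    INFINITE_RUN_COUNT = -1  # assumed sentinel, named after the constant in the diff

    def every(seconds, start):
        # Stand-in for the interval iterator: yields successive run times.
        when = start
        while True:
            when += seconds
            yield when

    def send_request(job_message, queue=None):
        print('would send %s to queue %r' % (job_message, queue))

    now = time.time()
    # [next_run_ts, job_message, interval_iterator, queue, run_count]
    job = [now, json.dumps(['run', {'callable': 'cleanup'}]), every(30, now), 'default', 5]

    if job[0] <= time.time():              # the schedule time has elapsed
        send_request(job[1], queue=job[3])
        job[0] = next(job[2])              # write the next run time back onto the entry
        if job[4] != INFINITE_RUN_COUNT:
            job[4] -= 1                    # one fewer run remaining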
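
The redis block in the interval loop rewrites a single run_count:<n> entry inside the message's comma-separated header string and stores the whole message back under the job hash. A sketch of that header rewrite in isolation; the stored message layout and the extra header name are assumptions, and a plain dict stands in for the redis client:

    import json

    def persist_run_count(store, job_hash, run_count):
        # message[1] holds the comma-separated headers, e.g. 'example_header,run_count:5'
        message = json.loads(store[job_hash])
        new_headers = []
        for header in message[1].split(','):
            if 'run_count:' in header:
                new_headers.append('run_count:{}'.format(run_count))
            else:
                new_headers.append(header)
        message[1] = ','.join(new_headers)
        store[job_hash] = json.dumps(message)

    fake_redis = {'abc123': json.dumps(['default', 'example_header,run_count:5', '{}'])}
    persist_run_count(fake_redis, 'abc123', 4)
    print(fake_redis['abc123'])  # the headers slot now reads 'example_header,run_count:4'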
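
Finally, Scheduler.send_request above calls json.loads on its argument before handing it to the module-level send_request helper, so callers are expected to pass the job as a JSON string rather than an already-parsed structure. A brief usage sketch; the message shape and queue name are illustrative only:

    import json

    job_message = json.dumps(['run', {'path': 'myapp.tasks', 'callable': 'cleanup'}])

    # With a running Scheduler instance this would return the broker message id:
    # msgid = scheduler.send_request(job_message, queue='default')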
