Skip to content

Commit

Permalink
Keep track of running idempotent jobs
Browse files Browse the repository at this point in the history
In order to be able to restart them in case the broker fails.
  • Loading branch information
NicolasLM committed Feb 10, 2018
1 parent cb4a92b commit 9b0717b
Show file tree
Hide file tree
Showing 14 changed files with 183 additions and 35 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ build
doc/_build

.coverage
.pytest_cache
3 changes: 2 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@
'move_future_jobs.lua',
'enqueue_job.lua',
'enqueue_future_job.lua',
'flush.lua'
'flush.lua',
'get_job_from_queue.lua'
],
},
)
18 changes: 16 additions & 2 deletions spinach/brokers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from logging import getLogger
import threading
from typing import Optional
import uuid

from ..job import Job, JobStatus
from ..const import WAIT_FOR_EVENT_MAX_SECONDS
Expand All @@ -16,6 +17,7 @@ class Broker(ABC):
def __init__(self):
self._something_happened = threading.Event()
self._namespace = None
self._id = uuid.uuid4()

def wait_for_event(self):
next_future_job_delta = self.next_future_job_delta
Expand Down Expand Up @@ -62,16 +64,25 @@ def job_ran(self, job: Job, err: Optional[Exception]):
"""Notification that a job has been ran (successfully or not)."""
if not err:
job.status = JobStatus.SUCCEEDED
return self._something_happened.set()
self._remove_job_from_running(job)
self._something_happened.set()
return

if job.should_retry:
job.retries += 1
job.at = (
datetime.now(timezone.utc) + exponential_backoff(job.retries)
)
return self.enqueue_job(job)
self.enqueue_job(job)
return

job.status = JobStatus.FAILED
self._remove_job_from_running(job)
self._something_happened.set()

@abstractmethod
def _remove_job_from_running(self, job: Job):
"""Remove a job from the list of running ones."""

@abstractmethod
def get_job_from_queue(self, queue: str):
Expand Down Expand Up @@ -99,3 +110,6 @@ def next_future_job_delta(self) -> Optional[float]:
@abstractmethod
def flush(self):
"""Delete everything in the namespace."""

def __repr__(self):
return '<{}: {}>'.format(self.__class__.__name__, self._id)
9 changes: 9 additions & 0 deletions spinach/brokers/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ def __init__(self):
self._lock = threading.RLock()
self._queues = dict()
self._future_jobs = list()
self._running_jobs = list()

def _get_queue(self, queue_name: str):
queue_name = self._to_namespaced(queue_name)
Expand Down Expand Up @@ -76,3 +77,11 @@ def flush(self):
with self._lock:
self._queues = dict()
self._future_jobs = list()

def _remove_job_from_running(self, job: Job):
"""Remove a job from the list of running ones.
Easy, the memory broker doesn't track running jobs. If the broker dies
there is nothing we can do.
"""
pass
60 changes: 45 additions & 15 deletions spinach/brokers/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,18 @@

from ..brokers.base import Broker
from ..job import Job, JobStatus
from ..const import FUTURE_JOBS_KEY, NOTIFICATIONS_KEY
from ..const import FUTURE_JOBS_KEY, NOTIFICATIONS_KEY, RUNNING_JOBS_KEY

logger = getLogger('spinach.broker')
here = path.abspath(path.dirname(__file__))


# Todo: make move_future_jobs into a more generic task not always executed at
# arbiter loop (like once per minute) that:
# - moves future jobs
# - registers the broker
# - moves running jobs from stale brokers

class RedisBroker(Broker):

def __init__(self, redis: Optional[StrictRedis]=None):
Expand All @@ -27,6 +33,7 @@ def __init__(self, redis: Optional[StrictRedis]=None):
self._enqueue_job = self._load_script('enqueue_job.lua')
self._enqueue_future_job = self._load_script('enqueue_future_job.lua')
self._flush = self._load_script('flush.lua')
self._get_job_from_queue = self._load_script('get_job_from_queue.lua')

self._subscriber_thread = None
self._must_stop = threading.Event()
Expand All @@ -36,32 +43,43 @@ def _load_script(self, filename: str) -> Script:
script_data = f.read()
return self._r.register_script(script_data)

def _run_script(self, script: Script, *args):
args = [str(self._id)] + list(args)
return script(args=args)

def enqueue_job(self, job: Job):
"""Add a job to a queue"""
"""Add a job to a queue."""
if job.should_start:
job.status = JobStatus.QUEUED
self._enqueue_job(args=[
self._run_script(
self._enqueue_job,
self._to_namespaced(job.queue),
self._to_namespaced(NOTIFICATIONS_KEY),
job.serialize()
])
job.serialize(),
job.id,
self._to_namespaced(RUNNING_JOBS_KEY.format(self._id)),
)
else:
job.status = JobStatus.WAITING
self._enqueue_future_job(args=[
self._run_script(
self._enqueue_future_job,
self._to_namespaced(FUTURE_JOBS_KEY),
self._to_namespaced(NOTIFICATIONS_KEY),
job.at_timestamp,
job.serialize()
])
job.serialize(),
job.id,
self._to_namespaced(RUNNING_JOBS_KEY.format(self._id)),
)

def move_future_jobs(self) -> int:
num_jobs_moved = self._move_future_jobs(args=[
num_jobs_moved = self._run_script(
self._move_future_jobs,
self.namespace,
self._to_namespaced(FUTURE_JOBS_KEY),
self._to_namespaced(NOTIFICATIONS_KEY),
math.ceil(datetime.now(timezone.utc).timestamp()),
JobStatus.QUEUED.value
])
)
logger.debug("Redis moved %s job(s) from future to current queues",
num_jobs_moved)
return num_jobs_moved
Expand All @@ -76,13 +94,25 @@ def _get_next_future_job(self)-> Optional[Job]:
return Job.deserialize(job[0].decode())

def get_job_from_queue(self, queue: str) -> Optional[Job]:
job_json_string = self._r.lpop(self._to_namespaced(queue))
job_json_string = self._run_script(
self._get_job_from_queue,
self._to_namespaced(queue),
self._to_namespaced(RUNNING_JOBS_KEY.format(self._id)),
JobStatus.RUNNING.value
)
if not job_json_string:
return None

job = Job.deserialize(job_json_string.decode())
job.status = JobStatus.RUNNING
return job
return Job.deserialize(job_json_string.decode())

def _remove_job_from_running(self, job: Job):
if job.max_retries == 0:
return

self._r.hdel(
self._to_namespaced(RUNNING_JOBS_KEY.format(self._id)),
str(job.id)
)

def _subscriber_func(self):
logger.debug('Redis broker subscriber started')
Expand Down Expand Up @@ -114,4 +144,4 @@ def stop(self):
self._must_stop.set()

def flush(self):
self._flush(args=[self.namespace])
self._run_script(self._flush, self.namespace)
12 changes: 8 additions & 4 deletions spinach/brokers/redis_scripts/enqueue_future_job.lua
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
local future_jobs = ARGV[1]
local notifications = ARGV[2]
local at_timestamp = ARGV[3]
local job_json = ARGV[4]
local broker_id = ARGV[1]
local future_jobs = ARGV[2]
local notifications = ARGV[3]
local at_timestamp = ARGV[4]
local job_json = ARGV[5]
local job_id = ARGV[6]
local running_jobs_key = ARGV[7]

redis.call('zadd', future_jobs, at_timestamp, job_json)
redis.call('hdel', running_jobs_key, job_id)
redis.call('publish', notifications, '')

10 changes: 7 additions & 3 deletions spinach/brokers/redis_scripts/enqueue_job.lua
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
local queue = ARGV[1]
local notifications = ARGV[2]
local job_json = ARGV[3]
local broker_id = ARGV[1]
local queue = ARGV[2]
local notifications = ARGV[3]
local job_json = ARGV[4]
local job_id = ARGV[5]
local running_jobs_key = ARGV[6]

redis.call('rpush', queue, job_json)
redis.call('hdel', running_jobs_key, job_id)
redis.call('publish', notifications, '')

3 changes: 2 additions & 1 deletion spinach/brokers/redis_scripts/flush.lua
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
local namespace = ARGV[1]
local broker_id = ARGV[1]
local namespace = ARGV[2]
local pattern = string.format("%s/*", namespace)

for _, key in ipairs(redis.call('keys', pattern)) do
Expand Down
22 changes: 22 additions & 0 deletions spinach/brokers/redis_scripts/get_job_from_queue.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
local broker_id = ARGV[1]
local queue = ARGV[2]
local running_jobs_key = ARGV[3]
local job_status_running = tonumber(ARGV[4])

local job_json = redis.call('lpop', queue)
if not job_json then
return nil
end

local job = cjson.decode(job_json)
job["status"] = job_status_running
local job_json = cjson.encode(job)

if job["max_retries"] == 0 then
-- job is not idempotent, there is no use to track
-- if it's running
return job_json
end

redis.call('hset', running_jobs_key, job["id"], job_json)
return job_json
14 changes: 8 additions & 6 deletions spinach/brokers/redis_scripts/move_future_jobs.lua
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
local namespace = ARGV[1]
local future_jobs = ARGV[2]
local notifications = ARGV[3]
local now = ARGV[4]
local job_status_queued = ARGV[5]
local broker_id = ARGV[1]
local namespace = ARGV[2]
local future_jobs = ARGV[3]
local notifications = ARGV[4]
local now = ARGV[5]
local job_status_queued = tonumber(ARGV[6])

local jobs_json = redis.call('zrangebyscore', future_jobs, '-inf', now, 'LIMIT', 0, 1)
local jobs_moved = 0
Expand All @@ -15,7 +16,8 @@ for i, job_json in ipairs(jobs_json) do
local job = cjson.decode(job_json)
local queue = string.format("%s/%s", namespace, job["queue"])
job["status"] = job_status_queued
redis.call('rpush', queue, job_json)
local job_json_updated = cjson.encode(job)
redis.call('rpush', queue, job_json_updated)
redis.call('zrem', future_jobs, job_json)
jobs_moved = jobs_moved + 1
end
Expand Down
1 change: 1 addition & 0 deletions spinach/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
DEFAULT_MAX_RETRIES = 0

FUTURE_JOBS_KEY = '_future-jobs'
RUNNING_JOBS_KEY = '_running-jobs-on-broker-{}'
NOTIFICATIONS_KEY = '_notifications'

WAIT_FOR_EVENT_MAX_SECONDS = 60
9 changes: 7 additions & 2 deletions spinach/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,18 +70,21 @@ def serialize(self):
'queue': self.queue,
'max_retries': self.max_retries,
'retries': self.retries,
'at': self.at.timestamp(),
'at': int(self.at.timestamp()), # seconds component
'at_us': self.at.microsecond, # microseconds component
'task_args': self.task_args,
'task_kwargs': self.task_kwargs
}, sort_keys=True)

@classmethod
def deserialize(cls, job_json_string: str):
job_dict = json.loads(job_json_string)
at = datetime.fromtimestamp(job_dict['at'], tz=timezone.utc)
at = at.replace(microsecond=job_dict['at_us'])
job = Job(
job_dict['task_name'],
job_dict['queue'],
datetime.fromtimestamp(job_dict['at'], tz=timezone.utc),
at,
job_dict['max_retries'],
task_args=tuple(job_dict['task_args']),
task_kwargs=job_dict['task_kwargs'],
Expand All @@ -101,6 +104,8 @@ def __eq__(self, other):
try:
if not getattr(self, attr) == getattr(other, attr):
return False

except AttributeError:
return False

return True
5 changes: 5 additions & 0 deletions tests/test_brokers.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,3 +107,8 @@ def test_job_ran(broker):
broker.job_ran(job, RuntimeError('Error'))
assert job.status is JobStatus.WAITING
assert job.at > now


def test_repr(broker):
assert broker.__class__.__name__ in repr(broker)
assert str(broker._id) in repr(broker)

0 comments on commit 9b0717b

Please sign in to comment.