Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix update tasks #3958

Merged
merged 9 commits into from Apr 10, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
15 changes: 14 additions & 1 deletion celery/beat.py
Expand Up @@ -2,6 +2,7 @@
"""The periodic task scheduler."""
from __future__ import absolute_import, unicode_literals

import copy
import errno
import heapq
import os
Expand Down Expand Up @@ -197,6 +198,7 @@ def __init__(self, app, schedule=None, max_interval=None,
self.max_interval)
self.Producer = Producer or app.amqp.Producer
self._heap = None
self.old_schedulers = None
self.sync_every_tasks = (
app.conf.beat_sync_every if sync_every_tasks is None
else sync_every_tasks)
Expand Down Expand Up @@ -257,7 +259,9 @@ def tick(self, event_t=event_t, min=min, heappop=heapq.heappop,
adjust = self.adjust
max_interval = self.max_interval

if self._heap is None:
if (self._heap is None or
not self.schedules_equal(self.old_schedulers, self.schedule)):
self.old_schedulers = copy.copy(self.schedule)
self.populate_heap()

H = self._heap
Expand All @@ -281,6 +285,15 @@ def tick(self, event_t=event_t, min=min, heappop=heapq.heappop,
return min(verify[0], max_interval)
return min(adjust(next_time_to_run) or max_interval, max_interval)

def schedules_equal(self, old_schedules, new_schedules):
if set(old_schedules.keys()) != set(new_schedules.keys()):
return False
for name, old_entry in old_schedules.items():
new_entry = new_schedules.get(name)
if not new_entry or old_entry.schedule != new_entry.schedule:
return False
return True

def should_sync(self):
return (
(not self._last_sync or
Expand Down
63 changes: 62 additions & 1 deletion t/unit/app/test_beat.py
Expand Up @@ -8,7 +8,7 @@
from celery import uuid
from celery.beat import event_t
from celery.five import keys, string_t
from celery.schedules import schedule
from celery.schedules import schedule, crontab
from celery.utils.objects import Bunch


Expand Down Expand Up @@ -315,6 +315,23 @@ def test_ticks(self):
scheduler.update_from_dict(s)
assert scheduler.tick() == min(nums) - 0.010

def test_ticks_schedule_change(self):
# initialise schedule and check heap is not initialized
scheduler = mScheduler(app=self.app)
assert scheduler._heap is None

# set initial schedule and check heap is updated
schedule_5 = schedule(5)
scheduler.add(name='test_schedule', schedule=schedule_5)
scheduler.tick()
assert scheduler._heap[0].entry.schedule == schedule_5

# update schedule and check heap is updated
schedule_10 = schedule(10)
scheduler.add(name='test_schedule', schedule=schedule(10))
scheduler.tick()
assert scheduler._heap[0].entry.schedule == schedule_10

def test_schedule_no_remain(self):
scheduler = mScheduler(app=self.app)
scheduler.add(name='test_schedule_no_remain',
Expand Down Expand Up @@ -349,6 +366,50 @@ def test_populate_heap(self, _when):
scheduler.populate_heap()
assert scheduler._heap == [event_t(1, 5, scheduler.schedule['foo'])]

def create_schedule_entry(self, schedule):
entry = dict(
name='celery.unittest.add',
schedule=schedule,
app=self.app,
)
return beat.ScheduleEntry(**dict(entry))

def test_schedule_equal_schedule_vs_schedule_success(self):
scheduler = beat.Scheduler(app=self.app)
a = {'a': self.create_schedule_entry(schedule(5))}
b = {'a': self.create_schedule_entry(schedule(5))}
assert scheduler.schedules_equal(a, b)

def test_schedule_equal_schedule_vs_schedule_fail(self):
scheduler = beat.Scheduler(app=self.app)
a = {'a': self.create_schedule_entry(schedule(5))}
b = {'a': self.create_schedule_entry(schedule(10))}
assert not scheduler.schedules_equal(a, b)

def test_schedule_equal_crontab_vs_crontab_success(self):
scheduler = beat.Scheduler(app=self.app)
a = {'a': self.create_schedule_entry(crontab(minute=5))}
b = {'a': self.create_schedule_entry(crontab(minute=5))}
assert scheduler.schedules_equal(a, b)

def test_schedule_equal_crontab_vs_crontab_fail(self):
scheduler = beat.Scheduler(app=self.app)
a = {'a': self.create_schedule_entry(crontab(minute=5))}
b = {'a': self.create_schedule_entry(crontab(minute=10))}
assert not scheduler.schedules_equal(a, b)

def test_schedule_equal_crontab_vs_schedule_fail(self):
scheduler = beat.Scheduler(app=self.app)
a = {'a': self.create_schedule_entry(crontab(minute=5))}
b = {'a': self.create_schedule_entry(schedule(5))}
assert not scheduler.schedules_equal(a, b)

def test_schedule_equal_different_key_fail(self):
scheduler = beat.Scheduler(app=self.app)
a = {'a': self.create_schedule_entry(schedule(5))}
b = {'b': self.create_schedule_entry(schedule(5))}
assert not scheduler.schedules_equal(a, b)


def create_persistent_scheduler(shelv=None):
if shelv is None:
Expand Down