Skip to content

Commit

Permalink
Allow tasks to be scheduled on a specific cluster (#555)
Browse files Browse the repository at this point in the history
* Allows schedule by cluster

* Update docs

* Add example in doc

* Fixes cluster field must be blank

* Adds cluster arg to schedule func

* Adds cluster field in list_display/filter for admin

* Uses Q filter to fetch schedule

* Fixes ref in doc

* Adds tests
  • Loading branch information
midse committed May 14, 2021
1 parent e807174 commit 957f807
Show file tree
Hide file tree
Showing 8 changed files with 80 additions and 1 deletion.
3 changes: 2 additions & 1 deletion django_q/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ class ScheduleAdmin(admin.ModelAdmin):
"func",
"schedule_type",
"repeats",
"cluster",
"next_run",
"last_run",
"success",
Expand All @@ -77,7 +78,7 @@ class ScheduleAdmin(admin.ModelAdmin):
if not croniter:
readonly_fields = ("cron",)

list_filter = ("next_run", "schedule_type")
list_filter = ("next_run", "schedule_type", "cluster")
search_fields = ("func",)
list_display_links = ("id", "name")

Expand Down
1 change: 1 addition & 0 deletions django_q/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -578,6 +578,7 @@ def scheduler(broker: Broker = None):
Schedule.objects.select_for_update()
.exclude(repeats=0)
.filter(next_run__lt=timezone.now())
.filter(db.models.Q(cluster__isnull=True) | db.models.Q(cluster=Conf.PREFIX))
):
args = ()
kwargs = {}
Expand Down
18 changes: 18 additions & 0 deletions django_q/migrations/0014_schedule_cluster.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Generated by Django 3.2.2 on 2021-05-11 05:59

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
('django_q', '0013_task_attempt_count'),
]

operations = [
migrations.AddField(
model_name='schedule',
name='cluster',
field=models.CharField(blank=True, default=None, max_length=100, null=True),
),
]
1 change: 1 addition & 0 deletions django_q/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ class Schedule(models.Model):
help_text=_("Cron expression"),
)
task = models.CharField(max_length=100, null=True, editable=False)
cluster = models.CharField(max_length=100, default=None, null=True, blank=True)

def success(self):
if self.task and Task.objects.filter(id=self.task):
Expand Down
3 changes: 3 additions & 0 deletions django_q/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ def schedule(func, *args, **kwargs):
:param repeats: how many times to repeat. 0=never, -1=always.
:param next_run: Next scheduled run.
:type next_run: datetime.datetime
:param cluster: optional cluster name.
:param cron: optional cron expression
:param kwargs: function keyword arguments.
:return: the schedule object.
Expand All @@ -100,6 +101,7 @@ def schedule(func, *args, **kwargs):
repeats = kwargs.pop("repeats", -1)
next_run = kwargs.pop("next_run", timezone.now())
cron = kwargs.pop("cron", None)
cluster = kwargs.pop("cluster", None)

# check for name duplicates instead of am unique constraint
if name and Schedule.objects.filter(name=name).exists():
Expand All @@ -117,6 +119,7 @@ def schedule(func, *args, **kwargs):
repeats=repeats,
next_run=next_run,
cron=cron,
cluster=cluster,
)
# make sure we trigger validation
s.full_clean()
Expand Down
42 changes: 42 additions & 0 deletions django_q/tests/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,3 +151,45 @@ def test_scheduler(broker, monkeypatch):
assert schedule.next_run > now
# Done
broker.delete_queue()

monkeypatch.setattr(Conf, 'PREFIX', 'some_cluster_name')
# create a schedule on another cluster
schedule = create_schedule('math.copysign',
1, -1,
name='test schedule on a another cluster',
hook='django_q.tests.tasks.result',
schedule_type=Schedule.HOURLY,
cluster="some_other_cluster_name",
repeats=1)
# run scheduler
scheduler(broker=broker)
# set up the workflow
task_queue = Queue()
stop_event = Event()
stop_event.set()
# push it
pusher(task_queue, stop_event, broker=broker)

# queue must be empty
assert task_queue.qsize() == 0

monkeypatch.setattr(Conf, 'PREFIX', 'default')
# create a schedule on the same cluster
schedule = create_schedule('math.copysign',
1, -1,
name='test schedule with no cluster',
hook='django_q.tests.tasks.result',
schedule_type=Schedule.HOURLY,
cluster="default",
repeats=1)
# run scheduler
scheduler(broker=broker)
# set up the workflow
task_queue = Queue()
stop_event = Event()
stop_event.set()
# push it
pusher(task_queue, stop_event, broker=broker)

# queue must contain a task
assert task_queue.qsize() == 1
2 changes: 2 additions & 0 deletions docs/configure.rst
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ Configuration is handled via the ``Q_CLUSTER`` dictionary in your :file:`setting
All configuration settings are optional:

.. _name:

name
~~~~

Expand Down
11 changes: 11 additions & 0 deletions docs/schedules.rst
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,12 @@ You can manage them through the :ref:`admin_page` or directly from your code wit
cron = '0 22 * * 1-5')
# Restrain a schedule to a specific cluster
schedule('math.hypot',
3, 4,
schedule_type=Schedule.DAILY,
cluster='my_cluster')
Missed schedules
----------------
Expand Down Expand Up @@ -116,6 +122,7 @@ Reference
:param str cron: Cron expression for the Cron type.
:param int repeats: Number of times to repeat schedule. -1=Always, 0=Never, n =n.
:param datetime next_run: Next or first scheduled execution datetime.
:param str cluster: optional cluster name. Task will be executed only on a cluster with a matching :ref:`name`.
:param dict q_options: options passed to async_task for this schedule
:param kwargs: optional keyword arguments for the scheduled function.

Expand Down Expand Up @@ -175,6 +182,10 @@ Reference
Number of times to repeat the schedule. -1=Always, 0=Never, n =n.
When set to -1, this will keep counting down.

.. py:attribute:: cluster
Task will be executed only on a cluster with a matching :ref:`name`.

.. py:attribute:: next_run
Datetime of the next scheduled execution.
Expand Down

0 comments on commit 957f807

Please sign in to comment.