Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow tasks to be scheduled on a specific cluster #555

Merged
merged 9 commits into from
May 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion django_q/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ class ScheduleAdmin(admin.ModelAdmin):
"func",
"schedule_type",
"repeats",
"cluster",
"next_run",
"last_run",
"success",
Expand All @@ -77,7 +78,7 @@ class ScheduleAdmin(admin.ModelAdmin):
if not croniter:
readonly_fields = ("cron",)

list_filter = ("next_run", "schedule_type")
list_filter = ("next_run", "schedule_type", "cluster")
search_fields = ("func",)
list_display_links = ("id", "name")

Expand Down
1 change: 1 addition & 0 deletions django_q/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -578,6 +578,7 @@ def scheduler(broker: Broker = None):
Schedule.objects.select_for_update()
.exclude(repeats=0)
.filter(next_run__lt=timezone.now())
.filter(db.models.Q(cluster__isnull=True) | db.models.Q(cluster=Conf.PREFIX))
):
args = ()
kwargs = {}
Expand Down
18 changes: 18 additions & 0 deletions django_q/migrations/0014_schedule_cluster.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Generated by Django 3.2.2 on 2021-05-11 05:59

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
('django_q', '0013_task_attempt_count'),
]

operations = [
migrations.AddField(
model_name='schedule',
name='cluster',
field=models.CharField(blank=True, default=None, max_length=100, null=True),
),
]
1 change: 1 addition & 0 deletions django_q/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ class Schedule(models.Model):
help_text=_("Cron expression"),
)
task = models.CharField(max_length=100, null=True, editable=False)
cluster = models.CharField(max_length=100, default=None, null=True, blank=True)

def success(self):
if self.task and Task.objects.filter(id=self.task):
Expand Down
3 changes: 3 additions & 0 deletions django_q/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ def schedule(func, *args, **kwargs):
:param repeats: how many times to repeat. 0=never, -1=always.
:param next_run: Next scheduled run.
:type next_run: datetime.datetime
:param cluster: optional cluster name.
:param cron: optional cron expression
:param kwargs: function keyword arguments.
:return: the schedule object.
Expand All @@ -100,6 +101,7 @@ def schedule(func, *args, **kwargs):
repeats = kwargs.pop("repeats", -1)
next_run = kwargs.pop("next_run", timezone.now())
cron = kwargs.pop("cron", None)
cluster = kwargs.pop("cluster", None)

# check for name duplicates instead of am unique constraint
if name and Schedule.objects.filter(name=name).exists():
Expand All @@ -117,6 +119,7 @@ def schedule(func, *args, **kwargs):
repeats=repeats,
next_run=next_run,
cron=cron,
cluster=cluster,
)
# make sure we trigger validation
s.full_clean()
Expand Down
42 changes: 42 additions & 0 deletions django_q/tests/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,3 +151,45 @@ def test_scheduler(broker, monkeypatch):
assert schedule.next_run > now
# Done
broker.delete_queue()

monkeypatch.setattr(Conf, 'PREFIX', 'some_cluster_name')
# create a schedule on another cluster
schedule = create_schedule('math.copysign',
1, -1,
name='test schedule on a another cluster',
hook='django_q.tests.tasks.result',
schedule_type=Schedule.HOURLY,
cluster="some_other_cluster_name",
repeats=1)
# run scheduler
scheduler(broker=broker)
# set up the workflow
task_queue = Queue()
stop_event = Event()
stop_event.set()
# push it
pusher(task_queue, stop_event, broker=broker)

# queue must be empty
assert task_queue.qsize() == 0

monkeypatch.setattr(Conf, 'PREFIX', 'default')
# create a schedule on the same cluster
schedule = create_schedule('math.copysign',
1, -1,
name='test schedule with no cluster',
hook='django_q.tests.tasks.result',
schedule_type=Schedule.HOURLY,
cluster="default",
repeats=1)
# run scheduler
scheduler(broker=broker)
# set up the workflow
task_queue = Queue()
stop_event = Event()
stop_event.set()
# push it
pusher(task_queue, stop_event, broker=broker)

# queue must contain a task
assert task_queue.qsize() == 1
2 changes: 2 additions & 0 deletions docs/configure.rst
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ Configuration is handled via the ``Q_CLUSTER`` dictionary in your :file:`setting

All configuration settings are optional:

.. _name:

name
~~~~

Expand Down
11 changes: 11 additions & 0 deletions docs/schedules.rst
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,12 @@ You can manage them through the :ref:`admin_page` or directly from your code wit
cron = '0 22 * * 1-5')


# Restrain a schedule to a specific cluster
schedule('math.hypot',
3, 4,
schedule_type=Schedule.DAILY,
cluster='my_cluster')


Missed schedules
----------------
Expand Down Expand Up @@ -116,6 +122,7 @@ Reference
:param str cron: Cron expression for the Cron type.
:param int repeats: Number of times to repeat schedule. -1=Always, 0=Never, n =n.
:param datetime next_run: Next or first scheduled execution datetime.
:param str cluster: optional cluster name. Task will be executed only on a cluster with a matching :ref:`name`.
:param dict q_options: options passed to async_task for this schedule
:param kwargs: optional keyword arguments for the scheduled function.

Expand Down Expand Up @@ -175,6 +182,10 @@ Reference
Number of times to repeat the schedule. -1=Always, 0=Never, n =n.
When set to -1, this will keep counting down.

.. py:attribute:: cluster

Task will be executed only on a cluster with a matching :ref:`name`.

.. py:attribute:: next_run

Datetime of the next scheduled execution.
Expand Down