Skip to content

Commit

Permalink
feat: add in an API endpoint to enqueue a task, and stop a task
Browse files Browse the repository at this point in the history
Make use of `rq.Job`'s `meta` field instead of relying on
`kwargs` -> `Job._kwargs` - doesn't seem to be reliable when a job is
killed

Signed-off-by: Akhil Narang <me@akhilnarang.dev>
  • Loading branch information
akhilnarang committed Apr 2, 2024
1 parent f22787c commit 4b08426
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 13 deletions.
66 changes: 55 additions & 11 deletions frappe/core/doctype/background_task/background_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,13 @@
from typing import Any

from redis import Redis
from rq.job import Callback, Job
from rq.command import send_stop_job_command
from rq.job import Callback, InvalidJobOperation, Job

import frappe
from frappe import _
from frappe.model.document import Document
from frappe.utils.background_jobs import get_redis_conn


class BackgroundTask(Document):
Expand All @@ -30,7 +33,49 @@ class BackgroundTask(Document):
user: DF.Link | None
# end: auto-generated types

pass
def stop(self):
"""Stop the task"""
try:
send_stop_job_command(connection=get_redis_conn(), job_id=f"{frappe.local.site}::{self.task_id}")
except InvalidJobOperation:
frappe.msgprint(_("Job is not running."), title=_("Invalid Operation"))


@frappe.whitelist(methods=["POST"])
def enqueue_task(
method: str,
queue: str = "default",
timeout: int | None = None,
event: str | None = None,
on_success: str | None = None,
on_failure: str | None = None,
on_stopped: str | None = None,
at_front: bool = False,
kwargs: dict | None = None,
):
if kwargs is None:
kwargs = {}

job = enqueue(
method=method,
queue=queue,
timeout=timeout,
event=event,
on_success=on_success,
on_failure=on_failure,
on_stopped=on_stopped,
at_front=at_front,
**kwargs,
)
return {"task_id": job.id.split("::")[-1]}


@frappe.whitelist(methods=["POST"])
def stop_task(task_id: str):
if task := frappe.get_doc("Background Task", {"task_id": task_id}, for_update=True):
task.stop()
return "Stopped task"
return "Task with the given ID not found"


def enqueue(
Expand Down Expand Up @@ -84,8 +129,8 @@ def enqueue(
if not timeout:
timeout = get_queues_timeout().get(queue) or 300

queue_args = {
"site": frappe.local.site,
meta = {"site": frappe.local.site}
queue_args = meta | {
"user": frappe.session.user,
"method": method,
"event": event,
Expand All @@ -100,6 +145,7 @@ def enqueue_call():
on_failure=Callback(func=failure_callback),
on_stopped=Callback(func=stopped_callback),
timeout=timeout,
meta=meta,
kwargs=queue_args,
at_front=at_front,
failure_ttl=frappe.conf.get("rq_job_failure_ttl", RQ_JOB_FAILURE_TTL),
Expand Down Expand Up @@ -142,7 +188,7 @@ def enqueue_call():

def success_callback(job: Job, connection: Redis, result: Any) -> None:
"""Callback function to update the status of the job to "Completed"."""
frappe.init(site=job._kwargs.get("site"))
frappe.init(site=job.meta["site"])
frappe.connect()
doc = frappe.get_doc("Background Task", {"task_id": job.id.split("::")[-1]}, for_update=True)
doc.status = "Completed"
Expand All @@ -165,7 +211,7 @@ def success_callback(job: Job, connection: Redis, result: Any) -> None:

def failure_callback(job: Job, connection: Redis, *exc_info) -> None:
"""Callback function to update the status of the job to "Failed"."""
frappe.init(site=job._kwargs.get("site"))
frappe.init(site=job.meta["site"])
frappe.connect()
doc = frappe.get_doc("Background Task", {"task_id": job.id.split("::")[-1]}, for_update=True)
doc.status = "Failed"
Expand All @@ -191,17 +237,15 @@ def failure_callback(job: Job, connection: Redis, *exc_info) -> None:
frappe.destroy()


def stopped_callback(job: Job, connection: Redis, *args, **kwargs) -> None:
def stopped_callback(job: Job, connection: Redis) -> None:
"""Callback function to update the status of the job to "Stopped"."""
frappe.init(site=job._kwargs.get("site"))
frappe.init(site=job.meta["site"])
frappe.connect()
print("Stopped with args", args)
print("Stopped with kwargs", kwargs)
doc = frappe.get_doc("Background Task", {"task_id": job.id.split("::")[-1]}, for_update=True)
doc.status = "Stopped"
doc.save()
if doc.stopped_callback:
frappe.call(doc.stopped_callback, job, connection, *args, **kwargs)
frappe.call(doc.stopped_callback, job, connection)
frappe.utils.notify_user(
frappe.session.user,
"Alert",
Expand Down
27 changes: 25 additions & 2 deletions frappe/utils/background_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ def enqueue_call():
on_failure=Callback(func=on_failure) if on_failure else None,
timeout=timeout,
kwargs=queue_args,
meta=queue_args,
at_front=at_front,
failure_ttl=frappe.conf.get("rq_job_failure_ttl") or RQ_JOB_FAILURE_TTL,
result_ttl=frappe.conf.get("rq_results_ttl") or RQ_RESULTS_TTL,
Expand Down Expand Up @@ -198,8 +199,30 @@ def run_doc_method(doctype, name, doc_method, **kwargs):
getattr(frappe.get_doc(doctype, name), doc_method)(**kwargs)


def execute_job(site, method, event, job_name, kwargs, user=None, is_async=True, retry=0):
"""Executes job in a worker, performs commit/rollback and logs if there is any error"""
def execute_job(
site: str,
method: str | Callable,
event: str | None = None,
job_name: str | None = None,
kwargs: dict | None = None,
user: str | None = None,
is_async: bool = True,
retry: int = 0,
):
"""
Executes job in a worker, performs commit/rollback and logs if there is any error
:param site: Site name
:param method: Method to be executed
:param event: Event name
:param job_name: Job name
:param kwargs: Keyword arguments to be passed to the method
:param user: User who triggered the job
:param is_async: If is_async=False, the method is executed immediately, else via a worker
:param retry: Number of times the job has been retried
:return: Return value of the method
"""
retval = None

if is_async:
Expand Down

0 comments on commit 4b08426

Please sign in to comment.