Skip to content

Commit

Permalink
feat: add in Background Job doctype
Browse files Browse the repository at this point in the history
Signed-off-by: Akhil Narang <me@akhilnarang.dev>
  • Loading branch information
akhilnarang committed Feb 28, 2024
1 parent 68b83b1 commit 82af7f5
Show file tree
Hide file tree
Showing 5 changed files with 331 additions and 0 deletions.
Empty file.
8 changes: 8 additions & 0 deletions frappe/core/doctype/background_task/background_task.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
// Copyright (c) 2024, Frappe Technologies and contributors
// For license information, please see license.txt

// frappe.ui.form.on("Background Task", {
// refresh(frm) {

// },
// });
100 changes: 100 additions & 0 deletions frappe/core/doctype/background_task/background_task.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
{
"actions": [],
"allow_rename": 1,
"creation": "2024-02-23 13:12:36.222147",
"doctype": "DocType",
"engine": "InnoDB",
"field_order": [
"task_id",
"status",
"user",
"method",
"result",
"success_callback",
"failure_callback",
"stopped_callback"
],
"fields": [
{
"fieldname": "task_id",
"fieldtype": "Data",
"in_list_view": 1,
"label": "Task ID",
"not_nullable": 1,
"read_only": 1,
"reqd": 1,
"search_index": 1
},
{
"fieldname": "status",
"fieldtype": "Select",
"label": "Status",
"options": "Queued\nIn Progress\nCompleted\nFailed\nStopped",
"read_only": 1,
"reqd": 1,
"search_index": 1
},
{
"fieldname": "user",
"fieldtype": "Link",
"label": "User",
"options": "User",
"read_only": 1,
"search_index": 1
},
{
"fieldname": "result",
"fieldtype": "Code",
"label": "Result",
"read_only": 1
},
{
"fieldname": "method",
"fieldtype": "Data",
"label": "Method",
"read_only": 1
},
{
"fieldname": "success_callback",
"fieldtype": "Code",
"label": "Success Callback",
"read_only": 1
},
{
"fieldname": "failure_callback",
"fieldtype": "Code",
"label": "Failure Callback",
"read_only": 1
},
{
"fieldname": "stopped_callback",
"fieldtype": "Code",
"label": "Stopped Callback",
"read_only": 1
}
],
"index_web_pages_for_search": 1,
"links": [],
"modified": "2024-02-27 13:52:49.741902",
"modified_by": "Administrator",
"module": "Core",
"name": "Background Task",
"owner": "Administrator",
"permissions": [
{
"create": 1,
"delete": 1,
"email": 1,
"export": 1,
"print": 1,
"read": 1,
"report": 1,
"role": "System Manager",
"share": 1,
"write": 1
}
],
"sort_field": "modified",
"sort_order": "DESC",
"states": []
}
214 changes: 214 additions & 0 deletions frappe/core/doctype/background_task/background_task.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
# Copyright (c) 2024, Frappe Technologies and contributors
# For license information, please see license.txt
import traceback
from collections.abc import Callable
from typing import Any

from redis import Redis
from rq.job import Callback, Job

import frappe
from frappe.model.document import Document


class BackgroundTask(Document):
# begin: auto-generated types
# This code is auto-generated. Do not modify anything in this block.

from typing import TYPE_CHECKING

if TYPE_CHECKING:
from frappe.types import DF

failure_callback: DF.Code | None
method: DF.Data | None
result: DF.Code | None
status: DF.Literal["Queued", "In Progress", "Completed", "Failed", "Stopped"]
stopped_callback: DF.Code | None
success_callback: DF.Code | None
task_id: DF.Data
user: DF.Link | None
# end: auto-generated types

pass


def enqueue(
method: str | Callable,
queue: str = "default",
timeout: int | None = None,
event: str | None = None,
enqueue_after_commit: bool = False,
on_success: Callable | str | None = None,
on_failure: Callable | str | None = None,
on_stopped: Callable | str | None = None,
at_front: bool = False,
**kwargs,
) -> Job | Any | None:
"""
Enqueue method to be executed using a background worker
:param method: method string or method object
:param queue: should be either long, default or short
:param timeout: should be set according to the functions
:param event: this is passed to enable clearing of jobs from queues
:param enqueue_after_commit: if True, enqueue after the current transaction is committed
:param on_success: Success callback
:param on_failure: Failure callback
:param on_stopped: Stopped callback
:param at_front: Enqueue the job at the front of the queue or not
:param kwargs: keyword arguments to be passed to the method
:return: Job object normally, if executing now then the result of the method, nothing if enqueueing after commit
"""

from frappe.utils.background_jobs import (
RQ_JOB_FAILURE_TTL,
RQ_RESULTS_TTL,
create_job_id,
execute_job,
get_queue,
get_queues_timeout,
)

task_id = create_job_id()

try:
q = get_queue(queue)
except ConnectionError:
if frappe.local.flags.in_migrate:
# If redis is not available during migration, execute the job directly
print(f"Redis queue is unreachable: Executing {method} synchronously")
return frappe.call(method, **kwargs)
raise

if not timeout:
timeout = get_queues_timeout().get(queue) or 300

queue_args = {
"site": frappe.local.site,
"user": frappe.session.user,
"method": method,
"event": event,
"job_name": frappe.cstr(method),
"kwargs": kwargs,
}

def enqueue_call():
return q.enqueue_call(
execute_job,
on_success=Callback(func=success_callback),
on_failure=Callback(func=failure_callback),
on_stopped=Callback(func=stopped_callback),
timeout=timeout,
kwargs=queue_args,
at_front=at_front,
failure_ttl=frappe.conf.get("rq_job_failure_ttl", RQ_JOB_FAILURE_TTL),
result_ttl=frappe.conf.get("rq_results_ttl", RQ_RESULTS_TTL),
job_id=task_id,
)

doc = frappe.new_doc(
"Background Job",
task_id=task_id.split("::")[-1],
user=frappe.session.user,
status="Queued",
method=frappe.utils.method_to_string(method),
)
if on_success:
doc.success_callback = frappe.utils.method_to_string(on_success)

if on_failure:
doc.failure_callback = frappe.utils.method_to_string(on_failure)

if on_stopped:
doc.stopped_callback = frappe.utils.method_to_string(on_stopped)

doc.insert(ignore_permissions=True)
frappe.utils.notify_user(
frappe.session.user,
"Alert",
frappe.session.user,
"Background Job",
doc.name,
frappe._("Job queued:") + f" {doc.method}",
)

if enqueue_after_commit:
frappe.db.after_commit.add(enqueue_call)
return

return enqueue_call()


def success_callback(job: Job, connection: Redis, result: Any) -> None:
"""Callback function to update the status of the job to "Completed"."""
frappe.init(site=job._kwargs.get("site"))
frappe.connect()
doc = frappe.get_doc("Background Task", {"task_id": job.id.split("::")[-1]}, for_update=True)
doc.status = "Completed"
doc.result = result
doc.save()
if doc.success_callback:
frappe.call(doc.success_callback, job, connection, result)

frappe.utils.notify_user(
frappe.session.user,
"Alert",
frappe.session.user,
"Background Job",
doc.name,
frappe._("Job successfully completed:") + f" {doc.method}",
)
frappe.db.commit()
frappe.destroy()


def failure_callback(job: Job, connection: Redis, *exc_info) -> None:
"""Callback function to update the status of the job to "Failed"."""
frappe.init(site=job._kwargs.get("site"))
frappe.connect()
doc = frappe.get_doc("Background Task", {"task_id": job.id.split("::")[-1]}, for_update=True)
doc.status = "Failed"
doc.result = "".join(traceback.format_exception(*exc_info))
doc.save()
from frappe.utils.background_jobs import truncate_failed_registry

if doc.failure_callback:
frappe.call(doc.failure_callback, job, connection, *exc_info)
else:
frappe.call(truncate_failed_registry, job, connection, *exc_info)
if failure_callback := job._kwargs.get("on_failure", truncate_failed_registry):
failure_callback(job, connection, *exc_info)
frappe.utils.notify_user(
frappe.session.user,
"Alert",
frappe.session.user,
"Background Job",
doc.name,
frappe._("Job failed:") + f" {doc.method}",
)
frappe.db.commit()
frappe.destroy()


def stopped_callback(job: Job, connection: Redis, *args, **kwargs) -> None:
"""Callback function to update the status of the job to "Stopped"."""
frappe.init(site=job._kwargs.get("site"))
frappe.connect()
print("Stopped with args", args)
print("Stopped with kwargs", kwargs)
doc = frappe.get_doc("Background Task", {"task_id": job.id.split("::")[-1]}, for_update=True)
doc.status = "Stopped"
doc.save()
if doc.stopped_callback:
frappe.call(doc.stopped_callback, job, connection, *args, **kwargs)
frappe.utils.notify_user(
frappe.session.user,
"Alert",
frappe.session.user,
"Background Job",
doc.name,
frappe._("Job stopped:") + f" {doc.method}",
)
frappe.db.commit()
frappe.destroy()
9 changes: 9 additions & 0 deletions frappe/core/doctype/background_task/test_background_task.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# Copyright (c) 2024, Frappe Technologies and Contributors
# See license.txt

# import frappe
from frappe.tests.utils import FrappeTestCase


class TestBackgroundTask(FrappeTestCase):
pass

0 comments on commit 82af7f5

Please sign in to comment.