-
-
Notifications
You must be signed in to change notification settings - Fork 0
feat(async): Add django-q2 with ORM broker and demo job #161
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
2fe8cee
01d444f
9ec7aa7
87a6d97
de05edd
fcb30d9
6e65b26
9b5efd9
ade5709
d6f5159
0b7c407
050b52e
094440f
e1222fa
a33ef10
b63acc7
a414274
1cca0d2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,133 @@ | ||
| import logging | ||
|
taylor-osler-sentry marked this conversation as resolved.
|
||
| import os | ||
| import re | ||
| import shutil | ||
| import signal | ||
| import subprocess | ||
| import sys | ||
| import threading | ||
| from http.server import BaseHTTPRequestHandler, HTTPServer | ||
| from typing import Any | ||
|
|
||
| from django.core.management.base import BaseCommand | ||
| from django_q.conf import Conf | ||
| from django_q.humanhash import humanize | ||
| from django_q.status import Stat | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
| _CLUSTER_NAME_RE = re.compile(r"Q Cluster (\S+) starting\.") | ||
|
|
||
| _state: dict[str, Any] = {} | ||
| _shutdown = threading.Event() | ||
|
|
||
|
|
||
| class _HealthHandler(BaseHTTPRequestHandler): | ||
| def do_GET(self) -> None: | ||
| cluster_name = _state.get("cluster_name") | ||
|
|
||
| if not cluster_name: | ||
| self._respond(503, "cluster not yet started", None) | ||
| return | ||
|
|
||
| # TODO: this is awkward. Because the output is "humanized" we can't do a simple query. | ||
| # TODO: is there maybe some way to un-humanize? | ||
| target = next( | ||
| (s for s in Stat.get_all() if humanize(s.cluster_id.hex) == cluster_name), | ||
| None, | ||
| ) | ||
|
|
||
| if target is None: | ||
| self._respond(503, cluster_name, "not found or still starting") | ||
| elif target.status in (Conf.IDLE, Conf.WORKING): | ||
| self._respond(200, cluster_name, target.status) | ||
| else: | ||
|
sentry-warden[bot] marked this conversation as resolved.
|
||
| status = target.status | ||
| self._respond(500, cluster_name, status) | ||
|
|
||
| def _respond(self, code: int, cluster_name: str, status: Any) -> None: | ||
| self.send_response(code) | ||
| self.send_header("Content-type", "text/html") | ||
| self.end_headers() | ||
| self.wfile.write( | ||
| ( | ||
| "<html><head><title>Django-Q Health Check</title></head>" | ||
| f"<body><p>Health check returned {code} response</p>" | ||
| f"<p>Cluster {cluster_name} status: {status}</p></body></html>" | ||
| ).encode() | ||
| ) | ||
|
taylor-osler-sentry marked this conversation as resolved.
|
||
|
|
||
| def log_message(self, format: str, *args: Any) -> None: | ||
| pass | ||
|
|
||
|
|
||
| def _start_health_server() -> HTTPServer: | ||
| port = int(os.environ.get("PORT", "8080")) | ||
| server = HTTPServer(("0.0.0.0", port), _HealthHandler) | ||
| thread = threading.Thread(target=server.serve_forever, daemon=True) | ||
| thread.start() | ||
| logger.info("Health check server listening on port %d", port) | ||
|
sentry-warden[bot] marked this conversation as resolved.
|
||
| return server | ||
|
|
||
|
|
||
| def _handle_shutdown(signum: int, frame: Any) -> None: | ||
| logger.info("Received signal %d, shutting down", signum) | ||
| _shutdown.set() | ||
|
|
||
|
|
||
| class Command(BaseCommand): | ||
| help = "Run a Q cluster subprocess wrapped with an HTTP health check server." | ||
|
|
||
| def handle(self, *args: Any, **options: Any) -> None: | ||
| _shutdown.clear() | ||
| _state.clear() | ||
| signal.signal(signal.SIGTERM, _handle_shutdown) | ||
| signal.signal(signal.SIGINT, _handle_shutdown) | ||
|
|
||
| server = _start_health_server() | ||
|
|
||
| django_admin = shutil.which("django-admin") | ||
| if django_admin is None or django_admin == "": | ||
|
taylor-osler-sentry marked this conversation as resolved.
|
||
| django_admin = "/app/.venv/bin/django-admin" | ||
|
|
||
| proc = None | ||
| try: | ||
| proc = subprocess.Popen( | ||
| [django_admin, "qcluster", "--settings", "firetower.settings"], | ||
| stdout=subprocess.PIPE, | ||
| stderr=subprocess.STDOUT, | ||
| text=True, | ||
| bufsize=1, | ||
| ) | ||
|
|
||
| def _pump_output() -> None: | ||
| assert proc.stdout is not None | ||
| for line in proc.stdout: | ||
| sys.stdout.write(line) | ||
| sys.stdout.flush() | ||
| if "cluster_name" not in _state: | ||
| match = _CLUSTER_NAME_RE.search(line) | ||
| if match: | ||
| _state["cluster_name"] = match.group(1) | ||
| logger.info("Detected cluster name: %s", match.group(1)) | ||
|
|
||
| pump_thread = threading.Thread(target=_pump_output, daemon=True) | ||
| pump_thread.start() | ||
|
|
||
| while not _shutdown.is_set(): | ||
| if proc.poll() is not None: | ||
| logger.warning( | ||
| "qcluster subprocess exited with code %s", proc.returncode | ||
| ) | ||
| break | ||
| _shutdown.wait(timeout=1) | ||
|
Check warning on line 123 in src/firetower/incidents/management/commands/wrapped_worker.py
|
||
| finally: | ||
|
sentry-warden[bot] marked this conversation as resolved.
|
||
| server.shutdown() | ||
| server.server_close() | ||
|
Check failure on line 126 in src/firetower/incidents/management/commands/wrapped_worker.py
|
||
| if proc and proc.poll() is None: | ||
| proc.terminate() | ||
|
Check warning on line 128 in src/firetower/incidents/management/commands/wrapped_worker.py
|
||
| try: | ||
| proc.wait(timeout=10) | ||
| except subprocess.TimeoutExpired: | ||
| proc.kill() | ||
| proc.wait() | ||
|
taylor-osler-sentry marked this conversation as resolved.
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,28 @@ | ||
| from django.db import migrations | ||
|
|
||
| from firetower.incidents.tasks import SCHEDULES | ||
|
taylor-osler-sentry marked this conversation as resolved.
|
||
|
|
||
|
|
||
| def create_schedule(apps, schema_editor): | ||
| Schedule = apps.get_model("django_q", "Schedule") | ||
| schedule_name = "schedule_demo" | ||
| Schedule.objects.get_or_create( | ||
| name=schedule_name, defaults=SCHEDULES[schedule_name] | ||
| ) | ||
|
taylor-osler-sentry marked this conversation as resolved.
|
||
|
|
||
|
|
||
| def delete_schedule(apps, schema_editor): | ||
| Schedule = apps.get_model("django_q", "Schedule") | ||
| schedule_name = "schedule_demo" | ||
| Schedule.objects.filter(name=schedule_name).delete() | ||
|
|
||
|
|
||
| class Migration(migrations.Migration): | ||
| dependencies = [ | ||
| ("incidents", "0015_add_notion_troubleshooting_link_type"), | ||
| ("django_q", "0018_task_success_index"), | ||
| ] | ||
|
|
||
| operations = [ | ||
| migrations.RunPython(create_schedule, delete_schedule), | ||
| ] | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,60 @@ | ||
| import functools | ||
| import logging | ||
| import re | ||
| from typing import Protocol | ||
|
|
||
| from datadog import statsd | ||
| from django_q.tasks import Schedule | ||
|
|
||
| from firetower.incidents.models import Incident | ||
|
|
||
| SCHEDULES = { | ||
| "schedule_demo": { | ||
| "func": "firetower.incidents.tasks.schedule_demo", | ||
| "schedule_type": Schedule.MINUTES, # Minutes | ||
|
github-actions[bot] marked this conversation as resolved.
sentry[bot] marked this conversation as resolved.
cursor[bot] marked this conversation as resolved.
|
||
| "minutes": 5, | ||
| "repeats": -1, # repeat indefinitely | ||
| }, | ||
| } | ||
|
|
||
| DATADOG_INVALID_CHARS = re.compile(r"[^A-Za-z0-9-_.\/]") | ||
|
|
||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
|
|
||
| class NamedFunction(Protocol): | ||
| __name__: str | ||
|
|
||
| def __call__(self) -> None: | ||
| pass | ||
|
|
||
|
|
||
| def datadog_log(f: NamedFunction) -> NamedFunction: | ||
| task_name: str = DATADOG_INVALID_CHARS.sub("_", f.__name__) | ||
| tags = [f"task:{task_name}"] | ||
|
|
||
| @functools.wraps(f) | ||
| def wrapper() -> None: | ||
| statsd.increment("django_q.task.run", 1, tags) | ||
| try: | ||
| f() | ||
| except Exception as e: | ||
| statsd.increment("django_q.task.error", 1, tags) | ||
| logger.error( | ||
| f"Error while executing task '{task_name}': {e}", exc_info=True | ||
| ) | ||
|
sentry-warden[bot] marked this conversation as resolved.
sentry[bot] marked this conversation as resolved.
|
||
| raise e | ||
|
cursor[bot] marked this conversation as resolved.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
| else: | ||
| statsd.increment("django_q.task.success", 1, tags) | ||
|
|
||
| return wrapper | ||
|
|
||
|
|
||
| @datadog_log | ||
| def schedule_demo() -> None: | ||
| incident = Incident.objects.order_by("-created_at").first() | ||
| if incident: | ||
| logger.info(f"Most recent incident: INC-{incident.id}: {incident.title}") | ||
| else: | ||
| logger.info("No incidents found.") | ||


Uh oh!
There was an error while loading. Please reload this page.