Skip to content

Commit

Permalink
Replace celery with django-q2 (#10)
Browse files Browse the repository at this point in the history
  • Loading branch information
Alschn authored Feb 2, 2024
1 parent dd75732 commit 08fe1a9
Show file tree
Hide file tree
Showing 17 changed files with 683 additions and 662 deletions.
4 changes: 1 addition & 3 deletions Pipfile
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,7 @@ gunicorn = "*"
django-environ = "*"
django-storages = "*"
boto3 = "*"
celery = "*"
django-celery-results = "*"
django-celery-beat = "*"
django-q2 = "*"
flower = "*"
django-friendship = "*"
django-jsonform = "*"
Expand Down
941 changes: 472 additions & 469 deletions Pipfile.lock

Large diffs are not rendered by default.

1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
<h1>VibeLink</h1>
<img src="https://img.shields.io/badge/Python-14354C?style=for-the-badge&logo=python&logoColor=white" alt=""/>
<img src="https://img.shields.io/badge/Django-092E20?style=for-the-badge&logo=django&logoColor=white" alt=""/>
<img src="https://img.shields.io/badge/Celery-8C9A41?&style=for-the-badge&logo=celery&logoColor=white" alt=""/>
<img src="https://img.shields.io/badge/PostgreSQL-316192?style=for-the-badge&logo=postgresql&logoColor=white" alt=""/>
<img src="https://img.shields.io/badge/Redis-%23DD0031.svg?&style=for-the-badge&logo=redis&logoColor=white" alt=""/>
<img src="https://img.shields.io/badge/Docker-008FCC?style=for-the-badge&logo=docker&logoColor=white" alt=""/>
Expand Down
3 changes: 0 additions & 3 deletions core/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +0,0 @@
from .celery import app as celery_app

__all__ = ('celery_app',)
29 changes: 0 additions & 29 deletions core/celery/__init__.py

This file was deleted.

72 changes: 49 additions & 23 deletions core/settings/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
BASE_DIR = Path(__file__).resolve().parent.parent.parent

env = Env()
env.read_env(BASE_DIR / 'env/backend.env')
env.read_env()

# Quick-start development settings - unsuitable for production
# See https://docs.djangoproject.com/en/4.2/howto/deployment/checklist/
Expand Down Expand Up @@ -59,8 +59,7 @@
'allauth.socialaccount',
'dj_rest_auth.registration',

'django_celery_results',
'django_celery_beat',
'django_q',

'friendship',

Expand All @@ -69,6 +68,7 @@
'accounts',
'links',
'tracks',
# 'emails',
]

MIDDLEWARE = [
Expand Down Expand Up @@ -250,7 +250,7 @@
}

# Authentication backends
# https://django-allauth.readthedocs.io/en/latest/configuration.html
# https://docs.allauth.org/en/latest/account/configuration.html

AUTHENTICATION_BACKENDS = [
'django.contrib.auth.backends.ModelBackend',
Expand Down Expand Up @@ -307,11 +307,12 @@
SPECTACULAR_SETTINGS = {
'TITLE': 'VibeLink API',
'DESCRIPTION': 'VibeLink REST API provided by Alschn',
'VERSION': '1.0.0',
'VERSION': '0.1.0',
'CONTACT': {
'name': 'Alschn',
'url': 'https://github.com/Alschn/',
},
'SCHEMA_PATH_PREFIX': '/api/',
'SERVE_PERMISSIONS': ['rest_framework.permissions.AllowAny'],
}

Expand All @@ -326,31 +327,56 @@

CORS_ORIGIN_REGEX_WHITELIST = env.list('CORS_ORIGIN_REGEX_WHITELIST', default=[])

# Redis settings
# Django Q configuration
# https://django-q2.readthedocs.io/en/master/configure.html

REDIS_HOST = env('REDIS_HOST', default='redis_db')
REDIS_PORT = env.int('REDIS_PORT', default=6379)

# default values taken from the documentation
Q_CLUSTER_NAME = env('Q_CLUSTER_NAME', default='vibelink')
Q_CLUSTER_WORKERS = env.int('Q_CLUSTER_WORKERS', default=4)
Q_CLUSTER_DAEMONIZE_WORKERS = env.bool('Q_CLUSTER_DAEMONIZE_WORKERS', default=False)
Q_CLUSTER_RECYCLE = env.int('Q_CLUSTER_RECYCLE', default=500)
Q_CLUSTER_TIMEOUT = env.int('Q_CLUSTER_TIMEOUT', default=60)
Q_CLUSTER_COMPRESS = env.bool('Q_CLUSTER_COMPRESS', default=True)
Q_CLUSTER_MAX_ATTEMPTS = env.int('Q_CLUSTER_MAX_ATTEMPTS', default=0)
Q_CLUSTER_RETRY = env.int('Q_CLUSTER_RETRY', default=60)
Q_CLUSTER_SAVE_LIMIT = env.int('Q_CLUSTER_SAVE_LIMIT', default=250)
Q_CLUSTER_QUEUE_LIMIT = env.int('Q_CLUSTER_QUEUE_LIMIT', default=500)
Q_CLUSTER_CPU_AFFINITY = env.int('Q_CLUSTER_CPU_AFFINITY', default=1)
Q_CLUSTER_SYNC = False
Q_CLUSTER_CATCH_UP = True

Q_CLUSTER = {
'name': Q_CLUSTER_NAME,
'workers': Q_CLUSTER_WORKERS,
'daemonize_workers': Q_CLUSTER_DAEMONIZE_WORKERS,
'recycle': Q_CLUSTER_RECYCLE,
'timeout': Q_CLUSTER_TIMEOUT,
'compress': Q_CLUSTER_COMPRESS,
'max_attempts': Q_CLUSTER_MAX_ATTEMPTS,
'retry': Q_CLUSTER_RETRY,
'save_limit': Q_CLUSTER_SAVE_LIMIT,
'queue_limit': Q_CLUSTER_QUEUE_LIMIT,
'cpu_affinity': Q_CLUSTER_CPU_AFFINITY,
'sync': Q_CLUSTER_SYNC,
'catch_up': Q_CLUSTER_CATCH_UP,
'label': 'Django Q2',
'redis': {
'host': REDIS_HOST,
'port': REDIS_PORT,
'db': 0,
}
}

# Django cache settings
# https://docs.djangoproject.com/en/4.2/topics/cache/

REDIS_HOST = env('REDIS_HOST', default=None)

REDIS_PORT = env('REDIS_PORT', cast=int, default=None)

REDIS_URL = env('REDIS_URL', default=None)

CELERY_TIMEZONE = TIME_ZONE

CELERY_BROKER_URL = REDIS_URL or f'redis://{REDIS_HOST}:{REDIS_PORT}'

CELERY_RESULT_BACKEND = 'django-db'

CELERY_CACHE_BACKEND = 'default'

CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'

CACHES = {
'default': {
'BACKEND': 'django.core.cache.backends.redis.RedisCache',
'LOCATION': CELERY_BROKER_URL,
'LOCATION': f'redis://{REDIS_HOST}:{REDIS_PORT}',
},
}

Expand Down
66 changes: 66 additions & 0 deletions core/shared/tasks.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
import inspect
from datetime import timedelta, datetime
from typing import Any
from typing import Callable

from django.db import connection
from django.utils import timezone
from django_q.models import Schedule
from django_q.tasks import async_task


def get_function_module_path(func: Callable) -> str:
"""
Expand All @@ -18,3 +25,62 @@ def get_function_module_path(func: Callable) -> str:
"""

return f"{inspect.getmodule(func).__name__}.{func.__name__}"


def create_async_task(func: Callable, *args: Any, **kwargs: Any):
"""
Wrapper around `django_q.tasks.async_task` function
that automatically resolves a path to the given callable.
"""

return async_task(
get_function_module_path(func),
*args,
**kwargs
)


def assure_scheduled(
schedule_func: Callable[..., Schedule],
name: str,
recreate: bool = False,
*args,
**kwargs
) -> Schedule | None:
"""
Checks if a schedule with given name exists and if not, creates it.
If it exists, then it is deleted and recreated to ensure its details are up-to-date.
"""
tables = connection.introspection.table_names()

if Schedule._meta.db_table not in tables:
print(
f'Schedule table not in database, "{schedule_func.__name__}" not scheduled, '
f'please run migrations and restart the app'
)
return

queryset = Schedule.objects.filter(name=name)

if queryset.exists():
if recreate:
print(f"Schedule with name `{name}` already exists. Recreating...")
queryset.delete()
return schedule_func(*args, **kwargs, name=name)

print(f"Schedule with name `{name}` already exists. Skipping...")
return queryset.first()

print(f"Creating schedule with name `{name}`...")
return schedule_func(*args, **kwargs, name=name)


def get_daily_schedule_next_run_time(
*, hour: int, minute: int,
second: int = 0, microsecond: int = 0
) -> datetime:
now = timezone.now()
return now.replace(
hour=hour, minute=minute,
second=second, microsecond=microsecond
) + timedelta(days=1)
38 changes: 3 additions & 35 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,49 +34,17 @@ services:
- "8000:8000"
command: python manage.py runserver 0.0.0.0:8000

celery_worker:
container_name: vibe-link-celery-worker
django_q:
container_name: vibe-link-django_q
build: .
env_file:
- env/backend.env
stdin_open: true
tty: true
depends_on:
- postgres_db
- redis_db
volumes:
- .:/app/
command: celery -A core.celery.app worker -l INFO

celery_beat:
container_name: vibe-link-celery-beat
build: .
env_file:
- env/backend.env
stdin_open: true
tty: true
depends_on:
- postgres_db
- redis_db
volumes:
- .:/app/
command: celery -A core.celery.app beat -l INFO

flower:
container_name: vibe-link-flower
build: .
env_file:
- env/backend.env
stdin_open: true
tty: true
depends_on:
- postgres_db
- redis_db
volumes:
- .:/app/
ports:
- "5555:5555"
command: celery -A core.celery.app flower -l INFO
command: python manage.py qcluster

volumes:
postgres-data:
Expand Down
9 changes: 9 additions & 0 deletions links/apps.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,12 @@
class LinksConfig(AppConfig):
default_auto_field = 'django.db.models.BigAutoField'
name = 'links'

def ready(self) -> None:
from core.shared.tasks import assure_scheduled
from links.queue.schedules import schedule_daily_generate_link_requests

assure_scheduled(
schedule_daily_generate_link_requests,
name='schedule_daily_generate_link_requests'
)
7 changes: 4 additions & 3 deletions links/emails.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
from typing import Iterable
import typing

from accounts.models import User
if typing.TYPE_CHECKING:
from accounts.models import User


def send_link_requests_email_notifications(users: Iterable[User]) -> None:
def send_link_requests_email_notifications(users: typing.Iterable['User']) -> None:
"""Send email notifications to users about newly created link requests."""

# todo: implement
37 changes: 21 additions & 16 deletions links/queue/schedules.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,28 @@
import random

from celery import shared_task
from django.utils import timezone
from django_q.models import Schedule
from django_q.tasks import schedule

from core.shared.tasks import get_function_module_path
from core.shared.tasks import (
get_function_module_path,
get_daily_schedule_next_run_time
)
from links.queue.tasks import generate_link_requests


@shared_task
def schedule_generate_link_requests() -> str:
from django_celery_beat.models import PeriodicTask, ClockedSchedule
def schedule_daily_generate_link_requests(*args, **kwargs) -> Schedule:
return schedule(
get_function_module_path(schedule_generate_link_requests),
*args,
schedule_type=Schedule.DAILY,
repeats=-1,
next_run=get_daily_schedule_next_run_time(hour=0, minute=0),
**kwargs,
)


def schedule_generate_link_requests() -> Schedule:
now = timezone.now()
end_of_today = now.replace(hour=23, minute=59, second=59)

Expand All @@ -21,16 +33,9 @@ def schedule_generate_link_requests() -> str:

task_name = 'generate-link-requests-' + random_time.strftime('%Y-%m-%d-%H-%M-%S')

# create a new schedule required by periodic task
clocked, _ = ClockedSchedule.objects.get_or_create(clocked_time=random_time)

# schedule one-off task at random time
PeriodicTask.objects.create(
return schedule(
get_function_module_path(generate_link_requests),
name=task_name,
task=get_function_module_path(generate_link_requests),
one_off=True,
clocked=clocked,
start_time=random_time,
schedule_type=Schedule.ONCE,
next_run=random_time
)

return task_name
Loading

0 comments on commit 08fe1a9

Please sign in to comment.