# Celery worker setup

1) add <font color='goldenrod'>celery.py</font> to the main app (next to settings.py) and add these lines
<font color='steelblue'>
> import os

> from celery import Celery

> os.environ.setdefault("DJANGO_SETTINGS_MODULE", "celerytst.settings")

> app = Celery("celerytst")

> app.config_from_object("django.conf:settings", namespace="CELERY")

> app.autodiscover_tasks()
</font>
2) add this to settings.py (settings.py must import os)  
<font color='steelblue'>
> CELERY_BROKER_URL = os.getenv("CELERY_BROKER_URL", "redis://redis:6379/0")
</font>
3) add <font color='goldenrod'>tasks.py</font> to your app and decorate each function in it with <font color='goldenrod'>@shared_task</font> to register it as a task   

# Independent Celery worker

1) make a new directory next to your project dir 
2) add <font color='goldenrod'>celerytask.py</font> and <font color='goldenrod'>celeryconfig.py</font> to it
3) add a <font color='goldenrod'>Dockerfile</font> and <font color='goldenrod'>requirements.txt</font> if you want to build it separately
4) in <font color='goldenrod'>celerytask.py</font> create a new Celery instance and point it at celeryconfig as its settings module 
<font color='steelblue'>
    > from celery import Celery

    > app = Celery('mytask')
    
    > app.config_from_object('celeryconfig')
    
    > @app.task

    > def your_task():
    >   return
</font>

5) add the configs to celeryconfig.py
<font color='steelblue'>
    > broker_url = 'redis://redis:6379/0'
    > result_backend = 'redis://redis:6379/0'
</font>

6) add a service using this directory to the docker compose file

## Celery settings

'''-------------- celery@44b015f2d5cf v5.5.1 (immunity)
 --- ***** ----- 
 -- ******* ---- Linux-6.10.14-linuxkit-x86_64-with-glibc2.36 2025-04-12 06:45:38
 --*** --- * --- 
 --** ---------- [config]
 --** ---------- .> app:         mytask:0x7f0ddfae1c00
 --** ---------- .> transport:   redis://redis:6379/0   -----------------------------------> these two lines must be redis
 --** ---------- .> results:     redis://redis:6379/0   -----------------------------------> these two lines must be redis
 --*** --- * --- .> concurrency: 16 (prefork)
 -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
 --- ***** ----- 
  -------------- [queues]
                 .> celery           exchange=celery(direct) key=celery
                 
 
 [tasks]
   . celerytask.your_task  ----------------------------> here your detected tasks will be shown                     '''

# Workers routing

we can route tasks to different workers; routing can be based on load balancing or on task type

passing to queue:

1) use the <font color='goldenrod'>-Q <queuename></font> flag to make a worker consume from one specific queue: <font color='goldenrod'>celery -A <#filename> worker --loglevel=info -Q queue2</font>
2) to send a task to a specific queue, add this setting to celery.py (a concrete example first, then the general form)
    <font color='steelblue'>
    >app.conf.task_routes = {'newapp.tasks.task1': {'queue':'queue1'}}
    
    >app.conf.task_routes = {'<#task location>': {'queue':'<queuename>'},}
    </font> 
3) if there is an independent worker, create a <font color='goldenrod'>tasks.py</font> file, write the tasks there, and import them with this setting
    <font color='steelblue'>
    >app.conf.imports = ('newapp.tasks',)

    >app.conf.imports = ('<#task location>',)
    
    >app.autodiscover_tasks()
    </font>



# Tasks priority

in celery, prioritizing can be done in two ways:
1) set a priority on each task
2) give each queue a priority and put tasks in the queues according to their priority

 <font color='goldenrod'>Redis has no native support for the first method </font>(Celery can only approximate it), so we should use <font color='goldenrod'>RabbitMQ </font>for that

### set priority to a queue

1) first set multiple queues for the worker in docker-compose 
<font color='steelblue'>
  > celery -A celerytst worker --loglevel=info -Q queue1,queue2,queue3 
</font>

remember that the queues passed to -Q are ordered by priority, meaning queue1 > queue2 > queue3

2) now assign each task to a queue using app.conf.task_routes

 <font color='steelblue'>
app.conf.task_routes = {<br>
    'newapp.tasks.task1': {'queue':'queue1'}, <br>
    'newapp.tasks.task2': {'queue':'queue2'}, <br>
    'newapp.tasks.task3': {'queue':'queue3'}} <br>
    </font>
    
3) now task1 takes priority over task2, and task2 over task3

# Task groups

we use groups to run <font color='goldenrod'>multiple tasks in parallel</font>

1) first build a group from the task signatures
<font color='steelblue'>
> from celery import group

> task_group = group(task1.s(), task2.s(),task3.s())
</font>

2) to execute the task group use
<font color='steelblue'>
> task_group.apply_async()
</font>

3) now task1, task2 and task3 run in parallel, as long as enough worker processes are free


# Task chains

we use chains to <font color='goldenrod'>run multiple tasks in sequence</font> when a task depends on another task; each task's return value is passed as the first argument to the next one
1) first we make a chain
<font color='steelblue'>
> from celery import chain
> task_chain = chain(task1.s(), task2.s(),task3.s())
</font>

2) to execute the task chain use
<font color='steelblue'>
> task_chain.apply_async()
</font>

3) now task1, task2 and task3 are executed in order

# Task rate limit

a rate limit caps how many times a task can be executed in a given period (here, per minute); Celery enforces it per worker, not across all workers
1) one way is to pass it to the @shared_task decorator
<font color='steelblue'>
> @shared_task(rate_limit='5/m')
</font>

2) the other way is to set a default for all tasks in the celery.py file
<font color='steelblue'>
> app = Celery('proj_name')
> app.conf.task_default_rate_limit = '5/m'
</font>

# RabbitMQ

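1) Celery talks to RabbitMQ through kombu and py-amqp, which are installed together with Celery; the <font color='goldenrod'>pika</font> library is only needed if you also want to talk to RabbitMQ directly from your own code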
<font color='steelblue'>
> pip install pika
</font>

2) create a service in docker compose using an image with the <font color='goldenrod'>management</font> tag (it includes the RabbitMQ web interface)
3) expose ports <font color='goldenrod'>5672</font> for RabbitMQ and <font color='goldenrod'>15672</font> for its interface
4) add these lines to celery.py (<font color='goldenrod'>tasks</font> is the name of the queue we are using)
<font color='steelblue'>
> from kombu import Exchange, Queue

> app.conf.task_queues = [Queue('tasks', Exchange('tasks'), routing_key='tasks',queue_arguments={'x-max-priority': 10}),]
</font>

5) and these settings to manage the priorities
<font color='steelblue'>

> app.conf.task_acks_late = True

> app.conf.task_queue_max_priority = 10

> app.conf.task_default_priority = 5

> app.conf.worker_prefetch_multiplier = 1 

> app.conf.worker_concurrency = 1

</font>

6) add a task with
<font color='steelblue'>

> @app.task(queue='tasks')

> def task1():

>    return
</font>

7) change settings.py to use rabbitmq
<font color='steelblue'>

> CELERY_BROKER_URL = os.getenv("CELERY_BROKER_URL", "amqp://guest:guest@rabbitmq:5672")
</font>

8) call the tasks from the django container terminal and set their priority
<font color='steelblue'>

> task1.apply_async(priority=4)

> task2.apply_async(priority=9)
</font>

9) task2 will be executed before task1 if both are still waiting in the queue (a higher number means a higher priority)


# Task arguments

1) we can pass both positional and keyword arguments to the task with this call

<font color='steelblue'>
> task1.apply_async(args=[arg1,arg2], kwargs={key1:val1, key2:val2})
</font>

2) and in the task function declare them as normal parameters

<font color='steelblue'>
> @app.task(queue='tasks')<br>
> def task1(a, b, message=None):<br>
>   return f"{message} : {a+b}"<br>
</font>

3) the result is shown in the celery terminal; to see it in the django terminal (this needs a result backend to be configured) use

<font color='steelblue'>
> res = task1.apply_async(args=[arg1,arg2], kwargs={key1:val1, key2:val2})<br>
> print(res.get())
</font>

### Async results

several methods and attributes of the result object show a task's status in django (we saved the result as the res variable): <br><br>
<font color='goldenrod'>res.state:</font>   ---------->   the current state of the task, such as PENDING, STARTED, SUCCESS or FAILURE <br>
<font color='goldenrod'>res.ready():</font>  ---------->   returns True once the task has finished, whether it succeeded or failed <br>
<font color='goldenrod'>res.successful():</font>  ---------->   returns True if the task was executed successfully <br>
<font color='goldenrod'>res.get():</font>  ---------->   blocks the calling thread and waits for the output to arrive <br>
<font color='goldenrod'>res.result:</font>  ---------->   holds the output if the task was successful, or the exception if it failed <br>
<font color='goldenrod'>res.traceback:</font>  ---------->   holds the traceback of a failed task <br>




# Flower

flower is a web interface for monitoring workers and tasks
1) first add a new service to docker-compose named flower
2) use port 5555
3) pass the rabbitmq URL as the CELERY_BROKER_URL environment variable
<font color='steelblue'>
<br>
flower:<br>
  container_name: flower<br>
  image: mher/flower<br>
  ports:<br>
    - 5555:5555<br>
  environment:<br>
    - "CELERY_BROKER_URL=amqp://guest:guest@rabbitmq:5672"<br>
</font> 

4) open http://localhost:5555 in your browser