Skip to content

Latest commit

 

History

History
111 lines (99 loc) · 4.46 KB

1.md

File metadata and controls

111 lines (99 loc) · 4.46 KB

啥系统都得有任务

我们在开发过程中,难免会有异步任务、定时任务需求.
在python中一般使用celery来完成任务.

tag: 0.4.1

后端

首先安装celery依赖pip install celery.

涉及文件及目录

requirements.txt
rurality/celery.py
rurality/__init__.py
rurality/settings.py
account/tasks.py

代码

requirements.txt:
增加了celery的依赖,通过pip install -r requirements.txt -i https://mirrors.aliyun.com/pypi/simple/.


rurality/celery.py:
这个文件的内容其实没有什么东西,就是一些常规的写法,主要是实例一个celery,配置去settings中读取以CELERY开头的内容.


rurality/init.py:
这里就更没啥了,就是在这里写上,以后可以方便引用celery.

from rurality import celery_app

就像上面这样引用.


rurality/settings.py:
这里主要增加了有关celery的配置.

# 有关celery配置
from kombu import Exchange, Queue

因为要使用优先级队列,所以这里先引用这两个玩意.

# celery configs
CELERY_BROKER_URL = os.getenv('CELERY_BROKER_URL', 'amqp://guest:guest@localhost:5672//')
CELERY_TIMEZONE = 'Asia/Shanghai'
# 一次在broker中取几个任务
CELERY_WORKER_PREFETCH_MULTIPLIER = 1

因为我这里要使用优先级队列,所以我使用了rabbitmq作为broker,rabbitmq我是直接使用docker在本地安装的.
同时时区配置成和django一样的时区.

# 默认不记录结果
# 需要记录结果的任务,则单独指定ignore_result=False
# @celery_app.task(bind=True, ignore_result=False)
# CELERY_RESULT_BACKEND = os.getenv('CELERY_RESULT_BACKEND', 'mongodb://localhost:27017/')
# CELERY_TASK_IGNORE_RESULT = True

因为我现在对于任务的结果不关心,所以我暂时不设置backend,以后如果有需求再打开.

CELERY_DEFAULT_QUEUE = 'default'
CELERY_TASK_QUEUES = (
    Queue('default', Exchange('default'), routing_key='default', queue_arguments={'x-max-priority': 20}),
    Queue('low_priority', Exchange('low_priority'), routing_key='low_priority', queue_arguments={'x-max-priority': 10}),
    Queue('high_priority', Exchange('high_priority'), routing_key='high_priority', queue_arguments={'x-max-priority': 30}),
)
# 有关优先级队列启动问题
# celery  -A rurality worker -l info -n worker-hd1 -Q high_priority,default
# celery  -A rurality worker -l info -n worker-hd2 -Q high_priority,default
# celery  -A rurality worker -l info -n worker-hl1 -Q high_priority,low_priority
# 如果启动多个worker可以指定处理的队列,
# 示例中三个worker都可以处理高优先级队列,两个可以处理default队列,只有一个处理低优先级队列

CELERY_TASK_ROUTES = {
    'account.tasks.*': {'queue': 'high_priority'},
    '*': {"queue": 'default'},
}
# 除了上面可以不同任务指定不同队列外,在调用时也可以指定
# hello_task.apply_async(queue='low_priority')

设置celery队列,先设置一个默认队列是default,然后设置分别设置low_priority和high_priority队列.
为了充分体现不同优先级的地位,我们启动三个worker,每个worker设置好可以处理的队列.
最后就成了低优先级的只有一个worker处理,默认优先级的有两个worker处理,高优先级的有三个worker处理.
这样就充分体现了VIP的优势.
CELERY_TASK_ROUTES中可以给不同任务指定不同的队列,并且可以使用通配符.
在调用任务时,也可以使用.apply_async(queue='low_priority')来指定队列.

# 定时任务,此命令只需要在一台机器上运行
# celery -A rurality beat -l info
CELERY_BEAT_SCHEDULE = {
    'timer_hello_task': {
        'task': 'account.tasks.timer_hello_task',
        'schedule': 10.0,
    },
}

定时任务是通过celery的beat来完成的,这里写了一个示例,每10秒执行一次account.tasks.timer_hello_task.


account/tasks.py:
这里就写了两个demo任务,可以通过以下步骤来尝试.

  • 首先确保安装了rabbitmq,并且正确的配置了celery.
  • 执行celery -A rurality worker -l info -n worker-hd1 -Q high_priority,default
  • 执行celery -A rurality beat -l info
  • 进入python manage.py shell
  • 执行from account.tasks import hello_task
  • 执行hello_task.delay()

以上就可以看到任务执行了,并且也可以看到定时那个任务一直在执行.