在之前,我们已经建立好环境。下面测试一下环境,发送一个计算平方根的任务。 定义任务模块tasks.py。在开始,导入必须的模块。
from math import sqrt
from celery import Celery
然后,创建Celery实例,代表客户端应用:
app = Celery('tasks', broker='redis://192.168.25.21:6379/0')
在初始化时我们传入了模块的名称和broker的地址。 然后,启动result backend,如下:
app.config.CELERY_RESULT_BACKEND = 'redis://192.168.25.21:6379/0'
用@app.tack装饰器定义任务:
@app.task
def square_root(value):
return sqrt(value)
到此,我们完成了tasks.py模块的定义,我们需要初始化服务端的workers。我们创建了一个单独的目录叫做8397_07_broker。拷贝tasks.py模块到这个目录,运行如下命令:
$celery –A tasks worker –-loglevel=INFO
上述命令初始化了Clery Server,—A代表Celery应用。下图是初始化的部分截图
现在,Celery Server等待接收任务并且发送给workers。 下一步就是在客户端创建应用调用tasks。
上述步骤不能忽略,因为下面会用在之前创建的东西。
在客户端机器,我们有celery_env虚拟环境,现在创建一个task_dispatcher.py模块很简单,如下步骤;
-
导入logging模块来显示程序执行信息,导入Celery模块:
import logging from celery import Celery
-
下一步是创建Celery实例,和服务端一样:
#logger configuration... app = Celery('tasks', broker='redis://192.168.25.21:6379/0') app.conf.CELERY_RESULT_BACKEND = 'redis://192.168.25.21:6397/0'
由于我们在接下的内容中要复用这个模块来实现任务的调用,下面我们创建一个方法来封装sqrt_task(value)的发送,我们将创建manage_sqrt_task(value)方法:
def manage_sqrt_task(value):
result = app.send_task('tasks.sqrt_task', args=(value,))
logging.info(result.get())
从上述代码我们发现客户端应用不需要知道服务端的实现。通过Celery类中的send_task方法,我们传入module.task格式的字符串和以元组的方式传入参数就可以调用一个任务。最后,我们看一看log中的结果。 在__main__中,我们调用了manage_sqrt_task(value)方法:
if __name__ == '__main__':
manage_sqrt_task(4)
下面的截图是执行task_dispatcher.py文件的结果:
在客户端,通过get()方法得到结果,这是通过send_task()返回的AsyncResult实例中的重要特征。结果如下图: