Closed
Description
Checklist
celery version:   4.1.0 (latentcall)
python version:   Python 3.6.0
OS version:   Darwin iDocker 17.3.0 Darwin Kernel Version 17.3.0: Thu Nov  9 18:09:22 PST 2017; root:xnu-4570.31.3~1/RELEASE_X86_64 x86_64
Steps to reproduce
# Two tasks
@celery.task
def get_host_info(name=None, instance_id=None):
    host_info = {'name': name, 'instance_id': instance_id,
                 'cpu_cores': 4}
    task_logger.info('Fetching host info ...')
    return host_info
@celery.task
def get_db_info_based_on_host(kwargs):
    name = kwargs.get('name')
    instance_id = kwargs.get('instance_id')
    cpu_cores = kwargs.get('cpu_cores')
    db_info = {'name': name, 'instance_id': instance_id,
               'cpu_cores': cpu_cores, 'db_version': '5.6'}
    task_logger.info('Fetching db info ...')
    return db_info
# Build a chain from the two tasks above and call it with kwargs
ret = chain(get_host_info.s(), get_db_info_based_on_host.s()).apply_async(kwargs=dict(name='demo_name', instance_id='aaabb'))
ret.get()
# The result I got:
#{
#    "cpu_cores": 4, 
#    "instance_id": null, 
#    "db_version": "5.6", 
#    "name": null
#}
Here is the worker's output.
[2018-04-05 23:12:30,534: INFO/MainProcess] Received task: cmdb_worker.tasks.demo_tasks.get_host_info[d3b14906-b538-4670-b00e-1bcabb2e8fd4]  
[2018-04-05 23:12:30,540: INFO/ForkPoolWorker-2] cmdb_worker.tasks.demo_tasks.get_host_info[d3b14906-b538-4670-b00e-1bcabb2e8fd4]: Fetching host info ...
[2018-04-05 23:12:30,603: INFO/MainProcess] Received task: cmdb_worker.tasks.demo_tasks.get_db_info_based_on_host[f2007c3f-424f-445d-a0b9-38440bb16e67]  
[2018-04-05 23:12:30,606: INFO/ForkPoolWorker-3] cmdb_worker.tasks.demo_tasks.get_db_info[f2007c3f-424f-445d-a0b9-38440bb16e67]: Fetching db info ...
[2018-04-05 23:12:30,609: INFO/ForkPoolWorker-2] Task cmdb_worker.tasks.demo_tasks.get_host_info[d3b14906-b538-4670-b00e-1bcabb2e8fd4] succeeded in 0.06943560403306037s: {'name': None, 'instance_id': None, 'cpu_cores': 4}
[2018-04-05 23:12:30,614: INFO/ForkPoolWorker-3] Task cmdb_worker.tasks.demo_tasks.get_db_info_based_on_host[f2007c3f-424f-445d-a0b9-38440bb16e67] succeeded in 0.0084876399487257s: {'name': None, 'instance_id': None, 'cpu_cores': 4, 'db_version': '5.6'}
Expected behavior
When apply_async is called with kwargs, the chain should pass them to the first task in the chain.
It works when passing kwargs to task.s() directly, but that doesn't seem like a good approach. (Any suggestions?)
ret = chain(get_host_info.s(dict(name='demo_name', instance_id='aaabb')), get_db_info_based_on_host.s()).apply_async()
Actual behavior
The first task received empty kwargs even though kwargs were passed to chain.apply_async().
Is there a better way to pass kwargs to a task in a chain?
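One possible answer to the question above is to bind the keyword arguments in the first task's signature rather than at apply_async time. In Celery terms that would be get_host_info.s(name='demo_name', instance_id='aaabb'), since .s() accepts keyword arguments as well as positional ones. The sketch below is a plain-Python model of that idea and not Celery code: run_chain is a hypothetical stand-in for a chain, functools.partial stands in for a signature, and the task bodies are simplified copies of the ones in this report.

```python
from functools import partial


def get_host_info(name=None, instance_id=None):
    # Simplified copy of the first task from the report (no logging).
    return {'name': name, 'instance_id': instance_id, 'cpu_cores': 4}


def get_db_info_based_on_host(host_info):
    # Receives the previous step's return value as its first positional argument.
    return dict(host_info, db_version='5.6')


def run_chain(*steps):
    # Hypothetical stand-in for a chain: call the first step with no arguments,
    # then feed each return value to the next step as a positional argument.
    result = steps[0]()
    for step in steps[1:]:
        result = step(result)
    return result


# Bind the kwargs to the first step itself, analogous to passing them to .s().
first = partial(get_host_info, name='demo_name', instance_id='aaabb')
result = run_chain(first, get_db_info_based_on_host)
print(result)
```

Because the keyword arguments are bound to the first step itself, they do not depend on how the chain forwards arguments given at call time.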