
subdag content is not executed by trigger_dag #739

@thibault-ketterer


Hi, I think I have found a problem with a triggered DAG that contains a subdag.
I am using Airflow 1.6.1 with Python 3, Celery and MySQL.

When I trigger my DAG with `airflow trigger_dag all_reload`, the subdag within `all_reload` does not get scheduled.

The scheduler gives me the following output:

```
[celery] queuing ('all_reload', 'test_reload_data', datetime.datetime(2015, 12, 7, 18, 24, 20)) through celery, queue=default
[...]
2015-12-07 18:45:31,052 - root - INFO - Loaded DAG <DAG: all_reload.test_reload_data>
2015-12-07 18:45:31,052 - root - INFO - Loaded DAG <DAG: all_reload>
2015-12-07 18:45:31,069 - root - INFO - Checking state for <DagRun all_reload @ 2015-12-07 18:24:20: None, externally triggered: True>
2015-12-07 18:45:31,074 - root - INFO - Marking run <DagRun all_reload @ 2015-12-07 18:24:20: None, externally triggered: True> successful
```
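To look for the same symptom (an externally triggered run being marked successful) in longer scheduler logs, a small helper along these lines could pull the DagRun details out of each line. This is a hypothetical sketch: `parse_dagrun` and its regex are derived only from the log lines quoted above, not from any documented Airflow log format, and the field after the timestamp (`None` here) is only assumed to be the run id.

```python
import re
from datetime import datetime

# Hypothetical pattern based only on the log lines above, e.g.
# "<DagRun all_reload @ 2015-12-07 18:24:20: None, externally triggered: True>"
DAGRUN_RE = re.compile(
    r"<DagRun (?P<dag_id>\S+) @ "
    r"(?P<execution_date>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}): "
    r"(?P<run_id>[^,]+), externally triggered: (?P<external>True|False)>"
)


def parse_dagrun(line):
    """Return a dict describing the DagRun mentioned in a log line, or None."""
    match = DAGRUN_RE.search(line)
    if match is None:
        return None
    return {
        "dag_id": match.group("dag_id"),
        "execution_date": datetime.strptime(
            match.group("execution_date"), "%Y-%m-%d %H:%M:%S"
        ),
        # Assumed to be the run id; it is the literal text "None" in the log above.
        "run_id": match.group("run_id"),
        "externally_triggered": match.group("external") == "True",
        # True for the "Marking run <DagRun ...> successful" lines.
        "marked_successful": "Marking run" in line
        and line.rstrip().endswith("successful"),
    }
```

Feeding it each line of the scheduler log and keeping entries where both `externally_triggered` and `marked_successful` are true lists the runs worth checking for unexecuted subdag tasks.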

The subdag is marked as scheduled (queued), but nothing happens and the scheduler marks the DAG run as successful.
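The expected behaviour can be sketched as a toy model: a run should only count as successful once every task, including the `SubDagOperator` task `test_reload_data`, has itself succeeded. This is a hypothetical illustration (`expected_run_state` is not Airflow code), and the task states below are assumed: the two upstream tasks are taken to have succeeded, which the queuing of `test_reload_data` suggests.

```python
# Hypothetical toy model, not Airflow's scheduler logic: a run counts as
# successful only when every one of its tasks has succeeded.
def expected_run_state(task_states):
    """Map {task_id: state} to an overall run state."""
    states = set(task_states.values())
    if "failed" in states:
        return "failed"
    if states == {"success"}:
        return "success"
    return "running"


# Assumed task states when the scheduler logged "Marking run ... successful":
# the upstream tasks are done, but the subdag task was only queued.
observed = {
    "global_reload": "success",
    "sleep_for_2": "success",
    "test_reload_data": "queued",
}

print(expected_run_state(observed))
```

Under this model the run would still be `running`, whereas the log above shows it being marked successful.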

I have other subdags on the same setup (scheduled daily), and they all run fine.

Here is an example DAG definition that reproduces the problem:

```python
from datetime import datetime, timedelta
import time

from airflow import DAG
from airflow.operators import DummyOperator, SubDagOperator, PythonOperator


def sleep_2():
    print("sleep 2")
    time.sleep(2)


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    # Dynamic start_date kept from the original repro; Airflow generally
    # recommends a fixed start_date instead.
    'start_date': datetime.now() - timedelta(days=1),
    'email': ['a@aaa.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    # The BaseOperator argument is retry_delay; 'retry_interval' is not one.
    'retry_delay': timedelta(minutes=1),
}


def build_reload():
    # Parent DAG: no schedule, so it only runs when triggered externally.
    main_dag = DAG('all_reload', default_args=default_args, schedule_interval=None)

    global_reload = DummyOperator(
        task_id='global_reload',
        dag=main_dag,
    )

    subname = 'test_reload_data'

    task = PythonOperator(
        task_id='sleep_for_2',
        python_callable=sleep_2,
        dag=main_dag,
    )
    task.set_upstream(global_reload)

    # The subdag's dag_id must have the form '<parent_dag_id>.<task_id>'.
    dag2 = DAG('all_reload.' + subname, default_args=default_args, schedule_interval=None)

    PythonOperator(
        task_id='sleep_for_2_sub',
        python_callable=sleep_2,
        dag=dag2,
    )

    # Runs dag2 as a single task of the parent DAG, after sleep_for_2.
    update_ui = SubDagOperator(
        task_id=subname,
        dag=main_dag,
        subdag=dag2,
    )
    update_ui.set_upstream(task)

    return main_dag


main_dag = build_reload()
```
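For reference, the task structure of this DAG reduces to a simple chain, and Airflow expects a subdag's dag_id to have the form `<parent_dag_id>.<task_id>`, which `'all_reload.' + subname` satisfies. The sketch below checks both in plain Python without Airflow (it needs Python 3.9+ for `graphlib`); `DEPENDENCIES` and `expected_subdag_id` are illustrative names, not Airflow APIs.

```python
from graphlib import TopologicalSorter  # Python 3.9+

PARENT_DAG_ID = "all_reload"
SUBDAG_TASK_ID = "test_reload_data"

# Upstream dependencies of each task in the repro DAG (task -> set of upstreams),
# mirroring the set_upstream() calls above.
DEPENDENCIES = {
    "global_reload": set(),
    "sleep_for_2": {"global_reload"},
    SUBDAG_TASK_ID: {"sleep_for_2"},
}


def expected_subdag_id(parent_dag_id, task_id):
    """Illustrative helper: the dag_id Airflow expects for a SubDagOperator's subdag."""
    return f"{parent_dag_id}.{task_id}"


# Order in which the tasks become runnable; the subdag task is last.
execution_order = list(TopologicalSorter(DEPENDENCIES).static_order())
print(execution_order)
print(expected_subdag_id(PARENT_DAG_ID, SUBDAG_TASK_ID))
```

So the subdag task only becomes runnable after `sleep_for_2`, consistent with it being the task the scheduler queued in the log above.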
