Skip to content

Commit

Permalink
[AIRFLOW-7066] Use sphinx syntax in concepts.rst (#7729)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhongjiajie committed Mar 15, 2020
1 parent 271ee64 commit 0740daf
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 75 deletions.
3 changes: 3 additions & 0 deletions airflow/example_dags/example_latest_only_with_trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
"""
Example LatestOnlyOperator and TriggerRule interactions
"""

# [START example]
import datetime as dt

from airflow import DAG
Expand All @@ -41,3 +43,4 @@

latest_only >> task1 >> [task3, task4]
task2 >> [task3, task4]
# [END example]
2 changes: 2 additions & 0 deletions airflow/example_dags/example_subdag_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

"""Example DAG demonstrating the usage of the SubDagOperator."""

# [START example_subdag_operator]
from airflow import DAG
from airflow.example_dags.subdags.subdag import subdag
from airflow.operators.dummy_operator import DummyOperator
Expand Down Expand Up @@ -66,3 +67,4 @@
)

start >> section_1 >> some_other_task >> section_2 >> end
# [END example_subdag_operator]
2 changes: 2 additions & 0 deletions airflow/example_dags/subdags/subdag.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

"""Helper function to generate a DAG and operators given some arguments."""

# [START subdag]
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

Expand Down Expand Up @@ -46,3 +47,4 @@ def subdag(parent_dag_name, child_dag_name, args):
)

return dag_subdag
# [END subdag]
87 changes: 12 additions & 75 deletions docs/concepts.rst
Original file line number Diff line number Diff line change
Expand Up @@ -784,54 +784,17 @@ Note that SubDAG operators should contain a factory method that returns a DAG
object. This will prevent the SubDAG from being treated like a separate DAG in
the main UI. For example:

.. code:: python
#dags/subdag.py
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
# Dag is returned by a factory method
def sub_dag(parent_dag_name, child_dag_name, start_date, schedule_interval):
dag = DAG(
'%s.%s' % (parent_dag_name, child_dag_name),
schedule_interval=schedule_interval,
start_date=start_date,
)
dummy_operator = DummyOperator(
task_id='dummy_task',
dag=dag,
)
return dag
.. exampleinclude:: ../airflow/example_dags/subdags/subdag.py
:language: python
:start-after: [START subdag]
:end-before: [END subdag]

This SubDAG can then be referenced in your main DAG file:

.. code:: python
# main_dag.py
from datetime import datetime, timedelta
from airflow.models import DAG
from airflow.operators.subdag_operator import SubDagOperator
from dags.subdag import sub_dag
PARENT_DAG_NAME = 'parent_dag'
CHILD_DAG_NAME = 'child_dag'
main_dag = DAG(
dag_id=PARENT_DAG_NAME,
schedule_interval=timedelta(hours=1),
start_date=datetime(2016, 1, 1)
)
sub_dag = SubDagOperator(
subdag=sub_dag(PARENT_DAG_NAME, CHILD_DAG_NAME, main_dag.start_date,
main_dag.schedule_interval),
task_id=CHILD_DAG_NAME,
dag=main_dag,
)
.. exampleinclude:: ../airflow/example_dags/example_subdag_operator.py
:language: python
:start-after: [START example_subdag_operator]
:end-before: [END example_subdag_operator]

You can zoom into a SubDagOperator from the graph view of the main DAG to show
the tasks contained within the SubDAG:
Expand Down Expand Up @@ -997,36 +960,10 @@ right now is not between its ``execution_time`` and the next scheduled

For example, consider the following DAG:

.. code:: python
#dags/latest_only_with_trigger.py
import datetime as dt
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.latest_only_operator import LatestOnlyOperator
from airflow.utils.trigger_rule import TriggerRule
dag = DAG(
dag_id='latest_only_with_trigger',
schedule_interval=dt.timedelta(hours=1),
start_date=dt.datetime(2019, 2, 28),
)
latest_only = LatestOnlyOperator(task_id='latest_only', dag=dag)
task1 = DummyOperator(task_id='task1', dag=dag)
task1.set_upstream(latest_only)
task2 = DummyOperator(task_id='task2', dag=dag)
task3 = DummyOperator(task_id='task3', dag=dag)
task3.set_upstream([task1, task2])
task4 = DummyOperator(task_id='task4', dag=dag,
trigger_rule=TriggerRule.ALL_DONE)
task4.set_upstream([task1, task2])
.. exampleinclude:: ../airflow/example_dags/example_latest_only_with_trigger.py
:language: python
:start-after: [START example]
:end-before: [END example]

In the case of this DAG, the task ``task1`` is directly downstream of
``latest_only`` and will be skipped for all runs except the latest.
Expand Down

0 comments on commit 0740daf

Please sign in to comment.