[AIRFLOW-6316] Use sphinx render in tutorial.rst
The code samples in tutorial.rst are currently hard-coded, which makes
them hard to maintain: for example, `set_upstream` was changed to the
bitshift operator in tutorial.py, but tutorial.rst still shows the old
call. Rendering the code with Sphinx is a better way.
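
The pattern used throughout this change: fence each snippet in
tutorial.py with marker comments, then pull the fenced region into
tutorial.rst with the `exampleinclude` directive and its
`:start-after:`/`:end-before:` options, so the docs can no longer
drift from the code. For example:

    # In airflow/example_dags/tutorial.py -- fence the snippet:
    # [START instantiate_dag]
    dag = DAG(
        'tutorial',
        default_args=default_args,
        schedule_interval=timedelta(days=1),
    )
    # [END instantiate_dag]
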
zhongjiajie committed Dec 21, 2019
1 parent 65ef8f3 commit 38194ee
Showing 3 changed files with 68 additions and 180 deletions.
36 changes: 27 additions & 9 deletions airflow/example_dags/tutorial.py
@@ -22,12 +22,19 @@
Documentation that goes along with the Airflow tutorial located
[here](https://airflow.apache.org/tutorial.html)
"""
# [START tutorial]
from datetime import timedelta

import airflow
# [START import_module]
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.operators.bash_operator import BashOperator

# [END import_module]

# [START default_args]
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
@@ -53,38 +60,47 @@
# 'sla_miss_callback': yet_another_function,
# 'trigger_rule': 'all_success'
}
# [END default_args]

# [START instantiate_dag]
dag = DAG(
'tutorial',
default_args=default_args,
description='A simple tutorial DAG',
schedule_interval=timedelta(days=1),
)
# [END instantiate_dag]

# t1, t2 and t3 are examples of tasks created by instantiating operators
# [START basic_task]
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag,
)

t2 = BashOperator(
task_id='sleep',
depends_on_past=False,
bash_command='sleep 5',
retries=3,
dag=dag,
)
# [END basic_task]

# [START documentation]
dag.doc_md = __doc__

t1.doc_md = """\
#### Task Documentation
You can document your task using the attributes `doc_md` (markdown),
`doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
rendered in the UI's Task Instance Details page.
![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)
"""
# [END documentation]

dag.doc_md = __doc__

t2 = BashOperator(
task_id='sleep',
depends_on_past=False,
bash_command='sleep 5',
dag=dag,
)

# [START jinja_template]
templated_command = """
{% for i in range(5) %}
echo "{{ ds }}"
@@ -100,5 +116,7 @@
params={'my_param': 'Parameter I passed in'},
dag=dag,
)
# [END jinja_template]

t1 >> [t2, t3]
# [END tutorial]
3 changes: 2 additions & 1 deletion docs/conf.py
@@ -127,7 +127,8 @@
'sphinx.ext.intersphinx',
'autoapi.extension',
'exampleinclude',
'docroles'
'docroles',
'removemarktransform',
]

autodoc_default_options = {
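
`removemarktransform` strips the `# [START ...]`/`# [END ...]` marker
comments out of code blocks before they are rendered, so readers never
see the anchors. Purely as an illustration of the idea (this is not the
actual extension's source), a Sphinx post-transform that does such
stripping could look roughly like this:

    import re

    from docutils import nodes
    from sphinx.transforms import SphinxTransform

    # Matches whole lines like "# [START foo]" or "# [END foo]".
    MARKER_RE = re.compile(r'^\s*#\s*\[(START|END)[^\]]*\]\s*\n?', re.MULTILINE)

    class RemoveExampleMarkers(SphinxTransform):
        """Hypothetical sketch: drop marker lines from rendered code blocks."""
        default_priority = 400

        def apply(self, **kwargs):
            for node in self.document.traverse(nodes.literal_block):
                text = node.astext()
                cleaned = MARKER_RE.sub('', text)
                if cleaned != text:
                    node[:] = [nodes.Text(cleaned)]

    def setup(app):
        app.add_post_transform(RemoveExampleMarkers)
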
209 changes: 39 additions & 170 deletions docs/tutorial.rst
@@ -30,63 +30,10 @@ Example Pipeline definition
Here is an example of a basic pipeline definition. Do not worry if this looks
complicated; a line-by-line explanation follows below.

.. code:: python
"""
Code that goes along with the Airflow tutorial located at:
https://github.com/apache/airflow/blob/master/airflow/example_dags/tutorial.py
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2015, 6, 1),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}
dag = DAG('tutorial', default_args=default_args, schedule_interval=timedelta(days=1))
# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag)
t2 = BashOperator(
task_id='sleep',
bash_command='sleep 5',
retries=3,
dag=dag)
templated_command = """
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
echo "{{ params.my_param }}"
{% endfor %}
"""
t3 = BashOperator(
task_id='templated',
bash_command=templated_command,
params={'my_param': 'Parameter I passed in'},
dag=dag)
t2.set_upstream(t1)
t3.set_upstream(t1)
.. exampleinclude:: ../airflow/example_dags/tutorial.py
:language: python
:start-after: [START tutorial]
:end-before: [END tutorial]

It's a DAG definition file
--------------------------
@@ -113,13 +60,10 @@ Importing Modules
An Airflow pipeline is just a Python script that happens to define an
Airflow DAG object. Let's start by importing the libraries we will need.

.. code:: python
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.operators.bash_operator import BashOperator
.. exampleinclude:: ../airflow/example_dags/tutorial.py
:language: python
:start-after: [START import_module]
:end-before: [END import_module]

Default Arguments
-----------------
@@ -128,24 +72,10 @@ explicitly pass a set of arguments to each task's constructor
(which would become redundant), or (better!) we can define a dictionary
of default parameters that we can use when creating tasks.

.. code:: python
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2015, 6, 1),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}
.. exampleinclude:: ../airflow/example_dags/tutorial.py
:language: python
:start-after: [START default_args]
:end-before: [END default_args]

For more information about the BaseOperator's parameters and what they do,
refer to the :py:class:`airflow.models.BaseOperator` documentation.
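
Task-level arguments take precedence over the values in ``default_args``;
a minimal sketch (the ``precedence_demo`` DAG id is made up for
illustration):

.. code:: python

    from datetime import datetime, timedelta

    from airflow import DAG
    from airflow.operators.bash_operator import BashOperator

    default_args = {
        'owner': 'airflow',
        'start_date': datetime(2015, 6, 1),
        'retries': 1,
    }

    demo_dag = DAG('precedence_demo', default_args=default_args,
                   schedule_interval=timedelta(days=1))

    # Inherits retries=1 from default_args.
    t_inherits = BashOperator(task_id='uses_default', bash_command='date',
                              dag=demo_dag)

    # A value passed to the constructor overrides the dictionary default.
    t_overrides = BashOperator(task_id='overrides_default', bash_command='date',
                               retries=3, dag=demo_dag)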
@@ -163,29 +93,21 @@ that defines the ``dag_id``, which serves as a unique identifier for your DAG.
We also pass the default argument dictionary that we just defined and
define a ``schedule_interval`` of 1 day for the DAG.

.. code:: python
dag = DAG(
'tutorial', default_args=default_args, schedule_interval=timedelta(days=1))
.. exampleinclude:: ../airflow/example_dags/tutorial.py
:language: python
:start-after: [START instantiate_dag]
:end-before: [END instantiate_dag]

Tasks
-----
Tasks are generated when instantiating operator objects. An object
instantiated from an operator is called a task. The first argument
``task_id`` acts as a unique identifier for the task.

.. code:: python
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag)
t2 = BashOperator(
task_id='sleep',
bash_command='sleep 5',
retries=3,
dag=dag)
.. exampleinclude:: ../airflow/example_dags/tutorial.py
:language: python
:start-after: [START basic_task]
:end-before: [END basic_task]

Notice how we pass a mix of operator specific arguments (``bash_command``) and
an argument common to all operators (``retries``) inherited
@@ -217,21 +139,10 @@ this feature exists, get you familiar with double curly brackets, and
point to the most common template variable: ``{{ ds }}`` (today's "date
stamp").

.. code:: python
templated_command = """
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7) }}"
echo "{{ params.my_param }}"
{% endfor %}
"""
t3 = BashOperator(
task_id='templated',
bash_command=templated_command,
params={'my_param': 'Parameter I passed in'},
dag=dag)
.. exampleinclude:: ../airflow/example_dags/tutorial.py
:language: python
:start-after: [START jinja_template]
:end-before: [END jinja_template]

Notice that the ``templated_command`` contains code logic in ``{% %}`` blocks,
references parameters like ``{{ ds }}``, calls a function as in
@@ -264,6 +175,17 @@ regarding custom filters have a look at the
For more information on the variables and macros that can be referenced
in templates, make sure to read through the :doc:`macros-ref`
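
If you want to see what a templated command expands to without running
Airflow, you can render it with plain Jinja2; in this sketch ``ds`` and
``params`` are supplied by hand, whereas Airflow injects them (plus
``macros``) for you at runtime:

.. code:: python

    from jinja2 import Template

    templated_command = """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ params.my_param }}"
    {% endfor %}
    """

    # Stand-in values for what Airflow would pass in.
    print(Template(templated_command).render(
        ds='2015-06-01', params={'my_param': 'Parameter I passed in'}))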

Adding DAG and Tasks documentation
----------------------------------
We can add documentation for the DAG or for each individual task. DAG
documentation so far supports only Markdown, while task documentation supports
plain text, Markdown, reStructuredText, JSON, and YAML.

.. exampleinclude:: ../airflow/example_dags/tutorial.py
:language: python
:start-after: [START documentation]
:end-before: [END documentation]
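
For instance, the other formats use sibling attributes on the task (a
quick sketch, continuing from ``t2`` above):

.. code:: python

    t2.doc = "Plain-text notes, rendered verbatim."
    t2.doc_rst = "*reStructuredText* notes for the same task."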

Setting up Dependencies
-----------------------
We have tasks ``t1``, ``t2`` and ``t3`` that do not depend on each other. Here are a few ways
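
For reference, here is a sketch of equivalent ways to declare that
``t2`` runs after ``t1``; any single line suffices:

.. code:: python

    t1.set_downstream(t2)   # method on the upstream task
    t2.set_upstream(t1)     # the mirrored method
    t1 >> t2                # bitshift operator, as tutorial.py now uses
    t2 << t1                # the mirrored operator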
@@ -306,63 +228,10 @@ Recap
Alright, so we have a pretty basic DAG. At this point your code should look
something like this:

.. code:: python
"""
Code that goes along with the Airflow tutorial located at:
https://github.com/apache/airflow/blob/master/airflow/example_dags/tutorial.py
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2015, 6, 1),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}
dag = DAG(
'tutorial', default_args=default_args, schedule_interval=timedelta(days=1))
# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag)
t2 = BashOperator(
task_id='sleep',
bash_command='sleep 5',
retries=3,
dag=dag)
templated_command = """
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
echo "{{ params.my_param }}"
{% endfor %}
"""
t3 = BashOperator(
task_id='templated',
bash_command=templated_command,
params={'my_param': 'Parameter I passed in'},
dag=dag)
t2.set_upstream(t1)
t3.set_upstream(t1)
.. exampleinclude:: ../airflow/example_dags/tutorial.py
:language: python
:start-after: [START tutorial]
:end-before: [END tutorial]

.. _testing:

