In [2]:
!pip install "apache-airflow==2.0.1" --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.0.1/constraints-3.6.txt"

Collecting apache-airflow==2.0.1
[?25l  Downloading https://files.pythonhosted.org/packages/4d/be/7228d4505133bdfedaab0afedb272463190c214baea43fc15be5e3accaae/apache_airflow-2.0.1-py3-none-any.whl (4.5MB)
[K     |████████████████████████████████| 4.5MB 5.8MB/s 
[?25hCollecting python-daemon==2.2.4
  Downloading https://files.pythonhosted.org/packages/5a/0c/57f15b1572661877ff1acbe66c2f5be9d999ae5fb128e22933d374f62aa1/python_daemon-2.2.4-py2.py3-none-any.whl
Collecting Flask-Caching==1.9.0
  Downloading https://files.pythonhosted.org/packages/d1/9f/135bcf47fdb585dffcf9f918664ab9d63585aae8e722948b2abca041312f/Flask_Caching-1.9.0-py2.py3-none-any.whl
Collecting cached-property==1.5.2
  Downloading https://files.pythonhosted.org/packages/48/19/f2090f7dad41e225c7f2326e4cfe6fff49e57dedb5b53636c9551f86b069/cached_property-1.5.2-py2.py3-none-any.whl
Collecting jsonschema==3.2.0
[?25l  Downloading https://files.pythonhosted.org/packages/c5/8f/51e89ce52a085483359217bc72cdbf6e75ee595d5b1d4b5ad

Criando a DAG

In [6]:
from datetime import datetime, timedelta
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG

In [7]:
# Operators; we need this to operate!
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    # Start on 27th of June, 2020
    'start_date': datetime(2020, 6, 27),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    # In case of errors, do one retry
    'retries': 1,
    # Do the retry with 30 seconds delay after the error
    'retry_delay': timedelta(seconds=30),
    # Run once every 15 minutes
    'schedule_interval': '*/15 * * * *'
}
dag = DAG(
    'tutorial',
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule_interval=timedelta(days=1),
    start_date=days_ago(2),
    tags=['example'],
)

# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag,
)

t2 = BashOperator(
    task_id='sleep',
    depends_on_past=False,
    bash_command='sleep 5',
    retries=3,
    dag=dag,
)
dag.doc_md = __doc__

t1.doc_md = """\
#### Task Documentation
You can document your task using the attributes `doc_md` (markdown),
`doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
rendered in the UI's Task Instance Details page.
![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)
"""
templated_command = """
{% for i in range(5) %}
    echo "{{ ds }}"
    echo "{{ macros.ds_add(ds, 7)}}"
    echo "{{ params.my_param }}"
{% endfor %}
"""

t3 = BashOperator(
    task_id='templated',
    depends_on_past=False,
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag,
)

t1 >> [t2, t3]

[<Task(BashOperator): sleep>, <Task(BashOperator): templated>]

In [8]:
# initialize the database tables
!airflow db init

# print the list of active DAGs
!airflow dags list

# prints the list of tasks in the "tutorial" DAG
!airflow tasks list tutorial

# prints the hierarchy of tasks in the "tutorial" DAG
!airflow tasks list tutorial --tree

DB: sqlite:////root/airflow/airflow.db
[[34m2021-03-17 18:54:32,462[0m] {[34mdb.py:[0m674} INFO[0m - Creating tables[0m
INFO  [alembic.runtime.migration] Context impl SQLiteImpl.
INFO  [alembic.runtime.migration] Will assume non-transactional DDL.
INFO  [alembic.runtime.migration] Running upgrade  -> e3a246e0dc1, current schema
INFO  [alembic.runtime.migration] Running upgrade e3a246e0dc1 -> 1507a7289a2f, create is_encrypted
INFO  [alembic.runtime.migration] Running upgrade 1507a7289a2f -> 13eb55f81627, maintain history for compatibility with earlier migrations
INFO  [alembic.runtime.migration] Running upgrade 13eb55f81627 -> 338e90f54d61, More logging into task_instance
INFO  [alembic.runtime.migration] Running upgrade 338e90f54d61 -> 52d714495f0, job_id indices
INFO  [alembic.runtime.migration] Running upgrade 52d714495f0 -> 502898887f84, Adding extra to Log
INFO  [alembic.runtime.migration] Running upgrade 502898887f84 -> 1b38cef5b76e, add dagrun
INFO  [alembic.runtime.migrati

In [9]:
!airflow tasks test tutorial print_date 2015-06-01


[[34m2021-03-17 18:54:55,238[0m] {[34mdagbag.py:[0m448} INFO[0m - Filling up the DagBag from [01m/root/airflow/dags[22m[0m
[[34m2021-03-17 18:54:55,303[0m] {[34mtaskinstance.py:[0m851} INFO[0m - Dependencies all met for [01m<TaskInstance: tutorial.print_date 2015-06-01T00:00:00+00:00 [None]>[22m[0m
[[34m2021-03-17 18:54:55,307[0m] {[34mtaskinstance.py:[0m851} INFO[0m - Dependencies all met for [01m<TaskInstance: tutorial.print_date 2015-06-01T00:00:00+00:00 [None]>[22m[0m
[[34m2021-03-17 18:54:55,307[0m] {[34mtaskinstance.py:[0m1042} INFO[0m - 
--------------------------------------------------------------------------------[0m
[[34m2021-03-17 18:54:55,307[0m] {[34mtaskinstance.py:[0m1043} INFO[0m - Starting attempt 1 of 2[0m
[[34m2021-03-17 18:54:55,307[0m] {[34mtaskinstance.py:[0m1044} INFO[0m - 
--------------------------------------------------------------------------------[0m
[[34m2021-03-17 18:54:55,307[0m] {[34mtaskinstance.py:[0m1063

In [10]:
!airflow tasks test tutorial sleep 2015-06-01

[[34m2021-03-17 18:55:08,268[0m] {[34mdagbag.py:[0m448} INFO[0m - Filling up the DagBag from [01m/root/airflow/dags[22m[0m
[[34m2021-03-17 18:55:08,332[0m] {[34mtaskinstance.py:[0m851} INFO[0m - Dependencies all met for [01m<TaskInstance: tutorial.sleep 2015-06-01T00:00:00+00:00 [None]>[22m[0m
[[34m2021-03-17 18:55:08,336[0m] {[34mtaskinstance.py:[0m851} INFO[0m - Dependencies all met for [01m<TaskInstance: tutorial.sleep 2015-06-01T00:00:00+00:00 [None]>[22m[0m
[[34m2021-03-17 18:55:08,336[0m] {[34mtaskinstance.py:[0m1042} INFO[0m - 
--------------------------------------------------------------------------------[0m
[[34m2021-03-17 18:55:08,336[0m] {[34mtaskinstance.py:[0m1043} INFO[0m - Starting attempt 1 of 4[0m
[[34m2021-03-17 18:55:08,336[0m] {[34mtaskinstance.py:[0m1044} INFO[0m - 
--------------------------------------------------------------------------------[0m
[[34m2021-03-17 18:55:08,337[0m] {[34mtaskinstance.py:[0m1063} INFO[0m

In [11]:
!airflow tasks test tutorial templated 2015-06-01


[[34m2021-03-17 18:55:19,043[0m] {[34mdagbag.py:[0m448} INFO[0m - Filling up the DagBag from [01m/root/airflow/dags[22m[0m
[[34m2021-03-17 18:55:19,109[0m] {[34mtaskinstance.py:[0m851} INFO[0m - Dependencies all met for [01m<TaskInstance: tutorial.templated 2015-06-01T00:00:00+00:00 [None]>[22m[0m
[[34m2021-03-17 18:55:19,113[0m] {[34mtaskinstance.py:[0m851} INFO[0m - Dependencies all met for [01m<TaskInstance: tutorial.templated 2015-06-01T00:00:00+00:00 [None]>[22m[0m
[[34m2021-03-17 18:55:19,113[0m] {[34mtaskinstance.py:[0m1042} INFO[0m - 
--------------------------------------------------------------------------------[0m
[[34m2021-03-17 18:55:19,113[0m] {[34mtaskinstance.py:[0m1043} INFO[0m - Starting attempt 1 of 2[0m
[[34m2021-03-17 18:55:19,113[0m] {[34mtaskinstance.py:[0m1044} INFO[0m - 
--------------------------------------------------------------------------------[0m
[[34m2021-03-17 18:55:19,113[0m] {[34mtaskinstance.py:[0m1063} 

In [14]:
# Operators; we need this to operate!
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    # Start on 27th of June, 2020
    'start_date': datetime(2020, 6, 27),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    # In case of errors, do one retry
    'retries': 1,
    # Do the retry with 30 seconds delay after the error
    'retry_delay': timedelta(seconds=30),
    # Run once every 15 minutes
    'schedule_interval': '*/15 * * * *'
}
with DAG(    dag_id='simple_bash_dag',
    default_args=default_args,
    schedule_interval=None,
    tags=['my_dags'],
) as dag:    #Here we define our first task
    t1 = BashOperator(bash_command="touch ~/my_bash_file.txt", task_id="create_file")    #Here we define our second task
    t2 = BashOperator(bash_command="mv ~/my_bash_file.txt ~/my_bash_file_changed.txt", task_id="change_file_name")    # Configure T2 to be dependent on T1’s execution
    t1 >> t2

In [15]:
!airflow scheduler


[[34m2021-03-17 18:57:44,171[0m] {[34mdagbag.py:[0m448} INFO[0m - Filling up the DagBag from [01m/root/airflow/dags[22m[0m
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 8, in <module>
    sys.exit(main())
  File "/usr/local/lib/python3.7/dist-packages/airflow/__main__.py", line 40, in main
    args.func(args)
  File "/usr/local/lib/python3.7/dist-packages/airflow/cli/cli_parser.py", line 48, in command
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.7/dist-packages/airflow/utils/cli.py", line 89, in wrapper
    return f(*args, **kwargs)
  File "/usr/local/lib/python3.7/dist-packages/airflow/cli/commands/task_command.py", line 374, in task_test
    task = dag.get_task(task_id=args.task_id)
  File "/usr/local/lib/python3.7/dist-packages/airflow/models/dag.py", line 1532, in get_task
    raise TaskNotFound(f"Task {task_id} not found")
airflow.exceptions.TaskNotFound: Task create_file not found
