 ## DAG creation

In [None]:
pip install apache-airflow==1.10.10 

In [5]:
from airflow import DAG
from airflow.utils.dates import days_ago
import logging

from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.bash import BashOperator
from airflow.operators.python_operator import PythonOperator

DEFAULT_ARGS = {
    'start_date': days_ago(2),
    'owner': 'ds',
    'poke_interval': 600
}


ModuleNotFoundError: No module named 'airflow'

In [3]:
# 1st way:
''' Mist easiest way. Creation of Dag-class variable 
 In this case every task should be linked to DAG. And inside every task it needs to be point (mention) to DAG.'''
dag = DAG("dina_simple_dag_v2",
          schedule_interval='@daily',
          default_args=DEFAULT_ARGS,
          max_active_runs=1,
          tags=['karpov']
          )
wait_until_6am = TimeDeltaSensor(
    task_id='wait_until_6am',
    delta=timedelta(seconds=6 * 60 * 60),
    dag=dag
) 

NameError: name 'DAG' is not defined

In [2]:
# 2d way
'''Similar to 1st way but should be done via "context manager" (with... as) 
There is no need to mention DAG inside every task, it's assigned automatically. '''
with DAG(
    dag_id='dina_simple_dag',
    schedule_interval='@daily',
    default_args=DEFAULT_ARGS,
    max_active_runs=1,
    tags=['karpov']
) as dag:

    wait_until_6am = TimeDeltaSensor(
        task_id='wait_until_6am',
        delta=timedelta(seconds=6 * 60 * 60)
    )

NameError: name 'DAG' is not defined

In [1]:
#3d way
'''DAG создается с помощью декоратора @dag. Нам требуется создать функцию со списком тасков внутри 
и обернуть ее в декоратор. Таким образом мы получаем переменную класса DAG. 
Эту переменную мы объявляем в глобальной области видимости, с помощью чего Airflow понимает, 
что внутри скрипта находится DAG. 
'''
@dag(
    start_date=days_ago(12),
    dag_id='dina_simple_dag',
    schedule_interval='@daily',
    default_args=DEFAULT_ARGS,
    max_active_runs=1,
    tags=['karpov']
)
def generate_dag():
    wait_until_6am = TimeDeltaSensor(
        task_id='wait_until_6am',
        delta=timedelta(seconds=6 * 60 * 60)
    )
dag = generate_dag()


NameError: name 'dag' is not defined

**BashOperator** 

Выполняет заданный bash script

```python
echo_ds = BashOperator(
    task_id='echo_ds',
    bash_command='echo {{ ds }}', # выполняемый bash script (ds = execution_date)
    dag=dag
)
```

**BranchPythonOperator**

Это оператор ветвления. В результате выполнения возвращает название следующей задачи. В нашем случае возвращает `weekend` или `weekday` в зависимости от `execution_dt`.

```python
def select_day_func(**kwargs):
    execution_dt = kwargs['templates_dict']['execution_dt']
    exec_day = datetime.strptime(execution_dt, '%Y-%m-%d').weekday()
    return 'weekend' if exec_day in [5, 6] else 'weekday'

weekday_or_weekend = BranchPythonOperator(
    task_id='weekday_or_weekend',
    python_callable=select_day_func,
    templates_dict={'execution_dt': '{{ ds }}',
    dag=dag
)
```

**PythonOperator**

```python
def weekday_func():
    logging.info("It's weekday")

weekday = PythonOperator(
    task_id='weekday',
    python_callable=weekday_func, # ссылка на функцию, выполняемую в рамках таски
    dag=dag
)
```

**DummyOperator**

```python
eod = DummyOperator(
    task_id='eod',
    trigger_rule='one_success',
    dag=dag
)
```

Оператор запускается в зависимости от заданного trigger rule.

Trigger Rule:

- all_success (по умолчанию) - таск запустится в тот момент, когда все таски, от которых он зависит, выполнятся успешно
- all_failed - все таски провалились
- all_done - все таски пришли в конечное состояние (не run / queued / retry)
- one_failed - хотя бы один таск упал
- one_success - хотя бы один таск успешно отработал
- none_failed - ни одна задача не упала
- none_failed_or_skipped - ни одна задача не упала и не была пропущена
- none_skipped - ни одна задача не была пропущена
- dummy - таск ни от чего не зависит, может стартовать в любое время

### Составление пайплайнов

Громоздкий способ: для всех переменных, которые описывают таски, определяем последователей (downstream) и предшественников (upstream). Их можно задавать по одному или же сразу подать список из нескольких задач (как для eod).

```python
wait_until_6am.set_downstream(echo_ds)
echo_ds.set_downstream(weekday_or_weekend)
weekday_or_weekend.set_downstream(weekend, Label("It's weekend"))
weekday_or_weekend.set_downstream(weekday, Label("It's weekday"))
eod.set_upstream([weekend, weekday])
```

Наглядный и короткий способ. Здесь мы так же указываем лейблы (они маркируют ребра).

```python
wait_until_6am >> echo_ds >> weekday_or_weekend >> Label("It's weekday") >> weekday >> eod
weekday_or_weekend >> Label("It's weekend") >> weekend >> eod
```

Самый короткий способ (без лейблов).

```python
wait_until_6am >> echo_ds >> weekday_or_weekend >> [weekend, weekday] >> eod
```

### Документация

Документацию можно писать не только для самого DAG-а, но и для отдельных задач: