# Airflow

Airflow는 파이프라인을 DAG(Directed Acyclic Graph)로 정의하고,
파이프라인에서 수행해야 할 각 단계 작업을 Task라고 부른다.

```Bash
pip install apache-airflow
airflow db init  # 기본은 SQLite로, 실제론 Postgresql 등을 사용할 것.
airflow scheduler  # 작업 종속성을 설정
airflow webserver -p 8081  # 작업 시작, 중지, 모니터링 가능한 서버 열기
```

Task operator: shell이나 파이썬 인터프리터 중 하나에게 정의된 작업 수행을 명할 수 있다.

이 노트북을 파이썬 스크립트로 전환 후, airflow_pipeline.py(가칭)을 AIRFLOW_HOME/dags 아래
저장한다. 그 후 스크립트를 실행하면 `$ airflow dags list`에서 등록된 이 그래프를 볼 수 있다.
출력되는 그래프명은 `DAG.dag_id`에 지정한 것이다.


In [1]:
from airflow import DAG
# Task operator 정의: `.bash import BashOperator`도 가능.
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

In [2]:
project_cfg = {
    "owner": "airflow",
    "email": ["hukanj@gmail.com"],
    "email_on_failure": True,
    "start_date": datetime.today(),
    "retries": 1,
    "retry_delay": timedelta(hours=1)}

In [3]:
dag = DAG(dag_id="basic_pipeline",
          default_args=project_cfg,
          schedule_interval=timedelta(days=1))

In [4]:
# 파이썬 작업 연산자에게는 작업(task)이란 파이썬 함수 형태다.
def example_task(_id, **kwargs):
    print(f"Task {_id} started.")
    return f"Completed task {_id}"

In [5]:
task1 = PythonOperator(
    task_id="task1",
    python_callable=example_task,  # 정의한 태스크
    op_kwargs={"_id": 1},
    dag=dag)  # 이 태스크를 파넣을 그래프 지정.

task2 = PythonOperator(
    task_id="task2",
    python_callable=example_task,
    op_kwargs={"_id": 2},
    dag=dag)

## Task dependency
Task1 -> Task2 -> Task3 -> ... 식으로 선-후행 작업 관계를 정의할 수 있다.
이 때 선행 작업은 upstream, 후행 작업은 downstream으로 칭한다.

In [6]:
# 방법 1: T1이 upstream, T2가 downstream(T1 -> T2)
task1.set_downstream(task2)

In [7]:
# 방법 2: 동일하게 T1이 upstream, T2가 downstream
task1 >> task2



<Task(PythonOperator): task2>