# Установка AirFlow. Создание и основные параметры DAG. Web UI

Airflow можно развернуть по-разному: это могут быть как поднятые компоненты Airflow на одной машине вместе с запускаемыми воркерами, а может быть Airflow поднят, как сервис в pod'е кластера Kubernetes, и воркеры будут запускаться на отдельных pod'ах. Второй подход распространен в production среде и хорошо масштабируется, однако он более сложен в настройке и мы рассмотрим его в следующих уроках, а в рамках текущего развернем Airflow на своей локальной машине и напишем свой первый DAG.

Для начала вспомним архитектуру Airflow, чтобы понять, что нам предстоит запустить:

<img src="images/2/arch-diag-basic.png" style="width: 800px;"> 

Apache Airflow состоит из нескольких основных частей:

* БД для хранения метаданных о пайплайнах (Metadata DB), чаще всего используют PostgreSQL, также встречается MySQL
* Веб-приложение (Webserver) с панелью управления, написано на Flask
* Планировщик (Scheduler), в production среде чаще всего используется Celery
* Исполнитель (Executor) на схеме показан отдельно, как это часто подразумевается в документации, но в реальности это не отдельный процесс, а работающий в рамках Scheduler'а
* Воркер (Worker), выполняющий работу, в production среде также чаще всего встречается конфигурация с Celery

На схеме также показан airflow.cfg, в котором хранятся настройки Airflow и к которому обращаются Web Server, Scheduler и Workers, а также отображены DAG'и - файлы с кодом на Python, где описаны пайплайны, к ним тоже обращаются остальные компоненты Airflow.

Приступим к установке и настройке Apache Airflow руками без использования готовых Docker-образов ([подробнее](https://airflow.apache.org/docs/apache-airflow/stable/production-deployment.html) об образе с Airflow и деплое в проде), чтобы наглядно показать как всё запускается изнутри.

## Установка Airflow

### Шаг 1. Окружение и airflow package

Создаём новое виртуальное окружение Python, и ставим в него Apache Airflow версии 2.0.0:

Отключить виртуальное окружение можно с помощью команды __deactivate__.

Подводные камни:

* С помощью __venv__ нельзя создать окружение с версией Python, отличной от уже установленной в системе
* Могут быть проблемы совместимости версий пакетов: так, например, на MacOS с версией Python __3.8.2__ установка Airflow 2.0.0 падает при установке пакета __setproctitle__ с ошибкой _"ERROR: Could not build wheels for setproctitle which use PEP 517 and cannot be installed directly"_

В случае конфликтов версий, предлагается явно создать окружение с Python __3.6.9__, это можно сделать с помощью __conda__, не удаляя при этом существующую версию Python на компьютере:

Отключить conda окружение можно с помощью команды __conda deactivate__.

У Airflow много зависимостей в отличие от того же Luigi, поэтому в терминале при установке будет много информации. После установки Airflow можете посмотреть зависимости через __pip freeze__.

После установки пакета apache-airflow, в виртуальном окружении будет доступна команда airflow. Запустите её без параметров, чтобы увидеть список доступных команд. Например, основную информацию об Airflow можно узнать с помощью команды __airflow info__, а список наиболее полезных комманд - __airflow cheat-sheet__. Подробнее с Airflow CLI можно познакомиться [здесь](https://airflow.apache.org/docs/apache-airflow/stable/cli-and-env-variables-ref.html).

### Шаг 2. БД и airflow.cfg

Apache Airflow свои настройки хранит в файле __airflow.cfg__, который по умолчанию будет создан в домашней директории юзера по пути __~/airflow/airflow.cfg__. В частности, в начале файла указан путь к папке, где будут располагаться DAG'и (dags_folder).

Путь можно изменить, присвоив переменной окружения новое значение:

Далее выполняем инициализацию для базы данных ([подробнее](https://airflow.apache.org/docs/apache-airflow/stable/howto/initialize-database.html)):

Эта команда накатит все миграции, по умолчанию в качестве базы данных Airflow использует SQLite. Для демонстрационных возможностей это нормально, но в реальном бою лучше всё же переключиться на MySQL или PostgreSQL. Будем использовать PostgreSQL, поэтому если он у вас не стоит, то самое время установить PostgreSQL.

---
Установим актуальную версию PostgreSQL: 

* на MacOS пользуемся пакетным менеджером Homebrew
* на Ubuntu отличия минимальные, ориентироваться можно на [шпаргалку](https://khashtamov.com/ru/postgresql-cheatsheet/)

При установке автоматически должна будет создана папка на диске с БД, если этого не произошло, создадим БД сами, а также запустим PostgreSQL сервер (остановить можно с помощью такой же команды с __stop__):

Флаг __-D__ означает, что PostgreSQL сервер запустится как демон и будет работать в фоновом режиме.

---
Войдем в интерактивный терминал PostgreSQL, создадим базу данных и пользователя к ней для Airflow:

Теперь открываем airflow.cfg и правим значение параметров:

* __sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@localhost/airflow_metadata__
* __load_examples = False__

Последний параметр отвечает за загрузку примеров с DAG'ами, они в общем случае не нужны, хотя сами можете попробовать оставить и посмотреть примеры. Если что-то нечаянно удалите в файле конфига, default версию можно взять в [репозитории Apache Airflow](https://github.com/apache/airflow/blob/master/airflow/config_templates/default_airflow.cfg).

В качестве python-драйвера для PostgreSQL используем __psycopg2__, поэтому его необходимо доставить в окружение:

Инициализируем новую базу данных:

В стандартной конфигурации Airflow предлагает нам использовать SequentialExecutor, но мы ведь стараемся подражать продуктивной среде, поэтому будем использовать LocalExecutor. В airflow.cfg поменяйте значение параметра __executor__ на __LocalExecutor__.

Интересный факт - если в качестве базы метаданных использовать однопоточный SQLite, то LocalExecutor превратится в SequentialExecutor.

---

### Шаг 3. Запуск webserver и scheduler

Перед запуском Airflow осталось выполнить еще одно действие - создать пользователя:

Запускаем веб-приложение на 8080 порту и логинимся под созданным пользователем (если этот порт у вас занят, укажите другой свободный):

Если все настроено верно, то увидим перед собой интерфейс:

<img src="images/2/airflow_wo_scheduler.png" style="width: 800px;"> 

На странице можно заметить сообщение: 

_"The scheduler does not appear to be running. 
The DAGs list may not update, and new tasks will not be scheduled."_

Сообщение указывает на то, что не запущен планировщик Airflow - Scheduler. Он отвечает за DAG discovery (обнаружение новых DAG'ов), а также за планирование их запуска. Запустим планировщик командой:

Можно запустить планировщик в отдельном терминале, либо использовать менеджер терминалов __tmux__, например. Из интересного стоит отметить, что можно настроить запуск сервисов Airflow через [systemd](https://github.com/apache/incubator-airflow/tree/master/scripts/systemd) или [docker](https://github.com/puckel/docker-airflow) (примеры из репозитория Airflow).

Итак, база настроена, веб-приложение и планировщик запущены. Теперь напишем первый __data pipeline__.

## Создание DAG и его параметры

### Настройка среды

Как вы наверняка обратили внимание, в файле настроек airflow.cfg есть параметр __dags_folder__, он указывает на путь, где лежат файлы с DAG'ами. Это путь \$AIRFLOW_HOME/dags. Именно туда мы положим наш код с задачами.

Писать код предлагается в __PyCharm__, можно создавать и редактировать \*.py файлы с DAG'ами в отдельном проекте, а в папку \$AIRFLOW_HOME/dags копировать по мере готовности. Для подсветки синтаксиса модулей Airflow и соответствующей версии Python, нужно в настройках интерпретатора выбрать существующее окружение conda369, которое мы создали ранее:

<img src="images/2/pycharm_create.png" style="width: 800px;"> 

### Создание и запуск DAG

Для демонстрации возьмем пример с датасетом __Titanic__ из популярного соревнования на [Kaggle](https://www.kaggle.com/c/titanic/overview). Сделаем следующее: скачаем датасет, а затем создадим сводную таблицу - сгруппируем пассажиров по полу и пассажирскому классу, чтобы узнать количество людей в каждом классе. Результатом будет новый csv-файл со сводной таблицей. 

Для наглядности посмотрим содержимое датасета:

In [11]:
import pandas as pd

url = "https://web.stanford.edu/class/archive/cs/cs109/cs109.1166/stuff/titanic.csv"
df = pd.read_csv(url)

In [12]:
print('Dataset size:', c.shape)
df.head()

Dataset size: (887, 8)


Unnamed: 0,Survived,Pclass,Name,Sex,Age,Siblings/Spouses Aboard,Parents/Children Aboard,Fare
0,0,3,Mr. Owen Harris Braund,male,22.0,1,0,7.25
1,1,1,Mrs. John Bradley (Florence Briggs Thayer) Cum...,female,38.0,1,0,71.2833
2,1,3,Miss. Laina Heikkinen,female,26.0,0,0,7.925
3,1,1,Mrs. Jacques Heath (Lily May Peel) Futrelle,female,35.0,1,0,53.1
4,0,3,Mr. William Henry Allen,male,35.0,0,0,8.05


Познакомившись с данными, напишем процессинг датасета и создание DAG:

In [None]:
import os
import datetime as dt
import pandas as pd
from airflow.models import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator

# базовые аргументы DAG
args = {
    'owner': 'airflow',  # Информация о владельце DAG
    'start_date': dt.datetime(2020, 12, 23),  # Время начала выполнения пайплайна
    'retries': 1,  # Количество повторений в случае неудач
    'retry_delay': dt.timedelta(minutes=1),  # Пауза между повторами
    'depends_on_past': False,  # Запуск DAG зависит ли от успешности окончания предыдущего запуска по расписанию
}


def get_path(file_name):
    return os.path.join(os.path.expanduser('~'), file_name)


def download_titanic_dataset():
    url = 'https://web.stanford.edu/class/archive/cs/cs109/cs109.1166/stuff/titanic.csv'
    df = pd.read_csv(url)
    df.to_csv(get_path('titanic.csv'), encoding='utf-8')


def pivot_dataset():
    titanic_df = pd.read_csv(get_path('titanic.csv'))
    df = titanic_df.pivot_table(index=['Sex'],
                                columns=['Pclass'],
                                values='Name',
                                aggfunc='count').reset_index()
    df.to_csv(get_path('titanic_pivot.csv'))
    
# В контексте DAG'а зададим набор task'ок
# Объект-инстанс Operator'а - это и есть task
with DAG(
        dag_id='titanic_pivot',  # Имя DAG
        schedule_interval=None,  # Периодичность запуска, например, "00 15 * * *"
        default_args=args,  # Базовые аргументы
) as dag:
    # BashOperator, выполняющий указанную bash-команду
    first_task = BashOperator(
        task_id='first_task',
        bash_command='echo "Here we start! Info: run_id={{ run_id }} | dag_run={{ dag_run }}"',
        dag=dag,
    )
    # Загрузка датасета
    create_titanic_dataset = PythonOperator(
        task_id='download_titanic_dataset',
        python_callable=download_titanic_dataset,
        dag=dag,
    )
    # Чтение, преобразование и запись датасета
    pivot_titanic_dataset = PythonOperator(
        task_id='pivot_dataset',
        python_callable=pivot_dataset,
        dag=dag,
    )
    # Порядок выполнения тасок
    first_task >> create_titanic_dataset >> pivot_titanic_dataset

Основные моменты:

* В DAG'е у нас используются 2 PythonOperator и 1 BashOperator. Обратите внимание, что PythonOperator'ы принимают функцию, которую необходимы выполнить. В первом случае это download_titanic_dataset, которая скачивает датасет из сети, во втором случае это pivot_dataset, которая сохраняет сводную таблицу из исходного файла (сохраненного предыдущей функцией).
* В schedule_interval время задается в UTC, то есть нужно помнить о разнице часовых поясов, при этом в UI для удобства можно выбрать часовой пояс и даты будут отображаться соответственно (появилось в версии [1.10.10](https://airflow.apache.org/blog/airflow-1.10.10/#allow-user-to-chose-timezone-to-use-in-the-rbac-ui)); расписание может указываться в виде строки (str), объекта datetime.timedelta или в формате cron.
* В Airflow допустимы конструкции >> и << (перегрузка операторов битового сдвига), а также методы .set_upstream и .set_downstream, для описания зависимости между двумя операторами, а именно порядка выполнения тасок; можно также таски задавать списком: t1 >> \[t2, t3\].
* В bash_command использованы конструкции вида {{ run_id }}, это __шаблоны Jinja__ - [Jinja Templating](https://airflow.apache.org/docs/apache-airflow/stable/concepts.html#jinja-templating), они работают на основе Python-библиотеки для рендеринга шаблонов [Jinja](https://jinja.palletsprojects.com/en/master/), которая компилирует каждый шаблон в Python executable, который принимает на вход контекст и возвращает строку — отрендеренный шаблон. Вместе с [Macros](https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html)'ами и Variables (переменные рассмотрим в след. уроке) они позволяют удобно использовать в разработке различные переменные и форматировать их.

Следующие конструкции равносильны:

Параметров DAG много больше, более подробно можно прочитать в [документации](https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/dag/index.html).

Далее готовый файл можно протестировать, проверив корректность кода, выполнив:

Если ошибок нет, то далее файл с DAG'ом нужно поместить в папку, в которую "смотрит" scheduler - $AIRFLOW_HOME/dags, причем с определенной периодичностью, дефолтно 300 сек - __каждые 5 минут__, за это отвечает параметр в секции \[scheduler\] - __dag_dir_list_interval__.

Добавить DAG можно вручную скопировав \*.py файл, либо следующей командой:

Если всё сделано верно, то в списке появится наш DAG. Его можно активировать по кнопке __Pause/Unpause DAG__ и запустить - __Trigger Dag__.

<img src="images/2/first_dag.png" style="width: 800px;">

<img src="images/2/trigger_dag.png" style="width: 1000px;">

Таски по очереди перейдут в статус __"success"__ и после выполнения всех тасок результат работы DAG'а можно посмотреть в файле, расположенного на вашем локальном компьютере по пути: __~/titanic_pivot.csv__

<img src="images/2/dag_result.png" style="width: 300px;">

### WebUI

Познакомимся детальнее с Airflow WebUI, рассмотрим интерфейс DAG:

* Представления DAG: __Tree View__ и __Graph View__
* Время выполнения и попытки тасок: __Task Duration__ и __Task Tries__
* Общее время выполнения DAG: __Landing Times__
* Диаграмма Ганта, показывающая время исполнения тасок в интервалах: __Gantt__
* Детали DAG'а: __Details__
* Текущий код DAG'а: __Code__

Посмотрев вкладку DAGs, изучим другие вкладки:

* Security: пользователи, поли, статистика, доступы
* Browse: списки DAG Runs / Jobs / Tasks, Logs
* Admin: Variables, Configurations, Connections, Plugins, Pools, XComs
* Docs: ссылки на офф. документацию и Git-репозиторий
* Profile: данные профиля юзера

Скриншоты с основными вкладками WebUI также есть в [документации](https://airflow.apache.org/docs/apache-airflow/stable/ui.html).

Как видно, интерфейс Airflow предлагает множество полезных функций и наглядно визуализирует все процессы. Пример с нашим DAG'ом довольно маленький, в реальном проекте DAG'и достигают десятков или даже сотен тасок:

<img src="images/2/dag_view.png" style="width: 800px;"> 

Инстанс DAG'а в определенный момент времени называется __DAG Run__. Каждый DAG может как иметь расписание, так и не иметь, которое показывает, как DAG Run был создан.

__Мониторить__ процесс исполнения задачи можно по __логу__, доступному в TaskInstance -> Log, причем логи разбиты по попыткам (Log by attempts). Для задач, исполняемых на кластере, например, при spark-submit, лог особенно интересен, так как там можно посмотреть __application id__ на кластере Hadoop и по этому id посмотреть детали задачи уже в UI кластера (All Application). Также в TaskInstance есть Task Actions для перезапуска тасок и изменения статуса.

<img src="images/2/taskinstance.png" style="width: 500px;">

### Summary

Airflow сравнительно не трудно развернуть с нуля и установить PostgreSQL в качестве метабазы вместо SQLite. У Airflow много зависимостей, поэтому нужно быть внимательнее с потенциальными конфликтами. Для полноценной работы Airflow помимо базы нужно запустить webserver и scheduler. Наглядный Web UI позволяет обозревать существующие паплайны, запускать их и мониторить. Scheduler следит за появлением новых DAG'ов (в $AIRFLOW_HOME/dags) и отображает их в UI по мере появления.

Airflow комплексный инструмент, предлагающий дата-инжинеру множество удобных возможностей для самых разных задач и хорошо подходит для классической задачи ETL:

<img src="images/2/airflow_etl.png" style="width: 600px;"> 

## Дополнительные материалы

1. [PostgreSQL Client Applications](https://www.postgresql.org/docs/13/app-psql.html)
2. [PostgreSQL Server Applications](https://www.postgresql.org/docs/13/app-postgres.html)
3. [Введение в Apache Airflow](https://khashtamov.com/ru/apache-airflow-introduction/)
4. [Как мы оркестрируем процессы обработки данных с помощью Apache Airflow](https://habr.com/ru/company/lamoda/blog/518620/)
5. [Airflow — инструмент, чтобы удобно и быстро разрабатывать и поддерживать batch-процессы обработки данных](https://habr.com/ru/company/mailru/blog/339392/)

## Домашнее задание

__1.__ Установить и настроить Airflow по материалам лекции, создать проект в PyCharm, запустить пример DAG'а из лекции на локальном компьютере и убедиться в успешном расчете пайплайна.

__2.__ Написать функцию __mean_fare_per_class()__, которая считывает файл titanic.csv и расчитывает среднюю арифметическую цену билета (Fare) для каждого класса (Pclass) и сохраняет результирующий датафрейм в файл __titanic_mean_fares.csv__

__3.__ Добавить в DAG таск с названием __mean_fares_titanic_dataset__, который будет исполнять функцию mean_fare_per_class(), причем эта задача должна запускаться в параллель с pivot_titanic_dataset после таски create_titanic_dataset.

__4.__ В конец пайплайна (после завершения тасок pivot_titanic_dataset и mean_fares_titanic_dataset) добавить шаг с названием __last_task__, на котором в STDOUT выводится строка, сообщающая об окончании расчета и выводящая execution date в формате YYYY-MM-DD. Пример строки: "Pipeline finished! Execution date is 2020-12-28"

__Формат сдачи д/з__: приложите ссылку на ваш Git с кодом DAG'а и туда же загрузите два скриншота с:

* выводом комманды в консоли "head ~/titanic_mean_fares.csv"
* содержанием лога инстанса таски last_task в UI