# Мануал по airflow(дполняется)

сайт -> https://airflow.apache.org/

Airflow зачем?

- это планировщик задач . Чтобы не запускать ежедневно в ручную одно и то же.

airflow базируется на работе DAG - это направленный ацикличный граф. То есть мы декомпозируем нашу задачу на подзадачи, затем пишет граф зависимостей без цикла.

airflow - это Оркестратор — это система, которая организует управление микросервисными приложениями.

пример кода ниже:

In [None]:
"""
Test documentation
"""
from datetime import datetime, timedelta
from textwrap import dedent

# Для объявления DAG нужно импортировать класс из airflow
from airflow import DAG

# Операторы - это кирпичики DAG, они являются звеньями в графе
# Будем иногда называть операторы тасками (tasks)
from airflow.operators.bash import BashOperator

with DAG(
    'tutorial',
    # Параметры по умолчанию для тасок
    default_args={
        # Если прошлые запуски упали, надо ли ждать их успеха
        'depends_on_past': False,
        # Кому писать при провале
        'email': ['airflow@example.com'],
        # А писать ли вообще при провале?
        'email_on_failure': False,
        # Писать ли при автоматическом перезапуске по провалу
        'email_on_retry': False,
        # Сколько раз пытаться запустить, далее помечать как failed
        'retries': 1,
        # Сколько ждать между перезапусками
        'retry_delay': timedelta(minutes=5),  # timedelta из пакета datetime
    },
    # Описание DAG (не тасок, а самого DAG)
    description='A simple tutorial DAG',
    # Как часто запускать DAG
    schedule_interval=timedelta(days=1),
    # С какой даты начать запускать DAG
    # Каждый DAG "видит" свою "дату запуска"
    # это когда он предположительно должен был
    # запуститься. Не всегда совпадает с датой на вашем компьютере
    start_date=datetime(2022, 1, 1),
    # Запустить за старые даты относительно сегодня
    # https://airflow.apache.org/docs/apache-airflow/stable/dag-run.html
    catchup=False,
    # теги, способ помечать даги
    tags=['example'],
) as dag:

    # t1, t2, t3 - это операторы (они формируют таски, а таски формируют даг)
    t1 = BashOperator(
        task_id='print_date',  # id, будет отображаться в интерфейсе
        bash_command='date',  # какую bash команду выполнить в этом таске
    )

    t2 = BashOperator(
        task_id='sleep',
        depends_on_past=False,  # переопределили настройку из DAG
        bash_command='sleep 5',
        retries=3,  # тоже переопределили retries (было 1)
    )
    t1.doc_md = dedent(
        """\
    #### Task Documentation
    You can document your task using the attributes `doc_md` (markdown),
    `doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
    rendered in the UI's Task Instance Details page.
    ![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)

    """
    )  # dedent - это особенность Airflow, в него нужно оборачивать всю доку

    dag.doc_md = __doc__  # Можно забрать докстрингу из начала файла вот так
    dag.doc_md = """
    This is a documentation placed anywhere
    """  # а можно явно написать
    # формат ds: 2021-12-25
    templated_command = dedent(
        """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7)}}"
    {% endfor %}
    """
    )  # поддерживается шаблонизация через Jinja
    # https://airflow.apache.org/docs/apache-airflow/stable/concepts/operators.html#concepts-jinja-templating

    t3 = BashOperator(
        task_id='templated',
        depends_on_past=False,
        bash_command=templated_command,
    )

    # А вот так в Airflow указывается последовательность задач
    t1 >> [t2, t3]
    # будет выглядеть вот так
    #      -> t2
    #  t1 | 
    #      -> t3

## Операторы (Дополняется)

Операторы - это звенья airflow , иногда называются task'ами.

Существуют BashOperator, PythonOperator, PostgresOperator, SSHOperator, DummyOperator:

* башопертаор для команд терминала
* питон - для скриптов , с аргументами
* dummy - просто опертор без всего, для красивого вида
* postgre - Этот оператор выдает инструкцию SQL для базы данных Postgres.
* SSHOperator- баш, но имеет встроенную поддержку подклдючения ssh

Рассмотрим подробнее:

### bashoperator

In [None]:
# Операторы - это кирпичики DAG, они являются звеньями в графе
# Будем иногда называть операторы тасками (tasks)
from airflow.operators.bash import BashOperator

# вот так можно попросить Airflow подставить логическую дату
# в формате YYYY-MM-DD
date = "{{ ds }}"
t = BashOperator(
    
    task_id="test_env",
    
    bash_command="/tmp/test.sh ",  # обратите внимание на пробел в конце!
    # пробел в конце нужен в случае BashOperator из-за проблем с шаблонизацией
    # вики на проблему https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=62694614
    # и обсуждение https://github.com/apache/airflow/issues/1017
    
    dag=dag,  # говорим, что таска принадлежит дагу из переменной dag
    
    env={"DATA_INTERVAL_START": date},  # задает переменные окружения
)

* task_id - id для каждой таски
* bash_command - команда выполнения
* dag - какому dag принадлежит , если работаем не через with
* env - переменные окружения

### Python operator

In [None]:
def my_sleeping_function(random_base):
    """Заснуть на random_base секунд"""
    time.sleep(random_base)

# Генерируем таски в цикле - так тоже можно
for i in range(5):
    # Каждый таск будет спать некое количество секунд
    task = PythonOperator(
        task_id='sleep_for_' + str(i),  # в id можно делать все, что разрешают строки в python
        python_callable=my_sleeping_function,
        # передаем в аргумент с названием random_base значение float(i) / 10
        op_kwargs={'random_base': float(i) / 10},
    )
    # настраиваем зависимости между задачами
    # run_this - это некий таск, объявленный ранее (в этом примере не объявлен)
    run_this >> task

Мы можем генерировать несколько таск через цикл:
* в task_id указываем генерированные таски
* в python_callable функцию. Важно, если функция что-то возврщает , ее return передается в XCOM.
* op_kwargs - для передачи аргументов сначала переменную из функции, затем значение в ввиде словаря

А также указываь зависимость между тасками.

In [None]:
def print_context(ds, **kwargs):
    """Пример PythonOperator"""
    # Через синтаксис **kwargs можно получить словарь
    # с настройками Airflow. Значения оттуда могут пригодиться.
    # Пока нам не нужно
    print(kwargs)
    # В ds Airflow за нас подставит текущую логическую дату - строку в формате YYYY-MM-DD
    print(ds)
    return 'Whatever you return gets printed in the logs'

run_this = PythonOperator(
    task_id='print_the_context',  # нужен task_id, как и всем операторам
    python_callable=print_context,  # свойственен только для PythonOperator - передаем саму функцию
)

Также хочется отметить, что ест переменные, которые уже лежат в airflow, к примеру: ds(лог. дата), **kwargs - позволяет получить словарь любых значений.

### PostgresOperator 

In [None]:
from airflow.providers.postgres.operators.postgres import PostgresOperator

t3 = PostgresOperator(
	task_id='PythonOperator',
	sql='CREATE TABLE my_table (my_column varchar(10));',
	postgres_conn_id='my_postgres_connection',
	autocommit=False
)

* task_id - id для каждой таски
* sql - запрос
* postgres_conn_id - здесь , как я понял хуки. О них нужно больше инфы.

### другие операторы

## Даты

Даты передаются через ds - это логическая дата . Зачем она нужна?

Например, если даг настроен на ежедневный запуск, то в день 2022-02-12 даг запустится с логической датой 2022-02-11 (вчера), потому что за 12-ое число данных еще может не быть.

## Xcom (передача информации)

в airflow осуществляется передача информации через xcom в формате ключ:значение

In [None]:
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

import requests
import json

url = 'https://covidtracking.com/api/v1/states/'
state = 'wa'

def get_testing_increase(state, ti):
    """
    Gets totalTestResultsIncrease field from Covid API for given state and returns value
    """
    res = requests.get(url + '{0}/current.json'.format(state))
    testing_increase = json.loads(res.text)['totalTestResultsIncrease']
    # в ti уходит task_instance, его передает Airflow под таким названием
    # когда вызывает функцию в ходе PythonOperator
    ti.xcom_push(
        key='testing_increase',
        value=testing_increase
    )

def analyze_testing_increases(state, ti):
    """
    Evaluates testing increase results
    """
    testing_increases = ti.xcom_pull(
        key='testing_increase',
        task_ids='get_testing_increase_data_{0}'.format(state)
    )
    print('Testing increases for {0}:'.format(state), testing_increases)

# Default settings applied to all tasks
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

with DAG(
    'xcom_dag',
    start_date=datetime(2021, 1, 1),
    max_active_runs=2,
    schedule_interval=timedelta(minutes=30),
    default_args=default_args,
    catchup=False
) as dag:
    opr_get_covid_data = PythonOperator(
        task_id = 'get_testing_increase_data_{0}'.format(state),
        python_callable=get_testing_increase,
        op_kwargs={'state':state}
    )
    opr_analyze_testing_data = PythonOperator(
        task_id = 'analyze_data',
        python_callable=analyze_testing_increases,
        op_kwargs={'state':state}
    )

    opr_get_covid_data >> opr_analyze_testing_data

Как внести данные?

* в функции мы указываем либо **kwargs, либо ti
* далее ti.xcom_push - ti (task instance)
* далее указывает key и value.
* также можно через return  (такое поведение можно убрать, если выставить в настройках {'do_xcom_push': False}).

Как получить данные?

* ti.xcom_pull
* в аргументах нужен key и task_ids оператора ,где передавался ti.

## Connections и Variables

Чтобы безопансо хранить подключения

In [None]:
from airflow.hooks.base_hook import BaseHook

connection = BaseHook.get_connection("conn_name")
conn_password = connection.password
conn_login = connection.login

Динамические переменные, которые задаются в Airflow и которые можно доставать из кода:

In [None]:
from airflow.models import Variable

is_prod = Variable.get("is_prod")  # необходимо передать имя, заданное при создании Variable
# теперь в is_prod лежит значение Variable

## Важные моменты

* идемпотентность (Несколько раз в день вызываем задачу, результат один)
* 1 таска - 1 операция
* Осторожнее с datetime.now() - может быть конфликт с логической датой
* Не хранить много в XCom
* Код в отдельном месте, затем доставялем в airflow, так как это оркестратор.
* Тестирование airflow

## Наработки:

* airflow db init важна изначальная инициализация бд для локальной работы
* создание пользователя:

airflow users create \
    --username admin \
    --firstname YOUR_FIRST_NAME \
    --lastname YOUR_LAST_NAME \
    --role Admin \
    --email YOUR_EMAIL@example.com

* airflow webserver --port 8080
* в отдельном терминале запускаем sheduler
* dag создаетс в отдельной папке dags
* если не отображаются нужно перейтив в ls ~/airflow/, изменить в airflow.cfg папку нахождения dag's.

mac приколы:

* чтобы заработал нужно скачивать warnings
* sys.path.append(os.getcwd()) помогает использовать модули без проблем
* os.environ['NO_PROXY'] = '*' для того, чтобы на mac не вхоидл в задачу ожидания


В случае ошибок:
1. Смотри на warnings
2. Обязательно запускай параллельно airflow scheduler с airflow webserver --port 8080 на локальном хосте
3. Смотри logи
4. Скачивай ,что предлагает терминал
* The scheduler does not appear to be running. Last heartbeat was received 33 seconds ago. The DAGs list may not update, and new tasks will not be scheduled. Много вариаций решений - 1 из них установка недостаящих пакетов.
* Лучше менять при импорте имена, если они как-то коррелируют
* 