# Автоматизация с помощью Airflow

- Автор: Коростин Никита
- Дата: 01.11.2025

## Цели и задачи проекта

Сервис предоставляет доступ к контенту разных форматов, включая текст, аудио и не только. В этом проекте вы построите пайплайн в Airflow, который будет запускать PySpark-скрипт для обработки данных и создания витрин. Эти витрины помогут команде сервиса быстрее и проще готовить отчёты.

## Описание данных

Таблица `bookmate.audition` содержит данные об активности пользователей и включает столбцы:

* `audition_id` — уникальный идентификатор сессии чтения или прослушивания;

* `puid` — идентификатор пользователя;

* `usage_platform_ru` — название платформы, с помощью которой пользователь взаимодействует с контентом;

* `msk_business_dt_str` — дата и время события (строка, часовой пояс — МСК);

* `app_version` — версия приложения;

* `adult_content_flg` — значение, которое показывает, был ли контент для взрослых (`True` или `False`);

* `hours` — длительность сессии чтения или прослушивания в часах;

* `hours_sessions_long` — длительность длинных сессий в часах;

* `kids_content_flg` — значение, которое показывает, был ли это детский контент (`True` или `False`);

* `main_content_id` — идентификатор основного контента;

* `usage_geo_id` — идентификатор географического местоположения пользователя.

Таблица `bookmate.content` включает столбцы:

* `main_content_id` — идентификатор основного контента;

* `main_author_id` — идентификатор основного автора контента;

* `main_content_type` — тип контента: аудио, текст или другой;

* `main_content_name` — название контента;

* `main_content_duration_hours` — длительность контента в часах;

* `published_topic_title_list` — список жанров или тем контента.</font>

## Содержимое проекта

Проект предполагает несколько шагов:

1. Написать Spark-код — вы подключитесь к своему хранилищу данных и укажете, куда сохранять результат.

2. Создать DAG — он будет запускать Spark-код. В DAG вы опишете задачи с помощью `PythonOperator` и `DataprocCreatePysparkJobOperator`, настроите зависимости и получите агрегированные данные для первых бизнес-выводов.

3. Запустить Airflow — именно он будет управлять вашим пайплайном.

## 1. Написание Spark-кода

Ваши данные для подключения к DBeaver:
*   Имя пользователя — (скрыто)
*   Пароль — (скрыто)

В коде ниже приведён написанный Spark-скрипт. Ваша задача — правильно указать данные для подключения к вашему хранилищу: порты, параметры кластера ClickHouse и путь, куда будут записываться агрегаты.

In [None]:
# filename=my_spark_job.py
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import sys

# Создаём Spark-сессию и при необходимости добавляем конфигурации
spark = SparkSession.builder.appName("myAggregateTest").config("fs.s3a.endpoint", "storage.yandexcloud.net").getOrCreate()

# Указываем порт и параметры кластера ClickHouse
jdbcPort = 8443
jdbcHostname = "(скрыто)"
username = "(скрыто)"
jdbcDatabase = "playground_" + username
jdbcUrl = f"jdbc:clickhouse://{jdbcHostname}:{jdbcPort}/{jdbcDatabase}?ssl=true"

# Получаем аргумент из Airflow
my_date = sys.argv[1].replace('-', '_')

# Считываем исходные данные за нужную дату
df = spark.read.csv(f"s3a://da-plus-dags/script_bookmate/data_{my_date}/audition_content.csv", inferSchema=True, header=True)

# Строим агрегат по пользователям
result_df = df.groupBy("puid").agg(
    F.countDistinct("audition_id").alias("audition_count"),
    F.avg("hours").alias("avg_hours")
)

result_df.write.format("jdbc") \
    .option("url", jdbcUrl) \
    .option("user", username) \
    .option("password", "(скрыто)") \
    .option("dbtable", "bookmate_user_aggregate") \
    .mode('append') \
    .save()

В результате будет создан файл с названием, указанным в первой строке. Этот файл можно будет запустить с помощью Airflow, но сначала понадобится настроить DAG.

## 2. Создание DAG

Теперь, когда Spark-код готов, нужно создать DAG, который будет его запускать.

### Задание 1

Создайте «каркас» нового DAG для вашего проекта. DAG должен запускаться каждый день начиная с 1 января 2025 года. При этом запускать DAG за пропущенные даты не нужно.

Используйте менеджер контекста `with ... as dag:` — так все задачи будут корректно привязаны к DAG. После конструкции пока напишите только `pass`.

In [None]:
# filename=bookmate_dag.py

from datetime import datetime
from airflow import DAG
from airflow.sensors.s3_key_sensor import S3KeySensor
from airflow.providers.yandex.operators.dataproc import DataprocCreatePysparkJobOperator

class PysparkJobOperator(DataprocCreatePysparkJobOperator):
    template_fields = ("cluster_id", "args",)

DAG_ID = "audition_content_analysis"

with DAG(
    dag_id=DAG_ID,
    start_date=datetime(2025, 1, 1),
    schedule_interval="@daily",
    catchup=False
) as dag:
    pass

### Задание 2

Теперь добавьте проверку входного файла. DAG не должен стартовать, пока в S3 не появится файл с данными за нужную дату.

Для решения используйте сенсор `S3KeySensor`. Он должен проверять наличие файла каждые 5 минут и ждать максимум час. В качестве аргумента для параметра `bucket_name` укажите строку `"da-plus-dags"`.

Файл называется `audition_content.csv`, но в имени папки должна быть дата запуска в формате `YYYY_MM_DD`. Например, для 5 января 2025 путь будет таким: `script_bookmate/data_2025_01_05/audition_content.csv`. Путь к данным  должен быть аргументом для параметра `bucket_key` в `S3KeySensor`.

In [None]:
# filename=bookmate_dag.py

from datetime import datetime
from airflow import DAG
from airflow.sensors.s3_key_sensor import S3KeySensor
from airflow.providers.yandex.operators.dataproc import DataprocCreatePysparkJobOperator

class PysparkJobOperator(DataprocCreatePysparkJobOperator):
    template_fields = ("cluster_id", "args",)

DAG_ID = "audition_content_analysis"

    wait_for_input = S3KeySensor(
        task_id='wait_for_input',
        bucket_name="da-plus-dags",
        bucket_key="script_bookmate/data_{{ ds.replace('-', '_') }}/audition_content.csv",
        aws_conn_id='s3',
        poke_interval=300, 
        timeout=3600,   
    )

### Задание 3

Создайте задачу для запуска Spark-скрипта через Airflow. Укажите путь к файлу, который вы создали на первом шаге проекта.

In [None]:
# filename=bookmate_dag.py

from airflow.providers.yandex.operators.dataproc import DataprocCreatePysparkJobOperator

    run_pyspark = PysparkJobOperator(
        task_id="run_pyspark_job",
        name="audition_content_analysis",
        cluster_id="(скрыто)",
        main_python_file_uri="s3a://da-plus-dags/script_bookmate/my_spark_job.py",
        args=["{{ ds }}"]
)

### Задание 4

Теперь соберите все фрагменты вместе:

* Опишите DAG с нужными параметрами.

* Добавьте сенсор для ожидания входного файла.

* Добавьте Spark-задачу для запуска скрипта.

* Настройте зависимости так, чтобы Spark-задача запускалась только после появления файла.

Ваши данные для подключения к Airflow:
*   IP — (скрыто)
*   Имя пользователя — (скрыто)
*   Пароль — (скрыто)

In [None]:
# filename=bookmate_dag.py

from datetime import datetime
from airflow import DAG
from airflow.sensors.s3_key_sensor import S3KeySensor
from airflow.providers.yandex.operators.dataproc import DataprocCreatePysparkJobOperator

class PysparkJobOperator(DataprocCreatePysparkJobOperator):
    template_fields = ("cluster_id", "args",)

DAG_ID = "audition_content_analysis"

with DAG(
    dag_id=DAG_ID,
    start_date=datetime(2025, 1, 1),
    schedule_interval="@daily",
    catchup=False
) as dag:
    # 1) Ждём появления входного файла в S3
    wait_for_input = S3KeySensor(
        task_id='wait_for_input',
        bucket_name="da-plus-dags",
        bucket_key="script_bookmate/data_{{ ds.replace('-', '_') }}/audition_content.csv",
        aws_conn_id='s3',
        poke_interval=300, 
        timeout=3600,    
    )

    run_pyspark = PysparkJobOperator(
        task_id="run_pyspark_job",
        name="audition_content_analysis",
        cluster_id="(скрыто)",
        main_python_file_uri="s3a://da-plus-dags/script_bookmate/my_spark_job.py",
        args=["{{ ds }}"]
    )

    wait_for_input >> run_pyspark

## 3. Запуск Airflow

Теперь можно переходить к запуску. Нажмите кнопку «Проверить», подождите 5 минут и снова нажмите её. Вам будут показаны данные для входа в веб-интерфейс Airflow. В интерфейсе найдите ваш DAG и запустите его.

Проверьте, что DAG выполнился, а результат соответствует ожиданиям. Если всё получилось — поздравляем, проект завершён!