# Разработка ETL-пайплайна для агрегации данных о поездках


- Автор: Коростин Никита
- Дата: 05.11.2025

## Описание проекта

Вы работаете аналитиком данных. Каждый день компания обрабатывает миллионы поездок, оплаченных разными способами. Чтобы финансовая и продуктовая команды регулярно получали актуальные данные о выручке и поведении пассажиров, коллеги решили настроить автоматическую обработку этих данных.

Ваша задача — построить витрину данных, которая будет агрегировать информацию о поездках по каждому способу оплаты. Для этого нужно написать PySpark-скрипт, который рассчитает ключевые показатели: количество поездок, среднюю стоимость поездки, средние чаевые и суммарную выручку по каждому типу оплаты.

Чтобы процесс был полностью автоматическим и не зависел от ручных запусков, необходимо создать DAG в Airflow. Этот DAG должен ежедневно:

* проверять наличие новых файлов с данными;

* запускать Spark-задачу;

* формировать обновлённую итоговую таблицу.

Эта таблица станет основой для финансовых отчётов и аналитических дашбордов.

## Описание данных

Таблица `taxi_data` содержит данные об активности пользователей и состоит из следующих полей:

* `taxi_id` — идентификатор водителя;

* `trip_start_timestamp` — время начала поездки;

* `trip_end_timestamp` — время окончания поездки;

* `trip_seconds` — длительность поездки в секундах;

* `trip_miles` — дистанция поездки;

* `fare` — стоимость поездки;

* `tips` — размер чаевых;

* `trip_total` — общая стоимость поездки: стоимость поездки + чаевые + комиссия;

* `payment_type` — способ оплаты.

## Что нужно сделать

В проекте вам предстоит автоматизировать подготовку витрины данных по поездкам:

1. Сначала необходимо написать Spark-скрипт, который будет обрабатывать данные о поездках и агрегировать показатели по способам оплаты `payment_type`. Понадобится рассчитать несколько показателей:

* количество поездок, которое показывает общий спрос и загрузку сервиса;
* среднюю стоимость `fare`, которое отражает уровень среднего чека поездки;
* средние чаевые `tips` — индикатор удовлетворённости клиентов и мотивации водителей;
* суммарную выручку `trip_total` — ключевой показатель дохода компании.

Все результаты должны собираться в одну итоговую таблицу `taxi_payment_summary`. После этого таблицу нужно записать в ClickHouse с помощью JDBC-драйвера.

2. Далее вам понадобится настроить DAG в Airflow. DAG должен запускаться ежедневно. Перед запуском он проверяет наличие файла с данными за нужную дату в S3-хранилище и только после появления файла запускает Spark-задачу.

Когда всё будет готово, перенесите свой код и результаты в шаблон для ревьюеров в следующем уроке. Учтите, что этот шаблон служит только для передачи решения — запустить в нём код не получится.

## Шаг 1. Настройте Spark-агрегацию

Ваши данные для подключения к DBeaver:
*   Имя пользователя — (скрыто)
*   Пароль — (скрыто)

Данные хранятся в формате Parquet, поэтому для чтения используйте метод `spark.read.parquet()`. Это быстрее и надёжнее, чем CSV.

Сгруппируйте данные по полю `payment_type` и рассчитайте четыре показателя:

* количество поездок — `count(*)`;
* среднюю стоимость — `avg(...)`;
* средние чаевые — `avg(...)`;
* суммарную выручку — `sum(...)`.

Так получится витрина для анализа информации по каждому способу оплаты. После этого настройте запись полученной таблицы в ClickHouse. Проверьте, что всё работает, и переходите в шагу 2.

In [None]:
#filename=my_spark_job.py

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import sys

# Создаём Spark-сессию и при необходимости добавляем конфигурации
spark = SparkSession.builder.appName("myAggregateTest").config("fs.s3a.endpoint", "storage.yandexcloud.net").getOrCreate()

# Указываем порт и параметры кластера ClickHouse
jdbcPort = 8443
jdbcHostname = "(скрыто)"
username = "da_20250922_ac1634726c"
password = "(скрыто)"
jdbcDatabase = "playground_" + username
jdbcUrl = f"jdbc:clickhouse://{jdbcHostname}:{jdbcPort}/{jdbcDatabase}?ssl=true"

df = spark.read.parquet("s3a://da-plus-dags/project_04/taxi_data.parquet")

summary_df = df.groupBy("payment_type").agg(
    F.count("*").alias("trip_count"),
    F.avg("fare").alias("avg_fare"),
    F.avg("tips").alias("avg_tips"),
    F.sum("trip_total").alias("total_revenue")
)

# Запись результата в ClickHouse
summary_df.write.format("jdbc") \
    .option("url", jdbcUrl) \
    .option("user", username) \
    .option("password", password) \
    .option("dbtable", "taxi_payment_summary") \
    .mode("append") \
    .save()


## Шаг 2. Настройте DAG

Дату в качестве параметра передавать не нужно. В DAG используйте `S3KeySensor`, чтобы дождаться появления файла в S3. После этого запускайте `DataprocCreatePysparkJobOperator`, передав путь к вашему скрипту. Дополнительный класс-оператор создавать не требуется.

Ваши данные для подключения к Airflow:
*   IP — (скрыто)
*   Имя пользователя — da_20250922_ac1634726c
*   Пароль — (скрыто)

In [None]:
from datetime import datetime
from airflow import DAG
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.providers.yandex.operators.dataproc import DataprocCreatePysparkJobOperator

DAG_ID = "taxi_payment_processing"

with DAG(
    dag_id=DAG_ID,
    start_date=datetime(2025, 1, 1),
    schedule_interval="@daily",
    catchup=False
) as dag:
    wait_for_input = S3KeySensor(
        task_id='wait_for_data',
        bucket_name='da-plus-dags',
        bucket_key='project_04/taxi_data.parquet',
        aws_conn_id='s3',
        poke_interval=300,
        timeout=3600,
    )

    run_pyspark = DataprocCreatePysparkJobOperator(
        task_id="run_pyspark_job",
        name="taxi_payment_analysis",
        cluster_id="(скрыто)",
        main_python_file_uri="s3a://da-plus-dags/da_20250922_ac1634726c/jobs/my_spark_job.py"
    )

    wait_for_input >> run_pyspark

## Шаг 3. Запустите DAG с помощью Airflow UI

Теперь можно переходить к запуску. Нажмите кнопку «Проверить», подождите 5 минут и снова нажмите её. Вам будут показаны данные для входа в веб-интерфейс Airflow. В интерфейсе найдите ваш DAG и запустите его.

Проверьте, что DAG выполнился, а результат соответствует ожиданиям. Если всё получилось — поздравляем, проект завершён!