# Домашнее задание 6. Автоматизация расчетов с помощью Apache Airflow

## Шаг 1. Создать Airflow DAG, который будет ежедневно выполнять следующие действия:
загружать данные из файлов локальной директории в соответствующие таблицы (параллельные task-и):
- о клиентах — customer;
- о продуктах — product;
- о заказах — orders;
- связывающие заказы и продукты — order_items.

### Импорт библиотек

In [None]:
import csv
import os
from contextlib import closing
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
# NOTE: imported last on purpose - this pendulum factory replaces the stdlib
# `datetime` name above and is the one that accepts the `tz=` keyword.
from pendulum import datetime

# Directory the loader functions read their source CSV files from.
DATA_PATH = "/opt/airflow/data"
# Airflow connection id handed to every PostgresHook in this file.
CONN_ID = "postgres"

### Создание таблиц

In [None]:
# DDL executed by the `create_tables` task. Every statement uses
# CREATE TABLE IF NOT EXISTS, so re-running it on each DAG run is harmless.
# public.product is created here as well because rebuild_product() truncates
# and refills it; its columns mirror public.product_stage, with product_id
# as the key since the rebuild keeps exactly one row per product_id.
DDL_SQL = """CREATE TABLE IF NOT EXISTS public.customer(
    customer_id           INT PRIMARY KEY,
    first_name            VARCHAR(50)  NOT NULL,
    last_name             VARCHAR(50),
    gender                VARCHAR(10),
    dob                   DATE,
    job_title             VARCHAR(200),
    job_industry_category VARCHAR(50),
    wealth_segment        VARCHAR(30)  NOT NULL,
    deceased_indicator    BOOLEAN      NOT NULL,
    owns_car              BOOLEAN      NOT NULL,
    address               VARCHAR(200) NOT NULL,
    postcode              VARCHAR(10)  NOT NULL,
    state                 VARCHAR(50)  NOT NULL,
    country               VARCHAR(50)  NOT NULL,
    property_valuation    SMALLINT     NOT NULL
);

CREATE TABLE IF NOT EXISTS public.product_stage (
    product_id     INT,
    brand          VARCHAR(50),
    product_line   VARCHAR(20),
    product_class  VARCHAR(10),
    product_size   VARCHAR(10),
    list_price     NUMERIC(10,2),
    standard_cost  NUMERIC(10,2)
);

CREATE TABLE IF NOT EXISTS public.product (
    product_id     INT PRIMARY KEY,
    brand          VARCHAR(50),
    product_line   VARCHAR(20),
    product_class  VARCHAR(10),
    product_size   VARCHAR(10),
    list_price     NUMERIC(10,2),
    standard_cost  NUMERIC(10,2)
);

CREATE TABLE IF NOT EXISTS public.orders (
    order_id     INT         PRIMARY KEY,
    customer_id  INT         NOT NULL,
    order_date   DATE        NOT NULL,
    online_order BOOLEAN,
    order_status VARCHAR(20) NOT NULL
);

CREATE TABLE IF NOT EXISTS public.order_items (
    order_item_id                INT           PRIMARY KEY,
    order_id                     INT           NOT NULL,
    product_id                   INT           NOT NULL,
    quantity                     SMALLINT      NOT NULL,
    item_list_price_at_sale      NUMERIC(10,2) NOT NULL,
    item_standard_cost_at_sale   NUMERIC(10,2)
);"""

### Функции обработки исходных данных и загрузки

In [None]:
# Conversion of textual yes/no flags to booleans.
def str_to_bool(value: str) -> bool:
    """Translate a yes/no style string into ``True`` or ``False``.

    Leading/trailing whitespace and letter case are ignored.

    Args:
        value: raw text such as ``"Yes"``, ``"n"``, ``"true"`` or ``"0"``.

    Returns:
        The boolean the text stands for.

    Raises:
        ValueError: if ``value`` is ``None`` or not a recognised spelling.
    """
    if value is None:
        raise ValueError("NULL boolean")

    known_flags = {
        "yes": True,
        "y": True,
        "true": True,
        "1": True,
        "no": False,
        "n": False,
        "false": False,
        "0": False,
    }
    token = value.strip().lower()
    if token not in known_flags:
        raise ValueError(f"Invalid boolean value: {value}")
    return known_flags[token]

# обработка пола
def normalize_gender(value: str) -> str | None:
    if value is None:
        return None
    v = value.strip().lower()
    if v == "":
        return None

    # учёт опечаток
    female_variants = {"f", "female", "femal", "femla", "femail", "femael", "fem"}
    male_variants   = {"m", "male", "mela", "mael", "mlae"}
    if v in female_variants:
        return "F"
    if v in male_variants:
        return "M"
    if v[0] == "f":
        return "F"
    if v[0] == "m":
        return "M"
    return None

def exec_sql(sql: str) -> None:
    """Run ``sql`` against the ``CONN_ID`` Postgres connection and commit.

    Args:
        sql: SQL text passed unchanged to ``cursor.execute``.
    """
    hook = PostgresHook(postgres_conn_id=CONN_ID)
    # Using a psycopg2 connection directly as a context manager only ends
    # the transaction and leaves the connection open; closing() makes sure
    # it is released even when execute() raises.
    with closing(hook.get_conn()) as conn, conn.cursor() as cur:
        cur.execute(sql)
        conn.commit()

def load_customer() -> None:
    """Reload ``public.customer`` from ``customer.csv`` (``;``-delimited).

    The table is truncated and refilled inside one transaction, so a failed
    run leaves the previous contents in place. Gender values are normalised
    with ``normalize_gender`` and booleans parsed with ``str_to_bool``; a
    short summary of the gender clean-up is printed at the end.

    Raises:
        ValueError: from ``int``/``str_to_bool`` when a row holds a value
            that cannot be converted.
    """
    fixed = 0
    unknown = 0
    total = 0

    hook = PostgresHook(postgres_conn_id=CONN_ID)
    path = os.path.join(DATA_PATH, "customer.csv")
    # closing() releases the connection; a bare `with conn` would only end
    # the transaction and leave the connection open.
    with closing(hook.get_conn()) as conn, conn.cursor() as cur:
        # idempotent
        cur.execute("TRUNCATE TABLE public.customer;")
        with open(path, encoding="utf-8") as f:
            reader = csv.DictReader(f, delimiter=";")
            for r in reader:
                total += 1

                g_raw = r["gender"]
                g_norm = normalize_gender(g_raw)
                g_clean = (g_raw or "").strip()

                # statistics: non-blank values that could not be mapped are
                # "unknown"; values that were mapped from anything other
                # than a canonical spelling count as "fixed"
                if g_clean != "" and g_norm is None:
                    unknown += 1
                elif g_clean.lower() not in ("", "f", "m", "female", "male") and g_norm is not None:
                    fixed += 1

                cur.execute("""
                    INSERT INTO public.customer VALUES (
                        %(customer_id)s,
                        %(first_name)s,
                        %(last_name)s,
                        %(gender)s,
                        %(dob)s,
                        %(job_title)s,
                        %(job_industry_category)s,
                        %(wealth_segment)s,
                        %(deceased_indicator)s,
                        %(owns_car)s,
                        %(address)s,
                        %(postcode)s,
                        %(state)s,
                        %(country)s,
                        %(property_valuation)s
                    )
                """, {
                    "customer_id": int(r["customer_id"]),
                    "first_name": r["first_name"],
                    # empty strings in nullable columns are stored as NULL
                    "last_name": r["last_name"] or None,
                    "gender": g_norm,
                    "dob": r["DOB"] or None,
                    "job_title": r["job_title"] or None,
                    "job_industry_category": r["job_industry_category"] or None,
                    "wealth_segment": r["wealth_segment"],
                    "deceased_indicator": str_to_bool(r["deceased_indicator"]),
                    "owns_car": str_to_bool(r["owns_car"]),
                    "address": r["address"],
                    "postcode": r["postcode"],
                    "state": r["state"],
                    "country": r["country"],
                    "property_valuation": int(r["property_valuation"]),
                })
        conn.commit()
    print(
        f"[customer] rows={total}, "
        f"gender_fixed={fixed}, "
        f"gender_unknown={unknown}"
    )

def load_order_items() -> None:
    """Reload ``public.order_items`` from ``order_items.csv``.

    The table is truncated and refilled inside one transaction. Quantities
    are parsed through ``float`` first, so values written like ``"2.0"``
    are accepted; an empty ``item_standard_cost_at_sale`` is stored as NULL.

    Raises:
        ValueError: when a numeric column cannot be converted.
    """
    hook = PostgresHook(postgres_conn_id=CONN_ID)
    path = os.path.join(DATA_PATH, "order_items.csv")
    # closing() releases the connection; a bare `with conn` would only end
    # the transaction and leave the connection open.
    with closing(hook.get_conn()) as conn, conn.cursor() as cur:
        cur.execute("TRUNCATE TABLE public.order_items;")
        with open(path, encoding="utf-8") as f:
            reader = csv.DictReader(f)
            for r in reader:
                cur.execute("""
                    INSERT INTO public.order_items VALUES (
                        %(order_item_id)s,
                        %(order_id)s,
                        %(product_id)s,
                        %(quantity)s,
                        %(item_list_price_at_sale)s,
                        %(item_standard_cost_at_sale)s
                    )
                """, {
                    "order_item_id": int(r["order_item_id"]),
                    "order_id": int(r["order_id"]),
                    "product_id": int(r["product_id"]),
                    "quantity": int(float(r["quantity"])),
                    "item_list_price_at_sale": float(r["item_list_price_at_sale"]),
                    "item_standard_cost_at_sale": (
                        float(r["item_standard_cost_at_sale"])
                        if r["item_standard_cost_at_sale"] else None
                    ),
                })
        conn.commit()

def load_product_stage() -> None:
    """Reload the staging table ``public.product_stage`` from ``product.csv``.

    The table is truncated and refilled inside one transaction. Rows are
    inserted as they appear in the file; an empty ``standard_cost`` is
    stored as NULL.

    Raises:
        ValueError: when ``product_id`` or a price cannot be converted.
    """
    hook = PostgresHook(postgres_conn_id=CONN_ID)
    path = os.path.join(DATA_PATH, "product.csv")
    # closing() releases the connection; a bare `with conn` would only end
    # the transaction and leave the connection open.
    with closing(hook.get_conn()) as conn, conn.cursor() as cur:
        cur.execute("TRUNCATE TABLE public.product_stage;")
        with open(path, encoding="utf-8") as f:
            reader = csv.DictReader(f)
            for r in reader:
                cur.execute("""
                    INSERT INTO public.product_stage VALUES (
                        %(product_id)s,
                        %(brand)s,
                        %(product_line)s,
                        %(product_class)s,
                        %(product_size)s,
                        %(list_price)s,
                        %(standard_cost)s
                    )
                """, {
                    "product_id": int(r["product_id"]),
                    "brand": r["brand"],
                    "product_line": r["product_line"],
                    "product_class": r["product_class"],
                    "product_size": r["product_size"],
                    "list_price": float(r["list_price"]),
                    "standard_cost": (
                        float(r["standard_cost"]) if r["standard_cost"] else None
                    ),
                })
        conn.commit()
        
def rebuild_product() -> None:
    """Rebuild ``public.product`` from ``public.product_stage``.

    The target table is truncated and refilled with one row per
    ``product_id``: the staging row with the highest ``list_price``.
    Execution and commit are delegated to ``exec_sql`` rather than repeating
    its hook/connection handling here.
    """
    sql = """
    TRUNCATE TABLE public.product;
    INSERT INTO public.product
    SELECT
        product_id,
        brand,
        product_line,
        product_class,
        product_size,
        list_price,
        standard_cost
    FROM (
        SELECT
            *,
            ROW_NUMBER() OVER (
                PARTITION BY product_id
                ORDER BY list_price DESC
            ) AS rn
        FROM public.product_stage
    ) t
    WHERE rn = 1;
    """
    exec_sql(sql)

def load_simple(table, filename, delimiter=","):
    """Reload ``public.<table>`` from a CSV whose header matches its columns.

    The table is truncated and refilled inside one transaction. Empty
    strings are inserted as NULL; all other values are passed as text.

    Args:
        table: target table name inside the ``public`` schema.
        filename: CSV file name, resolved relative to ``DATA_PATH``.
        delimiter: CSV field separator.

    Raises:
        ValueError: if the file has no header row.
    """
    hook = PostgresHook(postgres_conn_id=CONN_ID)
    path = os.path.join(DATA_PATH, filename)
    # closing() releases the connection; a bare `with conn` would only end
    # the transaction and leave the connection open.
    with closing(hook.get_conn()) as conn, conn.cursor() as cur:
        cur.execute(f"TRUNCATE TABLE public.{table};")
        with open(path, encoding="utf-8") as f:
            reader = csv.DictReader(f, delimiter=delimiter)
            cols = reader.fieldnames
            if not cols:
                # An empty file yields fieldnames=None; fail with a clear
                # message. Nothing has been committed yet, so the TRUNCATE
                # above is discarded when the connection is closed.
                raise ValueError(f"{filename}: CSV header row is missing")
            # NOTE: table and column names are interpolated into the SQL
            # text, so both must come from trusted input; only the row
            # values are bound as parameters.
            sql = f"""
                INSERT INTO public.{table} ({", ".join(cols)})
                VALUES ({", ".join(["%s"] * len(cols))})
            """
            for r in reader:
                cur.execute(sql, [r[c] or None for c in cols])
        conn.commit()

### Запуск DAG

In [None]:
# Step 1 DAG: create the tables, then load every CSV.
# The cron expression "0 19 * * *" fires once a day at 19:00
# (presumably in the start_date's Europe/Moscow zone - confirm).
with DAG(
    dag_id="daily_parallel_insert_clear",
    start_date=datetime(2025, 1, 1, tz="Europe/Moscow"),
    schedule="0 19 * * *",
    catchup=False,
    tags=["etl", "strict", "public"],
) as dag:

    # Runs the CREATE TABLE IF NOT EXISTS statements.
    create_tables = PythonOperator(
        task_id="create_tables",
        python_callable=exec_sql,
        op_kwargs=dict(sql=DDL_SQL),
    )

    # One loader task per source file.
    t_customer = PythonOperator(
        task_id="load_customer",
        python_callable=load_customer,
    )

    t_product_stage = PythonOperator(
        task_id="load_product_stage",
        python_callable=load_product_stage,
    )

    # Collapses the staging rows into public.product.
    t_product_rebuild = PythonOperator(
        task_id="rebuild_product",
        python_callable=rebuild_product,
    )

    t_orders = PythonOperator(
        task_id="load_orders",
        python_callable=load_simple,
        op_kwargs=dict(table="orders", filename="orders.csv"),
    )

    t_order_items = PythonOperator(
        task_id="load_order_items",
        python_callable=load_order_items,
    )


### Запуск тасков

In [None]:
   # cначала всегда создаём таблицы
    create_tables >> [
    t_customer,
    t_orders,
    t_order_items,
    t_product_stage
    ]

    # product строится только после staging
    t_product_stage >> t_product_rebuild

## Шаг 2. Далее выполнить следующие запросы, записав ответы в отдельные файлы (параллельные task):
- Найти имена и фамилии клиентов с ТОП-3 минимальной и ТОП-3 максимальной суммой транзакций за весь период (учесть клиентов, у которых нет заказов).
- Найти ТОП-5 клиентов (по общему доходу) в каждом сегменте благосостояния (wealth_segment). Вывести имя, фамилию, сегмент и общий доход. Если в сегменте менее 5 клиентов, вывести всех.

### Формулируем запросы

In [None]:
# Customers with the 3 smallest and 3 largest total revenue
# (quantity * item_list_price_at_sale over all their order items).
# The LEFT JOINs keep customers without orders, whose revenue becomes 0.
# customer_id is the tie-breaker in every ORDER BY: without it the rows
# picked among equal revenues (e.g. all zero-revenue customers) could
# change from run to run.
SQL_TOP3 = """
WITH client_revenue AS (
    SELECT
        c.customer_id,
        c.first_name,
        c.last_name,
        COALESCE(
            SUM(oi.quantity * oi.item_list_price_at_sale),
            0
        ) AS total_revenue
    FROM public.customer c
    LEFT JOIN public.orders o
        ON c.customer_id = o.customer_id
    LEFT JOIN public.order_items oi
        ON o.order_id = oi.order_id
    GROUP BY
        c.customer_id,
        c.first_name,
        c.last_name
),
ranked AS (
    SELECT *,
           ROW_NUMBER() OVER (ORDER BY total_revenue ASC, customer_id)  AS rn_min,
           ROW_NUMBER() OVER (ORDER BY total_revenue DESC, customer_id) AS rn_max
    FROM client_revenue
)
SELECT
    first_name,
    last_name,
    total_revenue,
    CASE
        WHEN rn_min <= 3 THEN 'MIN'
        WHEN rn_max <= 3 THEN 'MAX'
    END AS category
FROM ranked
WHERE rn_min <= 3
   OR rn_max <= 3
ORDER BY category, total_revenue, customer_id;
"""

# Top 5 customers by total revenue inside each wealth_segment; segments
# with fewer than 5 customers return all of them. The LEFT JOINs keep
# customers without orders, whose revenue becomes 0.
# customer_id is the tie-breaker in every ORDER BY: without it the rows
# picked among equal revenues could change from run to run.
SQL_TOP5 = """
WITH client_revenue AS (
    SELECT
        c.customer_id,
        c.first_name,
        c.last_name,
        c.wealth_segment,
        COALESCE(
            SUM(oi.quantity * oi.item_list_price_at_sale),
            0
        ) AS total_revenue
    FROM public.customer c
    LEFT JOIN public.orders o
        ON c.customer_id = o.customer_id
    LEFT JOIN public.order_items oi
        ON o.order_id = oi.order_id
    GROUP BY
        c.customer_id,
        c.first_name,
        c.last_name,
        c.wealth_segment
),
ranked AS (
    SELECT *,
           ROW_NUMBER() OVER (
               PARTITION BY wealth_segment
               ORDER BY total_revenue DESC, customer_id
           ) AS rn
    FROM client_revenue
)
SELECT
    first_name,
    last_name,
    wealth_segment,
    total_revenue
FROM ranked
WHERE rn <= 5
ORDER BY wealth_segment, total_revenue DESC, customer_id;
"""

### Функция записи результата в csv-файл

In [None]:
def to_csv(sql: str, output_filename: str) -> None:
    """Run ``sql`` and write its result set to a CSV file.

    The file is created under ``/opt/airflow/results`` with a header row
    taken from the cursor's column names, followed by every fetched row.

    Args:
        sql: query to execute.
        output_filename: name of the CSV file to (over)write.
    """
    hook = PostgresHook(postgres_conn_id=CONN_ID)

    # closing() releases the connection; a bare `with conn` would only end
    # the transaction and leave the connection open.
    with closing(hook.get_conn()) as conn, conn.cursor() as cur:
        cur.execute(sql)
        rows = cur.fetchall()
        columns = [desc[0] for desc in cur.description]

    output_path = os.path.join("/opt/airflow/results", output_filename)

    # newline="" lets the csv module control line endings itself.
    with open(output_path, "w", newline="", encoding="utf-8") as f:
        writer = csv.writer(f)
        writer.writerow(columns)
        writer.writerows(rows)

### Доработка DAG

In [None]:
# Step 2: one export task per report query.
# NOTE(review): this cell repeats the DAG header used in step 1 with the
# same dag_id; in the deployed DAG file these tasks presumably sit inside
# that single `with DAG(...)` block - confirm.
with DAG(
    dag_id="daily_parallel_insert_clear",
    start_date=datetime(2025, 1, 1, tz="Europe/Moscow"),
    schedule="0 19 * * *",
    catchup=False,
    tags=["etl", "strict", "public"],
) as dag:
    # Writes the top-3 min / top-3 max report.
    t_top3 = PythonOperator(
        task_id="top3_clients",
        python_callable=to_csv,
        op_kwargs=dict(sql=SQL_TOP3, output_filename="top3_clients.csv"),
    )
    # Writes the top-5 per wealth segment report.
    t_top5 = PythonOperator(
        task_id="top5_clients",
        python_callable=to_csv,
        op_kwargs=dict(sql=SQL_TOP5, output_filename="top5_clients.csv"),
    )


### Дополняем таски

In [None]:
    # Both report queries start only after every load task has finished.
    [t_customer, t_orders, t_order_items, t_product_rebuild] >> t_top3

    [t_customer, t_orders, t_order_items, t_product_rebuild] >> t_top5

## Шаг 3. Проверить, что запросы из пункта выше не вернули нулевое количество строк. 
В случае непрохождения проверки отправить уведомление на почту (print ошибки)

In [None]:
def assert_query_not_empty(sql: str) -> None:
    """Fail when ``sql`` returns no rows.

    The query is wrapped in ``SELECT COUNT(*) FROM (...) t`` after its
    surrounding whitespace and trailing semicolon are removed, so only the
    row count is transferred.

    Args:
        sql: the query to check.

    Raises:
        ValueError: if the query yields zero rows.
    """
    hook = PostgresHook(postgres_conn_id=CONN_ID)
    # A trailing ";" would be a syntax error inside the sub-select.
    sql_clean = sql.strip().rstrip(";")
    check_sql = f"""
    SELECT COUNT(*) FROM (
        {sql_clean}
    ) t
    """
    # closing() releases the connection; a bare `with conn` would only end
    # the transaction and leave the connection open.
    with closing(hook.get_conn()) as conn, conn.cursor() as cur:
        cur.execute(check_sql)
        cnt = cur.fetchone()[0]
    if cnt == 0:
        print("Ошибка")
        raise ValueError("No rows returned - ERROR!!!!!")

### Доработка DAG

In [None]:
# Step 3: one non-empty check per report query.
# NOTE(review): this cell repeats the DAG header used in step 1 with the
# same dag_id; in the deployed DAG file these tasks presumably sit inside
# that single `with DAG(...)` block - confirm.
with DAG(
    dag_id="daily_parallel_insert_clear",
    start_date=datetime(2025, 1, 1, tz="Europe/Moscow"),
    schedule="0 19 * * *",
    catchup=False,
    tags=["etl", "strict", "public"],
) as dag:

    t_check_top3 = PythonOperator(
        task_id="check_top3_not_empty",
        python_callable=assert_query_not_empty,
        op_kwargs=dict(sql=SQL_TOP3),
    )

    t_check_top5 = PythonOperator(
        task_id="check_top5_not_empty",
        python_callable=assert_query_not_empty,
        op_kwargs=dict(sql=SQL_TOP5),
    )


### Дополнение тасков

In [None]:
    # loaders -> report export -> non-empty check, once per report.
    [t_customer, t_orders, t_order_items, t_product_rebuild] >> t_top3 >> t_check_top3
    [t_customer, t_orders, t_order_items, t_product_rebuild] >> t_top5 >> t_check_top5

## Шаг 4. Вывести сообщение об успешном или неуспешном выполнении DAG.

### Функции

In [None]:
def print_success():
    """Print the success message for the DAG run."""
    message = "Успешно"
    print(message)

def print_failure():
    """Print the failure message for the DAG run."""
    message = "Ошибка"
    print(message)

### Доработка DAG

In [None]:
# Step 4: final tasks that report the outcome of the run.
# NOTE(review): this cell repeats the DAG header used in step 1 with the
# same dag_id; in the deployed DAG file these tasks presumably sit inside
# that single `with DAG(...)` block - confirm.
with DAG(
    dag_id="daily_parallel_insert_clear",
    start_date=datetime(2025, 1, 1, tz="Europe/Moscow"),
    schedule="0 19 * * *",
    catchup=False,
    tags=["etl", "strict", "public"],
) as dag:

    # Scheduled with the "all_success" trigger rule.
    t_success = PythonOperator(
        task_id="print_success",
        trigger_rule="all_success",
        python_callable=print_success,
    )

    # Scheduled with the "one_failed" trigger rule.
    t_failure = PythonOperator(
        task_id="print_failure",
        trigger_rule="one_failed",
        python_callable=print_failure,
    )

### Добавление тасков

In [None]:
    # Each check task feeds both outcome tasks; their trigger rules decide
    # which of the two actually runs.
    t_check_top3 >> [t_success, t_failure]
    t_check_top5 >> [t_success, t_failure]