## Preparing sources to DWH

In [199]:
from pyspark.sql import SparkSession

In [200]:
from pyspark.sql.functions import col, lit, current_timestamp

In [338]:
# Создаём SparkSession
spark = SparkSession.builder \
    .appName("ETL to DWH") \
    .getOrCreate()

In [339]:
db_url = "jdbc:postgresql://postgres_container:5432/postgres_db"

db_properties = {
    "user": "postgres_user",
    "password": "postgres_password",
    "driver": "org.postgresql.Driver"
}

In [341]:
# Функция для чтения таблиц из бд
def read_table(schema, table_name):
    return spark.read.jdbc(
        url=db_url,
        table=f"{schema}.{table_name}",
        properties=db_properties,
    )

In [342]:
# Считываем данные из источников
df_craft_market_wide = read_table("source1", "craft_market_wide")
df_craft_market_orders_customers = read_table("source2", "craft_market_orders_customers")
df_craft_market_masters_products = read_table("source2", "craft_market_masters_products")
df_craft_market_orders = read_table("source3", "craft_market_orders")
df_craft_market_craftmens = read_table("source3", "craft_market_craftsmans")
df_craft_market_customers = read_table("source3", "craft_market_customers")

# Маппируем чтобы все источники обрабатывались в одном формате для дальнейшего их union'a

In [343]:
# поля кроме id для 1 источника
columns_to_select = [col for col in df_craft_market_wide.columns if col != 'id']

In [344]:
columns_to_select

['craftsman_id',
 'craftsman_name',
 'craftsman_address',
 'craftsman_birthday',
 'craftsman_email',
 'product_id',
 'product_name',
 'product_description',
 'product_type',
 'product_price',
 'order_id',
 'order_created_date',
 'order_completion_date',
 'order_status',
 'customer_id',
 'customer_name',
 'customer_address',
 'customer_birthday',
 'customer_email']

In [321]:
source_1 = df_craft_market_wide.select(*columns_to_select)

In [346]:
# препарирование второго источника
source_2 = df_craft_market_orders_customers.join(
    df_craft_market_masters_products,
    df_craft_market_orders_customers.craftsman_id  == df_craft_market_masters_products.craftsman_id
).select(
    df_craft_market_masters_products.craftsman_id,
    df_craft_market_masters_products.craftsman_name,
    df_craft_market_masters_products.craftsman_address,
    df_craft_market_masters_products.craftsman_birthday,
    df_craft_market_masters_products.craftsman_email,
    df_craft_market_masters_products.product_id,
    df_craft_market_masters_products.product_name,
    df_craft_market_masters_products.product_description,
    df_craft_market_masters_products.product_type,
    df_craft_market_masters_products.product_price,
    df_craft_market_orders_customers.order_id,
    df_craft_market_orders_customers.order_created_date,
    df_craft_market_orders_customers.order_completion_date,
    df_craft_market_orders_customers.order_status,
    df_craft_market_orders_customers.customer_id,
    df_craft_market_orders_customers.customer_name,
    df_craft_market_orders_customers.customer_address,
    df_craft_market_orders_customers.customer_birthday,
    df_craft_market_orders_customers.customer_email,
)

In [347]:
# препарирование третьего источника
source_3 = df_craft_market_orders.join(
    df_craft_market_craftmens,
    df_craft_market_orders.craftsman_id == df_craft_market_craftmens.craftsman_id
).join(
    df_craft_market_customers,
    df_craft_market_orders.customer_id == df_craft_market_customers.customer_id
).select(
    df_craft_market_craftmens.craftsman_id,
    df_craft_market_craftmens.craftsman_name,
    df_craft_market_craftmens.craftsman_address,
    df_craft_market_craftmens.craftsman_birthday,
    df_craft_market_craftmens.craftsman_email,
    df_craft_market_orders.product_id,
    df_craft_market_orders.product_name,
    df_craft_market_orders.product_description,
    df_craft_market_orders.product_type,
    df_craft_market_orders.product_price,
    df_craft_market_orders.order_id,
    df_craft_market_orders.order_created_date,
    df_craft_market_orders.order_completion_date,
    df_craft_market_orders.order_status,
    df_craft_market_customers.customer_id,
    df_craft_market_customers.customer_name,
    df_craft_market_customers.customer_address,
    df_craft_market_customers.customer_birthday,
    df_craft_market_customers.customer_email
)

In [349]:
combined_sources = source_1.union(source_2).union(source_3).distinct() # Соединяем все источники, отсекаем дубли

## Напишите код на Spark, который будет заполнять сначала таблицы измерений и фактов в DWH

In [350]:
# функция для записи в двх
def write_table(df, schema, table_name):
    df.write.jdbc(
        url=db_url,
        table=f"{schema}.{table_name}",
        mode="append",
        properties=db_properties
    )

In [351]:
# функция для преобразования и загрузки справочников в двх
def update_dwh_dimension(combined_sources, dwh_table_name, key_columns, schema="dwh"):
    dwh_table = read_table(schema, dwh_table_name)
    need_columns = [col for col in dwh_table.columns if col not in key_columns]
    new_records = combined_sources.select(need_columns).distinct().exceptAll(dwh_table.select(need_columns)).withColumn("load_dttm", current_timestamp())
    write_table(new_records, schema, dwh_table_name)
    print(f"Updated {dwh_table_name}: {new_records.count()} new rows added.")


In [509]:
# загружаем данные в таблицы-справочники, без id т.к. они будут генерится
update_dwh_dimension(combined_sources, "d_craftsmans", ["craftsman_id", "load_dttm"])
update_dwh_dimension(combined_sources, "d_customers", ["customer_id", "load_dttm"])
update_dwh_dimension(combined_sources, "d_products", ["product_id", "load_dttm"])

Updated d_craftsmans: 0 new rows added.
Updated d_customers: 0 new rows added.
Updated d_products: 0 new rows added.


In [510]:
# Считываем таблицу фактов f_orders
f_orders = read_table("dwh", "f_orders")
# Собираем ее должным образом
f_orders_new = (
    combined_sources
    .join(d_products, combined_sources["product_id"] == d_products["product_id"], "left")
    .join(d_craftsmans, combined_sources["craftsman_id"] == d_craftsmans["craftsman_id"], "left")
    .join(d_customers, combined_sources["customer_id"] == d_customers["customer_id"], "left")
    .select(
        d_products["product_id"],
        d_craftsmans["craftsman_id"],
        d_customers["customer_id"],
        combined_sources["order_created_date"],
        combined_sources["order_completion_date"],
        combined_sources["order_status"],
        current_timestamp().alias("load_dttm")
    )
)

write_table(f_orders_new, "dwh", "f_orders")
print(f"Updated {'f_orders'}: {f_orders_new.count()} new rows added.")

f_orders.show()

Updated f_orders: 2997 new rows added.
+--------+----------+------------+-----------+------------------+---------------------+------------+--------------------+
|order_id|product_id|craftsman_id|customer_id|order_created_date|order_completion_date|order_status|           load_dttm|
+--------+----------+------------+-----------+------------------+---------------------+------------+--------------------+
|       1|        86|          86|         86|        2022-07-18|           2022-07-21|        done|2024-12-24 23:55:...|
|       2|       770|         770|        770|        2022-06-30|           2022-07-01|        done|2024-12-24 23:55:...|
|       3|       148|         148|        148|        2022-10-25|           2022-10-27|        done|2024-12-24 23:55:...|
|       4|       505|         505|        505|        2022-07-05|           2022-07-08|        done|2024-12-24 23:55:...|
|       5|       594|         594|        594|        2022-05-16|           2022-05-18|        done|2024-12

## Затем напишите код на Spark, который заполнит таблицу витрины данных из данных таблиц измерений и фактов в DWH. Напишите код инкрементальным, чтобы можно было забирать только измененные данные на источниках.

In [467]:
from pyspark.sql.functions import sum, avg, count, expr, row_number, when, current_date, date_format, coalesce
from pyspark.sql.window import Window
from datetime import date

In [588]:
# Чтение сформированных таблиц фактов и измерений из dwh
f_orders = read_table("dwh", "f_orders")
d_craftsmans = read_table("dwh", "d_craftsmans")
d_products = read_table("dwh", "d_products")
d_customers = read_table("dwh", "d_customers")

In [589]:
# Задаем отчетный период год и месяц
f_orders = f_orders.withColumn("report_period", date_format("order_created_date", "yyyy-MM"))

In [590]:
# Расчеты аггрегаций
new_craftsman_report_datamart = (
    f_orders
    .join(d_craftsmans, f_orders.craftsman_id == d_craftsmans["craftsman_id"])
    .join(d_products, f_orders.product_id == d_products["product_id"])
    .join(d_customers, f_orders.customer_id == d_customers["customer_id"])
).groupBy(
    d_craftsmans["craftsman_id"],
    d_craftsmans["craftsman_name"],
    d_craftsmans["craftsman_address"],
    d_craftsmans["craftsman_birthday"],
    d_craftsmans["craftsman_email"],
    f_orders.report_period
).agg(
    (sum(d_products["product_price"] * 0.9).cast("decimal(15,2)")).alias("craftsman_money"), # сумма денег, которую заработал мастер (-10% на платформы) за месяц
    (sum(d_products["product_price"] * 0.1).cast("bigint")).alias("platform_money"), # сумма денег, которая заработала платформа от продаж мастера за месяц
    count(f_orders.order_id).alias("count_order"), # количество заказов у мастера за месяц
    (avg(d_products["product_price"]).cast("decimal(10,2)")).alias("avg_price_order"), # средняя стоимость одного заказа у мастера за месяц
    (avg(expr("DATEDIFF(current_date(), customer_birthday) / 365.25")).cast("decimal(3,1)")).alias("avg_age_customer"), # средний возраст покупателей
    (median(coalesce(expr("DATEDIFF(order_completion_date, order_created_date)"), lit(0))).cast("decimal(10,1)")).alias("median_time_order_completed"), # медианное время в днях от момента создания заказа до его завершения  за месяц
    sum(when(f_orders.order_status == "created", 1).otherwise(0)).alias("count_order_created"), # количество созданных заказов за месяц
    sum(when(f_orders.order_status == "in_progress", 1).otherwise(0)).alias("count_order_in_progress"), # количество заказов в процессе изготовки за месяц
    sum(when(f_orders.order_status == "delivery", 1).otherwise(0)).alias("count_order_delivery"), # количество заказов в доставке за месяц
    sum(when(f_orders.order_status == "done", 1).otherwise(0)).alias("count_order_done"), # количество завершенных заказов за месяц
    sum(when(f_orders.order_status.isNull(), 1).otherwise(0)).alias("count_order_not_done") # количество незавершенных заказов за месяц
)

In [591]:
# Добавляем топ категорию у мастера за месяц
window_spec = Window.partitionBy("craftsman_id", "report_period").orderBy(col("count_category").desc())

# Считаем для каждого мастера и периода
product_category_counts = (
    f_orders
    .join(d_products, d_products["product_price"] == f_orders.product_id)
).groupBy(
    f_orders.craftsman_id, f_orders.report_period, d_products["product_type"]
).agg(
    count(d_products["product_type"]).alias("count_category")
)

# Топ 1 категория
top_categories = product_category_counts.withColumn(
    "row_num", row_number().over(window_spec)
).filter(
    col("row_num") == 1
).select(
    f_orders.craftsman_id, f_orders.report_period, d_products["product_type"]
)

In [592]:
# Джойним категорию к витрине
new_craftsman_report_datamart = (
    new_craftsman_report_datamart
    .join(top_categories,["craftsman_id", "report_period"],"left")
    .withColumnRenamed("product_type", "top_product_category")
    .withColumn("top_product_category", coalesce(col("top_product_category"), lit("Unknown")))  # Заменяем NULL на "Unknown"
)

In [593]:
# Итоговая витрина
new_craftsman_report_datamart.show(3)

+------------+-------------+--------------+------------------+------------------+--------------------+---------------+--------------+-----------+---------------+----------------+---------------------------+-------------------+-----------------------+--------------------+----------------+--------------------+--------------------+
|craftsman_id|report_period|craftsman_name| craftsman_address|craftsman_birthday|     craftsman_email|craftsman_money|platform_money|count_order|avg_price_order|avg_age_customer|median_time_order_completed|count_order_created|count_order_in_progress|count_order_delivery|count_order_done|count_order_not_done|top_product_category|
+------------+-------------+--------------+------------------+------------------+--------------------+---------------+--------------+-----------+---------------+----------------+---------------------------+-------------------+-----------------------+--------------------+----------------+--------------------+--------------------+
|      

## Инкерментальная загрузка

In [449]:
from datetime import datetime
import psycopg2
from psycopg2.extras import execute_values

In [620]:
# Считываем витрину
existing_craftsman_report_datamart = read_table("dwh", "craftsman_report_datamart")
# Ключевые столбцы и потенциально-измененные столбцы
key_columns = ["craftsman_id", "report_period"]
check_columns = [
    "craftsman_money", "platform_money", "count_order", "avg_price_order",
    "avg_age_customer", "median_time_order_completed", "count_order_created",
    "count_order_in_progress", "count_order_delivery", "count_order_done",
    "count_order_not_done", "top_product_category"
]

# Определяем новые строки которых нет в витрине
new_rows = new_craftsman_report_datamart.join(existing_craftsman_report_datamart, key_columns, "left_anti")
# Определяем обновленные строки (только если есть изменения в потенциально измененных столбцах
updated_rows = (
    new_craftsman_report_datamart.alias("new")
    .join(existing_craftsman_report_datamart.alias("existing"), key_columns, "inner")
    .filter(
        expr(
            " OR ".join([
                f"new.{col} != existing.{col}" for col in check_columns
            ])
        )
    )
    .select("new.*")
)


In [622]:
# Сначала обновление существующих строк через PostgreSQL, если есть изменения
if updated_rows.count() > 0:
    rows_to_update = updated_rows.collect()
    update_query = """
    INSERT INTO dwh.craftsman_report_datamart (
        craftsman_id, report_period, craftsman_money, platform_money, count_order, avg_price_order,
        avg_age_customer, median_time_order_completed, count_order_created, count_order_in_progress,
        count_order_delivery, count_order_done, count_order_not_done, top_product_category
    )
    VALUES %s
    ON CONFLICT (craftsman_id, report_period)
    DO UPDATE SET
        craftsman_money = EXCLUDED.craftsman_money,
        platform_money = EXCLUDED.platform_money,
        count_order = EXCLUDED.count_order,
        avg_price_order = EXCLUDED.avg_price_order,
        avg_age_customer = EXCLUDED.avg_age_customer,
        median_time_order_completed = EXCLUDED.median_time_order_completed,
        count_order_created = EXCLUDED.count_order_created,
        count_order_in_progress = EXCLUDED.count_order_in_progress,
        count_order_delivery = EXCLUDED.count_order_delivery,
        count_order_done = EXCLUDED.count_order_done,
        count_order_not_done = EXCLUDED.count_order_not_done,
        top_product_category = EXCLUDED.top_product_category
    """
    conn = psycopg2.connect(
        dbname="postgres_db",
        user="postgres_user",
        password="postgres_password",
        host="postgres_container",
        port="5432"
    )
    with conn.cursor() as cursor:
        values = [
            (
                row["craftsman_id"], row["report_period"], row["craftsman_money"], row["platform_money"],
                row["count_order"], row["avg_price_order"], row["avg_age_customer"],
                row["median_time_order_completed"], row["count_order_created"],
                row["count_order_in_progress"], row["count_order_delivery"], row["count_order_done"],
                row["count_order_not_done"], row["top_product_category"]
            )
            for row in rows_to_update
        ]
        execute_values(cursor, update_query, values)
    conn.commit()
    conn.close()

In [598]:
# Затем запись новых строк в витрину
if new_rows.count() > 0:
    write_table(new_rows, "dwh", "craftsman_report_datamart")

In [624]:
# Обновление метаинформации о загрузке
load_dates_df = spark.createDataFrame([{"load_dttm": datetime.now()}])
write_table(load_dates_df, "dwh", "load_dates_craftsman_report_datamart")
print("Updated load dates table.")

Updated load dates table.
