In [1]:
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import current_timestamp, col
from functools import reduce
import logging

# Настройка логирования
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Создание SparkSession
spark = SparkSession.builder \
    .appName("Dimensions and Facts") \
    .config("spark.jars", "/home/jovyan/work/postgresql-42.7.1.jar") \
    .getOrCreate()

# Параметры подключения к PostgreSQL
jdbc_url = "jdbc:postgresql://craft_market_db:5432/craft_market"
connection_properties = {
    "user": "admin",
    "password": "admin",
    "driver": "org.postgresql.Driver"
}

# Функция для чтения данных из PostgreSQL
def read_table(table_name, schema):
    try:
        logger.info(f"Чтение данных из таблицы {schema}.{table_name}")
        df = spark.read.jdbc(
            url=jdbc_url,
            table=f"{schema}.{table_name}",
            properties=connection_properties
        )
        logger.info(f"Успешно прочитано {df.count()} строк из {schema}.{table_name}")
        return df
    except Exception as e:
        logger.error(f"Ошибка при чтении таблицы {schema}.{table_name}: {str(e)}")
        raise

def write_table(df, table_name, schema, columns=None):
    try:
        logger.info(f"Запись данных в таблицу {schema}.{table_name}")
        # Если указаны столбцы, выбираем только их
        if columns:
            df = df.select(*columns)
        
        # Записываем данные в таблицу
        df.write.jdbc(
            url=jdbc_url,
            table=f"{schema}.{table_name}",
            mode="append",
            properties=connection_properties
        )
        logger.info(f"Данные успешно записаны в {schema}.{table_name}")
    except Exception as e:
        logger.error(f"Ошибка при записи в таблицу {schema}.{table_name}: {str(e)}")
        raise

def union_all(*dfs):
    return reduce(DataFrame.unionByName, dfs)

try:
    # Чтение данных из источников
    source1_data = read_table("craft_market_wide", "source1")
    source2_masters_products = read_table("craft_market_masters_products", "source2")
    source2_orders_customers = read_table("craft_market_orders_customers", "source2")
    
    # Для source3 читаем сразу все три таблицы
    source3_craftsmans = read_table("craft_market_craftsmans", "source3")
    source3_customers = read_table("craft_market_customers", "source3")
    source3_orders    = read_table("craft_market_orders", "source3")
    
    # 1. Загрузка измерений (dimensions)
    # d_customers (используем customer_email как бизнес-ключ)
    logger.info("Заполнение таблицы d_customers")
    customers_df1 = source1_data.select(
        col("customer_email"),
        col("customer_name"),
        col("customer_address"),
        col("customer_birthday").cast("date")
    ).distinct()

    customers_df2 = source2_orders_customers.select(
        col("customer_email"),
        col("customer_name"),
        col("customer_address"),
        col("customer_birthday").cast("date")
    ).distinct()

    customers_df3 = source3_customers.select(
        col("customer_email"),
        col("customer_name"),
        col("customer_address"),
        col("customer_birthday").cast("date")
    ).distinct()

    customers_df_final = union_all(customers_df1, customers_df2, customers_df3).distinct()\
        .withColumn("load_dttm", current_timestamp())
    write_table(customers_df_final, "d_customers", "dwh", 
                columns=["customer_name", "customer_address", "customer_birthday", "customer_email", "load_dttm"])
    
    # d_craftsmans (используем craftsman_email как бизнес-ключ)
    logger.info("Заполнение таблицы d_craftsmans")
    craftsmans_df1 = source1_data.select(
        col("craftsman_email"),
        col("craftsman_name"),
        col("craftsman_address"),
        col("craftsman_birthday").cast("date")
    ).distinct()

    craftsmans_df2 = source2_masters_products.select(
        col("craftsman_email"),
        col("craftsman_name"),
        col("craftsman_address"),
        col("craftsman_birthday").cast("date")
    ).distinct()

    craftsmans_df3 = source3_craftsmans.select(
        col("craftsman_email"),
        col("craftsman_name"),
        col("craftsman_address"),
        col("craftsman_birthday").cast("date")
    ).distinct()

    craftsmans_df_final = union_all(craftsmans_df1, craftsmans_df2, craftsmans_df3).distinct()\
        .withColumn("load_dttm", current_timestamp())
    write_table(craftsmans_df_final, "d_craftsmans", "dwh", 
                columns=["craftsman_name", "craftsman_address", "craftsman_birthday", "craftsman_email", "load_dttm"])
    
    # d_products (используем product_name как бизнес-ключ)
    logger.info("Заполнение таблицы d_products")
    products_df1 = source1_data.select(
        col("product_name"),
        col("product_description"),
        col("product_type"),
        col("product_price").cast("bigint")
    ).distinct()

    products_df2 = source2_masters_products.select(
        col("product_name"),
        col("product_description"),
        col("product_type"),
        col("product_price").cast("bigint")
    ).distinct()

    products_df3 = source3_orders.select(
        col("product_name"),
        col("product_description"),
        col("product_type"),
        col("product_price").cast("bigint")
    ).distinct()

    products_df_final = union_all(products_df1, products_df2, products_df3).distinct()\
        .withColumn("load_dttm", current_timestamp())
    write_table(products_df_final, "d_products", "dwh", 
                columns=["product_name", "product_description", "product_type", "product_price", "load_dttm"])


    # 2. Подготовка данных для фактов (fact orders)
    logger.info("Подготовка данных заказов из источников")
    orders_df1 = source1_data.select(
        col("order_id").cast("bigint"),
        col("order_created_date").cast("date"),
        col("order_completion_date").cast("date"),
        col("order_status"),
        col("customer_email"),
        col("craftsman_email"),
        col("product_name")
    )
    
    orders_df2 = source2_orders_customers.alias("oc")\
        .join(source2_masters_products.alias("mp"), col("oc.craftsman_id") == col("mp.craftsman_id"))\
        .select(
            col("oc.order_id").cast("bigint"),
            col("oc.order_created_date").cast("date"),
            col("oc.order_completion_date").cast("date"),
            col("oc.order_status"),
            col("oc.customer_email"),
            col("mp.craftsman_email"),
            col("mp.product_name")
        )
    
    orders_df3 = source3_orders.alias("o")\
        .join(source3_craftsmans.alias("csm"), col("o.craftsman_id") == col("csm.craftsman_id"))\
        .join(source3_customers.alias("cs"), col("o.customer_id") == col("cs.customer_id"))\
        .select(
            col("o.order_id").cast("bigint"),
            col("o.order_created_date").cast("date"),
            col("o.order_completion_date").cast("date"),
            col("o.order_status"),
            col("cs.customer_email"),
            col("csm.craftsman_email"),
            col("o.product_name")
        )
    
    orders_df_union = union_all(orders_df1, orders_df2, orders_df3).distinct()

    # 3. Читаем измерения, которые уже записаны в DWH, для получения сгенерированных идентификаторов.
    d_customers = read_table("d_customers", "dwh").select("customer_id", "customer_email")
    d_craftsmans = read_table("d_craftsmans", "dwh").select("craftsman_id", "craftsman_email")
    d_products   = read_table("d_products", "dwh").select("product_id", "product_name")
    
    orders_enriched = orders_df_union \
        .join(d_customers, on="customer_email", how="left") \
        .join(d_craftsmans, on="craftsman_email", how="left") \
        .join(d_products, on="product_name", how="left") \
        .withColumn("load_dttm", current_timestamp())
    
    # 4. Подготовка итогового набора данных для записи в таблицу фактов f_orders
    orders_final = orders_enriched.select(
        col("craftsman_id"),
        col("customer_id"),
        col("product_id"),             
        col("order_created_date"),
        col("order_completion_date"),
        col("order_status"),
        col("load_dttm")
    )
    
    # 5. Запись данных в таблицу фактов f_orders
    logger.info("Заполнение таблицы фактов f_orders")
    write_table(orders_final, "f_orders", "dwh", 
                columns=["craftsman_id", "customer_id", "product_id", "order_created_date", "order_completion_date", "order_status", "load_dttm"])
    logger.info("ETL-процесс успешно завершён")

except Exception as e:
    logger.error(f"Ошибка в ETL-процессе: {str(e)}")
    raise

finally:
    spark.stop()
    logger.info("SparkSession остановлен")


2025-02-19 14:13:20,590 - INFO - Чтение данных из таблицы source1.craft_market_wide
2025-02-19 14:13:26,281 - INFO - Успешно прочитано 999 строк из source1.craft_market_wide
2025-02-19 14:13:26,283 - INFO - Чтение данных из таблицы source2.craft_market_masters_products
2025-02-19 14:13:26,701 - INFO - Успешно прочитано 999 строк из source2.craft_market_masters_products
2025-02-19 14:13:26,703 - INFO - Чтение данных из таблицы source2.craft_market_orders_customers
2025-02-19 14:13:27,048 - INFO - Успешно прочитано 999 строк из source2.craft_market_orders_customers
2025-02-19 14:13:27,051 - INFO - Чтение данных из таблицы source3.craft_market_craftsmans
2025-02-19 14:13:27,386 - INFO - Успешно прочитано 999 строк из source3.craft_market_craftsmans
2025-02-19 14:13:27,389 - INFO - Чтение данных из таблицы source3.craft_market_customers
2025-02-19 14:13:27,675 - INFO - Успешно прочитано 999 строк из source3.craft_market_customers
2025-02-19 14:13:27,677 - INFO - Чтение данных из таблицы so

In [2]:
from pyspark.sql import SparkSession, DataFrame, Window
from pyspark.sql.functions import current_timestamp, col, date_format, sum, count, avg, percentile_approx, row_number, coalesce, lit, max, first, year, when
import logging

# Настройка логирования
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Создание SparkSession
spark = SparkSession.builder \
    .appName("DWH ETL - Data Mart") \
    .config("spark.jars", "/home/jovyan/work/postgresql-42.7.1.jar") \
    .getOrCreate()

# Параметры подключения к PostgreSQL
jdbc_url = "jdbc:postgresql://craft_market_db:5432/craft_market"
connection_properties = {
    "user": "admin",
    "password": "admin",
    "driver": "org.postgresql.Driver"
}

# Функция для чтения данных из PostgreSQL
def read_table(table_name, schema):
    try:
        logger.info(f"Чтение данных из таблицы {schema}.{table_name}")
        df = spark.read.jdbc(
            url=jdbc_url,
            table=f"{schema}.{table_name}",
            properties=connection_properties
        )
        logger.info(f"Успешно прочитано {df.count()} строк из {schema}.{table_name}")
        return df
    except Exception as e:
        logger.error(f"Ошибка при чтении таблицы {schema}.{table_name}: {str(e)}")
        raise

# Функция для записи данных в PostgreSQL
def write_table(df, table_name, schema):
    try:
        logger.info(f"Запись данных в таблицу {schema}.{table_name}")
        df.write.jdbc(
            url=jdbc_url,
            table=f"{schema}.{table_name}",
            mode="append",  # Используем append для витрины
            properties=connection_properties
        )
        logger.info(f"Данные успешно записаны в {schema}.{table_name}")
    except Exception as e:
        logger.error(f"Ошибка при записи в таблицу {schema}.{table_name}: {str(e)}")
        raise
# Загрузка витрины данных
def load_datamart():
  try:
    # 1. Читаем последнюю дату загрузки
    try:
        last_load_date_df = spark.read.jdbc(url=jdbc_url, table="dwh.load_dates_craftsman_report_datamart", properties=connection_properties)
        last_load_date = last_load_date_df.select(max("load_dttm")).first()[0]
        logger.info(f"Последняя дата загрузки: {last_load_date}")
    except Exception as e:
        last_load_date = None  # Если таблицы нет или она пуста
        logger.info(f"Таблица с датами загрузки пуста или не существует, загружаем все данные.{e}")

    # 2. Читаем данные из таблиц DWH
    d_craftsmans = read_table("d_craftsmans", "dwh")
    d_customers = read_table("d_customers", "dwh")
    d_products = read_table("d_products", "dwh")
    f_orders = read_table("f_orders", "dwh")

    if last_load_date:
        d_craftsmans = d_craftsmans.filter(col("load_dttm") >= last_load_date)
        d_customers = d_customers.filter(col("load_dttm") >= last_load_date)
        d_products = d_products.filter(col("load_dttm") >= last_load_date)
        f_orders = f_orders.filter(col("load_dttm") >= last_load_date)

    # 3. Объединяем таблицы
    df = f_orders.join(d_craftsmans, "craftsman_id") \
                 .join(d_customers, "customer_id") \
                 .join(d_products, "product_id")

    # 4. Вычисляем агрегаты
    df = df.withColumn("report_period", date_format(col("order_created_date"), "yyyy-MM"))

    # 5. Группируем по мастеру и отчетному периоду
    df_agg = df.groupBy("craftsman_id", "report_period") \
              .agg(
                  first("craftsman_name").alias("craftsman_name"),
                  first("craftsman_address").alias("craftsman_address"),
                  first("craftsman_birthday").alias("craftsman_birthday"),
                  first("craftsman_email").alias("craftsman_email"),
                  (sum(col("product_price")) * 0.9).cast("decimal(15,2)").alias("craftsman_money"),
                  (sum(col("product_price")) * 0.1).cast("bigint").alias("platform_money"),
                  count("order_id").alias("count_order"),
                  (avg(col("product_price"))).cast("decimal(10,2)").alias("avg_price_order"),
                  avg(year(current_timestamp()) - year(col("customer_birthday"))).cast("decimal(3,1)").alias("avg_age_customer"),
                  percentile_approx((col("order_completion_date").cast("long") - col("order_created_date").cast("long")) / (24 * 60 * 60), 0.5).cast("decimal(10,1)").alias("median_time_order_completed"),  # Медиана в днях
                  sum(when(col("order_status") == "created", 1).otherwise(0)).alias("count_order_created"),
                  sum(when(col("order_status") == "in progress", 1).otherwise(0)).alias("count_order_in_progress"),
                  sum(when(col("order_status") == "delivery", 1).otherwise(0)).alias("count_order_delivery"),
                  sum(when(col("order_status") == "done", 1).otherwise(0)).alias("count_order_done"),
                  sum(when(col("order_status").isin(["created", "in progress", "delivery"]), 1).otherwise(0)).alias("count_order_not_done")
              )
    # Оконная функция для определения самой популярной категории
    windowSpec = Window.partitionBy("craftsman_id", "report_period").orderBy(col("product_count").desc())

    df_with_top_category = df.groupBy("craftsman_id", "report_period", "product_type") \
        .agg(count("product_id").alias("product_count")) \
        .withColumn("row_number", row_number().over(windowSpec))

    top_category = df_with_top_category.filter(col("row_number") == 1).select(col("craftsman_id"),col("report_period"),col("product_type").alias("top_product_category"))

    df_agg = df_agg.join(top_category, ["craftsman_id", "report_period"], "left")

    # 6. Записываем результат в витрину
    write_table(df_agg, "craftsman_report_datamart", "dwh")

    # 7. Обновляем дату последней загрузки
    load_date_df = spark.sql("select current_date() as load_dttm")
    write_table(load_date_df, "load_dates_craftsman_report_datamart", "dwh")
    logger.info("Загрузка витрины данных завершена")

  except Exception as e:
        logger.error(f"Ошибка при загрузке витрины: {str(e)}")
        raise
load_datamart()
spark.stop()
logger.info("SparkSession остановлен")

2025-02-19 14:14:59,306 - INFO - Последняя дата загрузки: 2025-02-19
2025-02-19 14:14:59,307 - INFO - Чтение данных из таблицы dwh.d_craftsmans
2025-02-19 14:14:59,445 - INFO - Успешно прочитано 17970 строк из dwh.d_craftsmans
2025-02-19 14:14:59,446 - INFO - Чтение данных из таблицы dwh.d_customers
2025-02-19 14:14:59,570 - INFO - Успешно прочитано 17982 строк из dwh.d_customers
2025-02-19 14:14:59,571 - INFO - Чтение данных из таблицы dwh.d_products
2025-02-19 14:14:59,693 - INFO - Успешно прочитано 17964 строк из dwh.d_products
2025-02-19 14:14:59,694 - INFO - Чтение данных из таблицы dwh.f_orders
2025-02-19 14:15:01,327 - INFO - Успешно прочитано 2186478 строк из dwh.f_orders
2025-02-19 14:15:01,765 - INFO - Запись данных в таблицу dwh.craftsman_report_datamart
2025-02-19 14:15:27,029 - INFO - Данные успешно записаны в dwh.craftsman_report_datamart
2025-02-19 14:15:27,165 - INFO - Запись данных в таблицу dwh.load_dates_craftsman_report_datamart
2025-02-19 14:15:27,361 - INFO - Данн