In [17]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp, col

import os

spark = SparkSession.builder \
    .appName("Postgres Integration") \
    .getOrCreate()

# DWH connection settings. Credentials are read from the environment so they
# do not have to live in the notebook; the fallbacks are the values that were
# previously hard-coded here.
db_url = os.environ.get("DWH_JDBC_URL", "jdbc:postgresql://postgres:5432/dwh")
db_properties = {
    "user": os.environ.get("DWH_USER", "postgres"),
    "password": os.environ.get("DWH_PASSWORD", "postgres"),
    "driver": "org.postgresql.Driver",
}

In [21]:
def read_from_postgres(table_name):
    """Load ``table_name`` from PostgreSQL into a Spark DataFrame over JDBC.

    Connection URL, credentials and driver class are taken from the
    module-level ``db_url`` and ``db_properties``.
    """
    jdbc_options = {
        "url": db_url,
        "dbtable": table_name,
        "user": db_properties["user"],
        "password": db_properties["password"],
        "driver": db_properties["driver"],
    }
    return spark.read.format("jdbc").options(**jdbc_options).load()

In [22]:
def write_to_postgres(df, table_name, mode="append"):
    """Write a Spark DataFrame to a PostgreSQL table over JDBC.

    Connection URL, credentials and driver class are taken from the
    module-level ``db_url`` and ``db_properties``.

    Args:
        df: DataFrame to write; its columns are expected to exist in the
            target table.
        table_name: target table, optionally schema-qualified
            (e.g. ``"dwh.d_customers"``).
        mode: Spark save mode passed to ``DataFrameWriter.mode``. Defaults to
            ``"append"``, the behaviour this helper always had; other modes
            (e.g. ``"overwrite"``) can be used for full reloads.
    """
    df.write \
        .format("jdbc") \
        .option("url", db_url) \
        .option("dbtable", table_name) \
        .option("user", db_properties["user"]) \
        .option("password", db_properties["password"]) \
        .option("driver", db_properties["driver"]) \
        .mode(mode) \
        .save()

### Заполнение таблицы измерений и фактов в DWH

In [24]:
# Read the source tables. `dataframes` maps a short alias (e.g. "source1_df")
# to the DataFrame for the corresponding source table.
source_tables = {
    "source1.craft_market_wide": "source1_df",
    "source2.craft_market_masters_products": "source2_masters_products_df",
    "source2.craft_market_orders_customers": "source2_orders_customers_df",
    "source3.craft_market_craftsmans": "source3_craftsmans_df",
    "source3.craft_market_customers": "source3_customers_df",
    "source3.craft_market_orders": "source3_orders_df"
}

dataframes = {}
for source_table, alias in source_tables.items():
    dataframes[alias] = read_from_postgres(source_table)

# NOTE(review): only "source1_df" feeds the DWH below; the source2/source3
# frames are read but never loaded — TODO confirm whether that is intended.


def _load_dimension(columns, target_table):
    """Append the distinct ``columns`` of source1 (plus ``load_dttm``) to ``target_table``.

    Returns the DataFrame that was written so later cells can reuse it.
    """
    dimension_df = (
        dataframes["source1_df"]
        .select(*[col(column_name) for column_name in columns])
        .distinct()
        .withColumn("load_dttm", current_timestamp())
    )
    write_to_postgres(dimension_df, target_table)
    return dimension_df


# Dimensions (each run appends; there is no de-duplication against rows
# already present in the DWH).
customers_df = _load_dimension(
    ["customer_name", "customer_address", "customer_birthday", "customer_email"],
    "dwh.d_customers",
)
products_df = _load_dimension(
    ["product_name", "product_description", "product_type", "product_price"],
    "dwh.d_products",
)
craftsmans_df = _load_dimension(
    ["craftsman_name", "craftsman_address", "craftsman_birthday", "craftsman_email"],
    "dwh.d_craftsmans",
)

# Fact table: one row per source1 row (not de-duplicated).
# NOTE(review): craftsman_id / customer_id are the source-system ids, and
# neither product_id nor order_id is selected, yet the datamart cell joins
# f_orders on product_id and counts order_id — TODO confirm f_orders schema.
orders_df = (
    dataframes["source1_df"]
    .select(
        "craftsman_id",
        "customer_id",
        "order_created_date",
        "order_completion_date",
        "order_status",
    )
    .withColumn("load_dttm", current_timestamp())
)
write_to_postgres(orders_df, "dwh.f_orders")

In [None]:
# Stop the current Spark session; the next section builds a new one.
spark.stop()

### Инкрементальный код заполнения таблицы витрин данных из таблиц измерений и фактов в DWH

In [94]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp, datediff, col, lit, year, month, current_date, count, sum, expr, concat

import os

spark = SparkSession.builder \
    .appName("Postgres Integration") \
    .getOrCreate()

# DWH connection settings. Credentials are read from the environment so they
# do not have to live in the notebook; the fallbacks are the values that were
# previously hard-coded here.
db_url = os.environ.get("DWH_JDBC_URL", "jdbc:postgresql://postgres:5432/dwh")
db_properties = {
    "user": os.environ.get("DWH_USER", "postgres"),
    "password": os.environ.get("DWH_PASSWORD", "postgres"),
    "driver": "org.postgresql.Driver",
}

In [95]:
def read_from_postgres(table_name):
    """Load ``table_name`` from PostgreSQL into a Spark DataFrame over JDBC.

    Connection URL, credentials and driver class are taken from the
    module-level ``db_url`` and ``db_properties``.
    """
    jdbc_options = {
        "url": db_url,
        "dbtable": table_name,
        "user": db_properties["user"],
        "password": db_properties["password"],
        "driver": db_properties["driver"],
    }
    return spark.read.format("jdbc").options(**jdbc_options).load()

In [101]:
# Build the monthly craftsman report from the DWH star schema.
# NOTE(review): uses orders_df / craftsmans_df / customers_df / products_df
# from the kernel; in this notebook they are (re)loaded from the DWH by a
# later cell — on a fresh kernel run that cell before this one.
from pyspark.sql import Window
from pyspark.sql.functions import avg, col, count, date_format, expr, row_number, sum, when

# Orders enriched with craftsman, customer and product attributes, plus the
# zero-padded "yyyy-MM" reporting month used as part of the datamart key.
report_source_df = (
    orders_df
    .join(craftsmans_df, "craftsman_id")
    .join(customers_df, "customer_id")
    .join(products_df, "product_id")
    .withColumn("report_period", date_format(col("order_created_date"), "yyyy-MM"))
)

# Most frequent product_type per craftsman and month. Ties are broken
# alphabetically so the result is deterministic (FIRST() would return an
# arbitrary category, not the most popular one).
category_rank = Window.partitionBy("craftsman_id", "report_period") \
    .orderBy(col("category_order_count").desc(), col("product_type").asc())
top_category_df = (
    report_source_df
    .groupBy("craftsman_id", "report_period", "product_type")
    .agg(count("*").alias("category_order_count"))
    .withColumn("category_rank", row_number().over(category_rank))
    .filter(col("category_rank") == 1)
    .select("craftsman_id", "report_period", col("product_type").alias("top_product_category"))
)

report_df = (
    report_source_df
    .groupBy(
        "craftsman_id",
        "craftsman_name",
        "craftsman_address",
        "craftsman_birthday",
        "craftsman_email",
        "report_period",
    )
    .agg(
        # 90% of the price goes to the craftsman, 10% to the platform.
        sum(expr("product_price * 0.9")).alias("craftsman_money"),
        sum(expr("product_price * 0.1")).alias("platform_money"),
        count("order_id").alias("count_order"),
        avg("product_price").alias("avg_price_order"),
        avg(expr("year(current_date()) - year(customer_birthday)")).alias("avg_age_customer"),
        # Spark's exact `percentile` aggregate instead of the ANSI
        # PERCENTILE_CONT ... WITHIN GROUP form, which older Spark SQL
        # versions do not parse.
        expr("percentile(datediff(order_completion_date, order_created_date), 0.5)")
        .alias("median_time_order_completed"),
        count(when(col("order_status") == "created", 1)).alias("count_order_created"),
        count(when(col("order_status") == "in progress", 1)).alias("count_order_in_progress"),
        count(when(col("order_status") == "delivery", 1)).alias("count_order_delivery"),
        count(when(col("order_status") == "done", 1)).alias("count_order_done"),
        count(when(col("order_status") != "done", 1)).alias("count_order_not_done"),
    )
    .join(top_category_df, ["craftsman_id", "report_period"], "left")
    # Keep the column order the datamart had before the mode calculation moved out.
    .select(
        "craftsman_id",
        "craftsman_name",
        "craftsman_address",
        "craftsman_birthday",
        "craftsman_email",
        "report_period",
        "craftsman_money",
        "platform_money",
        "count_order",
        "avg_price_order",
        "avg_age_customer",
        "median_time_order_completed",
        "top_product_category",
        "count_order_created",
        "count_order_in_progress",
        "count_order_delivery",
        "count_order_done",
        "count_order_not_done",
    )
)

In [118]:
def upsert_to_postgres(df, table_name, unique_keys,
                       load_dates_table="dwh.load_dates_craftsman_report_datamart"):
    """Upsert a Spark DataFrame into the PostgreSQL table ``table_name``.

    ``df`` is first staged into ``<table_name>_temp`` with the Spark JDBC
    writer (mode "overwrite"). Then, on one JDBC connection and inside a
    single transaction:

    1. rows of ``table_name`` matching a staged row on ``unique_keys`` get
       every other column updated from the staged row (skipped when every
       column is a key);
    2. staged rows without a match are inserted;
    3. a ``CURRENT_TIMESTAMP`` row is appended to ``load_dates_table``
       (skipped when it is ``None`` or empty);
    4. the staging table is dropped.

    On any failure the transaction is rolled back and the error re-raised.

    Args:
        df: DataFrame whose column names are columns of ``table_name``.
        table_name: schema-qualified target table. Column and table names are
            interpolated into SQL unquoted, so they must be trusted identifiers.
        unique_keys: column names that identify a row in ``table_name``.
        load_dates_table: table receiving one load-timestamp row per call.

    Raises:
        ValueError: if ``unique_keys`` is empty or names a column missing
            from ``df``.
    """
    unique_keys = list(unique_keys)
    if not unique_keys:
        raise ValueError("unique_keys must name at least one column")
    missing_keys = [key for key in unique_keys if key not in df.columns]
    if missing_keys:
        raise ValueError(f"unique_keys not present in DataFrame: {missing_keys}")

    temp_table_name = f"{table_name}_temp"

    # Stage the data in a scratch table that is replaced on every call.
    df.write \
        .format("jdbc") \
        .option("url", db_url) \
        .option("dbtable", temp_table_name) \
        .option("user", db_properties["user"]) \
        .option("password", db_properties["password"]) \
        .option("driver", db_properties["driver"]) \
        .mode("overwrite") \
        .save()

    columns = df.columns
    non_key_columns = [column for column in columns if column not in unique_keys]
    key_match = " AND ".join(f"target.{key} = source.{key}" for key in unique_keys)
    column_list = ", ".join(columns)

    statements = []
    if non_key_columns:
        # PostgreSQL requires unqualified column names on the left of SET.
        assignments = ", ".join(f"{column} = source.{column}" for column in non_key_columns)
        statements.append(
            f"UPDATE {table_name} AS target SET {assignments} "
            f"FROM {temp_table_name} AS source WHERE {key_match}"
        )
    statements.append(
        f"INSERT INTO {table_name} ({column_list}) "
        f"SELECT {column_list} FROM {temp_table_name} AS source "
        f"WHERE NOT EXISTS (SELECT 1 FROM {table_name} AS target WHERE {key_match})"
    )
    if load_dates_table:
        statements.append(
            f"INSERT INTO {load_dates_table} (load_dttm) VALUES (CURRENT_TIMESTAMP)"
        )
    statements.append(f"DROP TABLE IF EXISTS {temp_table_name}")

    # Plain java.sql connection through the Spark JVM gateway; autocommit is
    # disabled so update, insert, load-log and cleanup commit together.
    conn = spark._sc._jvm.java.sql.DriverManager.getConnection(
        db_url, db_properties["user"], db_properties["password"]
    )
    try:
        conn.setAutoCommit(False)
        statement = conn.createStatement()
        try:
            for sql in statements:
                statement.execute(sql)
        finally:
            statement.close()
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()

In [119]:
# Reload the DWH dimensions and the fact table (in this order) from PostgreSQL.
(craftsmans_df, customers_df, products_df, orders_df) = [
    read_from_postgres(dwh_table)
    for dwh_table in ("dwh.d_craftsmans", "dwh.d_customers", "dwh.d_products", "dwh.f_orders")
]

In [120]:
# Tag each order with its reporting month as a zero-padded "yyyy-MM" string
# (e.g. "2024-01"), the same format the datamart key uses. Concatenating
# year() and month() would produce "2024-1" for January.
# withColumn replaces an existing report_period, so re-running is safe.
from pyspark.sql.functions import col, date_format

orders_df = orders_df.withColumn(
    "report_period",
    date_format(col("order_created_date"), "yyyy-MM"),
)

In [121]:
# Merge the aggregated report into the datamart, keyed by craftsman and month.
upsert_to_postgres(
    report_df,
    "dwh.craftsman_report_datamart",
    unique_keys=["craftsman_id", "report_period"],
)

In [122]:
# Stop the current Spark session; the test section builds a new one.
spark.stop()

### Тесты

In [127]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.functions import current_timestamp, to_date

import os

spark = SparkSession.builder \
    .appName("Postgres Integration") \
    .getOrCreate()

# DWH connection settings. Credentials are read from the environment so they
# do not have to live in the notebook; the fallbacks are the values that were
# previously hard-coded here.
db_url = os.environ.get("DWH_JDBC_URL", "jdbc:postgresql://postgres:5432/dwh")
db_properties = {
    "user": os.environ.get("DWH_USER", "postgres"),
    "password": os.environ.get("DWH_PASSWORD", "postgres"),
    "driver": "org.postgresql.Driver",
}

In [132]:
# Данные для первоначального добавления
initial_source_df = spark.createDataFrame([
    Row(
        craftsman_id=1,
        craftsman_name="John Doe",
        craftsman_address="123 Main St",
        craftsman_birthday="1980-01-01",
        craftsman_email="johndoe@example.com",
        craftsman_money=1000.00,
        platform_money=100.00,
        count_order=10,
        avg_price_order=100.00,
        avg_age_customer=30.0,
        median_time_order_completed=5.0,
        top_product_category="Woodwork",
        count_order_created=2,
        count_order_in_progress=3,
        count_order_delivery=1,
        count_order_done=4,
        count_order_not_done=2,
        report_period="2024-12"
    )
])


initial_source_df = initial_source_df.withColumn("craftsman_birthday", to_date("craftsman_birthday"))
upsert_to_postgres(initial_source_df, "dwh.craftsman_report_datamart", ["craftsman_id", "report_period"])

print("Данные до обновления:")
vitrine_before_update = spark.read.format("jdbc").options(
    url=db_url,
    dbtable="dwh.craftsman_report_datamart",
    user=db_properties["user"],
    password=db_properties["password"],
    driver=db_properties["driver"]
).load()

vitrine_before_update.filter(
    (vitrine_before_update["craftsman_id"] == 1) & 
    (vitrine_before_update["report_period"] == "2024-12")
).show()


updated_source_df = spark.createDataFrame([
    Row(
        craftsman_id=1,
        craftsman_name="John Doe Updated",
        craftsman_address="456 New Ave",
        craftsman_birthday="1980-01-01",
        craftsman_email="johndoe_updated@example.com",
        craftsman_money=1200.00,
        platform_money=120.00,
        count_order=12,
        avg_price_order=110.00,
        avg_age_customer=35.0,
        median_time_order_completed=4.0,
        top_product_category="Jewelry",
        count_order_created=5,
        count_order_in_progress=4,
        count_order_delivery=3,
        count_order_done=8,
        count_order_not_done=3,
        report_period="2024-12"
    )
])


updated_source_df = updated_source_df.withColumn("craftsman_birthday", to_date("craftsman_birthday"))
upsert_to_postgres(updated_source_df, "dwh.craftsman_report_datamart", ["craftsman_id", "report_period"])

print("Данные после обновления:")
vitrine_after_update = spark.read.format("jdbc").options(
    url=db_url,
    dbtable="dwh.craftsman_report_datamart",
    user=db_properties["user"],
    password=db_properties["password"],
    driver=db_properties["driver"]
).load()

vitrine_after_update.filter(
    (vitrine_after_update["craftsman_id"] == 1) & 
    (vitrine_after_update["report_period"] == "2024-12")
).show()

Данные до обновления:
+----+------------+--------------+-----------------+------------------+-------------------+---------------+--------------+-----------+---------------+----------------+---------------------------+--------------------+-------------------+-----------------------+--------------------+----------------+--------------------+-------------+
|  id|craftsman_id|craftsman_name|craftsman_address|craftsman_birthday|    craftsman_email|craftsman_money|platform_money|count_order|avg_price_order|avg_age_customer|median_time_order_completed|top_product_category|count_order_created|count_order_in_progress|count_order_delivery|count_order_done|count_order_not_done|report_period|
+----+------------+--------------+-----------------+------------------+-------------------+---------------+--------------+-----------+---------------+----------------+---------------------------+--------------------+-------------------+-----------------------+--------------------+----------------+------------