In [39]:
from pyspark.sql import SparkSession, Window, Row
from pyspark.sql.functions import (
    current_timestamp, current_date, date_format, col, sum, avg, median, count, lit, expr, row_number, desc, when
)
from datetime import date

# Создаем SparkSession
spark = SparkSession.builder \
    .appName("Postgres-Spark") \
    .config("spark.jars", "/workspace/postgresql-42.7.4.jar") \
    .getOrCreate()

# Параметры подключения
jdbc_url = "jdbc:postgresql://postgres:5432/postgres"
connection_properties = {
    "user": "eezimin",
    "password": "qwerty",
    "driver": "org.postgresql.Driver"
}


#### Загрузка источников

In [40]:
# Определяем функцию для чтения данных из Postgres
def read_from_postgres(schema, table):
    return spark.read.format("jdbc") \
        .option("url", "jdbc:postgresql://postgres:5432/postgres") \
        .option("driver", "org.postgresql.Driver") \
        .option("dbtable", f"{schema}.{table}") \
        .option("user", "eezimin") \
        .option("password", "qwerty") \
        .load()

# Определяем функцию для записи данных в Postgres
def write_to_postgres(df, schema, table):
    df.write.format("jdbc") \
        .option("url", "jdbc:postgresql://postgres:5432/postgres") \
        .option("driver", "org.postgresql.Driver") \
        .option("dbtable", f"{schema}.{table}") \
        .option("user", "eezimin") \
        .option("password", "qwerty") \
        .mode("append") \
        .save()


In [41]:
# Чтение данных из источников
source1_df = read_from_postgres("source1", "craft_market_wide")
source2_masters_products = read_from_postgres("source2", "craft_market_masters_products")
source2_orders_customers = read_from_postgres("source2", "craft_market_orders_customers")
source3_orders = read_from_postgres("source3", "craft_market_orders")
source3_craftsmans = read_from_postgres("source3", "craft_market_craftsmans")
source3_customers = read_from_postgres("source3", "craft_market_customers")

In [42]:
all_columns = [
    'order_id',
    'order_created_date',
    'order_completion_date',
    'order_status',
    'craftsman_id',
    'craftsman_name',
    'craftsman_address',
    'craftsman_birthday',
    'craftsman_email',
    'product_id',
    'product_name',
    'product_description',
    'product_type',
    'product_price',
    'customer_id',
    'customer_name',
    'customer_address',
    'customer_birthday',
    'customer_email' 
]

# создаем объединенную таблицу из таблиц 1-го источника
source1_df = source1_df.select(all_columns)

In [43]:
# создаем объединенную таблицу из таблиц 2-го источника
source2_df = source2_masters_products.join(
    source2_orders_customers,
    (source2_masters_products.product_id == source2_orders_customers.product_id) &
    (source2_masters_products.craftsman_id == source2_orders_customers.craftsman_id)
).select(
    source2_masters_products.craftsman_id,
    source2_masters_products.craftsman_name,
    source2_masters_products.craftsman_address,
    source2_masters_products.craftsman_birthday,
    source2_masters_products.craftsman_email,
    source2_masters_products.product_id,
    source2_masters_products.product_name,
    source2_masters_products.product_description,
    source2_masters_products.product_type,
    source2_masters_products.product_price,
    source2_orders_customers.customer_id,
    source2_orders_customers.customer_name,
    source2_orders_customers.customer_address,
    source2_orders_customers.customer_birthday,
    source2_orders_customers.customer_email,
    source2_orders_customers.order_id,
    source2_orders_customers.order_created_date,
    source2_orders_customers.order_completion_date,
    source2_orders_customers.order_status
).select(all_columns)

In [44]:
# создаем объединенную таблицу из таблиц 3-го источника
source3_df = source3_orders.join(
    source3_craftsmans,
    source3_orders.craftsman_id == source3_craftsmans.craftsman_id
).join(
    source3_customers,
    source3_orders.customer_id == source3_customers.customer_id
).select(
    source3_orders.order_id,
    source3_orders.order_created_date,
    source3_orders.order_completion_date,
    source3_orders.order_status,
    source3_craftsmans.craftsman_id,
    source3_craftsmans.craftsman_name,
    source3_craftsmans.craftsman_address,
    source3_craftsmans.craftsman_birthday,
    source3_craftsmans.craftsman_email,
    source3_orders.product_id,
    source3_orders.product_name,
    source3_orders.product_description,
    source3_orders.product_type,
    source3_orders.product_price,
    source3_customers.customer_id,
    source3_customers.customer_name,
    source3_customers.customer_address,
    source3_customers.customer_birthday,
    source3_customers.customer_email
).select(all_columns)

In [47]:
# Объединяем три DataFrame в один
all_sources_df = source1_df.union(source2_df).union(source3_df)

# Удаляем дубликаты из объединенного DataFrame
all_sources_df = all_sources_df.distinct()
all_sources_df.show(1)

+--------+------------------+---------------------+------------+------------+--------------+-------------------+------------------+-------------------+----------+--------------------+--------------------+------------+-------------+-----------+----------------+----------------+-----------------+-----------------+
|order_id|order_created_date|order_completion_date|order_status|craftsman_id|craftsman_name|  craftsman_address|craftsman_birthday|    craftsman_email|product_id|        product_name| product_description|product_type|product_price|customer_id|   customer_name|customer_address|customer_birthday|   customer_email|
+--------+------------------+---------------------+------------+------------+--------------+-------------------+------------------+-------------------+----------+--------------------+--------------------+------------+-------------+-----------+----------------+----------------+-----------------+-----------------+
|      86|        2022-07-18|           2022-07-21|       

### Формирование таблиц измерений и фактов в DWH

#### 1. Таблика заказчиков dwh.d_customers


In [105]:
# Читаем данные из таблицы измерений
d_customers_df = read_from_postgres("dwh", "d_customers")
customers_columns = ['customer_name', 'customer_address', 'customer_birthday', 'customer_email']

# Удаляем из объединенного DataFrame записи, которые уже существуют в таблице измерений
new_customers_df = all_sources_df.select(customers_columns).distinct().exceptAll(d_customers_df.select(customers_columns)) \
                                                             .withColumn("load_dttm", current_timestamp())
# кэшируем, чтоб спарк после записи не удалил таблицу
new_customers_df = new_customers_df.cache()

# Записываем оставшиеся записи в таблицу измерений
write_to_postgres(new_customers_df, "dwh", "d_customers")

# Проверяем первые 5 строк
d_customers_df = read_from_postgres("dwh", "d_customers")
d_customers_df.show(5)

+-----------+---------------+--------------------+-----------------+--------------------+--------------------+
|customer_id|  customer_name|    customer_address|customer_birthday|      customer_email|           load_dttm|
+-----------+---------------+--------------------+-----------------+--------------------+--------------------+
|          1|Ariel Hidderley|3616 American Ash...|       1995-05-21|ahidderleyik@tiny...|2024-12-10 22:21:...|
|          2|    Del Kindred|220 Grasskamp Par...|       1993-06-03|     dkindredki@g.co|2024-12-10 22:21:...|
|          3|    Dynah Lough|286 Mitchell Terrace|       1999-04-17|dloughlz@blogspot...|2024-12-10 22:21:...|
|          4| Goldina Napper|    592 Nova Parkway|       2004-11-08|gnapperp7@hatena....|2024-12-10 22:21:...|
|          5|    Sandye Mant|21947 Carpenter Park|       1996-10-01|    smantcp@admin.ch|2024-12-10 22:21:...|
+-----------+---------------+--------------------+-----------------+--------------------+--------------------+
o

In [106]:
# Присоединяем customer_id из оригинальной таблицы d_customers_df (понадобится дальше для инкрементальной витрины)
new_customers_df = new_customers_df.alias("new_customers").join(
    d_customers_df.alias("d_customers"),
    (col("new_customers.customer_name") == col("d_customers.customer_name")) &
    (col("new_customers.customer_address") == col("d_customers.customer_address")) &
    (col("new_customers.customer_birthday") == col("d_customers.customer_birthday")) &
    (col("new_customers.customer_email") == col("d_customers.customer_email")),
    how='left'
).select(
    col("d_customers.customer_id").alias("customer_id"),
    col("new_customers.customer_name"),
    col("new_customers.customer_address"),
    col("new_customers.customer_birthday"),
    col("new_customers.customer_email"),
    col("new_customers.load_dttm")
)

# Проверяем результат
new_customers_df.show(1)

+-----------+---------------+--------------------+-----------------+--------------------+--------------------+
|customer_id|  customer_name|    customer_address|customer_birthday|      customer_email|           load_dttm|
+-----------+---------------+--------------------+-----------------+--------------------+--------------------+
|          1|Ariel Hidderley|3616 American Ash...|       1995-05-21|ahidderleyik@tiny...|2024-12-10 22:21:...|
+-----------+---------------+--------------------+-----------------+--------------------+--------------------+
only showing top 1 row



#### 2. Таблица продуктов dwh.d_products

In [107]:
# Читаем данные из таблицы измерений
d_products_df = read_from_postgres("dwh", "d_products")
products_columns = ['product_name', 'product_description', 'product_type', 'product_price']

# Удаляем из объединенного DataFrame записи, которые уже существуют в таблице измерений
new_products_df = all_sources_df.select(products_columns).distinct().exceptAll(d_products_df.select(products_columns)) \
                                                             .withColumn("load_dttm", current_timestamp())

# кэшируем, чтоб спарк после записи не удалил таблицу
new_products_df = new_products_df.cache()

# Записываем оставшиеся записи в таблицу измерений
write_to_postgres(new_products_df, "dwh", "d_products")

# Проверяем первые 5 строк
d_products_df = read_from_postgres("dwh", "d_products")
d_products_df.show(5)

+----------+--------------------+--------------------+--------------------+-------------+--------------------+
|product_id|        product_name| product_description|        product_type|product_price|           load_dttm|
+----------+--------------------+--------------------+--------------------+-------------+--------------------+
|         1|        Pathiri Podi|Pathiri is a trad...|Foodgrains, Oil &...|           12|2024-12-10 22:21:...|
|         2|HandMade Gold-Pla...|Gold-plated and b...|             clothes|           10|2024-12-10 22:21:...|
|         3|HandMade Plus Siz...|Blue and beige la...|             clothes|           19|2024-12-10 22:21:...|
|         4|Foot Pumice Paddl...|HandMade foot pum...|    Beauty & Hygiene|           25|2024-12-10 22:21:...|
|         5|Bergamot & Cedarw...|HandMade Luxury S...|    Beauty & Hygiene|           31|2024-12-10 22:21:...|
+----------+--------------------+--------------------+--------------------+-------------+--------------------+
o

In [108]:
# Присоединяем product_id из оригинальной таблицы d_products_df (потребуется далее для инкрементальной витрины)
new_products_df = new_products_df.alias("new_products").join(
    d_products_df.alias("d_products"),
    (col("new_products.product_name") == col("d_products.product_name")) &
    (col("new_products.product_description") == col("d_products.product_description")) &
    (col("new_products.product_type") == col("d_products.product_type")) &
    (col("new_products.product_price") == col("d_products.product_price")),
    how='left'
).select(
    col("d_products.product_id").alias("product_id"),
    col("new_products.product_name"),
    col("new_products.product_description"),
    col("new_products.product_type"),
    col("new_products.product_price")
)

# Проверяем результат
new_products_df.show(1)


+----------+------------+--------------------+--------------------+-------------+
|product_id|product_name| product_description|        product_type|product_price|
+----------+------------+--------------------+--------------------+-------------+
|         1|Pathiri Podi|Pathiri is a trad...|Foodgrains, Oil &...|           12|
+----------+------------+--------------------+--------------------+-------------+
only showing top 1 row



#### 3. Таблица мастеров dwh.d_craftsmans

In [109]:
# Читаем данные из таблицы измерений
d_craftsmans_df = read_from_postgres("dwh", "d_craftsmans")
craftsmans_columns = ['craftsman_name', 'craftsman_address', 'craftsman_birthday', 'craftsman_email']

# Удаляем из объединенного DataFrame записи, которые уже существуют в таблице измерений
new_craftsmans_df = all_sources_df.select(craftsmans_columns).distinct().exceptAll(d_craftsmans_df.select(craftsmans_columns)) \
                                                             .withColumn("load_dttm", current_timestamp())

# кэшируем, чтоб спарк после записи не удалил таблицу
new_craftsmans_df = new_craftsmans_df.cache()

# Записываем оставшиеся записи в таблицу измерений
write_to_postgres(new_craftsmans_df, "dwh", "d_craftsmans")

# Проверяем первые 5 строк
d_craftsmans_df = read_from_postgres("dwh", "d_craftsmans")
d_craftsmans_df.show(5)

+------------+---------------+--------------------+------------------+--------------------+--------------------+
|craftsman_id| craftsman_name|   craftsman_address|craftsman_birthday|     craftsman_email|           load_dttm|
+------------+---------------+--------------------+------------------+--------------------+--------------------+
|           1| Khalil Heining|  83956 Manley Plaza|        1998-02-08|ymcwhorter17@inte...|2024-12-10 22:21:...|
|           2|     Jake Draye|      2 Bluestem Way|        2003-08-01|dwannes8v@newsvin...|2024-12-10 22:21:...|
|           3|  Gustave Irwin|   54700 Swallow Way|        1992-09-02|jwherritc1@cornel...|2024-12-10 22:21:...|
|           4|  Zelma Scarffe|5560 Blackbird Plaza|        2000-05-24|ccripinh2@list-ma...|2024-12-10 22:21:...|
|           5|Katlin Guilloud|       9659 8th Lane|        1992-01-08|bsheberjf@pcworld...|2024-12-10 22:21:...|
+------------+---------------+--------------------+------------------+--------------------+-----

In [110]:
# Присоединяем craftsman_id из оригинальной таблицы d_craftsmans_df (потребуется далее для инкрементальной витрины)
new_craftsmans_df = new_craftsmans_df.alias("new_craftsmans").join(
    d_craftsmans_df.alias("d_craftsmans"),
    (col("new_craftsmans.craftsman_name") == col("d_craftsmans.craftsman_name")) &
    (col("new_craftsmans.craftsman_address") == col("d_craftsmans.craftsman_address")) &
    (col("new_craftsmans.craftsman_birthday") == col("d_craftsmans.craftsman_birthday")) &
    (col("new_craftsmans.craftsman_email") == col("d_craftsmans.craftsman_email")),
    how='left'
).select(
    col("d_craftsmans.craftsman_id").alias("craftsman_id"),
    col("new_craftsmans.craftsman_name"),
    col("new_craftsmans.craftsman_address"),
    col("new_craftsmans.craftsman_birthday"),
    col("new_craftsmans.craftsman_email")
)

# Проверяем результат
new_craftsmans_df.show(1)


+------------+--------------+------------------+------------------+--------------------+
|craftsman_id|craftsman_name| craftsman_address|craftsman_birthday|     craftsman_email|
+------------+--------------+------------------+------------------+--------------------+
|           1|Khalil Heining|83956 Manley Plaza|        1998-02-08|ymcwhorter17@inte...|
+------------+--------------+------------------+------------------+--------------------+
only showing top 1 row



#### 4. Таблица заказов dwh.f_orders

In [111]:
# # Читаем данные из таблицы измерений
# f_orders_df = read_from_postgres("dwh", "f_orders")

# # Выбираем необходимые колонки из объединенного DataFrame
# orders_columns = ['product_id', 'craftsman_id', 'customer_id', 'order_created_date', 'order_completion_date', 'order_status']

# # Читаем данные из таблицы измерений и переименовываем столбцы
# d_craftsmans_df = read_from_postgres("dwh", "d_craftsmans").selectExpr(
#     "craftsman_id as craftsman_id_dwh", "craftsman_name", "craftsman_address", "craftsman_birthday", "craftsman_email"
# )
# d_customers_df = read_from_postgres("dwh", "d_customers").selectExpr(
#     "customer_id as customer_id_dwh", "customer_name", "customer_address", "customer_birthday", "customer_email"
# )
# d_products_df = read_from_postgres("dwh", "d_products").selectExpr(
#     "product_id as product_id_dwh", "product_name", "product_description", "product_type", "product_price"
# )

# # Делаем join, используя переименованные столбцы
# new_orders_df = all_sources_df.join(
#     d_craftsmans_df,
#     all_sources_df.craftsman_id == d_craftsmans_df.craftsman_id_dwh
# ).join(
#     d_customers_df,
#     all_sources_df.customer_id == d_customers_df.customer_id_dwh
# ).join(
#     d_products_df,
#     all_sources_df.product_id == d_products_df.product_id_dwh
# ).select(orders_columns).distinct().exceptAll(f_orders_df.select(orders_columns)) \
#                                                              .withColumn("load_dttm", current_timestamp())

# # Записываем оставшиеся записи в таблицу заказов
# write_to_postgres(new_orders_df, "dwh", "f_orders")

# Читаем данные из таблицы измерений
f_orders_df = read_from_postgres("dwh", "f_orders")
orders_columns = ['product_id', 'craftsman_id', 'customer_id', 'order_created_date', 'order_completion_date', 'order_status']

# Удаляем из объединенного DataFrame записи, которые уже существуют в таблице измерений
new_orders_df = all_sources_df.select(orders_columns).distinct().exceptAll(f_orders_df.select(orders_columns)) \
                                                             .withColumn("load_dttm", current_timestamp())

# кэшируем, чтоб спарк после записи не удалил таблицу
new_orders_df = new_orders_df.cache()

# Записываем оставшиеся записи в таблицу измерений
write_to_postgres(new_orders_df, "dwh", "f_orders")

# Проверяем первые 5 строк
f_orders_df = read_from_postgres("dwh", "f_orders")
f_orders_df.show(5)

+--------+----------+------------+-----------+------------------+---------------------+------------+--------------------+
|order_id|product_id|craftsman_id|customer_id|order_created_date|order_completion_date|order_status|           load_dttm|
+--------+----------+------------+-----------+------------------+---------------------+------------+--------------------+
|       1|        86|          86|         86|        2022-07-18|           2022-07-21|        done|2024-12-10 22:22:...|
|       2|       955|         955|        955|        2022-07-31|           2022-08-02|        done|2024-12-10 22:22:...|
|       3|       950|         950|        950|        2022-05-12|           2022-05-15|        done|2024-12-10 22:22:...|
|       4|        61|          61|         61|        2022-10-17|                 NULL|     created|2024-12-10 22:22:...|
|       5|       604|         604|        604|        2022-10-10|                 NULL|    delivery|2024-12-10 22:22:...|
+--------+----------+---

In [112]:
# Присоединяем order_id из оригинальной таблицы f_orders_df (потребуется далее для инкрементальной витрины)
new_orders_df = new_orders_df.alias("new_orders").join(
    f_orders_df.alias("f_orders"),
    (col("new_orders.product_id") == col("f_orders.product_id")) &
    (col("new_orders.craftsman_id") == col("f_orders.craftsman_id")) &
    (col("new_orders.customer_id") == col("f_orders.customer_id")) &
    (col("new_orders.order_created_date") == col("f_orders.order_created_date")) &
    (col("new_orders.order_completion_date") == col("f_orders.order_completion_date")) &
    (col("new_orders.order_status") == col("f_orders.order_status")),
    how='left'
).select(
    col("f_orders.order_id").alias("order_id"),
    col("new_orders.product_id"),
    col("new_orders.craftsman_id"),
    col("new_orders.customer_id"),
    col("new_orders.order_created_date"),
    col("new_orders.order_completion_date"),
    col("new_orders.order_status")
)

# Проверяем результат
new_orders_df.show(1)


+--------+----------+------------+-----------+------------------+---------------------+------------+
|order_id|product_id|craftsman_id|customer_id|order_created_date|order_completion_date|order_status|
+--------+----------+------------+-----------+------------------+---------------------+------------+
|       1|        86|          86|         86|        2022-07-18|           2022-07-21|        done|
+--------+----------+------------+-----------+------------------+---------------------+------------+
only showing top 1 row



### Заполнение витрины данных из данных таблиц измерений и фактов в DWH

Первым делаем создаем инкрементальную табличку с агрегатами.

In [122]:
# Добавляем отчетный период в таблицу заказов
new_orders_df = new_orders_df.withColumn("report_period", date_format("order_created_date", "yyyy-MM"))

# Создаем окно для нахождения самой популярной категории
window_spec = Window.partitionBy("craftsman_id", "report_period").orderBy(desc("count_category"))

# Считаем количество товаров каждой категории для каждого мастера и периода
product_category_counts = new_orders_df.join(
    new_products_df.alias("products"),
    new_orders_df.product_id == col("products.product_id")
).groupBy("craftsman_id", "report_period", "products.product_type") \
    .agg(count("products.product_type").alias("count_category"))

# Находим топ-1 категорию для каждого мастера и периода
top_categories = product_category_counts.withColumn(
    "row_num", row_number().over(window_spec)
).filter(col("row_num") == 1).select("craftsman_id", "report_period", "product_type")

# Добавляем расчет агрегатов
new_craftsman_report_datamart_df = new_orders_df.join(
    new_craftsmans_df.alias("craftsmans"),
    new_orders_df.craftsman_id == col("craftsmans.craftsman_id")
).join(
    new_products_df.alias("products"),
    new_orders_df.product_id == col("products.product_id")
).join(
    new_customers_df.alias("customers"),
    new_orders_df.customer_id == col("customers.customer_id")
).groupBy(
    col("craftsmans.craftsman_id"),
    col("craftsmans.craftsman_name"),
    col("craftsmans.craftsman_address"),
    col("craftsmans.craftsman_birthday"),
    col("craftsmans.craftsman_email"),
    col("report_period")
).agg(
    sum(col("products.product_price") * 0.9).alias("craftsman_money"),
    sum(col("products.product_price") * 0.1).alias("platform_money"),
    count(new_orders_df.order_id).alias("count_order"),
    avg(col("products.product_price")).alias("avg_price_order"),
    avg(expr("DATEDIFF(current_date(), customers.customer_birthday) / 365.25")).alias("avg_age_customer"),
    median(expr("DATEDIFF(order_completion_date, order_created_date)")).alias("median_time_order_completed"),
    sum(when(new_orders_df.order_status == "created", 1).otherwise(0)).alias("count_order_created"),
    sum(when(new_orders_df.order_status == "in_progress", 1).otherwise(0)).alias("count_order_in_progress"),
    sum(when(new_orders_df.order_status == "delivery", 1).otherwise(0)).alias("count_order_delivery"),
    sum(when(new_orders_df.order_status == "done", 1).otherwise(0)).alias("count_order_done"),
    sum(when(new_orders_df.order_status != "done", 1).otherwise(0)).alias("count_order_not_done")
)

# Присоединяем топ-1 категорию товаров
new_craftsman_report_datamart_df = new_craftsman_report_datamart_df.join(
    top_categories,
    ["craftsman_id", "report_period"],
    "left"
).withColumnRenamed("product_type", "top_product_category")

# Добавляем столбец с текущей датой загрузки (опционально)
new_craftsman_report_datamart_df = new_craftsman_report_datamart_df.withColumn("load_dttm", current_timestamp())

# Проверяем результат
new_craftsman_report_datamart_df.toPandas().head(5)


Unnamed: 0,craftsman_id,report_period,craftsman_name,craftsman_address,craftsman_birthday,craftsman_email,craftsman_money,platform_money,count_order,avg_price_order,avg_age_customer,median_time_order_completed,count_order_created,count_order_in_progress,count_order_delivery,count_order_done,count_order_not_done,top_product_category,load_dttm
0,87,2022-07,Elliott Beachem,44816 Sullivan Point,1996-12-05,aezelefx@usgs.gov,89.1,9.9,1,99.0,33.527721,1.0,0,0,0,1,0,clothes,2024-12-10 22:31:27.922727
1,96,2019-09,Kellen Ambresin,32 Division Circle,1993-01-03,hallmannpp@scribd.com,120.6,13.4,1,134.0,26.439425,1.0,0,0,0,1,0,clothes,2024-12-10 22:31:27.922727
2,142,2018-09,Dacie Fredson,1 Blue Bill Park Point,1994-05-24,rcheley57@google.ca,32.4,3.6,1,36.0,22.658453,3.0,0,0,0,1,0,clothes,2024-12-10 22:31:27.922727
3,251,2020-01,Tyrone Rennolds,40 Sachs Street,2004-09-23,moclearydp@weather.com,109.8,12.2,0,122.0,23.712526,,1,0,0,0,1,clothes,2024-12-10 22:31:27.922727
4,480,2022-06,Ken Ellcock,4 Summer Ridge Drive,2000-12-08,gwilletts5b@china.com.cn,48.6,5.4,0,54.0,20.936345,,0,0,0,0,1,clothes,2024-12-10 22:31:27.922727


Теперь нужно понять, какие строки будем обновлять (update) в `dwh.craftsman_report_datamart`, а какие нужно будет вставить новые (insert).

In [129]:
craftsman_report_datamart = read_from_postgres("dwh", "craftsman_report_datamart")

# Ключевые столбцы для идентификации строки
key_columns = ["craftsman_id", "report_period"]

# 1. Определяем новые строки (INSERT)
new_rows_df = new_craftsman_report_datamart_df.join(
    craftsman_report_datamart.select(*key_columns),
    key_columns,
    how="left_anti"
)

# 2. Определяем обновленные строки (UPDATE)
updated_rows_df = new_craftsman_report_datamart_df.alias("new").join(
    craftsman_report_datamart.alias("existing"),
    key_columns,
    how="inner"
).filter(
    # Проверяем, есть ли расхождения в значимых столбцах
    (col("new.craftsman_money") != col("existing.craftsman_money")) |
    (col("new.platform_money") != col("existing.platform_money")) |
    (col("new.count_order") != col("existing.count_order")) |
    (col("new.avg_price_order") != col("existing.avg_price_order")) |
    (col("new.avg_age_customer") != col("existing.avg_age_customer")) |
    (col("new.median_time_order_completed") != col("existing.median_time_order_completed")) |
    (col("new.count_order_created") != col("existing.count_order_created")) |
    (col("new.count_order_in_progress") != col("existing.count_order_in_progress")) |
    (col("new.count_order_delivery") != col("existing.count_order_delivery")) |
    (col("new.count_order_done") != col("existing.count_order_done")) |
    (col("new.count_order_not_done") != col("existing.count_order_not_done")) |
    (col("new.top_product_category") != col("existing.top_product_category"))
).select("new.*")

In [134]:
new_craftsman_report_datamart_df

DataFrame[craftsman_id: bigint, report_period: string, craftsman_name: string, craftsman_address: string, craftsman_birthday: date, craftsman_email: string, craftsman_money: double, platform_money: double, count_order: bigint, avg_price_order: double, avg_age_customer: decimal(22,10), median_time_order_completed: double, count_order_created: bigint, count_order_in_progress: bigint, count_order_delivery: bigint, count_order_done: bigint, count_order_not_done: bigint, top_product_category: string, load_dttm: timestamp]

Обновим строчки.

In [131]:
import psycopg2
from psycopg2.extras import execute_values

def update_existing_rows(df, schema, table):
    # Преобразуем DataFrame в список словарей
    rows = df.collect()
    update_query = f"""
    UPDATE {schema}.{table} AS target
    SET craftsman_money = data.craftsman_money,
        platform_money = data.platform_money,
        count_order = data.count_order,
        avg_price_order = data.avg_price_order,
        avg_age_customer = data.avg_age_customer,
        median_time_order_completed = data.median_time_order_completed,
        count_order_created = data.count_order_created,
        count_order_in_progress = data.count_order_in_progress,
        count_order_delivery = data.count_order_delivery,
        count_order_done = data.count_order_done,
        count_order_not_done = data.count_order_not_done,
        top_product_category = data.top_product_category,
        load_dttm = data.load_dttm
    FROM (VALUES %s) AS data (
        craftsman_id, report_period, craftsman_money, platform_money, count_order, avg_price_order,
        avg_age_customer, median_time_order_completed, count_order_created, count_order_in_progress,
        count_order_delivery, count_order_done, count_order_not_done, top_product_category, load_dttm
    )
    WHERE target.craftsman_id = data.craftsman_id AND target.report_period = data.report_period
    """
    
    # Устанавливаем соединение с Postgres
    conn = psycopg2.connect(
        dbname="postgres",
        user="eezimin",
        password="qwerty",
        host="postgres",
        port="5432"
    )
    with conn.cursor() as cursor:
        # Формируем данные для VALUES
        values = [
            (
                row["craftsman_id"], row["report_period"], row["craftsman_money"], row["platform_money"],
                row["count_order"], row["avg_price_order"], row["avg_age_customer"], 
                row["median_time_order_completed"], row["count_order_created"],
                row["count_order_in_progress"], row["count_order_delivery"], row["count_order_done"],
                row["count_order_not_done"], row["top_product_category"], row["load_dttm"]
            )
            for row in rows
        ]
        execute_values(cursor, update_query, values)
    conn.commit()
    conn.close()


In [133]:
new_rows_df

DataFrame[craftsman_id: bigint, report_period: string, craftsman_name: string, craftsman_address: string, craftsman_birthday: date, craftsman_email: string, craftsman_money: double, platform_money: double, count_order: bigint, avg_price_order: double, avg_age_customer: decimal(22,10), median_time_order_completed: double, count_order_created: bigint, count_order_in_progress: bigint, count_order_delivery: bigint, count_order_done: bigint, count_order_not_done: bigint, top_product_category: string, load_dttm: timestamp]

А теперь вставим недостающие.

In [135]:
write_to_postgres(new_rows_df.drop("load_dttm"), "dwh", "craftsman_report_datamart")

In [136]:
# Обновление таблицы инкрементальных загрузок
load_dates_dma = read_from_postgres("dwh", "load_dates_craftsman_report_datamart")

load_dates_data = [Row(load_dttm=date.today())]  # Используем текущую дату из модуля datetime
load_dates_df = spark.createDataFrame(load_dates_data).exceptAll(load_dates_dma.select('load_dttm'))

# Запись в таблицу
write_to_postgres(load_dates_df, "dwh", "load_dates_craftsman_report_datamart")

read_from_postgres("dwh", "load_dates_craftsman_report_datamart").show()

+---+----------+
| id| load_dttm|
+---+----------+
|  1|2024-12-10|
+---+----------+

