### Настройка подключения

In [1]:
from pyspark.sql import SparkSession, Window, Row
from pyspark.sql.functions import (
    current_timestamp, current_date, date_format, col, sum, avg, median, count, lit, expr, row_number, desc, when
)
from datetime import date

# Создаем SparkSession
spark = SparkSession.builder \
    .appName("Postgres-Spark") \
    .config("spark.jars", "postgresql-42.7.4.jar") \
    .getOrCreate()

# Параметры подключения
jdbc_url = "jdbc:postgresql://postgres:5432/dwh"
connection_properties = {
    "user": "postgres",
    "password": "postgres",
    "driver": "org.postgresql.Driver"
}

### Загрузка источников

In [2]:
def read_table(schema, table):
    return spark.read.jdbc(
        url=jdbc_url,
        table=f"{schema}.{table}",
        properties=connection_properties
    )

def write_table(df, schema, table, mode="append"):
    df.write.jdbc(
        url=jdbc_url,
        table=f"{schema}.{table}",
        mode=mode,
        properties=connection_properties
    )

In [3]:

# Чтение данных
df_sources = {
    "source1": read_table("source1", "craft_market_wide"),
    "source2_masters_products": read_table("source2", "craft_market_masters_products"),
    "source2_orders_customers": read_table("source2", "craft_market_orders_customers"),
    "source3_orders": read_table("source3", "craft_market_orders"),
    "source3_craftsmans": read_table("source3", "craft_market_craftsmans"),
    "source3_customers": read_table("source3", "craft_market_customers")
}

# Объединение данных из разных источников
def combine_sources():
    columns = [
        'order_id', 'order_created_date', 'order_completion_date', 'order_status', 'craftsman_id',
        'craftsman_name', 'craftsman_address', 'craftsman_birthday', 'craftsman_email', 'product_id',
        'product_name', 'product_description', 'product_type', 'product_price', 'customer_id',
        'customer_name', 'customer_address', 'customer_birthday', 'customer_email'
    ]
    
    # source1
    source1_df = df_sources["source1"].select(columns)
    
    # source2
    source2_df = df_sources["source2_masters_products"] \
        .join(df_sources["source2_orders_customers"], on=[
            "product_id", "craftsman_id"], how="inner") \
        .select(columns)

    # source3
    source3_df = df_sources["source3_orders"] \
        .join(df_sources["source3_craftsmans"], on="craftsman_id", how="inner") \
        .join(df_sources["source3_customers"], on="customer_id", how="inner") \
        .select(columns)

    # Объединение всех источников
    return source1_df.union(source2_df).union(source3_df).distinct()

# Обработка объединенных данных
combined_df = combine_sources()

# Показать результаты объединения
combined_df.show(1)

+--------+------------------+---------------------+------------+------------+--------------+-------------------+------------------+-------------------+----------+--------------------+--------------------+------------+-------------+-----------+----------------+----------------+-----------------+-----------------+
|order_id|order_created_date|order_completion_date|order_status|craftsman_id|craftsman_name|  craftsman_address|craftsman_birthday|    craftsman_email|product_id|        product_name| product_description|product_type|product_price|customer_id|   customer_name|customer_address|customer_birthday|   customer_email|
+--------+------------------+---------------------+------------+------------+--------------+-------------------+------------------+-------------------+----------+--------------------+--------------------+------------+-------------+-----------+----------------+----------------+-----------------+-----------------+
|      86|        2022-07-18|           2022-07-21|       

### Формирование таблиц измерений и фактов в DWH

In [4]:
from pyspark.sql import functions as F

dwh_customers_df = read_table("dwh", "d_customers")
dwh_products_df = read_table("dwh", "d_products")
dwh_craftsmans_df = read_table("dwh", "d_craftsmans")
dwh_orders_df = read_table("dwh", "f_orders")



In [5]:
from pyspark.sql import functions as F

# Чтение текущих данных из DWH
existing_customers_df = read_table("dwh", "d_customers")

# Указание интересующих нас колонок
customer_columns = ['customer_name', 'customer_address', 'customer_birthday', 'customer_email']

# Фильтрация данных: сохраняем только уникальные записи из исходного DataFrame
new_customer_data = combined_df.select(customer_columns).distinct()

# Определение новых записей путем удаления уже существующих данных
unique_new_customers = new_customer_data.join(
    existing_customers_df.select(customer_columns),
    on=customer_columns,
    how='left_anti'
).withColumn("load_dttm", F.current_timestamp())  # Используем load_dttm вместо load_timestamp

# Кэшируем DataFrame, чтобы избежать потери данных
unique_new_customers = unique_new_customers.cache()

# Записываем только уникальные записи в таблицу DWH
write_table(unique_new_customers, "dwh", "d_customers")

# Проверяем первые несколько строк таблицы после записи
updated_customers_df = read_table("dwh", "d_customers")
updated_customers_df.show(5)

enriched_customers_df = unique_new_customers.alias("new_customers").join(
    existing_customers_df.alias("d_customers"),
    on=[
        F.col("new_customers.customer_name") == F.col("d_customers.customer_name"),
        F.col("new_customers.customer_address") == F.col("d_customers.customer_address"),
        F.col("new_customers.customer_birthday") == F.col("d_customers.customer_birthday"),
        F.col("new_customers.customer_email") == F.col("d_customers.customer_email")
    ],
    how='left'
).select(
    F.col("d_customers.customer_id").alias("customer_id"),
    F.col("new_customers.customer_name"),
    F.col("new_customers.customer_address"),
    F.col("new_customers.customer_birthday"),
    F.col("new_customers.customer_email"),
    F.col("new_customers.load_dttm")  # Сохраняем поле load_dttm
)

# Просмотр результата с присоединенным customer_id
enriched_customers_df.show(1)


+-----------+---------------+--------------------+-----------------+--------------------+--------------------+
|customer_id|  customer_name|    customer_address|customer_birthday|      customer_email|           load_dttm|
+-----------+---------------+--------------------+-----------------+--------------------+--------------------+
|          1|Ariel Hidderley|3616 American Ash...|       1995-05-21|ahidderleyik@tiny...|2025-01-22 00:42:...|
|          2|    Del Kindred|220 Grasskamp Par...|       1993-06-03|     dkindredki@g.co|2025-01-22 00:42:...|
|          3|    Dynah Lough|286 Mitchell Terrace|       1999-04-17|dloughlz@blogspot...|2025-01-22 00:42:...|
|          4| Goldina Napper|    592 Nova Parkway|       2004-11-08|gnapperp7@hatena....|2025-01-22 00:42:...|
|          5|  Dinny McGlynn|      1 Sommers Hill|       2000-10-15|dmcglynn6i@archiv...|2025-01-22 00:42:...|
+-----------+---------------+--------------------+-----------------+--------------------+--------------------+
o

In [6]:
from pyspark.sql import functions as F

# Чтение текущих данных из DWH
existing_products_df = read_table("dwh", "d_products")

# Указание интересующих нас колонок
product_columns = ['product_name', 'product_description', 'product_type', 'product_price']

# Фильтрация данных: сохраняем только уникальные записи из исходного DataFrame
new_product_data = combined_df.select(product_columns).distinct()

# Определение новых записей путем удаления уже существующих данных
unique_new_products = new_product_data.join(
    existing_products_df.select(product_columns),
    on=product_columns,
    how='left_anti'
).withColumn("load_dttm", F.current_timestamp())  # Используем load_dttm

# Кэшируем DataFrame, чтобы избежать потери данных
unique_new_products = unique_new_products.cache()

# Записываем только уникальные записи в таблицу DWH
write_table(unique_new_products, "dwh", "d_products")

# Проверяем первые несколько строк таблицы после записи
updated_products_df = read_table("dwh", "d_products")
updated_products_df.show(5)


enriched_products_df = unique_new_products.alias("new_products").join(
    existing_products_df.alias("d_products"),
    on=[
        F.col("new_products.product_name") == F.col("d_products.product_name"),
        F.col("new_products.product_description") == F.col("d_products.product_description"),
        F.col("new_products.product_type") == F.col("d_products.product_type"),
        F.col("new_products.product_price") == F.col("d_products.product_price")
    ],
    how='left'
).select(
    F.col("d_products.product_id").alias("product_id"),
    F.col("new_products.product_name"),
    F.col("new_products.product_description"),
    F.col("new_products.product_type"),
    F.col("new_products.product_price"),
    F.col("new_products.load_dttm")  # Сохраняем поле load_dttm
)

# Просмотр результата с присоединенным product_id
enriched_products_df.show(1)

+----------+--------------------+--------------------+--------------------+-------------+--------------------+
|product_id|        product_name| product_description|        product_type|product_price|           load_dttm|
+----------+--------------------+--------------------+--------------------+-------------+--------------------+
|         1|        Pathiri Podi|Pathiri is a trad...|Foodgrains, Oil &...|           12|2025-01-22 00:42:...|
|         2|HandMade Gold-Pla...|Gold-plated and b...|             clothes|           10|2025-01-22 00:42:...|
|         3|HandMade Plus Siz...|Blue and beige la...|             clothes|           19|2025-01-22 00:42:...|
|         4|Foot Pumice Paddl...|HandMade foot pum...|    Beauty & Hygiene|           25|2025-01-22 00:42:...|
|         5|Bergamot & Cedarw...|HandMade Luxury S...|    Beauty & Hygiene|           31|2025-01-22 00:42:...|
+----------+--------------------+--------------------+--------------------+-------------+--------------------+
o

In [7]:
# Чтение текущих данных из DWH
existing_craftsmans_df = read_table("dwh", "d_craftsmans")

# Указание интересующих нас колонок
craftsmans_columns = ['craftsman_name', 'craftsman_address', 'craftsman_birthday', 'craftsman_email']

# Фильтрация данных: сохраняем только уникальные записи из исходного DataFrame
new_craftsman_data = combined_df.select(craftsmans_columns).distinct()

# Определение новых записей путем удаления уже существующих данных
unique_new_craftsmans = new_craftsman_data.join(
    existing_craftsmans_df.select(craftsmans_columns),
    on=craftsmans_columns,
    how='left_anti'
).withColumn("load_dttm", F.current_timestamp())  # Используем load_dttm

# Кэшируем DataFrame, чтобы избежать потери данных
unique_new_craftsmans = unique_new_craftsmans.cache()

# Записываем только уникальные записи в таблицу DWH
write_table(unique_new_craftsmans, "dwh", "d_craftsmans")

# Проверяем первые несколько строк таблицы после записи
updated_craftsmans_df = read_table("dwh", "d_craftsmans")
updated_craftsmans_df.show(5)

enriched_craftsmans_df = unique_new_craftsmans.alias("new_craftsmans").join(
    existing_craftsmans_df.alias("d_craftsmans"),
    on=[
        F.col("new_craftsmans.craftsman_name") == F.col("d_craftsmans.craftsman_name"),
        F.col("new_craftsmans.craftsman_address") == F.col("d_craftsmans.craftsman_address"),
        F.col("new_craftsmans.craftsman_birthday") == F.col("d_craftsmans.craftsman_birthday"),
        F.col("new_craftsmans.craftsman_email") == F.col("d_craftsmans.craftsman_email")
    ],
    how='left'
).select(
    F.col("d_craftsmans.craftsman_id").alias("craftsman_id"),
    F.col("new_craftsmans.craftsman_name"),
    F.col("new_craftsmans.craftsman_address"),
    F.col("new_craftsmans.craftsman_birthday"),
    F.col("new_craftsmans.craftsman_email"),
    F.col("new_craftsmans.load_dttm")  # Сохраняем поле load_dttm
)

# Просмотр результата с присоединенным craftsman_id
enriched_craftsmans_df.show(1)

+------------+---------------+--------------------+------------------+--------------------+--------------------+
|craftsman_id| craftsman_name|   craftsman_address|craftsman_birthday|     craftsman_email|           load_dttm|
+------------+---------------+--------------------+------------------+--------------------+--------------------+
|           1| Khalil Heining|  83956 Manley Plaza|        1998-02-08|ymcwhorter17@inte...|2025-01-22 00:42:...|
|           2|     Jake Draye|      2 Bluestem Way|        2003-08-01|dwannes8v@newsvin...|2025-01-22 00:42:...|
|           3|  Gustave Irwin|   54700 Swallow Way|        1992-09-02|jwherritc1@cornel...|2025-01-22 00:42:...|
|           4|  Zelma Scarffe|5560 Blackbird Plaza|        2000-05-24|ccripinh2@list-ma...|2025-01-22 00:42:...|
|           5|Katlin Guilloud|       9659 8th Lane|        1992-01-08|bsheberjf@pcworld...|2025-01-22 00:42:...|
+------------+---------------+--------------------+------------------+--------------------+-----

In [8]:

# Чтение текущих данных из DWH
existing_orders_df = read_table("dwh", "f_orders")

# Указание интересующих нас колонок
orders_columns = ['product_id', 'craftsman_id', 'customer_id', 'order_created_date', 'order_completion_date', 'order_status']

# Фильтрация данных: сохраняем только уникальные записи из исходного DataFrame
new_order_data = combined_df.select(orders_columns).distinct()

# Определение новых записей путем удаления уже существующих данных
unique_new_orders = new_order_data.join(
    existing_orders_df.select(orders_columns),
    on=orders_columns,
    how='left_anti'
).withColumn("load_dttm", F.current_timestamp())  # Используем load_dttm

# Кэшируем DataFrame, чтобы избежать потери данных
unique_new_orders = unique_new_orders.cache()

# Записываем только уникальные записи в таблицу DWH
write_table(unique_new_orders, "dwh", "f_orders")

# Проверяем первые несколько строк таблицы после записи
updated_orders_df = read_table("dwh", "f_orders")
updated_orders_df.show(5)

enriched_orders_df = unique_new_orders.alias("new_orders").join(
    existing_orders_df.alias("f_orders"),
    on=[
        F.col("new_orders.product_id") == F.col("f_orders.product_id"),
        F.col("new_orders.craftsman_id") == F.col("f_orders.craftsman_id"),
        F.col("new_orders.customer_id") == F.col("f_orders.customer_id"),
        F.col("new_orders.order_created_date") == F.col("f_orders.order_created_date"),
        F.col("new_orders.order_completion_date") == F.col("f_orders.order_completion_date"),
        F.col("new_orders.order_status") == F.col("f_orders.order_status")
    ],
    how='left'
).select(
    F.col("f_orders.order_id").alias("order_id"),
    F.col("new_orders.product_id"),
    F.col("new_orders.craftsman_id"),
    F.col("new_orders.customer_id"),
    F.col("new_orders.order_created_date"),
    F.col("new_orders.order_completion_date"),
    F.col("new_orders.order_status"),
    F.col("new_orders.load_dttm")  
)

# Просмотр результата с присоединенным order_id
enriched_orders_df.show(1)

+--------+----------+------------+-----------+------------------+---------------------+------------+--------------------+
|order_id|product_id|craftsman_id|customer_id|order_created_date|order_completion_date|order_status|           load_dttm|
+--------+----------+------------+-----------+------------------+---------------------+------------+--------------------+
|       1|       157|         157|        157|        2022-06-25|           2022-06-29|        done|2025-01-22 00:42:...|
|       2|       805|         805|        805|        2022-09-05|           2022-09-07|        done|2025-01-22 00:42:...|
|       3|       122|         122|        122|        2022-07-09|           2022-07-11|        done|2025-01-22 00:42:...|
|       4|       630|         630|        630|        2022-08-16|           2022-08-17|        done|2025-01-22 00:42:...|
|       5|        11|          11|         11|        2022-05-12|           2022-05-14|        done|2025-01-22 00:42:...|
+--------+----------+---

### Заполнение витрины

In [10]:
# Добавляем столбец для периода отчета (по месяцу и году)
enriched_orders_df = enriched_orders_df.withColumn("report_period", F.date_format("order_created_date", "yyyy-MM"))

# Создаем окно для нахождения самой популярной категории (по количеству заказов)
window_spec = Window.partitionBy("craftsman_id", "report_period").orderBy(F.desc("count_category"))

# Считаем количество товаров каждой категории для каждого мастера и периода
product_category_counts = enriched_orders_df.join(
    enriched_products_df.alias("products"),
    enriched_orders_df["product_id"] == F.col("products.product_id")
).groupBy("craftsman_id", "report_period", "products.product_type") \
    .agg(F.count("products.product_type").alias("count_category"))

# Находим топ-1 категорию для каждого мастера и периода
top_categories = product_category_counts.withColumn(
    "row_num", F.row_number().over(window_spec)
).filter(F.col("row_num") == 1).select("craftsman_id", "report_period", "product_type")

# Создаем отчет по данным мастеров, включая агрегированные данные
new_craftsman_report_datamart_df = enriched_orders_df.join(
    enriched_craftsmans_df.alias("craftsmans"),
    enriched_orders_df.craftsman_id == F.col("craftsmans.craftsman_id")
).join(
    enriched_products_df.alias("products"),
    enriched_orders_df.product_id == F.col("products.product_id")
).join(
    enriched_customers_df.alias("customers"),
    enriched_orders_df.customer_id == F.col("customers.customer_id")
).groupBy(
    F.col("craftsmans.craftsman_id"),
    F.col("craftsmans.craftsman_name"),
    F.col("craftsmans.craftsman_address"),
    F.col("craftsmans.craftsman_birthday"),
    F.col("craftsmans.craftsman_email"),
    F.col("report_period")
).agg(
    F.sum(F.col("products.product_price") * 0.9).alias("craftsman_money"),
    F.sum(F.col("products.product_price") * 0.1).alias("platform_money"),
    F.count(enriched_orders_df.order_id).alias("count_order"),
    F.avg(F.col("products.product_price")).alias("avg_price_order"),
    F.avg(F.expr("DATEDIFF(current_date(), customers.customer_birthday) / 365.25")).alias("avg_age_customer"),
    F.median(F.expr("DATEDIFF(order_completion_date, order_created_date)")).alias("median_time_order_completed"),
    F.sum(F.when(enriched_orders_df.order_status == "created", 1).otherwise(0)).alias("count_order_created"),
    F.sum(F.when(enriched_orders_df.order_status == "in_progress", 1).otherwise(0)).alias("count_order_in_progress"),
    F.sum(F.when(enriched_orders_df.order_status == "delivery", 1).otherwise(0)).alias("count_order_delivery"),
    F.sum(F.when(enriched_orders_df.order_status == "done", 1).otherwise(0)).alias("count_order_done"),
    F.sum(F.when(enriched_orders_df.order_status != "done", 1).otherwise(0)).alias("count_order_not_done")
)

# Присоединяем топ-1 категорию товаров
new_craftsman_report_datamart_df = new_craftsman_report_datamart_df.join(
    top_categories,
    ["craftsman_id", "report_period"],
    "left"
).withColumnRenamed("product_type", "top_product_category")

# Добавляем столбец с текущей датой загрузки (опционально)
new_craftsman_report_datamart_df = new_craftsman_report_datamart_df.withColumn("load_dttm", F.current_timestamp())

# Проверяем результат
new_craftsman_report_datamart_df.toPandas().head(5)

Unnamed: 0,craftsman_id,report_period,craftsman_name,craftsman_address,craftsman_birthday,craftsman_email,craftsman_money,platform_money,count_order,avg_price_order,avg_age_customer,median_time_order_completed,count_order_created,count_order_in_progress,count_order_delivery,count_order_done,count_order_not_done,top_product_category,load_dttm
0,87,2022-07,Jeanelle Healing,575 Graedel Park,1999-03-25,csprason3@soup.io,79.2,8.8,1,88.0,21.330595,1.0,0,0,0,1,0,clothes,2025-01-22 00:43:29.831006
1,96,2019-09,Ulberto Duffrie,0 Sommers Way,1992-04-09,pcloneyp2@histats.com,71.1,7.9,1,79.0,33.201916,1.0,0,0,0,1,0,clothes,2025-01-22 00:43:29.831006
2,142,2018-09,Casper Lambrook,14 Luster Lane,2002-04-09,bmckendry2k@java.com,73.8,8.2,1,82.0,24.506502,3.0,0,0,0,1,0,clothes,2025-01-22 00:43:29.831006
3,251,2020-01,Charlean Bendin,302 Mendota Street,1994-06-01,igentrye3@ycombinator.com,49.5,5.5,0,55.0,30.201232,,1,0,0,0,1,Beauty & Hygiene,2025-01-22 00:43:29.831006
4,480,2022-06,Thornton Tremollet,40 Grasskamp Drive,2003-08-16,imorley28@linkedin.com,64.8,7.2,0,72.0,25.445585,,0,0,0,0,1,clothes,2025-01-22 00:43:29.831006


In [12]:
# Чтение существующих данных из таблицы craftsman_report_datamart
craftsman_report_datamart = read_table("dwh", "craftsman_report_datamart")

# Ключевые столбцы для идентификации строки
key_columns = ["craftsman_id", "report_period"]

# 1. Определяем новые строки (INSERT)
# Используем "left_anti" join, чтобы найти строки, которые есть в новом датафрейме, но отсутствуют в существующем
new_rows_df = new_craftsman_report_datamart_df.join(
    craftsman_report_datamart.select(*key_columns),
    key_columns,
    how="left_anti"
)

# 2. Определяем обновленные строки (UPDATE)
# Используем "inner" join, чтобы получить строки, которые присутствуют в обоих датафреймах, и затем фильтруем
updated_rows_df = new_craftsman_report_datamart_df.alias("new").join(
    craftsman_report_datamart.alias("existing"),
    key_columns,
    how="inner"
).filter(
    # Проверяем расхождения в значимых столбцах
    (
        col("new.craftsman_money") != col("existing.craftsman_money")
    ) |
    (
        col("new.platform_money") != col("existing.platform_money")
    ) |
    (
        col("new.count_order") != col("existing.count_order")
    ) |
    (
        col("new.avg_price_order") != col("existing.avg_price_order")
    ) |
    (
        col("new.avg_age_customer") != col("existing.avg_age_customer")
    ) |
    (
        col("new.median_time_order_completed") != col("existing.median_time_order_completed")
    ) |
    (
        col("new.count_order_created") != col("existing.count_order_created")
    ) |
    (
        col("new.count_order_in_progress") != col("existing.count_order_in_progress")
    ) |
    (
        col("new.count_order_delivery") != col("existing.count_order_delivery")
    ) |
    (
        col("new.count_order_done") != col("existing.count_order_done")
    ) |
    (
        col("new.count_order_not_done") != col("existing.count_order_not_done")
    ) |
    (
        col("new.top_product_category") != col("existing.top_product_category")
    )
).select("new.*")

# Результат: новый датафрейм с новыми и обновленными строками


In [13]:
new_craftsman_report_datamart_df

DataFrame[craftsman_id: bigint, report_period: string, craftsman_name: string, craftsman_address: string, craftsman_birthday: date, craftsman_email: string, craftsman_money: double, platform_money: double, count_order: bigint, avg_price_order: double, avg_age_customer: decimal(22,10), median_time_order_completed: double, count_order_created: bigint, count_order_in_progress: bigint, count_order_delivery: bigint, count_order_done: bigint, count_order_not_done: bigint, top_product_category: string, load_dttm: timestamp]

In [14]:
pip install psycopg2-binary

Collecting psycopg2-binary
  Downloading psycopg2_binary-2.9.10-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (4.9 kB)
Downloading psycopg2_binary-2.9.10-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.0 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.0/3.0 MB[0m [31m4.7 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25hInstalling collected packages: psycopg2-binary
Successfully installed psycopg2-binary-2.9.10
Note: you may need to restart the kernel to use updated packages.


In [16]:
import psycopg2
from psycopg2.extras import execute_values

def update_existing_rows(df, schema, table):
    # Преобразуем DataFrame в список словарей
    rows = df.collect()
    update_query = f"""
    UPDATE {schema}.{table} AS target
    SET craftsman_money = data.craftsman_money,
        platform_money = data.platform_money,
        count_order = data.count_order,
        avg_price_order = data.avg_price_order,
        avg_age_customer = data.avg_age_customer,
        median_time_order_completed = data.median_time_order_completed,
        count_order_created = data.count_order_created,
        count_order_in_progress = data.count_order_in_progress,
        count_order_delivery = data.count_order_delivery,
        count_order_done = data.count_order_done,
        count_order_not_done = data.count_order_not_done,
        top_product_category = data.top_product_category,
        load_dttm = data.load_dttm
    FROM (VALUES %s) AS data (
        craftsman_id, report_period, craftsman_money, platform_money, count_order, avg_price_order,
        avg_age_customer, median_time_order_completed, count_order_created, count_order_in_progress,
        count_order_delivery, count_order_done, count_order_not_done, top_product_category, load_dttm
    )
    WHERE target.craftsman_id = data.craftsman_id AND target.report_period = data.report_period
    """
    
    # Устанавливаем соединение с Postgres
    conn = psycopg2.connect(
        dbname="postgres",
        user="postgres",
        password="postgres",
        host="postgres",
        port="5432"
    )
    with conn.cursor() as cursor:
        # Формируем данные для VALUES
        values = [
            (
                row["craftsman_id"], row["report_period"], row["craftsman_money"], row["platform_money"],
                row["count_order"], row["avg_price_order"], row["avg_age_customer"], 
                row["median_time_order_completed"], row["count_order_created"],
                row["count_order_in_progress"], row["count_order_delivery"], row["count_order_done"],
                row["count_order_not_done"], row["top_product_category"], row["load_dttm"]
            )
            for row in rows
        ]
        execute_values(cursor, update_query, values)
    conn.commit()
    conn.close()

In [17]:
write_table(new_rows_df.drop("load_dttm"), "dwh", "craftsman_report_datamart")

In [18]:
load_dates_dma = read_table("dwh", "load_dates_craftsman_report_datamart")

load_dates_data = [Row(load_dttm=date.today())]  # Используем текущую дату из модуля datetime
load_dates_df = spark.createDataFrame(load_dates_data).exceptAll(load_dates_dma.select('load_dttm'))


write_table(load_dates_df, "dwh", "load_dates_craftsman_report_datamart")

read_table("dwh", "load_dates_craftsman_report_datamart").show()

+---+----------+
| id| load_dttm|
+---+----------+
|  1|2025-01-22|
+---+----------+

