In [1]:
from pyspark.sql import SparkSession, Window, Row
from pyspark.sql.functions import (
    current_timestamp, current_date, date_format, col, sum, avg, median, count, lit, expr, row_number, desc, when
)
from datetime import date

spark = SparkSession.builder \
    .appName("Spark") \
    .config("spark.jars.packages", "org.postgresql:postgresql:42.5.4") \
    .getOrCreate()

pg_user = 'postgres'
pg_pass = 'postgres'

jdbc_url = "jdbc:postgresql://postgres:5432/postgres_db"
connection_properties = {
    "user": pg_user,
    "password": pg_pass,
    "driver": "org.postgresql.Driver"
}


In [2]:
def read_pg(schema, table):
    return spark.read.format("jdbc") \
        .option("url", "jdbc:postgresql://postgres:5432/postgres_db") \
        .option("driver", "org.postgresql.Driver") \
        .option("dbtable", f"{schema}.{table}") \
        .option("user", pg_user) \
        .option("password", pg_pass) \
        .load()

def write_pg(df, schema, table):
    df.write.format("jdbc") \
        .option("url", "jdbc:postgresql://postgres:5432/postgres_db") \
        .option("driver", "org.postgresql.Driver") \
        .option("dbtable", f"{schema}.{table}") \
        .option("user", pg_user) \
        .option("password", pg_pass) \
        .mode("append") \
        .save()


### Чтение данных и объединение их в одну большую таблицу

In [3]:
source1_df = read_pg("source1", "craft_market_wide")
source2_masters_products = read_pg("source2", "craft_market_masters_products")
source2_orders_customers = read_pg("source2", "craft_market_orders_customers")
source3_orders = read_pg("source3", "craft_market_orders")
source3_craftsmans = read_pg("source3", "craft_market_craftsmans")
source3_customers = read_pg("source3", "craft_market_customers")

In [4]:
all_columns = [
    'order_id',
    'order_created_date',
    'order_completion_date',
    'order_status',
    'craftsman_id',
    'craftsman_name',
    'craftsman_address',
    'craftsman_birthday',
    'craftsman_email',
    'product_id',
    'product_name',
    'product_description',
    'product_type',
    'product_price',
    'customer_id',
    'customer_name',
    'customer_address',
    'customer_birthday',
    'customer_email' 
]

In [5]:
# создаем объединенную таблицу из таблиц 1-го источника
source1_df = source1_df.select(all_columns)

In [6]:
# создаем объединенную таблицу из таблиц 2-го источника
source2_df = source2_masters_products.join(
    source2_orders_customers,
    (source2_masters_products.product_id == source2_orders_customers.product_id) &
    (source2_masters_products.craftsman_id == source2_orders_customers.craftsman_id)
).select(
    source2_masters_products.craftsman_id,
    source2_masters_products.craftsman_name,
    source2_masters_products.craftsman_address,
    source2_masters_products.craftsman_birthday,
    source2_masters_products.craftsman_email,
    source2_masters_products.product_id,
    source2_masters_products.product_name,
    source2_masters_products.product_description,
    source2_masters_products.product_type,
    source2_masters_products.product_price,
    source2_orders_customers.customer_id,
    source2_orders_customers.customer_name,
    source2_orders_customers.customer_address,
    source2_orders_customers.customer_birthday,
    source2_orders_customers.customer_email,
    source2_orders_customers.order_id,
    source2_orders_customers.order_created_date,
    source2_orders_customers.order_completion_date,
    source2_orders_customers.order_status
).select(all_columns)

In [7]:
# создаем объединенную таблицу из таблиц 3-го источника
source3_df = source3_orders.join(
    source3_craftsmans,
    source3_orders.craftsman_id == source3_craftsmans.craftsman_id
).join(
    source3_customers,
    source3_orders.customer_id == source3_customers.customer_id
).select(
    source3_orders.order_id,
    source3_orders.order_created_date,
    source3_orders.order_completion_date,
    source3_orders.order_status,
    source3_craftsmans.craftsman_id,
    source3_craftsmans.craftsman_name,
    source3_craftsmans.craftsman_address,
    source3_craftsmans.craftsman_birthday,
    source3_craftsmans.craftsman_email,
    source3_orders.product_id,
    source3_orders.product_name,
    source3_orders.product_description,
    source3_orders.product_type,
    source3_orders.product_price,
    source3_customers.customer_id,
    source3_customers.customer_name,
    source3_customers.customer_address,
    source3_customers.customer_birthday,
    source3_customers.customer_email
).select(all_columns)

In [8]:
# Объединяем три DataFrame в один
all_sources_df = source1_df.union(source2_df).union(source3_df)

# Удаляем дубликаты из объединенного DataFrame
all_sources_df = all_sources_df.distinct()

### Создание таблиц измерений и фактов в DWH

#### dwh.d_customers


In [9]:
# Читаем данные из таблицы измерений
d_customers_df = read_pg("dwh", "d_customers")
customers_columns = ['customer_name', 'customer_address', 'customer_birthday', 'customer_email']

# Удаляем из объединенного DataFrame записи, которые уже существуют в таблице измерений
new_customers_df = all_sources_df.select(customers_columns).distinct().exceptAll(d_customers_df.select(customers_columns)) \
                                                             .withColumn("load_dttm", current_timestamp())
# кэшируем, чтоб спарк после записи не удалил таблицу
new_customers_df = new_customers_df.cache()

# Записываем оставшиеся записи в таблицу измерений
write_pg(new_customers_df, "dwh", "d_customers")

# Проверяем первые 5 строк
d_customers_df = read_pg("dwh", "d_customers")
d_customers_df.show(5)

+-----------+---------------+--------------------+-----------------+--------------------+--------------------+
|customer_id|  customer_name|    customer_address|customer_birthday|      customer_email|           load_dttm|
+-----------+---------------+--------------------+-----------------+--------------------+--------------------+
|          1|    Dynah Lough|286 Mitchell Terrace|       1999-04-17|dloughlz@blogspot...|2025-01-02 07:28:...|
|          2|Ariel Hidderley|3616 American Ash...|       1995-05-21|ahidderleyik@tiny...|2025-01-02 07:28:...|
|          3|    Del Kindred|220 Grasskamp Par...|       1993-06-03|     dkindredki@g.co|2025-01-02 07:28:...|
|          4| Goldina Napper|    592 Nova Parkway|       2004-11-08|gnapperp7@hatena....|2025-01-02 07:28:...|
|          5|    Sandye Mant|21947 Carpenter Park|       1996-10-01|    smantcp@admin.ch|2025-01-02 07:28:...|
+-----------+---------------+--------------------+-----------------+--------------------+--------------------+
o

In [10]:
# Присоединяем customer_id из оригинальной таблицы d_customers_df (понадобится дальше для инкрементальной витрины)
new_customers_df = new_customers_df.alias("new_customers").join(
    d_customers_df.alias("d_customers"),
    (col("new_customers.customer_name") == col("d_customers.customer_name")) &
    (col("new_customers.customer_address") == col("d_customers.customer_address")) &
    (col("new_customers.customer_birthday") == col("d_customers.customer_birthday")) &
    (col("new_customers.customer_email") == col("d_customers.customer_email")),
    how='left'
).select(
    col("d_customers.customer_id").alias("customer_id"),
    col("new_customers.customer_name"),
    col("new_customers.customer_address"),
    col("new_customers.customer_birthday"),
    col("new_customers.customer_email"),
    col("new_customers.load_dttm")
)

# Проверяем результат
new_customers_df.show(5)

+-----------+---------------+--------------------+-----------------+--------------------+--------------------+
|customer_id|  customer_name|    customer_address|customer_birthday|      customer_email|           load_dttm|
+-----------+---------------+--------------------+-----------------+--------------------+--------------------+
|          1|    Dynah Lough|286 Mitchell Terrace|       1999-04-17|dloughlz@blogspot...|2025-01-02 07:28:...|
|          2|Ariel Hidderley|3616 American Ash...|       1995-05-21|ahidderleyik@tiny...|2025-01-02 07:28:...|
|          3|    Del Kindred|220 Grasskamp Par...|       1993-06-03|     dkindredki@g.co|2025-01-02 07:28:...|
|          4| Goldina Napper|    592 Nova Parkway|       2004-11-08|gnapperp7@hatena....|2025-01-02 07:28:...|
|          5|    Sandye Mant|21947 Carpenter Park|       1996-10-01|    smantcp@admin.ch|2025-01-02 07:28:...|
+-----------+---------------+--------------------+-----------------+--------------------+--------------------+
o

In [11]:
# обновление customer_id на фактический
all_sources_df = all_sources_df.alias("all_sources_df").join(
    d_customers_df.alias("d_customers_df"),
    (col("all_sources_df.customer_name") == col("d_customers_df.customer_name")) &
    (col("all_sources_df.customer_address") == col("d_customers_df.customer_address")) &
    (col("all_sources_df.customer_birthday") == col("d_customers_df.customer_birthday")) &
    (col("all_sources_df.customer_email") == col("d_customers_df.customer_email")),
    how='left'
).select(
    col("d_customers_df.customer_id").alias("customer_id"),
    col('all_sources_df.order_id'),
    col('all_sources_df.order_created_date'),
    col('all_sources_df.order_completion_date'),
    col('all_sources_df.order_status'),
    col('all_sources_df.craftsman_id'),
    col('all_sources_df.craftsman_name'),
    col('all_sources_df.craftsman_address'),
    col('all_sources_df.craftsman_birthday'),
    col('all_sources_df.craftsman_email'),
    col('all_sources_df.product_id'),
    col('all_sources_df.product_name'),
    col('all_sources_df.product_description'),
    col('all_sources_df.product_type'),
    col('all_sources_df.product_price'),
    col('all_sources_df.customer_name'),
    col('all_sources_df.customer_address'),
    col('all_sources_df.customer_birthday'),
    col('all_sources_df.customer_email')
)

#### dwh.d_products

In [12]:
# Читаем данные из таблицы измерений
d_products_df = read_pg("dwh", "d_products")
products_columns = ['product_name', 'product_description', 'product_type', 'product_price']

# Удаляем из объединенного DataFrame записи, которые уже существуют в таблице измерений
new_products_df = all_sources_df.select(products_columns).distinct().exceptAll(d_products_df.select(products_columns)) \
                                                             .withColumn("load_dttm", current_timestamp())

# кэшируем, чтоб спарк после записи не удалил таблицу
new_products_df = new_products_df.cache()

# Записываем оставшиеся записи в таблицу измерений
write_pg(new_products_df, "dwh", "d_products")

# Проверяем первые 5 строк
d_products_df = read_pg("dwh", "d_products")
d_products_df.show(5)

+----------+--------------------+--------------------+--------------------+-------------+--------------------+
|product_id|        product_name| product_description|        product_type|product_price|           load_dttm|
+----------+--------------------+--------------------+--------------------+-------------+--------------------+
|         1|        Pathiri Podi|Pathiri is a trad...|Foodgrains, Oil &...|           12|2025-01-02 07:28:...|
|         2|HandMade Gold-Pla...|Gold-plated and b...|             clothes|           10|2025-01-02 07:28:...|
|         3|HandMade Plus Siz...|Blue and beige la...|             clothes|           19|2025-01-02 07:28:...|
|         4|Foot Pumice Paddl...|HandMade foot pum...|    Beauty & Hygiene|           25|2025-01-02 07:28:...|
|         5|Bergamot & Cedarw...|HandMade Luxury S...|    Beauty & Hygiene|           31|2025-01-02 07:28:...|
+----------+--------------------+--------------------+--------------------+-------------+--------------------+
o

In [13]:
# Присоединяем product_id из оригинальной таблицы d_products_df (потребуется далее для инкрементальной витрины)
new_products_df = new_products_df.alias("new_products").join(
    d_products_df.alias("d_products"),
    (col("new_products.product_name") == col("d_products.product_name")) &
    (col("new_products.product_description") == col("d_products.product_description")) &
    (col("new_products.product_type") == col("d_products.product_type")) &
    (col("new_products.product_price") == col("d_products.product_price")),
    how='left'
).select(
    col("d_products.product_id").alias("product_id"),
    col("new_products.product_name"),
    col("new_products.product_description"),
    col("new_products.product_type"),
    col("new_products.product_price")
)

# Проверяем результат
new_products_df.show(5)

+----------+--------------------+--------------------+--------------------+-------------+
|product_id|        product_name| product_description|        product_type|product_price|
+----------+--------------------+--------------------+--------------------+-------------+
|         1|        Pathiri Podi|Pathiri is a trad...|Foodgrains, Oil &...|           12|
|         2|HandMade Gold-Pla...|Gold-plated and b...|             clothes|           10|
|         3|HandMade Plus Siz...|Blue and beige la...|             clothes|           19|
|         4|Foot Pumice Paddl...|HandMade foot pum...|    Beauty & Hygiene|           25|
|         5|Bergamot & Cedarw...|HandMade Luxury S...|    Beauty & Hygiene|           31|
+----------+--------------------+--------------------+--------------------+-------------+
only showing top 5 rows



In [14]:
# обновление product_id на фактический
all_sources_df = all_sources_df.alias("all_sources_df").join(
    d_products_df.alias("d_products_df"),
    (col("all_sources_df.product_name") == col("d_products_df.product_name")) &
    (col("all_sources_df.product_description") == col("d_products_df.product_description")) &
    (col("all_sources_df.product_type") == col("d_products_df.product_type")) &
    (col("all_sources_df.product_price") == col("d_products_df.product_price")),
    how='left'
).select(
    col("d_products_df.product_id").alias("product_id"),
    col('all_sources_df.order_id'),
    col('all_sources_df.order_created_date'),
    col('all_sources_df.order_completion_date'),
    col('all_sources_df.order_status'),
    col('all_sources_df.craftsman_id'),
    col('all_sources_df.craftsman_name'),
    col('all_sources_df.craftsman_address'),
    col('all_sources_df.craftsman_birthday'),
    col('all_sources_df.craftsman_email'),
    col('all_sources_df.customer_id'),
    col('all_sources_df.product_name'),
    col('all_sources_df.product_description'),
    col('all_sources_df.product_type'),
    col('all_sources_df.product_price'),
    col('all_sources_df.customer_name'),
    col('all_sources_df.customer_address'),
    col('all_sources_df.customer_birthday'),
    col('all_sources_df.customer_email')
)

#### dwh.d_craftsmans

In [15]:
# Читаем данные из таблицы измерений
d_craftsmans_df = read_pg("dwh", "d_craftsmans")
craftsmans_columns = ['craftsman_name', 'craftsman_address', 'craftsman_birthday', 'craftsman_email']

# Удаляем из объединенного DataFrame записи, которые уже существуют в таблице измерений
new_craftsmans_df = all_sources_df.select(craftsmans_columns).distinct().exceptAll(d_craftsmans_df.select(craftsmans_columns)) \
                                                             .withColumn("load_dttm", current_timestamp())

# кэшируем, чтоб спарк после записи не удалил таблицу
new_craftsmans_df = new_craftsmans_df.cache()

# Записываем оставшиеся записи в таблицу измерений
write_pg(new_craftsmans_df, "dwh", "d_craftsmans")

# Проверяем первые 5 строк
d_craftsmans_df = read_pg("dwh", "d_craftsmans")
d_craftsmans_df.show(5)

+------------+---------------+--------------------+------------------+--------------------+--------------------+
|craftsman_id| craftsman_name|   craftsman_address|craftsman_birthday|     craftsman_email|           load_dttm|
+------------+---------------+--------------------+------------------+--------------------+--------------------+
|           1| Khalil Heining|  83956 Manley Plaza|        1998-02-08|ymcwhorter17@inte...|2025-01-02 07:28:...|
|           2|     Jake Draye|      2 Bluestem Way|        2003-08-01|dwannes8v@newsvin...|2025-01-02 07:28:...|
|           3|  Gustave Irwin|   54700 Swallow Way|        1992-09-02|jwherritc1@cornel...|2025-01-02 07:28:...|
|           4|  Zelma Scarffe|5560 Blackbird Plaza|        2000-05-24|ccripinh2@list-ma...|2025-01-02 07:28:...|
|           5|Katlin Guilloud|       9659 8th Lane|        1992-01-08|bsheberjf@pcworld...|2025-01-02 07:28:...|
+------------+---------------+--------------------+------------------+--------------------+-----

In [16]:
# Присоединяем craftsman_id из оригинальной таблицы d_craftsmans_df (потребуется далее для инкрементальной витрины)
new_craftsmans_df = new_craftsmans_df.alias("new_craftsmans").join(
    d_craftsmans_df.alias("d_craftsmans"),
    (col("new_craftsmans.craftsman_name") == col("d_craftsmans.craftsman_name")) &
    (col("new_craftsmans.craftsman_address") == col("d_craftsmans.craftsman_address")) &
    (col("new_craftsmans.craftsman_birthday") == col("d_craftsmans.craftsman_birthday")) &
    (col("new_craftsmans.craftsman_email") == col("d_craftsmans.craftsman_email")),
    how='left'
).select(
    col("d_craftsmans.craftsman_id").alias("craftsman_id"),
    col("new_craftsmans.craftsman_name"),
    col("new_craftsmans.craftsman_address"),
    col("new_craftsmans.craftsman_birthday"),
    col("new_craftsmans.craftsman_email")
)

# Проверяем результат
new_craftsmans_df.show(5)


+------------+---------------+--------------------+------------------+--------------------+
|craftsman_id| craftsman_name|   craftsman_address|craftsman_birthday|     craftsman_email|
+------------+---------------+--------------------+------------------+--------------------+
|           1| Khalil Heining|  83956 Manley Plaza|        1998-02-08|ymcwhorter17@inte...|
|           2|     Jake Draye|      2 Bluestem Way|        2003-08-01|dwannes8v@newsvin...|
|           3|  Gustave Irwin|   54700 Swallow Way|        1992-09-02|jwherritc1@cornel...|
|           4|  Zelma Scarffe|5560 Blackbird Plaza|        2000-05-24|ccripinh2@list-ma...|
|           5|Katlin Guilloud|       9659 8th Lane|        1992-01-08|bsheberjf@pcworld...|
+------------+---------------+--------------------+------------------+--------------------+
only showing top 5 rows



In [17]:
# обновление craftsman_id на фактический
all_sources_df = all_sources_df.alias("all_sources_df").join(
    d_craftsmans_df.alias("d_craftsmans_df"),
    (col("all_sources_df.craftsman_name") == col("d_craftsmans_df.craftsman_name")) &
    (col("all_sources_df.craftsman_address") == col("d_craftsmans_df.craftsman_address")) &
    (col("all_sources_df.craftsman_birthday") == col("d_craftsmans_df.craftsman_birthday")) &
    (col("all_sources_df.craftsman_email") == col("d_craftsmans_df.craftsman_email")),
    how='left'
).select(
    col("d_craftsmans_df.craftsman_id").alias("craftsman_id"),
    col('all_sources_df.order_id'),
    col('all_sources_df.order_created_date'),
    col('all_sources_df.order_completion_date'),
    col('all_sources_df.order_status'),
    col('all_sources_df.product_id'),
    col('all_sources_df.craftsman_name'),
    col('all_sources_df.craftsman_address'),
    col('all_sources_df.craftsman_birthday'),
    col('all_sources_df.craftsman_email'),
    col('all_sources_df.customer_id'),
    col('all_sources_df.product_name'),
    col('all_sources_df.product_description'),
    col('all_sources_df.product_type'),
    col('all_sources_df.product_price'),
    col('all_sources_df.customer_name'),
    col('all_sources_df.customer_address'),
    col('all_sources_df.customer_birthday'),
    col('all_sources_df.customer_email')
)

#### dwh.f_orders

In [18]:
# Читаем данные из таблицы измерений
f_orders_df = read_pg("dwh", "f_orders")
orders_columns = ['product_id', 'craftsman_id', 'customer_id', 'order_created_date', 'order_completion_date', 'order_status']

# Удаляем из объединенного DataFrame записи, которые уже существуют в таблице измерений
new_orders_df = all_sources_df.select(orders_columns).distinct().exceptAll(f_orders_df.select(orders_columns)) \
                                                             .withColumn("load_dttm", current_timestamp())

# кэшируем, чтоб спарк после записи не удалил таблицу
new_orders_df = new_orders_df.cache()

# Записываем оставшиеся записи в таблицу измерений
write_pg(new_orders_df, "dwh", "f_orders")

# Проверяем первые 5 строк
f_orders_df = read_pg("dwh", "f_orders")
f_orders_df.show(5)

+--------+----------+------------+-----------+------------------+---------------------+------------+--------------------+
|order_id|product_id|craftsman_id|customer_id|order_created_date|order_completion_date|order_status|           load_dttm|
+--------+----------+------------+-----------+------------------+---------------------+------------+--------------------+
|       1|       774|        2039|       2482|        2022-07-24|           2022-07-27|        done|2025-01-02 07:28:...|
|       2|      2406|        2665|       2032|        2022-09-29|           2022-09-30|        done|2025-01-02 07:28:...|
|       3|       734|         886|       1945|        2022-08-17|           2022-08-18|        done|2025-01-02 07:28:...|
|       4|       300|        1995|       1631|        2021-11-06|           2021-11-09|        done|2025-01-02 07:28:...|
|       5|       218|        1526|       1959|        2022-10-22|           2022-10-24|        done|2025-01-02 07:28:...|
+--------+----------+---

In [19]:
# Присоединяем order_id из оригинальной таблицы f_orders_df (потребуется далее для инкрементальной витрины)
new_orders_df = new_orders_df.alias("new_orders").join(
    f_orders_df.alias("f_orders"),
    (col("new_orders.product_id") == col("f_orders.product_id")) &
    (col("new_orders.craftsman_id") == col("f_orders.craftsman_id")) &
    (col("new_orders.customer_id") == col("f_orders.customer_id")) &
    (col("new_orders.order_created_date") == col("f_orders.order_created_date")) &
    (col("new_orders.order_completion_date") == col("f_orders.order_completion_date")) &
    (col("new_orders.order_status") == col("f_orders.order_status")),
    how='left'
).select(
    col("f_orders.order_id").alias("order_id"),
    col("new_orders.product_id"),
    col("new_orders.craftsman_id"),
    col("new_orders.customer_id"),
    col("new_orders.order_created_date"),
    col("new_orders.order_completion_date"),
    col("new_orders.order_status")
)

# Проверяем результат
new_orders_df.show(1)


+--------+----------+------------+-----------+------------------+---------------------+------------+
|order_id|product_id|craftsman_id|customer_id|order_created_date|order_completion_date|order_status|
+--------+----------+------------+-----------+------------------+---------------------+------------+
|       1|       774|        2039|       2482|        2022-07-24|           2022-07-27|        done|
+--------+----------+------------+-----------+------------------+---------------------+------------+
only showing top 1 row



In [20]:
# обновление order_id на фактический
all_sources_df = all_sources_df.alias("all_sources_df").join(
    f_orders_df.alias("f_orders_df"),
    (col("all_sources_df.product_id") == col("f_orders_df.product_id")) &
    (col("all_sources_df.craftsman_id") == col("f_orders_df.craftsman_id")) &
    (col("all_sources_df.customer_id") == col("f_orders_df.customer_id")) &
    (col("all_sources_df.order_created_date") == col("f_orders_df.order_created_date")) &
    (col("all_sources_df.order_completion_date") == col("f_orders_df.order_completion_date")) &
    (col("all_sources_df.order_status") == col("f_orders_df.order_status")),
    how='left'
).select(
    col("f_orders_df.order_id").alias("order_id"),
    col('all_sources_df.craftsman_id'),
    col('all_sources_df.order_created_date'),
    col('all_sources_df.order_completion_date'),
    col('all_sources_df.order_status'),
    col('all_sources_df.product_id'),
    col('all_sources_df.craftsman_name'),
    col('all_sources_df.craftsman_address'),
    col('all_sources_df.craftsman_birthday'),
    col('all_sources_df.craftsman_email'),
    col('all_sources_df.customer_id'),
    col('all_sources_df.product_name'),
    col('all_sources_df.product_description'),
    col('all_sources_df.product_type'),
    col('all_sources_df.product_price'),
    col('all_sources_df.customer_name'),
    col('all_sources_df.customer_address'),
    col('all_sources_df.customer_birthday'),
    col('all_sources_df.customer_email')
)

### Витрина данных

##### Создаем инкрементальную табличку с агрегатами.

In [21]:
# Добавляем отчетный период в таблицу заказов
new_orders_df = new_orders_df.withColumn("report_period", date_format("order_created_date", "yyyy-MM"))

# Создаем окно для нахождения самой популярной категории
window_spec = Window.partitionBy("craftsman_id", "report_period").orderBy(desc("count_category"))

# Считаем количество товаров каждой категории для каждого мастера и периода
product_category_counts = new_orders_df.join(
    new_products_df.alias("products"),
    new_orders_df.product_id == col("products.product_id")
).groupBy("craftsman_id", "report_period", "products.product_type") \
    .agg(count("products.product_type").alias("count_category"))

# Находим топ-1 категорию для каждого мастера и периода
top_categories = product_category_counts.withColumn(
    "row_num", row_number().over(window_spec)
).filter(col("row_num") == 1).select("craftsman_id", "report_period", "product_type")

# Добавляем расчет агрегатов
new_craftsman_report_datamart_df = new_orders_df.join(
    new_craftsmans_df.alias("craftsmans"),
    new_orders_df.craftsman_id == col("craftsmans.craftsman_id")
).join(
    new_products_df.alias("products"),
    new_orders_df.product_id == col("products.product_id")
).join(
    new_customers_df.alias("customers"),
    new_orders_df.customer_id == col("customers.customer_id")
).groupBy(
    col("craftsmans.craftsman_id"),
    col("craftsmans.craftsman_name"),
    col("craftsmans.craftsman_address"),
    col("craftsmans.craftsman_birthday"),
    col("craftsmans.craftsman_email"),
    col("report_period")
).agg(
    sum(col("products.product_price") * 0.9).alias("craftsman_money"),
    sum(col("products.product_price") * 0.1).alias("platform_money"),
    count(new_orders_df.order_id).alias("count_order"),
    avg(col("products.product_price")).alias("avg_price_order"),
    avg(expr("DATEDIFF(current_date(), customers.customer_birthday) / 365.25")).alias("avg_age_customer"),
    median(expr("DATEDIFF(order_completion_date, order_created_date)")).alias("median_time_order_completed"),
    sum(when(new_orders_df.order_status == "created", 1).otherwise(0)).alias("count_order_created"),
    sum(when(new_orders_df.order_status == "in_progress", 1).otherwise(0)).alias("count_order_in_progress"),
    sum(when(new_orders_df.order_status == "delivery", 1).otherwise(0)).alias("count_order_delivery"),
    sum(when(new_orders_df.order_status == "done", 1).otherwise(0)).alias("count_order_done"),
    sum(when(new_orders_df.order_status != "done", 1).otherwise(0)).alias("count_order_not_done")
)

# Присоединяем топ-1 категорию товаров
new_craftsman_report_datamart_df = new_craftsman_report_datamart_df.join(
    top_categories,
    ["craftsman_id", "report_period"],
    "left"
).withColumnRenamed("product_type", "top_product_category")

# Добавляем столбец с текущей датой загрузки (опционально)
new_craftsman_report_datamart_df = new_craftsman_report_datamart_df.withColumn("load_dttm", current_timestamp())

# Проверяем результат
new_craftsman_report_datamart_df.toPandas().head(5)


Unnamed: 0,craftsman_id,report_period,craftsman_name,craftsman_address,craftsman_birthday,craftsman_email,craftsman_money,platform_money,count_order,avg_price_order,avg_age_customer,median_time_order_completed,count_order_created,count_order_in_progress,count_order_delivery,count_order_done,count_order_not_done,top_product_category,load_dttm
0,100,2021-01,Rafe Torbeck,1296 Farragut Plaza,1998-02-12,htillerjq@adobe.com,143.1,15.9,0,159.0,26.839151,,1,0,0,0,1,clothes,2025-01-02 07:28:48.222485
1,143,2020-02,Myriam Knocker,2 Paget Center,1998-12-17,cdecreuzeqp@sciencedaily.com,226.8,25.2,0,252.0,21.437372,,0,0,0,0,1,Beauty & Hygiene,2025-01-02 07:28:48.222485
2,241,2021-08,Hart Elintune,07 Rusk Parkway,2002-02-16,mropkins2q@biblegateway.com,91.8,10.2,0,102.0,24.495551,,0,0,1,0,1,clothes,2025-01-02 07:28:48.222485
3,854,2022-11,Myron Sinnock,90698 Rusk Way,1996-07-30,jharloweg6@youku.com,51.3,5.7,0,57.0,20.848734,,0,0,1,0,1,clothes,2025-01-02 07:28:48.222485
4,1279,2022-07,Pryce Gilbard,21 Westend Alley,1991-11-07,ljudkinfw@upenn.edu,65.7,7.3,1,73.0,28.531143,3.0,0,0,0,1,0,clothes,2025-01-02 07:28:48.222485


#### Нужно понять, какие строки будем обновлять, а какие нужно будет создать.

In [22]:
craftsman_report_datamart = read_pg("dwh", "craftsman_report_datamart")

key_columns = ["craftsman_id", "report_period"]

# Новые строки которые нужно вставить
new_rows_df = new_craftsman_report_datamart_df.join(
    craftsman_report_datamart.select(*key_columns),
    key_columns,
    how="left_anti"
)

# Старые строки которые нужно обновить
updated_rows_df = new_craftsman_report_datamart_df.alias("new").join(
    craftsman_report_datamart.alias("existing"),
    key_columns,
    how="inner"
).filter(
    (col("new.craftsman_money") != col("existing.craftsman_money")) |
    (col("new.platform_money") != col("existing.platform_money")) |
    (col("new.count_order") != col("existing.count_order")) |
    (col("new.avg_price_order") != col("existing.avg_price_order")) |
    (col("new.avg_age_customer") != col("existing.avg_age_customer")) |
    (col("new.median_time_order_completed") != col("existing.median_time_order_completed")) |
    (col("new.count_order_created") != col("existing.count_order_created")) |
    (col("new.count_order_in_progress") != col("existing.count_order_in_progress")) |
    (col("new.count_order_delivery") != col("existing.count_order_delivery")) |
    (col("new.count_order_done") != col("existing.count_order_done")) |
    (col("new.count_order_not_done") != col("existing.count_order_not_done")) |
    (col("new.top_product_category") != col("existing.top_product_category"))
).select("new.*")

In [23]:
new_craftsman_report_datamart_df

DataFrame[craftsman_id: bigint, report_period: string, craftsman_name: string, craftsman_address: string, craftsman_birthday: date, craftsman_email: string, craftsman_money: double, platform_money: double, count_order: bigint, avg_price_order: double, avg_age_customer: decimal(22,10), median_time_order_completed: double, count_order_created: bigint, count_order_in_progress: bigint, count_order_delivery: bigint, count_order_done: bigint, count_order_not_done: bigint, top_product_category: string, load_dttm: timestamp]

In [24]:
!pip install psycopg2-binary

Collecting psycopg2-binary
  Using cached psycopg2_binary-2.9.10-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (4.9 kB)
Using cached psycopg2_binary-2.9.10-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.0 MB)
Installing collected packages: psycopg2-binary
Successfully installed psycopg2-binary-2.9.10


In [25]:
import psycopg2
from psycopg2.extras import execute_values

def update_existing_rows(df, schema, table):
    rows = df.collect()
    update_query = f"""
    UPDATE {schema}.{table} AS target
    SET craftsman_money = data.craftsman_money,
        platform_money = data.platform_money,
        count_order = data.count_order,
        avg_price_order = data.avg_price_order,
        avg_age_customer = data.avg_age_customer,
        median_time_order_completed = data.median_time_order_completed,
        count_order_created = data.count_order_created,
        count_order_in_progress = data.count_order_in_progress,
        count_order_delivery = data.count_order_delivery,
        count_order_done = data.count_order_done,
        count_order_not_done = data.count_order_not_done,
        top_product_category = data.top_product_category,
        load_dttm = data.load_dttm
    FROM (VALUES %s) AS data (
        craftsman_id, report_period, craftsman_money, platform_money, count_order, avg_price_order,
        avg_age_customer, median_time_order_completed, count_order_created, count_order_in_progress,
        count_order_delivery, count_order_done, count_order_not_done, top_product_category, load_dttm
    )
    WHERE target.craftsman_id = data.craftsman_id AND target.report_period = data.report_period
    """
    conn = psycopg2.connect(
        dbname="postgres",
        user="eezimin",
        password="qwerty",
        host="postgres",
        port="5432"
    )
    with conn.cursor() as cursor:
        values = [
            (
                row["craftsman_id"], row["report_period"], row["craftsman_money"], row["platform_money"],
                row["count_order"], row["avg_price_order"], row["avg_age_customer"], 
                row["median_time_order_completed"], row["count_order_created"],
                row["count_order_in_progress"], row["count_order_delivery"], row["count_order_done"],
                row["count_order_not_done"], row["top_product_category"], row["load_dttm"]
            )
            for row in rows
        ]
        execute_values(cursor, update_query, values)
    conn.commit()
    conn.close()


In [27]:
write_pg(new_rows_df.drop("load_dttm"), "dwh", "craftsman_report_datamart")

In [28]:
# Обновление таблицы инкрементальных загрузок
load_dates_dma = read_pg("dwh", "load_dates_craftsman_report_datamart")

load_dates_data = [Row(load_dttm=date.today())]  # Используем текущую дату из модуля datetime
load_dates_df = spark.createDataFrame(load_dates_data).exceptAll(load_dates_dma.select('load_dttm'))

# Запись в таблицу
write_pg(load_dates_df, "dwh", "load_dates_craftsman_report_datamart")

read_pg("dwh", "load_dates_craftsman_report_datamart").show()

+---+----------+
| id| load_dttm|
+---+----------+
|  1|2025-01-02|
+---+----------+

