In [1]:
!pip install psycopg2-binary
import psycopg2
from psycopg2.extras import execute_values
from pyspark.sql import SparkSession, Window, Row

Collecting psycopg2-binary
  Downloading psycopg2_binary-2.9.10-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (4.9 kB)
Downloading psycopg2_binary-2.9.10-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.0 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.0/3.0 MB[0m [31m1.7 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m0m
[?25hInstalling collected packages: psycopg2-binary
Successfully installed psycopg2-binary-2.9.10


In [20]:
import os
from datetime import date

from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import current_timestamp, current_date, date_format, col, sum, avg, count, lit, expr, row_number, desc, when, median
import os

# Create (or reuse) the SparkSession with the PostgreSQL JDBC driver on the classpath.
# NOTE(review): the default jar path nests one .jar name inside another. This only works if
# "jars/postgresql-42.7.4.jar" is actually a directory (e.g. created by a Docker bind mount
# of a missing file). `spark.jars` expects a comma-separated list of jar files — confirm.
POSTGRES_JDBC_JAR = os.environ.get(
    "POSTGRES_JDBC_JAR", "jars/postgresql-42.7.4.jar/postgresql-42.7.5.jar"
)

spark = (
    SparkSession.builder
    .appName("Spark PostgreSQL Example")
    .config("spark.jars", POSTGRES_JDBC_JAR)
    .getOrCreate()
)

# Database connection parameters. Every value can be overridden through an environment
# variable so credentials do not have to live in the notebook. The defaults reproduce the
# previously hard-coded settings.
PG_HOST = os.environ.get("PG_HOST", "postgres")
PG_PORT = os.environ.get("PG_PORT", "5432")
PG_DB = os.environ.get("PG_DB", "postgres_db")

url = f"jdbc:postgresql://{PG_HOST}:{PG_PORT}/{PG_DB}"
connection_properties = {
    "user": os.environ.get("PG_USER", "user"),
    "password": os.environ.get("PG_PASSWORD", "password"),
    "driver": "org.postgresql.Driver",
}


In [21]:
# Helper for reading source and DWH tables.
def read_table(schema, table_name):
    """Return the PostgreSQL table ``schema.table_name`` as a Spark DataFrame.

    The read goes through Spark's JDBC reader using the notebook-level ``url``
    and ``connection_properties``.
    """
    qualified_name = f"{schema}.{table_name}"
    reader = spark.read
    return reader.jdbc(url=url, table=qualified_name, properties=connection_properties)

# Load every source table; the unpacking order matches the (schema, table) list below.
(
    df_craft_market_wide,
    df_craft_market_orders_customers,
    df_craft_market_masters_products,
    df_craft_market_orders,
    df_craft_market_craftmens,
    df_craft_market_customers,
) = [
    read_table(schema_name, table)
    for schema_name, table in (
        ("source1", "craft_market_wide"),
        ("source2", "craft_market_orders_customers"),
        ("source2", "craft_market_masters_products"),
        ("source3", "craft_market_orders"),
        ("source3", "craft_market_craftsmans"),
        ("source3", "craft_market_customers"),
    )
]

In [22]:
# Quick look at the first rows of the wide source table.
df_craft_market_wide.show(n=5)

+---+------------+----------------+--------------------+------------------+--------------------+----------+--------------------+--------------------+--------------------+-------------+--------+------------------+---------------------+------------+-----------+---------------+--------------------+-----------------+--------------------+
| id|craftsman_id|  craftsman_name|   craftsman_address|craftsman_birthday|     craftsman_email|product_id|        product_name| product_description|        product_type|product_price|order_id|order_created_date|order_completion_date|order_status|customer_id|  customer_name|    customer_address|customer_birthday|      customer_email|
+---+------------+----------------+--------------------+------------------+--------------------+----------+--------------------+--------------------+--------------------+-------------+--------+------------------+---------------------+------------+-----------+---------------+--------------------+-----------------+--------------

In [23]:
# Drop the first column of the wide table (the surrogate "id" shown in the preview) so the
# remaining columns line up with sources 2 and 3.
columns_to_select = df_craft_market_wide.columns[1:]
source_1 = df_craft_market_wide.select(*columns_to_select)

In [24]:
# Source 2: orders+customers joined to masters+products on craftsman_id, projected to the
# common column layout (craftsman, product, order, customer).
source_2 = (
    df_craft_market_orders_customers
    .join(
        df_craft_market_masters_products,
        df_craft_market_orders_customers.craftsman_id == df_craft_market_masters_products.craftsman_id,
    )
    .select(
        *[df_craft_market_masters_products[name] for name in (
            "craftsman_id", "craftsman_name", "craftsman_address", "craftsman_birthday",
            "craftsman_email", "product_id", "product_name", "product_description",
            "product_type", "product_price",
        )],
        *[df_craft_market_orders_customers[name] for name in (
            "order_id", "order_created_date", "order_completion_date", "order_status",
            "customer_id", "customer_name", "customer_address", "customer_birthday",
            "customer_email",
        )],
    )
)


In [25]:
# Source 3: the orders table joined to craftsmen and customers, projected to the common
# column layout (craftsman, product, order, customer).
source_3 = (
    df_craft_market_orders
    .join(
        df_craft_market_craftmens,
        df_craft_market_orders.craftsman_id == df_craft_market_craftmens.craftsman_id,
    )
    .join(
        df_craft_market_customers,
        df_craft_market_orders.customer_id == df_craft_market_customers.customer_id,
    )
    .select(
        *[df_craft_market_craftmens[name] for name in (
            "craftsman_id", "craftsman_name", "craftsman_address",
            "craftsman_birthday", "craftsman_email",
        )],
        *[df_craft_market_orders[name] for name in (
            "product_id", "product_name", "product_description", "product_type",
            "product_price", "order_id", "order_created_date",
            "order_completion_date", "order_status",
        )],
        *[df_craft_market_customers[name] for name in (
            "customer_id", "customer_name", "customer_address",
            "customer_birthday", "customer_email",
        )],
    )
)

In [26]:
# Stack the three sources by column NAME rather than position, so a column-order drift in
# any source cannot silently shift values into the wrong columns. Then de-duplicate records
# present in more than one source.
sources = source_1.unionByName(source_2).unionByName(source_3).distinct()

In [27]:
def write_table(df, schema, table_name, mode="append"):
    """Write ``df`` to the PostgreSQL table ``schema.table_name`` via Spark JDBC.

    Args:
        df: Spark DataFrame to write.
        schema: target schema name.
        table_name: target table name.
        mode: Spark save mode; defaults to ``"append"`` (the original behaviour).
    """
    df.write.jdbc(url=url, table=f"{schema}.{table_name}", mode=mode, properties=connection_properties)

In [28]:
d_customers_df = read_table("dwh", "d_customers")
customers_columns = [
    'customer_name', 'customer_address', 'customer_birthday', 'customer_email'
]

# Customers whose attribute combination is not yet in the dimension
new_customers_df = (
    sources.select(customers_columns)
    .distinct()
    .exceptAll(d_customers_df.select(customers_columns))
    .withColumn("load_dttm", current_timestamp())
    .cache()
)

# Insert the new customers
write_table(new_customers_df, "dwh", "d_customers")

# Re-read so the database-generated customer_id values are visible
d_customers_df = read_table("dwh", "d_customers")

# Attach customer_id to the new customers.
# Matching is NULL-safe (eqNullSafe / <=>), consistent with exceptAll above. With plain `==`,
# a customer with any NULL attribute would never resolve to its customer_id.
new_customers_df = (
    new_customers_df.alias("new_customers")
    .join(
        d_customers_df.alias("d_customers"),
        [col(f"new_customers.{c}").eqNullSafe(col(f"d_customers.{c}")) for c in customers_columns],
        how='left'
    )
    .select(
        col("d_customers.customer_id").alias("customer_id"),
        *[col(f"new_customers.{col_name}") for col_name in customers_columns],
        col("new_customers.load_dttm")
    )
)

# Replace the source customer_id with the DWH customer_id (same NULL-safe matching)
sources = (
    sources.alias("sources")
    .join(
        d_customers_df.alias("d_customers_df"),
        [col(f"sources.{c}").eqNullSafe(col(f"d_customers_df.{c}")) for c in customers_columns],
        how='left'
    )
    .select(
        col("d_customers_df.customer_id").alias("customer_id"),
        *[col(f'sources.{column}') for column in [
            'order_id', 'order_created_date', 'order_completion_date', 'order_status',
            'craftsman_id', 'craftsman_name', 'craftsman_address', 'craftsman_birthday',
            'craftsman_email', 'product_id', 'product_name', 'product_description',
            'product_type', 'product_price', 'customer_name', 'customer_address',
            'customer_birthday', 'customer_email'
        ]]
    )
)


In [29]:
d_craftsmans_df = read_table("dwh", "d_craftsmans")
craftsmans_columns = [
    'craftsman_name', 'craftsman_address', 'craftsman_birthday', 'craftsman_email'
]

# Craftsmen whose attribute combination is not yet in the dimension
new_craftsmans_df = (
    sources.select(craftsmans_columns)
    .distinct()
    .exceptAll(d_craftsmans_df.select(craftsmans_columns))
    .withColumn("load_dttm", current_timestamp())
    .cache()
)

# Insert the new craftsmen
write_table(new_craftsmans_df, "dwh", "d_craftsmans")

# Re-read so the database-generated craftsman_id values are visible
d_craftsmans_df = read_table("dwh", "d_craftsmans")

# Attach craftsman_id to the new craftsmen.
# Matching is NULL-safe (eqNullSafe / <=>), consistent with exceptAll above. With plain `==`,
# a craftsman with any NULL attribute would never resolve to its craftsman_id.
new_craftsmans_df = (
    new_craftsmans_df.alias("new_craftsmans")
    .join(
        d_craftsmans_df.alias("d_craftsmans"),
        [col(f"new_craftsmans.{c}").eqNullSafe(col(f"d_craftsmans.{c}")) for c in craftsmans_columns],
        how='left'
    )
    .select(
        col("d_craftsmans.craftsman_id").alias("craftsman_id"),
        *[col(f"new_craftsmans.{col_name}") for col_name in craftsmans_columns],
        col("new_craftsmans.load_dttm")
    )
)

# Replace the source craftsman_id with the DWH craftsman_id (same NULL-safe matching)
sources = (
    sources.alias("sources")
    .join(
        d_craftsmans_df.alias("d_craftsmans_df"),
        [col(f"sources.{c}").eqNullSafe(col(f"d_craftsmans_df.{c}")) for c in craftsmans_columns],
        how='left'
    )
    .select(
        col("d_craftsmans_df.craftsman_id").alias("craftsman_id"),
        *[col(f'sources.{column}') for column in [
            'order_id', 'order_created_date', 'order_completion_date', 'order_status',
            'product_id', 'craftsman_name', 'craftsman_address', 'craftsman_birthday',
            'craftsman_email', 'customer_id', 'product_name', 'product_description',
            'product_type', 'product_price', 'customer_name', 'customer_address',
            'customer_birthday', 'customer_email'
        ]]
    )
)


In [30]:
d_products_df = read_table("dwh", "d_products")
products_columns = ['product_name', 'product_description', 'product_type', 'product_price']

# Products whose attribute combination is not yet in the dimension
new_products_df = (
    sources.select(products_columns)
    .distinct()
    .exceptAll(d_products_df.select(products_columns))
    .withColumn("load_dttm", current_timestamp())
    .cache()
)

# Insert the new products
write_table(new_products_df, "dwh", "d_products")

# Re-read so the database-generated product_id values are visible
d_products_df = read_table("dwh", "d_products")

# Attach product_id to the new products.
# Matching is NULL-safe (eqNullSafe / <=>), consistent with exceptAll above. With plain `==`,
# a product with e.g. a NULL description would never resolve to its product_id.
new_products_df = (
    new_products_df.alias("new_products")
    .join(
        d_products_df.alias("d_products"),
        [col(f"new_products.{c}").eqNullSafe(col(f"d_products.{c}")) for c in products_columns],
        how='left'
    )
    .select(
        col("d_products.product_id").alias("product_id"),
        *[col(f"new_products.{col_name}") for col_name in products_columns],
        col("new_products.load_dttm")
    )
)

# Replace the source product_id with the DWH product_id (same NULL-safe matching)
sources = (
    sources.alias("sources")
    .join(
        d_products_df.alias("d_products_df"),
        [col(f"sources.{c}").eqNullSafe(col(f"d_products_df.{c}")) for c in products_columns],
        how='left'
    )
    .select(
        col("d_products_df.product_id").alias("product_id"),
        *[col(f'sources.{column}') for column in [
            'order_id', 'order_created_date', 'order_completion_date', 'order_status',
            'craftsman_id', 'craftsman_name', 'craftsman_address', 'craftsman_birthday',
            'craftsman_email', 'customer_id', 'product_name', 'product_description',
            'product_type', 'product_price', 'customer_name', 'customer_address',
            'customer_birthday', 'customer_email'
        ]]
    )
)


In [31]:
f_orders_df = read_table("dwh", "f_orders")
orders_columns = [
    'product_id', 'craftsman_id', 'customer_id',
    'order_created_date', 'order_completion_date', 'order_status'
]

# Orders not yet present in the fact table
new_orders_df = (
    sources.select(orders_columns)
    .distinct()
    .exceptAll(f_orders_df.select(orders_columns))
    .withColumn("load_dttm", current_timestamp())
    .cache()
)

# Insert the new orders
write_table(new_orders_df, "dwh", "f_orders")

# Re-read so the database-generated order_id values are visible
f_orders_df = read_table("dwh", "f_orders")

# Attach order_id to the new orders.
# Matching is NULL-safe (eqNullSafe / <=>), consistent with exceptAll above.
# order_completion_date is presumably NULL for orders that are not finished yet (confirm
# against the data). With plain `==`, such orders never matched and kept a NULL order_id.
new_orders_df = (
    new_orders_df.alias("new_orders")
    .join(
        f_orders_df.alias("f_orders"),
        [col(f"new_orders.{c}").eqNullSafe(col(f"f_orders.{c}")) for c in orders_columns],
        how='left'
    )
    .select(
        col("f_orders.order_id").alias("order_id"),
        *[col(f"new_orders.{col_name}") for col_name in orders_columns],
        col("new_orders.load_dttm")
    )
)

# Attach the DWH order_id to the source rows (same NULL-safe matching)
sources = (
    sources.alias("sources")
    .join(
        f_orders_df.alias("f_orders_df"),
        [col(f"sources.{c}").eqNullSafe(col(f"f_orders_df.{c}")) for c in orders_columns],
        how='left'
    )
    .select(
        col("f_orders_df.order_id").alias("order_id"),
        *[col(f'sources.{column}') for column in [
            'craftsman_id', 'order_created_date', 'order_completion_date', 'order_status',
            'product_id', 'craftsman_name', 'craftsman_address', 'craftsman_birthday',
            'craftsman_email', 'customer_id', 'product_name', 'product_description',
            'product_type', 'product_price', 'customer_name', 'customer_address',
            'customer_birthday', 'customer_email'
        ]]
    )
)

In [32]:
# Incremental datamart.
# Reporting period (year-month) derived from the order creation date.
new_orders_df = new_orders_df.withColumn("report_period", date_format("order_created_date", "yyyy-MM"))

# (craftsman, period) pairs touched by this load; only these datamart rows need (re)computing.
affected_keys_df = new_orders_df.select("craftsman_id", "report_period").distinct()

# Recompute the affected pairs from ALL orders in f_orders (re-read after the insert), joined
# to the FULL dimension tables. The new_*_df frames hold only rows inserted in this run, so
# joining against them silently dropped every order whose craftsman, product or customer
# already existed. Aggregating only the new orders would also make the later UPDATE
# overwrite complete aggregates with partial ones.
orders_for_report_df = (
    f_orders_df
    .withColumn("report_period", date_format("order_created_date", "yyyy-MM"))
    .join(affected_keys_df, ["craftsman_id", "report_period"], "left_semi")
)

# Window for the most popular product category per craftsman and period. The tie-break on
# the category name makes the chosen category deterministic.
window_spec = (
    Window.partitionBy("craftsman_id", "report_period")
    .orderBy(desc("count_category"), col("product_type"))
)

# Number of ordered items per category, for each craftsman and period
product_category_counts = (
    orders_for_report_df.alias("orders")
    .join(
        d_products_df.alias("products"),
        col("orders.product_id") == col("products.product_id")
    )
    .groupBy(col("orders.craftsman_id"), col("orders.report_period"), col("products.product_type"))
    .agg(count("products.product_type").alias("count_category"))
)

# Top-1 category for each craftsman and period
top_categories = (
    product_category_counts
    .withColumn("row_num", row_number().over(window_spec))
    .filter(col("row_num") == 1)
    .select("craftsman_id", "report_period", "product_type")
)

# Per-craftsman, per-period aggregates.
# The 0.9 / 0.1 factors split each order's price between craftsman and platform.
new_craftsman_report_datamart_df = (
    orders_for_report_df.alias("orders")
    .join(
        d_craftsmans_df.alias("craftsmans"),
        col("orders.craftsman_id") == col("craftsmans.craftsman_id")
    )
    .join(
        d_products_df.alias("products"),
        col("orders.product_id") == col("products.product_id")
    )
    .join(
        d_customers_df.alias("customers"),
        col("orders.customer_id") == col("customers.customer_id")
    )
    .groupBy(
        col("craftsmans.craftsman_id"),
        col("craftsmans.craftsman_name"),
        col("craftsmans.craftsman_address"),
        col("craftsmans.craftsman_birthday"),
        col("craftsmans.craftsman_email"),
        col("orders.report_period")
    )
    .agg(
        sum(col("products.product_price") * 0.9).alias("craftsman_money"),
        sum(col("products.product_price") * 0.1).alias("platform_money"),
        count(col("orders.order_id")).alias("count_order"),
        avg(col("products.product_price")).alias("avg_price_order"),
        avg(expr("DATEDIFF(current_date(), customers.customer_birthday) / 365.25")).alias("avg_age_customer"),
        median(expr("DATEDIFF(orders.order_completion_date, orders.order_created_date)")).alias("median_time_order_completed"),
        sum(when(col("orders.order_status") == "created", 1).otherwise(0)).alias("count_order_created"),
        sum(when(col("orders.order_status") == "in_progress", 1).otherwise(0)).alias("count_order_in_progress"),
        sum(when(col("orders.order_status") == "delivery", 1).otherwise(0)).alias("count_order_delivery"),
        sum(when(col("orders.order_status") == "done", 1).otherwise(0)).alias("count_order_done"),
        sum(when(col("orders.order_status") != "done", 1).otherwise(0)).alias("count_order_not_done")
    )
)

# Attach the top-1 category and the load timestamp
new_craftsman_report_datamart_df = (
    new_craftsman_report_datamart_df.alias("report")
    .join(
        top_categories.alias("top_cat"),
        ["craftsman_id", "report_period"],
        "left"
    )
    .withColumnRenamed("product_type", "top_product_category")
    .withColumn("load_dttm", current_timestamp())
)

# Preview only the first rows instead of collecting the whole datamart to the driver.
new_craftsman_report_datamart_df.limit(20).toPandas()

Unnamed: 0,craftsman_id,report_period,craftsman_name,craftsman_address,craftsman_birthday,craftsman_email,craftsman_money,platform_money,count_order,avg_price_order,avg_age_customer,median_time_order_completed,count_order_created,count_order_in_progress,count_order_delivery,count_order_done,count_order_not_done,top_product_category,load_dttm


In [33]:
# Загружаем данные из PostgreSQL в DataFrame
existing_data_df = read_table("dwh", "craftsman_report_datamart")

# Определяем ключевые столбцы для идентификации строк
key_columns = ["craftsman_id", "report_period"]

#Находим новые строки для вставки
new_rows_to_insert_df = new_craftsman_report_datamart_df.join(
    existing_data_df.select(*key_columns),
    key_columns,
    how="left_anti"
)

# Находим строки для обновления
updated_rows_df = new_craftsman_report_datamart_df.alias("new_data").join(
    existing_data_df.alias("existing_data"),
    key_columns,
    how="inner"
).filter(
    # Проверяем, есть ли различия в любом из значимых столбцов
    (col("new_data.craftsman_money") != col("existing_data.craftsman_money")) |
    (col("new_data.platform_money") != col("existing_data.platform_money")) |
    (col("new_data.count_order") != col("existing_data.count_order")) |
    (col("new_data.avg_price_order") != col("existing_data.avg_price_order")) |
    (col("new_data.avg_age_customer") != col("existing_data.avg_age_customer")) |
    (col("new_data.median_time_order_completed") != col("existing_data.median_time_order_completed")) |
    (col("new_data.count_order_created") != col("existing_data.count_order_created")) |
    (col("new_data.count_order_in_progress") != col("existing_data.count_order_in_progress")) |
    (col("new_data.count_order_delivery") != col("existing_data.count_order_delivery")) |
    (col("new_data.count_order_done") != col("existing_data.count_order_done")) |
    (col("new_data.count_order_not_done") != col("existing_data.count_order_not_done")) |
    (col("new_data.top_product_category") != col("existing_data.top_product_category"))
).select("new_data.*")

In [39]:
def update_existing_rows(df, schema, table):
    rows = df.collect()
    
    update_query = f"""
    UPDATE {schema}.{table} AS target
    SET 
        craftsman_money = data.craftsman_money,
        platform_money = data.platform_money,
        count_order = data.count_order,
        avg_price_order = data.avg_price_order,
        avg_age_customer = data.avg_age_customer,
        median_time_order_completed = data.median_time_order_completed,
        count_order_created = data.count_order_created,
        count_order_in_progress = data.count_order_in_progress,
        count_order_delivery = data.count_order_delivery,
        count_order_done = data.count_order_done,
        count_order_not_done = data.count_order_not_done,
        top_product_category = data.top_product_category,
        load_dttm = data.load_dttm
    FROM (VALUES %s) AS data (
        craftsman_id, report_period, craftsman_money, platform_money, count_order, avg_price_order,
        avg_age_customer, median_time_order_completed, count_order_created, count_order_in_progress,
        count_order_delivery, count_order_done, count_order_not_done, top_product_category, load_dttm
    )
    WHERE target.craftsman_id = data.craftsman_id 
    AND target.report_period = data.report_period
    """
    
    # Подключение к базе данных
    conn = psycopg2.connect(
        dbname="postgres",
        user="postgres_user",
        password="postgres_password",
        host="postgres",
        port="5432"
    )
    
    # Подготовка данных для выполнения запроса
    values = [
        (
            row["craftsman_id"], row["report_period"], row["craftsman_money"], row["platform_money"],
            row["count_order"], row["avg_price_order"], row["avg_age_customer"], row["median_time_order_completed"],
            row["count_order_created"], row["count_order_in_progress"], row["count_order_delivery"],
            row["count_order_done"], row["count_order_not_done"], row["top_product_category"], row["load_dttm"]
        )
        for row in rows
    ]
    
    # Выполняем SQL-запрос
    with conn.cursor() as cursor:
        execute_values(cursor, update_query, values)
    
    # Фиксируем изменения и закрываем соединение
    conn.commit()
    conn.close()


In [41]:
# Apply changed aggregates to datamart rows that already exist; `updated_rows_df` was
# computed but never applied. Run this BEFORE the insert: both frames are lazy plans over
# a JDBC read of the datamart. After the insert, that read would also return the freshly
# inserted keys, which would then be compared against themselves.
update_existing_rows(updated_rows_df, "dwh", "craftsman_report_datamart")

# Insert rows for (craftsman_id, report_period) keys not yet present in the datamart.
# NOTE(review): load_dttm is dropped before the insert (the UPDATE above does set it),
# presumably because the column has a database-side default; confirm.
write_table(new_rows_to_insert_df.drop("load_dttm"), "dwh", "craftsman_report_datamart")

In [42]:
# Обновление таблицы инкрементальных загрузок
load_dates_dma = read_table("dwh", "load_dates_craftsman_report_datamart")

load_dates_data = [Row(load_dttm=date.today())]  # Используем текущую дату из модуля datetime
load_dates_df = spark.createDataFrame(load_dates_data).exceptAll(load_dates_dma.select('load_dttm'))

write_table(load_dates_df, "dwh", "load_dates_craftsman_report_datamart")

read_table("dwh", "load_dates_craftsman_report_datamart").toPandas().sort_values(by='load_dttm', ascending=False).head(5)

Unnamed: 0,id,load_dttm
0,1,2025-02-18
