### Обновление хранилища из источников данных

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp, col, lit
import logging
import psycopg2
from psycopg2 import extras

# Настройка логирования
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Параметры подключения для psycopg2
pg_conn_params = {
    "dbname": "postgres_db",
    "user": "postgres_user",
    "password": "postgres_password",
    "host": "172.27.253.185",
    "port": 5430  # Целое число для psycopg2
}

# Параметры подключения для Spark JDBC
spark_jdbc_properties = {
    "user": pg_conn_params["user"],
    "password": pg_conn_params["password"],
    "driver": "org.postgresql.Driver"
}

jdbc_url = f"jdbc:postgresql://{pg_conn_params['host']}:{pg_conn_params['port']}/{pg_conn_params['dbname']}"

# Инициализация SparkSession
spark = SparkSession.builder \
    .appName("ETL Process with Incremental Loading") \
    .config("spark.master", "local[*]") \
    .config("spark.jars", "/opt/spark/jars/postgresql-42.6.0.jar") \
    .getOrCreate()

logger.info("SparkSession создан успешно.")

# Функция для чтения таблицы из PostgreSQL
def read_table(schema, table, columns="*"):
    table_full = f"{schema}.{table}"
    try:
        df = spark.read.jdbc(url=jdbc_url, table=table_full, properties=spark_jdbc_properties)
        if columns != "*":
            df = df.select(*[c.strip() for c in columns.split(",")])
        logger.info(f"Таблица {table_full} успешно прочитана.")
        return df
    except Exception as e:
        logger.error(f"Ошибка при чтении таблицы {table_full}: {e}")
        raise

# Функция для upsert
def upsert_partition(iterator, pg_conn_params, schema, table, conflict_fields):
    """
    Выполняет upsert данных из партиции в PostgreSQL.
    
    :param iterator: Итератор по строкам партиции
    :param pg_conn_params: Параметры подключения PostgreSQL
    :param schema: Схема таблицы
    :param table: Имя таблицы
    :param conflict_fields: Список колонок для конфликта
    """
    # Преобразуем партицию в список записей
    records = list(iterator)
    if not records:
        return
    
    # Определите поля и значения для вставки
    columns = records[0].keys()
    values = [[record[col] for col in columns] for record in records]
    
    # Создайте строку полей для SQL-запроса
    columns_sql = ", ".join([f'"{col}"' for col in columns])
    
    # Создайте строку плейсхолдеров для значений
    placeholders = ", ".join(["%s"] * len(columns))
    
    # Создайте строку для обновления при конфликте
    update_fields = [f'"{col}" = EXCLUDED."{col}"' for col in columns if col not in conflict_fields]
    update_sql = ", ".join(update_fields)
    
    # Сформируйте полный SQL-запрос для upsert
    conflict_sql = ", ".join([f'"{field}"' for field in conflict_fields])
    upsert_query = f"""
        INSERT INTO {schema}.{table} ({columns_sql})
        VALUES ({placeholders})
        ON CONFLICT ({conflict_sql})
        DO UPDATE SET
        {update_sql};
    """
    
    try:
        # Установите соединение с PostgreSQL
        conn = psycopg2.connect(
            dbname=pg_conn_params["dbname"],
            user=pg_conn_params["user"],
            password=pg_conn_params["password"],
            host=pg_conn_params["host"],
            port=pg_conn_params["port"]
        )
        cursor = conn.cursor()
        
        # Выполните пакетную вставку с upsert
        extras.execute_batch(cursor, upsert_query, values, page_size=100)
        
        # Зафиксируйте изменения
        conn.commit()
        
    except Exception as e:
        logger.error(f"Ошибка при выполнении upsert: {e}")
        raise
    
    finally:
        if cursor:
            cursor.close()
        if conn:
            conn.close()

# Функция для инкрементальной загрузки измерений
def incremental_load(source_df, dwh_df, key, columns, dwh_schema, dwh_table, source_system_code):
    """
    Выполняет инкрементальную загрузку измерений с использованием upsert через foreachPartition.
    
    :param source_df: DataFrame с исходными данными
    :param dwh_df: DataFrame с данными DWH
    :param key: Ключевое поле для сравнения
    :param columns: Список столбцов для сравнения
    :param dwh_schema: Схема DWH
    :param dwh_table: Таблица DWH
    :param source_system_code: Код системы источника (например, "S1", "S2", "S3")
    :return: None
    """
    logger.info(f"Начало инкрементальной загрузки для таблицы {dwh_schema}.{dwh_table} из источника {source_system_code}")
    
    # Добавляем source_system_code к source_df
    source_df = source_df.withColumn("source_system_code", lit(source_system_code))
    
    # Выполнение LEFT JOIN для обнаружения новых или изменённых записей
    joined_df = source_df.alias("src").join(
        dwh_df.alias("dwh"),
        on=(col(f"src.{key}") == col(f"dwh.{key}")) & 
           (col("src.source_system_code") == col("dwh.source_system_code")),
        how="left"
    )
    
    # Формирование условия для выявления изменений
    condition = lit(False)
    for column in columns:
        condition = condition | (col(f"src.{column}") != col(f"dwh.{column}"))
    
    # Фильтрация новых или изменённых записей
    delta_df = joined_df.filter(condition | col(f"dwh.{key}").isNull()) \
                        .select([col(f"src.{c}") for c in source_df.columns]) \
                        .withColumn("load_dttm", current_timestamp())
    
    count = delta_df.count()
    if count > 0:
        logger.info(f"Найдено {count} новых или изменённых записей для таблицы {dwh_schema}.{dwh_table} из источника {source_system_code}. Выполнение upsert в DWH...")
        
        # Определите конфликтующие поля (составной ключ)
        conflict_fields = ["source_system_code", key]
        
        # Преобразуем DataFrame в RDD из словарей
        delta_rdd = delta_df.rdd.map(lambda row: row.asDict())
        
        # Выполните upsert для каждой партиции
        delta_rdd.foreachPartition(
            lambda partition: upsert_partition(
                partition,
                pg_conn_params,  # Передаём параметры подключения psycopg2
                dwh_schema,
                dwh_table,
                conflict_fields  # Передаём конфликтующие поля
            )
        )
        
        logger.info(f"Upsert для таблицы {dwh_schema}.{dwh_table} из источника {source_system_code} завершен успешно.")
    else:
        logger.info(f"Нет новых или изменённых записей для таблицы {dwh_schema}.{dwh_table} из источника {source_system_code}.")

# Функция для инкрементальной загрузки фактов
def incremental_load_f_orders(source_df, dwh_df, dwh_schema, dwh_table, source_system_code):
    """
    Выполняет инкрементальную загрузку фактов f_orders с использованием upsert через foreachPartition.

    :param source_df: DataFrame с исходными данными для факта
    :param dwh_df: DataFrame с данными DWH для факта
    :param dwh_schema: Схема DWH
    :param dwh_table: Таблица DWH
    :param source_system_code: Код системы источника (например, "S1", "S2", "S3")
    :return: None
    """
    logger.info(f"Начало инкрементальной загрузки факта {dwh_schema}.{dwh_table} из источника {source_system_code}")
    
    # Добавляем source_system_code к source_df
    source_df = source_df.withColumn("source_system_code", lit(source_system_code))
    
    # Выполнение LEFT JOIN для обнаружения новых или изменённых записей
    joined_fact_df = source_df.alias("src").join(
        dwh_df.alias("dwh"),
        (col("src.source_system_code") == col("dwh.source_system_code")) &
        (col("src.order_id") == col("dwh.order_id")),
        how="left"
    )
    
    # Определение условий для новых или изменённых записей
    fact_condition = (
        col("dwh.order_id").isNull() |
        (col("src.product_id") != col("dwh.product_id")) |
        (col("src.craftsman_id") != col("dwh.craftsman_id")) |
        (col("src.customer_id") != col("dwh.customer_id")) |
        (col("src.order_created_date") != col("dwh.order_created_date")) |
        (col("src.order_completion_date") != col("dwh.order_completion_date")) |
        (col("src.order_status") != col("dwh.order_status"))
    )
    
    # Фильтрация новых или изменённых записей
    fact_delta_df = joined_fact_df.filter(fact_condition) \
        .select(
            col("src.source_system_code"),
            col("src.order_id"),
            col("src.product_id"),
            col("src.craftsman_id"),
            col("src.customer_id"),
            col("src.order_created_date"),
            col("src.order_completion_date"),
            col("src.order_status"),
            col("src.load_dttm")
        )
    
    fact_count = fact_delta_df.count()
    if fact_count > 0:
        logger.info(f"Найдено {fact_count} новых или изменённых записей для факта {dwh_schema}.{dwh_table} из источника {source_system_code}. Выполнение upsert в DWH...")
        
        # Определите конфликтующие поля для факта
        conflict_fields = ["source_system_code", "order_id"]
        
        # Преобразуем DataFrame в RDD из словарей
        fact_delta_rdd = fact_delta_df.rdd.map(lambda row: row.asDict())
        
        # Выполните upsert для каждой партиции
        fact_delta_rdd.foreachPartition(
            lambda partition: upsert_partition(
                partition,
                pg_conn_params,  # Передаём параметры подключения psycopg2
                dwh_schema,
                dwh_table,
                conflict_fields  # Передаём конфликтующие поля
            )
        )
        
        logger.info(f"Upsert для факта {dwh_schema}.{dwh_table} из источника {source_system_code} завершен успешно.")
    else:
        logger.info(f"Нет новых или изменённых записей для факта {dwh_schema}.{dwh_table} из источника {source_system_code}.")

# Загрузка таблиц источников
logger.info("Загрузка таблиц источников...")

# Источник 1: S1 - source1.craft_market_wide (Предполагается, что этот источник содержит данные для d_craftsmans, d_customers, d_products, f_orders)
source1_df = read_table("source1", "craft_market_wide")

# Источник 2: S2 - source2.craft_market_masters_products и source2.craft_market_orders_customers
source2_masters_products_df = read_table("source2", "craft_market_masters_products")
source2_orders_customers_df = read_table("source2", "craft_market_orders_customers")

# Источник 3: S3 - source3.craft_market_craftsmans, source3.craft_market_customers, source3.craft_market_orders
source3_craftsmans_df = read_table("source3", "craft_market_craftsmans")
source3_customers_df = read_table("source3", "craft_market_customers")
source3_orders_df = read_table("source3", "craft_market_orders")

logger.info("Таблицы источников загружены успешно.")

# Загрузка таблиц DWH
logger.info("Загрузка таблиц DWH...")

dwh_d_craftsmans_df = read_table("dwh", "d_craftsmans")
dwh_d_customers_df = read_table("dwh", "d_customers")
dwh_d_products_df = read_table("dwh", "d_products")
dwh_f_orders_df = read_table("dwh", "f_orders")

logger.info("Таблицы DWH загружены успешно.")

# Инкрементальная загрузка измерений с upsert

# 1. Измерение d_craftsmans из S1
# Предполагается, что S1 содержит данные для d_craftsmans
if "craftsman_id" in source1_df.columns:
    incremental_load(
        source_df=source1_df.select(
            "craftsman_id",
            "craftsman_name",
            "craftsman_address",
            "craftsman_birthday",
            "craftsman_email"
        ),
        dwh_df=dwh_d_craftsmans_df,
        key="craftsman_id",
        columns=["craftsman_name", "craftsman_address", "craftsman_birthday", "craftsman_email"],
        dwh_schema="dwh",
        dwh_table="d_craftsmans",
        source_system_code="S1"
    )
else:
    logger.warning("Источник S1 не содержит данных для d_craftsmans.")

# 2. Измерение d_craftsmans из S2
# Необходимо объединить source2_orders_customers_df с source2_masters_products_df по craftsman_id
logger.info("Объединение данных S2 для d_craftsmans...")
if {"craftsman_id", "craftsman_name", "craftsman_address", "craftsman_birthday", "craftsman_email"}.issubset(source2_masters_products_df.columns):
    # Объединяем таблицы по craftsman_id
    source2_combined_craftsmans_df = source2_orders_customers_df.join(
        source2_masters_products_df.select(
            "craftsman_id",
            "craftsman_name",
            "craftsman_address",
            "craftsman_birthday",
            "craftsman_email"
        ),
        on="craftsman_id",
        how="left"
    )
    
    # Удаляем дубликаты по craftsman_id
    source2_combined_craftsmans_df = source2_combined_craftsmans_df.dropDuplicates(["craftsman_id"])
    
    # Проверяем наличие необходимых столбцов
    required_columns = ["craftsman_id", "craftsman_name", "craftsman_address", "craftsman_birthday", "craftsman_email"]
    if all(col in source2_combined_craftsmans_df.columns for col in required_columns):
        incremental_load(
            source_df=source2_combined_craftsmans_df.select(*required_columns),
            dwh_df=dwh_d_craftsmans_df,
            key="craftsman_id",
            columns=["craftsman_name", "craftsman_address", "craftsman_birthday", "craftsman_email"],
            dwh_schema="dwh",
            dwh_table="d_craftsmans",
            source_system_code="S2"
        )
    else:
        missing = [col for col in required_columns if col not in source2_combined_craftsmans_df.columns]
        logger.warning(f"Источник S2 не содержит необходимых столбцов для d_craftsmans: {missing}. Пропускаем загрузку из S2.")
else:
    logger.warning("Источник S2 не содержит таблицы craft_market_masters_products с необходимыми столбцами для d_craftsmans. Пропускаем загрузку из S2.")

# 3. Измерение d_craftsmans из S3
incremental_load(
    source_df=source3_craftsmans_df.select(
        "craftsman_id",
        "craftsman_name",
        "craftsman_address",
        "craftsman_birthday",
        "craftsman_email"
    ),
    dwh_df=dwh_d_craftsmans_df,
    key="craftsman_id",
    columns=["craftsman_name", "craftsman_address", "craftsman_birthday", "craftsman_email"],
    dwh_schema="dwh",
    dwh_table="d_craftsmans",
    source_system_code="S3"
)

# 4. Измерение d_customers из S1
# Предполагается, что S1 содержит данные для d_customers
if "customer_id" in source1_df.columns:
    incremental_load(
        source_df=source1_df.select(
            "customer_id",
            "customer_name",
            "customer_address",
            "customer_birthday",
            "customer_email"
        ),
        dwh_df=dwh_d_customers_df,
        key="customer_id",
        columns=["customer_name", "customer_address", "customer_birthday", "customer_email"],
        dwh_schema="dwh",
        dwh_table="d_customers",
        source_system_code="S1"
    )
else:
    logger.warning("Источник S1 не содержит данных для d_customers.")

# 5. Измерение d_customers из S2
# Данные о заказчиках находятся в source2_orders_customers_df
if {"customer_id", "customer_name", "customer_address", "customer_birthday", "customer_email"}.issubset(source2_orders_customers_df.columns):
    incremental_load(
        source_df=source2_orders_customers_df.select(
            "customer_id",
            "customer_name",
            "customer_address",
            "customer_birthday",
            "customer_email"
        ).dropDuplicates(["customer_id"]),
        dwh_df=dwh_d_customers_df,
        key="customer_id",
        columns=["customer_name", "customer_address", "customer_birthday", "customer_email"],
        dwh_schema="dwh",
        dwh_table="d_customers",
        source_system_code="S2"
    )
else:
    missing = [col for col in ["customer_id", "customer_name", "customer_address", "customer_birthday", "customer_email"] if col not in source2_orders_customers_df.columns]
    logger.warning(f"Источник S2 не содержит необходимых столбцов для d_customers: {missing}. Пропускаем загрузку из S2.")

# 6. Измерение d_customers из S3
incremental_load(
    source_df=source3_customers_df.select(
        "customer_id",
        "customer_name",
        "customer_address",
        "customer_birthday",
        "customer_email"
    ),
    dwh_df=dwh_d_customers_df,
    key="customer_id",
    columns=["customer_name", "customer_address", "customer_birthday", "customer_email"],
    dwh_schema="dwh",
    dwh_table="d_customers",
    source_system_code="S3"
)

# 7. Измерение d_products из S1
# Предполагается, что S1 содержит данные для d_products
if "product_id" in source1_df.columns:
    incremental_load(
        source_df=source1_df.select(
            "product_id",
            "product_name",
            "product_description",
            "product_type",
            "product_price"
        ),
        dwh_df=dwh_d_products_df,
        key="product_id",
        columns=["product_name", "product_description", "product_type", "product_price"],
        dwh_schema="dwh",
        dwh_table="d_products",
        source_system_code="S1"
    )
else:
    logger.warning("Источник S1 не содержит данных для d_products.")

# 8. Измерение d_products из S2
# Данные о продуктах находятся в source2_masters_products_df
if {"product_id", "product_name", "product_description", "product_type", "product_price"}.issubset(source2_masters_products_df.columns):
    incremental_load(
        source_df=source2_masters_products_df.select(
            "product_id",
            "product_name",
            "product_description",
            "product_type",
            "product_price"
        ),
        dwh_df=dwh_d_products_df,
        key="product_id",
        columns=["product_name", "product_description", "product_type", "product_price"],
        dwh_schema="dwh",
        dwh_table="d_products",
        source_system_code="S2"
    )
else:
    missing = [col for col in ["product_id", "product_name", "product_description", "product_type", "product_price"] if col not in source2_masters_products_df.columns]
    logger.warning(f"Источник S2 не содержит необходимых столбцов для d_products: {missing}. Пропускаем загрузку из S2.")

# 9. Измерение d_products из S3
# В S3 нет отдельной таблицы для продуктов, поэтому извлекаем из craft_market_orders
product_columns = ["product_id", "product_name", "product_description", "product_type", "product_price"]

incremental_load(
    source_df=source3_orders_df.select(*product_columns).dropDuplicates(["product_id"]),
    dwh_df=dwh_d_products_df,
    key="product_id",
    columns=["product_name", "product_description", "product_type", "product_price"],
    dwh_schema="dwh",
    dwh_table="d_products",
    source_system_code="S3"
)

# Инкрементальная загрузка фактов f_orders из S1
fact_orders_source_df_S1 = source1_df.select(
    col("order_id").alias("order_id"),
    col("product_id"),
    col("craftsman_id"),
    col("customer_id"),
    "order_created_date",
    "order_completion_date",
    "order_status"
).withColumn("load_dttm", current_timestamp()) \
 .withColumn("source_system_code", lit("S1"))  # Код системы источника S1

incremental_load_f_orders(
    source_df=fact_orders_source_df_S1,
    dwh_df=dwh_f_orders_df,
    dwh_schema="dwh",
    dwh_table="f_orders",
    source_system_code="S1"
)

# Инкрементальная загрузка фактов f_orders из S2
fact_orders_source_df_S2 = source2_orders_customers_df.select(
    col("order_id").alias("order_id"),
    col("product_id"),
    col("craftsman_id"),
    col("customer_id"),
    "order_created_date",
    "order_completion_date",
    "order_status"
).withColumn("load_dttm", current_timestamp()) \
 .withColumn("source_system_code", lit("S2"))  # Код системы источника S2

incremental_load_f_orders(
    source_df=fact_orders_source_df_S2,
    dwh_df=dwh_f_orders_df,
    dwh_schema="dwh",
    dwh_table="f_orders",
    source_system_code="S2"
)

# Инкрементальная загрузка фактов f_orders из S3
fact_orders_source_df_S3 = source3_orders_df.select(
    col("order_id").alias("order_id"),
    col("product_id"),
    col("craftsman_id"),
    col("customer_id"),
    "order_created_date",
    "order_completion_date",
    "order_status"
).withColumn("load_dttm", current_timestamp()) \
 .withColumn("source_system_code", lit("S3"))  # Код системы источника S3

incremental_load_f_orders(
    source_df=fact_orders_source_df_S3,
    dwh_df=dwh_f_orders_df,
    dwh_schema="dwh",
    dwh_table="f_orders",
    source_system_code="S3"
)

# Остановка SparkSession
spark.stop()
logger.info("SparkSession остановлена.")


### Обновление витрины с агрегатами только по тем данным которые изменились

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, sum, avg, count, when, expr, percentile_approx, coalesce, lit,
    current_timestamp, date_trunc, floor, months_between, date_format
)
import logging
from datetime import datetime
import psycopg2
from psycopg2 import extras

# ---------- Настройка логирования ----------
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# ---------- Создание SparkSession ----------
spark = SparkSession.builder \
    .appName("Incremental Aggregation with partial recalc") \
    .config("spark.master", "local[*]") \
    .config("spark.jars", "/opt/spark/jars/postgresql-42.6.0.jar") \
    .getOrCreate()

logger.info("SparkSession создан успешно.")

# ---------- Параметры подключения PostgreSQL ----------
pg_conn_params = {
    "dbname": "postgres_db",
    "user": "postgres_user",
    "password": "postgres_password",
    "host": "172.27.253.185",
    "port": 5430
}
spark_jdbc_properties = {
    "user": pg_conn_params["user"],
    "password": pg_conn_params["password"],
    "driver": "org.postgresql.Driver"
}
jdbc_url = f"jdbc:postgresql://{pg_conn_params['host']}:{pg_conn_params['port']}/{pg_conn_params['dbname']}"

def read_table(schema, table):
    df = spark.read.jdbc(
        url=jdbc_url,
        table=f"{schema}.{table}",
        properties=spark_jdbc_properties
    )
    cnt = df.count()
    logger.info(f"Таблица {schema}.{table} прочитана ({cnt} строк).")
    return df

# ---------- Функции для UPDATE / INSERT витрины ----------
def update_partition(iterator, pg_conn_params, schema, table):
    records = list(iterator)
    if not records:
        return

    columns = list(records[0].keys())
    if "id" not in columns:
        return
    columns.remove("id")

    set_clause = ", ".join([f'"{col}" = %s' for col in columns])
    update_sql = f"""
        UPDATE {schema}.{table}
        SET {set_clause}
        WHERE "id" = %s
    """

    values = []
    for rec in records:
        row_values = [rec[c] for c in columns]  # поля для UPDATE
        row_values.append(rec["id"])            # id в WHERE
        values.append(row_values)

    conn = None
    cursor = None
    try:
        conn = psycopg2.connect(**pg_conn_params)
        cursor = conn.cursor()
        extras.execute_batch(cursor, update_sql, values, page_size=100)
        conn.commit()
    except Exception as e:
        logger.error(f"Ошибка при UPDATE: {e}")
        raise
    finally:
        if cursor:
            cursor.close()
        if conn:
            conn.close()

def insert_partition(iterator, pg_conn_params, schema, table):
    records = list(iterator)
    if not records:
        return

    columns = list(records[0].keys())
    cols_sql = ", ".join([f'"{c}"' for c in columns])
    placeholders = ", ".join(["%s"] * len(columns))
    insert_sql = f"""
        INSERT INTO {schema}.{table} ({cols_sql})
        VALUES ({placeholders})
    """

    values = []
    for rec in records:
        row_values = [rec[c] for c in columns]
        values.append(row_values)

    conn = None
    cursor = None
    try:
        conn = psycopg2.connect(**pg_conn_params)
        cursor = conn.cursor()
        extras.execute_batch(cursor, insert_sql, values, page_size=100)
        conn.commit()
    except Exception as e:
        logger.error(f"Ошибка при INSERT: {e}")
        raise
    finally:
        if cursor:
            cursor.close()
        if conn:
            conn.close()

# ------------------------------------------------------------------
# 1) Получаем max_load_dttm из таблицы загрузок
# 2) Читаем f_orders, d_craftsmans, d_customers, d_products
# 3) Фильтруем changed_f_orders, changed_d_craftsmans, ...
# 4) По изменившимся измерениям определяем "затронутые" факты,
#    union с changed_f_orders -> impacted_f_orders_df
# 5) Агрегируем impacted_f_orders_df (при join используем актуальные d_* таблицы целиком)
# 6) UPDATE / INSERT витрины
# 7) Добавляем запись о загрузке в load_dates
# ------------------------------------------------------------------

# ---------- (1) max_load_dttm ----------
load_dates_df = read_table("dwh", "load_dates_craftsman_report_datamart")
max_load_dttm = load_dates_df.selectExpr("MAX(load_dttm) as max_dt").first()["max_dt"]
logger.info(f"Последняя дата обновления витрины: {max_load_dttm}")

# ---------- (2) Чтение таблиц ----------
f_orders_df     = read_table("dwh", "f_orders")
d_craftsmans_df = read_table("dwh", "d_craftsmans")
d_customers_df  = read_table("dwh", "d_customers")
d_products_df   = read_table("dwh", "d_products")

# ---------- (3) Фильтруем изменившиеся строки ----------
def filter_by_load_dttm(df, max_dt):
    if max_dt:
        return df.filter(col("load_dttm") > lit(max_dt))
    else:
        # Первая загрузка - все строки "новые"
        return df

changed_f_orders_df     = filter_by_load_dttm(f_orders_df,     max_load_dttm)
changed_craftsmans_df   = filter_by_load_dttm(d_craftsmans_df, max_load_dttm)
changed_customers_df    = filter_by_load_dttm(d_customers_df,  max_load_dttm)
changed_products_df     = filter_by_load_dttm(d_products_df,   max_load_dttm)

cnt_f  = changed_f_orders_df.count()
cnt_dc = changed_craftsmans_df.count()
cnt_du = changed_customers_df.count()
cnt_dp = changed_products_df.count()

logger.info(f"Изменений: f_orders={cnt_f}, craftsmans={cnt_dc}, customers={cnt_du}, products={cnt_dp}")

# Если вообще нет изменений - выходим
if (cnt_f + cnt_dc + cnt_du + cnt_dp) == 0:
    logger.info("Нет новых/изменённых данных. Выходим.")
    spark.stop()
    raise SystemExit(0)

# ---------- (4) Определяем "затронутые" факты ----------
# Начнём с фактов, которые сами изменились
impacted_f_orders_df = changed_f_orders_df

# a) мастера
if cnt_dc > 0:
    # берём все changed_craftsmans_df -> вытаскиваем их craftsmans_id
    changed_craftsman_ids = changed_craftsmans_df.select("craftsman_id").distinct()
    # подтягиваем из f_orders все факты по этим id
    # (не обязательно фильтровать по load_dttm, т.к. факт мог не меняться,
    #  но измерение поменялось - нужно пересчитать)
    impacted_from_craftsmans = (
        f_orders_df
        .join(changed_craftsman_ids, "craftsman_id", how="inner")
    )
    impacted_f_orders_df = impacted_f_orders_df.unionByName(impacted_from_craftsmans)

# b) customers
if cnt_du > 0:
    changed_customer_ids = changed_customers_df.select("customer_id").distinct()
    impacted_from_customers = (
        f_orders_df
        .join(changed_customer_ids, "customer_id", how="inner")
    )
    impacted_f_orders_df = impacted_f_orders_df.unionByName(impacted_from_customers)

# c) products
if cnt_dp > 0:
    changed_product_ids = changed_products_df.select("product_id").distinct()
    impacted_from_products = (
        f_orders_df
        .join(changed_product_ids, "product_id", how="inner")
    )
    impacted_f_orders_df = impacted_f_orders_df.unionByName(impacted_from_products)

# Убираем дубликаты заказов, чтобы не считать один и тот же order_id несколько раз
# (зависит от вашей схемы, если order_id уникален в рамках source_system_code).
dedup_columns = ["order_id", "source_system_code"] if "source_system_code" in impacted_f_orders_df.columns else ["order_id"]
impacted_f_orders_df = impacted_f_orders_df.dropDuplicates(dedup_columns)

cnt_impacted = impacted_f_orders_df.count()
logger.info(f"Итоговое кол-во затронутых фактов: {cnt_impacted}")

if cnt_impacted == 0:
    logger.info("Нет фактов, затронутых изменениями (возможно данные удалили?). Выходим.")
    spark.stop()
    raise SystemExit(0)

# ---------- (5) Агрегируем только impacted_f_orders_df ----------
# При этом для join с измерениями берём их *актуальную* (полную) версию,
# чтобы получить последние данные по craftsmans, products, ...
from pyspark.sql.functions import date_trunc

impacted_f_orders_df = impacted_f_orders_df.withColumn(
    "report_period", date_trunc("month", col("order_created_date"))
)

# добавляем возраст клиента
impacted_f_orders_df = impacted_f_orders_df.join(
    d_customers_df.select("customer_id", "customer_birthday"),
    on="customer_id", how="left"
).withColumn(
    "customer_age",
    floor(months_between(col("order_created_date"), col("customer_birthday")) / 12)
)

joined_impacted_df = (
    impacted_f_orders_df
    .join(d_craftsmans_df, "craftsman_id", how="left")
    .join(d_products_df,   "product_id",   how="left")
)

aggregated_data_df = (
    joined_impacted_df
    .groupBy("craftsman_id", "report_period")
    .agg(
        expr("MAX(craftsman_name) AS craftsman_name"),
        expr("MAX(craftsman_address) AS craftsman_address"),
        expr("MAX(craftsman_birthday) AS craftsman_birthday"),
        expr("MAX(craftsman_email) AS craftsman_email"),
        coalesce(
            sum(when(col("order_status") == "done", col("product_price") * 0.9)),
            lit(0)
        ).alias("craftsman_money"),
        coalesce(
            sum(when(col("order_status") == "done", col("product_price") * 0.1)),
            lit(0)
        ).alias("platform_money"),
        count("order_id").alias("count_order"),
        coalesce(
            avg(when(col("order_status") == "done", col("product_price"))),
            lit(0)
        ).alias("avg_price_order"),
        coalesce(
            avg(col("customer_age")),
            lit(0)
        ).alias("avg_age_customer"),
        coalesce(
            percentile_approx(
                (col("order_completion_date").cast("long") - col("order_created_date").cast("long"))/86400,
                0.5
            ),
            lit(0)
        ).alias("median_time_order_completed"),
        expr("MAX(product_type) AS top_product_category"),
        count(when(col("order_status") == "created", 1)).alias("count_order_created"),
        count(when(col("order_status") == "in progress", 1)).alias("count_order_in_progress"),
        count(when(col("order_status") == "delivery", 1)).alias("count_order_delivery"),
        count(when(col("order_status") == "done", 1)).alias("count_order_done"),
        count(when(col("order_status") != "done", 1)).alias("count_order_not_done")
    )
)

aggregated_data_df = aggregated_data_df.withColumn(
    "report_period",
    date_format(col("report_period"), "yyyy-MM")
)

cnt_agg = aggregated_data_df.count()
logger.info(f"Подготовлено {cnt_agg} агрегированных записей для обновления витрины.")

# ---------- (6) UPDATE / INSERT в dwh.craftsman_report_datamart ----------
datamart_df = read_table("dwh", "craftsman_report_datamart")

join_cond = [
    aggregated_data_df["craftsman_id"] == datamart_df["craftsman_id"],
    aggregated_data_df["report_period"] == datamart_df["report_period"]
]
joined_dm = aggregated_data_df.alias("new").join(
    datamart_df.alias("old"), join_cond, how="left"
)

to_update_df = (
    joined_dm
    .filter(col("old.id").isNotNull())
    .select(
        col("old.id").alias("id"),
        col("new.craftsman_id"),
        col("new.craftsman_name"),
        col("new.craftsman_address"),
        col("new.craftsman_birthday"),
        col("new.craftsman_email"),
        col("new.craftsman_money"),
        col("new.platform_money"),
        col("new.count_order"),
        col("new.avg_price_order"),
        col("new.avg_age_customer"),
        col("new.median_time_order_completed"),
        col("new.top_product_category"),
        col("new.count_order_created"),
        col("new.count_order_in_progress"),
        col("new.count_order_delivery"),
        col("new.count_order_done"),
        col("new.count_order_not_done"),
        col("new.report_period")
    )
)

to_insert_df = (
    joined_dm
    .filter(col("old.id").isNull())
    .select(
        col("new.craftsman_id"),
        col("new.craftsman_name"),
        col("new.craftsman_address"),
        col("new.craftsman_birthday"),
        col("new.craftsman_email"),
        col("new.craftsman_money"),
        col("new.platform_money"),
        col("new.count_order"),
        col("new.avg_price_order"),
        col("new.avg_age_customer"),
        col("new.median_time_order_completed"),
        col("new.top_product_category"),
        col("new.count_order_created"),
        col("new.count_order_in_progress"),
        col("new.count_order_delivery"),
        col("new.count_order_done"),
        col("new.count_order_not_done"),
        col("new.report_period")
    )
)

cnt_update = to_update_df.count()
cnt_insert = to_insert_df.count()
logger.info(f"К обновлению: {cnt_update}, к вставке: {cnt_insert}.")

# --- UPDATE
to_update_rdd = to_update_df.rdd.map(lambda row: row.asDict())
to_update_rdd.foreachPartition(
    lambda part: update_partition(
        part, pg_conn_params, schema="dwh", table="craftsman_report_datamart"
    )
)

# --- INSERT
to_insert_rdd = to_insert_df.rdd.map(lambda row: row.asDict())
to_insert_rdd.foreachPartition(
    lambda part: insert_partition(
        part, pg_conn_params, schema="dwh", table="craftsman_report_datamart"
    )
)

logger.info("Частичный пересчёт витрины (UPDATE + INSERT) завершён.")

# ---------- (7) Добавляем запись о времени загрузки ----------
current_time = [(datetime.now(),)]
load_dates_new_df = spark.createDataFrame(current_time, ["load_dttm"])
load_dates_new_df.write.jdbc(
    url=jdbc_url,
    table="dwh.load_dates_craftsman_report_datamart",
    mode="append",
    properties=spark_jdbc_properties
)
logger.info("Таблица дат загрузок обновлена (append).")

spark.stop()
logger.info("SparkSession остановлен.")