In [11]:
from pyspark.sql.functions import col, lit, current_timestamp, expr, when, avg
from pyspark.sql.types import TimestampType


In [2]:
from pyspark.sql import SparkSession

# Инициализируем Spark Session с добавлением PostgreSQL JDBC драйвера
spark = SparkSession.builder \
    .master("spark://spark-master:7077") \
    .appName("JupyterSparkCluster") \
    .config("spark.jars.packages", "org.postgresql:postgresql:42.5.4") \
    .getOrCreate()
spark

In [3]:
# Параметры подключения к PostgreSQL
jdbc_url = "jdbc:postgresql://postgres:5432/postgres"
connection_properties = {
    "user": "postgres",
    "password": "postgres",
    "driver": "org.postgresql.Driver"
}


# Заполнение таблиц измерений и фактов в DWH

In [4]:
def load_table(spark, jdbc_url, connection_properties, source_table, target_table, selected_columns, mode='append'):
    """
    Загружает данные из исходной таблицы в целевую таблицу DWH с выборкой необходимых колонок.

    :param spark: Объект SparkSession.
    :param jdbc_url: JDBC URL для подключения к PostgreSQL.
    :param connection_properties: Словарь с параметрами подключения (user, password, driver).
    :param source_table: Имя исходной таблицы (схема.таблица).
    :param target_table: Имя целевой таблицы в DWH (схема.таблица).
    :param selected_columns: Список колонок для выборки из исходной таблицы.
    :param mode: Режим записи ('overwrite' или 'append'). По умолчанию 'append'.
    """

    # Чтение данных из исходной таблицы
    df = spark.read.jdbc(
        url=jdbc_url,
        table=source_table,
        properties=connection_properties
    )
    
    # Выбор необходимых колонок
    df_selected = df.select(*selected_columns)
    
    # Добавление колонки с временем загрузки
    df_selected = df_selected.withColumn("load_dttm", current_timestamp())
    
    # Запись данных в целевую таблицу
    df_selected.write.jdbc(
        url=jdbc_url,
        table=target_table,
        mode=mode,
        properties=connection_properties
    )
    
    print(f"Таблица {target_table} успешно загружена.")
    return df_selected
        

In [5]:
# Параметры для загрузки dwh.d_craftsmans
source_table_craftsmans = "source3.craft_market_craftsmans"
target_table_craftsmans = "dwh.d_craftsmans"
selected_columns_craftsmans = [
    # "craftsman_id",
    "craftsman_name",
    "craftsman_address",
    "craftsman_birthday",
    "craftsman_email"
]

# Вызов функции для загрузки dwh.d_craftsmans
dwh_d_craftsmans = load_table(
    spark=spark,
    jdbc_url=jdbc_url,
    connection_properties=connection_properties,
    source_table=source_table_craftsmans,
    target_table=target_table_craftsmans,
    selected_columns=selected_columns_craftsmans,
    mode='append'
)

Таблица dwh.d_craftsmans успешно загружена.


In [6]:
# Параметры для загрузки dwh.d_customers
source_table_customers = "source3.craft_market_customers"
target_table_customers = "dwh.d_customers"
selected_columns_customers = [
    # "customer_id",
    "customer_name",
    "customer_address",
    "customer_birthday",
    "customer_email"
]

# Вызов функции для загрузки dwh.d_customers
dwh_d_customers = load_table(
    spark=spark,
    jdbc_url=jdbc_url,
    connection_properties=connection_properties,
    source_table=source_table_customers,
    target_table=target_table_customers,
    selected_columns=selected_columns_customers,
    mode='append'
)

Таблица dwh.d_customers успешно загружена.


In [7]:
dwh_d_customers.head(5)

[Row(customer_name='Anna-maria Lamba', customer_address='8 Prairieview Alley', customer_birthday=datetime.date(1990, 7, 27), customer_email='alamba1@ted.com', load_dttm=datetime.datetime(2024, 12, 26, 4, 5, 13, 700602)),
 Row(customer_name='Kim Coonihan', customer_address='3 Hanson Center', customer_birthday=datetime.date(1992, 5, 26), customer_email='kcoonihan2@angelfire.com', load_dttm=datetime.datetime(2024, 12, 26, 4, 5, 13, 700602)),
 Row(customer_name='Lanny Vasse', customer_address='9 Westridge Alley', customer_birthday=datetime.date(2003, 6, 12), customer_email='lvasse3@pbs.org', load_dttm=datetime.datetime(2024, 12, 26, 4, 5, 13, 700602)),
 Row(customer_name='Alice Treneman', customer_address='68462 Meadow Valley Drive', customer_birthday=datetime.date(1995, 7, 26), customer_email='atreneman4@cmu.edu', load_dttm=datetime.datetime(2024, 12, 26, 4, 5, 13, 700602)),
 Row(customer_name='Flo Morant', customer_address='2453 Crescent Oaks Avenue', customer_birthday=datetime.date(1997

In [8]:
# Параметры для загрузки dwh.d_products
source_table_products = "source2.craft_market_masters_products"  # Предполагается, что products находятся здесь
target_table_products = "dwh.d_products"
selected_columns_products = [
    # "product_id",
    "product_name",
    "product_description",
    "product_type",
    "product_price"
]

# Вызов функции для загрузки dwh.d_products
dwh_d_products = load_table(
    spark=spark,
    jdbc_url=jdbc_url,
    connection_properties=connection_properties,
    source_table=source_table_products,
    target_table=target_table_products,
    selected_columns=selected_columns_products,
    mode='append'
)

Таблица dwh.d_products успешно загружена.


In [9]:
source_table_orders = "source1.craft_market_wide"
target_table_orders = "dwh.f_orders"
selected_columns_orders = [
    "order_created_date",
    "order_completion_date",
    "order_status",
    "customer_id",
    "craftsman_id",
    "product_id",
    "order_id",
]

# Вызов функции для загрузки dwh.f_orders
dwh_f_orders = load_table(
    spark=spark,
    jdbc_url=jdbc_url,
    connection_properties=connection_properties,
    source_table=source_table_orders,
    target_table=target_table_orders,
    selected_columns=selected_columns_orders,
    mode='append'  # Используем 'append' для добавления данных
)

Таблица dwh.f_orders успешно загружена.


# Инкрементальная Загрузка Витрины Данных

In [27]:
from pyspark.sql.functions import col, lit, current_timestamp, expr, count, floor
from pyspark.sql.types import TimestampType
from pyspark.sql import Window
import pyspark.sql.functions as F

def load_datamart_incremental(spark, jdbc_url, connection_properties, source_table, load_dates_table, target_datamart_table, report_period):
    """
    Инкрементально загружает данные в витрину данных dwh.craftsman_report_datamart.
    
    :param spark: SparkSession
    :param jdbc_url: JDBC URL для подключения к PostgreSQL.
    :param connection_properties: Словарь с параметрами подключения (user, password, driver).
    :param source_table: Таблица фактов (dwh.f_orders).
    :param load_dates_table: Таблица для отслеживания дат загрузки (dwh.load_dates_craftsman_report_datamart).
    :param target_datamart_table: Витрина данных (dwh.craftsman_report_datamart).
    :param report_period: Отчетный период в формате 'YYYY-MM'.
    """
    # Шаг 1: Получаем последнюю дату загрузки
    last_load_date_df = spark.read.jdbc(
        url=jdbc_url,
        table=load_dates_table,
        properties=connection_properties
    ).orderBy(col("load_dttm").desc()).limit(1)
    
    if last_load_date_df.count() > 0:
        last_load_date = last_load_date_df.collect()[0]['load_dttm']
        print(f"Последняя дата загрузки: {last_load_date}")
    else:
        last_load_date = lit('1970-01-01').cast(TimestampType())  # Начальное значение
        print("Таблица load_dates пуста. Загружаем все данные.")
    
    # Шаг 2: Извлекаем новые или обновлённые записи из таблицы фактов
    df_new_orders = spark.read.jdbc(
        url=jdbc_url,
        table=source_table,
        properties=connection_properties
    ).filter(col("load_dttm") > last_load_date)
    
    if df_new_orders.count() == 0:
        print("Нет новых данных для загрузки.")
        return
    
    print(f"Загружаем {df_new_orders.count()} новых записей из таблицы фактов.")
    
    # Шаг 3: Объединяем данные с таблицами измерений
    df_joined = df_new_orders \
        .join(
            spark.read.jdbc(url=jdbc_url, table="dwh.d_craftsmans", properties=connection_properties),
            on="craftsman_id",
            how="left"
        ) \
        .join(
            spark.read.jdbc(url=jdbc_url, table="dwh.d_customers", properties=connection_properties),
            on="customer_id",
            how="left"
        ) \
        .join(
            spark.read.jdbc(url=jdbc_url, table="dwh.d_products", properties=connection_properties),
            on="product_id",
            how="left"
        )
    
    # Шаг 4: Выполняем агрегации для расчёта метрик, исключая top_product_category
    df_aggregated = df_joined.groupBy(
        "craftsman_id", 
        "craftsman_name", 
        "craftsman_address", 
        "craftsman_birthday", 
        "craftsman_email"
    ).agg(
        (F.sum("product_price") * 0.9).alias("craftsman_money"),  # 10% платформа
        (F.sum("product_price") * 0.1).alias("platform_money"),
        F.count("order_id").alias("count_order"),
        F.avg("product_price").alias("avg_price_order"),
        F.avg(F.floor(F.months_between(F.current_date(), F.col("customer_birthday")) / 12)).alias("avg_age_customer"),
        F.expr("percentile_approx(datediff(order_completion_date, order_created_date), 0.5)").alias("median_time_order_completed"),
        F.count(F.when(col("order_status") == "created", 1)).alias("count_order_created"),
        F.count(F.when(col("order_status") == "in progress", 1)).alias("count_order_in_progress"),
        F.count(F.when(col("order_status") == "delivery", 1)).alias("count_order_delivery"),
        F.count(F.when(col("order_status") == "done", 1)).alias("count_order_done"),
        F.count(F.when(~col("order_status").isin("done"), 1)).alias("count_order_not_done")
    )
    
    # Шаг 5: Вычисляем top_product_category отдельно
    # Считаем количество каждого product_type для каждого craftsman_id
    df_product_counts = df_joined.groupBy("craftsman_id", "product_type") \
        .agg(F.count("*").alias("product_type_count"))
    
    # Определяем самое частое product_type для каждого craftsman_id
    window_spec = Window.partitionBy("craftsman_id").orderBy(F.desc("product_type_count"))
    
    df_top_product = df_product_counts.withColumn("rank", F.row_number().over(window_spec)) \
        .filter(F.col("rank") == 1) \
        .select("craftsman_id", F.col("product_type").alias("top_product_category"))
    
    # Шаг 6: Объединяем агрегированные данные с top_product_category
    df_final = df_aggregated.join(df_top_product, on="craftsman_id", how="left") \
        .withColumn("report_period", lit(report_period)) \
        # .withColumn("load_dttm", F.current_timestamp())
    return df_final
    
    # Шаг 7: Записываем агрегированные данные в витрину данных
    df_final.write.jdbc(
        url=jdbc_url,
        table=target_datamart_table,
        mode='append',
        properties=connection_properties
    )
    
    print(f"Витрина данных {target_datamart_table} успешно обновлена.")
    
    # Шаг 8: Обновляем таблицу дат загрузки
    df_load_date = spark.range(1).withColumn("load_dttm", current_timestamp()).select("load_dttm")
    df_load_date.write.jdbc(
        url=jdbc_url,
        table=load_dates_table,
        mode='append',
        properties=connection_properties
    )
    
    print(f"Таблица {load_dates_table} обновлена.")
    return df_final, df_load_date


In [28]:
# Параметры для загрузки витрины данных
source_table_for_datamart = "dwh.f_orders"  # Источник данных для витрины
load_dates_table = "dwh.load_dates_craftsman_report_datamart"
target_datamart_table = "dwh.craftsman_report_datamart"
report_period = "2021-03"  # Формат: 'ГГГГ-ММ'

# Вызов функции для инкрементальной загрузки витрины данных
dwh_load_dates_craftsman_report_datamart, dwh_craftsman_report_datamart = load_datamart_incremental(
    spark=spark,
    jdbc_url=jdbc_url,
    connection_properties=connection_properties,
    source_table=source_table_for_datamart,
    load_dates_table=load_dates_table,
    target_datamart_table=target_datamart_table,
    report_period=report_period
)

Таблица load_dates пуста. Загружаем все данные.
Загружаем 999 новых записей из таблицы фактов.
Витрина данных dwh.craftsman_report_datamart успешно обновлена.


PySparkTypeError: [CANNOT_INFER_TYPE_FOR_FIELD] Unable to infer the type of the field `load_dttm`.