In [None]:
import re
from pathlib import Path

def normalize_line(entry: str) -> str:
    """Tidy a single raw CSV line.

    Trailing CR/LF characters are stripped, every run of two or more whitespace
    characters becomes one space, and whitespace surrounding commas and equals
    signs is removed. The rules apply to the whole line, including any text
    inside quoted fields.
    """
    text = entry.rstrip('\r\n')
    for pattern, replacement in (
        (r"\s{2,}", " "),   # squeeze whitespace runs
        (r"\s*,\s*", ","),  # no padding around field separators
        (r"\s*=\s*", "="),  # no padding around '='
    ):
        text = re.sub(pattern, replacement, text)
    return text


def merge_broken_lines(source: Path, destination: Path) -> None:
    """Rejoin CSV records that were split across several physical lines.

    Every line is first cleaned with ``normalize_line``. A cleaned line that
    begins with digits followed by a comma opens a new record. Any other line
    (header, blank line, continuation) is concatenated, with no separator, onto
    the record in progress; the very first line always opens one.

    The merged records are written to ``destination`` joined by newline
    characters (no trailing newline). Missing parent directories are created.

    Args:
        source: UTF-8 text file to read.
        destination: file to (over)write.
    """
    record_start = re.compile(r"^\d+,")
    records = []
    buffer = []

    with source.open('r', encoding='utf-8') as reader:
        for line in reader:
            cleaned = normalize_line(line)
            # Close the record in progress only when a new one starts.
            if record_start.match(cleaned) and buffer:
                records.append("".join(buffer))
                buffer = []
            buffer.append(cleaned)

    if buffer:
        records.append("".join(buffer))

    destination.parent.mkdir(parents=True, exist_ok=True)
    with destination.open('w', encoding='utf-8', newline='\n') as writer:
        writer.write("\n".join(records))


def main():
    """Merge broken lines in every ``MOCK_DATA*.csv`` of the input folder.

    Files are processed in sorted order. The n-th file (1-based) is written to
    ``./после парсинга/parsed_<n>.csv``. If no input file matches, a message is
    printed and nothing is written.
    """
    input_dir = Path("./исходные данные")
    output_dir = Path("./после парсинга")
    name_pattern = "MOCK_DATA*.csv"

    sources = sorted(input_dir.glob(name_pattern))
    if not sources:
        print(f"Файлы по шаблону '{name_pattern}' не найдены в {input_dir}")
        return

    for number, source_path in enumerate(sources, 1):
        target_path = output_dir / f"parsed_{number}.csv"
        print(f"Обрабатывается: {source_path} -> {target_path}")
        merge_broken_lines(source_path, target_path)

    print("Парсинг завершён.")


if __name__ == "__main__":
    main()


In [None]:
!pip install psycopg2-binary

In [None]:
import os
import glob
import psycopg2

# PostgreSQL connection parameters; each can be overridden via an environment variable.
connection_settings = dict(
    host=os.getenv("DB_HOST", "postgres"),
    port=os.getenv("DB_PORT", "5432"),
    dbname=os.getenv("DB_NAME", "spark_db"),
    user=os.getenv("DB_USER", "spark_user"),
    password=os.getenv("DB_PASSWORD", "spark_password"),
)

# Module-level connection and cursor shared by the functions in this cell.
connection = psycopg2.connect(**connection_settings)
cursor = connection.cursor()

# Base directory for init.sql (and, below, the parsed CSV folder).
WORKSPACE_DIR = os.getenv("WORK_DIR", ".")


def read_text_file(
    file_path: str,
    encodings: tuple = ("utf-8-sig", "utf-8", "cp1251", "latin-1"),
) -> str:
    """Read a whole text file, trying each encoding in ``encodings`` in turn.

    Args:
        file_path: path of the file to read.
        encodings: candidate encodings, tried in order. The default list ends
            with latin-1, which can decode any byte sequence, so with the
            default the function never fails on decoding.

    Returns:
        The file content decoded with the first encoding that succeeds.

    Raises:
        UnicodeDecodeError: when none of ``encodings`` can decode the file.
            The reason names the file.
        ValueError: when ``encodings`` is empty.
        OSError: when the file cannot be opened.
    """
    last_error = None
    for encoding in encodings:
        try:
            with open(file_path, "r", encoding=encoding) as f:
                return f.read()
        except UnicodeDecodeError as exc:
            last_error = exc
    if last_error is None:
        raise ValueError("encodings must not be empty")
    # UnicodeDecodeError needs (encoding, object, start, end, reason); calling it
    # with a single message raises TypeError instead of the intended error.
    raise UnicodeDecodeError(
        last_error.encoding,
        last_error.object,
        last_error.start,
        last_error.end,
        f"Не удалось декодировать файл: {file_path}",
    )


def apply_schema() -> None:
    """Execute the DDL script ``init.sql`` from ``WORKSPACE_DIR`` and commit it.

    Uses the module-level ``cursor`` and ``connection``. If the DDL fails, the
    transaction is rolled back before the error propagates. Otherwise the shared
    connection would stay in an aborted transaction, and every later statement
    (e.g. the CSV import) would fail too.
    """
    ddl_file = os.path.join(WORKSPACE_DIR, "init.sql")
    ddl_statements = read_text_file(ddl_file)
    try:
        cursor.execute(ddl_statements)
    except Exception:
        connection.rollback()
        raise
    connection.commit()
    print("Структура БД обновлена (DDL выполнен).")


def import_csv_files(file_glob: str = "parsed_*.csv") -> None:
    """Bulk-load the parsed CSV files into ``mock_data`` with ``COPY ... CSV HEADER``.

    Files matching ``file_glob`` inside ``<WORKSPACE_DIR>/после парсинга`` are
    loaded in sorted order and committed in a single transaction. That folder
    is where the parsing step writes ``parsed_<n>.csv``. If any COPY fails, the
    transaction is rolled back and the error is re-raised. If nothing matches,
    a message is printed and nothing is loaded.

    NOTE: rows are appended, so running this twice duplicates ``mock_data``.

    Args:
        file_glob: file-name pattern to load (default ``parsed_*.csv``).
    """
    # The old code referenced an undefined PROCESSED_DIR (NameError) and looked for
    # parsed_mock*.csv, which never matches the parsed_<n>.csv files written upstream.
    pattern = os.path.join(WORKSPACE_DIR, "после парсинга", file_glob)
    csv_list = sorted(glob.glob(pattern))

    if not csv_list:
        print(f"Файлы по шаблону '{pattern}' не найдены.")
        return

    try:
        for csv_path in csv_list:
            print(f"Импортируем: {csv_path}")
            with open(csv_path, "r", encoding="utf-8") as f:
                cursor.copy_expert("COPY mock_data FROM STDIN WITH CSV HEADER", f)
    except Exception:
        connection.rollback()
        raise

    connection.commit()
    print("Все CSV-файлы загружены в mock_data.")


def main():
    """Apply ``init.sql`` to the database, then COPY the parsed CSV files into it."""
    for step in (apply_schema, import_csv_files):
        step()


if __name__ == "__main__":
    main()


In [None]:


from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, to_date, year, quarter, month,
    dayofmonth, date_format
)


def initialize_postgres_session():
    """Get or create the Spark session used by the Postgres star-schema ETL.

    The PostgreSQL JDBC jar is added to ``spark.jars``, and the legacy time
    parser policy is enabled for the ``MM/dd/yyyy`` date parsing done below.
    """
    settings = {
        "spark.jars": "postgresql-42.6.0.jar",
        "spark.sql.legacy.timeParserPolicy": "LEGACY",
    }
    builder = SparkSession.builder.appName("Postgres_ETL_StarSchema")
    for key, value in settings.items():
        builder = builder.config(key, value)
    return builder.getOrCreate()


def load_raw_data(spark_sess, url, properties):
    """Return the ``mock_data`` table as a DataFrame read over JDBC.

    ``url`` and the table name are set first. Entries in ``properties`` are
    applied afterwards, so they take precedence on key collisions.
    """
    jdbc_options = {"url": url, "dbtable": "mock_data", **properties}
    reader = spark_sess.read.format("jdbc")
    return reader.options(**jdbc_options).load()


def refresh_countries_dimension(raw_df, url, props):
    """Append to ``dim_countries`` the country names from ``raw_df`` it lacks.

    Names come from the customer, seller, store and supplier country columns.
    Nulls are dropped, and the rest is de-duplicated and anti-joined against the
    current table. The number of new rows is printed; they are written only if
    there is at least one.
    """
    source_columns = ("customer_country", "seller_country", "store_country", "supplier_country")
    first, *others = [raw_df.select(col(name).alias("country_name")) for name in source_columns]
    all_names = first
    for frame in others:
        all_names = all_names.union(frame)
    candidates = all_names.where(col("country_name").isNotNull()).distinct()

    stored = raw_df.sparkSession.read.jdbc(url=url, table="dim_countries", properties=props)

    fresh = (
        candidates
        .join(stored, ["country_name"], "left_anti")
        .dropDuplicates(["country_name"])
    )
    fresh_total = fresh.count()
    print(f"Будет вставлено {fresh_total} новых записей в dim_countries")

    if fresh_total > 0:
        fresh.write.jdbc(url=url, table="dim_countries", mode="append", properties=props)


def refresh_cities_dimension(raw_df, url, props):
    """Append to ``dim_cities`` the store/supplier city names not stored yet.

    Null names are ignored. The count of new rows is printed, and the append
    happens only when it is positive.
    """
    city_names = (
        raw_df.select(col("store_city").alias("city_name"))
        .union(raw_df.select(col("supplier_city").alias("city_name")))
        .where(col("city_name").isNotNull())
        .distinct()
    )

    known = raw_df.sparkSession.read.jdbc(url=url, table="dim_cities", properties=props)

    fresh = city_names.join(known, ["city_name"], "left_anti").dropDuplicates(["city_name"])
    fresh_total = fresh.count()
    print(f"Будет вставлено {fresh_total} новых записей в dim_cities")

    if fresh_total > 0:
        fresh.write.jdbc(url=url, table="dim_cities", mode="append", properties=props)


def refresh_dates_dimension(raw_df, url, props):
    """Append calendar rows to ``dim_dates`` for every new date in ``raw_df``.

    Sale, product-release and product-expiry dates are parsed with the
    ``MM/dd/yyyy`` pattern, and unparseable (null) values are dropped. Each
    remaining date gets year, quarter, month, day-of-month and weekday name
    (``EEEE``) columns. Dates already present in ``dim_dates`` are skipped.
    """
    date_columns = ("sale_date", "product_release_date", "product_expiry_date")
    parsed = [raw_df.select(to_date(name, "MM/dd/yyyy").alias("full_date")) for name in date_columns]
    all_dates = parsed[0]
    for frame in parsed[1:]:
        all_dates = all_dates.union(frame)

    calendar = (
        all_dates
        .where(col("full_date").isNotNull())
        .distinct()
        .select(
            "full_date",
            year("full_date").alias("year"),
            quarter("full_date").alias("quarter"),
            month("full_date").alias("month"),
            dayofmonth("full_date").alias("day"),
            date_format("full_date", "EEEE").alias("weekday"),
        )
    )

    stored = raw_df.sparkSession.read.jdbc(url=url, table="dim_dates", properties=props)

    fresh = calendar.join(stored, ["full_date"], "left_anti").dropDuplicates(["full_date"])
    fresh_total = fresh.count()
    print(f"Будет вставлено {fresh_total} новых записей в dim_dates")

    if fresh_total > 0:
        fresh.write.jdbc(url=url, table="dim_dates", mode="append", properties=props)


def refresh_simple_lookup(raw_df, url, props):
    """Top up the pet lookup tables (types, breeds, categories) from ``raw_df``.

    For each (source column, table, name column) triple, the distinct non-null
    values missing from the table are counted, reported and appended.
    """
    lookups = (
        ("customer_pet_type", "dim_pet_types", "pet_type_name"),
        ("customer_pet_breed", "dim_pet_breeds", "pet_breed_name"),
        ("pet_category", "dim_pet_categories", "pet_category_name"),
    )

    for source_column, target_table, key in lookups:
        values = (
            raw_df.select(col(source_column).alias(key))
            .where(col(key).isNotNull())
            .distinct()
        )
        stored = raw_df.sparkSession.read.jdbc(url=url, table=target_table, properties=props)

        missing = values.join(stored, [key], "left_anti").dropDuplicates([key])
        missing_total = missing.count()
        print(f"Будет вставлено {missing_total} новых записей в {target_table}")

        if missing_total > 0:
            missing.write.jdbc(url=url, table=target_table, mode="append", properties=props)


def refresh_pets_dimension(raw_df, url, props):
    """Append pets, keyed by (pet_name, pet_type_id), that ``dim_pets`` lacks.

    Pet type, breed and category names are resolved to ids via left joins on
    the lookup tables, so unknown values become null ids. Rows with a null pet
    name are ignored.
    """
    def read_table(name):
        return raw_df.sparkSession.read.jdbc(url=url, table=name, properties=props)

    key_columns = ["pet_name", "pet_type_id"]
    stored = read_table("dim_pets").select(*key_columns).dropDuplicates(key_columns)

    renames = {
        "customer_pet_name": "pet_name",
        "customer_pet_type": "pet_type_name",
        "customer_pet_breed": "pet_breed_name",
        "pet_category": "pet_category_name",
    }
    pets = (
        raw_df.select(*[col(src).alias(dst) for src, dst in renames.items()])
        .where(col("pet_name").isNotNull())
        .distinct()
    )

    types_dim = read_table("dim_pet_types")
    breeds_dim = read_table("dim_pet_breeds")
    categories_dim = read_table("dim_pet_categories")

    resolved = (
        pets
        .join(types_dim, pets.pet_type_name == types_dim.pet_type_name, "left")
        .join(breeds_dim, pets.pet_breed_name == breeds_dim.pet_breed_name, "left")
        .join(categories_dim, pets.pet_category_name == categories_dim.pet_category_name, "left")
        .select("pet_name", "pet_type_id", "pet_breed_id", "pet_category_id")
        .distinct()
    )

    unseen = resolved.join(stored, key_columns, "left_anti").dropDuplicates(key_columns)
    unseen_total = unseen.count()
    print(f"Будет вставлено {unseen_total} новых записей в dim_pets")

    if unseen_total > 0:
        unseen.write.jdbc(url=url, table="dim_pets", mode="append", properties=props)


def refresh_suppliers_dimension(raw_df, url, props):
    """Append suppliers, identified by ``supplier_email``, that ``dim_suppliers`` lacks.

    City and country names are resolved to ``city_id``/``country_id`` with left
    joins, so a supplier with an unknown city or country is still inserted with
    a null id. Rows with a null e-mail are ignored.
    """
    def read_table(name):
        return raw_df.sparkSession.read.jdbc(url=url, table=name, properties=props)

    stored_emails = (
        read_table("dim_suppliers")
        .select("supplier_email")
        .dropDuplicates(["supplier_email"])
    )

    renames = {
        "supplier_name": "supplier_name",
        "supplier_contact": "contact_person",
        "supplier_email": "supplier_email",
        "supplier_phone": "supplier_phone",
        "supplier_address": "supplier_address",
        "supplier_city": "city_name",
        "supplier_country": "country_name",
    }
    suppliers = (
        raw_df.select(*[col(src).alias(dst) for src, dst in renames.items()])
        .where(col("supplier_email").isNotNull())
        .distinct()
    )

    city_ids = read_table("dim_cities")
    country_ids = read_table("dim_countries")

    resolved = (
        suppliers
        .join(city_ids, suppliers.city_name == city_ids.city_name, "left")
        .join(country_ids, suppliers.country_name == country_ids.country_name, "left")
        .select(
            "supplier_name", "contact_person", "supplier_email",
            "supplier_phone", "supplier_address", "city_id", "country_id",
        )
    )

    unseen = (
        resolved
        .join(stored_emails, ["supplier_email"], "left_anti")
        .dropDuplicates(["supplier_email"])
    )
    unseen_total = unseen.count()
    print(f"Будет вставлено {unseen_total} новых записей в dim_suppliers")

    if unseen_total > 0:
        unseen.write.jdbc(url=url, table="dim_suppliers", mode="append", properties=props)


def refresh_product_dimensions(raw_df, url, props):
    """Refresh product attribute lookups, then append unseen products to ``dim_products``.

    Phase 1: for category, color, size, brand and material, the distinct
    non-null values in ``raw_df`` that the matching lookup table lacks are
    appended to it.

    Phase 2: product rows are built from ``raw_df``, and each descriptive value
    is replaced by its surrogate id:
      * attributes via the lookup tables just refreshed;
      * the supplier via ``supplier_email`` in ``dim_suppliers``;
      * release/expiry dates (parsed as ``MM/dd/yyyy``) via ``dim_dates``.
    All of these are left joins, so unresolved values become null ids. Rows whose
    (product_name, supplier_id) pair already exists in ``dim_products`` are skipped.
    """
    def read_table(table_name):
        return raw_df.sparkSession.read.jdbc(url=url, table=table_name, properties=props)

    attribute_tables = [
        ("product_category", "category_name", "dim_product_categories"),
        ("product_color",    "color_name",    "dim_product_colors"),
        ("product_size",     "size_name",     "dim_product_sizes"),
        ("product_brand",    "brand_name",    "dim_product_brands"),
        ("product_material", "material_name", "dim_product_materials"),
    ]

    # Phase 1: top up each attribute lookup table.
    for raw_column, name_column, lookup_table in attribute_tables:
        values = (
            raw_df.select(col(raw_column).alias(name_column))
            .where(col(name_column).isNotNull())
            .distinct()
        )
        unseen = (
            values
            .join(read_table(lookup_table), [name_column], "left_anti")
            .dropDuplicates([name_column])
        )
        unseen_total = unseen.count()
        print(f"Будет вставлено {unseen_total} новых записей в {lookup_table}")
        if unseen_total > 0:
            unseen.write.jdbc(url=url, table=lookup_table, mode="append", properties=props)

    # Phase 2: read the refreshed lookups plus the other referenced dimensions.
    categories = read_table("dim_product_categories")
    colors = read_table("dim_product_colors")
    sizes = read_table("dim_product_sizes")
    brands = read_table("dim_product_brands")
    materials = read_table("dim_product_materials")
    suppliers = read_table("dim_suppliers")
    calendar = read_table("dim_dates")

    # Two renamed views of dim_dates so release and expiry resolve independently.
    release_dates = calendar.select(
        col("date_id").alias("release_date_id"),
        col("full_date").alias("release_full_date"),
    )
    expiry_dates = calendar.select(
        col("date_id").alias("expiry_date_id"),
        col("full_date").alias("expiry_full_date"),
    )

    product_rows = (
        raw_df.select(
            col("product_name"),
            col("supplier_email"),
            col("product_category").alias("category_name"),
            col("product_price").alias("price"),
            col("product_weight").alias("weight"),
            col("product_color").alias("color_name"),
            col("product_size").alias("size_name"),
            col("product_brand").alias("brand_name"),
            col("product_material").alias("material_name"),
            col("product_description").alias("description"),
            col("product_rating").alias("rating"),
            col("product_reviews").alias("reviews"),
            to_date("product_release_date", "MM/dd/yyyy").alias("release_full_date"),
            to_date("product_expiry_date", "MM/dd/yyyy").alias("expiry_full_date"),
        )
        .where(col("product_name").isNotNull())
        .distinct()
    )

    resolved = (
        product_rows
        .join(categories, product_rows.category_name == categories.category_name, "left")
        .join(colors, product_rows.color_name == colors.color_name, "left")
        .join(sizes, product_rows.size_name == sizes.size_name, "left")
        .join(brands, product_rows.brand_name == brands.brand_name, "left")
        .join(materials, product_rows.material_name == materials.material_name, "left")
        .join(suppliers, product_rows.supplier_email == suppliers.supplier_email, "left")
        .join(release_dates, product_rows.release_full_date == release_dates.release_full_date, "left")
        .join(expiry_dates, product_rows.expiry_full_date == expiry_dates.expiry_full_date, "left")
        .select(
            "product_name", "supplier_id", "category_id", "price", "weight",
            "color_id", "size_id", "brand_id", "material_id", "description",
            "rating", "reviews", "release_date_id", "expiry_date_id",
        )
        .distinct()
    )

    product_key = ["product_name", "supplier_id"]
    already_stored = read_table("dim_products").select(*product_key).dropDuplicates(product_key)

    unseen_products = resolved.join(already_stored, product_key, "left_anti").dropDuplicates(product_key)
    product_total = unseen_products.count()
    print(f"Будет вставлено {product_total} новых записей в dim_products")

    if product_total > 0:
        unseen_products.write.jdbc(url=url, table="dim_products", mode="append", properties=props)


def refresh_customers_dimension(raw_df, url, props):
    """Append customers, identified by e-mail, that ``dim_customers`` lacks.

    Each customer's country is resolved to ``country_id``. The customer's pet is
    resolved to ``pet_id`` by pet name *and* pet type, because
    ``refresh_pets_dimension`` de-duplicates ``dim_pets`` on
    (pet_name, pet_type_id). Matching on the name alone fans out when pets of
    different types share a name, and the later e-mail de-duplication then keeps
    an arbitrary ``pet_id``.

    The type comparison is null-safe, so a pet stored with a null type still
    matches a customer whose pet type is null. All lookups are left joins: an
    unresolved country or pet gives a null id rather than dropping the customer.
    """
    def read_table(name):
        return raw_df.sparkSession.read.jdbc(url=url, table=name, properties=props)

    existing = read_table("dim_customers").select("email").dropDuplicates(["email"])

    base = (
        raw_df
        .select(
            col("customer_first_name").alias("first_name"),
            col("customer_last_name").alias("last_name"),
            col("customer_age").alias("age"),
            col("customer_email").alias("email"),
            col("customer_country").alias("country_name"),
            col("customer_postal_code").alias("postal_code"),
            col("customer_pet_name").alias("pet_name"),
            col("customer_pet_type").alias("pet_type_name"),
        )
        .where(col("email").isNotNull())
        .distinct()
    )

    countries = (
        read_table("dim_countries")
        .select("country_id", "country_name")
        .dropDuplicates(["country_name"])
    )
    pet_types = (
        read_table("dim_pet_types")
        .select("pet_type_id", "pet_type_name")
        .dropDuplicates(["pet_type_name"])
    )
    # Rename dim_pets' key columns so the pet join below is unambiguous.
    pets = (
        read_table("dim_pets")
        .select(
            "pet_id",
            col("pet_name").alias("dim_pet_name"),
            col("pet_type_id").alias("dim_pet_type_id"),
        )
        .dropDuplicates(["dim_pet_name", "dim_pet_type_id"])
    )

    with_ids = (
        base
        .join(countries, ["country_name"], "left")
        .join(pet_types, ["pet_type_name"], "left")
    )

    enriched = (
        with_ids
        .join(
            pets,
            (with_ids.pet_name == pets.dim_pet_name)
            & with_ids.pet_type_id.eqNullSafe(pets.dim_pet_type_id),
            "left",
        )
        .select(
            col("first_name"),
            col("last_name"),
            col("age"),
            col("email"),
            col("country_id"),
            col("postal_code"),
            col("pet_id")
        )
    )

    to_add = enriched.join(existing, ["email"], "left_anti") \
                     .dropDuplicates(["email"])
    add_count = to_add.count()
    print(f"Будет вставлено {add_count} новых записей в dim_customers")

    if add_count > 0:
        to_add.write.jdbc(url=url, table="dim_customers", mode="append", properties=props)


def refresh_sellers_dimension(raw_df, url, props):
    """Append sellers, identified by e-mail, that ``dim_sellers`` does not contain yet.

    ``seller_country`` is resolved to ``country_id`` through a left join on
    ``dim_countries``, so an unknown country yields a null id. Rows with a
    null e-mail are ignored.
    """
    def read_table(name):
        return raw_df.sparkSession.read.jdbc(url=url, table=name, properties=props)

    stored_emails = read_table("dim_sellers").select("email").dropDuplicates(["email"])

    renames = {
        "seller_first_name": "first_name",
        "seller_last_name": "last_name",
        "seller_email": "email",
        "seller_country": "country_name",
        "seller_postal_code": "postal_code",
    }
    sellers = (
        raw_df.select(*[col(src).alias(dst) for src, dst in renames.items()])
        .where(col("email").isNotNull())
        .distinct()
    )

    country_ids = read_table("dim_countries")
    candidates = (
        sellers
        .join(country_ids, sellers.country_name == country_ids.country_name, "left")
        .select("first_name", "last_name", "email", "country_id", "postal_code")
    )

    unseen = candidates.join(stored_emails, ["email"], "left_anti").dropDuplicates(["email"])
    unseen_total = unseen.count()
    print(f"Будет вставлено {unseen_total} новых записей в dim_sellers")

    if unseen_total > 0:
        unseen.write.jdbc(url=url, table="dim_sellers", mode="append", properties=props)


def refresh_stores_dimension(raw_df, url, props):
    """Append stores, keyed by (store_name, location), that ``dim_stores`` lacks.

    City and country names are resolved to ids with left joins (unknown values
    become null ids). Rows with a null store name are ignored.
    """
    def read_table(name):
        return raw_df.sparkSession.read.jdbc(url=url, table=name, properties=props)

    store_key = ["store_name", "location"]
    stored = read_table("dim_stores").select(*store_key).dropDuplicates(store_key)

    renames = {
        "store_name": "store_name",
        "store_location": "location",
        "store_city": "city_name",
        "store_state": "state",
        "store_country": "country_name",
        "store_phone": "phone",
        "store_email": "email",
    }
    stores = (
        raw_df.select(*[col(src).alias(dst) for src, dst in renames.items()])
        .where(col("store_name").isNotNull())
        .distinct()
    )

    city_ids = read_table("dim_cities")
    country_ids = read_table("dim_countries")

    candidates = (
        stores
        .join(city_ids, stores.city_name == city_ids.city_name, "left")
        .join(country_ids, stores.country_name == country_ids.country_name, "left")
        .select("store_name", "location", "city_id", "state", "country_id", "phone", "email")
    )

    unseen = candidates.join(stored, store_key, "left_anti").dropDuplicates(store_key)
    unseen_total = unseen.count()
    print(f"Будет вставлено {unseen_total} новых записей в dim_stores")

    if unseen_total > 0:
        unseen.write.jdbc(url=url, table="dim_stores", mode="append", properties=props)


def refresh_fact_sales(raw_df, url, props):
    """Build ``fact_sales`` rows from ``raw_df`` and append them.

    Every raw sale is resolved to surrogate keys:
      * date via ``dim_dates.full_date`` (``sale_date`` parsed as ``MM/dd/yyyy``);
      * customer and seller via their e-mail;
      * product via (product_name, supplier_id), the key ``dim_products`` is
        de-duplicated on. The supplier is found by ``supplier_email``;
      * store via (store_name, location), the key of ``dim_stores``.

    Matching product and store by name alone (as before) picked an arbitrary id
    whenever two suppliers sold a same-named product or two locations shared a
    store name. Supplier and location comparisons are null-safe, so rows stored
    with a null supplier/location still match.

    Sales whose date, customer, seller, product or store cannot be resolved are
    dropped (inner joins).

    NOTE(review): rows are appended without any de-duplication, so re-running
    the ETL duplicates facts.
    """
    def read_dim(table):
        return raw_df.sparkSession.read.jdbc(url=url, table=table, properties=props)

    dates = read_dim("dim_dates").select("full_date", "date_id").dropDuplicates(["full_date"])
    customers = (
        read_dim("dim_customers")
        .select(col("email").alias("customer_email"), "customer_id")
        .dropDuplicates(["customer_email"])
    )
    sellers = (
        read_dim("dim_sellers")
        .select(col("email").alias("seller_email"), "seller_id")
        .dropDuplicates(["seller_email"])
    )
    suppliers = (
        read_dim("dim_suppliers")
        .select("supplier_email", "supplier_id")
        .dropDuplicates(["supplier_email"])
    )
    # Dimension key columns are renamed so the composite joins below are unambiguous.
    products = (
        read_dim("dim_products")
        .select(
            col("product_name").alias("dim_product_name"),
            col("supplier_id").alias("dim_supplier_id"),
            "product_id",
        )
        .dropDuplicates(["dim_product_name", "dim_supplier_id"])
    )
    stores = (
        read_dim("dim_stores")
        .select(
            col("store_name").alias("dim_store_name"),
            col("location").alias("dim_location"),
            "store_id",
        )
        .dropDuplicates(["dim_store_name", "dim_location"])
    )

    facts_base = raw_df.select(
        to_date("sale_date", "MM/dd/yyyy").alias("full_date"),
        col("customer_email"),
        col("seller_email"),
        col("supplier_email"),
        col("product_name"),
        col("store_name"),
        col("store_location"),
        col("sale_quantity").alias("quantity"),
        col("sale_total_price").alias("total_price"),
    )

    # The supplier is only needed to pick the right product, so an unknown
    # supplier must not drop the sale by itself: left join here, null-safe match below.
    with_supplier = (
        facts_base
        .join(dates, ["full_date"], "inner")
        .join(customers, ["customer_email"], "inner")
        .join(sellers, ["seller_email"], "inner")
        .join(suppliers, ["supplier_email"], "left")
    )

    with_product = with_supplier.join(
        products,
        (with_supplier.product_name == products.dim_product_name)
        & with_supplier.supplier_id.eqNullSafe(products.dim_supplier_id),
        "inner",
    )

    joined = (
        with_product
        .join(
            stores,
            (with_product.store_name == stores.dim_store_name)
            & with_product.store_location.eqNullSafe(stores.dim_location),
            "inner",
        )
        .select(
            col("date_id"),
            col("customer_id"),
            col("seller_id"),
            col("product_id"),
            col("store_id"),
            col("quantity"),
            col("total_price")
        )
    )

    total_facts = joined.count()
    print(f"Будет вставлено {total_facts} фактов в fact_sales")

    if total_facts > 0:
        joined.write.jdbc(url=url, table="fact_sales", mode="append", properties=props)


def initialize_clickhouse_session():
    """Get or create a Spark session with the PostgreSQL and ClickHouse JDBC jars loaded."""
    jar_list = ",".join(["postgresql-42.6.0.jar", "clickhouse-jdbc-0.4.6.jar"])
    builder = SparkSession.builder.appName("Spark_ETL_Reports_ClickHouse")
    return builder.config("spark.jars", jar_list).getOrCreate()


def main():
    """Run the Postgres star-schema ETL.

    Steps: load ``mock_data``, refresh every dimension, then append
    ``fact_sales``.

    Connection parameters are read from the same ``DB_HOST``/``DB_PORT``/
    ``DB_NAME``/``DB_USER``/``DB_PASSWORD`` environment variables used for the
    psycopg2 connection in this notebook. The defaults equal the values that
    were hard-coded here. The Spark session is stopped even if a step fails.
    """
    import os

    db_host = os.getenv("DB_HOST", "postgres")
    db_port = os.getenv("DB_PORT", "5432")
    db_name = os.getenv("DB_NAME", "spark_db")
    postgres_url = f"jdbc:postgresql://{db_host}:{db_port}/{db_name}"
    postgres_props = {
        "user": os.getenv("DB_USER", "spark_user"),
        "password": os.getenv("DB_PASSWORD", "spark_password"),
        "driver": "org.postgresql.Driver"
    }

    pg_session = initialize_postgres_session()
    try:
        source_df = load_raw_data(pg_session, postgres_url, postgres_props)

        # Order matters: lookups first, then the dimensions that reference them,
        # and the fact table last.
        for step in (
            refresh_countries_dimension,
            refresh_cities_dimension,
            refresh_dates_dimension,
            refresh_simple_lookup,
            refresh_pets_dimension,
            refresh_suppliers_dimension,
            refresh_product_dimensions,
            refresh_customers_dimension,
            refresh_sellers_dimension,
            refresh_stores_dimension,
            refresh_fact_sales,
        ):
            step(source_df, postgres_url, postgres_props)
    finally:
        pg_session.stop()


if __name__ == "__main__":
    main()


In [None]:
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

def create_spark_session():
    """Return (creating it if needed) the Spark session for the ClickHouse reports.

    Both the PostgreSQL and ClickHouse JDBC driver jars are put on ``spark.jars``.
    """
    builder = SparkSession.builder.appName("ETL_Reports_ClickHouse")
    builder = builder.config("spark.jars", "postgresql-42.6.0.jar,clickhouse-jdbc-0.4.6.jar")
    return builder.getOrCreate()

def load_dimensions(spark, pg_url, pg_props):
    """Read the star-schema tables from PostgreSQL over JDBC.

    Returns:
        dict mapping each table name (``fact_sales`` and the ``dim_*`` tables
        used by the reports) to its DataFrame, in a fixed order.
    """
    table_names = (
        "fact_sales",
        "dim_products",
        "dim_customers",
        "dim_dates",
        "dim_stores",
        "dim_suppliers",
        "dim_countries",
        "dim_cities",
    )
    return {
        name: spark.read.jdbc(url=pg_url, table=name, properties=pg_props)
        for name in table_names
    }

def write_to_clickhouse(df, table_name, order_cols, ch_url, ch_props):
    """Overwrite ClickHouse table ``table_name`` with ``df`` and print its row count.

    The table is (re)created with ``ENGINE = MergeTree() ORDER BY (<order_cols>)``.
    An empty ``order_cols`` produces ``ORDER BY tuple()``, which is ClickHouse's
    spelling for "no sorting key"; ``ORDER BY ()`` is not valid there.

    ``df`` is persisted for the duration of the call, so the row count reuses
    the written data instead of re-running the whole upstream query. It is
    unpersisted afterwards, even on failure.

    Args:
        df: DataFrame to write.
        table_name: target ClickHouse table.
        order_cols: column names for the MergeTree sorting key.
        ch_url: ClickHouse JDBC URL.
        ch_props: JDBC options (driver, user, password, ...).
    """
    sort_key = f"({','.join(order_cols)})" if order_cols else "tuple()"
    cached = df.persist()
    try:
        cached.write.format("jdbc") \
            .option("url", ch_url) \
            .option("dbtable", table_name) \
            .options(**{
                **ch_props,
                "createTableOptions": f"ENGINE = MergeTree() ORDER BY {sort_key}"
            }) \
            .mode("overwrite") \
            .save()
        row_count = cached.count()
        print(f"{table_name}: {row_count} rows")
    finally:
        cached.unpersist()

def sales_by_product(dimensions, ch_url, ch_props):
    """Product reports written to ClickHouse.

    Reports:
      * ``top10_products``: top 10 products by units sold. Ties are broken by
        ``product_id`` so the same rows are chosen on every run.
      * ``revenue_by_category``: total revenue per ``category_id``.
      * ``rating_reviews``: per product, the average rating and the review count.

    ``rating`` and ``reviews`` come from ``dim_products`` and are repeated on
    every sale row after the join (assuming ``product_id`` is unique in
    ``dim_products``). ``reviews`` is therefore taken with ``max``. Summing it,
    as before, multiplied each product's review count by its number of sales.
    """
    fact_sales   = dimensions["fact_sales"]
    dim_products = dimensions["dim_products"]

    prod_metrics = (
        fact_sales
          .join(dim_products, "product_id")
          .select("product_id", "product_name", "category_id", "quantity", "total_price", "rating", "reviews")
    )

    top10_products = (
        prod_metrics
          .groupBy("product_id", "product_name")
          .agg(F.sum("quantity").alias("units_sold"))
          .orderBy(F.col("units_sold").desc(), F.col("product_id").asc())
          .limit(10)
    )

    revenue_by_category = (
        prod_metrics
          .groupBy("category_id")
          .agg(F.sum("total_price").alias("revenue"))
    )

    rating_reviews = (
        prod_metrics
          .groupBy("product_id", "product_name")
          .agg(
             F.avg("rating").alias("avg_rating"),
             # Per-product attribute repeated on each sale row: take it once.
             F.max("reviews").alias("total_reviews")
          )
    )

    write_to_clickhouse(top10_products,      "top10_products",      ["units_sold"],      ch_url, ch_props)
    write_to_clickhouse(revenue_by_category, "revenue_by_category", ["category_id"],     ch_url, ch_props)
    write_to_clickhouse(rating_reviews,      "rating_reviews",      ["product_id"],      ch_url, ch_props)

def sales_by_customer(dimensions, ch_url, ch_props):
    """Customer reports written to ClickHouse.

    Reports:
      * ``top10_customers``: the 10 customers with the highest total spend.
      * ``customers_by_country``: distinct buying customers per country.
      * ``avg_check_per_customer``: total price divided by number of sales, per customer.
    """
    sales = dimensions["fact_sales"]
    sales_with_customer = sales.join(dimensions["dim_customers"], "customer_id")

    top10_customers = (
        sales_with_customer
        .groupBy("customer_id", "first_name", "last_name")
        .agg(F.sum("total_price").alias("total_spent"))
        .orderBy(F.col("total_spent").desc())
        .limit(10)
    )

    customers_by_country = (
        sales_with_customer
        .join(dimensions["dim_countries"], ["country_id"], "left")
        .groupBy("country_name")
        .agg(F.countDistinct("customer_id").alias("unique_customers"))
    )

    avg_check_per_customer = sales.groupBy("customer_id").agg(
        (F.sum("total_price") / F.count("*")).alias("avg_check")
    )

    outputs = [
        (top10_customers, "top10_customers", ["total_spent"]),
        (customers_by_country, "customers_by_country", ["country_name"]),
        (avg_check_per_customer, "avg_check_per_customer", ["customer_id"]),
    ]
    for frame, table_name, order_cols in outputs:
        write_to_clickhouse(frame, table_name, order_cols, ch_url, ch_props)

def sales_by_time(dimensions, ch_url, ch_props):
    """Time-based reports written to ClickHouse.

    Reports:
      * ``monthly_trends``: revenue per (year, month) of the sale date.
      * ``yearly_revenue``: monthly revenue rolled up per year.
      * ``avg_order_size_by_month``: total price divided by number of sales,
        per (year, month).
    """
    calendar = dimensions["dim_dates"].withColumnRenamed("full_date", "sale_date")
    dated_sales = (
        dimensions["fact_sales"]
        .join(calendar, "date_id")
        .withColumn("year", F.year("sale_date"))
        .withColumn("month", F.month("sale_date"))
    )
    per_month = dated_sales.groupBy("year", "month")

    monthly_trends = per_month.agg(F.sum("total_price").alias("revenue"))
    yearly_revenue = monthly_trends.groupBy("year").agg(F.sum("revenue").alias("yearly_revenue"))
    avg_order_size_by_month = per_month.agg(
        (F.sum("total_price") / F.count("*")).alias("avg_order_size")
    )

    outputs = [
        (monthly_trends, "monthly_trends", ["year", "month"]),
        (yearly_revenue, "yearly_revenue", ["year"]),
        (avg_order_size_by_month, "avg_order_size_by_month", ["year", "month"]),
    ]
    for frame, table_name, order_cols in outputs:
        write_to_clickhouse(frame, table_name, order_cols, ch_url, ch_props)

def sales_by_store(dimensions, ch_url, ch_props):
    """Store reports written to ClickHouse.

    Reports:
      * ``top5_stores``: the 5 stores with the highest revenue.
      * ``sales_by_city_country``: revenue per (city, country) of the store,
        via left joins on the city and country dimensions.
      * ``avg_check_per_store``: total price divided by number of sales, per store.
    """
    sales = dimensions["fact_sales"]
    store_sales = sales.join(dimensions["dim_stores"], "store_id")

    top5_stores = (
        store_sales
        .groupBy("store_id", "store_name")
        .agg(F.sum("total_price").alias("revenue"))
        .orderBy(F.col("revenue").desc())
        .limit(5)
    )

    sales_by_city_country = (
        store_sales
        .join(dimensions["dim_cities"], ["city_id"], "left")
        .join(dimensions["dim_countries"], ["country_id"], "left")
        .groupBy("city_name", "country_name")
        .agg(F.sum("total_price").alias("revenue"))
    )

    avg_check_per_store = sales.groupBy("store_id").agg(
        (F.sum("total_price") / F.count("*")).alias("avg_check")
    )

    outputs = [
        (top5_stores, "top5_stores", ["revenue"]),
        (sales_by_city_country, "sales_by_city_country", ["city_name"]),
        (avg_check_per_store, "avg_check_per_store", ["store_id"]),
    ]
    for frame, table_name, order_cols in outputs:
        write_to_clickhouse(frame, table_name, order_cols, ch_url, ch_props)

def sales_by_supplier(dimensions, ch_url, ch_props):
    """Supplier reports written to ClickHouse.

    Sales are linked to suppliers through ``dim_products.supplier_id``.

    Reports:
      * ``top5_suppliers``: the 5 suppliers with the highest revenue.
      * ``avg_price_by_supplier``: total revenue divided by total units, per supplier.
      * ``sales_by_supplier_country``: revenue per supplier country.
    """
    product_supplier = dimensions["dim_products"].select("product_id", "supplier_id")
    supplier_sales = dimensions["fact_sales"].join(product_supplier, "product_id")
    per_supplier = supplier_sales.groupBy("supplier_id")

    top5_suppliers = (
        per_supplier
        .agg(F.sum("total_price").alias("revenue"))
        .orderBy(F.col("revenue").desc())
        .limit(5)
    )

    avg_price_by_supplier = per_supplier.agg(
        (F.sum("total_price") / F.sum("quantity")).alias("avg_price")
    )

    sales_by_supplier_country = (
        supplier_sales
        .join(dimensions["dim_suppliers"], "supplier_id")
        .join(dimensions["dim_countries"], ["country_id"], "left")
        .groupBy("country_name")
        .agg(F.sum("total_price").alias("revenue"))
    )

    outputs = [
        (top5_suppliers, "top5_suppliers", ["revenue"]),
        (avg_price_by_supplier, "avg_price_by_supplier", ["supplier_id"]),
        (sales_by_supplier_country, "sales_by_supplier_country", ["country_name"]),
    ]
    for frame, table_name, order_cols in outputs:
        write_to_clickhouse(frame, table_name, order_cols, ch_url, ch_props)

def product_quality(dimensions, ch_url, ch_props):
    """Product-quality reports written to ClickHouse, plus a printed correlation.

    All metrics are computed on one row per sold product: ``rating`` and
    ``reviews`` from ``dim_products``, with ``quantity`` summed over the
    product's sales (units sold). Products without sales are not included.

    Reports:
      * ``highest_rating`` / ``lowest_rating``: the single product with the
        highest / lowest non-null rating. Ties are broken by ``product_id``.
      * ``most_reviewed``: the 10 products with the most reviews.
      * Printed: Pearson correlation between rating and units sold.

    The previous version worked on raw sale rows. It returned an arbitrary sale
    (with that sale's quantity) as the top/bottom product, ranked null ratings
    first for "lowest", summed ``reviews`` once per sale (inflating it), and
    correlated rating with per-sale quantity instead of units sold.
    """
    fact_sales   = dimensions["fact_sales"]
    dim_products = dimensions["dim_products"]

    # One row per product; column order matches the previous report schema.
    quality = (
        fact_sales
          .join(dim_products, "product_id")
          .groupBy("product_id", "product_name", "rating", "reviews")
          .agg(F.sum("quantity").alias("quantity"))
    )

    rated = quality.where(F.col("rating").isNotNull())

    highest_rating = (
        rated
          .orderBy(F.col("rating").desc(), F.col("product_id").asc())
          .limit(1)
    )

    lowest_rating = (
        rated
          .orderBy(F.col("rating").asc(), F.col("product_id").asc())
          .limit(1)
    )

    corr_val = quality.stat.corr("rating", "quantity")

    most_reviewed = (
        quality
          .select("product_id", "product_name", F.col("reviews").alias("total_reviews"))
          .orderBy(F.col("total_reviews").desc(), F.col("product_id").asc())
          .limit(10)
    )

    write_to_clickhouse(highest_rating, "highest_rating", ["rating"],        ch_url, ch_props)
    write_to_clickhouse(lowest_rating,  "lowest_rating",  ["rating"],        ch_url, ch_props)
    write_to_clickhouse(most_reviewed,  "most_reviewed",  ["total_reviews"], ch_url, ch_props)

    print(f"rating↔units_sold correlation: {corr_val:.4f}")


import os

# Report driver: read the star schema from PostgreSQL and write every report to ClickHouse.
# Connection settings can be overridden with environment variables. PostgreSQL uses the
# same DB_* names as the psycopg2 connection in this notebook; ClickHouse uses CH_HOST,
# CH_PORT, CH_DB, CH_USER and CH_PASSWORD. The defaults are the values previously
# hard-coded here.
spark = create_spark_session()
pg_url = "jdbc:postgresql://{host}:{port}/{db}".format(
    host=os.getenv("DB_HOST", "postgres"),
    port=os.getenv("DB_PORT", "5432"),
    db=os.getenv("DB_NAME", "spark_db"),
)
pg_props = {
    "user": os.getenv("DB_USER", "spark_user"),
    "password": os.getenv("DB_PASSWORD", "spark_password"),
    "driver": "org.postgresql.Driver"
}
ch_url = "jdbc:clickhouse://{host}:{port}/{db}".format(
    host=os.getenv("CH_HOST", "clickhouse"),
    port=os.getenv("CH_PORT", "8123"),
    db=os.getenv("CH_DB", "default"),
)
ch_props = {
    "driver":   "com.clickhouse.jdbc.ClickHouseDriver",
    "user":     os.getenv("CH_USER", "custom_user"),
    "password": os.getenv("CH_PASSWORD", "custom_password"),
}

# Stop the Spark session even if a report fails, so the JVM/driver is not leaked.
try:
    dims = load_dimensions(spark, pg_url, pg_props)
    for report in (
        sales_by_product,
        sales_by_customer,
        sales_by_time,
        sales_by_store,
        sales_by_supplier,
        product_quality,
    ):
        report(dims, ch_url, ch_props)
finally:
    spark.stop()
