In [None]:
!pip install clickhouse-driver

In [None]:
import math
import os

from clickhouse_driver import Client
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import (
    avg,
    col,
    count,
    desc,
    lag,
    lit,
    month,
    round as _round,
    sum as _sum,
    year,
)

In [None]:
# Native-protocol ClickHouse connection; later cells use it to run the DDL statements.
clickhouse_conn_args = {
    "host": "clickhouse-server",  # service name from docker-compose
    "port": 9000,                 # native TCP port
    "user": "default",
    "password": "",               # empty: for a server without a password
}
client = Client(**clickhouse_conn_args)

In [None]:
# Maven coordinates handed to Spark through `spark.jars.packages`.
packages = [
    "com.clickhouse.spark:clickhouse-spark-runtime-3.4_2.12:0.8.0",
    "com.clickhouse:clickhouse-client:0.7.0",
    "com.clickhouse:clickhouse-http-client:0.7.0",
    "org.apache.httpcomponents.client5:httpclient5:5.2.1",
]

# Session options, applied in insertion order: driver networking first,
# then the `ClickHouse1` catalog that the `writeTo(...)` cells address.
spark_options = {
    "spark.jars.packages": ",".join(packages),
    "spark.driver.bindAddress": "0.0.0.0",
    "spark.driver.port": "7078",
    "spark.blockManager.port": "7079",
    "spark.sql.catalog.ClickHouse1": "com.clickhouse.spark.ClickHouseCatalog",
    "spark.sql.catalog.ClickHouse1.host": "clickhouse-server",
    "spark.sql.catalog.ClickHouse1.http_port": "8123",
}

session_builder = (
    SparkSession.builder
      .master("spark://spark-master:7077")
      .appName("ToClickHouseViaConnector")
)
for option_key, option_value in spark_options.items():
    session_builder = session_builder.config(option_key, option_value)

spark = session_builder.getOrCreate()

In [None]:
# Report the master URL the active SparkContext is attached to.
master_url = spark.sparkContext.master
print("Master URL    :", master_url)

In [None]:
# --- PostgreSQL source (read through Spark JDBC) ---
jdbc_url_pg = "jdbc:postgresql://postgres_db:5432/abd_lr2"
jdbc_props_pg = {
    "user": "postgres",
    # NOTE(review): the literal fallback keeps the notebook behaving as before when
    # PG_PASSWORD is not set, but the secret should be removed from the notebook
    # once the environment variable is provisioned.
    "password": os.environ.get("PG_PASSWORD", "14LgiN"),
    "driver": "org.postgresql.Driver"
}

# --- ClickHouse over JDBC ---
# The JDBC driver speaks HTTP, so it must target the HTTP port (8123, the same one
# the Spark catalog is configured with). Port 9000 is the native TCP port and is
# only valid for the clickhouse_driver `Client`.
ch_url = "jdbc:clickhouse://clickhouse-server:8123/default"
ch_props = {
    "user": "default",
    "password": "",
    "driver": "com.clickhouse.jdbc.ClickHouseDriver"
}

In [None]:
# DDL for every target table, in creation order. Each statement uses
# CREATE TABLE IF NOT EXISTS, so re-running the cell leaves existing tables untouched.
ddl_statements = (
    # per-product sales, rating and category revenue
    """
CREATE TABLE IF NOT EXISTS default.product_stats (
    product_id          UInt32,
    total_sold          UInt64,
    avg_rating          Float32,
    total_reviews       UInt64,
    product_category_id UInt32,
    total_revenue       Float64
) ENGINE = MergeTree()
  ORDER BY (product_id)
""",
    # per-client spending with country head-count
    """
CREATE TABLE IF NOT EXISTS default.client_stats (
    client_id           UInt32,
    total_spent         Float64,
    avg_check           Float64,
    country_id          UInt32,
    clients_in_country  UInt64
) ENGINE = MergeTree()
  ORDER BY (country_id, client_id)
""",
    # yearly and monthly revenue trends
    """
CREATE TABLE IF NOT EXISTS default.sales_trends (
    period                String,
    total_revenue         Float64,
    prev_period_revenue   Float64,
    revenue_change_pct    Float32,
    avg_order_price        Float64,
    is_monthly            UInt8
) ENGINE = MergeTree()
  ORDER BY (period, is_monthly)
""",
    # per-store revenue with city and country totals
    """
CREATE TABLE IF NOT EXISTS default.store_stats (
    store_id         UInt32,
    city_id          UInt32,
    country_id       UInt32,
    total_revenue    Float64,
    city_revenue     Float64,
    country_revenue  Float64,
    avg_check        Float64
) ENGINE = MergeTree()
  ORDER BY total_revenue
""",
    # per-supplier revenue and average product price
    """
CREATE TABLE IF NOT EXISTS default.supplier_stats (
    supplier_name_id UInt32,
    supplier_name    String,
    country_name     String,
    total_revenue    Float64,
    avg_product_price Float64
) ENGINE = MergeTree()
  ORDER BY total_revenue
""",
    # per-product rating figures plus the rating/sales correlation
    """
CREATE TABLE IF NOT EXISTS default.product_rating_stats (
    product_id           UInt32,
    avg_rating           Float32,
    total_reviews        UInt64,
    total_sold           UInt64,
    rating_sales_corr    Float32
) ENGINE = MergeTree()
  ORDER BY product_id
""",
)

for ddl in ddl_statements:
    client.execute(ddl)

In [None]:
# Product statistics: units sold, rating, review count, category and the total
# revenue of that category, one row per product that has at least one sale.
sales_df    = spark.read.jdbc(jdbc_url_pg, "public.sales",    properties=jdbc_props_pg)
products_df = spark.read.jdbc(jdbc_url_pg, "public.products", properties=jdbc_props_pg)

# Units sold per product; the key is renamed so every later join can be name-based.
prod_sales = (
    sales_df
      .groupBy("sale_product_id")
      .agg(_sum("sale_quantity").alias("total_sold"))
      .withColumnRenamed("sale_product_id", "product_id")
)

# product -> category lookup
prod_cat = products_df.select(
    col("product_id"), col("product_category_id")
)

# Revenue summed over all sales of products in each category.
revenue_by_cat = (
    sales_df
      .join(prod_cat,
            sales_df.sale_product_id == prod_cat.product_id,
            how="inner")
      .groupBy("product_category_id")
      .agg(_sum("sale_total_price").alias("total_revenue"))
)

rating_reviews = (
    products_df
      .groupBy("product_id")
      .agg(
         avg("product_rating").alias("avg_rating"),
         _sum("product_reviews").alias("total_reviews")
      )
)

# The category lookup must carry `product_id`: joining against a frame that only
# has `product_category_id` leaves nothing on the right side to match on, so the
# join cannot pair each product with its own category.
# Joining on column names also avoids ambiguous references between frames that
# all derive from `products_df`.
stats_df = (
    prod_sales
      .join(rating_reviews, "product_id", how="left")
      .join(prod_cat, "product_id", how="left")
      .join(revenue_by_cat, "product_category_id", how="left")
      .select(
         col("product_id"),
         col("total_sold"),
         col("avg_rating"),
         col("total_reviews"),
         col("product_category_id"),
         col("total_revenue")
      )
      .orderBy(desc("total_sold"))
)

In [None]:
# Append the product statistics to ClickHouse through the `ClickHouse1` catalog.
product_stats_writer = stats_df.writeTo("ClickHouse1.default.product_stats")
product_stats_writer.append()

In [None]:
# Client statistics: total spend, average check, country and the number of
# customers registered in that country, one row per client that has a sale.
sales_df     = spark.read.jdbc(jdbc_url_pg, "public.sales",     properties=jdbc_props_pg)
customers_df = spark.read.jdbc(jdbc_url_pg, "public.customers", properties=jdbc_props_pg)

# The average check is the mean value of a sale row, matching how `avg_check` is
# computed for stores and `avg_order_price` for the sales trends. Dividing by the
# summed item quantity would give a per-item price instead.
client_agg = (
    sales_df
      .groupBy("sale_customer_id")
      .agg(
          _sum("sale_total_price").alias("total_spent"),
          avg("sale_total_price").alias("avg_check")
      )
      .withColumnRenamed("sale_customer_id", "client_id")
)

# client -> country lookup, keyed by the same column name as `client_agg`
customer_countries = customers_df.select(
    col("customer_id").alias("client_id"),
    col("customer_country_id")
)

client_with_country = (
    client_agg
      .join(customer_countries, "client_id", how="left")
      .select(
          "client_id", "total_spent", "avg_check", "customer_country_id"
      )
)

# Head-count over all customers of a country, not only those with sales.
country_counts = (
    customers_df
      .groupBy("customer_country_id")
      .agg(count("*").alias("clients_in_country"))
)

stats_df = (
    client_with_country
      .join(country_counts, "customer_country_id", how="left")
      .select(
          "client_id",
          "total_spent",
          "avg_check",
          col("customer_country_id").alias("country_id"),
          col("clients_in_country")
      )
      .orderBy(desc("total_spent"))
)

In [None]:
# Append the client statistics to ClickHouse through the `ClickHouse1` catalog.
client_stats_writer = stats_df.writeTo("ClickHouse1.default.client_stats")
client_stats_writer.append()

In [None]:
# Sales trends: revenue, previous-period revenue, percentage change and average
# order price, computed per year and per month and stacked into one frame.

# The chain is wrapped in parentheses so the `.withColumn` lines are valid
# continuations of the read expression.
sales_df = (
    spark.read.jdbc(url=jdbc_url_pg, table="public.sales", properties=jdbc_props_pg)
      .withColumn("year",  year("sale_date"))
      .withColumn("month", month("sale_date"))
)


def _aggregate_revenue(grouped):
    """Return total revenue and rounded mean sale value for a grouped sales frame."""
    return grouped.agg(
        _sum("sale_total_price").alias("total_revenue"),
        _round(_sum("sale_total_price") / count("*"), 2).alias("avg_order_price")
    )


def _add_trend_columns(period_df, *order_cols):
    """Add `prev_period_revenue` and `revenue_change_pct`, ordering periods by `order_cols`.

    The earliest period has no predecessor, so both added columns are null there.
    """
    # The window is unpartitioned: it runs over the already aggregated rows only.
    previous_revenue = lag("total_revenue", 1).over(Window.orderBy(*order_cols))
    return (
        period_df
          .withColumn("prev_period_revenue", previous_revenue)
          .withColumn(
              "revenue_change_pct",
              _round(
                (col("total_revenue") - col("prev_period_revenue"))
                / col("prev_period_revenue") * 100,
              2)
          )
    )


TREND_COLUMNS = [
    "period", "total_revenue", "prev_period_revenue",
    "revenue_change_pct", "avg_order_price",
]

yearly = (
    _add_trend_columns(_aggregate_revenue(sales_df.groupBy("year")), "year")
      .withColumn("period", col("year").cast("string"))                        # e.g. "2025"
)

monthly = (
    _add_trend_columns(_aggregate_revenue(sales_df.groupBy("year", "month")), "year", "month")
      .withColumn("period", (col("year") * 100 + col("month")).cast("string"))  # e.g. "202507"
)

# `is_monthly` is a 0/1 flag. "UInt8" is a ClickHouse type name, not a Spark one,
# so the flag is cast to Spark's `short`, which the connector maps to UInt8.
yearly_std = yearly.select(
    *TREND_COLUMNS,
    lit(0).cast("short").alias("is_monthly")      # 0 = yearly
)

monthly_std = monthly.select(
    *TREND_COLUMNS,
    lit(1).cast("short").alias("is_monthly")      # 1 = monthly
)

stats_df = yearly_std.unionByName(monthly_std)

In [None]:
# Append the yearly and monthly trend rows to ClickHouse through the `ClickHouse1` catalog.
sales_trends_writer = stats_df.writeTo("ClickHouse1.default.sales_trends")
sales_trends_writer.append()

In [None]:
# Store statistics: revenue and average check per store, plus the total revenue
# of the store's city and country.
sales_df  = spark.read.jdbc(jdbc_url_pg, "public.sales",  properties=jdbc_props_pg)
stores_df = spark.read.jdbc(jdbc_url_pg, "public.stores", properties=jdbc_props_pg)

store_agg = (
    sales_df
      .groupBy("sale_store_id")
      .agg(
          _sum("sale_total_price").alias("total_revenue"),
          avg("sale_total_price").alias("avg_check")
      )
      .withColumnRenamed("sale_store_id", "store_id")
)

# store -> city / country lookup, already using the output column names
store_dims = stores_df.select(
    col("store_id"),
    col("store_city_id").alias("city_id"),
    col("store_country_id").alias("country_id")
)

# Every sale annotated with the city and country of its store.
sales_with_dims = sales_df.join(
    store_dims, sales_df.sale_store_id == store_dims.store_id, how="inner"
)

city_totals = (
    sales_with_dims
      .groupBy("city_id")
      .agg(_sum("sale_total_price").alias("city_revenue"))
)
# The country total needs its own column name; reusing "city_revenue" would leave
# `country_revenue` undefined and make `city_revenue` ambiguous after the joins.
country_totals = (
    sales_with_dims
      .groupBy("country_id")
      .agg(_sum("sale_total_price").alias("country_revenue"))
)

# All joins are keyed on columns of the frame being built here, so the cell does
# not depend on a `stats_df` left over from an earlier cell.
stats_df = (
    store_agg
      .join(store_dims,     "store_id",   how="left")
      .join(city_totals,    "city_id",    how="left")
      .join(country_totals, "country_id", how="left")
      .select(
          "store_id",
          "city_id",
          "country_id",
          col("total_revenue"),
          col("city_revenue"),
          col("country_revenue"),
          col("avg_check")
      )
      .orderBy(desc("total_revenue"))
)

In [None]:
# Append the store statistics to ClickHouse through the `ClickHouse1` catalog.
store_stats_writer = stats_df.writeTo("ClickHouse1.default.store_stats")
store_stats_writer.append()

In [None]:
# Supplier statistics: revenue per supplier name and country, plus the mean
# price of the products appearing in that supplier's sales.
sales_df          = spark.read.jdbc(jdbc_url_pg, "public.sales",     properties=jdbc_props_pg)
suppliers_df      = spark.read.jdbc(jdbc_url_pg, "public.suppliers", properties=jdbc_props_pg)
supplier_names_df = spark.read.jdbc(jdbc_url_pg, "public.supplier_names", properties=jdbc_props_pg)
products_df       = spark.read.jdbc(jdbc_url_pg, "public.products",  properties=jdbc_props_pg)
countries_df      = spark.read.jdbc(jdbc_url_pg, "public.countries", properties=jdbc_props_pg)

# Sales joined to their supplier contact, then to the supplier name dictionary.
sales_with_suppliers = sales_df.join(
    suppliers_df,
    on=(sales_df.sale_supplier_contact_id == suppliers_df.supplier_contact_id),
    how="inner",
)
base = sales_with_suppliers.join(
    supplier_names_df, on="supplier_name_id", how="inner"
).select(
    col("supplier_name_id"),
    col("supplier_name_description").alias("supplier_name"),
    suppliers_df.supplier_country_id.alias("country_id"),
    col("sale_product_id"),
    col("sale_total_price"),
)

# Revenue per (supplier, country), with the country name attached.
revenue_per_supplier = base.groupBy(
    "supplier_name_id", "supplier_name", "country_id"
).agg(_sum("sale_total_price").alias("total_revenue"))
country_names = countries_df.select("country_id", "country_name")
rev = revenue_per_supplier.join(country_names, on="country_id", how="left")

# Mean product price over the supplier's sale rows.
product_prices = products_df.select("product_id", "product_price")
avg_price = (
    base.join(product_prices, base.sale_product_id == product_prices.product_id)
        .groupBy("supplier_name_id")
        .agg(avg("product_price").alias("avg_product_price"))
)

stats_df = (
    rev.join(avg_price, on="supplier_name_id", how="left")
       .select(
           "supplier_name_id",
           "supplier_name",
           "country_name",
           col("total_revenue"),
           col("avg_product_price"),
       )
       .orderBy(desc("total_revenue"))
)

In [None]:
# Append the supplier statistics to ClickHouse through the `ClickHouse1` catalog.
supplier_stats_writer = stats_df.writeTo("ClickHouse1.default.supplier_stats")
supplier_stats_writer.append()

In [None]:
# Product rating statistics: rating, review count and units sold per product,
# plus the rating/sales correlation repeated as a constant on every row.
products_df = spark.read.jdbc(jdbc_url_pg, "public.products", properties=jdbc_props_pg)
sales_df    = spark.read.jdbc(jdbc_url_pg, "public.sales",    properties=jdbc_props_pg)

ratings = products_df.select(
    col("product_id"),
    col("product_rating").alias("avg_rating"),
    col("product_reviews").alias("total_reviews")
)

sales_agg = (
    sales_df.groupBy("sale_product_id")
      .agg(_sum("sale_quantity").alias("total_sold"))
      .withColumnRenamed("sale_product_id", "product_id")
)

# Products without any sale are kept and counted as zero units sold.
df = (
    ratings.join(sales_agg, "product_id", how="left")
      .na.fill({"total_sold": 0})
)

# The correlation can come back as NaN (e.g. when a column is constant).
# NaN is truthy, so `value or 0.0` would not replace it; test for it explicitly.
corr_val = df.stat.corr("avg_rating", "total_sold")
if corr_val is None or math.isnan(corr_val):
    corr_val = 0.0

# Attach the scalar as a literal column. Deriving it from `avg_rating` would turn
# the value into null on every row whose rating is null.
df_final = df.withColumn("rating_sales_corr", lit(float(corr_val)))

# Convenience orderings for interactive inspection; they are not written anywhere.
df_sorted = df_final.orderBy(col("avg_rating").desc())
df_top_reviews = df_final.orderBy(col("total_reviews").desc())

In [None]:
# `df_final` carries the product_rating_stats columns (product_id, avg_rating,
# total_reviews, total_sold, rating_sales_corr), so it is appended to that table;
# supplier_stats has a different schema and is filled by the supplier cell.
df_final.writeTo("ClickHouse1.default.product_rating_stats").append()

In [None]:
# Release both external resources opened by the notebook: the native ClickHouse
# connection used for the DDL, then the Spark session.
client.disconnect()
spark.stop()