In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum as _sum, year, month, dayofmonth, quarter, row_number, desc, first, corr, count, lit, concat_ws, avg
from pyspark.sql.window import Window

# Работа с ETL процессами

## 1. Подключение с помощью PySpark и проверка данных из `mock_data`

Нужно преобразовать данные из Postgres `mock_data` в `снежинку`.

In [2]:
# Инициализация SparkSession с драйвером PostgreSQL
spark = SparkSession.builder \
    .master("spark://spark-master:7077") \
    .appName("ETL to Star") \
    .getOrCreate()

# Чтение данных из PostgreSQL
pg_url = "jdbc:postgresql://postgres:5432/bober_db"
pg_properties = {"user": "bober", "password": "bober", "driver": "org.postgresql.Driver"}
df = spark.read.jdbc(url=pg_url, table="mock_data", properties=pg_properties)

# Проверка чтения данных
df.head(1)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/11/18 13:41:38 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/11/18 13:41:40 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

[Row(id=1, customer_first_name='Barron', customer_last_name='Rawlyns', customer_age=61, customer_email='bmassingham0@army.mil', customer_country='China', customer_postal_code=None, customer_pet_type='cat', customer_pet_name='Priscella', customer_pet_breed='Labrador Retriever', seller_first_name='Bevan', seller_last_name='Massingham', seller_email='bmassingham0@answers.com', seller_country='Indonesia', seller_postal_code=None, product_name='Dog Food', product_category='Food', product_price=Decimal('77.97'), product_quantity=89, sale_date=datetime.date(2021, 5, 14), sale_customer_id=1, sale_seller_id=1, sale_product_id=1, sale_quantity=4, sale_total_price=Decimal('487.70'), store_name='Youopia', store_location='Suite 75', store_city='Xichehe', store_state=None, store_country='United States', store_phone='564-244-8660', store_email='bmassingham0@networkadvertising.org', pet_category='Cats', product_weight=Decimal('13.40'), product_color='Indigo', product_size='Medium', product_brand='Skaj

## 2. Создаем модель данных снежинку

In [None]:
# ===================================================================
# 1. dim_date (surrogate key — date_id)
# ===================================================================
dim_date = df.select(col("sale_date").alias("full_date")) \
    .distinct() \
    .filter(col("full_date").isNotNull()) \
    .withColumn("date_id", row_number().over(Window.orderBy("full_date"))) \
    .withColumn("year", year("full_date")) \
    .withColumn("month", month("full_date")) \
    .withColumn("day", dayofmonth("full_date")) \
    .withColumn("quarter", quarter("full_date"))

dim_date.write.jdbc(url=pg_url, table="dim_date", mode="overwrite", properties=pg_properties)

# ===================================================================
# 2. dim_customer (natural key — sale_customer_id, предполагаем уникальность)
# ===================================================================
dim_customer = df.select(
    col("sale_customer_id").alias("customer_id"),
    col("customer_first_name").alias("first_name"),
    col("customer_last_name").alias("last_name"),
    col("customer_age").alias("age"),
    col("customer_email").alias("email"),
    col("customer_country").alias("country"),
    col("customer_postal_code").alias("postal_code")
).distinct()

dim_customer.write.jdbc(url=pg_url, table="dim_customer", mode="overwrite", properties=pg_properties)

# ===================================================================
# 3. dim_seller (natural key — sale_seller_id)
# ===================================================================
dim_seller = df.select(
    col("sale_seller_id").alias("seller_id"),
    col("seller_first_name").alias("first_name"),
    col("seller_last_name").alias("last_name"),
    col("seller_email").alias("email"),
    col("seller_country").alias("country"),
    col("seller_postal_code").alias("postal_code")
).distinct()

dim_seller.write.jdbc(url=pg_url, table="dim_seller", mode="overwrite", properties=pg_properties)

# ===================================================================
# 4. dim_product (natural key — sale_product_id)
# ===================================================================
dim_product = df.select(
    col("sale_product_id").alias("product_id"),
    col("product_name").alias("name"),
    col("product_category").alias("category"),          # или pet_category — выбирай то, что подходит
    col("product_price").alias("price"),
    col("product_weight").alias("weight"),
    col("product_color").alias("color"),
    col("product_size").alias("size"),
    col("product_brand").alias("brand"),
    col("product_material").alias("material"),
    col("product_description").alias("description"),
    col("product_rating").alias("rating"),
    col("product_reviews").alias("reviews"),
    col("product_release_date").alias("release_date"),
    col("product_expiry_date").alias("expiry_date")
).distinct()

dim_product.write.jdbc(url=pg_url, table="dim_product", mode="overwrite", properties=pg_properties)

# ===================================================================
# 5. dim_store (surrogate key)
# ===================================================================
dim_store_raw = df.select(
    "store_name", "store_location", "store_city",
    "store_state", "store_country", "store_phone", "store_email"
).distinct()

store_window = Window.orderBy("store_name", "store_city", "store_country")
dim_store = dim_store_raw.withColumn("store_id", row_number().over(store_window))
dim_store.write.jdbc(url=pg_url, table="dim_store", mode="overwrite", properties=pg_properties)

# ===================================================================
# 6. dim_supplier (surrogate key)
# ===================================================================
supplier_window = Window.orderBy("supplier_name", "supplier_city", "supplier_country")
dim_supplier = df.select(
    "supplier_name",
    col("supplier_contact").alias("contact"),
    "supplier_email",
    "supplier_phone",
    "supplier_address",
    "supplier_city",
    "supplier_country"
).distinct() \
    .withColumn("supplier_id", row_number().over(supplier_window))

dim_supplier.write.jdbc(url=pg_url, table="dim_supplier", mode="overwrite", properties=pg_properties)

# ===================================================================
# 7. dim_pet (surrogate key, привязка к клиенту)
# ===================================================================
dim_pet_raw = df.select(
    col("sale_customer_id").alias("customer_id"),
    col("customer_pet_type").alias("pet_type"),
    col("customer_pet_name").alias("pet_name"),
    col("customer_pet_breed").alias("pet_breed"),
    col("pet_category").alias("pet_category")        # или просто "category"
).distinct()

# Окно определяем ПОСЛЕ select + alias, чтобы использовать новые имена колонок
pet_window = Window.orderBy("customer_id", "pet_name", "pet_type")
dim_pet = dim_pet_raw.withColumn("pet_id", row_number().over(pet_window))
dim_pet.write.jdbc(url=pg_url, table="dim_pet", mode="overwrite", properties=pg_properties)

# ===================================================================
# 8. fact_sales — собираем всё вместе
# ===================================================================
fact_sales = df \
    .join(dim_date, df.sale_date == dim_date.full_date, "left") \
    .join(dim_store, 
          (df.store_name == dim_store.store_name) &
          (df.store_location == dim_store.store_location) &
          (df.store_city == dim_store.store_city) &
          (df.store_state == dim_store.store_state) &
          (df.store_country == dim_store.store_country) &
          (df.store_phone == dim_store.store_phone) &
          (df.store_email == dim_store.store_email), "left") \
    .join(dim_supplier,
          (df.supplier_name == dim_supplier.supplier_name) &
          (df.supplier_city == dim_supplier.supplier_city) &
          (df.supplier_country == dim_supplier.supplier_country), "left") \
    .join(dim_pet,
          (df.sale_customer_id == dim_pet.customer_id) &
          (df.customer_pet_name == dim_pet.pet_name) &
          (df.customer_pet_type == dim_pet.pet_type), "left") \
    .select(
        col("id").alias("sale_id"),
        col("sale_customer_id").alias("customer_id"),
        col("pet_id"),
        col("sale_seller_id").alias("seller_id"),
        col("sale_product_id").alias("product_id"),
        col("store_id"),
        col("supplier_id"),
        col("date_id"),
        col("sale_quantity").alias("sale_quantity"),
        col("sale_total_price").alias("sale_total_price")
    )

fact_sales.write.jdbc(url=pg_url, table="fact_sales", mode="overwrite", properties=pg_properties)

print("Звёздная схема успешно построена! Проверить можно в DBeaver: SELECT * FROM fact_sales LIMIT 5;")


## 3. Создание витрин в clickhouse

In [3]:
# Инициализация SparkSession с драйвером ClickHouse
# spark = SparkSession.builder \
#     .master("spark://spark-master:7077") \
#     .appName("ETL to Star") \
#     .config("spark.jars", "/opt/spark/jars/clickhouse-jdbc-0.6.0.jar") \
#     .getOrCreate()
    
# spark = SparkSession.builder \
#     .master("spark://spark-master:7077") \
#     .appName("Spark_and_ClickHouse") \
#     .getOrCreate()

spark = SparkSession.builder \
    .appName("ClickHouse ETL") \
    .config("spark.jars",
            "/opt/spark/jars/clickhouse-jdbc-0.6.0.jar,"
            "/opt/spark/jars/clickhouse-spark-connector_2.12-0.8.0.jar") \
    .getOrCreate()

# Настройки подключения к БД и Spark
ch_url = "jdbc:clickhouse://clickhouse:8123/default"
ch_options = {
    "host": "clickhouse",
    "port": "8123",
    "user": "default",
    "password": "",
    "database": "default"
}
# ch_properties = {
#     "driver": "com.clickhouse.jdbc.ClickHouseDriver",
#     "user": "default",
#     "password": ""
# }

25/11/18 13:42:20 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [4]:
# Загружаем все таблицы звезды
fact = spark.read.jdbc(url=pg_url, table="fact_sales", properties=pg_properties)
dim_product = spark.read.jdbc(url=pg_url, table="dim_product", properties=pg_properties)
dim_customer = spark.read.jdbc(url=pg_url, table="dim_customer", properties=pg_properties)
dim_store = spark.read.jdbc(url=pg_url, table="dim_store", properties=pg_properties)
dim_supplier = spark.read.jdbc(url=pg_url, table="dim_supplier", properties=pg_properties)
dim_date = spark.read.jdbc(url=pg_url, table="dim_date", properties=pg_properties)
dim_date.head(1)

In [9]:
dim_date.head(1)

[Row(full_date=datetime.date(2021, 1, 1), date_id=1, year=2021, month=1, day=1, quarter=1)]

In [13]:
# Проверим какие колонки есть в каждой таблице
print("Колонки fact_sales:", fact.columns)
print("Колонки dim_product:", dim_product.columns)
print("Колонки dim_customer:", dim_customer.columns)
print("Колонки dim_store:", dim_store.columns)
print("Колонки dim_supplier:", dim_supplier.columns)
print("Колонки dim_date:", dim_date.columns)

Колонки fact_sales: ['sale_id', 'customer_id', 'pet_id', 'seller_id', 'product_id', 'store_id', 'supplier_id', 'date_id', 'sale_quantity', 'sale_total_price']
Колонки dim_product: ['product_id', 'name', 'category', 'price', 'weight', 'color', 'size', 'brand', 'material', 'description', 'rating', 'reviews', 'release_date', 'expiry_date']
Колонки dim_customer: ['customer_id', 'first_name', 'last_name', 'age', 'email', 'country', 'postal_code']
Колонки dim_store: ['store_name', 'store_location', 'store_city', 'store_state', 'store_country', 'store_phone', 'store_email', 'store_id']
Колонки dim_supplier: ['supplier_name', 'contact', 'supplier_email', 'supplier_phone', 'supplier_address', 'supplier_city', 'supplier_country', 'supplier_id']
Колонки dim_date: ['full_date', 'date_id', 'year', 'month', 'day', 'quarter']


In [5]:
# ===================================================================
# 1. Витрина продаж по продуктам
# ===================================================================
product_vitrina = fact.join(dim_product, fact.product_id == dim_product.product_id) \
    .groupBy(dim_product.product_id, dim_product.name, dim_product.category) \
    .agg(
        _sum("sale_quantity").alias("total_quantity"),
        _sum("sale_total_price").alias("total_revenue"),
        first("rating").alias("avg_rating"),
        first("reviews").alias("review_count")
    )
product_vitrina.write.jdbc(url=ch_url, table="vitrina_product_sales", mode="overwrite", properties=ch_properties)


# Топ-10 самых продаваемых (отдельная таблица для удобства проверки)
top10_products = product_vitrina.orderBy(desc("total_quantity")).limit(10)
top10_products.write.jdbc(url=ch_url, table="top10_sold_products", mode="overwrite", properties=ch_properties)


# Выручка по категориям (отдельная таблица)
category_revenue = product_vitrina.groupBy("category") \
    .agg(_sum("total_revenue").alias("category_revenue"))
category_revenue.write.jdbc(url=ch_url, table="category_revenue", mode="overwrite", properties=ch_properties)


# ===================================================================
# 2. Витрина продаж по клиентам
# ===================================================================
customer_vitrina = fact.join(dim_customer, fact.customer_id == dim_customer.customer_id) \
    .groupBy(dim_customer.customer_id, dim_customer.first_name, dim_customer.last_name, dim_customer.country) \
    .agg(
        _sum("sale_total_price").alias("total_spent"),
        count("*").alias("order_count"),
        avg("sale_total_price").alias("avg_check")
    ) \
    .withColumn("customer_name", concat_ws(" ", col("first_name"), col("last_name"))) \
    .select("customer_id", "customer_name", "country", "total_spent", "order_count", "avg_check")
customer_vitrina.write.jdbc(url=ch_url, table="vitrina_customer_sales", mode="overwrite", properties=ch_properties)


# Топ-10 клиентов
top10_customers = customer_vitrina.orderBy(desc("total_spent")).limit(10)
top10_customers.write.jdbc(url=ch_url, table="top10_customers_by_spent", mode="overwrite", properties=ch_properties)


# Распределение по странам (отдельная таблица)
customer_country_dist = customer_vitrina.groupBy("country") \
    .agg(
        _sum("total_spent").alias("total_spent_by_country"),
        count("*").alias("customer_count")
    )
customer_country_dist.write.jdbc(url=ch_url, table="customer_country_distribution", mode="overwrite", properties=ch_properties)


# ===================================================================
# 3. Витрина продаж по времени
# ===================================================================
time_vitrina = fact.join(dim_date, fact.date_id == dim_date.date_id) \
    .groupBy(dim_date.year, dim_date.month) \
    .agg(
        _sum("sale_total_price").alias("total_revenue"),
        _sum("sale_quantity").alias("total_quantity"),
        count("*").alias("order_count")
    ) \
    .withColumn("avg_check", col("total_revenue") / col("order_count")) \
    .withColumn("avg_order_size", col("total_quantity") / col("order_count"))
time_vitrina.write.jdbc(url=ch_url, table="vitrina_time_sales", mode="overwrite", properties=ch_properties)


# ===================================================================
# 4. Витрина продаж по магазинам
# ===================================================================
store_vitrina = fact.join(dim_store, fact.store_id == dim_store.store_id) \
    .groupBy(dim_store.store_id, dim_store.store_name, dim_store.city, dim_store.country) \
    .agg(
        _sum("sale_total_price").alias("total_revenue"),
        count("*").alias("order_count"),
        avg("sale_total_price").alias("avg_check")
    )
store_vitrina.write.jdbc(url=ch_url, table="vitrina_store_sales", mode="overwrite", properties=ch_properties)


# Топ-5 магазинов
top5_stores = store_vitrina.orderBy(desc("total_revenue")).limit(5)
top5_stores.write.jdbc(url=ch_url, table="top5_stores_by_revenue", mode="overwrite", properties=ch_properties)


# ===================================================================
# 5. Витрина продаж по поставщикам
# ===================================================================
supplier_vitrina = fact.join(dim_product[["product_id", "price"]], fact.product_id == dim_product.product_id) \
    .join(dim_supplier, fact.supplier_id == dim_supplier.supplier_id) \
    .groupBy(dim_supplier.supplier_id, dim_supplier.supplier_name, dim_supplier.country) \
    .agg(
        _sum("sale_total_price").alias("total_revenue"),
        _sum(col("price") * col("sale_quantity")).alias("weighted_price_sum"),
        _sum("sale_quantity").alias("total_quantity")
    ) \
    .withColumn("avg_price", col("weighted_price_sum") / col("total_quantity")) \
    .select("supplier_id", "supplier_name", "country", "total_revenue", "avg_price")
supplier_vitrina.write.jdbc(url=ch_url, table="vitrina_supplier_sales", mode="overwrite", properties=ch_properties)


# Топ-5 поставщиков
top5_suppliers = supplier_vitrina.orderBy(desc("total_revenue")).limit(5)
top5_suppliers.write.jdbc(url=ch_url, table="top5_suppliers_by_revenue", mode="overwrite", properties=ch_properties)


# ===================================================================
# 6. Витрина качества продукции
# ===================================================================
quality_vitrina = fact.join(dim_product, fact.product_id == dim_product.product_id) \
    .groupBy(dim_product.product_id, dim_product.name) \
    .agg(
        first("rating").alias("rating"),
        first("reviews").alias("review_count"),
        _sum("sale_quantity").alias("total_quantity"),
        _sum("sale_total_price").alias("total_revenue")
    )
quality_vitrina.write.jdbc(url=ch_url, table="vitrina_product_quality", mode="overwrite", properties=ch_properties)


# Корреляция (одна строка — отдельная таблица)
correlation = quality_vitrina.agg(
    corr("rating", "total_revenue").alias("corr_rating_revenue"),
    corr("rating", "total_quantity").alias("corr_rating_quantity")
).withColumn("description", lit("Correlation between rating and sales"))
correlation.write.jdbc(url=ch_url, table="product_quality_correlation", mode="overwrite", properties=ch_properties)


print("Все 6 витрин + топы + корреляция успешно загружены в ClickHouse!")
print("Проверить можно в DBeaver или clickhouse-client:")
print("SELECT * FROM vitrina_product_sales LIMIT 10;")
print("SELECT corr(rating, total_quantity) FROM vitrina_product_quality;")

NameError: name 'ch_properties' is not defined

spark.version

In [6]:
spark.version

'3.5.1'

In [5]:
spark._jvm.Class.forName("clickhouse.DefaultSource")

Py4JJavaError: An error occurred while calling z:java.lang.Class.forName.
: java.lang.ClassNotFoundException: clickhouse.DefaultSource
	at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(Unknown Source)
	at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(Unknown Source)
	at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
	at java.base/java.lang.Class.forName0(Native Method)
	at java.base/java.lang.Class.forName(Unknown Source)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.base/java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Unknown Source)


In [6]:
# ===============================
# 1. Витрина продаж по продуктам
# ===============================
product_vitrina = fact.join(dim_product, fact.product_id == dim_product.product_id) \
    .groupBy(dim_product.product_id, dim_product.name, dim_product.category) \
    .agg(
        _sum("sale_quantity").alias("total_quantity"),
        _sum("sale_total_price").alias("total_revenue"),
        first("rating").alias("avg_rating"),
        first("reviews").alias("review_count")
    )

product_vitrina.write \
    .format("clickhouse") \
    .options(**ch_options) \
    .option("table", "vitrina_product_sales") \
    .option("createTableOptions", "ENGINE = MergeTree() ORDER BY product_id") \
    .mode("overwrite") \
    .save()

# Топ-10 самых продаваемых
top10_products = product_vitrina.orderBy(desc("total_quantity")).limit(10)
top10_products.write \
    .format("clickhouse") \
    .options(**ch_options) \
    .option("table", "top10_sold_products") \
    .option("createTableOptions", "ENGINE = MergeTree() ORDER BY product_id") \
    .mode("overwrite") \
    .save()

# Выручка по категориям
category_revenue = product_vitrina.groupBy("category") \
    .agg(_sum("total_revenue").alias("category_revenue"))
category_revenue.write \
    .format("clickhouse") \
    .options(**ch_options) \
    .option("table", "category_revenue") \
    .option("createTableOptions", "ENGINE = MergeTree() ORDER BY category") \
    .mode("overwrite") \
    .save()

# ===============================
# 2. Витрина продаж по клиентам
# ===============================
customer_vitrina = fact.join(dim_customer, fact.customer_id == dim_customer.customer_id) \
    .groupBy(dim_customer.customer_id, dim_customer.first_name, dim_customer.last_name, dim_customer.country) \
    .agg(
        _sum("sale_total_price").alias("total_spent"),
        count("*").alias("order_count"),
        avg("sale_total_price").alias("avg_check")
    ) \
    .withColumn("customer_name", concat_ws(" ", col("first_name"), col("last_name"))) \
    .select("customer_id", "customer_name", "country", "total_spent", "order_count", "avg_check")

customer_vitrina.write \
    .format("clickhouse") \
    .options(**ch_options) \
    .option("table", "vitrina_customer_sales") \
    .option("createTableOptions", "ENGINE = MergeTree() ORDER BY customer_id") \
    .mode("overwrite") \
    .save()

top10_customers = customer_vitrina.orderBy(desc("total_spent")).limit(10)
top10_customers.write \
    .format("clickhouse") \
    .options(**ch_options) \
    .option("table", "top10_customers_by_spent") \
    .option("createTableOptions", "ENGINE = MergeTree() ORDER BY customer_id") \
    .mode("overwrite") \
    .save()

customer_country_dist = customer_vitrina.groupBy("country") \
    .agg(
        _sum("total_spent").alias("total_spent_by_country"),
        count("*").alias("customer_count")
    )
customer_country_dist.write \
    .format("clickhouse") \
    .options(**ch_options) \
    .option("table", "customer_country_distribution") \
    .option("createTableOptions", "ENGINE = MergeTree() ORDER BY country") \
    .mode("overwrite") \
    .save()

# ===============================
# 3. Витрина продаж по времени
# ===============================
time_vitrina = fact.join(dim_date, fact.date_id == dim_date.date_id) \
    .groupBy(dim_date.year, dim_date.month) \
    .agg(
        _sum("sale_total_price").alias("total_revenue"),
        _sum("sale_quantity").alias("total_quantity"),
        count("*").alias("order_count")
    ) \
    .withColumn("avg_check", col("total_revenue") / col("order_count")) \
    .withColumn("avg_order_size", col("total_quantity") / col("order_count"))

time_vitrina.write \
    .format("clickhouse") \
    .options(**ch_options) \
    .option("table", "vitrina_time_sales") \
    .option("createTableOptions", "ENGINE = MergeTree() ORDER BY (year, month)") \
    .mode("overwrite") \
    .save()

# ===============================
# 4. Витрина продаж по магазинам
# ===============================
store_vitrina = fact.join(dim_store, fact.store_id == dim_store.store_id) \
    .groupBy(dim_store.store_id, dim_store.store_name, dim_store.city, dim_store.country) \
    .agg(
        _sum("sale_total_price").alias("total_revenue"),
        count("*").alias("order_count"),
        avg("sale_total_price").alias("avg_check")
    )

store_vitrina.write \
    .format("clickhouse") \
    .options(**ch_options) \
    .option("table", "vitrina_store_sales") \
    .option("createTableOptions", "ENGINE = MergeTree() ORDER BY store_id") \
    .mode("overwrite") \
    .save()

top5_stores = store_vitrina.orderBy(desc("total_revenue")).limit(5)
top5_stores.write \
    .format("clickhouse") \
    .options(**ch_options) \
    .option("table", "top5_stores_by_revenue") \
    .option("createTableOptions", "ENGINE = MergeTree() ORDER BY store_id") \
    .mode("overwrite") \
    .save()

# ===============================
# 5. Витрина продаж по поставщикам
# ===============================
supplier_vitrina = fact.join(dim_product[["product_id", "price"]], fact.product_id == dim_product.product_id) \
    .join(dim_supplier, fact.supplier_id == dim_supplier.supplier_id) \
    .groupBy(dim_supplier.supplier_id, dim_supplier.supplier_name, dim_supplier.country) \
    .agg(
        _sum("sale_total_price").alias("total_revenue"),
        _sum(col("price") * col("sale_quantity")).alias("weighted_price_sum"),
        _sum("sale_quantity").alias("total_quantity")
    ) \
    .withColumn("avg_price", col("weighted_price_sum") / col("total_quantity")) \
    .select("supplier_id", "supplier_name", "country", "total_revenue", "avg_price")

supplier_vitrina.write \
    .format("clickhouse") \
    .options(**ch_options) \
    .option("table", "vitrina_supplier_sales") \
    .option("createTableOptions", "ENGINE = MergeTree() ORDER BY supplier_id") \
    .mode("overwrite") \
    .save()

top5_suppliers = supplier_vitrina.orderBy(desc("total_revenue")).limit(5)
top5_suppliers.write \
    .format("clickhouse") \
    .options(**ch_options) \
    .option("table", "top5_suppliers_by_revenue") \
    .option("createTableOptions", "ENGINE = MergeTree() ORDER BY supplier_id") \
    .mode("overwrite") \
    .save()

# ===============================
# 6. Витрина качества продукции
# ===============================
quality_vitrina = fact.join(dim_product, fact.product_id == dim_product.product_id) \
    .groupBy(dim_product.product_id, dim_product.name) \
    .agg(
        first("rating").alias("rating"),
        first("reviews").alias("review_count"),
        _sum("sale_quantity").alias("total_quantity"),
        _sum("sale_total_price").alias("total_revenue")
    )

quality_vitrina.write \
    .format("clickhouse") \
    .options(**ch_options) \
    .option("table", "vitrina_product_quality") \
    .option("createTableOptions", "ENGINE = MergeTree() ORDER BY product_id") \
    .mode("overwrite") \
    .save()

# Корреляция
correlation = quality_vitrina.agg(
    corr("rating", "total_revenue").alias("corr_rating_revenue"),
    corr("rating", "total_quantity").alias("corr_rating_quantity")
).withColumn("description", lit("Correlation between rating and sales"))

correlation.write \
    .format("clickhouse") \
    .options(**ch_options) \
    .option("table", "product_quality_correlation") \
    .option("createTableOptions", "ENGINE = MergeTree() ORDER BY description") \
    .mode("overwrite") \
    .save()

print("Все 6 витрин + топы + корреляция успешно загружены в ClickHouse!")

Py4JJavaError: An error occurred while calling o88.save.
: org.apache.spark.SparkClassNotFoundException: [DATA_SOURCE_NOT_FOUND] Failed to find the data source: clickhouse. Please find packages at `https://spark.apache.org/third-party-projects.html`.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.dataSourceNotFoundError(QueryExecutionErrors.scala:724)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:647)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:697)
	at org.apache.spark.sql.DataFrameWriter.lookupV2Provider(DataFrameWriter.scala:863)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:257)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:248)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.base/java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.lang.ClassNotFoundException: clickhouse.DefaultSource
	at java.base/java.net.URLClassLoader.findClass(Unknown Source)
	at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
	at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$5(DataSource.scala:633)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$4(DataSource.scala:633)
	at scala.util.Failure.orElse(Try.scala:224)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:633)
	... 16 more


In [13]:
# Завершаем сессию Spark
spark.stop()

In [8]:
from pyspark.sql.functions import *
import math

# ===============================
# Настройки подключения к ClickHouse
# ===============================
ch_jdbc_url = "jdbc:clickhouse://clickhouse:8123/default"
ch_properties = {
    "user": "default",
    "password": "",
    "driver": "com.clickhouse.jdbc.ClickHouseDriver"
}

# Батчинг для больших данных
batch_size = 100000

# ===============================
# Вспомогательная функция для записи через JDBC
# ===============================
def write_to_clickhouse(df, table_name, order_by_columns=None):
    """Запись DataFrame в ClickHouse через JDBC с батчингом"""
    
    # Создаем опции для создания таблицы
    create_table_options = ""
    if order_by_columns:
        create_table_options = f"ORDER BY ({', '.join(order_by_columns)})"
    
    try:
        # Записываем данные
        df.write \
            .format("jdbc") \
            .option("url", ch_jdbc_url) \
            .option("dbtable", table_name) \
            .option("user", ch_properties["user"]) \
            .option("password", ch_properties["password"]) \
            .option("driver", ch_properties["driver"]) \
            .option("batchsize", batch_size) \
            .option("createTableOptions", create_table_options) \
            .mode("append") \
            .save()
        
        print(f"✓ Таблица {table_name} успешно создана")
        
    except Exception as e:
        print(f"✗ Ошибка при создании таблицы {table_name}: {e}")

# ===============================
# 1. Витрина продаж по продуктам
# ===============================
print("Создание витрины продаж по продуктам...")
product_vitrina = fact.join(dim_product, fact.product_id == dim_product.product_id) \
    .groupBy(dim_product.product_id, dim_product.name, dim_product.category) \
    .agg(
        sum("sale_quantity").alias("total_quantity"),
        sum("sale_total_price").alias("total_revenue"),
        first("rating").alias("avg_rating"),
        first("reviews").alias("review_count")
    )

write_to_clickhouse(product_vitrina, "vitrina_product_sales", ["product_id"])

# Топ-10 самых продаваемых
top10_products = product_vitrina.orderBy(desc("total_quantity")).limit(10)
write_to_clickhouse(top10_products, "top10_sold_products", ["product_id"])

# Выручка по категориям
category_revenue = product_vitrina.groupBy("category") \
    .agg(sum("total_revenue").alias("category_revenue"))
write_to_clickhouse(category_revenue, "category_revenue", ["category"])

# ===============================
# 2. Витрина продаж по клиентам
# ===============================
print("Создание витрины продаж по клиентам...")
customer_vitrina = fact.join(dim_customer, fact.customer_id == dim_customer.customer_id) \
    .groupBy(dim_customer.customer_id, dim_customer.first_name, dim_customer.last_name, dim_customer.country) \
    .agg(
        sum("sale_total_price").alias("total_spent"),
        count("*").alias("order_count"),
        avg("sale_total_price").alias("avg_check")
    ) \
    .withColumn("customer_name", concat_ws(" ", col("first_name"), col("last_name"))) \
    .select("customer_id", "customer_name", "country", "total_spent", "order_count", "avg_check")

write_to_clickhouse(customer_vitrina, "vitrina_customer_sales", ["customer_id"])

top10_customers = customer_vitrina.orderBy(desc("total_spent")).limit(10)
write_to_clickhouse(top10_customers, "top10_customers_by_spent", ["customer_id"])

customer_country_dist = customer_vitrina.groupBy("country") \
    .agg(
        sum("total_spent").alias("total_spent_by_country"),
        count("*").alias("customer_count")
    )
write_to_clickhouse(customer_country_dist, "customer_country_distribution", ["country"])

# ===============================
# 3. Витрина продаж по времени
# ===============================
print("Создание витрины продаж по времени...")
time_vitrina = fact.join(dim_date, fact.date_id == dim_date.date_id) \
    .groupBy(dim_date.year, dim_date.month) \
    .agg(
        sum("sale_total_price").alias("total_revenue"),
        sum("sale_quantity").alias("total_quantity"),
        count("*").alias("order_count")
    ) \
    .withColumn("avg_check", col("total_revenue") / col("order_count")) \
    .withColumn("avg_order_size", col("total_quantity") / col("order_count"))

write_to_clickhouse(time_vitrina, "vitrina_time_sales", ["year", "month"])

# ===============================
# 4. Витрина продаж по магазинам
# ===============================
print("Создание витрины продаж по магазинам...")
store_vitrina = fact.join(dim_store, fact.store_id == dim_store.store_id) \
    .groupBy(dim_store.store_id, dim_store.store_name, dim_store.city, dim_store.country) \
    .agg(
        sum("sale_total_price").alias("total_revenue"),
        count("*").alias("order_count"),
        avg("sale_total_price").alias("avg_check")
    )

write_to_clickhouse(store_vitrina, "vitrina_store_sales", ["store_id"])

top5_stores = store_vitrina.orderBy(desc("total_revenue")).limit(5)
write_to_clickhouse(top5_stores, "top5_stores_by_revenue", ["store_id"])

# ===============================
# 5. Витрина продаж по поставщикам
# ===============================
print("Создание витрины продаж по поставщикам...")
supplier_vitrina = fact.join(dim_product, fact.product_id == dim_product.product_id) \
    .join(dim_supplier, fact.supplier_id == dim_supplier.supplier_id) \
    .groupBy(dim_supplier.supplier_id, dim_supplier.supplier_name, dim_supplier.country) \
    .agg(
        sum("sale_total_price").alias("total_revenue"),
        sum(col("price") * col("sale_quantity")).alias("weighted_price_sum"),
        sum("sale_quantity").alias("total_quantity")
    ) \
    .withColumn("avg_price", col("weighted_price_sum") / col("total_quantity")) \
    .select("supplier_id", "supplier_name", "country", "total_revenue", "avg_price")

write_to_clickhouse(supplier_vitrina, "vitrina_supplier_sales", ["supplier_id"])

top5_suppliers = supplier_vitrina.orderBy(desc("total_revenue")).limit(5)
write_to_clickhouse(top5_suppliers, "top5_suppliers_by_revenue", ["supplier_id"])

# ===============================
# 6. Витрина качества продукции
# ===============================
print("Создание витрины качества продукции...")
quality_vitrina = fact.join(dim_product, fact.product_id == dim_product.product_id) \
    .groupBy(dim_product.product_id, dim_product.name) \
    .agg(
        first("rating").alias("rating"),
        first("reviews").alias("review_count"),
        sum("sale_quantity").alias("total_quantity"),
        sum("sale_total_price").alias("total_revenue")
    )

write_to_clickhouse(quality_vitrina, "vitrina_product_quality", ["product_id"])

# Корреляция
correlation = quality_vitrina.agg(
    corr("rating", "total_revenue").alias("corr_rating_revenue"),
    corr("rating", "total_quantity").alias("corr_rating_quantity")
).withColumn("description", lit("Correlation between rating and sales"))

write_to_clickhouse(correlation, "product_quality_correlation", ["description"])

print("=" * 60)
print("ВСЕ 6 ВИТРИН + ТОПЫ + КОРРЕЛЯЦИЯ УСПЕШНО ЗАГРУЖЕНЫ В CLICKHOUSE!")
print("=" * 60)

# ===============================
# Дополнительно: проверка данных
# ===============================
def check_table_count(table_name):
    """Проверка количества записей в таблице"""
    try:
        count_df = spark.read \
            .format("jdbc") \
            .option("url", ch_jdbc_url) \
            .option("dbtable", f"(SELECT count(*) as cnt FROM {table_name}) as t") \
            .option("user", ch_properties["user"]) \
            .option("password", ch_properties["password"]) \
            .option("driver", ch_properties["driver"]) \
            .load()
        count = count_df.first()["cnt"]
        print(f"✓ Таблица {table_name}: {count} записей")
        return count
    except Exception as e:
        print(f"✗ Ошибка при проверке таблицы {table_name}: {e}")
        return 0

print("\nПроверка загруженных данных:")
tables_to_check = [
    "vitrina_product_sales", "vitrina_customer_sales", "vitrina_time_sales",
    "vitrina_store_sales", "vitrina_supplier_sales", "vitrina_product_quality"
]

for table in tables_to_check:
    check_table_count(table)

Создание витрины продаж по продуктам...
✗ Ошибка при создании таблицы vitrina_product_sales: An error occurred while calling o546.save.
: java.sql.SQLException: Code: 119. DB::Exception: Table engine is not specified in CREATE query. (ENGINE_REQUIRED) (version 23.7.6.111 (official build))
, server ClickHouseNode [uri=http://clickhouse:8123/default]@-7368705
	at com.clickhouse.jdbc.SqlExceptionUtils.handle(SqlExceptionUtils.java:85)
	at com.clickhouse.jdbc.SqlExceptionUtils.create(SqlExceptionUtils.java:31)
	at com.clickhouse.jdbc.SqlExceptionUtils.handle(SqlExceptionUtils.java:90)
	at com.clickhouse.jdbc.internal.ClickHouseStatementImpl.getLastResponse(ClickHouseStatementImpl.java:122)
	at com.clickhouse.jdbc.internal.ClickHouseStatementImpl.executeLargeUpdate(ClickHouseStatementImpl.java:489)
	at com.clickhouse.jdbc.internal.ClickHouseStatementImpl.executeUpdate(ClickHouseStatementImpl.java:498)
	at org.apache.spark.sql.jdbc.JdbcDialect.createTable(JdbcDialects.scala:192)
	at org.apac

AttributeError: 'DataFrame' object has no attribute 'city'

In [12]:
from pyspark.sql.functions import *

# ===============================
# Настройки подключения к ClickHouse
# ===============================
ch_jdbc_url = "jdbc:clickhouse://clickhouse:8123/default"
ch_properties = {
    "user": "default",
    "password": "",
    "driver": "com.clickhouse.jdbc.ClickHouseDriver"
}

# Батчинг для больших данных
batch_size = 100000

# ===============================
# Вспомогательная функция для записи в существующие таблицы
# ===============================
def write_to_clickhouse_existing(df, table_name):
    """Запись DataFrame в существующую таблицу ClickHouse"""
    try:
        # Очищаем таблицу перед записью новых данных
        clear_table_query = f"TRUNCATE TABLE {table_name}"
        # Выполняем через Spark SQL или другим способом
        
        # Записываем данные в существующую таблицу
        df.write \
            .format("jdbc") \
            .option("url", ch_jdbc_url) \
            .option("dbtable", table_name) \
            .option("user", ch_properties["user"]) \
            .option("password", ch_properties["password"]) \
            .option("driver", ch_properties["driver"]) \
            .option("batchsize", batch_size) \
            .mode("append").save()
        
        print(f"✓ Данные записаны в таблицу {table_name}")
        
    except Exception as e:
        print(f"✗ Ошибка при записи в таблицу {table_name}: {e}")

# ===============================
# Функция для очистки таблиц через ClickHouse HTTP API
# ===============================
def clear_clickhouse_table(table_name):
    """Очистка таблицы через HTTP запрос"""
    try:
        import requests
        query = f"TRUNCATE TABLE {table_name}"
        url = "http://clickhouse:8123/"
        response = requests.post(url, data=query)
        if response.status_code == 200:
            print(f"✓ Таблица {table_name} очищена")
        else:
            print(f"✗ Ошибка очистки {table_name}: {response.text}")
    except Exception as e:
        print(f"✗ Ошибка при очистке таблицы {table_name}: {e}")

# ===============================
# 1. Витрина продаж по продуктам
# ===============================
print("Создание витрины продаж по продуктам...")
product_vitrina = fact.join(dim_product, fact.product_id == dim_product.product_id) \
    .groupBy(dim_product.product_id, dim_product.name, dim_product.category) \
    .agg(
        sum("sale_quantity").alias("total_quantity"),
        sum("sale_total_price").alias("total_revenue"),
        first("rating").alias("avg_rating"),
        first("reviews").alias("review_count")
    )

# Очищаем и записываем
clear_clickhouse_table("vitrina_product_sales")
write_to_clickhouse_existing(product_vitrina, "vitrina_product_sales")

# Топ-10 самых продаваемых
top10_products = product_vitrina.orderBy(desc("total_quantity")).limit(10)
clear_clickhouse_table("top10_sold_products")
write_to_clickhouse_existing(top10_products, "top10_sold_products")

# Выручка по категориям
category_revenue = product_vitrina.groupBy("category") \
    .agg(sum("total_revenue").alias("category_revenue"))
clear_clickhouse_table("category_revenue")
write_to_clickhouse_existing(category_revenue, "category_revenue")

# ===============================
# 2. Витрина продаж по клиентам
# ===============================
print("Создание витрины продаж по клиентам...")
customer_vitrina = fact.join(dim_customer, fact.customer_id == dim_customer.customer_id) \
    .groupBy(dim_customer.customer_id, dim_customer.first_name, dim_customer.last_name, dim_customer.country) \
    .agg(
        sum("sale_total_price").alias("total_spent"),
        count("*").alias("order_count"),
        avg("sale_total_price").alias("avg_check")
    ) \
    .withColumn("customer_name", concat_ws(" ", col("first_name"), col("last_name"))) \
    .select("customer_id", "customer_name", "country", "total_spent", "order_count", "avg_check")

clear_clickhouse_table("vitrina_customer_sales")
write_to_clickhouse_existing(customer_vitrina, "vitrina_customer_sales")

top10_customers = customer_vitrina.orderBy(desc("total_spent")).limit(10)
clear_clickhouse_table("top10_customers_by_spent")
write_to_clickhouse_existing(top10_customers, "top10_customers_by_spent")

customer_country_dist = customer_vitrina.groupBy("country") \
    .agg(
        sum("total_spent").alias("total_spent_by_country"),
        count("*").alias("customer_count")
    )
clear_clickhouse_table("customer_country_distribution")
write_to_clickhouse_existing(customer_country_dist, "customer_country_distribution")

# ===============================
# 3. Витрина продаж по времени
# ===============================
print("Создание витрины продаж по времени...")
time_vitrina = fact.join(dim_date, fact.date_id == dim_date.date_id) \
    .groupBy(dim_date.year, dim_date.month) \
    .agg(
        sum("sale_total_price").alias("total_revenue"),
        sum("sale_quantity").alias("total_quantity"),
        count("*").alias("order_count")
    ) \
    .withColumn("avg_check", col("total_revenue") / col("order_count")) \
    .withColumn("avg_order_size", col("total_quantity") / col("order_count"))

clear_clickhouse_table("vitrina_time_sales")
write_to_clickhouse_existing(time_vitrina, "vitrina_time_sales")

# ===============================
# 4. Витрина продаж по магазинам
# ===============================
print("Создание витрины продаж по магазинам...")
store_vitrina = fact.join(dim_store, fact.store_id == dim_store.store_id) \
    .groupBy(dim_store.store_id, dim_store.store_name, dim_store.city, dim_store.country) \
    .agg(
        sum("sale_total_price").alias("total_revenue"),
        count("*").alias("order_count"),
        avg("sale_total_price").alias("avg_check")
    )

clear_clickhouse_table("vitrina_store_sales")
write_to_clickhouse_existing(store_vitrina, "vitrina_store_sales")

top5_stores = store_vitrina.orderBy(desc("total_revenue")).limit(5)
clear_clickhouse_table("top5_stores_by_revenue")
write_to_clickhouse_existing(top5_stores, "top5_stores_by_revenue")

# ===============================
# 5. Витрина продаж по поставщикам
# ===============================
print("Создание витрины продаж по поставщикам...")
supplier_vitrina = fact.join(dim_product, fact.product_id == dim_product.product_id) \
    .join(dim_supplier, fact.supplier_id == dim_supplier.supplier_id) \
    .groupBy(dim_supplier.supplier_id, dim_supplier.supplier_name, dim_supplier.country) \
    .agg(
        sum("sale_total_price").alias("total_revenue"),
        sum(col("price") * col("sale_quantity")).alias("weighted_price_sum"),
        sum("sale_quantity").alias("total_quantity")
    ) \
    .withColumn("avg_price", col("weighted_price_sum") / col("total_quantity")) \
    .select("supplier_id", "supplier_name", "country", "total_revenue", "avg_price")

clear_clickhouse_table("vitrina_supplier_sales")
write_to_clickhouse_existing(supplier_vitrina, "vitrina_supplier_sales")

top5_suppliers = supplier_vitrina.orderBy(desc("total_revenue")).limit(5)
clear_clickhouse_table("top5_suppliers_by_revenue")
write_to_clickhouse_existing(top5_suppliers, "top5_suppliers_by_revenue")

# ===============================
# 6. Витрина качества продукции
# ===============================
print("Создание витрины качества продукции...")
quality_vitrina = fact.join(dim_product, fact.product_id == dim_product.product_id) \
    .groupBy(dim_product.product_id, dim_product.name) \
    .agg(
        first("rating").alias("rating"),
        first("reviews").alias("review_count"),
        sum("sale_quantity").alias("total_quantity"),
        sum("sale_total_price").alias("total_revenue")
    )

clear_clickhouse_table("vitrina_product_quality")
write_to_clickhouse_existing(quality_vitrina, "vitrina_product_quality")

# Корреляция
correlation = quality_vitrina.agg(
    corr("rating", "total_revenue").alias("corr_rating_revenue"),
    corr("rating", "total_quantity").alias("corr_rating_quantity")
).withColumn("description", lit("Correlation between rating and sales"))

clear_clickhouse_table("product_quality_correlation")
write_to_clickhouse_existing(correlation, "product_quality_correlation")

print("=" * 60)
print("ВСЕ ДАННЫЕ УСПЕШНО ЗАГРУЖЕНЫ В СУЩЕСТВУЮЩИЕ ТАБЛИЦЫ CLICKHOUSE!")
print("=" * 60)

# ===============================
# Проверка данных
# ===============================
def check_table_count(table_name):
    """Проверка количества записей в таблице"""
    try:
        count_df = spark.read \
            .format("jdbc") \
            .option("url", ch_jdbc_url) \
            .option("dbtable", f"(SELECT count(*) as cnt FROM {table_name}) as t") \
            .option("user", ch_properties["user"]) \
            .option("password", ch_properties["password"]) \
            .option("driver", ch_properties["driver"]) \
            .load()
        count = count_df.first()["cnt"]
        print(f"✓ Таблица {table_name}: {count} записей")
        return count
    except Exception as e:
        print(f"✗ Ошибка при проверке таблицы {table_name}: {e}")
        return 0

print("\nПроверка загруженных данных:")
tables_to_check = [
    "vitrina_product_sales", "vitrina_customer_sales", "vitrina_time_sales",
    "vitrina_store_sales", "vitrina_supplier_sales", "vitrina_product_quality",
    "top10_sold_products", "top10_customers_by_spent", "top5_stores_by_revenue",
    "top5_suppliers_by_revenue", "product_quality_correlation"
]

for table in tables_to_check:
    check_table_count(table)

Создание витрины продаж по продуктам...
✗ Ошибка очистки vitrina_product_sales: Code: 60. DB::Exception: Table default.vitrina_product_sales doesn't exist. (UNKNOWN_TABLE) (version 23.7.6.111 (official build))

✗ Ошибка при записи в таблицу vitrina_product_sales: An error occurred while calling o762.save.
: java.sql.SQLException: Code: 119. DB::Exception: Table engine is not specified in CREATE query. (ENGINE_REQUIRED) (version 23.7.6.111 (official build))
, server ClickHouseNode [uri=http://clickhouse:8123/default]@-7368705
	at com.clickhouse.jdbc.SqlExceptionUtils.handle(SqlExceptionUtils.java:85)
	at com.clickhouse.jdbc.SqlExceptionUtils.create(SqlExceptionUtils.java:31)
	at com.clickhouse.jdbc.SqlExceptionUtils.handle(SqlExceptionUtils.java:90)
	at com.clickhouse.jdbc.internal.ClickHouseStatementImpl.getLastResponse(ClickHouseStatementImpl.java:122)
	at com.clickhouse.jdbc.internal.ClickHouseStatementImpl.executeLargeUpdate(ClickHouseStatementImpl.java:489)
	at com.clickhouse.jdbc.

AttributeError: 'DataFrame' object has no attribute 'city'

In [14]:
import requests

def create_clickhouse_table(table_name, create_query):
    """Создание таблицы в ClickHouse через HTTP API"""
    try:
        url = "http://clickhouse:8123/"
        response = requests.post(url, data=create_query)
        if response.status_code == 200:
            print(f"✓ Таблица {table_name} создана")
            return True
        else:
            print(f"✗ Ошибка создания {table_name}: {response.text}")
            return False
    except Exception as e:
        print(f"✗ Ошибка при создании таблицы {table_name}: {e}")
        return False

# Создаем все таблицы
print("Создание таблиц в ClickHouse...")

# 1. Витрина продаж по продуктам
create_clickhouse_table("vitrina_product_sales", """
CREATE TABLE IF NOT EXISTS vitrina_product_sales (
    product_id UInt32,
    name String,
    category String,
    total_quantity UInt64,
    total_revenue Decimal(15,2),
    avg_rating Float32,
    review_count UInt32
) ENGINE = MergeTree()
ORDER BY (category, product_id)
""")

# 2. Витрина продаж по клиентам
create_clickhouse_table("vitrina_customer_sales", """
CREATE TABLE IF NOT EXISTS vitrina_customer_sales (
    customer_id UInt32,
    customer_name String,
    country String,
    total_spent Decimal(15,2),
    order_count UInt32,
    avg_check Decimal(15,2)
) ENGINE = MergeTree()
ORDER BY (country, customer_id)
""")

# 3. Витрина продаж по времени
create_clickhouse_table("vitrina_time_sales", """
CREATE TABLE IF NOT EXISTS vitrina_time_sales (
    year UInt16,
    month UInt8,
    total_revenue Decimal(15,2),
    total_quantity UInt64,
    order_count UInt32,
    avg_check Decimal(15,2),
    avg_order_size Float32
) ENGINE = MergeTree()
ORDER BY (year, month)
""")

# 4. Витрина продаж по магазинам
create_clickhouse_table("vitrina_store_sales", """
CREATE TABLE IF NOT EXISTS vitrina_store_sales (
    store_id UInt32,
    store_name String,
    store_city String,
    store_country String,
    total_revenue Decimal(15,2),
    order_count UInt32,
    avg_check Decimal(15,2)
) ENGINE = MergeTree()
ORDER BY (store_country, store_id)
""")

# 5. Витрина продаж по поставщикам
create_clickhouse_table("vitrina_supplier_sales", """
CREATE TABLE IF NOT EXISTS vitrina_supplier_sales (
    supplier_id UInt32,
    supplier_name String,
    supplier_country String,
    total_revenue Decimal(15,2),
    avg_price Decimal(15,2)
) ENGINE = MergeTree()
ORDER BY (supplier_country, supplier_id)
""")

# 6. Витрина качества продукции
create_clickhouse_table("vitrina_product_quality", """
CREATE TABLE IF NOT EXISTS vitrina_product_quality (
    product_id UInt32,
    name String,
    rating Float32,
    review_count UInt32,
    total_quantity UInt64,
    total_revenue Decimal(15,2)
) ENGINE = MergeTree()
ORDER BY product_id
""")

# Дополнительные таблицы
create_clickhouse_table("top10_sold_products", """
CREATE TABLE IF NOT EXISTS top10_sold_products (
    product_id UInt32,
    name String,
    category String,
    total_quantity UInt64,
    total_revenue Decimal(15,2),
    avg_rating Float32,
    review_count UInt32
) ENGINE = MergeTree()
ORDER BY product_id
""")

create_clickhouse_table("category_revenue", """
CREATE TABLE IF NOT EXISTS category_revenue (
    category String,
    category_revenue Decimal(15,2)
) ENGINE = MergeTree()
ORDER BY category
""")

create_clickhouse_table("top10_customers_by_spent", """
CREATE TABLE IF NOT EXISTS top10_customers_by_spent (
    customer_id UInt32,
    customer_name String,
    country String,
    total_spent Decimal(15,2),
    order_count UInt32,
    avg_check Decimal(15,2)
) ENGINE = MergeTree()
ORDER BY customer_id
""")

create_clickhouse_table("customer_country_distribution", """
CREATE TABLE IF NOT EXISTS customer_country_distribution (
    country String,
    total_spent_by_country Decimal(15,2),
    customer_count UInt32
) ENGINE = MergeTree()
ORDER BY country
""")

create_clickhouse_table("top5_stores_by_revenue", """
CREATE TABLE IF NOT EXISTS top5_stores_by_revenue (
    store_id UInt32,
    store_name String,
    store_city String,
    store_country String,
    total_revenue Decimal(15,2),
    order_count UInt32,
    avg_check Decimal(15,2)
) ENGINE = MergeTree()
ORDER BY store_id
""")

create_clickhouse_table("top5_suppliers_by_revenue", """
CREATE TABLE IF NOT EXISTS top5_suppliers_by_revenue (
    supplier_id UInt32,
    supplier_name String,
    supplier_country String,
    total_revenue Decimal(15,2),
    avg_price Decimal(15,2)
) ENGINE = MergeTree()
ORDER BY supplier_id
""")

create_clickhouse_table("product_quality_correlation", """
CREATE TABLE IF NOT EXISTS product_quality_correlation (
    corr_rating_revenue Float64,
    corr_rating_quantity Float64,
    description String
) ENGINE = MergeTree()
ORDER BY description
""")

Создание таблиц в ClickHouse...
✓ Таблица vitrina_product_sales создана
✓ Таблица vitrina_customer_sales создана
✓ Таблица vitrina_time_sales создана
✓ Таблица vitrina_store_sales создана
✓ Таблица vitrina_supplier_sales создана
✓ Таблица vitrina_product_quality создана
✓ Таблица top10_sold_products создана
✓ Таблица category_revenue создана
✓ Таблица top10_customers_by_spent создана
✓ Таблица customer_country_distribution создана
✓ Таблица top5_stores_by_revenue создана
✓ Таблица top5_suppliers_by_revenue создана
✓ Таблица product_quality_correlation создана


True

In [15]:
def write_to_clickhouse_existing(df, table_name):
    """Запись DataFrame в существующую таблицу ClickHouse"""
    try:
        df.write \
            .format("jdbc") \
            .option("url", ch_jdbc_url) \
            .option("dbtable", table_name) \
            .option("user", ch_properties["user"]) \
            .option("password", ch_properties["password"]) \
            .option("driver", ch_properties["driver"]) \
            .option("batchsize", 100000) \
            .mode("append") \
            .save()
        
        print(f"✓ Данные записаны в таблицу {table_name}")
        
    except Exception as e:
        print(f"✗ Ошибка при записи в таблицу {table_name}: {e}")

# ===============================
# 1. Витрина продаж по продуктам
# ===============================
print("Создание витрины продаж по продуктам...")
product_vitrina = fact.join(dim_product, fact.product_id == dim_product.product_id) \
    .groupBy(dim_product.product_id, dim_product.name, dim_product.category) \
    .agg(
        sum("sale_quantity").alias("total_quantity"),
        sum("sale_total_price").alias("total_revenue"),
        first("rating").alias("avg_rating"),
        first("reviews").alias("review_count")
    )

write_to_clickhouse_existing(product_vitrina, "vitrina_product_sales")

# Топ-10 самых продаваемых
top10_products = product_vitrina.orderBy(desc("total_quantity")).limit(10)
write_to_clickhouse_existing(top10_products, "top10_sold_products")

# Выручка по категориям
category_revenue = product_vitrina.groupBy("category") \
    .agg(sum("total_revenue").alias("category_revenue"))
write_to_clickhouse_existing(category_revenue, "category_revenue")

# ===============================
# 2. Витрина продаж по клиентам
# ===============================
print("Создание витрины продаж по клиентам...")
customer_vitrina = fact.join(dim_customer, fact.customer_id == dim_customer.customer_id) \
    .groupBy(dim_customer.customer_id, dim_customer.first_name, dim_customer.last_name, dim_customer.country) \
    .agg(
        sum("sale_total_price").alias("total_spent"),
        count("*").alias("order_count"),
        avg("sale_total_price").alias("avg_check")
    ) \
    .withColumn("customer_name", concat_ws(" ", col("first_name"), col("last_name"))) \
    .select("customer_id", "customer_name", "country", "total_spent", "order_count", "avg_check")

write_to_clickhouse_existing(customer_vitrina, "vitrina_customer_sales")

top10_customers = customer_vitrina.orderBy(desc("total_spent")).limit(10)
write_to_clickhouse_existing(top10_customers, "top10_customers_by_spent")

customer_country_dist = customer_vitrina.groupBy("country") \
    .agg(
        sum("total_spent").alias("total_spent_by_country"),
        count("*").alias("customer_count")
    )
write_to_clickhouse_existing(customer_country_dist, "customer_country_distribution")

# ===============================
# 3. Витрина продаж по времени
# ===============================
print("Создание витрины продаж по времени...")
time_vitrina = fact.join(dim_date, fact.date_id == dim_date.date_id) \
    .groupBy(dim_date.year, dim_date.month) \
    .agg(
        sum("sale_total_price").alias("total_revenue"),
        sum("sale_quantity").alias("total_quantity"),
        count("*").alias("order_count")
    ) \
    .withColumn("avg_check", col("total_revenue") / col("order_count")) \
    .withColumn("avg_order_size", col("total_quantity") / col("order_count"))

write_to_clickhouse_existing(time_vitrina, "vitrina_time_sales")

# ===============================
# 4. Витрина продаж по магазинам (с правильными колонками)
# ===============================
print("Создание витрины продаж по магазинам...")
store_vitrina = fact.join(dim_store, fact.store_id == dim_store.store_id) \
    .groupBy(dim_store.store_id, dim_store.store_name, dim_store.store_city, dim_store.store_country) \
    .agg(
        sum("sale_total_price").alias("total_revenue"),
        count("*").alias("order_count"),
        avg("sale_total_price").alias("avg_check")
    )

write_to_clickhouse_existing(store_vitrina, "vitrina_store_sales")

top5_stores = store_vitrina.orderBy(desc("total_revenue")).limit(5)
write_to_clickhouse_existing(top5_stores, "top5_stores_by_revenue")

# ===============================
# 5. Витрина продаж по поставщикам (с правильными колонками)
# ===============================
print("Создание витрины продаж по поставщикам...")
supplier_vitrina = fact.join(dim_product, fact.product_id == dim_product.product_id) \
    .join(dim_supplier, fact.supplier_id == dim_supplier.supplier_id) \
    .groupBy(dim_supplier.supplier_id, dim_supplier.supplier_name, dim_supplier.supplier_country) \
    .agg(
        sum("sale_total_price").alias("total_revenue"),
        sum(col("price") * col("sale_quantity")).alias("weighted_price_sum"),
        sum("sale_quantity").alias("total_quantity")
    ) \
    .withColumn("avg_price", col("weighted_price_sum") / col("total_quantity")) \
    .select("supplier_id", "supplier_name", "supplier_country", "total_revenue", "avg_price")

write_to_clickhouse_existing(supplier_vitrina, "vitrina_supplier_sales")

top5_suppliers = supplier_vitrina.orderBy(desc("total_revenue")).limit(5)
write_to_clickhouse_existing(top5_suppliers, "top5_suppliers_by_revenue")

# ===============================
# 6. Витрина качества продукции
# ===============================
print("Создание витрины качества продукции...")
quality_vitrina = fact.join(dim_product, fact.product_id == dim_product.product_id) \
    .groupBy(dim_product.product_id, dim_product.name) \
    .agg(
        first("rating").alias("rating"),
        first("reviews").alias("review_count"),
        sum("sale_quantity").alias("total_quantity"),
        sum("sale_total_price").alias("total_revenue")
    )

write_to_clickhouse_existing(quality_vitrina, "vitrina_product_quality")

# Корреляция
correlation = quality_vitrina.agg(
    corr("rating", "total_revenue").alias("corr_rating_revenue"),
    corr("rating", "total_quantity").alias("corr_rating_quantity")
).withColumn("description", lit("Correlation between rating and sales"))

write_to_clickhouse_existing(correlation, "product_quality_correlation")

print("=" * 60)
print("ВСЕ ДАННЫЕ УСПЕШНО ЗАГРУЖЕНЫ В CLICKHOUSE!")
print("=" * 60)

# Проверка данных
def check_table_count(table_name):
    """Проверка количества записей в таблице"""
    try:
        count_df = spark.read \
            .format("jdbc") \
            .option("url", ch_jdbc_url) \
            .option("dbtable", f"(SELECT count(*) as cnt FROM {table_name}) as t") \
            .option("user", ch_properties["user"]) \
            .option("password", ch_properties["password"]) \
            .option("driver", ch_properties["driver"]) \
            .load()
        count = count_df.first()["cnt"]
        print(f"✓ Таблица {table_name}: {count} записей")
        return count
    except Exception as e:
        print(f"✗ Ошибка при проверке таблицы {table_name}: {e}")
        return 0

print("\nПроверка загруженных данных:")
tables_to_check = [
    "vitrina_product_sales", "vitrina_customer_sales", "vitrina_time_sales",
    "vitrina_store_sales", "vitrina_supplier_sales", "vitrina_product_quality",
    "top10_sold_products", "top10_customers_by_spent", "top5_stores_by_revenue",
    "top5_suppliers_by_revenue", "product_quality_correlation"
]

for table in tables_to_check:
    check_table_count(table)

Создание витрины продаж по продуктам...


                                                                                

✓ Данные записаны в таблицу vitrina_product_sales
✓ Данные записаны в таблицу top10_sold_products
✓ Данные записаны в таблицу category_revenue
Создание витрины продаж по клиентам...
✓ Данные записаны в таблицу vitrina_customer_sales
✓ Данные записаны в таблицу top10_customers_by_spent
✓ Данные записаны в таблицу customer_country_distribution
Создание витрины продаж по времени...
✓ Данные записаны в таблицу vitrina_time_sales
Создание витрины продаж по магазинам...
✗ Ошибка при записи в таблицу vitrina_store_sales: Column store_city not found in schema Some(StructType(StructField(store_id,DecimalType(20,0),false),StructField(store_name,StringType,false),StructField(city,StringType,false),StructField(country,StringType,false),StructField(total_revenue,DecimalType(15,2),false),StructField(order_count,DecimalType(20,0),false),StructField(avg_check,DecimalType(15,2),false))).
✓ Данные записаны в таблицу top5_stores_by_revenue
Создание витрины продаж по поставщикам...
✗ Ошибка при записи в т