In [1]:
from pyspark.sql import SparkSession

jar_path = "/opt/shared-jars/postgresql-42.7.5.jar"

# Build (or reuse) a SparkSession that ships the PostgreSQL JDBC jar and puts it
# on both the driver and executor classpaths.
spark = (
    SparkSession.builder
    .appName("PostgreSQL Integration")
    .config("spark.jars", jar_path)
    .config("spark.driver.extraClassPath", jar_path)
    .config("spark.executor.extraClassPath", jar_path)
    .config("spark.cassandra.connection.host", "cassandra")
    .getOrCreate()
)

# Re-check: ask the driver JVM's context class loader whether it can see the
# PostgreSQL Driver class (True means the jar is on the classpath).
print(
    "PostgreSQL driver visible to JVM:",
    spark._jvm.Thread.currentThread()
    .getContextClassLoader()
    .getResources("org/postgresql/Driver.class")
    .hasMoreElements(),
)

PostgreSQL driver visible to JVM: True


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, count, avg, desc, coalesce, to_date, lit
from pyspark.sql.types import DateType

import os

# === SparkSession initialisation ===
# Jars needed by this notebook: the PostgreSQL JDBC driver and the Spark Cassandra connector.
# spark.jars takes a comma-separated list; the extraClassPath settings are JVM classpaths,
# whose entries are separated by ':'.
JAR_FILES = [
    "/opt/shared-jars/postgresql-42.7.5.jar",
    "/opt/shared-jars/spark-cassandra-connector-assembly_2.12-3.2.0.jar",
]

# NOTE(review): if a SparkSession already exists in this kernel (e.g. from an earlier cell),
# getOrCreate() reuses it and static settings such as spark.jars may not take effect —
# restart the kernel if connector classes turn out to be missing.
spark = (
    SparkSession.builder
    .appName("Full Integration: PostgreSQL + Cassandra")
    .config("spark.jars", ",".join(JAR_FILES))
    .config("spark.driver.extraClassPath", ":".join(JAR_FILES))
    .config("spark.executor.extraClassPath", ":".join(JAR_FILES))
    .config("spark.cassandra.connection.host", "cassandra")
    .getOrCreate()
)

# === PostgreSQL connection parameters ===
# Each value can be overridden through an environment variable so credentials do not have to
# live in the notebook; the fallbacks are the local development defaults.
url = os.environ.get("POSTGRES_JDBC_URL", "jdbc:postgresql://host.docker.internal:65432/mydatabase")
properties = {
    "user": os.environ.get("POSTGRES_USER", "myuser"),
    "password": os.environ.get("POSTGRES_PASSWORD", "mysecretpassword"),
    "driver": "org.postgresql.Driver",
}

# === Input CSV files ===
# (file number, path) pairs; the file number selects the id offset in load_file_with_offset.
csv_files = [(i, f"/home/jovyan/work/data/MOCK_DATA_{i}.csv") for i in range(1, 11)]

def load_file_with_offset(file_num, file_path, rows_per_file=1000):
    """Read one MOCK_DATA CSV file and normalise its column types.

    The ``id`` column is shifted by ``rows_per_file * (file_num - 1)``. Ids from
    different files therefore do not collide once the files are unioned: file 1
    keeps its ids, file 2 is shifted by ``rows_per_file``, and so on. Only ``id``
    is shifted; the ``sale_*_id`` foreign-key columns are cast but not offset.

    Args:
        file_num: 1-based position of the file in the batch.
        file_path: Path of a CSV file with a header row.
        rows_per_file: Size of the id block reserved for each file. The default of
            1000 assumes every file's ids lie in 1..1000 (TODO confirm against the data).

    Returns:
        A Spark DataFrame with the numeric columns cast to integer/decimal and
        the three date columns parsed from month/day/year strings.

    Raises:
        ValueError: if ``file_num`` is less than 1 (which would yield a negative offset).
    """
    if file_num < 1:
        raise ValueError(f"file_num must be >= 1, got {file_num}")

    raw_df = (
        spark.read
        .option("header", "true")
        .option("inferSchema", "true")
        # multiLine lets a quoted field span several lines; escape='"' makes a doubled
        # quote inside a quoted field a literal quote character.
        .option("multiLine", "true")
        .option("quote", "\"")
        .option("escape", "\"")
        .csv(file_path)
    )

    offset = rows_per_file * (file_num - 1)

    # Column -> target Spark SQL type. Dict insertion order is preserved, so the casts
    # are applied in the order listed.
    column_casts = {
        "customer_age": "integer",
        "product_quantity": "integer",
        "sale_customer_id": "integer",
        "sale_seller_id": "integer",
        "sale_product_id": "integer",
        "sale_quantity": "integer",
        "product_reviews": "integer",
        "product_price": "decimal(10,2)",
        "product_weight": "decimal(10,2)",
        "product_rating": "decimal(3,1)",
        "sale_total_price": "decimal(10,2)",
    }
    date_columns = ("sale_date", "product_release_date", "product_expiry_date")

    def parse_us_date(column_name):
        # Try the non-padded pattern first, then the zero-padded one; the first non-null result wins.
        return coalesce(
            to_date(col(column_name), "M/d/yyyy"),
            to_date(col(column_name), "MM/dd/yyyy"),
        )

    result = raw_df.withColumn("id", (col("id") + offset).cast("integer"))
    for column_name, spark_type in column_casts.items():
        result = result.withColumn(column_name, col(column_name).cast(spark_type))
    for column_name in date_columns:
        result = result.withColumn(column_name, parse_us_date(column_name))

    return result

In [2]:
# Load every CSV (each with its own id offset) and stack them into one DataFrame.
# unionByName matches columns by name. A file whose header lists the columns in a
# different order therefore cannot silently shift values into the wrong columns.
# NOTE: after the loop `df` still refers to the *last* file only; use `df_final`
# for the full dataset.
df_final = None
for file_num, file_path in csv_files:
    df = load_file_with_offset(file_num, file_path)
    df_final = df if df_final is None else df_final.unionByName(df)

if df_final is None:
    raise ValueError("csv_files is empty: no input files to load")

# Check the id range. Min, max and row count are computed in a single Spark job
# rather than in three separate actions over all input files.
id_min, id_max, total_rows = df_final.selectExpr("min(id)", "max(id)", "count(*)").first()
print("Минимальный ID:", id_min)
print("Максимальный ID:", id_max)
print("Общее количество записей:", total_rows)

Минимальный ID: 1
Максимальный ID: 10000
Общее количество записей: 10000


In [3]:
# Split the combined dataset into one table per entity and write each table to PostgreSQL.
# The source must be `df_final` (all input files); the loader loop's `df` holds only the last file.
# NOTE(review): load_file_with_offset offsets only `id`. If sale_customer_id / sale_seller_id /
# sale_product_id restart in every file, the dimension tables below can contain several rows per
# id. Confirm against the data before treating those columns as unique keys.
source_df = df_final.cache()  # scanned by the six writes below; caching avoids re-reading the CSVs

customers_df = source_df.select("sale_customer_id", "customer_first_name", "customer_last_name",
                                "customer_age", "customer_email", "customer_country",
                                "customer_postal_code", "customer_pet_type",
                                "customer_pet_name", "customer_pet_breed").distinct()

sellers_df = source_df.select("sale_seller_id", "seller_first_name", "seller_last_name",
                              "seller_email", "seller_country", "seller_postal_code").distinct()

products_df = source_df.select("sale_product_id", "product_name", "product_category",
                               "product_price", "product_quantity", "product_weight",
                               "product_color", "product_size", "product_brand",
                               "product_material", "product_description",
                               "product_rating", "product_reviews",
                               "product_release_date", "product_expiry_date").distinct()

stores_df = source_df.select("store_name", "store_location", "store_city", "store_state",
                             "store_country", "store_phone", "store_email").distinct()

suppliers_df = source_df.select("supplier_name", "supplier_contact", "supplier_email",
                                "supplier_phone", "supplier_address", "supplier_city",
                                "supplier_country").distinct()

sales_df = source_df.select("id", "sale_customer_id", "sale_seller_id", "sale_product_id",
                            "sale_quantity", "sale_total_price", "sale_date", "store_name").distinct()

# Table name -> DataFrame, written in this order; mode="overwrite" replaces each table.
postgres_tables = {
    "customers": customers_df,
    "sellers": sellers_df,
    "products": products_df,
    "stores": stores_df,
    "suppliers": suppliers_df,
    "sales": sales_df,
}
for table_name, table_df in postgres_tables.items():
    table_df.write.jdbc(url=url, table=table_name, mode="overwrite", properties=properties)

source_df.unpersist()

In [None]:
# Probe the driver JVM's context class loader for the Cassandra data-source class;
# True means the Spark Cassandra connector jar is on the classpath.
(
    spark._jvm.Thread.currentThread()
    .getContextClassLoader()
    .getResources("org/apache/spark/sql/cassandra/DefaultSource.class")
    .hasMoreElements()
)

In [4]:
# Second probe: look for a class from the connector's own package
# (com.datastax.spark.connector) through the driver's context class loader.
(
    spark._jvm.Thread.currentThread()
    .getContextClassLoader()
    .getResources("com/datastax/spark/connector/util/Logging.class")
    .hasMoreElements()
)

True

In [8]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, count, avg, desc

import os

# Jars for the PostgreSQL JDBC driver and the Spark Cassandra connector.
jar_files = [
    "/opt/shared-jars/postgresql-42.7.5.jar",
    "/opt/shared-jars/spark-cassandra-connector-assembly_2.12-3.2.0.jar",
]
# spark.jars is a comma-separated list. The extraClassPath settings are JVM classpaths whose
# entries are separated by ':', so a comma-joined string there would be read as one bogus path.
jar_path = ",".join(jar_files)
class_path = ":".join(jar_files)

# NOTE(review): this cell points at Cassandra host "my-cassandra", while earlier cells use
# "cassandra"; confirm which hostname the deployment actually exposes.
spark = (
    SparkSession.builder
    .appName("Cassandra Final Setup")
    .config("spark.jars", jar_path)
    .config("spark.driver.extraClassPath", class_path)
    .config("spark.executor.extraClassPath", class_path)
    .config("spark.cassandra.connection.host", "my-cassandra")
    .getOrCreate()
)

# PostgreSQL connection: env vars override the local development defaults.
url = os.environ.get("POSTGRES_JDBC_URL", "jdbc:postgresql://host.docker.internal:65432/mydatabase")
properties = {
    "user": os.environ.get("POSTGRES_USER", "myuser"),
    "password": os.environ.get("POSTGRES_PASSWORD", "mysecretpassword"),
    "driver": "org.postgresql.Driver",
}

products_df = spark.read.jdbc(url=url, table="products", properties=properties)
sales_df = spark.read.jdbc(url=url, table="sales", properties=properties)

# === Top 10 best-selling products ===
# Sales are aggregated per product first, then enriched with product attributes.
top_products_df = (
    sales_df.groupBy("sale_product_id")
    .agg(
        sum("sale_quantity").alias("total_quantity_sold"),
        sum("sale_total_price").alias("total_revenue"),
        count("*").alias("sales_count"),
    )
    .alias("sales")
    .join(
        products_df.alias("products"),
        col("sales.sale_product_id") == col("products.sale_product_id"),
        "left",
    )
    .select(
        col("sales.sale_product_id").alias("product_id"),
        col("products.product_name"),
        col("products.product_category"),
        col("total_quantity_sold"),
        col("total_revenue"),
        col("sales_count"),
    )
    .orderBy(desc("total_quantity_sold"))
    .limit(10)
)

# === Revenue per product category ===
# NOTE(review): products are joined *before* aggregating. If the products table holds several rows
# for one sale_product_id, a sale is counted once per matching row; confirm the key is unique.
revenue_by_category_df = (
    sales_df.alias("sales")
    .join(products_df.alias("products"), col("sales.sale_product_id") == col("products.sale_product_id"), "left")
    .groupBy("products.product_category")
    .agg(sum("sales.sale_total_price").alias("total_revenue"))
    .withColumnRenamed("product_category", "category")
    .orderBy(desc("total_revenue"))
)

# === Rating and review count per product (rated products only) ===
ratings_df = products_df.select(
    col("sale_product_id").alias("product_id"),
    "product_name",
    "product_category",
    "product_rating",
    "product_reviews",
).filter(col("product_rating").isNotNull())

# === Write the marts to Cassandra (keyspace "analytics") ===
# In overwrite mode the Cassandra connector truncates the target table; confirm.truncate=true
# acknowledges that.
cassandra_marts = {
    "top_selling_products": top_products_df,
    "revenue_by_category": revenue_by_category_df,
    "product_ratings": ratings_df,
}
for table_name, mart_df in cassandra_marts.items():
    (
        mart_df.write
        .format("org.apache.spark.sql.cassandra")
        .mode("overwrite")
        .option("confirm.truncate", "true")
        .options(table=table_name, keyspace="analytics")
        .save()
    )

print("✅ Витрина по продуктам успешно создана и загружена в Cassandra.")

✅ Витрина по продуктам успешно создана и загружена в Cassandra.


In [10]:
customers_df = spark.read.jdbc(url=url, table="customers", properties=properties)
sales_df = spark.read.jdbc(url=url, table="sales", properties=properties)

# === Top 10 customers by total purchase amount ===
top_customers_df = (
    sales_df.groupBy("sale_customer_id")
    .agg(sum("sale_total_price").alias("total_spent"))
    .alias("sales")
    .join(
        customers_df.alias("customers"),
        col("sales.sale_customer_id") == col("customers.sale_customer_id"),
        "left",
    )
    .select(
        col("sales.sale_customer_id").alias("customer_id"),
        col("customers.customer_first_name"),
        col("customers.customer_last_name"),
        col("customers.customer_email"),
        col("customers.customer_country"),
        col("total_spent"),
    )
    .orderBy(desc("total_spent"))
    .limit(10)
)

# === Number of customers per country ===
customers_by_country_df = (
    customers_df.groupBy("customer_country")
    .agg(count("*").alias("customer_count"))
    .orderBy(desc("customer_count"))
)

# === Average check per customer ===
# Average check = total spent / number of sales rows (orders). This is the same definition used
# for avg_order_value and avg_receipt elsewhere in this notebook. Dividing by sum(sale_quantity)
# would give the average price per *unit*, not per check.
avg_check_df = (
    sales_df.groupBy("sale_customer_id")
    .agg((sum("sale_total_price") / count("*")).alias("avg_check"))
    .alias("sales")
    .join(
        customers_df.alias("customers"),
        col("sales.sale_customer_id") == col("customers.sale_customer_id"),
        "left",
    )
    .select(
        col("sales.sale_customer_id").alias("customer_id"),
        col("customers.customer_first_name"),
        col("customers.customer_last_name"),
        col("customers.customer_email"),
        col("customers.customer_country"),
        col("avg_check"),
    )
    .orderBy(desc("avg_check"))
)

# === Write the marts to Cassandra (keyspace "analytics"), each table truncated and replaced ===
customer_marts = {
    "top_customers": top_customers_df,
    "customers_by_country": customers_by_country_df,
    "average_check_per_customer": avg_check_df,
}
for table_name, mart_df in customer_marts.items():
    (
        mart_df.write
        .format("org.apache.spark.sql.cassandra")
        .mode("overwrite")
        .option("confirm.truncate", "true")
        .options(table=table_name, keyspace="analytics")
        .save()
    )

print("✅ Витрина по клиентам успешно создана и загружена в Cassandra.")

✅ Витрина по клиентам успешно создана и загружена в Cassandra.


In [11]:
from pyspark.sql.functions import year, month

sales_df = spark.read.jdbc(url=url, table="sales", properties=properties)

# Calendar columns derived once and shared by all three time-based marts.
sales_by_period = (
    sales_df
    .withColumn("year", year("sale_date"))
    .withColumn("month", month("sale_date"))
)

# === Monthly sales trends: revenue and units per (year, month) ===
monthly_trends_df = (
    sales_by_period
    .groupBy("year", "month")
    .agg(
        sum("sale_total_price").alias("monthly_revenue"),
        sum("sale_quantity").alias("monthly_quantity"),
    )
    .orderBy("year", "month")
)

# === Year-over-year comparison: revenue and number of sales per year ===
yearly_comparison_df = (
    sales_by_period
    .groupBy("year")
    .agg(
        sum("sale_total_price").alias("total_revenue"),
        count("*").alias("sales_count"),
    )
    .orderBy("year")
)

# === Average order value per month: revenue divided by number of sales rows ===
avg_order_monthly_df = (
    sales_by_period
    .groupBy("year", "month")
    .agg((sum("sale_total_price") / count("*")).alias("avg_order_value"))
    .orderBy("year", "month")
)

# === Write the marts to Cassandra (keyspace "analytics"), each table truncated and replaced ===
time_marts = [
    ("monthly_sales_trends", monthly_trends_df),
    ("yearly_sales_summary", yearly_comparison_df),
    ("monthly_avg_order_value", avg_order_monthly_df),
]
for target_table, mart in time_marts:
    (
        mart.write
        .format("org.apache.spark.sql.cassandra")
        .mode("overwrite")
        .option("confirm.truncate", "true")
        .options(table=target_table, keyspace="analytics")
        .save()
    )

print("✅ Витрина по времени успешно создана и загружена в Cassandra.")

✅ Витрина по времени успешно создана и загружена в Cassandra.


In [12]:
stores_df = spark.read.jdbc(url=url, table="stores", properties=properties)
sales_df = spark.read.jdbc(url=url, table="sales", properties=properties)

# === Top 5 stores by revenue ===
top_stores_df = (
    sales_df
    .groupBy("store_name")
    .agg(
        sum("sale_total_price").alias("total_revenue"),
        count("*").alias("sales_count"),
    )
    .orderBy(desc("total_revenue"))
    .limit(5)
)

# === Revenue by store city and country ===
# Sales rows carry only store_name, so the location attributes come from the stores table.
# NOTE(review): if stores_df has more than one row per store_name, a sale is counted once per
# matching row; confirm store_name is unique there.
geo_sales_df = (
    sales_df.alias("sales")
    .join(stores_df.alias("stores"), col("sales.store_name") == col("stores.store_name"), "left")
    .groupBy("stores.store_city", "stores.store_country")
    .agg(sum("sales.sale_total_price").alias("total_revenue"))
    .withColumnRenamed("store_city", "city")
    .withColumnRenamed("store_country", "country")
    .orderBy(desc("total_revenue"))
)

# === Average receipt per store: revenue divided by number of sales rows ===
avg_check_per_store_df = (
    sales_df
    .groupBy("store_name")
    .agg((sum("sale_total_price") / count("*")).alias("avg_receipt"))
    .orderBy(desc("avg_receipt"))
)

# === Write the marts to Cassandra (keyspace "analytics"), each table truncated and replaced ===
store_marts = [
    ("top_stores_by_revenue", top_stores_df),
    ("store_sales_by_location", geo_sales_df),
    ("store_avg_check", avg_check_per_store_df),
]
for target_table, mart in store_marts:
    (
        mart.write
        .format("org.apache.spark.sql.cassandra")
        .mode("overwrite")
        .option("confirm.truncate", "true")
        .options(table=target_table, keyspace="analytics")
        .save()
    )

print("✅ Витрина по магазинам успешно создана и загружена в Cassandra.")

✅ Витрина по магазинам успешно создана и загружена в Cassandra.


In [15]:
suppliers_df = spark.read.jdbc(url=url, table="suppliers", properties=properties)
products_df = spark.read.jdbc(url=url, table="products", properties=properties)
sales_df = spark.read.jdbc(url=url, table="sales", properties=properties)

# Sales enriched with product and supplier attributes. Products are linked to suppliers by
# product_brand == supplier_name; a null supplier_name after the left joins means "no match".
# NOTE(review): if products or suppliers hold several rows for the same join key, this join
# multiplies sales rows and inflates revenue; confirm the keys are unique.
sales_with_suppliers = (
    sales_df.alias("sales")
    .join(products_df.alias("products"), col("sales.sale_product_id") == col("products.sale_product_id"), "left")
    .join(suppliers_df.alias("suppliers"), col("products.product_brand") == col("suppliers.supplier_name"), "left")
)

# === Top 5 suppliers by revenue ===
# The unmatched (null supplier_name) group is dropped *before* ranking. Filtering after
# limit(5) would return only four suppliers whenever that group ranks in the top five.
supplier_revenue_df = (
    sales_with_suppliers
    .groupBy("suppliers.supplier_name")
    .agg(sum("sales.sale_total_price").alias("total_revenue"))
)
supplier_revenue_df_clean = (
    supplier_revenue_df
    .filter(col("supplier_name").isNotNull())
    .orderBy(desc("total_revenue"))
    .limit(5)
)

# === Average product price per supplier ===
avg_price_by_supplier_df = (
    products_df.alias("products")
    .join(suppliers_df.alias("suppliers"), col("products.product_brand") == col("suppliers.supplier_name"), "left")
    .groupBy("suppliers.supplier_name")
    .agg(avg("products.product_price").alias("avg_product_price"))
    .orderBy(desc("avg_product_price"))
)
avg_price_by_supplier_df_clean = avg_price_by_supplier_df.filter(col("supplier_name").isNotNull())

# === Revenue by supplier country ===
sales_by_supplier_country_df = (
    sales_with_suppliers
    .groupBy("suppliers.supplier_country")
    .agg(sum("sales.sale_total_price").alias("total_revenue"))
    .withColumnRenamed("supplier_country", "country")
    .orderBy(desc("total_revenue"))
)
sales_by_supplier_country_df_clean = sales_by_supplier_country_df.filter(col("country").isNotNull())

# === Write the marts to Cassandra (keyspace "analytics"), each table truncated and replaced ===
supplier_marts = {
    "top_suppliers_by_revenue": supplier_revenue_df_clean,
    "avg_price_by_supplier": avg_price_by_supplier_df_clean,
    "sales_by_supplier_country": sales_by_supplier_country_df_clean,
}
for table_name, mart_df in supplier_marts.items():
    (
        mart_df.write
        .format("org.apache.spark.sql.cassandra")
        .mode("overwrite")
        .option("confirm.truncate", "true")
        .options(table=table_name, keyspace="analytics")
        .save()
    )
print("✅ Витрина по поставщикам успешно создана и загружена в Cassandra.")

✅ Витрина по поставщикам успешно создана и загружена в Cassandra.


In [16]:
# === Highest- and lowest-rated products ===
# NOTE(review): relies on products_df / sales_df loaded by the previous cell.
rated_products = (
    products_df
    .select("sale_product_id", "product_name", "product_category", "product_rating")
    .filter(col("product_rating").isNotNull())
)
best_five = rated_products.orderBy(desc("product_rating")).limit(5)
worst_five = rated_products.orderBy(col("product_rating").asc()).limit(5)
extreme_ratings_df = best_five.union(worst_five)

# === Sales volume per rating value ===
rating_sales_corr_df = (
    sales_df.alias("sales")
    .join(products_df.alias("products"), col("sales.sale_product_id") == col("products.sale_product_id"), "left")
    .groupBy("products.product_rating")
    .agg(sum("sales.sale_quantity").alias("total_quantity_sold"))
    .orderBy("product_rating")
)

# === Ten most-reviewed products ===
most_reviewed_df = (
    products_df
    .select("sale_product_id", "product_name", "product_category", "product_reviews")
    .filter(col("product_reviews").isNotNull())
    .orderBy(desc("product_reviews"))
    .limit(10)
)

# === Write the marts to Cassandra (keyspace "analytics"), each table truncated and replaced ===
quality_marts = [
    ("extreme_rated_products", extreme_ratings_df),
    ("rating_vs_sales_volume", rating_sales_corr_df),
    ("most_reviewed_products", most_reviewed_df),
]
for target_table, mart in quality_marts:
    (
        mart.write
        .format("org.apache.spark.sql.cassandra")
        .mode("overwrite")
        .option("confirm.truncate", "true")
        .options(table=target_table, keyspace="analytics")
        .save()
    )

print("✅ Витрина по качеству продукции успешно создана и загружена в Cassandra.")

✅ Витрина по качеству продукции успешно создана и загружена в Cassandra.
