In [1]:
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [2]:
# PostgreSQL connection settings. Each value may be overridden through an
# environment variable so real credentials do not have to be stored in the
# notebook; the fallbacks are the values this notebook used originally.
POSTGRES_URL = os.environ.get("POSTGRES_JDBC_URL", "jdbc:postgresql://postgres:5432/postgres")
POSTGRES_USER = os.environ.get("POSTGRES_USER", "postgres")
POSTGRES_PASSWORD = os.environ.get("POSTGRES_PASSWORD", "postgres")

In [3]:
# ClickHouse connection settings. Each value may be overridden through an
# environment variable so real credentials do not have to be stored in the
# notebook; the fallbacks are the values this notebook used originally.
CLICKHOUSE_URL = os.environ.get("CLICKHOUSE_JDBC_URL", "jdbc:clickhouse://clickhouse-server:8123/default")
CLICKHOUSE_USER = os.environ.get("CLICKHOUSE_USER", "user")
CLICKHOUSE_PASSWORD = os.environ.get("CLICKHOUSE_PASSWORD", "pass")

In [4]:
# Location of the JDBC driver jars handed to Spark. The directory defaults to
# the path this notebook used originally and can be redirected with the
# SPARK_JARS_DIR environment variable when the notebook runs elsewhere.
JARS_DIR = os.environ.get("SPARK_JARS_DIR", "/home/jovyan/jars")
POSTGRES_JAR = os.path.join(JARS_DIR, "postgresql-42.7.8.jar")
CLICKHOUSE_JAR = os.path.join(JARS_DIR, "clickhouse-jdbc-0.9.4-all.jar")

In [5]:
# Start (or reuse) the Spark session with both JDBC driver jars registered.
jdbc_jars = f"{POSTGRES_JAR},{CLICKHOUSE_JAR}"
session_builder = SparkSession.builder.appName("ETL_star_and_clickhouse")
session_builder = session_builder.config("spark.jars", jdbc_jars)
# Select Spark's legacy date/time parser for this session.
session_builder = session_builder.config("spark.sql.legacy.timeParserPolicy", "LEGACY")
spark = session_builder.getOrCreate()

# Use 8 shuffle partitions instead of Spark's default.
spark.conf.set("spark.sql.shuffle.partitions", "8")

In [6]:
# Load the raw staging table from PostgreSQL over JDBC.
raw_source_options = {
    "url": POSTGRES_URL,
    "dbtable": "mock_data_raw",
    "user": POSTGRES_USER,
    "password": POSTGRES_PASSWORD,
    "driver": "org.postgresql.Driver",
}
mock_raw = spark.read.format("jdbc").options(**raw_source_options).load()


In [7]:
# Customer dimension: one row per distinct combination of the customer
# attributes found in the raw data, tagged with a generated surrogate key.
dim_customer = (
    mock_raw.select(
        col("customer_first_name").alias("first_name"),
        col("customer_last_name").alias("last_name"),
        col("customer_age").alias("age"),
        col("customer_email").alias("email"),
        col("customer_country").alias("country"),
        col("customer_postal_code").alias("postal_code"),
        col("customer_pet_type").alias("pet_type"),
        col("customer_pet_name").alias("pet_name"),
        col("customer_pet_breed").alias("pet_breed")
    )
    .dropDuplicates()
    .withColumn("customer_key", monotonically_increasing_id())
    # monotonically_increasing_id() depends on the partition layout, and this
    # lazy frame is evaluated again every time it is used (the dimension write,
    # the fact join, the customer mart). Caching keeps a single materialized set
    # of keys so the ids stored in dim_customer are the ids fact_sales refers to.
    .cache()
)

In [8]:
# Seller dimension: one row per distinct combination of the seller attributes
# found in the raw data, tagged with a generated surrogate key.
dim_seller = (
    mock_raw.select(
        col("seller_first_name").alias("first_name"),
        col("seller_last_name").alias("last_name"),
        col("seller_email").alias("email"),
        col("seller_country").alias("country"),
        col("seller_postal_code").alias("postal_code")
    )
    .dropDuplicates()
    .withColumn("seller_key", monotonically_increasing_id())
    # monotonically_increasing_id() depends on the partition layout, and this
    # lazy frame is evaluated again every time it is used (the dimension write
    # and the fact join). Caching keeps a single materialized set of keys so the
    # ids stored in dim_seller are the ids fact_sales refers to.
    .cache()
)

In [9]:
# Product dimension: one row per distinct combination of the product attributes
# found in the raw data, tagged with a generated surrogate key. Release and
# expiry dates are parsed from "MM/dd/yyyy" text into date values.
dim_product = (
    mock_raw.select(
        col("product_name"),
        col("product_category").alias("category"),
        col("product_brand").alias("brand"),
        col("product_material").alias("material"),
        col("product_description").alias("description"),
        col("pet_category"),
        col("product_weight").alias("weight"),
        col("product_color").alias("color"),
        col("product_size").alias("size"),
        col("product_price").alias("price"),
        col("product_rating").alias("rating"),
        col("product_reviews").alias("reviews"),
        to_date(col("product_release_date"), "MM/dd/yyyy").alias("release_date"),
        to_date(col("product_expiry_date"), "MM/dd/yyyy").alias("expiry_date")
    )
    .dropDuplicates()
    .withColumn("product_key", monotonically_increasing_id())
    # monotonically_increasing_id() depends on the partition layout, and this
    # lazy frame is evaluated again every time it is used (the dimension write,
    # the fact join, several marts). Caching keeps a single materialized set of
    # keys so the ids stored in dim_product are the ids fact_sales refers to.
    .cache()
)

In [10]:
# Store dimension: one row per distinct combination of the store attributes
# found in the raw data, tagged with a generated surrogate key.
dim_store = (
    mock_raw.select(
        col("store_name").alias("name"),
        col("store_location").alias("location"),
        col("store_city").alias("city"),
        col("store_state").alias("state"),
        col("store_country").alias("country"),
        col("store_phone").alias("phone"),
        col("store_email").alias("email")
    )
    .dropDuplicates()
    .withColumn("store_key", monotonically_increasing_id())
    # monotonically_increasing_id() depends on the partition layout, and this
    # lazy frame is evaluated again every time it is used (the dimension write,
    # the fact join, the store mart). Caching keeps a single materialized set of
    # keys so the ids stored in dim_store are the ids fact_sales refers to.
    .cache()
)

In [11]:
# Supplier dimension: one row per distinct combination of the supplier
# attributes found in the raw data, tagged with a generated surrogate key.
dim_supplier = (
    mock_raw.select(
        col("supplier_name").alias("name"),
        col("supplier_contact").alias("contact"),
        col("supplier_email").alias("email"),
        col("supplier_phone").alias("phone"),
        col("supplier_address").alias("address"),
        col("supplier_city").alias("city"),
        col("supplier_country").alias("country")
    )
    .dropDuplicates()
    .withColumn("supplier_key", monotonically_increasing_id())
    # monotonically_increasing_id() depends on the partition layout, and this
    # lazy frame is evaluated again every time it is used (the dimension write,
    # the fact join, the supplier mart). Caching keeps a single materialized set
    # of keys so the ids stored in dim_supplier are the ids fact_sales refers to.
    .cache()
)

In [12]:
# Store every dimension in PostgreSQL, replacing any table of the same name.
postgres_sink_options = {
    "url": POSTGRES_URL,
    "user": POSTGRES_USER,
    "password": POSTGRES_PASSWORD,
    "driver": "org.postgresql.Driver",
}
dimension_frames = {
    "dim_customer": dim_customer,
    "dim_seller": dim_seller,
    "dim_product": dim_product,
    "dim_store": dim_store,
    "dim_supplier": dim_supplier,
}
for table, df in dimension_frames.items():
    dimension_writer = df.write.format("jdbc").options(dbtable=table, **postgres_sink_options)
    dimension_writer.mode("overwrite").save()


In [13]:
# Date layout used by the textual date columns of the raw table.
_RAW_DATE_FORMAT = "MM/dd/yyyy"


def _prefixed_pairs(prefix, dim_columns):
    """Pair raw column ``raw.<prefix><name>`` with dimension column ``<name>``."""
    return [(col(f"raw.{prefix}{name}"), name) for name in dim_columns]


def _natural_key_match(dim_alias, raw_to_dim):
    """Build the join condition linking a raw row to its dimension row.

    Args:
        dim_alias: alias under which the dimension frame is joined.
        raw_to_dim: list of ``(raw expression, dimension column name)`` pairs.

    Returns:
        A Column that AND-s together a null-safe equality for every pair.
    """
    condition = None
    for raw_expr, dim_column in raw_to_dim:
        same_value = raw_expr.eqNullSafe(col(f"{dim_alias}.{dim_column}"))
        condition = same_value if condition is None else condition & same_value
    return condition


# Each dimension is the de-duplicated projection of a group of raw columns, so a
# raw row is matched on ALL of those columns. Matching on a subset (for example
# e-mail only, or name + price) pairs one sale with every dimension row that
# shares the subset and duplicates the sale; a plain `==` would also drop sales
# whose key column is NULL. The column lists below mirror the dimension
# definitions and have to be kept in sync with them.
_customer_match = _natural_key_match("c", _prefixed_pairs("customer_", [
    "first_name", "last_name", "age", "email", "country",
    "postal_code", "pet_type", "pet_name", "pet_breed",
]))
_seller_match = _natural_key_match("s", _prefixed_pairs("seller_", [
    "first_name", "last_name", "email", "country", "postal_code",
]))
_product_match = _natural_key_match(
    "p",
    [
        (col("raw.product_name"), "product_name"),
        (col("raw.pet_category"), "pet_category"),
    ]
    + _prefixed_pairs("product_", [
        "category", "brand", "material", "description", "weight",
        "color", "size", "price", "rating", "reviews",
    ])
    + [
        # The dimension stores parsed dates, so parse the raw text the same way.
        (to_date(col("raw.product_release_date"), _RAW_DATE_FORMAT), "release_date"),
        (to_date(col("raw.product_expiry_date"), _RAW_DATE_FORMAT), "expiry_date"),
    ],
)
_store_match = _natural_key_match("st", _prefixed_pairs("store_", [
    "name", "location", "city", "state", "country", "phone", "email",
]))
_supplier_match = _natural_key_match("su", _prefixed_pairs("supplier_", [
    "name", "contact", "email", "phone", "address", "city", "country",
]))

# Fact table: one row per raw sale carrying the measures and the surrogate key
# of every dimension. Explicit aliases keep raw and dimension columns apart even
# though the dimensions are derived from the same raw frame.
fact_sales = (
    mock_raw.alias("raw")
    .join(dim_customer.alias("c"), _customer_match)
    .join(dim_seller.alias("s"), _seller_match)
    .join(dim_product.alias("p"), _product_match)
    .join(dim_store.alias("st"), _store_match)
    .join(dim_supplier.alias("su"), _supplier_match)
    .select(
        to_date(col("sale_date"), _RAW_DATE_FORMAT).alias("sale_date"),
        col("sale_quantity"),
        col("sale_total_price"),
        col("customer_key"),
        col("seller_key"),
        col("product_key"),
        col("store_key"),
        col("supplier_key")
    )
    # The frame is written below and then reused by every mart; caching avoids
    # re-running the five joins for each of those actions.
    .cache()
)

fact_sales.write.format("jdbc") \
    .option("url", POSTGRES_URL) \
    .option("dbtable", "fact_sales") \
    .option("user", POSTGRES_USER) \
    .option("password", POSTGRES_PASSWORD) \
    .option("driver", "org.postgresql.Driver") \
    .mode("overwrite") \
    .save()

In [14]:
def write_to_ch(df, table_name, mode='overwrite'):
    """Save a DataFrame into a ClickHouse table over JDBC.

    Args:
        df: DataFrame to write.
        table_name: Target table, passed as the JDBC ``dbtable`` option.
        mode: Spark save mode; defaults to ``'overwrite'``.
    """
    clickhouse_options = {
        "url": CLICKHOUSE_URL,
        "dbtable": table_name,
        "user": CLICKHOUSE_USER,
        "password": CLICKHOUSE_PASSWORD,
        "driver": "com.clickhouse.jdbc.ClickHouseDriver",
    }
    writer = df.write.format("jdbc").options(**clickhouse_options)
    writer.mode(mode).save()

In [15]:
# Product mart: revenue, units sold, mean rating and mean review count per
# product name.
sales_by_product = fact_sales.join(dim_product, "product_key").groupBy("product_name")
vitrina_products = sales_by_product.agg(
    sum("sale_total_price").alias("total_revenue"),
    sum("sale_quantity").alias("total_quantity"),
    avg("rating").alias("avg_rating"),
    avg("reviews").alias("avg_reviews"),
)
write_to_ch(vitrina_products, "vitrina_products")

In [16]:
# The ten products with the highest total quantity sold.
top10_products = vitrina_products.sort(col("total_quantity").desc()).limit(10)
write_to_ch(top10_products, "top10_products")

In [17]:
# Total revenue per product category.
sales_by_category = fact_sales.join(dim_product, "product_key").groupBy("category")
category_revenue = sales_by_category.agg(
    sum("sale_total_price").alias("category_revenue")
)
write_to_ch(category_revenue, "category_revenue")

In [18]:
# Customer mart: total spend and average sale amount per customer
# (grouped by first name, last name and e-mail).
sales_by_customer = (
    fact_sales.join(dim_customer, "customer_key")
    .groupBy("first_name", "last_name", "email")
)
vitrina_customers = sales_by_customer.agg(
    sum("sale_total_price").alias("total_spent"),
    avg("sale_total_price").alias("avg_check"),
)
write_to_ch(vitrina_customers, "vitrina_customers")

In [19]:
# The ten customers with the highest total spend.
top10_customers = vitrina_customers.sort(col("total_spent").desc()).limit(10)
write_to_ch(top10_customers, "top10_customers")

In [20]:
# Time mart: revenue and average sale amount per calendar year and month of the
# sale date.
vitrina_time = (
    fact_sales
    .groupBy(
        year("sale_date").alias("year"),
        month("sale_date").alias("month"),
    )
    .agg(
        sum("sale_total_price").alias("monthly_revenue"),
        avg("sale_total_price").alias("avg_order"),
    )
)
write_to_ch(vitrina_time, "vitrina_time")

In [21]:
# Store mart: revenue and average sale amount per store
# (grouped by store name, city and country).
sales_by_store = (
    fact_sales.join(dim_store, "store_key")
    .groupBy("name", "city", "country")
)
vitrina_stores = sales_by_store.agg(
    sum("sale_total_price").alias("store_revenue"),
    avg("sale_total_price").alias("avg_check"),
)
write_to_ch(vitrina_stores, "vitrina_stores")

In [22]:
# The five stores with the highest revenue.
top5_stores = vitrina_stores.sort(col("store_revenue").desc()).limit(5)
write_to_ch(top5_stores, "top5_stores")

In [23]:
# Supplier mart: revenue and average product price per supplier
# (grouped by supplier name and country).
vitrina_suppliers = (
    fact_sales.join(dim_supplier, "supplier_key")
    .join(dim_product, "product_key")
    # Grouping columns are taken from dim_supplier explicitly.
    .groupBy(dim_supplier["name"], dim_supplier["country"])
    .agg(
        sum("sale_total_price").alias("supplier_revenue"),
        avg(dim_product["price"]).alias("avg_product_price")
    )
)
# Every other mart is stored in ClickHouse right after it is built; this one
# was computed but never written.
write_to_ch(vitrina_suppliers, "vitrina_suppliers")

In [24]:
# The five suppliers with the highest revenue.
top5_suppliers = vitrina_suppliers.sort(col("supplier_revenue").desc()).limit(5)
write_to_ch(top5_suppliers, "top5_suppliers")

In [25]:
# Quality mart: name, rating and review count of every product row, sorted by
# rating from highest to lowest.
product_quality_columns = dim_product.select("product_name", "rating", "reviews")
vitrina_quality = product_quality_columns.sort(col("rating").desc())
write_to_ch(vitrina_quality, "vitrina_quality")

In [26]:
# Stop the Spark session now that all writes are done, then report completion.
spark.stop()
print("Finished")

Finished
