In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, current_timestamp, row_number
from pyspark.sql.window import Window

# Spark session used by every cell below.
spark = SparkSession.builder.appName("media-retail-analytics").getOrCreate()

# S3 prefixes. STAGED_BASE holds the Delta tables read below; ANALYTICS_BASE
# is not referenced in the visible cells (presumably the output location).
STAGED_BASE = "s3://media-retail-314159/staged"
ANALYTICS_BASE = "s3://media-retail-314159/analytics"


def _read_staged(table_name):
    """Load the Delta table stored at ``STAGED_BASE/<table_name>``."""
    return spark.read.format("delta").load(f"{STAGED_BASE}/{table_name}")


# Read staged tables
album_df = _read_staged("album")
artist_df = _read_staged("artist")
customer_df = _read_staged("customer")
genre_df = _read_staged("genre")
invoice_df = _read_staged("invoice")
invoice_line_df = _read_staged("invoice_line")
track_df = _read_staged("track")



In [0]:

# ----------------------------
# dim_customer
# ----------------------------
# Surrogate keys are assigned in ascending customer_id order.
customer_window = Window.orderBy("customer_id")

# One row per customer_id; the source id is exposed as customer_key and the
# surrogate key is computed directly in the projection.
dim_customer = customer_df.dropDuplicates(["customer_id"]).select(
    row_number().over(customer_window).alias("customer_sk"),
    col("customer_id").alias("customer_key"),
    "country",
    "state",
    current_timestamp().alias("created_at"),
)

# Preview the first rows (long values are truncated by default).
dim_customer.show(5)


# ----------------------------
# dim_genre
# ----------------------------
# Surrogate keys are assigned in ascending genre_id order.
genre_window = Window.orderBy("genre_id")

# One row per genre_id; the source id is exposed as genre_key.
dim_genre = genre_df.dropDuplicates(["genre_id"]).select(
    row_number().over(genre_window).alias("genre_sk"),
    col("genre_id").alias("genre_key"),
    "name",
    current_timestamp().alias("created_at"),
)

# Preview the first rows (long values are truncated by default).
dim_genre.show(5)


# ----------------------------
# dim_track
# ----------------------------
# Surrogate keys are assigned in ascending track_id order.
track_window = Window.orderBy(col("track_id"))

# Step 1: Attach the album title and artist_id to every track.
# This is a LEFT join on purpose: with an inner join, a track whose album_id
# is NULL or has no staged album row disappears from the dimension, while the
# fact build (which joins track_df directly) still keeps its sales and then
# finds no matching track_sk. Such tracks now stay in the dimension with a
# NULL album/artist, mirroring how a missing artist is already handled below.
track_with_album = track_df.join(
    album_df.select("album_id", col("title").alias("album"), "artist_id"),
    on="album_id",
    how="left",
)

# Step 2: Attach the artist name, keep one row per track_id, then number the
# rows and project the dimension columns.
dim_track = (
    track_with_album
    .join(
        artist_df.select("artist_id", col("name").alias("artist")),
        on="artist_id",
        how="left",
    )
    # Guards against join fan-out if album/artist rows are duplicated.
    .dropDuplicates(["track_id"])
    .withColumn("track_sk", row_number().over(track_window))
    .select(
        col("track_sk"),
        col("track_id").alias("track_key"),
        col("name").alias("track"),
        col("album"),
        col("artist"),
        current_timestamp().alias("created_at"),
    )
)

dim_track.show(n=5, truncate=True)


# ----------------------------
# dim_invoice
# ----------------------------
# Surrogate keys are assigned in ascending invoice_id order.
invoice_window = Window.orderBy("invoice_id")

# One row per invoice_id; the source id is exposed as invoice_key.
dim_invoice = invoice_df.dropDuplicates(["invoice_id"]).select(
    row_number().over(invoice_window).alias("invoice_sk"),
    col("invoice_id").alias("invoice_key"),
    "invoice_date",
    current_timestamp().alias("created_at"),
)

# Preview the first rows (long values are truncated by default).
dim_invoice.show(5)
# =====================================================
# FACT TABLE (using surrogate keys)
# =====================================================
# Fact surrogate keys are assigned in ascending invoice_line_id order.
# NOTE(review): this window has no partitionBy, so Spark evaluates it on a
# single partition -- confirm that is acceptable for the invoice-line volume.
fact_window = Window.orderBy(col("invoice_line_id"))

# Enrich each invoice line with its customer_id (via invoice) and genre_id
# (via track). The lookups are de-duplicated on their join key, exactly as the
# dimensions are, so a duplicated staged invoice/track row cannot fan out the
# join and double-count invoice lines.
# NOTE(review): invoice_line_df itself is not de-duplicated here; confirm
# invoice_line_id is unique in the staged table.
invoice_lookup = (
    invoice_df
    .select("customer_id", "invoice_id")
    .dropDuplicates(["invoice_id"])
)
track_lookup = (
    track_df
    .select("track_id", "genre_id", "album_id")
    .dropDuplicates(["track_id"])
)

fact_base = (
    invoice_line_df
    .join(invoice_lookup, "invoice_id", "inner")
    .join(track_lookup, "track_id", "inner")
)

# Swap natural keys for surrogate keys. LEFT joins keep every invoice line
# even when a dimension row is missing (the surrogate key is then NULL).
fact_sales = (
    fact_base
    .join(dim_customer.select("customer_sk", col("customer_key")),
          fact_base.customer_id == col("customer_key"), "left")
    .join(dim_invoice.select("invoice_sk", col("invoice_key")),
          fact_base.invoice_id == col("invoice_key"), "left")
    .join(dim_track.select("track_sk", col("track_key")),
          fact_base.track_id == col("track_key"), "left")
    .join(dim_genre.select("genre_sk", col("genre_key")),
          fact_base.genre_id == col("genre_key"), "left")
    .withColumn("fact_sales_sk", row_number().over(fact_window))
    .select(
        "fact_sales_sk",
        "genre_sk",
        "customer_sk",
        "invoice_sk",
        "track_sk",
        col("unit_price").alias("price"),
    )
)

fact_sales.show(n=20, truncate=False)