In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Layer namespaces: every table below is addressed as <catalog>.<layer>.<table>.
catalog = "movielens"
silver, gold = (f"{catalog}.{layer}" for layer in ("silver", "gold"))


def _read_silver(table_name):
    """Return the DataFrame for `table_name` inside the silver layer."""
    return spark.table(f"{silver}.{table_name}")


# Silver inputs consumed by the gold-layer cells below.
movies = _read_silver("movies")
mov_gen = _read_silver("movie_genres")
ratings = _read_silver("ratings")
tags_s = _read_silver("tags")

## Dim User

In [0]:
# One row per distinct userId (as bigint) found in the silver ratings table.
dim_user = (
    ratings
      .select(F.col("userId").cast("bigint").alias("userId"))
      .dropDuplicates()
)
(dim_user.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable(f"{gold}.dim_user"))

## Dim Genre

In [0]:
# Genre dimension: one row per canonical genre (lower-cased, trimmed).
# genre_id is a hash of the canonical form, so it is stable across rebuilds.
dim_genre = (
    mov_gen
      .select(F.lower(F.trim("genre")).alias("canon"),
              F.trim("genre").alias("genre"))
      .where(F.col("genre").isNotNull() & (F.col("genre") != ""))
      # Collapse per canonical key. A plain distinct() on (canon, genre) would keep
      # spelling variants ("Drama" / "drama") as separate rows sharing one
      # genre_id. min() picks a deterministic display spelling.
      .groupBy("canon").agg(F.min("genre").alias("genre"))
      # NOTE(review): abs() folds h and -h onto the same id; collisions are
      # astronomically unlikely for a handful of genres, but the key is not guaranteed unique.
      .withColumn("genre_id", F.abs(F.xxhash64("canon")))
      .select("genre_id", "genre")
)
dim_genre.write.format("delta").mode("overwrite").saveAsTable(f"{gold}.dim_genre")

## Dim Movie

In [0]:
# Movie dimension: one row per silver movie, linked to dim_genre through the
# first element of its `genres` array (genre_id is NULL when the array is
# empty/NULL or the genre has no dim_genre match).
dim_movie = (
    movies
      # when() without otherwise() already yields NULL for the non-matching case.
      .withColumn("primary_genre",
                  F.when(F.size("genres") > 0, F.col("genres")[0]))
      .select(
          F.col("movieId").cast("bigint").alias("movieId"),
          F.col("title"),
          F.col("year").cast("int").alias("year"),
          F.lower(F.trim("primary_genre")).alias("pg_canon")
      )
      .join(
          dim_genre
            .select("genre_id", F.lower(F.col("genre")).alias("canon_dim"))
            # Keep one lookup row per canonical genre so duplicate spellings in
            # dim_genre cannot fan out (duplicate) movie rows. Rows sharing a
            # canon_dim carry the same hashed genre_id, so which one survives
            # does not matter.
            .dropDuplicates(["canon_dim"]),
          on=F.col("pg_canon") == F.col("canon_dim"),
          how="left"
      )
      .select("movieId", "title", "year", "genre_id")
)
dim_movie.write.format("delta").mode("overwrite").saveAsTable(f"{gold}.dim_movie")

## Dim Date

In [0]:
# Calendar dimension covering every day from the first to the last rating_date.
rating_bounds = ratings.agg(
    F.min("rating_date").alias("first_day"),
    F.max("rating_date").alias("last_day"),
)
# One row per day in [first_day, last_day]; no rows if ratings is empty.
date_series = rating_bounds.select(
    F.explode(F.sequence("first_day", "last_day")).alias("date")
)

day = F.col("date")
dim_date = date_series.select(
    day,
    F.date_format(day, "yyyyMMdd").cast("int").alias("date_key"),  # e.g. 20240131
    F.year(day).alias("year"),
    F.quarter(day).alias("quarter"),
    F.month(day).alias("month"),
    F.dayofmonth(day).alias("day"),
    F.date_format(day, "EEEE").alias("weekday"),   # full day name
    F.dayofweek(day).alias("weekday_num"),         # Spark: 1 = Sunday ... 7 = Saturday
)
(dim_date.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable(f"{gold}.dim_date"))

## Dim Tag

In [0]:
# Tag dimension: one row per canonical tag (lower-cased, trimmed), keyed by a
# hash of the canonical form so tag_id is stable across rebuilds.
dim_tag = (
    tags_s
      .select(F.lower(F.trim("tag")).alias("canon"),
              F.trim("tag").alias("tag"))
      .where(F.col("tag").isNotNull() & (F.col("tag") != ""))
      # F.first() is non-deterministic after a shuffle, so the displayed spelling
      # could change between runs. min() picks a stable representative.
      .groupBy("canon").agg(F.min("tag").alias("tag"))
      # NOTE(review): abs() folds h and -h onto the same id; collisions are
      # unlikely but tag_id is not guaranteed unique.
      .withColumn("tag_id", F.abs(F.xxhash64("canon")))
      .select("tag_id", "tag")
)
dim_tag.write.format("delta").mode("overwrite").saveAsTable(f"{gold}.dim_tag")

## Fact Ratings

In [0]:
# Silver tags resolved to dim_tag surrogate keys. The join uses the same canonical
# form dim_tag is keyed on (lower(trim(tag))). The inner join also drops
# NULL/blank tags, which dim_tag excludes.
tags_norm = (
    tags_s
      .select(
          F.col("userId").cast("bigint").alias("userId"),
          F.col("movieId").cast("bigint").alias("movieId"),
          F.lower(F.trim("tag")).alias("canon"),
          F.col("tag_ts").alias("tag_ts"),
      )
      .join(
          dim_tag.select(
              F.col("tag_id").cast("bigint").alias("tag_id"),
              F.lower(F.trim("tag")).alias("canon"),
          ),
          on="canon",
          how="inner",
      )
      .select("userId", "movieId", "tag_id", "tag_ts")
)

ratings_rx = ratings.select(
    F.col("userId").cast("bigint").alias("userId"),
    F.col("movieId").cast("bigint").alias("movieId"),
    F.col("rating").cast("double").alias("rating"),
    F.col("rating_ts").alias("rating_ts"),
    F.col("rating_date").alias("rating_date"),
)

# All tags of each (user, movie) pair, one row per pair, so the left join below
# keeps exactly one output row per rating. This replaces a window keyed on
# monotonically_increasing_id(), which is non-deterministic.
# neg_tag_id makes array_max prefer the smallest tag_id when several tags share
# the latest tag_ts.
tag_history = (
    tags_norm
      .groupBy("userId", "movieId")
      .agg(
          F.collect_list(
              F.struct(F.col("tag_ts"), (-F.col("tag_id")).alias("neg_tag_id"))
          ).alias("tag_hist")
      )
)

# Latest tag applied at or before the rating. Tags with NULL tag_ts, or ratings with
# NULL rating_ts, never qualify. array_max of an empty/NULL array is NULL.
latest_tag = F.array_max(
    F.filter("tag_hist", lambda t: t["tag_ts"] <= F.col("rating_ts"))
)

fct_rating = (
    ratings_rx
      .join(tag_history, on=["userId", "movieId"], how="left")
      .select(
          F.col("userId"),
          F.col("movieId"),
          F.date_format("rating_date", "yyyyMMdd").cast("int").alias("date_key"),
          F.col("rating"),
          F.col("rating_ts"),
          F.col("rating_date"),
          (-latest_tag["neg_tag_id"]).alias("tag_id"),  # may be NULL if there is no prior tag
      )
)

# NOTE(review): one partition per calendar day can produce many small files;
# consider partitioning by year (or not at all) if this becomes a problem.
(fct_rating.write
    .format("delta")
    .mode("overwrite")
    .partitionBy("rating_date")
    .option("overwriteSchema","true")
    .saveAsTable(f"{gold}.fct_rating"))