IMPORTS Y RUTAS

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, StructType, StructField, LongType, StringType

#Bronze en Unity Catalog Volume
BRONZE_BASE = "/Volumes/tmdb/default/bronze_tmdb"
BRONZE_GENRES = f"{BRONZE_BASE}/genres"
BRONZE_DISCOVER = f"{BRONZE_BASE}/discover"

#Unity Catalog
CATALOG = "tmdb"
SCHEMA = "default"

T_SILVER_GENRES = f"{CATALOG}.{SCHEMA}.silver_genres"
T_SILVER_MOVIES = f"{CATALOG}.{SCHEMA}.silver_movies"
T_SILVER_MOVIE_GENRES = f"{CATALOG}.{SCHEMA}.silver_movie_genres"

BRONZE_GENRES, BRONZE_DISCOVER, T_SILVER_MOVIES


('/Volumes/tmdb/default/bronze_tmdb/genres',
 '/Volumes/tmdb/default/bronze_tmdb/discover',
 'tmdb.default.silver_movies')

CARGA BRONZE 

In [0]:
df_genres_raw   = spark.read.option("multiLine", "true").json(f"{BRONZE_GENRES}/*.json")
df_discover_raw = spark.read.option("multiLine", "true").json(f"{BRONZE_DISCOVER}/*.json")


CREAR SILVER_GENRES

In [0]:
# 2) SILVER: GENRES
df_silver_genres = (
    df_genres_raw
    .select(F.explode("genres").alias("g"))
    .select(
        F.col("g.id").alias("genre_id"),
        F.col("g.name").alias("genre_name")
    )
    .filter(F.col("genre_id").isNotNull())
    .dropDuplicates(["genre_id"])
)

print("silver_genres rebuilt:", df_silver_genres.count())
display(df_silver_genres)

# Re-escribir tabla Silver genres en Unity Catalog
df_silver_genres.write.format("delta").mode("overwrite").saveAsTable(T_SILVER_GENRES)

# Verificación
print("table silver_genres now:", spark.table(T_SILVER_GENRES).count())

silver_genres rebuilt: 19


genre_id,genre_name
12,Adventure
14,Fantasy
16,Animation
18,Drama
27,Horror
28,Action
35,Comedy
36,History
37,Western
53,Thriller


table silver_genres now: 19


3) EXPLODE MOVIES (desde results)

In [0]:
# 3) EXPLODE MOVIES (desde results)
df_movies_exploded = df_discover_raw.select(F.explode("results").alias("m"))

silver_genres

In [0]:
# Schema de cada género
genre_schema = ArrayType(
    StructType([
        StructField("id", LongType(), True),
        StructField("name", StringType(), True)
    ])
)

df_silver_genres = (
    df_genres_raw
    # Si genres ya es array, from_json no daña; si es string, lo convierte bien.
    .withColumn("genres_parsed", F.from_json(F.col("genres").cast("string"), genre_schema))
    .select(F.explode("genres_parsed").alias("g"))
    .select(
        F.col("g.id").alias("genre_id"),
        F.col("g.name").alias("genre_name")
    )
    .dropDuplicates(["genre_id"])
)

df_silver_genres.printSchema()
df_silver_genres.display()
df_genres_raw.printSchema()



root
 |-- genre_id: long (nullable = true)
 |-- genre_name: string (nullable = true)



genre_id,genre_name


root
 |-- genres: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- name: string (nullable = true)



4) SILVER: MOVIES

In [0]:
# 4) SILVER: MOVIES
df_silver_movies = (
    df_movies_exploded
    .select(
        F.col("m.id").alias("movie_id"),
        F.col("m.title").alias("title"),
        F.to_date(F.col("m.release_date"), "yyyy-MM-dd").alias("release_date"),
        F.col("m.vote_average").cast("double").alias("vote_average"),
        F.col("m.vote_count").cast("long").alias("vote_count"),
        F.col("m.popularity").cast("double").alias("popularity"),
        F.col("m.original_language").alias("original_language")
    )
    .filter(F.col("movie_id").isNotNull())
    .dropDuplicates(["movie_id"])
)

5) SILVER: MOVIE_GENRES (tabla puente)

In [0]:
# 5) SILVER: MOVIE_GENRES (tabla puente)
df_silver_movie_genres = (
    df_movies_exploded
    .select(
        F.col("m.id").alias("movie_id"),
        F.explode(F.col("m.genre_ids")).alias("genre_id")
    )
    .filter(F.col("movie_id").isNotNull() & F.col("genre_id").isNotNull())
    .dropDuplicates(["movie_id", "genre_id"])
)

7) GUARDAR COMO DELTA TABLES EN UNITY CATALOG

In [0]:

# 6) GUARDAR COMO DELTA TABLES EN UNITY CATALOG
df_silver_genres.write.format("delta").mode("overwrite").saveAsTable(T_SILVER_GENRES)
df_silver_movies.write.format("delta").mode("overwrite").saveAsTable(T_SILVER_MOVIES)
df_silver_movie_genres.write.format("delta").mode("overwrite").saveAsTable(T_SILVER_MOVIE_GENRES)