In [0]:
# ========================================
# STORAGE CONFIGURATION WITH MANAGED IDENTITY
# ========================================

storage_account = "adbdatalake01111"

# Host name of the storage account's DFS endpoint; shared by the Spark
# configuration keys and by the abfss:// URIs built below.
account_host = f"{storage_account}.dfs.core.windows.net"

# Select OAuth for this account and name the MSI token provider class.
for conf_key, conf_value in (
    (f"fs.azure.account.auth.type.{account_host}", "OAuth"),
    (
        f"fs.azure.account.oauth.provider.type.{account_host}",
        "org.apache.hadoop.fs.azurebfs.oauth2.MsiTokenProvider",
    ),
):
    spark.conf.set(conf_key, conf_value)

# Root folders of the "movies" area in the bronze and silver containers.
bronze_path = f"abfss://bronze@{account_host}/movies"
silver_path = f"abfss://silver@{account_host}/movies"


In [0]:
def _read_bronze_table(table_name):
    """Load the parquet dataset stored under `bronze_path`/<table_name>."""
    return spark.read.parquet(f"{bronze_path}/{table_name}")


# One DataFrame per bronze dataset used in the joins further down.
movies_df = _read_bronze_table("movies")
film_details_df = _read_bronze_table("film_details")
more_info_df = _read_bronze_table("more_info")
poster_path_df = _read_bronze_table("poster_path")


In [0]:
# Quick visual check: show the first five rows of two of the bronze inputs.
for preview_df in (movies_df, film_details_df):
    display(preview_df.limit(5))


In [0]:
# Give each table's generic "id" column a table-specific name so the key
# columns stay distinguishable once the tables are joined together.
movies_df = movies_df.withColumnRenamed(existing="id", new="movie_id")
film_details_df = film_details_df.withColumnRenamed(existing="id", new="film_id")
more_info_df = more_info_df.withColumnRenamed(existing="id", new="more_info_id")
poster_path_df = poster_path_df.withColumnRenamed(existing="id", new="poster_id")

In [0]:
from pyspark.sql.functions import col


In [0]:
# Attach film details to movies. A left join keeps every movie row even when
# no film_details row has a matching film_id.
movies_film_df = movies_df.alias("m").join(
    film_details_df.alias("f"),
    on=col("m.movie_id") == col("f.film_id"),
    how="left",
)

In [0]:
# Attach the extra-information table to the movies + film details result.
# A left join keeps rows that have no matching more_info_id.
movies_info_df = movies_film_df.alias("mf").join(
    more_info_df.alias("i"),
    on=col("mf.movie_id") == col("i.more_info_id"),
    how="left",
)


In [0]:
# Final enrichment step: attach the poster table. A left join keeps rows that
# have no matching poster_id.
movies_enriched_df = movies_info_df.alias("mi").join(
    poster_path_df.alias("p"),
    on=col("mi.movie_id") == col("p.poster_id"),
    how="left",
)

# Inspect the fully joined result.
display(movies_enriched_df)

In [0]:
from pyspark.sql.functions import col

# Project the joined data down to the columns kept for the silver dataset.
# NOTE(review): `vote_count` is published under the name `rating` -- confirm
# that this is the intended source column for a rating.
silver_movies_df = movies_enriched_df.select(
    "movie_id",
    "title",
    "release_date",
    "genres",
    "runtime",
    col("vote_count").alias("rating"),
    "director",
    "poster_path",
)


In [0]:
from pyspark.sql.functions import col

# Type clean-up: store `rating` as a double and discard rows without a title.
# `runtime` is neither cast nor filtered in this step.
rated_movies_df = silver_movies_df.withColumn("rating", col("rating").cast("double"))
silver_movies_df = rated_movies_df.where(col("title").isNotNull())


In [0]:
from pyspark.sql.functions import (
    regexp_extract, col, when
)

# Derive a numeric `runtime_minutes` column from the textual `runtime` column.
#
# The patterns target text shaped like "2h 30min"; either part may be absent.
# Both patterns allow optional whitespace between the number and its unit, so
# "2 h 30 min" is read the same way as "2h 30min". (Previously only the minutes
# pattern tolerated whitespace, so such a value lost its hours component.)
hours_part = regexp_extract(col("runtime"), r"(\d+)\s*h", 1).cast("int")
minutes_part = regexp_extract(col("runtime"), r"(\d+)\s*min", 1).cast("int")

# A part that could not be extracted is null after the cast and counts as 0.
total_minutes = (
    when(hours_part.isNotNull(), hours_part * 60).otherwise(0)
    + when(minutes_part.isNotNull(), minutes_part).otherwise(0)
)

silver_movies_df = silver_movies_df.withColumn("runtime_minutes", total_minutes)

# Rows where neither hours nor minutes could be extracted end up with 0 and
# are removed here.
silver_movies_df = silver_movies_df.filter(col("runtime_minutes") > 0)

display(silver_movies_df)

In [0]:
# Keep a single row per movie_id. No ordering is applied beforehand, so this
# code does not control which of several duplicate rows survives.
dedup_keys = ["movie_id"]
silver_movies_df = silver_movies_df.dropDuplicates(dedup_keys)


In [0]:
# Persist the result as parquet under the silver path; "overwrite" replaces
# whatever was previously stored at that location.
silver_movies_target = f"{silver_path}/silver_movies"
silver_movies_df.write.parquet(silver_movies_target, mode="overwrite")
