In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType

# Storage locations: the silver Delta table is the input of this cell, and the
# gold tables are written underneath gold_base.
silver_path = "/Volumes/workspace/default/imdb_silver"
gold_base = "/Volumes/workspace/default/imdb_gold"

# Read the silver layer; the source format is passed by keyword rather than
# through a chained .format() call.
df = spark.read.load(silver_path, format="delta")

# Title dimension: the descriptive columns of each title, reduced to one row
# per tconst, with startYear exposed under the name "year".
title_columns = [
    "tconst",
    "titleType",
    "primaryTitle",
    "originalTitle",
    "is_adult",
    "runtimeMinutes",
    "startYear",
]

dim_title = (
    df.select(*title_columns)
      .dropDuplicates(["tconst"])  # keeps a single row for each tconst
      .withColumnRenamed("startYear", "year")
)

# Title-to-genre pairs: one row per distinct (tconst, genre) combination.
# explode() skips null and empty arrays on its own, so the placeholder null
# rows that explode_outer() would emit (only to be filtered out on the next
# line) are never produced. The null filter is kept because an array may still
# contain null elements.
dim_genre = (
    df.select("tconst", F.explode("genres_array").alias("genre"))
      .filter(F.col("genre").isNotNull())
      .dropDuplicates(["tconst", "genre"])
)

# Year dimension: the distinct non-null year_key values (exposed as "year"),
# each paired with a "decade" column.
year_col = F.col("year")

dim_date = (
    df.select(F.col("year_key").alias("year"))
      .where(year_col.isNotNull())
      .distinct()
      # decade = year divided by 10, cast to int, then multiplied by 10
      .withColumn("decade", (year_col / 10).cast("int") * 10)
)

# Ratings fact: one output row per input row, with averageRating cast to
# double and numVotes cast to long. The casts are applied inside the select so
# the columns keep their original positions.
fact = df.select(
    "tconst",
    F.col("averageRating").cast("double").alias("averageRating"),
    F.col("numVotes").cast("long").alias("numVotes"),
    "year_key",
    "titleType",
)

# Remove any previous gold output before it is rewritten. The cleanup stays
# best-effort (a failure does not stop the cell), but the error is now
# reported instead of being silently discarded, so a permissions or path
# problem is visible in the cell output.
try:
    dbutils.fs.rm(gold_base, recurse=True)
except Exception as exc:
    print(f"Aviso: não foi possível limpar {gold_base}: {exc!r}")

# Persist the three unpartitioned tables, in the same order as before, each
# under its own sub-directory of gold_base.
for gold_df, gold_suffix in (
    (dim_title, "/dim_title"),
    (dim_genre, "/dim_genre"),
    (dim_date, "/dim_date"),
):
    gold_df.write.format("delta").mode("overwrite").save(gold_base + gold_suffix)

# The fact table is the only one written partitioned (by year_key).
fact_writer = fact.write.format("delta").mode("overwrite").partitionBy("year_key")
fact_writer.save(gold_base + "/fact_title_rating")

# Report the destination and list what now exists under it.
print("Gold salvo em:", gold_base)
display(dbutils.fs.ls(gold_base))


Gold salvo em: /Volumes/workspace/default/imdb_gold


path,name,size,modificationTime
dbfs:/Volumes/workspace/default/imdb_gold/dim_date/,dim_date/,0,1764615720700
dbfs:/Volumes/workspace/default/imdb_gold/dim_genre/,dim_genre/,0,1764615720700
dbfs:/Volumes/workspace/default/imdb_gold/dim_title/,dim_title/,0,1764615720700
dbfs:/Volumes/workspace/default/imdb_gold/fact_title_rating/,fact_title_rating/,0,1764615720700


In [0]:
# Read each gold Delta table back and print its row count next to its label.
for count_label, count_suffix in (
    ("dim_title:", "/dim_title"),
    ("dim_genre:", "/dim_genre"),
    ("fact:", "/fact_title_rating"),
):
    print(count_label, spark.read.format("delta").load(gold_base + count_suffix).count())


dim_title: 2341369
dim_genre: 3871466
fact: 2341369


In [0]:
# Load the Delta files previously saved under the gold path.
gold_base = "/Volumes/workspace/default/imdb_gold"


def _read_gold(suffix):
    """Return the Delta table stored at gold_base + suffix as a DataFrame."""
    return spark.read.format("delta").load(gold_base + suffix)


dim_title_df = _read_gold("/dim_title")
dim_genre_df = _read_gold("/dim_genre")
dim_date_df = _read_gold("/dim_date")
fact_df = _read_gold("/fact_title_rating")

# Create the database if it does not exist, then make it the current one.
for ddl_statement in ("CREATE DATABASE IF NOT EXISTS imdb_mvp", "USE imdb_mvp"):
    spark.sql(ddl_statement)

# Save as managed tables in the catalog (default), overwriting any existing
# table of the same name.
for source_df, table_name in (
    (dim_title_df, "imdb_mvp.dim_title"),
    (dim_genre_df, "imdb_mvp.dim_genre"),
    (dim_date_df, "imdb_mvp.dim_date"),
    (fact_df, "imdb_mvp.fact_title_rating"),
):
    source_df.write.format("delta").mode("overwrite").saveAsTable(table_name)

# Verify: list the tables registered in the database.
tables_listing = spark.sql("SHOW TABLES IN imdb_mvp")
tables_listing.show(truncate=False)


+--------+-----------------+-----------+
|database|tableName        |isTemporary|
+--------+-----------------+-----------+
|imdb_mvp|dim_date         |false      |
|imdb_mvp|dim_genre        |false      |
|imdb_mvp|dim_title        |false      |
|imdb_mvp|fact_title_rating|false      |
+--------+-----------------+-----------+

