In [1]:
# Cell 1: Install dependencies if needed
!pip install delta-spark==2.4.0



In [2]:
# Cell 2: Import necessary modules and start Spark Session
from pyspark.sql import SparkSession
from pyspark.sql.functions import lower, trim, regexp_replace, broadcast
from delta import *

# HDFS location of the Hive warehouse
warehouse_location = "hdfs://hdfs-nn:9000/warehouse"

# Session settings, grouped by purpose; applied one by one to the builder below.
session_options = {
    # ---- Hive settings ----
    "spark.sql.warehouse.dir": warehouse_location,
    "hive.metastore.uris": "thrift://hive-metastore:9083",
    "spark.sql.catalogImplementation": "hive",
    "hive.metastore.warehouse.dir": warehouse_location,
    # ---- Delta Lake extensions ----
    "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
    "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    # ---- Delta package compatible with Spark 3.4.1 (Scala 2.12) ----
    "spark.jars.packages": "io.delta:delta-core_2.12:2.4.0",
}

# Build the Spark session with Hive + Delta Lake support
builder = SparkSession.builder.appName("Gold_Analise_Filmes")
for option_key, option_value in session_options.items():
    builder = builder.config(option_key, option_value)

spark = builder.enableHiveSupport().getOrCreate()

print("Spark iniciado com sucesso — versão:", spark.version)

Spark iniciado com sucesso — versão: 3.4.1


In [3]:
# Cell 3: Make sure the `gold` database exists at its HDFS location.
create_gold_db_sql = """
    CREATE DATABASE IF NOT EXISTS gold
    LOCATION 'hdfs://hdfs-nn:9000/warehouse/gold.db'
"""
spark.sql(create_gold_db_sql)

print("Base de dados 'gold' pronta.")

Base de dados 'gold' pronta.


In [4]:
# Cell 4: Load the four silver tables used to build the gold table.
# The tuple order must match the unpacking order on the next statement.
silver_tables = (
    "silver.boxoffice",
    "silver.adaptations",
    "silver.actorfilms",
    "silver.rating_movies",
)
boxoffice_df, adaptations_df, actorfilms_df, rating_movies_df = map(
    spark.table, silver_tables
)

print("Tabelas silver carregadas com sucesso.")

Tabelas silver carregadas com sucesso.


In [5]:
# Cell 7: First join — adaptations LEFT JOIN rating_movies.
# Rows match when adaptations.title equals rating_movies.title_norm and both
# release_year columns are equal.
# The broadcast hint is placed on rating_movies_df (the right-hand side of the join).
adapt_rating_on = [
    adaptations_df["title"] == rating_movies_df["title_norm"],
    adaptations_df["release_year"] == rating_movies_df["release_year"],
]

# Columns kept from each side, in output order.
adaptation_fields = ("author", "book_title", "title", "release_year")
rating_fields = (
    "writers",
    "rating",
    "tomatometer_status",
    "tomatometer_rating",
    "tomatometer_count",
    "audience_rating",
    "audience_count",
    "critics_consensus",
    "cast",
)

joined_adapt_rating = adaptations_df.join(
    broadcast(rating_movies_df), on=adapt_rating_on, how="left"
).select(
    *[adaptations_df[field] for field in adaptation_fields],
    *[rating_movies_df[field] for field in rating_fields],
)


In [7]:
# Cell 8: Second join — previous result LEFT JOIN boxoffice.
# Rows match when title equals boxoffice.title and release_year equals boxoffice.year.
# The broadcast hint is placed on boxoffice_df (the right-hand side of the join).
adapt_box_on = [
    joined_adapt_rating["title"] == boxoffice_df["title"],
    joined_adapt_rating["release_year"] == boxoffice_df["year"],
]

# Columns carried over from the first join, followed by the box-office columns.
carried_fields = (
    "author",
    "book_title",
    "title",
    "release_year",
    "writers",
    "rating",
    "tomatometer_status",
    "tomatometer_rating",
    "tomatometer_count",
    "audience_rating",
    "audience_count",
    "critics_consensus",
    "cast",
)
boxoffice_fields = ("gross", "decade")

joined_adapt_rating_box = joined_adapt_rating.join(
    broadcast(boxoffice_df), on=adapt_box_on, how="left"
).select(
    *[joined_adapt_rating[field] for field in carried_fields],
    *[boxoffice_df[field] for field in boxoffice_fields],
)


In [14]:
# Cell 9: Third join — previous result LEFT JOIN actorfilms.
# The actorfilms title is not normalized yet, so it is normalized here first.
# Rows match when title equals the normalized actorfilms title and
# release_year equals actorfilms.year.
# The broadcast hint is applied to actorfilms_norm_df (the right-hand side of the join).
# NOTE(review): confirm actorfilms is small enough to be broadcast.

from pyspark.sql.functions import col, lower, trim, regexp_replace, broadcast

# Lower-case BEFORE stripping characters: the pattern only keeps a-z, 0-9 and
# space, so stripping first would delete every upper-case letter
# (e.g. "The Godfather" -> "he odfather") and the title join would not match.
actorfilms_norm_df = (
    actorfilms_df
    .withColumn("title", trim(regexp_replace(lower(col("title")), r"[^a-z0-9 ]", "")))
)

gold_analise_filmes_df = (
    joined_adapt_rating_box
    .join(
        broadcast(actorfilms_norm_df),
        (joined_adapt_rating_box.title == actorfilms_norm_df.title) &
        (joined_adapt_rating_box.release_year == actorfilms_norm_df.year),
        "left"
    )
    .select(
        joined_adapt_rating_box.author,
        joined_adapt_rating_box.book_title,
        joined_adapt_rating_box.title,
        joined_adapt_rating_box.release_year,
        joined_adapt_rating_box.writers,
        joined_adapt_rating_box.rating,
        joined_adapt_rating_box.tomatometer_status,
        joined_adapt_rating_box.tomatometer_rating,
        joined_adapt_rating_box.tomatometer_count,
        joined_adapt_rating_box.audience_rating,
        joined_adapt_rating_box.audience_count,
        joined_adapt_rating_box.critics_consensus,
        joined_adapt_rating_box.cast,
        joined_adapt_rating_box.gross,
        joined_adapt_rating_box.decade,
        actorfilms_norm_df.actor,
        actorfilms_norm_df.votes,
        # Renamed so it does not clash with the `rating` column selected above
        actorfilms_norm_df.rating.alias("actor_rating"),
        actorfilms_norm_df.film_id,
        actorfilms_norm_df.actor_id
    )
)

In [None]:
# Cell 10: Persist the joined result as the Delta table gold.analise_filmes.
gold_table_name = "gold.analise_filmes"
gold_table_path = "hdfs://hdfs-nn:9000/warehouse/gold.db/analise_filmes"

# Overwrite mode with overwriteSchema enabled, written to an explicit HDFS path.
delta_writer = gold_analise_filmes_df.write.format("delta").mode("overwrite")
delta_writer = delta_writer.option("overwriteSchema", True).option("path", gold_table_path)
delta_writer.saveAsTable(gold_table_name)

print("Tabela gold.analise_filmes gravada com sucesso!")

In [None]:
# Cell 11: Sanity check — print five rows of the gold table without truncating values.
preview_sql = """
    SELECT * FROM gold.analise_filmes LIMIT 5
"""
spark.sql(preview_sql).show(truncate=False)

In [None]:
from pyspark.sql.functions import concat_ws, col

# Export the gold table as CSV with a header row.
df = spark.table("gold.analise_filmes")

# coalesce(1) reduces the data to a single partition before writing;
# existing output at the target path is overwritten.
csv_writer = df.coalesce(1).write.mode("overwrite").option("header", "true")
csv_writer.csv("/tmp/analus_csv")

In [None]:
# Cell 12: Stop the Spark session created in Cell 2. Any later cell that uses
# `spark` will need the session to be created again first.
spark.stop()