In [None]:
# Databricks notebook source
# MAGIC %md
# MAGIC # Pipeline Completo de Filmes: Arquitetura Medalhão
# MAGIC 
# MAGIC **Objetivo:** Executar o pipeline de dados de ponta a ponta, desde a ingestão dos dados brutos (Bronze), passando pela limpeza (Silver) e finalizando com as agregações de negócio (Gold).
# MAGIC 
# MAGIC **Linguagens:**
# MAGIC - **Python (PySpark):** Utilizado para a ingestão de arquivos e para as transformações complexas nas camadas Bronze e Silver.
# MAGIC - **SQL:** Utilizado para criar as tabelas de agregação (camada Gold) e para realizar verificações.

# COMMAND ----------

# MAGIC %md
# MAGIC ### 1. Configuração do Ambiente (Python)
# MAGIC 
# MAGIC Define os nomes do catálogo e schemas que serão usados em todo o notebook.

# COMMAND ----------

# Define as variáveis para os nomes do catálogo e schemas.
# Usar variáveis torna o código mais fácil de manter e adaptar.
catalog_name = "movies_catalog"
landing_schema = "landing"
bronze_schema = "bronze"
silver_schema = "silver"
gold_schema = "gold"

# Define o catálogo padrão para a sessão atual.
spark.sql(f"USE CATALOG {catalog_name}")

print(f"Catálogo '{catalog_name}' definido como padrão.")

# COMMAND ----------

# MAGIC %md
# MAGIC ---
# MAGIC ### 2. Camada Bronze: Ingestão da Landing Zone (Python)
# MAGIC 
# MAGIC Lê o arquivo CSV bruto, adiciona metadados de controle e salva na camada Bronze.

# COMMAND ----------

from pyspark.sql.functions import current_timestamp, input_file_name

# Caminho completo para o arquivo de origem no Volume
source_file_path = f"/Volumes/{catalog_name}/{landing_schema}/landing_zone_movies/top_100_movies_full_best_effort.csv"

# Nome completo da tabela de destino na camada Bronze
bronze_table_name = f"{bronze_schema}.movies_raw"

print(f"Iniciando ingestão do arquivo: {source_file_path}")

try:
    # Leitura do arquivo CSV bruto
    df_raw = spark.read.format("csv") \
                       .option("header", "true") \
                       .option("inferSchema", "true") \
                       .load(source_file_path)

    # Adiciona metadados de controle
    df_bronze = df_raw.withColumn("data_ingestao", current_timestamp()) \
                      .withColumn("arquivo_origem", input_file_name())

    # Salva como uma tabela Delta na camada Bronze
    df_bronze.write.format("delta") \
             .mode("overwrite") \
             .option("overwriteSchema", "true") \
             .saveAsTable(bronze_table_name)
    
    print(f"SUCESSO: Tabela '{bronze_table_name}' salva na camada Bronze.")

except Exception as e:
    print(f"ERRO ao processar a camada Bronze.")
    raise e

# COMMAND ----------

# MAGIC %md
# MAGIC ---
# MAGIC ### 3. Camada Silver: Limpeza e Transformação (Python)
# MAGIC 
# MAGIC Lê os dados da camada Bronze, aplica limpezas, converte tipos de dados e padroniza o schema.

# COMMAND ----------

from pyspark.sql.functions import col, regexp_replace, to_date, year

# Nomes das tabelas de origem e destino
bronze_table = f"{bronze_schema}.movies_raw"
silver_table = f"{silver_schema}.movies_cleaned"

print(f"Iniciando transformação da tabela '{bronze_table}' para a camada Silver.")

# Carrega a tabela da camada Bronze
df_bronze = spark.read.table(bronze_table)

# Aplica as transformações e limpezas
df_silver = df_bronze \
    .withColumn("orcamento", regexp_replace(col("budget"), "[$,]", "").cast("long")) \
    .withColumn("receita", regexp_replace(col("revenue"), "[$,]", "").cast("long")) \
    .withColumn("data_lancamento", to_date(col("release_date"), "yyyy-MM-dd")) \
    .withColumn("ano_lancamento", year(col("data_lancamento"))) \
    .withColumnRenamed("original_title", "titulo_original") \
    .withColumnRenamed("vote_average", "nota_media") \
    .withColumnRenamed("runtime", "duracao_min") \
    .drop("budget", "revenue", "release_date", "homepage", "tagline")

# Seleciona e reordena as colunas para a tabela final
df_silver_final = df_silver.select(
    "id", "titulo_original", "data_lancamento", "ano_lancamento", "genres",
    "orcamento", "receita", "duracao_min", "nota_media", "overview"
)

# Salva a tabela limpa na camada Silver
df_silver_final.write.format("delta") \
               .mode("overwrite") \
               .option("overwriteSchema", "true") \
               .saveAsTable(silver_table)

print(f"SUCESSO: Tabela '{silver_table}' salva na camada Silver.")

In [None]:
-- MAGIC %sql
-- MAGIC -- AGREGAÇÃO 1: Lucro por Filme
-- MAGIC -- Cria uma tabela com o cálculo do lucro para cada filme, ordenada do maior para o menor.
-- MAGIC CREATE OR REPLACE TABLE gold.agg_lucro_por_filme AS
-- MAGIC SELECT
-- MAGIC   titulo_original,
-- MAGIC   ano_lancamento,
-- MAGIC   orcamento,
-- MAGIC   receita,
-- MAGIC   (receita - orcamento) AS lucro
-- MAGIC FROM
-- MAGIC   silver.movies_cleaned
-- MAGIC ORDER BY
-- MAGIC   lucro DESC;

# COMMAND ----------

-- MAGIC %sql
-- MAGIC -- AGREGAÇÃO 2: Quantidade de Filmes por Ano
-- MAGIC -- Cria uma tabela que conta quantos filmes foram lançados em cada ano.
-- MAGIC CREATE OR REPLACE TABLE gold.agg_filmes_por_ano AS
-- MAGIC SELECT
-- MAGIC   ano_lancamento,
-- MAGIC   COUNT(id) AS quantidade_filmes
-- MAGIC FROM
-- MAGIC   silver.movies_cleaned
-- MAGIC GROUP BY
-- MAGIC   ano_lancamento
-- MAGIC ORDER BY
-- MAGIC   ano_lancamento DESC;

# COMMAND ----------

-- MAGIC %sql
-- MAGIC -- AGREGAÇÃO 3: Receita Média por Gênero
-- MAGIC -- Extrai os nomes dos gêneros da coluna JSON e calcula a receita média para cada um.
-- MAGIC CREATE OR REPLACE TABLE gold.agg_receita_media_por_genero AS
-- MAGIC WITH movies_genres AS (
-- MAGIC   SELECT
-- MAGIC     receita,
-- MAGIC     explode(from_json(genres, 'array<struct<name:string>>')).name AS genero
-- MAGIC   FROM
-- MAGIC     silver.movies_cleaned
-- MAGIC )
-- MAGIC SELECT
-- MAGIC   genero,
-- MAGIC   AVG(receita) AS receita_media,
-- MAGIC   COUNT(1) AS quantidade_filmes
-- MAGIC FROM
-- MAGIC   movies_genres
-- MAGIC GROUP BY
-- MAGIC   genero
-- MAGIC ORDER BY
-- MAGIC   receita_media DESC;

# COMMAND ----------

# MAGIC %md
# MAGIC ---
# MAGIC ### 5. Verificação Final (SQL)
# MAGIC 
# MAGIC Exibe amostras das tabelas criadas na camada Gold para confirmar o sucesso do pipeline.

# COMMAND ----------

-- MAGIC %sql
-- MAGIC SELECT * FROM gold.agg_lucro_por_filme LIMIT 10;

# COMMAND ----------

-- MAGIC %sql
-- MAGIC SELECT * FROM gold.agg_filmes_por_ano LIMIT 10;

# COMMAND ----------

-- MAGIC %sql
-- MAGIC SELECT * FROM gold.agg_receita_media_por_genero LIMIT 10;

# COMMAND ----------

# MAGIC %md
# MAGIC ## Pipeline Concluído com Sucesso!