In [0]:
# -----------------------------------------------------------
# 1. IMPORTAR BIBLIOTECAS
# -----------------------------------------------------------
from pyspark.sql.functions import current_timestamp
from pyspark.sql import Row
import warnings

warnings.filterwarnings("ignore")

In [0]:
# -----------------------------------------------------------
# 2. CONFIGURAR BANCOS DE DADOS (SCHEMAS)
# -----------------------------------------------------------
print("schemas criados (databases)")

spark.sql("CREATE DATABASE IF NOT EXISTS bronze")
spark.sql("CREATE DATABASE IF NOT EXISTS silver")
spark.sql("CREATE DATABASE IF NOT EXISTS gold")

schemas criados (databases)


DataFrame[]

In [0]:
# -----------------------------------------------------------
# 3. CARGAS DA CAMADA BRONZE (RAW -> DELTA)
# -----------------------------------------------------------
# Caminho base (ajuda se precisar mudar o volume depois)
base_path = "/Volumes/workspace/default/books_goodreads"

# --- 3.1 CARREGAR BOOKS.CSV ---
print("Books carregado")
books_bronze = (
    spark.read
        .option("header", True)
        .option("inferSchema", True) # Identifica int, double, string automaticamente
        .csv(f"{base_path}/books.csv")
)

# Salva sobrescrevendo e permitindo sobrescrita de schema (overwriteSchema é mais seguro que mergeSchema para cargas full)
books_bronze.write.format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable("bronze.books")

Books carregado


In [0]:
# --- 3.2 CARREGAR RATINGS.CSV ---
print("Ratings carregado")
ratings_bronze = (
    spark.read
        .option("header", True)
        .option("inferSchema", True)
        .csv(f"{base_path}/ratings.csv")
)

ratings_bronze.write.format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable("bronze.ratings")

Ratings carregado


In [0]:
# --- 3.3 CARREGAR TAGS.CSV ---
print("Tags carregado")
tags_bronze = (
    spark.read
        .option("header", True)
        .option("inferSchema", True) # Adicionei inferSchema para garantir IDs numéricos
        .csv(f"{base_path}/tags.csv")
)

tags_bronze.write.format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable("bronze.tags")

Tags carregado


In [0]:
# --- 3.4 CARREGAR BOOK_TAGS.CSV ---
# CORREÇÃO: Caminho do arquivo alterado de tags.csv para book_tags.csv
print("Book_Tags carregado")
book_tags_bronze = (
    spark.read
        .option("header", True)
        .option("inferSchema", True)
        .csv(f"{base_path}/book_tags.csv") 
)

book_tags_bronze.write.format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable("bronze.book_tags")

Book_Tags carregado


In [0]:
# --- 3.5 CARREGAR POPULARITY.CSV ---
print("Popularity carregado")
popularity_bronze = (
    spark.read
        .option("header", True)
        .option("inferSchema", True)
        .option("sep", ",") 
        .csv(f"{base_path}/popularity_id.csv")
)

popularity_bronze.write.format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable("bronze.popularity")

print("Carga Bronze concluída com sucesso!")

Popularity carregado
Carga Bronze concluída com sucesso!


In [0]:
# -----------------------------------------------------------
# 1. IMPORTS E FUNÇÕES AUXILIARES
# -----------------------------------------------------------
from pyspark.sql.functions import col, trim, when, lower, current_timestamp
# Importe colunas que serão usadas (ex: year se precisar tratar nulos)

print("Camada SILVER")

Camada SILVER


In [0]:
# -----------------------------------------------------------
# 2. TABELA BOOKS (Corrigida com tratamento de erros/dirty data)
# -----------------------------------------------------------
from pyspark.sql.functions import col, trim, expr

print("Books Silver")

books_bronze = spark.table("bronze.books")

books_silver = (
    books_bronze
        .select(
            # IDs
            col("id").cast("int").alias("book_id"),
            col("book_id").cast("int").alias("goodreads_book_id"),
            col("best_book_id").cast("int"),
            col("work_id").cast("int"),
            
            # Dados do Livro
            col("books_count").cast("int"),
            col("isbn"),
            col("isbn13").cast("string"),
            
            trim(col("authors")).alias("authors"),
            # try_cast para ano, pois as vezes vem texto sujo
            expr("try_cast(original_publication_year as int)").alias("publication_year"),
            trim(col("original_title")).alias("original_title"),
            trim(col("title")).alias("title"),
            col("language_code"),
            
            # --- AQUI ESTAVA O ERRO ---
            # Usamos try_cast para que 'eng' vire NULL em vez de travar o pipeline
            expr("try_cast(average_rating as double)").alias("average_rating"),
            expr("try_cast(ratings_count as int)").alias("ratings_count")
        )
        # Removemos linhas que ficaram com ID nulo (sujeira severa)
        .filter(col("book_id").isNotNull())
        .dropDuplicates(["book_id"])
)

books_silver.write.format("delta").mode("overwrite").option("overwriteSchema", "true").saveAsTable("silver.books")
print("Tabela Books processada com sucesso.")

Books Silver
Tabela Books processada com sucesso.


In [0]:
# -----------------------------------------------------------
# 3. TABELA RATINGS (Interações)
# -----------------------------------------------------------
print("Ratings Silver")

ratings_bronze = spark.table("bronze.ratings")

ratings_silver = (
    ratings_bronze
        .select(
            col("book_id").cast("int"),
            col("user_id").cast("int"),
            col("rating").cast("int")
        )
        # Remove duplicatas se o mesmo usuário avaliou o mesmo livro duas vezes (mantém um)
        .dropDuplicates(["book_id", "user_id"])
)

ratings_silver.write.format("delta").mode("overwrite").option("overwriteSchema", "true").saveAsTable("silver.ratings")

Ratings Silver


In [0]:
# -----------------------------------------------------------
# 4. TABELA POPULARITY (Auxiliar)
# -----------------------------------------------------------
print("Popularity Silver")

popularity_bronze = spark.table("bronze.popularity")

popularity_silver = (
    popularity_bronze
        .dropDuplicates(["popularity_id"])
        .withColumn("popularity_level", trim(col("popularity_level")))
        .withColumn("rule", trim(col("rule")))
)

popularity_silver.write.format("delta").mode("overwrite").option("overwriteSchema", "true").saveAsTable("silver.popularity")


Processando Popularity...


In [0]:
# -----------------------------------------------------------
# 5. TABELA TAGS (Gêneros/Categorias)
# -----------------------------------------------------------
print("Tags Silver")

tags_bronze = spark.table("bronze.tags")

tags_silver = (
    tags_bronze
        .dropDuplicates(["tag_id"])
)

tags_silver.write.format("delta").mode("overwrite").option("overwriteSchema", "true").saveAsTable("silver.tags")

Tags Silver


In [0]:
# -----------------------------------------------------------
# 6. TABELA BOOK_TAGS (Ligação N:N)
# -----------------------------------------------------------
print("Book_Tags Silver")

book_tags_bronze = spark.table("bronze.book_tags") 

book_tags_silver = (
    book_tags_bronze
        .select(
            col("goodreads_book_id"),
            col("tag_id"),
            col("count")
        )
        # Importante: goodreads_book_id NÃO é o mesmo que book_id da tabela books no dataset Kaggle padrão.
        .dropDuplicates(["goodreads_book_id", "tag_id"])
)

book_tags_silver.write.format("delta").mode("overwrite").option("overwriteSchema", "true").saveAsTable("silver.book_tags")

print("Camada SILVER concluída com sucesso!")

Book_Tags Silver
Camada SILVER concluída com sucesso!


In [0]:
# -----------------------------------------------------------
# 1. FATO INTERAÇÕES
# -----------------------------------------------------------
from pyspark.sql.functions import col, to_date

print("Fato Interações Gold")

ratings_silver = spark.table("silver.ratings")

# A Fato deve conter os IDs para ligar nas dimensões e as métricas
fato_interacoes = (
    ratings_silver
        .select(
            col("user_id"),
            col("book_id"),
            col("rating"),
            # Adicionamos uma chave substituta ou usamos a combinação user-book como PK lógica
        )
)

fato_interacoes.write.format("delta").mode("overwrite").saveAsTable("gold.fato_interacoes")

Fato Interações Gold


In [0]:
# -----------------------------------------------------------
# 2. DIMENSÃO USUÁRIOS
# -----------------------------------------------------------
from pyspark.sql.functions import count, avg, round

print("Dimensão Usuários Gold")

dim_usuarios = (
    ratings_silver
        .groupBy("user_id")
        .agg(
            count("book_id").alias("total_reviews"),
            round(avg("rating"), 2).alias("avg_score_given")
        )
)

dim_usuarios.write.format("delta").mode("overwrite").saveAsTable("gold.dim_usuarios")

Dimensão Usuários Gold


In [0]:
# -----------------------------------------------------------
# 3. DATAFRAME AUXILIAR 
# -----------------------------------------------------------
# Calculamos a média real baseada nos dados que temos, não no metadado antigo do CSV
df_metrics = (
    ratings_silver
        .groupBy("book_id")
        .agg(
            avg("rating").alias("calc_avg_rating"),
            count("rating").alias("calc_ratings_count")
        )
)

In [0]:
# -----------------------------------------------------------
# 4. DIMENSÃO LIVROS 
# -----------------------------------------------------------
from pyspark.sql.functions import when, col, lit

print("Dimensão Livros Gold")

books_silver = spark.table("silver.books")

# Join com as métricas calculadas acima
dim_books_logic = (
    books_silver
        .join(df_metrics, on="book_id", how="left")
        # Preencher nulos para livros que não tiveram reviews neste dataset
        .fillna(0, subset=["calc_ratings_count"]) 
)

# Aplicar Regra de Popularidade
dim_livros_gold = (
    dim_books_logic
        .withColumn(
            "popularity_level",
            when(col("calc_ratings_count") < 100, "Baixa") # Ajustei para 100 pois o dataset é amostra
            .when((col("calc_ratings_count") >= 100) & (col("calc_ratings_count") <= 1000), "Média")
            .otherwise("Alta")
        )
)

# Selecionar e organizar colunas finais
dim_livros_final = dim_livros_gold.select(
    "book_id",
    "goodreads_book_id",
    "title",
    "authors",
    "publication_year",
    "calc_avg_rating",
    "calc_ratings_count",
    "popularity_level",
    "average_rating" # Mantendo a original para comparação de Data Quality
)

dim_livros_final.write.format("delta").mode("overwrite").saveAsTable("gold.dim_livros")

Dimensão Livros Gold


In [0]:
# -----------------------------------------------------------
# 5. TAGS e BRIDGE (N:N)
# -----------------------------------------------------------
print("tabelas de Tags Gold")

# Dimensão Tag (Apenas o dicionário de tags)
tags_silver = spark.table("silver.tags")
tags_silver.write.format("delta").mode("overwrite").saveAsTable("gold.dim_tags")

# Tabela Ponte (Bridge) para ligar Livros <-> Tags
# IMPORTANTE: Usamos goodreads_book_id aqui
book_tags_silver = spark.table("silver.book_tags")

bridge_livros_tags = (
    book_tags_silver
        .join(tags_silver, on="tag_id", how="inner")
        .select(
            col("goodreads_book_id"), # Chave de ligação
            col("tag_id"),
            col("tag_name"),
            col("count").alias("tag_count")
        )
)

bridge_livros_tags.write.format("delta").mode("overwrite").saveAsTable("gold.bridge_livros_tags")

print("Pipeline GOLD finalizado com sucesso!")

tabelas de Tags Gold
Pipeline GOLD finalizado com sucesso!
