# In [None]:
# ===========================================
# 💾 SILVER - LIMPEZA, PADRONIZAÇÃO E SALVAMENTO (VERSÃO FINAL REFORÇADA)
# ===========================================

from pyspark.sql.functions import col, when, regexp_replace, lower, trim, length, min as min_, max as max_
from pyspark.sql.window import Window

# ========================
# 🧹 LIMPEZA DE DADOS
# ========================

# Books: drop rows whose ISBN is null, empty, or made only of spaces.
# The value is trimmed for the check so a blank-looking key such as "  "
# cannot slip past the emptiness test. The stored ISBN itself is untouched.
df_books_bronze = spark.table("book_bronze.books")
df_books_clean = df_books_bronze.filter(
    col("ISBN").isNotNull() & (trim(col("ISBN")) != "")
)

# Ratings: drop rows without a user id, and rows whose ISBN is null, empty,
# or made only of spaces. An empty or blank ISBN cannot identify a book, so
# those ratings are removed together with the null ones.
df_ratings_bronze = spark.table("book_bronze.ratings")
df_ratings_clean = df_ratings_bronze.filter(
    col("User-ID").isNotNull()
    & col("ISBN").isNotNull()
    & (trim(col("ISBN")) != "")
)

# Users: keep rows that have a user id and an age, read as an integer,
# inside the inclusive range 5..100.
# NOTE: rows whose Age casts to null are dropped as well, because the range
# comparison is then null and filter/where only keeps rows that are true.
df_users_bronze = spark.table("book_bronze.users")
age_as_int = col("Age").cast("int")
df_users_clean = df_users_bronze.where(
    col("User-ID").isNotNull() & age_as_int.between(5, 100)
)

# ========================
# ✍️ PADRONIZAÇÃO
# ========================

# Books: rename columns to snake_case, normalise the author and publisher
# text, then discard rows with an implausible author or publication year.

# Author: lower-cased and trimmed, "&amp;" decoded to "&", and straight or
# curly double quotes removed.
author_expr = regexp_replace(
    regexp_replace(trim(lower(col("Book-Author"))), r'&amp;', '&'),
    r'[\"“”]+', ''
)
# Publisher: lower-cased and trimmed, "&amp;" decoded to "&".
publisher_expr = regexp_replace(trim(lower(col("Publisher"))), r'&amp;', '&')

df_books_silver_raw = (
    df_books_clean.select(
        col("ISBN").alias("isbn"),
        col("Book-Title").alias("title"),
        author_expr.alias("author"),
        col("Year-Of-Publication").cast("int").alias("year"),
        publisher_expr.alias("publisher"),
        col("Image-URL-S").alias("img_url_s"),
        col("Image-URL-M").alias("img_url_m"),
        col("Image-URL-L").alias("img_url_l"),
    )
    # The author must be present and longer than three characters.
    .where(col("author").isNotNull())
    .where(length(col("author")) > 3)
    # Reject authors that are only digits or contain a run of 4+ digits.
    .where(~col("author").rlike("^[0-9]+$"))
    .where(~col("author").rlike("[0-9]{4,}"))
    # Reject authors containing any of these keywords.
    .where(~col("author").rlike("committee|conference|school|publishers|ltd|inaugural|collection"))
    # Reject authors starting with ';', ':' or '-', or containing a colon.
    .where(~col("author").rlike("^[;:\\-]"))
    .where(~col("author").rlike("\\:"))
    # The year must have cast to an integer within 1450..2025 inclusive.
    .where(col("year").isNotNull())
    .where(col("year").between(1450, 2025))
)

# Remove every book of an author whose publication years span 50 years or
# more, measured as max(year) - min(year) over all of that author's rows.
year_window = Window.partitionBy("author")
df_books_silver_filtered = (
    df_books_silver_raw
    .select(
        "*",
        min_("year").over(year_window).alias("min_year"),
        max_("year").over(year_window).alias("max_year"),
    )
    .where((col("max_year") - col("min_year")) < 50)
    # The helper columns are only needed for the filter above.
    .drop("min_year", "max_year")
)

# Ratings: keep the three source columns under snake_case names.
ratings_pad = df_ratings_clean.select(
    *[
        col(source_name).alias(target_name)
        for source_name, target_name in (
            ("User-ID", "user_id"),
            ("ISBN", "isbn"),
            ("Book-Rating", "rating"),
        )
    ]
)

# Users: snake_case names, location lower-cased and trimmed, age as integer.
user_id_col = col("User-ID").alias("user_id")
location_col = trim(lower(col("Location"))).alias("location")
age_col = col("Age").cast("int").alias("age")
users_pad = df_users_clean.select(user_id_col, location_col, age_col)

# ========================
# 🔗 ESTRUTURAÇÃO DE TABELAS
# ========================

# Remove duplicates.
# Users and books keep a single row per key (user_id / isbn).
users_unique = users_pad.drop_duplicates(subset=["user_id"])
books_unique = df_books_silver_filtered.drop_duplicates(subset=["isbn"])
# Ratings only collapse rows that are identical in every column, so the same
# user_id/isbn pair can still appear with different rating values.
ratings_unique = ratings_pad.distinct()

# Helper table: the distinct author values found in the de-duplicated books.
authors_raw = books_unique.select("author").dropDuplicates()

# ========================
# 🧹 LIMPEZA DE TABELAS EXISTENTES (se houver)
# ========================

# Drop any previous version of the four silver tables, in this order.
for drop_statement in (
    "DROP TABLE IF EXISTS book_silver.books",
    "DROP TABLE IF EXISTS book_silver.ratings",
    "DROP TABLE IF EXISTS book_silver.users",
    "DROP TABLE IF EXISTS book_silver.authors_raw",
):
    spark.sql(drop_statement)

# ========================
# 💾 SALVAMENTO FINAL
# ========================

# Persist each silver DataFrame as a table in overwrite mode, in this order.
for silver_frame, table_name in (
    (books_unique, "book_silver.books"),
    (ratings_unique, "book_silver.ratings"),
    (users_unique, "book_silver.users"),
    (authors_raw, "book_silver.authors_raw"),
):
    silver_frame.write.saveAsTable(table_name, mode="overwrite")