# 🎯 Transformação: Silver to Gold
## Caminhos absolutos corrigidos

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum, count, avg, max, min, countDistinct, when, col
import os

# Configurar Spark
spark = SparkSession.builder \
    .appName("Silver to Gold Transformations") \
    .getOrCreate()

# CAMINHOS ABSOLUTOS CORRETOS
base_path = "/home/jovyan/data/"
silver_path = base_path + "silver/"
gold_path = base_path + "gold/"

print("📁 Verificando estrutura de diretórios...")
print("Diretório atual:", os.getcwd())
print("Conteúdo de data:", os.listdir(base_path))
print("Conteúdo de silver:", os.listdir(silver_path))

# Garantir que diretório gold existe
os.makedirs(gold_path, exist_ok=True)
print("✅ Diretório gold preparado!")

## 1. Ler dados da camada Silver

In [None]:
# Ler dados Silver
orders_silver = spark.read.parquet(f"{silver_path}orders")
customers_silver = spark.read.parquet(f"{silver_path}customers")
products_silver = spark.read.parquet(f"{silver_path}products")

print("✅ Dados Silver carregados:")
print(f"Orders: {orders_silver.count()} registros")
print(f"Customers: {customers_silver.count()} registros")
print(f"Products: {products_silver.count()} registros")

print("\n📋 Amostra de Orders:")
orders_silver.show(3)
orders_silver.printSchema()

## 2. Análise de Vendas por Cliente

In [None]:
# Vendas por cliente
sales_by_customer = orders_silver \
    .groupBy("customer_id") \
    .agg(
        sum("order_amount").alias("total_gasto"),
        count("order_id").alias("total_pedidos"),
        avg("order_amount").alias("valor_medio_pedido"),
        max("order_amount").alias("maior_valor_pedido"),
        min("order_date").alias("primeira_compra"),
        max("order_date").alias("ultima_compra")
    )

print("✅ Vendas por cliente calculadas!")
sales_by_customer.show(5)

## 3. Análise de Vendas por Categoria

In [None]:
sales_by_category = orders_silver \
    .join(products_silver, "product_id") \
    .groupBy("product_category") \
    .agg(
        sum("order_amount").alias("faturamento_total"),
        sum("quantity").alias("unidades_vendidas"),
        countDistinct("order_id").alias("pedidos_unicos"),
        avg("order_amount").alias("ticket_medio")
    ) \
    .orderBy(col("faturamento_total").desc())

print("✅ Vendas por categoria calculadas!")
sales_by_category.show()

## 4. Customer 360 (Visão Completa)

In [None]:
customer_360 = customers_silver \
    .join(sales_by_customer, "customer_id", "left") \
    .withColumn("segmento_cliente", 
               when(col("total_gasto") > 5000, "VIP")
               .when(col("total_gasto") > 2000, "Premium")
               .otherwise("Standard")) \
    .withColumn("frequencia_compra", 
               when(col("total_pedidos") > 10, "Alta")
               .when(col("total_pedidos") > 5, "Média")
               .otherwise("Baixa"))

print("✅ Customer 360 criado!")
customer_360.show(5)

## 5. Escrever na camada Gold

In [None]:
# Escrever tabelas Gold
sales_by_customer.write.format("parquet").mode("overwrite").save(f"{gold_path}vendas_por_cliente")
sales_by_category.write.format("parquet").mode("overwrite").save(f"{gold_path}vendas_por_categoria")
customer_360.write.format("parquet").mode("overwrite").save(f"{gold_path}customer_360")

print("✅ Tabelas Gold criadas com sucesso!")

## 6. Verificação Final

In [None]:
# Verificar criação das tabelas
print("📊 Tabelas criadas na camada Gold:")
gold_files = os.listdir(gold_path)
for file in gold_files:
    df = spark.read.parquet(f"{gold_path}{file}")
    print(f"{file}: {df.count()} registros")

print("\n🎯 Resumo das vendas por categoria:")
spark.read.parquet(f"{gold_path}vendas_por_categoria").show()

print("\n✅ Transformação Silver → Gold concluída com sucesso!")

## 7. Visualização dos Resultados

In [None]:
# Instalar bibliotecas de visualização se necessário
try:
    import matplotlib.pyplot as plt
    import seaborn as sns
except:
    print("Instalando bibliotecas de visualização...")
    !pip install matplotlib seaborn
    import matplotlib.pyplot as plt
    import seaborn as sns

# Converter para Pandas
sales_category_pd = spark.read.parquet(f"{gold_path}vendas_por_categoria").toPandas()

print("📈 Visualização - Faturamento por Categoria")

plt.figure(figsize=(12, 6))
sns.barplot(x='product_category', y='faturamento_total', data=sales_category_pd)
plt.title('Faturamento por Categoria de Produto')
plt.xlabel('Categoria')
plt.ylabel('Faturamento Total')
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()

print("🎉 Análise e visualização concluídas!")