In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import DecimalType

print("Iniciando Camada Gold - Projeto 1: Logística...")

#Criação do Database Gold
spark.sql("CREATE DATABASE IF NOT EXISTS gold")


print("Criando tabela gold.ft_vendas_consumidor_local...")

#Ler as tabelas Silver necessárias
df_pedido_total = spark.table("silver.pedido_total")
df_consumidores = spark.table("silver.ft_consumidores")

#Realizar o Join
df_vendas_local = df_pedido_total.join(
    df_consumidores,
    on="id_consumidor",
    how="inner"
)

#Selecionar e renomear colunas conforme PDF
df_ft_vendas_local = df_vendas_local.select(
    F.col("id_pedido"),
    F.col("id_consumidor"),
    F.col("valor_total_pago_brl").alias("valor_total_pedido_brl"),
    F.col("cidade"),
    F.col("estado"),
    F.col("data_pedido")
)

#Salvar a FATO na camada Gold
df_ft_vendas_local.write.mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable("gold.ft_vendas_consumidor_local")

print("Tabela gold.ft_vendas_consumidor_local criada com sucesso.")

print("Criando view gold.view_total_compras_por_consumidor...")

query_view = """
    SELECT 
        cidade,
        estado,
        COUNT(id_pedido) AS quantidade_vendas,
        SUM(valor_total_pedido_brl) AS valor_total_localidade
    FROM gold.ft_vendas_consumidor_local
    GROUP BY cidade, estado
"""

#Executa o comando SQL para criar a view
spark.sql(f"CREATE OR REPLACE VIEW gold.view_total_compras_por_consumidor AS {query_view}")

print("View gold.view_total_compras_por_consumidor criada com sucesso.")


print("--- Resposta da Pergunta de Negócio: Total de Vendas por Estado ---")
spark.sql("""
    SELECT 
        estado, 
        SUM(valor_total_localidade) as total_vendas_estado
    FROM gold.view_total_compras_por_consumidor
    GROUP BY estado
    ORDER BY total_vendas_estado DESC
""").show()

In [0]:
#logistica_de_atrasos
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType

print("Iniciando Camada Gold - Projeto 2: Logística (Atrasos)...")


print("Criando tabela gold.ft_atrasos_pedidos_local_vendedor...")

#Carregar tabelas Silver necessárias
df_pedidos = spark.table("silver.ft_pedidos")
df_consumidores = spark.table("silver.ft_consumidores")
df_itens = spark.table("silver.ft_itens_pedidos")

#Realizar Joins
#Precisamos: Pedido -> Consumidor (para local)
#Precisamos: Pedido -> Item (para pegar o id_vendedor)
df_join_passo1 = df_pedidos.join(df_consumidores, on="id_consumidor", how="inner")
df_completo = df_join_passo1.join(df_itens, on="id_pedido", how="inner")

#Selecionar colunas conforme PDF
df_ft_atrasos = df_completo.select(
    F.col("id_pedido"),
    F.col("id_vendedor"),
    F.col("id_consumidor"),
    F.col("entrega_no_prazo"),
    F.col("tempo_entrega_dias"),
    F.col("tempo_entrega_estimado_dias"),
    F.col("cidade"),
    F.col("estado")
)

#Salvar a FATO
df_ft_atrasos.write.mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable("gold.ft_atrasos_pedidos_local_vendedor")

print("Tabela gold.ft_atrasos_pedidos_local_vendedor criada com sucesso.")


print("Criando view gold.view_tempo_medio_entrega_localidade...")

query_view_local = """
    SELECT 
        cidade,
        estado,
        CAST(AVG(tempo_entrega_dias) AS DECIMAL(10,2)) AS tempo_medio_entrega,
        CAST(AVG(tempo_entrega_estimado_dias) AS DECIMAL(10,2)) AS tempo_medio_estimado,
        CASE 
            WHEN AVG(tempo_entrega_dias) > AVG(tempo_entrega_estimado_dias) THEN 'SIM'
            ELSE 'NÃO'
        END AS entrega_maior_que_estimado
    FROM gold.ft_atrasos_pedidos_local_vendedor
    GROUP BY cidade, estado
"""

spark.sql(f"CREATE OR REPLACE VIEW gold.view_tempo_medio_entrega_localidade AS {query_view_local}")
print("View gold.view_tempo_medio_entrega_localidade criada.")


print("Criando view gold.view_vendedor_pontualidade...")

query_view_vendedor = """
    SELECT 
        id_vendedor,
        COUNT(id_pedido) as total_pedidos,
        SUM(CASE WHEN entrega_no_prazo = 'Não' THEN 1 ELSE 0 END) as total_atrasados,
        CAST(
            (SUM(CASE WHEN entrega_no_prazo = 'Não' THEN 1 ELSE 0 END) / COUNT(id_pedido)) * 100 
            AS DECIMAL(10,2)
        ) as percentual_atraso
    FROM gold.ft_atrasos_pedidos_local_vendedor
    GROUP BY id_vendedor
"""

spark.sql(f"CREATE OR REPLACE VIEW gold.view_vendedor_pontualidade AS {query_view_vendedor}")
print("View gold.view_vendedor_pontualidade criada.")

In [0]:
#comercial
from pyspark.sql import functions as F
from pyspark.sql.types import DecimalType, IntegerType, DateType

print("Iniciando Camada Gold - Projeto 3: Comercial...")


print("Criando dimensão gold.dm_tempo...")

#Descobrir o intervalo de datas (do primeiro ao último pedido)
min_max_datas = spark.table("silver.ft_pedidos") \
    .select(
        F.min("pedido_compra_timestamp").alias("data_min"),
        F.max("pedido_compra_timestamp").alias("data_max")
    ).first()

#Gerar a sequência de datas (sequence + explode)
df_datas = spark.sql(f"""
    SELECT explode(sequence(to_date('{min_max_datas.data_min}'), to_date('{min_max_datas.data_max}'), interval 1 day)) AS data
""")

#Criar as colunas de calendário (Extração e Tradução)
df_dm_tempo = df_datas.select(
    F.col("data").alias("sk_tempo"),
    F.year("data").alias("ano"),
    F.quarter("data").alias("trimestre"),
    F.month("data").alias("mes"),
    F.weekofyear("data").alias("semana_do_ano"),
    F.dayofmonth("data").alias("dia"),
    F.dayofweek("data").alias("dia_da_semana_num"), # 1=Domingo, 7=Sábado no padrão Spark padrão
    
    #Tradução do Nome do Dia
    F.when(F.dayofweek("data") == 1, "Domingo")
     .when(F.dayofweek("data") == 2, "Segunda-feira")
     .when(F.dayofweek("data") == 3, "Terça-feira")
     .when(F.dayofweek("data") == 4, "Quarta-feira")
     .when(F.dayofweek("data") == 5, "Quinta-feira")
     .when(F.dayofweek("data") == 6, "Sexta-feira")
     .when(F.dayofweek("data") == 7, "Sábado")
     .alias("dia_da_semana_nome"),

    #Tradução do Nome do Mês
    F.when(F.month("data") == 1, "Janeiro")
     .when(F.month("data") == 2, "Fevereiro")
     .when(F.month("data") == 3, "Março")
     .when(F.month("data") == 4, "Abril")
     .when(F.month("data") == 5, "Maio")
     .when(F.month("data") == 6, "Junho")
     .when(F.month("data") == 7, "Julho")
     .when(F.month("data") == 8, "Agosto")
     .when(F.month("data") == 9, "Setembro")
     .when(F.month("data") == 10, "Outubro")
     .when(F.month("data") == 11, "Novembro")
     .when(F.month("data") == 12, "Dezembro")
     .alias("mes_nome"),
     
    #Flag Fim de Semana (1=Dom, 7=Sab)
    F.when(F.dayofweek("data").isin([1, 7]), "Sim")
     .otherwise("Não")
     .alias("eh_fim_de_semana")
)

#Salvar tabela
df_dm_tempo.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("gold.dm_tempo")
print("Tabela gold.dm_tempo criada com sucesso.")


print("Criando fato gold.ft_vendas_geral...")

#Carregar tabelas Silver
df_itens = spark.table("silver.ft_itens_pedidos")
df_pedidos = spark.table("silver.ft_pedidos")
df_cotacao = spark.table("silver.dm_cotacao_dolar")
df_avaliacoes = spark.table("silver.ft_avaliacoes_pedidos")

#Preparar Avaliações (Média por pedido, caso haja mais de uma)
df_aval_agg = df_avaliacoes.groupBy("id_pedido").agg(
    F.avg("avaliacao").cast(DecimalType(3,2)).alias("avaliacao_pedido")
)

#Realizar os Joins
df_join = df_itens.join(df_pedidos, on="id_pedido", how="inner")

#Adiciona coluna de data temporária para join com cotação
df_join = df_join.withColumn("data_ref", F.to_date("pedido_compra_timestamp"))

#Join com Cotação
df_join = df_join.join(df_cotacao, df_join.data_ref == df_cotacao.data, how="left")

#Join com Avaliação
df_join = df_join.join(df_aval_agg, on="id_pedido", how="left")

#Calcular valores e Selecionar colunas finais
df_ft_vendas_geral = df_join.select(
    F.col("id_pedido"),
    F.col("id_item"),
    F.col("id_consumidor").alias("fk_cliente"),
    F.col("id_produto").alias("fk_produto"),
    F.col("id_vendedor").alias("fk_vendedor"),
    F.col("data_ref").alias("fk_tempo"),
    F.col("status").alias("status_pedido"),
    F.col("tempo_entrega_dias"),
    F.col("entrega_no_prazo"),
    
    #Valores BRL
    F.col("preco_BRL").alias("valor_produto_brl"),
    F.col("preco_frete").alias("valor_frete_brl"),
    (F.col("preco_BRL") + F.col("preco_frete")).cast(DecimalType(12,2)).alias("valor_total_item_brl"),
    
    #Valores USD (Valor / Cotacao)
    (F.col("preco_BRL") / F.col("cotacao_dolar")).cast(DecimalType(12,2)).alias("valor_produto_usd"),
    (F.col("preco_frete") / F.col("cotacao_dolar")).cast(DecimalType(12,2)).alias("valor_frete_usd"),
    ((F.col("preco_BRL") + F.col("preco_frete")) / F.col("cotacao_dolar")).cast(DecimalType(12,2)).alias("valor_total_item_usd"),
    
    F.col("cotacao_dolar"),
    F.col("avaliacao_pedido")
)

#Salvar tabela
df_ft_vendas_geral.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("gold.ft_vendas_geral")
print("Tabela gold.ft_vendas_geral criada com sucesso.")

In [0]:
#views_comercial
print("Iniciando Criação de Views Analíticas - Projeto 3: Comercial...")

print("Criando view gold.view_vendas_por_periodo...")

query_periodo = """
    SELECT 
        t.ano,
        t.trimestre,
        t.mes,
        t.mes_nome,
        t.dia,
        t.dia_da_semana_num,
        COUNT(DISTINCT f.id_pedido) AS total_pedidos,
        COUNT(f.id_item) AS total_itens,
        SUM(f.valor_total_item_brl) AS receita_total_brl,
        SUM(f.valor_total_item_usd) AS receita_total_usd,
        AVG(f.valor_total_item_brl) AS ticket_medio_brl,
        AVG(f.avaliacao_pedido) AS avaliacao_media
    FROM gold.ft_vendas_geral f
    JOIN gold.dm_tempo t ON f.fk_tempo = t.sk_tempo
    GROUP BY t.ano, t.trimestre, t.mes, t.mes_nome, t.dia, t.dia_da_semana_num
"""

spark.sql(f"CREATE OR REPLACE VIEW gold.view_vendas_por_periodo AS {query_periodo}")
print("View gold.view_vendas_por_periodo criada.")

print("--- 1. Dia da semana com maior receita total (BRL) ---")
spark.sql("""
    SELECT dia_da_semana_num, SUM(receita_total_brl) as receita_acumulada
    FROM gold.view_vendas_por_periodo
    GROUP BY dia_da_semana_num
    ORDER BY receita_acumulada DESC
    LIMIT 1
""").show()

print("--- 2. Mês com maior ticket médio (BRL) no último ano disponível ---")
#primeiro descobrimos qual é o último ano, depois filtramos
spark.sql("""
    SELECT mes_nome, AVG(ticket_medio_brl) as ticket_medio_mensal
    FROM gold.view_vendas_por_periodo
    WHERE ano = (SELECT MAX(ano) FROM gold.view_vendas_por_periodo)
    GROUP BY mes_nome
    ORDER BY ticket_medio_mensal DESC
    LIMIT 1
""").show()


print("Criando view gold.view_top_produto...")

#Precisamos juntar a Fato Geral com a Tabela de Produtos (Silver) para pegar nome/peso
query_top_prod = """
    SELECT 
        p.id_produto,
        p.categoria_produto,
        COUNT(f.id_item) AS quantidade_vendida,
        COUNT(DISTINCT f.id_pedido) AS total_pedidos,
        SUM(f.valor_total_item_brl) AS receita_brl,
        SUM(f.valor_total_item_usd) AS receita_usd,
        AVG(f.valor_produto_brl) AS preco_medio_brl,
        AVG(f.avaliacao_pedido) AS avaliacao_media,
        AVG(p.peso_produto_gramas) AS peso_medio_gramas
    FROM gold.ft_vendas_geral f
    JOIN silver.ft_produtos p ON f.fk_produto = p.id_produto
    GROUP BY p.id_produto, p.categoria_produto
"""

spark.sql(f"CREATE OR REPLACE VIEW gold.view_top_produto AS {query_top_prod}")
print("View gold.view_top_produto criada.")


print("Criando view gold.view_vendas_produtos_esteticos...")

#CTE 
query_fashion = """
    WITH vendas_fashion AS (
        SELECT 
            t.ano,
            t.mes,
            p.categoria_produto,
            f.id_pedido,
            f.id_item,
            f.valor_total_item_brl,
            f.valor_total_item_usd,
            f.avaliacao_pedido
        FROM gold.ft_vendas_geral f
        JOIN gold.dm_tempo t ON f.fk_tempo = t.sk_tempo
        JOIN silver.ft_produtos p ON f.fk_produto = p.id_produto
        WHERE p.categoria_produto LIKE 'fashion%'
    )
    SELECT 
        ano,
        mes,
        categoria_produto,
        COUNT(DISTINCT id_pedido) AS total_pedidos,
        COUNT(id_item) AS total_itens_vendidos,
        SUM(valor_total_item_brl) AS receita_total_brl,
        SUM(valor_total_item_usd) AS receita_total_usd,
        AVG(valor_total_item_brl) AS ticket_medio_brl,
        AVG(valor_total_item_usd) AS ticket_medio_usd,
        AVG(avaliacao_pedido) AS avaliacao_media
    FROM vendas_fashion
    GROUP BY ano, mes, categoria_produto
"""

spark.sql(f"CREATE OR REPLACE VIEW gold.view_vendas_produtos_esteticos AS {query_fashion}")
print("View gold.view_vendas_produtos_esteticos criada.")

print("--- PROJETO 3 CONCLUÍDO E NOTEBOOK GOLD FINALIZADO ---")