In [0]:
from pyspark.sql.functions import col, count, sum, avg, month, year, round, lit

silver_path = "/mnt/silver"
gold_path = "/mnt/gold"

# Carregar as tabelas da camada Silver
clientes_df = spark.read.parquet(f"{silver_path}/clientes")
pedidos_df = spark.read.parquet(f"{silver_path}/pedidos")
pagamentos_df = spark.read.parquet(f"{silver_path}/pagamentos")
entregas_df = spark.read.parquet(f"{silver_path}/entregas")

print("Tabelas da camada Silver carregadas com sucesso.")

In [0]:
# Join entre os pedidos e pagamentos
pedidos_pagos_df = pedidos_df.join(pagamentos_df, "pedido_id", "inner")

# Agregação para calcular a receita mensal
receita_mensal_df = pedidos_pagos_df.groupBy(
    year("data_pedido").alias("ano"),
    month("data_pedido").alias("mes")
).agg(
    round(sum("preco_total"), 2).alias("receita_total"),
    count("pedido_id").alias("total_pedidos")
).orderBy("ano", "mes")

# Salvar a tabela agregada na camada Gold
receita_mensal_df.write.mode("overwrite").format("parquet").save(f"{gold_path}/receita_mensal")

print("Tabela 'receita_mensal' salva na camada Gold.")
display(receita_mensal_df)

In [0]:
# Agregação para contar a quantidade vendida por produto
produtos_mais_vendidos_df = pedidos_df.groupBy("produto_nome") \
    .agg(
        sum("quantidade").alias("quantidade_vendida"),
        round(sum("preco_total"), 2).alias("receita_gerada")
    ) \
    .orderBy(col("quantidade_vendida").desc()) \
    .limit(10) # Pegando o Top 10

# Salvar na camada Gold
produtos_mais_vendidos_df.write.mode("overwrite").format("parquet").save(f"{gold_path}/produtos_mais_vendidos")

print("Tabela 'produtos_mais_vendidos' salva na camada Gold.")
display(produtos_mais_vendidos_df)

In [0]:
from pyspark.sql.functions import col, when, sum, count, round

# Agregação para analisar o SLA de entrega por estado
sla_por_regiao_df = entregas_df.groupBy("estado_entrega") \
    .agg(
        count("*").alias("total_entregas"),
        sum(when(col("sla_atrasado") == True, 1).otherwise(0)).alias("total_atrasos")
    ) \
    .withColumn("percentual_atraso", round(col("total_atrasos") / col("total_entregas") * 100, 2)) \
    .orderBy(col("percentual_atraso").desc())

# Salvar na camada Gold
sla_por_regiao_df.write.mode("overwrite").format("parquet").save(f"{silver_path}/gold/sla_por_regiao")

print("Tabela 'sla_por_regiao' salva na camada Gold.")
display(sla_por_regiao_df)


In [0]:
# Agregação para contar o uso de cada método de pagamento
pagamentos_usados_df = pagamentos_df.groupBy("metodo_pagamento") \
    .agg(count("*").alias("quantidade_usos")) \
    .orderBy(col("quantidade_usos").desc())

# Salvar na camada Gold
pagamentos_usados_df.write.mode("overwrite").format("parquet").save(f"{gold_path}/meios_pagamento_usados")

print("Tabela 'meios_pagamento_usados' salva na camada Gold.")
display(pagamentos_usados_df)