In [0]:
from pyspark.sql.functions import (
    col, sum, avg, count, expr, min, max, datediff, current_date, when, round, desc, month, year
)
from pyspark.sql.window import Window

df_silver_vendas = spark.table("classes.silver.vendas")
df_silver_estoque = spark.table("classes.silver.estoque")
df_silver_produtos = spark.table("classes.silver.produtos")


In [0]:
df_producao = df_silver_estoque.withColumn("ano_fabricacao", year("data_fabricacao"))

df_gold_producao_anual = df_producao.groupBy("ano_fabricacao", "local_fabricacao") \
    .agg(
        sum("quantidade_estoque").alias("total_itens_produzidos"),
        count("id_produto").alias("numero_lotes_produzidos")
    ) \
    .orderBy("ano_fabricacao")

display(df_gold_producao_anual)

df_gold_producao_anual.write.format("delta").mode("overwrite").saveAsTable("classes.gold.producao_anual")

In [0]:
df_gold_moda_produtos = df_silver_vendas.groupBy("nome_produto") \
    .agg(count("*").alias("frequencia_vendas")) \
    .orderBy(col("frequencia_vendas").desc())

display(df_gold_moda_produtos.limit(5))

df_gold_moda_produtos.write.format("delta").mode("overwrite").saveAsTable("classes.gold.variedade_produtos")

In [0]:
df_produtos_limpa = df_silver_produtos.drop("nome_produto", "nome_subproduto")

df_join_estoque = df_silver_estoque.join(df_produtos_limpa, on=["id_produto", "id_subproduto"], how="left")

df_calculo_estoque = df_join_estoque \
    .withColumn("dias_para_vencer", datediff(col("data_validade"), current_date())) \
    .withColumn("valor_financeiro_estoque", col("quantidade_estoque") * col("valor_custo")) \
    .withColumn("status_validade", 
                when(col("dias_para_vencer") < 0, "VENCIDO")
                .when(col("dias_para_vencer") <= 60, "ALERTA (Vence em 60 dias)")
                .otherwise("SEGURO")
    )

df_gold_estoque = df_calculo_estoque.groupBy("local_fabricacao", "status_validade", "nome_produto") \
    .agg(
        sum("quantidade_estoque").alias("qtd_total_estoque"),
        round(sum("valor_financeiro_estoque"), 2).alias("valor_total_parado"),
        round(avg("dias_para_vencer"), 0).alias("media_dias_vencimento")
    ) \
    .orderBy("media_dias_vencimento")

display(df_gold_estoque)

df_gold_estoque.write.format("delta").mode("overwrite").saveAsTable("classes.gold.analise_estoque")

In [0]:
from pyspark.sql.functions import avg, desc, round, count

df_produtos = spark.table("classes.silver.produtos")

df_gold_margem_marca = df_produtos.withColumn(
    "margem_lucro", col("valor_venda") - col("valor_custo")
).withColumn(
    "margem_percentual", ((col("valor_venda") - col("valor_custo")) / col("valor_venda")) * 100
).groupBy("nome_marca").agg(
    count("id_produto").alias("qtd_produtos_mix"),
    round(avg("valor_custo"), 2).alias("custo_medio"),
    round(avg("valor_venda"), 2).alias("preco_venda_medio"),
    round(avg("margem_lucro"), 2).alias("lucro_medio_por_unidade"),
    round(avg("margem_percentual"), 2).alias("margem_media_perc")
).orderBy(desc("margem_media_perc"))

display(df_gold_margem_marca)

df_gold_margem_marca.write.format("delta").mode("overwrite").saveAsTable("classes.gold.analise_margem_marcas")

In [0]:
from pyspark.sql.functions import sum, desc, format_number

df_vendas = spark.table("classes.silver.vendas")

df_gold_geo_vendas = df_vendas.groupBy("local_fabricacao").agg(
    count("nr_lote").alias("total_transacoes"),
    sum("quantidade_estoque").alias("volume_total_vendido")
).withColumn(
    "participacao_relativa", 
    round((col("volume_total_vendido") / sum("volume_total_vendido").over(Window.partitionBy())) * 100, 2)
).orderBy(desc("volume_total_vendido"))

display(df_gold_geo_vendas)

from pyspark.sql.window import Window
df_gold_geo_vendas.write.format("delta").mode("overwrite").saveAsTable("classes.gold.geo_vendas")

In [0]:
from pyspark.sql.functions import col, sum, round

df_s_vendas = spark.table("classes.silver.vendas")
df_s_estoque = spark.table("classes.silver.estoque")
df_s_produtos = spark.table("classes.silver.produtos")

agg_vendas = df_s_vendas.groupBy("id_produto").agg(
    sum("quantidade_estoque").alias("qtd_vendida_total") 
)

agg_estoque = df_s_estoque.groupBy("id_produto").agg(
    sum("quantidade_estoque").alias("qtd_atual_estoque")
)

df_gold_giro = df_s_produtos.select("id_produto", "nome_produto", "nome_marca", "valor_venda") \
    .join(agg_vendas, on="id_produto", how="left") \
    .join(agg_estoque, on="id_produto", how="left")

df_gold_giro = df_gold_giro.na.fill(0, subset=["qtd_vendida_total", "qtd_atual_estoque"])

df_gold_giro_final = df_gold_giro.withColumn(
    "receita_realizada", round(col("qtd_vendida_total") * col("valor_venda"), 2)
).withColumn(
    "receita_potencial_estoque", round(col("qtd_atual_estoque") * col("valor_venda"), 2)
).orderBy(desc("receita_realizada"))

display(df_gold_giro_final)

df_gold_giro_final.write.format("delta").mode("overwrite").saveAsTable("classes.gold.monitoramento_giro_financeiro")