In [0]:
from pyspark.sql.functions import *


In [0]:
# Importação dados para o dataframe:
df_gold = spark.read.table("silver.grain_logistic_shipping_silver")
display(df_gold)

In [0]:
# Adicionando as colunas de 'valor_total' (preco * qtd_itens) e 'tempo_entrega_dias' (tempo que levou para chegar a carga);

df_gold = (df_gold.withColumn('valor_total',col('preco') * col('qtd_itens'))
                  .withColumn("tempo_entrega_dias", datediff("data_entrega", "data_envio"))
           )

In [0]:
# Adiciona a receita líquida por venda:

df_gold = (df_gold.withColumn('receita_liquida',
        round(col('valor_total') - ((col('desconto') / 100) * col('valor_total')), 2))
        )

In [0]:
# Adiciona a coluna de ANO-MÊS (para as análises)

df_gold = df_gold.withColumn("periodo",concat_ws("-",year(col("data_envio")),
                  lpad(month(col("data_envio")), 2, '0'))
            )

In [0]:
# Ticket médio por venda:

df_gold = df_gold.withColumn("ticket_medio",
    round((col("receita_liquida") / col("qtd_itens")).cast("double"), 2)
)

In [0]:
# Descritivo qualidade do tempo entrega

df_gold = df_gold.withColumn(
    "categoria_tempo_entrega",
    when(col("tempo_entrega_dias") <= 2, "Muito Rápido")
    .when(col("tempo_entrega_dias") <= 5, "Rápido")
    .when(col("tempo_entrega_dias") <= 10, "Normal")
    .otherwise("Demorado")
)

In [0]:
display(df_gold)

In [0]:
# Dataframe taxa entrega envios por período e destino:

df_taxa_entrega = (
    df_gold.withColumn("data_envio", to_date(col("data_envio"), "yyyy-MM-dd"))
        .withColumn("periodo", concat_ws("-", year(col("data_envio")), lpad(month(col("data_envio")), 2, '0')))
        .groupBy("destino", "periodo")
        .agg(
            count("*").alias("total_envios"),
            sum(when(col("chegou_no_tempo") == 1, 1).otherwise(0)).alias("entregas_no_prazo"),
            round(
                (sum(when(col("chegou_no_tempo") == 1, 1).otherwise(0)) / count("*")) * 100, 2
            ).alias("tx_entrega_prazo_%")
        )
    )

display(df_taxa_entrega)


In [0]:
# Receita, Ticket Médio e Volume por Estado e Período

df_receita_estado = (
    df_gold.withColumn("data_envio", to_date(col("data_envio"), "yyyy-MM-dd"))
      .withColumn("periodo", concat_ws("-", year(col("data_envio")), lpad(month(col("data_envio")), 2, '0')))
      .withColumn("receita_liquida", col("receita_liquida").cast("double"))
      .withColumn("ticket_medio", (col("receita_liquida") / col("qtd_itens")).cast("double"))
      .groupBy("destino", "periodo")
      .agg(
          round(sum("receita_liquida"),2).alias("receita_liquida"),
          round(avg("ticket_medio"),2).alias("ticket_medio"),
          sum("qtd_itens").alias("total_itens"),
          count("*").alias("total_pedidos")
      )
)

display(df_receita_estado)


In [0]:
# Volume de Pedidos e Taxa de Atraso por Método de Envio

df_atraso_metodo = (
    df_gold.withColumn("atrasou", when(col("chegou_no_tempo") == 1, 0).otherwise(1))
      .groupBy("metodo_de_envio")
      .agg(
          count("*").alias("total_envios"),
          sum(when(col("atrasou") == 1, 1).otherwise(0)).alias("total_atrasos"),
          round(
              (sum(when(col("atrasou") == 1, 1).otherwise(0)) / count("*")) * 100, 2
          ).alias("tx_atraso_%")
      )
)

display(df_atraso_metodo)


In [0]:
# (Opcional) Salvando arquivo na camada gold no S3
# OBS: Isso gera um save a mais!


df_gold.write \
    .format("delta") \
    .mode("overwrite") \
    .option("mergeSchema", "true") \
    .save("s3a://grao-direto-mmk/gold/grain_logistic_shipping_gold")

In [0]:
# Salvando arquivo na camada gold do lakehouse no Databricks:
# Criação da tabela Gold no Delta Lake no WorkSpace do Databricks;

spark.sql("CREATE DATABASE IF NOT EXISTS gold")

df_gold.write.format("delta")\
            .option("mergeSchema", "true") \
            .mode("overwrite") \
            .saveAsTable("gold.grain_logistic_shipping_gold_OBT")

df_taxa_entrega.write.format("delta")\
            .option("mergeSchema", "true") \
            .mode("overwrite") \
            .saveAsTable("gold.grain_logistic_shipping_taxa_entrega")

df_receita_estado.write.format("delta")\
            .option("mergeSchema", "true") \
            .mode("overwrite") \
            .saveAsTable("gold.grain_logistic_shipping_receita_estado")

df_atraso_metodo.write.format("delta")\
            .option("mergeSchema", "true") \
            .mode("overwrite") \
            .saveAsTable("gold.grain_logistic_shipping_atraso_metodo")

## Testes de Queries

In [0]:
%sql

-- Teste SQL da tabela:

SELECT * FROM gold.grain_logistic_shipping_gold_OBT

In [0]:
%sql

-- Consulta SQL de produtos por corredor_de_armazenagem:

select 
  corredor_de_armazenagem,
  count(*)
from
  gold.grain_logistic_shipping_gold_OBT
  group by
    corredor_de_armazenagem

In [0]:
%sql

-- Consulta SQL de soma valor_total de produtos por metodo_de_envio:

select
  metodo_de_envio,
  sum(valor_total)
from
  gold.grain_logistic_shipping_gold_OBT
group by
  metodo_de_envio

In [0]:
%sql

-- Consulta SQL de soma valor_total de produtos por importancia:

select
  importancia,
  sum(valor_total) as total,
  sum(ligacoes_do_cliente) as total_ligacoes
from
  gold.grain_logistic_shipping_gold_OBT
group by
  importancia
order by total desc

In [0]:
%sql

-- Top 5 destinos com maior valor_total

select
  destino,
  sum(valor_total) as total
from
  gold.grain_logistic_shipping_gold_OBT
group by
  destino
order by total desc
limit 5

In [0]:
%sql

-- Top 5 últimos destinos com menor valor_total

select
  destino,
  sum(valor_total) as total
from
  gold.grain_logistic_shipping_gold_OBT
group by
  destino
order by total asc
limit 5

In [0]:
%sql

-- Tabela com destino = 'Indefinido'

select
  * 
from
  gold.grain_logistic_shipping_gold_OBT
where destino == 'Indefinido'

In [0]:
%sql

-- Valor_total por metodo_de_envio e importancia

select
  metodo_de_envio, importancia,
  sum(valor_total)
from
  gold.grain_logistic_shipping_gold_OBT
where destino == 'Indefinido'
group by
  metodo_de_envio, importancia
order by metodo_de_envio