##### Sumário – Produto de Dados: Análise de Performance Logística por Estado
O presente projeto propõe a criação de um processo ETL para consolidar informações logísticas dos pedidos realizados na plataforma ACME, com o objetivo de fornecer uma visão estratégica sobre o desempenho de entregas por unidade federativa (UF). A solução foi desenhada para atender principalmente a área de Logística, oferecendo métricas essenciais para monitoramento de SLA, experiência do cliente e eficiência operacional.

In [0]:
display(dbutils.fs.ls('/Volumes/valeparaopeba/landing/vol_landing/mba/trabalho/'))

Passo 01 - Leitura com spark.read dos datasets

In [0]:
caminho = '/Volumes/valeparaopeba/landing/vol_landing/mba/trabalho/'

In [0]:
df_orders = spark.read.csv(f"{caminho}acme_orders.csv", header=True, inferSchema=True)
df_customers = spark.read.csv(f"{caminho}acme_customers.csv", header=True, inferSchema=True)
df_geoloc = spark.read.csv(f"{caminho}acme_geolocation.csv", header=True, inferSchema=True)
df_payments = spark.read.csv(f"{caminho}acme_order_payments.csv", header=True, inferSchema=True)
df_reviews = spark.read.csv(f"{caminho}acme_order_reviews.csv", header=True, inferSchema=True)

Passo 02 - Realizar agregação com JOINs

In [0]:
# Orders + Customers
df = df_orders.join(df_customers, on='customer_id', how='left')

In [0]:
# Customers + Geolocation
df = df.join(
   df_geoloc,
   df.customer_zip_code_prefix == df_geoloc.geolocation_zip_code_prefix,
   how="left"
)

In [0]:
# Orders + Payments
df = df.join(
   df_payments,
   on="order_id",
   how="left"
)

In [0]:
# Orders + Reviews
df = df.join(
   df_reviews,
   on="order_id",
   how="left"
)

Passo 03 - Filtro de pedidos entregues

In [0]:
df = df.filter(df.order_status == "delivered")

Passo 04 - Coluna calculada de atraso

In [0]:
from pyspark.sql.functions import col, when, count, date_diff, lit, avg, sum, current_timestamp

In [0]:
df = df.withColumn('atraso_dias', 
                   when(
                       col('order_delivered_customer_date') > col('order_estimated_delivery_date'),
                       date_diff("order_delivered_customer_date", "order_estimated_delivery_date")) \
                    .otherwise(lit(0)))

In [0]:
display(df.limit(10))

Passo 05 - Agregação por UF (geolocation_state)

In [0]:
resultado = df.groupBy("geolocation_state").agg(
   count("order_id").alias("total_pedidos"),
   avg("atraso_dias").alias("atraso_medio_dias"),
   (
       sum(when(col("atraso_dias") > 0, 1).otherwise(0)) /
       count("order_id")
   ).alias("perc_pedidos_atrasados"),
   avg("review_score").alias("nota_media_review"),
   avg("payment_value").alias("ticket_medio"),
   sum("payment_value").alias("valor_total"),
   count("review_score").alias("qtd_pedidos_avaliados"),
   current_timestamp().alias("data_processamento")
)

In [0]:
display(resultado)

Passo 06 - Salvando como tabela de consumo

In [0]:
resultado.write \
    .mode('overwrite') \
    .option("overwriteSchema", "true") \
    .format('delta') \
    .saveAsTable('personalfinance.silver.mba_resultado')

In [0]:
%sql
SHOW TABLES IN personalfinance.silver;

In [0]:
%sql
SELECT * FROM personalfinance.silver.mba_resultado;