# 🚗 Projeto: Análise de Corridas de Transporte Privado

Este notebook segue a arquitetura Medallion (Bronze → Silver → Gold) para tratar os dados de um app de transporte.

**Objetivo:** Gerar uma tabela com estatísticas diárias das corridas, separadas por categoria e propósito.

**Camadas:**
- **Bronze:** leitura bruta do CSV
- **Silver:** tratamento e padronização de dados
- **Gold:** agregações diárias

**Ferramentas:** PySpark, Delta Lake, Pandas

## Funções

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    to_date, to_timestamp, date_format, concat_ws, split, lpad,
    count, avg, min, max, when, col
)
from pyspark.sql.types import DecimalType, IntegerType
import pandas as pd

def write_delta(df, camada, table_name):
    """Escreve um DataFrame em um formato Delta Lake"""
    df.write.format("delta").mode("overwrite").saveAsTable(f"{camada}.{table_name}")
    

def load_bronze_csv(path: str):
    """Leitura do CSV e criação da tabela bronze"""
    df_pandas = pd.read_csv(path, sep=';', header=0)
    df_pandas = df_pandas.where(pd.notna(df_pandas), None) # manter null
    df_spark = spark.createDataFrame(df_pandas)
    # write_delta(df_spark, "bronze", "info_transportes_raw")
    return df_spark


def standardize_datetime_column(df, column_name, output_format="MM-dd-yyyy HH:mm"):
    """
    Padroniza uma coluna de data/hora no formato 'MM-dd-yyyy H:mm' para um novo formato.
    - df: DataFrame de entrada
    - column_name: nome da coluna a ser transformada
    - output_format: formato final da coluna (padrão: 'MM-dd-yyyy HH:mm')
    """
    return df.withColumn(
    column_name,
    date_format(
        to_timestamp(col(column_name), "MM-dd-yyyy H:mm"),
        output_format
        )
    )

def create_dt_ref_column(df, column_name, new_column_name):
    """
    Cria a coluna DT_REF a partir de DATA_INICIO formatada.
    """
    return df.withColumn(
        new_column_name,
        date_format(
            to_timestamp(col(column_name), "MM-dd-yyyy H:mm"),
            "yyyy-MM-dd"
        )
    )

def aggregate_to_gold(df_gold):
    """Agrupa e calcula as métricas diárias"""
    df_gold = df_gold.groupBy("DT_REF").agg(
            count("*").alias("QT_CORR"),
            count(when(col("CATEGORIA") == "Negocio", True)).alias("QT_CORR_NEG"),
            count(when(col("CATEGORIA") == "Pessoal", True)).alias("QT_CORR_PESS"),
            max("DISTANCIA").alias("VL_MAX_DIST"),
            min("DISTANCIA").alias("VL_MIN_DIST"),
            avg("DISTANCIA").alias("VL_AVG_DIST"),
            count(when(col("PROPOSITO") == "Reunião", True)).alias("QT_CORR_REUNI"),
            count(
                when((col("PROPOSITO").isNotNull()) & (col("PROPOSITO") != "Reunião"), True)
            ).alias("QT_CORR_NAO_REUNI")
        )

    return df_gold.withColumn("QT_CORR_NEG", col("QT_CORR_NEG").cast(IntegerType()))\
                  .withColumn("QT_CORR_PESS", col("QT_CORR_PESS").cast(IntegerType()))\
                  .withColumn("VL_MAX_DIST", col("VL_MAX_DIST").cast(DecimalType(10, 2)))\
                  .withColumn("VL_MIN_DIST", col("VL_MIN_DIST").cast(DecimalType(10, 2)))\
                  .withColumn("VL_AVG_DIST", col("VL_AVG_DIST").cast(DecimalType(10, 2)))

## Execução

In [0]:
path_csv = "dbfs:/FileStore/info_transportes.csv"

#Bronze - Ingestão
df_bronze = load_bronze_csv(path_csv)

#Silver - Transformação
# df_bronze = spark.table("bronze.info_transportes_raw")
df_bronze_to_silver = standardize_datetime_column(df_bronze, "DATA_INICIO")
df_bronze_to_silver = standardize_datetime_column(df_bronze_to_silver, "DATA_FIM")
df_bronze_to_silver = create_dt_ref_column(df_bronze_to_silver, "DATA_INICIO", "DT_REF")
# write_delta(df_bronze_to_silver, "silver", "info_transportes_refined")

#Gold - Agregação
# df_silver = spark.table("silver.info_transportes_refined")
df_silver = df_bronze_to_silver
df_gold = aggregate_to_gold(df_silver)
# write_delta(df_gold, "gold", "info_corridas_do_dia")


display(df_gold.show(2,0))

+----------+-------+-----------+------------+-----------+-----------+-----------+-------------+-----------------+
|DT_REF    |QT_CORR|QT_CORR_NEG|QT_CORR_PESS|VL_MAX_DIST|VL_MIN_DIST|VL_AVG_DIST|QT_CORR_REUNI|QT_CORR_NAO_REUNI|
+----------+-------+-----------+------------+-----------+-----------+-----------+-------------+-----------------+
|2016-01-01|1      |1          |0           |51.00      |51.00      |51.00      |0            |1                |
|2016-01-02|2      |2          |0           |5.00       |48.00      |26.50      |0            |1                |
+----------+-------+-----------+------------+-----------+-----------+-----------+-------------+-----------------+
only showing top 2 rows



### Visualização

In [0]:
df_gold.printSchema()

root
 |-- DT_REF: string (nullable = true)
 |-- QT_CORR: long (nullable = false)
 |-- QT_CORR_NEG: integer (nullable = false)
 |-- QT_CORR_PESS: integer (nullable = false)
 |-- VL_MAX_DIST: decimal(10,2) (nullable = true)
 |-- VL_MIN_DIST: decimal(10,2) (nullable = true)
 |-- VL_AVG_DIST: decimal(10,2) (nullable = true)
 |-- QT_CORR_REUNI: long (nullable = false)
 |-- QT_CORR_NAO_REUNI: long (nullable = false)



In [0]:
# Visualização simples de corridas por categoria
display(df_gold.select("DT_REF", "QT_CORR_NEG", "QT_CORR_PESS"))

DT_REF,QT_CORR_NEG,QT_CORR_PESS
2016-08-17,8,0
2016-04-22,4,0
2016-08-08,5,0
2016-09-11,2,0
2016-07-06,5,0
2016-03-17,3,4
2016-04-15,3,0
2016-01-29,6,0
2016-07-23,6,0
2016-01-18,2,0


In [0]:
# Visualização da distância média por dia
display(df_gold.select("DT_REF", "VL_AVG_DIST"))

DT_REF,VL_AVG_DIST
2016-08-17,60.38
2016-04-22,118.75
2016-08-08,32.6
2016-09-11,92.0
2016-07-06,65.0
2016-03-17,127.0
2016-04-15,95.0
2016-01-29,69.33
2016-07-23,61.33
2016-01-18,47.5


In [0]:
%sql
-- Visualização completa da tabela info_corridas_do_dia
select * from gold.info_corridas_do_dia

DT_REF,QT_CORR,QT_CORR_NEG,QT_CORR_PESS,VL_MAX_DIST,VL_MIN_DIST,VL_AVG_DIST,QT_CORR_REUNI,QT_CORR_NAO_REUNI
2016-01-01,1,1,0,51.0,51.0,51.0,0,1
2016-01-02,2,2,0,5.0,48.0,26.5,0,1
2016-01-05,1,1,0,47.0,47.0,47.0,1,0
2016-01-06,3,3,0,71.0,43.0,250.33,1,2
2016-01-07,1,1,0,8.0,8.0,8.0,1,0
2016-01-10,5,5,0,83.0,108.0,98.6,4,1
2016-01-11,3,3,0,19.0,16.0,17.33,0,3
2016-01-12,6,6,0,4.0,151.0,36.33,1,5
2016-01-13,4,4,0,39.0,112.0,122.0,2,2
2016-01-15,3,3,0,8.0,104.0,72.0,0,3
