# 🚗 Projeto: Análise de Corridas de Transporte Privado

Este notebook segue a arquitetura Medallion (Bronze → Silver → Gold) para tratar os dados de um app de transporte.

**Objetivo:** Gerar uma tabela com estatísticas diárias das corridas, separadas por categoria e propósito.

**Camadas:**
- **Bronze:** leitura bruta do CSV
- **Silver:** tratamento e padronização de dados
- **Gold:** agregações diárias

**Ferramentas:** PySpark, Delta Lake, Pandas

In [0]:
%sql
create schema if not exists bronze;
create schema if not exists silver;
create schema if not exists gold;

In [0]:
path_csv = "/Workspace/Users/kelvin_soares@outlook.com/info_transportes.csv"

## Funções

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    to_date, to_timestamp, date_format, concat_ws, split, lpad,
    count, avg, min, max, when, col
)
import pandas as pd

def write_delta(df, camada, table_name):
    """Escreve um DataFrame em um formato Delta Lake"""
    df.write.format("delta").mode("overwrite").saveAsTable(f"{camada}.{table_name}")
    

def load_bronze_csv(path: str):
    """Leitura do CSV e criação da tabela bronze"""
    df_pandas = pd.read_csv(path, sep=";", header=0)
    df_pandas = df_pandas.where(pd.notna(df_pandas), None)  # manter nulls
    df_spark = spark.createDataFrame(df_pandas)
    write_delta(df_spark, "bronze", "info_transportes_raw")


def standardize_datetime_column(df, column_name, output_format="MM-dd-yyyy HH:mm"):
    """
    Padroniza uma coluna de data/hora no formato 'MM-dd-yyyy H:mm' para um novo formato.
    - df: DataFrame de entrada
    - column_name: nome da coluna a ser transformada
    - output_format: formato final da coluna (padrão: 'MM-dd-yyyy HH:mm')
    """
    return df.withColumn(
    column_name,
    date_format(
        to_timestamp(col(column_name), "MM-dd-yyyy H:mm"),
        output_format
        )
    )

def create_dt_ref_column(df, column_name, new_column_name):
    """
    Cria a coluna DT_REF a partir de DATA_INICIO formatada.
    """
    return df.withColumn(
        new_column_name,
        date_format(
            to_timestamp(col(column_name), "MM-dd-yyyy H:mm"),
            "yyyy-MM-dd"
        )
    )

def aggregate_to_gold(df):
    """Agrupa e calcula as métricas diárias"""
    return df.groupBy("DT_REF").agg(
        count("*").alias("QT_CORR"),
        count(when(col("CATEGORIA") == "Negocio", True)).alias("QT_CORR_NEG"),
        count(when(col("CATEGORIA") == "Pessoal", True)).alias("QT_CORR_PESS"),
        max("DISTANCIA").alias("VL_MAX_DIST"),
        min("DISTANCIA").alias("VL_MIN_DIST"),
        avg("DISTANCIA").alias("VL_AVG_DIST"),
        count(when(col("PROPOSITO") == "Reunião", True)).alias("QT_CORR_REUNI"),
        count(
            when((col("PROPOSITO").isNotNull()) & (col("PROPOSITO") != "Reunião"), True)
        ).alias("QT_CORR_NAO_REUNI")
    )

## Execução

In [0]:
#Bronze - Ingestão
load_bronze_csv(path_csv)

#Silver - Transformação
df_bronze = spark.table("bronze.info_transportes")
df_bronze_to_silver = standardize_datetime_column(df_bronze, "DATA_INICIO")
df_bronze_to_silver = standardize_datetime_column(df_bronze_to_silver, "DATA_FIM")
df_bronze_to_silver = create_dt_ref_column(df_bronze_to_silver, "DATA_INICIO", "DT_REF")
write_delta(df_bronze_to_silver, "silver", "info_transportes_refined")

#Gold - Agregação
df_silver = spark.table("silver.info_transportes_refined")
df_gold = aggregate_to_gold(df_silver)
write_delta(df_gold, "gold", "info_corridas_do_dia")


display(df_gold)

DT_REF,QT_CORR,QT_CORR_NEG,QT_CORR_PESS,VL_MAX_DIST,VL_MIN_DIST,VL_AVG_DIST,QT_CORR_REUNI,QT_CORR_NAO_REUNI
2016-09-30,2,2,0,377,167,272.0,0,0
2016-11-25,2,2,0,111,103,107.0,2,0
2016-08-23,8,8,0,177,5,70.375,0,0
2016-12-19,11,11,0,102,7,45.0,1,5
2016-11-17,1,1,0,163,163,163.0,0,1
2016-02-01,3,3,0,233,39,155.33333333333334,0,3
2016-07-02,2,2,0,101,99,100.0,2,0
2016-11-16,2,2,0,31,23,27.0,0,2
2016-08-07,4,4,0,27,25,26.0,0,2
2016-07-14,3,2,1,1953,33,701.3333333333334,0,1


## Visualização

In [0]:
# Visualização simples de corridas por categoria
display(df_gold.select("DT_REF", "QT_CORR_NEG", "QT_CORR_PESS"))

DT_REF,QT_CORR_NEG,QT_CORR_PESS
2016-09-30,2,0
2016-11-25,2,0
2016-08-23,8,0
2016-12-19,11,0
2016-11-17,1,0
2016-02-01,3,0
2016-07-02,2,0
2016-11-16,2,0
2016-08-07,4,0
2016-07-14,2,1


In [0]:
# Visualização da distância média por dia
display(df_gold.select("DT_REF", "VL_AVG_DIST"))

DT_REF,VL_AVG_DIST
2016-09-30,272.0
2016-11-25,107.0
2016-08-23,70.375
2016-12-19,45.0
2016-11-17,163.0
2016-02-01,155.33333333333334
2016-07-02,100.0
2016-11-16,27.0
2016-08-07,26.0
2016-07-14,701.3333333333334


In [0]:
%sql
select * from gold.info_corridas_do_dia

DT_REF,QT_CORR,QT_CORR_NEG,QT_CORR_PESS,VL_MAX_DIST,VL_MIN_DIST,VL_AVG_DIST,QT_CORR_REUNI,QT_CORR_NAO_REUNI
2016-09-30,2,2,0,377,167,272.0,0,0
2016-11-25,2,2,0,111,103,107.0,2,0
2016-08-23,8,8,0,177,5,70.375,0,0
2016-12-19,11,11,0,102,7,45.0,1,5
2016-11-17,1,1,0,163,163,163.0,0,1
2016-02-01,3,3,0,233,39,155.33333333333334,0,3
2016-07-02,2,2,0,101,99,100.0,2,0
2016-11-16,2,2,0,31,23,27.0,0,2
2016-08-07,4,4,0,27,25,26.0,0,2
2016-07-14,3,2,1,1953,33,701.3333333333334,0,1
