## MVP Engenharia de Dados

**Nome:** Bianca Carvalho Lima

**Matrícula:** 4052025000297

- **Dataset Original (de 2007 a 2020):** https://www.kaggle.com/datasets/equeiroz/acidentes-rodovias-federais-brasil-jan07-a-jul19
- **Dataset Original (de 2023 a 2025):** https://www.kaggle.com/datasets/jairsouza/acidentes-rodovias-federais

**Repo GitHub:** https://github.com/biaacarvalhoo27/PUCRIO_MVP_eng_dados

## Imports

In [0]:
import unicodedata
import re
from functools import reduce
from pyspark.sql.functions import lit, regexp_replace, col, to_date, try_to_date
from pyspark.sql.types import IntegerType, DoubleType, DateType, StringType
from pyspark.sql import functions as F

## Consolidação dos arquivos
- Consolidação dos arquivos do schema _raw_ > "/Volumes/lakehouse/raw/volumes/"
- Output da consolidação no schema _bronze_: dados padronizados, com metadados e controle de ingestão

In [0]:
def consolidar_tudo():
    dfs = []
    for pasta in dbutils.fs.ls("/Volumes/lakehouse/raw/volumes/datatran"):
        if pasta.isDir():
            nome = pasta.name.rstrip("/")
            df = None
            formato = None
            try:
                try:
                    # Verifica se existe _delta_log para tentar ler como Delta
                    delta_log_path = f"{pasta.path}/_delta_log"
                    if dbutils.fs.exists(delta_log_path):
                        df = spark.read.format("delta").load(pasta.path)
                        formato = "Delta"
                    else:
                        raise Exception("Delta log não encontrado")
                except Exception:
                    df = spark.read.parquet(pasta.path)
                    formato = "Parquet"

                for coluna in df.columns:
                    df = df.withColumn(
                        coluna,
                        regexp_replace(col(coluna).cast(StringType()), r",", "."),
                    )
                # metadados
                df = df.withColumn("origem_arquivo", lit(nome))
                df = df.withColumn("formato_origem", lit(formato))
                df = df.withColumn("data_execucao", F.current_timestamp() - F.expr("INTERVAL 3 HOURS"))
                dfs.append(df)
            except Exception as e:
                print(f"Erro ao processar {nome}: {e}")

    if dfs:
        df_final = reduce(lambda a, b: a.unionByName(b, allowMissingColumns=True), dfs)
        print(f"CONSOLIDAÇÃO CONCLUÍDA!")
        return df_final
    else:
        print("Nenhum dado foi processado!")
        return None

# Executar
df_final = consolidar_tudo()

CONSOLIDAÇÃO CONCLUÍDA!


## Limpeza e transformação dos dados do dataframe

## Tratamento de tipagem das colunas

In [0]:
def clean_data(df_final):
    """
    Função corrigida para limpeza dos dados do PRF
    """
    # 1. Primeiro, tratar os valores decimais como string
    int_cols = [
        "br", "pessoas", "mortos", "feridos_leves", 
        "feridos_graves", "ilesos", "ignorados", "feridos", "veiculos"
    ]
    
    for col_name in int_cols:
        # Remover ".0" dos valores e converter para inteiro
        df_final = df_final.withColumn(
            col_name,
            F.when(
                F.col(col_name).isNotNull(),
                F.regexp_replace(F.col(col_name).cast("string"), r"\.0$", "")
            ).otherwise(None)
        )
        # Agora converter para inteiro
        df_final = df_final.withColumn(
            col_name,
            F.col(col_name).cast(IntegerType())
        )
    
    # 2. Tratar a coluna 'km' - pode ter vírgulas
    df_final = df_final.withColumn(
        "km",
        F.when(
            F.col("km").isNotNull(),
            F.regexp_replace(F.col("km").cast("string"), ",", ".")
        ).otherwise(None)
    ).withColumn("km", F.col("km").cast(DoubleType()))
    
    # 3. Corrigir ID - remover .0
    df_final = df_final.withColumn(
        "id",
        F.regexp_replace(F.col("id").cast("string"), r"\.0$", "")
    )
    
    # 4. Corrigir dia_semana
    df_final = df_final.withColumn(
        "dia_semana",
        F.when(
            F.col("dia_semana").contains("-"),
            F.initcap(F.split(F.col("dia_semana"), "-")[0])
        ).otherwise(F.initcap(F.col("dia_semana")))
    )
    
    # 5. Criar timestamp combinado
    df_final = df_final.withColumn(
        "data_inversa_horario",
        F.to_timestamp(
            F.concat_ws(
                " ",
                F.col("data_inversa").cast("string"),
                F.concat(F.col("horario").cast("string"), ":00")
            ),
            "yyyy-MM-dd HH:mm:ss"
        )
    )
    
    return df_final

## Tratamento coluna "data_inversa" > tipagem date

In [0]:
df_final = df_final.withColumn(
    "data_ajustada",
    F.coalesce(
        F.expr("try_to_date(data_inversa, 'yyyy-MM-dd')"),
        F.expr("try_to_date(data_inversa, 'dd/MM/yyyy')"),
        F.expr("try_to_date(data_inversa, 'dd-MM-yyyy')")
    )
)

## Remoção espaços entre palavras

In [0]:
df_final = df_final.withColumn(
    "condicao_metereologica",
    F.trim(F.col("condicao_metereologica"))
).withColumn(
    "causa_acidente",
    F.trim(F.col("causa_acidente"))
)

## Tratamento e padronização dados da coluna condicao_metereologica

In [0]:
df_final = df_final.withColumn(
    "condicao_metereologica",
    F.when(
        F.col("condicao_metereologica").isin("Nevoeiro/neblina", "Nevoeiro/Neblina"),
        "Nevoeiro/Neblina"
    ).when(
        F.col("condicao_metereologica").isin("(null)", "Ignorado"),
        "Ignorada"
    ).when(
        F.col("condicao_metereologica").isin("Ceu Claro", "CÃ©u Claro"),
        "Céu Claro"
    ).otherwise(F.col("condicao_metereologica"))
)

## Remover duplicidade

In [0]:
df_final = df_final.dropDuplicates()

## Save da tabela em delta no schema bronze

In [0]:
df_final.write.option("inferSchema", False).mode("overwrite").option("encoding", "UTF-8").format("delta").saveAsTable(
    "lakehouse.bronze.datatran_consolidado"
)