#  Camada Bronze – Ingestão de Dados

##  Documentação

**Objetivo:**  
Realizar a ingestão bruta dos arquivos `yellow_tripdata_2025-MM.parquet` diretamente de uma URL pública, com lógica de retry e salvamento no volume bronze.

**Entradas:**  
- URL pública com os arquivos de viagens mensais em formato Parquet.

**Saídas:**  
- Arquivos salvos na camada `bronze` (`/Volumes/lakehouse/bronze/filestore/`).

**Transformações aplicadas:**  
- Nenhuma transformação nos dados (cópia bruta).
- Retry automático caso falhe por erro HTTP.
- Skip do mês caso o arquivo não seja encontrado (erro 403).

##  Decisões Tomadas

- Adotado o volume Delta para garantir estruturação futura.
- Retry configurado com 5 tentativas e 5s de espera.
- Controle de erro simples para permitir automação no pipeline.

In [0]:
import time
from datetime import datetime
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType


In [0]:


# CONFIGURAÇÕES
url_base = "https://d37ci6vzurychx.cloudfront.net/trip-data/"
bronze_root = "/Volumes/lakehouse/bronze/filestore/"

#exponential backoff - usados para lidar com falhas temporárias no download (resiliência).
max_retries = 5
retry_delay = 5  # segundos

# >> Entrada configurável
year_to_ingest = 2023
months_to_ingest = 4




In [0]:
# >>> Validações
ano_atual = datetime.now().year

if not (1 <= months_to_ingest <= 12):
    raise ValueError("Mês inválido! Informe um número entre 1 e 12.")
if year_to_ingest > ano_atual:
    raise ValueError(f"Ano inválido! {year_to_ingest} é maior que o ano atual ({ano_atual})")

# >>> Caminho do ano
bronze_path = f"{bronze_root}{year_to_ingest}/"
dbutils.fs.mkdirs(bronze_path)  # cria pasta se não existir

In [0]:

# >>> Download dos arquivos parquet mês a mês
# Implementa lógica de retries com time.sleep para aguardar antes de tentar novamente.
# O try/except externo evita que o processo pare caso um mês falhe — o loop continua para os demais.

for month in range(1, months_to_ingest + 1):
    try:
        month_str = f"{month:02d}"
        file_date = f"yellow_tripdata_{year_to_ingest}-{month_str}.parquet"
        url = url_base + file_date
        retries = 0

        while retries < max_retries:
            try:
                dbutils.fs.cp(url, bronze_path) #dbutils.fs.cp para copiar o arquivo .parquet da URL para a pasta bronze.
                break
            except Exception as e:
                if "403" in str(e): # Se erro 403 for detectado (arquivo não disponível), ele interrompe a tentativa daquele mês.
                    break
                retries += 1
                if retries >= max_retries:
                    raise e
                time.sleep(retry_delay)

    except Exception as e:
        pass

In [0]:
# >>> Leitura e gravação na tabela Delta
    # - dbutils.fs.ls() para listar os arquivos no diretório da camada Bronze.
    # - try/except permite capturar problemas como caminho inválido, falta de permissões, etc.

try:
    arquivos = dbutils.fs.ls(bronze_path)
    if len(arquivos) == 0:
        raise ValueError(f"Nenhum arquivo encontrado em: {bronze_path}")
except Exception as e:
    raise

# >>> Leitura dos arquivos individualmente e união dos DataFrames
    # - Verifica se o arquivo é do tipo .parquet.
    # - Lê o arquivo com spark.read.parquet().  

dfs = []

for arquivo in arquivos:
    if arquivo.path.endswith(".parquet"):
        try:
            df_temp = spark.read.parquet(arquivo.path)

            # Forçar as colunas a terem o mesmo tipo (DoubleType) com .cast()
            colunas_double = [
                "fare_amount", "extra", "mta_tax", "tip_amount",
                "tolls_amount", "improvement_surcharge",
                "total_amount", "congestion_surcharge",
                "trip_distance"
            ]

            for coluna in colunas_double:
                if coluna in df_temp.columns:
                    df_temp = df_temp.withColumn(coluna, df_temp[coluna].cast(DoubleType()))

            dfs.append(df_temp)

        except Exception as e:
            pass


if not dfs:
    raise ValueError("Nenhum DataFrame carregado com sucesso.")

# União dos DataFrames
# -Se nenhum DataFrame foi carregado com sucesso, gera um erro.
#  -Caso contrário, une todos os DataFrames usando unionByName():
df_final = dfs[0]
for df_temp in dfs[1:]:
    df_final = df_final.unionByName(df_temp, allowMissingColumns=True)

In [0]:

# Adiciona coluna com timestamp
df_final = df_final.withColumn("date_processing", F.current_timestamp())

# Grava na tabela Delta
df_final.write.format("delta").mode("append").saveAsTable("lakehouse.bronze.yellow_trip")

