In [0]:
%sql
-- Create (if missing) a catalog and schema named after the dataset.
-- The statements run in dependency order: catalog, then schema, then volume.
CREATE CATALOG IF NOT EXISTS bh;
CREATE SCHEMA  IF NOT EXISTS bh.mobilidade;

-- Create (if missing) the volume where the data files are stored.
CREATE VOLUME IF NOT EXISTS bh.mobilidade.data_mobility_bh
COMMENT 'Mobilidade Urbana BH (Bronze/Silver/Gold)';


In [0]:
# Libraries used by this notebook
import requests
from pyspark.sql.functions import current_timestamp, lit
import os


In [0]:
# Notebook configuration: volume coordinates, bronze folders and the source CSV.

# Catalog, schema and volume names that make up the /Volumes path.
catalog, schema, volume = "bh", "mobilidade", "data_mobility_bh"

# Source file: its name and the download URL on ckan.pbh.gov.br.
file_name = "mco-09-2025.csv"
url = ("https://ckan.pbh.gov.br/dataset/7ae4d4b4-6b52-4042-b021-0935a1db3814/"
       "resource/123b7a8a-ceb1-4f8c-9ec6-9ce76cdf9aab/download/mco-09-2025.csv")

# Bronze-layer folders inside the volume: raw downloads and parquet output.
base_path = f"/Volumes/{catalog}/{schema}/{volume}"
raw_dir = f"{base_path}/bronze/raw"
parquet_dir = f"{base_path}/bronze/parquet"

# Full path where the downloaded CSV is saved.
local_csv = f"{raw_dir}/{file_name}"

In [0]:
# Make sure the bronze storage folders exist; exist_ok=True keeps the cell
# safe to re-run when the folders are already there.
for directory in (raw_dir, parquet_dir):
    os.makedirs(directory, exist_ok=True)

In [0]:
# Download the source CSV into the raw bronze folder.
#
# The response body is streamed to disk in chunks instead of being read whole
# through resp.content, so the file never has to fit in memory at once. The
# context manager releases the HTTP connection even if the write fails.
# NOTE(review): the browser-like User-Agent is presumably needed because the
# server rejects the default client one — confirm.
with requests.get(
    url,
    headers={"User-Agent": "Mozilla/5.0"},
    timeout=60,
    stream=True,
) as resp:
    # Stop on an HTTP 4xx/5xx status before anything is written to disk.
    resp.raise_for_status()

    with open(local_csv, "wb") as f:
        # 1 MiB chunks, written sequentially to the destination file.
        for chunk in resp.iter_content(chunk_size=1024 * 1024):
            f.write(chunk)

In [0]:
# Load the downloaded CSV with Spark: the first row is the header and column
# types are inferred. Two audit columns are then added: the ingestion
# timestamp and the name of the source file.
df_bronze = (
    spark.read
    .options(header="true", inferSchema="true")
    .csv(local_csv)
    .withColumn("ingestion_timestamp", current_timestamp())
    .withColumn("source_file", lit(file_name))
)

In [0]:
# Persist the bronze DataFrame as parquet, replacing any previous output
# already present in the target folder.
df_bronze.write.parquet(parquet_dir, mode="overwrite")
