<center>
    <img src="https://images2.imgbox.com/20/d8/CeRGGK5y_o.png"/>
</center>

---

<center>

# Data Warehouse com BigQuery - Iniciante

</center>

---

## Configuração de Bibliotecas

In [1]:
# Se precisar, descomente:
# %pip install -q google-cloud-bigquery google-cloud-storage requests

In [2]:
#import
import os
from datetime import datetime
import requests, pathlib, zipfile, io
from google.cloud import storage
from google.api_core.exceptions import NotFound
from google.cloud import bigquery

## Parâmetros do ETL

In [3]:
# ==== PROJETO/REGIÃO ====
PROJECT_ID   = "bigquery-iniciante-roxschool"
LOCATION     = "US"  # mantenha igual ao dataset

# ==== BUCKET E PASTA NO GCS ====
BUCKET       = "roxschool_bq_iniciante"
GCS_PREFIX   = f"banco_central_demo/ingest_dt={datetime.utcnow().date()}"  # partição lógica (opcional)

# ==== FONTE (RFB) ====
# Exemplo: um zip com CSVs da base de Cnaes ( Opção mais simples para a nossa demo do School ). Substitua pelo arquivo que você quer usar na sua demo.
SOURCE_URL   = "https://api.bcb.gov.br/dados/serie/bcdata.sgs.7384/dados?formato=csv"  # exemplo; troque se preferir

# ==== DESTINO NO BIGQUERY ====
DATASET_ID   = "roxschool_dados_abertos_gov_br"
TABLE_ID     = "raw_emplacamentos"    # será criado/atualizado por load job

# ==== CSV interno do zip (formato típico BCB) ====
CSV_DELIM    = ";"          # a RFB costuma usar ';' ou '|'. Ajuste conforme o arquivo.
CSV_ENCODING = "ISO-8859-1" # comum nos arquivos da BCB OU RFB
CSV_SKIP_HDR = 1            # geralmente sem header (0). Se tiver, use 1.

# ==== Esquema simplificado (ajuste conforme o dataset real que escolher) ====
from google.cloud import bigquery
SCHEMA = [
    bigquery.SchemaField("data", "STRING"),
    bigquery.SchemaField("vendas", "INTEGER")
]

## Download ( streaming ) do arquivo da Banco Central

In [4]:
TMP_DIR = pathlib.Path("/tmp/bcb").resolve()
TMP_DIR.mkdir(parents=True, exist_ok=True)

print(f"Baixando: {SOURCE_URL}")
resp = requests.get(SOURCE_URL, stream=True, timeout=120)
resp.raise_for_status()

content_type = resp.headers.get("content-type", "").lower()
data = resp.content if "zip" in SOURCE_URL.lower() or "zip" in content_type else resp.content

local_path = TMP_DIR / ("fonte.zip" if (SOURCE_URL.lower().endswith(".zip")) else "fonte")
with open(local_path, "wb") as f:
    f.write(data)

print(f"Salvo em: {local_path}  ({len(data):,} bytes)")


Baixando: https://api.bcb.gov.br/dados/serie/bcdata.sgs.7384/dados?formato=csv
Salvo em: /tmp/bcb/fonte  (9,722 bytes)


## Extração do ZIP para CSV

In [5]:
def pick_csv_from_zip(zip_path: pathlib.Path) -> pathlib.Path:
    with zipfile.ZipFile(zip_path, "r") as z:
        # pega o primeiro .csv/.txt (ou o maior), conforme existir
        members = [m for m in z.infolist() if m.filename.lower().endswith((".csv", ".txt",".cnaecsv"))]
        if not members:
            raise RuntimeError("Nenhum CSV/TXT encontrado dentro do ZIP.")
        # regra simples: pega o MAIOR arquivo (normalmente é o correto)
        members.sort(key=lambda m: m.file_size, reverse=True)
        chosen = members[0]
        out_path = TMP_DIR / pathlib.Path(chosen.filename).name
        with z.open(chosen, "r") as src, open(out_path, "wb") as dst:
            dst.write(src.read())
        return out_path

if local_path.suffix.lower() == ".zip":
    csv_local = pick_csv_from_zip(local_path)
else:
    csv_local = local_path

print(f"Arquivo CSV selecionado: {csv_local}  ({csv_local.stat().st_size:,} bytes)")

Arquivo CSV selecionado: /tmp/bcb/fonte  (9,722 bytes)


## Upload dos dados para o BUCKET no Google Cloud Storage

In [6]:
storage_client = storage.Client(project=PROJECT_ID)
bucket = storage_client.bucket(BUCKET)
gcs_name = f"{GCS_PREFIX}/{csv_local.name}"
blob = bucket.blob(gcs_name)
blob.upload_from_filename(str(csv_local), content_type="text/csv")

GCS_URI = f"gs://{BUCKET}/{gcs_name}"
print("Enviado para:", GCS_URI)

Enviado para: gs://roxschool_bq_iniciante/banco_central_demo/ingest_dt=2025-09-25/fonte


## Criar ou substituir dataset

In [7]:
bq = bigquery.Client(project=PROJECT_ID, location=LOCATION)

try:
    bq.get_dataset(f"{PROJECT_ID}.{DATASET_ID}")
    print("Dataset já existe.")
except NotFound:
    ds = bigquery.Dataset(f"{PROJECT_ID}.{DATASET_ID}")
    ds.location = LOCATION
    bq.create_dataset(ds)
    print("Dataset criado:", f"{PROJECT_ID}.{DATASET_ID}")


Dataset criado: bigquery-iniciante-roxschool.roxschool_dados_abertos_gov_br


## Load dos dados ( Criar ou Atualizar tabela )

In [8]:
table_ref = f"{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}"

job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.CSV,
    schema=SCHEMA,                 # use schema explícito (ou troque para autodetect=True)
    field_delimiter=CSV_DELIM,
    encoding=CSV_ENCODING,
    skip_leading_rows=CSV_SKIP_HDR,
    write_disposition="WRITE_TRUNCATE",  # sobrescreve; para append: WRITE_APPEND
    quote_character='"',
    allow_quoted_newlines=True,
    allow_jagged_rows=False,
    autodetect=False,              # para testes rápidos, você pode usar True
)

load_job = bq.load_table_from_uri(
    GCS_URI,
    table_ref,
    job_config=job_config,
    location=LOCATION,
)
print("Iniciando load job:", load_job.job_id)
load_job.result()  # espera terminar
table = bq.get_table(table_ref)
print(f"Carregado: {table_ref} | {table.num_rows:,} linhas | {table.num_bytes:,} bytes")


Iniciando load job: 3e02cb04-e359-44bc-86b1-d81464ac3edf
Carregado: bigquery-iniciante-roxschool.roxschool_dados_abertos_gov_br.raw_emplacamentos | 427 linhas | 8,540 bytes
