## 1- ⚙️ Setup Inicial — Diretórios de Trabalho


In [4]:
import os

BASE_DIR = "/tmp/pipeline_api"
RAW_DIR = os.path.join(BASE_DIR, "data", "raw")
os.makedirs(RAW_DIR, exist_ok=True)

print("RAW_DIR:", RAW_DIR)


RAW_DIR: /tmp/pipeline_api/data/raw


## 2- ⚙️ Configuração Inicial do Pipeline

In [None]:
# === Passo 2 — Configuração Inicial do Pipeline (27 capitais BR) ===
import os, unicodedata, re
from datetime import datetime, timezone
from getpass import getpass   # para entrada segura

# 🚨 Forma antiga (chumbada) — comentada para histórico
# OPENWEATHER_KEY = "-"   # <- conexão antiga (não usar em produção)

# ✅ Forma nova (segura) — pede a chave em tempo de execução
OPENWEATHER_KEY = getpass("Cole sua OPENWEATHER_KEY: ")
assert OPENWEATHER_KEY, "Informe a chave"


# 27 capitais brasileiras (com acentos corretos)
CAPITAIS_BR = [
    "Rio Branco","Maceió","Macapá","Manaus","Salvador","Fortaleza","Brasília",
    "Vitória","Goiânia","São Luís","Cuiabá","Campo Grande","Belo Horizonte",
    "Belém","João Pessoa","Curitiba","Recife","Teresina","Rio de Janeiro",
    "Natal","Porto Alegre","Porto Velho","Boa Vista","Florianópolis","São Paulo",
    "Aracaju","Palmas"
]

# função auxiliar para normalizar (tirar acentos e baixar letras)
def strip_accents(s: str) -> str:
    if s is None: return s
    s = unicodedata.normalize("NFKD", s).encode("ascii","ignore").decode("utf-8")
    return s

def city_key(s: str) -> str:
    return strip_accents(s).strip().lower()

# diretórios (usando a base do passo 1)
BASE_DIR = "/tmp/pipeline_api"
RAW_DIR  = os.path.join(BASE_DIR, "data", "raw")
PROC_DIR = os.path.join(BASE_DIR, "data", "processed")
REP_DIR  = os.path.join(BASE_DIR, "data", "reports")
for p in [RAW_DIR, PROC_DIR, REP_DIR]:
    os.makedirs(p, exist_ok=True)

print("Exemplo de normalização:", "Brasília ->", city_key("Brasília"))


Exemplo de normalização: Brasília -> brasilia


## 3- 🌤️ Extração — **OpenWeather (Current Weather)**

In [6]:
# === Passo 3 — Extração OpenWeather (27 capitais BR) ===
import time, json, requests

assert OPENWEATHER_KEY, "Defina OPENWEATHER_KEY"
assert CAPITAIS_BR, "Lista de capitais vazia"

ts = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")

ok, fail = 0, 0
for city in CAPITAIS_BR:
    url = "https://api.openweathermap.org/data/2.5/weather"
    params = {"q": f"{city},BR", "appid": OPENWEATHER_KEY}
    try:
        r = requests.get(url, params=params, timeout=20)
        r.raise_for_status()
        data = r.json()
        safe_city = re.sub(r"\s+", "", strip_accents(city))  # São Paulo -> SaoPaulo
        fp = os.path.join(RAW_DIR, f"weather_{safe_city}_{ts}.json")
        with open(fp, "w") as f:
            json.dump(data, f)
        print(f"[OK] {city}")
        ok += 1
    except Exception as e:
        print(f"[ERRO] {city}: {e}")
        fail += 1
    time.sleep(1)  # respeita limite da free tier (60/min)

print(f"\nResumo OpenWeather: {ok} OK | {fail} falhas | ts={ts}")

[OK] Rio Branco
[OK] Maceió
[OK] Macapá
[OK] Manaus
[OK] Salvador
[OK] Fortaleza
[OK] Brasília
[OK] Vitória
[OK] Goiânia
[OK] São Luís
[OK] Cuiabá
[OK] Campo Grande
[OK] Belo Horizonte
[OK] Belém
[OK] João Pessoa
[OK] Curitiba
[OK] Recife
[OK] Teresina
[OK] Rio de Janeiro
[OK] Natal
[OK] Porto Alegre
[OK] Porto Velho
[OK] Boa Vista
[OK] Florianópolis
[OK] São Paulo
[OK] Aracaju
[OK] Palmas

Resumo OpenWeather: 27 OK | 0 falhas | ts=20250828_022901


## 4- 🧪 Transformação — **OpenWeather** → Tabela Curada + **Parquet particionado**

In [7]:
# === Passo 4 — Transformação Weather ===
import glob, json
import pandas as pd
from pandas import json_normalize
import pyarrow as pa
import pyarrow.dataset as ds
import shutil

# localizar arquivos do último ts
weather_files = sorted(glob.glob(os.path.join(RAW_DIR, f"weather_*_{ts}.json")))
assert weather_files, "Nenhum weather_* encontrado para este ts."

rows = []
for fp in weather_files:
    with open(fp, "r") as f:
        rows.append(json.load(f))
wdf = json_normalize(rows)

# 5 transformações mínimas
wdf = wdf.dropna(subset=["dt","main.temp","main.humidity","name"])
wdf["timestamp"] = pd.to_datetime(wdf["dt"], unit="s", utc=True)
wdf["temperature_c"] = wdf["main.temp"] - 273.15
wdf["humidity"] = wdf["main.humidity"].astype("Int64")
wdf["city"] = wdf["name"].astype(str).str.strip()
wdf["city_key"] = wdf["city"].apply(city_key)

def hum_cat(x):
    if pd.isna(x): return None
    x = int(x)
    if x < 40: return "Low"
    if x < 70: return "Moderate"
    return "High"
wdf["humidity_category"] = wdf["humidity"].map(hum_cat)

wdf["timestamp_year"] = wdf["timestamp"].dt.year
wdf["timestamp_month"] = wdf["timestamp"].dt.month

weather_curated = wdf[[
    "city","city_key","timestamp","temperature_c","humidity","humidity_category",
    "timestamp_year","timestamp_month"
]]

display(weather_curated.head())

OUT_WEATHER = os.path.join(PROC_DIR, "weather_only.parquet")
if os.path.exists(OUT_WEATHER): shutil.rmtree(OUT_WEATHER)
os.makedirs(OUT_WEATHER, exist_ok=True)

pa_tbl = pa.Table.from_pandas(weather_curated, preserve_index=False)
ds.write_dataset(pa_tbl, base_dir=OUT_WEATHER, format="parquet",
                 partitioning=["timestamp_year","timestamp_month"])

print("Weather parquet salvo em:", OUT_WEATHER)


Unnamed: 0,city,city_key,timestamp,temperature_c,humidity,humidity_category,timestamp_year,timestamp_month
0,Aracaju,aracaju,2025-08-28 02:25:09+00:00,23.97,88,High,2025,8
1,Belém,belem,2025-08-28 02:27:02+00:00,26.02,89,High,2025,8
2,Belo Horizonte,belo horizonte,2025-08-28 02:29:07+00:00,20.24,58,Moderate,2025,8
3,Boa Vista,boa vista,2025-08-28 02:25:11+00:00,27.99,74,High,2025,8
4,Brasília,brasilia,2025-08-28 02:28:24+00:00,22.51,38,Low,2025,8


Weather parquet salvo em: /tmp/pipeline_api/data/processed/weather_only.parquet


## 5- ✅ **Quality Check — `weather_only.parquet`**

In [8]:
# === Passo 5 — Quality Weather Only ===
import json

weather_ds = ds.dataset(OUT_WEATHER, format="parquet")
check_df = weather_ds.to_table().to_pandas()

def null_ratios(df):
    return {c: float(df[c].isna().mean()) for c in df.columns}

q_report = {
    "generated_at_utc": datetime.now(timezone.utc).isoformat(timespec="seconds"),
    "row_count": int(len(check_df)),
    "null_ratios": null_ratios(check_df),
    "temperature_c_range": {
        "min": float(check_df["temperature_c"].min()),
        "max": float(check_df["temperature_c"].max())
    },
    "humidity_range": {
        "min": int(check_df["humidity"].min()),
        "max": int(check_df["humidity"].max())
    },
}

rep_fp = os.path.join(REP_DIR, f"quality_weather_{ts}.json")
with open(rep_fp, "w", encoding="utf-8") as f:
    json.dump(q_report, f, indent=2, ensure_ascii=False)

print("Quality salvo em:", rep_fp)
display(check_df.head())


Quality salvo em: /tmp/pipeline_api/data/reports/quality_weather_20250828_022901.json


Unnamed: 0,city,city_key,timestamp,temperature_c,humidity,humidity_category
0,Aracaju,aracaju,2025-08-28 02:25:09+00:00,23.97,88,High
1,Belém,belem,2025-08-28 02:27:02+00:00,26.02,89,High
2,Belo Horizonte,belo horizonte,2025-08-28 02:29:07+00:00,20.24,58,Moderate
3,Boa Vista,boa vista,2025-08-28 02:25:11+00:00,27.99,74,High
4,Brasília,brasilia,2025-08-28 02:28:24+00:00,22.51,38,Low


## 6- 🌎 Extração & Transformação — **REST Countries**

In [9]:
# === Passo 6 — Extração + Transformação REST Countries (fix p/ 27 capitais) ===
import requests
import pandas as pd
from pandas import json_normalize

# 1) Buscar REST Countries
try:
    r = requests.get("https://restcountries.com/v3.1/name/brazil?fullText=true", timeout=30)
    r.raise_for_status()
    countries_raw = r.json()
except Exception as e:
    print("Erro na API REST Countries, usando fallback mínimo:", e)
    countries_raw = [{"name": {"common": "Brazil"}, "population": 203_000_000, "cca2": "BR"}]

cdf = json_normalize(countries_raw)

# 2) Isolar Brasil
# tenta pelo código BR; se não tiver, usa a primeira linha
if "cca2" in cdf.columns and (cdf["cca2"] == "BR").any():
    br = cdf[cdf["cca2"] == "BR"].iloc[0]
else:
    br = cdf.iloc[0]

country_name = br.get("name.common", "Brazil")
country_pop  = int(br.get("population", 203_000_000))

# 3) Expandir para TODAS as 27 capitais
countries_curated = pd.DataFrame({
    "city": CAPITAIS_BR
})
countries_curated["city_key"]   = countries_curated["city"].apply(city_key)
countries_curated["country"]    = country_name
countries_curated["population"] = country_pop

print("Countries curated (27 linhas esperadas):", len(countries_curated))
display(countries_curated.head(10))

Countries curated (27 linhas esperadas): 27


Unnamed: 0,city,city_key,country,population
0,Rio Branco,rio branco,Brazil,212559409
1,Maceió,maceio,Brazil,212559409
2,Macapá,macapa,Brazil,212559409
3,Manaus,manaus,Brazil,212559409
4,Salvador,salvador,Brazil,212559409
5,Fortaleza,fortaleza,Brazil,212559409
6,Brasília,brasilia,Brazil,212559409
7,Vitória,vitoria,Brazil,212559409
8,Goiânia,goiania,Brazil,212559409
9,São Luís,sao luis,Brazil,212559409


## 7- 🔗 JOIN — **Weather × Countries** + **Parquet Final** + **Quality Check**

In [10]:
# === Passo 7 — JOIN Weather x Countries ===
weather_df = weather_ds.to_table().to_pandas()
weather_df["city_key"] = weather_df["city"].apply(city_key)

final_pd = weather_df.merge(countries_curated, on="city_key", how="inner",
                            suffixes=("_weather","_country"))
final_pd["city"] = final_pd["city_weather"]

final_pd["timestamp_year"] = final_pd["timestamp"].dt.year
final_pd["timestamp_month"] = final_pd["timestamp"].dt.month

final_pd = final_pd[[
    "city","timestamp","temperature_c","humidity","humidity_category",
    "country","population","timestamp_year","timestamp_month"
]]

display(final_pd.head(10))

OUT_FINAL = os.path.join(PROC_DIR, "weather_countries.parquet")
if os.path.exists(OUT_FINAL): shutil.rmtree(OUT_FINAL)
os.makedirs(OUT_FINAL, exist_ok=True)

pa_tbl_final = pa.Table.from_pandas(final_pd, preserve_index=False)
ds.write_dataset(pa_tbl_final, base_dir=OUT_FINAL, format="parquet",
                 partitioning=["timestamp_year","timestamp_month"])

print("Parquet FINAL salvo em:", OUT_FINAL)

q_final = {
    "generated_at_utc": datetime.now(timezone.utc).isoformat(timespec="seconds"),
    "row_count": int(len(final_pd)),
    "null_ratios": null_ratios(final_pd),
    "temperature_c_range": {"min": float(final_pd["temperature_c"].min()), "max": float(final_pd["temperature_c"].max())},
    "humidity_range": {"min": int(final_pd["humidity"].min()), "max": int(final_pd["humidity"].max())},
    "population_range": {"min": int(final_pd["population"].min()), "max": int(final_pd["population"].max())},
}

rep_fp_final = os.path.join(REP_DIR, f"quality_weather_countries_{ts}.json")
with open(rep_fp_final, "w", encoding="utf-8") as f:
    json.dump(q_final, f, indent=2, ensure_ascii=False)

print("Quality FINAL salvo em:", rep_fp_final)

Unnamed: 0,city,timestamp,temperature_c,humidity,humidity_category,country,population,timestamp_year,timestamp_month
0,Aracaju,2025-08-28 02:25:09+00:00,23.97,88,High,Brazil,212559409,2025,8
1,Belém,2025-08-28 02:27:02+00:00,26.02,89,High,Brazil,212559409,2025,8
2,Belo Horizonte,2025-08-28 02:29:07+00:00,20.24,58,Moderate,Brazil,212559409,2025,8
3,Boa Vista,2025-08-28 02:25:11+00:00,27.99,74,High,Brazil,212559409,2025,8
4,Brasília,2025-08-28 02:28:24+00:00,22.51,38,Low,Brazil,212559409,2025,8
5,Campo Grande,2025-08-28 02:24:44+00:00,18.75,59,Moderate,Brazil,212559409,2025,8
6,Cuiabá,2025-08-28 02:26:20+00:00,23.96,57,Moderate,Brazil,212559409,2025,8
7,Curitiba,2025-08-28 02:29:26+00:00,13.0,95,High,Brazil,212559409,2025,8
8,Florianópolis,2025-08-28 02:27:16+00:00,16.33,90,High,Brazil,212559409,2025,8
9,Fortaleza,2025-08-28 02:27:18+00:00,26.07,78,High,Brazil,212559409,2025,8


Parquet FINAL salvo em: /tmp/pipeline_api/data/processed/weather_countries.parquet
Quality FINAL salvo em: /tmp/pipeline_api/data/reports/quality_weather_countries_20250828_022901.json


## 8- 📊 Visualizações — **Plotly a partir do Dataset Final**

In [11]:
# === Passo 8 — Visualizações ===
import os
import pyarrow.dataset as ds
import pandas as pd
import plotly.express as px

# garante o caminho do parquet final
try:
    OUT_FINAL
except NameError:
    OUT_FINAL = os.path.join(PROC_DIR, "weather_countries.parquet")

# carrega o dataset final
dataset_final = ds.dataset(OUT_FINAL, format="parquet")
viz_df = dataset_final.to_table().to_pandas()

# (opcional) remove linhas 100% duplicadas, caso reexecute o join
viz_df = viz_df.drop_duplicates()

# --- Gráfico 1 (novo): Temperatura × Umidade (colorindo por região) ---
# --- Gráfico 1 (ajustado com rótulo da cidade) ---
fig1 = px.scatter(
    viz_df,
    x="temperature_c",
    y="humidity",
    color="city",                
    hover_name="city",
    hover_data=["temperature_c","humidity","humidity_category","timestamp"],
    text="city",  # <<< adiciona rótulo no ponto
    title="Temperatura (°C) × Umidade (%) — Capitais BR"
)
fig1.update_traces(textposition="top center")  # posição do texto em relação ao ponto
fig1.update_xaxes(title="Temperatura (°C)")
fig1.update_yaxes(title="Umidade (%)", range=[0, 100])

# faixa de conforto 30–60%
fig1.add_hrect(
    y0=30, y1=60,
    line_width=0, fillcolor="LightGreen", opacity=0.2,
    annotation_text="Faixa de conforto (30–60%)", annotation_position="top left"
)

fig1.show()


# --- Gráfico 2: Barras - Temperatura atual por capital (com escala de cores) ---
fig2 = px.bar(
    viz_df.sort_values("temperature_c", ascending=False),
    x="city",
    y="temperature_c",
    color="temperature_c",
    color_continuous_scale="RdYlBu",   # azul = frio | vermelho = quente
    text="temperature_c",
    title="Temperatura atual por capital (com valores)"
)
fig2.update_traces(texttemplate="%{text:.1f}°C", textposition="outside")
fig2.update_layout(xaxis_tickangle=-45, coloraxis_colorbar=dict(title="Temperatura (°C)"))
fig2.show()

# salvar em HTML
os.makedirs(REP_DIR, exist_ok=True)
fig1.write_html(os.path.join(REP_DIR, "scatter_temp_humidity.html"))
fig2.write_html(os.path.join(REP_DIR, "bar_temperature_by_city.html"))
print("Gráficos salvos em:", REP_DIR)

Gráficos salvos em: /tmp/pipeline_api/data/reports


## 9- 📂 Estrutura Final — **Raw / Processed / Reports**


In [12]:
# === Passo 9 — Estrutura Final ===
for root, dirs, files in os.walk(BASE_DIR):
    level = root.replace(BASE_DIR, "").count(os.sep)
    indent = " " * (2 * level)
    print(f"{indent}{os.path.basename(root)}/")
    subindent = " " * (2 * (level + 1))
    for f in files:
        print(f"{subindent}{f}")

pipeline_api/
  data/
    processed/
      weather_countries.parquet/
        2025/
          8/
            part-0.parquet
      weather_only.parquet/
        2025/
          8/
            part-0.parquet
    raw/
      weather_PortoVelho_20250828_022901.json
      weather_Goiania_20250828_022901.json
      weather_SaoPaulo_20250828_022901.json
      weather_PortoAlegre_20250828_022901.json
      weather_JoaoPessoa_20250828_022901.json
      weather_RiodeJaneiro_20250828_022901.json
      weather_CampoGrande_20250828_022901.json
      weather_Macapa_20250828_022901.json
      weather_Natal_20250828_022901.json
      weather_Maceio_20250828_022901.json
      weather_Recife_20250828_022901.json
      weather_Teresina_20250828_022901.json
      weather_SaoLuis_20250828_022901.json
      weather_Palmas_20250828_022901.json
      weather_Fortaleza_20250828_022901.json
      weather_Brasilia_20250828_022901.json
      weather_Curitiba_20250828_022901.json
      weather_Aracaju_20250828_0229

## 10- 📦 Empacotamento Final — **ZIP do Projeto**

In [13]:
import shutil, os
ZIP_TARGET = "/tmp/pipeline_output"
BASE_DIR = "/tmp/pipeline_api"  # raiz onde está data/raw, processed, reports
shutil.make_archive(ZIP_TARGET, "zip", BASE_DIR)
print("ZIP gerado em:", ZIP_TARGET + ".zip")

ZIP gerado em: /tmp/pipeline_output.zip
