# Notebook to ingest data from ONS

---

In [None]:
# CMP259 - Ingestão de dados da ONS (SE/CO)

import requests
import pandas as pd
from datetime import date, timedelta
from pathlib import Path

pd.set_option("display.max_columns", 100)

BASE_DIR = Path("data_ons")
BASE_DIR.mkdir(exist_ok=True)


## Funções Auxiliares
---

In [None]:
# Funções auxiliares

CKAN_BASE_URL = "https://dados.ons.org.br/api/3/action"
CARGA_API_BASE = "https://apicarga.ons.org.br/prd"
COD_AREACARGA_SECO = "SECO"  # Sudeste/Centro-Oeste, conforme dicionário de dados


def get_ckan_resources(dataset_id: str):
    """
    Busca metadados de um conjunto de dados no CKAN da ONS e retorna a lista de recursos.
    """
    resp = requests.get(f"{CKAN_BASE_URL}/package_show", params={"id": dataset_id})
    resp.raise_for_status()
    payload = resp.json()
    if not payload.get("success", False):
        raise RuntimeError(f"Erro CKAN para dataset {dataset_id}: {payload}")
    return payload["result"]["resources"]


def pick_parquet_resource_by_year(resources, year: int):
    """
    Escolhe recurso PARQUET cujo nome ou URL contenha o ano desejado.
    """
    year_str = str(year)
    candidates = [
        r
        for r in resources
        if r.get("format", "").upper() == "PARQUET"
        and (year_str in r.get("name", "") or year_str in r.get("url", ""))
    ]
    if not candidates:
        raise ValueError(f"Nenhum recurso PARQUET encontrado para ano {year}")
    # se houver mais de um, pega o primeiro
    return candidates[0]["url"]


def fetch_carga_api(endpoint: str,
                    dat_inicio: str,
                    dat_fim: str,
                    cod_areacarga: str = COD_AREACARGA_SECO) -> pd.DataFrame:
    """
    Chamada genérica para as APIs de carga (verificada ou programada).
    endpoint: 'cargaverificada' ou 'cargaprogramada'
    datas em formato YYYY-MM-DD
    """
    url = f"{CARGA_API_BASE}/{endpoint}"
    params = {
        "dat_inicio": dat_inicio,
        "dat_fim": dat_fim,
        "cod_areacarga": cod_areacarga,
    }
    r = requests.get(url, params=params)
    r.raise_for_status()
    data = r.json()

    # Estrutura flexível, pois o swagger pode mudar o wrapper
    if isinstance(data, dict):
        if "data" in data:
            rows = data["data"]
        elif "items" in data:
            rows = data["items"]
        else:
            # assume que a resposta já é uma lista de registros
            rows = data.get("result", data)
    else:
        rows = data

    return pd.json_normalize(rows)


def month_range(start: date, end: date):
    """
    Gera pares (inicio_mes, fim_mes) cobrindo o intervalo de forma contínua.
    """
    current = date(start.year, start.month, 1)
    while current <= end:
        if current.month == 12:
            next_month = date(current.year + 1, 1, 1)
        else:
            next_month = date(current.year, current.month + 1, 1)
        yield max(current, start), min(next_month - timedelta(days=1), end)
        current = next_month


## Ingestão de Carga Verificada via API
---

In [None]:
# Carga Verificada e Carga Programada (semi-horária, API ONS)

def ingest_carga_verificada(start: date, end: date) -> pd.DataFrame:
    if (BASE_DIR / f"carga_verificada_SECO_{start.year}_{end.year}.parquet").exists():
        return pd.read_parquet(BASE_DIR / f"carga_verificada_SECO_{start.year}_{end.year}.parquet")
    dfs = []
    for ini, fim in month_range(start, end):
        df = fetch_carga_api(
            endpoint="cargaverificada",
            dat_inicio=ini.isoformat(),
            dat_fim=fim.isoformat(),
        )
        dfs.append(df)
    full = pd.concat(dfs, ignore_index=True)
    full.to_parquet(BASE_DIR / f"carga_verificada_SECO_{start.year}_{end.year}.parquet", index=False)
    return full


def ingest_carga_programada(start: date, end: date) -> pd.DataFrame:
    if (BASE_DIR / f"carga_programada_SECO_{start.year}_{end.year}.parquet").exists():
        return pd.read_parquet(BASE_DIR / f"carga_programada_SECO_{start.year}_{end.year}.parquet")
    dfs = []
    for ini, fim in month_range(start, end):
        df = fetch_carga_api(
            endpoint="cargaprogramada",
            dat_inicio=ini.isoformat(),
            dat_fim=fim.isoformat(),
        )
        dfs.append(df)
    full = pd.concat(dfs, ignore_index=True)
    full.to_parquet(BASE_DIR / f"carga_programada_SECO_{start.year}_{end.year}.parquet", index=False)
    return full


# Exemplo de uso: 2023-01-01 até 2025-12-31
start_date = date(2023, 1, 1)
end_date = date(2025, 12, 31)

df_carga_verificada = ingest_carga_verificada(start_date, end_date)
df_carga_programada = ingest_carga_programada(start_date, end_date)

df_carga_verificada.head()

## Ingestão do Balanço de Energia nos Subsistemas
---

In [None]:
# Balanço de Energia nos Subsistemas (horário, Parquet)

BALANCO_S3_PREFIX = (
    "https://ons-aws-prod-opendata.s3.amazonaws.com/"
    "dataset/balanco_energia_subsistema_ho/"
)


def ingest_balanco_subsistemas(years, filtro_seco=True) -> pd.DataFrame:
    if (BASE_DIR / f"balanco_subsistemas_{years[0]}_{years[-1]}.parquet").exists():
        return pd.read_parquet(BASE_DIR / f"balanco_subsistemas_{years[0]}_{years[-1]}.parquet")
    
    dfs = []
    for y in years:
        url = f"{BALANCO_S3_PREFIX}BALANCO_ENERGIA_SUBSISTEMA_{y}.parquet"
        df_y = pd.read_parquet(url)
        dfs.append(df_y)

    df = pd.concat(dfs, ignore_index=True)

    # Identifica colunas que deveriam ser numéricas
    # A ONS geralmente usa prefixos val_ para valores
    numeric_cols = [c for c in df.columns if c.startswith("val_")]

    # Converte qualquer string do tipo "0E-8" ou texto para float
    for col in numeric_cols:
        df[col] = pd.to_numeric(df[col], errors="coerce")

    # Filtra SE/CO
    possible_cols = [c for c in df.columns if "subsist" in c.lower() or "submerc" in c.lower()]
    if filtro_seco and possible_cols:
        col_subs = possible_cols[0]
        df = df[df[col_subs].astype(str).str.contains("SE", case=False, na=False)]

    df.to_parquet(BASE_DIR / f"balanco_subsistemas_{years[0]}_{years[-1]}.parquet", index=False)
    return df


years_balanco = list(range(2015, 2026))
df_balanco = ingest_balanco_subsistemas(years_balanco)
df_balanco.head()

## Ingestão do CMO Semi Horário
---

In [None]:
# CMO Semi Horário (semi-horário, Excel ou Parquet)

CMO_S3_PREFIX = (
    "https://ons-aws-prod-opendata.s3.amazonaws.com/dataset/cmo_tm/"
)


def ingest_cmo_semi_horario(years, filtro_seco=True) -> pd.DataFrame:
    dfs = []
    for y in years:
        url = f"{CMO_S3_PREFIX}CMO_SEMIHORARIO_{y}.xlsx"
        df_y = pd.read_excel(url)
        dfs.append(df_y)

    df = pd.concat(dfs, ignore_index=True)

    # Filtrar SE/CO. Em geral a coluna se chama algo como 'nom_submercado'
    possible_cols = [c for c in df.columns if "submerc" in c.lower() or "subsist" in c.lower()]
    if filtro_seco and possible_cols:
        col_subs = possible_cols[0]
        df = df[df[col_subs].str.contains("SE", case=False, na=False)]

    df.to_parquet(BASE_DIR / f"cmo_semi_horario_SECO_{years[0]}_{years[-1]}.parquet", index=False)
    return df


years_cmo = list(range(2020, 2026))
df_cmo = ingest_cmo_semi_horario(years_cmo)
df_cmo.head()


In [7]:
df_cmo

Unnamed: 0,id_subsistema,nom_subsistema,din_instante,val_cmo
2,SE,SUDESTE,2020-01-01 00:00:00,285.76
6,SE,SUDESTE,2020-01-01 00:30:00,285.74
10,SE,SUDESTE,2020-01-01 01:00:00,285.71
14,SE,SUDESTE,2020-01-01 01:30:00,285.67
18,SE,SUDESTE,2020-01-01 02:00:00,285.59
...,...,...,...,...
406446,SE,SUDESTE,2025-11-24 21:30:00,379.75
406450,SE,SUDESTE,2025-11-24 22:00:00,378.62
406454,SE,SUDESTE,2025-11-24 22:30:00,375.28
406458,SE,SUDESTE,2025-11-24 23:00:00,366.12
