# Crypto Data Pipeline (CoinGecko API v3 → BigQuery)

Este notebook demonstra um pipeline simples e **reprodutível** para:

1. Coletar **snapshot** de mercado via CoinGecko (`/coins/markets`) e persistir em uma tabela **RAW** (payload JSON) no BigQuery  
2. Derivar o **Top N por Market Cap** a partir do último snapshot  
3. Coletar **histórico diário de preço** (`/coins/{id}/market_chart/range`) para o Top N e persistir em uma tabela FACT


## 0) Setup (dependências + variáveis de ambiente)


In [1]:
# Se estiver no Colab, descomente:
# !pip install -q google-cloud-bigquery requests python-dotenv

import os
import time
import json
import logging
from datetime import datetime, timezone, timedelta

import requests
import pandas as pd
from google.cloud import bigquery


# Configure no seu ambiente/Colab:
#   export COINGECKO_API_KEY="..."          # opcional (Pro)
#   export COINGECKO_IS_PRO="0"             # "1" se for PRO, "0" se for Public API
#   export GCP_PROJECT_ID="..."
#   export BQ_DATASET_ID="crypto_pipeline"
#
COINGECKO_API_KEY = os.getenv("COINGECKO_API_KEY", "")
COINGECKO_IS_PRO = os.getenv("COINGECKO_IS_PRO", "0") == "1"

PROJECT_ID = os.getenv("GCP_PROJECT_ID", "crypto-data-pipeline-488018")
DATASET_ID = os.getenv("BQ_DATASET_ID", "crypto_pipeline")

BASE_URL = "https://pro-api.coingecko.com/api/v3" if COINGECKO_IS_PRO else "https://api.coingecko.com/api/v3"

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger("crypto-pipeline")

# --- Autenticação BigQuery ---
# Colab: abre prompt de login
try:
    from google.colab import auth
    auth.authenticate_user()
    print("Colab auth OK")
except Exception:

    print("Não é Colab. Garanta ADC (gcloud) ou GOOGLE_APPLICATION_CREDENTIALS.")

client = bigquery.Client(project=PROJECT_ID)



Colab auth OK


## 1) Funções utilitárias (HTTP com retry e parse)


In [4]:
# Define os headers padrão da requisição HTTP
# Aqui estamos dizendo que esperamos resposta em formato JSON
HEADERS = {"accept": "application/json"}

# Caso esteja usando plano PRO da CoinGecko
# e exista uma API Key válida, adicionamos ela no header
# Isso permite autenticação e maior limite de requisições
if COINGECKO_IS_PRO and COINGECKO_API_KEY:
    HEADERS["x-cg-pro-api-key"] = COINGECKO_API_KEY


# Status HTTP que merecem retry automático
# 429 = rate limit
# 500+ = erros temporários do servidor
RETRY_STATUS = {429, 500, 502, 503, 504}

# Status que indicam erro definitivo de autenticação/permissão
# 401 = não autenticado
# 403 = proibido (plano/chave inválida)
FORBIDDEN_STATUS = {401, 403}


def http_get_json(
    url: str,
    params: dict | None = None,
    timeout: int = 30,
    max_retries: int = 6
):
    """
    Executa uma requisição HTTP GET e retorna o JSON da resposta.

    Possui:
    - Retry exponencial automático para erros temporários (ex: 429)
    - Tratamento específico para erros de autenticação (401/403)
    - Timeout configurável
    """

    # Garante que params seja um dicionário (evita None)
    params = params or {}

    # Variável para armazenar o último erro (caso todas tentativas falhem)
    last_err = None

    # Loop de tentativas (retry)
    for attempt in range(max_retries):

        # Executa a requisição HTTP GET
        resp = requests.get(
            url,
            headers=HEADERS,
            params=params,
            timeout=timeout
        )

        # Se o status estiver na lista de erros temporários
        # aplicamos retry com backoff exponencial
        if resp.status_code in RETRY_STATUS:

            # Backoff exponencial:
            # tentativa 0 → espera 1.2s
            # tentativa 1 → espera 2.2s
            # tentativa 2 → espera 4.2s
            # etc.
            sleep_s = (2 ** attempt) + 0.2

            # Loga o erro para rastreabilidade
            logger.warning(
                "HTTP %s em %s | retry em %.1fs | params=%s",
                resp.status_code,
                url,
                sleep_s,
                params
            )

            # Aguarda antes da próxima tentativa
            time.sleep(sleep_s)

            # Guarda parte do body da resposta para debug final
            last_err = resp.text[:500]

            continue  # vai para a próxima tentativa

        # Se for erro de autenticação/permissão
        # falha imediatamente (não faz sentido tentar de novo)
        if resp.status_code in FORBIDDEN_STATUS:
            raise RuntimeError(
                f"Erro de autenticação/permissão ({resp.status_code}). "
                f"Verifique COINGECKO_API_KEY/Plano. "
                f"Body={resp.text[:500]}"
            )

        # Para qualquer outro erro HTTP não tratado acima,
        # essa linha dispara exceção automaticamente
        resp.raise_for_status()

        # Se chegou aqui, a resposta foi bem-sucedida (200 OK)
        # Retornamos o JSON já convertido para dict
        return resp.json()

    # Se todas as tentativas falharam,
    # levantamos erro com a última mensagem capturada
    raise RuntimeError(
        f"Falha após {max_retries} tentativas. "
        f"Último erro: {last_err}"
    )

## 2) Criar tabela RAW (snapshot JSON)


In [6]:
RAW_TABLE_ID = f"{PROJECT_ID}.{DATASET_ID}.raw_tb"

schema_raw = [
    bigquery.SchemaField("ingestion_timestamp", "TIMESTAMP", mode="REQUIRED",
                         description="Timestamp em que o snapshot foi coletado da API."),
    bigquery.SchemaField("source", "STRING", mode="REQUIRED",
                         description="Identifica a origem do dado."),
    bigquery.SchemaField("payload", "JSON", mode="REQUIRED",
                         description="Payload bruto retornado pela API CoinGecko."),
]

table = bigquery.Table(RAW_TABLE_ID, schema=schema_raw)
table.description = (
    "Tabela RAW: snapshots JSON da CoinGecko. Mantém payload completo para rastreabilidade e reprocessamento."
)

table = client.create_table(table, exists_ok=True)
print("RAW pronta:", RAW_TABLE_ID)


RAW pronta: crypto-data-pipeline-488018.crypto_pipeline.raw_tb


## 3) Coletar snapshot completo de assets com paginação e gravar em RAW


In [8]:
import time
import pandas as pd

def fetch_markets_snapshot(
    vs_currency: str = "usd",
    per_page: int = 250,
    max_pages: int = 3,            # Public API: recomendo 2 ou 3 páginas p/ evitar 429
    min_interval_s: float = 15.0,  # Public API: ~15s costuma ser mais estável
) -> list[dict]:
    """
    Busca um *snapshot* de mercado via endpoint:
      GET /coins/markets

    Objetivo:
    - Obter uma lista "achatada" (list[dict]) com as principais moedas ordenadas por market cap
    - Paginar resultados (page=1..max_pages)
    - Controlar taxa de requisição por *throttle* (tempo mínimo entre chamadas)
    - Reutilizar a função http_get_json (que já tem retry/backoff)

    Retorno:
      list[dict] -> cada dict representa um ativo (id, symbol, name, price, market cap, etc.)

    Observações de engenharia:
    - O throttle aqui reduz a chance de 429 (rate limit), mesmo com retry.
    - Se o endpoint retornar menos que `per_page`, assumimos que não há mais páginas.
    """

    # Monta a URL completa do endpoint
    url = f"{BASE_URL}/coins/markets"

    # Lista acumuladora com todos os registros coletados (todas páginas)
    all_rows: list[dict] = []

    # Timestamp da última chamada, para controlar intervalo mínimo entre requests
    last_call_ts: float | None = None

    # Loop de paginação: page = 1 até max_pages
    for page in range(1, max_pages + 1):

        # Parâmetros padrão do endpoint /coins/markets
        params = {
            "vs_currency": vs_currency,           # moeda de referência (ex: usd, brl)
            "order": "market_cap_desc",           # ordena por market cap desc
            "per_page": per_page,                 # quantidade de itens por página
            "page": page,                         # página atual
            "sparkline": "false",                 # evita payload maior
            "price_change_percentage": "24h",     # inclui variação 24h (se disponível)
        }

        # --------------------------
        # Throttle simples:
        # garante que exista pelo menos `min_interval_s` segundos entre calls.
        # Isso é útil mesmo com retry/backoff, porque previne bater no rate limit.
        # --------------------------
        if last_call_ts is not None:
            elapsed = time.time() - last_call_ts  # tempo desde a última chamada
            if elapsed < min_interval_s:
                time.sleep(min_interval_s - elapsed)  # espera apenas o restante necessário

        # Faz a chamada HTTP com função utilitária:
        # - headers configurados (PRO ou não)
        # - retry/backoff para 429 e 5xx
        batch = http_get_json(url, params=params)

        # Marca o horário em que a request foi feita (para controlar próxima espera)
        last_call_ts = time.time()

        # Validação defensiva:
        # esse endpoint deve retornar uma LISTA de dicts (JSON array).
        if not isinstance(batch, list):
            raise ValueError(
                f"Esperava lista (JSON array) no /coins/markets, mas veio: {type(batch)} | body={str(batch)[:200]}"
            )

        # Acumula os dados coletados nessa página
        all_rows.extend(batch)

        # Log para rastreabilidade: quantos registros vieram e quantos já temos no total
        logger.info(
            "Página %s: %s registros (acumulado=%s)",
            page, len(batch), len(all_rows)
        )

        # Heurística de término:
        # se veio menos do que per_page, provavelmente não existe próxima página
        if len(batch) < per_page:
            break

    # Retorna o snapshot completo (todas páginas)
    return all_rows


# --------------------------
# Exemplo de uso (mantendo sua lógica)
# --------------------------

# Busca snapshot (padrão: top ativos por market cap, paginado)
assets = fetch_markets_snapshot(
    per_page=250,
    max_pages=3,
    min_interval_s=15.0
)

# Prints simples para prova de execução
print("Total assets (snapshot):", len(assets))
print("Primeiro id:", assets[0]["id"] if assets else None)
print("Último id:", assets[-1]["id"] if assets else None)

# --------------------------
# Converte para DataFrame (sem mudar retorno da função)
# --------------------------

# Achata chaves aninhadas (se existirem) usando underscore
assets_df = pd.json_normalize(assets, sep="_")
print("DF shape:", assets_df.shape)

Total assets (snapshot): 750
Primeiro id: bitcoin
Último id: electronic-usd
DF shape: (750, 30)


In [9]:
#Verificando a tabela para entender a estrutura e disponibilização dos dados

df_assets = pd.DataFrame(assets)
df_assets.head()

Unnamed: 0,id,symbol,name,image,current_price,market_cap,market_cap_rank,fully_diluted_valuation,total_volume,high_24h,...,max_supply,ath,ath_change_percentage,ath_date,atl,atl_change_percentage,atl_date,roi,last_updated,price_change_percentage_24h_in_currency
0,bitcoin,btc,Bitcoin,https://coin-images.coingecko.com/coins/images...,68025.0,1360813818574,1,1360814000000.0,58020410000.0,69487.0,...,21000000.0,126080.0,-46.04631,2025-10-06T18:57:42.558Z,67.81,100218.2,2013-07-06T00:00:00.000Z,,2026-02-26T13:54:52.691Z,2.154992
1,ethereum,eth,Ethereum,https://coin-images.coingecko.com/coins/images...,2067.98,249708952898,2,249709000000.0,30711190000.0,2121.02,...,,4946.05,-58.18926,2025-08-24T19:21:03.333Z,0.432979,477516.7,2015-10-20T00:00:00.000Z,"{'times': 39.63137453277172, 'currency': 'btc'...",2026-02-26T13:54:52.021Z,4.792752
2,tether,usdt,Tether,https://coin-images.coingecko.com/coins/images...,1.0,183638675363,3,189106000000.0,96694750000.0,1.0,...,,1.32,-24.41117,2018-07-24T00:00:00.000Z,0.572521,74.6857,2015-03-02T00:00:00.000Z,,2026-02-26T13:54:46.699Z,0.005796
3,ripple,xrp,XRP,https://coin-images.coingecko.com/coins/images...,1.44,88027026160,4,144242000000.0,4062655000.0,1.48,...,100000000000.0,3.65,-60.43704,2025-07-18T03:40:53.808Z,0.002686,53604.55,2014-05-22T00:00:00.000Z,,2026-02-26T13:54:46.615Z,1.510262
4,binancecoin,bnb,BNB,https://coin-images.coingecko.com/coins/images...,627.04,85514536835,5,85514540000.0,2170768000.0,637.54,...,200000000.0,1369.99,-54.23006,2025-10-13T08:41:24.131Z,0.039818,1574688.0,2017-10-19T00:00:00.000Z,,2026-02-26T13:54:51.792Z,1.859065


In [10]:
# --------------------------
# Monta snapshot e insere na camada RAW (BigQuery)
# --------------------------

# Captura o horário atual em UTC
# - Usar UTC evita problemas de fuso e facilita particionamento/auditoria
now_utc = datetime.now(timezone.utc)

# Monta o payload do snapshot
# Estrutura típica para camada RAW:
# - "meta": informações de onde veio, quando foi coletado, quantos registros etc..
# - "data": o conteúdo bruto (lista de ativos retornados pela API)
snapshot_payload = {
    "meta": {
        "source": "coingecko_api_v3",           # identifica origem (útil se houver mais fontes)
        "collected_at": now_utc.isoformat(),    # momento da coleta (rastreamento / reprocessamento)
        "count": len(assets),                   # total de itens coletados
    },
    "data": assets,                             # dados brutos do snapshot (list[dict])
}

# Prepara as linhas para inserir no BigQuery
# A ideia é gravar 1 linha por snapshot:
# - ingestion_timestamp: quando gravamos no BQ
# - source: redundante de propósito para facilitar filtros/partições sem abrir JSON
# - payload: o JSON completo (meta + data), preservando o "bruto" para reprocessar depois
rows_to_insert = [{
    "ingestion_timestamp": now_utc.isoformat(),  # timestamp de ingestão no BQ
    "source": "coingecko_api_v3",                # facilita filtros/linhagem
    "payload": snapshot_payload,                 # JSON bruto completo
}]

# Faz o load da lista JSON diretamente para a tabela RAW
# Observação:
# - client.load_table_from_json cria um job de carregamento
job = client.load_table_from_json(rows_to_insert, RAW_TABLE_ID)

# Bloqueia até o job terminar (garante consistência antes de seguir pipeline)
job.result()

# Confirmação simples de sucesso
print("Snapshot inserido na RAW:", RAW_TABLE_ID)

Snapshot inserido na RAW: crypto-data-pipeline-488018.crypto_pipeline.raw_tb


## 4) Derivar Top N por **price_change_percentage_24h_in_currency** a partir do último snapshot (SQL)
A camada RAW já esta rankeada por market_cap_usb, porém, trouxe de uma forma calculável esse top n a fim de visualização, podendo ser alterado a váriavel de escolha.


In [11]:
# Define quantos ativos queremos no "Top N"
# Aqui você está buscando os TOP_N com maior variação percentual (24h) no último snapshot
TOP_N = 3

# --------------------------
# Query: pega o último snapshot salvo na RAW e extrai os TOP N
# --------------------------
q_topn = f'''
-- 1) last_snapshot: seleciona SOMENTE o snapshot mais recente gravado na RAW
--    - Filtra pela fonte (source = "coingecko_api_v3")
--    - Usa ROW_NUMBER para pegar apenas o registro mais novo por ingestion_timestamp
WITH last_snapshot AS (
  SELECT payload
  FROM `{RAW_TABLE_ID}`
  WHERE source = "coingecko_api_v3"
  QUALIFY ROW_NUMBER() OVER (ORDER BY ingestion_timestamp DESC) = 1
),

-- 2) assets: "explode" o array JSON do snapshot (payload.data) e extrai campos relevantes
--    - UNNEST(JSON_QUERY_ARRAY(...)) transforma o array JSON em linhas
--    - JSON_VALUE lê campos escalares do JSON (id, market_cap, variação 24h)
--    - SAFE_CAST evita quebrar a query se algum valor vier inválido / NULL
assets AS (
  SELECT
    -- ID do ativo (ex: "bitcoin", "ethereum")
    JSON_VALUE(a, '$.id') AS crypto_id,

    -- Market cap em USD (como número grande; BIGNUMERIC suporta valores altos)
    SAFE_CAST(JSON_VALUE(a, '$.market_cap') AS BIGNUMERIC) AS market_cap_usd,

    -- Variação % em 24h (quando disponível) no vs_currency usado no /coins/markets
    SAFE_CAST(JSON_VALUE(a, '$.price_change_percentage_24h_in_currency') AS FLOAT64)
      AS price_change_percentage_24h_in_currency

  FROM last_snapshot,
  UNNEST(JSON_QUERY_ARRAY(payload, '$.data')) AS a
)

-- 3) Select final: ordena pela maior variação % 24h e retorna TOP_N
SELECT
  crypto_id
  -- Se quiser visualizar também:
  -- , market_cap_usd
  -- , price_change_percentage_24h_in_currency
FROM assets
WHERE crypto_id IS NOT NULL
ORDER BY price_change_percentage_24h_in_currency DESC NULLS LAST
LIMIT {TOP_N}
'''

# Executa a query no BigQuery e converte o resultado para DataFrame
df_top = client.query(q_topn).to_dataframe()

# Exibe para validação/inspeção no notebook
display(df_top)

# --------------------------
# Converte o DataFrame em lista de IDs (top_ids)
# --------------------------
# - dropna: remove IDs nulos
# - astype(str): garante string
# - tolist(): gera lista python para usar em próxima etapa (ex: buscar histórico)
top_ids = df_top["crypto_id"].dropna().astype(str).tolist()

print("Top IDs:", top_ids)

Unnamed: 0,crypto_id
0,power-protocol
1,centrifuge-2
2,dent


Top IDs: ['power-protocol', 'centrifuge-2', 'dent']


## 5) Coletar histórico diário de preço | Makertcap | Volume (/coins/markets/{slug}/history) para Top N


In [12]:
import time
from datetime import datetime, timezone, timedelta
import pandas as pd


def _throttle(last_call_ts: float | None, min_interval_s: float) -> float:
    """
    Garante um intervalo mínimo entre chamadas HTTP (throttle).

    Por que isso existe?
    - Mesmo com retry/backoff, a melhor forma de evitar 429 (rate limit) é NÃO bater no limite.
    - Aqui, controlamos o ritmo das chamadas com base no tempo desde a última request.

    Args:
      last_call_ts: timestamp (time.time()) da última chamada efetuada.
      min_interval_s: intervalo mínimo desejado entre chamadas.

    Returns:
      Um novo timestamp (time.time()) após dormir o necessário (ou imediato se não precisar dormir).
    """
    if last_call_ts is not None:
        elapsed = time.time() - last_call_ts
        if elapsed < min_interval_s:
            sleep_s = min_interval_s - elapsed
            print(f"  throttle: dormindo {sleep_s:.1f}s (min_interval_s={min_interval_s})")
            time.sleep(sleep_s)

    # Retorna "agora" como referência para a próxima comparação
    return time.time()


def _http_get_json_with_retry(
    url: str,
    params: dict,
    *,
    min_interval_s: float,
    last_call_ts: float | None,
    max_retries: int = 4,
    base_backoff_s: float = 10.0,
):
    """
    Wrapper para chamadas HTTP com duas camadas de proteção:

    1) Throttle EXTERNO:
       - Garante intervalo mínimo entre chamadas (min_interval_s), compartilhado entre moedas.
       - Evita sequência rápida de requests que derruba em 429.

    2) Retry específico para rate limit (429):
       - Se detectar 429, faz backoff incremental (10s, 20s, 30s, 40s...).
       - Repetição limitada por max_retries.

    Retorno:
      (payload_json, last_call_ts_atualizado)

    Obs:
    - Print propositalmente pra "evidenciar" no notebook o que está acontecendo
      (tentativas, throttle, backoff), o que é bom para explicar o case.
    """
    attempt = 0

    while True:
        attempt += 1

        # Antes de cada tentativa, aplicamos throttle (intervalo mínimo)
        last_call_ts = _throttle(last_call_ts, min_interval_s)

        print(f"GET {url} params={params} (tentativa {attempt}/{max_retries + 1})")

        try:
            # Aqui você está evitando usar min_interval dentro dela e centralizando o throttle aqui.
            payload = http_get_json(url, params=params)

            # Se deu certo, retornamos o JSON + timestamp atualizado para próxima chamada
            return payload, time.time()

        except Exception as e:
            # Heurística simples para identificar rate limit a partir da mensagem
            msg = str(e).lower()
            is_429 = ("429" in msg) or ("too many requests" in msg) or ("rate limit" in msg)

            # Se for 429 e ainda temos tentativas disponíveis, aplicamos backoff e tentamos de novo
            if is_429 and attempt <= max_retries:
                backoff = base_backoff_s * attempt  # 10, 20, 30, 40...
                print(f"  429/rate limit detectado. Backoff {backoff:.1f}s e retry...")
                time.sleep(backoff)
                continue

            # Qualquer outro erro (ou excedeu retries) -> falha
            raise


def get_history_range_df(
    coin_id: str,
    vs_currency: str,
    start_unix: int,
    end_unix: int,
    *,
    min_interval_s: float,
    last_call_ts: float | None,
) -> tuple[pd.DataFrame, float | None]:
    """
    Coleta série histórica de uma moeda via:
      GET /coins/{id}/market_chart/range

    Retorna:
      (df, last_call_ts_atualizado)

    O DataFrame final contém:
      timestamp_ms | price_usd | market_cap_usd | volume_usd | datetime_utc

    Estratégia:
    - A função devolve também o last_call_ts para que o throttle seja compartilhado entre moedas,
      controlando o ritmo global do pipeline.
    """
    # Monta URL e parâmetros do endpoint de range
    url = f"{BASE_URL}/coins/{coin_id}/market_chart/range"
    params = {"vs_currency": vs_currency, "from": start_unix, "to": end_unix}

    # Chamada protegida com throttle + retry 429
    payload, last_call_ts = _http_get_json_with_retry(
        url,
        params,
        min_interval_s=min_interval_s,
        last_call_ts=last_call_ts,
        max_retries=4,
        base_backoff_s=10.0,
    )

    # O payload vem com listas do tipo:
    # "prices": [[timestamp_ms, valor], ...]
    df = pd.DataFrame(payload.get("prices", []), columns=["timestamp_ms", "price_usd"])

    # Se não vier nenhum ponto, retorna DF vazio padronizado
    if df.empty:
        empty = pd.DataFrame(
            columns=["timestamp_ms", "price_usd", "market_cap_usd", "volume_usd", "datetime_utc"]
        )
        return empty, last_call_ts

    # Extrai market_caps e total_volumes (mesmo formato)
    df_mcap = pd.DataFrame(payload.get("market_caps", []), columns=["timestamp_ms", "market_cap_usd"])
    df_vol  = pd.DataFrame(payload.get("total_volumes", []), columns=["timestamp_ms", "volume_usd"])

    # Faz merge pelo timestamp para unificar as três séries no mesmo DF
    df = (
        df.merge(df_mcap, on="timestamp_ms", how="left")
          .merge(df_vol, on="timestamp_ms", how="left")
    )

    # Converte timestamp (ms) para datetime UTC
    df["datetime_utc"] = pd.to_datetime(df["timestamp_ms"], unit="ms", utc=True)

    return df, last_call_ts


# -------------------------
# Define janela de coleta (ex.: últimos 21 dias)
# -------------------------
end_dt = datetime.now(timezone.utc)
start_dt = end_dt - timedelta(days=20)

# Converte para Unix epoch (segundos), como o endpoint exige
start_unix, end_unix = int(start_dt.timestamp()), int(end_dt.timestamp())


# Lista de registros finais (formato pronto para DF e/ou carga no BQ)
rows: list[dict] = []

# Timestamp compartilhado entre moedas:
# - Isso garante que o throttle seja GLOBAL (não por moeda)
# - Ajuda muito a não cair no rate limit quando top_ids tem vários itens
last_call_ts: float | None = None


# -------------------------
# Loop principal: coleta histórico para cada coin_id
# -------------------------
for i, coin_id in enumerate(top_ids, start=1):
    try:
        print(f"\n[{i}/{len(top_ids)}] Coletando {coin_id}...")

        # Busca a série histórica (range) do coin_id dentro da janela
        df_coin, last_call_ts = get_history_range_df(
            coin_id=coin_id,
            vs_currency="usd",
            start_unix=start_unix,
            end_unix=end_unix,
            min_interval_s=30.0,       # ajuste de segurança p/ API pública
            last_call_ts=last_call_ts  # estado compartilhado entre chamadas
        )

        # Se não veio dados, pula para próxima moeda
        if df_coin.empty:
            print(f"  sem dados para {coin_id}")
            continue

        # -------------------------
        # Redução para 1 linha por dia (feature engineering simples):
        # - Cria coluna date a partir de datetime_utc
        # - Ordena por timestamp
        # - Agrupa por date e pega o ÚLTIMO ponto do dia (tail(1))
        #
        # Por que último ponto do dia?
        # - é uma regra simples e determinística
        # - evita múltiplas linhas por dia e deixa os dados prontos para séries diárias no BI
        # -------------------------
        df_coin["date"] = df_coin["datetime_utc"].dt.date

        df_daily = (
            df_coin.sort_values("timestamp_ms")
                  .groupby("date", as_index=False)
                  .tail(1)[["date", "price_usd", "market_cap_usd", "volume_usd"]]
        )

        # Converte para lista de dicts e adiciona a chave crypto_id em cada registro
        daily_records = df_daily.to_dict("records")
        for rec in daily_records:
            rec["crypto_id"] = coin_id

        # Acumula no "rows" final
        rows.extend(daily_records)

        print(f"  OK {coin_id}: {len(df_daily)} dias | total acumulado rows={len(rows)}")

    except Exception as e:
        # Falha por moeda não derruba o pipeline inteiro
        print(f"  ERRO em {coin_id}: {e}")


# -------------------------
# Consolida tudo em DataFrame final
# -------------------------
df_hist = pd.DataFrame(rows)

# Padroniza tipos (evita problemas no BI e/ou carga no BQ)
if not df_hist.empty:
    df_hist["date"] = pd.to_datetime(df_hist["date"]).dt.date
    for c in ["price_usd", "market_cap_usd", "volume_usd"]:
        df_hist[c] = pd.to_numeric(df_hist[c], errors="coerce")

# Preview e contagem
print("\nPreview df_hist:")
print(df_hist.head(10))
print("Linhas histórico:", len(df_hist))


[1/3] Coletando power-protocol...
GET https://api.coingecko.com/api/v3/coins/power-protocol/market_chart/range params={'vs_currency': 'usd', 'from': 1770387398, 'to': 1772115398} (tentativa 1/5)
  OK power-protocol: 21 dias | total acumulado rows=21

[2/3] Coletando centrifuge-2...
  throttle: dormindo 30.0s (min_interval_s=30.0)
GET https://api.coingecko.com/api/v3/coins/centrifuge-2/market_chart/range params={'vs_currency': 'usd', 'from': 1770387398, 'to': 1772115398} (tentativa 1/5)
  OK centrifuge-2: 21 dias | total acumulado rows=42

[3/3] Coletando dent...
  throttle: dormindo 30.0s (min_interval_s=30.0)
GET https://api.coingecko.com/api/v3/coins/dent/market_chart/range params={'vs_currency': 'usd', 'from': 1770387398, 'to': 1772115398} (tentativa 1/5)
  OK dent: 21 dias | total acumulado rows=63

Preview df_hist:
         date  price_usd  market_cap_usd    volume_usd       crypto_id
0  2026-02-06   0.202169    4.248110e+07  4.726472e+06  power-protocol
1  2026-02-07   0.205651 

## 6) Criar tabela FACT e carregar dados (BigQuery)


In [13]:
FACT_TABLE_ID = f"{PROJECT_ID}.{DATASET_ID}.fact_coin_history_daily"

schema_fact = [
    bigquery.SchemaField(
        "crypto_id", "STRING", mode="REQUIRED",
        description="Identificador único da criptomoeda na CoinGecko (ex: bitcoin, ethereum)."
    ),
    bigquery.SchemaField(
        "date", "DATE", mode="REQUIRED",
        description="Data de referência (UTC) do último preço disponível no dia."
    ),
    bigquery.SchemaField(
        "price_usd", "BIGNUMERIC", mode="NULLABLE",
        description="Preço em USD no fechamento do dia (último timestamp disponível)."
    ),
    bigquery.SchemaField(
        "market_cap_usd", "BIGNUMERIC", mode="NULLABLE",
        description="Market cap em USD no fechamento do dia."
    ),
    bigquery.SchemaField(
        "volume_usd", "BIGNUMERIC", mode="NULLABLE",
        description="Volume negociado em USD no fechamento do dia."
    ),
]

fact_table = bigquery.Table(FACT_TABLE_ID, schema=schema_fact)

fact_table.description = (
    "Tabela FACT: histórico diário de criptomoedas derivado da API CoinGecko. "
    "Contém preço, market cap e volume diário em USD por crypto_id."
)

# PARTITION por DATE
fact_table.time_partitioning = bigquery.TimePartitioning(
    type_=bigquery.TimePartitioningType.DAY,
    field="date",  # particiona usando a coluna date
)

# CLUSTER por crypto_id (melhora filtro/agrupamento por moeda)
fact_table.clustering_fields = ["crypto_id"]

fact_table = client.create_table(fact_table, exists_ok=True)
print("FACT pronta (particionada e clusterizada):", FACT_TABLE_ID)

FACT pronta (particionada e clusterizada): crypto-data-pipeline-488018.crypto_pipeline.fact_coin_history_daily


## 7) Carga Manual na tabela FACT


In [14]:
# ------------------------------------------------------------
# Etapa: carregar o histórico (df_hist) para uma tabela FACT no BigQuery
# ------------------------------------------------------------

# Estratégia de escrita:
# - TRUNCATE: substitui toda a tabela a cada execução (ótimo para sandbox/dev)
# - APPEND: adiciona linhas ao final (mais comum em produção, porém pode duplicar se não houver controle)
#
# Nota de arquitetura:
# Em produção (com billing e pipeline recorrente), o ideal evoluir para:
# 1) carregar em uma tabela "stage"
# 2) fazer MERGE na tabela final com chave (crypto_id, date)
WRITE_MODE = "TRUNCATE"


# Se não há dados, evita rodar job desnecessário e falhas
if df_hist.empty:
    print("df_hist vazio — nada para carregar.")

else:
    # Seleciona apenas as colunas que vão para a FACT (modelo mais limpo)
    # Mantém o DF original intacto com .copy()
    df_load = df_hist[
        ["crypto_id", "date", "price_usd", "market_cap_usd", "volume_usd"]
    ].copy()

    # ------------------------------------------------------------
    # Garantir tipos corretos (importante para BigQuery + BI)
    # ------------------------------------------------------------
    # - date: garante formato date (sem hora)
    # - métricas numéricas: força conversão; se vier string/erro, vira NaN (coerce)
    df_load["date"] = pd.to_datetime(df_load["date"]).dt.date
    df_load["price_usd"] = pd.to_numeric(df_load["price_usd"], errors="coerce")
    df_load["market_cap_usd"] = pd.to_numeric(df_load["market_cap_usd"], errors="coerce")
    df_load["volume_usd"] = pd.to_numeric(df_load["volume_usd"], errors="coerce")

    # ------------------------------------------------------------
    # Configuração do job de carga
    # ------------------------------------------------------------
    # create_disposition:
    # - CREATE_IF_NEEDED: cria a tabela se não existir (bom para sandbox)
    #
    # write_disposition:
    # - WRITE_TRUNCATE: apaga e sobrescreve a tabela
    # - WRITE_APPEND: adiciona novas linhas
    job_config = bigquery.LoadJobConfig(
        create_disposition=bigquery.CreateDisposition.CREATE_IF_NEEDED,
        write_disposition=(
            bigquery.WriteDisposition.WRITE_TRUNCATE
            if WRITE_MODE == "TRUNCATE"
            else bigquery.WriteDisposition.WRITE_APPEND
        ),
    )

    # ------------------------------------------------------------
    # Executa a carga do DataFrame para o BigQuery
    # ------------------------------------------------------------
    job = client.load_table_from_dataframe(
        df_load,          # DataFrame final já tipado
        FACT_TABLE_ID,    # destino (ex: dataset.fact_crypto_daily)
        job_config=job_config
    )

    # Aguarda o job finalizar (consistência antes de seguir pipeline)
    job.result()

    # Confirmação simples
    print("Carga concluída:", FACT_TABLE_ID, "| linhas:", len(df_load))

Carga concluída: crypto-data-pipeline-488018.crypto_pipeline.fact_coin_history_daily | linhas: 63


# 8) Query simples (registro diário das moedas no dia 2026-02-23 na FACT)


In [15]:
q_latest = f"""
SELECT
  date,
  crypto_id,
  price_usd,
  market_cap_usd,
  volume_usd
FROM `{FACT_TABLE_ID}`
WHERE date = '2026-02-23'
"""

df_example = client.query(q_latest).to_dataframe()
display(df_example)

Unnamed: 0,date,crypto_id,price_usd,market_cap_usd,volume_usd
0,2026-02-23,centrifuge-2,0.087585,50506760.0,2625097.0
1,2026-02-23,dent,0.000122,11701710.0,1959363.0
2,2026-02-23,power-protocol,0.505208,105559500.0,24993630.0
