In [0]:
%pip install requests pandas
# Restart the Python process right after the install.
# NOTE(review): presumably so the freshly installed package versions are the
# ones picked up by the imports below — confirm against the Databricks docs.
dbutils.library.restartPython()

import requests
import pandas as pd
import time
from datetime import datetime, timedelta
from pyspark.sql import functions as F

# --- Retry helper to avoid failing on HTTP 429 (rate limit) ---
def get_with_retry(url, params, retries=5, wait=5, timeout=30):
    """GET ``url`` and return the decoded JSON body, backing off on HTTP 429.

    Args:
        url: Endpoint to request.
        params: Query-string parameters passed to ``requests.get``.
        retries: Maximum number of attempts.
        wait: Base delay in seconds; attempt ``k`` (1-based) that receives a
            429 sleeps ``wait * k`` seconds before the next attempt.
        timeout: Seconds to wait on the server before ``requests`` gives up,
            so a stalled connection cannot hang the job indefinitely.

    Returns:
        The parsed JSON payload of the first response with status 200.

    Raises:
        requests.HTTPError: For an error status other than 429
            (raised by ``raise_for_status``).
        Exception: When every attempt was used without getting a 200.
    """
    for attempt in range(1, retries + 1):
        resp = requests.get(url, params=params, timeout=timeout)
        if resp.status_code == 200:
            return resp.json()

        if resp.status_code != 429:
            # Raises for 4xx/5xx; any other status just moves to the next attempt.
            resp.raise_for_status()
            continue

        if attempt == retries:
            # Out of attempts: fail right away instead of sleeping first.
            break

        delay = wait * attempt
        print(f"‚ö†Ô∏è Limite atingido. Tentativa {attempt}/{retries}. Esperando {delay}s...")
        time.sleep(delay)

    raise Exception("‚ùå Falhou ap√≥s v√°rias tentativas.")

# --- 1. Fetch the top 50 coins by market cap ---
url_markets = "https://api.coingecko.com/api/v3/coins/markets"
params = {"vs_currency": "usd", "order": "market_cap_desc", "per_page": 50, "page": 1, "sparkline": False}
# Go through the retry helper so a single 429 here does not abort the notebook.
top50_data = get_with_retry(url_markets, params)

# Map each coin id to the metadata that is attached to every history row later.
coin_meta = {c["id"]: {"symbol": c["symbol"], "name": c["name"]} for c in top50_data}
top50 = list(coin_meta)
print(f"Total de moedas capturadas: {len(top50)}")

# --- 2. Find the last day already loaded ---
from pyspark.sql.utils import AnalysisException

output_path = "/Volumes/coingecko/raw/raw"

try:
    sdf_exist = spark.read.parquet(output_path)
    ultima_data = sdf_exist.agg(F.max("dt")).collect()[0][0]
    print(f"üìÖ √öltima data encontrada: {ultima_data}")
except AnalysisException:
    # Only analysis failures (such as a missing path) mean "no data yet".
    # Any other error propagates, so a transient read failure is not
    # mistaken for an empty dataset and the load restarted from D-2.
    ultima_data = None
    print("üìÇ Nenhum dado encontrado ainda. Come√ßando do zero.")

# --- 3. Work out the date range to process ---
hoje = datetime.now().date()
dmenos2 = hoje - timedelta(days=2)  # upper bound = D-2

# First run starts directly at D-2; otherwise resume on the day after the
# last one already loaded.
dt_inicio = dmenos2 if ultima_data is None else ultima_data + timedelta(days=1)

# Nothing left to do once the start date is past D-2.
if dmenos2 < dt_inicio:
    print("‚úÖ Dados j√° atualizados at√© D-2.")
    dbutils.notebook.exit("Done")

# --- 4. Function that processes the coins over a range of days ---
def processar_dias(coins, start_date, end_date, output_path):
    """Fetch per-day market history for each coin and append it as Parquet.

    For every day produced by ``pd.date_range(start_date, end_date)`` and
    every coin id in ``coins``, the CoinGecko ``market_chart/range`` endpoint
    is queried through ``get_with_retry``. Each day is written as soon as all
    of its coins have been fetched, so a failure on a later day does not
    discard the days already collected.

    Args:
        coins: Coin ids to query; each must be a key of the module-level
            ``coin_meta`` mapping (used for ``symbol`` and ``name``).
        start_date: First day to process, passed to ``pd.date_range``.
        end_date: Last day to process, passed to ``pd.date_range``.
        output_path: Parquet destination; rows are appended, partitioned by
            ``mes`` (``yyyy-MM``) and ``dt``.

    Returns:
        None. Results are written to ``output_path``.
    """
    for single_date in pd.date_range(start_date, end_date):
        # NOTE(review): these are naive datetimes, so .timestamp() uses the
        # process-local timezone — looks like this expects a UTC cluster; confirm.
        dt_from = int(datetime.combine(single_date, datetime.min.time()).timestamp())
        dt_to = int(datetime.combine(single_date, datetime.max.time()).timestamp())

        print(f"\nüìÖ Processando data: {single_date.date()}")

        day_rows = []
        for coin in coins:
            url_hist = f"https://api.coingecko.com/api/v3/coins/{coin}/market_chart/range"
            params = {"vs_currency": "usd", "from": dt_from, "to": dt_to}

            data = get_with_retry(url_hist, params, retries=5, wait=10)
            meta = coin_meta[coin]

            # The three series are read in lockstep; zip stops at the shortest
            # one, so a ragged response cannot raise IndexError.
            series = zip(
                data.get("prices", []),
                data.get("market_caps", []),
                data.get("total_volumes", []),
            )
            for (ts, price), (_, mcap), (_, vol) in series:
                day_rows.append({
                    "id": coin,
                    "symbol": meta["symbol"],
                    "name": meta["name"],
                    # API timestamps are divided by 1000 before conversion.
                    "timestamp": datetime.fromtimestamp(ts / 1000),
                    "current_price": price,
                    "market_cap": mcap,
                    "total_volume": vol,
                })

            print(f"‚úî Hist√≥rico coletado para {coin}")
            time.sleep(2)  # fixed pause between coins

        if not day_rows:
            print("‚ö†Ô∏è Nenhum dado retornado.")
            continue

        # --- Convert to a Spark DataFrame and save partitioned ---
        sdf = spark.createDataFrame(pd.DataFrame(day_rows))
        sdf = (
            sdf.withColumn("dt", F.to_date(F.col("timestamp")))
               .withColumn("mes", F.date_format("dt", "yyyy-MM"))
        )

        (sdf.write
            .mode("append")
            .partitionBy("mes", "dt")
            .parquet(output_path)
        )

        print(f"üíæ Dados salvos em: {output_path}")

# --- 5. Run the load for the computed date range ---
processar_dias(
    coins=top50,
    start_date=dt_inicio,
    end_date=dmenos2,
    output_path=output_path,
)
