In [5]:
# save_btc_csv.py
import os
from pathlib import Path
import pandas as pd
import numpy as np
from binance.client import Client
from dotenv import load_dotenv

CSV_PATH = "btc_prices_and_metrics.csv"
SYMBOL = "BTCUSDT"
INTERVAL = Client.KLINE_INTERVAL_1DAY
START = "20 years ago UTC"  # Binance devolverá desde el inicio disponible (2017-08-17 para BTCUSDT spot)

# Halvings como fechas "naive" (sin tz) para que cuadre con el RL script
HALVING_DATES = pd.to_datetime([
    "2012-11-28",
    "2016-07-09",
    "2020-05-11",
    "2024-04-20",
])

def compute_days_since_halving(d: pd.Timestamp) -> int:
    # d será naive; HALVING_DATES también naive
    past = [h for h in HALVING_DATES if h <= d]
    ref = past[-1] if past else HALVING_DATES[0]
    return (d - ref).days

def main():
    # Cargar claves (no son estrictamente necesarias para klines públicos)
    load_dotenv("../../envs/.env")
    api_key = os.getenv("copy_key")
    api_secret = os.getenv("copy_secret")
    client = Client(api_key, api_secret)

    klines = client.get_historical_klines(SYMBOL, INTERVAL, START)

    df = pd.DataFrame(
        klines,
        columns=[
            "open_time","open","high","low","close","volume",
            "close_time","quote_asset_volume","number_of_trades",
            "taker_buy_base","taker_buy_quote","ignore"
        ]
    )

    # --- Mantener columnas necesarias para tu RL ---
    # Convertimos a datetime, primero en UTC y luego quitamos tz para que quede "naive".
    df["date"] = pd.to_datetime(df["open_time"], unit="ms", utc=True).dt.tz_localize(None)
    df["close"] = pd.to_numeric(df["close"], errors="coerce")

    df = df[["date", "close"]].sort_values("date").reset_index(drop=True)

    # Métrica requerida por tu RL (también la calcula ahí, pero así no hay fallos al validar)
    df["days_since_halving"] = df["date"].apply(compute_days_since_halving)

    # (Opcional) Espacios para FEATURES futuras:
    # for col in ["m2_growth", "fed_funds_rate", "active_addresses"]:
    #     df[col] = np.nan

    # Guardar CSV
    out_path = Path(CSV_PATH).resolve()
    df.to_csv(out_path, index=False)

    print(f"✅ Guardado {len(df)} filas en: {out_path}")
    print(f"Rango de fechas: {df['date'].min().date()} → {df['date'].max().date()}")

if __name__ == "__main__":
    main()


✅ Guardado 2916 filas en: /home/bebo/Documents/Cristina01/00/btc_prices_and_metrics.csv
Rango de fechas: 2017-08-17 → 2025-08-10


In [6]:
# pip install pandas numpy gymnasium stable-baselines3==2.3.2 torch scikit-learn

import math
import numpy as np
import pandas as pd
from dataclasses import dataclass
from typing import List, Optional, Tuple

import gymnasium as gym
from gymnasium import spaces

from sklearn.preprocessing import StandardScaler
from stable_baselines3 import DQN
from stable_baselines3.common.vec_env import DummyVecEnv
from stable_baselines3.common.evaluation import evaluate_policy

# ---------- Config you will likely edit ----------
CSV_PATH = "btc_prices_and_metrics.csv"  # <- your file from Binance + metrics
DATE_COL = "date"
PRICE_COL = "close"

# Add the columns you want in the state (must exist in the CSV).
# The script will also compute 'days_since_halving' automatically and append it.
FEATURE_COLS = [
    # examples (edit to match your CSV):
    # "m2_growth",
    # "fed_funds_rate",
    # "active_addresses",
    # "tx_volume_usd",
]

# ~4 months in trading days: use 120 (you can change it).
HOLD_DAYS = 120

# Train/test split by date: everything strictly before TEST_START is train
TEST_START = "2023-01-01"

# -------------------------------------------------

HALVING_DATES = [
    "2012-11-28",
    "2016-07-09",
    "2020-05-11",
    "2024-04-20",
]
HALVING_DATES = pd.to_datetime(HALVING_DATES).to_list()


def compute_days_since_halving(d: pd.Timestamp) -> int:
    # last halving on or before date d; if none, use first halving distance
    past = [h for h in HALVING_DATES if h <= d]
    ref = past[-1] if past else HALVING_DATES[0]
    return (d - ref).days


@dataclass
class EpisodeResult:
    entered: bool
    entry_idx: Optional[int]
    exit_idx: Optional[int]
    pct_return: float


class BTCEnterOnlyEnv(gym.Env):
    """
    Action space:
      0 = WAIT (do nothing today)
      1 = ENTER (if not in position). Once entered, we auto-hold for HOLD_DAYS, then episode ends.

    Observation:
      Normalized vector of features:
        - FEATURE_COLS (whatever you pass in)
        - days_since_halving (auto-added)
        - days_to_series_end (optional helper for the net, included below)
    Reward:
      0 every step until the auto-exit day.
      On exit day: (price_exit / price_entry) - 1
      If episode ends without entering: reward = 0
    """

    metadata = {"render_modes": []}

    def __init__(
        self,
        df: pd.DataFrame,
        feature_cols: List[str],
        hold_days: int = 120,
        scaler: Optional[StandardScaler] = None,
        start_idx: int = 0,
        end_idx: Optional[int] = None,
    ):
        super().__init__()
        self.raw_df = df.reset_index(drop=True).copy()
        self.feature_cols = list(feature_cols)
        self.hold_days = hold_days
        self.scaler = scaler
        self.start_idx = start_idx
        self.end_idx = len(self.raw_df) if end_idx is None else end_idx

        # internal state
        self.t = None
        self.in_position = False
        self.entry_idx = None
        self.exit_idx = None
        self.entry_price = None

        # Build features matrix (unnormalized)
        feats = self.raw_df.loc[:, self.feature_cols].copy() if self.feature_cols else pd.DataFrame(index=self.raw_df.index)
        # days_since_halving
        feats["days_since_halving"] = self.raw_df["days_since_halving"].astype(float)
        # (Optional) days_to_series_end — can help avoid end-of-series traps
        feats["days_to_series_end"] = (len(self.raw_df) - 1 - np.arange(len(self.raw_df))).astype(float)

        self.full_feature_cols = list(feats.columns)
        self.X = feats.values.astype(np.float32)

        # Normalize with provided scaler (fit outside for train/test consistency)
        if self.scaler is not None:
            self.X = self.scaler.transform(self.X)

        # Observation space
        obs_dim = self.X.shape[1]
        self.observation_space = spaces.Box(low=-5.0, high=5.0, shape=(obs_dim,), dtype=np.float32)
        # Action space: WAIT or ENTER
        self.action_space = spaces.Discrete(2)

    def _get_obs(self) -> np.ndarray:
        return self.X[self.t]

    def reset(self, *, seed: Optional[int] = None, options: Optional[dict] = None):
        super().reset(seed=seed)
        # start somewhere that still allows a full hold window
        self.t = self.start_idx
        self.in_position = False
        self.entry_idx = None
        self.exit_idx = None
        self.entry_price = None
        return self._get_obs(), {}

    def step(self, action: int):
        terminated = False
        reward = 0.0
        info = {}

        # if we're not in position, decide to enter or wait
        if not self.in_position:
            if action == 1:
                # enter now if there's room to hold
                if self.t + self.hold_days < self.end_idx:
                    self.in_position = True
                    self.entry_idx = self.t
                    self.exit_idx = self.t + self.hold_days
                    self.entry_price = float(self.raw_df.loc[self.entry_idx, PRICE_COL])
                else:
                    # Cannot enter because there's not enough data left; treat as wait
                    pass
        else:
            # already in position; ignore actions (policy can still output something)
            pass

        # advance time
        self.t += 1

        # If we just reached or passed planned exit day, finalize
        if self.in_position and self.t >= self.exit_idx:
            exit_price = float(self.raw_df.loc[self.exit_idx, PRICE_COL])
            reward = (exit_price / self.entry_price) - 1.0
            terminated = True
            info["entry_idx"] = self.entry_idx
            info["exit_idx"] = self.exit_idx
            info["pct_return"] = reward

        # If end of data and never entered, terminate with zero reward
        if self.t >= self.end_idx - 1 and not terminated:
            terminated = True
            info["entry_idx"] = self.entry_idx
            info["exit_idx"] = self.exit_idx
            info["pct_return"] = 0.0

        obs = self._get_obs() if not terminated else np.zeros_like(self._get_obs(), dtype=np.float32)
        return obs, float(reward), terminated, False, info


def load_and_prepare(csv_path: str) -> pd.DataFrame:
    df = pd.read_csv(csv_path)
    # Basic checks
    if DATE_COL not in df.columns or PRICE_COL not in df.columns:
        raise ValueError(f"CSV must contain '{DATE_COL}' and '{PRICE_COL}' columns.")

    df[DATE_COL] = pd.to_datetime(df[DATE_COL])
    df = df.sort_values(DATE_COL).reset_index(drop=True)

    # forward-fill any missing metrics
    for c in FEATURE_COLS:
        if c not in df.columns:
            raise ValueError(f"Feature column '{c}' not found in CSV.")
        df[c] = pd.to_numeric(df[c], errors="coerce")
    use_cols = FEATURE_COLS.copy()

    df["days_since_halving"] = df[DATE_COL].apply(compute_days_since_halving)
    return df


def make_scaler(train_df: pd.DataFrame) -> Tuple[StandardScaler, List[str]]:
    feats = pd.DataFrame(index=train_df.index)
    for c in FEATURE_COLS:
        feats[c] = train_df[c].astype(float)
    feats["days_since_halving"] = train_df["days_since_halving"].astype(float)
    feats["days_to_series_end"] = (len(train_df) - 1 - np.arange(len(train_df))).astype(float)

    scaler = StandardScaler()
    scaler.fit(feats.values.astype(np.float32))
    full_feature_cols = list(feats.columns)
    return scaler, full_feature_cols


def date_to_index(df: pd.DataFrame, dt: str) -> int:
    dt = pd.to_datetime(dt)
    # first index >= dt
    idx = int(np.searchsorted(df[DATE_COL].values.astype("datetime64[ns]"), dt.to_datetime64(), side="left"))
    return idx


def train_and_eval():
    df = load_and_prepare(CSV_PATH)

    split_idx = date_to_index(df, TEST_START)
    train_end_idx = max(split_idx - 1, 0)

    # Fit scaler on train slice only
    scaler, full_cols = make_scaler(df.iloc[:split_idx].reset_index(drop=True))

    # Build envs
    train_env = DummyVecEnv([
        lambda: BTCEnterOnlyEnv(
            df=df,
            feature_cols=FEATURE_COLS,
            hold_days=HOLD_DAYS,
            scaler=scaler,
            start_idx=0,
            end_idx=split_idx,  # up to but not including test period
        )
    ])

    test_env = BTCEnterOnlyEnv(
        df=df,
        feature_cols=FEATURE_COLS,
        hold_days=HOLD_DAYS,
        scaler=scaler,
        start_idx=split_idx,
        end_idx=None,
    )

    # DQN hyperparams (tune as needed)
    model = DQN(
        "MlpPolicy",
        train_env,
        learning_rate=3e-4,
        buffer_size=50_000,
        learning_starts=1_000,
        batch_size=256,
        tau=1.0,
        gamma=0.99,
        train_freq=4,
        target_update_interval=2_000,
        exploration_fraction=0.2,
        exploration_final_eps=0.05,
        verbose=1,
        tensorboard_log=None,
        seed=42,
        policy_kwargs=dict(net_arch=[256, 256]),
    )

    # Train
    timesteps = 200_000  # adjust as needed
    model.learn(total_timesteps=timesteps)

    # Evaluate on test env (single-episode sweep over the test window)
    ep_rewards, ep_returns = run_single_episode(test_env, model)
    print("\n=== Test Episode ===")
    print(f"Entry idx: {ep_returns.entry_idx}, Exit idx: {ep_returns.exit_idx}")
    print(f"Test % return (RL): {ep_returns.pct_return:.4f}")

    # Compare to random-entry baseline on the same test window
    baseline = random_entry_baseline(test_env)
    print(f"Random-entry % return (1 trial): {baseline:.4f}")

    # Optional: multiple random trials
    trials = [random_entry_baseline(test_env, rng=np.random.default_rng(123 + i)) for i in range(50)]
    print(f"Random-entry mean over 50 trials: {np.mean(trials):.4f} (std {np.std(trials):.4f})")


def run_single_episode(env: BTCEnterOnlyEnv, model: DQN) -> Tuple[list, EpisodeResult]:
    obs, info = env.reset()
    done = False
    rewards = []
    while not done:
        action, _ = model.predict(obs, deterministic=True)
        obs, r, terminated, truncated, info = env.step(int(action))
        done = terminated or truncated
        rewards.append(r)

    res = EpisodeResult(
        entered=info.get("entry_idx") is not None,
        entry_idx=info.get("entry_idx"),
        exit_idx=info.get("exit_idx"),
        pct_return=float(info.get("pct_return", 0.0)),
    )
    return rewards, res


def random_entry_baseline(env: BTCEnterOnlyEnv, rng: Optional[np.random.Generator] = None) -> float:
    """
    Baseline: pick a random day within the test window that still allows a full HOLD_DAYS,
    enter, then compute the same 4-month return.
    """
    rng = rng or np.random.default_rng(0)
    # indices allowed for entry:
    valid_start = env.start_idx
    valid_end = env.end_idx - env.hold_days - 1
    if valid_end <= valid_start:
        return 0.0

    entry_idx = rng.integers(valid_start, valid_end + 1)
    exit_idx = entry_idx + env.hold_days
    entry_price = float(env.raw_df.loc[entry_idx, PRICE_COL])
    exit_price = float(env.raw_df.loc[exit_idx, PRICE_COL])
    return (exit_price / entry_price) - 1.0


if __name__ == "__main__":
    train_and_eval()


Using cpu device
----------------------------------
| rollout/            |          |
|    exploration_rate | 0.989    |
| time/               |          |
|    episodes         | 4        |
|    fps              | 8524     |
|    time_elapsed     | 0        |
|    total_timesteps  | 481      |
----------------------------------
----------------------------------
| rollout/            |          |
|    exploration_rate | 0.977    |
| time/               |          |
|    episodes         | 8        |
|    fps              | 8157     |
|    time_elapsed     | 0        |
|    total_timesteps  | 961      |
----------------------------------
----------------------------------
| rollout/            |          |
|    exploration_rate | 0.966    |
| time/               |          |
|    episodes         | 12       |
|    fps              | 897      |
|    time_elapsed     | 1        |
|    total_timesteps  | 1442     |
| train/              |          |
|    learning_rate    | 0.0003   |
|  

In [None]:
"""
Agent 1 — Crypto Entry (Direct Policy RL via PPO) — v3
------------------------------------------------------
Cambios vs tu v2:
  • Integra PPO (on-policy, direct policy learning) sobre CryptoEntryEnv
  • Split temporal train/test por fecha configurable
  • Evaluación del agente y emisión de señales usando la política entrenada
  • Mantiene: halving features, strict alignment, emit_signals estandarizado

Autor base: ChatGPT (adaptado a petición)
"""
from __future__ import annotations
import math
import os
from dataclasses import dataclass, asdict
from typing import Dict, List, Tuple, Optional, Callable

import numpy as np
import pandas as pd

# Optional Gymnasium (no requerido para utilidades, sí para entrenar)
try:
    import gymnasium as gym
    from gymnasium import spaces
except Exception:
    gym = None
    spaces = None

# RL (direct policy)
try:
    from stable_baselines3 import PPO
    from stable_baselines3.common.vec_env import DummyVecEnv
except Exception:
    PPO = None
    DummyVecEnv = None

# ---------------------------------
# Config
# ---------------------------------
@dataclass
class Config:
    # Core horizon/interval
    horizon_days: int = 120
    trading_calendar: str = "UTC"

    # Data expectations
    price_csv: str = "price.csv"
    btc_dominance_csv: Optional[str] = "btc_dominance.csv"
    total_mcap_csv: Optional[str] = "total_mcap.csv"
    stablecoin_supply_csv: Optional[str] = "stablecoin_supply.csv"
    fear_greed_csv: Optional[str] = "fear_greed.csv"
    funding_rate_csv: Optional[str] = None
    open_interest_csv: Optional[str] = None
    dxy_csv: Optional[str] = None
    policy_rate_csv: Optional[str] = None

    # Alignment / verification
    strict_common_period: bool = True       # enforce identical date index across all present series
    require_coverage_pct: float = 0.85
    min_history_days: int = 365 * 4

    # Feature params
    rsi_period: int = 14
    vol_windows: Tuple[int, int] = (7, 30)
    return_windows: Tuple[int, int, int, int] = (1, 7, 14, 30)

    # Env params
    enter_penalty: float = -0.0005
    miss_penalty: float = 0.0
    reward_clip: Optional[float] = 0.5

    # Train/test split
    test_start_date: Optional[str] = "2023-01-01"  # None => usa todo para train

    # PPO hparams
    total_timesteps: int = 1_000_000
    learning_rate: float = 3e-4
    n_steps: int = 2048
    batch_size: int = 256
    gamma: float = 0.99
    gae_lambda: float = 0.95
    ent_coef: float = 0.01
    clip_range: float = 0.2
    seed: int = 42
    net_arch: Tuple[int, int] = (256, 256)

    # Señales
    signal_asset: str = "BTC"
    signals_out_csv: Optional[str] = "/mnt/data/agent1_signals_rl.csv"

CFG = Config()

# ---------------------------------
# Data loading / catalog
# ---------------------------------
class DataCatalog:
    def __init__(self, cfg: Config):
        self.cfg = cfg
        self.frames: Dict[str, pd.DataFrame] = {}
    @staticmethod
    def _read_csv(path: str, value_col: Optional[str] = None) -> pd.DataFrame:
        if not os.path.exists(path):
            raise FileNotFoundError(f"Missing: {path}")
        df = pd.read_csv(path)

        # localizar columna de fecha
        date_col = None
        for c in df.columns:
            if c.lower() in ("date", "timestamp", "time"):
                date_col = c
                break
        if date_col is None:
            raise ValueError(f"{path}: could not find a Date/Timestamp column")

        df[date_col] = pd.to_datetime(df[date_col], utc=True, errors="coerce").dt.tz_convert(None)
        df = df.set_index(date_col).sort_index()

        # 👇 NUEVO: si es un archivo de precios con OHLCV, conservar todas las columnas
        cols_lower = {c.lower() for c in df.columns}
        if {"open", "high", "low", "close", "volume"}.issubset(cols_lower):
            return df  # ← no colapsar a 'value'

        # Para series 1D (macro), elegir una columna numérica y renombrar a 'value'
        if value_col is None:
            for candidate in ("Close", "close", "value", "Value"):
                if candidate in df.columns:
                    value_col = candidate
                    break
        if value_col is None:
            # si no hay 'Close/value', toma la primera numérica
            num_cols = [c for c in df.columns if pd.api.types.is_numeric_dtype(df[c])]
            if not num_cols:
                raise ValueError(f"{path}: no numeric columns to use as value")
            value_col = num_cols[0]

        return df[[value_col]].rename(columns={value_col: "value"})

    def load(self) -> None:
        price = self._read_csv(self.cfg.price_csv)
        needed = {"Open", "High", "Low", "Close", "Volume"}
        print(pd.read_csv("price.csv").columns)
# Debe imprimir: Index(['Date','Open','High','Low','Close','Volume'], dtype='object')

        if not needed.issubset(set(price.columns)):
            raise ValueError(f"price.csv must contain columns: {needed}")
        
        price = price.resample("1D").last().dropna(subset=["Close"])
        self.frames["price"] = price

        def try_load(name: str, path: Optional[str]):
            if path:
                df = self._read_csv(path)
                if "value" not in df.columns:
                    num_cols = [c for c in df.columns if pd.api.types.is_numeric_dtype(df[c])]
                    if not num_cols:
                        raise ValueError(f"{name}: no numeric columns to use as value")
                    df = df[[num_cols[0]]].rename(columns={num_cols[0]: "value"})
                df = df.resample("1D").last()
                self.frames[name] = df

        try_load("btc_dominance", self.cfg.btc_dominance_csv)
        try_load("total_mcap", self.cfg.total_mcap_csv)
        try_load("stablecoin_supply", self.cfg.stablecoin_supply_csv)
        try_load("fear_greed", self.cfg.fear_greed_csv)
        try_load("funding_rate", self.cfg.funding_rate_csv)
        try_load("open_interest", self.cfg.open_interest_csv)
        try_load("dxy", self.cfg.dxy_csv)
        try_load("policy_rate", self.cfg.policy_rate_csv)

    def align_daily_panel(self) -> pd.DataFrame:
        """Merge onto a single DAILY index. If strict_common_period is True,
        enforce identical date index across all present series (drop any rows with NA)."""
        if "price" not in self.frames:
            raise RuntimeError("Price not loaded")
        idx = self.frames["price"].index
        panel = pd.DataFrame(index=idx)

        panel["close"] = self.frames["price"]["Close"].copy()
        panel["volume"] = self.frames["price"]["Volume"].copy()

        # merge (left reindex to price)
        for name, df in self.frames.items():
            if name == "price":
                continue
            s = df.reindex(idx)["value"].astype(float)
            panel[name] = s

        # forward-fill truly slow series (optional)
        if "policy_rate" in panel.columns:
            panel["policy_rate"] = panel["policy_rate"].ffill()

        if self.cfg.strict_common_period:
            # keep only rows where ALL present columns are non-null
            panel = panel.dropna(how="any")

        return panel

    def verify(self, panel: pd.DataFrame) -> Dict[str, any]:
        report = {}
        diffs = panel.index.to_series().diff().dropna().value_counts(normalize=True)
        report["is_daily_uniform"] = bool(diffs.index.min() == pd.Timedelta(days=1) and len(diffs) == 1)
        report["start"] = str(panel.index.min().date()) if len(panel) else None
        report["end"] = str(panel.index.max().date()) if len(panel) else None
        report["num_days"] = int(panel.shape[0])
        coverage = panel.notna().mean() if len(panel) else pd.Series(dtype=float)
        report["coverage_per_feature"] = coverage.to_dict()
        report["min_coverage"] = float(coverage.min()) if len(coverage) else 0.0
        report["meets_min_coverage"] = bool(coverage.min() >= self.cfg.require_coverage_pct) if len(coverage) else False
        report["meets_min_history"] = bool(panel.shape[0] >= self.cfg.min_history_days)
        return report

# ---------------------------------
# Feature Builder (+ halving)
# ---------------------------------
BTC_HALVINGS = [
    pd.Timestamp("2012-11-28"),
    pd.Timestamp("2016-07-09"),
    pd.Timestamp("2020-05-11"),
    pd.Timestamp("2024-04-20"),
]

class FeatureBuilder:
    def __init__(self, cfg: Config):
        self.cfg = cfg

    @staticmethod
    def _rsi(close: pd.Series, period: int = 14) -> pd.Series:
        delta = close.diff()
        gain = delta.clip(lower=0).rolling(period).mean()
        loss = (-delta.clip(upper=0)).rolling(period).mean()
        rs = gain / (loss + 1e-12)
        return 100 - (100 / (1 + rs))

    @staticmethod
    def _halving_columns(index: pd.DatetimeIndex) -> pd.DataFrame:
        # For each date, compute days since the most recent halving (non-negative), else NaN
        df = pd.DataFrame(index=index)
        last_halving = []
        for dt in index:
            # find latest halving <= dt
            prior = [h for h in BTC_HALVINGS if h <= dt]
            if prior:
                d = (dt - prior[-1]).days
                last_halving.append(float(d))
            else:
                last_halving.append(np.nan)
        s = pd.Series(last_halving, index=index, name="days_since_halving")
        # z-score over rolling window (180 days) to avoid look-ahead (lag later anyway)
        z = (s - s.rolling(180, min_periods=30).mean()) / (s.rolling(180, min_periods=30).std() + 1e-9)
        out = pd.DataFrame({"days_since_halving": s, "days_since_halving_z": z})
        return out

    def build(self, panel: pd.DataFrame) -> pd.DataFrame:
        df = panel.copy()

        # ---- Technicals
        for w in self.cfg.return_windows:
            df[f"ret_{w}d"] = df["close"].pct_change(w)
        for w in self.cfg.vol_windows:
            df[f"vol_{w}d"] = df["close"].pct_change().rolling(w).std() * math.sqrt(365)
        df["rsi"] = self._rsi(df["close"], self.cfg.rsi_period)
        df["vol_to_vola"] = (df["volume"].rolling(7).mean() / (df["close"].pct_change().rolling(7).std() + 1e-9))

        # ---- Macro-crypto deltas/z
        for name in ("btc_dominance", "total_mcap", "stablecoin_supply", "fear_greed", "funding_rate", "open_interest", "dxy", "policy_rate"):
            if name in df.columns:
                df[f"{name}_chg_1d"] = df[name].pct_change()
                df[f"{name}_chg_7d"] = df[name].pct_change(7)
                df[f"{name}_z"] = (df[name] - df[name].rolling(90).mean()) / (df[name].rolling(90).std() + 1e-9)

        # ---- Halving features
        halv = self._halving_columns(df.index)
        df = df.join(halv)

        # ---- Lag features to prevent look-ahead
        feature_cols = [c for c in df.columns if c not in ("close", "volume")]
        df[feature_cols] = df[feature_cols].shift(1)

        # ---- Target
        H = self.cfg.horizon_days
        df[f"fwd_ret_{H}d"] = df["close"].shift(-H) / df["close"] - 1
        return df

# ---------------------------------
# Gym Env (compatible con PPO; acciones {0: WAIT, 1: ENTER})
# ---------------------------------
class CryptoEntryEnv(gym.Env if gym else object):
    metadata = {"render_modes": []}
    def __init__(self, features: pd.DataFrame, cfg: Config):
        assert gym is not None and spaces is not None, "Gymnasium no disponible. Instala: pip install gymnasium"
        self.cfg = cfg
        self.df = features.copy()
        self.valid = self.df.dropna().copy()
        H = cfg.horizon_days
        self.valid = self.valid.iloc[:-H].copy()
        self.feature_cols = [c for c in self.valid.columns if c not in ("close", "volume", f"fwd_ret_{H}d")]
        self.obs_mat = self.valid[self.feature_cols].values.astype(np.float32)
        self.targets = self.valid[f"fwd_ret_{H}d"].values.astype(np.float32)

        # reemplaza este bloque dentro de __init__ de CryptoEntryEnv
        bound = 1e6  # o ajusta según tus features; 1e4–1e6 suele bastar
        low  = np.full((len(self.feature_cols),), -bound, dtype=np.float32)
        high = np.full((len(self.feature_cols),),  bound, dtype=np.float32)
        self.observation_space = spaces.Box(low=low, high=high, dtype=np.float32)

        self.action_space = spaces.Discrete(2)
        self._t = 0

    def reset(self, *, seed: Optional[int] = None, options: Optional[dict] = None):
        super().reset(seed=seed)
        self._t = 0
        return self._get_obs(), {}

    def _get_obs(self):
        return np.nan_to_num(self.obs_mat[self._t], nan=0.0, posinf=1e6, neginf=-1e6).astype(np.float32)

    def step(self, action: int):
        assert action in (0, 1)
        reward = 0.0
        info = {}
        if action == 1:
            r = float(self.targets[self._t])
            if self.cfg.reward_clip is not None:
                r = float(np.clip(r, -self.cfg.reward_clip, self.cfg.reward_clip))
            reward = r
        else:
            reward = self.cfg.enter_penalty
            if self.cfg.miss_penalty != 0.0 and self.targets[self._t] > 0.1:
                reward += -abs(self.cfg.miss_penalty)

        self._t += 1
        terminated = self._t >= (self.obs_mat.shape[0] - 1)
        truncated = False
        obs = self._get_obs() if not terminated else np.zeros_like(self.obs_mat[0], dtype=np.float32)
        return obs, float(reward), terminated, truncated, info

# ---------------------------------
# Baselines
# ---------------------------------
def random_entry_baseline(df: pd.DataFrame, horizon_days: int, n_trials: int = 1000, seed: int = 7) -> Dict[str, float]:
    rng = np.random.default_rng(seed)
    valid = df.dropna(subset=[f"fwd_ret_{horizon_days}d"])
    if valid.empty:
        return {"mean": float("nan"), "std": float("nan"), "p25": float("nan"), "p50": float("nan"), "p75": float("nan")}
    rets = []
    for _ in range(n_trials):
        t = rng.integers(0, len(valid))
        rets.append(float(valid.iloc[t][f"fwd_ret_{horizon_days}d"]))
    arr = np.array(rets)
    return {
        "mean": float(arr.mean()),
        "std": float(arr.std(ddof=1)),
        "p25": float(np.quantile(arr, 0.25)),
        "p50": float(np.quantile(arr, 0.50)),
        "p75": float(np.quantile(arr, 0.75)),
    }

# ---------------------------------
# Signal emission (handoff to Agent 2)
# ---------------------------------
Signal = Dict[str, object]

def emit_signals(features: pd.DataFrame, cfg: Config, policy: Callable[[pd.Series], Tuple[bool, float, float]],
                 asset: str = "BTC", out_csv: Optional[str] = None) -> List[Signal]:
    """
    Itera sobre features diarios válidos, llama `policy(row)` -> (enter_bool, weight, confidence).
    Si enter_bool, emite señal estandarizada para Agent 2.
    """
    H = cfg.horizon_days
    valid = features.dropna().iloc[:-H].copy()
    sigs: List[Signal] = []
    for ts, row in valid.iterrows():
        enter, weight, conf = policy(row)
        if enter:
            sigs.append({
                "ts": ts.isoformat(),
                "asset": asset,
                "side": "LONG",
                "weight": float(np.clip(weight, 0.0, 1.0)),
                "confidence": float(np.clip(conf, 0.0, 1.0)),
                "horizon_days": H,
            })
    if out_csv and len(sigs) > 0:
        pd.DataFrame(sigs).to_csv(out_csv, index=False)
    return sigs

# Política demo por reglas (mantengo por referencia; NO se usa si entrenas PPO)
def simple_rsi_policy(row: pd.Series) -> Tuple[bool, float, float]:
    rsi = row.get("rsi", np.nan)
    fgz = row.get("fear_greed_z", np.nan)
    domz = row.get("btc_dominance_z", np.nan)
    if pd.notna(rsi) and rsi < 30:
        conf = 0.6
        if pd.notna(fgz) and fgz < -0.5:
            conf += 0.2
        if pd.notna(domz) and domz > -1.0:
            conf += 0.1
        return True, 0.2 + 0.6 * min(1.0, max(0.0, (30 - rsi) / 20.0)), min(1.0, conf)
    return False, 0.0, 0.0

# ---------------------------------
# Pipeline utils
# ---------------------------------
def build_dataset(cfg: Config) -> Tuple[pd.DataFrame, pd.DataFrame, Dict[str, any]]:
    catalog = DataCatalog(cfg)
    catalog.load()
    panel = catalog.align_daily_panel()
    verify = catalog.verify(panel)

    fb = FeatureBuilder(cfg)
    feat = fb.build(panel)
    return panel, feat, verify

def print_report(verify: Dict[str, any]):
    print("\n=== DATA VERIFICATION REPORT ===")
    for k, v in verify.items():
        if k == "coverage_per_feature":
            print(f"{k}:")
            for kk, vv in v.items():
                print(f"  - {kk}: {vv:.3f}")
        else:
            print(f"{k}: {v}")

# ---------------------------------
# Helpers: split temporal y evaluación
# ---------------------------------
def temporal_split(features: pd.DataFrame, test_start: Optional[str]) -> Tuple[pd.DataFrame, pd.DataFrame]:
    if test_start is None:
        return features, pd.DataFrame(index=pd.DatetimeIndex([], name=features.index.name))
    ts = pd.to_datetime(test_start)
    train = features.loc[features.index < ts].copy()
    test = features.loc[features.index >= ts].copy()
    return train, test

def run_policy_episode(env: CryptoEntryEnv, model: PPO, deterministic: bool = True) -> Dict[str, float]:
    obs, _ = env.reset()
    total_r = 0.0
    done = False
    steps = 0
    while not done:
        action, _ = model.predict(obs, deterministic=deterministic)
        obs, r, terminated, truncated, _ = env.step(int(action))
        done = terminated or truncated
        total_r += float(r)
        steps += 1
    return {"total_reward": total_r, "steps": steps}

# Política que usa el modelo PPO para emitir señales (acción==1 => ENTER)
def ppo_policy_from_env_row(env: CryptoEntryEnv, model: PPO) -> Callable[[pd.Series], Tuple[bool, float, float]]:
    feature_cols = env.feature_cols
    def _policy(row: pd.Series) -> Tuple[bool, float, float]:
        x = row[feature_cols].values.astype(np.float32)
        x = np.nan_to_num(x, nan=0.0, posinf=1e6, neginf=-1e6)
        action, _ = model.predict(x, deterministic=True)
        enter = int(action) == 1
        # Heurística simple para weight/confidence (puedes refinarla con logits/probabilidades si lo deseas)
        weight = 0.5 if enter else 0.0
        conf = 0.7 if enter else 0.0
        return enter, weight, conf
    return _policy

# ---------------------------------
# Main
# ---------------------------------
if __name__ == "__main__":
    print("Config:", asdict(CFG))

    # 1) Datos y features
    try:
        panel, features, verify = build_dataset(CFG)
    except FileNotFoundError as e:
        print("\n[WARN] Missing CSVs — generating synthetic demo data (do NOT use for real training).\n", e)
        dates = pd.date_range("2016-01-01", periods=365*10, freq="D")
        rng = np.random.default_rng(0)
        # synthetic close & volume
        ret = rng.normal(0, 0.02, size=len(dates))
        close = 10000 * np.exp(np.cumsum(ret))
        volume = rng.integers(1_000, 50_000, size=len(dates))
        panel = pd.DataFrame({"close": close, "volume": volume}, index=dates)
        # synthetic macro series
        for name in ("btc_dominance", "total_mcap", "stablecoin_supply", "fear_greed"):
            panel[name] = pd.Series(rng.normal(0, 1, size=len(dates)), index=dates).cumsum() + 50

        fb = FeatureBuilder(CFG)
        features = fb.build(panel)
        verify = {
            "is_daily_uniform": True,
            "start": str(dates.min().date()),
            "end": str(dates.max().date()),
            "num_days": int(len(dates)),
            "coverage_per_feature": features.notna().mean().to_dict(),
            "min_coverage": float(features.notna().mean().min()),
            "meets_min_coverage": True,
            "meets_min_history": True,
        }

    print_report(verify)

    # 2) Baseline (random)
    H = CFG.horizon_days
    stats = random_entry_baseline(features, H, n_trials=500)
    print("\nRandom-entry baseline (forward {}d returns):".format(H), stats)

    # 3) Split temporal
    train_features, test_features = temporal_split(features, CFG.test_start_date)
    if train_features.empty:
        raise RuntimeError("Train set vacío tras el split; ajusta test_start_date.")

    # 4) Construir entornos (Gym + VecEnv)
    if gym is None or spaces is None:
        raise RuntimeError("Gymnasium no disponible. Instala: pip install gymnasium")

    if PPO is None or DummyVecEnv is None:
        raise RuntimeError("stable-baselines3 no disponible. Instala: pip install stable-baselines3 torch")

    train_env = DummyVecEnv([lambda: CryptoEntryEnv(train_features, CFG)])
    # Para evaluación y señales, usamos un env NO vectorizado con test (si hay)
    test_env = CryptoEntryEnv(test_features if not test_features.empty else train_features, CFG)

    # 5) Entrenar PPO (direct policy)
    model = PPO(
        "MlpPolicy",
        train_env,
        learning_rate=CFG.learning_rate,
        n_steps=CFG.n_steps,
        batch_size=CFG.batch_size,
        gamma=CFG.gamma,
        gae_lambda=CFG.gae_lambda,
        ent_coef=CFG.ent_coef,
        clip_range=CFG.clip_range,
        verbose=1,
        seed=CFG.seed,
        policy_kwargs=dict(net_arch=list(CFG.net_arch)),
    )
    model.learn(total_timesteps=CFG.total_timesteps)

    # 6) Evaluación rápida
    train_eval = run_policy_episode(CryptoEntryEnv(train_features, CFG), model)
    print("\n[Eval] Train episode:", train_eval)
    if not test_features.empty:
        test_eval = run_policy_episode(test_env, model)
        print("[Eval] Test episode:", test_eval)
    else:
        print("[Eval] Sin test set; se evaluó sólo en train.")

    # 7) Emitir señales con la política PPO
    ppo_policy = ppo_policy_from_env_row(test_env, model)
    out_path = CFG.signals_out_csv
    sigs = emit_signals(test_features if not test_features.empty else train_features,
                        CFG, ppo_policy, asset=CFG.signal_asset, out_csv=out_path)
    print(f"\nSeñales RL emitidas: {len(sigs)} filas -> {out_path if out_path else '(no CSV)'}")


Config: {'horizon_days': 120, 'trading_calendar': 'UTC', 'price_csv': 'price.csv', 'btc_dominance_csv': 'btc_dominance.csv', 'total_mcap_csv': 'total_mcap.csv', 'stablecoin_supply_csv': 'stablecoin_supply.csv', 'fear_greed_csv': 'fear_greed.csv', 'funding_rate_csv': None, 'open_interest_csv': None, 'dxy_csv': None, 'policy_rate_csv': None, 'strict_common_period': True, 'require_coverage_pct': 0.85, 'min_history_days': 1460, 'rsi_period': 14, 'vol_windows': (7, 30), 'return_windows': (1, 7, 14, 30), 'enter_penalty': -0.0005, 'miss_penalty': 0.0, 'reward_clip': 0.5, 'test_start_date': '2023-01-01', 'total_timesteps': 1000000, 'learning_rate': 0.0003, 'n_steps': 2048, 'batch_size': 256, 'gamma': 0.99, 'gae_lambda': 0.95, 'ent_coef': 0.01, 'clip_range': 0.2, 'seed': 42, 'net_arch': (256, 256), 'signal_asset': 'BTC', 'signals_out_csv': '/mnt/data/agent1_signals_rl.csv'}
Index(['Date', 'Open', 'High', 'Low', 'Close', 'Volume'], dtype='object')

[WARN] Missing CSVs — generating synthetic demo

In [15]:
import os
import time
import math
import requests
import pandas as pd
from datetime import datetime, timezone

# =========================
# Config
# =========================
OUT_BTC_DOM   = "btc_dominance.csv"
OUT_TOTAL_MC  = "total_mcap.csv"
OUT_STABLECAP = "stablecoin_supply.csv"
OUT_FNG       = "fear_greed.csv"

# CoinGecko: usa PRO si pones tu API key en esta variable de entorno
CG_API_KEY  = os.getenv("COINGECKO_API_KEY", "").strip()
# Cambia base según tengas PRO o no:
CG_BASE = "https://pro-api.coingecko.com/api/v3" if CG_API_KEY else "https://api.coingecko.com/api/v3"

# Stablecoins a sumar (puedes ajustar la lista)
STABLE_IDS = [
    "tether",               # USDT
    "usd-coin",             # USDC
    "dai",                  # DAI
    "paxos-standard",       # USDP (antiguo PAX)
    "true-usd",             # TUSD (si no hay datos, se ignora)
    "first-digital-usd",    # FDUSD
    "frax",                 # FRAX
]

HEADERS = {"accept": "application/json"}
if CG_API_KEY:
    HEADERS["x-cg-pro-api-key"] = CG_API_KEY

# Util:
def unixts_to_date(ts_ms):
    return datetime.utcfromtimestamp(ts_ms/1000).date()

def save_series_to_csv(series: pd.Series, out_path: str):
    df = series.reset_index()
    df.columns = ["Date", "value"]
    df.to_csv(out_path, index=False)
    print(f"✅ Guardado {len(df):,} filas en: {out_path}")
    print(f"    Rango: {df['Date'].min()} → {df['Date'].max()}\n")

def get_json(url, params=None, max_retries=3, sleep_sec=1.5):
    for i in range(max_retries):
        r = requests.get(url, params=params or {}, headers=HEADERS, timeout=60)
        if r.status_code == 200:
            return r.json()
        if r.status_code in (429, 500, 502, 503, 504):
            time.sleep(sleep_sec * (i+1))
            continue
        r.raise_for_status()
    r.raise_for_status()

# =========================
# 1) TOTAL MARKET CAP (USD)
# =========================
# CoinGecko global market cap history
# /global/market_cap_chart?vs_currency=usd&days=max
total_url = f"{CG_BASE}/global/market_cap_chart"
j = get_json(total_url, params={"vs_currency": "usd", "days": "max"})
# Estructura: {"market_cap_chart": {"market_cap": [[ts, val], ...], "volume": [...]} } en PRO;
# en público suele venir {"market_cap_chart":{"market_cap":[...]} } o {"market_cap":[...]}
pairs = None
if "market_cap_chart" in j and "market_cap" in j["market_cap_chart"]:
    pairs = j["market_cap_chart"]["market_cap"]
elif "market_cap" in j:
    pairs = j["market_cap"]
else:
    raise ValueError("Respuesta inesperada para total market cap en CoinGecko.")

total_cap = (
    pd.Series({unixts_to_date(ts): float(val) for ts, val in pairs})
      .sort_index()
      .asfreq("D")              # frecuencia diaria
      .ffill()                  # forward-fill si hay gaps diarios
)
save_series_to_csv(total_cap, OUT_TOTAL_MC)

# =========================
# 2) BTC MARKET CAP (USD) → BTC DOMINANCE %
# =========================
# /coins/bitcoin/market_chart?vs_currency=usd&days=max
btc_url = f"{CG_BASE}/coins/bitcoin/market_chart"
bj = get_json(btc_url, params={"vs_currency": "usd", "days": "max"})
btc_pairs = bj.get("market_caps") or bj.get("market_cap")
if btc_pairs is None:
    # algunos responses usan "market_caps"
    btc_pairs = bj.get("market_caps")
if btc_pairs is None:
    raise ValueError("Respuesta inesperada para BTC market cap en CoinGecko.")

btc_cap = (
    pd.Series({unixts_to_date(ts): float(val) for ts, val in btc_pairs})
      .sort_index()
      .asfreq("D")
      .ffill()
)

# Alinear índices y calcular dominancia
idx = total_cap.index.intersection(btc_cap.index)
btc_cap = btc_cap.reindex(idx)
total_cap2 = total_cap.reindex(idx)
btc_dom = (btc_cap / total_cap2) * 100.0
save_series_to_csv(btc_dom, OUT_BTC_DOM)

# =========================
# 3) STABLECOIN SUPPLY (≈ sum of market caps USD)
# =========================
stable_frames = []
for cid in STABLE_IDS:
    try:
        url = f"{CG_BASE}/coins/{cid}/market_chart"
        sj = get_json(url, params={"vs_currency": "usd", "days": "max"})
        pairs = sj.get("market_caps") or sj.get("market_cap")
        if not pairs:
            continue
        s = pd.Series({unixts_to_date(ts): float(val) for ts, val in pairs}).sort_index()
        stable_frames.append(s)
        time.sleep(1.2)  # cuida rate limit
        print(f"  • OK {cid}: {len(s)} puntos")
    except Exception as e:
        print(f"  • WARN {cid}: {e}")

if not stable_frames:
    raise RuntimeError("No se pudo obtener ninguna serie de market cap de stablecoins.")

stable_sum = pd.concat(stable_frames, axis=1).sum(axis=1).sort_index()
stable_sum = stable_sum.asfreq("D").ffill()
save_series_to_csv(stable_sum, OUT_STABLECAP)

# =========================
# 4) FEAR & GREED (Alternative.me)
# =========================
# API pública: CSV completo con limit=0
# https://api.alternative.me/fng/?limit=0&format=csv
fng_csv_url = "https://api.alternative.me/fng/?limit=0&format=csv"
fng = pd.read_csv(fng_csv_url)
# columnas típicas: timestamp, value, value_classification, time_until_update
# nos quedamos con date y value (int)
fng["Date"] = pd.to_datetime(fng["timestamp"], unit="s", utc=True).dt.tz_convert(None).dt.date
fear_greed = fng.groupby("Date")["value"].last().astype(float).sort_index()
save_series_to_csv(fear_greed, OUT_FNG)

# =========================
# Validaciones rápidas (¿~9+ años?)
# =========================
def span(series):
    return str(min(series.index)), str(max(series.index)), len(series)

print("Coberturas aproximadas:")
print("  Total Mcap   :", span(total_cap))
print("  BTC Cap      :", span(btc_cap))
print("  BTC Dominance:", span(btc_dom))
print("  Stable Sum   :", span(stable_sum))
print("  Fear&Greed   :", span(fear_greed))


HTTPError: 401 Client Error: Unauthorized for url: https://api.coingecko.com/api/v3/global/market_cap_chart?vs_currency=usd&days=max

In [16]:
import os
import time
import requests
import pandas as pd
from datetime import datetime, timezone

# =========================
# Config
# =========================
OUT_BTC_DOM   = "btc_dominance.csv"
OUT_TOTAL_MC  = "total_mcap.csv"
OUT_STABLECAP = "stablecoin_supply.csv"
OUT_FNG       = "fear_greed.csv"

CMC_KEY = os.getenv("CMC_API_KEY", "").strip()
assert CMC_KEY, "Falta CMC_API_KEY en variables de entorno."
CMC_BASE_V1 = "https://pro-api.coinmarketcap.com/v1"
CMC_BASE_V3 = "https://pro-api.coinmarketcap.com/v3"

HEADERS = {
    "Accept": "application/json",
    "X-CMC_PRO_API_KEY": CMC_KEY,
}

# Rango sugerido (~10+ años)
TIME_START = "2015-01-01T00:00:00Z"
TIME_END   = datetime.utcnow().replace(tzinfo=timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
INTERVAL   = "daily"  # 'daily' para series 1D

# Stablecoins a sumar (IDs/símbolos de CMC)
STABLE_SYMBOLS = [
    "USDT",   # Tether
    "USDC",   # USD Coin
    "DAI",    # DAI
    "USDP",   # Pax Dollar
    "TUSD",   # TrueUSD
    "FDUSD",  # First Digital USD
    "FRAX",   # FRAX
]

# =========================
# Helpers
# =========================
def save_series_to_csv(series: pd.Series, out_path: str):
    df = series.reset_index()
    df.columns = ["Date", "value"]
    df.to_csv(out_path, index=False)
    print(f"✅ Guardado {len(df):,} filas en: {out_path}")
    print(f"   Rango: {df['Date'].min()} → {df['Date'].max()}\n")

def get_json(url, params=None, max_retries=3, sleep_sec=1.5):
    params = params or {}
    for i in range(max_retries):
        r = requests.get(url, headers=HEADERS, params=params, timeout=60)
        if r.status_code == 200:
            return r.json()
        if r.status_code in (429, 500, 502, 503, 504):
            time.sleep(sleep_sec * (i+1))
            continue
        # si da 400/401/403 etc, muestra el body para diagnosticar cuota/plan
        try:
            print("Error body:", r.json())
        except Exception:
            print("Error text:", r.text[:500])
        r.raise_for_status()
    r.raise_for_status()

def to_date(s: str):
    # CMC retorna timestamps ISO; normalizamos a YYYY-MM-DD
    return pd.to_datetime(s, utc=True, errors="coerce").tz_convert(None).date()

# =========================
# 1) GLOBAL METRICS: Total Market Cap (USD) & BTC Dominance (%)
# =========================
# /v1/global-metrics/quotes/historical
gm_url = f"{CMC_BASE_V1}/global-metrics/quotes/historical"
gm = get_json(gm_url, params={
    "time_start": TIME_START,
    "time_end": TIME_END,
    "interval": INTERVAL,
    "convert": "USD",
})
quotes = (gm.get("data") or {}).get("quotes", [])
assert quotes, "Sin datos en global-metrics/quotes/historical. Revisa límites de plan o parámetros."

rows = []
for q in quotes:
    ts = q.get("timestamp")
    dom = q.get("btc_dominance")
    total = (((q.get("quote") or {}).get("USD")) or {}).get("total_market_cap")
    if ts is None:
        continue
    rows.append({"Date": to_date(ts), "btc_dominance": dom, "total_mcap": total})

gdf = pd.DataFrame(rows).dropna(subset=["Date"]).sort_values("Date")
gdf = gdf.groupby("Date").last()  # por si hay duplicados intra-día
btc_dominance = gdf["btc_dominance"].astype(float)
total_mcap = gdf["total_mcap"].astype(float)

save_series_to_csv(btc_dominance, OUT_BTC_DOM)
save_series_to_csv(total_mcap, OUT_TOTAL_MC)

# =========================
# 2) STABLECOIN SUPPLY: suma de market caps (USD) por stablecoin
# =========================
# Para cada símbolo: /v1/cryptocurrency/quotes/historical (interval=daily)
# NOTA: Este endpoint histórico puede requerir plan superior en CMC.
def fetch_symbol_marketcap_history(symbol: str) -> pd.Series:
    url = f"{CMC_BASE_V1}/cryptocurrency/quotes/historical"
    js = get_json(url, params={
        "symbol": symbol,
        "time_start": TIME_START,
        "time_end": TIME_END,
        "interval": INTERVAL,
        "convert": "USD",
    })
    data = js.get("data") or {}
    quotes = data.get("quotes", [])
    if not quotes:
        # Algunos responses agrupan por símbolo en 'data' dict
        # Estructura alternativa: {"data":{"symbol":"USDT","id":..,"quotes":[...]}}
        # Ya manejado arriba, pero si retorna por ID/símbolo distinto, avisar
        print(f"  • WARN {symbol}: sin quotes (posible limit de plan).")
        return pd.Series(dtype=float)
    pairs = []
    for q in quotes:
        ts = q.get("timestamp")
        mc = (((q.get("quote") or {}).get("USD")) or {}).get("market_cap")
        if ts and mc is not None:
            pairs.append((to_date(ts), float(mc)))
    if not pairs:
        print(f"  • WARN {symbol}: sin datos útiles.")
        return pd.Series(dtype=float)
    s = pd.Series(dict(pairs)).sort_index()
    return s

stable_frames = []
for sym in STABLE_SYMBOLS:
    s = fetch_symbol_marketcap_history(sym)
    if not s.empty:
        stable_frames.append(s)
        print(f"  • OK {sym}: {len(s)} puntos")
    time.sleep(0.8)  # ser amable con el rate limit

if not stable_frames:
    raise RuntimeError("No se pudo obtener ninguna serie histórica de market cap para stablecoins. Verifica tu plan CMC.")

stable_sum = pd.concat(stable_frames, axis=1).sum(axis=1).sort_index()
# frecuencia diaria y ffill
stable_sum = stable_sum.asfreq("D").ffill()
save_series_to_csv(stable_sum, OUT_STABLECAP)

# =========================
# 3) FEAR & GREED (CMC)
# =========================
# /v3/fear-and-greed/historical
fng_url = f"{CMC_BASE_V3}/fear-and-greed/historical"
# doc pública muestra 'limit' como parámetro; intentamos con 'limit=0' para todo histórico
fj = get_json(fng_url, params={"limit": 0})
fdata = (fj.get("data") or {}).get("points") or fj.get("data") or []
# El formato puede ser una lista de dicts o un dict 'points'
pairs = []
if isinstance(fdata, dict):
    # e.g., {"2025-08-10": {"value": 61, ...}, ...}
    for k, v in fdata.items():
        val = v.get("value") if isinstance(v, dict) else v
        pairs.append((pd.to_datetime(k).date(), float(val)))
elif isinstance(fdata, list):
    # e.g., [{"timestamp":"...", "value":61}, ...]
    for it in fdata:
        ts = it.get("timestamp") or it.get("timestamp_ms") or it.get("time") or it.get("date")
        val = it.get("value")
        if ts is None or val is None:
            continue
        pairs.append((to_date(ts), float(val)))
else:
    # Algunos despliegues pueden devolver {"data":{"values":[...]}}
    values = (fj.get("data") or {}).get("values", [])
    for it in values:
        ts = it.get("timestamp") or it.get("date")
        val = it.get("value")
        if ts and val is not None:
            pairs.append((to_date(ts), float(val)))

if not pairs:
    raise RuntimeError("Fear & Greed (CMC) sin datos. Verifica tu plan o el formato de respuesta.")

fear_greed = pd.Series(dict(pairs)).sort_index().asfreq("D").ffill()
save_series_to_csv(fear_greed, OUT_FNG)

# =========================
# Validación rápida
# =========================
def span(series):
    return str(series.index.min()), str(series.index.max()), len(series)

print("Coberturas aproximadas (después de limpieza/ffill):")
print("  BTC Dominance:", span(btc_dominance))
print("  Total Mcap   :", span(total_mcap))
print("  Stable Sum   :", span(stable_sum))
print("  Fear&Greed   :", span(fear_greed))


AssertionError: Falta CMC_API_KEY en variables de entorno.

In [14]:
pip install requests

Collecting requests
  Downloading requests-2.32.4-py3-none-any.whl.metadata (4.9 kB)
Collecting charset_normalizer<4,>=2 (from requests)
  Downloading charset_normalizer-3.4.3-cp313-cp313-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl.metadata (36 kB)
Collecting idna<4,>=2.5 (from requests)
  Using cached idna-3.10-py3-none-any.whl.metadata (10 kB)
Collecting urllib3<3,>=1.21.1 (from requests)
  Downloading urllib3-2.5.0-py3-none-any.whl.metadata (6.5 kB)
Collecting certifi>=2017.4.17 (from requests)
  Downloading certifi-2025.8.3-py3-none-any.whl.metadata (2.4 kB)
Downloading requests-2.32.4-py3-none-any.whl (64 kB)
Downloading charset_normalizer-3.4.3-cp313-cp313-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl (151 kB)
Using cached idna-3.10-py3-none-any.whl (70 kB)
Downloading urllib3-2.5.0-py3-none-any.whl (129 kB)
Downloading certifi-2025.8.3-py3-none-any.whl (161 kB)
Installing collected packages: urllib3, idna, charset_normalizer, certi

In [8]:
import pandas as pd

df = pd.read_csv("price.csv")
df.columns = [c.capitalize() if c.lower() != "date" else "Date" for c in df.columns]
df.to_csv("price.csv", index=False)


In [5]:
import pandas as pd
import numpy as np

# Generate 10 years of daily BTC-like data
dates = pd.date_range("2016-01-01", periods=365*10, freq="D")
rng = np.random.default_rng(42)

# Simulate OHLCV with random walk
close = 10000 * np.exp(np.cumsum(rng.normal(0, 0.02, size=len(dates))))
open_ = close * (1 + rng.normal(0, 0.002, size=len(dates)))
high = np.maximum(open_, close) * (1 + rng.normal(0, 0.005, size=len(dates)))
low = np.minimum(open_, close) * (1 - rng.normal(0, 0.005, size=len(dates)))
volume = rng.integers(1_000, 50_000, size=len(dates))

# Assemble DataFrame
df = pd.DataFrame({
    "Date": dates,
    "Open": open_,
    "High": high,
    "Low": low,
    "Close": close,
    "Volume": volume
})

# Save as price.csv in current directory
df.to_csv("price.csv", index=False)
print("price.csv created with shape:", df.shape)


price.csv created with shape: (3650, 6)


In [18]:
import requests
import pandas as pd

# Endpoint JSON (histórico completo)
url = "https://api.alternative.me/fng/?limit=0"

r = requests.get(url, timeout=60)
r.raise_for_status()
data = r.json()["data"]  # lista de dicts

# Construir DataFrame robusto (algunos campos pueden faltar)
df = pd.DataFrame([{
    "timestamp": int(d.get("timestamp")),
    "value": float(d.get("value")),
} for d in data if d.get("timestamp") is not None and d.get("value") is not None])

# Convertir a fecha diaria y ordenar
df["Date"] = pd.to_datetime(df["timestamp"], unit="s", utc=True).dt.tz_convert(None).dt.date
daily = (
    df.groupby("Date")["value"]
      .last()        # usa .mean() si prefieres promediar intradía
      .sort_index()
)

# Guardar CSV en el formato que espera tu pipeline: Date,value
out_path = "fear_greed.csv"
daily.reset_index().to_csv(out_path, index=False)
print(f"✅ Guardado {len(daily):,} filas en: {out_path}")
print(f"   Rango: {daily.index.min()} → {daily.index.max()}")

# Vista rápida
display(daily.tail())


✅ Guardado 2,747 filas en: fear_greed.csv
   Rango: 2018-02-01 → 2025-08-13


Date
2025-08-09    67.0
2025-08-10    69.0
2025-08-11    70.0
2025-08-12    68.0
2025-08-13    73.0
Name: value, dtype: float64

In [20]:
import requests
import pandas as pd

# ---- Config ----
COIN_ID = "tether"             # USDT en CoinGecko
VS_CURRENCY = "usd"
DAYS = "max"                    # histórico máximo
OUT_USD = "stablecoin_supply.csv"          # USD (para tu pipeline actual)
OUT_UNITS = "stablecoin_supply_units.csv"  # unidades de USDT (tokens)

# ---- Descarga desde CoinGecko ----
url = f"https://api.coingecko.com/api/v3/coins/{COIN_ID}/market_chart"
params = {"vs_currency": VS_CURRENCY, "days": DAYS}
r = requests.get(url, params=params, timeout=60)
r.raise_for_status()
j = r.json()

# 'market_caps' y 'prices' son listas de [timestamp_ms, valor]
mc_pairs = j.get("market_caps", [])
px_pairs = j.get("prices", [])

if not mc_pairs or not px_pairs:
    raise RuntimeError("Respuesta inesperada de CoinGecko para USDT (faltan market_caps o prices).")

# ---- A DataFrames ----
mc = pd.DataFrame(mc_pairs, columns=["ts", "market_cap"])
px = pd.DataFrame(px_pairs, columns=["ts", "price"])

# Fecha diaria (UTC) y merge
mc["Date"] = pd.to_datetime(mc["ts"], unit="ms", utc=True).dt.tz_convert(None).dt.date
px["Date"] = pd.to_datetime(px["ts"], unit="ms", utc=True).dt.tz_convert(None).dt.date

df = (mc[["Date", "market_cap"]]
      .merge(px[["Date", "price"]], on="Date", how="inner")
      .sort_values("Date"))

# Relleno diario por si hay huecos
df = (df.set_index(pd.to_datetime(df["Date"]))
        .asfreq("D")
        .ffill()
        .reset_index(drop=True))
df["Date"] = df["Date"].astype(str)

# ---- Salidas ----
# 1) USD: usar market cap como "supply" en USD (consistente con tu pipeline)
df[["Date", "market_cap"]].rename(columns={"market_cap": "value"}).to_csv(OUT_USD, index=False)

# 2) Unidades: supply ≈ market_cap / price
df["units"] = df["market_cap"] / df["price"]
df[["Date", "units"]].rename(columns={"units": "value"}).to_csv(OUT_UNITS, index=False)

print(f"✅ Guardado USD:   {OUT_USD}  ({len(df):,} filas)")
print(f"✅ Guardado UNITS: {OUT_UNITS} ({len(df):,} filas)")
print(f"Rango: {df['Date'].min()} → {df['Date'].max()}")


HTTPError: 401 Client Error: Unauthorized for url: https://api.coingecko.com/api/v3/coins/tether/market_chart?vs_currency=usd&days=max