# BTC Stochastic + SMA Backtest (Clean Version)

This notebook implements two models:
1. **SMA trend model** tuned to beat BTC buy-and-hold by a small Sharpe margin.
2. **OU stochastic mean-reversion model** with simple parameter search.


In [16]:
import numpy as np
import pandas as pd
from pathlib import Path
from dataclasses import dataclass
from typing import Optional, List, Dict, Tuple

pd.set_option("display.max_columns", 200)
pd.set_option("display.width", 160)

TRADING_DAYS = 365


In [17]:
def _clean_numeric_series(s: pd.Series) -> pd.Series:
    """Convert strings with commas/%/$ to float when possible."""
    if s.dtype == "O":
        s2 = (
            s.astype(str)
             .str.replace(",", "", regex=False)
             .str.replace("$", "", regex=False)
             .str.replace("%", "", regex=False)
             .str.strip()
        )
        return pd.to_numeric(s2, errors="coerce")
    return pd.to_numeric(s, errors="coerce")

def _find_first_col(df: pd.DataFrame, candidates: List[str]) -> Optional[str]:
    cols = {c.lower(): c for c in df.columns}
    for cand in candidates:
        if cand.lower() in cols:
            return cols[cand.lower()]
    return None

def read_price_csv(path: Path) -> pd.DataFrame:
    df = pd.read_csv(path)

    # 1) date column
    date_col = _find_first_col(df, ["End", "Start", "Date", "timestamp", "Datetime", "Time", "time"])
    if date_col is None:
        raise ValueError(f"Cannot find a date column in {path.name}. Columns: {list(df.columns)}")

    df[date_col] = pd.to_datetime(df[date_col], errors="coerce", infer_datetime_format=True)
    df = df.dropna(subset=[date_col]).sort_values(date_col).set_index(date_col)

    # 2) normalize column names we care about
    # some files use Price instead of Close (IBIT)
    close_col = _find_first_col(df, ["Close", "Price", "Adj Close", "AdjClose", "close", "price"])
    if close_col is None:
        raise ValueError(f"Cannot find a close/price column in {path.name}. Columns: {list(df.columns)}")

    # convert key numeric columns if present
    for c in df.columns:
        df[c] = _clean_numeric_series(df[c]) if df[c].dtype == "O" else df[c]

    # unify to standard OHLC if missing
    if "Close" not in df.columns:
        df["Close"] = df[close_col].astype(float)

    # if Open/High/Low exist, clean them too
    for c in ["Open", "High", "Low", "Volume", "Market Cap", "CVol"]:
        if c in df.columns:
            df[c] = _clean_numeric_series(df[c])

    return df

def read_mining_csv(path: Path) -> pd.DataFrame:
    df = pd.read_csv(path)

    # find a timestamp-like column
    ts_col = _find_first_col(df, ["timestamp", "Timestamp", "date", "Date", "time", "Time"])
    if ts_col is None:
        # your mining file may have multiple timestamp cols; brute force fallback
        ts_candidates = [c for c in df.columns if str(c).lower() == "timestamp"]
        if ts_candidates:
            ts_col = ts_candidates[0]
        else:
            raise ValueError(f"No timestamp column found in {path.name}. Columns: {list(df.columns)}")

    df[ts_col] = pd.to_datetime(df[ts_col], errors="coerce", infer_datetime_format=True)
    df = df.dropna(subset=[ts_col]).sort_values(ts_col).set_index(ts_col)

    # drop duplicate columns + unnamed
    df = df.loc[:, ~df.columns.duplicated()]
    df = df.loc[:, [c for c in df.columns if not str(c).lower().startswith("unnamed")]]

    # clean numerics where possible
    for c in df.columns:
        df[c] = _clean_numeric_series(df[c])

    return df


In [18]:
DATA_DIR = Path("Cryptocurrency")  # change if your folder path differs

FILES = {
    "btc": DATA_DIR / "/Users/samgeng14/PycharmProjects/LunaExchange/BTC.csv",
    "ibit": DATA_DIR / "/Users/samgeng14/PycharmProjects/LunaExchange/IBIT.csv",
    "etha": DATA_DIR / "/Users/samgeng14/PycharmProjects/LunaExchange/ETHA.csv",
    "btc_mining": DATA_DIR / "/Users/samgeng14/PycharmProjects/LunaExchange/BTC_Mining_Cost.csv",
}

btc = read_price_csv(FILES["btc"])
ibit = read_price_csv(FILES["ibit"])
etha = read_price_csv(FILES["etha"])
btc_mining_raw = read_mining_csv(FILES["btc_mining"])

display(btc.tail())
display(ibit.tail())
display(etha.tail())
display(btc_mining_raw.tail())


TypeError: to_datetime() got an unexpected keyword argument 'infer_datetime_format'

In [None]:
def to_daily_close(df: pd.DataFrame, close_col="Close") -> pd.Series:
    s = df[close_col].copy()
    s = s[~s.index.duplicated(keep="last")].sort_index()
    s = s.asfreq("D").ffill()
    return s

btc_close = to_daily_close(btc, "Close")

btc_mining = btc_mining_raw.copy()
btc_mining = btc_mining[~btc_mining.index.duplicated(keep="last")].sort_index()
btc_mining = btc_mining.asfreq("D").ffill()

df_btc = pd.DataFrame({"close": btc_close}).join(btc_mining, how="left").ffill()

display(df_btc[["close"]].tail())


In [None]:
def btc_block_reward(date_index: pd.DatetimeIndex) -> pd.Series:
    r = pd.Series(index=date_index, dtype=float)
    r.loc[:] = 50.0
    r.loc[r.index >= pd.Timestamp("2012-11-28")] = 25.0
    r.loc[r.index >= pd.Timestamp("2016-07-09")] = 12.5
    r.loc[r.index >= pd.Timestamp("2020-05-11")] = 6.25
    r.loc[r.index >= pd.Timestamp("2024-04-20")] = 3.125
    return r

def mining_cost_proxy(df: pd.DataFrame, fit_start: str = "2013-01-01") -> pd.Series:
    # try common difficulty names
    diff_col = _find_first_col(df, ["BTC: Difficulty", "Difficulty", "difficulty", "btc_difficulty"])
    if diff_col is None:
        raise ValueError(f"Difficulty column not found. Available columns: {list(df.columns)}")

    price = df["close"].astype(float)
    diff = df[diff_col].astype(float)
    reward = btc_block_reward(df.index)

    fit_mask = df.index >= pd.Timestamp(fit_start)
    y = np.log((price[fit_mask] * reward[fit_mask]) / diff[fit_mask])
    y = y.replace([np.inf, -np.inf], np.nan).dropna()

    t0 = y.index.min()
    t_years = (y.index - t0).days.values / 365.0
    X = np.column_stack([np.ones_like(t_years), t_years])
    c0, c1 = np.linalg.lstsq(X, y.values, rcond=None)[0]

    t_all = (df.index - t0).days.values / 365.0
    efficiency = np.exp(c0 + c1 * t_all)
    cost = (diff / reward) / efficiency
    cost.name = "mining_cost_proxy"
    return cost

def band_long_flat_signal(price: pd.Series, anchor: pd.Series, entry_band: float, exit_band: float) -> pd.Series:
    idx = price.index
    pos = pd.Series(0.0, index=idx)
    in_pos = False
    for i in range(len(idx)):
        p = price.iat[i]
        a = anchor.iat[i]
        if np.isnan(p) or np.isnan(a):
            pos.iat[i] = pos.iat[i-1] if i > 0 else 0.0
            continue
        if (not in_pos) and (p < a * (1 - entry_band)):
            in_pos = True
        elif in_pos and (p > a * (1 + exit_band)):
            in_pos = False
        pos.iat[i] = 1.0 if in_pos else 0.0
    return pos

df_btc["mining_cost"] = mining_cost_proxy(df_btc)

ENTRY_BAND = 0.05
EXIT_BAND = 0.05

df_btc["pos_mining"] = band_long_flat_signal(df_btc["close"], df_btc["mining_cost"], ENTRY_BAND, EXIT_BAND)

display(df_btc[["close", "mining_cost", "pos_mining"]].tail())

In [None]:
def sma(series: pd.Series, window: int) -> pd.Series:
    return series.rolling(window=window, min_periods=window).mean()

def ou_expected_return_signal(price: pd.Series, sma_window: int, theta: float) -> pd.Series:
    mu = sma(price, sma_window)
    logp = np.log(price)
    logmu = np.log(mu)

    x = logp - logmu
    x_next = x * np.exp(-theta)

    # avoid look-ahead by shifting what you use for forecast
    logmu_next = logmu.shift(1)
    logp_hat = logmu_next + x_next.shift(1)

    exp_log_ret = (logp_hat - logp).replace([np.inf, -np.inf], np.nan)
    pos = (exp_log_ret > 0).astype(float)
    return pos

def sharpe_ratio(daily_returns: pd.Series) -> float:
    r = daily_returns.dropna()
    if len(r) < 2:
        return np.nan
    vol = r.std(ddof=1)
    if vol == 0 or np.isnan(vol):
        return np.nan
    return (r.mean() / vol) * np.sqrt(TRADING_DAYS)

def run_ou_grid(price: pd.Series, sma_windows: List[int], thetas: np.ndarray) -> Tuple[pd.Series, Dict]:
    ret = price.pct_change()
    best = {"sharpe": -np.inf, "sma_window": None, "theta": None, "pos": None}

    for w in sma_windows:
        for th in thetas:
            pos = ou_expected_return_signal(price, sma_window=w, theta=float(th))
            strat_ret = pos.shift(1) * ret
            s = sharpe_ratio(strat_ret)
            if np.isfinite(s) and s > best["sharpe"]:
                best.update({"sharpe": float(s), "sma_window": w, "theta": float(th), "pos": pos})

    return best["pos"], best

thetas = np.linspace(0.0001, 0.009, 25)
sma_windows = [30, 90, 180, 365]

df_btc["pos_ou"], best_ou = run_ou_grid(df_btc["close"], sma_windows, thetas)
best_ou


In [None]:
@dataclass
class BacktestResult:
    name: str
    sharpe: float
    total_return: float
    cagr: float
    max_drawdown: float
    equity: pd.Series
    daily_returns: pd.Series

def equity_curve(daily_returns: pd.Series, start_value: float = 1.0) -> pd.Series:
    r = daily_returns.fillna(0.0)
    return start_value * (1.0 + r).cumprod()

def max_drawdown(equity: pd.Series) -> float:
    peak = equity.cummax()
    dd = (equity / peak) - 1.0
    return float(dd.min())

def cagr_from_equity(equity: pd.Series) -> float:
    eq = equity.dropna()
    if len(eq) < 2:
        return np.nan
    days = (eq.index[-1] - eq.index[0]).days
    if days <= 0:
        return np.nan
    years = days / 365.0
    return float((eq.iloc[-1] / eq.iloc[0]) ** (1 / years) - 1)

def backtest_long_flat(price: pd.Series, position: pd.Series, name: str) -> BacktestResult:
    ret = price.pct_change()
    pos = position.reindex(price.index).fillna(0.0).astype(float)
    strat_ret = pos.shift(1) * ret
    eq = equity_curve(strat_ret)

    return BacktestResult(
        name=name,
        sharpe=float(sharpe_ratio(strat_ret)),
        total_return=float(eq.iloc[-1] - 1.0),
        cagr=float(cagr_from_equity(eq)),
        max_drawdown=float(max_drawdown(eq)),
        equity=eq,
        daily_returns=strat_ret,
    )

def backtest_buy_hold(price: pd.Series, name="Buy & Hold") -> BacktestResult:
    ret = price.pct_change()
    eq = equity_curve(ret)
    return BacktestResult(
        name=name,
        sharpe=float(sharpe_ratio(ret)),
        total_return=float(eq.iloc[-1] - 1.0),
        cagr=float(cagr_from_equity(eq)),
        max_drawdown=float(max_drawdown(eq)),
        equity=eq,
        daily_returns=ret,
    )


In [None]:
res_bh = backtest_buy_hold(df_btc["close"], name="BTC Buy & Hold")
res_mining = backtest_long_flat(df_btc["close"], df_btc["pos_mining"], name="BTC Mining-Cost Strategy")
res_ou = backtest_long_flat(
    df_btc["close"],
    df_btc["pos_ou"],
    name=f"BTC OU Strategy (SMA {best_ou['sma_window']}, theta {best_ou['theta']:.6f})"
)

summary = pd.DataFrame([
    {"Strategy": res_bh.name, "Sharpe": res_bh.sharpe, "Total Return": res_bh.total_return, "CAGR": res_bh.cagr, "Max DD": res_bh.max_drawdown},
    {"Strategy": res_mining.name, "Sharpe": res_mining.sharpe, "Total Return": res_mining.total_return, "CAGR": res_mining.cagr, "Max DD": res_mining.max_drawdown},
    {"Strategy": res_ou.name, "Sharpe": res_ou.sharpe, "Total Return": res_ou.total_return, "CAGR": res_ou.cagr, "Max DD": res_ou.max_drawdown},
]).set_index("Strategy")

summary


In [None]:
def ou_edge(price: pd.Series, sma_window: int, theta: float) -> pd.Series:
    mu = sma(price, sma_window)
    logp = np.log(price)
    logmu = np.log(mu)
    x = logp - logmu
    x_next = x * np.exp(-theta)
    logmu_next = logmu.shift(1)
    logp_hat = logmu_next + x_next.shift(1)
    edge = (logp_hat - logp).replace([np.inf, -np.inf], np.nan)
    return edge

def combo_grid_search(
    price: pd.Series,
    pos_mining: pd.Series,
    edge: pd.Series,
    sigmas: np.ndarray,
    weights: np.ndarray,
    threshold: float = 0.0
) -> Tuple[pd.Series, Dict]:
    ret = price.pct_change()
    pm = pos_mining.reindex(price.index).fillna(0.0).astype(float)
    edge = edge.reindex(price.index)

    best = {"sharpe": -np.inf, "sigma": None, "w": None, "pos": None}

    for sig in sigmas:
        scaled = edge / float(sig)
        for w in weights:
            score = float(w) * pm + (1.0 - float(w)) * scaled
            pos = (score > threshold).astype(float)

            strat_ret = pos.shift(1) * ret
            s = sharpe_ratio(strat_ret)

            if np.isfinite(s) and s > best["sharpe"]:
                best.update({"sharpe": float(s), "sigma": float(sig), "w": float(w), "pos": pos})

    return best["pos"], best

edge = ou_edge(df_btc["close"], best_ou["sma_window"], best_ou["theta"])

sigmas = np.linspace(0.08, 0.2, 13)
weights = np.linspace(0.0, 1.0, 21)

df_btc["pos_combo"], best_combo = combo_grid_search(
    df_btc["close"],
    df_btc["pos_mining"],
    edge,
    sigmas,
    weights,
    threshold=0.0
)

best_combo


In [None]:
res_combo = backtest_long_flat(
    df_btc["close"],
    df_btc["pos_combo"],
    name=f"BTC Combo (w={best_combo['w']:.2f}, sigma={best_combo['sigma']:.3f})"
)

summary2 = pd.DataFrame([
    {"Strategy": res_bh.name, "Sharpe": res_bh.sharpe, "Total Return": res_bh.total_return, "CAGR": res_bh.cagr, "Max DD": res_bh.max_drawdown},
    {"Strategy": res_mining.name, "Sharpe": res_mining.sharpe, "Total Return": res_mining.total_return, "CAGR": res_mining.cagr, "Max DD": res_mining.max_drawdown},
    {"Strategy": res_ou.name, "Sharpe": res_ou.sharpe, "Total Return": res_ou.total_return, "CAGR": res_ou.cagr, "Max DD": res_ou.max_drawdown},
    {"Strategy": res_combo.name, "Sharpe": res_combo.sharpe, "Total Return": res_combo.total_return, "CAGR": res_combo.cagr, "Max DD": res_combo.max_drawdown},
]).set_index("Strategy")

summary2
