# Import Libraries

In [47]:
import pandas as pd 
import numpy as np
import logging
from datetime import datetime
import requests
from io import StringIO
from scipy.stats import beta

# Global Configuration

In [48]:
# Back-test date range
BACKTEST_START     = '2021-01-01' 
BACKTEST_END       = '2025-11-07' 
# BACKTEST_START     = '2011-06-01' 
# BACKTEST_END       = '2025-06-01' 

# Rolling window length (in months)
INVESTMENT_WINDOW  = 12

# Step frequency for window start-dates: 'Daily', 'Weekly' or 'Monthly'
PURCHASE_FREQ      = 'Daily'

# Minimum per-period weight (to avoid zero allocations)
MIN_WEIGHT         = 1e-5

PURCHASE_FREQ_TO_OFFSET = {
    'Daily':   '1D',
    'Weekly':  '7D',
    'Monthly': '1M',
}

In [49]:
logging.basicConfig(
    format='%(asctime)s %(levelname)-8s %(message)s',
    level=logging.INFO,
    datefmt='%Y-%m-%d %H:%M:%S'
)

# Download BTC Data

In [50]:
try:
    from coinmetrics.api_client import CoinMetricsClient
except ImportError:
    raise ImportError("coinmetrics.api_client module is required. Install it via pip:\n\n    pip install coinmetrics-api-client")

def extract_btc_data_to_csv(local_path='btc_data.csv'):
    # Coin Metrics BTC CSV (raw GitHub URL)
    url = "https://raw.githubusercontent.com/coinmetrics/data/master/csv/btc.csv"
    
    # Download the content
    response = requests.get(url)
    response.raise_for_status()  # raises an error for bad responses
    
    # Parse CSV content
    btc_df = pd.read_csv(StringIO(response.text))

    btc_df['time'] = pd.to_datetime(btc_df['time']).dt.normalize()
    btc_df['time'] = btc_df['time'].dt.tz_localize(None)
    btc_df.set_index('time', inplace=True)

    btc_df.to_csv(local_path)
    
    # Show the df
    btc_df

btc_df = extract_btc_data_to_csv("btc_data.csv")

# Load Data

In [51]:
def load_data():
    df = pd.read_csv("btc_data.csv", index_col=0, parse_dates=True)
    df = df.loc[~df.index.duplicated(keep='last')]
    df = df.sort_index()
    return df

def validate_price_data(df):
    if df.empty or 'PriceUSD' not in df.columns:
        raise ValueError("Invalid BTC price data.")
    if not isinstance(df.index, pd.DatetimeIndex):
        raise ValueError("Index must be datetime.")

# Strategy

## Hyper Parameters

In [52]:
_FULL_FEATURES = None

MIN_W = 1e-5
WINS = [30, 90, 180, 365, 1461]
FEATS = [f"z{w}" for w in WINS]
PROTOS = [(0.5, 5.0), (1.0, 1.0), (5.0, 0.5)]

# Optimized theta parameters from the final model run
THETA = np.array([
    1.3507, 1.073, -1.226, 2.5141, 2.9946, -0.4083, -0.1082, -0.6809,
    0.3465, -0.6804, -2.9974, -2.9991, -1.2658, -0.368, 0.7567, -1.9627,
    -1.9124, 2.9983, 0.5704, 0.0, 0.8669, 1.2546, 5.0
])

## Helper Functions

In [53]:
def softmax(x: np.ndarray) -> np.ndarray:
    """Converts a vector of scores into a probability distribution."""
    ex = np.exp(x - x.max())
    return ex / ex.sum()

def allocate_sequential(raw: np.ndarray) -> np.ndarray:
    """Strict left-to-right 'drain' allocator."""
    n = len(raw)
    floor = n * MIN_W
    rem_budget, rem_raw = 1 - floor, raw.sum()
    w = np.empty_like(raw)
    for i, x in enumerate(raw):
        share = 0 if rem_raw == 0 else (x / rem_raw) * rem_budget
        w[i] = MIN_W + share
        rem_budget -= share
        rem_raw -= x
    return w / w.sum()

def beta_mix_pdf(n: int, mix: np.ndarray) -> np.ndarray:
    """Generates a smooth baseline curve from a mixture of Beta distributions."""
    t = np.linspace(0.5 / n, 1 - 0.5 / n, n)
    return (mix[0] * beta.pdf(t, *PROTOS[0]) +
            mix[1] * beta.pdf(t, *PROTOS[1]) +
            mix[2] * beta.pdf(t, *PROTOS[2])) / n

def zscore(s: pd.Series, win: int) -> pd.Series:
    """Calculates the rolling z-score for a given series and window."""
    m = s.rolling(win, win // 2).mean()
    sd = s.rolling(win, win // 2).std()
    return ((s - m) / sd).fillna(0)

## Main Strategy Functions

In [54]:
def construct_features(df: pd.DataFrame) -> pd.DataFrame:
    """
    Calculates features on the full historical data ONCE and returns the
    relevant slice. This robustly handles calls from different boilerplate
    functions and avoids boundary errors that cause leakage.
    """
    global _FULL_FEATURES

    # Only compute the full feature set if it hasn't been done yet.
    if _FULL_FEATURES is None:
        try:
            # Assumes 'btc_data.csv' is in the same directory.
            full_price_df = pd.read_csv("btc_data.csv", index_col=0, parse_dates=True)
        except FileNotFoundError:
            raise FileNotFoundError("btc_data.csv not found. Please ensure it's in the correct directory.")
        
        # Select only the PriceUSD column before doing anything else.
        full_price_df = full_price_df[['PriceUSD']]
        
        # We need history from before the backtest start date for rolling windows.
        full_price_df = full_price_df.loc["2020-02-18":]
        # full_price_df = full_price_df.loc["2010-07-18":] 

        log_prices = np.log(full_price_df['PriceUSD'])
        
        z_all = pd.DataFrame({f"z{w}": zscore(log_prices, w).clip(-4, 4) for w in WINS}, index=log_prices.index)
        
        # The strategy uses lagged features to avoid look-ahead bias.
        z_lag = z_all.shift(1).fillna(0)
        
        _FULL_FEATURES = full_price_df.join(z_lag)

    # Return the portion of the pre-computed features that matches the input index.
    return _FULL_FEATURES.reindex(df.index).fillna(0)


def compute_weights(df_window: pd.DataFrame) -> pd.Series:
    """
    Given a slice of data, computes portfolio weights that sum to 1.
    Minimal change: use MSTR buy dates as the only signal.
      - If the window has ≥1 signal day: everyone gets MIN_WEIGHT,
        leftover is split equally across signal days.
      - If 0 signal days: daily uniform.
    Always returns weights >= MIN_WEIGHT and sum = 1.
    """
    if df_window.empty:
        return pd.Series(dtype=float)

    # Keep feature pipeline untouched
    feat_slice = construct_features(df_window)

    idx = pd.DatetimeIndex(feat_slice.index).normalize()
    n = len(idx)

    mstr_dates = set(
        pd.to_datetime(
            pd.read_csv("mstr_buy_signal.csv")["purchase_date"],
            errors="coerce"
        ).dropna().dt.normalize()
    )

    # ---- signal days in this window ----
    signal_mask = idx.isin(mstr_dates)
    signal_days = np.where(signal_mask)[0]
    n_signal = len(signal_days)

    # ---- no signal → uniform ----
    if n_signal == 0:
        w = np.full(n, 1.0 / n, dtype=float)
        w = np.clip(w, MIN_WEIGHT, None)
        w = w / w.sum()
        return pd.Series(w, index=feat_slice.index)

    # ---- has signal → MIN_WEIGHT + leftover equally on signals ----
    w = np.full(n, MIN_WEIGHT, dtype=float)
    base = n * MIN_WEIGHT
    leftover = max(0.0, 1.0 - base)
    add_per_signal = leftover / n_signal if leftover > 0 else 0.0
    w[signal_days] += add_per_signal

    # numerical safety
    w = np.clip(w, MIN_WEIGHT, None)

    return pd.Series(w, index=feat_slice.index)


# Run Strategy

In [55]:
def _make_window_label(window_start: pd.Timestamp, window_end: pd.Timestamp) -> str:
    """
    Format "YYYY-MM-DD → YYYY-MM-DD" for a rolling window.
    """
    start_str = pd.to_datetime(window_start).strftime("%Y-%m-%d")
    end_str   = pd.to_datetime(window_end).strftime("%Y-%m-%d")
    return f"{start_str} → {end_str}"

In [56]:
def compute_cycle_spd(
    dataframe: pd.DataFrame,
    strategy_function
) -> pd.DataFrame:
    """
    Compute sats‐per‐dollar (SPD) stats over rolling windows.

    - Uses full‐history features (no look‐ahead).
    - Window length = INVESTMENT_WINDOW months.
    - Step every PURCHASE_FREQ.
    - Returns a DataFrame indexed by window label, with:
        min_sats_per_dollar, max_sats_per_dollar,
        uniform_sats_per_dollar, dynamic_sats_per_dollar,
        uniform_percentile, dynamic_percentile, excess_percentile.
    """
    # 1) Precompute full-history features & restrict to backtest
    full_feat = construct_features(dataframe).loc[BACKTEST_START:BACKTEST_END]

    # 2) Window parameters
    window_offset  = pd.DateOffset(months=INVESTMENT_WINDOW)
    step_freq      = PURCHASE_FREQ_TO_OFFSET[PURCHASE_FREQ]

    results = []
    for window_start in pd.date_range(
        start=pd.to_datetime(BACKTEST_START),
        end=pd.to_datetime(BACKTEST_END) - window_offset,
        freq=step_freq
    ):
        window_end  = window_start + window_offset
        feat_slice  = full_feat.loc[window_start:window_end]
        price_slice = dataframe["PriceUSD"].loc[window_start:window_end]

        if price_slice.empty:
            continue

        label       = _make_window_label(window_start, window_end)
        inv_price   = (1.0 / price_slice) * 1e8  # sats per dollar

        # Compute weights on this slice
        weight_slice = strategy_function(feat_slice)

        # Uniform vs. dynamic SPD
        uniform_spd = inv_price.mean()
        dynamic_spd = (weight_slice * inv_price).sum()

        # Min/max for percentile scaling
        min_spd = inv_price.min()   # low price → high SPD
        max_spd = inv_price.max()   # high price → low SPD
        span    = max_spd - min_spd

        uniform_pct = (uniform_spd - min_spd) / span * 100
        dynamic_pct = (dynamic_spd - min_spd) / span * 100

        results.append({
            "window":                   label,
            "min_sats_per_dollar":      min_spd,
            "max_sats_per_dollar":      max_spd,
            "uniform_sats_per_dollar":  uniform_spd,
            "dynamic_sats_per_dollar":  dynamic_spd,
            "uniform_percentile":       uniform_pct,
            "dynamic_percentile":       dynamic_pct,
            "excess_percentile":        dynamic_pct - uniform_pct,
        })

    return pd.DataFrame(results).set_index("window")

# Backtest

In [57]:
def backtest_dynamic_dca(
    dataframe: pd.DataFrame,
    strategy_function,
    *,
    strategy_label: str = "strategy"
) -> pd.DataFrame:
    """
    1) Runs compute_cycle_spd(...)
    2) Prints aggregated min/max/mean/median of dynamic SPD
    3) Prints aggregated SPD percentiles
    4) Computes & prints exponentially-decayed average SPD and percentile
    5) Returns the full SPD table.

    Exponential decay:
      • decay_rate ∈ (0,1): lower → faster decay
      • most recent window has highest weight
      • weights normalized to sum to 1
    """
    # --- run the rolling-window SPD backtest
    spd_table   = compute_cycle_spd(dataframe, strategy_function)
    dynamic_spd = spd_table["dynamic_sats_per_dollar"]
    dynamic_pct = spd_table["dynamic_percentile"]

    # --- print standard aggregated metrics
    print(f"\nAggregated Metrics for {strategy_label}:")
    print("Dynamic Sats-per-Dollar:")
    for stat in ("min", "max", "mean", "median"):
        val = getattr(dynamic_spd, stat)()
        print(f"  {stat}: {val:.2f}")

    print("\nDynamic SPD Percentiles:")
    for stat in ("min", "max", "mean", "median"):
        val = getattr(dynamic_pct, stat)()
        print(f"  {stat}: {val:.2f}%")

    # --- exponential decay weighting
    decay_rate = 0.9
    N = len(dynamic_spd)
    # weight for window i (0 = oldest, N-1 = newest)
    raw_weights = np.array([decay_rate ** (N - 1 - i) for i in range(N)])
    exp_weights = raw_weights / raw_weights.sum()

    # --- compute decayed averages
    exp_avg_spd = (dynamic_spd.values * exp_weights).sum()
    exp_avg_pct = (dynamic_pct.values * exp_weights).sum()

    # --- print decayed metrics
    print(f"\nExponential-Decay Average SPD: {exp_avg_spd:.2f}")
    print(f"Exponential-Decay Average SPD Percentile: {exp_avg_pct:.2f}%")

    return spd_table

In [58]:
def check_strategy_submission_ready(
    dataframe: pd.DataFrame,
    strategy_function
) -> None:
    """
    Sanity-check that `strategy_function`:
      1. Uses no future data (forward-leakage test).
      2. Produces weights ≥ MIN_WEIGHT.
      3. Sums to 1.0 in each rolling window.
      4. Outperforms uniform DCA in at least 50% of rolling windows.
    """
    passed = True

    # ──────────────────────────────────────────────────────────
    # 1) Forward-leakage test
    # ──────────────────────────────────────────────────────────
    backtest_df  = dataframe.loc[BACKTEST_START:BACKTEST_END]
    full_weights = strategy_function(dataframe) \
                       .reindex(backtest_df.index) \
                       .fillna(0.0)

    step_dates = max(len(backtest_df) // 50, 1)
    probe_dates = backtest_df.index[::step_dates]

    for probe in probe_dates:
        masked = dataframe.copy()
        masked.loc[masked.index > probe, :] = np.nan

        masked_wt = strategy_function(masked) \
                        .reindex(full_weights.index) \
                        .fillna(0.0)

        if not np.isclose(masked_wt.loc[probe],
                          full_weights.loc[probe],
                          rtol=1e-9, atol=1e-12):
            delta = abs(masked_wt.loc[probe] - full_weights.loc[probe])
            print(f"[{probe.date()}] ❌ Forward-leakage detected (Δ={delta:.2e})")
            passed = False
            break

    # ──────────────────────────────────────────────────────────
    # 2) Weight checks per rolling window
    # ──────────────────────────────────────────────────────────
    window_offset = pd.DateOffset(months=INVESTMENT_WINDOW)
    step_freq     = PURCHASE_FREQ_TO_OFFSET[PURCHASE_FREQ]

    for window_start in pd.date_range(
        start=pd.to_datetime(BACKTEST_START),
        end=pd.to_datetime(BACKTEST_END) - window_offset,
        freq=step_freq
    ):
        window_end = window_start + window_offset
        label      = _make_window_label(window_start, window_end)

        w_slice = strategy_function(dataframe.loc[window_start:window_end])

        if (w_slice <= 0).any():
            print(f"[{label}] ❌ Non-positive weights detected.")
            passed = False

        if (w_slice < MIN_WEIGHT).any():
            print(f"[{label}] ❌ Weight below MIN_WEIGHT = {MIN_WEIGHT}.")
            passed = False

        total = w_slice.sum()
        if not np.isclose(total, 1.0, rtol=1e-5, atol=1e-8):
            print(f"[{label}] ❌ Sum-to-1 check failed: {total:.4f}")
            passed = False

    # ──────────────────────────────────────────────────────────
    # 3) Performance vs. Uniform DCA (RELAXED)
    # ──────────────────────────────────────────────────────────
    spd_table = compute_cycle_spd(dataframe, strategy_function)

    underperf_records = []
    for label, row in spd_table.iterrows():
        dp, up = row["dynamic_percentile"], row["uniform_percentile"]
        if dp < up:
            underperf_records.append({
                "Window": label,
                "Dynamic Percentile": dp,
                "Uniform Percentile": up,
                "Delta": dp - up
            })

    total = len(spd_table)
    failed = len(underperf_records)
    pass_ratio = (total - failed) / total

    if underperf_records:
        df_underperf = pd.DataFrame(underperf_records)
        print("\n⚠️ Windows where strategy underperformed Uniform DCA:")
        display(df_underperf)

    print(f"\nSummary: Your strategy underperformed uniform DCA in {failed} out of {total} windows "
          f"({100 * pass_ratio:.2f}% win rate)")

    if pass_ratio >= 0.5:
        print("✅ Strategy meets performance requirement (≥ 50% win rate vs. uniform DCA).")
    else:
        print("❌ Strategy failed performance requirement (< 50% win rate vs. uniform DCA).")
        passed = False

    # ──────────────────────────────────────────────────────────
    # Final verdict
    # ──────────────────────────────────────────────────────────
    if passed:
        print("\n✅ Strategy is ready for submission.")
    else:
        print("\n⚠️ Please address the above issues before submitting.")

# Run main workflow

In [59]:
btc_df = load_data()
validate_price_data(btc_df)
btc_df = btc_df.loc[BACKTEST_START:BACKTEST_END]

In [60]:
# Rolling-window SPD backtest:
df_spd = backtest_dynamic_dca(
    btc_df,
    compute_weights,
    strategy_label="Dynamic DCA"
)


Aggregated Metrics for Dynamic DCA:
Dynamic Sats-per-Dollar:
  min: 985.96
  max: 4710.91
  mean: 2627.27
  median: 2416.72

Dynamic SPD Percentiles:
  min: 17.60%
  max: 59.63%
  mean: 33.99%
  median: 32.92%

Exponential-Decay Average SPD: 987.99
Exponential-Decay Average SPD Percentile: 28.71%


In [61]:
# Sanity checks (each window inside):
check_strategy_submission_ready(btc_df, compute_weights)


⚠️ Windows where strategy underperformed Uniform DCA:


Unnamed: 0,Window,Dynamic Percentile,Uniform Percentile,Delta
0,2021-01-01 → 2022-01-01,34.854574,37.703488,-2.848915
1,2021-01-02 → 2022-01-02,35.672560,38.401232,-2.728672
2,2021-01-03 → 2022-01-03,35.672044,38.260164,-2.588120
3,2021-01-04 → 2022-01-04,35.671604,38.139957,-2.468353
4,2021-01-05 → 2022-01-05,35.671153,38.016756,-2.345602
...,...,...,...,...
1152,2024-11-03 → 2025-11-03,27.415506,28.902610,-1.487105
1153,2024-11-04 → 2025-11-04,27.414811,28.712959,-1.298148
1154,2024-11-05 → 2025-11-05,29.020276,30.175202,-1.154927
1155,2024-11-06 → 2025-11-06,35.549011,36.728025,-1.179014



Summary: Your strategy underperformed uniform DCA in 1157 out of 1407 windows (17.77% win rate)
❌ Strategy failed performance requirement (< 50% win rate vs. uniform DCA).

⚠️ Please address the above issues before submitting.


In [62]:
# win_rate = 99.79
# exp_decay_percentile = 68.77

# score = 0.5 * win_rate + 0.5 * exp_decay_percentile
# print(f"Final Model Score (50/50 weighting): {score:.2f}%")