In [1]:
!pip install pandas
!pip install pyathena

!pip install yfinance
!pip uninstall --yes pypdf
!pip install --upgrade fpdf2
!pip install matplotlib

!pip install --upgrade numexpr



**Define a class to manage the technical indicators**

In [2]:
from dataclasses import dataclass


@dataclass(frozen=True)
class TechnicalFactors:
    # EMA crossover lengths
    ema_short: int = 12
    ema_long: int = 26

    # MACD periods (fast, slow, signal)
    macd_fast: int = 12
    macd_slow: int = 26
    macd_signal: int = 9

    # RSI lookback and decision threshold
    rsi_period: int = 14
    rsi_threshold: float = 50.0

    # ATR lookback and how wide your strike buffer is
    atr_period: int = 14
    volatility_multiplier: float = 0.5


**Define a function to parse the expiration date**

In [3]:
from datetime import datetime, date

def parse_expiration(
    contract_date: date,
    exp_time_str: str,
) -> datetime:
    """
    Given:
      - contract_date: a datetime.date (e.g. 2025-07-18)
      - exp_time_str : a string like "07/17/2025 11:00 pm"
    Return a datetime for the full expiration.
    """
    # Parse the MM/DD/YYYY hh:mm am/pm pattern
    return datetime.strptime(exp_time_str, "%m/%d/%Y %I:%M %p")


**Define a function to get the historical data**

In [4]:
import io
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any

import boto3
import pandas as pd

def load_recent_history(
    s3_client,
    bucket: str,
    prefix: str,
    days: int,
) -> pd.DataFrame:
    """
    Load the last `days` of tradingResults CSVs from S3 into one DataFrame.
    Assumes each key under `prefix` is named <YYYYMMDD>_tradingResults.csv.
    Picks up Date, Name, Exp Time, Strike Price, In the Money, Ticker.
    """
    cutoff = datetime.utcnow().date() - timedelta(days=days)
    paginator = s3_client.get_paginator("list_objects_v2")
    dfs = []

    # Define all 7 raw columns, then pick a subset
    all_cols = [
        "date",        # Date
        "name",        # Name
        "exp_time",    # Exp Time
        "_exp_value",  # Exp Value (not used)
        "strike",      # Strike Price
        "in_the_money",# In the Money
        "ticker",      # Ticker
    ]

    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        for obj in page.get("Contents", []):
            key = obj["Key"]
            if key.endswith("/"):
                continue

            # Derive partition date from filename
            file_dt = key.split("/")[-1].split("_", 1)[0]
            dt = datetime.strptime(file_dt, "%Y%m%d").date()
            if dt < cutoff:
                continue

            body = s3_client.get_object(Bucket=bucket, Key=key)["Body"].read()
            df = pd.read_csv(
                io.BytesIO(body),
                skiprows=1,                    # skip the header row
                names=all_cols,
                usecols=["date","name","exp_time","strike","in_the_money","ticker"],
                dtype={"in_the_money": "Int64"},
                na_filter=False,
            )
            # Parse the Date column into a date object
            df["dt"] = pd.to_datetime(df["date"], format="%d-%b-%y").dt.date
            dfs.append(df)

    if not dfs:
        return pd.DataFrame(columns=["date","name","exp_time","strike","in_the_money","ticker","dt"])

    return pd.concat(dfs, ignore_index=True)

def write_to_s3(
    df: pd.DataFrame,
    bucket: str,
    key: str,
) -> pd.DataFrame:
    """
    Write DataFrame to S3 (CSV) and return it, so it
    can stay inside a .pipe() chain.
    """
    path = f"s3://{bucket}/{key}"
    df.to_csv(path, index=False)
    return df

**Define a function to load the pricing**

In [5]:
import pandas as pd
import yfinance as yf
from typing import Union
from datetime import datetime

DateLike = Union[str, datetime]

def fetch_price(
    ticker: str,
    start: DateLike,
    end: DateLike,
    interval: str
) -> pd.DataFrame:
    """
    Download OHLCV for `ticker` between `start` and `end` at `interval`.
    
    Parameters
    ----------
    ticker : str
        e.g. "AAPL"
    start : str or datetime
        Inclusive start date, e.g. "2025-01-01"
    end : str or datetime
        Exclusive end date, e.g. "2025-07-14"
    interval : str
        e.g. "1d", "1h", "5m"
    
    Returns
    -------
    pd.DataFrame
        Columns: [Date, Open, High, Low, Close, Adj Close, Volume, ticker]
    """
    df = yf.download(
        tickers=ticker,
        start=start,
        end=end,
        interval=interval,
        auto_adjust=True,
        progress=False
    )
    # Drop multi‐level column if present (yfinance sometimes nests tickers)
    df.columns = df.columns.get_level_values(0)  
    df = df.reset_index()
    df.columns.name = None
    df["ticker"] = ticker
    return df

**Define a function to compute the technical indicators**
1. Compute the MACD
2. Compute the RSI
3. Compute the ATR

In [6]:
import pandas as pd

def compute_macd(df: pd.DataFrame, factors: TechnicalFactors) -> pd.DataFrame:
    """
    Compute EMA{short}, EMA{long}, MACD, Signal, and MACD_hist
    using the spans defined in factors.
    """
    price = df["Close"]
    ema_short = price.ewm(span=factors.ema_short, adjust=False).mean()
    ema_long  = price.ewm(span=factors.ema_long,  adjust=False).mean()
    macd      = ema_short - ema_long
    signal    = macd.ewm(span=factors.macd_signal, adjust=False).mean()
    hist      = macd - signal

    return df.assign(
        **{
            f"EMA{factors.ema_short}": ema_short,
            f"EMA{factors.ema_long}":  ema_long,
            "MACD":       macd,
            "Signal":     signal,
            "MACD_hist":  hist,
        }
    )

def compute_rsi(df: pd.DataFrame, factors: TechnicalFactors) -> pd.DataFrame:
    """Compute RSI over factors.rsi_period."""
    delta = df["Close"].diff()
    gain  = delta.clip(lower=0)
    loss  = -delta.clip(upper=0)
    avg_gain = gain.ewm(com=factors.rsi_period - 1, adjust=False).mean()
    avg_loss = loss.ewm(com=factors.rsi_period - 1, adjust=False).mean()
    rs = avg_gain / avg_loss
    rsi = 100 - (100 / (1 + rs))
    return df.assign(RSI=rsi)


def compute_atr(df: pd.DataFrame, factors: TechnicalFactors) -> pd.DataFrame:
    """Compute ATR over factors.atr_period."""
    high_low = df["High"] - df["Low"]
    high_pc  = (df["High"] - df["Close"].shift(1)).abs()
    low_pc   = (df["Low"]  - df["Close"].shift(1)).abs()
    tr = pd.concat([high_low, high_pc, low_pc], axis=1).max(axis=1)
    atr = tr.rolling(window=factors.atr_period).mean()
    return df.assign(ATR=atr)


*Define a function to determine the trading signal**
1. Determine the Trend
2. Check the Momentum
3. Determine the Trigger
4. Check the Volatility
5. Generate the signal

In [7]:
import pandas as pd

def determine_trend(row: pd.Series, factors: TechnicalFactors):
    short = row[f"EMA{factors.ema_short}"]
    long_ = row[f"EMA{factors.ema_long}"]
    diff  = short - long_

    if (short > long_) and (row.Close > short) and (row.Close > long_):
        return "up", diff
    if (short < long_) and (row.Close < short) and (row.Close < long_):
        return "down", diff
    return "sideways", diff


def momentum_check(row: pd.Series, trend: str, factors: TechnicalFactors):
    rsi = row.RSI
    macd_diff = row.MACD - row.Signal
    thr = factors.rsi_threshold

    if trend == "up":
        ok = (rsi > thr) and (macd_diff > 0)
    elif trend == "down":
        ok = (rsi < thr) and (macd_diff < 0)
    else:
        ok = False
    return ok, rsi

def volatility_check(row: pd.Series, strike_diff: float, factors: TechnicalFactors):
    ok = strike_diff <= factors.volatility_multiplier * row.ATR
    return ok, strike_diff   

def signal_trigger(row: pd.Series, trend: str, factors: TechnicalFactors):
    macd_diff = row.MACD - row.Signal
    thr = factors.rsi_threshold

    if trend == "up":
        ok = (macd_diff > 0) and (row.RSI > thr)
    elif trend == "down":
        ok = (macd_diff < 0) and (row.RSI < thr)
    else:
        ok = False
    return ok, macd_diff

def generate_trade_signal(
    df: pd.DataFrame,
    asset_symbol: str,
    strike_price: float,
    factors: TechnicalFactors,
    expiry: str = "EOD",
) -> pd.DataFrame:
    """
    Append:
      - signal (1=Buy, 0=Sell, <NA>=No trade)
      - recommendation (text)
      - TrendValue, MomentumValue, SignalValue, VolatilityValue
    """
    last = df.iloc[-1]
    trend, trend_val       = determine_trend(last, factors)
    mom_ok, mom_val        = momentum_check(last, trend, factors)
    sig_ok, sig_val        = signal_trigger(last, trend, factors)
    strike_diff            = abs(last.Close - strike_price)
    vol_ok, vol_val        = volatility_check(last, strike_diff, factors)

    # Decide Buy / Sell / No trade
    if trend == "up" and mom_ok and sig_ok and vol_ok:
        signal_val = 1
        rec_text   = f"Buy {asset_symbol} @ {strike_price:.4f} ({expiry})"
    elif trend == "down" and mom_ok and sig_ok and vol_ok:
        signal_val = 0
        rec_text   = f"Sell {asset_symbol} @ {strike_price:.4f} ({expiry})"
    else:
        signal_val = pd.NA
        rec_text   = "No trade"

    out = df.copy()
    out["signal"]          = signal_val
    out["recommendation"]  = rec_text
    out["TrendValue"]      = trend_val
    out["MomentumValue"]   = mom_val
    out["SignalValue"]     = sig_val
    out["VolatilityValue"] = vol_val
    return out


**Define a function to run the backtesting pipeline**
1. Get historical information
2. Compute historical recommendations

In [8]:
import pandas as pd
import io

from tqdm.auto import tqdm
from contextlib import redirect_stdout, redirect_stderr
from typing import Union, Tuple, Optional, List
from datetime import date, datetime, timedelta

DateLike = Union[str, date]

def get_date_range(
    days_back: int,
    start: Optional[DateLike] = None,
    end: Optional[DateLike] = None,
) -> Tuple[str, str]:
    """
    Return (start_date, end_date) as ISO strings.

    If `start` is given, use it (parsed as ISO/string). Otherwise
    compute start = end - days_back.
    If `end` is given, use it; otherwise end = today.

    Parameters
    ----------
    days_back : int
        Number of days to go back when `start` is not provided.
    start : str or date, optional
        The explicit start date (inclusive). ISO format 'YYYY-MM-DD' or date.
    end : str or date, optional
        The explicit end date (inclusive). ISO format or date.

    Returns
    -------
    (start_iso, end_iso) : Tuple[str, str]
    """
    # Resolve end_date
    if end is None:
        end_date = date.today()
    else:
        end_date = (
            datetime.fromisoformat(end).date()
            if isinstance(end, str)
            else end
        )
    # Resolve start_date
    if start is not None:
        start_date = (
            datetime.fromisoformat(start).date()
            if isinstance(start, str)
            else start
        )
    else:
        start_date = end_date - timedelta(days=days_back)

    return start_date.isoformat(), end_date.isoformat()

def run_backtest_pipeline(
    client: Any,
    s3_bucket: str,
    s3_prefix: str,
    tickers: List[str],
    factors: TechnicalFactors,      
    days_back: int = 60,
    price_interval: str = "1h",
) -> pd.DataFrame:

    """
    For each historical contract in S3, fetch price data up to 24h before
    its expiration, compute indicators and a trade signal, then compare
    the signal to the actual in-the-money flag. Returns a DataFrame with:
      ticker, contract_dt, exp_dt, fetch_start, fetch_end,
      strike_price, signal, in_the_money, actual
    """
    # 1) Load your historical contracts
    historical = load_recent_history(client, s3_bucket, s3_prefix, days_back)
    results = []

    for _, row in tqdm(
        historical.iterrows(),
        total=len(historical),
        desc="Backtesting contracts"
    ):
        ticker       = row["ticker"]
        strike_price = row["strike"]
        contract_dt  = row["dt"]
        exp_time_str = row["exp_time"]

        # 2) Build expiration datetime & 24h‐prior cutoff
        exp_dt     = parse_expiration(contract_dt, exp_time_str)
        fetch_end  = exp_dt - timedelta(hours=24)

        # 3) Clamp lookback for sub-daily to max 60 days
        lookback_days = days_back if price_interval == "1d" else min(days_back, 60)
        fetch_start   = fetch_end - timedelta(days=lookback_days)

        if fetch_start >= fetch_end:
            print(f"⚠️  Skipping {ticker} @ {contract_dt}: "
                  f"fetch_start {fetch_start} ≥ fetch_end {fetch_end}")
            continue

        # 4) Download price data (skip on any error, fully silencing yfinance)
        try:
            with redirect_stdout(io.StringIO()), redirect_stderr(io.StringIO()):
                price_df = fetch_price(
                    ticker,
                    start=fetch_start,   # datetime.date or datetime.datetime
                    end=fetch_end,       # datetime.datetime
                    interval=price_interval,
                )
        except Exception as e:
            continue

        if price_df.empty:
            continue

        # 5) Compute indicators
        ind_df = (
            price_df
            .pipe(compute_macd, factors)
            .pipe(compute_rsi, factors)
            .pipe(compute_atr, factors)
        )

        if ind_df.empty:
            print(f"⚠️  {ticker} @ {contract_dt}: insufficient data for indicators → skipping")
            continue

        # 6) Generate trade signal
        sig_df = ind_df.pipe(
            generate_trade_signal,
            asset_symbol=ticker,
            strike_price=strike_price,
            factors=factors,
            expiry="24h-prior",
        )
        
        # Extract the raw signal (might be pd.NA)
        raw_signal = sig_df["signal"].iloc[-1]
        
        # If it's NA, that's “No trade” → skip
        if pd.isna(raw_signal):
            continue
        
        # Otherwise safe to cast
        signal = int(raw_signal)
        
        # 7) Compute actual outcome
        actual = row["in_the_money"] if signal == 1 else 1 - row["in_the_money"]

        results.append({
            "ticker":       ticker,
            "contract_dt":  contract_dt,
            "exp_dt":       exp_dt,
            "fetch_start":  fetch_start,
            "fetch_end":    fetch_end,
            "strike_price": strike_price,
            "signal":       signal,
            "in_the_money": row["in_the_money"],
            "actual":       actual,
        })

    # 8) Build summary DataFrame + print accuracy
    out   = pd.DataFrame(results)
    total = len(out)
    wins  = int(out["actual"].sum()) if total else 0
    accuracy = wins / total if total else 0.0
    
    print(f"✅ Backtest: {wins}/{total} correct → {accuracy:.1%}")

    return out

**Define a function to report on the results of the backtesting**
1. Overall wins/ total/ accuracy
2. Per Ticker wins/ total/ accuracy

In [9]:
import pandas as pd
from tabulate import tabulate

def report_backtest_results(backtest_df: pd.DataFrame) -> None:
    """
    Prints two tables:
      1) Summary by Category (Overall/Buy/Sell)
      2) Per‐Ticker with detailed counts & percentages

    This function will automatically detect 'signal' (any case) and 'actual'
    columns by lowercasing them internally, so you won’t get a KeyError.
    """
    # — 1) Normalize column casing —
    col_map = {}
    for c in backtest_df.columns:
        lc = c.lower()
        if lc == "signal":
            col_map[c] = "signal"
        if lc == "actual":
            col_map[c] = "actual"
    df = backtest_df.rename(columns=col_map).copy()

    # now fail fast if still missing
    missing = [x for x in ("signal","actual") if x not in df.columns]
    if missing:
        raise KeyError(f"report_backtest_results needs columns {missing}; got {backtest_df.columns.tolist()}")

    # — 2) Drop no‐trade rows & ensure integer signals —
    df = df.dropna(subset=["signal"])
    df["signal"] = df["signal"].astype(int)

    # — helper to compute rec/win/pct —
    def stats(sub: pd.DataFrame):
        rec = len(sub)
        win = int(sub["actual"].sum())
        pct = win / rec if rec else pd.NA
        return rec, win, pct

    # — 3) Build & print Summary table —
    overall_rec, overall_win, overall_pct = stats(df)
    buy_rec,     buy_win,     buy_pct     = stats(df[df.signal == 1])
    sell_rec,    sell_win,    sell_pct    = stats(df[df.signal == 0])

    summary_df = pd.DataFrame([
        {"Category":"Overall", "Recommended": overall_rec, "Correct": overall_win, "Accuracy": overall_pct},
        {"Category":"Buy",     "Recommended":    buy_rec, "Correct":    buy_win,    "Accuracy": buy_pct},
        {"Category":"Sell",    "Recommended":   sell_rec, "Correct":   sell_win,   "Accuracy": sell_pct},
    ])

    # human‐friendly rename + percentage formatting
    summary_print = summary_df.rename(columns={
        "Recommended": "# Recommended",
        "Correct":     "# Correct",
        "Accuracy":    "% Correct",
    })
    summary_print["% Correct"] = summary_print["% Correct"].map(
        lambda x: f"{x:.1%}" if pd.notna(x) else pd.NA
    )

    print("\n=== Summary by Category ===")
    print(tabulate(summary_print, headers="keys", tablefmt="fancy_grid", showindex=False))


    # — 4) Build & print Per‐Ticker table —
    rows = []
    for ticker, grp in df.groupby("ticker"):
        rec_buy,  win_buy,  pct_buy  = stats(grp[grp.signal == 1])
        rec_sell, win_sell, pct_sell = stats(grp[grp.signal == 0])
        rec_tot,  win_tot,  pct_tot  = stats(grp)

        rows.append({
            "Ticker":     ticker,
            "Rec_Total":  rec_tot,  "Corr_Total": win_tot,  "Pct_Total": pct_tot,
            "Rec_Buy":    rec_buy,  "Corr_Buy":  win_buy,  "Pct_Buy":   pct_buy,
            "Rec_Sell":   rec_sell, "Corr_Sell": win_sell, "Pct_Sell":  pct_sell,
        })

    per_ticker_df = (
        pd.DataFrame(rows)
          .sort_values("Pct_Total", ascending=False, na_position="last")
          .reset_index(drop=True)
    )

    per_ticker_print = per_ticker_df.rename(columns={
        "Rec_Total":  "Total Rec",
        "Corr_Total": "Total Corr",
        "Pct_Total":  "% Total",
        "Rec_Buy":    "Buy Rec",
        "Corr_Buy":   "Buy Corr",
        "Pct_Buy":    "% Buy",
        "Rec_Sell":   "Sell Rec",
        "Corr_Sell":  "Sell Corr",
        "Pct_Sell":   "% Sell",
    })
    for pct_col in ["% Total", "% Buy", "% Sell"]:
        per_ticker_print[pct_col] = per_ticker_print[pct_col].map(
            lambda x: f"{x:.1%}" if pd.notna(x) else pd.NA
        )

    print("\n=== By Ticker (Detailed) ===")
    print(tabulate(per_ticker_print, headers="keys", tablefmt="fancy_grid", showindex=False))



**Define a function to create and upload a report**

**Define a function to summarize all the backtests**

In [10]:
import pandas as pd
from typing import Dict

def summarize_variations(
    variation_results: Dict[str, pd.DataFrame]
) -> pd.DataFrame:
    """
    Given a dict mapping variation names to backtest DataFrames
    (each with an 'actual' column of 1 for correct, 0 for incorrect),
    return a summary DataFrame indexed by variation with columns:
      - Recommended : total trades recommended
      - Correct     : number of correct trades
      - Accuracy    : Correct / Recommended (NaN if Recommended == 0)
    """
    rows = []
    for name, df in variation_results.items():
        rec   = len(df)
        corr  = int(df["actual"].sum()) if rec else 0
        acc   = corr / rec if rec else pd.NA
        rows.append({
            "Variation":  name,
            "Recommended": rec,
            "Correct":     corr,
            "Accuracy":    acc,
        })

    summary = (
        pd.DataFrame(rows)
          .set_index("Variation")
          .sort_values("Accuracy", ascending=False, na_position="last")
    )
    # format Accuracy as percentage, leave NaN for zero-recommendation
    summary["Accuracy"] = summary["Accuracy"].map(
        lambda x: f"{x:.1%}" if pd.notna(x) else pd.NA
    )
    return summary


**Run backtesting pipeine**

In [12]:
from tabulate import tabulate
from IPython.display import display

TICKERS = {
    'CL=F',
    'ES=F',
    'GC=F',
    'NQ=F',
    'RTY=F',
    'YM=F',
    'NG=F',
    'AUDUSD=X',
    'EURJPY=X',
    'EURUSD=X',
    'GBPJPY=X',
    'GBPUSD=X',
    'USDCAD=X',
    'USDCHF=X',
    'USDJPY=X'
}

factor_variations = {
    "baseline": TechnicalFactors(),
    "fast_emas": TechnicalFactors(ema_short=8, ema_long=20),
    "slow_emas": TechnicalFactors(ema_short=20, ema_long=50),
    # "tight_vol": TechnicalFactors(volatility_multiplier=0.3),
}

from tabulate import tabulate

all_results = {}

for name, factors in factor_variations.items():
    print(f"\n── Running backtest: {name} ──")
    s3_client = boto3.client("s3", region_name="us-east-1")
    df = run_backtest_pipeline(
        client=s3_client,
        s3_bucket="nadex-daily-results",
        s3_prefix="historical",
        tickers=TICKERS,
        factors=factors,              # <— pass the variation
        days_back=20,
        price_interval="1d",
    )
    all_results[name] = df

    print(f"DEBUG — backtest DF for {name}:")
    print("  shape =", df.shape)
    print("  cols  =", df.columns.tolist())
    print(df.head(3))


    # You can reuse your report helper:
    print(f"\nResults for {name}:")
    report_backtest_results(df)

    summary_df = summarize_variations(all_results)
    print(summary_df)



── Running backtest: baseline ──


Backtesting contracts: 0it [00:00, ?it/s]

✅ Backtest: 0/0 correct → 0.0%
DEBUG — backtest DF for baseline:
  shape = (0, 0)
  cols  = []
Empty DataFrame
Columns: []
Index: []

Results for baseline:


KeyError: "report_backtest_results needs columns ['signal', 'actual']; got []"