In [None]:
#!/usr/bin/env python3
import numpy as np
import matplotlib.pyplot as plt
from pathlib import Path

def load_prices(prices_file="prices.txt"):
    """
    Read a whitespace-delimited text file into a NumPy array.

    The file is expected to hold one row per timestep (oldest first)
    and one column per instrument.
    """
    source = Path(prices_file)
    matrix = np.loadtxt(source, delimiter=None)
    return matrix

def find_pips(prices: np.ndarray, threshold: float):
    """
    Zig-zag pivot finder, 100% causal: uses only prices up to the
    current bar.

    Returns a list of (idx, price) pairs:

    * ``(0, prices[0])`` is always the first entry;
    * ``(i, prices[i])`` for the bar ``i`` at which the first swing is
      confirmed (relative move from the first price >= ``threshold``);
    * for every later reversal, ``(i, extreme)`` where ``i`` is the bar
      at which the reversal was *confirmed* and ``extreme`` is the price
      of the swing extreme being left behind.  That extreme occurred at
      an earlier bar, so the price does not generally equal
      ``prices[i]``;
    * ``(n-1, prices[-1])`` is appended if the last entry is not already
      at the final bar.

    An empty input yields an empty list.
    """
    n = len(prices)
    if n == 0:
        # Nothing to scan; avoid indexing prices[0] on an empty series.
        return []

    pips = [(0, prices[0])]
    last_extreme = prices[0]
    direction = 0                      # +1 up-swing, -1 down-swing, 0 undecided

    for i in range(1, n):
        price = prices[i]

        if direction == 0:
            # Wait for the first move of at least `threshold` from the start.
            move = (price - last_extreme) / last_extreme
            if abs(move) >= threshold:
                direction = 1 if move > 0 else -1
                last_extreme = price
                pips.append((i, price))
            continue

        # Extend the current swing if this bar makes a new extreme.
        if direction > 0:
            if price > last_extreme:
                last_extreme = price
            rev = (last_extreme - price) / last_extreme
        else:
            if price < last_extreme:
                last_extreme = price
            rev = (price - last_extreme) / last_extreme

        # A retracement of at least `threshold` confirms a reversal.
        if rev >= threshold:
            pips.append((i, last_extreme))   # confirmation bar, extreme's price
            direction = -direction
            last_extreme = price

    # Always include the final bar as a pivot.
    if pips[-1][0] != n - 1:
        pips.append((n - 1, prices[-1]))

    return pips

def causal_signal(series: np.ndarray, threshold: float):
    """
    One-pass, fully causal up/down signal:
      +1 = long, -1 = short, 0 = flat until first swing confirms.

    The value at bar ``i`` depends only on ``series[:i+1]``.  Bar 0 is
    always 0.  The first swing is confirmed once the relative move from
    ``series[0]`` reaches ``threshold``; afterwards the direction flips
    whenever price retraces at least ``threshold`` (relative) from the
    running extreme of the current swing.

    Returns an integer array with the same length as ``series``; an
    empty input yields an empty array.
    """
    n = len(series)
    pos = np.zeros(n, dtype=int)
    if n == 0:
        # Avoid indexing series[0] on an empty series.
        return pos

    last_extreme = series[0]
    direction = 0                      # +1 up, -1 down, 0 undecided

    for i in range(1, n):
        price = series[i]

        if direction == 0:
            move = (price - last_extreme) / last_extreme
            if abs(move) >= threshold:
                direction = 1 if move > 0 else -1
                last_extreme = price
        elif direction > 0:
            if price > last_extreme:
                last_extreme = price                       # extend rally
            elif (last_extreme - price) / last_extreme >= threshold:
                direction = -1                             # reversal
                last_extreme = price
        else:
            if price < last_extreme:
                last_extreme = price                       # extend decline
            elif (price - last_extreme) / last_extreme >= threshold:
                direction = 1                              # reversal
                last_extreme = price

        pos[i] = direction

    return pos

if __name__ == "__main__":
    # ───── Load Data ─────
    # Keep the file name in one place so the status message always names
    # the file that was actually read (it previously said "prices.txt").
    prices_file = "prices-2024.txt"
    try:
        prices = load_prices(prices_file)
        print(f"Loaded {prices_file}")
    except FileNotFoundError:
        # Fall back to a reproducible random walk so the demo still runs.
        np.random.seed(0)
        prices = np.cumsum(np.random.randn(1000, 50), axis=0) + 100.0
        print("Using synthetic data (1000×50)")

    # ───── Parameters ─────
    instr     = 15      # instrument to plot
    t1, t2    = 600, 700
    threshold = 0.01   # 1% pivot threshold

    # ───── Slice & Compute ─────
    series = prices[t1:t2+1, instr]
    times  = np.arange(t1, t2+1)

    pos    = causal_signal(series, threshold)
    pivots = find_pips(series, threshold)

    # ───── Shift PIP markers left by one bar (for display only) ─────
    # This does NOT affect the trading logic!
    # Each marker is redrawn on the price line one bar before the pivot's
    # index; the pivot at index 0 has no previous bar and is dropped.
    shifted = []
    for idx, _ in pivots:
        if idx > 0:
            shifted.append((idx-1, series[idx-1]))
    # if the final pivot was at the last bar, make sure it shows up:
    if shifted and shifted[-1][0] != len(series)-1:
        shifted.append((len(series)-1, series[-1]))
    if not shifted:
        shifted = [(0, series[0])]

    shift_idxs, shift_vals = zip(*shifted)

    # ───── Plot ─────
    fig, ax1 = plt.subplots(figsize=(12,5))

    # shade price-underlay: green while long, red while short
    ax1.fill_between(times, series, series.min(),
                     where=pos>0, facecolor='green', alpha=0.2,
                     step='post')
    ax1.fill_between(times, series, series.min(),
                     where=pos<0, facecolor='red',   alpha=0.2,
                     step='post')

    # price line
    ax1.plot(times, series, color='black', linewidth=1,
             label=f"Inst {instr}")

    # shifted pivot markers
    ax1.scatter(times[list(shift_idxs)], shift_vals,
                color='blue', s=50, zorder=3,
                label="PIPs (shifted left 1 bar)")

    ax1.set_xlabel("Timestep")
    ax1.set_ylabel("Price")
    ax1.set_title("Fully Causal PIP-Based Up/Down Indicator")

    # overlay live trading position
    # NOTE(review): linewidth=0.01 makes this line practically invisible
    # while keeping its legend entry — confirm that is intended.
    ax2 = ax1.twinx()
    ax2.step(times, pos, where='post',
             color='gray', linewidth=0.01,
             label="Position (+1 long / -1 short)")
    ax2.set_ylim(-1.2, 1.2)
    ax2.set_ylabel("Position")

    # combine legends from both axes into a single box
    lines1, labels1 = ax1.get_legend_handles_labels()
    lines2, labels2 = ax2.get_legend_handles_labels()
    ax1.legend(lines1 + lines2, labels1 + labels2,
               loc='upper left', fontsize='small')

    fig.tight_layout()
    plt.show()


In [None]:
#!/usr/bin/env python3
"""
gridsearch_pip_threshold.py
───────────────────────────
For each instrument, sweep pivot‐thresholds from 0.005 to 0.500
(step 0.0005) and pick the one that maximises **net PnL** after a
commission of 0.0005 per dollar traded.

Trading model
-------------
• Position = ±POSITION_SIZE shares when the causal PIP direction is
  `up` / `down`; 0 when flat.
• PnL for bar *t* is  
      pos[t-1] · (price[t] − price[t-1])
  (Mark-to-market at next close.)
• Commission on a position change is  
      COMM_RATE · |Δpos[t]| · price[t]

The code prints a per-instrument summary and an overall total.
"""
from pathlib import Path
from typing import Tuple

import numpy as np

# ───────────────────────────────── CONSTANTS ──────────────────────────────
PRICE_FILE      = "prices.txt"          # whitespace-delimited
THRESH_MIN      = 0.005
THRESH_MAX      = 0.500
THRESH_STEP     = 0.0005
POSITION_SIZE   = 10_000                # shares per instrument
COMM_RATE       = 0.0005                # per dollar traded

# ──────────────────────────── helpers ─────────────────────────────────────
def load_prices(fname: str = PRICE_FILE) -> np.ndarray:
    """
    Load the price file and return it as a 2-D array (T × nInst).

    Raises FileNotFoundError when the file does not exist and ValueError
    when the loaded array is not two-dimensional.
    """
    path = Path(fname)
    if path.exists():
        matrix = np.loadtxt(path, delimiter=None)
        # rows are timesteps, columns are instruments
        if matrix.ndim == 2:
            return matrix
        raise ValueError("Expecting 2-D price matrix (T × nInst)")
    raise FileNotFoundError(f"{fname!s} not found")


def causal_direction(series: np.ndarray, thr: float) -> np.ndarray:
    """
    Per-bar swing direction (+1 up, -1 down, 0 undecided) for one
    instrument, computed in a single forward pass.

    The first swing is confirmed when the relative move from the first
    price reaches `thr`; afterwards the direction flips whenever price
    retraces at least `thr` from the running extreme of the swing.
    """
    size = len(series)
    signal = np.zeros(size, dtype=np.int8)

    pivot = series[0]          # running extreme of the current swing
    trend = 0                  # +1 / -1 once confirmed, 0 before

    for t in range(1, size):
        price = series[t]
        rel_move = (price - pivot) / pivot

        if trend == 0:
            if abs(rel_move) >= thr:
                trend = 1 if rel_move > 0 else -1
                pivot = price
        elif trend == 1:
            if price > pivot:
                pivot = price                         # rally continues
            elif (pivot - price) / pivot >= thr:
                trend = -1                            # flipped to down
                pivot = price
        else:
            if price < pivot:
                pivot = price                         # decline continues
            elif (price - pivot) / pivot >= thr:
                trend = 1                             # flipped to up
                pivot = price

        signal[t] = trend
    return signal


# ────────────────── simulate_pnl (patched) ──────────────────
def simulate_pnl(series: np.ndarray,
                 thr: float,
                 pos_size: int = POSITION_SIZE,
                 comm_rate: float = COMM_RATE
                 ) -> Tuple[float, float, float, int]:
    """
    Back-test one instrument at pivot threshold `thr`.

    Returns (net_pnl, gross_pnl, total_commission, n_trades), where a
    "trade" is any bar on which the held position changes.
    """
    # Direction signal scaled to shares; widened to int32 before scaling.
    shares = causal_direction(series, thr).astype(np.int32) * pos_size

    # Bar-to-bar PnL: the position held at t-1 earns price[t] - price[t-1].
    bar_moves = np.diff(series, prepend=series[0])[1:]
    bar_pnl = shares[:-1].astype(np.float64) * bar_moves

    # Commission is charged on the traded quantity at that bar's price.
    traded = np.diff(shares, prepend=0)
    fees = comm_rate * np.abs(traded).astype(np.float64) * series

    gross = float(bar_pnl.sum())
    fee_total = float(fees.sum())
    trade_count = int((traded != 0).sum())

    return gross - fee_total, gross, fee_total, trade_count



def best_threshold(series: np.ndarray) -> Tuple[float, float, float, float, int]:
    """
    Grid-search thresholds and return:
      best_thr, best_net_pnl, gross_pnl, commission, n_trades

    The grid runs from THRESH_MIN to THRESH_MAX (inclusive) in steps of
    THRESH_STEP.  Each grid point is computed from its index rather than
    by repeatedly adding the step, so rounding error does not accumulate
    along the sweep.  Ties keep the smallest threshold.
    """
    # Whole steps that fit in the range; the epsilon absorbs float error
    # in the division so the end point is not dropped.
    n_steps = int(np.floor((THRESH_MAX - THRESH_MIN) / THRESH_STEP + 1e-9))

    best_thr   = THRESH_MIN
    best_stats = (-np.inf, 0.0, 0.0, 0)   # net, gross, comm, trades

    for k in range(n_steps + 1):
        thr = THRESH_MIN + k * THRESH_STEP
        net, gross, comm, n_tr = simulate_pnl(series, thr)
        if net > best_stats[0]:
            best_thr   = thr
            best_stats = (net, gross, comm, n_tr)
    return (best_thr, *best_stats)


# ─────────────────────────────── main ─────────────────────────────────────
def main() -> None:
    """
    Load the price matrix, grid-search the best threshold for every
    instrument, and print a per-instrument table followed by totals.
    """
    px = load_prices()                    # (T × nInst)
    T, nInst = px.shape
    print(f"Loaded price matrix: {T:,d} timesteps × {nInst} instruments\n")

    rule = "-"*60
    hdr = ("Instr", "BestThr", "NetPnL", "Gross", "Comm", "Trades")
    print(f"{hdr[0]:>5}  {hdr[1]:>7}  {hdr[2]:>12}  {hdr[3]:>12}  "
          f"{hdr[4]:>10}  {hdr[5]:>6}")
    print(rule)

    total_net, total_gross, total_comm = 0.0, 0.0, 0.0
    total_trades = 0

    # Iterating over the transpose yields one instrument column at a time.
    for i, column in enumerate(px.T):
        best_thr, net, gross, comm, n_tr = best_threshold(column)

        print(f"{i:5d}  {best_thr:7.4f}  {net:12,.2f}  {gross:12,.2f}  "
              f"{comm:10,.2f}  {n_tr:6d}")

        total_net    += net
        total_gross  += gross
        total_comm   += comm
        total_trades += n_tr

    print(rule)
    print(f"Total net PnL : {total_net:,.2f}")
    print(f"   Gross PnL  : {total_gross:,.2f}")
    print(f"   Commission : {total_comm:,.2f}")
    print(f"   Trades     : {total_trades:,d}")


# Run the report only when executed as a script.
if __name__ == "__main__":
    main()


In [None]:
#!/usr/bin/env python3
"""
gridsearch_pip_threshold_compare.py
───────────────────────────────────
For each instrument:

1.  Grid-search pivot thresholds from **THRESH_MIN → THRESH_MAX**
   (currently 0.000 → 0.700, step 0.0005) on the **full history**.
2.  Repeat the same search using only the **first 500 bars**.
3.  Report, side-by-side, the optimal threshold, net PnL, and the
   standard deviation (σ) of per-bar **net** PnL for both windows.

Trading assumptions
-------------------
• Position = ±POSITION_SIZE shares when the causal PIP direction is
  `up` / `down`; 0 when flat.
• Net PnL per bar *t* = position[t-1] · (price[t]−price[t-1]) −
  commission on any change executed at bar *t*.
• Commission per change = COMM_RATE · |Δposition| · price[t].
"""
from pathlib import Path
from typing   import Tuple

import numpy as np

# ───────────────────────────────── CONSTANTS ──────────────────────────────
PRICE_FILE      = "prices.txt"          # whitespace-delimited
THRESH_MIN      = 0.000
THRESH_MAX      = 0.700
THRESH_STEP     = 0.0005
POSITION_SIZE   = 10_000                # shares per instrument
COMM_RATE       = 0.0005                # per dollar traded
FIRST_N_BARS    = 500                   # early-window length

# ──────────────────────────── data loader ─────────────────────────────────
def load_prices(fname: str = PRICE_FILE) -> np.ndarray:
    """
    Read the whitespace-delimited price file into a (T × nInst) array.

    Raises FileNotFoundError if the file is missing and ValueError if
    the loaded data is not two-dimensional.
    """
    source = Path(fname)
    if not source.exists():
        raise FileNotFoundError(f"{fname!s} not found")

    prices = np.loadtxt(source, delimiter=None)
    if prices.ndim == 2:
        return prices
    raise ValueError("Expecting 2-D price matrix (T × nInst)")


# ─────────────────────── causal PIP direction ────────────────────────────
def causal_direction(series: np.ndarray, thr: float) -> np.ndarray:
    """
    Swing direction per bar for one instrument: +1 (up), -1 (down) or
    0 (no swing confirmed yet).  Bar 0 is always 0.
    """
    count  = len(series)
    result = np.zeros(count, dtype=np.int8)

    anchor = series[0]     # extreme of the swing currently in force
    sign   = 0             # confirmed direction, 0 until first breach

    for k in range(1, count):
        px  = series[k]
        rel = (px - anchor) / anchor          # signed move from the anchor

        if sign == 0:
            if abs(rel) >= thr:
                sign   = 1 if rel > 0 else -1
                anchor = px
        else:
            new_extreme = (px > anchor) if sign > 0 else (px < anchor)
            if new_extreme:
                anchor = px
            # Retracement against the swing is the move with flipped sign.
            elif -sign * rel >= thr:
                sign   = -sign
                anchor = px

        result[k] = sign
    return result


# ───────────────────── simulate PnL for one thr ──────────────────────────
def simulate_pnl(series: np.ndarray,
                 thr: float,
                 pos_size: int = POSITION_SIZE,
                 comm_rate: float = COMM_RATE
                 ) -> Tuple[float, float]:
    """
    Back-test one instrument at threshold `thr`.

    Returns (total net PnL, population standard deviation of the
    per-bar net PnL).
    """
    # Direction signal converted to a share count.
    shares = causal_direction(series, thr).astype(np.int32) * pos_size

    # Position held at t-1 earns the move into bar t.
    moves    = np.diff(series, prepend=series[0])
    bar_gain = shares[:-1].astype(np.float64) * moves[1:]

    # Cost of each position change, priced at the bar where it happens.
    traded = np.diff(shares, prepend=0)
    fees   = comm_rate * np.abs(traded).astype(np.float64) * series

    # Drop the first fee entry so both vectors refer to bars 1..T-1.
    per_bar_net = bar_gain - fees[1:]

    return float(per_bar_net.sum()), float(np.std(per_bar_net, ddof=0))


# ─────────────────── grid-search best threshold ──────────────────────────
def best_threshold(series: np.ndarray) -> Tuple[float, float, float]:
    """
    Return (best_thr, best_net_pnl, std_net_pnl)

    Sweeps thresholds from THRESH_MIN to THRESH_MAX (inclusive) in steps
    of THRESH_STEP and keeps the one with the highest net PnL (ties keep
    the smallest threshold).  Each grid point is computed from its index
    rather than by repeatedly adding the step, so rounding error does
    not accumulate along the sweep.
    """
    # Whole steps that fit in the range; the epsilon absorbs float error
    # in the division so the end point is not dropped.
    n_steps = int(np.floor((THRESH_MAX - THRESH_MIN) / THRESH_STEP + 1e-9))

    best_thr   = THRESH_MIN
    best_net   = -np.inf
    best_std   = 0.0

    for k in range(n_steps + 1):
        thr = THRESH_MIN + k * THRESH_STEP
        net, std = simulate_pnl(series, thr)
        if net > best_net:
            best_thr, best_net, best_std = thr, net, std
    return best_thr, best_net, best_std


# ───────────────────────────────── main ───────────────────────────────────
def main() -> None:
    """
    Compare, per instrument, the best threshold found on the full
    history with the one found on the first FIRST_N_BARS bars, and
    print both side by side with their net PnL and per-bar σ.
    """
    px = load_prices()                      # (T × nInst)
    T, nInst = px.shape
    if T < FIRST_N_BARS:
        raise ValueError(f"Need at least {FIRST_N_BARS} bars; only {T} found")

    print(f"Loaded price matrix: {T:,d} timesteps × {nInst} instruments\n")

    rule = "-"*85
    hdr = ("Inst", "Thr-Full", "Thr-500", "Δthr",
           "Net-Full", "σ-Full", "Net-500", "σ-500")
    print(f"{hdr[0]:>5}  {hdr[1]:>7}  {hdr[2]:>7}  {hdr[3]:>7}  "
          f"{hdr[4]:>12}  {hdr[5]:>8}  {hdr[6]:>12}  {hdr[7]:>8}")
    print(rule)

    tot_full, tot_early = 0.0, 0.0

    # Each row of the transpose is one instrument's full price history.
    for i, series_full in enumerate(px.T):
        series_early = series_full[:FIRST_N_BARS]

        thr_full,  net_full,  std_full  = best_threshold(series_full)
        thr_early, net_early, std_early = best_threshold(series_early)

        d_thr = thr_early - thr_full

        print(f"{i:5d}  "
              f"{thr_full:7.4f}  {thr_early:7.4f}  {d_thr:7.4f}  "
              f"{net_full:12,.2f}  {std_full:8.2f}  "
              f"{net_early:12,.2f}  {std_early:8.2f}")

        tot_full  += net_full
        tot_early += net_early

    print(rule)
    print(f"Σ Net PnL (full) : {tot_full:,.2f}")
    print(f"Σ Net PnL (first {FIRST_N_BARS}) : {tot_early:,.2f}")


# Run the comparison only when executed as a script.
if __name__ == "__main__":
    main()


In [None]:
#!/usr/bin/env python3
import numpy as np
import pandas as pd
from pathlib import Path

# ────────────────────────── helpers ────────────────────────── #
def load_prices(prices_file="prices.txt"):
    """Read the whitespace-delimited price file (rows = timesteps, columns = instruments)."""
    location = Path(prices_file)
    return np.loadtxt(location, delimiter=None)

def causal_signal(series: np.ndarray, threshold: float) -> np.ndarray:
    """
    Zig-zag direction computed bar by bar from past data only:
    +1 while the confirmed swing is up, –1 while it is down, 0 before
    the first swing is confirmed.
    """
    length = len(series)
    signal = np.zeros(length, dtype=int)
    extreme = series[0]
    rising = None                  # True = up, False = down, None = undecided

    for k in range(1, length):
        price = series[k]
        change = (price - extreme) / extreme

        if rising is None:
            # First swing: wait for a move of at least `threshold`.
            if abs(change) >= threshold:
                rising = bool(change > 0)
                extreme = price
        elif rising:
            if price > extreme:
                extreme = price
            elif (extreme - price) / extreme >= threshold:
                rising = False
                extreme = price
        else:
            if price < extreme:
                extreme = price
            elif (price - extreme) / extreme >= threshold:
                rising = True
                extreme = price

        if rising is None:
            signal[k] = 0
        else:
            signal[k] = 1 if rising else -1
    return signal

def trade_returns(series: np.ndarray, pos: np.ndarray) -> list:
    """
    Extract contiguous +1 / –1 runs and compute signed % returns for each.

    A run starts at the bar where `pos` first takes a non-zero value
    (entry at that bar's price) and is closed at the bar where `pos`
    changes, using that bar's price.  The signal for bar i is only known
    once bar i's price is known, so the earliest possible exit is at
    series[i]; exiting at series[i-1] would use a price from before the
    change was observable and would leave out the adverse move that
    triggered it.  A run still open at the end is closed at the last
    price.

    Returns a list of fractional returns (one per run), signed so that a
    positive number is a profit.  Empty input yields an empty list.
    """
    trades = []
    if len(series) == 0:
        return trades

    cur_pos   = pos[0]
    entry_idx = 0 if cur_pos else None
    for i in range(1, len(series)):
        if pos[i] != cur_pos:
            if cur_pos:
                # Close the open run at the bar where the change is seen.
                pnl = (series[i] - series[entry_idx]) / series[entry_idx] * cur_pos
                trades.append(pnl)
            cur_pos   = pos[i]
            entry_idx = i if cur_pos else None
    if cur_pos and entry_idx is not None:
        # Mark the still-open run to the final price.
        pnl = (series[-1] - series[entry_idx]) / series[entry_idx] * cur_pos
        trades.append(pnl)
    return trades

def bar_winrate(series: np.ndarray, pos: np.ndarray) -> float:
    """
    Fraction of in-position bars whose following price change has the
    same sign as the position.  Returns NaN if no bar is in position.
    """
    hits = 0
    counted = 0
    last = len(series) - 1                 # final bar has no following change
    for k in range(last):
        side = pos[k]
        if side == 0:
            continue
        counted += 1
        step = series[k + 1] - series[k]
        if step * side > 0:
            hits += 1
    if not counted:
        return np.nan
    return hits / counted

# ──────────────────────── main analysis ────────────────────── #
if __name__ == "__main__":
    # Load the real price file, or fall back to a seeded random walk.
    try:
        prices = load_prices("prices.txt")
    except FileNotFoundError:
        np.random.seed(0)
        prices = np.cumsum(np.random.randn(1000, 50), axis=0) + 100.0
        print("Using synthetic data (1000 × 50)\n")
    else:
        print("Loaded prices.txt\n")

    T, n_inst = prices.shape
    threshold = 0.01   # 1 % pivot threshold

    rows = []
    for inst in range(n_inst):
        series = prices[:, inst]
        pos    = causal_signal(series, threshold)

        # ── trade-level statistics ──
        trades     = trade_returns(series, pos)
        num_trades = len(trades)
        wins       = sum(r > 0 for r in trades)
        losses     = sum(r < 0 for r in trades)
        if num_trades:
            total_pnl = float(np.sum(trades))
            mean_pnl  = float(np.mean(trades))
            std_pnl   = float(np.std(trades, ddof=0))
            win_rate  = wins / num_trades
        else:
            total_pnl = 0.0
            mean_pnl  = 0.0
            std_pnl   = 0.0
            win_rate  = np.nan

        # ── consecutive win / loss streaks ──
        # A zero-return trade neither extends nor ends a streak.
        runs_w = []
        runs_l = []
        cw = 0
        cl = 0
        for r in trades:
            if r > 0:
                cw += 1
                if cl:
                    runs_l.append(cl)
                    cl = 0
            elif r < 0:
                cl += 1
                if cw:
                    runs_w.append(cw)
                    cw = 0
        if cw:
            runs_w.append(cw)
        if cl:
            runs_l.append(cl)

        if runs_w:
            max_w = max(runs_w)
            avg_w = float(np.mean(runs_w))
        else:
            max_w = 0
            avg_w = 0.0
        if runs_l:
            max_l = max(runs_l)
            avg_l = float(np.mean(runs_l))
        else:
            max_l = 0
            avg_l = 0.0

        # ── bar-by-bar directional accuracy ──
        sig_winrate = bar_winrate(series, pos)

        record = {}
        record["Instrument"]      = inst
        record["Trades"]          = num_trades
        record["Total PnL"]       = total_pnl
        record["Mean PnL"]        = mean_pnl
        record["PnL Std"]         = std_pnl
        record["Wins"]            = wins
        record["Losses"]          = losses
        record["Win Rate"]        = win_rate
        record["Signal WinRate"]  = sig_winrate
        record["Max Cons Wins"]   = max_w
        record["Max Cons Losses"] = max_l
        record["Avg Cons Wins"]   = avg_w
        record["Avg Cons Losses"] = avg_l
        rows.append(record)

    df = pd.DataFrame(rows)
    pd.set_option("display.float_format", "{:.6f}".format)
    print(df.to_string(index=False))


In [None]:
#!/usr/bin/env python3
import numpy as np
import matplotlib.pyplot as plt
from pathlib import Path

def load_prices(prices_file="prices.txt"):
    """
    Return the contents of a whitespace-delimited price file as an array.

    Rows are timesteps (oldest first); columns are instruments.
    """
    file_path = Path(prices_file)
    data = np.loadtxt(file_path, delimiter=None)
    return data

def find_pips(prices: np.ndarray, threshold: float):
    """
    Causal zig-zag pivot finder.

    Returns a list of (idx, price) tuples: the first bar, the bar where
    the first swing is confirmed, then one entry per reversal holding
    the confirmation bar's index together with the price of the extreme
    being left, and finally the last bar if it is not already present.
    """
    total = len(prices)
    pivots = [(0, prices[0])]
    anchor = prices[0]
    trend = 0                      # +1 up, -1 down, 0 not yet confirmed

    for k in range(1, total):
        px = prices[k]
        rel = (px - anchor) / anchor

        if trend == 0:
            if abs(rel) >= threshold:
                trend = 1 if rel > 0 else -1
                anchor = px
                pivots.append((k, px))
            continue

        # Push the anchor out on a new extreme, then measure the pullback.
        if trend > 0:
            if px > anchor:
                anchor = px
            pullback = (anchor - px) / anchor
        else:
            if px < anchor:
                anchor = px
            pullback = (px - anchor) / anchor

        if pullback >= threshold:
            pivots.append((k, anchor))
            trend = -trend
            anchor = px

    final = total - 1
    if pivots[-1][0] != final:
        pivots.append((final, prices[-1]))
    return pivots

def causal_signal(series: np.ndarray, threshold: float):
    """
    Single forward pass producing a causal position signal:
    +1 long, -1 short, 0 flat until the first swing is confirmed.
    """
    n_bars = len(series)
    out = np.zeros(n_bars, dtype=int)
    peak_or_trough = series[0]
    state = 0                      # +1 up, -1 down, 0 undecided

    for k in range(1, n_bars):
        cur = series[k]
        drift = (cur - peak_or_trough) / peak_or_trough

        if state == 0:
            if abs(drift) >= threshold:
                state = 1 if drift > 0 else -1
                peak_or_trough = cur
            out[k] = state
            continue

        if state == 1:
            if cur > peak_or_trough:
                peak_or_trough = cur
            else:
                drop = (peak_or_trough - cur) / peak_or_trough
                if drop >= threshold:
                    state = -1
                    peak_or_trough = cur
        else:
            if cur < peak_or_trough:
                peak_or_trough = cur
            else:
                bounce = (cur - peak_or_trough) / peak_or_trough
                if bounce >= threshold:
                    state = 1
                    peak_or_trough = cur

        out[k] = state
    return out

# ───── Example Usage ─────
# Plot one instrument over a fixed window: price line, pivot markers,
# long/short shading under the price, and the position on a second axis.
prices   = load_prices("prices.txt")      # expected shape (1000, 50) — not checked here
instr    = 3                              # instrument index
t1, t2   = 850, 900                       # window (inclusive bar indices)
threshold = 0.01                          # 1% pip threshold

# The slice includes bar t2, so `series` and `times` have equal length.
series = prices[t1:t2+1, instr]
times  = np.arange(t1, t2+1)

# Both helpers run on the window only, so indices are relative to t1.
pos    = causal_signal(series, threshold)
pivots = find_pips(series, threshold)
idxs, vals = zip(*pivots)

# ───── Plot ─────
fig, ax1 = plt.subplots(figsize=(12,5))

# shade under price: green while long, red while short
ax1.fill_between(times, series, series.min(),
                 where=pos>0, facecolor='green', alpha=0.2,
                 step='post')
ax1.fill_between(times, series, series.min(),
                 where=pos<0, facecolor='red',   alpha=0.2,
                 step='post')

# price and pivots
# NOTE: reversal pivots carry the confirmation bar's index with the
# extreme's price, so those markers need not lie on the price line.
ax1.plot(times, series, color='black', linewidth=1, label=f"Inst {instr}")
ax1.scatter(times[list(idxs)], vals,
            color='blue', s=40, label='PIPs')

ax1.set_xlabel("Timestep")
ax1.set_ylabel("Price")
ax1.set_title("Fully Causal PIP-Based Up/Down Indicator")

# ─── overlay actual position predictions ──────────────
ax2 = ax1.twinx()
ax2.step(times, pos, where='post',
         color='gray', linewidth=1, label='Position')
ax2.set_ylim(-1.2, 1.2)
ax2.set_ylabel("Position (+1 long / -1 short)")

# combine legends from both axes into a single box
lines, labels = ax1.get_legend_handles_labels()
lines2, labels2 = ax2.get_legend_handles_labels()
ax1.legend(lines+lines2, labels+labels2,
           loc='upper left', fontsize='small')

fig.tight_layout()
plt.show()


In [None]:
import numpy as np
import matplotlib.pyplot as plt
from pathlib import Path

def load_prices(prices_file="prices.txt"):
    """
    Parse a whitespace-delimited price file with NumPy.

    Expected layout: one row per timestep (oldest first), one column
    per instrument.
    """
    target = Path(prices_file)
    loaded = np.loadtxt(target, delimiter=None)
    return loaded

def find_pips(prices: np.ndarray, threshold: float):
    """
    (Optional) causal zig-zag pivot finder: every decision at bar i uses
    prices up to and including bar i only.

    Returns a list of (idx, price).  Reversal entries pair the bar at
    which the reversal was confirmed with the price of the extreme that
    preceded it.
    """
    count = len(prices)
    found = [(0, prices[0])]
    ref = prices[0]                # running extreme of the active swing
    going_up = None                # True / False once confirmed

    for pos_idx in range(1, count):
        value = prices[pos_idx]
        shift = (value - ref) / ref

        if going_up is None:
            if abs(shift) >= threshold:
                going_up = bool(shift > 0)
                ref = value
                found.append((pos_idx, value))
            continue

        # extend swing
        if going_up and value > ref:
            ref = value
        elif (not going_up) and value < ref:
            ref = value

        # check for reversal against the (possibly updated) extreme
        if going_up:
            retrace = (ref - value) / ref
        else:
            retrace = (value - ref) / ref
        if retrace >= threshold:
            found.append((pos_idx, ref))
            going_up = not going_up
            ref = value

    if found[-1][0] != count - 1:
        found.append((count - 1, prices[-1]))
    return found

def causal_signal(series: np.ndarray, threshold: float):
    """
    Fully causal up/down signal built in one pass:
    +1 (long) while the current swing is up,
    –1 (short) while it is down,
    0 until the first swing is confirmed.
    """
    n_obs = len(series)
    signal = np.zeros(n_obs, dtype=int)
    anchor = series[0]
    trend = 0                      # +1 up, -1 down, 0 undecided

    for k in range(1, n_obs):
        px = series[k]
        rel = (px - anchor) / anchor

        if trend == 0:
            # wait for the first threshold breach
            if abs(rel) >= threshold:
                trend = 1 if rel > 0 else -1
                anchor = px
        else:
            upward = trend > 0
            # extend the current swing on a new extreme
            if (upward and px > anchor) or ((not upward) and px < anchor):
                anchor = px
            # reversal check uses the anchor as updated above
            if upward:
                pullback = (anchor - px) / anchor
            else:
                pullback = (px - anchor) / anchor
            if pullback >= threshold:
                trend = -trend
                anchor = px

        # position recorded for this bar
        signal[k] = trend

    return signal

# ───── Example Usage ─────
# Plot one instrument over a fixed window with full-height long/short
# background shading, pivot markers, and a thin guide line at every bar.
prices   = load_prices("prices.txt")      # expected shape (1000, 50) — not checked here
instr    = 3                             # instrument index
t1, t2 =  850, 900                      # window (inclusive bar indices)
threshold = 0.01                      # 1% pip threshold

# Select the instrument column, then the window (bar t2 included).
series = prices[:, instr][t1:t2+1]
times  = np.arange(t1, t2+1)

# get fully causal pos signal
pos = causal_signal(series, threshold)

# optional pivot markers (also causal); indices are relative to t1
pivots = find_pips(series, threshold)
idxs, vals = zip(*pivots)

# ───── Plot ─────
plt.figure(figsize=(12, 5))

# background shading: green when pos>0, red when pos<0
plt.fill_between(times, series.min(), series.max(),
                 where=pos>0, facecolor='green', alpha=0.2, step='post')
plt.fill_between(times, series.min(), series.max(),
                 where=pos<0, facecolor='red',   alpha=0.2, step='post')

# price line + pivots
# NOTE: reversal pivots carry the confirmation bar's index with the
# extreme's price, so those markers need not lie on the price line.
plt.plot(times, series, color='black', linewidth=1, label=f"Inst {instr}")
plt.scatter(times[list(idxs)], vals, color='blue', s=40, label='PIPs')

# thin black lines at every timestep
for t in times:
    plt.axvline(t, color='black', linewidth=0.3, alpha=0.5)

plt.title("Fully Causal PIP-Based Up/Down Indicator")
plt.xlabel("Timestep")
plt.ylabel("Price")
plt.legend(loc='upper left', fontsize='small')
plt.tight_layout()
plt.show()


In [None]:
#!/usr/bin/env python3
import numpy as np
import pandas as pd
from pathlib import Path

def load_prices(prices_file="prices.txt"):
    """
    Load a whitespace-delimited text file of prices via ``np.loadtxt``.

    Layout assumed by the callers: rows are timesteps (oldest first)
    and columns are instruments.
    """
    as_path = Path(prices_file)
    table = np.loadtxt(as_path, delimiter=None)
    return table

def causal_signal(series: np.ndarray, threshold: float):
    """
    One-pass, fully causal up/down signal.

    Walks the series once, tracking the running extreme of the current
    swing. The value written at bar ``i`` depends only on bars ``0..i``.

    Parameters
    ----------
    series : np.ndarray
        1-D price sequence. Moves are measured relative to the running
        extreme, so prices are assumed to be non-zero (and, for the signs
        to be meaningful, positive) — not validated here.
    threshold : float
        Relative move (e.g. 0.01 for 1%) needed to confirm the first swing
        and, afterwards, to confirm a reversal away from the running extreme.

    Returns
    -------
    np.ndarray
        Integer array of ``len(series)``:
        +1 (long) while the current swing is up,
        -1 (short) while it is down,
        0 until the first swing confirms. Bar 0 is always 0.
        An empty input yields an empty array.
    """
    n = len(series)
    pos = np.zeros(n, dtype=int)
    if n == 0:
        # Nothing to scan; without this guard series[0] raises IndexError.
        return pos

    last_extreme = series[0]
    state = 0  # 0 = no swing confirmed yet, +1 = up swing, -1 = down swing

    for i in range(1, n):
        price = series[i]
        if state == 0:
            # Waiting for the first move of at least `threshold` from bar 0.
            move = (price - last_extreme) / last_extreme
            if abs(move) >= threshold:
                state = 1 if move > 0 else -1
                last_extreme = price
        elif state == 1:
            if price > last_extreme:
                # New high extends the up swing.
                last_extreme = price
            elif (last_extreme - price) / last_extreme >= threshold:
                # Pullback from the high is large enough: flip to down and
                # restart the extreme at the bar that confirmed the flip.
                state = -1
                last_extreme = price
        else:
            if price < last_extreme:
                # New low extends the down swing.
                last_extreme = price
            elif (price - last_extreme) / last_extreme >= threshold:
                # Bounce from the low is large enough: flip to up.
                state = 1
                last_extreme = price
        pos[i] = state
    return pos

def _extract_trades(pos):
    """Split a position vector into ``(entry_bar, exit_bar, side)`` trades."""
    n = len(pos)
    if n == 0:
        return []

    trades = []
    side = pos[0]
    entry = 0 if side != 0 else None
    for i in range(1, n):
        if pos[i] == side:
            continue
        if side != 0:
            # The change in pos is only observable once bar i is known, so
            # the position is still held through bar i. Closing at i-1 would
            # use information from the future and drop the reversal move
            # from the trade's PnL.
            trades.append((entry, i, side))
        side = pos[i]
        entry = i if side != 0 else None

    # A position still open at the end is marked to the final bar.
    if side != 0:
        trades.append((entry, n - 1, side))
    return trades


def _streak_lengths(trade_returns):
    """Return (win_runs, loss_runs): lengths of consecutive winning/losing trades."""
    from itertools import groupby

    # Trades that are neither a win nor a loss (zero or NaN return) are
    # skipped and do not break a streak.
    outcomes = [bool(r > 0) for r in trade_returns if r > 0 or r < 0]
    win_runs, loss_runs = [], []
    for is_win, run in groupby(outcomes):
        (win_runs if is_win else loss_runs).append(sum(1 for _ in run))
    return win_runs, loss_runs


def main():
    """
    Print trade-level statistics of the causal swing signal per instrument.

    Prices are read from ``prices.txt``. If that file is missing, a seeded
    synthetic random walk (1000 bars × 50 instruments, starting near 100)
    is used instead. For each instrument column the signal from
    ``causal_signal`` is split into trades, each trade's signed relative
    return is computed, and summary metrics are collected into a table
    that is printed to stdout.
    """
    # Load prices, with synthetic fallback
    try:
        prices = load_prices("prices.txt")
        print("Loaded prices.txt")
    except FileNotFoundError:
        np.random.seed(0)
        prices = np.cumsum(np.random.randn(1000, 50), axis=0) + 100.0
        print("Using synthetic data (1000×50)")

    n_inst = prices.shape[1]
    threshold = 0.01  # pivot threshold: 1% relative move

    records = []
    for instr in range(n_inst):
        series = prices[:, instr]
        pos = causal_signal(series, threshold)

        # Signed relative return of each trade: entry at the bar the
        # position is taken, exit at the bar the signal change is observed.
        trade_returns = [
            (series[x] - series[e]) / series[e] * side
            for e, x, side in _extract_trades(pos)
        ]

        num_trades = len(trade_returns)
        has_trades = num_trades > 0
        total_pnl = np.sum(trade_returns) if has_trades else 0.0
        mean_pnl = np.mean(trade_returns) if has_trades else 0.0
        std_pnl = np.std(trade_returns, ddof=0) if has_trades else 0.0
        wins = sum(1 for r in trade_returns if r > 0)
        losses = sum(1 for r in trade_returns if r < 0)
        # Win rate is undefined (NaN) when the instrument never traded.
        win_rate = wins / num_trades if has_trades else np.nan

        # consecutive trade runs
        runs_win, runs_loss = _streak_lengths(trade_returns)

        records.append({
            "Instrument": instr,
            "Num Trades": num_trades,
            "Total PnL": total_pnl,
            "Mean PnL": mean_pnl,
            "PnL Std": std_pnl,
            "Win Rate": win_rate,
            "Wins": wins,
            "Losses": losses,
            "Max Cons Wins": max(runs_win) if runs_win else 0,
            "Max Cons Losses": max(runs_loss) if runs_loss else 0,
            "Avg Cons Wins": np.mean(runs_win) if runs_win else 0.0,
            "Avg Cons Losses": np.mean(runs_loss) if runs_loss else 0.0,
        })

    df = pd.DataFrame(records)
    print("\nTrade-Level Metrics per Instrument:\n")
    print(df.to_string(index=False, float_format="{:.6f}".format))

# Run the per-instrument trade report only when executed as a script.
if __name__ == "__main__":
    main()
