In [1]:
# Cell 0 — Install libs (run once if needed)
# Every install is routed through `sys.executable -m pip` so the packages land in the
# interpreter that runs this kernel, not in whichever `pip` happens to be first on PATH.
import sys
# Upgrade the packaging toolchain before installing anything else.
!{sys.executable} -m pip install --upgrade pip setuptools wheel

# General numeric / plotting stack. None of these are version-pinned.
!{sys.executable} -m pip install --quiet numpy pandas matplotlib pillow ffmpeg-python
# RL stack: stable-baselines3, gymnasium and shimmy are pinned; torch, tensorboard and
# MetaTrader5 are not.
# NOTE(review): scikit-learn is imported by the trend-utilities cell further down but is
# not installed here — confirm it is provided by the environment.
!{sys.executable} -m pip install --quiet "stable-baselines3==2.3.0" "gymnasium==0.29.1" "shimmy==0.2.1" torch tensorboard MetaTrader5




In [2]:
# Cell 1 — imports
import os, glob, json, time
from datetime import datetime
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

import gymnasium as gym
from gymnasium import spaces

from stable_baselines3 import PPO
from stable_baselines3.common.vec_env import DummyVecEnv, VecNormalize
try:
    from stable_baselines3.common.env_checker import check_env
except Exception:
    check_env = None


Gym has been unmaintained since 2022 and does not support NumPy 2.0 amongst other critical functionality.
Please upgrade to Gymnasium, the maintained drop-in replacement of Gym, or contact the authors of your software and request that they upgrade.
Users of this version of Gym should be able to simply replace 'import gym' with 'import gymnasium as gym' in the vast majority of cases.
See the migration guide at https://gymnasium.farama.org/introduction/migration_guide/ for additional information.


In [3]:
# Cell 2 — config
# ---- training / model hyper-parameters --------------------------------------
WINDOW = 50                                  # bars per observation window
TRAIN_EPISODES = 50
TOTAL_TIMESTEPS = TRAIN_EPISODES * 10_000    # step budget handed to PPO.learn
N_ENVS = 4                                   # parallel envs inside the DummyVecEnv
EMBED_DIM = 8                                # width of each per-asset embedding

# ---- directories -------------------------------------------------------------
DATA_DIR = os.path.join("data", "multiasset")
MODEL_DIR = os.path.join("models", "multiasset")
os.makedirs(MODEL_DIR, exist_ok=True)

# ---- artefact locations ------------------------------------------------------
ASSET_MAP_FILE = os.path.join(DATA_DIR, "asset_to_idx.csv")
EMBED_FILE = os.path.join(MODEL_DIR, "asset_embeddings.npy")
MODEL_FILE = os.path.join(MODEL_DIR, "ppo_multiasset.zip")
VEC_FILE = os.path.join(MODEL_DIR, "vec_normalize.pkl")
METRICS_FILE = os.path.join(MODEL_DIR, "eval_metrics.json")

# ---- keyword arguments forwarded verbatim to the PPO constructor -------------
PPO_PARAMS = {
    "policy": "MlpPolicy",
    "verbose": 1,
    "learning_rate": 3e-4,
    "batch_size": 256,
    "n_epochs": 10,
    "ent_coef": 0.01,
}


In [4]:
# Cell 3 — load normalized CSVs
def load_normalized_datasets(data_dir=DATA_DIR, window=WINDOW):
    """Load every ``*_normalized.csv`` in ``data_dir`` into a dict of feature frames.

    Each CSV must either already contain the feature columns
    ``o_pc, h_pc, l_pc, c_pc, v_pc, Close_raw`` or contain raw
    ``open, high, low, close, volume`` columns, from which the features are
    derived with ``pct_change()``.

    Rows containing NaN *or* +/-inf in any feature column are dropped.
    ``pct_change()`` produces +/-inf whenever the previous value is 0 (for
    example a zero-volume bar) and ``dropna()`` alone does not remove those.
    Both code paths filter on the six feature columns only.

    Args:
        data_dir: directory that is globbed for ``*_normalized.csv``.
        window: observation window; a dataset is kept only if it has strictly
            more than ``window`` usable rows.

    Returns:
        dict mapping the file stem (file name without ``_normalized.csv``) to a
        DataFrame holding exactly the six feature columns.

    Raises:
        FileNotFoundError: no matching CSV exists in ``data_dir``.
        ValueError: a CSV has neither the feature nor the raw columns, or no
            dataset is long enough.
    """
    csv_files = sorted(glob.glob(os.path.join(data_dir, "*_normalized.csv")))
    if not csv_files:
        raise FileNotFoundError(f"No normalized CSVs found in {data_dir}. Run Notebook 01 first.")

    required = ['o_pc', 'h_pc', 'l_pc', 'c_pc', 'v_pc', 'Close_raw']
    # raw column -> derived percent-change column
    raw_to_feature = {'open': 'o_pc', 'high': 'h_pc', 'low': 'l_pc', 'close': 'c_pc', 'volume': 'v_pc'}

    datasets = {}
    too_short = []
    for p in csv_files:
        safe = os.path.basename(p).replace("_normalized.csv", "")
        df = pd.read_csv(p, index_col=0, parse_dates=True)
        if all(c in df.columns for c in required):
            features = df[required]
        elif all(c in df.columns for c in raw_to_feature):
            # derive the features from raw OHLCV
            features = pd.DataFrame(
                {feat: df[raw].pct_change() for raw, feat in raw_to_feature.items()},
                index=df.index,
            )
            features['Close_raw'] = df['close']
            features = features[required]
        else:
            raise ValueError(f"{p} missing required columns and cannot be auto-converted.")

        # inf survives dropna(), so turn it into NaN first
        features = features.replace([np.inf, -np.inf], np.nan).dropna()

        if len(features) > window:
            datasets[safe] = features
        else:
            too_short.append(safe)

    if too_short:
        print(f"Skipped {len(too_short)} dataset(s) with <= {window} usable rows: {', '.join(too_short)}")
    if not datasets:
        raise ValueError("No datasets passed the minimum length requirement.")
    print(f"Loaded {len(datasets)} datasets")
    return datasets

# Quick load
# datasets = load_normalized_datasets(DATA_DIR, WINDOW)


In [5]:
# Cell 4 — asset map & embeddings (ensures ordering matches datasets)
def load_asset_map_and_embeddings(datasets, asset_map_file=ASSET_MAP_FILE, embed_file=EMBED_FILE, embed_dim=EMBED_DIM):
    """Return ``(embeddings, asset_to_idx)`` for the assets in ``datasets``.

    Asset map: if ``asset_map_file`` exists, names exactly the same assets as
    ``datasets`` and its indices form a permutation of ``0..n_assets-1``, the
    stored indices are reused (returned in ``datasets`` key order). In every
    other case a fresh enumeration of ``datasets`` is written to the file.

    The previous implementation passed ``squeeze=False`` to ``pd.read_csv``.
    That keyword no longer exists in pandas 2.x, so the call raised
    ``TypeError``, the blanket ``except`` swallowed it, and the stored map was
    silently overwritten on every run.

    Embeddings: loaded from ``embed_file`` when it holds a 2-D array with one
    row per asset; otherwise a new random ``(n_assets, embed_dim)`` matrix is
    drawn from ``np.random.randn`` and saved.

    Args:
        datasets: dict of asset name -> DataFrame; only the keys are used.
        asset_map_file: CSV path for the name -> index mapping.
        embed_file: ``.npy`` path for the embedding matrix.
        embed_dim: embedding width used when a new matrix has to be created.

    Returns:
        tuple ``(embeddings float32 ndarray, asset_to_idx dict)``.
    """
    safe_names = list(datasets.keys())
    n_assets = len(safe_names)

    asset_to_idx = None
    if os.path.exists(asset_map_file):
        try:
            saved = pd.read_csv(asset_map_file, index_col=0)
            loaded = saved.iloc[:, 0].to_dict()
            if set(loaded.keys()) == set(safe_names):
                candidate = {s: int(loaded[s]) for s in safe_names}
                # indices address embedding rows, so they must cover 0..n-1 exactly once
                if sorted(candidate.values()) == list(range(n_assets)):
                    asset_to_idx = candidate
        except (OSError, ValueError, TypeError, KeyError, IndexError) as exc:
            # unreadable / malformed file: fall through and rebuild it
            print(f"Could not reuse {asset_map_file} ({exc!r}); rebuilding asset map.")

    if asset_to_idx is None:
        asset_to_idx = {s: i for i, s in enumerate(safe_names)}
        pd.Series(asset_to_idx).to_csv(asset_map_file)

    emb = None
    if os.path.exists(embed_file):
        emb = np.load(embed_file)
        if emb.ndim != 2 or emb.shape[0] != n_assets:
            print(f"Embedding shape {emb.shape} incompatible with {n_assets} assets -> recreated embeddings.")
            emb = None
        created_msg = None
    else:
        created_msg = "Created new embeddings:"

    if emb is None:
        emb = np.random.randn(n_assets, embed_dim).astype(np.float32)
        np.save(embed_file, emb)
        if created_msg is not None:
            print(created_msg, embed_file)

    return emb.astype(np.float32), asset_to_idx

# Example:
# embeddings, asset_map = load_asset_map_and_embeddings(datasets)


In [6]:
# Cell 5 — MT5 helpers + lot size calculation (graceful fallback)
import MetaTrader5 as mt5

def mt5_ensure_init():
    """Attempt ``mt5.initialize()`` and hand back its result; any exception becomes ``False``."""
    try:
        initialised = mt5.initialize()
    except Exception:
        initialised = False
    return initialised

def get_symbol_info(symbol):
    """Return ``mt5.symbol_info(symbol)``, or ``None`` if MT5 cannot be initialised or the lookup raises."""
    if mt5_ensure_init():
        try:
            return mt5.symbol_info(symbol)
        except Exception:
            pass  # best effort: a failed lookup is reported as "no info"
    return None

def pip_value_per_lot_from_mt5(symbol, entry_price):
    """Return ``(value_per_point_per_lot, point)`` for ``symbol`` from MT5 symbol info.

    Callers convert a price distance into points (``distance / point``) and
    multiply by the first element, so that element must be the money value of
    a ONE-POINT move for one lot.

    ``trade_tick_value / trade_tick_size`` is the value of a 1.0 price move,
    so it is scaled by ``point`` here. The previous code returned the
    unscaled ratio, which inflated PnL by ``1/point`` and disagreed with the
    contract-size fallback below, which is already expressed per point.

    Args:
        symbol: MT5 symbol name.
        entry_price: price used by the contract-size fallback.

    Returns:
        ``(None, None)`` when no symbol info is available, otherwise a tuple
        of two floats ``(value per point per lot, point size)``.
    """
    info = get_symbol_info(symbol)
    if info is None:
        return None, None
    point = info.point
    try:
        pip_val = info.trade_tick_value * (point / info.trade_tick_size)
        # A zero / NaN tick value cannot be used for sizing; take the fallback instead.
        if not pip_val > 0:
            raise ValueError("unusable tick value")
    except Exception:
        contract_size = getattr(info, "trade_contract_size", 100000.0)
        pip_val = (point / entry_price) * contract_size
    return float(pip_val), float(point)

def calculate_lot_size_mt5_fallback(symbol, balance, risk_percent, entry_price, stop_loss_price):
    """Size a position without MT5 data.

    Uses a pip of 0.01 for symbols containing "JPY" and 0.0001 otherwise, and a
    fixed 100000-unit contract. Returns the lot size rounded to two decimals,
    or 0.0 when entry and stop-loss coincide.
    """
    risk_amount = balance * float(risk_percent)
    if "JPY" in symbol:
        pip = 0.01
    else:
        pip = 0.0001
    stop_distance_pips = abs(entry_price - stop_loss_price) / pip
    if stop_distance_pips <= 0:
        return 0.0
    value_per_pip = (pip / entry_price) * 100000.0
    raw_lot = risk_amount / (stop_distance_pips * value_per_pip)
    return round(max(raw_lot, 0.0), 2)

def calculate_lot_size(symbol, balance, risk_percent, entry_price, stop_loss_price):
    """Size a position so that hitting the stop loses ``balance * risk_percent``.

    MT5 symbol data is used when available (including rounding to the broker's
    ``volume_step``); otherwise the computation is delegated to
    ``calculate_lot_size_mt5_fallback``. Returns 0.0 when entry and stop-loss
    coincide.
    """
    pip_val, point = pip_value_per_lot_from_mt5(symbol, entry_price)

    # distance to the stop expressed in MT5 points, or in default pips without MT5 data
    unit = point if point not in (None, 0) else (0.01 if "JPY" in symbol else 0.0001)
    pip_risk = abs(entry_price - stop_loss_price) / unit
    if pip_risk <= 0:
        return 0.0

    dollar_risk = balance * float(risk_percent)
    if pip_val is None:
        return calculate_lot_size_mt5_fallback(symbol, balance, risk_percent, entry_price, stop_loss_price)

    lot = dollar_risk / (pip_risk * pip_val)
    info = get_symbol_info(symbol)
    if info is not None:
        try:
            step = float(info.volume_step)
            if step > 0:
                lot = round(lot / step) * step  # snap to the broker's volume grid
        except Exception:
            pass  # no usable volume_step: keep the unsnapped size
    return round(max(lot, 0.0), 2)


In [7]:
# Cell 6 — MultiAssetEnv with embeddings + position sizing
# Use Gymnasium (the package the install cell pins and the imports cell already
# aliases as `gym`) rather than re-binding `gym`/`spaces` to the legacy package.
import gymnasium as gym
import numpy as np
from gymnasium import spaces

class MultiAssetEnv(gym.Env):
    """
    Single-position trading environment that samples one asset per episode.

    MultiAssetEnv(datasets, asset_map, embeddings, asset_to_symbol=None, window=50, ...)
    - datasets: dict safe_name -> dataframe with columns o_pc,h_pc,l_pc,c_pc,v_pc,Close_raw
    - asset_map: dict safe_name -> index (embedding index)
    - embeddings: ndarray (n_assets, embed_dim)
    - asset_to_symbol: optional mapping safe_name -> MT5 symbol (if different)

    Observation: float32 array of shape (window, 5 + 1 + embed_dim) holding the
    five percent-change features, the balance relative to the initial balance
    (repeated on every row) and the asset embedding (repeated on every row).

    Actions (Discrete(3)): 0 = hold, 1 = (re-)enter long, 2 = (re-)enter short.
    Every non-hold action first closes any open position at the previous bar's
    close and then opens a new one at the current bar's close, regardless of
    the direction of the existing position.

    ``self.trades`` mixes two record shapes: "open" records (with ``stop_loss``)
    and "close" records (with ``exit``/``pnl``/``balance_after``).
    ``position_sl`` is recorded for sizing only; it is never triggered by this
    environment. ``leverage`` is stored but not used.
    """
    metadata = {"render_modes": ["human"]}

    def __init__(self, datasets, asset_map, embeddings, asset_to_symbol=None, window=WINDOW,
                 initial_balance=10_000, risk_per_trade=0.01, leverage=100):
        super().__init__()
        self.datasets = datasets
        self.asset_map = asset_map
        self.embeddings = embeddings
        self.asset_to_symbol = asset_to_symbol or {s: s for s in datasets.keys()}
        self.window = int(window)
        self.initial_balance = float(initial_balance)
        self.risk_per_trade = float(risk_per_trade)
        self.leverage = leverage

        self.safe_names = list(self.datasets.keys())
        self.n_assets = len(self.safe_names)
        if self.n_assets == 0:
            raise ValueError("MultiAssetEnv needs at least one dataset.")
        self.asset_idx = 0
        self.current_safe = self.safe_names[self.asset_idx]
        self.data = self.datasets[self.current_safe]
        self.ptr = self.window
        self.balance = float(self.initial_balance)
        self.position = 0
        self.position_entry_price = None
        self.position_lot = 0.0
        self.position_sl = None
        self.trades = []

        self.embed_dim = self.embeddings.shape[1] if self.embeddings is not None else 0
        n_features = 5
        obs_dim = n_features + 1 + self.embed_dim
        self.observation_space = spaces.Box(low=-np.inf, high=np.inf, shape=(self.window, obs_dim), dtype=np.float32)
        self.action_space = spaces.Discrete(3)

        self.seed(None)

        # trackers for rendering / logging
        self.prices = {s: [] for s in self.safe_names}
        self.actions = {s: [] for s in self.safe_names}

    def seed(self, seed=None):
        """(Re)create the environment RNG; returns ``[seed]``."""
        self.np_random, seed = gym.utils.seeding.np_random(seed)
        return [seed]

    def reset(self, seed=None, options=None):
        """Pick a random asset, reset account state and return ``(obs, {})``."""
        if seed is not None:
            self.seed(seed)
        # sample a random asset index
        self.asset_idx = int(self.np_random.integers(0, self.n_assets))
        self.current_safe = self.safe_names[self.asset_idx]
        self.data = self.datasets[self.current_safe]
        self.ptr = self.window
        self.balance = float(self.initial_balance)
        self.position = 0
        self.position_entry_price = None
        self.position_lot = 0.0
        self.position_sl = None
        self.trades = []

        # initialize trackers with initial window close prices
        init_prices = list(self.data['Close_raw'].iloc[self.ptr-self.window:self.ptr].values)
        self.prices[self.current_safe] = init_prices.copy()
        self.actions[self.current_safe] = [0]*len(init_prices)
        return self._get_obs(), {}

    def _embedding_row(self):
        """Embedding row for the current asset: ``asset_map`` wins, dataset position is the fallback."""
        if self.asset_map:
            return int(self.asset_map.get(self.current_safe, self.asset_idx))
        return self.asset_idx

    def _get_obs(self):
        """Build the (window, obs_dim) observation ending just before ``self.ptr``."""
        window_data = self.data.iloc[self.ptr-self.window:self.ptr]
        feat = window_data[['o_pc','h_pc','l_pc','c_pc','v_pc']].values.astype(np.float32)
        balance_col = np.full((self.window,1), float(self.balance)/float(self.initial_balance), dtype=np.float32)
        row = self._embedding_row()
        if self.embeddings is not None and 0 <= row < self.embeddings.shape[0]:
            emb = np.tile(np.asarray(self.embeddings[row], dtype=np.float32).reshape(1,-1), (self.window,1))
        else:
            # no usable embedding row: feed zeros so the observation shape stays fixed
            emb = np.zeros((self.window, self.embed_dim), dtype=np.float32)
        return np.concatenate([feat, balance_col, emb], axis=1)

    def _symbol(self):
        """MT5 symbol for the current asset (defaults to the safe name)."""
        return self.asset_to_symbol.get(self.current_safe, self.current_safe)

    def _pip_value_and_point(self, symbol, price):
        """Return ``(value per point per lot, point)``; never returns a zero/None point."""
        pip_val, point = pip_value_per_lot_from_mt5(symbol, price)
        if pip_val is None:
            # no MT5 data: default pip size and a 100000-unit contract
            pip_size = 0.01 if "JPY" in symbol else 0.0001
            return (pip_size / price) * 100000.0, pip_size
        if point in (None, 0):
            point = 0.0001
        return pip_val, point

    def _position_pnl(self, symbol, price):
        """Signed PnL of the open position if it were valued at ``price``."""
        pip_val, point = self._pip_value_and_point(symbol, price)
        pips = (price - self.position_entry_price) / point
        pips_signed = pips * (1 if self.position == 1 else -1)
        return pips_signed * pip_val * self.position_lot

    def _close_position(self, exit_price):
        """Close the open position at ``exit_price``, book the PnL and log a close record."""
        if self.position == 0 or self.position_lot <= 0:
            return 0.0
        symbol = self._symbol()
        pnl_usd = self._position_pnl(symbol, exit_price)
        self.balance += pnl_usd
        # clamp so a close issued after the last bar still gets a valid timestamp
        stamp_pos = min(self.ptr, len(self.data) - 1)
        trade = {
            "asset": self.current_safe,
            "symbol": symbol,
            "entry": float(self.position_entry_price),
            "exit": float(exit_price),
            "position": int(self.position),
            "lot": float(self.position_lot),
            "pnl": float(pnl_usd),
            "balance_after": float(self.balance),
            "timestamp": str(self.data.index[stamp_pos])
        }
        self.trades.append(trade)
        self.position = 0
        self.position_entry_price = None
        self.position_lot = 0.0
        self.position_sl = None
        return float(pnl_usd)

    def step(self, action):
        """Apply ``action`` on the bar at ``self.ptr`` and advance one bar.

        Reward = realized PnL of any position closed this step plus the
        unrealized PnL of the position held after the action, both divided by
        ``max(1, initial_balance)``.

        Returns the Gymnasium 5-tuple ``(obs, reward, terminated, truncated, info)``;
        ``truncated`` is always ``False``.
        """
        prev_close = float(self.data['Close_raw'].iloc[self.ptr-1])
        new_close = float(self.data['Close_raw'].iloc[self.ptr])
        reward = 0.0
        reward_scale = max(1.0, self.initial_balance)

        # Any non-hold action closes the current position (if any) and opens a fresh one.
        if action != 0:
            if self.position != 0:
                pnl_realized = self._close_position(prev_close)
                reward += pnl_realized / reward_scale
            direction = 1 if action==1 else -1

            # stop distance from recent volatility (std of returns scaled to price units)
            recent = self.data['Close_raw'].iloc[self.ptr-self.window:self.ptr]
            last_price = recent.iloc[-1]
            ret_std = recent.pct_change().std()
            vol = float(ret_std * last_price) if ret_std != 0 else 0.0
            if vol <= 0 or np.isnan(vol):
                sl_dist = max(0.0005, 0.001 * last_price)
            else:
                sl_dist = max(vol * 1.5, 0.0005 * last_price)
            stop_loss = new_close - sl_dist if direction == 1 else new_close + sl_dist

            symbol = self._symbol()
            try:
                lot = calculate_lot_size(symbol, float(self.balance), float(self.risk_per_trade), float(new_close), float(stop_loss))
            except Exception:
                lot = calculate_lot_size_mt5_fallback(symbol, float(self.balance), float(self.risk_per_trade), float(new_close), float(stop_loss))
            lot = max(0.01, round(min(lot, 100.0), 2))

            self.position = direction
            self.position_entry_price = new_close
            self.position_lot = lot
            self.position_sl = stop_loss
            open_trade = {
                "asset": self.current_safe,
                "symbol": symbol,
                "entry": float(self.position_entry_price),
                "position": int(self.position),
                "lot": float(self.position_lot),
                "stop_loss": float(self.position_sl),
                "timestamp": str(self.data.index[self.ptr])
            }
            self.trades.append(open_trade)

        # shaping reward: unrealized PnL scaled by initial balance
        if self.position != 0 and self.position_entry_price is not None:
            unrealized = self._position_pnl(self._symbol(), new_close)
            reward += float(unrealized / reward_scale)

        # trackers
        safe = self.current_safe
        self.prices[safe].append(new_close)
        self.actions[safe].append(int(action))

        self.ptr += 1
        done = self.ptr >= len(self.data)
        info = {"balance": float(self.balance), "asset": self.current_safe}
        return self._get_obs(), float(reward), bool(done), False, info

    def render(self, mode='human', animate=False, interval=200, save_path=None, fps=10):
        """Plot recorded prices with buy/sell markers, one subplot per asset that has data.

        With ``animate=False`` a static figure is drawn (and saved when
        ``save_path`` ends in .png/.jpg/.pdf). With ``animate=True`` a
        ``FuncAnimation`` is built and saved when ``save_path`` ends in
        .mp4 (ffmpeg writer) or .gif (pillow writer).
        """
        assets_with_data = [s for s in self.safe_names if len(self.prices.get(s,[]))>0]
        if not assets_with_data:
            print("⚠️ Nothing recorded to render yet.")
            return
        n = len(assets_with_data)
        fig, axes = plt.subplots(n,1, figsize=(10,4*n), sharex=True)
        if n==1:
            axes=[axes]
        if not animate:
            for i,s in enumerate(assets_with_data):
                prices = np.array(self.prices[s])
                acts = np.array(self.actions[s])
                steps = np.arange(len(prices))
                axes[i].plot(steps, prices, label=f"{s} price")
                axes[i].scatter(steps[acts==1], prices[acts==1], marker='^', color='green', label='Buy')
                axes[i].scatter(steps[acts==2], prices[acts==2], marker='v', color='red', label='Sell')
                axes[i].legend(); axes[i].grid(True)
            plt.tight_layout()
            if save_path and save_path.lower().endswith(('.png','.jpg','.pdf')):
                plt.savefig(save_path, dpi=200); print("Saved static render to", save_path)
            plt.show()
            return

        # animated
        import matplotlib.animation as animation
        no_points = np.empty((0, 2))  # set_offsets needs an (N, 2) array, even when N == 0
        lines, buys, sells = [], [], []
        for ax, s in zip(axes, assets_with_data):
            line, = ax.plot([],[],lw=2)
            buy_sc = ax.scatter([],[], marker='^', color='green')
            sell_sc = ax.scatter([],[], marker='v', color='red')
            lines.append(line); buys.append(buy_sc); sells.append(sell_sc)
            arr = np.array(self.prices[s])
            ax.set_xlim(0, max(1,len(arr)))
            ax.set_ylim(np.min(arr)*0.98, np.max(arr)*1.02)
            ax.set_title(s)
            ax.grid(True)

        def update(frame):
            artists=[]
            for i,s in enumerate(assets_with_data):
                pr = np.array(self.prices[s]); ac = np.array(self.actions[s])
                f = min(frame, len(pr))
                x = np.arange(f); y = pr[:f]
                lines[i].set_data(x,y); artists.append(lines[i])
                for scatter, code in ((buys[i], 1), (sells[i], 2)):
                    hit = ac[:f] == code
                    scatter.set_offsets(np.column_stack((x[hit], y[hit])) if hit.any() else no_points)
                    artists.append(scatter)
            return artists

        frames = len(self.prices[assets_with_data[0]])
        ani = animation.FuncAnimation(fig, update, frames=frames, interval=interval, blit=True, repeat=False)
        if save_path:
            ext = os.path.splitext(save_path)[1].lower()
            writer = {'.mp4': 'ffmpeg', '.gif': 'pillow'}.get(ext)
            if writer is None:
                print("Unsupported animation format (use .mp4 or .gif); nothing saved:", save_path)
            else:
                ani.save(save_path, writer=writer, fps=fps)
                print("Saved animation to", save_path)
        plt.show()


In [8]:
# Cell 7 — env factory for DummyVecEnv
def make_env_factory(datasets, asset_map, embeddings, asset_to_symbol, window):
    """Return a zero-argument callable that builds one ``MultiAssetEnv`` (the shape DummyVecEnv expects)."""
    return lambda: MultiAssetEnv(
        datasets,
        asset_map,
        embeddings,
        asset_to_symbol=asset_to_symbol,
        window=window,
    )


In [9]:
# Cell 8 — train & save
def train_and_save(datasets, asset_map, embeddings, asset_to_symbol=None, total_timesteps=TOTAL_TIMESTEPS, n_envs=N_ENVS):
    """Train PPO on ``n_envs`` MultiAssetEnv copies and persist the results.

    The environments are wrapped in ``VecNormalize`` (observations normalized,
    rewards left as-is). After training, the model is written to ``MODEL_FILE``
    and the normalization wrapper to ``VEC_FILE``.

    Returns:
        ``(model, vec_env)`` — the trained PPO model and the VecNormalize wrapper.
    """
    os.makedirs(MODEL_DIR, exist_ok=True)

    env_fns = []
    for _ in range(n_envs):
        symbols = asset_to_symbol or {name: name for name in datasets.keys()}
        env_fns.append(make_env_factory(datasets, asset_map, embeddings, symbols, WINDOW))

    vec_env = VecNormalize(DummyVecEnv(env_fns), norm_obs=True, norm_reward=False, clip_obs=10.)
    tensorboard_dir = os.path.join(MODEL_DIR, "tensorboard")
    model = PPO(env=vec_env, tensorboard_log=tensorboard_dir, **PPO_PARAMS)

    print("Starting training:", total_timesteps, "timesteps")
    model.learn(total_timesteps=total_timesteps)

    model.save(MODEL_FILE)
    vec_env.save(VEC_FILE)
    print("Saved model and VecNormalize wrapper.")
    return model, vec_env


In [10]:
# Cell 9 — safe action extraction (use in evaluation)
def extract_action_scalar(action):
    """Collapse whatever ``model.predict`` returned into a plain ``int``.

    Python / NumPy integers are converted directly; anything else is treated
    as array-like and its first element (in flattened order) is used.
    """
    if isinstance(action, (int, np.integer)):
        return int(action)
    flat = np.asarray(action).ravel()
    return int(flat[0])


In [11]:
# Cell 10 — evaluation (deterministic)
def evaluate_model(model_path, datasets, embeddings, n_episodes=10, window=WINDOW):
    """Run ``n_episodes`` deterministic episodes and write summary metrics to ``METRICS_FILE``.

    The policy is trained on observations normalized by ``VecNormalize``, so
    the saved statistics in ``VEC_FILE`` are loaded (frozen, no further
    updates) and applied to every observation before ``model.predict``.
    Feeding raw observations to a policy trained on normalized ones makes the
    evaluation meaningless. If ``VEC_FILE`` is missing a warning is printed and
    raw observations are used.

    Args:
        model_path: path of the saved PPO model.
        datasets: dict of asset name -> feature DataFrame.
        embeddings: embedding matrix, or ``None`` to use an all-zero matrix of
            width ``EMBED_DIM``.
        n_episodes: number of evaluation episodes.
        window: observation window passed to the environment.

    Returns:
        dict with ``n_episodes``, ``mean_reward``, ``std_reward``,
        ``total_reward`` and a UTC ``timestamp``.
    """
    model = PPO.load(model_path)
    emb_dummy = embeddings if embeddings is not None else np.zeros((len(datasets), EMBED_DIM), dtype=np.float32)
    asset_map = {s: i for i, s in enumerate(datasets.keys())}
    env = MultiAssetEnv(datasets, asset_map, emb_dummy, window=window)

    obs_normalizer = None
    if os.path.exists(VEC_FILE):
        # VecNormalize.load unpickles VEC_FILE: only load files produced by this project.
        # The DummyVecEnv is needed only to satisfy load(); stepping is done on `env` directly.
        obs_normalizer = VecNormalize.load(VEC_FILE, DummyVecEnv([lambda: env]))
        obs_normalizer.training = False     # freeze running statistics
        obs_normalizer.norm_reward = False
    else:
        print(f"Warning: {VEC_FILE} not found; evaluating on un-normalized observations.")

    rewards = []
    for _ in range(n_episodes):
        obs, _reset_info = env.reset()
        done = False
        ep_r = 0.0
        while not done:
            model_obs = obs_normalizer.normalize_obs(obs) if obs_normalizer is not None else obs
            action, _state = model.predict(model_obs, deterministic=True)
            act = extract_action_scalar(action)
            obs, reward, terminated, truncated, info = env.step(act)
            done = bool(terminated or truncated)
            ep_r += float(reward)
        rewards.append(ep_r)

    metrics = {
        "n_episodes": n_episodes,
        "mean_reward": float(np.mean(rewards)),
        "std_reward": float(np.std(rewards)),
        "total_reward": float(np.sum(rewards)),
        "timestamp": datetime.utcnow().isoformat()
    }
    with open(METRICS_FILE, "w") as fh:
        json.dump(metrics, fh, indent=2)
    print("Evaluation metrics:", metrics)
    return metrics


In [None]:
# Cell 11 — run pipeline
# End-to-end run: load data -> asset map / embeddings -> train PPO -> evaluate.
# Loads every normalized CSV under DATA_DIR (raises if none are found or usable).
datasets = load_normalized_datasets(DATA_DIR, WINDOW)
# Loads (or creates and saves) the asset -> index map and the embedding matrix.
embeddings, asset_map = load_asset_map_and_embeddings(datasets)
# If needed, asset_to_symbol mapping (safe->MT5 symbol). Default uses safe==symbol
asset_to_symbol = {s: s for s in datasets.keys()}

# Train
# Runs TOTAL_TIMESTEPS of PPO across N_ENVS environments, then writes MODEL_FILE and VEC_FILE.
model, vec_env = train_and_save(datasets, asset_map, embeddings, asset_to_symbol, TOTAL_TIMESTEPS, N_ENVS)

# Evaluate
# Reloads the model from MODEL_FILE (not the in-memory `model`) and writes METRICS_FILE.
metrics = evaluate_model(MODEL_FILE, datasets, embeddings, n_episodes=10, window=WINDOW)
print("Done.")


Loaded 16 datasets




Using cpu device
Starting training: 500000 timesteps
Logging to models\multiasset\tensorboard\PPO_5


In [None]:
# Trend & Stop-hunt utilities
# Copy this whole cell into your notebook.

import numpy as np
import pandas as pd
from typing import Tuple, Dict, List
from collections import deque, Counter
from sklearn.metrics import confusion_matrix, classification_report, accuracy_score, precision_recall_fscore_support
import random

# -----------------------------
# 1) Local extrema / swing points
# -----------------------------
def find_swing_points(prices: pd.Series, order:int = 5) -> pd.DataFrame:
    """
    Flag strict local maxima / minima of a price series.

    A bar is a swing high when its value is strictly greater than each of the
    ``order`` bars before it and each of the ``order`` bars after it; a swing
    low is the mirror image. The first and last ``order`` bars are never flagged.

    Args:
        prices: pd.Series of close prices indexed by datetime
        order: number of bars on each side to use for local extremum detection
    Returns:
        DataFrame with boolean columns ['is_high','is_low'] sharing the index of ``prices``.
    """
    closes = prices.values
    total = len(prices)
    high_mask = np.zeros(total, dtype=bool)
    low_mask = np.zeros(total, dtype=bool)
    for pos in range(order, total - order):
        pivot = closes[pos]
        # both sides at once: `order` bars before and `order` bars after the pivot
        neighbours = np.concatenate((closes[pos-order:pos], closes[pos+1:pos+1+order]))
        if (pivot > neighbours).all():
            high_mask[pos] = True
        if (pivot < neighbours).all():
            low_mask[pos] = True
    return pd.DataFrame({'is_high': high_mask, 'is_low': low_mask}, index=prices.index)

# -----------------------------
# 2) Structure-based trend (HH/HL and LH/LL)
# -----------------------------
def trend_by_structure(prices: pd.Series,
                       order:int = 5,
                       lookback_swings:int = 6) -> pd.Series:
    """
    Determine trend using recent swing structure: HH/HL -> uptrend, LH/LL -> downtrend, else 0 (sideways).

    Each swing point (once at least 3 swings are inside the lookback window) is
    labelled from the swings in that window: +1 when both the highs and the
    lows are strictly increasing, -1 when both are strictly decreasing, 0
    otherwise. The label of a swing is then carried forward until the next
    labelled swing. Bars before the first labelled swing are 0.

    A swing labelled 0 (sideways) now resets the carried label. Previously
    zeros were skipped during propagation, so once a trend appeared it could
    never return to sideways.

    Swings are ordered and labelled by position in ``prices``, so duplicate
    index labels do not break the lookup.

    Args:
        prices: close price series
        order: window for swing point detection
        lookback_swings: number of recent swings to evaluate pattern
    Returns:
        pd.Series of {-1,0,1} per timestamp (trend label)
    """
    swings = find_swing_points(prices, order=order)
    values = prices.values

    # (position, type, price) for every swing; highs listed first so the stable
    # sort keeps H before L on the rare bar that is flagged as both.
    seq = [(int(p), 'H', values[p]) for p in np.flatnonzero(swings['is_high'].values)]
    seq += [(int(p), 'L', values[p]) for p in np.flatnonzero(swings['is_low'].values)]
    seq.sort(key=lambda swing: swing[0])

    def _strictly_monotonic(arr, rising):
        """True when ``arr`` has >= 2 items and is strictly rising (or falling)."""
        if len(arr) < 2:
            return False
        if rising:
            return all(b > a for a, b in zip(arr, arr[1:]))
        return all(b < a for a, b in zip(arr, arr[1:]))

    # NaN = "no swing label at this bar"; only real labels (including 0) are forward-filled.
    labels = np.full(len(prices), np.nan)
    for end in range(1, len(seq) + 1):
        window = seq[max(0, end - lookback_swings):end]
        # We need at least 3 swings to infer a structure
        if len(window) < 3:
            continue
        highs = [p for (_, kind, p) in window if kind == 'H']
        lows = [p for (_, kind, p) in window if kind == 'L']
        if _strictly_monotonic(highs, True) and _strictly_monotonic(lows, True):
            label = 1
        elif _strictly_monotonic(highs, False) and _strictly_monotonic(lows, False):
            label = -1
        else:
            label = 0
        labels[window[-1][0]] = label  # assign label at last swing position

    trend = pd.Series(labels, index=prices.index).ffill().fillna(0)
    return trend.astype(int)

# -----------------------------
# 3) Regression slope label (future-return style)
# -----------------------------
def trend_by_regression(prices: pd.Series,
                        future_window:int = 50,
                        up_thresh:float = 0.005,
                        down_thresh:float = -0.005,
                        vol_window:int = 50) -> pd.Series:
    """
    Label each bar from the slope of a straight line fitted to the NEXT ``future_window`` closes.

    The slope is divided by (current price * std of the last ``vol_window``
    price differences) and compared with the thresholds. Because it looks
    ahead, the result is a training label, not a tradable signal.

    Args:
        prices: pd.Series closes
        future_window: horizon to compute slope over future bars
        up_thresh/down_thresh: thresholds on normalized slope for label decision
        vol_window: window for recent volatility normalization
    Returns:
        pd.Series of {-1,0,1} with the index of ``prices``; the last
        ``future_window`` bars (no full future available) are 0.
    """
    closes = prices.values
    total = len(prices)
    out = np.zeros(total, dtype=int)
    for t in range(total - future_window):
        future = closes[t+1:t+1+future_window]
        # linear fit slope over the look-ahead window
        slope = np.polyfit(np.arange(len(future)), future, 1)[0]

        # recent volatility = std of first differences up to and including bar t
        if t > 0:
            history_diff = np.diff(closes[max(0, t-vol_window):t+1])
        else:
            history_diff = np.array([0.0])
        vol = np.std(history_diff) if history_diff.size > 0 else 1.0
        if vol <= 0:
            vol = 1e-8  # avoid dividing by zero on a flat history

        scaled = slope / (closes[t] * vol)
        if scaled > up_thresh:
            out[t] = 1
        elif scaled < down_thresh:
            out[t] = -1
        else:
            out[t] = 0
    return pd.Series(out, index=prices.index)

# -----------------------------
# 4) Combined/ensemble trend label (for auxiliary task)
# -----------------------------
def combined_trend_label(prices: pd.Series,
                         structure_order:int=5, structure_lookback:int=6,
                         future_window:int=50, up_thresh:float=0.005, down_thresh:float=-0.005,
                         voting:bool=True) -> pd.Series:
    """
    Merge the structure-based and regression-based trend labels into one series.

    Per timestamp the rule is:
      - both agree -> that label
      - exactly one is non-zero -> the non-zero label
      - both non-zero but opposite -> the regression label
    which is the same as "regression label unless it is 0, then the structure label".
    ``voting`` is accepted for interface compatibility and is not used.
    Returns pd.Series {-1,0,1}
    """
    struct = trend_by_structure(prices, order=structure_order, lookback_swings=structure_lookback)
    reg = trend_by_regression(prices, future_window=future_window, up_thresh=up_thresh, down_thresh=down_thresh)

    merged = []
    for stamp in prices.index:
        structure_label = int(struct.get(stamp, 0))
        regression_label = int(reg.get(stamp, 0))
        # regression wins whenever it has an opinion; structure fills the gaps
        merged.append(regression_label if regression_label != 0 else structure_label)
    return pd.Series(merged, index=prices.index, dtype="int64")

# -----------------------------
# 5) Detect stop-loss hunt spikes
# -----------------------------
def detect_stop_loss_hunt(prices: pd.Series,
                          intrabar_returns: pd.Series=None,
                          spike_multiplier:float=4.0,
                          short_lived_window:int=5,
                          min_spike_size:float=None) -> pd.Series:
    """
    Flag bars that look like a stop-loss hunt: a return spike that is quickly reversed.

    A bar is a spike when |return| exceeds ``spike_multiplier`` times the
    rolling std of returns (50-bar window, at least 5 observations, current
    bar included), and optionally ``min_spike_size``. A spike is flagged when
    one of the next ``short_lived_window`` bars has a return of the opposite
    sign whose magnitude is more than half of the spike's.

    Args:
        prices: close price series
        intrabar_returns: optional series of bar returns (if you have tick intrabar data use that)
        spike_multiplier: multiple of rolling std to mark spike
        short_lived_window: bars to check for reversal
        min_spike_size: absolute minimum return magnitude to qualify as spike (optional)
    Returns:
        pd.Series boolean indexed by prices indicating suspected stop-hunt bar (True)
    """
    if intrabar_returns is not None:
        rets = intrabar_returns.reindex(prices.index).fillna(0)
    else:
        rets = prices.pct_change().fillna(0)

    sigma = rets.rolling(window=50, min_periods=5).std().fillna(0.0)
    # zero std -> NaN so the z-score is NaN (never a spike) instead of inf
    zscore = rets / sigma.replace(0, np.nan)

    spike_mask = zscore.abs() > spike_multiplier
    if min_spike_size is not None:
        spike_mask = spike_mask & (rets.abs() >= min_spike_size)
    spike_mask = spike_mask.fillna(False)

    total = len(rets)
    flags = pd.Series(False, index=prices.index)
    for i in np.flatnonzero(spike_mask.values):
        spike_ret = rets.iloc[i]
        direction = np.sign(spike_ret)
        lookahead = range(i + 1, min(total, i + 1 + short_lived_window))
        # reversal = opposite-sign return larger than 50% of the spike
        reversed_found = any(
            np.sign(rets.iloc[j]) == -direction and abs(rets.iloc[j]) > 0.5 * abs(spike_ret)
            for j in lookahead
        )
        if reversed_found:
            flags.iloc[i] = True
    return flags

# -----------------------------
# 6) Simulate stop-hunt noise (inject spikes) for robust training
# -----------------------------
def inject_stop_hunt_noise(prices: pd.Series,
                           prob:float = 0.002,
                           spike_multiplier:float=0.02,
                           rng_seed:int = None) -> pd.Series:
    """
    Return a float copy of `prices` with randomly injected spike-and-revert pairs.

    Every bar position from 2 up to (but excluding) len(prices) - 3 is visited in
    order. At each position one uniform draw is compared against `prob`; on a hit
    a direction (+1 or -1) is drawn and:
      * the bar itself is scaled by (1 + direction * spike_multiplier)
      * the NEXT bar is overwritten with the spiked price scaled by
        (1 - 0.8 * direction * spike_multiplier), i.e. a partial reversal.
    No further smoothing/interpolation of later bars is performed.

    Args:
        prices: original close price series (pd.Series); it is not modified
        prob: per-bar probability of injecting a spike
        spike_multiplier: relative size of the spike (fraction of price)
        rng_seed: seed handed to np.random.default_rng for reproducibility
    Returns:
        pd.Series with the same index as `prices` and the modified prices
    """
    generator = np.random.default_rng(rng_seed)
    noisy = prices.copy().astype(float)
    # first two and last three bars are never used as a spike position
    for bar in range(2, len(noisy) - 3):
        triggered = generator.random() < prob
        if not triggered:
            continue
        direction = generator.choice([1, -1])
        # instantaneous spike on this bar ...
        spiked_price = noisy.iloc[bar] * (1.0 + direction * spike_multiplier)
        noisy.iloc[bar] = spiked_price
        # ... followed by a partial reversal written over the next bar
        noisy.iloc[bar + 1] = spiked_price * (1.0 - 0.8 * direction * spike_multiplier)
    return noisy

# -----------------------------
# 7) Confusion metrics vs true trend / compare to baseline
# -----------------------------
def trend_confusion_report(y_true: pd.Series, y_pred: pd.Series, labels=[-1,0,1]) -> Dict:
    """
    Compute confusion matrix and classification metrics for trend labels.

    Args:
        y_true: reference trend labels (pd.Series)
        y_pred: predicted trend labels (pd.Series)
        labels: label values to report on, in the order used for the matrix
            rows/columns and the per-label metric lists
    Returns:
        dict with keys 'confusion_matrix' (nested list), 'labels' (a fresh list),
        'accuracy' (float), 'precision_per_label', 'recall_per_label',
        'f1_per_label' (lists) and 'classification_report' (dict).
    """
    # Work on a private copy: the default list object is shared between calls and
    # is handed back in the result, so returning it directly would let a caller
    # that edits result['labels'] silently change the default for later calls.
    labels = list(labels)
    # keep only index entries present in both series (fill_value=0 is passed to align)
    y_true, y_pred = y_true.align(y_pred, join='inner', fill_value=0)
    # metric helpers are presumably the sklearn.metrics functions imported elsewhere in the notebook
    cm = confusion_matrix(y_true, y_pred, labels=labels)
    acc = accuracy_score(y_true, y_pred)
    prec, rec, f1, _ = precision_recall_fscore_support(y_true, y_pred, labels=labels, zero_division=0)
    cls_report = classification_report(y_true, y_pred, labels=labels, zero_division=0, output_dict=True)
    return {
        'confusion_matrix': cm.tolist(),
        'labels': labels,
        'accuracy': float(acc),
        'precision_per_label': prec.tolist(),
        'recall_per_label': rec.tolist(),
        'f1_per_label': f1.tolist(),
        'classification_report': cls_report
    }

def compare_with_baseline(y_true: pd.Series, y_pred: pd.Series, baseline_preds: Dict[str, pd.Series]) -> Dict:
    """
    Score the model and every baseline against the same reference labels.

    Args:
        y_true: reference trend labels
        y_pred: model predictions
        baseline_preds: mapping of baseline name -> prediction series
    Returns:
        dict whose 'model' entry holds the report for `y_pred`, followed by one
        entry per baseline name; each value is the dict produced by
        trend_confusion_report.
    """
    # model report goes in first so it is always the leading key
    report_by_name = {'model': trend_confusion_report(y_true, y_pred)}
    report_by_name.update(
        (baseline_name, trend_confusion_report(y_true, baseline_series))
        for baseline_name, baseline_series in baseline_preds.items()
    )
    return report_by_name

# -----------------------------
# 8) Normalization + seeds + cross-validation utilities
# -----------------------------
def normalize_per_asset(df: pd.DataFrame, feature_cols:List[str]=['o_pc','h_pc','l_pc','c_pc','v_pc']) -> Tuple[pd.DataFrame, pd.Series, pd.Series]:
    """
    Z-score normalize selected feature columns and return (normalized_df, means, stds).

    Args:
        df: input frame; it is left untouched (a copy is normalized)
        feature_cols: names of the columns to normalize; other columns are copied as-is
    Returns:
        (normalized_df, means, stds) where `means`/`stds` are Series indexed by
        feature column. A std of 0 (constant column) or NaN (e.g. a single-row
        frame, where the sample std is undefined) is reported as 1.0 so the
        division leaves such a column at 0 instead of producing inf/NaN.
    """
    # list() so that a tuple of names is treated as several columns, not one key
    cols = list(feature_cols)
    means = df[cols].mean()
    # undefined (NaN) spread is handled like zero spread: divide by 1
    stds = df[cols].std().replace(0, 1.0).fillna(1.0)
    norm = df.copy()
    norm[cols] = (df[cols] - means) / stds
    return norm, means, stds

def set_global_seed(seed:int):
    """
    Seed the stdlib `random` module, NumPy's global RNG and (when usable) torch.

    torch seeding is best-effort: any exception raised while importing or seeding
    torch (including its CUDA generators) is swallowed. Finally PYTHONHASHSEED is
    written to os.environ.
    NOTE(review): PYTHONHASHSEED is normally read at interpreter start-up, so
    setting it here presumably only affects child processes — confirm.
    """
    import contextlib
    import os
    import random

    random.seed(seed)
    np.random.seed(seed)
    # torch is optional; ignore every failure coming from it
    with contextlib.suppress(Exception):
        import torch
        torch.manual_seed(seed)
        if torch.cuda.is_available():
            torch.cuda.manual_seed_all(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)

# -----------------------------
# 9) Example baseline predictors
# -----------------------------
def simple_baselines(prices: pd.Series, short_window:int=5, long_window:int=20) -> Dict[str, pd.Series]:
    """
    Compute simple baseline trend predictors from moving averages of price only.

    Args:
        prices: price series
        short_window: window length of the fast rolling mean
        long_window: window length of the slow rolling mean
    Returns:
        dict name -> integer pd.Series on `prices.index` with values in {-1, 0, 1}.
        'ma_crossover' is 1 where the fast mean is above the slow mean, -1 where
        it is below, and 0 otherwise (equal, or either mean still NaN during
        warm-up, since comparisons with NaN are False).
    """
    ma_short = prices.rolling(short_window).mean()
    ma_long = prices.rolling(long_window).mean()
    # start neutral, then overwrite only where one mean is strictly above the other
    signal = pd.Series(0, index=prices.index)
    signal[ma_short > ma_long] = 1
    signal[ma_short < ma_long] = -1
    return {'ma_crossover': signal.astype(int)}

# -----------------------------
# 10) Usage examples (pseudo)
# -----------------------------
# Example pseudo-code to produce labels and evaluate:
# raw_df: a DataFrame for a single asset with 'close' or 'Close_raw' column.
# prices = raw_df['Close_raw'] if 'Close_raw' in raw_df.columns else raw_df['close']
# labels_aux = combined_trend_label(prices)                    # supervisory trend labels
# stophunt_flags = detect_stop_loss_hunt(prices)              # boolean series of suspected hunts
# baseline_preds = simple_baselines(prices)
# results = compare_with_baseline(labels_aux, model_preds_series, baseline_preds)
# confusion = trend_confusion_report(labels_aux, model_preds_series)
#
# To make model robust:
# p_noisy = inject_stop_hunt_noise(prices, prob=0.002, spike_multiplier=0.02, rng_seed=42)
# use p_noisy in some episodes during RL training (data augmentation)
