In [1]:
%load_ext autoreload
%autoreload 2

import os, sys, json, time, warnings
from pathlib import Path
import numpy as np
import pandas as pd
from tqdm.auto import tqdm

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

In [2]:
# Paths (adjust if your layout differs)
ROOT = Path('/home/pduce/ICAIF_2025_Cryptocurrency_Forecasting_Starter_Kit')
DATA = ROOT / "data"
SRC  = ROOT / "src"
SUBM = ROOT / "sample_submission"

# Ensure src is importable
if str(SRC) not in sys.path:
    sys.path.insert(0, str(SRC))

# Create sample_submission dir if missing
SUBM.mkdir(parents=True, exist_ok=True)

SEED = 1337
np.random.seed(SEED)
torch.manual_seed(SEED)

DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'
DEVICE

'cpu'

In [3]:
# Load dataset files
info_path = DATA / "dataset_info.json"
if info_path.exists():
    info = json.loads(info_path.read_text(encoding="utf-8"))
    print("dataset_info.json loaded. Keys:", list(info.keys()))
    print(json.dumps({k: info[k] for k in ['features','input_len','horizon_len','outputs']}, indent=2))
else:
    print("dataset_info.json not found at", info_path)

# Peek train / x_test
train_path = DATA / "train.pkl"
x_test_path  = DATA / "x_test.pkl"
y_local_path = DATA / "y_test_local.pkl"

train = pd.read_pickle(train_path)
train['event_datetime'] = pd.to_datetime('2024-01-01') + train['time_step']*pd.Timedelta(minutes=1) 
x_test  = pd.read_pickle(x_test_path)
y_test_local = pd.read_pickle(y_local_path)

print("train shape:", train.shape, "| columns:", train.columns.tolist())
print("x_test  shape:", x_test.shape,  "| columns:", x_test.columns.tolist())
print("y_test_local shape:", y_test_local.shape, "| columns:", y_test_local.columns.tolist())

display(train.head(3))
display(x_test.head(3))
display(y_test_local.head(3))

dataset_info.json loaded. Keys: ['freq', 'features', 'input_len', 'horizon_len', 'dtypes', 'outputs', 'sha256']
{
  "features": [
    "close",
    "volume"
  ],
  "input_len": 60,
  "horizon_len": 10,
  "outputs": {
    "train": {
      "columns": [
        "series_id",
        "time_step",
        "close",
        "volume"
      ]
    },
    "x_test": {
      "columns": [
        "window_id",
        "time_step",
        "close",
        "volume"
      ]
    },
    "y_test_local": {
      "columns": [
        "window_id",
        "time_step",
        "close"
      ]
    }
  }
}
train shape: (18331224, 5) | columns: ['series_id', 'time_step', 'close', 'volume', 'event_datetime']
x_test  shape: (3000000, 4) | columns: ['window_id', 'time_step', 'close', 'volume']
y_test_local shape: (20, 3) | columns: ['window_id', 'time_step', 'close']


Unnamed: 0,series_id,time_step,close,volume,event_datetime
0,1,0,0.137,171985.703125,2024-01-01 00:00:00
1,1,1,0.13656,85451.398438,2024-01-01 00:01:00
2,1,2,0.13647,121151.898438,2024-01-01 00:02:00


Unnamed: 0,window_id,time_step,close,volume
0,1,0,0.1126,24976.0
1,1,1,0.1126,0.0
2,1,2,0.1125,2299.0


Unnamed: 0,window_id,time_step,close
0,1,0,0.1131
1,1,1,0.1131
2,1,2,0.113


In [4]:
import numpy as np
import pandas as pd
from numpy.lib.stride_tricks import sliding_window_view as swv

class WindowsDatasetVect:
    """
    Vectorized window builder that returns two DataFrames:

    X columns:
      - window_id: integer id of each window
      - time_step: 0..(input_len-1) within the input segment
      - close
      - volume
      - event_datetime

    y columns:
      - window_id: same ids as in X
      - time_step: 0..(horizon_len-1) within the future horizon
      - close: future close
      - prev_close: the last input close (index input_len-1, e.g. 59 when input_len=60)
      - event_datetime: timestamps of the future horizon

    Notes:
      * Requires df to have columns:
          ['series_id','time_step','close','volume','event_datetime']
      * Windows are created per series_id after sorting by time_step.
    """
    def __init__(
        self,
        df : pd.DataFrame = None,
        train_path: str = None,
        window: int = 70,
        input_len: int = 60,
        horizon_len: int = 10,
        rolling: bool = True,
        step_size: int | None = None,
    ) -> None:
        assert input_len + horizon_len == window, "window must equal input_len + horizon_len"
        # default stepping: 1 if rolling, else full non-overlapping windows
        if step_size is None:
            step_size = 1 if rolling else window

        if train_path is not None:
            df = pd.read_pickle(train_path)
        if df is None:
            raise ValueError("Provide either df or train_path")

        required = {'series_id','time_step','close','volume','event_datetime'}
        if not required.issubset(df.columns):
            raise ValueError(f"df missing required columns {required}, found {list(df.columns)}")

        # group per series, sorted by time_step
        groups = {sid: g.sort_values('time_step').reset_index(drop=True)
                  for sid, g in df.groupby('series_id')}

        X_parts: list[pd.DataFrame] = []
        Y_parts: list[pd.DataFrame] = []

        next_win_id = 0

        for _, g in groups.items():
            n = len(g)
            if n < window:
                continue

            close  = g['close' ].to_numpy(np.float32)
            volume = g['volume'].to_numpy(np.float32)
            dt     = g['event_datetime'].to_numpy('datetime64[ns]')

            # sliding windows (shape: (n - window + 1, window))
            w_close  = swv(close,  window_shape=window)[::step_size]
            w_volume = swv(volume, window_shape=window)[::step_size]
            w_dt     = swv(dt,     window_shape=window)[::step_size]

            num_win = w_close.shape[0]
            if num_win == 0:
                continue

            # split into input and horizon
            x_close  = w_close[:,  :input_len]                      # (num_win, input_len)
            x_volume = w_volume[:, :input_len]
            x_dt     = w_dt[:,     :input_len]

            y_close = w_close[:,  input_len:]                       # (num_win, horizon_len)
            y_dt    = w_dt[:,     input_len:]
            prev_c  = x_close[:, -1]                                # (num_win,)

            # window ids for this group
            win_ids = np.arange(next_win_id, next_win_id + num_win, dtype=np.int64)

            # X dataframe chunk
            X_parts.append(pd.DataFrame({
                "window_id":     np.repeat(win_ids, input_len)+1,
                "time_step":     np.tile(np.arange(input_len, dtype=np.int32), num_win),
                "close":         x_close.reshape(-1),
                "volume":        x_volume.reshape(-1),
                "event_datetime": x_dt.reshape(-1),
            }))

            # y dataframe chunk
            Y_parts.append(pd.DataFrame({
                "window_id":     np.repeat(win_ids, horizon_len)+1,
                "time_step":     np.tile(np.arange(horizon_len, dtype=np.int32), num_win),
                "close":         y_close.reshape(-1),
                "prev_close":    np.repeat(prev_c, horizon_len),
                "event_datetime": y_dt.reshape(-1),
            }))

            next_win_id += num_win

        # Public attributes
        if X_parts:
            self.X = pd.concat(X_parts, ignore_index=True)
        else:
            self.X = pd.DataFrame(columns=["window_id","time_step","close","volume","event_datetime"])

        if Y_parts:
            self.y = pd.concat(Y_parts, ignore_index=True)
        else:
            self.y = pd.DataFrame(columns=["window_id","time_step","close","prev_close","event_datetime"])

        self.num_windows = int(self.X["window_id"].max() + 1) if len(self.X) else 0
        self.input_len = input_len
        self.horizon_len = horizon_len
        self.window = window
        self.step_size = step_size

    def __len__(self) -> int:
        return self.num_windows

    def windows(self, window_id: int) -> tuple[pd.DataFrame, pd.DataFrame]:
        """Convenience: return (X_rows, y_rows) for a given window_id."""
        Xw = self.X[self.X["window_id"] == window_id].sort_values("time_step")
        Yw = self.y[self.y["window_id"] == window_id].sort_values("time_step")
        return Xw, Yw


In [5]:
# NEW / UPDATED CODE

import copy
import numpy as np
import torch
from torch.utils.data import Dataset
import pandas as pd
from typing import Dict, Any, List, Optional, Iterable, Callable, Tuple
from concurrent.futures import ProcessPoolExecutor, as_completed

# --- your existing imports ---
from icaif.dataset import TrainWindowSampler, TrainWindowSamplerVect
from athenea.stats.regressions import Ridge
from icaif.metrics import evaluate_all_metrics

SEED = 42  # ensure you define this somewhere

def transform(X):
    """
    X: array-like of shape (n_samples, 60, 2)
       [:, :, 0] = prices (close); [:, :, 1] = volumes
    Returns: list[pd.DataFrame], each of shape (n_samples, 1)
    """
    EPS = 1e-12

    # Arrange as (time x samples)
    X_prices  = pd.DataFrame(X[:, :, 0]).T
    X_volumes = pd.DataFrame(X[:, :, 1]).T

    logp    = np.log(X_prices)
    logrets = logp.diff()  # 1-min log returns, time on rows

    # ----- Your original features (named) -----
    avg_lr        = logrets.mean().to_frame()
    sign_change_share = np.sign(logrets).diff().ne(0).sum().to_frame()
    avg_vol_lr = logrets.mul(X_volumes,axis=0).mean().to_frame()
    vd = np.sign(logrets).mul(X_volumes,axis=0).mean().to_frame()
    rv = logrets.pow(2).mean().to_frame()

    r_5 = logrets.iloc[-5].to_frame(0)
    r_10 = logrets.iloc[-10].to_frame(0)
    r_20 = logrets.iloc[-20].to_frame(0)
    r_30 = logrets.iloc[-30].to_frame(0)
    r_40 = logrets.iloc[-40].to_frame(0)
    r_50 = logrets.iloc[-50].to_frame(0)

    


    features = [
        avg_lr,
        avg_vol_lr,
        vd,
        rv,
        r_5,
        r_10,
        r_20,
        r_30,
        r_40,
        r_50,
    ]

    #sig = jax_gpu_signature(X, depth=3)
    #df_sig = pd.DataFrame(sig)
    #print(df_sig.head())
    return features

from features_compute import build_features_np

def transform_nick(X):
    return [pd.DataFrame(f) for f in build_features_np(X)]

from icaif.metrics_np import evaluate_all_metrics_vectorized

def evaluate(model_results, y_true):

    y_pred = y_true.copy(deep=True)
    y_pred['pred_close'] = np.exp(model_results.loc[model_results.index.repeat(10)].groupby(level=0).cumsum()).reset_index(drop=True)
    y_pred['pred_close'] *= y_pred['prev_close']

    results = evaluate_all_metrics_vectorized(
        y_true=y_true,
        y_pred=y_pred,
    )

    return results

# --------------- NEW: worker function ---------------
def _run_one_fold(
    idx: int,
    train_ids: np.ndarray,
    val_ids: np.ndarray,
    df_train: pd.DataFrame,
    model_or_factory: Any,
) -> Tuple[int, str, Dict[str, Any]]:
    """
    Execute a single fold end-to-end and return (idx, fold_name, metrics).
    Runs in a separate process when parallelized.
    """
    # Rebuild model per process
    model = model_or_factory() if callable(model_or_factory) else copy.deepcopy(model_or_factory)

    # Slice data for this fold
    df_tr = df_train[df_train['series_id'].isin(train_ids)].copy()
    df_va = df_train[df_train['series_id'].isin(val_ids)].copy()

    # Defensive: skip degenerate folds
    if df_tr.empty or df_va.empty:
        return idx, f"val_{'-'.join(map(str, val_ids))}", {}

    train_ds = WindowsDatasetVect(df_tr)
    val_ds = WindowsDatasetVect(df_va)

    X_train = train_ds.X
    y_train = train_ds.y

    X_val = val_ds.X
    y_val = val_ds.y

    X_np_train = X_train.set_index(['window_id','time_step'])[['close','volume']].to_numpy().reshape(-1, 60, 2)
    y_np_train = y_train.set_index(['window_id','time_step'])[['close']].to_numpy().reshape(-1, 10)

    X_np_val = X_val.set_index(['window_id','time_step'])[['close','volume']].to_numpy().reshape(-1, 60, 2)
    y_np_val = y_val.set_index(['window_id','time_step'])[['close']].to_numpy().reshape(-1, 10)

    y_lr_train = pd.Series(np.diff(np.log(y_np_train),axis=1).mean(axis=1)).to_frame()

    features = model.transform(X_np_train)
    features_val = model.transform(X_np_val)

    model.fit(y_lr_train, features)
    y_pred = model.predict(features_val)

    metrics: Dict[str, Any] = model.evaluate(y_pred, y_val)

    fold_name = f"val_{'-'.join(map(str, val_ids))}"
    return idx, fold_name, metrics


# --------------- UPDATED: run_cv with parallelism ---------------
def run_cv(
    df_train: pd.DataFrame,
    n_train: int = 4,
    n_val: int = 1,
    model=None,
    *,
    include_last_fold: bool = False,   # set True to include the final possible window
    n_jobs: int = 1,                   # NEW: #processes; 1 keeps it sequential
    model_factory: Optional[Callable[[], Any]] = None,  # NEW: pass to rebuild model per fold
) -> pd.DataFrame:
    """
    Rolling group-based CV, optionally parallel across folds (process-based).
    Assumptions about `model` / `model_factory`:
      - Either:
          model_factory() -> fresh model with fit/predict/evaluate/transform
        Or:
          model is a picklable object with those methods (we deep-copy it per fold).
    """
    if (model is None) and (model_factory is None):
        raise ValueError("Provide either `model` or a `model_factory` (callable returning a fresh model).")

    series_ids = df_train['series_id'].unique()
    n_series   = len(series_ids)
    n_total    = n_train + n_val

    if n_total > n_series:
        raise ValueError(f"n_train + n_val must be ≤ number of groups ({n_series}).")
    if n_train < 1:
        raise ValueError(f"n_train must be ≥ 1 (got {n_train}).")
    if n_val < 1:
        raise ValueError(f"n_val must be ≥ 1 (got {n_val}).")
    if n_train < n_val:
        raise ValueError(f"n_train must be ≥ n_val (got {n_train} < {n_val}).")

    start = n_total
    stop  = n_series + (1 if include_last_fold else 0)

    # Build fold definitions once
    fold_specs = []
    for i in range(start, stop):
        train_ids = series_ids[i - n_total : i - n_val]
        val_ids   = series_ids[i - n_val   : i]
        fold_specs.append((i, train_ids, val_ids))

    if not fold_specs:
        raise RuntimeError("No folds were produced. Check your data and parameters.")

    # Choose what to pass to workers for model creation
    model_or_factory = model_factory if model_factory is not None else model

    # Sequential path (n_jobs == 1)
    if n_jobs == 1:
        results = [
            _run_one_fold(idx, train_ids, val_ids, df_train, model_or_factory)
            for (idx, train_ids, val_ids) in fold_specs
        ]
    else:
        # Parallel path (processes)
        # Tip: to avoid CPU over-subscription with BLAS, consider setting env vars:
        # OMP_NUM_THREADS=1 MKL_NUM_THREADS=1 NUMEXPR_NUM_THREADS=1
        results = [None] * len(fold_specs)
        with ProcessPoolExecutor(max_workers=n_jobs) as ex:
            futures = {
                ex.submit(_run_one_fold, idx, train_ids, val_ids, df_train, model_or_factory): pos
                for pos, (idx, train_ids, val_ids) in enumerate(fold_specs)
            }
            for fut in as_completed(futures):
                pos = futures[fut]
                results[pos] = fut.result()

    # results: list of (idx, fold_name, metrics)
    # Keep chronological order by fold index
    results.sort(key=lambda t: t[0])
    fold_names = [name for _, name, _ in results]
    metric_rows = [metrics for _, _, metrics in results]

    # Assemble DataFrame: rows = metric names, cols = folds
    metrics_df = pd.DataFrame(metric_rows, index=fold_names).T
    return metrics_df


In [None]:
from athenea.stats.regressions import Ridge
def make_ridge(l2=0.1):
    model = Ridge(l2=l2)
    model.transform = transform
    model.evaluate = evaluate
    return model




metrics = run_cv(
    df_train=train,
    n_train=10,
    n_val=2,
    include_last_fold=True,
    n_jobs=1,
    model_factory=make_ridge(l2=1),
)

In [12]:
metrics.median(axis=1)

MSE            0.000048
MAE            0.002885
IC             0.027106
IR             0.090055
SharpeRatio    0.033906
MDD            0.946407
VaR           -0.004963
ES            -0.008986
dtype: float64

In [None]:
metrics.median(axis=1)

MSE            0.000009
MAE            0.000630
IC             0.092503
IR             0.157267
SharpeRatio    0.090816
MDD            0.957867
VaR           -0.008703
ES            -0.018601
dtype: float64

In [33]:
metrics.median(axis=1)

MSE            0.000009
MAE            0.000628
IC             0.068642
IR             0.126262
SharpeRatio    0.080043
MDD            0.963835
VaR           -0.008796
ES            -0.018692
dtype: float64

In [62]:
y_true = val_ds.y.copy(deep=True)
y_true['pred_close'] = np.exp(y_pred.loc[y_pred.index.repeat(10)].groupby(level=0).cumsum()).reset_index(drop=True)
y_true['pred_close'] *= y_true['prev_close']

In [66]:
from icaif.metrics_np import evaluate_all_metrics_vectorized

evaluate_all_metrics_vectorized(
    y_true=val_ds.y,
    y_pred=y_true,
)

{'MSE': 7.324066618496394e-06,
 'MAE': 0.0006133882334464747,
 'IC': 0.04474149733947167,
 'IR': 0.07659221911983606,
 'SharpeRatio': 0.07708397267707696,
 'MDD': 0.9547024643339382,
 'VaR': -0.008717975181701603,
 'ES': -0.018549726934457684}