# Setup and Environment
This notebook serves production predictions for the Kaggle Hull Tactical - Market Prediction time-series API. It is optimized for fast startup, deterministic behavior, and robust logging.

In [1]:
# Imports, logging, seeds, thread limits, versions
import os, sys, json, gc, time, logging, random, platform, warnings
from pathlib import Path
from typing import Any, Dict, Optional, Tuple, List
import numpy as np
import pandas as pd
import joblib

# Optional backends: each name is bound to the imported module when the
# package is installed, and to None otherwise, so later cells can test
# `lgb is None` / `ort is None` / `torch is None` instead of re-importing.
try:
    import lightgbm as lgb
except Exception:
    lgb = None
try:
    import onnxruntime as ort
except Exception:
    ort = None
try:
    import torch
    # This notebook only runs inference, so autograd is switched off globally.
    torch.set_grad_enabled(False)
except Exception:
    torch = None

# Reduce noisy sklearn pickle-version warnings: suppress the UserWarning whose
# message starts with the text below when it is raised from sklearn.base.
try:
    warnings.filterwarnings(
        'ignore',
        message='Trying to unpickle estimator StandardScaler',
        category=UserWarning,
        module='sklearn.base'
    )
except Exception:
    # Best-effort only: a failure to install the filter must not stop startup.
    pass

# Configure logger
LOGGER = logging.getLogger('fortress')
if not LOGGER.handlers:
    handler = logging.StreamHandler(sys.stdout)
    fmt = logging.Formatter('[%(asctime)s] %(levelname)s - %(message)s', datefmt='%H:%M:%S')
    handler.setFormatter(fmt)
    LOGGER.addHandler(handler)
LOGGER.setLevel(os.getenv('FORTRESS_LOGLEVEL', 'INFO'))

# Determinism and threads
# Seed every RNG this notebook may touch; SEED can be overridden via env var.
seed = int(os.getenv('SEED', '42'))
random.seed(seed)
np.random.seed(seed)
if torch is not None:
    # torch is optional; seed it too so any torch-backed path is reproducible.
    torch.manual_seed(seed)

# Cap native thread pools unless the caller already chose a value.
# NOTE(review): numpy/pandas are imported before this point, so a BLAS/OpenMP
# runtime that has already initialised may not pick these values up; they
# presumably still apply to libraries loaded later - confirm, or export the
# variables before the kernel starts.
os.environ.setdefault('OMP_NUM_THREADS', '2')
os.environ.setdefault('MKL_NUM_THREADS', '2')

# GPU / device configuration (optional)
# GPU use is opt-in through USE_GPU=1; everything defaults to CPU.
USE_GPU = os.getenv('USE_GPU', '0') == '1'
TORCH_DEVICE = 'cuda' if (USE_GPU and torch is not None and torch.cuda.is_available()) else 'cpu'
ONNX_PROVIDERS = ['CPUExecutionProvider']
if USE_GPU and ort is not None:
    try:
        # Prefer CUDA when the runtime reports it, keeping CPU as the fallback provider.
        if 'CUDAExecutionProvider' in ort.get_available_providers():
            ONNX_PROVIDERS = ['CUDAExecutionProvider', 'CPUExecutionProvider']
    except Exception:
        # Provider discovery is best-effort; stay on CPU if it fails.
        pass
LOGGER.info(f'GPU enabled: {USE_GPU} | Torch device: {TORCH_DEVICE} | ONNX providers: {ONNX_PROVIDERS}')

# Paths
KAGGLE_INPUT_DIR = Path('/kaggle/input')
KAGGLE_WORK_DIR = Path('/kaggle/working')
LOCAL_ROOT = Path.cwd()

# ARTIFACT_DIR comes from the env var of the same name, else the default Kaggle dataset.
_artifact_dir_raw = os.environ.get('ARTIFACT_DIR', '/kaggle/input/hull-champion-model-artifacts')
ARTIFACT_DIR = Path(_artifact_dir_raw)
# MODEL_DIR may be overridden separately via MODEL_ARTIFACTS_DIR; otherwise it
# resolves exactly like ARTIFACT_DIR.
MODEL_DIR = Path(os.environ.get('MODEL_ARTIFACTS_DIR', _artifact_dir_raw))
# PERSIST_DIR defaults to the Kaggle working directory and is created if absent.
PERSIST_DIR = Path(os.getenv('PERSIST_DIR', KAGGLE_WORK_DIR))
LOGGER.info(f'Model dir: {MODEL_DIR}')
PERSIST_DIR.mkdir(parents=True, exist_ok=True)

# Print versions/hardware
LOGGER.info(f'Python: {platform.python_version()} | NumPy: {np.__version__} | Pandas: {pd.__version__}')
try:
    # scikit-learn is only needed to report its version here; absence is fine.
    import sklearn
    LOGGER.info(f'scikit-learn: {sklearn.__version__}')
except Exception:
    pass
if lgb:
    LOGGER.info('LightGBM available')
# Report the version of each optional backend that was successfully imported.
for _label, _module in (('ONNX Runtime', ort), ('PyTorch', torch)):
    if _module:
        LOGGER.info(f'{_label}: {_module.__version__}')
LOGGER.info(f'Artifact dir: {ARTIFACT_DIR} | Persist dir: {PERSIST_DIR}')

[23:16:50] INFO - GPU enabled: False | Torch device: cpu | ONNX providers: ['CPUExecutionProvider']


[23:16:50] INFO - Model dir: G:\2025\Kaggle\#3.Hull Tactical - Market Prediction\dataset


[23:16:50] INFO - Python: 3.13.5 | NumPy: 2.3.3 | Pandas: 2.3.3


[23:16:50] INFO - scikit-learn: 1.7.2


[23:16:50] INFO - LightGBM available


[23:16:50] INFO - PyTorch: 2.8.0+cpu


[23:16:50] INFO - Artifact dir: G:\2025\Kaggle\#3.Hull Tactical - Market Prediction\dataset | Persist dir: \kaggle\working


# Load Pre-Trained Artifacts
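Load persisted models/scalers and any configuration artifacts. All artifacts are resolved under `MODEL_DIR` (set via `MODEL_ARTIFACTS_DIR`, falling back to `ARTIFACT_DIR`); point it at the `experiments/backtest` outputs for local runs or at the Kaggle input dataset for submissions.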

In [2]:
# Resolve artifacts and load
REPO_ROOT = LOCAL_ROOT
_repo_root_str = str(REPO_ROOT)
if _repo_root_str not in sys.path:
    sys.path.insert(0, _repo_root_str)
# FortressPredictor will be defined inline below to avoid external imports on Kaggle

# Primary structured layout under MODEL_DIR (allows overriding via kernel env vars)
# Five folds, numbered 1..5, each with a booster text file and a pickled scaler.
_FOLD_IDS = range(1, 6)
RET_MODELS = [MODEL_DIR / f'optuna_model_fold{fold}.txt' for fold in _FOLD_IDS]
RET_SCALERS = [MODEL_DIR / f'optuna_scaler_fold{fold}.pkl' for fold in _FOLD_IDS]
RISK_OPTUNA = MODEL_DIR / 'optuna_risk_brain_best_params.json'
RISK_MODEL = MODEL_DIR / 'risk_brain_full.txt'
RISK_SCALER = MODEL_DIR / 'risk_brain_scaler.pkl'
RISK_META = MODEL_DIR / 'risk_brain_full_meta.json'

# Validate presence (warn only; FortressPredictor expects these to exist on Kaggle)
_expected_artifacts = [*RET_MODELS, *RET_SCALERS, RISK_OPTUNA, RISK_MODEL, RISK_SCALER, RISK_META]
for _absent in (path for path in _expected_artifacts if not path.exists()):
    LOGGER.warning(f'Missing artifact: {_absent}')
LOGGER.info('Artifacts resolved (Kaggle fixed paths).')





























[23:16:50] INFO - Artifacts resolved (Kaggle fixed paths).


# Stateful Inference Engine
We maintain a rolling window and produce one allocation per time step, reusing the FortressPredictor implementation.

## Inline Fortress Predictor
To ensure portability in Kaggle without repo imports, we inline the FortressPredictor and helpers here.

In [3]:
# FortressPredictor and helpers (inlined)
from collections import deque
from typing import Optional, Deque, Dict, Any, List
import json as _json

# LightGBM and joblib were imported above if available
# LightGBM is mandatory: every model in this notebook is a LightGBM booster.
if lgb is None:
    raise ImportError('LightGBM is required for inference. Please ensure it is available in Kaggle runtime.')

# Import polars for on-the-fly feature building
try:
    import polars as pl
except Exception as _e:
    pl = None
    # Do not fail here, but make the degradation visible: without polars
    # RollingBuffer.build_features returns the buffered rows unchanged.
    LOGGER.warning(
        f'polars could not be imported ({_e!r}); '
        'lag/rolling features will not be built from raw rows'
    )

# Small helpers that mirror the script implementation
def _drop_and_select_features(df: pd.DataFrame) -> pd.DataFrame:
    drop_cols = ['row_id', 'time_id', 'date_id', 'date', 'market_forward_excess_returns']
    fwd_cols = [c for c in df.columns if c.startswith('forward_')]
    cols = [c for c in df.columns if c not in drop_cols and c not in fwd_cols]
    return df[cols]

def _safe_transform(X: pd.DataFrame, scaler: Any) -> pd.DataFrame:
    """Perform StandardScaler-like transform using stored stats when available to avoid sklearn version drift."""
    try:
        mean_ = getattr(scaler, 'mean_', None)
        scale_ = getattr(scaler, 'scale_', None)
        if mean_ is not None and scale_ is not None:
            mu = np.asarray(mean_, dtype=float)
            sc = np.asarray(scale_, dtype=float)
            if mu.shape[0] == X.shape[1] and sc.shape[0] == X.shape[1]:
                Xn = (X.values.astype(float) - mu) / np.where(sc == 0, 1.0, sc)
                return pd.DataFrame(Xn, columns=X.columns, index=X.index)
        Xt = scaler.transform(X)
        return pd.DataFrame(Xt, columns=X.columns, index=X.index)
    except Exception:
        return X.copy()

def _numeric_cols_pl(df: 'pl.DataFrame', exclude: List[str] = None) -> List[str]:
    if exclude is None:
        exclude = ['row_id', 'time_id', 'date_id', 'date', 'market_forward_excess_returns']
    numeric_kinds = ('Int', 'Float', 'Decimal')
    out = []
    for c, t in zip(df.columns, df.dtypes):
        if c in exclude:
            continue
        if any(k in str(t) for k in numeric_kinds):
            out.append(c)
    return out

def _make_lag_features(df: 'pl.DataFrame', cols: List[str], lags: List[int]) -> 'pl.DataFrame':
    out = df
    for c in cols:
        for l in lags:
            out = out.with_columns(pl.col(c).shift(l).alias(f"{c}_lag{l}"))
    return out

def _make_rolling_features(df: 'pl.DataFrame', cols: List[str], windows: List[int], aggs=("mean","std")) -> 'pl.DataFrame':
    out = df
    for c in cols:
        for w in windows:
            for agg in aggs:
                if agg == 'mean':
                    out = out.with_columns(pl.col(c).rolling_mean(window_size=w).alias(f"{c}_r{w}_mean"))
                elif agg == 'std':
                    out = out.with_columns(pl.col(c).rolling_std(window_size=w).alias(f"{c}_r{w}_std"))
                elif agg == 'sum':
                    out = out.with_columns(pl.col(c).rolling_sum(window_size=w).alias(f"{c}_r{w}_sum"))
                else:
                    raise ValueError(f"Unknown agg {agg}")
    return out

def _prefix_group_aggregations(df: 'pl.DataFrame', prefixes: List[str], windows: List[int]) -> 'pl.DataFrame':
    out = df
    cols = df.columns
    from functools import reduce
    import operator
    numeric_kinds = ('Int', 'Float', 'Decimal')
    for p in prefixes:
        group_cols = [c for c in cols if c.startswith(p)]
        group_cols = [c for c in group_cols if any(k in str(out.schema.get(c)) for k in numeric_kinds)]
        if not group_cols:
            continue
        expr_sum = reduce(operator.add, [pl.col(c) for c in group_cols])
        expr = expr_sum / len(group_cols)
        out = out.with_columns(expr.alias(f'G_{p}_mean'))
        for w in windows:
            out = out.with_columns(pl.col(f'G_{p}_mean').rolling_mean(window_size=w).alias(f'G_{p}_r{w}_mean'))
            out = out.with_columns(pl.col(f'G_{p}_mean').rolling_std(window_size=w).alias(f'G_{p}_r{w}_std'))
    return out

class RollingBuffer:
    def __init__(self, maxlen: int = 252):
        self.maxlen = maxlen
        self._buf: Deque[pd.Series] = deque(maxlen=maxlen)
        self.last_time_id: Optional[int] = None
    def update(self, row: Dict[str, Any] | pd.Series):
        if isinstance(row, dict):
            row = pd.Series(row)
        # Optional chronological enforcement via env var ENFORCE_CHRONO
        try:
            if os.getenv('ENFORCE_CHRONO', '0') == '1' and 'time_id' in row:
                tid = int(row.get('time_id'))
                if self.last_time_id is not None and tid < self.last_time_id:
                    LOGGER.warning(f'Chronology violation: incoming time_id {tid} < last {self.last_time_id}; skipping update')
                    return
                self.last_time_id = tid
        except Exception:
            pass
        self._buf.append(row)
    def to_pandas(self) -> pd.DataFrame:
        if not self._buf:
            return pd.DataFrame()
        return pd.DataFrame(self._buf).reset_index(drop=True)
    def build_features(self) -> pd.DataFrame:
        if pl is None:
            # Fallback: without polars we cannot build rich features; return current buffer as-is
            return self.to_pandas()
        pdf = self.to_pandas()
        if pdf.empty:
            return pdf
        pl_df = pl.from_pandas(pdf)
        num_cols = _numeric_cols_pl(pl_df)
        lags = [1, 2, 3]
        windows = [5, 21]
        pl_feat = _make_lag_features(pl_df, num_cols, lags)
        pl_feat = _make_rolling_features(pl_feat, num_cols, windows, aggs=("mean","std"))
        prefixes = ['M','E','P','V','S','MOM','D','I']
        pl_feat = _prefix_group_aggregations(pl_feat, prefixes, windows)
        return pl_feat.to_pandas()

class DynamicScalerState:
    """Volatility-targeting position scaler with smoothing and rate limits.

    Each ``step`` smooths the predicted daily volatility with an exponential
    moving average, converts it to a scale ``target_daily / smoothed_vol``,
    clips that scale, optionally limits its day-over-day relative change, and
    finally optionally limits the day-over-day absolute change in allocation.

    Args:
        alpha: EMA weight given to the newest volatility prediction.
        clip_lo: Lower bound for the scale.
        clip_hi: Upper bound for the scale.
        target_annual_vol: Annualised volatility target; divided by
            ``sqrt(252)`` to obtain the daily target.
        max_daily_scale_change: Maximum relative change of the scale versus
            the previous step (e.g. 0.3 = +/-30%); ``None`` disables the limit.
        alloc_cap: Maximum absolute change of the allocation versus the
            previous step; ``None`` disables the limit.
    """

    def __init__(self, alpha: float, clip_lo: float, clip_hi: float, target_annual_vol: float, max_daily_scale_change: Optional[float], alloc_cap: Optional[float]):
        self.alpha = alpha
        self.clip_lo = clip_lo
        self.clip_hi = clip_hi
        self.target_daily = target_annual_vol / np.sqrt(252.0)
        self.msc = max_daily_scale_change
        self.alloc_cap = alloc_cap
        # State carried between steps; None until the first valid observation.
        self.s_vol_prev: Optional[float] = None
        self.scale_prev: Optional[float] = None
        self.alloc_prev: Optional[float] = None

    def step(self, base_alloc: float, predicted_vol_daily: float) -> float:
        """Advance one time step and return the scaled allocation.

        Args:
            base_alloc: Unscaled allocation for this step.
            predicted_vol_daily: Predicted daily volatility for this step.

        Returns:
            ``base_alloc`` multiplied by the (clipped, rate-limited) scale and
            then rate-limited by ``alloc_cap`` when configured.

        A non-finite ``predicted_vol_daily`` does not update the EMA: the
        previous smoothed value is reused (or the daily target on the very
        first step). Likewise a non-finite allocation is returned as-is but
        is not stored as the previous allocation. This keeps one bad input
        from corrupting every later step.
        """
        if np.isfinite(predicted_vol_daily):
            if self.s_vol_prev is None:
                s_vol = predicted_vol_daily
            else:
                s_vol = self.alpha * predicted_vol_daily + (1 - self.alpha) * self.s_vol_prev
            self.s_vol_prev = s_vol
        else:
            # Hold the last good estimate; with no history, assume the target
            # volatility so the raw scale is 1.0 before clipping.
            s_vol = self.s_vol_prev if self.s_vol_prev is not None else self.target_daily

        # Floor the volatility to avoid division by zero / exploding scales.
        scale = self.target_daily / max(s_vol, 1e-6)
        scale = float(np.clip(scale, self.clip_lo, self.clip_hi))
        if self.msc is not None and self.scale_prev is not None:
            lo = self.scale_prev * (1.0 - self.msc)
            hi = self.scale_prev * (1.0 + self.msc)
            scale = float(np.clip(scale, lo, hi))
        self.scale_prev = scale

        alloc = base_alloc * scale
        if self.alloc_prev is not None and self.alloc_cap is not None:
            lo = self.alloc_prev - self.alloc_cap
            hi = self.alloc_prev + self.alloc_cap
            alloc = float(np.clip(alloc, lo, hi))
        if np.isfinite(alloc):
            self.alloc_prev = alloc
        return alloc

class FortressPredictor:
    def __init__(self, buffer_len: int = 252, base_scale: float = 0.5):
        self.buffer = RollingBuffer(maxlen=buffer_len)
        self.base_scale = base_scale
        self._ret_hist: Deque[float] = deque(maxlen=buffer_len)
        self._ret_clip = (0.0, 2.0)
        self.last_diag: Optional[Dict[str, Any]] = None

        # Load return ensemble boosters + scalers from Kaggle input artifacts
        self.ret_models: List[Any] = []
        self.ret_scalers: List[Any] = []
        self.ret_feat_names: List[List[str]] = []
        for mpath, spath in zip(RET_MODELS, RET_SCALERS):
            if mpath.exists() and spath.exists():
                self.ret_models.append(lgb.Booster(model_file=str(mpath)))
                self.ret_scalers.append(joblib.load(spath))
                self.ret_feat_names.append(self.ret_models[-1].feature_name())
                # Persist scaler stats to JSON to avoid sklearn pickle drift at inference
                try:
                    scaler = self.ret_scalers[-1]
                    mean_ = getattr(scaler, 'mean_', None)
                    scale_ = getattr(scaler, 'scale_', None)
                    if mean_ is not None and scale_ is not None:
                        stats = {'mean': [float(x) for x in mean_], 'scale': [float(x) for x in scale_]}
                        outp = PERSIST_DIR / f'ret_scaler_stats_fold{len(self.ret_scalers)}.json'
                        with open(outp, 'w') as _f:
                            _json.dump(stats, _f)
                        LOGGER.info(f'Wrote scaler stats to {outp}')
                except Exception as _:
                    pass
        if not self.ret_models:
            raise FileNotFoundError(f'Return model boosters/scalers not found under {MODEL_DIR}')

        # Load risk brain
        self.knobs = {}
        try:
            with open(RISK_OPTUNA, 'r') as f:
                rb = _json.load(f)
            self.knobs = rb.get('best_params', rb)
        except Exception:
            self.knobs = {}
        self.risk_target_kind = self.knobs.get('risk_target_kind', 'sq')

        self.risk_model = None
        self.risk_scaler = None
        self.risk_feat_names: List[str] = []
        self.calib_ratio = 1.0
        if RISK_MODEL.exists() and RISK_SCALER.exists() and RISK_META.exists():
            self.risk_model = lgb.Booster(model_file=str(RISK_MODEL))
            self.risk_scaler = joblib.load(RISK_SCALER)
            self.risk_feat_names = self.risk_model.feature_name()
            try:
                with open(RISK_META, 'r') as f:
                    meta = _json.load(f)
                self.calib_ratio = float(meta.get('calib_ratio', 1.0))
            except Exception:
                self.calib_ratio = 1.0
            # Persist risk scaler stats as JSON
            try:
                mean_ = getattr(self.risk_scaler, 'mean_', None)
                scale_ = getattr(self.risk_scaler, 'scale_', None)
                if mean_ is not None and scale_ is not None:
                    stats = {'mean': [float(x) for x in mean_], 'scale': [float(x) for x in scale_]}
                    outp = PERSIST_DIR / 'risk_scaler_stats.json'
                    with open(outp, 'w') as _f:
                        _json.dump(stats, _f)
                    LOGGER.info(f'Wrote risk scaler stats to {outp}')
            except Exception:
                pass
        else:
            raise FileNotFoundError(f'Risk brain artifacts not found under {MODEL_DIR}')

        self.scaler_state = DynamicScalerState(
            alpha=float(self.knobs.get('alpha', 0.2)),
            clip_lo=float(self.knobs.get('clip_lo', 0.25)),
            clip_hi=float(self.knobs.get('clip_hi', 2.5)),
            target_annual_vol=float(self.knobs.get('target_annual_vol', 0.12)),
            max_daily_scale_change=float(self.knobs.get('max_daily_scale_change', 0.3)),
            alloc_cap=(None if self.knobs.get('alloc_cap', None) is None else float(self.knobs['alloc_cap'])),
)

    def _align_columns(self, X: pd.DataFrame, feature_names: List[str]) -> pd.DataFrame:
        present = set(X.columns)
        missing = [c for c in feature_names if c not in present]
        if missing:
            for c in missing:
                X[c] = 0.0
        return X[feature_names]

    def _has_all_features(self, df_cols: List[str]) -> bool:
        if not self.ret_feat_names or not self.risk_feat_names:
            return False
        ret_ok = all(c in df_cols for c in self.ret_feat_names[0])
        risk_ok = all(c in df_cols for c in self.risk_feat_names)
        return ret_ok and risk_ok

    def update_and_predict(self, day_row: Dict[str, Any] | pd.Series, expects_features: bool = True) -> float:
        self.buffer.update(day_row)
        # Auto-detect if the incoming rows are raw or already featured
        feat_df = self.buffer.to_pandas()
        if feat_df.empty:
            return 0.0
        if not self._has_all_features(list(feat_df.columns)) or not expects_features:
            feat_df = self.buffer.build_features()
            if feat_df.empty:
                return 0.0
        X_all = _drop_and_select_features(feat_df)
        x_today = X_all.tail(1).copy()
        ret_preds = []
        for booster, scaler, feat_names in zip(self.ret_models, self.ret_scalers, self.ret_feat_names):
            Xt = self._align_columns(x_today.copy(), feat_names)
            Xt_s = _safe_transform(Xt, scaler)
            ret_preds.append(float(booster.predict(Xt_s)[0]))
        pred_ret = float(np.mean(ret_preds))
        self._ret_hist.append(pred_ret)
        arr = np.array(self._ret_hist, dtype=float)
        if len(arr) >= 5:
            mean = float(np.nanmean(arr))
            std = float(np.nanstd(arr))
            z = 0.0 if std <= 1e-12 or np.isnan(std) else (pred_ret - mean) / std
        else:
            z = 0.0
        mid = (self._ret_clip[0] + self._ret_clip[1]) / 2.0
        width = (self._ret_clip[1] - self._ret_clip[0]) / 2.0
        alloc_base = mid + width * np.tanh(self.base_scale * z)

        Xr = self._align_columns(x_today.copy(), self.risk_feat_names)
        Xr_s = _safe_transform(Xr, self.risk_scaler)
        pr = float(self.risk_model.predict(Xr_s)[0])
        if self.risk_target_kind == 'sq':
            pred_vol = float(np.sqrt(max(pr, 0.0) + 1e-12))
        elif self.risk_target_kind == 'abs':
            pred_vol = float(np.clip(pr, 0.0, None))
        else:
            pred_vol = float(np.exp(pr / 2.0))
        pred_vol *= self.calib_ratio

        alloc_final = self.scaler_state.step(base_alloc=alloc_base, predicted_vol_daily=pred_vol)
        alloc_final = float(np.clip(alloc_final, 0.0, 2.0))

        self.last_diag = {
            'pred_ret': float(pred_ret),
            'alloc_base': float(alloc_base),
            'pred_vol': float(pred_vol),
            'smoothed_vol': float(self.scaler_state.s_vol_prev if self.scaler_state.s_vol_prev is not None else pred_vol),
            'scale': float(self.scaler_state.scale_prev if self.scaler_state.scale_prev is not None else 1.0),
            'alloc_final': float(alloc_final),
        }
        return float(alloc_final)

class FortressAPI:
    """Batch adapter around ``FortressPredictor`` with per-batch diagnostics."""

    def __init__(self):
        self.model = FortressPredictor()
        self.use_gpu = os.getenv('USE_GPU', '0') == '1'
        cuda_ok = self.use_gpu and (torch is not None) and torch.cuda.is_available()
        self.torch_device = 'cuda' if cuda_ok else 'cpu'
        # Diagnostics collection for reporting
        self.collect_diag = os.getenv('COLLECT_DIAGNOSTICS', '1') == '1'
        self._allocations: List[float] = []
        self._pred_returns: List[float] = []
        self._pred_vols: List[float] = []
        self._smoothed_vols: List[float] = []
        self._scales: List[float] = []
        self._last_predict_seconds: float = 0.0

    def reset_diags(self):
        """Empty every diagnostics list and zero the last batch duration."""
        for series in (
            self._allocations,
            self._pred_returns,
            self._pred_vols,
            self._smoothed_vols,
            self._scales,
        ):
            series.clear()
        self._last_predict_seconds = 0.0

    def predict_batch(self, df: pd.DataFrame) -> np.ndarray:
        """Score ``df`` row by row, in order, and return float32 allocations.

        The predictor state advances once per row. If the batch runs longer
        than ``MAX_BATCH_SECONDS`` (default 300), the remaining outputs are
        filled with the last computed value (0.0 if none) and the loop stops.
        """
        if self.collect_diag:
            self.reset_diags()
        started = time.perf_counter()
        out = np.empty(len(df), dtype=np.float32)
        max_seconds = float(os.getenv('MAX_BATCH_SECONDS', '300'))
        for i, (_, row) in enumerate(df.iterrows()):
            # Timeout check
            elapsed = time.perf_counter() - started
            if elapsed > max_seconds:
                LOGGER.warning(f'predict_batch timeout ({max_seconds}s) reached at index {i}; filling remaining outputs with last value')
                out[i:] = float(out[i-1]) if i > 0 else 0.0
                break
            out[i] = self.model.update_and_predict(row, expects_features=True)
            if not self.collect_diag:
                continue
            diag = self.model.last_diag or {}
            self._allocations.append(float(out[i]))
            for store, key in (
                (self._pred_returns, 'pred_ret'),
                (self._pred_vols, 'pred_vol'),
                (self._smoothed_vols, 'smoothed_vol'),
                (self._scales, 'scale'),
            ):
                store.append(float(diag.get(key, np.nan)))
        self._last_predict_seconds = time.perf_counter() - started
        return out


In [4]:
# No additional engine code needed: FortressPredictor encapsulates state, feature transforms, and dynamic sizing.
# We only define a thin wrapper to align with Kaggle's expected interface.
# NOTE: this definition replaces the FortressAPI defined in the previous cell,
# so it must carry the same MAX_BATCH_SECONDS timeout guard.
class FortressAPI:
    """Batch adapter around ``FortressPredictor`` with per-batch diagnostics."""

    def __init__(self):
        self.model = FortressPredictor()
        self.use_gpu = os.getenv('USE_GPU', '0') == '1'
        self.torch_device = 'cuda' if self.use_gpu and (torch is not None) and torch.cuda.is_available() else 'cpu'
        # Diagnostics collection for reporting
        self.collect_diag = os.getenv('COLLECT_DIAGNOSTICS', '1') == '1'
        self._allocations: List[float] = []
        self._pred_returns: List[float] = []
        self._pred_vols: List[float] = []
        self._smoothed_vols: List[float] = []
        self._scales: List[float] = []
        self._last_predict_seconds: float = 0.0

    def reset_diags(self):
        """Clear all per-batch diagnostics and the last batch duration."""
        self._allocations.clear()
        self._pred_returns.clear()
        self._pred_vols.clear()
        self._smoothed_vols.clear()
        self._scales.clear()
        self._last_predict_seconds = 0.0

    def predict_batch(self, df: pd.DataFrame) -> np.ndarray:
        """Accepts a pandas DataFrame batch; emits a 1D np.ndarray of allocations (float32).

        This function advances internal state row-by-row to preserve chronology.
        If the batch takes longer than ``MAX_BATCH_SECONDS`` (env var, default
        300 seconds), processing stops and the remaining outputs are filled
        with the last computed allocation (0.0 when none was computed), so a
        slow batch still returns a full-length array. Diagnostics are only
        recorded for rows that were actually scored.
        """
        if self.collect_diag:
            self.reset_diags()
        t0 = time.perf_counter()
        out = np.empty(len(df), dtype=np.float32)
        max_seconds = float(os.getenv('MAX_BATCH_SECONDS', '300'))
        for i, (_, row) in enumerate(df.iterrows()):
            # Timeout check before spending time on the next row.
            if (time.perf_counter() - t0) > max_seconds:
                LOGGER.warning(f'predict_batch timeout ({max_seconds}s) reached at index {i}; filling remaining outputs with last value')
                last_val = float(out[i-1]) if i > 0 else 0.0
                out[i:] = last_val
                break
            out[i] = self.model.update_and_predict(row, expects_features=True)
            if self.collect_diag:
                diag = self.model.last_diag or {}
                self._allocations.append(float(out[i]))
                self._pred_returns.append(float(diag.get('pred_ret', np.nan)))
                self._pred_vols.append(float(diag.get('pred_vol', np.nan)))
                self._smoothed_vols.append(float(diag.get('smoothed_vol', np.nan)))
                self._scales.append(float(diag.get('scale', np.nan)))
        self._last_predict_seconds = time.perf_counter() - t0
        return out

# Model Instantiation
Create a single global instance of the API adapter to maintain state across calls.

In [5]:
# Instantiate once (with fallback to a dummy predictor for local smoke-runs)
try:
    FORTRESS = FortressAPI()
    LOGGER.info('FortressAPI instantiated and ready.')
except Exception as e:
    # Keep the notebook runnable without artifacts, but record the full
    # traceback: with the dummy in place every allocation is 0, so the reason
    # the real predictor failed to load must be visible in the log.
    LOGGER.warning(
        f'Failed to instantiate FortressAPI: {e}; using DummyFortress for local run',
        exc_info=True,
    )
    class DummyFortress:
        """Stand-in predictor that returns a zero allocation for every row."""
        def __init__(self):
            pass
        def predict_batch(self, df: pd.DataFrame) -> np.ndarray:
            """Return a float32 array of zeros with one entry per row of ``df``."""
            LOGGER.warning('DummyFortress.predict_batch called — returning zeros')
            return np.zeros(len(df), dtype=np.float32)
    FORTRESS = DummyFortress()
    LOGGER.info('DummyFortress instantiated for local smoke-run.')



[23:16:51] INFO - DummyFortress instantiated for local smoke-run.


In [6]:
# Warmup presence check - aborts with instructions if warmup missing
# Candidates are tried in order; the first one that exists wins.
warmup_candidates = [
    ARTIFACT_DIR / 'warmup_features.parquet',
    LOCAL_ROOT / 'data' / 'processed' / 'features_post_gfc.parquet',
    LOCAL_ROOT / 'data' / 'processed' / 'features_post_gfc_sample.csv',
]
warmup_path = None
for p in warmup_candidates:
    try:
        candidate_exists = p.exists()
    except Exception:
        # An unreadable location counts as "not present".
        candidate_exists = False
    if candidate_exists:
        warmup_path = p
        break

if warmup_path is not None:
    LOGGER.info(f'Warmup file found at: {warmup_path}')
    print('Warmup file present:', warmup_path)
else:
    msg = (
        'Warm-up file not found. For Kaggle submission, upload the precomputed '
        'warmup_features.parquet as a dataset input named "warmup-features" and '
        'set ARTIFACT_DIR to /kaggle/input/warmup-features. Alternatively, run '
        'scripts/package_warmup_dataset.py to produce a local packaging directory.'
    )
    LOGGER.error(msg)
    print(msg)
    # Halt further execution in notebook run to avoid cold-start failures
    raise FileNotFoundError(msg)


[23:16:51] INFO - Warmup file found at: G:\2025\Kaggle\#3.Hull Tactical - Market Prediction\dataset\warmup_features.parquet


Warmup file present: G:\2025\Kaggle\#3.Hull Tactical - Market Prediction\dataset\warmup_features.parquet


In [7]:

# Smoke test: load a small warmup sample and run FORTRESS.predict_batch
# Prefer the file already located by the warmup presence check so both cells
# agree on the input; the two legacy locations remain as fallbacks.
p = warmup_path
if p is None or not p.exists():
    p = Path('dataset') / 'warmup_features.parquet'
if not p.exists():
    p = Path('/kaggle/input/warmup-features') / 'warmup_features.parquet'
if not p.exists():
    raise FileNotFoundError(f'Warmup file not found at {p}')
# One of the warmup candidates is a CSV sample, so choose the reader by suffix.
_read_warmup = pd.read_csv if p.suffix.lower() == '.csv' else pd.read_parquet
df = _read_warmup(p).head(8)
print('Loaded warmup rows:', len(df))
allocs = FORTRESS.predict_batch(df)
if type(FORTRESS).__name__ == 'DummyFortress':
    # All-zero output here comes from the fallback predictor, not the models.
    LOGGER.warning('Smoke test ran against DummyFortress; allocations are placeholders, not model output')
assert len(allocs) == len(df), 'predict_batch must return one allocation per input row'
print('Allocations:', allocs)


Loaded warmup rows: 8


Allocations: [0. 0. 0. 0. 0. 0. 0. 0.]
