## Quantum Model Version: Fidelity Quantum Kernel Ridge

- This notebook contains my steps on creating the most optimal model for the quantum version of this. 
- The goal for the model is to find the next day % return of RBLX stock.

Why RBLX? well, I grew up playing roblox and I also have shares invested in roblox lol

### Data and feature pipelining

In [39]:
# ───────────────────────────────── Imports ────────────────────────────────────
import math
import numpy as np
import pandas as pd

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_absolute_error, root_mean_squared_error
from sklearn.decomposition import PCA

# ─── quantum stack ────────────────────────────────────────────
from qiskit_aer import Aer                              # simulators
from qiskit_algorithms.utils import algorithm_globals    # RNG / seed
from qiskit.circuit.library import ZZFeatureMap
from qiskit_machine_learning.kernels import FidelityStatevectorKernel
from sklearn.kernel_ridge import KernelRidge             # classical par
from qiskit_aer.primitives import Sampler                    # BaseSampler V1 ✓  :contentReference[oaicite:0]{index=0}
from torch.utils.data import DataLoader, TensorDataset
from torch.optim import Adam


# PennyLane
import pennylane as qml
import torch
from torch.optim import Adam

# Technical indicators
import ta  # pip install ta


In [40]:
# ─────────────────────────── Configuration ────────────────────────────
CONFIG = dict(
    seed            = 7,
    window          = 5,        # look-back days
    test_size       = 0.2,
    target          = "return", # "return" | "close"
    scaler          = StandardScaler(),
    n_qubits        = 6,        # keep small for state-vector kernel
    pca_keep        = 6,        # MUST match n_qubits
    q_layers        = 3,
    epochs          = 20,
    batch_size      = 64,
    lr              = 1e-2,
    reg_alpha       = 1e-2,
    classical_baseline = True,
)

np.random.seed(CONFIG["seed"])
torch.manual_seed(CONFIG["seed"])


<torch._C.Generator at 0x24cc8f53830>

In [41]:
# ──────────────────────────── 1. Load data ───────────────────────────────────
def load_dataset(path: str) -> pd.DataFrame:
    df = pd.read_csv(path, parse_dates=["date"]).sort_values("date")
    df = df.ffill().bfill()
    df["ret"] = df["Close"].pct_change()
    return df.dropna().reset_index(drop=True)

In [42]:
# ───────────── 2. Feature engineering & windowing ────────────────────────────
def make_features(df: pd.DataFrame) -> pd.DataFrame:
    df = df.copy()
    df["rsi14"] = ta.momentum.rsi(df["Close"], 14)
    df["macd"]  = ta.trend.macd_diff(df["Close"])
    df["atr14"] = ta.volatility.average_true_range(df["High"], df["Low"], df["Close"])
    df["sma20"] = df["Close"].rolling(20).mean()
    df["ema20"] = ta.trend.ema_indicator(df["Close"], 20)
    df["vol_chg"] = df["Volume"].pct_change()

    numeric_cols = [c for c in df.columns if pd.api.types.is_numeric_dtype(df[c])]
    df[numeric_cols] = df[numeric_cols].ffill().bfill()
    return df

def windowify(df: pd.DataFrame, window: int, target: str):
    """Vectorised; ~100× faster than Python loop."""
    feats = df.drop(columns=["date", "Close", "ret"]).to_numpy(dtype=np.float32)
    n, d  = feats.shape
    stride0, stride1 = feats.strides
    X = np.lib.stride_tricks.as_strided(
        feats,
        shape=(n - window - 1, window * d),
        strides=(stride0, stride0)
    )
    if target == "return":
        y = df["ret"].to_numpy(dtype=np.float32)[window + 1 :]
    else:
        y = df["Close"].to_numpy(dtype=np.float32)[window + 1 :]
    return X.copy(), y.copy()


In [43]:
def build_sampler(shots: int | None = None, seed: int = 7):
    """
    Return an *Aer* Sampler (V1).  This class still inherits from
    `BaseSampler` – exactly what `ComputeUncompute` expects.  Setting
    `shots=None` makes Aer calculate exact probabilities; an integer
    gives shot-based sampling.  The seed is forwarded via run-options.
    """
    run_opts = {"shots": shots, "seed": seed}                # shots=None ⇒ exact  :contentReference[oaicite:1]{index=1}
    return Sampler(run_options=run_opts)     

### Method 1: Quantum Kernel Ridge Regression via Qiskit + Sklearn

In [44]:
# ──────────── 4A. Quantum Kernel Ridge Regression via Qiskit + Sklearn ───────
def train_qkrr_fast(X_tr, y_tr, X_te, alpha=0.1, reps=1):
    fmap = ZZFeatureMap(CONFIG["n_qubits"], reps=reps)
    kernel = FidelityStatevectorKernel(
        feature_map=fmap,
        auto_clear_cache=False,   #  KEEP the state-vector cache
        shots=None,
    )
    # Pre-compute Gram matrices just once
    K_tr = kernel.evaluate(X_tr)
    K_te = kernel.evaluate(X_te, X_tr)

    model = KernelRidge(alpha=alpha, kernel="precomputed")
    model.fit(K_tr, y_tr)
    return model.predict(K_te)

### Evaluation

In [45]:
# ────────────────────────── 5. Utility metrics ──────────────────────────────
import numpy as np
from sklearn.metrics import mean_absolute_error, mean_squared_error

def directional_accuracy(y_true, y_pred):
    """Share of samples whose predicted sign equals the true sign."""
    return np.mean(np.sign(y_true) == np.sign(y_pred))

def evaluate(y, yhat, name):
    mae   = mean_absolute_error(y, yhat)
    rmse  = math.sqrt(mean_squared_error(y, yhat))
    dacc  = directional_accuracy(y, yhat)
    print(f"{name:<6}  MAE={mae:.6f}  RMSE={rmse:.6f}  DA={dacc:.3f}")
    return mae, rmse, dacc


In [46]:
csv_path = "../data/master_dataset_cleaned.csv"
print("Loading dataset …")

df = make_features(load_dataset(csv_path))
X, y = windowify(df, CONFIG["window"], CONFIG["target"])
X = CONFIG["scaler"].fit_transform(X)

# dimensionality reduction → n_qubits dims
pca = PCA(
    n_components=CONFIG["pca_keep"],
    random_state=CONFIG["seed"],
    svd_solver="randomized",
    iterated_power=3,
    n_oversamples=8,
)
X_red = pca.fit_transform(X)

X_tr, X_te, y_tr, y_te = train_test_split(
    X_red, y, test_size=CONFIG["test_size"], shuffle=False)

results = {}

# — QKRR —
preds = train_qkrr_fast(X_tr, y_tr, X_te)
results["QKRR"] = evaluate(y_te, preds, "QKRR")


print("\nSummary:")
for name, (mae, rmse, dacc) in results.items():
    print(f"{name}: MAE={mae:.6f} | RMSE={rmse:.6f} | DA={dacc:.3f}")

Loading dataset …


KeyboardInterrupt: 

In [23]:
import os
from itertools import product
from joblib import Parallel, delayed
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis as LDA
from qiskit_machine_learning.kernels import FidelityStatevectorKernel
from qiskit.circuit.library import ZZFeatureMap
from sklearn.kernel_ridge import KernelRidge

# 1) feature compression (LDA) -------------------------------------------
lda = LDA(n_components=1)
x1d = lda.fit_transform(X, np.sign(y))      # shape (N, 1)

# tile the single component to fill 6 qubits
X_red = np.tile(x1d, (1, CONFIG["n_qubits"]))   # shape (N, 6)

# 2) split ---------------------------------------------------------------
X_tr, X_te, y_tr, y_te = train_test_split(
    X_red, y, test_size=CONFIG["test_size"], shuffle=False)

# 3) statevector backend options -----------------------------------------
sv_kwargs = {"method": "statevector_gpu"} if "CUDA_VISIBLE_DEVICES" in os.environ else {}
fmap_cache = {}      # reuse circuits across α,reps combos

def fit_single(alpha, reps):
    # cached feature map -----------------------------------------------
    if reps not in fmap_cache:
        fmap_cache[reps] = ZZFeatureMap(CONFIG["n_qubits"], reps=reps)
    fmap = fmap_cache[reps]

    kernel = FidelityStatevectorKernel(
        feature_map=fmap,
        auto_clear_cache=False,        # speed!
        shots=None  
    )

    # one-shot Gram matrix ---------------------------------------------
    K_tr = kernel.evaluate(X_tr)
    K_te = kernel.evaluate(X_te, X_tr)

    mdl = KernelRidge(alpha=alpha, kernel="precomputed")
    mdl.fit(K_tr, y_tr)
    preds = mdl.predict(K_te)
    mae  = mean_absolute_error(y_te, preds)
    rmse = root_mean_squared_error(y_te, preds)
    da   = np.mean(np.sign(y_te) == np.sign(preds))     # directional accuracy

    return (alpha, reps, mae, rmse, da)

# 4) tiny grid search -----------------------------------------------------
alphas = 10.0 ** np.linspace(-4, -1, 4)  # 1e-4 … 1e-1
reps_list = [1, 2]
results = Parallel(n_jobs=-1)(
    delayed(fit_single)(a, r) for a, r in product(alphas, reps_list)
)

alpha_best, reps_best, mae_best, rmse_best, da_best = min(
    results, key=lambda t: t[2]  # still minimizing MAE (index 2)
)
print(f"BEST α={alpha_best:.1e}, reps={reps_best}  "
      f"MAE={mae_best:.6f}  RMSE={rmse_best:.6f}  DA={da_best:.3f}")



BEST α=1.0e-01, reps=1  MAE=0.020730  RMSE=0.029117  DA=0.668


In [28]:
fmap = ZZFeatureMap(6, reps=1)
kernel = FidelityStatevectorKernel(
    feature_map=fmap,
    auto_clear_cache=False,
    shots=None
)
K_train = kernel.evaluate(X_tr)
K_test  = kernel.evaluate(X_te, X_tr)

model = KernelRidge(alpha=0.1, kernel="precomputed")
model.fit(K_train, y_tr)
y_pred = model.predict(K_test)