<a href="https://colab.research.google.com/github/Jatin-Khiyani/Visual-Situmlai-Reconstruction-Using-fMRI-and-Deep-Learning/blob/main/FastL2LiR_copy.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Importing Data

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import numpy as np
import torch

## Importing Captions and fMRI

In [None]:
fmri = np.load('/content/drive/MyDrive/NSD_Dataset/fmri.npy').astype(np.float32)
clip_text = np.load('/content/drive/MyDrive/NSD_Dataset/Caption Embdedding.npy').astype(np.float32)
vqvae_latents = np.load('/content/drive/MyDrive/NSD_Dataset/z.npy').astype(np.float32)
# === Print shapes ===
print("fMRI shape:", fmri.shape)
print("CLIP text embeddings shape:", clip_text.shape)
print("VQ-VAE latents shape:", vqvae_latents.shape)


fMRI shape: (27750, 215, 200)
CLIP text embeddings shape: (27892, 7680)
VQ-VAE latents shape: (27750, 16384)


### Flattening and PreProccessing

In [None]:
# === Flatten fMRI to (N, D_fmri) ===
fmri_flat = fmri.reshape(fmri.shape[0], -1)  # shape: (27750, 43000)

# === Clip c to match samples ===
clip_text = clip_text[:fmri_flat.shape[0]]  # now (27750, 7680)

# === Final sanity check ===
print("fMRI (flattened):", fmri_flat.shape)
print("CLIP text:", clip_text.shape)
print("VQ-VAE latents:", vqvae_latents.shape)

fMRI (flattened): (27750, 43000)
CLIP text: (27750, 7680)
VQ-VAE latents: (27750, 16384)


## Importing FastL2LiR

In [7]:
import numpy as np
import torch
from tqdm import tqdm

# ─── your RidgeDual class (as defined above) ─────────────────────────────────

class RidgeDual:
    def __init__(self, alpha=1.0, device=None):
        self.alpha = alpha
        self.device = device or ('cuda' if torch.cuda.is_available() else 'cpu')
        self.W = None
        self.b = None

    def fit(self, X_np, Y_np):
        # X: (N, D), Y: (N, T)
        X = torch.from_numpy(X_np).float().to(self.device)
        Y = torch.from_numpy(Y_np).float().to(self.device)

        # Center data
        X_mean = X.mean(dim=0, keepdim=True)
        Y_mean = Y.mean(dim=0, keepdim=True)
        Xc = X - X_mean
        Yc = Y - Y_mean

        N, D = Xc.shape

        # Compute (X X^T + αI) and solve for A
        K = Xc @ Xc.T                    # (N, N)
        A = torch.linalg.solve(
            K + self.alpha * torch.eye(N, device=self.device),
            Yc
        )                                # (N, T)

        # Recover W and b
        self.W = Xc.T @ A                # (D, T)
        self.b = (Y_mean - X_mean @ self.W).squeeze(0)  # (T,)

    def predict(self, X_np, chunk_size=None):
        X = torch.from_numpy(X_np).float().to(self.device)
        if chunk_size is None:
            return (X @ self.W + self.b).cpu().numpy()

        outs = []
        for start in tqdm(range(0, X.size(0), chunk_size), desc="Predicting", unit="chunk"):
            outs.append((X[start:start+chunk_size] @ self.W + self.b).cpu().numpy())
        return np.vstack(outs)

# ─── load your data (replace with your actual load code) ────────────────────────

# ─── fit the model ───────────────────────────────────────────────────────────────

model = RidgeDual(alpha=0.001)   # you can tweak alpha as needed
model.fit(fmri_flat, vqvae_latents)

# ─── (optional) predict on the same data with a progress bar ────────────────────

Y_pred = model.predict(fmri_flat, chunk_size=1000)
print("Predicted shape:", Y_pred.shape)


Predicting: 100%|██████████| 28/28 [00:03<00:00,  8.37chunk/s]


Predicted shape: (27750, 16384)


## Evaluating

In [8]:
import numpy as np
from sklearn.metrics import mean_squared_error, r2_score
import torch

# Assume fmri_flat, vqvae_latents are your original numpy arrays
# and `model` is your fitted RidgeDual

# 1) Predict
Y_pred = model.predict(fmri_flat, chunk_size=1000)  # (27750, 16384)

# 2) Mean Squared Error
mse = mean_squared_error(vqvae_latents, Y_pred)
print(f"Overall MSE: {mse:.4e}")

# 3) R² score
#   - 'variance_weighted' weights each latent-dimension by its variance in the data.
r2 = r2_score(vqvae_latents, Y_pred, multioutput='variance_weighted')
print(f"Weighted R²: {r2:.4f}")

# 4) Sample-wise Pearson correlation
#    (skips any all-constant vectors to avoid NaNs)
cors = []
for i in range(Y_pred.shape[0]):
    y_true = vqvae_latents[i]
    y_hat  = Y_pred[i]
    if np.std(y_true) > 0 and np.std(y_hat) > 0:
        cors.append(np.corrcoef(y_true, y_hat)[0,1])
cors = np.array(cors)
print(f"Mean sample-wise ρ: {cors.mean():.4f} ± {cors.std():.4f}")

# 5) (Optional) Per-dimension R²
r2_per_dim = r2_score(vqvae_latents, Y_pred, multioutput='raw_values')
print(f"First 5 dims R²: {r2_per_dim[:5]}")
print(f"Median dim R²: {np.median(r2_per_dim):.4f}")


Predicting: 100%|██████████| 28/28 [00:03<00:00,  8.36chunk/s]


Overall MSE: 5.5100e+03
Weighted R²: -6711.4258
Mean sample-wise ρ: 0.0121 ± 0.0549
First 5 dims R²: [-5966.1294 -9507.328  -3979.802  -8833.873  -8673.898 ]
Median dim R²: -5136.1836
