# DecVTC - Decoding Voxels To CLIP

Over the last 15 years, there has been a rapidly growing effort to have machines interpret the 'meaning' of language. With the rise of LLMs, fueled by major improvements in attention mechanisms, understanding how the meaning of an input is parsed has become one of the most heavily researched topics in data science, and particularly in NLP. With pre-training, we can achieve an effect of "understanding" a language and the patterns in it, for later downstream tasks (as demonstrated by the rise of GPT models). We have seen that with enough compute power, machines can learn useful representations of many kinds of data, from photos to videos to audio, and transformers have already made a huge impact on the world, less than a decade after being conceived.




This notebook's purpose is to try to learn a **linear** decoder of fMRI data (specifically voxel-encoded data) that will generalize to unseen fMRI data for completely unrelated concepts. We hope to show that even a simple linear decoder, without any further model complexity, can capture a true understanding of a concept: no matter how the concept is presented to us (as text or as an image), we will be able to understand it.

### Text to CLIP cross modality ###
This notebook is the implementation of our original hypothesis. <br>
Before starting the open-ended project, we hypothesized that the meaning of textual data (both individual words for concepts and full sentences) and images of the same concepts might be interpreted in the same places inside the brain. <br>
To test this hypothesis, we set out to expand on the work of Pereira et al. (2018). In _Toward a universal decoder of linguistic meaning from brain activation_, Pereira et al. made big strides in showing that the meaning of different concepts, ranging from abstract ideas to physical objects, is parsed within the same areas of the brain. They scanned the

#### Setup

##### Dependencies #####
First, let's install all the dependencies we need to check our hypothesis

In [2]:
# Install dependencies
%pip install ftfy regex tqdm scikit-learn numpy matplotlib
%pip install -U gdown
%pip install git+https://github.com/openai/CLIP.git




##### Data

Now, we can import all of our relevant data! <br>
**Extract prerequisite data**
For this project, we've created a Drive folder containing all of the relevant code and data from the original Pereira et al. paper. Moreover, because the list of concepts and the related images from the original paper is static, we pre-calculated all of the relevant CLIP embeddings (the exact code we used can be seen in "one_time_drive_setup") and persisted them to Drive. The folder contains:
- The CLIP embeddings for the textual concepts
- The CLIP embeddings for all of the photos related to each concept
- The list of concepts
- The actual images for each concept (zipped)
- The fMRI data of one of the participants in experiment 1 of the Pereira paper <br>
All of this data is based on the original Pereira paper.

In [3]:
import platform
from pathlib import Path

# If the data already exists, we don't need to download it again
if not Path("data").exists():
    # Check operating system - handling the data is done differently on Windows and Linux.
    # This will allow us to run the code on Colab, locally, and on any other platform we may choose.
    IS_WINDOWS = platform.system() == "Windows"
    
    if IS_WINDOWS:
        !python -m gdown --folder --id 1CwmFOsYFnq6t33KAzpvw0gaOTQXbcozs -O ./data/
        !powershell -NoProfile -Command "Expand-Archive -Path ./data/experiment-images.zip -DestinationPath ./data/ -Force"
        !powershell -NoProfile -Command "Remove-Item ./data/experiment-images.zip"
    else:
        !gdown --folder --id 1CwmFOsYFnq6t33KAzpvw0gaOTQXbcozs --output ./data
        !unzip ./data/experiment-images.zip -d ./data/
        !rm ./data/experiment-images.zip

#### Pre-Processing

##### Parse Embeddings

**Textual**

We saved the embedding of the prompt "A picture of {c}", where c is the name of the relevant concept. We chose this because CLIP responds very well to prompting, and the prompted embedding improved results over the non-prompted version. Because we only saved one embedding per concept, using the text embeddings is very straightforward: there is only one way to use them.<br>
For further analysis (outside the scope of this project), because CLIP is so prompt-aware, we could consider ensembling multiple prompts and handling multiple text embeddings per concept. <br>
Finally, we'll also grab the list of concepts, which will be relevant in later analysis.

In [None]:
import numpy as np

with np.load("data/clip_text_embeddings.npz") as text_embeddings:
    clip_text_embeddings = text_embeddings["data"]

**Visual**

The parsing of visual data is much more interesting. In the dataset supplied by the original Pereira paper, there are 6-7 photos for each concept, all of them depicting the concept (in contrast to the text, which has only one related embedding vector: the name of the concept). Because we're working with the high-end CLIP model, embedding takes quite a bit of time. Moreover, because the input is the same each time, the output embedding vector for each individual photo is also the same on every run. For those reasons, we chose to separate the embedding step from the main pipeline and use its saved output as a baseline. The strategy, though, was to embed and save every single picture for every single concept, because having multiple inputs per concept raises the question of the best way to process them for later evaluation. <br>
Specifically, the options we checked for processing our base image embeddings are:
1. Mean - the "intuitive" way to approach multiple inputs per concept: take all of the embedding vectors and average each of their components to create one "centralized" vector.
2. Medoid - a similar way of getting one representative value from a batch of values, with a slight difference: the medoid is an actual data point, the one whose average distance to all other points is smallest (in our code, the image with the highest mean cosine similarity to the images of the same concept).
3. Best of K - a third way to look at the data, based on the first two. We still calculate the mean or the medoid for each concept, but instead of aggregating ALL of our data for each concept, we take only the k best embeddings. For example, a Best of 3 mean approach means we take the 3 "best" image-based embedding vectors of the concept and use their mean as our final embedding vector. "Best" here is defined as "highest cosine similarity to the single text-based embedding vector of the same concept". The choice of cosine similarity to define closeness is discussed extensively further down the notebook.

Let's define all of the functions we discussed: <br>


In [None]:
from typing import Dict, List, Tuple  # Dict is needed by prototypes_topk_by_text_anchor
def prototypes_mean(embeddings: np.ndarray, concept_ids: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
    """
    Mean prototype per concept.

    Args:
        embeddings: (M, D) image embeddings (many rows per concept)
        concept_ids: (M,) integer concept id per row

    Returns:
        concepts: (C_unique,) sorted unique concept ids
        protos: (C_unique, D) mean embedding per concept
    """
    concepts = np.unique(concept_ids)
    protos: List[np.ndarray] = []
    for c in concepts:
        protos.append(embeddings[concept_ids == c].mean(axis=0))
    return concepts, np.vstack(protos)


def prototypes_medoid(embeddings: np.ndarray, concept_ids: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
    """
    Medoid prototype per concept = instance with max mean cosine to others.

    Args:
        embeddings: (M, D)
        concept_ids: (M,)

    Returns:
        concepts: (C_unique,)
        protos: (C_unique, D)
    """
    concepts = np.unique(concept_ids)
    protos: List[np.ndarray] = []
    for c in concepts:
        E = embeddings[concept_ids == c]
        En = row_norm(E)
        S = En @ En.T  # cosine sims (m x m)
        i_medoid = int(np.argmax(S.mean(axis=1)))
        protos.append(E[i_medoid])
    return concepts, np.vstack(protos)


def prototypes_topk_by_text_anchor(
    embeddings: np.ndarray,
    concept_ids: np.ndarray,
    text_by_concept: Dict[int, np.ndarray],
    k: int = 3,
    agg: str = "mean"
) -> Tuple[np.ndarray, np.ndarray]:
    """
    For each concept, select the top-k image instances by cosine to that concept's
    TEXT embedding (anchor), then aggregate by mean or medoid.

    Args:
        embeddings: (M, D) image embeddings
        concept_ids: (M,) integer concept id per row
        text_by_concept: dict mapping concept id -> (D,) text embedding
        k: number of instances to keep
        agg: 'mean' or 'medoid'

    Returns:
        concepts: (C_unique,)
        protos: (C_unique, D)
    """
    concepts = np.unique(concept_ids)
    protos: List[np.ndarray] = []
    for c in concepts:
        E = embeddings[concept_ids == c]
        anchor = text_by_concept[int(c)]
        En = row_norm(E)
        a = anchor / max(np.linalg.norm(anchor), 1e-12)
        sims = En @ a
        idx = np.argsort(-sims)[:min(k, E.shape[0])]
        Ek = E[idx]
        if agg == "mean" or Ek.shape[0] == 1:
            protos.append(Ek.mean(axis=0))
        elif agg == "medoid":
            Ekn = row_norm(Ek)
            S = Ekn @ Ekn.T
            i_med = int(np.argmax(S.mean(axis=1)))
            protos.append(Ek[i_med])
        else:
            raise ValueError("agg must be 'mean' or 'medoid'")
    return concepts, np.vstack(protos)
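
To make the difference between the three strategies concrete, here is a minimal, self-contained sketch on made-up 2-D vectors. The values and the names `E` and `text_anchor` are illustrative assumptions, not data from the notebook, and the sketch inlines the same operations as the functions above rather than calling them.

```python
import numpy as np

# Three made-up 2-D "image embeddings" for a single concept (all unit length).
E = np.array([[1.0, 0.0],
              [0.8, 0.6],
              [0.0, 1.0]])
# Made-up text embedding for the same concept, used as the top-k anchor.
text_anchor = np.array([1.0, 0.0])

# Mean prototype: component-wise average; usually not one of the real images.
mean_proto = E.mean(axis=0)

# Medoid prototype: the real image with the highest mean cosine to all images.
En = E / np.linalg.norm(E, axis=1, keepdims=True)
S = En @ En.T
medoid_proto = E[int(np.argmax(S.mean(axis=1)))]

# Best-of-2 mean: keep the 2 images closest to the text anchor, then average.
sims = En @ (text_anchor / np.linalg.norm(text_anchor))
top2 = np.argsort(-sims)[:2]
best2_mean_proto = E[top2].mean(axis=0)

print("mean:     ", mean_proto)
print("medoid:   ", medoid_proto)
print("best-of-2:", best2_mean_proto)
```

In this toy case the medoid is the middle image `[0.8, 0.6]`, while the best-of-2 mean is pulled toward the text anchor because the image farthest from it is dropped.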

##### Masking

#### Training ####
As stated earlier, we are going to keep Pereira et al.'s method of learning a **linear** decoder that can generalize from the data it saw to unseen data, and thus capture deeper meaning than the training data itself. In the original paper, they achieved results that are **much** better than chance, implying that a linear decoder is sufficient to capture the meaning of textual-fMRI data when the data is projected onto an embedding space. <br>
We have changed the embedding space from GloVe to CLIP, in order to see if textual-fMRI data can capture the meaning of images as well, but we believe that adding extra complexity to the model would defeat our purpose. If our hypothesis is correct, then a linear decoder will be able to capture the meaning of the images in the same embedding space without any added non-linearity: a simple model should suffice, as much as it did for the same-modality data. <br>
For that reason, we are going to train our decoder in the same way that it was trained in the original paper:
* For each participant's fMRI data - we're only going to take the top 5000 relevant `voxels`.
* For the decoder - we're going to learn a simple ridge regression (using the same code)
<br> 
Another important thing to remember is that we're only going to feed the function **textual** fMRI data, because our hypothesis states that the same areas in the brain responsible for interpreting textual data can also give us insight into visual data. That means that we will train our model on textual data only, and withhold all visual data for evaluation.

First, let's define the function that will help us determine the top 5000 relevant voxels: <br>
The fMRI data from the experiments consists of a long series of voxels, each corresponding to the activation in a different area of the brain. The problem is that most of the voxels are unimportant and mostly contribute noise, which reduces our model's accuracy. For that reason we clean up the data and use only the 5000 most informative voxels out of the roughly 200,000 in the original fMRI data, using the `select_top_voxels_indices` function:

In [5]:
from sklearn.feature_selection import f_regression
import numpy as np
import tqdm

def select_top_voxels_indices(fmri_data, semantic_vectors, num_voxels=5000):
    f_scores = []
    for i in tqdm.tqdm(range(semantic_vectors.shape[1])):
        f, _ = f_regression(fmri_data, semantic_vectors[:, i])
        f_scores.append(f)

    f_scores = np.array(f_scores)
    voxel_scores = np.max(f_scores, axis=0)
    top_voxel_indices = np.argsort(voxel_scores)[-num_voxels:]

    return top_voxel_indices
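
The loop above calls `f_regression` once per embedding dimension, which is slow for a wide embedding space. As a hedged alternative, the sketch below ranks voxels by their maximum squared Pearson correlation with any embedding dimension. This relies on the assumption that the univariate F statistic is a monotonic function of the squared correlation (F = r²/(1 − r²) · (n − 2)), so the ordering of voxels should match up to ties and constant columns. The function name `top_voxels_by_correlation` and the synthetic data are hypothetical and not part of the original pipeline; note also that the intermediate (V, D) correlation matrix can be large for about 200,000 voxels, so chunking over voxels may be needed.

```python
import numpy as np

def top_voxels_by_correlation(fmri_data, semantic_vectors, num_voxels=5000):
    """Hypothetical vectorized variant: rank voxels by max squared Pearson r."""
    # Center columns so that normalized dot products are Pearson correlations.
    X = fmri_data - fmri_data.mean(axis=0)
    Y = semantic_vectors - semantic_vectors.mean(axis=0)
    # Guard against constant columns (zero norm); they get a score of 0 here.
    x_norm = np.maximum(np.linalg.norm(X, axis=0), 1e-12)
    y_norm = np.maximum(np.linalg.norm(Y, axis=0), 1e-12)
    r = (X.T @ Y) / np.outer(x_norm, y_norm)    # (V, D) correlations
    voxel_scores = (r ** 2).max(axis=1)          # best dimension per voxel
    return np.argsort(voxel_scores)[-num_voxels:]

# Synthetic check: only voxels 3 and 7 carry signal about the targets.
rng = np.random.default_rng(0)
X_demo = rng.standard_normal((60, 200))
Y_demo = X_demo[:, [3, 7]] + 0.1 * rng.standard_normal((60, 2))
selected = top_voxels_by_correlation(X_demo, Y_demo, num_voxels=5)
print(sorted(selected.tolist()))
```

On the synthetic data, the two planted voxels end up among the selected indices, which is the behavior we want from the selection step.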

Now, let's take the fMRI textual data from our data folder:

In [6]:
import scipy.io

mat = scipy.io.loadmat("data/brain-responses-data/examples_180concepts_wordclouds.mat")
fmri_text_data = mat["examples"]

and get only our top voxels:

In [7]:
top_voxel_indices = select_top_voxels_indices(fmri_text_data, clip_text_embeddings)
reduced_fmri_data = fmri_text_data[:, top_voxel_indices]

100%|██████████| 768/768 [02:39<00:00,  4.80it/s]


For training the decoder (fitting a linear mapping from brain features, i.e. voxels, to our semantic space), we'll use the function `learn_decoder`, which we took directly from the original paper:

In [8]:
""" learn_decoder """
import sklearn.linear_model

def learn_decoder(data, vectors):
     """ Given data (a CxV matrix of V voxel activations per C concepts)
     and vectors (a CxD matrix of D semantic dimensions per C concepts)
     find a matrix M such that the dot product of M and a V-dimensional 
     data vector gives a D-dimensional decoded semantic vector. 

     The matrix M is learned using ridge regression:
     https://en.wikipedia.org/wiki/Tikhonov_regularization
     """
     ridge = sklearn.linear_model.RidgeCV(
         alphas=[1, 10, .01, 100, .001, 1000, .0001, 10000, .00001, 100000, .000001, 1000000],
         fit_intercept=False
     )
     ridge.fit(data, vectors)
     return ridge.coef_.T
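
For intuition about what the ridge fit computes, here is a minimal NumPy sketch of the closed-form ridge solution $W = (X^\top X + \alpha I)^{-1} X^\top Y$ for a single, fixed regularization strength and no intercept. The function name `ridge_closed_form`, the fixed `alpha`, and the synthetic data are assumptions for illustration; `learn_decoder` instead lets `RidgeCV` choose among the listed alphas.

```python
import numpy as np

def ridge_closed_form(data, vectors, alpha=1.0):
    """Solve min_W ||data @ W - vectors||^2 + alpha * ||W||^2 (no intercept)."""
    n_voxels = data.shape[1]
    gram = data.T @ data + alpha * np.eye(n_voxels)   # (V, V)
    return np.linalg.solve(gram, data.T @ vectors)    # (V, D)

# Synthetic check: with noiseless targets and a tiny alpha we recover W_true.
rng = np.random.default_rng(0)
X_demo = rng.standard_normal((50, 5))     # C=50 "concepts", V=5 "voxels"
W_true = rng.standard_normal((5, 3))      # D=3 semantic dimensions
Y_demo = X_demo @ W_true
W_hat = ridge_closed_form(X_demo, Y_demo, alpha=1e-8)
print(W_hat.shape)                        # (5, 3)
```

The result has the same $(V, D)$ layout as the matrix returned by `learn_decoder`, so a decoded vector is obtained as `x @ W` for a $V$-dimensional voxel vector `x`.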

We're now ready to train our decoder, but we don't want to train it on all of the available data at once. The data is very scarce, so we want to run k-fold cross validation: train and evaluate several decoders as part of the same pipeline and report the averaged result, to get a better sense of the true performance of our decoder and, thus, of our hypothesis. For reference, the code to train a decoder on all of the available textual data would be as follows: 

In [49]:
decoder = learn_decoder(reduced_fmri_data, clip_text_embeddings)

But we're going to train the actual decoders we'll use as a part of the evaluation pipeline.

#### Evaluation

For evaluation, we're going to have the following guiding principles:
1. Distance function (cosine). We measure similarity with cosine. In distributional semantics, cosine best reflects semantic relatedness and was explicitly used in Pereira’s pipeline, so our evaluation geometry matches the embedding space and prior work.
2. K-Fold Cross Validation - Quality fMRI data is extremely limited. Pereira is still the only open English dataset for fMRI cross-modality, single concept data, similar to what we're trying to model. There are some other alternatives - Allen 672 which is Chinese, Tuckute 2024, which is for full sentences only, six words each, each of them on no more than 16 participants. That means we have little available data to train on. Because of that, we evaluate using K-Fold Cross Validation: that way, we can train on every piece of data we have and still evaluate the model. The folding strategy is as follows: we train a set of 18 decoders, each trained on 170 concepts and evaluated on the remaining 10. The evaluation is split into two parts:
* Textual trials (to confirm within-modality performance)
* Visual trials (to test the hypothesis that text-trained decoders generalize cross-modally)
For the eval, we evaluate only on the concepts we didn't train on, for both the visual and the textual data. This prevents data leakage: nothing about a test concept appears in training.
3. Primary metric: rank accuracy (normalized). For each held-out stimulus, we feed its brain image into the decoder to produce a semantic vector (the decoder maps brain→text semantic space), then compare it by cosine to all candidate vectors in the same evaluation set and compute rank accuracy (normalized):
$$
\text{rank\_accuracy} \;=\; 1 \;-\; \frac{\text{rank} - 1}{\#\text{candidates} - 1}
$$
This yields a score in [0, 1] with chance = 0.5, and 1.0 being a perfect decoder (always spits out the correct answer); it’s exactly the statistic used in Pereira.
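
As a small worked example of the formula above, the sketch below evaluates it for a few ranks, assuming 180 candidates (one per concept). The helper name `normalized_rank_accuracy` is hypothetical and used only for this illustration. A rank of 3 out of 180 gives $1 - 2/179 \approx 0.989$, and averaging over all ranks (a decoder guessing at random) gives the chance level.

```python
import numpy as np

def normalized_rank_accuracy(rank, n_candidates):
    """1.0 when the true concept is ranked first, 0.0 when it is ranked last."""
    return 1.0 - (rank - 1) / (n_candidates - 1)

n_candidates = 180
best = normalized_rank_accuracy(1, n_candidates)
worst = normalized_rank_accuracy(n_candidates, n_candidates)
third = normalized_rank_accuracy(3, n_candidates)
# A decoder that guesses at random lands on every rank equally often.
chance = np.mean([normalized_rank_accuracy(r, n_candidates)
                  for r in range(1, n_candidates + 1)])
print(best, worst, round(third, 4), round(chance, 4))
```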

##### Symbols Definition
- $N$ - Number of rows, being trials or training data
- $V$ - Voxel number
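- $D$ - Dimension of the embedding space (768 for the CLIP text embeddings used here, as the 768 iterations in the voxel-selection progress bar show, compared with 300 for GloVe)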
- $C$ - Concepts number (no. of possible candidates, for Pereira will be 180)
- $M$ - Raw images number - before aggregating them.
- $B$ - Bulk size for leaving out of training for the cross validation process

##### Helper functions

**row_norm** - This function takes an `ndarray` and normalizes each of its rows. We need this kind of normalization because it's necessary for cosine similarity. Similarly, in CLIP, both the image and text embeddings are L2-normalized to unit length before computing their similarity. Because of that normalization, the dot product the model uses is exactly equal to cosine similarity. We are going to take a similar route in our calculations, and because the embeddings need to be normalized both in training and in inference, we think a small helper function is justified.<br>
The `ndarray` we get has dimensions $(N, D)$, because each row is a CLIP embedding vector (and hence has $D$ elements), and there are $N$ rows by definition. <br>
The `epsilon` parameter is there to avoid dividing a vector by zero (an all-zero row of the matrix stays zero; only the norm is bounded below by epsilon).

In [9]:
import numpy as np

def row_norm(X: np.ndarray, eps: float = 1e-12) -> np.ndarray:
    """
    L2-normalize each row safely (for cosine). If a row norm is 0, clamp to eps.

    Args:
        X: (N, D) array
        eps: small constant to avoid division by zero

    Returns:
        (N, D) normalized array
    """
    nrm = np.linalg.norm(X, axis=1, keepdims=True)
    nrm = np.maximum(nrm, eps)
    return X / nrm

**cosine_rank** - This function is the core of our main evaluation metric on a single test trial. We get a `decoded vector`, already in the CLIP embedding space (dimension $D$), its `true index` in the concept list (just an int, indicating its place in the array of concepts sorted by name), and the embeddings of all the `candidate concepts`, compiled into one big matrix with dimensions $(C,D)$, where $C$ is the number of concepts and $D$ is the dimension of our embedding space. The function returns a 1-based rank: we are not measuring the accuracy of the decoder yet, just how close this specific test case is to the true concept. The calculation works as follows:
1. Normalize everything (both the decoded vector, and the matrix of all the embedded concepts, so everything that's embedded in our embedding space)
2. Calculate cosine similarity using a dot product (Cosine similarity between two vectors a and b is just the dot product of their unit versions.)
3. Sort all of the cosine similarities in a descending order, to find out where the `true_idx` sits.

In [10]:
def cosine_rank(decoded_vec: np.ndarray, true_idx: int, candidate_matrix: np.ndarray) -> int:
    """
    1-based rank of the true candidate by cosine similarity.

    Args:
        decoded_vec: (D,) decoded semantic vector for a single test trial
        true_idx: integer index of the correct row in candidate_matrix
        candidate_matrix: (C, D) candidate concept embeddings

    Returns:
        1-based rank in [1..C] (1 = best)
    """
    dv = decoded_vec / max(np.linalg.norm(decoded_vec), 1e-12)
    Cn = row_norm(candidate_matrix)
    sims = Cn @ dv
    order = np.argsort(-sims)  # descending
    return int(np.where(order == true_idx)[0][0] + 1)
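
The three steps can be traced by hand on a tiny example. The sketch below uses made-up 2-D candidates and a made-up decoded vector (all values are assumptions for illustration) and inlines the same operations as `cosine_rank`.

```python
import numpy as np

# Made-up candidate embeddings for three concepts (rows) in a 2-D space.
candidates = np.array([[1.0, 0.0],
                       [0.0, 1.0],
                       [1.0, 1.0]])
decoded = np.array([0.9, 0.1])   # made-up decoder output for one trial

# Step 1: normalize; step 2: dot product = cosine; step 3: sort descending.
cand_unit = candidates / np.linalg.norm(candidates, axis=1, keepdims=True)
dec_unit = decoded / np.linalg.norm(decoded)
sims = cand_unit @ dec_unit
order = np.argsort(-sims)

# 1-based rank that each concept would get if it were the true one.
ranks = {int(idx): int(pos) + 1 for pos, idx in enumerate(order)}
print(order.tolist(), ranks)
```

Here concept 0 is the closest candidate, concept 2 comes second, and concept 1 is last, so a trial whose true index is 2 would receive rank 2.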

**rank_accuracy** - This function actually evaluates a decoder. As part of the evaluation strategy we've outlined, we said that we're going to do cross validation, training $C / B$ decoders, each with $C - B$ training samples and $B$ test samples. For evaluating a decoder, we outlined our main metric as rank_accuracy, which is: _across all test samples, how accurate is a decoder (1.0 being always correct, 0.5 being chance)_. This is exactly what this function does: it gets all of our decoded test vectors as a matrix of dimensions $(N,D)$, which is $N$ test vectors of $D$ embedding elements each, an array of the $N$ true concept indices, and the entire set of concept embeddings, which, as earlier, is $(C,D)$. We then:
1. Calculate the ranks of all of our test decoded vectors.
2. Calculate the accuracy of the decoder, the mean rank, and any other metric we want for that decoder (this function works for a single decoder).

In [11]:
from typing import Dict
def rank_accuracy(
    decoded: np.ndarray,
    true_indices: np.ndarray,
    candidates: np.ndarray
) -> Dict[str, object]:
    """
    Compute raw ranks per test row, mean raw rank, and normalized rank accuracy.

    Args:
        decoded: (N_test, D) decoded vectors for the test rows
        true_indices: (N_test,) integer index per row into 'candidates'
        candidates: (C, D) candidate concept embeddings

    Returns:
        {
          "ranks": np.ndarray of shape (N_test,), dtype=int (1..C),
          "mean_rank": float,
          "rank_accuracy": float in [0,1], chance = 0.5
        }
    """
    ranks = np.array(
        [cosine_rank(decoded[i], int(true_indices[i]), candidates) for i in range(decoded.shape[0])],
        dtype=int
    )
    C = candidates.shape[0]
    mean_rank = float(ranks.mean()) if ranks.size else float('nan')
    # Normalized rank accuracy (Pereira): 1 - (mean_rank - 1)/(C - 1)
    rank_acc = 1.0 - (mean_rank - 1.0) / (C - 1.0) if C > 1 and np.isfinite(mean_rank) else float('nan')
    return {"ranks": ranks, "mean_rank": mean_rank, "rank_accuracy": rank_acc}

The beauty of all of our helper functions is that they are modality-agnostic: they just receive CLIP embeddings and do not care whether the data originated in textual or visual inputs, in fMRI, or in anything else. This means that the implementation is extremely generic, and we can switch around practically everything we want, as long as the main way to calculate "distance" between two vectors is cosine similarity and not, for example, Euclidean distance. Because cosine similarity is a very popular choice in embedding spaces, this gives us (and potentially future researchers) a lot of freedom: the pipeline will be able to work on practically any input data, as long as it's possible to project it onto a relevant embedding space.

##### Fitting a decoder (Training & Inference)

This is the part that we've mentioned earlier, where we skipped our training. We'll write the relevant functions to fit a decoder, and for the forward pass of the inference, which is the decoding of test data. Both would be called at different stages of the cross validation pipeline.

**fit_decoder** - This function is a small wrapper around the `learn_decoder` function from the Pereira paper. The point of the wrapper is to:
1. Be able to switch out `learn_decoder` for any other fitting function or pipeline, for example to fit with elastic net instead of ridge, to add non-linearity, or to test further hypotheses such as optimizing the voxel mask size instead of using the original 5000. None of these will be part of our analysis, but we find that allowing further research is an essential part of the research process, as almost all research is built on top of previous research.
2. Masking indexes - according to the Pereira paper, if we're masking our training data, we need to mask the same indexes on our test data. This could be done without a function, but keeping the mask as a built-in part of the process prevents later errors.
3. Scaling - We also standardize the brain features to mean 0 and standard deviation 1, using a scaler that is fitted on the training split only and applied before the decoder is fitted. This gives us a robust scale for any downstream use of the features, whether PCA, linear regression, or a full-blown neural network.

In [12]:
from dataclasses import dataclass
import numpy as np
from sklearn.preprocessing import StandardScaler

@dataclass
class TrainedDecoder:
    W: np.ndarray                  # (V_sel, D)
    scaler: StandardScaler | None  # fitted on train only
    mask: np.ndarray | None        # boolean or int indices for selected voxels

def fit_decoder(train_X, train_Y, learn_decoder_fn, use_scaler=True, mask=None):
    """
    train_X: (N_train, V) brain features
    train_Y: (N_train, D) semantic targets (text space)
    mask:    optional voxel selection (fit on train only)
    Returns: TrainedDecoder(W, scaler, mask)
    """
    scaler = StandardScaler().fit(train_X) if use_scaler else None
    Xs = scaler.transform(train_X) if scaler else train_X
    Xs = Xs[:, mask] if mask is not None else Xs

    # This is exactly your ridge-based learner:
    W = learn_decoder_fn(Xs, train_Y)  # shape (V_sel, D)
    return TrainedDecoder(W=W, scaler=scaler, mask=mask)

**decode** - This function is for our inference, and simply does the following:
1. Apply the same scaler and mask we applied in the `fit_decoder` function.
2. Multiply the scaled, masked data by the decoder matrix `W` (which holds all of our "learned parameters", so to speak).

Again, we're covering our bases here with a generic wrapper, as we want our code to possibly be used in many future research directions.

In [13]:
def decode(model: TrainedDecoder, X: np.ndarray) -> np.ndarray:
    """
    Apply the same scaler and voxel mask, then multiply by W.

    Args:
        model: TrainedDecoder returned by fit_decoder
        X: (N, V) brain features to decode

    Returns:
        (N, D) decoded semantic vectors
    """
    Xs = model.scaler.transform(X) if model.scaler is not None else X
    if model.mask is not None:
        Xs = Xs[:, model.mask]
    return Xs @ model.W


##### Cross Validation

In [16]:
def make_folds_by_concept(concept_ids: np.ndarray, fold_size: int) -> List[Tuple[np.ndarray, np.ndarray]]:
    """
    Create concept-grouped folds: each test fold holds out 'fold_size' concepts.

    Args:
        concept_ids: (N,) concept label per row of the *training* matrix you'll split
        fold_size: number of concepts in each test fold (e.g., 10)

    Returns:
        List of (train_idx, test_idx) index arrays into the rows of the input arrays
    """
    uniq = np.unique(concept_ids)
    folds: List[Tuple[np.ndarray, np.ndarray]] = []
    for start in range(0, len(uniq), fold_size):
        held = uniq[start:start + fold_size]
        test_mask = np.isin(concept_ids, held)
        train_idx = np.where(~test_mask)[0]
        test_idx = np.where(test_mask)[0]
        folds.append((train_idx, test_idx))
    return folds
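
As a quick sanity check of the fold arithmetic described in the evaluation plan (180 concepts, 10 held out per fold), the sketch below repeats the same splitting logic on a made-up array of concept ids. It assumes one row per concept, as in the text trials, and is a compact restatement for illustration rather than a call into the function above.

```python
import numpy as np

n_concepts, fold_size = 180, 10
concept_ids = np.arange(n_concepts)   # one row per concept, as in the text trials

fold_sizes = []
for start in range(0, n_concepts, fold_size):
    held = concept_ids[start:start + fold_size]
    test_mask = np.isin(concept_ids, held)
    train_idx = np.where(~test_mask)[0]
    test_idx = np.where(test_mask)[0]
    # No concept may appear on both sides of the split.
    assert not set(concept_ids[train_idx]) & set(concept_ids[test_idx])
    fold_sizes.append((len(train_idx), len(test_idx)))

print(len(fold_sizes), fold_sizes[0])   # 18 (170, 10)
```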

In [17]:

def cross_validate_once_per_fold(
    X_brain: np.ndarray,             # (N, V) rows aligned with Y_targets & concept_ids
    Y_targets: np.ndarray,           # (N, D)
    concept_ids: np.ndarray,         # (N,)
    fold_size: int,
    learn_decoder_fn,                # your learn_decoder
    eval_sets: Dict[str, Dict[str, np.ndarray]],
    use_scaler: bool = True,
    make_mask_fn = None             # optional callable(train_X, train_Y) -> mask (per fold)
) -> Dict[str, List[Dict[str, object]]]:
    """
    Train the decoder once per fold on the training split, then evaluate it on
    multiple evaluation sets for *held-out concepts only*.

    Args:
        X_brain: (N, V) brain data used to train (e.g., text trials only)
        Y_targets: (N, D) target text-space embeddings aligned to X_brain
        concept_ids: (N,) concept id per row in X_brain (used to define folds)
        fold_size: number of concepts in each test fold (e.g., 10)
        learn_decoder_fn: your learn_decoder
        eval_sets: dict mapping eval-name -> dict with:
            {
              "X": (N_eval, V),                   # brain rows to decode
              "concept_ids": (N_eval,),           # concept id per row
              "candidates": (C, D),               # candidate matrix to rank against
              "candidate_concepts": (C,)          # concept ids in same order as rows in 'candidates'
            }
        use_scaler: fit StandardScaler on training split
        make_mask_fn: optional per-fold voxel selection function fit on training split

    Returns:
        results: dict eval-name -> list (per fold) of metrics dicts from rank_accuracy
    """
    # Prepare result containers
    results: Dict[str, List[Dict[str, object]]] = {name: [] for name in eval_sets.keys()}

    # Build concept-grouped folds on the training set's concept ids
    folds = make_folds_by_concept(concept_ids, fold_size)

    # For each fold: train once, evaluate across all eval sets
    for fold_idx, (train_idx, test_idx) in enumerate(folds):
        # Optional voxel selection (fit on training split only)
        mask = None
        if make_mask_fn is not None:
            mask = make_mask_fn(X_brain[train_idx], Y_targets[train_idx])

        # Fit decoder
        model = fit_decoder(
            train_X=X_brain[train_idx],
            train_Y=Y_targets[train_idx],
            learn_decoder_fn=learn_decoder_fn,
            use_scaler=use_scaler,
            mask=mask
        )

        # The set of held-out concepts for this fold (used to filter eval sets)
        held_concepts = np.unique(concept_ids[test_idx])

        # Evaluate against each eval view
        for name, spec in eval_sets.items():
            X_eval_all   = spec["X"]
            cid_eval_all = spec["concept_ids"]
            Cmat         = spec["candidates"]
            Cconcepts    = spec["candidate_concepts"]

            # Select only rows whose concept is in the held-out set
            mask_rows = np.isin(cid_eval_all, held_concepts)
            if not np.any(mask_rows):
                # Nothing to evaluate for this view in this fold
                results[name].append({"ranks": np.array([], dtype=int), "mean_rank": float('nan'), "rank_accuracy": float('nan')})
                continue

            X_eval = X_eval_all[mask_rows]
            cid_eval = cid_eval_all[mask_rows]

            # Map each eval row's concept id to the index in the candidate matrix
            concept_to_idx = {int(c): i for i, c in enumerate(Cconcepts)}
            try:
                true_indices = np.array([concept_to_idx[int(c)] for c in cid_eval], dtype=int)
            except KeyError as e:
                missing = int(e.args[0])
                raise KeyError(f"Concept id {missing} not found in candidate_concepts for eval set '{name}'. "
                               "Ensure candidate_concepts matches your concept_id space.") from e

            # Decode brain -> semantic
            decoded = decode(model, X_eval)

            # Rank metrics
            metrics = rank_accuracy(decoded, true_indices, Cmat)
            results[name].append(metrics)

    return results

In [18]:
def summarize_cv_results(results: Dict[str, List[Dict[str, object]]]) -> Dict[str, Dict[str, float]]:
    """
    Compute simple per-view averages across folds.

    Args:
        results: dict eval-name -> list of per-fold metrics

    Returns:
        summary: dict eval-name -> {"rank_accuracy": float, "mean_rank": float}
    """
    summary: Dict[str, Dict[str, float]] = {}
    for name, folds in results.items():
        ra = np.nanmean([f.get("rank_accuracy", np.nan) for f in folds])
        mr = np.nanmean([f.get("mean_rank", np.nan) for f in folds])
        summary[name] = {"rank_accuracy": float(ra), "mean_rank": float(mr)}
    return summary
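
A small illustration of why the summary uses `np.nanmean`: folds with nothing to evaluate are recorded as NaN and should not drag the average down or turn it into NaN. The fold values below are made up purely for the demonstration.

```python
import numpy as np

# Made-up per-fold metrics; the middle fold had no rows to evaluate
fold_metrics = [
    {"rank_accuracy": 0.70, "mean_rank": 3.0},
    {"rank_accuracy": float("nan"), "mean_rank": float("nan")},
    {"rank_accuracy": 0.60, "mean_rank": 5.0},
]

# nanmean averages only the folds that produced a number
avg_rank_acc = float(np.nanmean([f["rank_accuracy"] for f in fold_metrics]))
avg_mean_rank = float(np.nanmean([f["mean_rank"] for f in fold_metrics]))

# A plain mean would propagate the NaN instead
plain_mean = float(np.mean([f["rank_accuracy"] for f in fold_metrics]))

print(avg_rank_acc, avg_mean_rank, plain_mean)
```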

In [20]:
import numpy as np

# --- label normalization (prevents “cat ” vs “Cat” mismatches) ---
def _norm_label(x) -> str:
    return str(x).strip().lower()

def run_cv_text_and_images_using_your_prototypes(
    image_npz_path: str,
    reduced_fmri_data: np.ndarray,     # (C, V)
    clip_text_embeddings: np.ndarray,  # (C, D)
    concept_labels: np.ndarray,        # (C,) strings, aligned to rows of fmri/text
    *,
    fold_size: int = 10,
    image_proto: str = "topk",         # "mean" | "medoid" | "topk"
    k: int = 3,
    agg: str = "mean"
):
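    """
    Train a decoder on CLIP text targets, then score it against text and image candidates.

    Relies on helpers that must already be defined or imported: prototypes_mean,
    prototypes_medoid, prototypes_topk_by_text_anchor, learn_decoder, fit_decoder,
    decode, cross_validate_once_per_fold and rank_accuracy.

    Args:
        image_npz_path: .npz file holding "embeddings" (M, D) and "concepts" (M,) arrays.
        reduced_fmri_data: (C, V) voxel features, one row per concept.
        clip_text_embeddings: (C, D) CLIP text embeddings, row-aligned with the fMRI data.
        concept_labels: (C,) concept names, row-aligned with the fMRI data.
        fold_size: passed through to cross_validate_once_per_fold.
        image_proto: prototype strategy: "mean", "medoid" or "topk".
        k, agg: passed to prototypes_topk_by_text_anchor when image_proto="topk".

    Returns:
        (results, summary): per-fold metrics per view, and per-view averages.
    """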
    # 1) Load image embeddings
    pack = np.load(image_npz_path, allow_pickle=True)
    img_embeddings = np.asarray(pack["embeddings"], dtype=np.float32)  # (M, D)
    img_concepts_raw = pack["concepts"]                                 # (M,) strings

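    # 2) Normalize labels for robust mapping
    concept_labels = np.asarray(concept_labels)
    labels_norm = np.array([_norm_label(s) for s in concept_labels])
    label_to_id = {lab: i for i, lab in enumerate(labels_norm)}
    if len(label_to_id) != len(labels_norm):
        # Two labels collapsed to the same key; the later row would silently win
        raise ValueError("concept_labels contains duplicates after normalization")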

    img_concepts_norm = np.array([_norm_label(s) for s in img_concepts_raw])
    unknown = np.setdiff1d(np.unique(img_concepts_norm), labels_norm)
    if unknown.size > 0:
        # Show a few to help you fix concepts.txt vs folder names
        raise ValueError(
            f"{unknown.size} image concepts not found in concept_labels. "
            f"First few: {unknown[:10].tolist()}.\n"
            "Make sure 'concepts.txt' corresponds to your image folder names (case/space-insensitive)."
        )

    img_cids = np.array([label_to_id[s] for s in img_concepts_norm], dtype=int)

    # 3) Build image prototypes using YOUR functions
    #    (these must be imported/defined already)
    if image_proto == "mean":
        uniq_img_concepts, C_img = prototypes_mean(img_embeddings, img_cids)
    elif image_proto == "medoid":
        uniq_img_concepts, C_img = prototypes_medoid(img_embeddings, img_cids)
    elif image_proto == "topk":
        # Build dict[int -> (D,)] expected by your prototypes_topk_by_text_anchor
        text_by_concept = {i: clip_text_embeddings[i] for i in range(clip_text_embeddings.shape[0])}
        uniq_img_concepts, C_img = prototypes_topk_by_text_anchor(
            embeddings=img_embeddings,
            concept_ids=img_cids,
            text_by_concept=text_by_concept,
            k=k,
            agg=agg
        )
    else:
        raise ValueError("image_proto must be 'mean', 'medoid', or 'topk'")

    # 4) Pack eval sets: text and image
    X = np.asarray(reduced_fmri_data, dtype=np.float32)
    Y = np.asarray(clip_text_embeddings, dtype=np.float32)
    C = X.shape[0]
    assert Y.shape[0] == C, f"FMRI and text embeddings row mismatch: {X.shape} vs {Y.shape}"

    concept_ids = np.arange(C, dtype=int)

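    # Image prototypes may cover only a subset of concepts. Restrict the image
    # view to rows that have a prototype, so held-out concepts without images
    # are skipped instead of raising a KeyError in the CV driver.
    has_proto = np.isin(concept_ids, uniq_img_concepts)

    eval_sets = {
        "text": {
            "X": X,
            "concept_ids": concept_ids,
            "candidates": Y,                      # text candidates
            "candidate_concepts": concept_ids
        },
        "image": {
            "X": X[has_proto],
            "concept_ids": concept_ids[has_proto],
            "candidates": C_img,                  # image prototypes (subset of concepts)
            "candidate_concepts": uniq_img_concepts
        }
    }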

    # 5) Run your CV driver (train once per fold on text → evaluate both views)
    results = cross_validate_once_per_fold(
        X_brain=X,
        Y_targets=Y,
        concept_ids=concept_ids,
        fold_size=fold_size,
        learn_decoder_fn=learn_decoder,
        eval_sets=eval_sets,
        use_scaler=True,
        make_mask_fn=None
    )

    # 6) Summarize (per-view averages across folds, ignoring NaN folds)
    summary = summarize_cv_results(results)
    print("CV summary:")
    for view, s in summary.items():
        print(f" - {view:>5s}: rank_acc={s['rank_accuracy']:.3f}  mean_rank={s['mean_rank']:.1f}  (chance rank_acc=0.5)")
    return results, summary
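
The `"topk"` branch above calls `prototypes_topk_by_text_anchor`, which lives elsewhere in the notebook. The sketch below is only one plausible reading of what `image_proto="topk"` with `agg="mean"` means for a single concept: keep the `k` images whose embeddings are most cosine-similar to that concept's text embedding, then average them. The function name and its exact behavior are assumptions for illustration, not the notebook's implementation.

```python
import numpy as np

def topk_text_anchor_prototype_sketch(img_emb, text_emb, k=3):
    """Hypothetical: mean of the k image embeddings closest (cosine) to the text anchor."""
    # Normalize so dot products are cosine similarities
    img_n = img_emb / np.linalg.norm(img_emb, axis=1, keepdims=True)
    txt_n = text_emb / np.linalg.norm(text_emb)
    sims = img_n @ txt_n                       # (n_images,)

    # Indices of the k most similar images, best first
    top = np.argsort(-sims)[:k]

    # Aggregate the original (unnormalized) embeddings of those images
    return img_emb[top].mean(axis=0)

# Toy concept with four images in a 2-D embedding space
toy_images = np.array([[1.0, 0.0],
                       [0.9, 0.1],
                       [0.0, 1.0],
                       [-1.0, 0.0]])
toy_text = np.array([1.0, 0.0])
toy_proto = topk_text_anchor_prototype_sketch(toy_images, toy_text, k=2)
print(toy_proto)
```

Anchoring on the text embedding filters out off-topic images before averaging, which is the usual motivation for preferring it over a plain mean when the image set is noisy.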


In [56]:
concepts = np.genfromtxt("data/concepts.txt", dtype=str)

results, summary = run_cv_text_and_images_using_your_prototypes(
    image_npz_path="data/clip_image_embeddings.npz",
    reduced_fmri_data=reduced_fmri_data,
    clip_text_embeddings=clip_text_embeddings,
    concept_labels=np.array(concepts),
    fold_size=8,
    image_proto="topk",   # or "mean", "medoid"
    k=3,
    agg="mean"
)


CV summary:
 -  text: rank_acc=0.696  mean_rank=55.3  (chance rank_acc=0.5)
 - image: rank_acc=0.633  mean_rank=66.6  (chance rank_acc=0.5)
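
The two reported numbers per view are tied together if rank accuracy is defined as in Pereira et al. (2018), i.e. `1 - (rank - 1) / (n_candidates - 1)`. The check below is a sketch under two assumptions that are not stated in the output itself: that this is the definition used by the notebook's `rank_accuracy`, and that each view has 180 candidates (the number of concepts in Pereira et al.).

```python
def rank_acc_from_mean_rank(mean_rank: float, n_candidates: int) -> float:
    """Rank accuracy implied by a mean rank, under the assumed Pereira-style definition."""
    return 1.0 - (mean_rank - 1.0) / (n_candidates - 1)

N_CANDIDATES = 180  # assumption: one candidate per concept, 180 concepts

# Mean ranks copied from the CV summary above
reported_mean_rank = {"text": 55.3, "image": 66.6}
implied = {view: rank_acc_from_mean_rank(mr, N_CANDIDATES)
           for view, mr in reported_mean_rank.items()}

for view, acc in implied.items():
    print(f"{view:>5s}: implied rank_acc ~ {acc:.3f}")

# At chance the expected rank is the middle of the list, which maps to 0.5
chance_acc = rank_acc_from_mean_rank((N_CANDIDATES + 1) / 2, N_CANDIDATES)
```

Under these assumptions the implied values land close to the printed `rank_acc` figures; small differences are expected because the summary averages per-fold means and the printed mean ranks are rounded.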


### Multimodal to Cross-Modality
