In [3]:
import pickle
import sys
import os
import numpy as np

## Load the data

In [4]:
with open('data/EXP2.pkl', 'rb') as f: 
	exp2_data = pickle.load(f)
with open('data/EXP3.pkl', 'rb') as f: 
    exp3_data = pickle.load(f)

In [5]:
voxel_neighbors = exp2_data['meta'][0][0][7]

In [6]:
#Let's load the functions from learn_decoder.py
from learn_decoder import read_matrix

#and the data for experiment 1
exp1_fmri = read_matrix("data/neuralData_for_EXP1.csv", sep=",", header=True, index_col=True)
exp1_vecs = read_matrix("data/vectors_180concepts.GV42B300.txt", sep=" ")
exp1_conc = np.genfromtxt('data/stimuli_180concepts.txt', dtype=np.dtype('U'))  #The names of the 180 concepts

In [5]:
from tqdm.auto import tqdm
import sklearn.linear_model
from scipy.stats import pearsonr
from joblib import Parallel, delayed

  from .autonotebook import tqdm as notebook_tqdm


In [7]:
# --- Helper function for preparing data for a single voxel ---
def get_voxel_specific_data(data, v_idx, voxel_neighbors):
    """
    Extracts data for a central voxel and its 3D neighbors.
    
    @param data: (n_samples, V_total) array
    @param v_idx: index of the central voxel
    @param voxel_neighbors: matrix of voxel neighbors, where each row corresponds to a voxel's neighbors.
    
    Returns: (n_samples, K) array, where K is 1 (center) + number of actual neighbors found.
             The central voxel's data is the first column.
    """
    center_voxel_data = data[:, v_idx]
    
    # Collect neighbors as in the original logic
    neighbor_columns = []
    neighbors_chosen = set() # Start with the center voxel index to avoid duplicates
    neighbors_chosen.add(v_idx) # Ensure the center voxel is included
    for n_loop_idx in voxel_neighbors[v_idx]:
        if n_loop_idx < 0 or n_loop_idx >= data.shape[1]:
            continue
        if n_loop_idx not in neighbors_chosen:
            neighbors_chosen.add(n_loop_idx)
            neighbor_columns.append(data[:, n_loop_idx])
    
    if not neighbor_columns:
        return center_voxel_data[:, np.newaxis] # Return as (n_samples, 1)
    else:
        return np.column_stack([center_voxel_data] + neighbor_columns)

# --- Helper function for processing one voxel within a fold (for parallel execution) ---
def process_voxel_for_fold(v_idx, train_data_fold, test_data_fold, 
                           train_vectors_fold, true_test_vectors_fold,
                           voxel_neighbors, ridge_alpha):
    """
    Processes a single voxel: fits ridge, predicts, correlates.
    Returns the maximum absolute correlation for this voxel in this fold.
    """
    # 1. Prepare feature matrices for this voxel
    # X_train_v will be (n_train_samples, n_features_for_voxel_v)
    # n_features_for_voxel_v can vary near edges if window is truncated.
    X_train_v = get_voxel_specific_data(train_data_fold, v_idx, voxel_neighbors)
    X_test_v = get_voxel_specific_data(test_data_fold, v_idx, voxel_neighbors)

    # Basic check, though get_voxel_specific_data should always return at least one column
    if X_train_v.shape[1] == 0 or X_test_v.shape[1] == 0 or X_train_v.shape[0] == 0 or X_test_v.shape[0] == 0:
        return 0.0

    # 2. Fit Ridge Regression model
    ridge = sklearn.linear_model.Ridge(alpha=ridge_alpha, fit_intercept=True)
    try:
        ridge.fit(X_train_v, train_vectors_fold)
    except ValueError: # Can happen if X_train_v is problematic (e.g. all zeros, too few samples/features)
        return 0.0

    # 3. Predict semantic vectors for the test data
    predicted_vectors = ridge.predict(X_test_v) # Shape: (n_test_samples, n_semantic_dimensions)

    # 4. Vectorized Pearson Correlation
    # P = predicted_vectors, T = true_test_vectors_fold
    # Both are (n_samples_test, n_semantic_dims)
    
    if predicted_vectors.shape[0] == 0: # No test samples
        return 0.0

    P_demeaned = predicted_vectors - np.mean(predicted_vectors, axis=0, keepdims=True)
    T_demeaned = true_test_vectors_fold - np.mean(true_test_vectors_fold, axis=0, keepdims=True)

    numerator = np.sum(P_demeaned * T_demeaned, axis=0)
    
    ss_P = np.sum(P_demeaned**2, axis=0)
    ss_T = np.sum(T_demeaned**2, axis=0)
    
    denominator = np.sqrt(ss_P * ss_T)
    
    correlations_all_dims = np.zeros_like(denominator) # Initialize with zeros
    
    # Avoid division by zero
    valid_mask = denominator > 1e-12 # Check if denominator is meaningfully non-zero
    
    correlations_all_dims[valid_mask] = numerator[valid_mask] / denominator[valid_mask]
    
    # Handle any potential NaNs that might arise from input NaNs or perfect zero variance in original data
    correlations_all_dims = np.nan_to_num(correlations_all_dims, nan=0.0, posinf=0.0, neginf=0.0)

    if correlations_all_dims.size == 0:
        return 0.0
        
    return np.max(np.abs(correlations_all_dims))


# --- Main Function ---
def voxel_representativeness_optimized(data, vectors, voxel_neighbors, n_jobs: int = -1) -> np.ndarray:
    """ Given a CxV matrix of C concepts and V voxel activations,
    return a V-dimensional vector of voxel representativeness.
    Optimized for speed using joblib parallelism and vectorized correlation.
    
    @param data: (S, V) numpy array of S samples (e.g., trials/timepoints) and V voxel activations.
    @param vectors: (S, C) numpy array of S samples and C semantic features/dimensions.
    @param voxel_neighbors: A matrix where each row corresponds to a voxel and contains indices of its neighbors.
    @param n_jobs: Number of parallel jobs for joblib. -1 means use all available cores.
    @return: A V-dimensional vector of voxel representativeness scores.
    """

    np.random.seed(3) # For reproducible fold splitting

    n_samples = data.shape[0]
    total_n_voxels = data.shape[1] # V
    
    ridge_alpha = 1.0 # As specified: regularization parameter set to 1

    # Splitting the data into 18 folds
    n_folds = 18
    indices = np.arange(n_samples)
    np.random.shuffle(indices)
    folds = np.array_split(indices, n_folds)

    # Initialize a vector to hold the maximum correlation for each voxel across all folds
    max_correlation_across_folds = np.zeros(total_n_voxels)

    for fold_idx, test_fold_indices in enumerate(tqdm(folds, desc="Processing Folds", unit="fold")):
        train_indices = np.setdiff1d(indices, test_fold_indices)

        # Create training and test sets for this fold
        # Data: (samples, voxels)
        # Vectors: (samples, semantic_dims)
        current_train_data = data[train_indices, :]
        current_test_data = data[test_fold_indices, :]
        current_train_vectors = vectors[train_indices, :]
        current_true_test_vectors = vectors[test_fold_indices, :]

        if current_train_data.shape[0] == 0 or current_test_data.shape[0] == 0:
            print(f"Warning: Fold {fold_idx+1} has no train or test samples, skipping.")
            continue
            
        # Parallel processing of voxels for the current fold
        # tqdm can be used with joblib, but requires a bit more setup for nested progress bars.
        # For simplicity here, tqdm is on the outer (fold) loop.
        # The `desc` in the inner loop of the original code will be lost, 
        # but joblib will utilize cores effectively.
        
        # print(f"Fold {fold_idx+1}/{n_folds}: Processing {total_n_voxels} voxels with {n_jobs} workers...")
        
        voxel_correlations_for_this_fold = Parallel(n_jobs=n_jobs)(
            delayed(process_voxel_for_fold)(
                v_idx, # Index of the voxel to process
                current_train_data,
                current_test_data,
                current_train_vectors,
                current_true_test_vectors,
                voxel_neighbors,
                ridge_alpha
            ) for v_idx in range(total_n_voxels) # Iterate over all voxel indices
        )
        
        # Update the max_correlation_across_folds
        # voxel_correlations_for_this_fold is a list of max correlations, one for each voxel in this fold
        for v_idx, fold_corr in enumerate(voxel_correlations_for_this_fold):
            max_correlation_across_folds[v_idx] = max(max_correlation_across_folds[v_idx], fold_corr)

    return max_correlation_across_folds

In [None]:
# We want to lower the dimensionality of the data to 15000 most informative features
# Because it is unclear which features are most informative, we will use the 15000 features with the highest
# overall average values in absolute value.
def get_top_features(data, n_features=None, metric='mean', **kwargs):
    """
    Select the top n_features based on the mean absolute value of each feature across all samples.
    :param data: The input data matrix (samples x features).
    :param n_features: The number of top features to select.
    :param metric: The metric to use for feature selection 
    ('mean' or 'var' for variance or 
    'corr' for max correlation, 
    's_corr' for simple version).
    :return: The reduced data matrix and the indices of the selected features.
    """
    if n_features is None:
        return data, np.arange(data.shape[1])

    if metric not in ['mean', 'var', 'corr']:
        raise ValueError("Metric must be one of the allowed ones.")
    elif metric == 'corr':
        representativeness = voxel_representativeness_optimized(data, glove_vectors, voxel_neighbors, n_jobs=6)
    elif metric == 'var':
        representativeness = np.var(data, axis=0)
    else:
        representativeness = np.mean(np.abs(data), axis=0)
    top_indices = np.argsort(representativeness)[-n_features:]
    # save the representativeness variable to avoid recomputing it
    np.save('data/representativeness.npy', representativeness)

    return data[:, top_indices], top_indices

exp1_fmri_reduced, top_indices = get_top_features(exp1_fmri, n_features=5000, metric='corr')
# Save the reduced data and the top indices
np.save('data/data_reduced.npy', exp1_fmri_reduced)
np.save('data/top_indices.npy', top_indices)

Processing Folds: 100%|██████████| 18/18 [37:29<00:00, 124.97s/fold]


In [None]:
# in case we need to load the data later
# data_reduced = np.load('data/data_reduced.npy')
# top_indices = np.load('data/top_indices.npy')
# representativeness = np.load('data/representativeness.npy')

In [8]:
# in case we need to change the number of features later
representativeness = np.load('data/representativeness.npy')
top_indices = np.argsort(representativeness)[-5000:]
# to take all
# top_indices = np.argsort(representativeness)
exp1_fmri_reduced = exp1_fmri[:, top_indices]

In [9]:
print(exp1_fmri.shape)
print(exp1_fmri_reduced.shape)
print(exp1_vecs.shape)
print(exp1_conc.shape)

(180, 185866)
(180, 5000)
(180, 300)
(180,)


In [10]:
exp2_fmri = exp2_data['Fmridata']
exp2_fmri_reduced = exp2_fmri[:, top_indices]

exp3_fmri = exp3_data['Fmridata']
exp3_fmri_reduced = exp3_fmri[:, top_indices]

In [11]:
print(exp2_fmri.shape)
print(exp2_fmri_reduced.shape)
print(exp3_fmri.shape)
print(exp3_fmri_reduced.shape)

(384, 185866)
(384, 5000)
(243, 185866)
(243, 5000)


In [12]:
# importing the vectors for the seconds and third experiments
exp2_vecs = read_matrix("data/vectors_384sentences.GV42B300.average.txt", sep=" ")
exp3_vecs = read_matrix("data/vectors_243sentences.GV42B300.average.txt", sep=" ")
# importing the corresponding sentences
exp2_sent = np.genfromtxt('data/stimuli_384sentences.txt', delimiter='\t', dtype=np.dtype('U'))
exp3_sent = np.genfromtxt('data/stimuli_243sentences.txt', delimiter='\t', dtype=np.dtype('U'))

In [13]:
print(exp2_vecs.shape)
print(exp3_vecs.shape)
print(exp2_sent.shape)
print(exp3_sent.shape)

(384, 300)
(243, 300)
(384,)
(243,)


# Open Section

In [14]:
import torch
import numpy as np
from transformers import AutoTokenizer, AutoModelForCausalLM

  from .autonotebook import tqdm as notebook_tqdm


In [15]:
# loading the glove vectors from the data/glove.42B.300d.txt
glove_embs = {}
# if the embedding are already saved, we can load them
if os.path.exists('data/glove_embs.pkl'):
    with open('data/glove_embs.pkl', 'rb') as f:
        glove_embs = pickle.load(f)
else:
    # otherwise, we load them from the file
    with open('data/glove.42B.300d.txt', 'r', encoding='utf-8') as f:
        for line in f:
            split_line = line.split()
            word = split_line[0]
            vector = np.array([float(x) for x in split_line[1:]])
            glove_embs[word] = vector

In [15]:
# saving the glove embeddings
with open('data/glove_embs.pkl', 'wb') as f:
    pickle.dump(glove_embs, f)

In [16]:
def get_word_sequence_embedding(text_sequence, tokenizer, model):
    """
    Calculates the semantic embedding for a sequence of text by averaging the GloVe word vectors.
    """
    words = text_sequence.split()
    word_vectors = [glove_embs[word] for word in words if word in glove_embs]
    if not word_vectors:
        # If no valid word vectors are found, return a zero vector.
        return np.zeros(300)
    # Average the word vectors to get the sequence embedding.
    sequence_embedding = np.mean(word_vectors, axis=0)
    return sequence_embedding

In [17]:
def get_llm_sequence_embedding(text_sequence, tokenizer, model):
    """
    Calculates the semantic embedding for a sequence of text averaging the hidden states of a language model.
    """
    inputs = tokenizer(text_sequence, return_tensors='pt', padding=True)
    with torch.no_grad():
        # Get the hidden states from the model output.
        outputs = model(**inputs, output_hidden_states=True)
        # We take the hidden states from the last layer.
        hidden_states = outputs.hidden_states[-1]
        # Average the token embeddings to get a single vector for the sequence.
        sequence_embedding = torch.mean(hidden_states, dim=1).squeeze().numpy()
    return sequence_embedding

In [18]:
def setup_model_and_tokenizer(model_name='mistralai/Mistral-7B-v0.3'):
    """
    Loads a pre-trained language model and tokenizer using AutoClass.

    AutoTokenizer and AutoModelForCausalLM are "smart" loaders that automatically
    infer the correct architecture from the model name and download the
    appropriate classes, weights, and configuration. This makes the code
    flexible enough to work with Mistral, Llama, GPT-2, Pythia, etc.
    """
    print(f"Loading {model_name} model and tokenizer...")
    # You can now use any compatible model from the Hugging Face Hub.
    
    # The AutoTokenizer will load the correct tokenizer for the Mistral model.
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    
    # It's good practice to set a padding token if the model doesn't have one.
    # The end-of-sentence token is often used for this purpose.
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token
        
    # The AutoModelForCausalLM will load the correct model architecture.
    model = AutoModelForCausalLM.from_pretrained(
        model_name,
        # Using torch_dtype=torch.float16 can help save memory on GPUs.
        # Remove this line if you are running on a CPU or encounter issues.
        # torch_dtype=torch.float16,
        # This will try to spread the model across available GPUs/CPU memory.
        # device_map='auto'
    )
    
    print("Model and tokenizer loaded successfully.")
    return model, tokenizer

In [40]:
def reconstruct_sentence(encoder, target_voxel_activation, model, tokenizer, get_sequence_embedding, max_length=20, k=10):
    """
    Reconstructs a sentence using the Brain-Guided Reconstruction method.

    Args:
        encoder (np.array): The brain encoder matrix (num_voxels, embedding_dim).
        target_voxel_activation (np.array): The actual fMRI data for the sentence.
        model: The pre-trained language model.
        tokenizer: The tokenizer for the language model.
        get_sequence_embedding (function): Function to get the semantic embedding of a text sequence.
        max_length (int): The maximum number of words to generate.
        k (int): The number of candidate words to evaluate at each step.

    Returns:
        str: The reconstructed sentence.
    """
    # Start with the model's beginning-of-sequence token.
    reconstructed_indices = [tokenizer.bos_token_id]
    
    print("\n--- Starting Sentence Reconstruction ---")
    
    for i in range(max_length):
        # Convert the current list of token indices to a tensor for the model.
        current_sequence_tensor = torch.tensor([reconstructed_indices])
        
        # Get the model's predictions (logits) for the next word.
        with torch.no_grad():
            outputs = model(current_sequence_tensor)
            next_token_logits = outputs.logits[:, -1, :]

        # Get the top 'k' most likely next tokens from the language model such that
        # they are not special symbols or punctuation.
        token_sorted_indices = torch.argsort(next_token_logits, descending=True)
        token_sorted_indices = token_sorted_indices[0, :].tolist()
        # Filter out tokens that are not valid words (e.g., special symbols, punctuation).
        top_k_tokens = []
        for token in token_sorted_indices:
            if len(top_k_tokens) >= k:
                break
            # If the token is an eos token, we want to add it too
            if token == tokenizer.eos_token_id:
                top_k_tokens.append(token)
                continue
            # Decode the token to check if it's a word.
            decoded_token = tokenizer.decode(token).strip()
            # We strip whitespace and check if the result is purely alphabetic.
            if decoded_token.isalpha():
                top_k_tokens.append(token)

        best_token = -1
        highest_score = -np.inf  # Start with negative infinity.
        
        # This is the core loop where the brain data guides the generation.
        for token in top_k_tokens:
            # Create the candidate sequence of token indices.
            candidate_indices = reconstructed_indices + [token]
            
            # Decode the indices to a text string.
            candidate_text = tokenizer.decode(candidate_indices)
            
            # 1. Get the semantic vector for the candidate sequence.
            v_candidate = get_sequence_embedding(candidate_text, tokenizer, model)
            
            # 2. Use the encoder to predict the fMRI pattern for this candidate.
            predicted_voxel_activation = v_candidate @ encoder
            
            # 3. Score the prediction against the actual fMRI data.
            # Pearson correlation is a good choice for comparing voxel patterns.
            # We add a small epsilon to avoid division by zero if variance is zero.
            epsilon = 1e-8
            score = np.corrcoef(predicted_voxel_activation + epsilon, target_voxel_activation + epsilon)[0, 1]
            
            # Keep track of the token that produces the best score.
            if score > highest_score:
                highest_score = score
                best_token = token
        
        # If no valid token was found, we can stop.
        if best_token == -1:
            print("No valid token found, stopping reconstruction.")
            break

        # Append the best token to our sequence.
        reconstructed_indices.append(best_token)
        
        # If the model generates an end-of-sentence token, we can stop.
        if best_token == tokenizer.eos_token_id:
            print("End-of-sentence token generated.")
            break
            
        # Print the progress for this step.
        current_text = tokenizer.decode(reconstructed_indices)
        print(f"Step {i+1}/{max_length}: Best Score={highest_score:.4f}, Current Text: '{current_text}'")

    # Decode the final list of indices into a human-readable sentence.
    final_sentence = tokenizer.decode(reconstructed_indices, skip_special_tokens=True)
    print("--- Reconstruction Complete ---")
    return final_sentence

In [29]:
# Setting up the model and tokenizer
lm_model, lm_tokenizer = setup_model_and_tokenizer('EleutherAI/pythia-1b-deduped')
# The embedding dimension for Pythia-70M is 768.
embedding_dim = 768

Loading EleutherAI/pythia-1b-deduped model and tokenizer...
Model and tokenizer loaded successfully.


## Word embeddings to Sentence Prediction

In [21]:
from learn_decoder import learn_encoder

In [42]:
# Creating the encoder from the data used in the second experiment
encoder = learn_encoder(voxels=exp2_fmri_reduced, vectors=exp2_vecs)

In [43]:
# for the sake of testing, let's reconstruct the first sentence from the second experiment
target_sentence = exp2_sent[0]
target_voxel_activation = exp2_fmri_reduced[0, :]
print(f"Target Sentence: {target_sentence}")

reconstructed_sentence = reconstruct_sentence(
    encoder=encoder,
    target_voxel_activation=target_voxel_activation,
    model=lm_model,
    tokenizer=lm_tokenizer,
    get_sequence_embedding=get_word_sequence_embedding,
    max_length=20,
    k=10
)
print(f"Reconstructed Sentence: {reconstructed_sentence}")

Target Sentence: An accordion is a portable musical instrument with two keyboards.

--- Starting Sentence Reconstruction ---
Step 1/20: Best Score=0.0000, Current Text: '<|endoftext|>s'
Step 2/20: Best Score=-0.2235, Current Text: '<|endoftext|>s on'
Step 3/20: Best Score=-0.2160, Current Text: '<|endoftext|>s on his'
Step 4/20: Best Score=-0.2029, Current Text: '<|endoftext|>s on his personal'
Step 5/20: Best Score=-0.2029, Current Text: '<|endoftext|>s on his personal Facebook'
Step 6/20: Best Score=-0.1963, Current Text: '<|endoftext|>s on his personal Facebook feed'
Step 7/20: Best Score=-0.2044, Current Text: '<|endoftext|>s on his personal Facebook feed on'
Step 8/20: Best Score=-0.2044, Current Text: '<|endoftext|>s on his personal Facebook feed on Tuesday'
Step 9/20: Best Score=-0.2110, Current Text: '<|endoftext|>s on his personal Facebook feed on Tuesday night'
Step 10/20: Best Score=-0.2154, Current Text: '<|endoftext|>s on his personal Facebook feed on Tuesday night at'
Ste

## Sentence embeddings to Sentence Prediction