### Semantic features as word embeddings

In [1]:
import numpy as np
import os
import torch
import json
from tqdm import tqdm
import scipy
import matplotlib.pyplot as plt

from sklearn.model_selection import train_test_split
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.cross_decomposition import PLSRegression
from sklearn.linear_model import Ridge, Lasso, LinearRegression
from sklearn.ensemble import StackingRegressor
from sklearn.neural_network import MLPRegressor

from tqdm import tqdm
from numpy import dot
from numpy.linalg import norm
from sklearn.model_selection import LeavePOut
from sklearn.multioutput import MultiOutputRegressor
from sklearn.metrics import r2_score

In [2]:
def cosim(a, b):
    if (norm(a)*norm(b)) == 0: return 0
    return dot(a, b)/(norm(a)*norm(b))

### Semantic features

In [3]:
min_len_line = 5
N_SEMANTIC_FEATURES = 25
semantic_features = {}

def dump_mitchell_web_semantic_features(raw_file = os.path.join("data","mitchell_semantic_raw.txt")):
    with open(raw_file, "r") as datafile:
        lines = datafile.readlines()
        word = None

        for line in lines:

            # Skip empty
            if len(line) >= min_len_line:

                # New feature
                if "Features for" in line:

                    # Discard invalid ones (once fully parsed)
                    if word and len(semantic_features[word]['features']) < N_SEMANTIC_FEATURES: del semantic_features[word] 
                        
                    word = line.split("<a name=\"")[1].split("\"")[0]
                    semantic_features[word] = { "features": [], "values": []}

                elif word:
                    feature_name = line.split("(")[0]
                    val = float(line.split("(")[1].split(")")[0])
                    semantic_features[word]["features"].append(feature_name)
                    semantic_features[word]["values"].append(val)

    # Save to file
    #with open(os.path.join('data', 'mitchell_semantic_features.json'), 'w') as fp:
    #    json.dump(semantic_features, fp)

    return semantic_features


def load_sorted_semantic_features(file = os.path.join("data","mitchell_semantic_features.json")):
    with open(file) as f:
        semantic_features = json.load(f)
        for word in semantic_features.keys():
            # Sort all features
            sorted_features = sorted(enumerate(semantic_features[word]["features"]), key=lambda x:x[1])
            sorted_indices = [i[0] for i in sorted_features]
            sorted_values = [semantic_features[word]["values"][i] for i in sorted_indices]

            # Re-store them
            semantic_features[word]["features"] = [x[1] for x in sorted_features]
            semantic_features[word]["values"] = sorted_values
            break

    return semantic_features
            

### fMRI data

In [4]:
def get_mitchell_original_data(subject = 1, random_voxels = None):
    mdata = scipy.io.loadmat(os.path.join("data", "mitchell", f"data-science-P{subject}.mat"))
    subject_data = {}

    # 6 x 60 trials
    for i in range(mdata["data"][:].shape[0]):
        cond, cond_number, word, word_number, epoch = [x[0] for x in mdata["info"][0][i]]

        # Set trial data
        if epoch[0] not in subject_data: subject_data[epoch[0]] = {}

        if random_voxels:
            random_voxels_idx = np.random.choice(mdata["data"][i][0][0].shape[0], random_voxels)
            subject_data[epoch[0]][word] = mdata["data"][i][0][0][random_voxels_idx]
        else: subject_data[epoch[0]][word] = mdata["data"][i][0][0]

    return subject_data

### Voxel selection methods

In [5]:
def lowest_std_voxels(completeFmriData, train_split_indices, K=1000):
    n_voxels = completeFmriData[1]["bell"].shape[0]
    train_words = [k for i,k in enumerate(completeFmriData[1].keys()) if i in train_split_indices] # words used for training
    word_wise_activations = np.array([[completeFmriData[epoch][word] for epoch in range(1,7)] for word in train_words]) # (6, 58)    
    voxels_stds = np.abs(np.mean(word_wise_activations.std(axis=1), axis=0)) # stds across epochs, mean across words
    return np.argpartition(voxels_stds, K)

In [6]:
def mitchell_stable_voxels(fmriData, train_split_indices, K = 500):
    
    # Get total number of voxels
    voxels = fmriData[1]["bell"].shape[0]

    # Get scores of the voxels
    scores = []
    for vx in range(voxels):

        # Gathering epoch-wise brain activity
        repetitions = []
        for epoch in fmriData.keys():
            # store activations only for THAT vx voxel
            repetitions.append(np.array([fmriData[epoch][word][vx] for word in fmriData[epoch].keys()]))

        # (epochs, words) = (6, 58)
        repetitions = np.array(repetitions)

        # Compute voxel scores ONLY wrt. the training slice of words
        voxel_correlation_score = []
        for i in range(repetitions.shape[0]):
            for j in range(i+1, repetitions.shape[0]):
                # (6, 6) but without triangular down and diagonal = (36 - 6) / 2 = 15 values 
                voxel_correlation_score.append(np.correlate(repetitions[i, train_split_indices], repetitions[j, train_split_indices]))

        voxel_correlation_score = np.array(voxel_correlation_score)
        scores.append(np.mean(voxel_correlation_score))
    
    # indices of the most stable voxels
    return np.argpartition(scores, -K)[-K:]

In [7]:
def r2_best_voxels(scores, K, threshold = 0.2):
    scores = np.array(scores)
    r2_selected_voxels = np.where(scores > threshold)[0]
    return np.array(
        sorted( # sort by score, pick first K indices
            list(zip(scores[r2_selected_voxels], r2_selected_voxels)), 
            key = lambda x: x[0]
        )
    )[:K, 1].astype(np.int32).tolist()
        

### Training loop

In [8]:
def voxels_compute_fold_accuracy(predictors, y_pred, y_test, selected_voxels):

    true_positives = []
    for i, pi in enumerate(y_pred):

        pi = pi[selected_voxels]
        best_sample_match = np.argmax([cosim(pi, y_test[j, selected_voxels]) for j in range(y_test.shape[0])])
    
        true_positives.append(int(best_sample_match == i)) # ground truth is aligned with the sample by index, it should match

    return np.mean(true_positives)

In [9]:
# ---- Loading the dataset
epoch = 1
K = 500
VOXELWISE_ACC_THRESHOLD = 0.2

semantic_features = load_sorted_semantic_features()
N_words = len(semantic_features.keys())

In [10]:
subjects = 1
accuracies = np.zeros((subjects, 3))

for subject in range(1, subjects+1):

    print(f"**** Subject {subject} ****")

    # Subject data
    completeFmriData = get_mitchell_original_data(subject=subject)
    fmriData = completeFmriData[epoch]
    n_voxels = fmriData["bell"].shape[0]

    # K-fold cross val
    accuracies_r2 = []
    accuracies_most_stable = []
    accuracies_mitchell = []
    
    for i in tqdm(range(k_folds)):

        # the pair of left out samples rotates
        train_indices = np.array(list(range(samples_per_fold * i)) + list(range((samples_per_fold * (i+1)), n_samples)), dtype=np.int32)
        test_indices = np.array(list(range((samples_per_fold * i), samples_per_fold * (i + 1))), dtype=np.int32)

        X = []
        Y = []
        for word in semantic_features.keys():
            if word in fmriData.keys():
                x = np.array(semantic_features[word]["values"])
                y = np.array(fmriData[word])
                X.append(x)
                Y.append(y)

        X = np.array(X)
        Y = np.array(Y)

        # Train-test split
        X_train, X_test, y_train, y_test = X[train_indices], X[test_indices], Y[train_indices], Y[test_indices]
        
        # Predicting & scoring
        # Lasso is sparsity inducing
        predictors = [make_pipeline(StandardScaler(), LinearRegression()) for i in range(n_voxels)]
        scores = []

        # Fit each voxel predictor
        for j, model in enumerate(predictors):
            model.fit(X_train, y_train[:, j])
            scores.append(model.score(X_test, y_test[:, j]))

        # Voxel selections
        r2_voxels = r2_best_voxels(scores, K=K, threshold=VOXELWISE_ACC_THRESHOLD)
        mitchell_voxels = mitchell_stable_voxels(completeFmriData, train_indices, K=K)
        most_stable_voxels = lowest_std_voxels(completeFmriData, train_indices, K=K)

        # R2 best voxels
        fold_accuracy = voxels_compute_fold_accuracy(predictors, X_test, y_test, r2_voxels)
        accuracies_r2.append(fold_accuracy)

        # Lowest std.dev voxels
        fold_accuracy = voxels_compute_fold_accuracy(predictors, X_test, y_test, most_stable_voxels)
        accuracies_most_stable.append(fold_accuracy)

        # Mitchell stable voxels
        fold_accuracy = voxels_compute_fold_accuracy(predictors, X_test, y_test, mitchell_voxels)
        accuracies_mitchell.append(fold_accuracy)

    # Subjects accuracies
    accuracies[subject-1] = np.array(
        [np.mean(accuracies_r2), np.mean(accuracies_most_stable), np.mean(accuracies_mitchell)]
    )

    with open('accuracies_semantic2fmri.npy', 'wb') as f:
        np.save(f, accuracies)


**** Subject 1 ****


  0%|          | 1/1770 [00:05<2:56:48,  6.00s/it]

[0.5]


  1%|▏         | 25/1770 [01:21<1:32:10,  3.17s/it]

In [None]:
# R2, lowest std, most stable (mitchell)
accuracies[:,0].mean(), accuracies[:,1].mean(), accuracies[:,2].mean()

Note from professor: there is no need to determine the set of best predicted voxels across multiple folds. We just compute the accuracy for each fold and then average.

**Observation**

In this case fitting is way more expensive, as 21k voxels are considered.

In [None]:
from scipy.stats import pearsonr
import matplotlib.pyplot as plt

def best_K_predict(X, indices, predictors):
    predictors = [predictors[idx] for idx in indices]
    y_hat = np.array([predictor.predict(X) for predictor in predictors]) # voxels, sample
    return y_hat.reshape(y_hat.shape[1], y_hat.shape[0]) # sample, voxels

# voxel_indices

y_hat = best_K_predict(X_train, most_stable_voxels, predictors)
y = y_train[:, most_stable_voxels]

RDM_hat = np.matmul(y_hat, np.matrix.transpose(y_hat))

RDM = np.matmul(y, np.matrix.transpose(y))

test_pearson = pearsonr(
    RDM_hat.flatten(),
    RDM.flatten()
)

print(f"Test RDMs R^2:\t{test_pearson}")

plt.subplot(121)
plt.title("Truth")
plt.imshow(RDM)
plt.colorbar()

plt.subplot(122)
plt.title("Prediction")
plt.imshow(RDM_hat)
plt.colorbar()

**Observation**

Here the the voxels from the last cross_val iteration have been selected. For these voxels, the object to object distance matrices have similar patterns.