In [1]:
%matplotlib inline

Single-subject example of SVM classification based on entire brain's voxels for CIMAQ memory encoding task (fMRI data). 




In [2]:
import os
import sys
import glob
import numpy as np
import pandas as pd
import nilearn
import scipy
import nibabel as nb
import sklearn
import seaborn as sns
import itertools

from numpy import nan as NaN
from matplotlib import pyplot as plt
from nilearn import image, plotting
from nilearn import datasets
from nilearn.plotting import plot_stat_map, plot_roi, plot_anat, plot_img, show
from nilearn.input_data import NiftiMasker
from sklearn.model_selection import train_test_split
from sklearn.svm import SVC
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix, precision_score, f1_score
from sklearn.model_selection import cross_val_predict, cross_val_score
from sklearn.preprocessing import MinMaxScaler

#libraries need to be installed in conda environment with pip install



Step 1: vectorize beta maps to derive features

Use Nilearn's NiftiMasker to convert beta-images to a vectorized data matrix (vectorize the brain) as input for machine learning.

Masking: a normalized functional data mask (outputted by NIAK) to determine which voxels to include in the data matrix

In [22]:
#subject id
id = '122922'

#Subject's anatomical scan (for display); normalized (non-linear)
#define directory where subject's functional mask and anatomical scan reside
anat_dir = '/Users/mombot/Documents/Simexp/CIMAQ/Data/anat/122922'
#subject's anatomical scan
anat = nb.load(os.path.join(anat_dir, 'anat_sub122922_nuc_stereonl.nii'))

#mask of subject's functional MRI data
mask = nb.load(os.path.join(anat_dir, 'func_sub122922_mask_stereonl.nii'))

#visualizing mask with subjet's anatomical image as background
plot_roi(roi_img=mask, bg_img=anat, cmap='Paired')

#sanity check: the functional mask from the NIAK preprocessing output directory (anat)  
#contains the functional MRI voxels (task epi scans)
tscores = '/Users/mombot/Documents/Simexp/CIMAQ/Data/test/Output/122922/MultiModels/EncMinCTL_tscores_sub122922.nii'
plot_stat_map(stat_map_img=tscores, bg_img=mask, cut_coords=(0, 0, 0), threshold=0.2, colorbar=True)

betas_enc = glob.glob('/Users/mombot/Documents/Simexp/CIMAQ/Data/Nistats/Betas/122922/SingleModel/concat*.nii')[0]

#load 4D image (concatenated beta maps, each is a 3D file) corresponding to trial events, 
#in temporal order.
#betas_all = glob.glob('/Users/mombot/Documents/Simexp/CIMAQ/Data/Nistats/Betas/122922/OneModelPerTrial_A/concat*.nii')[0]

##OR: upload series of 3D images in nilearn using the wild card
#https://nilearn.github.io/modules/generated/nilearn.image.load_img.html#nilearn.image.load_img
betas_all = nilearn.image.load_img(img='/Users/mombot/Documents/Simexp/CIMAQ/Data/Nistats/Betas/122922/OneModelPerTrial_A/betas*nii', wildcards=True)

#use NiftiMasker class to convert images into data matrices for decoding
#create 2D array (numpy) as input for scikit-learn for decoding
masker = NiftiMasker(mask_img=mask, standardize=True)

# give the masker a filename and convert beta maps into a 2D array
#78 rows = encoding trials, 69924 columns = brain voxels, value = beta value
X_encTrials = masker.fit_transform(betas_enc)
#177 rows = all trials, 69924 columns = brain voxels, value = beta value
X_allTrials = masker.fit_transform(betas_all)

print(X_encTrials.shape) #78 rows = trials, 69924 cols = voxels, val = beta weights
print(X_allTrials.shape) #117 rows = trials, 69924 cols = voxels, val = beta weights

#plt.imshow(X_encTrials, aspect='auto')
#plt.colorbar()
#plt.title('encoding feature matrix')
#plt.xlabel('features')
#plt.ylabel('trials')
#plt.show()

(78, 69924)
(117, 69924)


Step 2: import behav labels

In [23]:
#Encoding trial labels (78 labels): miss, wrong source, correct source
label_dir = '/Users/mombot/Documents/Simexp/CIMAQ/Data/test/Output/Events'
label_enc = glob.glob(os.path.join(label_dir, 'sub-*EncTrialTypes.tsv'))[0]
label_all = glob.glob(os.path.join(label_dir, 'sub-*AllTrialTypes.tsv'))[0]

enc_labels = pd.read_csv(label_enc, sep='\t')
y_enc = enc_labels['enctrial_type'] #transform DataFrame into 1D array by extracting column
print(y_enc.head())
enc_labels.enctrial_type.value_counts()


0    correctsource
1      wrongsource
2      wrongsource
3           missed
4      wrongsource
Name: enctrial_type, dtype: object


wrongsource      30
correctsource    25
missed           23
Name: enctrial_type, dtype: int64

In [24]:
all_labels = pd.read_csv(label_all, sep='\t')
y_all = all_labels['trial_type']
print(y_all.head())
all_labels.trial_type.value_counts()

0    Enc
1    CTL
2    Enc
3    Enc
4    CTL
Name: trial_type, dtype: object


Enc    78
CTL    39
Name: trial_type, dtype: int64

Step 3: stratify the data into training and testing sets

See scikit-learn documentation here:
https://scikit-learn.org/stable/modules/generated/sklearn.model_selection.train_test_split.html

Define a training and a testing sample
Split the sample to training/test with a 60/40 ratio, stratify trials by condition, and  shuffle the data

In [25]:
#X_encTrials, y_enc 
#X_allTrials, y_all 

X_train, X_test, y_train, y_test = train_test_split(
    #X_encTrials,
    #y_enc,
    X_allTrials, # x
    y_all, # y
    test_size = 0.4, # 60%/40% split
    shuffle = True, # shuffle dataset before splitting
    stratify = y_all, # keep distribution of conditions consistent betw. train & test sets
    random_state = 123) # same shuffle each time

print('training:', len(X_train),
     'testing:', len(X_test))

#fig,(ax1,ax2) = plt.subplots(2)
#sns.countplot(y_train, ax=ax1, order=['Enc','CTL'])
#ax1.set_title('Train')
#sns.countplot(y_test, ax=ax2, order=['Enc','CTL'])
#ax2.set_title('Test')

training: 70 testing: 47


Step 4: train an SVM  model

In [26]:
my_first_svc = SVC(kernel='linear') #define the model
my_first_svc.fit(X_train, y_train) #train the model

# predict the training data based on the model
y_pred = my_first_svc.predict(X_train)

# calculate the model accuracy
acc = my_first_svc.score(X_train, y_train)

# calculate the model precision, recall and f1 in one report
cr = classification_report(y_true=y_train,
                      y_pred = y_pred)

# get a table to help us break down these scores
cm = confusion_matrix(y_true=y_train, y_pred = y_pred)

# print results
print('accuracy:', acc)
print(cr)
print(cm)

# plot confusion matrix (training data)
cmdf = pd.DataFrame(cm, index = ['Control','Encoding'], columns = ['Control','Encoding'])
sns.heatmap(cmdf, cmap = 'RdBu_r')
plt.xlabel('Predicted')
plt.ylabel('Observed')
# label cells in matrix
for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        plt.text(j+0.5, i+0.5, format(cm[i, j], 'd'),
                 horizontalalignment="center",
                 color="white")

accuracy: 1.0
              precision    recall  f1-score   support

         CTL       1.00      1.00      1.00        23
         Enc       1.00      1.00      1.00        47

    accuracy                           1.00        70
   macro avg       1.00      1.00      1.00        70
weighted avg       1.00      1.00      1.00        70

[[23  0]
 [ 0 47]]


  ax.set(xticks=xticks, yticks=yticks)


In [27]:
#set up cross-validation to evaluate model performance
#within 10 folds of training set

# predict
y_pred = cross_val_predict(my_first_svc, X_train, y_train,
                           groups=y_train, cv=10)
# scores
acc = cross_val_score(my_first_svc, X_train, y_train,
                     groups=y_train, cv=10)

#Look at accuracy of prediction for each fold of the cross-validation
for i in range(10):
    print('Fold %s -- Acc = %s'%(i, acc[i]))

Fold 0 -- Acc = 1.0
Fold 1 -- Acc = 0.875
Fold 2 -- Acc = 0.875
Fold 3 -- Acc = 1.0
Fold 4 -- Acc = 1.0
Fold 5 -- Acc = 1.0
Fold 6 -- Acc = 1.0
Fold 7 -- Acc = 0.8333333333333334
Fold 8 -- Acc = 0.8333333333333334
Fold 9 -- Acc = 0.8333333333333334


In [28]:
#look at the overall accuracy of the model

overall_acc = accuracy_score(y_pred = y_pred, y_true = y_train)
overall_cr = classification_report(y_pred = y_pred, y_true = y_train)
overall_cm = confusion_matrix(y_pred = y_pred, y_true = y_train)
print('Accuracy:',overall_acc)
print(overall_cr)

thresh = overall_cm.max() / 2
cmdf = pd.DataFrame(overall_cm, index = ['CTL','Enc'], columns = ['CTL','Enc'])
sns.heatmap(cmdf, cmap='copper')
plt.xlabel('Predicted')
plt.ylabel('Observed')
for i, j in itertools.product(range(overall_cm.shape[0]), range(overall_cm.shape[1])):
        plt.text(j+0.5, i+0.5, format(overall_cm[i, j], 'd'),
                 horizontalalignment="center",
                 color="white")

Accuracy: 0.9285714285714286
              precision    recall  f1-score   support

         CTL       0.91      0.87      0.89        23
         Enc       0.94      0.96      0.95        47

    accuracy                           0.93        70
   macro avg       0.92      0.91      0.92        70
weighted avg       0.93      0.93      0.93        70



  ax.set(xticks=xticks, yticks=yticks)


In [29]:
#Scale the training data
scaler = MinMaxScaler().fit(X_train)
X_train_scl = scaler.transform(X_train)

plt.imshow(X_train, aspect='auto')
plt.colorbar()
plt.title('Training Data')
plt.xlabel('features')
plt.ylabel('subjects')

Text(522.9424, 0.5, 'subjects')

In [30]:
plt.imshow(X_train_scl, aspect='auto')
plt.colorbar()
plt.title('Scaled Training Data')
plt.xlabel('features')
plt.ylabel('subjects')

Text(521.8211200000001, 0.5, 'subjects')

In [13]:
#Repeat steps with scaled data

# predict
y_pred = cross_val_predict(my_first_svc, X_train_scl, y_train,
                           groups=y_train, cv=10)

# get scores
overall_acc = accuracy_score(y_pred = y_pred, y_true = y_train)
overall_cr = classification_report(y_pred = y_pred, y_true = y_train)
overall_cm = confusion_matrix(y_pred = y_pred, y_true = y_train)
print('Accuracy:',overall_acc)
print(overall_cr)

# plot
thresh = overall_cm.max() / 2
cmdf = pd.DataFrame(overall_cm, index = ['Control','Encoding'], columns = ['Control','Encoding'])
sns.heatmap(cmdf, cmap='copper')
plt.xlabel('Predicted')
plt.ylabel('Observed')
for i, j in itertools.product(range(overall_cm.shape[0]), range(overall_cm.shape[1])):
        plt.text(j+0.5, i+0.5, format(overall_cm[i, j], 'd'),
                 horizontalalignment="center",
                 color="white")

Accuracy: 0.7428571428571429
              precision    recall  f1-score   support

         CTL       0.63      0.52      0.57        23
         Enc       0.78      0.85      0.82        47

    accuracy                           0.74        70
   macro avg       0.71      0.69      0.69        70
weighted avg       0.73      0.74      0.74        70



  ax.set(xticks=xticks, yticks=yticks)


In [31]:
# Test model on unseen data from the test set
# Use the Scaler that was fit to X_train and apply to X_test,
# rather than creating a new Scaler for X_test
X_test_scl = scaler.transform(X_test)

my_first_svc.fit(X_train_scl, y_train) # fit to training data
y_pred = my_first_svc.predict(X_test_scl) # classify age class using testing data
acc = my_first_svc.score(X_test_scl, y_test) # get accuracy
cr = classification_report(y_pred=y_pred, y_true=y_test) # get prec., recall & f1
cm = confusion_matrix(y_pred=y_pred, y_true=y_test) # get confusion matrix

# print results
print('accuracy =', acc)
print(cr)

# plot results
thresh = cm.max() / 2
cmdf = pd.DataFrame(cm, index = ['Control','Encoding'], columns = ['Control','Encoding'])
sns.heatmap(cmdf, cmap='RdBu_r')
plt.xlabel('Predicted')
plt.ylabel('Observed')
for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        plt.text(j+0.5, i+0.5, format(cm[i, j], 'd'),
                 horizontalalignment="center",
                 color="white")

accuracy = 0.9361702127659575
              precision    recall  f1-score   support

         CTL       0.93      0.88      0.90        16
         Enc       0.94      0.97      0.95        31

    accuracy                           0.94        47
   macro avg       0.94      0.92      0.93        47
weighted avg       0.94      0.94      0.94        47



  ax.set(xticks=xticks, yticks=yticks)
