In [331]:
import os
import numpy as np
import pandas as pd
import seaborn as sns
import scipy.io as sio
import math

In [332]:
data_dir_root = os.path.join('./data', 'ThingsEEG-Text')
sbj = 'sub-10'
image_model = 'pytorch/cornet_s'
text_model = 'CLIPText'
roi = '17channels'
brain_dir = os.path.join(data_dir_root, 'brain_feature', roi, sbj)
image_dir_seen = os.path.join(data_dir_root, 'visual_feature/ThingsTrain', image_model, sbj)
image_dir_unseen = os.path.join(data_dir_root, 'visual_feature/ThingsTest', image_model, sbj)
text_dir_seen = os.path.join(data_dir_root, 'textual_feature/ThingsTrain/text', text_model, sbj)
text_dir_unseen = os.path.join(data_dir_root, 'textual_feature/ThingsTest/text', text_model, sbj)

# TODO: this is already doing some feature scaling it seems
brain_seen = sio.loadmat(os.path.join(brain_dir, 'eeg_train_data_within.mat'))['data'].astype('double') * 2.0
brain_seen = brain_seen[:,:,27:60] # 70ms-400ms
brain_seen = np.reshape(brain_seen, (brain_seen.shape[0], -1))
image_seen = sio.loadmat(os.path.join(image_dir_seen, 'feat_pca_train.mat'))['data'].astype('double')*50.0
text_seen = sio.loadmat(os.path.join(text_dir_seen, 'text_feat_train.mat'))['data'].astype('double')*2.0
label_seen = sio.loadmat(os.path.join(brain_dir, 'eeg_train_data_within.mat'))['class_idx'].T.astype('int')
image_seen = image_seen[:,0:100]

brain_unseen = sio.loadmat(os.path.join(brain_dir, 'eeg_test_data.mat'))['data'].astype('double')*2.0
brain_unseen = brain_unseen[:, :, 27:60]
brain_unseen = np.reshape(brain_unseen, (brain_unseen.shape[0], -1))
image_unseen = sio.loadmat(os.path.join(image_dir_unseen, 'feat_pca_test.mat'))['data'].astype('double')*50.0
text_unseen = sio.loadmat(os.path.join(text_dir_unseen, 'text_feat_test.mat'))['data'].astype('double')*2.0
label_unseen = sio.loadmat(os.path.join(brain_dir, 'eeg_test_data.mat'))['class_idx'].T.astype('int')
image_unseen = image_unseen[:, 0:100]

In [333]:
# TODO: proper credit
# https://medium.com/@koushikkushal95/logistic-regression-from-scratch-dfb8527a4226
# https://stackoverflow.com/questions/67513075/what-is-c-parameter-in-sklearn-logistic-regression
class CustomLogisticRegression:
    def __init__(self, learning_rate : float = 0.1, threshold : float = 0.5, regularization_strength : float = 1.0):
        self.epsilon = 1e-9
        self.learning_rate = learning_rate
        self.threshold = threshold
        self.weights = None
        self.bias = 0.0
        self.regularization_strength = 1.0 / regularization_strength if regularization_strength != 0.0 else 0.0
    
    def predict_probability(self, x : np.ndarray) -> np.ndarray:
        x_dot_weights = np.dot(x, self.weights) + self.bias
        probabilities = self._sigmoid(x_dot_weights)

        return probabilities

    def predict(self, x : np.ndarray) -> np.ndarray:
        prediction = self.predict_probability(x)
        return np.where(prediction > self.threshold, 1, 0)

    # Returns losses and log likelihoods
    def fit(self, x_train, y_train, epochs=100) -> tuple[np.ndarray, np.ndarray]:
        # Weights on each feature
        self.weights = np.zeros(x_train.shape[1])
        self.bias = 0.0
        
        losses = np.zeros((epochs,))
        log_likelihood = np.zeros((epochs,))

        # Gradient Descent
        for i in range(epochs):
            prediction = self.predict_probability(x_train)
            losses[i] = self._cross_entropy_loss(y_train, prediction)
            log_likelihood[i] = self._log_likelihood(x_train, y_train)
            error_weights, error_bias = self._compute_gradients(x_train, y_train, prediction)
            self._update_parameters(error_weights, error_bias)

        return losses, log_likelihood

    @staticmethod
    def _sigmoid(x : np.ndarray) -> np.ndarray:
        return 1.0 / (1.0 + np.exp(-x))
    
    def _cross_entropy_loss(self, y_true : np.ndarray, y_pred : np.ndarray) -> float:
        # Binary cross entropy loss
        y_zero_loss = y_true * np.log(y_pred + self.epsilon)
        y_one_loss = (1.0 - y_true) * np.log(1.0 - y_pred + self.epsilon)

        return -np.mean(y_zero_loss + y_one_loss)

    def _log_likelihood(self, x : np.ndarray, y : np.ndarray) -> float:
        penalty = self.regularization_strength * np.sum(self.weights ** 2.0)
        scores = np.dot(x, scores)

        return np.sum((y - 1.0) * scores - np.log(1.0 + np.exp(-scores))) - penalty

    def _compute_gradients(self, x : np.ndarray, y_true : np.ndarray, y_pred : np.ndarray) -> tuple[np.ndarray, float]:
        diff = y_pred - y_true
        penalty = 2.0 * self.regularization_strength * self.weights

        gradients_weights = (1.0 / x.shape[0]) * (np.dot(x.T, diff)) - penalty
        gradient_bias = (1.0 / x.shape[0]) * np.sum(diff)

        return gradients_weights, gradient_bias

    def _update_parameters(self, error_weights : np.ndarray, error_bias : float) -> None:
        self.weights -= self.learning_rate * error_weights
        self.bias -= self.learning_rate * error_bias

In [383]:
# OvR and OvO classifiers

# Process the data such that one class is highlighted in each instance of the data
#   get the resulting output vector, pick maximum probability
class OneVsRest:
    def __init__(self, model_constructor, num_classes : int, use_probability : bool = True):
        self.models = [model_constructor() for _ in range(num_classes)]
        self.num_classes = num_classes
        self.use_probability = use_probability

    def fit(self, x_train : np.ndarray, y_train : np.ndarray, epochs : int = 100) -> tuple[np.ndarray, np.ndarray]:
        total_loss = np.zeros((len(self.models), epochs))
        total_log_likelihood = np.zeros((len(self.models), epochs))

        for class_number, model in enumerate(self.models):
            # Get the binary version of the data
            #   label the current class as positive, and all other classes
            #   as negative
            y_ovr = np.where(y_train == class_number, 1.0, 0.0)
            loss, log_likelihood = model.fit(x_train, y_ovr, epochs=epochs)

            total_loss[class_number, :] += loss
            total_log_likelihood[class_number, :] += log_likelihood

        return total_loss.mean(axis=0), total_log_likelihood.mean(axis=0)

    def predict(self, x : np.ndarray) -> np.ndarray:
        # Get a prediction from every model
        #   each prediction is a probability from 0-1 for each instance
        #       (or just the predicted class if we're not using probability)
        #   arrange these into a matrix
        #   get the argmax from every column of the matrix
        #   consider that to be the class
        matrix = np.zeros((x.shape[0], self.num_classes))

        for class_number, model in enumerate(self.models):
            votes = model.predict_probability(x) if self.use_probability else model.predict(x)
            matrix[:, class_number] = votes

        predicted_classes = np.argmax(matrix, axis=1)
        
        return predicted_classes

class OneVsOne:
    def __init__(self, model_constructor, num_classes : int, use_probability : bool = True):
        self.num_classes = num_classes
        self.models = [model_constructor() for _ in range((num_classes * (num_classes - 1)) // 2)]
        self.use_probability = use_probability

    def fit(self, x_train : np.ndarray, y_train : np.ndarray, epochs : int = 100) -> tuple[np.ndarray, np.ndarray]:
        idx = 0

        total_loss = np.zeros((len(self.models), epochs))
        total_log_likelihood = np.zeros((len(self.models), epochs))

        for class_pos in range(self.num_classes):
            for class_neg in range(class_pos + 1, self.num_classes):
                model = self.models[idx]

                # Filter out classes not in one of the classes
                class_mask = ((y_train == class_pos) | (y_train == class_neg))

                x_ovo = x_train[class_mask]
                # Mark positive class as 1, negative class as 0
                y_ovo = np.where(y_train[class_mask] == class_pos, 1.0, 0.0)

                # Train
                loss, log_likelihood = model.fit(x_ovo, y_ovo, epochs=epochs)

                total_loss[idx, :] += loss
                total_log_likelihood[idx, :] += log_likelihood

                idx += 1

        return total_loss.mean(axis=0), total_log_likelihood.mean(axis=0)

    def predict(self, x : np.ndarray) -> np.ndarray:
        # Each model votes on one class
        #   class with most votes is selected
        #   note very similar to OvR,
        #       but each model votes on only the positive/negative instances
        #       instead of all instances
        idx = 0

        votes_matrix = np.zeros((x.shape[0], self.num_classes))

        for class_pos in range(self.num_classes):
            for class_neg in range(class_pos + 1, self.num_classes):
                model = self.models[idx]

                # On each instance, determine if it was positive/negative
                #   vote for that class
                votes = model.predict_probability(x) if self.use_probability else model.predict(x)

                votes_matrix[:, class_pos] += votes
                votes_matrix[:, class_neg] += 1.0 - votes

                idx += 1

        return np.argmax(votes_matrix, axis=1)

In [384]:
# Various train/test splits
SAMPLES_PER_CLASS = 10

# Only sampling from seen data
def train_test_split(num_classes : int = 50, split_percentage : float = 0.7):
    split_index = math.ceil(SAMPLES_PER_CLASS * split_percentage)

    # All instances where the seen class is within the classes we want
    seen_classes_filter = np.squeeze(label_seen <= num_classes)
    # unseen_classes_filter = np.squeeze(label_unseen <= num_classes)

    # Filter modalities to get only instances with the allowed classes
    brain_seen_reduced = brain_seen[seen_classes_filter]
    image_seen_reduced = image_seen[seen_classes_filter]
    text_seen_reduced = text_seen[seen_classes_filter]
    label_seen_reduced = label_seen[seen_classes_filter]

    # brain_unseen_reduced = brain_unseen[unseen_classes_filter]
    # image_unseen_reduced = image_unseen[unseen_classes_filter]
    # text_unseen_reduced = text_unseen[unseen_classes_filter]
    # label_unseen_reduced = label_unseen[unseen_classes_filter]

    brain_features = brain_seen.shape[1]
    image_features = image_seen.shape[1]
    text_features = text_seen.shape[1]

    # Per class number of training/test examples
    num_training = split_index
    num_test = SAMPLES_PER_CLASS - split_index

    def split_data(seen_instances : np.ndarray, num_features : int) -> tuple[np.ndarray, np.ndarray]:
        train_out = np.zeros((num_training * num_classes, num_features))
        test_out = np.zeros((num_test * num_classes, num_features))

        for i in range(num_classes):
            curr = seen_instances[i * SAMPLES_PER_CLASS:(i + 1) * SAMPLES_PER_CLASS, :]
            train_data = curr[:split_index, :]
            test_data = curr[split_index:, :]

            train_out[i * num_training:(i + 1) * num_training, :] = train_data
            test_out[i * num_test:(i + 1) * num_test, :] = test_data

        return train_out, test_out

    brain_train, brain_test = split_data(brain_seen_reduced, brain_features)
    image_train, image_test = split_data(image_seen_reduced, image_features)
    text_train, text_test = split_data(text_seen_reduced, text_features)
    label_train, label_test = split_data(label_seen_reduced, 1)

    # 0-indexing is more convenient for OvR/OvO
    #   also ensure label dimensions are 1d array
    label_train = label_train.squeeze() - 1
    label_test = label_test.squeeze() - 1

    return pd.DataFrame({
        "Modality": ["Brain", "Image", "Text", "Label"],
        "Train": [brain_train, image_train, text_train, label_train],
        "Test": [brain_test, image_test, text_test, label_test]
    })

def get_data_from_train_test(dataframe : pd.DataFrame, modality : str, type : str):
    modality_index = dataframe.index[dataframe["Modality"]==modality]
    return dataframe.loc[modality_index, type].item()

In [380]:
# Feature scaling
def normal_scale_features(x : np.ndarray) -> np.ndarray:
    out = np.zeros(x.shape)

    for feature in range(x.shape[1]):
        feature_vector = x[:, feature]

        std = feature_vector.std()
        mean = feature_vector.mean()

        out[:, feature] = (feature_vector - mean) / std

    return out

In [337]:
# Filter instances with too many outliers in them
# Returns truth series, where False represents an outlier
#   any row contains more than 10% outliers, discard it, represents a bad instance
def outlier_mask(x : np.ndarray, tolerance : float = 0.1) -> np.ndarray:
    out = np.zeros(x.shape)

    for feature in range(x.shape[1]):
        feature_vector = x[:, feature]
        # Calculate IQR
        lb = np.quantile(feature_vector.squeeze(), 0.25).item()
        ub = np.quantile(feature_vector.squeeze(), 0.75).item()
        iqr = ub - lb
        whisker_length = 1.5  # in reference to the histogram whisker length

        out[:, feature] = ~((feature_vector < (lb - whisker_length * iqr)) | (feature_vector > (ub + whisker_length * iqr)))

    instance_filter = np.zeros(x.shape[0])

    for instance in range(x.shape[0]):
        instance_filter[instance] = out[instance, :].mean() > (1.0 - tolerance)

    return instance_filter.astype(np.bool)
# TODO: strip noisy features?

In [357]:
# Test all combinations of modalities
BRAIN_BIT = 1
TEXT_BIT = 2
IMAGE_BIT = 3

all_modalities = pd.DataFrame({
    "Combination": ["Brain", "Text", "Image", "BrainText", "BrainImage", "TextImage", "All"],
    "Bitmap": [BRAIN_BIT, TEXT_BIT, IMAGE_BIT, BRAIN_BIT | TEXT_BIT, BRAIN_BIT | IMAGE_BIT, TEXT_BIT | IMAGE_BIT, BRAIN_BIT | TEXT_BIT | IMAGE_BIT]
})

def get_combined_data(brain_data : np.ndarray, text_data : np.ndarray, image_data : np.ndarray, bitmap : int) -> np.ndarray:
    modalities_data = []

    if bitmap & BRAIN_BIT:
        modalities_data.append(brain_data)
    if bitmap & TEXT_BIT:
        modalities_data.append(text_data)
    if bitmap & IMAGE_BIT:
        modalities_data.append(image_data)

    return np.hstack(tuple(modalities_data))

In [386]:
# Create some baseline parameters for future testing of other features
from sklearn.linear_model import LogisticRegression  # Baseline sklearn model

# Create a baseline train/test split, and extract training dataset and testing dataset
BASELINE_NUM_CLASSES = 50
BASELINE_TRAIN_TEST_SPLIT = 0.7

BASELINE_SPLIT_DATA = train_test_split(BASELINE_NUM_CLASSES, BASELINE_TRAIN_TEST_SPLIT)

baseline_data_train = get_data_from_train_test(BASELINE_SPLIT_DATA, "Brain", "Train")
baseline_data_test = get_data_from_train_test(BASELINE_SPLIT_DATA, "Brain", "Test")
BASELINE_LABEL_TRAIN = get_data_from_train_test(BASELINE_SPLIT_DATA, "Label", "Train")
BASELINE_LABEL_TEST = get_data_from_train_test(BASELINE_SPLIT_DATA, "Label", "Test")

# Perform standard scaling on data
BASELINE_DATA_TRAIN = normal_scale_features(baseline_data_train)
BASELINE_DATA_TEST = normal_scale_features(baseline_data_test)

# Standard hyperparameters
BASELINE_LEARNING_RATE = 0.1
BASELINE_THRESHOLD = 0.5
BASELINE_REGULARIZATION = 1.0
BASELINE_CUSTOM_MODEL = lambda: CustomLogisticRegression(BASELINE_LEARNING_RATE, BASELINE_THRESHOLD, BASELINE_REGULARIZATION)
BASELINE_USE_PROBABILITY = True
BASELINE_MUTLICLASS = OneVsRest(BASELINE_CUSTOM_MODEL, BASELINE_NUM_CLASSES, BASELINE_USE_PROBABILITY)
BASELINE_SKLEARN_MODEL = LogisticRegression(penalty="l2", multi_class="ovr", C=1.0, max_iter=100, random_state=0)

In [None]:
# Test 1.
#   vary train test splits

In [389]:
# TODO: to remove
from sklearn.metrics import accuracy_score, precision_recall_fscore_support

# brain_train_outliers = outlier_mask(brain_train)
# brain_test_outliers = outlier_mask(brain_test)

# label_train_outliers = label_train[brain_train_outliers]
# label_test_outliers = label_test[brain_test_outliers]

# print(label_train.shape)
# print(label_test.shape)
# print(label_train_outliers.shape)
# print(label_test_outliers.shape)

# brain_train_scaled = normal_scale_features(brain_train[brain_train_outliers])
# brain_test_scaled = normal_scale_features(brain_test[brain_test_outliers])

all_train = get_combined_data(
    get_data_from_train_test(BASELINE_SPLIT_DATA, "Brain", "Train"),
    get_data_from_train_test(BASELINE_SPLIT_DATA, "Text", "Train"),
    get_data_from_train_test(BASELINE_SPLIT_DATA, "Image", "Train"),
    BRAIN_BIT | TEXT_BIT | IMAGE_BIT
)
all_test = get_combined_data(
    get_data_from_train_test(BASELINE_SPLIT_DATA, "Brain", "Test"),
    get_data_from_train_test(BASELINE_SPLIT_DATA, "Text", "Test"),
    get_data_from_train_test(BASELINE_SPLIT_DATA, "Image", "Test"),
    BRAIN_BIT | TEXT_BIT | IMAGE_BIT
)

all_train_scaled = normal_scale_features(all_train)
all_test_scaled = normal_scale_features(all_test)

train_features = BASELINE_DATA_TRAIN
test_features = BASELINE_DATA_TEST
train_labels = BASELINE_LABEL_TRAIN
test_labels = BASELINE_LABEL_TEST

# Note threshold parameter doesn't affect OvR since OvR uses probabilities
logistic_model_constructor = lambda: CustomLogisticRegression(0.01, 0.5, 1.0)
ovr = OneVsRest(logistic_model_constructor, BASELINE_NUM_CLASSES)
losses_ovr, log_likelihood_ovr = ovr.fit(train_features, train_labels, 100)
ovr_pred = ovr.predict(test_features)

ovo = OneVsOne(logistic_model_constructor, BASELINE_NUM_CLASSES)
ovo.fit(train_features, train_labels, 100)
ovo_pred = ovo.predict(test_features)

model = LogisticRegression(multi_class="ovr", max_iter=100, random_state=0, penalty="l2", C=1.0)
model.fit(train_features, train_labels)
sk_pred = model.predict(test_features)

def get_stats(y_true, y_pred):
    accuracy = accuracy_score(y_true, y_pred)
    diverse_stats = precision_recall_fscore_support(y_true, y_pred, average="weighted", zero_division=0.0)

    return accuracy, diverse_stats[0], diverse_stats[1], diverse_stats[2]

ovo_stats = get_stats(test_labels, ovo_pred)
ovr_stats = get_stats(test_labels, ovr_pred)
base_stats = get_stats(test_labels, sk_pred)

df = pd.DataFrame({
    "Classifier": ["OvO", "OvR", "Baseline"],
    "Accuracy": [ovo_stats[0], ovr_stats[0], base_stats[0]],
    "Precision": [ovo_stats[1], ovr_stats[1], base_stats[1]],
    "Recall": [ovo_stats[2], ovr_stats[2], base_stats[2]],
    "F1-Score": [ovo_stats[3], ovr_stats[3], base_stats[3]]
})

losses_df = pd.DataFrame({
    "Epoch": list(range(100)),
    "Loss": losses_ovr,
    "LogLikelihood": log_likelihood_ovr
})

long_df = df.melt("Classifier", var_name="ScoreType", value_name="Score")

sns.barplot(long_df, x="Classifier", y="Score", hue="ScoreType")
sns.lineplot(losses_df, x="Epoch", y="Loss")
sns.lineplot(losses_df, x="Epoch", y="LogLikelihood")

# TODO: grid search hyperparameters (likely little effect, but can test learning rate, convergence, and regularization)
# TODO: compare between only brain/text/image, and the other combinations of modalities
# TODO: we have very few samples per class compared to number of classes, making this a good candidate for
#   a few-shot learning paradigm
# TODO: why did we choose logistic regression
# TODO: training/validation procedure
#   why OvR better than OvO?
#   test various test/train splits and their impact on performance
#   
# TODO: plot loss/log likelihood throughout training
# TODO: plot final test performance across multiple statistics
# TODO: improve the model (using multiple modalities, using few-shot learning paradigm, using feature scaling, using regularization, using SGD - most likely)
# TODO: clear explanation of accuracy, precision, recall, f1-score (likely already have, maybe in context of comparison between baseline and custom)
# TODO: use confusion matrices and precision-recall curves (can we even do this for multiclass?)
# TODO: the problem identified in 2.1 is the small number of samples per class, we fixed this with the few-shot learning paradigm

# TODO: few-shot learning uses a similarity function, that learns from the small set of training data
#   this is basically just the same as logistic regression??! or any learning model??!
#   one-way to implement FSL is data-level, incorporate more features through some method (we can do this by combining modalities)
#   regularization is one way to fix FSL, limiting parameter values to prevent overfitting
#   metric-level FSL?
#   https://www.digitalocean.com/community/tutorials/few-shot-learning

AttributeError: module 'numpy' has no attribute 'zeroes'