In [1]:
# Imports
import numpy
import matplotlib.pyplot as plt
import scipy.io
from scipy.signal import butter, lfilter
from sklearn.preprocessing import normalize
from sklearn.decomposition import PCA
from scipy.fftpack import fft, ifft

In [2]:
# Declarations

# Paths
SUBJECT_A_TRAIN_PATH = '../dataset/Matlab Format/Subject_A_Train.mat'
SUBJECT_A_TEST_PATH = '../dataset/Matlab Format/Subject_A_Test.mat'
SUBJECT_B_TRAIN_PATH = '../dataset/Matlab Format/Subject_B_Train.mat'
SUBJECT_B_TEST_PATH = '../dataset/Matlab Format/Subject_A_Test.mat'

# Input Constants
EPOCHS = 85
CHANNELS = 64
EXTRACTED_CHANNEL = 10 #Cz
EXTRACTED_CHANNELS = numpy.array([33, 10, 48, 50, 52, 55, 59, 61]) # [33, 10, 48, 50, 52, 55, 59, 61], [3,9, 10, 11, 17]
INTENSIFICATIONS = 12
START_WINDOW = 60
END_WINDOW = 144

SUBJECT_A_TEST_RESULTS = 'WQXPLZCOMRKO97YFZDEZ1DPI9NNVGRQDJCUVRMEUOOOJD2UFYPOO6J7LDGYEGOA5VHNEHBTXOO1TDOILUEE5BFAEEXAW_K4R3MRU'
SUBJECT_B_TEST_RESULTS = 'MERMIROOMUHJPXJOHUVLEORZP3GLOO7AUFDKEFTWEOOALZOP9ROCGZET1Y19EWX65QUYU7NAK_4YCJDVDNGQXODBEV2B5EFDIDNR'

# 6x6 Grid That Is Displayed To The User
MATRIX = numpy.array([['A', 'B', 'C', 'D', 'E', 'F'], \
                      ['G', 'H', 'I', 'J', 'K', 'L'], \
                      ['M', 'N', 'O', 'P', 'Q', 'R'], \
                      ['S', 'T', 'U', 'V', 'W', 'X'], \
                      ['Y', 'Z', '1', '2', '3', '4'], \
                      ['5', '6', '7', '8', '9', '_']])

In [3]:
# Loading Data

Subject_A_Train = scipy.io.loadmat(SUBJECT_A_TRAIN_PATH)
Subject_A_Test = scipy.io.loadmat(SUBJECT_A_TEST_PATH)
Subject_B_Train = scipy.io.loadmat(SUBJECT_B_TRAIN_PATH)
Subject_B_Test = scipy.io.loadmat(SUBJECT_B_TEST_PATH)

In [4]:
# Detection

# A
# Train
Subject_A_Signal_Train = numpy.array(Subject_A_Train.get('Signal'));
Subject_A_TargetChar_Train = numpy.array(Subject_A_Train.get('TargetChar'))[0];
Subject_A_StimulusCode_Train = numpy.array(Subject_A_Train.get('StimulusCode'));
print('A Train')
print('Subject A Signal Train:', Subject_A_Signal_Train.shape)
print('Subject A TargetChar Train:', len(Subject_A_TargetChar_Train))
print('Subject A StimulusCode Train:', Subject_A_StimulusCode_Train.shape)
# Test
Subject_A_Signal_Test = numpy.array(Subject_A_Test.get('Signal'));
Subject_A_StimulusCode_Test = numpy.array(Subject_A_Test.get('StimulusCode'));
print('\nA Test')
print('Subject A Signal Test:', Subject_A_Signal_Test.shape)
print('Subject A TargetChar Test:', len(SUBJECT_A_TEST_RESULTS))
print('Subject A StimulusCode Test:', Subject_A_StimulusCode_Test.shape)

A Train
Subject A Signal Train: (85, 7794, 64)
Subject A TargetChar Train: 85
Subject A StimulusCode Train: (85, 7794)

A Test
Subject A Signal Test: (100, 7794, 64)
Subject A TargetChar Test: 100
Subject A StimulusCode Test: (100, 7794)


In [5]:
# Averaging

# Function To Get Average Of All Signals (And Channels) Within 1 Run
def calculate_runs_average(
        signals,
        stimulus_code,
        matrix,
        moving_average_filter = 10,
        repeatitions = 15,
        start_window = 0,
        end_window = 240
    ):
    
    window = end_window - start_window
    responses = numpy.zeros((signals.shape[0], matrix.shape[0] + matrix.shape[1], window, signals.shape[2]))
    
    for epoch in range(signals.shape[0]):
        
        # Sum of All Repeatitions -> Division of Sum -> Average
        for n in range(1, signals.shape[1]):
            if stimulus_code[epoch, n] == 0 and stimulus_code[epoch, n - 1] != 0:
                responses[epoch, int(stimulus_code[epoch, n - 1]) - 1] += signals[epoch, n + start_window - 24 : n + end_window - 24]
        responses[epoch] = responses[epoch] / repeatitions
        
        # Calculation of Common Average Reference
        for intensification in range(responses.shape[1]):
            for sample in range(responses.shape[2]):
                average = numpy.sum(responses[epoch, intensification, sample]) / responses.shape[3]
                responses[epoch, intensification, sample] = responses[epoch, intensification, sample] - average
        
        # Calculation of Z-Score
        for intensification in range(responses.shape[1]):
            for channel in range(responses.shape[3]):
                average = numpy.sum(responses[epoch, intensification, :, channel]) / responses.shape[2]
                std = numpy.std(responses[epoch, intensification, :, channel])
                responses[epoch, intensification, :, channel] = (responses[epoch, intensification, :, channel] - average) - std
    
    return responses

# A
# Train
Subject_A_Average_Signal_Train = calculate_runs_average(
    Subject_A_Signal_Train,
    Subject_A_StimulusCode_Train,
    MATRIX,
    start_window=START_WINDOW,
    end_window=END_WINDOW
)
print('Subject A Average Signal Train:', Subject_A_Average_Signal_Train.shape)
# Test
Subject_A_Average_Signal_Test = calculate_runs_average(
    Subject_A_Signal_Test,
    Subject_A_StimulusCode_Test,
    MATRIX,
    start_window=START_WINDOW,
    end_window=END_WINDOW
)
print('Subject A Average Signal Test:', Subject_A_Average_Signal_Test.shape)

Subject A Average Signal Train: (85, 12, 84, 64)
Subject A Average Signal Test: (100, 12, 84, 64)


In [6]:
# Function To Flatten The  Channels
def flatten_channels(
        average_signals,
        extracted_channels
    ):
    
    flattened_average_signals = numpy.zeros((average_signals.shape[0], average_signals.shape[1], average_signals.shape[2] * extracted_channels.shape[0]))
    
    for epoch in range(average_signals.shape[0]):
        
        for intensification in range(average_signals.shape[1]):
            
            for channel_index in range(extracted_channels.shape[0]):
                flattened_average_signals[epoch, intensification, (average_signals.shape[2] * channel_index):(average_signals.shape[2] * channel_index) + average_signals.shape[2]] = \
                    average_signals[epoch, intensification, :, extracted_channels[channel_index]]
    
    return flattened_average_signals

In [30]:
# Function To Calculate Weights Within 1 Session Using FLD
def calculate_weights_FLD(
        average_signals,
        target_char,
        matrix,
        extracted_channels,
        repeatitions = 15,
        start_window = 0,
        end_window = 240
    ):
    
    window = end_window - start_window
    intensifications = matrix.shape[0] + matrix.shape[1]
    
    preprocessed_classes = numpy.zeros((average_signals.shape[0], intensifications))
    
    # Looping Through Characters (85 Character)
    for epoch in range(average_signals.shape[0]):
        
        # Getting Index Of Chosen Character
        indices = numpy.where(matrix == target_char[epoch])
        chosen_column = indices[1][0]
        chosen_row = indices[0][0] + matrix.shape[1]
        
        for row_column in range(intensifications):
            if row_column == chosen_row or row_column == chosen_column:
                preprocessed_classes[epoch, row_column] = 1
            else:
                preprocessed_classes[epoch, row_column] = -1
    
    preprocessed_classes = numpy.reshape(preprocessed_classes, (average_signals.shape[0] * intensifications))
    
    flattened_signals = flatten_channels(average_signals, extracted_channels)
    flattened_signals = numpy.reshape(flattened_signals, (flattened_signals.shape[0] * flattened_signals.shape[1], flattened_signals.shape[2]))
    
    preprocessed_signals = numpy.ones((flattened_signals.shape[0], flattened_signals.shape[1] + 1))
    for i in range(flattened_signals.shape[0]):
        preprocessed_signals[i] = numpy.pad(flattened_signals[i], (0,1), 'constant', constant_values=(0,1))
    
    return numpy.matmul(
            numpy.matmul(
                numpy.linalg.inv(numpy.matmul(
                    preprocessed_signals.T, # XT
                    preprocessed_signals # X
                )), # (XT * X) ^ -1
                preprocessed_signals.T # XT
            ), # ((XT * X) ^ -1) * XT
        preprocessed_classes) # (((XT * X) ^ -1) * XT) * Y

# A
# Train
Subject_A_Weight_FLD_Train = calculate_weights_FLD(
    Subject_A_Average_Signal_Train,
    Subject_A_TargetChar_Train,
    MATRIX,
    EXTRACTED_CHANNELS,
    start_window=START_WINDOW,
    end_window=END_WINDOW
)
# Test
Subject_A_Weight_FLD_Test = calculate_weights_FLD(
    Subject_A_Average_Signal_Test,
    SUBJECT_A_TEST_RESULTS,
    MATRIX,
    EXTRACTED_CHANNELS,
    start_window=START_WINDOW,
    end_window=END_WINDOW
)

In [31]:
# Function To Predict Character
def predict_character(
        preprocessed_responses,
        weights,
        matrix
    ):
    
    intensifications = matrix.shape[0] + matrix.shape[1]

    predicted_classes = numpy.zeros((intensifications))
    for i in range(intensifications):
        predicted_classes[i] = numpy.matmul(numpy.transpose(weights), preprocessed_responses[i])
    
    column = numpy.where(predicted_classes == numpy.amax(predicted_classes[:6]))[0][0]
    row = numpy.where(predicted_classes == numpy.amax(predicted_classes[6:]))[0][0] - matrix.shape[1]

    return matrix[row][column]

In [32]:
# Function To Predict Characters
def predict_characters(
        average_signals,
        weights,
        extracted_channels,
        matrix,
        repeatitions = 15,
        start_window = 0,
        end_window = 240
    ):
    
    predicted_string = ''
    window = end_window - start_window
    intensifications = matrix.shape[0] + matrix.shape[1]
    
    flattened_signals = flatten_channels(average_signals, extracted_channels)
    
    for epoch in range(average_signals.shape[0]):
        
        preprocessed_signals = numpy.ones((flattened_signals.shape[1], flattened_signals.shape[2] + 1))
        for i in range(flattened_signals.shape[1]):
            preprocessed_signals[i] = numpy.pad(flattened_signals[epoch, i], (0,1), 'constant', constant_values=(0,1))
        
        predicted_string += predict_character(preprocessed_signals, weights, matrix)
    
    return predicted_string

# A
# Train
Subject_A_Train_Prediction = predict_characters(
    Subject_A_Average_Signal_Train,
    Subject_A_Weight_FLD_Train,
    EXTRACTED_CHANNELS,
    MATRIX,
    start_window=START_WINDOW,
    end_window=END_WINDOW
)
print('A Train True Value:', Subject_A_TargetChar_Train)
print('A Train Prediction:', Subject_A_Train_Prediction)
# Test
Subject_A_Test_Prediction = predict_characters(
    Subject_A_Average_Signal_Test,
    Subject_A_Weight_FLD_Train,
    EXTRACTED_CHANNELS,
    MATRIX,
    start_window=START_WINDOW,
    end_window=END_WINDOW
)
print('\nA Test True Value:', SUBJECT_A_TEST_RESULTS)
print('A Test Prediction:', Subject_A_Test_Prediction)

A Train True Value: EAEVQTDOJG8RBRGONCEDHCTUIDBPUHMEM6OUXOCFOUKWA4VJEFRZROLHYNQDW_EKTLBWXEPOUIKZERYOOTHQI
A Train Prediction: EAEVQTDOJG8RBRGOBCEDHCTUIDBPUHMEM6OUXOCFOUKWA4VJEFRZROLHYNQDW_EKTLBWXEPOUIKZERYOOTHQI

A Test True Value: WQXPLZCOMRKO97YFZDEZ1DPI9NNVGRQDJCUVRMEUOOOJD2UFYPOO6J7LDGYEGOA5VHNEHBTXOO1TDOILUEE5BFAEEXAW_K4R3MRU
A Test Prediction: 9QX29NIIT4TOTUVB_UFBUVPD91NVGRQDHCVVS5FUOU7BJIU_MPOM_H7RVMUW5UVYV2Z9RBSXKO1TTQQXYE375OP9TXPA_JQOYO_I


In [35]:
# Function To Calculate Accuracy
def calculate_accuracy(
        target_characters,
        predicted_characters
    ):
    
    accuracy = 0
    for i in range(len(target_characters)):
        if target_characters[i] == predicted_characters[i]:
            accuracy += 1
    accuracy = (accuracy / len(target_characters)) * 100
    
    return accuracy

# A
# Train
print('A Train Prediction Accuracy:', calculate_accuracy(Subject_A_TargetChar_Train, Subject_A_Train_Prediction))
# Test
print('A Test Prediction Accuracy:', calculate_accuracy(SUBJECT_A_TEST_RESULTS, Subject_A_Test_Prediction))

A Train Prediction Accuracy: 98.82352941176471
A Test Prediction Accuracy: 28.000000000000004
