In [22]:
import math
import rtde_control
import rtde_receive
from psychopy import visual, core
import numpy as np
import time

IP_address = "169.254.85.188"

# --|Connect to the robot|--
rtde_c = rtde_control.RTDEControlInterface(IP_address) 
rtde_r = rtde_receive.RTDEReceiveInterface(IP_address)  

speed = [0,0,0,0,0,0]

rest_period_length = 1


def d2r(degrees):
    return degrees * (math.pi / 180)

def state(s):
    state.value = s
def t_rest(t):
    t_rest.value = t
def dir(d):
    dir.value = d

state(0)
t_rest(0)
dir(0)

def velo_joint(joint, direction):
    t_start = rtde_c.initPeriod()
    speed[joint] = 0.75 * direction
    rtde_c.speedJ(speed, 0.01, 1.0/500)
    if rtde_r.getActualQ()[joint] < d2r(-100) and direction == -1:
        direction = 1
        rtde_c.speedStop()
        print("Change Direction 1")
    if rtde_r.getActualQ()[joint] > d2r(100) and direction == 1:
        direction = -1
        rtde_c.speedStop()
        print("Change Direction -1")
    rtde_c.waitPeriod(t_start)
    return direction

def FSM(current_joint, blink, MI_classification_result):
    if state.value == 0:
        if blink == 1:
            state(1)
    elif state.value == 1:
        if MI_classification_result != 0:
            dir(MI_classification_result)
            state(2)
    elif state.value == 2:
        velo_joint(current_joint, dir.value)

        if blink == 1:
            t_rest(time.perf_counter() + rest_period_length)
            state(3)
    elif state.value == 3:
        if time.perf_counter() > t_rest.value:
            state(0)


In [25]:
type(5//3)

int

In [2]:
import numpy as np

In [39]:
Pscores = np.array([2,8])
k = 3

np.where(np.array([k,k]) < Pscores,'#751818','gray')

array(['#751818', 'gray'], dtype='<U7')

In [4]:
a = np.array([0, 1, 2])
b = 1

c = a+b
c

array([1, 2, 3])

# Scores calculation simulation (Double model)

In [333]:
n_trials  = [15, 15, 15, 0]     # Number of each class (None, Right, Left, Down)
t_cue = 3
classification_cycle_period = 0.1

events_id = {'none': 0, 'right': 1, 'left': 2}

def confusion_matrix(TP,FN,FP,TN):
    # -- |Results calculation| --
    accuracy = (TP + TN)/(TP + FP + TN + FN)

    if TP + FP == 0:
        precision = np.NaN
    else:
        precision = TP/(TP + FP)
    
    if TP + FN == 0:
        recall = np.NaN
    else:
        recall = TP/(TP + FN)
        
    f1 = 2*(precision*recall)/(precision + recall)

    # -- |Results display| --
    print('Confusion Matrix')
    print('--------------------------------------')
    print('|            |       Predicted       |')
    print('|            -------------------------   Accuracy  = {0:}'.format(accuracy))
    print('|            |     P     |     N     |')
    print('--------------------------------------   Precision = {0:}'.format(precision))
    print("|        | P |   {0:^5d}   |   {1:^5d}   |   Recall    = {2:}".format(TP, FN, recall))
    print('| Actual -----------------------------')
    print("|        | N |   {0:^5d}   |   {1:^5d}   |   F1-score  = {2:}".format(FP, TN, f1))
    print('--------------------------------------')

    return [accuracy, precision, recall, f1]

# -- |Create random label sequence| --
labels = np.zeros(n_trials[0], dtype='int')
for i in range(1,len(n_trials)):
    labels = np.concatenate((labels,np.full(n_trials[i],i, dtype='int')))
np.random.shuffle(labels)

j = 0
p = []
Pscores = np.array([0,0])
Nscores = np.array([0,0])
Pscores_history = []
Nscores_history = []
while j < len(labels):
    label = labels[j]
    for i in range(2):
        Pscores[i] = np.random.randint(0,30)
        Nscores[i] = 30 - Pscores[i]
    Pscores_history.append([label, Pscores])
    Nscores_history.append([label, Nscores])
    Pscores = np.array([0,0])
    Nscores = np.array([0,0])
    j += 1

Pscores_history = np.array(Pscores_history, dtype='object')
Nscores_history = np.array(Nscores_history, dtype='object')

positive = []
negative = []
for l in range(np.max(list(events_id.values())) + 1):
    Pscores_total = np.array([sh[1] for sh in Pscores_history if sh[0] == l])
    Pscores_total = np.sum(Pscores_total, axis = 0)
    positive.append(Pscores_total)

    Nscores_total = np.array([sh[1] for sh in Nscores_history if sh[0] == l])
    Nscores_total = np.sum(Nscores_total, axis = 0)
    negative.append(Nscores_total)

positive = np.array(positive)
negative = np.array(negative)
print('Left Model Performance')
confusion_matrix(positive[events_id['left']][0],negative[events_id['left']][0],(np.sum(positive[:,0]) - positive[events_id['left']])[0],(np.sum(negative[:,0]) - negative[events_id['left']])[0])
print('Right Model Performance')
confusion_matrix(positive[events_id['right']][1],negative[events_id['right']][1],(np.sum(positive[:,0]) - positive[events_id['right']])[1],(np.sum(negative[:,0]) - negative[events_id['right']])[1])

Left Model Performance
Confusion Matrix
--------------------------------------
|            |       Predicted       |
|            -------------------------   Accuracy  = 0.4288888888888889
|            |     P     |     N     |
--------------------------------------   Precision = 0.24564183835182252
|        | P |    155    |    295    |   Recall    = 0.34444444444444444
| Actual -----------------------------
|        | N |    476    |    424    |   F1-score  = 0.28677150786308975
--------------------------------------
Right Model Performance
Confusion Matrix
--------------------------------------
|            |       Predicted       |
|            -------------------------   Accuracy  = 0.585925925925926
|            |     P     |     N     |
--------------------------------------   Precision = 0.41362916006339145
|        | P |    261    |    189    |   Recall    = 0.58
| Actual -----------------------------
|        | N |    370    |    530    |   F1-score  = 0.4828862164662349
---

[0.585925925925926, 0.41362916006339145, 0.58, 0.4828862164662349]

# Scores calculation simulation (Single model)

In [607]:
n_trials  = [0, 15, 15, 0]     # Number of each class (None, Right, Left, Down)
t_cue = 3
classification_cycle_period = 0.1

decision_threshold = 0

events_id = {'none': 0, 'right': 1, 'left': 2}

def confusion_matrix(TP,FN,FP,TN):
    # -- |Results calculation| --
    accuracy = (TP + TN)/(TP + FP + TN + FN)

    if TP + FP == 0:
        precision = np.NaN
    else:
        precision = TP/(TP + FP)
    
    if TP + FN == 0:
        recall = np.NaN
    else:
        recall = TP/(TP + FN)
        
    f1 = 2*(precision*recall)/(precision + recall)

    # -- |Results display| --
    print('Confusion Matrix')
    print('--------------------------------------')
    print('|            |       Predicted       |')
    print('|            -------------------------   Accuracy  = {0:}'.format(accuracy))
    print('|            |     P     |     N     |')
    print('--------------------------------------   Precision = {0:}'.format(precision))
    print("|        | P |   {0:^5d}   |   {1:^5d}   |   Recall    = {2:}".format(TP, FN, recall))
    print('| Actual -----------------------------')
    print("|        | N |   {0:^5d}   |   {1:^5d}   |   F1-score  = {2:}".format(FP, TN, f1))
    print('--------------------------------------')

    return [accuracy, precision, recall, f1]

# -- |Create random label sequence| --
labels = np.zeros(n_trials[0], dtype='int')
for i in range(1,len(n_trials)):
    labels = np.concatenate((labels,np.full(n_trials[i],i, dtype='int')))
np.random.shuffle(labels)

j = 0
p = []
Pscores = np.array([0,0])
Nscores = 0
Pscores_history = []
Nscores_history = []
while j < len(labels):
    label = labels[j]

    for i in range(30):
        prob = np.random.rand()
        p = [prob, 1-prob]

        if np.abs(p[0] - p[1]) > decision_threshold:
            Pscores[np.argmax(p)] += 1
        else:
            Nscores += 1

    Pscores_history.append([label, Pscores])
    Nscores_history.append([label, Nscores])

    Pscores = np.array([0,0])
    Nscores = 0

    j += 1

Pscores_history = np.array(Pscores_history, dtype='object')
Nscores_history = np.array(Nscores_history)

positive = []
negative = []
for l in range(np.max(list(events_id.values())) + 1):
    Pscores_total = np.array([sh[1] for sh in Pscores_history if sh[0] == l])
    Pscores_total = np.sum(Pscores_total, axis = 0)
    if Pscores_total.all() == 0:
        Pscores_total = np.array([0,0])
    positive.append(Pscores_total)

    Nscores_total = np.array([sh[1] for sh in Nscores_history if sh[0] == l])
    Nscores_total = np.sum(Nscores_total, axis = 0)
    negative.append(Nscores_total)

positive = np.array(positive, dtype = 'int')
negative = np.array([negative], dtype = 'int').T

matrix = np.concatenate((negative,positive),axis=1)
multiclass_confusion_matrix(matrix)

Confusion Matrix
--------------------------------------------------
|            |             Predicted             |
|            -------------------------------------
|            |     N     |     R     |     L     |
--------------------------------------------------
|        | N |     0     |     0     |     0     |
|        -----------------------------------------
| Actual | R |     0     |    220    |    230    |
         -----------------------------------------
|        | L |     0     |    218    |    232    |
--------------------------------------------------
Class N Performance:
   Accuracy  = 1.0
   Precision = nan
   Recall    = nan
   F1-score  = nan

Class R Performance:
   Accuracy  = 0.6676557863501483
   Precision = 0.502283105022831
   Recall    = 0.4888888888888889
   F1-score  = 0.49549549549549554

Class L Performance:
   Accuracy  = 0.6676557863501483
   Precision = 0.5021645021645021
   Recall    = 0.5155555555555555
   F1-score  = 0.5087719298245613



[[1.0, 0.6676557863501483, 0.6676557863501483],
 [nan, 0.502283105022831, 0.5021645021645021],
 [nan, 0.4888888888888889, 0.5155555555555555],
 [nan, 0.49549549549549554, 0.5087719298245613]]

In [582]:
def multiclass_confusion_matrix(matrix, class_name = ['N','R','L']):
    '''Display Confusion Matrix'''
    # -- |Results display| --
    print('Confusion Matrix')
    print('--------------------------------------------------')
    print('|            |             Predicted             |')
    print('|            -------------------------------------')
    print('|            |     {0}     |     {1}     |     {2}     |'.format(class_name[0],class_name[1],class_name[2]))
    print('--------------------------------------------------')
    print("|        | {0} |   {1:^5d}   |   {2:^5d}   |   {3:^5d}   |".format(class_name[0],matrix[0,0],matrix[0,1],matrix[0,2]))
    print('|        -----------------------------------------')
    print("| Actual | {0} |   {1:^5d}   |   {2:^5d}   |   {3:^5d}   |".format(class_name[1],matrix[1,0],matrix[1,1],matrix[1,2]))
    print('         -----------------------------------------')
    print("|        | {0} |   {1:^5d}   |   {2:^5d}   |   {3:^5d}   |".format(class_name[2],matrix[2,0],matrix[2,1],matrix[2,2]))
    print('--------------------------------------------------')

    accuracy = []
    precision = []
    recall = []
    f1_score  = []
    # -- |Results calculation| --
    for i in range(matrix.shape[0]):
        TP = matrix[i,i]
        FN = np.sum(matrix[i,:]) - matrix[i,i]
        FP = np.sum(matrix[:,i]) - matrix[i,i]
        TN = np.sum(matrix) - matrix[i,i]

        acc = (TP + TN)/(TP + FP + TN + FN)

        if TP + FP == 0:
            pre = np.NaN
        else:
            pre = TP/(TP + FP)
        
        if TP + FN == 0:
            rec = np.NaN
        else:
            rec = TP/(TP + FN)
            
        f1 = 2*(pre*rec)/(pre + rec)

        print(f'Class {class_name[i]} Performance:')
        print(f'   Accuracy  = {acc}')
        print(f'   Precision = {pre}')
        print(f'   Recall    = {rec}')
        print(f'   F1-score  = {f1}\n')

        accuracy.append(acc)
        precision.append(pre)
        recall.append(rec)
        f1_score.append(f1)


    return [accuracy, precision, recall, f1_score]

In [562]:
matrix = np.concatenate((negative,positive),axis=1)
matrix

array([[  0.,   0.,   0.],
       [  0., 220., 230.],
       [  0., 239., 211.]])

In [554]:
negative

array([[ 16],
       [234],
       [212]])

In [558]:
positive

array([[  8,   6],
       [111, 105],
       [118, 120]])

In [509]:
decision_threshold = 0.5
Pscores = np.array([0,0])

In [521]:
prob = np.random.rand()
p = [prob, 1-prob]

if np.abs(p[0] - p[1]) > decision_threshold:
    Pscores[np.argmax(p)] += 1
Pscores

array([4, 1])

In [23]:
import mne
import numpy as np

timewindow_lenght = 3
t_baseline_online_B = 0.5

# -- |Time Window Initialization| --
timewindow = np.full((7,int(250*timewindow_lenght)),1e-15)
channels = ['C3','Cz','C4','Pz','PO7','PO8','EOG'] # Set your target EEG channel name
info = mne.create_info(
    ch_names= channels,
    ch_types= ['eeg']*(len(channels) - 1) + ['eog'],
    sfreq= 250,  # OpenBCI Frequency acquistion
    verbose=False
)

timewindow.shape

(7, 750)

In [25]:
timewindow_mne = mne.io.RawArray(timewindow, info, verbose=False)
timewindow_mne_B = timewindow_mne.copy().set_eeg_reference('average', verbose=False)
timewindow_mne_B = timewindow_mne_B.filter(l_freq=2.0, h_freq=15.0, fir_design='firwin',picks ='all', verbose=False)

In [26]:
realtime_data_B = timewindow_mne_B.get_data()
realtime_data_B = realtime_data_B - np.array([np.mean(realtime_data_B[:int((t_baseline_online_B)*250)], axis = 1)]).T

In [27]:
threshold = 0.1

if sum(realtime_data_B[-1] > threshold) >= 2: blink = 1
else: blink = 0

blink

0