In [None]:
## Librerias
import os 
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
from sklearn.model_selection import KFold,RepeatedKFold,train_test_split
from keras.callbacks import ModelCheckpoint, EarlyStopping
from keras.preprocessing.image import ImageDataGenerator
from keras.preprocessing import image
import tensorflow as tf
from tensorflow.keras import Input, Model, optimizers,regularizers
from tensorflow.keras.constraints import max_norm
from tensorflow.keras.layers import Dense,\
                                    Conv2D,\
                                    BatchNormalization,\
                                    AveragePooling2D, \
                                    MaxPooling2D, \
                                    DepthwiseConv2D, \
                                    Activation, \
                                    Dropout,\
                                    Flatten

In [None]:
## Cargar data de training y test 

def _load_directory_arrays(datagen, directory, class_mode, batch_size=100,
                           target_size=(128, 128)):
    """
    Read every image of a class-per-subfolder directory into NumPy arrays.

    Args:
        datagen: ImageDataGenerator used to create the directory iterator.
        directory: folder whose sub-folders each hold the images of one class.
        class_mode: label encoding forwarded to ``flow_from_directory``.
        batch_size: number of images read per iterator step.
        target_size: (height, width) every image is resized to.

    Returns:
        (X, Y): X has shape (n, height, width, 3), Y has shape (n,).
    """
    batches = datagen.flow_from_directory(
        directory,
        batch_size=batch_size,
        target_size=target_size,
        class_mode=class_mode,
        color_mode="rgb",
    )

    # The iterator already knows how many images it found, so there is no need
    # to load the whole directory once just to count the samples.
    num_samples = batches.samples

    X = np.zeros(shape=(num_samples,) + tuple(target_size) + (3,))
    Y = np.zeros(shape=(num_samples,))

    filled = 0
    if num_samples:
        # The iterator loops forever, so stop after one full pass.
        for inputs_batch, labels_batch in batches:
            stop = filled + len(labels_batch)
            X[filled:stop] = inputs_batch
            Y[filled:stop] = labels_batch
            filled = stop
            if filled >= num_samples:
                break

    return X, Y


def get_dataset(dir_task, num_class):
    """
    Build the train and test sets from the folder layout under ``dir_task``.

    Images are read from ``<dir_task>/train_data`` and ``<dir_task>/test_data``,
    resized to 128x128 RGB and rescaled by 1/255.

    Args:
        dir_task: root folder containing the ``train_data`` and ``test_data``
            directories.
        num_class: number of classes; 2 selects 'binary' labels, any other
            value selects 'sparse' (integer) labels.

    Returns:
        X_train, X_test, Y_train, Y_test: image arrays of shape
        (n, 128, 128, 3) and label vectors of shape (n,).
    """
    train_dir = os.path.join(dir_task, "train_data")
    test_dir = os.path.join(dir_task, "test_data")

    type_class = 'binary' if num_class == 2 else 'sparse'

    datagen_task = ImageDataGenerator(rescale=1./255)

    X_train, Y_train = _load_directory_arrays(datagen_task, train_dir, type_class)
    X_test, Y_test = _load_directory_arrays(datagen_task, test_dir, type_class)

    return X_train, X_test, Y_train, Y_test

In [None]:
def get_dataset_v2(dir_task, num_class, samples_per_class=250):
    """
    Build the train and test sets, keeping the training images in on-disk order.

    Unlike ``get_dataset``, the training images are loaded class by class in
    the order returned by the file listing, so that samples stay grouped
    (class-major) instead of being batched by a Keras iterator. The test set is
    still read through ``ImageDataGenerator.flow_from_directory``.

    Args:
        dir_task: root folder containing ``train_data`` and ``test_data``.
        num_class: number of classes; 2 selects 'binary' test labels, any other
            value selects 'sparse' (integer) test labels. The first
            ``num_class`` class folders (sorted by name) are used for training.
        samples_per_class: number of training images taken from each class
            folder (default 250, the value previously hard-coded).

    Returns:
        X_train, X_test, Y_train, Y_test: image arrays of shape
        (n, 128, 128, 3) with values rescaled by 1/255, and label vectors of
        shape (n,).

    Raises:
        ValueError: if fewer than ``num_class`` class folders exist, or a class
            folder holds fewer than ``samples_per_class`` files.
    """
    train_dir = os.path.join(dir_task, "train_data")
    test_dir = os.path.join(dir_task, "test_data")

    # Training

    # Class folders are sorted so that label k here refers to the same folder
    # as label k assigned by flow_from_directory for the test set (Keras maps
    # class indices in alphanumeric order of the sub-directory names).
    class_dirs = sorted(
        os.path.join(train_dir, name)
        for name in os.listdir(train_dir)
        if os.path.isdir(os.path.join(train_dir, name))
    )[:num_class]
    if len(class_dirs) < num_class:
        raise ValueError(
            f'Expected {num_class} class folders in {train_dir}, found {len(class_dirs)}'
        )

    total_train = num_class * samples_per_class
    X_train = np.zeros(shape=(total_train, 128, 128, 3))
    # One label per loaded image. The previous version created 300 labels per
    # class for only 250 images per class, which shifted the labels of every
    # sample from index 250 onwards.
    Y_train = np.zeros(shape=(total_train,), dtype=np.float64)

    for label, class_dir in enumerate(class_dirs):
        # NOTE(review): file order is whatever os.listdir returns; downstream
        # code presumably relies on it grouping the images by channel - confirm.
        fnames = [os.path.join(class_dir, fname) for fname in os.listdir(class_dir)]
        if len(fnames) < samples_per_class:
            raise ValueError(
                f'Expected at least {samples_per_class} files in {class_dir}, found {len(fnames)}'
            )

        offset = label * samples_per_class
        for i in range(samples_per_class):
            img = image.load_img(fnames[i], target_size=(128, 128))
            x = image.img_to_array(img)
            X_train[offset + i] = x.astype('float32') / 255
        Y_train[offset:offset + samples_per_class] = label

    # Testing

    type_class = 'binary' if num_class == 2 else 'sparse'

    datagen_task = ImageDataGenerator(rescale=1./255)

    test_task = datagen_task.flow_from_directory(
        test_dir,
        batch_size=100,
        target_size=(128, 128),
        class_mode=type_class
        )

    # The iterator already exposes the number of images it found.
    num_samples = test_task.samples

    X_test = np.zeros(shape=(num_samples, 128, 128, 3))
    Y_test = np.zeros(shape=(num_samples))

    filled = 0
    if num_samples:
        # The iterator loops forever, so stop after one full pass.
        for inputs_batch, labels_batch in test_task:
            stop = filled + len(labels_batch)
            X_test[filled:stop] = inputs_batch
            Y_test[filled:stop] = labels_batch
            filled = stop
            if filled >= num_samples:
                break

    return X_train, X_test, Y_train, Y_test

In [None]:
def EEGNet_model(num_class):
    """
    Build an uncompiled EEGNet-style CNN as a ``tf.keras.Sequential`` model.

    The network takes 128x128x3 inputs and is made of two convolutional blocks
    followed by a max-norm constrained dense softmax classifier.

    Args:
        num_class: number of units of the final softmax layer.

    Returns:
        The (uncompiled) ``tf.keras.Sequential`` model.
    """
    # NOTE(review): the original code evaluated `regularizers.l2(1e-4)` here and
    # discarded the result, so it never regularized any layer. The dead
    # statement was removed; if an L2 penalty with that factor was intended for
    # 'tfconv', pass it explicitly as `kernel_regularizer=`.
    layers = [
        # Block 1: wide (1 x 125) convolution, then a depthwise (6 x 1)
        # convolution with a max-norm constraint on its kernel.
        Conv2D(4, (1, 125),
               padding='same',
               use_bias=False,
               name='tfconv', input_shape=(128, 128, 3)),
        BatchNormalization(axis=-1),
        DepthwiseConv2D((6, 1),
                        use_bias=False,
                        depth_multiplier=2,
                        depthwise_constraint=max_norm(1.),
                        name='sconv'),
        BatchNormalization(axis=-1),
        Activation('elu'),
        AveragePooling2D((1, 4)),
        Dropout(0.5),

        # Block 2: (1 x 32) convolution with the default 'l2' kernel regularizer.
        Conv2D(8, (1, 32),
               padding='same',
               use_bias=False,
               name='fs',
               kernel_regularizer='l2'),
        BatchNormalization(axis=-1),
        Activation('elu'),
        AveragePooling2D((1, 8)),
        Dropout(0.5),

        # Output: flatten and classify.
        Flatten(name='flatten'),
        Dense(num_class,
              name='dense',
              kernel_constraint=max_norm(0.25)),
        Activation('softmax', name='softmax'),
    ]

    return tf.keras.Sequential(layers)

In [None]:
def get_compile(model: tf.keras.Model):
    """
    Compile ``model`` in place for integer-label classification.

    Uses an Adam optimizer (learning rate 0.001), the
    'sparse_categorical_crossentropy' loss and accuracy reported under the
    metric name 'acc'.

    Args:
        model: the Keras model to compile.

    Returns:
        The same model instance, now compiled.
    """
    adam = optimizers.Adam(learning_rate=0.001)
    model.compile(
        optimizer=adam,
        loss='sparse_categorical_crossentropy',
        metrics=['acc'],
    )
    return model

In [None]:
def get_EEGNet(num_class):
    """
    Create and compile an EEGNet that is ready to be trained.

    Args:
        num_class: number of output classes of the network.

    Returns:
        The compiled model.
    """
    return get_compile(EEGNet_model(num_class))

In [None]:
def plot_acc(history,fold):
    """
    Plot training vs. validation accuracy for one fold.

    Args:
        history: object returned by ``model.fit``; its ``history`` dict must
            contain the 'acc' and 'val_acc' series.
        fold: fold identifier appended to the figure title.
    """
    curves = (
        ('acc', 'Train Accuracy Fold ', {'color': 'black'}),
        ('val_acc', 'Val Accuracy Fold ', {'color': 'red', 'linestyle': "dashdot"}),
    )
    plt.title('Train Accuracy vs Val Accuracy Fold:' + str(fold))
    for key, label, line_style in curves:
        plt.plot(history.history[key], label=label, **line_style)
    plt.legend()
    plt.show()
    
def plot_loss(history,fold):
    """
    Plot training vs. validation loss for one fold.

    Args:
        history: object returned by ``model.fit``; its ``history`` dict must
            contain the 'loss' and 'val_loss' series.
        fold: fold identifier appended to the figure title.
    """
    curves = (
        ('loss', 'Train Loss Fold ', {'color': 'black'}),
        ('val_loss', 'Val Loss Fold ', {'color': 'red', 'linestyle': "dashdot"}),
    )
    plt.title('Train Loss vs Val Loss Fold:' + str(fold))
    for key, label, line_style in curves:
        plt.plot(history.history[key], label=label, **line_style)
    plt.legend()
    plt.show()
    
def plot_metrics(history,fold):
    """Show the accuracy figure and then the loss figure for one fold."""
    for plot_curve in (plot_acc, plot_loss):
        plot_curve(history, fold)

In [None]:
def print_subject_results(val_per_fold,acc_per_fold,repetitions,n_splits=5):
    """
    Summarize the per-fold scores of a repeated K-fold experiment.

    For every run (one repetition of the K folds) the fold with the highest
    validation accuracy is selected and its test accuracy is kept; the run's
    validation score is the mean validation accuracy of its folds. The mean and
    standard deviation over runs are printed and returned.

    Args:
        val_per_fold: validation accuracy of every fold, ordered run by run
            (length ``n_splits * repetitions``).
        acc_per_fold: ``(val_acc, test_acc)`` pair of every fold, in the same
            order as ``val_per_fold``.
        repetitions: number of runs.
        n_splits: number of folds per run (default 5).

    Returns:
        val_mean, val_std, test_mean, test_std computed over the runs.
    """

    test_score_final = []
    val_score_final = []
    print('******************')
    print('Precision por run')
    print('**************************')
    print('*Val_acc --------Test_acc*')
    for run in range(repetitions):
        # Folds belonging to this run only. The previous slice started at
        # `0*i`, i.e. always at 0, so every run also included all earlier runs.
        run_folds = slice(n_splits * run, n_splits * (run + 1))

        # Tuples compare by validation accuracy first, so this picks the fold
        # that validated best and reports its test accuracy.
        best_fold = max(acc_per_fold[run_folds])
        test_score_final.append(best_fold[1])
        val_score_final.append(np.mean(val_per_fold[run_folds]))

    val_mean = np.mean(val_score_final)
    val_std = np.std(val_score_final)
    test_mean = np.mean(test_score_final)
    test_std = np.std(test_score_final)

    print(f'Val Accuracy:{val_mean*100} +- {val_std*100}')
    print(f'Test Accuracy:{test_mean*100} +- {test_std*100}')

    return val_mean,val_std,test_mean,test_std

In [None]:
def Kcross_validation(num_class,X_train,X_test,Y_train,Y_test,repetitions):
    """
    Run repeated 5-fold cross-validation of the EEGNet on the training set.

    For every fold a fresh network is trained with early stopping, the
    checkpoint with the best validation accuracy is reloaded, and that model is
    scored on both the validation split and the held-out test set.

    Args:
        num_class: number of classes of the network.
        X_train, Y_train: training images and labels that get split into folds.
        X_test, Y_test: held-out images and labels scored after every fold.
        repetitions: number of times the 5-fold split is repeated.

    Returns:
        val_mean, val_std, test_mean, test_std as computed by
        ``print_subject_results``.
    """
    checkpoint_path = './model_checkpoint'

    # Per-fold score containers
    acc_per_fold, val_per_fold = [], []
    splitter = RepeatedKFold(n_splits=5, n_repeats=repetitions)

    for fold_n, (train, val) in enumerate(splitter.split(X_train, Y_train), start=1):
        EEGNet = get_EEGNet(num_class)
        print('------------------------------------------------------------------------')
        print(f'Training for fold {fold_n} ...')

        # Stop after 25 epochs without improvement and keep only the best model.
        callbacks = [
            EarlyStopping(monitor='val_acc', mode='max', patience=25, verbose=1),
            ModelCheckpoint(checkpoint_path, verbose=1, save_best_only=True,
                            monitor='val_acc', mode='max'),
        ]

        X_val, Y_val = X_train[val], Y_train[val]
        history = EEGNet.fit(
            X_train[train], Y_train[train],
            epochs=50,
            steps_per_epoch=2,
            validation_data=(X_val, Y_val),
            callbacks=callbacks,
        )
        plot_metrics(history, fold_n)

        # Score the best checkpoint rather than the weights of the last epoch.
        best_model = tf.keras.models.load_model(checkpoint_path)
        val_scores = best_model.evaluate(X_val, Y_val, verbose=0)
        test_scores = best_model.evaluate(X_test, Y_test, verbose=0)

        print(f'Val-Score for fold {fold_n}: {EEGNet.metrics_names[0]} of {val_scores[0]}; {EEGNet.metrics_names[1]} of {val_scores[1]*100}%')
        print(f'Test-Score for fold {fold_n}: {EEGNet.metrics_names[0]} of {test_scores[0]}; {EEGNet.metrics_names[1]} of {test_scores[1]*100}%')
        acc_per_fold.append((val_scores[1], test_scores[1]))
        val_per_fold.append(val_scores[1])

    return print_subject_results(val_per_fold, acc_per_fold, repetitions)

In [None]:
def _channel_sample_indices(channels, step_per_chn, class_offsets):
    """
    Return the shuffled integer sample indices of the given channels.

    Each channel ``c`` owns the index range
    ``[c * step_per_chn, (c + 1) * step_per_chn)`` inside one class block; the
    same ranges are repeated at every offset in ``class_offsets``.
    """
    base = np.concatenate(
        [np.arange(c * step_per_chn, (c + 1) * step_per_chn) for c in channels]
    )
    indices = np.concatenate([base + offset for offset in class_offsets])
    np.random.shuffle(indices)
    return indices


def Kcross_validation_v2(num_class,X_train,X_test,Y_train,Y_test,repetitions):
    """
    Run repeated cross-validation where folds are split by channel.

    The 5 channels are split with a repeated 5-fold scheme, so every fold
    validates on one channel and trains on the other four. The channel used for
    testing is already separated by the dataset loader.

    Assumes ``X_train``/``Y_train`` are laid out class by class, each class
    block holding 5 consecutive channel blocks of 50 samples (250 samples per
    class) - this is what the index arithmetic below relies on.

    Args:
        num_class: number of classes of the network and of class blocks in
            ``X_train``.
        X_train, Y_train: training images and labels in the layout above.
        X_test, Y_test: held-out images and labels scored after every fold.
        repetitions: number of times the 5-fold channel split is repeated.

    Returns:
        val_mean, val_std, test_mean, test_std as computed by
        ``print_subject_results``.
    """
    checkpoint_path = './model_checkpoint'

    # Per-fold score containers
    acc_per_fold = []
    val_per_fold = []
    kfold = RepeatedKFold(n_splits = 5, n_repeats = repetitions)

    list_channels = np.array([0,1,2,3,4])
    step_per_chn = 50

    # Start index of every class block. Previously the offsets 250 and 500 were
    # hard-coded, which only matched a 3-class dataset.
    samples_per_class = len(list_channels) * step_per_chn
    class_offsets = [c * samples_per_class for c in range(num_class)]

    for fold_n, (train, val) in enumerate(kfold.split(list_channels), start=1):

        train_index = _channel_sample_indices(train, step_per_chn, class_offsets)
        val_index = _channel_sample_indices(val, step_per_chn, class_offsets)

        EEGNet = get_EEGNet(num_class)
        print('------------------------------------------------------------------------')
        print(f'Training for fold {fold_n} and Validating with Chn {val[0]+1}')

        pat = 50
        early_stopping = EarlyStopping(monitor='val_acc', mode='max', patience=pat, verbose=1)
        model_checkpoint = ModelCheckpoint(checkpoint_path, verbose=1, save_best_only=True,
                                           monitor='val_acc', mode='max')

        X_val, Y_val = X_train[val_index], Y_train[val_index]
        history = EEGNet.fit(X_train[train_index], Y_train[train_index],
                             epochs = 100, steps_per_epoch = 5,
                             validation_data = (X_val, Y_val),
                             callbacks=[early_stopping, model_checkpoint]
                            )
        plot_metrics(history,fold_n)

        # Score the best checkpoint rather than the weights of the last epoch.
        best_model = tf.keras.models.load_model(checkpoint_path)
        val_scores = best_model.evaluate(X_val, Y_val, verbose=0)
        test_scores = best_model.evaluate(X_test, Y_test, verbose=0)

        print(f'Val-Score for fold {fold_n}: {EEGNet.metrics_names[0]} of {val_scores[0]}; {EEGNet.metrics_names[1]} of {val_scores[1]*100}%')
        print(f'Test-Score for fold {fold_n}: {EEGNet.metrics_names[0]} of {test_scores[0]}; {EEGNet.metrics_names[1]} of {test_scores[1]*100}%')
        acc_per_fold.append((val_scores[1],test_scores[1]))
        val_per_fold.append(val_scores[1])

    val_mean,val_std,test_mean,test_std = print_subject_results(val_per_fold,acc_per_fold,repetitions)

    return val_mean,val_std,test_mean,test_std

In [None]:
# Experiment driver: for every dataset folder Test1..Test<num_test>, load the
# data, run the channel-wise cross-validation and collect a one-line summary.

# Folder that contains the Test1, Test2, ... dataset directories. It can be
# overridden with the EEG_DATASET_ROOT environment variable so the notebook
# does not depend on one user's absolute path.
DATASET_ROOT = os.environ.get(
    'EEG_DATASET_ROOT',
    r'C:\Users\Lenovo\Documents\UTEC\Ciclo 7\ProyectoCNN\Python\Dataset',
)

resultados_finales = []
num_test = 1
num_class = 3
repetitions = 1

for i in range(num_test):
    dir_task = os.path.join(DATASET_ROOT, 'Test' + str(i+1))
    X_train,X_test,Y_train,Y_test = get_dataset_v2(dir_task,num_class)
    val_mean,val_std,test_mean,test_std = Kcross_validation_v2(num_class,X_train,X_test,Y_train,Y_test,repetitions)
    resultados_finales.append(f'Val: {val_mean} +- {val_std} // Test: {test_mean} +- {test_std}')

resultados_finales