In [1]:
import librosa
import librosa.display as display
import matplotlib.pyplot as plt
from typing import List, Tuple, Dict 
import os
import pandas as pd
import numpy as np
import argparse
import pathlib

In [15]:
from sklearn.svm import SVC, LinearSVC, NuSVC
from sklearn.preprocessing import MinMaxScaler
from sklearn.linear_model import SGDClassifier
from sklearn.decomposition import PCA, FastICA, TruncatedSVD
from sklearn.pipeline import Pipeline
from sklearn.metrics import accuracy_score
from sklearn.metrics import precision_score
from sklearn.metrics import recall_score
from sklearn.metrics import f1_score
from sklearn.multiclass import OneVsOneClassifier
from sklearn.multiclass import OneVsRestClassifier
from sklearn.model_selection import GridSearchCV
from scipy.signal import convolve
from tpot import TPOTClassifier

In [3]:
class Config:
	RECORDING_PATH_ROOT = "D:\Storage\EEGRecordings\Part_three\\"
	RECORDING_NUM_SAMPLES = 700
	SENSORS_LABELS = ["F3", "FC5", "AF3", "F7", "T7", "P7", "O1", "O2", "P8", "T8", "F8", "AF4", "FC6", "F4"]
	DATASET_TRAINING_VALIDATION_RATIO = 0.1
	CHECKPOINTS_DIR = './checkpoints'
	TENSORBOARD_LOGDIR = "logs"

In [4]:
def normalization(data):
    if type(data) != np.ndarray:
        raise Exception("Input data must be of type np.ndarray.")
    max_data = np.max(data)
    min_data = np.min(data)
    data = (data - min_data) / (max_data - min_data  + 1e-6)
    return data


def plot_single_signal(signal):
	signal = np.copy(signal)
	if type(signal) != np.ndarray:
		raise Exception("Input data must be of type np.ndarray.")
	plt.figure()
	plt.plot(signal)
	plt.xticks(np.arange(signal.shape[0]))
	plt.show()


def plot_recording(data: np.ndarray):
	data = np.copy(data)
	if type(data) != np.ndarray:
		raise Exception("Input data must be of type np.ndarray.")
	
	for row_idx in range(data.shape[1]):
		data[:, row_idx] = data[:, row_idx] + row_idx

	plt.figure()
	for idx in range(data.shape[1]):
		plt.plot(data[:, idx])
	plt.xticks(np.arange(data.shape[0]))
	plt.show()

def plot_mfcc(mfccs):
	plt.figure(figsize=(10, 4))
	display.specshow(mfccs, x_axis='time')
	plt.colorbar()
	plt.title('MFCC')
	plt.tight_layout()
	plt.show()

def show_gabor(I, **kwargs):
	# utility function to show image
	plt.figure()
	plt.axis('off')
	plt.imshow(I, cmap=plt.gray(), **kwargs)
	plt.show()

In [5]:
def get_recording_files_paths(mode="training") -> List[List]:
	# data_root = os.path.join(Config.RECORDING_PATH_ROOT, "\Park\Surdoiu_Tudor\Day_1")
	if mode == "training":
		data_root = Config.RECORDING_PATH_ROOT + "\\train"
	if mode == "testing":
		data_root = Config.RECORDING_PATH_ROOT + "\\test"

	data_root = pathlib.Path(data_root)
	classes_dir = list(data_root.glob('*'))
	print(f"Number of classes directories: [{len(classes_dir)}]")

	all_file_paths = []
	number_of_samples = 0

	for class_dir in classes_dir:
		recording_files = list(pathlib.Path(class_dir).glob('*'))
		all_file_paths.append([str(path) for path in recording_files])
		number_of_samples += len(recording_files)

	print(f"Scanned [{number_of_samples}] images")
	return all_file_paths

In [6]:
def compute_mfcc(recording):
	transformed_recording = None

	for i in range(recording.shape[1]):
		current_channel = librosa.feature.mfcc(recording[:, i], sr=160, n_mfcc=10, hop_length=10, n_fft=40)
		if transformed_recording is None:
			transformed_recording = current_channel
		else:
			transformed_recording = np.append(transformed_recording, current_channel, axis=0)

	assert transformed_recording is not None
	# print(transformed_recording.shape)
	return transformed_recording

In [7]:
def load_recording(path, use_mfcc=False):
	df = pd.read_csv(path, skiprows=[0], header=None, names=["COUNTER", "INTERPOLATED", "F3", "FC5", "AF3", "F7", "T7", "P7", "O1", "O2", "P8", "T8", "F8", "AF4", "FC6", "F4", "RAW_CQ", "GYROX"]) # "GYROY", "MARKER", "MARKER_HARDWARE", "SYNC", "TIME_STAMP_s", "TIME_STAMP_ms", "CQ_AF3", "CQ_F7", "CQ_F3", "CQ_FC5", "CQ_T7", "CQ_P7", "CQ_O1", "CQ_O2", "CQ_P8", "CQ_T8", "CQ_FC6", "CQ_F4", "CQ_F8", "CQ_AF4", "CQ_CMS", "CQ_DRL"])

	df = df[:Config.RECORDING_NUM_SAMPLES]
	df = df[Config.SENSORS_LABELS]
	recording = df.values
	recording.dtype = np.float64
	recording = normalization(recording)

	if recording.shape[0] < Config.RECORDING_NUM_SAMPLES:
		recording = np.pad(recording, ((0, Config.RECORDING_NUM_SAMPLES - recording.shape[0]), (0, 0)), mode="edge")
	
	if recording.shape[0] != Config.RECORDING_NUM_SAMPLES:
		raise Exception(f"Session number of samples is super not OK: [{recording.shape[0]}]")

	if use_mfcc:
		recording = compute_mfcc(recording)

	recording = np.transpose(recording)
	recording = recording.flatten()
	return recording

In [8]:
def create_training_dataset(shuffle=True, use_mfcc=False, use_gabor=False):
	recordings = []
	labels = []
	# dateset_file_paths = tf.data.Dataset.from_tensor_slices(get_recording_files_paths())

	gabor_filters = [genGabor((40, 1), omega=i) for i in np.arange(0.1, 1, 0.3)]
	print(f"Number of gabor filters used: [{len(gabor_filters)}]")

	for n, class_file_list in enumerate(get_recording_files_paths()):
		for m, file_path in enumerate(class_file_list):
			recording = load_recording(file_path, use_mfcc)

			if use_gabor == True:
				recording = np.empty((0), dtype=recording.dtype)
				for gabor in gabor_filters:
					recording = np.append(recording, convolve(recording, gabor, mode="valid"))
			
			recordings.append(recording)
			labels.append(n)

	return (recordings, labels)

In [9]:
def create_testing_dataset(use_mfcc=False, use_gabor=False):
	recordings = []
	labels = []

	gabor_filters = [genGabor((40, 1), omega=i) for i in np.arange(0.1, 1, 0.2)]

	for n, class_file_list in enumerate(get_recording_files_paths(mode="testing")):
		for m, file_path in enumerate(class_file_list):
			recording = load_recording(file_path, use_mfcc)

			if use_gabor == True:
				recording = np.empty((0), dtype=recording.dtype)
				for gabor in gabor_filters:
					recording = np.append(recording, convolve(recording, gabor, mode="valid"))
			
			recordings.append(recording)
			labels.append(n)

	return (recordings, labels)

In [10]:
def genGabor(sz, omega=0.5, theta=0, func=np.cos, K=np.pi):
	radius = (int(sz[0]/2.0), int(sz[1]/2.0))
	[x, y] = np.meshgrid(range(-radius[0], radius[0]+1), range(-radius[1], radius[1]+1))

	x1 = x * np.cos(theta) + y * np.sin(theta)
	y1 = -x * np.sin(theta) + y * np.cos(theta)
    
	gauss = omega**2 / (4*np.pi * K**2) * np.exp(- omega**2 / (8*K**2) * ( 4 * x1**2 + y1**2))
	sinusoid = func(omega * x1) * np.exp(K**2 / 2)
	gabor = gauss * sinusoid
	gabor = np.array(gabor)
	return gabor.flatten()


In [31]:
def evaluatePipeline(pipeline, pipelineName, param_grid, x_train, y_train, x_test, y_test):
    
	search = GridSearchCV(pipeline, param_grid, iid=False, cv=5)

	search.fit(x_train, y_train)
	print("Best parameter (CV score=%0.3f):" % search.best_score_)
	print(search.best_params_)

	result = search.predict(x_test)
	result_train = search.predict(x_train)
	print(result)

	print(pipelineName)
	print("Accuracy:    ", accuracy_score(y_test, result))
	print("Precission:  ", precision_score(y_test, result, average="binary"))
	print("Accuracy train:    ", accuracy_score(y_train, result_train))
	print("Precission train:  ", precision_score(y_train, result_train, average="binary"))
	print("Recall:  ", recall_score(y_test, result, average="binary"))
	print("F1:  ", f1_score(y_test, result, average="binary"))
	print("")


In [12]:
def create_classification_pipeline(classifier):
	return Pipeline([
		('pca', PCA(copy=True, iterated_power=10, n_components=None,
                     random_state=None, svd_solver='randomized', tol=0.0,
                     whiten=False)),
        ('standardscaler', MinMaxScaler(copy=True)),
		("Classifier", OneVsRestClassifier(classifier))
	])

In [13]:
use_mfcc = False
use_gabor = False

x_train, y_train = create_training_dataset(use_mfcc=use_mfcc, use_gabor=False)
x_test, y_test = create_testing_dataset(use_mfcc=use_mfcc, use_gabor=False)

x_train = np.asarray(x_train)
y_train = np.asarray(y_train)
x_test = np.asarray(x_test)
y_test = np.asarray(y_test)

print(x_train.shape)
print(y_train.shape)
print(x_test.shape)
print(y_test.shape)

Number of gabor filters used: [3]
Number of classes directories: [2]
Scanned [318] images
Number of classes directories: [2]
Scanned [78] images
(318, 9800)
(318,)
(78, 9800)
(78,)


Hinge: (soft-margin) Support Vector Machines.
Log: Logistic Regression.
Least-Squares: Ridge Regression.
Epsilon-Insensitive: (soft-margin) Support Vector Regression.

In [32]:
param_grid = {
	'Classifier__estimator__alpha': [0.0001,  0.001,  0.005,  0.01],
    'Classifier__estimator__penalty': ["l2", "l1", "elasticnet"],    
    'Classifier__estimator__loss': ["hinge", "log", "squared_loss", "huber", "perceptron"]
}

pipeline = create_classification_pipeline(SGDClassifier(max_iter=2000, early_stopping=True))
evaluatePipeline(pipeline, "SGDClassifier", param_grid, x_train, y_train, x_test, y_test)

Best parameter (CV score=0.534):
{'Classifier__estimator__alpha': 0.0001, 'Classifier__estimator__loss': 'squared_loss', 'Classifier__estimator__penalty': 'l1'}
[0 0 0 1 0 0 1 1 0 0 0 0 0 0 0 1 1 1 0 0 0 1 0 1 0 0 0 1 1 1 0 1 0 1 1 1 1
 1 1 0 0 1 0 1 0 0 0 0 1 0 1 0 0 1 1 0 1 0 0 1 1 0 0 0 0 0 1 1 0 1 1 0 1 1
 0 0 0 0]
SGDClassifier
Accuracy:     0.46153846153846156
Precission:   0.45454545454545453


ValueError: Classification metrics can't handle a mix of continuous-multioutput and binary targets

Pipeline(memory=None,
         steps=[('pca',
                 PCA(copy=True, iterated_power=10, n_components=None,
                     random_state=None, svd_solver='randomized', tol=0.0,
                     whiten=False)),
                ('standardscaler',
                 StandardScaler(copy=True, with_mean=True, with_std=True)),
                ('xgbclassifier',
                 XGBClassifier(base_score=0.5, booster='gbtree',
                               colsample_bylevel=1, colsample_bynode=1,
                               colsample_bytree=1, gamma=0, learning_rate=1.0,
                               max_delta_step=0, max_depth=2,
                               min_child_weight=14, missing=None,
                               n_estimators=100, n_jobs=1, nthread=1,
                               objective='binary:logistic', random_state=0,
                               reg_alpha=0, reg_lambda=1, scale_pos_weight=1,
                               seed=None, silent=None, subsa