In [2]:
import numpy as np
import helper_functions as hf
from scipy.stats import kurtosis, skew
from pywt import dwt
from biosppy.signals.eeg import eeg
from sklearn.decomposition import PCA
from sklearn.model_selection import cross_val_score
from sklearn.svm import SVC


In [3]:
# Every reader call below returns a pair; only one element of each pair is
# kept. Presumably the pair is (data matrix, index) -- verify against
# helper_functions.read_csv_to_matrix.

# Training signals, one matrix per recorded channel.
(X_train_eeg1, _), (X_train_eeg2, _), (X_train_emg, _) = (
    hf.read_csv_to_matrix("task5/input/train_eeg1.csv", "Id"),
    hf.read_csv_to_matrix("task5/input/train_eeg2.csv", "Id"),
    hf.read_csv_to_matrix("task5/input/train_emg.csv", "Id"),
)

# Test signals, same three channels.
(X_test_eeg1, _), (X_test_eeg2, _), (X_test_emg, _) = (
    hf.read_csv_to_matrix("task5/input/test_eeg1.csv", "Id"),
    hf.read_csv_to_matrix("task5/input/test_eeg2.csv", "Id"),
    hf.read_csv_to_matrix("task5/input/test_emg.csv", "Id"),
)

# Labels: keep the first element of the pair and drop singleton dimensions.
y_train = np.squeeze(hf.read_csv_to_matrix("task5/input/train_labels.csv", "Id")[0])

# From the sample file only the second element of the pair is kept.
_, test_index = hf.read_csv_to_matrix("task5/input/sample.csv", "Id")


In [4]:
def feature_extraction(signal):
    """Compute eight summary statistics along the last axis of ``signal``.

    Works for a single 1-D signal as well as for a 2-D batch holding one
    signal per row; every statistic is computed per signal.

    Parameters
    ----------
    signal : array-like
        Shape ``(n_samples,)`` or ``(n_signals, n_samples)``.

    Returns
    -------
    numpy.ndarray
        Shape ``(8,)`` for 1-D input, ``(n_signals, 8)`` for 2-D input.
        Columns, in order: mean, median, maximum absolute deviation from the
        mean, sample variance (ddof=1), sample standard deviation (ddof=1),
        summed absolute deviation from the median, kurtosis (bias-corrected),
        skewness (bias-corrected).
    """
    # ensure numpy
    signal = np.asarray(signal, dtype=float)

    # keepdims=True keeps a trailing axis of length 1 so the centre values
    # broadcast against each signal instead of against the sample axis.
    mean = signal.mean(axis=-1, keepdims=True)
    median = np.median(signal, axis=-1, keepdims=True)

    # maximum amplitude (largest excursion from the signal's own mean)
    max_amp = np.abs(signal - mean).max(axis=-1)

    # variance
    sigma2 = signal.var(ddof=1, axis=-1)

    # standard deviation
    sigma = signal.std(ddof=1, axis=-1)

    # absolute deviation (summed, around the signal's own median)
    ad = np.abs(signal - median).sum(axis=-1)

    # kurtosis
    kurt = kurtosis(signal, bias=False, axis=-1)

    # skewness
    skewness = skew(signal, bias=False, axis=-1)

    # Stack along a new last axis: one row of eight features per signal.
    return np.stack(
        [mean[..., 0], median[..., 0], max_amp, sigma2, sigma, ad, kurt, skewness],
        axis=-1,
    )


def eeg_feature_extraction(x, sampling_rate=128):
    """Build a 2-D EEG feature matrix from raw signals.

    Runs biosppy's ``eeg`` analysis on ``x`` transposed, takes the
    'filtered', 'theta', 'alpha_low', 'alpha_high', 'beta' and 'gamma'
    outputs (each transposed back), and appends the summary statistics that
    ``feature_extraction`` computes for each of them.

    Parameters
    ----------
    x : array-like
        Raw signals. Assumes one recording per row, i.e. shape
        (n_recordings, n_samples) -- TODO confirm against the CSV layout.
    sampling_rate : int or float, optional
        Forwarded to biosppy's ``eeg`` as ``sampling_rate``. Defaults to 128,
        the value that was previously hard-coded.

    Returns
    -------
    numpy.ndarray
        2-D array with the six band outputs followed by their statistics,
        concatenated column-wise with ``np.c_``. No extra leading axis is
        added, so the result can be fused with other feature matrices and
        passed to estimators that require (n_samples, n_features) input.

    Notes
    -----
    The column-wise concatenation needs ``feature_extraction`` to return one
    row of statistics per input row.
    """
    analysis = eeg(signal=np.asarray(x).transpose(), sampling_rate=sampling_rate, show=False)

    band_keys = ('filtered', 'theta', 'alpha_low', 'alpha_high', 'beta', 'gamma')
    # Transpose each output back so that rows line up with the rows of x.
    bands = [analysis[key].transpose() for key in band_keys]
    stats = [feature_extraction(band) for band in bands]

    # Return the concatenated matrix directly; wrapping it in a list would
    # add a leading axis of length 1.
    return np.c_[tuple(bands + stats)]


def emg_feature_extraction(x):
    """Build one EMG feature row per input row from a single-level 'db2' DWT.

    ``dwt`` is applied to ``x`` as a whole and yields two coefficient arrays
    (approximation and detail). For every row index the output row is the
    concatenation of that row's approximation coefficients, its detail
    coefficients, and the ``feature_extraction`` statistics of each.

    Parameters
    ----------
    x : array-like
        Raw EMG signals; presumably one recording per row -- TODO confirm.

    Returns
    -------
    numpy.ndarray
        The per-row feature vectors collected into a single array.
    """
    approx, detail = dwt(data=x, wavelet='db2')

    rows = [
        np.r_[
            approx[i],
            detail[i],
            feature_extraction(approx[i]),
            feature_extraction(detail[i]),
        ]
        for i in range(approx.shape[0])
    ]

    return np.asarray(rows)


In [6]:
# Progress banner, then feature extraction for the first EEG channel of the
# training set.
print("==> Train feature extraction")
print("==> EEG1")
X_train_eeg1_new = eeg_feature_extraction(x=X_train_eeg1)


==> Train feature extraction
==> EEG1


In [9]:
# Feature extraction for the second EEG channel of the training set.
print("==> EEG2")
X_train_eeg2_new = eeg_feature_extraction(x=X_train_eeg2)


==> EEG2


In [11]:
# Feature extraction for the EMG channel of the training set.
# NOTE(review): the recorded output of this cell ends in KeyboardInterrupt,
# and the fusion cell further down combines only the two EEG feature
# matrices -- confirm whether the EMG features are meant to be used.
print("==> EMG")
X_train_emg_new = emg_feature_extraction(x=X_train_emg)


==> EMG



KeyboardInterrupt



In [19]:
# Fuse the extracted features: join the two EEG feature arrays along their
# last axis, so every row carries the EEG1 columns followed by the EEG2 columns.
X_train = np.concatenate((X_train_eeg1_new, X_train_eeg2_new), axis=-1)


In [14]:
print("==> PCA")
# Project the fused features onto 10 whitened principal components.
# random_state is fixed so that the projection is reproducible between runs
# when PCA selects a randomized SVD solver.
pca = PCA(
    n_components=10,
    whiten=True,
    random_state=0,
)

# NOTE: this overwrites X_train with its own projection, so the cell is not
# idempotent -- re-run the feature-fusion cell before running this one again.
X_train = pca.fit_transform(X_train)


==> PCA


In [20]:
print("==> Initialize classifier")
# RBF-kernel support vector classifier; class_weight='balanced' makes
# scikit-learn reweight classes inversely to their frequency in y.
clf = SVC(
    kernel='rbf',
    C=1.0,
    class_weight='balanced',
    probability=False,
    shrinking=True,
)



==> Initialize classifier


In [21]:
print("==> 3 fold crossvalidation")
# Score the classifier with 3-fold cross-validation using the project scorer.
# `fit_params` is intentionally not passed: it only restated the default
# (None), and the keyword was deprecated in scikit-learn 1.4 and later
# removed, so passing it raises TypeError on current releases.
scores = cross_val_score(
    estimator=clf,
    X=X_train,
    y=y_train,
    groups=None,
    scoring=hf.scorer(),
    cv=3,
    n_jobs=8,
    verbose=0,
    pre_dispatch='2*n_jobs',
)

# Mean fold score with a +/- two-standard-deviation spread.
# NOTE(review): the label says "Accuracy", but the metric is whatever
# hf.scorer() returns -- confirm the label matches.
print("Accuracy: %0.4f (+/- %0.4f)" % (scores.mean(), scores.std() * 2))


==> 3 fold crossvalidation
