In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

from sklearn.ensemble import *
from sklearn.model_selection import *
from sklearn.metrics import *
from sklearn.svm import SVR
from sklearn.decomposition import PCA
from sklearn.neighbors import KNeighborsRegressor
from sklearn.linear_model import *
from sklearn.preprocessing import *

import scipy
from scipy import stats
from scipy import spatial

import seaborn as sns

from functions import calcBandwidth
from functions import calcFD
from functions import calcBandPowers
from functions import getIMFstats
from functions import getStatistics

In [None]:
# Import features and labels
features = np.load("../data/other/features_raw.npy")
labels = np.load("../data/other/labels.npy")

In [None]:
# Reshape features to get the channels before the samples
features_eeg = features.reshape(2717, 20, -1)
features_eeg.shape

In [None]:
# Function for deducting right hemisphere channels from the left hemisphere channels.
def deduct_features_after_extraction(features):
    left = [0,2,3,4,5,6,7,8,9]
    right = [10,12,13,14,15,16,17,18,19]
    neutral = [1,11]
    for nF, feature in enumerate(features):
        multiplyer = int(feature.shape[1] / 20)
        for nR, row in enumerate(feature):
            for c in range(0,len(left)):
                deduction = row[(left[c]*multiplyer):((left[c]+1)*multiplyer)]-row[(right[c]*multiplyer):((right[c]+1)*multiplyer)]
                if c == 0:
                    temp_array = deduction
                else:
                    temp_array = np.hstack((temp_array, deduction))
            temp_array = np.hstack((temp_array,row[(neutral[0]*multiplyer):((neutral[0]+1)*multiplyer)]))
            temp_array = np.hstack((temp_array,row[(neutral[1]*multiplyer):((neutral[1]+1)*multiplyer)]))
            if nR == 0:
                temp_feature = temp_array
            else:
                temp_feature = np.vstack((temp_feature, temp_array))
        if nF == 0:
            result = temp_feature
        else:
            result = np.vstack((result, temp_feature))
    return result

In [None]:
# Running functions to create the features. Functions are specified in functions.py
features_eeg_FD = calcFD(features_eeg, 50)
features_eeg_bp = calcBandPowers(features_eeg)
features_eeg_stat = getStatistics(features_eeg)
features_eeg_hht = getIMFstats(features_eeg)

# Apply the deduction function
deductFD = deduct_features_after_extraction(features_eeg_FD.reshape(1,2717,20))
deductStat = deduct_features_after_extraction(features_eeg_stat.reshape(1,2717,-1))
deductBP = deduct_features_after_extraction(features_eeg_bp.reshape(1,2717,-1))
deductHHT = deduct_features_after_extraction(features_eeg_hht.reshape(1,2717,-1))

# Combine feature setup 1
features_result = np.hstack((deductFD, deductStat))
features_result = np.hstack((features_result, deductBP))

In [None]:
# Split setup 1 features into train/test
X_train, X_test, A_train, A_test = train_test_split(
    features_result, labels[0], test_size=0.3, random_state=42)
X_train, X_test, V_train, V_test = train_test_split(
    features_result, labels[1], test_size=0.3, random_state=42)

# Split setup 2 features into train/test
X_train2, X_test2, A_train2, A_test2 = train_test_split(
    deductHHT, labels[0], test_size=0.3, random_state=42)
X_train2, X_test2, V_train2, V_test2 = train_test_split(
    deductHHT, labels[1], test_size=0.3, random_state=42)

In [None]:
# Running all models for setup 1
LR_Arousal_1 = LinearRegression()
LR_Arousal_1.fit(X_train, A_train)
LR_Arousal_1_results = LR_Arousal_1.predict(X_test)
print("Arousal, MAE:", mean_absolute_error(A_test, LR_Arousal_1_results))
print("Arousal, bandwidth:", calcBandwidth(A_test, LR_Arousal_1_results))

KNN_Arousal_1 = KNeighborsRegressor(20)
KNN_Arousal_1.fit(X_train, A_train)
KNN_Arousal_1_results = KNN_Arousal_1.predict(X_test)
print("Arousal, MAE:", mean_absolute_error(A_test, KNN_Arousal_1_results))
print("Arousal, bandwidth:", calcBandwidth(A_test, KNN_Arousal_1_results))

SVR_Arousal_1 = SVR(C = 0.6, epsilon = 0.05)
SVR_Arousal_1.fit(X_train, A_train)
SVR_Arousal_1_results = SVR_Arousal_1.predict(X_test)
print("Arousal, MAE:", mean_absolute_error(A_test, SVR_Arousal_1_results))
print("Arousal, bandwidth:", calcBandwidth(A_test, SVR_Arousal_1_results))

LR_Valence_1 = LinearRegression()
LR_Valence_1.fit(X_train, V_train)
LR_Valence_1_results = LR_Valence_1.predict(X_test)
print("\nValence, MAE:", mean_absolute_error(V_test, LR_Valence_1_results))
print("Valence, bandwidth:", calcBandwidth(V_test, LR_Valence_1_results))

KNN_Valence_1 = KNeighborsRegressor(25)
KNN_Valence_1.fit(X_train, V_train)
KNN_Valence_1_results = KNN_Valence_1.predict(X_test)
print("Valence, MAE:", mean_absolute_error(V_test, KNN_Valence_1_results))
print("Valence, bandwidth:", calcBandwidth(V_test, KNN_Valence_1_results))

SVR_Valence_1 = SVR(C = 3, epsilon = 0.1)
SVR_Valence_1.fit(X_train, V_train)
SVR_Valence_1_results = SVR_Valence_1.predict(X_test)
print("Valence, MAE:", mean_absolute_error(V_test, SVR_Valence_1_results))
print("Valence, bandwidth:", calcBandwidth(V_test, SVR_Valence_1_results))

In [None]:
# Running all models for setup 2
LR_Arousal_2 = LinearRegression()
LR_Arousal_2.fit(X_train2, A_train2)
LR_Arousal_2_results = LR_Arousal_2.predict(X_test2)
print("Arousal, MAE:", mean_absolute_error(A_test2, LR_Arousal_2_results))
print("Arousal, bandwidth:", calcBandwidth(A_test2, LR_Arousal_2_results))

KNN_Arousal_2 = KNeighborsRegressor(17, weights = "distance")
KNN_Arousal_2.fit(X_train2, A_train2)
KNN_Arousal_2_results = KNN_Arousal_2.predict(X_test2)
print("Arousal, MAE:", mean_absolute_error(A_test2, KNN_Arousal_2_results))
print("Arousal, bandwidth:", calcBandwidth(A_test2, KNN_Arousal_2_results))

SVR_Arousal_2 = SVR(C = 0.6, epsilon = 0.1)
SVR_Arousal_2.fit(X_train2, A_train2)
SVR_Arousal_2_results = SVR_Arousal_2.predict(X_test2)
print("Arousal, MAE:", mean_absolute_error(A_test2, SVR_Arousal_2_results))
print("Arousal, bandwidth:", calcBandwidth(A_test2, SVR_Arousal_2_results))

LR_Valence_2 = LinearRegression()
LR_Valence_2.fit(X_train2, V_train2)
LR_Valence_2_results = LR_Valence_2.predict(X_test2)
print("\nValence, MAE:", mean_absolute_error(V_test2, LR_Valence_2_results))
print("Valence, bandwidth:", calcBandwidth(V_test2, LR_Valence_2_results))

KNN_Valence_2 = KNeighborsRegressor(17, p = 2)
KNN_Valence_2.fit(X_train2, V_train2)
KNN_Valence_2_results = KNN_Valence_2.predict(X_test2)
print("Valence, MAE:", mean_absolute_error(V_test2, KNN_Valence_2_results))
print("Valence, bandwidth:", calcBandwidth(V_test2, KNN_Valence_2_results))

SVR_Valence_2 = SVR(C = 0.8, epsilon = 0.1)
SVR_Valence_2.fit(X_train2, V_train2)
SVR_Valence_2_results = SVR_Valence_2.predict(X_test2)
print("\nValence, MAE:", mean_absolute_error(V_test2, SVR_Valence_2_results))
print("Valence, bandwidth:", calcBandwidth(V_test2, SVR_Valence_2_results))

In [None]:
# Function for calculating the bins inside the predictions
def calcBins(predictions, true, element):
    dimensions = [[1,7], [2,8]]
    result = []
    tres = 0.1 * (np.max(true) - np.min(true))
    
    for n in range(dimensions[element][0], dimensions[element][1]):
        temp_n = 0
        sum1 = 0
        for p,t in zip(predictions, true):
            if (t >= n) & (t < n+1):
                var = spatial.distance.euclidean(p,t)
                if var < tres:
                    temp_n += 1
                sum1 += 1
        if temp_n != 0:
            result += [temp_n/sum1*100]
        else:
            result += [0]
    return result

In [None]:
# Applied
t1 = calcBins(LR_Arousal_1_results, A_test,0)
t2= calcBins(LR_Arousal_2_results, A_test2, 0)
t3 = calcBins(KNN_Arousal_1_results, A_test, 0)
t4 = calcBins(KNN_Arousal_2_results, A_test2, 0)
t5 = calcBins(SVR_Arousal_1_results, A_test, 0)
t6 = calcBins(SVR_Arousal_2_results, A_test2, 0)

t10 = calcBins(LR_Valence_1_results, V_test,1)
t11 = calcBins(LR_Valence_2_results, V_test2, 1)
t12 = calcBins(KNN_Valence_1_results, V_test, 1)
t13 = calcBins(KNN_Valence_2_results, V_test2, 1)
t14 = calcBins(SVR_Valence_1_results, V_test, 1)
t15 = calcBins(SVR_Valence_2_results, V_test2, 1)

arousalBins = [t1, t2, t3, t4, t5, t6]
valenceBins = [t10, t11, t12, t13, t14, t15]

In [None]:
# Prediction heatmap arousal
plt.figure(dpi = 250)
ax = sns.heatmap(np.array(test7), linewidth=0.9)
plt.xlabel("True arousal")
plt.ylabel("Model")
plt.xticks(np.arange(7),["1","2","3","4","5","6","7"])
plt.yticks(np.arange(7), ["LR-1", "LR-2", "KNNR-1", "KNNR-2", "SVR-1", "SVR-2"], rotation = 20)
plt.savefig("../Visuals/heatmap_predictions_arousal.png", bbox_inches="tight")
plt.show()

# Prediction heatmap valence
plt.figure(dpi = 250)
ax = sns.heatmap(np.array(test8), linewidth=0.9)
plt.xlabel("True valence")
plt.ylabel("Model")
plt.xticks(np.arange(7),["2","3","4","5","6","7","8"])
plt.yticks(np.arange(7), ["LR-1", "LR-2", "KNNR-1", "KNNR-2", "SVR-1", "SVR-2"], rotation = 20)
plt.savefig("../Visuals/heatmap_predictions_valence.png", bbox_inches="tight")
plt.show()

In [None]:
# Independent t-test
stats.ttest_ind(SVR_Arousal_1_results, KNN_Arousal_2_results)
stats.ttest_ind(KNN_Valence_1_results, KNN_Valence_2_results)

In [None]:
# Pearson's r correlations between arousal and setup 1
correlations = []
for i in range(features_result.shape[1]):
    correlations += [scipy.stats.pearsonr(features_result[:,i], labels[0])[0]]
    
# Pearson's r correlations between arousal and setup 2
correlations_hht = []
for i in range(deductHHT.shape[1]):
    correlations_hht += [scipy.stats.pearsonr(deductHHT[:,i], labels[0])[0]]

# Pearson's r correlations between valence and setup 1
correlations2 = []
for i in range(features_result.shape[1]):
    correlations2 += [scipy.stats.pearsonr(features_result[:,i], labels[1])[0]]

# Pearson's r correlations between valence and setup 2
correlations_hht2 = []
for i in range(deductHHT.shape[1]):
    correlations_hht2 += [scipy.stats.pearsonr(deductHHT[:,i], labels[1])[0]]

In [None]:
# Correlation plot setup 1
sns.set(color_codes=True)

plt.figure(dpi = 150)
sns.distplot(correlations, label = "Arousal")
sns.distplot(correlations2, label = "Valence")
plt.legend()
plt.xlabel("Pearsons' r correlation")
plt.ylabel("Number of features correlated")
plt.savefig("../Visuals/correlations_setup_1.png")
plt.show()

In [None]:
# Correlation plot setup 2
sns.set(color_codes=True)

plt.figure(dpi = 150)
sns.distplot(correlations_hht, label = "Arousal")
sns.distplot(correlations_hht2, label = "Valence")
plt.legend()
plt.xlabel("Pearsons' r correlation")
plt.ylabel("Number of features correlated")
plt.savefig("../Visuals/correlations_setup_2.png")
plt.show()

In [None]:
# Prediction matrix of the two-dimensional model arousal-valence
plt.figure(dpi = 250)
sns.scatterplot(x = A_test, y = V_test, marker = "+", label="Label")
sns.scatterplot(x = KNN_Arousal_2_results, y = KNN_Valence_2_results, label = "Prediction")
plt.xlim(1,7)
plt.ylim(3,8)
plt.xlabel("Arousal")
plt.ylabel("Valence")
plt.legend()
plt.savefig("../Visuals/prediction_matrix.png", bbox_inches="tight")
plt.show()

In [None]:
# Density estimations for labels
sns.set(color_codes=True)

plt.figure(dpi = 150)
sns.distplot(labels[0], label = "Arousal")
sns.distplot(labels[1], label = "Valence")
plt.legend()
plt.xlabel("Value for emotion")
plt.ylabel("Density estimation")
plt.savefig("../Visuals/distribution_arousal_valence.png")
plt.show()

In [None]:
# Adjustment of a function in pyhht package, so that the plot becomes better
def plot_imfs2(signal, imfs, time_samples=None, fignum=None, show=True):
    """
    Plot the signal, IMFs and residue.
    Parameters
    ----------
    signal : array-like, shape (n_samples,)
        The input signal.
    imfs : array-like, shape (n_imfs, n_samples)
        Matrix of IMFs as generated with the `EMD.decompose` method.
    time_samples : array-like, shape (n_samples), optional
        Time instants of the signal samples.
        (defaults to `np.arange(1, len(signal))`)
    fignum : int, optional
        Matplotlib figure number (by default a new figure is created)
    show : bool, optional
        Whether to display the plot. Defaults to True, set to False if further
        plotting needs to be done.
    Returns
    -------
    `matplotlib.figure.Figure`
        The figure (new or existing) in which the decomposition is plotted.
    Examples
    --------
    >>> from pyhht.visualization import plot_imfs
    >>> import numpy as np
    >>> from pyhht import EMD
    >>> t = np.linspace(0, 1, 1000)
    >>> modes = np.sin(2 * np.pi * 5 * t) + np.sin(2 * np.pi * 10 * t)
    >>> x = modes + t
    >>> decomposer = EMD(x)
    >>> imfs = decomposer.decompose()
    >>> plot_imfs(x, imfs, t) #doctest: +SKIP
    .. plot:: ../../docs/examples/simple_emd.py
    """
    is_bivariate = np.any(np.iscomplex(signal))
    if time_samples is None:
        time_samples = np.arange(signal.shape[0])

    n_imfs = imfs.shape[0]

    fig = plt.figure(num=fignum, dpi = 250)
    axis_extent = max(np.max(np.abs(imfs[:-1, :]), axis=0))

    # Plot original signal
    ax = plt.subplot(n_imfs + 1, 1, 1)
    if is_bivariate:
        ax.plot(time_samples, np.real(signal), 'b')
        ax.plot(time_samples, np.imag(signal), 'k--')
    else:
        ax.plot(time_samples, signal)
    ax.axis([time_samples[0], time_samples[-1], signal.min(), signal.max()])
    ax.tick_params(which='both', left=False, bottom=False, labelleft=False,
                   labelbottom=False)
    ax.grid(False)
    ax.set_ylabel('Signal')
    ax.set_title('Empirical Mode Decomposition')

    # Plot the IMFs
    for i in range(n_imfs - 1):
        ax = plt.subplot(n_imfs + 1, 1, i + 2)
        if is_bivariate:
            ax.plot(time_samples, np.real(imfs[i]), 'b')
            ax.plot(time_samples, np.imag(imfs[i]), 'k--')
        else:
            ax.plot(time_samples, imfs[i])
        ax.axis([time_samples[0], time_samples[-1], -axis_extent, axis_extent])
        ax.tick_params(which='both', left=False, bottom=False, labelleft=False,
                       labelbottom=False)
        ax.grid(False)
        ax.set_ylabel('imf' + str(i + 1))

    # Plot the residue
    ax = plt.subplot(n_imfs + 1, 1, n_imfs + 1)
    if is_bivariate:
        ax.plot(time_samples, np.real(imfs[-1]), 'r')
        ax.plot(time_samples, np.imag(imfs[-1]), 'r--')
    else:
        ax.plot(time_samples, imfs[-1])
    ax.axis('tight')
    ax.tick_params(which='both', left=False, bottom=False, labelleft=False,
                   labelbottom=False)
    ax.grid(False)
    ax.set_ylabel('res.')

    if show:  # pragma: no cover
        plt.savefig("../Visuals/IMF_example.png", bbox_inches="tight")
        plt.show()
    return fig

In [None]:
# Plot IMFs
plot_imfs2(x, imfs, t)

In [None]:
# Plot signal example
plt.figure(dpi = 150)

plt.plot(np.linspace(0,2, 1332), features.reshape(2717, 20, -1)[0][0])
plt.xlabel("Seconds")
plt.ylabel("Processed signal")
plt.savefig("../Visuals/raw_signal_example.png")
plt.show()