In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn import metrics
import matplotlib.pyplot as plt
from sklearn import metrics

from sklearn.neighbors import KNeighborsClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.ensemble import AdaBoostClassifier
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis

In [None]:
remove_baseline_time = False
# create a list of k folds
number_of_folds = 10
lower_bound = 10
# upper_bound = 550 
# Because we are cutting the length of each unit to match the length of power
upper_bound = 450 
power_length = 119

# If true, we ignore neural case and only calssfiy between positive and negative
binary_classification = False

In [None]:
def assign_units_to_folds(df, folds, lower_bound, upper_bound):
    """
    Execute before k-fold cross-validation: extract each unit, assign it to different folds in order
    df: The dataframe that contains data of each patient
    lower_bound: Experiment time's lower bound indicating the beginning of each unit
    upper_bound: Experiment time's upper bound indicating the ending of each unit
    """
    num_folds = len(folds)
    num_rows = len(df)
    j = 1
    i = fold_pointer = 0
    while j <= num_rows - 1:
        prev_time = df.iloc[j-1]["Time"]
        time = df.iloc[j]["Time"]
        # if the time jumps from upper_bound(250) to a time smaller than the lower_bound (-80)
        # we find a unit
        if(time < lower_bound and prev_time > upper_bound):
            unit = df.iloc[i:j]
            folds[fold_pointer].append(unit)
            fold_pointer = (fold_pointer + 1) % num_folds
            i = j
        j = j + 1
    last_unit = df.iloc[i : j]
    folds[fold_pointer].append(last_unit)
    
def concat_dataframes(fold_list, remove_columns_names):
    """
    concatenate lists of dataframes to one dataframe and drop the specified columns if needed
    fold_list: a list of folds 
    remove_columns_names: a list of names of columns you want to exclude
    """
    folds_concat = []
    for fold in fold_list:
        folds_concat.append(pd.concat(fold, ignore_index=True).drop(columns=remove_columns_names))
    return folds_concat

def get_features(subject_data):
    column_names = subject_data.columns
    alpha_columns = [i for i in column_names if "alpha" in i]
    beta_columns = [i for i in column_names if "beta" in i]
    theta_columns = [i for i in column_names if "theta" in i]
    
    alpha = subject_data.loc[:, alpha_columns]
    beta = subject_data.loc[:, beta_columns]
    theta = subject_data.loc[:, theta_columns]
    
    alpha_std = np.std(alpha, axis=1)
    beta_std = np.std(beta, axis=1)
    theta_std = np.std(theta, axis=1)
    
    alpha_mean = np.mean(alpha, axis=1)
    beta_mean = np.mean(beta, axis=1)
    theta_mean = np.mean(theta, axis=1)
    
    #Concate feature
    feature = np.array([theta_std,theta_mean,alpha_std,alpha_mean,beta_std,beta_mean])
    feature = feature.T

    return feature

def match_power(df, df_power):
    res = pd.DataFrame()
    zero_idx = df.index[df['Time'] == 0].tolist()
    for idx in zero_idx:
        df_temp = df.iloc[idx:idx + power_length, :]
        res = pd.concat([res, df_temp], axis=0)
    res = res.reset_index()
#     df_power.rename(columns={"0": "TP9", "1: "AF7", 2:"AF8", 3:"TP10"})
    df_power = df_power.set_axis(["TP9_Power", "AF7_Power", "AF8_Power", "TP10_Power"], axis=1)
                
    df_res = pd.concat([res, df_power], axis=1)
    return df_res

In [None]:
# Read negative, neutral, and positive data from data folder
subject_id = "02"
states = ["neg", "neu", "pos"]
bands = ["alpha", "beta", "theta"]
neg_neu_pos = []
    
for state in states:
    df = pd.DataFrame()
    for band in bands:
        df_temp = pd.read_csv(f"data/{subject_id}{state}_filt_{band}.csv")
        df_temp = df_temp.rename(columns={"TP9":f"TP9_{band}",
                                          "AF7":f"AF7_{band}",
                                          "AF8":f"AF8_{band}",
                                          "TP10":f"TP10_{band}"})
        # remove the time column for beta and theta
        if band in ["beta", "theta"]:
            df_temp = df_temp.drop([f"Time"], axis=1)
    
        df = pd.concat([df, df_temp], axis=1)
    neg_neu_pos.append(df)

subject_negative = neg_neu_pos[-1]
subject_neutral = neg_neu_pos[0]
subject_positive = neg_neu_pos[1]

In [None]:
neg_power = pd.read_csv(f"data/{subject_id}neg_Alpha_Power.csv", header = None)
neg_power = neg_power.transpose()

neu_power = pd.read_csv(f"data/{subject_id}neu_Alpha_Power.csv", header = None)
neu_power = neu_power.transpose()

pos_power = pd.read_csv(f"data/{subject_id}pos_Alpha_Power.csv", header = None)
pos_power = pos_power.transpose()

In [None]:
neg_power.tail()

In [None]:
subject_negative = match_power(subject_negative, neg_power)
subject_negative = subject_negative.drop(columns = ["index"])

subject_neutral = match_power(subject_neutral, neu_power)
subject_neutral = subject_neutral.drop(columns = ["index"])

subject_positive = match_power(subject_positive, pos_power)
subject_positive = subject_positive.drop(columns = ["index"])

In [None]:
# Suppose negative = -1; neutral = 0, and positive = 1
subject_negative["y"] = -1
subject_neutral["y"] = 0
subject_positive["y"] = 1

# Concatenate all three datasets
subject_data = pd.concat([subject_negative, subject_neutral, subject_positive], ignore_index=True)

if remove_baseline_time:
    subject_data = subject_data.loc[subject_data["Time"] > 0]
subject_data = subject_data.reset_index(drop=True)

In [None]:
# feature = get_features(subject_data)
# df_newFeature = pd.DataFrame(feature, columns = ['theta_std','theta_mean','alpha_std',
#                                                  'alpha_mean','beta_std','beta_mean'
#                                                 ])
# df = pd.concat([subject_data, df_newFeature], axis=1)
df=subject_data
len(df)

In [None]:
df.head()

In [None]:
# Calculate Frontal Alpha Asymmetry
power_AF8 = np.array(df["AF8_Power"])
power_AF7 = np.array(df["AF7_Power"])
#frontal_alpha_asymmetry = np.log(power_AF8 - power_AF7)
#df["frontal_alpha_asymmetry"] = frontal_alpha_asymmetry
df=df.dropna()
#df["frontal_alpha_asymmetry"] = df["frontal_alpha_asymmetry"].fillna(0)

In [None]:
df.tail()

In [None]:
df.describe()

In [None]:
df.isnull().values.any()

In [None]:
#y_column = df.pop("y")
#df["y"] = y_column.replace(np.nan, 0)
y_column=df["y"]

In [None]:
len(y_column)

In [None]:
number_of_neighbors = 10
if binary_classification:
    df = df[df['y'] != 0]
    #number_of_neighbors = 2
folds = [[] for i in range(number_of_folds)]
assign_units_to_folds(df, folds, lower_bound, upper_bound)
columns = df.columns
# Index(['Time', 'TP9_alpha', 'AF7_alpha', 'AF8_alpha', 'TP10_alpha', 'TP9_beta',
#        'AF7_beta', 'AF8_beta', 'TP10_beta', 'TP9_theta', 'AF7_theta',
#        'AF8_theta', 'TP10_theta', 'theta_std', 'theta_mean', 'alpha_std',
#        'alpha_mean', 'beta_std', 'beta_mean', 'y'],
columns_to_remove = []
columns_to_remove.append(columns[0])
columns_to_remove.extend([i for i in columns if "TP" in i])


#columns_to_remove.extend(columns[0:12])
print("Columns removed", columns_to_remove)
folds_concat = concat_dataframes(folds, columns_to_remove)

In [None]:
pd.concat(folds_concat[:-1], ignore_index=True)

In [None]:
names = [
   # "Adaboost",
     "RandomForest",
    # "GradientBoost",
    #'Nearest Neighbors',
  # 'LDA',
]

aTreeClassifier=RandomForestClassifier(n_estimators=100, max_features="sqrt", oob_score=True)
acc_res = {}
for name in names:
    accuracy_lst = []
    for _ in range(number_of_folds):
        if name == "Adaboost":
            clf = AdaBoostClassifier()
        if name == "RandomForest":
            clf = RandomForestClassifier(n_estimators=100, max_features="sqrt", oob_score=True)
        if name == "GradientBoost":
            clf = GradientBoostingClassifier()
        if name == 'Nearest Neighbors':
            clf = KNeighborsClassifier(n_neighbors=number_of_neighbors)
        if name == "LDA":
            clf = LinearDiscriminantAnalysis(solver='lsqr', shrinkage='auto')
        train_data = pd.concat(folds_concat[:-1], ignore_index=True)
        # take the last fold as the test set
        test_data = folds_concat[-1]
        # move the last fold to the beginning of the list of folds
        folds_concat = folds_concat[-1:] + folds_concat[:-1]
        train_X = train_data.iloc[:, :-1]
        train_Y = train_data.iloc[:, -1]
        test_X = test_data.iloc[:, :-1]
        test_Y = test_data.iloc[:, -1]
        clf.fit(train_X, train_Y)
        if name == "RandomForest":
            aTreeClassifier = clf
        y_predict = clf.predict(test_X)
        accuracy = metrics.accuracy_score(y_predict,test_Y)
        accuracy_lst.append(accuracy)
    avg_acc = round(sum(accuracy_lst) / len(accuracy_lst),3)
    acc_res[name] = avg_acc
    print(f"{name} yields accuracy of {avg_acc}")

In [None]:
train_X

In [None]:
from sklearn.model_selection import train_test_split
from sklearn import tree
x=df[['AF7_Power', 'AF8_Power']] 
y=df['y']# split data randomly
X_train, X_test, Y_train, Y_test = train_test_split(x, y, test_size=0.4, random_state = 135)
#Models:
#1. trees
# aTreeClassifier1 = tree.DecisionTreeClassifier(max_depth=5)
# aTreeClassifier1 = aTreeClassifier1.fit(X_train, Y_train)
# Y_pred = aTreeClassifier1.predict( X_test )

aTreeClassifier = tree.DecisionTreeClassifier(max_depth=5)
aTreeClassifier = aTreeClassifier.fit(X_train, Y_train)
Y_pred = aTreeClassifier.predict( X_test )

print( 'Accuracy = ', sum( Y_pred == Y_test ) / len( Y_test ) )

In [None]:
import matplotlib.pyplot as plt
from sklearn.tree import plot_tree

# test = aTreeClassifier.estimators_[10]
fig = plt.figure(figsize=(15, 10))
# temp = aTreeClassifier.base_estimator_
plot_tree(aTreeClassifier, 
#           feature_names=wine.feature_names,
#           class_names=wine.target_names, 
          filled=True, impurity=True, 
          rounded=True)

In [None]:
import matplotlib.pyplot as plt
from matplotlib.colors import ListedColormap

colors = ['r', 'g', 'b']
X1, X2 = np.meshgrid(np.arange(start = test_X['AF7_Power'].min()-1, stop= test_X['AF7_Power'].max()+1, step = 0.01), np.arange(start = test_X['AF8_Power'].min()-1, stop= test_X['AF8_Power'].max()+1, step = 0.01))
print(X1.shape)
plt.contourf(X1, X2, aTreeClassifier.predict(np.array([X1.ravel(), X2.ravel()]).T).reshape(X1.shape), alpha=0.75, cmap = ListedColormap(colors))

# plt.contourf(X1, X2, aTreeClassifier.predict(np.array([X1, X2])).reshape(X1.shape), alpha=0.75, cmap = ListedColormap(colors) )
plt.xlim(X1.min(), X1.max())
plt.ylim(X2.min(), X2.max())
for i,j in enumerate(np.unique(test_Y)):
    plt.scatter(test_X['AF7_Power'][test_Y==j], test_X['AF8_Power'][test_Y==j], color=colors[i], label = j)
plt.title("Decision Tree Results (Test set)")
plt.xlabel("AF7_Power")
plt.ylabel("AF8_Power")
plt.legend()
plt.show()