Use a convolutional neural network (CNN) to process the data

In [1]:
import pandas as pd
import numpy as np
from scipy import stats
from sklearn.preprocessing import StandardScaler
from keras.layers import Input, Dense, Flatten, Reshape, BatchNormalization
from keras.layers import Conv1D, UpSampling1D, MaxPooling1D, AveragePooling1D
from keras.models import Model, Sequential
from keras.layers import ReLU
from keras.optimizers import RMSprop
import tensorflow as tf


# --- Input file -----------------------------------------------------------
# Absolute path of the merged CSV that the notebook loads with pd.read_csv.
csv_file = '/Users/robert/src/MachineLearning/processed_result/merged_data.csv'

# --- Column / feature names -----------------------------------------------
# Feature columns handed to cnn_model.get_data via cnn_model.test_model.
feat = ["EDA"]
# The three names below are not referenced by the visible cells; presumably
# they are column names of the raw recordings -- TODO confirm before removing.
sc = 'Shimmer_FCF4_GSR_Skin_Conductance_CAL'
eda = 'Normalized_GSR'
lbl = 'Event_Label'

# --- Subjects and event labels --------------------------------------------
# Not referenced by the visible cells (the class derives subject names from
# the "Name" column of the CSV instead).
sname = [
    "ted",
    "Lei",
    "Lema",
    "Erin",
    "Claire",
    "jaden",
    "wenxin",
]
lblList = [
    'Relaxation',
    'CPT Test',
    'Stroop Test',
    'Math Test',
    'Video 1',
    'Video 2',
]

# --- Windowing ------------------------------------------------------------
# sf_EDA is passed as the window length (in samples) by cnn_model.test_model;
# presumably it is the EDA sampling rate in Hz -- TODO confirm.
sf_EDA = 4
# int(sf_EDA * window) is stored as cnn_model.batch_size.
window = 0.25

In [2]:
from sklearn.metrics import accuracy_score, f1_score

In [3]:
# Load the merged dataset from the path configured in `csv_file`.
df = pd.read_csv(csv_file)
# Bare expression: in a notebook cell this displays the column index.
df.columns

Index(['Name', 'EDA', 'Label'], dtype='object')

In [4]:
class cnn_model:
    """Leave-one-subject-out training and scoring of a small Conv1D classifier.

    The data frame must provide the columns ``Name`` (subject id), ``Label``
    (integer class label) and the feature columns passed to :meth:`get_data`.

    Attributes:
        df: Data frame loaded from the CSV file.
        batch_size: ``int(sf_EDA * window)``; stored but not used by the
            methods of this class.
        sname: Unique subject names found in ``df["Name"]``.
        K: Number of distinct values in ``df["Label"]``; used as the width of
            the one-hot targets and of the softmax output layer.
    """

    def __init__(self, **kwargs):
        """Load the dataset and derive subject names and class count.

        Args:
            **kwargs: Optional ``csv_path`` overriding the module-level
                ``csv_file``. Any other keyword is ignored.
        """
        self.df = pd.read_csv(kwargs.get("csv_path", csv_file))

        self.batch_size = int(sf_EDA * window)

        self.sname = self.df["Name"].unique()
        self.K = len(self.df["Label"].unique())

    def one_hot_enc(self, r, k):
        """Return a float one-hot matrix of shape ``(len(r), k)``.

        Label ``v`` sets column ``v - 1``.
        NOTE(review): this assumes labels run from 1 to ``k``; a label of 0
        wraps around to the last column -- confirm against the dataset.

        Args:
            r: Integer labels, shape ``(n,)`` or ``(n, 1)``.
            k: Number of columns (classes).

        Raises:
            IndexError: If a label maps outside the ``k`` columns.
        """
        r = np.asarray(r)
        n = r.shape[0]
        new_r = np.zeros((n, k))
        if n:
            # One fancy-indexed assignment instead of a Python loop per row.
            new_r[np.arange(n)[:, None], r.reshape(n, -1) - 1] = 1
        return new_r

    @staticmethod
    def _to_windows(s, v_batch_size):
        """Cut ``(n_samples, n_feat)`` into ``(n_windows, n_feat, v_batch_size)``."""
        n_windows = s.shape[0] // v_batch_size
        n_feat = s.shape[1]
        # Split along time first, then move the feature axis in front of the
        # time axis. Reshaping straight to (n_windows, n_feat, v_batch_size)
        # would interleave features and time steps whenever n_feat > 1.
        windows = s[:n_windows * v_batch_size].reshape(n_windows, v_batch_size, n_feat)
        return np.ascontiguousarray(windows.transpose(0, 2, 1))

    @staticmethod
    def _window_labels(lbl, v_batch_size):
        """Return the most frequent label of each window, shape ``(n_windows, 1)``."""
        lbl = np.asarray(lbl)
        n_windows = len(lbl) // v_batch_size
        if n_windows == 0:
            return np.zeros((0, 1), dtype=int)
        # Every sample of the window takes part in the vote (the slice end is
        # exclusive, so no "- 1" is needed).
        per_window = lbl[:n_windows * v_batch_size].reshape(n_windows, v_batch_size)
        modes = stats.mode(per_window, axis=1)[0]
        # SciPy versions differ on whether the reduced axis is kept.
        return np.asarray(modes).reshape(n_windows, 1).astype(int)

    def get_data(self, test_id, v_batch_size, v_feat_list, df):
        """Build windowed train/test arrays, holding out one subject.

        Each subject's features are standardised independently, trimmed to a
        whole number of windows, and cut into windows of ``v_batch_size``
        samples. A window's label is the most frequent label inside it.
        Subjects with fewer rows than one window are skipped.

        Args:
            test_id: Subject name used as the test set.
            v_batch_size: Window length in samples.
            v_feat_list: Feature column names.
            df: Data frame with ``Name``, ``Label`` and the feature columns.

        Returns:
            ``(x_train, y_train, x_test, y_test, yk_train, yk_test)`` where
            ``x_*`` has shape ``(n_windows, n_feat, v_batch_size)``, ``y_*``
            is one-hot ``(n_windows, K)`` and ``yk_*`` holds the integer
            labels ``(n_windows, 1)``.

        Raises:
            ValueError: If the test subject or all training subjects yield
                no complete window.
        """
        x_parts, y_parts, yk_parts = [], [], []
        x_test = y_test = yk_test = None

        for name in self.sname:
            df_s = df[df["Name"] == name]

            n = (len(df_s) // v_batch_size) * v_batch_size
            if n == 0:
                # Not even one complete window for this subject.
                continue
            df_s = df_s[:n]

            s = StandardScaler().fit_transform(df_s[v_feat_list])
            s = self._to_windows(s, v_batch_size)

            y_k = self._window_labels(df_s["Label"].values.astype(int), v_batch_size)
            s_y = self.one_hot_enc(y_k, self.K).astype(int)

            if name == test_id:
                x_test, y_test, yk_test = s, s_y, y_k
            else:
                x_parts.append(s)
                y_parts.append(s_y)
                yk_parts.append(y_k)

        if x_test is None:
            raise ValueError(f"no complete window found for test subject {test_id!r}")
        if not x_parts:
            raise ValueError("no training data left after holding out the test subject")

        # Concatenate once instead of growing the arrays inside the loop.
        x_train = np.concatenate(x_parts, axis=0)
        y_train = np.concatenate(y_parts, axis=0)
        yk_train = np.concatenate(yk_parts, axis=0)

        print ("merged train:", x_train.shape, y_train.shape)
        print ("merged test :", x_test.shape, y_test.shape)
        return x_train, y_train, x_test, y_test, yk_train, yk_test

    def cnn_model(self, v_batch_size, n_feat, n_classes=None):
        """Build the (uncompiled) Conv1D -> BatchNorm -> Dense classifier.

        Args:
            v_batch_size: Window length; also used as the number of Conv1D
                filters.
            n_feat: Number of feature columns.
            n_classes: Width of the softmax output. Defaults to ``self.K`` so
                the output always matches the one-hot targets.

        Returns:
            A Keras ``Model`` mapping ``(n_feat, v_batch_size)`` inputs to
            ``n_classes`` class probabilities.
        """
        if n_classes is None:
            n_classes = self.K

        # NOTE(review): with this input layout Conv1D slides over the n_feat
        # axis and treats the window samples as channels -- confirm intended.
        input_sig = Input(shape=(n_feat, v_batch_size))
        x = Conv1D(v_batch_size, 4, activation='relu', padding='same')(input_sig)

        x1 = BatchNormalization()(x)
        flat = Flatten()(x1)
        encoded = Dense(4, activation='relu')(flat)
        cls = Dense(n_classes, activation='softmax')(encoded)

        return Model(input_sig, cls)

    def test_model(self, v_df=None):
        """Train and score one model per held-out subject.

        Args:
            v_df: Data frame to evaluate on. Defaults to ``self.df``.

        Returns:
            A list of ``[name, accuracy, weighted_f1]`` entries, one per
            subject in ``self.sname``.
        """
        data = self.df if v_df is None else v_df
        scores = []
        for name in self.sname:
            print(f"============= test subject {name} ==================")
            x_train, y_train, x_test, y_test, _, _ = self.get_data(
                test_id=name,
                v_batch_size=sf_EDA,
                v_feat_list=feat,
                df=data,
            )
            model = self.cnn_model(v_batch_size=sf_EDA, n_feat=len(feat))
            model.compile(optimizer=RMSprop(learning_rate=0.00025), loss="categorical_crossentropy")
            model.fit(x_train, y_train, epochs=4)

            # Only test predictions are scored; no prediction pass is run on
            # the training set.
            pred_test = model.predict(x_test)
            y_true = np.argmax(y_test, axis=1)
            y_pred = np.argmax(pred_test, axis=1)
            acc = accuracy_score(y_true, y_pred)
            f1 = f1_score(y_true, y_pred, average='weighted')

            scores.append([name, acc, f1])
        return scores

In [None]:
# Instantiating the class reads the CSV configured in `csv_file`.
cnn_m = cnn_model()
# Run the per-subject evaluation; the result is a list of [name, acc, f1].
scores = cnn_m.test_model(df)

