In [1]:
import numpy as np
import pickle
import sklearn.metrics as metrics
import argparse
import myutils

from keras.layers import Conv2D, MaxPooling2D, LSTM, Dense, Flatten, Permute, Reshape
from keras.models import Sequential, model_from_json
from keras.optimizers import Adam
from keras.losses import binary_crossentropy, categorical_crossentropy

Using TensorFlow backend.


In [2]:
# Experiment switches, kept as notebook globals:
#   method / direction - read by get_train_test_windows to pick which feature
#                        columns are fed to the network.
#   augment            - when 1, the run cell fills augment_vals with scale
#                        factors; otherwise augment_vals stays empty.
method, direction, augment = 0, 0, 0
print('Method: {}, Direction: {}, Augment: {}'.format(method, direction, augment))

Method: 0, Direction: 0, Augment: 0


In [3]:
# Root directory prepended to the data, model and result paths used in this
# notebook; the empty string resolves them relative to the working directory.
#path = 'C:/ASM/DevData/activity_quaternion/'
path=''   
# NOTE(review): pickle.load can execute arbitrary code embedded in the file,
# so only point this at a processed.pkl that you produced or otherwise trust.
with open(path+'data/processed.pkl', 'rb') as file:
    # `data` is later indexed as data[s][sess]['right'] and
    # data[s][sess]['labels'] by get_train_test_data.
    data = pickle.load(file)

In [4]:
def get_train_test_data(data):
    """Split the recordings in ``data`` into one training and one test set.

    ``data`` is indexed as ``data[s][sess]`` for ``s`` in 0..3 and ``sess`` in
    0..5 (presumably subject and session - confirm against the code that wrote
    the pickle). Every entry must provide:

    * ``'right'``  - a 2-D array with 18 columns (rows are stacked along axis 0);
    * ``'labels'`` - a 2-D array whose last column is used as the label.

    Entries with ``s > 0`` and ``sess >= 4`` form the test set; everything
    else (all of ``s == 0`` plus ``sess`` 0..3 of the others) is training data.
    Both label vectors are passed through ``myutils.change_labels`` before
    being returned.

    Returns:
        dict with keys ``'xtrain'``, ``'xtest'``, ``'ytrain'``, ``'ytest'``.
    """
    # Collect the pieces in lists and concatenate once at the end instead of
    # re-copying the growing arrays on every iteration. Each list is seeded
    # with an empty array so the result keeps the (0, 18) / integer layout
    # even when a split receives no entry at all.
    train_x, train_y = [np.empty((0, 18))], [np.empty((0,), dtype=int)]
    test_x, test_y = [np.empty((0, 18))], [np.empty((0,), dtype=int)]

    for s in range(4):
        for sess in range(6):
            entry = data[s][sess]
            held_out = s > 0 and sess >= 4
            x_parts, y_parts = (test_x, test_y) if held_out else (train_x, train_y)
            x_parts.append(entry['right'])
            y_parts.append(entry['labels'][:, -1])

    xtrain, ytrain = np.concatenate(train_x), np.concatenate(train_y)
    xtest, ytest = np.concatenate(test_x), np.concatenate(test_y)

    ytrain = myutils.change_labels(ytrain)
    ytest = myutils.change_labels(ytest)

    return {'xtrain': xtrain, 'xtest': xtest, 'ytrain': ytrain, 'ytest': ytest}

In [5]:
def get_windows(x, y, win_len, step_size, augment_vals=[], num_classes=18):
    """Cut a 2-D series into fixed-length windows with one-hot labels.

    Args:
        x: 2-D array of shape (rows, columns).
        y: 1-D sequence of integer class indices, one per row of ``x``.
        win_len: number of consecutive rows per window.
        step_size: offset in rows between the starts of successive windows.
        augment_vals: accepted so existing callers keep working; it is not
            used (and never mutated) by this function.
        num_classes: width of the one-hot label rows (default 18).

    Returns:
        (resX, resY) where ``resX`` has shape (count, win_len, columns, 1)
        and ``resY`` has shape (count, num_classes). Each window is labelled
        with the class of its last row. ``count`` is 0 when ``x`` has fewer
        than ``win_len`` rows.

    Raises:
        IndexError: if a label is not a valid index into ``num_classes``.
    """
    r, c = x.shape
    # The last admissible start is r - win_len itself (that window ends on the
    # final row), so the exclusive upper bound has to be r - win_len + 1.
    # Without the +1 the final full window is dropped whenever
    # (r - win_len) is a multiple of step_size.
    starts = np.arange(0, r - win_len + 1, step_size)
    count = len(starts)

    resX = np.zeros((count, win_len, c))
    resY = np.zeros((count, num_classes), dtype=int)

    for i, s in enumerate(starts):
        resX[i, :, :] = x[s:s + win_len, :]
        # A window takes the label of its last row.
        resY[i, y[s + win_len - 1]] = 1

    # Trailing singleton axis: the windows are consumed as 1-channel images.
    return resX.reshape((count, win_len, c, 1)), resY

In [6]:
def get_train_test_windows(data, win_len, step_size, augment_vals=[]):
    """Select feature columns for the current experiment and window both splits.

    Reads the notebook-level globals ``method`` and ``direction``:

    * (0, 0): first 6 normalised columns + last 3 raw columns
    * (0, 1): first 6 normalised columns + last 9 raw columns
    * (1, 0): first 6 normalised columns only
    * any other pair: the raw arrays are windowed unchanged
      (NOTE(review): probably unintended - confirm).

    The normalised columns come from ``myutils.znorm`` applied to the first 9
    columns of the train and test arrays (presumably a z-score fitted on the
    training data - confirm in myutils).

    Args:
        data: dict with ``'xtrain'``, ``'xtest'``, ``'ytrain'``, ``'ytest'``.
        win_len, step_size, augment_vals: forwarded to ``get_windows``.

    Returns:
        dict with the same four keys, holding the windowed arrays.
    """
    raw_train, raw_test = data['xtrain'], data['xtest']
    labels_train, labels_test = data['ytrain'], data['ytest']
    norm_train, norm_test = myutils.znorm(raw_train[:, :9], raw_test[:, :9])

    # Number of trailing raw columns appended after the 6 normalised ones.
    raw_tail_width = {(0, 0): 3, (0, 1): 9}.get((method, direction))

    if raw_tail_width is not None:
        feats_train = np.concatenate(
            (norm_train[:, :6], raw_train[:, -raw_tail_width:]), axis=1)
        feats_test = np.concatenate(
            (norm_test[:, :6], raw_test[:, -raw_tail_width:]), axis=1)
    elif (method, direction) == (1, 0):
        feats_train, feats_test = norm_train[:, :6], norm_test[:, :6]
    else:
        # No matching configuration: keep the inputs exactly as received.
        feats_train, feats_test = raw_train, raw_test

    print('label max: ', np.max(labels_train), np.max(labels_test))
    win_x_train, win_y_train = get_windows(
        feats_train, labels_train, win_len, step_size, augment_vals)
    win_x_test, win_y_test = get_windows(
        feats_test, labels_test, win_len, step_size, augment_vals)
    print('label max: ', np.max(win_y_train), np.max(win_y_test))

    return {'xtrain': win_x_train, 'xtest': win_x_test,
            'ytrain': win_y_train, 'ytest': win_y_test}

In [7]:
def train_model(X_train, Y_train, batch_size, epochs):
    """Build, compile and fit the convolutional classifier.

    Architecture: four Conv2D layers (64 filters, 5x1 kernel, 'valid' padding,
    stride 1, ReLU), then Flatten, two Dense(100, ReLU) layers and a softmax
    output with one unit per column of ``Y_train``. The 5x1 kernel spans five
    positions along the first input axis and a single position along the
    second, so input columns are not mixed before the Flatten layer.

    The file-level imports MaxPooling2D, Permute, Reshape and LSTM are not
    used by this function.

    Args:
        X_train: 4-D array; axes 1..3 define the network input shape.
        Y_train: 2-D one-hot label array; its width sets the class count.
        batch_size: mini-batch size passed to ``fit``.
        epochs: number of epochs passed to ``fit``.

    Returns:
        The fitted Keras ``Sequential`` model.
    """
    NUM_FILTERS = 64
    FILTER_SHAPE = (5, 1)
    NUM_CONV_LAYERS = 4
    NUM_CLASSES = Y_train.shape[1]

    input_shape = (X_train.shape[1], X_train.shape[2], X_train.shape[3])
    print(input_shape)

    # Settings shared by every convolutional layer.
    conv_kwargs = dict(kernel_size=FILTER_SHAPE, padding='valid',
                       strides=(1, 1), activation='relu')

    model = Sequential()
    # Only the first layer needs to be told the input shape.
    model.add(Conv2D(NUM_FILTERS, input_shape=input_shape, **conv_kwargs))
    for _ in range(NUM_CONV_LAYERS - 1):
        model.add(Conv2D(NUM_FILTERS, **conv_kwargs))

    model.add(Flatten())
    model.add(Dense(100, activation='relu'))
    model.add(Dense(100, activation='relu'))
    model.add(Dense(NUM_CLASSES, activation='softmax'))

    # summary() prints the table itself and returns None, so wrapping it in
    # print() would only add a stray "None" line to the output.
    model.summary()
    model.compile(loss='categorical_crossentropy',
                  optimizer='rmsprop',
                  metrics=['accuracy'])

    model.fit(X_train, Y_train,
              batch_size=batch_size,
              epochs=epochs, verbose=1)

    return model
    

In [8]:
def test_model(model, X_test, Y_test):
    count = Y_test.shape[0]
    
    Y_test = np.argmax(Y_test, axis=1).reshape((count,1))    
    Y_prob = model.predict(X_test, verbose=0)
    Y_pred = np.argmax(Y_prob, axis=1).reshape((count,1))    
    Y = np.concatenate((Y_test, Y_pred, Y_prob), axis=1)

    return Y

In [None]:
# Run configuration: window length and stride (in rows) plus the training
# hyper-parameters.
win_len = 30
step_size = 15
batch_size = 64
epochs = 100
# Tag used in output file names, e.g. "000" for method=0, direction=0, augment=0.
model_str = str(method) + str(direction) + str(augment)

augment_vals = []
if augment == 1:
    augment_vals = [.8, .9, 1.1, 1.2]

print('\n##################################################')
print('Window len: {}, Method: {}, Direction: {}, Augment: {}, Augment_Vals:{}'.format(win_len, method, direction, augment, augment_vals))
print('##################################################\n')

# 1) Split the raw rows into train / test.
d = get_train_test_data(data)
xtrain, xtest, ytrain, ytest = d['xtrain'], d['xtest'], d['ytrain'], d['ytest']
print('label max: ', np.max(ytrain), np.max(ytest))
#print(np.unique(ytrain), np.unique(ytest))
print('data shapes: ', xtrain.shape, ytrain.shape, xtest.shape, ytest.shape)
print('')

# 2) Select features and cut both splits into windows. From here on
#    ytrain / ytest are one-hot, so 'label sums' equals the window counts.
w = get_train_test_windows(d, win_len, step_size, augment_vals)
xtrain, xtest, ytrain, ytest = w['xtrain'], w['xtest'], w['ytrain'], w['ytest']
print('\nlabel max: ', np.max(ytrain), np.max(ytest))
print('window data shapes: ', xtrain.shape, ytrain.shape, xtest.shape, ytest.shape)
print('label sums: ', np.sum(ytrain), np.sum(ytest))

# 3) Train and persist the model.
model = train_model(xtrain, ytrain, batch_size, epochs)
model.save(path + "models/model_{}_{}.h5".format(model_str, win_len))

# 4) Evaluate. Y columns: true class, predicted class, per-class scores.
Y = test_model(model, xtest, ytest)
# Write the evaluation table itself to the results CSV. The previous code
# called model.save() on this path, which stored the model a second time and
# never saved the results.
np.savetxt(path + "results/result_{}_{}.csv".format(model_str, win_len), Y, delimiter=',')

test_true = Y[:, 0]
test_pred = Y[:, 1]
print("\tTest fscore(weighted):\t{} ".format(metrics.f1_score(test_true, test_pred, average='weighted')))
print("\tTest fscore:\t{} ".format(metrics.f1_score(test_true, test_pred, average=None)))
