In [1]:
# Load packages
from natsort import os_sorted
import tensorflow.keras
import pandas as pd
import numpy as np
import math
import glob
import os

In [7]:
# Compile the data
# Lie = 0 and Truth = 1

CSV_len = 9600
subjects = pd.read_csv('Subjects.csv')

def preprocess(folder):
    
    initial = pd.concat(map(pd.read_csv, os_sorted(glob.glob(os.path.join(folder, '*.csv')))), ignore_index = True)
    initial['Truth'] = 0

    for index, row in subjects.iterrows():
        if row['Truth'] == 1:
            start = index * CSV_len
            initial.loc[start : start + (CSV_len - 1), 'Truth'] = 1
    
    return initial

ASR  = preprocess('LieWaves/Preprocessing/ASR')
ATAR = preprocess('LieWaves/Preprocessing/ATAR')
BPF  = preprocess('LieWaves/Preprocessing/BPF')
ICA  = preprocess('LieWaves/Preprocessing/ICA')

print(ASR)
print(ATAR)
print(BPF)
print(ICA)

         EEG.AF3    EEG.T7    EEG.Pz    EEG.T8    EEG.AF4  Truth
0      -43.19554   4.66139  -0.64575  23.82558  -62.21445      1
1      -63.68361   1.01177  -4.66672   4.57042  -86.64381      1
2      -84.83486 -12.12162 -19.39024 -24.10019 -109.01842      1
3      -84.44360 -14.67686 -31.12580 -31.14285 -107.22171      1
4      -80.87276  -6.00513 -30.95662 -33.21887 -104.48116      1
...          ...       ...       ...       ...        ...    ...
518395  -5.43493  -8.02602  -4.94244   3.13627    1.80464      0
518396   1.46504   4.63407  10.93335  20.79820    9.05531      0
518397   0.84562  12.25325  16.73756  29.07788   13.40324      0
518398   6.34726   8.18967  10.69846  20.65893   10.84177      0
518399   3.17551  -5.29942  -0.86906   9.15166    1.90670      0

[518400 rows x 6 columns]
         EEG.AF3   EEG.T7    EEG.Pz    EEG.T8   EEG.AF4  Truth
0      -23.06646 -3.41870  -4.71324  -3.49983 -31.21507      1
1      -63.18020 -3.45809 -10.15932  -4.01336 -84.79472      1
2   

In [13]:
# Partition the data into 80% training data and 20% testing data

all_data = pd.concat([ASR, ATAR, BPF, ICA], ignore_index = True)
all_data = np.array_split(all_data, 54 * 4)

num_rows = len(all_data)
first80p = math.floor(0.8 * num_rows)
last20p = num_rows - first80p

x_train = [df.drop(['Truth'], axis = 1) for df in all_data[:first80p]]
y_train = [df['Truth'] for df in all_data[:first80p]]

x_test = [df.drop(['Truth'], axis = 1) for df in all_data[-last20p:]]
y_test = [df['Truth'] for df in all_data[-last20p:]]

x_train = np.array(x_train)
y_train = np.array(y_train)
x_test = np.array(x_test)
y_test = np.array(y_test)

#x_train = np.reshape(x_train, (x_train.shape[0], 1, x_train.shape[1]))
#x_test = np.reshape(x_test, (x_test.shape[0], 1, x_test.shape[1]))

print('Shape of x_train:\t' + str(x_train.shape))
print('Shape of y_train:\t' + str(y_train.shape))
print('Shape of x_test:\t' + str(x_test.shape))
print('Shape of y_test:\t' + str(y_test.shape))
print('Number of classes:\t' + str(np.max(y_train) - np.min(y_train) + 1))

  return bound(*args, **kwds)


Shape of x_train:	(172, 9600, 5)
Shape of y_train:	(172, 9600)
Shape of x_test:	(44, 9600, 5)
Shape of y_test:	(44, 9600)
Number of classes:	2


In [14]:
# One-Hot Encoding
# Transforms a scalar label to a k-dimensional vector
# Lie   = 0 = [ 1 , 0 ]
# Truth = 1 = [ 0 , 1 ]

def to_one_hot(y, num_class = 2):
    
    results = np.zeros((len(y), num_class))
    for i, label in enumerate(y): results[i, label] = 1.
    return results

y_train_vec = to_one_hot(y_train)
y_test_vec = to_one_hot(y_test)

print('Shape of y_train_vec:\t' + str(y_train_vec.shape))
print('Shape of y_test_vec:\t' + str(y_test_vec.shape))

Shape of y_train_vec:	(172, 2)
Shape of y_test_vec:	(44, 2)


In [15]:
# Randomly parition the training set into validation and non-validation sets

train_rows = len(y_train_vec)
train_80p = math.floor(0.8 * train_rows)

rand_indices = np.random.permutation(train_rows)
train_indices = rand_indices[0: train_80p]
valid_indices = rand_indices[train_80p: train_rows]

x_trn = x_train[train_indices, :]
y_trn = y_train_vec[train_indices, :]

x_val = x_train[valid_indices, :]
y_val = y_train_vec[valid_indices, :]

print('Shape of x_trn:\t\t' + str(x_trn.shape))
print('Shape of y_trn:\t\t' + str(y_trn.shape))
print('Shape of x_val:\t\t' + str(x_val.shape))
print('Shape of y_val:\t\t' + str(y_val.shape))

Shape of x_trn:		(137, 9600, 5)
Shape of y_trn:		(137, 2)
Shape of x_val:		(35, 9600, 5)
Shape of y_val:		(35, 2)


In [16]:
# Build the model

from tensorflow.keras import models
from tensorflow.keras import layers
from tensorflow.keras import optimizers

model = models.Sequential()

# Convolutional layers
model.add(layers.Conv1D(256, 1, activation = 'relu', input_shape = (9600, 5)))
model.add(layers.BatchNormalization())
model.add(layers.MaxPooling1D(pool_size = 2, strides = 1, padding = 'same'))
model.add(layers.Dropout(0.25))
model.add(layers.Conv1D(128, 1, activation = 'relu'))
model.add(layers.BatchNormalization())
model.add(layers.MaxPooling1D(pool_size = 2, strides = 1, padding = 'same'))
model.add(layers.Dropout(0.25))
model.add(layers.Conv1D(64, 1, activation = 'relu'))
model.add(layers.BatchNormalization())
model.add(layers.MaxPooling1D(pool_size = 2, strides = 1, padding = 'same'))
model.add(layers.Dropout(0.25))

# Fully-connected layers
model.add(layers.Flatten())
model.add(layers.Dense(256, activation = 'relu'))
model.add(layers.Dense(128, activation = 'relu'))
model.add(layers.Dense(64, activation = 'relu'))
model.add(layers.Dense(2, activation = 'softmax'))

model.summary()

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv1d (Conv1D)             (None, 9600, 256)         1536      
                                                                 
 batch_normalization (BatchN  (None, 9600, 256)        1024      
 ormalization)                                                   
                                                                 
 max_pooling1d (MaxPooling1D  (None, 9600, 256)        0         
 )                                                               
                                                                 
 dropout (Dropout)           (None, 9600, 256)         0         
                                                                 
 conv1d_1 (Conv1D)           (None, 9600, 128)         32896     
                                                                 
 batch_normalization_1 (Batc  (None, 9600, 128)        5

In [17]:
# Define model optimizer and loss function

model.compile(
    optimizers.Adam(learning_rate = 0.001),
    loss = 'categorical_crossentropy',
    metrics = ['accuracy']
)

In [18]:
# Train the model and store parameters and loss values

history = model.fit(x_trn, y_trn, batch_size = 3, epochs = 25, validation_data = (x_val, y_val))
model.save('seqAllOne.keras')

Epoch 1/25
Epoch 2/25
Epoch 3/25
Epoch 4/25
Epoch 5/25
Epoch 6/25
Epoch 7/25
Epoch 8/25
Epoch 9/25
Epoch 10/25
Epoch 11/25
Epoch 12/25
Epoch 13/25
Epoch 14/25
Epoch 15/25
Epoch 16/25
Epoch 17/25
Epoch 18/25
Epoch 19/25
Epoch 20/25
Epoch 21/25
Epoch 22/25
Epoch 23/25
Epoch 24/25
Epoch 25/25


In [38]:
#from tensorflow.keras import models

#model = models.load_model('seq18.h5')
#model.save('seq18.keras')
#model = models.load_model('seq18.keras')

# Evaluate testing accuracy on the testing dataset 

loss_and_acc = model.evaluate(x_test, y_test_vec)



In [40]:
# Test single data points
# Lie   = 0 = [ 1 , 0 ]
# Truth = 1 = [ 0 , 1 ]

test_lie = pd.read_csv(os.path.join('LieWaves/Preprocessing/ICA/S24S2.csv'))
test_lie = np.reshape(np.array(test_lie), (1, 9600, 5))
pred_lie = model.predict(test_lie)
print("Lie Test:", pred_lie[0])

test_truth = pd.read_csv(os.path.join('LieWaves/Preprocessing/BPF/S27S1.csv'))
test_truth = np.reshape(np.array(test_truth), (1, 9600, 5))
pred_truth = model.predict(test_truth)
print("Truth Test:", pred_truth[0])

Lie Test: [1. 0.]
Truth Test: [0. 1.]


In [42]:
# Unit Testing

import unittest 

class TestModel(unittest.TestCase):
    
    def test_lie_lie(self):
        self.assertGreaterEqual(pred_lie[0][0], 0)
        self.assertLessEqual(pred_lie[0][0], 1)
        
    def test_lie_truth(self):
        self.assertGreaterEqual(pred_lie[0][1], 0)
        self.assertLessEqual(pred_lie[0][1], 1)
        
    def test_truth_lie(self):
        self.assertGreaterEqual(pred_truth[0][0], 0)
        self.assertLessEqual(pred_truth[0][0], 1)
        
    def test_truth_truth(self):
        self.assertGreaterEqual(pred_truth[0][1], 0)
        self.assertLessEqual(pred_truth[0][1], 1)

unittest.main(argv = [''], verbosity = 2, exit = False)

test_lie_lie (__main__.TestModel) ... ok
test_lie_truth (__main__.TestModel) ... ok
test_truth_lie (__main__.TestModel) ... ok
test_truth_truth (__main__.TestModel) ... ok

----------------------------------------------------------------------
Ran 4 tests in 0.016s

OK


<unittest.main.TestProgram at 0x1883e0a3220>