In [33]:
import os
import numpy as np
import csv
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as nps
import matplotlib.pyplot as plt

In [34]:
# helper functions

def series_append(series, list, keys):
    for i in range(64):
        series[keys[i]].append(float(list[i]))
    return series

def load_series(filename):
    with open(filename, 'r') as csv_in:
        csv_file = list(csv.reader(csv_in))
        series = {}
        keys = csv_file[0]
        for key in keys: series[key] = []
        for i in range(2, len(csv_file), 2):
            series = series_append(series, csv_file[i], keys)
        return [series, int((len(csv_file) - 2) / 2)]


def make_2D(series):
    l = []
    for key in series:
        if (key[-1] == 'v' or key[-1] == 'z'): continue
        key_list = []
        for e in series[key]:
            key_list.append([e]) # try brackets here
        l.append(key_list)
    return l

def E3(pred, truth):
    num_correct = 0
    for i in range(len(pred)):
        if (abs(pred[i] - truth[i]) <= (7.0/18.0)):
            num_correct += 1
    return (100.0 * float(num_correct) / float(len(pred)))

def E2(pred, truth):
    num_correct = 0
    for i in range(len(pred)):
        if (abs(pred[i] - truth[i]) <= (5.0/18.0)):
            num_correct += 1
    return (100.0 * float(num_correct) / float(len(pred)))

def E1(pred, truth):
    num_correct = 0
    for i in range(len(pred)):
        if (abs(pred[i] - truth[i]) <= (3.0/18.0)):
            num_correct += 1
    return (100.0 * float(num_correct) / float(len(pred)))

def E0(pred, truth):
    num_correct = 0
    for i in range(len(pred)):
        if (abs(pred[i] - truth[i]) <= (1.0/18.0)):
            num_correct += 1
    return (100.0 * float(num_correct) / float(len(pred)))

def eval(pred, truth):
    o_pred = []
    o_truth = []
    f_pred = []
    f_truth = []
    b_pred = []
    b_truth = []
    s_pred = []
    s_truth = []
    l_pred = []
    l_truth = []
    for p in pred:
        o_pred.append(p[0])
        f_pred.append(p[1])
        b_pred.append(p[2])
        s_pred.append(p[3])
        l_pred.append(p[4])
    for t in truth:
        o_truth.append(t[0])
        f_truth.append(t[1])
        b_truth.append(t[2])
        s_truth.append(t[3])
        l_truth.append(t[4])
    return [[E0(o_pred, o_truth), E1(o_pred, o_truth), E2(o_pred, o_truth), E3(o_pred, o_truth)],
            [E0(f_pred, f_truth), E1(f_pred, f_truth), E2(f_pred, f_truth), E3(f_pred, f_truth)],
            [E0(b_pred, b_truth), E1(b_pred, b_truth), E2(b_pred, b_truth), E3(b_pred, b_truth)],
            [E0(s_pred, s_truth), E1(s_pred, s_truth), E2(s_pred, s_truth), E3(s_pred, s_truth)],
            [E0(l_pred, l_truth), E1(l_pred, l_truth), E2(l_pred, l_truth), E3(l_pred, l_truth)]]

In [35]:
# loading data

raw_X_train_list = []
smoothed_X_train_list = []
trans_X_train_list = []
final_X_train_list = []

y_train_list = []

raw_X_test_list = []
smoothed_X_test_list = []
trans_X_test_list = []
final_X_test_list = []

y_test_list = []

with open("..\\test_examples.txt") as t:
    test_examples = t.readlines()

for example in test_examples:
    raw_series, num_frames = load_series("..\\time_series\\Time_normalized_stages\\1_unprocessed\\" + example[:-1])
    smoothed_series, num_frames = load_series("..\\time_series\\Time_normalized_stages\\2_smoothed\\" + example[:-1])
    trans_series, num_frames = load_series("..\\time_series\\Time_normalized_stages\\3_translation\\" + example[:-1])
    final_series, num_frames = load_series("..\\time_series\\Time_normalized_stages\\4_final\\" + example[:-1])

    raw_X_test_list.append(make_2D(raw_series))
    smoothed_X_test_list.append(make_2D(smoothed_series))
    trans_X_test_list.append(make_2D(trans_series))
    final_X_test_list.append(make_2D(final_series))
    
    y_test_list.append([float(example[4]) / 9, float(example[7]) / 9, float(example[10]) / 9, float(example[13]) / 9, float(example[16]) / 9])

raw_X_test = torch.tensor(raw_X_test_list)
smoothed_X_test = torch.tensor(smoothed_X_test_list)
trans_X_test = torch.tensor(trans_X_test_list)
final_X_test = torch.tensor(final_X_test_list)

y_test = torch.tensor(y_test_list)

print(raw_X_test.size())
print(smoothed_X_test.size())
print(trans_X_test.size())
print(final_X_test.size())
print(y_test.size())
print('\n\n')

with open("..\\training_examples.txt") as t:
    training_examples = t.readlines()

for example in training_examples:
    raw_series, num_frames = load_series("..\\time_series\\Time_normalized_stages\\1_unprocessed\\" + example[:-1])
    smoothed_series, num_frames = load_series("..\\time_series\\Time_normalized_stages\\2_smoothed\\" + example[:-1])
    trans_series, num_frames = load_series("..\\time_series\\Time_normalized_stages\\3_translation\\" + example[:-1])
    final_series, num_frames = load_series("..\\time_series\\Time_normalized_stages\\4_final\\" + example[:-1])

    raw_X_train_list.append(make_2D(raw_series))
    smoothed_X_train_list.append(make_2D(smoothed_series))
    trans_X_train_list.append(make_2D(trans_series))
    final_X_train_list.append(make_2D(final_series))
    
    y_train_list.append([float(example[4]) / 9, float(example[7]) / 9, float(example[10]) / 9, float(example[13]) / 9, float(example[16]) / 9])

raw_X_train = torch.tensor(raw_X_train_list)
smoothed_X_train = torch.tensor(smoothed_X_train_list)
trans_X_train = torch.tensor(trans_X_train_list)
final_X_train = torch.tensor(final_X_train_list)

y_train = torch.tensor(y_train_list)


raw_X_train = raw_X_train.permute(0, 1, 3, 2)
raw_X_test = raw_X_test.permute(0, 1, 3, 2)

smoothed_X_train = smoothed_X_train.permute(0, 1, 3, 2)
smoothed_X_test = smoothed_X_test.permute(0, 1, 3, 2)

trans_X_train = trans_X_train.permute(0, 1, 3, 2)
trans_X_test = trans_X_test.permute(0, 1, 3, 2)

final_X_train = final_X_train.permute(0, 1, 3, 2)
final_X_test = final_X_test.permute(0, 1, 3, 2)


print(raw_X_train.size())
print(smoothed_X_train.size())
print(trans_X_train.size())
print(final_X_train.size())
print(y_train.size())

torch.Size([93, 32, 7, 1])
torch.Size([93, 32, 7, 1])
torch.Size([93, 32, 7, 1])
torch.Size([93, 32, 7, 1])
torch.Size([93, 5])



torch.Size([371, 32, 1, 7])
torch.Size([371, 32, 1, 7])
torch.Size([371, 32, 1, 7])
torch.Size([371, 32, 1, 7])
torch.Size([371, 5])


In [36]:
# setting up CNN

conv_1 = 32
conv_2 = 64
conv_3 = 128
conv_4 = 256

fc_size1 = 256
fc_size2 = 512
fc_size3 = 256
fc_size4 = 128
fc_size5 = 64
output_size = 5
batch_size = 371
learning_rate = 0.001
tmp = 0.1
    

class ConvNet(nn.Module):

    def _init_weights(self, m):
        if isinstance(m, nn.Linear):
            torch.nn.init.xavier_normal_(m.weight)
            if m.bias is not None:
                m.bias.data.fill_(1.0)

    def __init__(self):
        super(ConvNet, self).__init__()
        self.apply(self._init_weights)
        # conv layers
        self.conv1 = nn.Conv2d(conv_1, conv_2, (1,3), stride=1)
        self.conv2 = nn.Conv2d(conv_2, conv_3, (1,3), stride=1)
        self.conv3 = nn.Conv2d(conv_3, conv_4, (1,3), stride=1)
        # fc layers
        self.fc1 = nn.Linear(fc_size1, fc_size2)
        self.fc2 = nn.Linear(fc_size2, fc_size3)
        self.fc3 = nn.Linear(fc_size3, fc_size4)
        self.fc4 = nn.Linear(fc_size4, fc_size5)
        self.fc5 = nn.Linear(fc_size5, output_size)
        # sigmoid
        self.sigmoid = nn.Sigmoid()
        self.temperature = tmp
    
    def forward(self, x):
        # convolutions
        x = F.leaky_relu(self.conv1(x), negative_slope=0.1)
        x = F.leaky_relu(self.conv2(x), negative_slope=0.1)
        x = F.leaky_relu(self.conv3(x), negative_slope=0.1)

        # fc layers
        x = torch.flatten(x, 1)

        x = F.leaky_relu(self.fc1(x), negative_slope=0.1)
        x = F.leaky_relu(self.fc2(x), negative_slope=0.1)
        x = F.leaky_relu(self.fc3(x), negative_slope=0.1)
        x = F.leaky_relu(self.fc4(x), negative_slope=0.1)
        x = F.leaky_relu(self.fc5(x), negative_slope=0.1)

        x = self.sigmoid(x * self.temperature)
        return x

model = ConvNet()

loss_function = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

training_cycles = 20
num_epochs = 85

In [37]:
# training and evaluating raw series

min_loss = 100.0

errors = []

for i in range(training_cycles):
    model.apply(model._init_weights)
    test_losses = []
    for epoch in range(num_epochs):
        pred_y = model(raw_X_train)
        loss = loss_function(pred_y, y_train)

        test_y = model(raw_X_test)
        test_loss = loss_function(test_y, y_test)
        test_losses.append(test_loss.item())

        model.zero_grad()
        loss.backward()

        optimizer.step()

    if (test_losses [-1] < min_loss):
        min_loss = test_losses[-1]
        test_y = model(raw_X_test)

        errors.clear()
        errors = eval(test_y, y_test.tolist())

print('MSE: ' + str(min_loss))
print('ERRORS: ' + str(errors))

MSE: 0.05123830586671829
ERRORS: [[20.43010752688172, 67.74193548387096, 82.79569892473118, 90.3225806451613], [11.827956989247312, 68.81720430107526, 79.56989247311827, 95.6989247311828], [7.526881720430108, 24.731182795698924, 66.66666666666667, 88.17204301075269], [19.35483870967742, 54.83870967741935, 75.26881720430107, 90.3225806451613], [13.978494623655914, 65.59139784946237, 80.64516129032258, 91.39784946236558]]


In [38]:
# training and evaluating smoothed series

min_loss = 100.0

errors = []

for i in range(training_cycles):
    model.apply(model._init_weights)
    test_losses = []
    for epoch in range(num_epochs):
        pred_y = model(smoothed_X_train)
        loss = loss_function(pred_y, y_train)

        test_y = model(smoothed_X_test)
        test_loss = loss_function(test_y, y_test)
        test_losses.append(test_loss.item())

        model.zero_grad()
        loss.backward()

        optimizer.step()

    if (test_losses [-1] < min_loss):
        min_loss = test_losses[-1]
        test_y = model(smoothed_X_test)

        errors.clear()
        errors = eval(test_y, y_test.tolist())

print('MSE: ' + str(min_loss))
print('ERRORS: ' + str(errors))

MSE: 0.05107159540057182
ERRORS: [[21.50537634408602, 67.74193548387096, 82.79569892473118, 90.3225806451613], [9.67741935483871, 69.89247311827957, 80.64516129032258, 95.6989247311828], [5.376344086021505, 23.655913978494624, 66.66666666666667, 88.17204301075269], [19.35483870967742, 47.31182795698925, 75.26881720430107, 88.17204301075269], [16.129032258064516, 69.89247311827957, 81.72043010752688, 93.54838709677419]]


In [39]:
# training and evaluating trans series

min_loss = 100.0

errors = []

for i in range(training_cycles):
    model.apply(model._init_weights)
    test_losses = []
    for epoch in range(num_epochs):
        pred_y = model(trans_X_train)
        loss = loss_function(pred_y, y_train)

        test_y = model(trans_X_test)
        test_loss = loss_function(test_y, y_test)
        test_losses.append(test_loss.item())

        model.zero_grad()
        loss.backward()

        optimizer.step()

    if (test_losses [-1] < min_loss):
        min_loss = test_losses[-1]
        test_y = model(trans_X_test)

        errors.clear()
        errors = eval(test_y, y_test.tolist())

print('MSE: ' + str(min_loss))
print('ERRORS: ' + str(errors))

MSE: 0.04169731214642525
ERRORS: [[18.27956989247312, 59.13978494623656, 82.79569892473118, 91.39784946236558], [31.182795698924732, 68.81720430107526, 83.87096774193549, 96.7741935483871], [12.903225806451612, 45.16129032258065, 72.04301075268818, 91.39784946236558], [25.806451612903224, 64.51612903225806, 88.17204301075269, 95.6989247311828], [34.40860215053763, 69.89247311827957, 83.87096774193549, 95.6989247311828]]


In [40]:
# training and evaluating final series

min_loss = 100.0

errors = []

for i in range(training_cycles):
    model.apply(model._init_weights)
    test_losses = []
    for epoch in range(num_epochs):
        pred_y = model(final_X_train)
        loss = loss_function(pred_y, y_train)

        test_y = model(final_X_test)
        test_loss = loss_function(test_y, y_test)
        test_losses.append(test_loss.item())

        model.zero_grad()
        loss.backward()

        optimizer.step()

    if (test_losses [-1] < min_loss):
        min_loss = test_losses[-1]
        test_y = model(final_X_test)

        errors.clear()
        errors = eval(test_y, y_test.tolist())

print('MSE: ' + str(min_loss))
print('ERRORS: ' + str(errors))

MSE: 0.033109817653894424
ERRORS: [[11.827956989247312, 60.215053763440864, 84.94623655913979, 92.47311827956989], [43.01075268817204, 75.26881720430107, 88.17204301075269, 96.7741935483871], [30.107526881720432, 72.04301075268818, 83.87096774193549, 95.6989247311828], [22.580645161290324, 68.81720430107526, 93.54838709677419, 97.84946236559139], [32.25806451612903, 74.19354838709677, 87.09677419354838, 94.6236559139785]]
