In [1]:
import model
import numpy as np
import os
import time
from multiprocessing import Process
import random

In [2]:
def prepare_training_set(type, source):
    folder = f'{source}/{type}'
    samples = os.listdir(folder)

    train_sequences = []

    for sample in samples:

        sequence = []
    
        f = open(os.path.join(folder,sample))
        text = f.read()

        text = text.split('\n')

        for point in text:
            if len(point) < 2:
                continue

            point = point.replace('(',"")
            point = point.replace(')',"")

            point = point.split(',')
            point[0] = float(point[0])
            point[1] = float(point[1])
            point[2] = float(point[2])

            sequence.append(point)

        train_sequences.append(sequence)

    return train_sequences

def prepare_test_set(type, source):
    # test data
    folder = f'{source}/{type}'
    samples = os.listdir(folder)

    test_sequences = []

    for sample in samples:

        sequence = []
    
        f = open(os.path.join(folder,sample))
        text = f.read()

        text = text.split('\n')

        for point in text:
            if len(point) < 2:
                continue

            point = point.replace('(',"")
            point = point.replace(')',"")

            point = point.split(',')
            point[0] = float(point[0])
            point[1] = float(point[1])
            point[2] = float(point[2])

            sequence.append(point)

        test_sequences.append([sequence, type])
        
    return test_sequences

In [3]:
def model_info(model):
    info = model.model_info()
    print(f"Model : {info[0]}")
    print(f"Hidden states : {info[1]}")
    print(f"PI : {info[2]}")
    print(f"A : {info[3]}")
    print(f"Mean : {info[4]}")
    print(f"Variance : {info[5]}")

In [4]:
def train(model, set):
    model.train(set) 

In [5]:
def test(models, sequence, true_label):
    best = -np.inf
    label = ''

    print(10*"*")
    for model in models:
        score = model.classify(sequence)
        print(f"Model : {model.get_label()} | score = {score} | for class : {true_label}")
        if score > best:
            best = score
            label = model.get_label()
    print(10*"*")

    return true_label == label

In [6]:
# Initialize models + number of hidden states
hidden_states = 3

# circle
HMM_circle = model.HMM(hidden_states, 'circle')

# diagonal left
HMM_diagonal_left = model.HMM(hidden_states, 'diagonal_left')

# diagonal right
HMM_diagonal_right = model.HMM(hidden_states, 'diagonal_right')

# Horizontal
HMM_horizontal = model.HMM(hidden_states, 'horizontal')

# Vertical
HMM_vertical = model.HMM(hidden_states, 'vertical')

model_set = [HMM_circle, HMM_diagonal_left, HMM_diagonal_right, HMM_horizontal, HMM_vertical]

In [7]:
source_training = 'clean_data'
source_test = 'clean_data2'

circle_training_set = prepare_training_set('circle', source_training)
circle_test_set = prepare_test_set('circle', source_test)

diagonal_left_training_set = prepare_training_set('diagonal_left', source_training)
diagonal_left_test_set = prepare_test_set('diagonal_left', source_test)

diagonal_right_training_set = prepare_training_set('diagonal_right', source_training)
diagonal_right_test_set = prepare_test_set('diagonal_right', source_test)

horizontal_training_set = prepare_training_set('horizontal', source_training)
horizontal_test_set = prepare_test_set('horizontal', source_test)

vertical_training_set = prepare_training_set('vertical', source_training)
vertical_test_set = prepare_test_set('vertical', source_test)


test_set = circle_test_set+diagonal_left_test_set+diagonal_right_test_set+horizontal_test_set+vertical_test_set
random.shuffle(test_set)

processes = [
    Process(target=train, args=(HMM_circle, circle_training_set)),
    Process(target=train, args=(HMM_diagonal_left, diagonal_left_training_set)),
    Process(target=train, args=(HMM_diagonal_right, diagonal_right_training_set)),
    Process(target=train, args=(HMM_horizontal, horizontal_training_set)),
    Process(target=train, args=(HMM_vertical, vertical_training_set)),
]

In [8]:
# train models

start = time.time()

for p in processes:
    p.start()

for p in processes:
    p.join()

end = time.time()

  terms += self.A[i][state] * self.B(sequence[t+1], state) * beta[t+1][state]
  terms += self.A[i][state] * self.B(sequence[t+1], state) * beta[t+1][state]
  log_gamma -= np.max(log_gamma, axis=1, keepdims=True)
  terms += self.A[i][state] * self.B(sequence[t+1], state) * beta[t+1][state]
  log_gamma -= np.max(log_gamma, axis=1, keepdims=True)
  terms += self.A[i][state] * self.B(sequence[t+1], state) * beta[t+1][state]
  terms += self.A[i][state] * self.B(sequence[t+1], state) * beta[t+1][state]
  xi_ij = np.exp(log_xi)
  xi_ij = np.exp(log_xi)
  log_gamma -= np.max(log_gamma, axis=1, keepdims=True)
  log_gamma -= np.max(log_gamma, axis=1, keepdims=True)
  log_gamma -= np.max(log_gamma, axis=1, keepdims=True)
  xi_ij = np.exp(log_xi)
  xi_ij = np.exp(log_xi)
  xi_ij = np.exp(log_xi)
  terms += self.A[i][state] * self.B(sequence[t+1], state) * beta[t+1][state]
  terms += self.A[i][state] * self.B(sequence[t+1], state) * beta[t+1][state]
  terms += self.A[i][state] * self.B(sequence[t+1

In [9]:
# test models

correct = 0
for sequence in test_set:
    
    if test(model_set, np.array(sequence[0]), sequence[1]):
        correct += 1

accuracy = correct/len(test_set)*100
print("Accuracy : ",accuracy,"%")
print("Training time: ",(end-start),"s")

**********
Model : circle | score = 1.4319456220014484 | for class : horizontal
Model : diagonal_left | score = 1.43194562200147 | for class : horizontal
Model : diagonal_right | score = 1.4319456220014564 | for class : horizontal
Model : horizontal | score = 1.431945622001492 | for class : horizontal
Model : vertical | score = 1.4319456220014632 | for class : horizontal
**********
**********
Model : circle | score = 1.4319456220014484 | for class : vertical
Model : diagonal_left | score = 1.431945622001471 | for class : vertical
Model : diagonal_right | score = 1.4319456220014573 | for class : vertical
Model : horizontal | score = 1.4319456220014941 | for class : vertical
Model : vertical | score = 1.4319456220014641 | for class : vertical
**********
**********
Model : circle | score = 1.4319456220014524 | for class : circle
Model : diagonal_left | score = 1.4319456220014906 | for class : circle
Model : diagonal_right | score = 1.4319456220014668 | for class : circle
Model : horizonta

In [10]:
'''
for model in model_set:
    model_info(model)
'''

'\nfor model in model_set:\n    model_info(model)\n'