In [1]:
import os
import numpy as np
from sklearn.cluster import KMeans
import matplotlib.pyplot as plt
from hmm import HMM
import re
import pickle

In [2]:
# define useful functions
def extract_prefix(text, pattern):
    """Return the label that ``pattern`` captures at the start of ``text``.

    ``pattern`` must define two capture groups. The first group wins; the
    second is used when the first is unmatched or captured an empty string.

    Args:
        text: String to inspect (the notebook passes data file names).
        pattern: Regular expression applied with ``re.match``, so it is
            tried only at the beginning of ``text``.

    Returns:
        The captured prefix, or ``None`` when ``pattern`` does not match.
    """
    found = re.match(pattern, text)
    if found is None:
        return None

    # Fall back to the second group whenever the first one is empty/unmatched.
    return found.group(1) or found.group(2)


def quantization_model(data, n_clusters):
    """Fit one K-means quantizer on all recordings of all motions.

    Args:
        data: Mapping whose values are iterables of 2-D arrays. Column 0 of
            every array is treated as the time column and dropped.
        n_clusters: Number of K-means clusters.

    Returns:
        The fitted ``KMeans`` estimator.
    """
    # Flatten {motion: [recording, ...]} into a single list of recordings.
    recordings = [rec for group in data.values() for rec in group]

    # Stack all rows, then drop the leading time column.
    features = np.concatenate(recordings, axis=0)[:, 1:]

    quantizer = KMeans(n_clusters=n_clusters, random_state=0, n_init="auto")
    return quantizer.fit(features)

def train_model(data, key, n_clusters, n_hidden_states, q_model):
    """Train an HMM for one motion, save it, and save two diagnostic plots.

    All recordings stored under ``data[key]`` are concatenated, the time
    column is dropped, and the rows are quantized with ``q_model`` before
    being passed to ``HMM.fit``.

    Args:
        data: Mapping from motion label to a list of 2-D arrays whose first
            column is time.
        key: Motion label to train on; also used in the output file names.
        n_clusters: Passed to ``HMM`` as ``n_obs``.
        n_hidden_states: Passed to ``HMM`` as ``n_hidden``.
        q_model: Object with a ``predict`` method that maps feature rows to
            discrete symbols (the notebook passes the fitted KMeans model).

    Side effects:
        Writes ``pretrained_models/hmm_{key}.json``,
        ``plots/quantized_{key}.png`` and ``plots/logloss_{key}.png``,
        creating the two folders if they do not exist yet.
    """
    # Create the output folders up front so a missing directory fails (or is
    # fixed) before the training run rather than after it.
    os.makedirs('pretrained_models', exist_ok=True)
    os.makedirs('plots', exist_ok=True)

    X = np.concatenate(data[key], axis=0)
    X = X[:, 1:]  # remove time column

    y = q_model.predict(X)

    model = HMM(n_hidden=n_hidden_states, n_obs=n_clusters, verbose=False)
    loss = model.fit(y)
    model.save(filepath=f'pretrained_models/hmm_{key}.json')

    # Explicit figures that are closed after saving: nothing is left open for
    # the notebook to render as an empty figure, and memory is released.
    fig, ax = plt.subplots()
    ax.plot(y)
    ax.set(title=f'Quantized observations: {key}',
           xlabel='sample index', ylabel='quantized symbol')
    fig.savefig(f'plots/quantized_{key}.png')
    plt.close(fig)

    fig, ax = plt.subplots()
    # The value returned by fit() is plotted negated, as in the original plot.
    ax.plot(-np.array(loss))
    ax.set(title=f'Training curve: {key}',
           xlabel='index into loss returned by fit()', ylabel='-loss')
    fig.savefig(f'plots/logloss_{key}.png')
    plt.close(fig)

In [20]:
# Hyper-parameters shared by the training and testing sections.
n_clusters: int = 50  # K-means cluster count; also given to HMM as n_obs
n_hidden_states: int = 10  # given to HMM as n_hidden

# Training

The data contains six different motions: Wave, Infinity, Eight, Circle, Beat3, Beat4

In [6]:
# Group the training recordings by motion label: {label: [array, ...]}.
data = {}
train_dir = 'data/train'

# Label = "beat<digits>" if the name starts that way, otherwise the leading
# run of non-digit characters.
pattern = r"^(beat\d+)|([^\d]+)"

# os.listdir returns entries in arbitrary order; sorting keeps the order in
# which recordings are concatenated (and so the training run) reproducible.
files = sorted(os.listdir(train_dir))
for fn in files:
    # Named `motion` rather than `type` so the builtin is not shadowed.
    motion = extract_prefix(fn, pattern)
    if motion is None:
        # No label can be derived (e.g. the name starts with a digit).
        print(f'skipping {fn}: no motion prefix found')
        continue
    x = np.loadtxt(os.path.join(train_dir, fn))
    data.setdefault(motion, []).append(x)

Each data file has seven columns: `ts, Wx, Wy, Wz, Ax, Ay, Az`
(time in milliseconds, 3-axis gyroscope in rad/s, 3-axis accelerometer in m/s²).

In [7]:
# Fit the K-means quantizer on all training recordings.
# (The two hyper-parameters repeat the values set in the cell near the top.)
n_clusters: int = 50
n_hidden_states: int = 10

q_model = quantization_model(data=data, n_clusters=n_clusters)

In [8]:
# Persist the fitted quantizer so the testing section can reload it.
os.makedirs('pretrained_models', exist_ok=True)
# The context manager closes the file (flushing the pickle to disk) instead
# of leaving the handle open until garbage collection.
with open('pretrained_models/kmeans_50.pkl', 'wb') as f:
    pickle.dump(q_model, f)

In [13]:
# Train, save and plot one HMM per motion label collected in `data`.
for key in data:
    train_model(
        data=data,
        key=key,
        n_clusters=n_clusters,
        n_hidden_states=n_hidden_states,
        q_model=q_model,
    )

<Figure size 640x480 with 0 Axes>

# Testing

In [34]:
# TODO: point this at the directory that holds the test recordings.
# Every entry in this directory is read with np.loadtxt by the scoring cell.
test_dir = 'data/test'

In [35]:
# Reload the K-means quantizer from the path the training section saved it to.
# NOTE: pickle.load can execute arbitrary code; only load pickle files you trust.
with open("pretrained_models/kmeans_50.pkl", "rb") as f:
    q_model = pickle.load(f)

In [36]:
model_path = 'pretrained_models/'
motion_name = ['beat3', 'beat4', 'circle', 'eight', 'inf', 'wave']
# One saved HMM per motion, in the same order as motion_name, so an index
# into a score list is also an index into motion_name.
model_name = [f'hmm_{m}' for m in motion_name]
results = {}

# Load every HMM once instead of rebuilding and reloading it for each test file.
# NOTE(review): assumes HMM.predict does not change the model's state — confirm.
models = []
for m in model_name:
    model = HMM(n_hidden=n_hidden_states, n_obs=n_clusters, verbose=False)
    model.load(filepath=f'{model_path}{m}.json')
    models.append(model)

# Sorted so the results (and the report printed from them) come out in a
# deterministic order; os.listdir alone returns entries in arbitrary order.
files = sorted(os.listdir(test_dir))
for fn in files:
    x = np.loadtxt(os.path.join(test_dir, fn))
    x = x[:, 1:]  # remove time column

    y = q_model.predict(x)

    # One score per model, in model_name order.
    log_loss = [model.predict(y) for model in models]
    results[fn] = log_loss

In [37]:
# Report the best-scoring motion, and the three best, for each test file.
for test_file, scores in results.items():
    ranking = np.argsort(scores)[::-1]  # model indices, highest score first
    top1 = motion_name[np.argmax(scores)]
    top3 = [motion_name[i] for i in ranking[:3]]
    print(f'{test_file}: \n top-1: {top1} \n top-3: {top3}')

test7.txt: 
 top-1: wave 
 top-3: ['wave', 'eight', 'beat3']
test6.txt: 
 top-1: eight 
 top-3: ['eight', 'inf', 'wave']
test4.txt: 
 top-1: beat4 
 top-3: ['beat4', 'beat3', 'wave']
test5.txt: 
 top-1: circle 
 top-3: ['circle', 'beat4', 'beat3']
test1.txt: 
 top-1: inf 
 top-3: ['inf', 'eight', 'beat3']
test2.txt: 
 top-1: beat4 
 top-3: ['beat4', 'beat3', 'inf']
test3.txt: 
 top-1: inf 
 top-3: ['inf', 'eight', 'beat3']
test8.txt: 
 top-1: beat4 
 top-3: ['beat4', 'beat3', 'wave']
