# Gesture Recognition with Hidden Markov Models

In [3]:
import numpy as np
import matplotlib.pyplot as plt
from os import listdir
import pickle as pkl

## HMM Filters
### Forward Filter

In [4]:
def forward_filt(A, B, pi, obs):
    K = len(obs) #number of observations
    S = len(A) #number of states
    #q_ki = P(s_k = i, y_1:k)
    alpha = np.zeros((K,S))
    scale_factor = np.zeros(K)

    #initialization
    alpha[0] = np.multiply(B[:,obs[0]],pi) #4x1 vector  
    scale_factor[0] = 1/np.maximum(1e-300,np.sum(alpha[0]))
    alpha[0] *= scale_factor[0] #normalize

    for k in range(1,K):
        B_k = np.diag(B[:,obs[k]])
        alpha[k] = np.dot(np.dot(B_k,A.T),alpha[k-1]) #P(j,o..) = P(i,o..)P(j|i)P(o|j)
        scale_factor[k] = 1/np.maximum(1e-300,np.sum(alpha[k]))
        alpha[k] = alpha[k] * scale_factor[k] #normalize

    # #Equivalent For Loop
    # for k in range(1,len(obs)):
    #     for j in range(len(A)):
    #         for i in range(len(A)):
    #             alpha[k,j] += alpha[k-1,i]*A[i,j]*B[j,obs[k]]
    #     scale_factor[k] = 1/np.sum(alpha[k])
    #     alpha[k] = alpha[k] * scale_factor[k]

    return [alpha, scale_factor]

### Backward Filter

In [5]:
def backward_filt(A, B, obs, scale_factor):
    nstates = len(A)
    K = len(obs)
    beta = np.zeros((K,nstates))
    #initialization
    beta[K-1] = np.ones(nstates)
    #beta[K-1][beta[K-1] == 0] = 1e-10
    #beta[K-1] = beta[K-1]/len(A)
    beta[K-1] *= scale_factor[K-1]
    for k in range(K-1,0,-1):
        B_k = np.diag(B[:,obs[k]]) #diagonalize for elementwise multiplication w/ dot prod
        beta[k-1] = np.dot(np.dot(A,B_k),beta[k]) #beta[k-1] = A * B_k * beta[k]
        beta[k-1] *= scale_factor[k-1]
        
    # #Equivalent For Loop
    # for k in range(K-1,0,-1):
    #     for i in range(nstates):
    #         for j in range(nstates):
    #             beta[k-1,i]  += beta[k,j]*B[j,obs[k]]*A[i,j]
    #     beta[k-1] = beta[k-1] * scale_factor[K-k]
    return beta


### Smoother

In [6]:
def smoother(alpha, beta):
    K = len(alpha) #number of observations/time steps
    gamma = np.zeros(alpha.shape)

    for k in range(K):
        alpha_k = alpha[k]
        beta_k = beta[k]
        num = np.multiply(alpha_k, beta_k)
        den = np.sum(num) #sum across all states
        gamma[k] = num/np.maximum(1e-300,den) #normalize

    # #Equivalent For Loop
    # for k in range(K):
    #     for i in range(alpha.shape[1]):
    #         gamma[k,i] = alpha[k,i] * beta[k,i]
    #     gamma[k] = gamma[k]/np.sum(gamma[k])
    return gamma

### Paired States

In [7]:
def paired_st(A, B, alpha, beta, obs):
    K = len(alpha) #number of observations
    S = len(A) #number of states
    xi = np.zeros([K,S,S])
    for k in range(K):
        alpha_k = alpha[k,:,np.newaxis] #add new axis so mmultiplication will work elementwize
        beta_k1 = beta[(k+1)%K,:]
        o_k1 = obs[(k+1)%K] #next observation (loop around to start for last obs)
        B_k1 = B[:,o_k1] #likelihood of next observation for given state

        xi[k] = alpha_k * A * B_k1 * beta_k1
        xi[k] = xi[k]/np.maximum(1e-300,np.sum(xi[k])) #normalize

        # #Equivalent For Loop
        # for i in range(S):
        #     for j in range(S):
        #         xi[k,i,j] = alpha[k,i]*A[i,j]*B_k1[j]*beta_k1[j]
        # xi[k] = xi[k]/np.sum(xi[k]) #normalize
    return xi

## Baum-Welsch Algorithm

In [8]:
def baum_welsch(HMM, obs, N): #(initial guesses,,, observations, number of iterations)
    A_est = np.copy(HMM[0])
    B_est = np.copy(HMM[1])
    pi_est = np.copy(HMM[2])
    K = len(obs) #number of observations
    S = len(A_est) #number of states
    O = B_est.shape[1] #number of observation states
    l = np.zeros(N) #likelihoods
    for n in range(N):
        #E-Step---------------
        #HMM Filtering
        #---------------------
        alpha, c = forward_filt(A_est, B_est, pi_est, obs) #run forward filter
        beta = backward_filt(A_est, B_est, obs, c) #run backward filter
        gamma = smoother(alpha, beta) #run smoother
        xi = paired_st(A_est, B_est, alpha, beta, obs)

        l[n] = -np.sum(np.log(c)) #log likelihood of obs on HMM

        #M-Step---------------
        #update parameter estimates
        #---------------------
        #update pi_est
        pi_est = gamma[0]
        
        #update A_est
        A_num = np.sum(xi,axis=0) #expected num of jumps from i to j
        A_den = np.sum(gamma,axis=0)[:,np.newaxis] #total expected visits to i
        A_est = A_num/np.maximum(1e-10,A_den)

        #update B_est
        B_den = np.sum(gamma,axis=0)
        for o in range(B_est.shape[1]): #loop through all possible observations
            B_num = np.sum(gamma[obs == o],axis=0) #sum over observations o 
            B_est[:,o] = B_num/np.maximum(1e-10,B_den)

        # #Equivalent For Loop
        # for i in range(S): #current state
        #     for j in range(S): #next state
        #         A_num = np.sum(xi[:,i,j])
        #         A_den = np.sum(gamma[:,i])
        #         A_est[i,j] = A_num/np.maximum(1e-10,A_den)
        #     for o in range(O): #observation state
        #         B_num = 0
        #         for k in range(K): #observation
        #             if(obs[k] == o):
        #                 B_num += gamma[k,i]
        #         B_den = np.sum(gamma[:,i])
        #         B_est[i,o] = B_num/np.maximum(1e-10,B_den)
    return [A_est, B_est, pi_est, l]

In [20]:
#generate random probability matrix (rows sum to 1)
def rand_prob_mat(shape):
    p = np.zeros(shape)
    nrows = shape[0]
    if(len(shape) == 1):
        p = np.random.rand(nrows)
        p = p/np.sum(p)
    elif(len(shape) == 2):
        ncols = shape[1]
        for i in range(nrows):
            p[i,:] = np.random.rand(1,ncols)
            p[i,:] = p[i,:]/np.sum(p[i,:])
    return p

## K-Means Algorithm

In [9]:
class MyKMeans:
    def __init__(self, n_clusters, cutoff=0.05, max_iter = 500):
        self.n_clusters = n_clusters
        self.centroids = np.zeros(n_clusters)
        self.cutoff = cutoff
        self.max_iter = max_iter

    def _sos(self, arr): #sum of squares
        return np.sum(np.power(arr,2),axis=tuple(range(1,arr.ndim)))

    #CREDIT: Eli told me about this trick to calculate distances on the entire arrays
    #        to avoid looping though every element
    def _dist_arrs(self, X1, X2): #l2 norm between vectors in 2 arrays
        x1_sos = np.expand_dims(self._sos(X1), axis=1)
        x2_sos = self._sos(X2)
        dot_sos = -2*X1.dot(X2.T)
        sos = np.round(x1_sos+x2_sos+dot_sos,10) #round to 10 decimals to avoid negative due to machine error
        return np.sqrt(sos)

    def _dist(self,x1,x2): #l2 norm between 2 vectors
        sq_err = np.power((x1 - x2),2)
        sm = np.sum(sq_err)
        return np.sqrt(sm)

    def _calc_centroids(self, X, labels): #calculate new centroids as mean position of all x with same label
        n_cents = np.array([np.mean(X[labels == i],axis=0) for i in range(self.n_clusters)])
        n_cents[np.isnan(n_cents)] = self.centroids[np.isnan(n_cents)] #any centroids with no members remain stationary
        return np.array([np.mean(X[labels == i],axis=0) for i in range(self.n_clusters)])
    
    def _nearest_centroid_arr(self, X): #calculate nearest centroid for each element in array
        dist = self._dist_arrs(X,self.centroids)
        labels = np.argmin(dist,axis=1) #label with index of nearest centroid
        return labels

    def _nearest_centroid(self, x):
        min_dist = 1e100
        label = 0
        for i,cent in enumerate(self.centroids):
            d = self._dist(x,cent)
            if(d < min_dist):
                label = i
                min_dist = d
        # dist = [self._dist(x,cent) for cent in self.centroids] #calculate distances to centroids
        # label = np.argmin(dist) #label with index of closest centroid
        return label

    def fit(self,X):
        X = np.array(X)
        cent_idx = np.random.choice(range(len(X)), self.n_clusters, replace=False) #indices of x's that will be used as initial centroids
        self.centroids = np.array([X[i] for i in cent_idx])
        xlabels = self._nearest_centroid_arr(X) #label each sample with nearest centroid
        training = True
        cnt = 0
        while training:
            cnt = cnt + 1
            n_centroids = self._calc_centroids(X, xlabels) #new centroids at mean of labeled samples
            diff = np.max([self._dist(n_centroids[i], centroid) for i,centroid in enumerate(self.centroids)]) #distance centroids moved
            self.centroids = n_centroids #update centroids
            xlabels = self._nearest_centroid_arr(X) #label each sample x by nearest centroid
            training = diff > self.cutoff #exit loop if no centroid moves by more than cutoff
            if(cnt > self.max_iter):
                print('Maximum Iterations',self.max_iter,'reached before cunvergence')
                training = False
        return self

    def predict(self,x):
        if(x.ndim > 1):
            return self._nearest_centroid_arr(x)
        else:
            return self._nearest_centroid(x)

## Training Models
### Import Data

In [10]:
TRAIN_DIR_PATH = '../train/'

train_files = listdir(TRAIN_DIR_PATH)

#CREDIT: Eli gave me the idea to use a dictionary like this rather
#        than the messy attempt I had at just using arrays
keys = ['wave','inf','eight','circle','beat3','beat4']
obs_dict = {k:[] for k in keys}
all_obs = []
for f in train_files:
    obs = np.genfromtxt(fname=TRAIN_DIR_PATH + f, usecols=[1,2,3,4,5,6]) #read observations from file
    all_obs.append(obs)
    if('wave' in f):
        obs_dict['wave'].append(obs)
    elif('inf' in f):
        obs_dict['inf'].append(obs)
    elif('eight' in f):
        obs_dict['eight'].append(obs)
    elif('circle' in f):
        obs_dict['circle'].append(obs)
    elif('beat3' in f):
        obs_dict['beat3'].append(obs)
    elif('beat4' in f):
        obs_dict['beat4'].append(obs)
all_obs = np.concatenate(all_obs)

### Cluster Observations using KMeans

In [11]:
nstates = 20 #number of hidden states
nobs = 100 #number of observation clusters

In [12]:
kmeans = MyKMeans(n_clusters=nobs).fit(all_obs) #train k-means on all observations

c_obs_dict = {}
for k in keys:
    print('clustering',k,'...')
    c_obs_dict[k] = [kmeans.predict(obs) for obs in obs_dict[k]] #classified/clustered observations
print('Done!')

clustering wave ...
clustering inf ...
clustering eight ...
clustering circle ...
clustering beat3 ...
clustering beat4 ...
Done!


### Pickle KMeans

In [13]:
#store trained kmeans in pkl because they take forever to train
fname = 'kmeans.pkl'
f = open(fname,'wb')
pkl.dump(kmeans,f)
f.close()

### Load Pickled KMeans

In [14]:
#Read pickled kmeans file to avoid retraining models
fname = 'kmeans.pkl'
f = open(fname,'rb')
kmeans = pkl.load(f)
f.close

c_obs_dict = {}
for k in keys:
    c_obs_dict[k] = [kmeans.predict(obs) for obs in obs_dict[k]] #classified/clustered observations

### Train HMMs

In [27]:
N = 50 #number of iterations for Baum-Welsch

np.random.seed(117)
A_rand = rand_prob_mat((nstates,nstates))
B_rand = rand_prob_mat((nstates,nobs))
pi_rand = rand_prob_mat((nstates,))

likelihoods = {}
HMMs = {}
for k in keys:
    print("training ",k,'...',end='',sep='')
    HMMs[k] = [A_rand, B_rand, pi_rand] #randomly initialize HMM
    big_obs_vect = np.concatenate(c_obs_dict[k])
    bw = baum_welsch(HMMs[k], big_obs_vect, N)
    HMMs[k] = bw[0:3]
    likelihoods[k] = bw[3]
    # for c_obs in c_obs_dict[k]: #loop through observation files
    #     HMMs[k] = baum_welsch(HMMs[k], c_obs, N) #train HMM
    #     print('.',end='')
    print('done')
    for param in HMMs[k]:
        assert not np.any(np.isnan(param)) #error if there are any nans
print('Done!')

trainingwave...

KeyboardInterrupt: 

### Pickle Trained HMMs

In [22]:
#store trained HMMs in pkl because they take forever to train
fname = 'HMMs.pkl'
f = open(fname,'wb')
pkl.dump(HMMs,f)
f.close()

### Load Pickled HMMs

In [15]:
#Read pickled HMM file to avoid retraining models
fname = 'HMMs.pkl'
f = open(fname,'rb')
HMMs = pkl.load(f)
f.close

<function BufferedReader.close>

## Predict Gestures

In [16]:
def obs_mod_lik(HMM, obs): #likelihood that given observations came from given HMM
    K = len(obs) #number of observations
    A, B, pi = HMM
    _, c = forward_filt(A, B, pi, obs) #forward filter
    lik = -np.sum(np.log(c)) #calculate likelihood of seeing these obs under this HMM
    return lik

In [17]:
def classify_gesture(HMMs, obs):
    keys = np.array(list(HMMs.keys()))
    l = np.zeros(len(keys))
    max_l = 0
    pred = 'none'
    c_obs = kmeans.predict(obs)
    for i,k in enumerate(keys):
        #c_obs = kmeans[k].predict(obs)
        l[i] = obs_mod_lik(HMMs[k], c_obs)
        if(k == 'wave'):
            l[i] *= 0.6 #likelihood of wave artificially increased because of
                        #its relationship with likelihood of inf
    # arrange likelihoods and keys in descending order
    sort_ind = l.argsort()
    sorted_l = l[sort_ind[::-1]]
    sorted_keys = keys[sort_ind[::-1]]
    return [sorted_keys,sorted_l] #returns gestures in order of decreasing likelihood and associated likelihoods

## Test

In [26]:
TEST_DIR_PATH = '../old_test/'
test_files = listdir(TEST_DIR_PATH)

for f in test_files:
    obs = np.genfromtxt(fname=TEST_DIR_PATH + f, usecols=[1,2,3,4,5,6]) #read observations from file
    pred,ls = classify_gesture(HMMs, obs)
    print('----',f,'----')
    for i in range(3):
        print(i+1, '. ', pred[i], '\t', ls[i], sep='')
    print()

---- beat3_31.txt ----
1. beat3	-412.87099308569594
2. beat4	-129239.38471487182
3. wave	-146468.01236039912

---- beat4_31.txt ----
1. beat4	-697.3715606016762
2. wave	-237663.1866077662
3. beat3	-258722.3781064947

---- circle31.txt ----
1. beat4	-168882.30119938627
2. circle	-198033.65725262315
3. wave	-260284.21891204698

---- eight31.txt ----
1. eight	-657.6731988453323
2. wave	-188808.46900902365
3. inf	-251111.4796806891

---- inf31.txt ----
1. inf	-1764.3464629249443
2. beat3	-332735.88949287566
3. wave	-353124.4498615668

---- wave31.txt ----
1. beat3	-151533.0547552234
2. beat4	-151955.42742992318
3. wave	-182779.20468186736

