### Linear Gaussian Model

#### Import Dependencies

In [1]:
import numpy as np 
import matplotlib.pyplot as plt
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import confusion_matrix
from sklearn.preprocessing import LabelEncoder
%matplotlib inline

#### Auxiliary Functions

In [2]:
# Skeleton definition
NUI_SKELETON_POSITION_COUNT = 20

NONE = -1
HIP_CENTER = 0
SPINE = 1
SHOULDER_CENTER = 2
HEAD = 3
SHOULDER_LEFT = 4
ELBOW_LEFT = 5
WRIST_LEFT = 6
HAND_LEFT = 7
SHOULDER_RIGHT = 8
ELBOW_RIGHT = 9
WRIST_RIGHT = 10
HAND_RIGHT = 11
HIP_LEFT = 12
KNEE_LEFT = 13
ANKLE_LEFT = 14
FOOT_LEFT = 15
HIP_RIGHT = 16
KNEE_RIGHT = 17
ANKLE_RIGHT = 18
FOOT_RIGHT = 19

nui_skeleton_names = ( \
    'HIP_CENTER', 'SPINE', 'SHOULDER_CENTER', 'HEAD', \
    'SHOULDER_LEFT', 'ELBOW_LEFT', 'WRIST_LEFT', 'HAND_LEFT', \
    'SHOULDER_RIGHT', 'ELBOW_RIGHT', 'WRIST_RIGHT', 'HAND_RIGHT', \
    'HIP_LEFT', 'KNEE_LEFT', 'ANKLE_LEFT', 'FOOT_LEFT', \
    'HIP_RIGHT', 'KNEE_RIGHT', 'ANKLE_RIGHT', 'FOOT_RIGHT' )

nui_skeleton_conn = ( \
    NONE, \
    HIP_CENTER, \
    SPINE, \
    SHOULDER_CENTER, \
    # Left arm 
    SHOULDER_CENTER, \
    SHOULDER_LEFT,  \
    ELBOW_LEFT,  \
    WRIST_LEFT,  \
    # Right arm 
    SHOULDER_CENTER,  \
    SHOULDER_RIGHT,  \
    ELBOW_RIGHT,  \
    WRIST_RIGHT,  \
    # Left leg 
    HIP_CENTER,  \
    HIP_LEFT,  \
    KNEE_LEFT,  \
    ANKLE_LEFT,  \
    # Right leg 
    HIP_CENTER,  \
    HIP_RIGHT,  \
    KNEE_RIGHT,  \
    ANKLE_RIGHT,  \
)

In [3]:
def load_dataset(file=None):
    """
      Returns the data, the labels and the person id for each action
    """
    import scipy.io
    
    if file is None:
        ex = scipy.io.loadmat('data/data.mat')
    else:
        ex = scipy.io.loadmat(file)
        
    return ex['data'],ex['labels'],ex['individuals']

In [4]:
def log_normpdf(x, mu, sigma):
    """
      Computes the natural logarithm of the normal probability density function
      
    """
    p1 = np.log(1/(np.sqrt(2*np.pi)*sigma))
    p2 = ((x-mu)**2)/(2*(sigma**2))
    return p1-p2

In [5]:
def normalize_logprobs(log_probs):
    """
       Returns the log prob normalized so that when exponenciated
       it adds up to 1 (Useful to normalizes logprobs)
    """
    mm = np.max(log_probs)
    return log_probs - mm - np.log(np.sum(np.exp(log_probs - mm)))


In [6]:
def my_cov(x,y,w):
    """
      Useful function for fit_linear_gaussian
    """
    return np.sum(w*x*y)/np.sum(w)-np.sum(w*x)*np.sum(w*y)/np.sum(w)/np.sum(w)


In [7]:
def means_from_betas(betas, instance):
    
    return betas[0] + betas[1] * instance[0] + betas[2] * instance[1] + betas[3] * instance[2]

#### Training Functions and Classes

In [8]:
def fit_gaussian(X, W=None):
    """
      Compute the mean and variance of X, 
      You can ignore W for the moment
    """
    mean = np.mean(X,axis=0)
    variance = np.var(X,axis=0)
    sigma = np.sqrt(variance)
    return (mean, sigma)

In [9]:
def fit_linear_gaussian(Y,X,W = None):
    """
    Input:
      Y: vector of size Dx1 with the observations for the variable
      X: matrix DxV with the observations for the parent variables
                 of X. V is the number of parent variables
      W: vector of size D with the weights of the instances (ignore for the moment)
      
    Outout:
       The betas and sigma
    """
    #We add ones in the first row in numpy
    #https://stackoverflow.com/questions/8298797/inserting-a-row-at-a-specific-location-in-a-2d-array-in-numpy
    D = X.shape[0]

    X1 = np.insert(X, 0, np.ones(D), 1)

    V = X1.shape[1]
    
    lhs = [np.mean(Y * X1[:,i]) for i in range(V)]
    lhs = np.asarray(lhs)

    #V+1 * D @ D * V+1 = V+1 * V+1
    rhs = X1.T @ X1
    rhs = rhs/D

    betas = np.linalg.inv(rhs) @ lhs

    W = np.ones(D)
    cov_y = my_cov(Y, Y, W)
    cov_x = np.zeros((V, V))
    
    for i in range(1,V):
        for j in range (1,V):
            cov_x[i, j] = betas[i] * betas[j] * my_cov(X[:, i-1], X[:, j-1], W)

    variance = cov_y - np.matrix.sum(np.matrix(cov_x))
    
    sigma = np.sqrt(variance)
    
    if sigma == 0 or type(sigma) == 'complex':                                   
        sigma = .01                                                              
    else:                                                                        
        sigma = sigma + .01

    return (betas, sigma)

In [10]:
class Jointpart:
    def __init__(self, means, sigmas, betas = None):
        if betas is None:
            self.NaiveBayesModel(means,sigmas)
        else:
            self.LinearGaussianModel(betas, sigmas)

    def NaiveBayesModel(self, means, sigmas):
        self.means = means
        self.sigmas = sigmas

    def LinearGaussianModel(self, betas, sigmas):
        self.betas = betas
        self.sigmas = sigmas

In [11]:
class Model:
    def __init__(self, Graph):
        self.connectivity = Graph
        self.jointparts = []
        
        if Graph == None:
            self.modelType = "NB"
        else:
            self.modelType = "LGM"

    def AppendJointParams(self,jointpart):
        self.jointparts.append(jointpart)

    def SetClassPriors(self,priors):
        self.classPriors = priors

    def SetNumClasses(self,numClasses):
        self.numClasses = numClasses

In [12]:
def learn_model(dataset, labels, G=None):
    myModel = Model(G)
    numJoints = dataset.shape[0]
    
    unique, counts = np.unique(labels, return_counts=True)
    priors = [i/len(labels) for i in counts]
    myModel.SetClassPriors(np.asarray(priors))
    numClasses = len(unique)
    myModel.SetNumClasses(numClasses)
    labels = labels.squeeze()
    
    if myModel.modelType == "NB":        
        for joint in range(numJoints):
            meanList = []
            sigmaList = []
            for label in unique:
                m, s = fit_gaussian(dataset[joint,:,labels[:] == label])
                meanList.append(m)
                sigmaList.append(s)
            
            means = np.column_stack(meanList)
            sigmas = np.column_stack(sigmaList)
            
            myModel.AppendJointParams(Jointpart(means,sigmas))
        
        return myModel
        
    elif myModel.modelType == "LGM":
        for joint in range(numJoints):
            if myModel.connectivity[joint] == -1:

                meanList = []
                sigmaList = []
                for label in unique:
                    m, s = fit_gaussian(dataset[joint,:,labels[:] == label])
                    meanList.append(m)
                    sigmaList.append(s)

                means = np.column_stack(meanList)
                sigmas = np.column_stack(sigmaList)

                myModel.AppendJointParams(Jointpart(means,sigmas))
            
            else:
                betas = np.zeros((12,4))
                sigmas = np.zeros((3,4))

                for label in range(numClasses):
                    for coordinate in range (3):
                        Y = dataset[joint,coordinate,labels[:] == label]
                        X = dataset[myModel.connectivity[joint],:,labels[:] == label]
                        beta, sigma = fit_linear_gaussian(Y,X)
                        betas[coordinate*4:(coordinate+1)*4,label] = beta
                        sigmas[coordinate,label] = sigma
                        
                myModel.AppendJointParams(Jointpart(None,sigmas,betas))

        return myModel

In [13]:
def classify_instances(instances, model):
    output = np.zeros((instances.shape[2], model.numClasses))

    for i in range(instances.shape[2]):
        logProbs = compute_logprobs(instances[:,:,i],model)
        normLogProbs = normalize_logprobs(logProbs)
        expProbs = np.exp(normLogProbs)
        output[i,:] = expProbs

    return output

In [14]:
def compute_logprobs(example, model):
    """
    
       Input
           instance: a 20x3 matrix defining body positions of one instance
           model: as given by learn_model

       Output
           l: a vector of len #classes containing the loglikelihhod of the 
              instance

    """
    output = np.log(model.classPriors)
    
    if model.modelType == "NB":
        for label in range(model.numClasses):
            for joint in range(example.shape[0]):
                for coordinate in range(example.shape[1]):
                    output[label] += log_normpdf(example[joint,coordinate],
                                                 model.jointparts[joint].means[coordinate,label],
                                                 model.jointparts[joint].sigmas[coordinate,label])
    else:        
        for label in range(model.numClasses):
            for joint in range(example.shape[0]):
                for coordinate in range(example.shape[1]):
                    if joint == 0:
                        output[label] += log_normpdf(example[joint,coordinate],
                                                     model.jointparts[joint].means[coordinate,label],
                                                     model.jointparts[joint].sigmas[coordinate,label])
                    else:
                        output[label] += log_normpdf(example[joint,coordinate],
                                                     means_from_betas(model.jointparts[joint].betas[coordinate*4:(coordinate+1)*4,label], example[model.connectivity[joint], :]),
                                                     model.jointparts[joint].sigmas[coordinate,label])
                        
    
    return output

#### Evaluation Functions

In [15]:
def generate_random_lgm_samples(n, betas, sigma):
    """Function to generate random samples for a 
       Linear Gaussian Model
       Input:
           n: Number of samples
           betas: vector with the values the the betas from 0 to k
           sigma: standard deviation
    """
    X = np.random.randn(n,betas.shape[0]-1)
    Y = np.random.randn(n)*sigma + np.sum(X*betas[1:],axis=1)+betas[0]
    return X,Y

betas = np.array([1,2,10,4,5,6])
sigma = 0.1
n=100
X,Y=generate_random_lgm_samples(n,betas,sigma)
# This following call should output  betas and sigma close to the above ones
fit_linear_gaussian(Y,X)

(array([1.00617222, 2.00327439, 9.98813227, 4.00495957, 4.98809169,
        5.98360449]), 0.1215204625638084)

In [16]:
def accuracy(predClass,labels):
    labels = labels.squeeze()
    out = np.sum([predClass[i] == labels[i] for i in range(len(predClass))])
    return out/len(predClass)

In [17]:
def stratified_K_fold_cross_val(dataset,labels,K=10,G=None):
    labels = labels.squeeze()
    accuracies = []
    skf = StratifiedKFold(n_splits=K)
    
    for train_index, test_index in skf.split(np.reshape(dataset,(2045,20,3)), labels[:]):
        trainX, testX = dataset[:,:,train_index], dataset[:,:,test_index]
        trainY, testY = labels[train_index], labels[test_index]
        
        myModel = learn_model(trainX,trainY,G)
        probs = classify_instances(testX, myModel)
        resultClass = np.argmax(probs, axis=1)
        acc = accuracy(resultClass,testY)
        accuracies.append(acc)

    return (np.mean(accuracies),np.var(accuracies))

In [18]:
def leave_one_out_cross_val(dataset,labels,individuals,G=None):
    labels = labels.squeeze()
    individuals = individuals.squeeze()
    accuracies = []

    for ind in np.unique(individuals):
        train_index = individuals != ind
        test_index = individuals == ind

        trainX, testX = dataset[:,:,train_index], dataset[:,:,test_index]
        trainY, testY = labels[train_index], labels[test_index]

        myModel = learn_model(trainX,trainY,G)
        probs = classify_instances(testX, myModel)
        resultClass = np.argmax(probs, axis=1)
        acc = accuracy(resultClass,testY)
        accuracies.append(acc)

    return (np.mean(accuracies),np.var(accuracies))

#### Results

#### Validation

In [19]:
import scipy.io
import numpy as np

d = scipy.io.loadmat('data/validation_data.mat')

data_small        = d['data_small']
labels_small      = d['labels_small'].squeeze()
individuals_small = d['individuals_small'].squeeze()
train_indexes     = np.array(d['train_indexes'].squeeze(),dtype=bool)
test_indexes      = np.array(d['test_indexes'].squeeze(),dtype=bool)
model_nb          = d['model_nb']
model_lg          = d['model_lg']
accur_lg          = d['accur_lg']
accur_nb          = d['accur_nb']

In [20]:
le = LabelEncoder()
le.fit(labels_small)
transformed_labels_small = le.transform(labels_small)

# Params of following model should be similar to those of model_nb above
naive_bayes_model = learn_model(data_small[:,:,train_indexes],transformed_labels_small[train_indexes])
linear_gaussian_model = learn_model(data_small[:,:,train_indexes],transformed_labels_small[train_indexes], nui_skeleton_conn)

# The accuracy of naive_bayes_model on data_small[:,:,test_indexes] that have labels
# labels_small[test_indexes] should be aprox accur_nb
prob_NB = classify_instances(data_small[:,:,test_indexes], naive_bayes_model)
resultClass_NB = np.argmax(prob_NB, axis=1)
test_accuracy_NB = accuracy(resultClass_NB,transformed_labels_small[test_indexes])
print("Naive Bayes:",test_accuracy_NB)

# The accuracy of linear_gaussian_model on data_small[:,:,test_indexes] that have labels
# labels_small[test_indexes] should be aprox accur_lg
prob_LGM = classify_instances(data_small[:,:,test_indexes], linear_gaussian_model)
resultClass_LGM = np.argmax(prob_LGM, axis=1)
test_accuracy_LGM = accuracy(resultClass_LGM,transformed_labels_small[test_indexes])
print("Linear Gaussian:",test_accuracy_LGM)

Naive Bayes: 0.95
Linear Gaussian: 0.975


#### Evaluation
##### 10-fold cross validation
##### leave one out cross validation

In [21]:
## Evaluation on the training dataset

#Load dataset
dataset, labels, individuals = load_dataset()
labels = labels.squeeze()

#Transform labels to a form of 0 - numClasses - 1
#This is done so we can use functions like np.argmax to get output label
#from probabilities
le = LabelEncoder()
le.fit(labels)
transformed_labels = le.transform(labels)

In [22]:
#Evaluating the Naive Bayes model.
#Perfrom 10-fold cross validation and,
#Leave one out cross validation on individuals

#The output for each cross validation is a tuple,
#of the form (mean, variance)


print("Naive Bayes Model Evaluation")
print("")

print("---10 Fold Cross Validation---")
cv1_NB = stratified_K_fold_cross_val(dataset,transformed_labels,10)
print(cv1_NB)

print("")

print("---Leave One Out Cross Validation---")
cv2_NB = leave_one_out_cross_val(dataset,transformed_labels,individuals)
print(cv2_NB)

Naive Bayes Model Evaluation

---10 Fold Cross Validation---
(0.9427857484457197, 0.002470360785365809)

---Leave One Out Cross Validation---
(0.9425818333235602, 0.01328690037044481)


In [23]:
#Evaluating the Linear Gaussian model.
#Perfrom 10-fold cross validation and,
#Leave one out cross validation on individuals

#The output for each cross validation is a tuple,
#of the form (mean, variance)


print("Linear Gaussian Model Evaluation")
print("")

print("---10 Fold Cross Validation---")
cv1_LG = stratified_K_fold_cross_val(dataset,transformed_labels,10,nui_skeleton_conn)
print(cv1_LG)

print("")

print("---Leave One Out Cross Validation---")
cv2_LG = leave_one_out_cross_val(dataset,transformed_labels,individuals,nui_skeleton_conn)
print(cv2_LG)

Linear Gaussian Model Evaluation

---10 Fold Cross Validation---
(0.9833715925394546, 0.0008091374044806353)

---Leave One Out Cross Validation---
(0.9850682489839717, 0.0025799554481852355)
