In [3]:
import numpy as np
import scipy.io as spio
import scipy.stats as stats
import math
import time

class GaussianProbabilisticModel:
    
    def __init__(self):
        self.num_classes = 0
        self.classes = []
        self.mle_mu_map = {}
        self.mle_sigma_map = {}
        self.class_prob_map = {}
        self.dim = 0
        self.topN_features = 200
        self.l = 0.1

    def __log_gaussian_pdf(self,x,mu,sigma,sigma_new_I_given = None,logdet_given = None,compute_inv_and_det = True):
        # calculate log of f(x)~multivariate Normal density
        # sigma is adjusted by 0.1*I to avoid inverting a singular matrix
        dim_of_cut_data = len(mu)
        sigma_new = np.matrix(sigma+self.l*np.eye(dim_of_cut_data))
        
        # can feed the inverse and det in directly so that we don't have to compute it every time
        if (compute_inv_and_det):
            sigma_new_I = sigma_new.I
            (sign, logdet) = np.linalg.slogdet(sigma_new)
        else:
            sigma_new_I = sigma_new_I_given
            logdet = logdet_given
        
        C = -0.5*logdet + (-dim_of_cut_data*0.5)*np.log(2*math.pi)
        x_minus_mu_mat = np.matrix(x-mu).T
        result = C+float(-0.5*x_minus_mu_mat.T*sigma_new_I*x_minus_mu_mat)

        return result
                
       
    def train_model(self,training_data,labels):
        # Steps:
        # 1: Find the N=200 most significant features, and slice the data based on those features
        # 2: Partition data according to labels
        # 3: Compute MLE of mu and sigma for each label
        
        # Step 1.1 - find the MLE of mu and sigma for each feature over the entire dataset
       
        for i in range(len(training_data)):
            if (i == 0):
                feature_mle_mu = training_data[i]
                self.dim = (training_data[i].shape)[0]
            else:
                feature_mle_mu += training_data[i]
            
        feature_mle_mu = feature_mle_mu * 1.0/len(training_data)
        
        for i in range(len(training_data)):
            x_minus_mu_feature = training_data[i]-feature_mle_mu
            if (i == 0):
                feature_mle_sigma = x_minus_mu_feature**2
            else:
                feature_mle_sigma += x_minus_mu_feature**2
                
        feature_mle_sigma = feature_mle_sigma * 1.0/len(training_data)
        
        # Step 1.2 - find the N=200 most significant features (features exhibiting the largest variance)
        keys = np.linspace(0,self.dim-1,self.dim)
        feature_var = list(zip(keys,feature_mle_sigma))
        
        feature_var_sorted = sorted(feature_var, key=lambda f: f[1], reverse = True)
        self.topN = [int(e[0]) for e in feature_var_sorted[:self.topN_features]]  
        
        # Step 1.3 - slice the training data on the most significant features
        training_data = training_data[:,self.topN]
        self.train_size = len(training_data)
        nums={}
        
        # Step 2.1 - calculate MLE of mu for each label
        for i in range(len(training_data)):
            if not (labels[i] in self.mle_mu_map):
                self.mle_mu_map[labels[i]] = training_data[i]
                self.classes += [labels[i]]
                nums[labels[i]] = 1
                self.num_classes += 1
                
            else:
                self.mle_mu_map[labels[i]] += training_data[i]
                nums[labels[i]] += 1
                
        for i in self.classes:
            self.mle_mu_map[i]=(1.0/nums[i])*self.mle_mu_map[i]
        
        # Step 2.2 - calculate MLE of sigma for each label
        for i in range(len(training_data)):
            # this is a column vector
            x_minus_mu = np.matrix(training_data[i]-self.mle_mu_map[labels[i]]).T
            
            if not (labels[i] in self.mle_sigma_map):
                self.mle_sigma_map[labels[i]]=(x_minus_mu)*(x_minus_mu.T)
            else:
                self.mle_sigma_map[labels[i]]+=(x_minus_mu)*(x_minus_mu.T)
               
        for i in self.classes:
            self.mle_sigma_map[i]=(1.0/nums[i])*self.mle_sigma_map[i]
            self.class_prob_map[i]=(nums[i]*1.0)/len(training_data)
         
        # precalculate matrix inverses and determinants to use in prediction function
        # the "new" refers to computing the inverse of sigma + l*I 
        self.sigma_new_inverses = {}
        self.logdets = {}
        for i in self.classes:
            self.sigma_new_inverses[i]=np.matrix(self.mle_sigma_map[i] + self.l*np.eye(self.topN_features)).I
            self.logdets[i]=np.linalg.slogdet(np.matrix(self.mle_sigma_map[i] + self.l*np.eye(self.topN_features)))[1]
            
    def predict(self,x):
        # prediction is equal to the maximum likelihood class
        prediction=-1
        curr_max=-1e300;
        
        for i in self.classes:
            p = self.__log_gaussian_pdf(x,self.mle_mu_map[i],self.mle_sigma_map[i],
                                       sigma_new_I_given = self.sigma_new_inverses[i],
                                       logdet_given = self.logdets[i],
                                       compute_inv_and_det = False) + np.log(self.class_prob_map[i])
            if(p>curr_max):
                curr_max=p
                prediction=i
        return prediction
                        
    def test_model(self,test_data,labels):
        misses=0        
        # slice the data based on the most significant features
        test_data = test_data[:,self.topN]
        self.test_size = len(test_data)
        results = {'Test Error': [], 'Training Size': [], 'Testing Size': [], 'Predicted': [], 'Actual': []}
        results['Training Size'] = self.train_size
        results['Testing Size'] = self.test_size

        for i in range(len(test_data)):
            pred = self.predict(test_data[i])
            actual = labels[i]
            
            results['Predicted'] += [pred]
            results['Actual'] += [actual]
            
            if(pred != actual):
                misses=misses+1
                
        results['Test Error'] = misses*1.0/len(test_data)
        return results


#mat = spio.loadmat('hw1data.mat', squeeze_me=True)
#image_matrix=mat['X']
#label_array=mat['Y']   

#tstart = time.time()
    
#gaussian_model= GaussianProbabilisticModel()
#gaussian_model.train_model(image_matrix[:3000,:],label_array[:3000])
#tmid = time.time()
#print('Training complete')
#print(gaussian_model.test_model(image_matrix[3001:3301,:],label_array[3001:3301]))

#tend = time.time()
#print("Training time: " + str(tmid-tstart))
#print("Testing time: " + str(tend-tmid))
#print("Total time taken: " + str(tend-tstart))



In [4]:
#kNN Classifier

def EuclideanDistance(x,y):
    return np.linalg.norm(np.array(y)-np.array(x))

def L1Distance(x,y):
    return np.sum(abs(np.array(y)-np.array(x)))

def LinfDistance(x,y):
    return max(abs(np.array(y)-np.array(x)))

class kNNModel:
    
    def __init__(self, dist_function):
        self.dist_function = dist_function
        self.topN_features = 200
        
    # computes the distance from x to each element in lst, which is a list of vectors
    def __dist(self, x, lst, lst_labels):
        dists = [self.dist_function(x,e) for e in lst]
        vec_label_dist_tuple = list(zip(lst,lst_labels,dists))
        return vec_label_dist_tuple
                
    def train_model(self,training_data,labels):
        # k will be log(# of data points) rounded to the nearest integer
        self.k = int(np.log(len(training_data)) + 0.5)
        
        # Step 1.1 - find the MLE of mu and sigma for each feature over the entire dataset
       
        for i in range(len(training_data)):
            if (i == 0):
                feature_mle_mu = training_data[i]
                self.dim = (training_data[i].shape)[0]
            else:
                feature_mle_mu += training_data[i]
            
        feature_mle_mu = feature_mle_mu * 1.0/len(training_data)
        
        for i in range(len(training_data)):
            x_minus_mu_feature = training_data[i]-feature_mle_mu
            if (i == 0):
                feature_mle_sigma = x_minus_mu_feature**2
            else:
                feature_mle_sigma += x_minus_mu_feature**2
                
        feature_mle_sigma = feature_mle_sigma * 1.0/len(training_data)
        
        # Step 1.2 - find the N=200 most significant features (features exhibiting the largest variance)
        keys = np.linspace(0,self.dim-1,self.dim)
        feature_var = list(zip(keys,feature_mle_sigma))
        
        feature_var_sorted = sorted(feature_var, key=lambda f: f[1], reverse = True)
        self.topN = [int(e[0]) for e in feature_var_sorted[:self.topN_features]]  
        
        # Step 1.3 - slice the training data on the most significant features
        training_data = training_data[:,self.topN]
        
        self.labels = labels
        self.data = training_data.tolist()
        self.train_size = len(training_data)
        
    def predict(self,x):
        vec_label_dist_tuple = self.__dist(x,self.data,self.labels)
        # sort by distance
        vec_label_dist_tuple_sorted = sorted(vec_label_dist_tuple, key=lambda f: f[2])
        # get an array containing (label,distance) tuples
        k_closest = vec_label_dist_tuple_sorted[:self.k]
        # group by frequency of label
        count_dic = {}
        for tup in k_closest:
            if tup[1] not in count_dic:
                count_dic[tup[1]] = 1.0
            else:
                count_dic[tup[1]] += 1.0
        
        prediction = max(count_dic, key=count_dic.get)
        return prediction          
        
    def test_model(self,test_data,labels):
        misses=0        
        # slice the data based on the most significant features
        test_data = test_data[:,self.topN]
        self.test_size = len(test_data)
        results = {'Test Error': [], 'Training Size': [], 'Testing Size': [], 'Predicted': [], 'Actual': []}
        results['Training Size'] = self.train_size
        results['Testing Size'] = self.test_size
        
        for i in range(len(test_data)):
            pred = self.predict(test_data[i])
            actual = labels[i]
            
            results['Predicted'] += [pred]
            results['Actual'] += [actual]
            
            if(pred != actual):
                misses += 1
                
        results['Test Error'] = misses*1.0/len(test_data)
        return results
    
mat = spio.loadmat('hw1data.mat', squeeze_me=True)
image_matrix=mat['X']
label_array=mat['Y']   

tstart = time.time()
#kNN_model_L2 = kNNModel(EuclideanDistance)
#kNN_model_L2.train_model(image_matrix[:1000,:],label_array[:1000])
#print(kNN_model_L2.test_model(image_matrix[1001:1101,:],label_array[1001:1101]))
#print("L2 Test error: " + str(L2_test_error))
#tend = time.time()
#print("Total time taken: " + str(tend-tstart))

#kNN_model_L1 = kNNModel(L1Distance)
#kNN_model_L1.train_model(image_matrix[:1000,:],label_array[:1000])
#print(kNN_model_L1.test_model(image_matrix[1001:1101,:],label_array[1001:1101]))
#print("L1 Test error: " + str(L1_test_error))

#kNN_model_Linf = kNNModel(LinfDistance)
#kNN_model_Linf.train_model(image_matrix[:1000,:],label_array[:1000])
#print(kNN_model_Linf.test_model(image_matrix[1001:1101,:],label_array[1001:1101]))
#print("Linf Test error: " + str(Linf_test_error))

In [None]:

def shuffle_and_split(data,partition):
    ran_order = np.arange(len(data['X']))
    np.random.shuffle(ran_order)
    ran_order_training = ran_order[:(int)(len(data['X'])*partition)] 
    ran_order_test = ran_order[(int)(len(data['X'])*partition):] 
    training_data = data['X'][ran_order_training]
    training_label = data['Y'][ran_order_training]
    test_data = data['X'][ran_order_test]
    test_label = data['Y'][ran_order_test]
    return [training_data,training_label,test_data,test_label]


mat = spio.loadmat('hw1data.mat', squeeze_me=True)
mat['X']=mat['X'][0:2000,:]
mat['Y']=mat['Y'][0:2000]
K=[]
knn_training_error=[]
knn_test_error=[]
gaussian_training_error=[]
gaussian_test_error=[]
gaussian_model= GaussianProbabilisticModel()
kNN_model_L2 = kNNModel(EuclideanDistance)
for k in range(40,100,10):
    gaussian_model= GaussianProbabilisticModel()
    kNN_model_L2 = kNNModel(EuclideanDistance)
    K.append(k/100)
    [training_data,training_label,test_data,test_label]=shuffle_and_split(mat,k/100)
    kNN_model_L2.train_model(training_data,training_label)
    gaussian_model.train_model(training_data,training_label)
    knn_training_error.append(kNN_model_L2.test_model(training_data,training_label))
    knn_test_error.append(kNN_model_L2.test_model(test_data,test_label))
    gaussian_training_error.append(gaussian_model.test_model(training_data,training_label))
    gaussian_test_error.append(gaussian_model.test_model(test_data,test_label))


In [24]:
import matplotlib.pyplot as plt
K=[40,50,60,70,80,90]
knn_training_error=[e['Test Error'] for e in knn_training_error ]
knn_test_error=[e['Test Error'] for e in knn_test_error ]
gaussian_training_error=[e['Test Error'] for e in gaussian_training_error ]
gaussian_test_error=[e['Test Error'] for e in gaussian_test_error ]
print(knn_training_error)
plt.plot(K,knn_training_error,label="kNN-Training Error")
plt.plot(K,knn_test_error,label="kNN-Testing Error")
plt.plot(K,gaussian_training_error,label="kNN-Training Error")
plt.plot(K,gaussian_test_error,label="kNN-Testing Error")
plt.xlabel("Model complexity->")
plt.ylabel("Error->")
plt.gca().set_ylim([0,1])
lt.gcf().set_size_inches(18,8)
plt.legend()
plt.show()

TypeError: 'float' object is not subscriptable

In [None]:
[training_data,training_label,test_data,test_label]=shuffle_and_split(mat,0.8)
l2_test_error=[]
l1_test_error=[]
linf_test_error=[]
K=[]
for k in range(1,10):
    K.append(k)
    kNN_model_L2 = kNNModel(EuclideanDistance)
    kNN_model_L1 = kNNModel(L1Distance)
    kNN_model_Linf = kNNModel(LinfDistance)
    kNN_model_L2.train_model(training_data,training_label)
    kNN_model_L1.train_model(training_data,training_label)
    kNN_model_Linf.train_model(training_data,training_label)
    l2_test_error.append(kNN_model_L2.test_model(test_data,test_label))
    l1_test_error.append(kNN_model_L1.test_model(test_data,test_label))
    linf_test_error.append(kNN_model_L2.kNN_model_Linf(test_data,test_label))

l2_test_error=[e['Test Error'] for e in l2_test_error ]
l1_test_error=[e['Test Error'] for e in l1_test_error ]
linf_test_error=[e['Test Error'] for e in linf_test_error ]
plt.plot(K,l2_test_error,label="kNN-L2")
plt.plot(K,l1_test_error,label="kNN-L1")
plt.plot(K,linf_test_error,label="kNN-Linf")
plt.xlabel("K(nearest neighbours)->")
plt.ylabel("Error->")
plt.gca().set_ylim([0,1])
plt.gca().set_xlim([0,1])
plt.gcf().set_size_inches(18,8)
plt.title("Training Error for different distance metrics->")
plt.legend()
plt.show()