In [42]:
import numpy as np
import pandas as pd
import cvxpy as cp
import numpy.linalg as la
import random

from pandas.io.pytables import performance_doc

class MyClassifier_25:  

    def __init__(self,dataset,class1:int,class2:int) -> None:
        self.w = None
        self.b = None
        self.classes = {1 : class1, -1: class2, 0:None}
        self.dataset_train = dataset

        #data prep
        self.trainlabel,self.traindata = self.prepare_binary(self.dataset_train)
        
        #train the classfier 
        self.train(self.traindata,self.trainlabel)

        
    
    def prepare_binary(self,dataset):

        #USAGE    
        # Since we have to deal with a binary classifier to diffrentiate between digits 7 and 1, 
        # we choose only those examples.
        # If asked to train a classifier on any other pair a, b (say),
        # please pass the right arguments to the following function as follows:
        # trainlabel, traindata, dataTargetDf = prepare_binary(a,b)


        # We now assign +1 to one class and -1 to the other;
        # Therefore +1 and -1 will be the new labels
        class1 = self.classes[1]
        class2 = self.classes[-1]

        trainlabel = dataset.loc[(dataset['label']== class1)  | (dataset['label']== class2) ]['label']
        trainlabel.loc[trainlabel == class1] = 1
        trainlabel.loc[trainlabel == class2] = -1
        trainlabel = trainlabel.to_numpy()
    
        #In order to match dimensions of "traindata" and "trainlabel", we convert trainlabel to two dimension array
        # for hinge loss
        trainlabel= np.reshape(trainlabel, (trainlabel.shape[0],1))   

        # We now extract the features for the two classes
        traindata = dataset.loc[(dataset['label']== class1)  | (dataset['label']== class2) ]
        traindata = traindata.drop(labels = ["label"],axis = 1).to_numpy()

        # print(traindata.shape[1])



        return trainlabel, traindata

    def target_df(self,traindata,trainlabel):
        # Also creating a dataframe with these, so that we can randomize the order of the train data when needed without
        # losing the mapping between feature vectors and the target labels
        trainDf=pd.DataFrame(traindata)
        targetDf=pd.DataFrame(trainlabel,columns=['target'])
        
        dataTargetDf = pd.concat([trainDf, targetDf[['target']]], axis = 1)
        ##If randomizing the order, should we use the dataframe 'finalDf'?
        return dataTargetDf

    def subset(self,dataTargetDf, subsetfrac:float):
        
        # Usage: If 20% of the data is to be randomly selected
        # subsetDf = subset(dataTargetDf, 0.2)
        
        return dataTargetDf.sample(frac = subsetfrac)

    def sample_selection(self,training_sample):
        pass
    
    def _hinge_loss_svm(self,traindata, trainlabel,W,w):
        m = traindata.shape[1]
        # Equation for the regularizer.
        # It is the lambda*(norm2 of W)**2
        # Here "lambda" is a non negative constant
        lambd = cp.Parameter(nonneg=True)

        ## Ideally we will have to try using different values fro "lambda"
        ## For the sake of testing the code, we have set it to 0.01
        ## Do we need to have a lambda?
        lambd = 0.01 
        reg_loss = cp.norm(W,p=2)**2
        
        #hinge loss
        hinge_loss = cp.sum(cp.pos(1-cp.multiply(trainlabel,traindata @ W - w)))
        

        
        #Objective is to minimize reg_loss and hinge_loss
        # objective_func = cp.Minimize(hinge_loss/m + lambd*reg_loss)
        prob = cp.Problem(cp.Minimize(hinge_loss/m + lambd*reg_loss))
        # Now framing the LP, along with the constraints
        return prob

    def _normal_loss_svm(self,traindata,trainlabel, W,w):
        #Constraint
        # For every feature vector traindata[i] and its corresponding label trainlabel[i]:
        # W^T*traindata[i] + w >= 1
        const = [trainlabel[i]*(traindata[i]@ W + w) >= 1 for i in range(traindata.shape[0])]
        ##Check the dimensions in the above constraint equation
        
        #Objective is to minimize reg_loss and hinge_loss
        # objective_func = cp.Minimize(hinge_loss/m + lambd*reg_loss)
        objective_func = cp.Minimize(0.5*cp.norm(W,p=2)**2)
        prob = cp.Problem(objective_func,constraints=const)
        # Now framing the LP, along with the constraints
        return prob

    def train(self,traindata,trainlabel):
        
        #USAGE
        # W, w = train(traindata, trainlabel)

        # m: Number of feature vectors
        # W and w: Weight vector and Bias value respectively
        print(traindata.shape)
        m = traindata.shape[1]
        W = cp.Variable((m,1))
        w = cp.Variable()

        
        prob = self._hinge_loss_svm(traindata,trainlabel,W,w)

        prob.solve()
        
        # Solving the problem would give us the optimal values from W and w;
        # which have to be returned, so that we can use them while testing

        ## adding to class variable
        self.w = W
        self.b = w
        

    def f(self,test_input):
        test_val = test_input.dot(self.w.value) -  self.b.value
        if test_val < -1:
            test_val= -1
        elif test_val > 1:
            test_val = 1
        else:
            test_val = 0 
            #it should classify the points in the P2 region as well, code should be modified to always return either 1 or 0)
        estimated_class = self.classes.get(test_val)
        return estimated_class
    
    def assess_classifier_performance(self,performance):
        performance = np.asarray(performance)
        correct = (np.count_nonzero(performance)/len(performance))*100
        return correct

    def test(self,dataset_test):
        testlabel,testdata= self.prepare_binary(dataset_test)
        res = []
        performance = []
        for i in range(testdata.shape[0]):
            result = self.f(testdata[i])
            res.append(result)
            
            actual_class = self.classes.get(int(testlabel[i]))
            
            if result == actual_class:
                performance.append(1)
            else:
                performance.append(0)
                # we have return only the results as per the description 
        return res, performance
    
    def plot_classifier_performance_vs_number_of_samples(self):
        pass

In [69]:
#train1 = label1, train2 = label7, train3 = label 1 + 7 

df = pd.read_csv("D:\ip\mnist_train.csv") 
label1data = df.loc[df['label'] == 1] #all rows corresponding to label1
label2data = df.loc[df['label'] == 7] #all rows corresponding to label7
label3data = df.loc[df['label'].isin([1,7])]
train1_count = len(label1data)
train2_count = len(label2data)
train3_count = len(label3data)
train1_data = label1data.drop(columns = ['label']) #dropping the column1 consisting of labels
train2_data = label2data.drop(columns = ['label'])
train3_data = label3data.drop(columns = ['label'])

c1 = train1_data.sum()/train1_count #centroid = sum of each component of all vectors/number
c2 = train2_data.sum()/train2_count
c3 = train3_data.sum()/train3_count

dist12 = la.norm(c1-c2) #dist between centroids of 1 and 2
dist13 = la.norm(c1-c3)
dist23 = la.norm(c2-c3)

print(dist12, dist13, dist23)

#value of dist12, dist13, dist23 = (1369.2509985955885 659.5185289614332 709.7324696341552)


1369.2509985955885 659.5185289614332 709.7324696341552


In [272]:
#np.array(train2_data.head(1))-np.array(train1_data.head(1))
#la.norm(np.subtract(train2_data.to_numpy(),np.array(train1_data.head(1))),axis=1)

dist = []

def func_min_dist_p2_point(x):
    #z = np.subtract(train2_data.to_numpy(),np.array(x))
    #print(z.shape)
    dist_min = np.min(la.norm(np.subtract(train2_data.to_numpy(),np.array(x)),axis=1))
    #dist_min = np.min(la.norm(y, axis = 1))
    return dist_min

def dist_p1_centroid(x):
    z = np.subtract(np.array(x[:-1]),np.array(c1))
    return(la.norm(z))

def dist_p2_centroid(x):
    z = np.subtract(np.array(x[:-2]),np.array(c2))
    return(la.norm(z))
    

x = train1_data[:5000]
y = x.copy()
y['Dist_other'] = y.apply(func_min_dist_p2_point,axis=1)
y['Dist_own_centroid'] = y.apply(dist_p1_centroid,axis=1)
#y['Dist_other_centroid'] = y.apply(dist_p2_centroid,axis=1)

print(x.shape)
print(y.shape)

selected_train1_logic1 = y.loc[y['Dist_other'] < y['Dist_own_centroid']]
#selected_train1_logic2 = y.loc[y['Dist_other_centroid'] < y['Dist_own_centroid']]

print(selected_train1_logic1.shape)
print(selected_train1_logic2.shape)


(5000, 784)
(5000, 786)
(2633, 786)
(8, 787)


In [303]:
def func_min_dist_point_to_label2(point):     
    dist_min = np.min(la.norm(np.subtract(train2_data.to_numpy(),np.array(point)),axis=1))
    return dist_min

def func_min_dist_point_to_label1(point):     
    dist_min = np.min(la.norm(np.subtract(train1_data.to_numpy(),np.array(point)),axis=1))
    return dist_min

def dist_to_centroid1(point):
    z = np.subtract(np.array(point[:len(c1)]),np.array(c1))
    return(la.norm(z))

def dist_to_centroid2(point):
    z = np.subtract(np.array(point[:len(c2)]),np.array(c2))
    return(la.norm(z))


def select_func(big_data_set,label):
    if label == 1:
        f = func_min_dist_point_to_label2
        g = dist_to_centroid1
    else:
        f = func_min_dist_point_to_label1
        g = dist_to_centroid2
    
    #big_data_set['Dist_other'] = big_data_set.apply(f,axis=1)
    #big_data_set['Dist_own_centroid'] = big_data_set.apply(g,axis=1)
    y = big_data_set.copy()
    y['Dist_other'] = y.apply(f,axis=1)
    y['Dist_own_centroid'] = y.apply(g,axis=1)
    selected_train_logic1 = y.loc[y['Dist_other'] < y['Dist_own_centroid']]
    return(selected_train_logic1)
    

x = select_func(train1_data[:200],1)
y = select_func(train2_data[:200],2)

print(type(x))
print(len(y))


<class 'pandas.core.frame.DataFrame'>
40


In [305]:
print(type(x))
a = x.copy()
b = y.copy()
d = select_func(x,1)
g = select_func(y,1)

print(len(d))
print(len(g))

<class 'pandas.core.frame.DataFrame'>


ValueError: operands could not be broadcast together with shapes (6265,784) (786,) 