In [3]:
import numpy as np
from math import e
import math
import random
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns
import copy
import cvxopt as cvo
import copy
from cvxopt import matrix, solvers

In [36]:


class SVM_Poly():
    def __init__(self, exponent = 1, upper_limit = 0):
        self.thresh = 1.0e-5
        self.exponent = exponent
        self.upper_limit = upper_limit
        #suppress output
        cvo.solvers.options['show_progress'] = False

    def set_exponent(self, Q):
        self.exponent = Q

    def set_upper_limit(self, C):
        self.upper_limit = max(0, C)

    def kernel_calc(self, X2):
        #X2 = inputs
        #polynomial kernel (1+xnT*xm)^Q
        kernel = np.power(np.add(1, np.dot(self.X,X2.T)), self.exponent)
        return kernel

    def get_constraints(self, num_ex):
        #soft margin
        if self.upper_limit > 0:
            #make constraints matrix G, h being passed number of examples
            #-alphas <= 0
            G1 = np.multiply(-1, np.eye(num_ex))
            #alphas <= c
            G2 = np.eye(num_ex)
            G = np.vstack((G1, G2))
            h1 = np.zeros(num_ex)
            h2 = np.ones(num_ex)*self.upper_limit
            h = np.hstack((h1, h2))
            return cvo.matrix(G), cvo.matrix(h)
        else:
            #hard margin
            G = cvo.matrix(np.multiply(-1, np.eye(num_ex)))
            # h = 0
            h = cvo.matrix(np.zeros(num_ex))
            return G, h
    def ayK(self, Xin):
        #get the value of sum(alpha_n > 0) {alpha_n * y_n * K(x_n, input)}
        k_calc = self.kernel_calc(Xin)
        pre_sum = np.multiply(self.alphas, np.multiply(self.Y, k_calc))
        post_sum = np.sum(pre_sum, axis=0)
        return post_sum
        
        
    def predict(self,Xin):
        post_sum = np.add(self.ayK(Xin), self.bias)
        return post_sum

    
    def calc_error(self, Xin,Yin):
        num_ex = Xin.shape[0]
        predicted = np.sign(self.predict(Xin))
        num_incorrect = np.sum(np.not_equal(predicted, np.sign(Yin)))
        prop_incorrect = float(num_incorrect)/float(num_ex)
        return prop_incorrect


    def train(self,X,Y):
        #expecting X as Nxd matrix and Y as a Nx1 matrix
        #note: no reshaping for X
        X = X.astype(float)
        Y = Y.astype(float)
        self.X = X
        num_ex, cur_dim = X.shape
        self.Y = Y.reshape((num_ex, 1))
        k_calc = self.kernel_calc(X)
        q = cvo.matrix(np.multiply(-1, np.ones((num_ex,1))))
        P = cvo.matrix(np.multiply(np.outer(Y, Y), k_calc))
        A = cvo.matrix(Y.reshape(1, num_ex), tc='d')
        b = cvo.matrix(0.0)
        G, h = self.get_constraints(num_ex)
        cvo_sol = cvo.solvers.qp(P,q,G,h,A,b)
        alphas = np.ravel(cvo_sol['x'])
        alphas_thresh = np.greater_equal(alphas,self.thresh)
        sv_idx = np.argmax(alphas_thresh)
        self.alphas = alphas.reshape((num_ex, 1))
        self.num_alphas = np.sum(alphas_thresh)
        self.bias = Y[sv_idx] - self.ayK(X[sv_idx])
        


class SVM_RBF(SVM_Poly):
    def __init__(self, gamma = 1.5, upper_limit = 0):
        self.thresh = 1.0e-5
        self.gamma = gamma
        self.upper_limit = upper_limit
        #suppress output
        cvo.solvers.options['show_progress'] = False


    def kernel_calc(self, Xin):
        if len(Xin.shape) == 1:
            Xin = Xin.reshape((1, Xin.shape[0]))
        cur_m = self.X.shape[0]
        cur_n = Xin.shape[0]
        ret = np.ndarray((cur_m, cur_n))
        if self.X.shape[1] == Xin.shape[1]:
            for i in range(cur_m):
                for j in range(cur_n):
                    ret[i][j] = np.exp(-1 * self.gamma * np.linalg.norm(self.X[i] - Xin[j]))
        if ret.shape[0] == 1 and ret.shape[1] == 1:
            return ret[0][0]
        else:
            return ret

In [29]:
class Lloyd:
    def calc_cluster_centers(self):
        #return true if all clusters nonempty, else false
        nonempty = True # if all clusters nonempty 
        for k,cluster in enumerate(self.cluster):
            if len(cluster) <= 0:
                nonempty = False
                break
            else:
                cur_cluster = self.X[cluster]
                self.cluster_centers[k] = np.average(cur_cluster, axis=0)
        return nonempty
            
            
    def assign_clusters(self):
        #returns if cluster membership changed or not
        changed = False
        #hopefully X is two-dim or else this breaks
        #iterate over X
        for n, xn in enumerate(self.X):
            cur_cluster = self.X_cluster[n]
            dest_cluster = cur_cluster #cluster that current xn ends up in
            shortest_dist = np.linalg.norm(self.cluster_centers[cur_cluster]-xn) #dist of xn from current cluster
            #iterate over clusters
            for l, cluster in enumerate(self.cluster_centers):
                cur_dist = np.linalg.norm(cluster - xn) #dist of xn from iterated cluster
                if cur_dist < shortest_dist:
                    dest_cluster = l
                    shortest_dist = cur_dist
            if cur_cluster != dest_cluster:
                self.cluster[cur_cluster].remove(n)
                self.cluster[dest_cluster].append(n)
                self.X_cluster[n] = dest_cluster
                changed = True
        return changed
                
    
    def init_clusters(self):
        self.cluster = [[] for x in range(self.k)]
        self.cluster[0] = [x for x in range(self.X_n)] #stick all in the first cluster for now
        #listing of cluster membership by elts of X
        self.X_cluster = [0 for x in range(self.X_n)]
        #cluster centers
        self.cluster_centers = np.random.uniform(self.rng[0], self.rng[1], (self.k, self.X_dim))
        self.assign_clusters()

    def set_X(self,X):
        #X = dataset, should be m x n np array
        self.X = X
        self.X_n = X.shape[0]
        if len(X.shape) == 1:
            self.X_dim = 1
        else:
            self.X_dim = X.shape[1]
        self.init_clusters()


    def __init__(self, X, k, rng):
        #k = number of clusters
        self.k = max(1, int(k))
        #rng = range of allowed center coords as an array
        self.rng = rng
        self.set_X(X)


    def set_k(self,k):
        if k != self.k:
            self.k = max(1, int(k))
            self.init_clusters()

    def run(self):
        runs = 1 #number of runs executed
        while True:
            while True:
                nonempty = self.calc_cluster_centers()
                if nonempty == True:
                    break
                else:
                    self.init_clusters()
                    runs = runs + 1
            changed = self.assign_clusters()
            if changed == False:
                break
        return runs

#h(x) = sign (sum(n=1;N) {wn * exp (-gamma * ||x-muk||^2)} + b)
#elts of phi matrix = exp(-gamma ||xi-muj||^2)

#this will be with a bias term so we need to reshape phi
class RBF:
    def set_X(self, X):
        self.lloyd.set_X(X)
        self.lloyd.run()

    def set_Y(self, Y):
        self.Y = Y

    def set_k(self, k):
        self.k = k
        self.lloyd.set_k(k)
        self.lloyd.run()

    def set_gamma(self, g):
        self.gamma = g
 
    def kernel_calc(self, Xin):
        #calculates exp( - gamma * ||Xin - mu||^2)
        if len(Xin.shape) == 1:
            Xin = Xin.reshape((1, Xin.shape[0]))
        cur_m = Xin.shape[0]
        cur_n = self.lloyd.cluster_centers.shape[0]
        ret = np.ndarray((cur_m, cur_n))
        if Xin.shape[1] == self.lloyd.cluster_centers.shape[1]:
            for i in range(cur_m):
                for j in range(cur_n):
                    ret[i][j] = np.exp(-1 * self.gamma * np.linalg.norm(Xin[i] - self.lloyd.cluster_centers[j]))
        if ret.shape[0] == 1 and ret.shape[1] == 1:
            return ret[0][0]
        else:
            return ret
               
    def __init__(self, gamma, X, Y, k, rng):
        #k = k centers for anticipated lloyd's algo
        #rng - 2-dim array of anticipated range allowable 
        self.gamma = gamma
        self.k = k
        self.rng = rng
        self.lloyd = Lloyd(X, k, rng)
        self.lloyd.run()
        self.Y = Y

    def train(self):
        phi = self.kernel_calc(self.lloyd.X)
        phi_n = phi.shape[0]
        #reshaping to get bias term
        phi_res = np.c_[np.ones(phi_n), phi]
        phi_pinv = np.linalg.pinv(phi_res)
        weights = np.matmul(phi_pinv, self.Y)
        self.bias = weights[0]
        self.weights = weights[1:]

    def calculate_preds(self, Xin):
        k_calc = self.kernel_calc(Xin)
        w_k = np.multiply(self.weights, k_calc)
        wk_sum = np.add(np.sum(w_k, axis=1), self.bias)
        return np.sign(wk_sum)
            
    def calc_error(self, Xin,Yin):
        num_ex = Xin.shape[0]
        predicted = np.sign(self.predict(Xin))
        num_incorrect = np.sum(np.not_equal(predicted, np.sign(Yin)))
        prop_incorrect = float(num_incorrect)/float(num_ex)
        return prop_incorrect

        
        
        
 


In [43]:


        
class create_test():
    def __init__(self,N,k=10,gamma=1.5):

        
        

        self.N = N
        self.X = np.random.uniform(-1.0,1.0,(self.N,2))
        self.y = np.array(self.find_actual_y(self.X))
        
        my_svm = SVM_RBF(gamma=gamma)
        my_svm.train(self.X,self.y)
        
        my_rbf = RBF(k=k,X=self.X,Y=self.y,rng=[-1,1],gamma=gamma)
        my_rbf.train()
        
        self.EinSVM = my_svm.calc_error(self.X,self.y)
        X = np.random.uniform(-1.0,1.0,(1000,2))
        y = self.find_actual_y(X)
        self.EoutSVM = my_svm.calc_error(X,y)
        self.EinRBF = self.Ein_error(my_rbf)
        self.EoutRBF = self.Eout_error(my_rbf)

        #self.Print()
        
    def Print(self):
        print("RBF Ein Error %s" % self.EinRBF)
        print("RBF Eout Error %s" % self.EoutRBF)
        print("SVM Ein Error %s" % self.EinSVM)
        print("SVM Eout Error %s" % self.EoutSVM)
    
    def find_actual_y(self,X):
        return np.sign(X[:,1]-X[:,0]+.25*np.sin(np.pi*X[:,0]))
    
    def add_thresholdCol(self,X):
        return np.concatenate([[[1]for x in range(len(X))],X],axis=1)  
        
    def Ein_error(self,my_model):
        preds = my_model.calculate_preds(self.X)
        return np.count_nonzero(preds!=self.y)/self.N
    
    def Eout_error(self,my_model):
        X = np.random.uniform(-1.0,1.0,(1000,2))
        y = self.find_actual_y(X)
        preds = my_model.calculate_preds(X)
        return np.count_nonzero(preds!=y)/1000
    

In [44]:

Wins = []
for x in range(1):
    test = create_test(100,k=9)
    print(test.EoutSVM,test.EoutRBF)
    Wins.append(test.EoutSVM<test.EoutRBF)
print(Wins)


0.017 0.074
[True]


In [180]:
Eins = []
for x in range(1):
    obj = create_test(100)
    Eins.append(obj.EinSVM)
print(Eins)


[0.0]


In [45]:

Wins = []
for x in range(25):
    test = create_test(100,k=12)
    print(test.EoutSVM,test.EoutRBF)
    Wins.append(test.EoutSVM<test.EoutRBF)
print(Wins)


0.031 0.068
0.035 0.046
0.03 0.072
0.038 0.057
0.021 0.078
0.041 0.081
0.034 0.091
0.026 0.072
0.031 0.084
0.065 0.098
0.036 0.071
0.037 0.06
0.066 0.093
0.045 0.094
0.037 0.066
0.031 0.084
0.039 0.079
0.028 0.037
0.033 0.034
0.061 0.052
0.029 0.043
0.029 0.034
0.042 0.06
0.043 0.064
0.02 0.063
[True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, False, True, True, True, True, True]


In [46]:
np.count_nonzero(Wins)/25

0.96

In [365]:
#if kernel is false - high Error because the X space is non linearly separated by weird find y function!

nan

In [50]:

k9 = []
k12 =[]
for x in range(25):
    test = create_test(100,k=9)
    k9.append([test.EinRBF,test.EoutRBF])
    test = create_test(100,k=12)
    k12.append([test.EinRBF,test.EoutRBF])


In [51]:
np.average(k12,axis=0)

array([0.046  , 0.06552])

In [52]:
np.average(k9,axis=0)

array([0.0388 , 0.06492])

In [None]:
k9