In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn import datasets
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split
import random
#from memory_profiler import profile

In [3]:
'''Data Cleaning'''
df = pd.read_csv("/Users/anushkahegde/Desktop/NEU/IE_7374_Machine_Learning/ml_project/Data_Set/Vehicle_Coupon.csv")
df.drop(['car', 'direction_same', 'toCoupon_GEQ5min'], axis=1, inplace=True)
df['temperature'] = df['temperature'].astype(str)
df = df.apply(lambda x: x.fillna(x.value_counts().index[0]))

In [4]:
'''Create dummies and split data'''
df_ohe = pd.get_dummies(df)
X, y = df_ohe.drop(['Y'], axis=1), df_ohe['Y']
y = np.where(y==0, -1, 1)

In [5]:
X_train, X_test, y_train, y_test = train_test_split(X, y, 
                                                                        test_size=0.3,                                                                          
                                                                        random_state = 1)
X_train, X_valid, y_train, y_valid = train_test_split(X_train, y_train, 
                                                                        test_size=0.3,                                                                          
                                                                        random_state = 1)

X_train, X_valid, X_test = np.asarray(X_train), np.asarray(X_valid), np.asarray(X_test)
y_train, y_valid, y_test = np.asarray(y_train), np.asarray(y_valid), np.asarray(y_test)

In [6]:
class SMO():
    def __init__(self, max_iter=100, kernel_type='linear', C=1.0, epsilon=1e-4):
        self.kernels = {
            'linear' : self.kernel_linear,
            'poly' : self.kernel_poly,
            'rbf' : self.kernel_rbf
        }
        self.max_iter = max_iter
        self.kernel_type = kernel_type
        self.C = C
        self.epsilon = epsilon

    def fit(self, X, y):
        n, d = X.shape[0], X.shape[1]
        alpha = np.zeros((n))
        kernel = self.kernels[self.kernel_type]
        count = 0
        while True:
            count += 1
            alpha_prev = np.copy(alpha)
            for j in range(0, n):
                i = self.get_rnd_int(0, n-1, j) # Get random int i~=j
                x_i, x_j, y_i, y_j = X[i,:], X[j,:], y[i], y[j]
                k_ij = kernel(x_i, x_i) + kernel(x_j, x_j) - 2 * kernel(x_i, x_j)
                if k_ij == 0:
                    continue
                alpha_prime_j, alpha_prime_i = alpha[j], alpha[i]
                (L, H) = self.compute_L_H(self.C, alpha_prime_j, alpha_prime_i, y_j, y_i)

                # Compute model parameters
                self.w = self.calc_w(alpha, y, X)
                self.b = self.calc_b(X, y, self.w)

                # Compute E_i, E_j
                E_i = self.E(x_i, y_i, self.w, self.b)
                E_j = self.E(x_j, y_j, self.w, self.b)

                # Set new alpha values
                alpha[j] = alpha_prime_j + float(y_j * (E_i - E_j))/k_ij
                alpha[j] = max(alpha[j], L)
                alpha[j] = min(alpha[j], H)

                alpha[i] = alpha_prime_i + y_i*y_j * (alpha_prime_j - alpha[j])

            # Check convergence
            diff = np.linalg.norm(alpha - alpha_prev)
            if diff < self.epsilon:
                break
            #print(count)
            if count >= self.max_iter:
                print("Iteration number exceeded the max of %d iterations" % (self.max_iter))
                return
        self.b = self.calc_b(X, y, self.w)
        if self.kernel_type == 'linear':
            self.w = self.calc_w(alpha, y, X)
        # Get support vectors
        alpha_idx = np.where(alpha > 0)[0]
        support_vectors = X[alpha_idx, :]

        return support_vectors, count

    def predict(self, X):
        return self.h(X, self.w, self.b)

    def calc_b(self, X, y, w):
        b_tmp = y - np.dot(w.T, X.T)
        return np.mean(b_tmp)

    def calc_w(self, alpha, y, X):
        return np.dot(X.T, np.multiply(alpha,y))

    def h(self, X, w, b):
        return np.sign(np.dot(w.T, X.T) + b).astype(int)

    def E(self, x_k, y_k, w, b):
        return self.h(x_k, w, b) - y_k

    def compute_L_H(self, C, alpha_prime_j, alpha_prime_i, y_j, y_i):
        if(y_i != y_j):
            return (max(0, alpha_prime_j - alpha_prime_i), min(C, C - alpha_prime_i + alpha_prime_j))
        else:
            return (max(0, alpha_prime_i + alpha_prime_j - C), min(C, alpha_prime_i + alpha_prime_j))

    def get_rnd_int(self, a,b,z):
        i = z
        cnt=0
        while i == z and cnt<1000:
            i = random.randint(a,b)
            cnt=cnt+1
        return i

    def kernel_linear(self, x, z):
        return np.dot(x, z.T)

    def kernel_poly(self, x, z):
        return (np.dot(x, z.T) ** 2)

    def kernel_rbf(self, x, z, sigma=1):
        return np.exp(- (np.linalg.norm(x - z, 2)) ** 2 / (2 * sigma ** 2))

    def predict_proba(self, X):
        return np.dot(self.w.T, X.T) + self.b


In [7]:
'''Epochs'''

epochs = [100, 300, 500]
epoch_list = []
for iter in epochs: 
  svm = SMO(max_iter = iter, kernel_type='linear', C=5)
  svm.fit(X_train, y_train)
  y_hat_valid = round(accuracy_score(y_valid, svm.predict(X_valid)), 4)
  pdata = [iter, 'linear', 5, y_hat_valid]
  epoch_list.append(pdata)

epoch_list

Iteration number exceeded the max of 100 iterations


[[100, 'linear', 5, 0.6655],
 [300, 'linear', 5, 0.6693],
 [500, 'linear', 5, 0.6685]]

In [9]:
smo_epoch_tuning = pd.DataFrame(epoch_list, columns = ['Learning rate','Kernel type','No. of Iterations','Validation Accuracy'])

In [10]:
smo_epoch_tuning.to_csv("SMO_epoch_tuning", encoding='utf-8', index=False)

In [11]:
'''C'''

C = [1, 5, 10]
C_list = []
for c in C: 
  svm = SMO(max_iter = 100, kernel_type='linear', C=c)
  svm.fit(X_train, y_train)
  y_hat_valid = round(accuracy_score(y_valid, svm.predict(X_valid)), 4)
  pdata = [100, 'linear', c, y_hat_valid]
  C_list.append(pdata)

C_list