In [5]:

import numpy as np
import glob2
from PIL import Image
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from sklearn.svm import SVC
from random import randrange
import logging
import random

In [6]:
logging.basicConfig(filename='./log.log',level=10, force=True, format='%(asctime)s   %(funcName)s - %(levelname)s:%(message)s')

# Read data

In [7]:
link_cat = r'./PetImages/Cat/**'
link_dog = r'./PetImages/Dog/**'

In [8]:
n_samples = 100
images  = []
labels = []
list_cat = glob2.glob(link_cat)
print(len(list_cat))
for i in range(n_samples):
    if('jpg' in list_cat[i]):
        img = Image.open(list_cat[i]).convert('RGB')
        img = img.resize((400,400), Image.LANCZOS)
        if len(np.array(img).shape)  == 3:
            images.append(np.array(img))
            labels.append(1)
            
list_dog = glob2.glob(link_dog)
print(len(list_dog))
for i in range(n_samples):
    if('jpg' in list_dog[i]):
        img = Image.open(list_dog[i]).convert('RGB')
        img = img.resize((400,400), Image.LANCZOS)
        if len(np.array(img).shape)  == 3:
            images.append(np.array(img))
            labels.append(-1)

X = np.array(images)
y = np.array(labels)

for index in range(len(images)):
    if images[index].shape[2] != 3:
        print(index, images[index].shape[2])

12501
12501


In [9]:
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)

In [10]:
x_train = np.array([x.flatten() for x in X_train])
x_test = np.array([x.flatten() for x in X_test])

# SVM

In [11]:
y_train

array([-1,  1,  1,  1,  1, -1,  1, -1,  1, -1,  1, -1, -1,  1, -1, -1,  1,
       -1, -1, -1,  1,  1,  1, -1, -1,  1, -1,  1, -1,  1,  1, -1, -1,  1,
        1, -1, -1,  1,  1, -1,  1,  1, -1, -1, -1, -1,  1,  1, -1, -1, -1,
        1, -1, -1,  1, -1,  1,  1, -1,  1,  1, -1, -1, -1,  1,  1, -1, -1,
        1,  1, -1,  1,  1, -1,  1, -1,  1, -1,  1, -1, -1, -1, -1,  1,  1,
        1,  1, -1,  1,  1, -1, -1,  1,  1,  1,  1, -1, -1,  1,  1, -1, -1,
       -1, -1,  1,  1, -1,  1, -1,  1,  1,  1,  1,  1, -1, -1, -1,  1, -1,
       -1,  1,  1, -1, -1, -1, -1,  1, -1,  1,  1, -1, -1,  1, -1,  1, -1,
        1,  1, -1, -1])

In [12]:
class SVM:
  def __init__(self, C=1.0, gamma=0.1, tol=1e-3, max_iter=1200):
    self.C = C
    self.gamma = gamma
    self.tol = tol
    self.max_iter = max_iter
    self.n_iter = 0
    self.eps = 1e-3

  def rbf(self, x1, x2):
    return np.exp(-self.gamma * (np.linalg.norm(x1 - x2) **2 ))
  
  def get_error(self, i):
    return (self.output(i) - self.y[i]) if self.is_bounded(i) else self.errors[i]
  
  def is_bounded(self, i):
    if 0 < self.alphas[i] and self.alphas[i] < self.C:
      return False
    return True

  def find_non_bound_indexes(self):
    self.non_bound_indexes = [i for i in range(self.n_samples) if not self.is_bounded(i)]

  def output(self, x):
    s = -self.b
    for alpha, sv_y, sv_x in zip(self.alphas, self.y, self.X):
      s += alpha * sv_y * self.rbf(sv_x, x)
    return s
  
  def predict(self, X):
    pred = []
    for x in X:
      pred.append(np.sign(self.output(x)))
    return pred
    
  def compute_L_H(self):
    if self.y1 != self.y2:
      L = max(0, self.alpha2 - self.alpha1)
      H = min(self.C, self.C + self.alpha2 - self.alpha1)
    else:
      L = max(0, self.alpha2 + self.alpha1 - self.C)
      H = min(self.C, self.alpha2 + self.alpha1)
    return L, H
  
  def compute_threshold(self, alpha1_new, alpha2_new, k11, k12, k22):
    b1 = self.E1 + self.y1 * (alpha1_new - self.alpha1) * k11 + \
        self.y2 * (alpha2_new - self.alpha2) * k12 + self.b
    b2 = self.E2 + self.y1 * (alpha1_new - self.alpha1) * k12 + \
        self.y2 * (alpha2_new - self.alpha2) * k22 + self.b

    if 0 < alpha1_new and alpha1_new < self.C:
      new_b = b1
    elif 0 < alpha2_new and alpha2_new < self.C:
      new_b = b2
    else:
      new_b = (b1+b2)/2.0
    return new_b
    
  def update_error_cache(self, alpha1_new, alpha2_new, i1, i2, old_b):
    delta1 = self.y1 * (alpha1_new - self.alpha1)
    delta2 = self.y2 * (alpha2_new - self.alpha2)
    
    for i in range(self.n_samples):
      if not self.is_bounded(i):
        self.errors[i] += delta1 * self.rbf(self.x1, self.X[i]) + \
                          delta2 * self.rbf(self.x2, self.X[i]) + \
                          self.b - old_b
    if not self.is_bounded(i1):
      self.errors[i1] = 0
    if not self.is_bounded(i2):
      self.errors[i2] = 0
    
  def get_alpha2(self, L, H, k11, k12, k22, s):
    eta = k11 + k22 - 2 * k12
    if eta > 0:
      alpha2_new = self.alpha2 + self.y2 * (self.E1-self.E2)/eta
      alpha2_new = min(H, max(L, alpha2_new))
    else:
      logging.warn("ETA <= 0")
      f1 = self.y1*(self.E1 + self.b) - self.alpha1*k11 - s*self.alpha2*k12
      f2 = self.y2*(self.E2 + self.b) - s*self.alpha1*k12 - self.alpha2*k22
      L1 = self.alpha1 + s*(self.alpha2-L)
      H1 = self.alpha1 + s*(self.alpha2-H)
      Lobj = L1*f1 + L*f2 + 0.5*(L1**2)*k11 + 0.5*(L**2)*k22 + s*L*L1*k12
      Hobj = H1*f1 + H*f2 + 0.5*(H1**2)*k11 + 0.5*(H**2)*k22 + s*H*H1*k12
      if Lobj < Hobj - self.eps:
        alpha2_new = L
      elif Lobj > Hobj + self.eps:
        alpha2_new = H
      else:
        alpha2_new = self.alpha2
    return alpha2_new
    
  def take_step(self, i1, i2):
    if i1 == i2:
      logging.debug("i1 = i2, exit")
      return 0
    self.y1 = self.y[i1]
    self.alpha1 = self.alphas[i1]
    self.x1 = self.X[i1]
    self.E1 = self.get_error(i1)
    
    s = self.y1 * self.y2
    L, H = self.compute_L_H()
    if L==H:
      return 0
    k11 = self.rbf(self.x1, self.x1)
    k12 = self.rbf(self.x1, self.x2)
    k22 = self.rbf(self.x2, self.x2)
    alpha2_new = self.get_alpha2(L, H, k11, k12, k22, s)
    if abs(alpha2_new - self.alpha2) < self.tol * (alpha2_new + self.alpha2 + self.eps):
      return 0
    alpha1_new = self.alpha1 + s * (self.alpha2 - alpha2_new)
    old_b = self.b
    self.b = self.compute_threshold(alpha1_new, alpha2_new, k11, k12, k22)
    self.update_error_cache(alpha1_new, alpha2_new, i1, i2, old_b)
    self.alphas[i1] = alpha1_new
    self.alphas[i2] = alpha2_new
    logging.warn(f"Done {i1} and {i2}")
    return 1
  
  def second_choice_heuristic(self, i2):
    i1 = -1
    max = 0
    for j in self.non_bound_indexes:
      step = abs(self.get_error(j) - self.errors[i2])
      if step > max:
        max = step
        i1 = j
    return i1

  def examine_example(self, i2):
    self.y2 = self.y[i2]
    self.alpha2 = self.alphas[i2]
    self.x2 = self.X[i2]
    self.E2 = self.get_error(i2)
    r2 = self.E2 * self.y2
    if ((r2 < -self.tol and self.alpha2 < self.C) or \
      (r2 > self.tol and self.alpha2 > 0)):      
      self.find_non_bound_indexes()
      logging.info(f"Found {len(self.non_bound_indexes)} non bound")
      if (len(self.non_bound_indexes) > 1):
        logging.info("Starting branch 1")
        i1 = self.second_choice_heuristic(i2)
        logging.info(f"Take step {i1} and {i2}")
        if i1 >= 0 and self.take_step(i1, i2):
          return 1
      logging.info("Starting branch 2")
      if len(self.non_bound_indexes) > 0:
        rand_i = randrange(len(self.non_bound_indexes))
        for i1 in self.non_bound_indexes[rand_i:] + self.non_bound_indexes[:rand_i]:
          logging.info(f"Take step {i1} and {i2}")
          if self.take_step(i1, i2):
            return 1
      logging.info("Starting branch 3")
      rand_i = randrange(self.n_samples)
      all_indexes = list(range(self.n_samples))
      for i1 in all_indexes[rand_i:] + all_indexes[:rand_i]:
        logging.info(f"Take step {i1} and {i2}")
        if self.take_step(i1, i2):
          return 1
    return 0

  def fit(self, X, y):
    random.seed(0)
    logging.info("Setting startup parameters")
    self.X = X
    self.y = y
    self.n_samples, self.n_features = X.shape
    self.errors = np.zeros(self.n_samples)
    self.alphas = np.zeros(self.n_samples)
    self.b = 0
    num_changed = 0
    examine_all = True
    logging.info("Start while loop")
    self.n_iter = 0
    while (num_changed > 0) or examine_all:
      self.n_iter += 1
      logging.info(f"N iter: {self.n_iter}")
      num_changed = 0
      if examine_all:
        logging.info("for loop 1")
        for i in range(self.n_samples):
          num_changed += self.examine_example(i)
      else:
        logging.info("for loop 2")
        for i in range(self.n_samples):
          if not self.is_bounded(i):
            num_changed += self.examine_example(i)
      logging.info(f"examine_all: {examine_all}, numchanged: {num_changed}")
      if examine_all:
        examine_all = False
      elif num_changed == 0:
        examine_all = True
        
    sv_idx = (self.alphas > 0)
    logging.info(f"Filtering support vectors, there are {np.count_nonzero(sv_idx)} alphas")
    self.support_vectors = X[sv_idx]
    self.support_vector_labels = y[sv_idx]

In [13]:
svm = SVM(gamma = 1/(x_train.shape[1]*x_train.var()))
svm.fit(x_train, y_train)
pred = svm.predict(x_test)
accuracy_score(pred, y_test)

  logging.warn(f"Done {i1} and {i2}")


KeyboardInterrupt: 

In [15]:
y_train_sklearn = np.array([str(y) for y in y_train])
y_test_sklearn = np.array([str(y) for y in y_test])

In [16]:
svc = SVC(kernel='rbf', gamma = 1/(x_train.shape[1]*x_train.var()))
svc.fit(x_train, y_train_sklearn)
pred = svc.predict(x_test)
accuracy_score(pred, y_test_sklearn)

0.6166666666666667

In [17]:
svc.n_iter_

array([140], dtype=int32)

In [18]:
svc.dual_coef_

array([[-1.        , -0.84618477, -1.        , -1.        , -1.        ,
        -0.48525263, -0.88835903, -1.        , -1.        , -1.        ,
        -1.        , -0.67630197, -0.91638137, -1.        , -1.        ,
        -0.92151731, -1.        , -1.        , -1.        , -1.        ,
        -0.96725608, -0.90533996, -0.66546383, -1.        , -1.        ,
        -0.80156863, -1.        , -1.        , -1.        , -0.84111311,
        -1.        , -1.        , -1.        , -1.        , -0.19542954,
        -1.        , -1.        , -1.        , -0.56715469, -1.        ,
        -1.        , -1.        , -1.        , -0.66465594, -0.8423303 ,
        -1.        , -1.        , -0.38911849, -0.71641567, -0.92482623,
        -0.93938708, -1.        , -1.        , -0.65894374, -1.        ,
        -1.        , -1.        , -0.82435754, -1.        , -1.        ,
        -0.03750627, -1.        , -0.6678171 , -0.60209135, -0.66211985,
        -1.        , -1.        , -1.        , -1. 