- https://github.com/topics/pegasos
- https://github.com/jyotishp/pegasos-svm/blob/master/src/svm.py


In [1]:
import math
import numpy as np
from random import randint

In [3]:
def pegasos(x, y, weights=None, iterations=2000, lam=1):
    """Train a linear classifier with Pegasos-style stochastic updates.

    Each iteration draws one sample uniformly at random, shrinks the weight
    vector by ``(1 - step * lam)`` and, when the sample's margin is below 1,
    additionally moves the weights towards ``label * sample``.

    Args:
        x: Indexable collection of NumPy feature vectors. ``x[0].shape``
            determines the weight shape when ``weights`` is omitted.
        y: Labels aligned with ``x`` (presumably +1/-1 -- confirm with the
            caller).
        weights: Optional starting weights (any array-like). The caller's
            object is never modified in place; every update rebinds a new
            array.
        iterations: Number of stochastic updates to perform.
        lam: Regularisation strength; the step size at update ``t`` (1-based)
            is ``1 / (lam * t)``.

    Returns:
        The weight vector after ``iterations`` updates.
    """
    if weights is None:
        weights = np.zeros(x[0].shape)
    else:
        # Accept plain sequences too; scalar * list would otherwise fail.
        weights = np.asarray(weights)

    num_samples = len(y)
    for t in range(1, iterations + 1):
        idx = randint(0, num_samples - 1)
        step = 1 / (lam * t)
        sample, label = x[idx], y[idx]

        margin = label * weights @ sample.T
        shrunk = (1 - step * lam) * weights
        if margin < 1:
            # Margin violated: add the hinge-loss sub-gradient contribution.
            weights = shrunk + step * label * sample
        else:
            weights = shrunk
    return weights

In [3]:
def pegasosSolver(X,Y,lamb,n_iter=100):
    """Run ``n_iter`` Pegasos stochastic sub-gradient updates from zero weights.

    Args:
        X: Indexable collection of feature vectors; ``len(X[0])`` is the
            number of features.
        Y: Labels aligned with ``X`` (presumably +1/-1 -- confirm with the
            caller).
        lamb: Regularisation strength; the step size at update ``t``
            (1-based) is ``1 / (lamb * t)``.
        n_iter: Number of stochastic updates.

    Returns:
        The learned weight vector as a 1-D NumPy array.
    """
    # Imported here because the cells above only bring ``randint`` into
    # scope, not the ``random`` module itself.
    import random

    n_samples = len(Y)
    W = np.zeros(len(X[0]))
    for t in range(1, n_iter + 1):
        eta = 1.0 / (lamb * t)
        choice = random.randint(0, n_samples - 1)
        # asarray lets plain list rows take part in the vector arithmetic.
        x = np.asarray(X[choice])
        y = Y[choice]

        decayed = (1 - eta * lamb) * W
        if y * np.dot(W, x) >= 1:
            # Margin satisfied: only the regularisation shrink applies.
            W = decayed
        else:
            W = decayed + (eta * y) * x
    return W

In [8]:
import os
import sys
import argparse
import numpy as np
from mnist import MNIST

from pegasos import *

class Dataset():
    """Two-class subset of MNIST, relabelled to -1/+1 for a binary SVM.

    Attributes:
        labels_to_load: ``[negative_label, positive_label]``; samples with the
            first label become -1, samples with the second become +1, and all
            other samples are dropped.
        mnist_loader: The ``MNIST`` loader built from ``data_dir``.
        xtrain, ytrain: Filtered training features (float64) and -1/+1 labels.
        xtest, ytest: Filtered test features (float64) and -1/+1 labels.
    """

    def __init__(self, data_dir, labels_to_load=(0, 1)):
        """Load the train and test splits and keep only the two requested digits.

        Args:
            data_dir: Directory handed to ``MNIST``.
            labels_to_load: Two distinct labels, ``(negative, positive)``.

        Raises:
            ValueError: If ``labels_to_load`` is not exactly two distinct labels.
        """
        # Copy into a list so a caller's sequence is never shared or mutated.
        labels_to_load = list(labels_to_load)
        if len(labels_to_load) != 2 or labels_to_load[0] == labels_to_load[1]:
            raise ValueError(
                'labels_to_load must contain exactly two distinct labels'
            )
        self.labels_to_load = labels_to_load
        self.mnist_loader = MNIST(data_dir)
        print('Loading dataset...')

        self.xtrain, self.ytrain = self._load_split(
            self.mnist_loader.load_training)
        self.xtest, self.ytest = self._load_split(
            self.mnist_loader.load_testing)
        print('Dataset loaded')

    def _load_split(self, loader):
        """Call ``loader`` and return its float64 arrays after trimming."""
        images, labels = loader()
        images = np.array(images, dtype=np.float64)
        labels = np.array(labels, dtype=np.float64)
        return self.trim_dataset(images, labels)

    def trim_dataset(self, x, y):
        """Keep only samples of the two configured labels and map them to -1/+1.

        Args:
            x: Array-like of samples, one row per sample.
            y: Array-like of labels aligned with ``x``.

        Returns:
            ``(x_kept, y_signed)`` as NumPy arrays in the original sample
            order; ``y_signed`` holds -1 for the first configured label and
            +1 for the second. When nothing matches, ``x_kept`` keeps the
            trailing dimensions of ``x`` and has zero rows.
        """
        negative_label, positive_label = self.labels_to_load
        x = np.asarray(x)
        y = np.asarray(y)

        is_positive = y == positive_label
        keep = is_positive | (y == negative_label)
        signed = np.where(is_positive[keep], 1, -1)
        return x[keep], signed

def kernel_function(x, y, variance=1):
    """Gaussian (RBF) kernel between two vectors.

    Computes ``exp(-||x - y||**2 / (2 * variance))``.

    Args:
        x: First vector (NumPy array).
        y: Second vector, same shape as ``x``.
        variance: Kernel bandwidth (sigma squared). Defaults to 1, which
            reproduces the previously hard-coded behaviour.

    Returns:
        The kernel value; 1.0 when ``x`` equals ``y`` and decaying towards 0
        as the vectors move apart.
    """
    squared_distance = np.linalg.norm(x - y)**2
    return np.exp(-squared_distance / (2 * variance))

def parse_arguments():
    """Parse the command-line options for the SVM script.

    Options:
        --dataset_dir  required.
        --iterations   int, defaults to 10.
        --kernel       flag, False unless given.
        --lambda       float, defaults to 1. Because ``lambda`` is a Python
                       keyword, read it with ``getattr(args, 'lambda')``.

    Returns:
        The ``argparse.Namespace`` produced from ``sys.argv``.
    """
    option_table = (
        ('--dataset_dir', dict(required=True)),
        ('--iterations', dict(type=int, default=10)),
        ('--kernel', dict(default=False, action='store_true')),
        ('--lambda', dict(default=1, type=float)),
    )
    cli = argparse.ArgumentParser(description='')
    for flag, settings in option_table:
        cli.add_argument(flag, **settings)
    return cli.parse_args()

def kernelized_svm(args, data, max_test_samples=500):
    """Train a kernelised Pegasos model and report accuracy on test samples.

    Args:
        args: Parsed CLI namespace; ``args.iterations`` is forwarded to the
            trainer.
        data: Object exposing ``xtrain``, ``ytrain``, ``xtest`` and ``ytest``.
        max_test_samples: Evaluate only the first this-many test samples,
            because each prediction sums over the whole training set. Pass
            ``None`` to evaluate every test sample.

    Returns:
        Fraction of the evaluated test samples that were classified
        correctly.

    Raises:
        ZeroDivisionError: If there are no test samples to evaluate.
    """
    weights = kernelized_pegasos(
            x=data.xtrain,
            y=data.ytrain,
            kernel=kernel_function,
            iterations=args.iterations
    )
    test_x = data.xtest[:max_test_samples]
    test_y = data.ytest[:max_test_samples]

    errors = 0
    for sample, label in zip(test_x, test_y):
        decision = sum(
            weights[j] * data.ytrain[j] * kernel_function(data.xtrain[j], sample)
            for j in range(len(data.ytrain))
        )
        prediction = -1 if decision < 0 else 1
        if prediction != label:
            errors += 1

    # Divide by the number of samples actually scored; using the full test
    # set size here would overstate accuracy whenever the cap applies.
    return 1 - errors / len(test_y)

def svm(args, data):
    """Train a linear Pegasos SVM and return its accuracy on the test set.

    Args:
        args: Parsed CLI namespace. ``args.iterations`` sets the number of
            training updates; the ``--lambda`` option, when present, is
            forwarded as the regularisation strength.
        data: Object exposing ``xtrain``, ``ytrain``, ``xtest`` and ``ytest``.

    Returns:
        Fraction of test samples classified correctly.
    """
    weights = pegasos(
            x=data.xtrain,
            y=data.ytrain,
            iterations=args.iterations,
            # ``lambda`` is a keyword, so attribute syntax cannot read it;
            # fall back to the trainer's default of 1 when it is absent.
            lam=getattr(args, 'lambda', 1)
    )
    errors = 0
    for sample, label in zip(data.xtest, data.ytest):
        decision = weights @ sample.T
        prediction = -1 if decision < 0 else 1
        if prediction != label:
            errors += 1
    return 1 - errors/len(data.ytest)

def main():
    """Parse CLI options, load the dataset, train the chosen SVM and print accuracy."""
    args = parse_arguments()
    data = Dataset(args.dataset_dir)

    if args.kernel:
        print('Using RBF kernel')
        accuracy = kernelized_svm(args, data)
    else:
        accuracy = svm(args, data)
    print('Accuracy:', accuracy)


# Guarded so that importing this code does not start argument parsing and
# training as a side effect; running it as a script or notebook cell still does.
if __name__ == '__main__':
    main()


ModuleNotFoundError: No module named 'mnist'

In [1]:
import sys
import numpy as np
import random

def bgd_pegasos(x, y, threshold, c=0, batch_size=100, max_iter=5000):
    """Fit a linear model ``(w, b)`` by mini-batch sub-gradient descent on the hinge loss.

    Every iteration samples ``batch_size`` (x, y) pairs without replacement,
    accumulates the hinge-loss sub-gradient over the batch, and takes a step
    of size ``1 / iteration``. Progress is written to stdout on one line.

    Args:
        x: Indexable collection of feature vectors; ``len(x[0])`` is the
            number of features.
        y: Labels aligned with ``x`` (presumably +1/-1 -- confirm with the
            caller).
        threshold: Stop as soon as the largest absolute change in any
            parameter (``b`` or a component of ``w``) is below this value.
        c: Weight applied to the summed batch sub-gradient. ``0`` means
            "use ``1 / batch_size``".
        batch_size: Number of samples drawn per iteration.
        max_iter: Upper bound on the number of iterations.

    Returns:
        ``(w, b)`` at the first iteration whose parameter change is below
        ``threshold``.

    NOTE(review): no return after the loop is visible in this excerpt; if the
    function ends here it returns None when ``max_iter`` is exhausted --
    confirm against the full file.
    """
    if c == 0:
        c = 1 / batch_size

    # Pair samples with labels up front; the names x and y are rebound
    # inside the batch loop below.
    data = list(zip(x, y))

    num_samples, num_features = len(x), len(x[0])
    # Only referenced by the commented-out stopping condition further down.
    num_batches = num_samples / batch_size

    w = np.zeros([num_features, ])
    w_old = np.zeros([num_features, ])
    b = 0
    b_old = 0

    it = 0

    while(it < max_iter):
        it += 1
        # Step size decays as 1 / iteration.
        eeta = 1 / it

        # Draw the mini-batch without replacement.
        batch = random.sample(data, batch_size)
        # Accumulators for the batch sub-gradient w.r.t. w and b.
        gjw = np.zeros([num_features, ])
        gb = 0

        for i in range(batch_size):
            x, y = batch[i]
            # Margin of this sample under the current parameters.
            ti = y * (w@x + b)
            # Hinge sub-gradient factor: 0 when the margin exceeds 1, else -1.
            gti = 0 if ti > 1 else -1
            gjw += gti * y * x
            gb += gti * y

        # Full step for w adds the regulariser term (w itself) to the
        # c-weighted hinge term; b gets only the hinge term.
        gjw = eeta * (w + c * gjw)
        gb = eeta * c * gb

        # Remember the previous parameters to measure how far this step moved.
        w_old = w
        b_old = b

        w = w - gjw
        b = b - gb

        change_inb = abs(b - b_old)
        change_inw = np.abs(w - w_old)
        # Largest absolute change across the components of w.
        change_inw = change_inw[np.argmax(change_inw)]
        # Despite the name, this is the largest parameter change, not a loss value.
        loss = max(change_inb, change_inw)

        # "\r\x1b[K" returns to the line start and clears it, so the status
        # line is overwritten in place.
        sys.stdout.write("\r\x1b[K" + "Iteration: %d loss: %g" % (it, loss))
        sys.stdout.flush()
        # if(it >= num_batches and loss < threshold):
        if(loss < threshold):
            return w, b