<a href="https://colab.research.google.com/github/YintongMa/EEGDataMining/blob/main/svm_rbf.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import numpy as np
from functools import lru_cache
from sklearn.svm import SVC
import pandas as pd
import sklearn
from sklearn.decomposition import PCA
from sklearn.utils import shuffle
import os


class svm():

    def __init__(self,
                 kernel="rbf", lmd=1e-1, gamma=0.1, bias=1.0, max_iter=100):
        if kernel not in self.__kernel_dict:
            print(kernel + " kernel does not exist!\nUse rbf kernel.")
            kernel = "rbf"
        if kernel == "rbf":
            def kernel_func(x, y):
                return self.__kernel_dict[kernel](x, y, gamma=gamma)
        else:
            kernel_func = self.__kernel_dict[kernel]
        self.kernel = kernel_func
        self.lmd = lmd
        self.bias = bias
        self.max_iter = max_iter

    def __linear_kernel(x, y):
        return np.dot(x, y)

    def __gaussian_kernel(x, y, gamma):
        diff = x - y
        return np.exp(-gamma * np.dot(diff, diff))

    __kernel_dict = {"linear": __linear_kernel, "rbf": __gaussian_kernel}

    def fit(self, X, y):
        def update_alpha(alpha, t):
            data_size, feature_size = np.shape(self.X_with_bias)
            new_alpha = np.copy(alpha)
            it = np.random.randint(low=0, high=data_size)
            x_it = self.X_with_bias[it]
            y_it = self.y[it]

            # alpha[k] = alpha[k] + eta[k] * (1 - myData.loc[k, 2] * sum(alpha * myData.loc[:, 2] * K[:, k]))
            if (y_it * (1. / (self.lmd * t)) * sum([alpha_j * y_it * self.kernel(x_it, x_j) for x_j, alpha_j in zip(self.X_with_bias, alpha)])) < 1.:
                new_alpha[it] += 1
            return new_alpha

        self.X_with_bias = np.c_[X, np.ones((np.shape(X)[0])) * self.bias]
        self.y = y
        alpha = np.zeros((np.shape(self.X_with_bias)[0], 1))

        for t in range(1, self.max_iter + 1):
            alpha = update_alpha(alpha, t)
        self.alpha = alpha
        return alpha


    def predict(self,X):
        X_with_bias = np.c_[X, np.ones((np.shape(X)[0])) * self.bias]

        y_score = []

        for x in X_with_bias:
            i = 0
            for (x_j, y_j, alpha_j) in zip(self.X_with_bias, self.y, self.alpha):
                i += alpha_j * y_j * self.kernel(x_j, x)
            y_score.append((1. / (self.lmd * self.max_iter)) * i)

        
        return y_score


def cross_val_binary(X, y, batch_size):
    error_arr = []
    subset_num = int(len(X)/batch_size)-1
    for i in range(subset_num):
        print('batch: '+str(i))
        error = 0
        X_test = X[i*batch_size: (i+1)*batch_size]
        y_test = y[i*batch_size: (i+1)*batch_size]
        X_train = np.concatenate((X[0: i*batch_size], X[(i+1)*batch_size: len(X)]))
        y_train = np.concatenate((y[0: i*batch_size], y[(i+1)*batch_size: len(y)]))
        
        # Revised part, PCA fitting only applied to training set
        pca = PCA()
        X_train =  pca.fit_transform(X_train)
        X_test = pca.transform(X_test)

        print('training')
        cf = svm(kernel="linear")
        cf.fit(X_train, y_train)

        print('predicting')
        y_score = cf.predict((X_test))
        result = []
        for s in y_score:
            if s >= 0.:
                result.append(1)
            else:
                result.append(-1)

        for i in range(len(X_test)):
            if result[i] != y_test[i]:
                error = error + 1
        error_rate = error/batch_size
        error_arr.append(error_rate)
    
#     print ("Error rate of each iteration: " + str(error_arr))
    print ("Average error rate:" + str(np.average(error_arr)))
    print()
    
    return np.average(error_arr)



In [None]:
# Binary tasks
PATH = "/content/drive/MyDrive/ML/data_binary"
file_list = os.listdir(PATH)
file_list.sort()
task = []
result = []

for file in file_list:
    data = np.load(os.path.join(PATH, file))
    X = data['x']
    y = data['y']
    print(file)
    avg_err = cross_val_binary(X, y, int(len(X)/10))
    task.append(file.split(".")[0])
    result.append(avg_err)

result_df = {"task": task,"avg_err": result}
df = pd.DataFrame(result_df, columns = ["task","avg_err"])

df.to_csv("/content/drive/MyDrive/ML/result/least_squares_binary.csv", index=False)

In [None]:
# Classify 10 different digit classes
def cross_val_multi(X, y, batch_size):
    error_arr = []
    subset_num = int(len(X)/batch_size)-1
    
    for i in range(subset_num):
        print("batch: " + str(i))
        error = 0
        X_test = X[i * batch_size: (i + 1) * batch_size]
        y_test = y[i * batch_size: (i + 1) * batch_size]
        X_train = np.concatenate((X[0: i * batch_size], X[(i + 1) * batch_size: len(X)]))
        label_train = np.concatenate((y[0: i * batch_size], y[(i + 1) * batch_size: len(y)]))
        
        
        svm_list = []
        for m in range(10):
            print('building predictor for ' + str(m))
            y_train = []
            for j in range(len(label_train)):
                if label_train[j] == i:
                    y_train.append([1])
                else:
                    y_train.append([-1])
            y_train = np.array(y_train)
                    
            pca = PCA()
            X_train =  pca.fit_transform(X_train)
            X_test = pca.transform(X_test)


            cf = svm(kernel="rbf")
            cf.fit(X_train, y_train)
            svm_list.append(cf)


        print('predicting')
        scores = np.zeros((len(svm_list),len(X_test)))
        for i in range(len(svm_list)):
            scores[i] = np.array(svm_list[i].predict((X_test))).flatten()

        scores = scores.T
        result = []
        for j in range(len(y_test)):
            result.append(np.abs(scores[j] - 1).argmin())
            
        for j in range(len(result)):
            if result[j] != y_test[j]:
                error = error + 1
        
        error_rate = error / batch_size
        error_arr.append(error_rate)
        print('error_rate: '+str(error_rate))
        print('***************************')

#     print("Error rate of each iteration: " + str(error_arr))
    print("Average error rate:" + str(np.average(error_arr)))
    return np.average(error_arr)

In [None]:
# Multiclass tasks
task = "classify 10 digit seeing"
data = np.load("/content/drive/MyDrive/ML/data_multi/all_raw_digit.npz") 
x = data['x']
y = data['y']


avg_err = cross_val_multi(x, y, int(len(x)/10))
#avg_err = cross_val_multi(x[:100], y[:100], 10)

result = {"task": [task],"avg_err": [avg_err]}
df = pd.DataFrame(result, columns = ["task","avg_err"])

df.to_csv("/content/drive/MyDrive/ML/result/svm_linear_multi_old.csv", index=False)

batch: 0
building predictor for 0
building predictor for 1
building predictor for 2
building predictor for 3
building predictor for 4
building predictor for 5
building predictor for 6
building predictor for 7
building predictor for 8
building predictor for 9
predicting
error_rate: 0.9042735042735043
***************************
batch: 1
building predictor for 0
building predictor for 1
building predictor for 2
building predictor for 3
building predictor for 4
building predictor for 5
building predictor for 6
building predictor for 7
building predictor for 8
building predictor for 9
predicting
error_rate: 0.8931623931623932
***************************
batch: 2
building predictor for 0
building predictor for 1
building predictor for 2
building predictor for 3
building predictor for 4
building predictor for 5
building predictor for 6
building predictor for 7
building predictor for 8
building predictor for 9
predicting


In [None]:
import numpy as np
from functools import lru_cache
from sklearn.svm import SVC
import pandas as pd
import sklearn
from sklearn.decomposition import PCA
from sklearn.utils import shuffle
import os
from functools import lru_cache

class svm_new():
    def __init__(self, kernel="rbf", lmd=1e-1, sigma=0.1, max_iter=100):
        self.kernel = kernel
        self.lmd = lmd
        self.sigma = sigma
        self.max_iter = max_iter
        self.alpha = 0
        self.w = 0


    def kernel_matrix(self, X):
        if self.kernel == "linear":
            return X.dot(X.T)
        elif self.kernel == "rbf":
            return X.dot(X.T)
        else:
            row, col = X.shape
            GassMatrix = np.zeros(shape=(row, row))
            i = 0
            for v_i in X:
                j = 0
                for v_j in X:
                    GassMatrix[i, j] = Gaussian(v_i.T, v_j.T, self.sigma)
                    j += 1
                i += 1
            return GassMatrix

    def Gaussian(self, x, z):
        norm = np.linalg.norm(x - z)**2
        up = ((norm**2)*-1)
        down = (2 * (self.sigma ** 2))
        return math.exp(up/down)

    def KRR(self, K,y):
        m,n = K.shape
        return np.linalg.inv(K + np.dot(self.lmd,np.identity(m))).dot(y)


    def fit(self, X, y, lamb=0.01, alpha_init=None, eta=0.01, max_step=1000):
        """
        Compute the optimal alpha for an SVM using the kernel matrix K
        and observations y.
        alpha_init : our initial value for alpha
        eta: step size
        """

        K = self.kernel_matrix(X)
        n = len(y)
        if alpha_init is None:
            alpha_init = np.zeros(n)
        assert K.shape == (n, n)
        assert len(alpha_init) == n
        # make sure all labels are +/ - 1 and that we have
        # both positive and negative labels
        assert (np.unique(y) == np.array([-1, 1])).all()
        alpha_k = alpha_init
        for k in range(max_step):
            if k == 0 or (k+1)%10 == 0:
                print(k+1)
            alpha_old = alpha_k
            ## your code here to update alpha_k
            res = np.zeros(n)
            for i in range(n):
                if y[i] * K[:, i].T.dot(alpha_old) < 1:
                    res += np.identity(n).dot(-y[i] * K[:, i])
            res += 2 * lamb * K.dot(alpha_old)

            alpha_k = alpha_old - eta * res
            # # compute y_est
            # y_est = (K + lamb * np.identity(n)).dot(alpha_k)
            # accuracy = np.mean((y_est > 0) == (y > 0))
            alpha_norm_change = np.linalg.norm(alpha_k - alpha_old)
            # print("k = {:5}".format(k) + ",alpha_norm_change = {:3.2f}".format(alpha_norm_change) +
            #       ",accuracy = {:3.1f}".format(accuracy * 100))
            if alpha_norm_change < 1e-1:
                break
        self.alpha = alpha_k
        self.w = X.T.dot(alpha_k)
        return alpha_k, self.w

    def predict (self, X):
        """
        y_predict = []
        for s in X.dot(self.w.T):
            if s[0] >= 0.:
                y_predict.append(1)
            else:
                y_predict.append(-1)

        return np.array(y_predict)
        """
        return X.dot(self.w.T)


def cross_val_rbf(X, y, batch_size):
    error_arr = []
    subset_num = int(len(X)/batch_size)-1
    for i in range(subset_num):
        print('batch: '+str(i))
        error = 0
        X_test = X[i*batch_size: (i+1)*batch_size]
        y_test = y[i*batch_size: (i+1)*batch_size]
        X_train = np.concatenate((X[0: i*batch_size], X[(i+1)*batch_size: len(X)]))
        y_train = np.concatenate((y[0: i*batch_size], y[(i+1)*batch_size: len(y)]))
        
        # Revised part, PCA fitting only applied to training set
        pca = PCA()
        X_train =  pca.fit_transform(X_train)
        X_test = pca.transform(X_test)

        print('training')
        cf = svm_new(kernel="rbf")
        cf.fit(X_train, y_train)

        print('predicting')
        y_score = cf.predict((X_test))
        result = []
        for s in y_score:
            if s >= 0.:
                result.append(1)
            else:
                result.append(-1)

        for i in range(len(X_test)):
            if result[i] != y_test[i]:
                error = error + 1
        error_rate = error/batch_size
        error_arr.append(error_rate)
    
#     print ("Error rate of each iteration: " + str(error_arr))
    print ("Average error rate:" + str(np.average(error_arr)))
    print()
    
    return np.average(error_arr)



In [None]:
# Binary tasks
PATH = "/content/drive/MyDrive/ML/data_binary"
file_list = os.listdir(PATH)
file_list.sort()
task = []
result = []

for file in file_list:
    data = np.load(os.path.join(PATH, file))
    X = data['x']
    y = data['y']
    print(file)
    avg_err = cross_val_rbf(X, y, int(len(X)/10))
    task.append(file.split(".")[0])
    result.append(avg_err)

result_df = {"task": task,"avg_err": result}
df = pd.DataFrame(result_df, columns = ["task","avg_err"])

df.to_csv("/content/drive/MyDrive/ML/result/svm_rbf_binary.csv", index=False)

all_0_vs_rest.npz
batch: 0
training
predicting
batch: 1
training
predicting
batch: 2
training
predicting
batch: 3
training
predicting
batch: 4
training
predicting
batch: 5
training
predicting
batch: 6
training
predicting
batch: 7
training
predicting
batch: 8
training
predicting
Average error rate:0.13383838383838384

all_1_vs_rest.npz
batch: 0
training
predicting
batch: 1
training
predicting
batch: 2
training
predicting
batch: 3
training
predicting
batch: 4
training
predicting
batch: 5
training
predicting
batch: 6
training
predicting
batch: 7
training
predicting
batch: 8
training
predicting
Average error rate:0.12794612794612792

all_2_vs_rest.npz
batch: 0
training
predicting
batch: 1
training
predicting
batch: 2
training
10
20
30
40
50
60
70
80
90
100
110
120
130
140
150
160
170
180
190
200
210


In [None]:
# Classify 10 different digit classes
def cross_val_multi_rbf(X, y, batch_size):
    error_arr = []
    subset_num = int(len(X)/batch_size)-1
    
    for i in range(subset_num):
        print("batch: " + str(i))
        error = 0
        X_test = X[i * batch_size: (i + 1) * batch_size]
        y_test = y[i * batch_size: (i + 1) * batch_size]
        X_train = np.concatenate((X[0: i * batch_size], X[(i + 1) * batch_size: len(X)]))
        label_train = np.concatenate((y[0: i * batch_size], y[(i + 1) * batch_size: len(y)]))
        
        
        svm_list = []
        for m in range(10):
            print('building predictor for ' + str(m))
            y_train = []
            for j in range(len(label_train)):
                if label_train[j] == i:
                    y_train.append([1])
                else:
                    y_train.append([-1])
            y_train = np.array(y_train)
                    
            pca = PCA()
            X_train =  pca.fit_transform(X_train)
            X_test = pca.transform(X_test)


            cf = svm_new(kernel="rbf")
            cf.fit(X_train, y_train)
            svm_list.append(cf)


        print('predicting')
        scores = np.zeros((len(svm_list),len(X_test)))
        for i in range(len(svm_list)):
            scores[i] = np.array(svm_list[i].predict((X_test))).flatten()

        scores = scores.T
        result = []
        for j in range(len(y_test)):
            result.append(np.abs(scores[j] - 1).argmin())
            
        for j in range(len(result)):
            if result[j] != y_test[j]:
                error = error + 1
        
        error_rate = error / batch_size
        error_arr.append(error_rate)
        print('error_rate: '+str(error_rate))
        print('***************************')

#     print("Error rate of each iteration: " + str(error_arr))
    print("Average error rate:" + str(np.average(error_arr)))
    return np.average(error_arr)

In [None]:
# Multiclass tasks
task = "classify 10 digit seeing"
data = np.load("/content/drive/MyDrive/ML/data_multi/all_raw_digit.npz") 
x = data['x']
y = data['y']


avg_err = cross_val_multi_rbf(x, y, int(len(x)/10))
#avg_err = cross_val_multi_rbf(x[:100], y[:100], 10)

result = {"task": [task],"avg_err": [avg_err]}
df = pd.DataFrame(result, columns = ["task","avg_err"])

df.to_csv("/content/drive/MyDrive/ML/result/svm_rbf_multi.csv", index=False)

batch: 0
building predictor for 0
1
building predictor for 1
1


KeyboardInterrupt: ignored