# NYCU Machine Learning 2024 : HW4 SVM

In [1]:
import numpy as np
import cvxpy as cp
import pandas as pd

from pathlib import Path
from rich import print
from dataclasses import dataclass
from typing import Callable

In [2]:
np.set_printoptions(precision=4)

In [3]:
LABEL = ["Setosa" , "Versicolor" , "Virginica" ]

COLOR_1 = dict(zip(LABEL, ["red" , "green" , "blue"]))
COLOR_2 = dict(zip(LABEL, ["pink" , "yellow" , "orange"]))
COLOR_3 = dict(zip(LABEL, ["brown", "lightgreen", "navy", "magenta"]))
COLOR_4 = dict(zip(LABEL, ["teal", "gold", "violet", "coral"]))

COLOR_SELECT = {
    "before": {
        "train":COLOR_1,
        "test":COLOR_2,
    },
    "after": {
        "train":COLOR_3,
        "test":COLOR_4,
    }
}

COLUMN_NAME = ["Sepal length", "Sepal width" , "Petal length" , "Petal width" , "Label"]
TRAIN_DATA_SIZE = 25
ASSETS = "./assets"

In [4]:
assets_folder = Path(ASSETS)
assets_folder.mkdir(parents=True, exist_ok=True) 

In [5]:
def load_iris_file(with_name:bool=False)->pd.DataFrame:
    df = pd.read_fwf("./iris.txt")
    
    df_new = pd.DataFrame({k:[v] for k ,v in zip(COLUMN_NAME , df.columns)},dtype=float)
    df.columns = COLUMN_NAME
    df_new = pd.concat([df_new, df], axis=0).reset_index().drop(columns=["index"])
    
    if not with_name:
        return df_new
    
    df_with_name = df_new.copy()
    
    df_with_name["Label"] = df_with_name["Label"].apply(lambda x : LABEL[int(x)-1])
    
    return df_with_name

In [6]:
df = load_iris_file(with_name=True)
df

Unnamed: 0,Sepal length,Sepal width,Petal length,Petal width,Label
0,5.1,3.5,1.4,0.2,Setosa
1,4.9,3.0,1.4,0.2,Setosa
2,4.7,3.2,1.3,0.2,Setosa
3,4.6,3.1,1.5,0.2,Setosa
4,5.0,3.6,1.4,0.2,Setosa
...,...,...,...,...,...
145,6.7,3.0,5.2,2.3,Virginica
146,6.3,2.5,5.0,1.9,Virginica
147,6.5,3.0,5.2,2.0,Virginica
148,6.2,3.4,5.4,2.3,Virginica


In [7]:
POSITIVE_CLASS ,NEGATIVE_CLASS= "Versicolor" , "Virginica"

In [8]:
df_need = df[["Petal length","Petal width", "Label"]]
df_need

Unnamed: 0,Petal length,Petal width,Label
0,1.4,0.2,Setosa
1,1.4,0.2,Setosa
2,1.3,0.2,Setosa
3,1.5,0.2,Setosa
4,1.4,0.2,Setosa
...,...,...,...
145,5.2,2.3,Virginica
146,5.0,1.9,Virginica
147,5.2,2.0,Virginica
148,5.4,2.3,Virginica


In [9]:
df_need_pos = df_need[df_need["Label"] == POSITIVE_CLASS]
df_need_pos_train , df_need_pos_test = df_need_pos[:TRAIN_DATA_SIZE], df_need_pos[TRAIN_DATA_SIZE:]

In [10]:
df_need_neg = df_need[df_need["Label"] == NEGATIVE_CLASS]
df_need_neg_train , df_need_neg_test = df_need_neg[:TRAIN_DATA_SIZE], df_need_neg[TRAIN_DATA_SIZE:]

In [11]:
df_need_train = pd.concat([df_need_pos_train, df_need_neg_train]).reset_index().drop(columns=["index"])
df_need_test  = pd.concat([df_need_pos_test, df_need_neg_test]).reset_index().drop(columns=["index"])

In [12]:
label_to_index = {POSITIVE_CLASS : 1 , NEGATIVE_CLASS : -1}
index_to_label = {1 : POSITIVE_CLASS , -1 : NEGATIVE_CLASS}


label_to_index_np = np.vectorize(label_to_index.get)
index_to_label_np = np.vectorize(index_to_label.get)

## Model

In [13]:
def rbf(sigma:float) -> Callable[[np.ndarray, np.ndarray], np.ndarray]:
    def run(x_1:np.ndarray, x_2:np.ndarray)->np.ndarray:
        assert x_1.shape[0] == x_2.shape[0], "输入的两个向量必须有相同的特征数"
        distance_squared = np.sum((x_1 - x_2) ** 2)
    
        # Compute the RBF kernel
        kernel_value = np.exp(-distance_squared / (2 * sigma ** 2))
        
        return kernel_value
    
    return run

def poly(p:int)-> Callable[[np.ndarray, np.ndarray], np.ndarray]:
    def run(x_1:np.ndarray, x_2:np.ndarray)->np.ndarray:
        assert x_1.shape[0] == x_2.shape[0], "输入的两个向量必须有相同的特征数"
        
        return (x_1.T @ x_2)**p
    
    return run

def linear() -> Callable[[np.ndarray, np.ndarray], np.ndarray]:
    
    def run(x_1:np.ndarray, x_2:np.ndarray)->np.ndarray:
        assert x_1.shape[0] == x_2.shape[0], "输入的两个向量必须有相同的特征数"
        
        return x_1.T @ x_2
    
    return run

class Kernel:
    kernel_dict = {
        "linear":linear,
        "rbf": rbf,
        "poly": poly,
    }
        
    @staticmethod
    def get_kernel(name:str, config:dict) -> Callable[[np.ndarray, np.ndarray], np.ndarray]:
        if name not in Kernel.kernel_dict:
            raise NotImplementedError(f"{name} kernel not implemented. Available kernels: {list(Kernel.kernel_dict.keys())}")
        
        func = Kernel.kernel_dict[name]
        
        return func(**config) 

In [14]:
class SupportVectorMachine:
    def __init__(self, C:int, kernel_name:str="linear", kernel_arg:dict=dict(),threshold:float=1e-20):
        self._c = C
        self._threshold = threshold
        # like ("ay": ... , "x": ...,)
        self._a_y_x :np.ndarray = None
        self._b :float = None
        self._kernel  = Kernel.get_kernel(kernel_name, kernel_arg)
        self._kernel_info = {"name":kernel_name, "arg": kernel_arg}
        return
    
    @property
    def alpha(self)->np.ndarray:
        return self._a_y_x[:, 0].reshape(-1,1)
    
    @property
    def bias(self)->np.ndarray:
        return self._b
    
    def _build_k_matrix(self, x:np.ndarray)->np.ndarray:
        size = x.shape[0]
        return np.array([[self._kernel(x[i], x[j]) for j in range(size)] for i in range(size)])
    
    def _find_alpha(self, K:np.ndarray, x:np.ndarray , y:np.ndarray)-> tuple[np.ndarray,np.ndarray]:
        size = x.shape[0] 
        alpha = cp.Variable(size)
        
        big_k = np.outer(y,y) * K  
        big_k = cp.psd_wrap(big_k)
        
        objective = cp.Minimize((1/2) * cp.quad_form(alpha, big_k) -cp.sum(alpha))
        constraints = [
            cp.sum(cp.multiply(alpha, y)) == 0,  
            alpha >= 0, 
            alpha <= self._c
        ]
        problem = cp.Problem(objective, constraints) 
        problem.solve(solver=cp.CLARABEL) # verbose=True
        alpha = alpha.value
        
        # filter the important one 
        support_vectors = np.where((self._c >= alpha)& (alpha > self._threshold) )[0]
        return alpha , support_vectors
    
    def _find_bias(self,alpha:np.ndarray,support_vectors:np.ndarray, y: np.ndarray, K: np.ndarray):
        K_hat = K[support_vectors][:, support_vectors]
        alpha_hat = alpha[support_vectors]
        y_hat = y[support_vectors]
        
        res = y_hat - K_hat @ (alpha_hat * y_hat)
        
        return np.mean(res)
    
    def train(self, x:np.ndarray, y:np.ndarray): # [batch, feature]   [batch, 1]
        # get the a
        K = self._build_k_matrix(x) 
        alpha , support_vector = self._find_alpha(K,x,y)
        # find the best b
        self._b = self._find_bias(alpha,support_vector,y,K)
        
        alpha , y = alpha.reshape(-1,1), y.reshape(-1,1)
        
        table = np.hstack((alpha , y, x))
        
        self._a_y_x = table[support_vector]
        
        return 
    
    def cal_one_item(self, ay:np.ndarray, x_kernel:np.ndarray, x_item: np.ndarray)->np.ndarray:
        # x [1, feature]
        # x_kernel [a, feature]
        
        res = np.sum(ay * self._kernel(x_kernel.T, x_item.reshape(-1,1))) + self._b
        
        return res
    
    def __call__(self, x : np.ndarray, with_sign:bool=False)->np.ndarray:
        # x [batch, feature]
        a, y, x_kernel = self._a_y_x[:, 0].reshape(-1,1) , self._a_y_x[:, 1].reshape(-1,1),  self._a_y_x[:, 2:]
        
        result = [self.cal_one_item(a*y, x_kernel=x_kernel, x_item=x_item) for x_item in x]
        res = np.hstack(result) 
        
        if with_sign:
            res = np.sign(res)
        
        return res
    
    def acc(self, x: np.ndarray,y: np.ndarray)->float:
        
        y_hat = self.__call__(x, True)
        
        return np.mean(y_hat == y)
    
    @property
    def info(self):
        return {"kernel": self._kernel_info, "support vector num":self._a_y_x.shape[0] , "C" : self._c, "b":f"{self._b}"}
    
    def __str__(self):
        return str(self.info)
    
    def __repr__(self):
        return self.__str__()
    

In [15]:
model = SupportVectorMachine(C =10 , kernel_name="rbf", kernel_arg={"sigma":2} ) # , kernel_name="rbf",kernel_arg={"sigma" : 2}

In [16]:
@dataclass
class TestResult:
    model:SupportVectorMachine
    acc : float
    bias : float
    alpha : np.ndarray
    
    @classmethod
    def build(cls, model:SupportVectorMachine, x:np.ndarray, y:np.ndarray):
        acc = model.acc(x=x,y=y)
        alpha = model.alpha
        bias = model.bias
        
        return cls(model,acc,bias,alpha)

In [17]:
def test_run(df_train:pd.DataFrame, df_test:pd.DataFrame, kernel_name:str , kernel_arg:dict=dict(), C_list:list[int] = [1,10,100]):
    
    if len(kernel_arg) != 0:
        kernel_item = next(iter(kernel_arg.items()))
        kernel_arg_name ,kernel_arg_value_list = kernel_item
        kernel_arg_list = [{kernel_arg_name: item} for item in kernel_arg_value_list]
    else:
        kernel_arg_list = [dict()]
    # return 
    train_x , train_y = df_train.drop(columns=["Label"]).to_numpy() , df_train["Label"].to_numpy() 
    test_x , test_y = df_test.drop(columns=["Label"]).to_numpy() , df_test["Label"].to_numpy() 
    
    train_y, test_y = label_to_index_np(train_y), label_to_index_np(test_y)
    
    model_list = []
    
    for kernel_arg_item in kernel_arg_list:
        for c in C_list:
            model = SupportVectorMachine(
                C=c,
                kernel_name=kernel_name,
                kernel_arg=kernel_arg_item
            )
            model.train(x=train_x, y=train_y)
            
            result = TestResult.build(model, x=test_x, y=test_y)
            
            model_list.append(result)
    
    
    return model_list

In [18]:
model_linear = test_run(
    df_train=df_need_train,
    df_test=df_need_test,
    kernel_name="linear",
)

In [19]:
print(model_linear)

In [20]:
model_rbf = test_run(
    df_train=df_need_train,
    df_test=df_need_test,
    kernel_name="rbf",
    kernel_arg={"sigma": [1,0.5,0.05]}
)

In [21]:
print(model_rbf)

In [22]:
model_poly = test_run(
    df_train=df_need_train,
    df_test=df_need_test,
    kernel_name="poly",
    kernel_arg={"p":[2,3,4,5]}
)

In [23]:
print(model_poly)