A Support Vector Machine (SVM) is a supervised learning algorithm used for classification (and regression).
The goal is simple: find the best boundary (hyperplane) that separates data points of different classes with the largest possible margin.

Hyperplane: A decision boundary. In 2D it’s a line, in 3D a plane, and in higher dimensions, a hyperplane.

Margin: The distance between the hyperplane and the closest data points (support vectors).

Support Vectors: The data points that lie closest to the decision boundary. They “support” the hyperplane.

The intuition: a larger margin leaves more room for unseen points to land on the correct side of the boundary, so a larger margin tends to give better generalization.

In [9]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from typing import Callable, Tuple, Optional, Dict
import math

In [2]:
# Small hand-written 2-D toy dataset for binary classification.
# "X" holds 20 points as [x1, x2] pairs and "y" holds the matching labels in
# the same order: +1.0 for the first ten points, -1.0 for the last ten.
# Both entries are plain Python lists, not NumPy arrays.
dataset = {
    "X": [
        [2.4967, 1.8617],
        [2.6477, 3.5230],
        [1.7658, 1.7659],
        [3.5792, 2.7674],
        [1.5305, 2.5426],
        [1.5366, 1.5343],
        [2.2420, 0.0867],
        [0.2751, 1.4377],
        [0.9872, 2.3142],
        [1.0920, 0.5877],
        [-0.5344, -2.2258],
        [-1.9325, -3.4247],
        [-2.5444, -1.8891],
        [-3.1510, -1.6243],
        [-2.6006, -2.2917],
        [-2.6017, -0.1477],
        [-2.0135, -3.0577],
        [-1.1775, -3.2208],
        [-1.7911, -3.9597],
        [-3.3282, -1.8031]
    ],
    "y": [
        1.0, 1.0, 1.0, 1.0, 1.0, 
        1.0, 1.0, 1.0, 1.0, 1.0, 
        -1.0, -1.0, -1.0, -1.0, -1.0, 
        -1.0, -1.0, -1.0, -1.0, -1.0
    ]
}


In [3]:
def decision_score(x: np.ndarray, w: np.ndarray, b: float) -> float:
    """Return the affine score w . x + b of a single sample as a built-in float."""
    projection = np.dot(w, x)
    return float(projection + b)

def predict_label(x: np.ndarray, w: np.ndarray, b: float) -> int:
    """Classify one sample: +1 when w^T x + b is non-negative, otherwise -1."""
    # Same value that decision_score(x, w, b) produces, computed in place.
    score = float(np.dot(w, x) + b)
    if score >= 0.0:
        return 1
    return -1

def batch_predict(X: np.ndarray, w: np.ndarray, b: float) -> np.ndarray:
    """Label every row of X (n x d) at once: +1 where X w + b >= 0, else -1."""
    on_positive_side = (X @ w + b) >= 0.0
    return np.where(on_positive_side, 1, -1)


In [6]:
def hinge_loss_terms(X: np.ndarray, y: np.ndarray, w: np.ndarray, b: float) -> np.ndarray:
    """
    Per-sample hinge loss max(0, 1 - y_i * (w^T x_i + b)).

    X: (n, d) samples, y: (n,) labels, w: (d,) weights, b: bias.
    Returns an array with one non-negative loss value per sample.
    """
    scores = X @ w + b
    slack = 1.0 - y * scores
    return np.maximum(0.0, slack)

def objective_primal(X: np.ndarray, y: np.ndarray, w: np.ndarray, b: float, lam: float) -> float:
    """
    Regularized hinge loss objective:
    J(w,b) = (lam/2) * ||w||^2 + (1/n) * sum_i max(0, 1 - y_i (w^T x_i + b))

    X: (n, d), y: (n,), w: (d,), b: float, lam: regularization coefficient.
    Returns J(w, b) as a built-in float.
    """
    reg = 0.5 * lam * np.dot(w, w)
    # .mean() supplies the 1/n factor, so the sample count is not needed explicitly.
    loss = hinge_loss_terms(X, y, w, b).mean()
    return float(reg + loss)

def subgradients_primal(X: np.ndarray, y: np.ndarray, w: np.ndarray, b: float, lam: float) -> Tuple[np.ndarray, float]:
    """
    Subgradient of the regularized hinge objective
    J(w,b) = (lam/2) * ||w||^2 + (1/n) * sum_i max(0, 1 - y_i (w^T x_i + b))
    with respect to (w, b).

    Only samples with y_i (w^T x_i + b) < 1 have an active hinge term; each of
    them contributes -y_i x_i / n to the w-part and -y_i / n to the b-part.

    X: (n, d), y: (n,), w: (d,), b: float, lam: regularization coefficient.
    Returns (g_w, g_b) with g_w of shape (d,) and g_b a built-in float.
    """
    X = np.asarray(X, dtype=float)
    y = np.asarray(y, dtype=float)
    w = np.asarray(w, dtype=float)

    n = X.shape[0]
    margins = y * (X @ w + b)
    active = margins < 1.0  # hinge term is non-zero for these samples
    if np.any(active):
        # Sum over the active samples but divide by the TOTAL sample count n:
        # the objective averages the hinge loss over all n samples, so dividing
        # by the number of violators would rescale the data term incorrectly.
        g_w = lam * w - (X[active] * y[active, None]).sum(axis=0) / n
        g_b = -y[active].sum() / n
    else:
        # No active hinge terms: only the regularizer contributes.
        g_w = lam * w
        g_b = 0.0
    return g_w, float(g_b)


In [7]:
class LinearSVMPrimal:
    """
    Linear SVM trained by full-batch subgradient descent on the primal objective.
    Good as a baseline. No kernels.

    State after fit():
      w       -- learned weight vector of shape (d,) (None before fitting)
      b       -- learned bias
      history -- {"objective": [...]} holding the objective value after every
                 iteration of the most recent fit() call
    """

    def __init__(self, lam: float = 1e-2, max_iters: int = 2000, stepsize: Optional[Callable[[int], float]] = None,
                 shuffle: bool = True, random_state: Optional[int] = 42):
        """
        lam: regularization coefficient (λ > 0)
        max_iters: number of subgradient steps performed by fit()
        stepsize: function t -> eta_t; default = 1 / (λ t)
        shuffle: permute the rows before every step; the full-batch subgradient
                 sums over all rows, so this only changes the summation order
        random_state: seed for the generator used for shuffling

        Raises ValueError if lam <= 0.
        """
        if lam <= 0:
            raise ValueError("lam must be > 0")
        self.lam = lam
        self.max_iters = max_iters
        self.stepsize_fn = stepsize
        self.shuffle = shuffle
        self.random_state = random_state

        self.w: Optional[np.ndarray] = None
        self.b: float = 0.0
        self.history: Dict[str, list] = {"objective": []}

    def _default_stepsize(self, t: int) -> float:
        """Default schedule eta_t = 1 / (λ t); t is clamped to at least 1."""
        return 1.0 / (self.lam * max(1, t))

    def fit(self, X: np.ndarray, y: np.ndarray) -> "LinearSVMPrimal":
        """
        Run max_iters full-batch subgradient steps starting from w = 0, b = 0.

        X: array-like of shape (n, d); y: array-like of shape (n,).
        Returns self. Raises ValueError on inconsistent input shapes.
        """
        # Accept plain Python lists as well as arrays.
        X = np.asarray(X, dtype=float)
        y = np.asarray(y, dtype=float)
        if X.ndim != 2:
            raise ValueError("X must be a 2-D array of shape (n_samples, n_features)")
        if y.shape != (X.shape[0],):
            raise ValueError("y must be a 1-D array with one label per row of X")

        rng = np.random.default_rng(self.random_state)
        n, d = X.shape
        self.w = np.zeros(d, dtype=float)
        self.b = 0.0
        # Start a fresh trace so refitting does not append to an earlier run.
        self.history = {"objective": []}

        idx = np.arange(n)
        for t in range(1, self.max_iters + 1):
            if self.shuffle:
                rng.shuffle(idx)
                X_batch = X[idx]
                y_batch = y[idx]
            else:
                X_batch = X
                y_batch = y

            g_w, g_b = subgradients_primal(X_batch, y_batch, self.w, self.b, self.lam)
            eta = self.stepsize_fn(t) if self.stepsize_fn else self._default_stepsize(t)

            self.w -= eta * g_w
            self.b -= eta * g_b

            # The objective is always tracked on the data in its original order.
            obj = objective_primal(X, y, self.w, self.b, self.lam)
            self.history["objective"].append(obj)
        return self

    def decision_function(self, X: np.ndarray) -> np.ndarray:
        """Return the scores X w + b. Raises RuntimeError before fit()."""
        if self.w is None:
            raise RuntimeError("Model is not fitted.")
        return np.asarray(X, dtype=float) @ self.w + self.b

    def predict(self, X: np.ndarray) -> np.ndarray:
        """Return +1 where the score is >= 0 and -1 elsewhere."""
        scores = self.decision_function(X)
        return np.where(scores >= 0.0, 1, -1)



In [8]:
class LinearSVMPegasos:
    """
    Pegasos: Stochastic Subgradient Descent for Linear SVM.
    Efficient for large datasets. Includes optional projection step.

    State after fit():
      w       -- learned weight vector of shape (d,) (None before fitting)
      b       -- learned bias (updated by the same subgradient steps; it is not
                 regularized and not touched by the projection step)
      history -- {"objective": [...]} holding the full-data objective after every
                 iteration of the most recent fit() (empty when tracking is off)
    """

    def __init__(self, lam: float = 1e-4, max_iters: int = 5000, batch_size: int = 32,
                 project: bool = True, random_state: Optional[int] = 42,
                 track_objective: bool = True):
        """
        lam: regularization coefficient (λ > 0)
        max_iters: number of stochastic steps
        batch_size: mini-batch size k (capped at the number of samples)
        project: after each step rescale w so that ||w|| <= 1/sqrt(lam)
        random_state: seed for mini-batch sampling
        track_objective: evaluate the objective on the full data after each
                         step (costs O(n d) per step); disable for speed

        Raises ValueError if lam <= 0 or batch_size < 1.
        """
        if lam <= 0:
            raise ValueError("lam must be > 0")
        if batch_size < 1:
            raise ValueError("batch_size must be >= 1")
        self.lam = lam
        self.max_iters = max_iters
        self.batch_size = batch_size
        self.project = project
        self.random_state = random_state
        self.track_objective = track_objective

        self.w: Optional[np.ndarray] = None
        self.b: float = 0.0
        self.history: Dict[str, list] = {"objective": []}

    def fit(self, X: np.ndarray, y: np.ndarray) -> "LinearSVMPegasos":
        """
        Run max_iters mini-batch Pegasos steps starting from w = 0, b = 0.

        X: array-like of shape (n, d); y: array-like of shape (n,).
        Returns self. Raises ValueError on inconsistent input shapes.
        """
        # Accept plain Python lists as well as arrays.
        X = np.asarray(X, dtype=float)
        y = np.asarray(y, dtype=float)
        if X.ndim != 2:
            raise ValueError("X must be a 2-D array of shape (n_samples, n_features)")
        if y.shape != (X.shape[0],):
            raise ValueError("y must be a 1-D array with one label per row of X")

        rng = np.random.default_rng(self.random_state)
        n, d = X.shape
        self.w = np.zeros(d, dtype=float)
        self.b = 0.0
        # Start a fresh trace so refitting does not append to an earlier run.
        self.history = {"objective": []}

        k = min(self.batch_size, n)          # effective mini-batch size
        cap = 1.0 / math.sqrt(self.lam)      # radius of the projection ball

        for t in range(1, self.max_iters + 1):
            # Sample minibatch (without replacement)
            batch_idx = rng.choice(n, size=k, replace=False)
            Xb = X[batch_idx]
            yb = y[batch_idx]

            # Stochastic subgradient
            margins = yb * (Xb @ self.w + self.b)
            mask = margins < 1.0

            g_w = self.lam * self.w
            g_b = 0.0
            if np.any(mask):
                # Pegasos scales the violators' contribution by 1/k (the batch
                # size), not by the number of violators: the mini-batch
                # objective averages the hinge loss over all k sampled points.
                g_w = g_w - (Xb[mask] * yb[mask, None]).sum(axis=0) / k
                g_b = g_b - yb[mask].sum() / k

            eta = 1.0 / (self.lam * t)
            self.w -= eta * g_w
            self.b = float(self.b - eta * g_b)

            # Optional projection step: keep ||w|| <= 1/sqrt(lam)
            if self.project:
                norm_w = np.linalg.norm(self.w)
                if norm_w > cap:
                    self.w *= (cap / norm_w)

            # Track objective on full data (optional; costs O(n d))
            if self.track_objective:
                obj = objective_primal(X, y, self.w, self.b, self.lam)
                self.history["objective"].append(obj)

        return self

    def decision_function(self, X: np.ndarray) -> np.ndarray:
        """Return the scores X w + b. Raises RuntimeError before fit()."""
        if self.w is None:
            raise RuntimeError("Model is not fitted.")
        return np.asarray(X, dtype=float) @ self.w + self.b

    def predict(self, X: np.ndarray) -> np.ndarray:
        """Return +1 where the score is >= 0 and -1 elsewhere."""
        scores = self.decision_function(X)
        return np.where(scores >= 0.0, 1, -1)



In [10]:

def linear_kernel(a: np.ndarray, b: np.ndarray) -> float:
    """Linear kernel: the inner product of a and b as a built-in float."""
    inner = a @ b
    return float(inner)

def poly_kernel(a: np.ndarray, b: np.ndarray, degree: int = 3, coef0: float = 1.0, gamma: Optional[float] = None) -> float:
    """
    Polynomial kernel (gamma * <a, b> + coef0) ** degree.

    When gamma is not given it defaults to 1 / a.shape[0].
    """
    scale = gamma if gamma is not None else 1.0 / a.shape[0]
    base = scale * (a @ b) + coef0
    return float(base ** degree)

def rbf_kernel(a: np.ndarray, b: np.ndarray, gamma: Optional[float] = None) -> float:
    """
    Gaussian (RBF) kernel exp(-gamma * ||a - b||^2).

    When gamma is not given it defaults to 1 / a.shape[0].
    """
    width = gamma if gamma is not None else 1.0 / a.shape[0]
    delta = a - b
    sq_dist = delta @ delta
    return float(np.exp(-width * sq_dist))


class KernelSVM_SMO:
    """
    Soft-margin SVM trained with SMO in the dual.
    Supports custom kernels (linear, poly, RBF).

    The kernel is any callable k(a, b) -> float on two 1-D sample vectors.
    Labels are expected to be -1 / +1 (not validated).

    State after fit():
      alpha -- dual coefficients, shape (n,)
      b     -- bias term
      X, y  -- float copies of the training data
      _K    -- Gram matrix of the training data, shape (n, n)
    """

    def __init__(self, C: float = 1.0, kernel: Optional[Callable[[np.ndarray, np.ndarray], float]] = None,
                 tol: float = 1e-3, max_passes: int = 5, random_state: Optional[int] = 42):
        """
        C: box constraint on the dual coefficients (C > 0)
        kernel: callable k(a, b) -> float; defaults to rbf_kernel
        tol: tolerance used in the KKT violation check
        max_passes: number of consecutive sweeps without any update after
                    which training stops
        random_state: seed for choosing the second index of each pair

        Raises ValueError if C <= 0.
        """
        if C <= 0:
            raise ValueError("C must be > 0")
        self.C = C
        self.kernel = kernel if kernel is not None else rbf_kernel
        self.tol = tol
        self.max_passes = max_passes
        self.random_state = random_state

        # Learned parameters
        self.alpha: Optional[np.ndarray] = None
        self.b: float = 0.0
        self.X: Optional[np.ndarray] = None
        self.y: Optional[np.ndarray] = None
        self._K: Optional[np.ndarray] = None  # Gram matrix (n x n)

    def _compute_kernel_matrix(self, X: np.ndarray) -> np.ndarray:
        """Evaluate the kernel on every pair of rows; fills both triangles from one call."""
        n = X.shape[0]
        K = np.empty((n, n), dtype=float)
        for i in range(n):
            for j in range(i, n):
                val = self.kernel(X[i], X[j])
                K[i, j] = val
                K[j, i] = val
        return K

    def _f(self, i: int) -> float:
        """Decision score at training point i: f(x_i)."""
        return float(np.sum(self.alpha * self.y * self._K[:, i]) + self.b)

    def fit(self, X: np.ndarray, y: np.ndarray) -> "KernelSVM_SMO":
        """
        Optimize the dual with simplified SMO (random choice of the second index).

        X: array-like of shape (n, d); y: array-like of shape (n,).
        Returns self. Raises ValueError on inconsistent shapes or when fewer
        than two samples are given.
        """
        # np.array(...) always copies, so later changes to the caller's data
        # do not affect the stored training set.
        X_arr = np.array(X, dtype=float)
        y_arr = np.array(y, dtype=float)
        if X_arr.ndim != 2:
            raise ValueError("X must be a 2-D array of shape (n_samples, n_features)")
        n = X_arr.shape[0]
        if y_arr.shape != (n,):
            raise ValueError("y must be a 1-D array with one label per row of X")
        if n < 2:
            # SMO updates alphas in pairs; with a single sample there is no
            # j != i to pick and the selection loop below would never end.
            raise ValueError("SMO needs at least 2 training samples")

        rng = np.random.default_rng(self.random_state)

        self.X = X_arr
        self.y = y_arr
        self.alpha = np.zeros(n, dtype=float)
        self.b = 0.0
        self._K = self._compute_kernel_matrix(self.X)

        passes = 0
        while passes < self.max_passes:
            num_changed = 0
            for i in range(n):
                E_i = self._f(i) - self.y[i]
                KKT_viol = ((self.y[i] * E_i < -self.tol and self.alpha[i] < self.C) or
                            (self.y[i] * E_i >  self.tol and self.alpha[i] > 0))
                if not KKT_viol:
                    continue

                # Choose j != i (simple heuristic: random)
                j = i
                while j == i:
                    j = rng.integers(0, n)
                E_j = self._f(j) - self.y[j]

                alpha_i_old = self.alpha[i]
                alpha_j_old = self.alpha[j]

                # Compute L, H: the segment of the box [0, C]^2 on which the
                # equality constraint lets alpha[j] move.
                if self.y[i] != self.y[j]:
                    L = max(0.0, self.alpha[j] - self.alpha[i])
                    H = min(self.C, self.C + self.alpha[j] - self.alpha[i])
                else:
                    L = max(0.0, self.alpha[i] + self.alpha[j] - self.C)
                    H = min(self.C, self.alpha[i] + self.alpha[j])

                if L == H:
                    continue

                # eta = 2 K_ij - K_ii - K_jj (second derivative along the
                # constraint line; must be negative for a proper maximum)
                eta = 2.0 * self._K[i, j] - self._K[i, i] - self._K[j, j]
                if eta >= 0:
                    continue

                # Update alpha[j]
                self.alpha[j] = self.alpha[j] - (self.y[j] * (E_i - E_j)) / eta
                # Clip to [L, H]
                if self.alpha[j] > H:
                    self.alpha[j] = H
                elif self.alpha[j] < L:
                    self.alpha[j] = L

                # Check for significant change
                if abs(self.alpha[j] - alpha_j_old) < 1e-12:
                    self.alpha[j] = alpha_j_old
                    continue

                # Update alpha[i] so that sum_k alpha_k y_k stays unchanged
                self.alpha[i] = self.alpha[i] + self.y[i] * self.y[j] * (alpha_j_old - self.alpha[j])

                # Compute b1, b2
                b1 = (self.b - E_i
                      - self.y[i] * (self.alpha[i] - alpha_i_old) * self._K[i, i]
                      - self.y[j] * (self.alpha[j] - alpha_j_old) * self._K[i, j])

                b2 = (self.b - E_j
                      - self.y[i] * (self.alpha[i] - alpha_i_old) * self._K[i, j]
                      - self.y[j] * (self.alpha[j] - alpha_j_old) * self._K[j, j])

                # Update b: prefer the threshold from a coefficient strictly
                # inside (0, C); otherwise take the midpoint.
                if 0 < self.alpha[i] < self.C:
                    self.b = b1
                elif 0 < self.alpha[j] < self.C:
                    self.b = b2
                else:
                    self.b = 0.5 * (b1 + b2)

                num_changed += 1

            # Count only consecutive sweeps in which nothing was updated.
            passes = passes + 1 if num_changed == 0 else 0

        return self

    def decision_function(self, X: np.ndarray) -> np.ndarray:
        """
        Return sum_j alpha_j y_j k(x_j, x) + b for every row x of X.
        Raises RuntimeError before fit().
        """
        if self.alpha is None or self.X is None or self.y is None:
            raise RuntimeError("Model is not fitted.")
        X = np.asarray(X, dtype=float)
        n_train = self.X.shape[0]
        n_eval = X.shape[0]
        scores = np.zeros(n_eval, dtype=float)
        for i in range(n_eval):
            s = 0.0
            xi = X[i]
            for j in range(n_train):
                # Samples with alpha == 0 contribute nothing; skip the kernel call.
                if self.alpha[j] > 0.0:
                    s += self.alpha[j] * self.y[j] * self.kernel(self.X[j], xi)
            scores[i] = s + self.b
        return scores

    def predict(self, X: np.ndarray) -> np.ndarray:
        """Return +1 where the score is >= 0 and -1 elsewhere."""
        scores = self.decision_function(X)
        return np.where(scores >= 0.0, 1, -1)

    def support_indices(self, tol: float = 1e-10) -> np.ndarray:
        """Indices with alpha > tol."""
        if self.alpha is None:
            return np.array([], dtype=int)
        return np.flatnonzero(self.alpha > tol)

    def linear_w_if_applicable(self) -> Optional[np.ndarray]:
        """
        If kernel is linear, recover w = sum_i alpha_i y_i x_i.
        Otherwise returns None.

        The kernel counts as linear when its Gram matrix on the training set
        matches the plain dot products X X^T. This is a heuristic: it only
        looks at the training points.
        """
        if self.alpha is None or self.X is None or self.y is None or self._K is None:
            return None
        if not np.allclose(self._K, self.X @ self.X.T):
            return None
        return (self.alpha * self.y) @ self.X


In [11]:
if __name__ == "__main__":
    # Demo: two Gaussian blobs (centered at +2 and -2 in both coordinates),
    # fitted with each of the three trainers; prints the training accuracy.
    rng = np.random.default_rng(42)
    n_per = 20
    X_pos = rng.normal(loc=2.0, scale=1.0, size=(n_per, 2))
    X_neg = rng.normal(loc=-2.0, scale=1.0, size=(n_per, 2))
    X = np.concatenate([X_pos, X_neg], axis=0)
    y = np.concatenate([np.ones(n_per), -np.ones(n_per)])

    # --- Linear (Primal) ---
    primal = LinearSVMPrimal(lam=1e-2, max_iters=2000)
    primal.fit(X, y)
    pred_primal = primal.predict(X)
    acc_primal = np.mean(pred_primal == y)
    print(f"[Primal] Train accuracy: {acc_primal:.3f}")

    # --- Linear (Pegasos) ---
    pegasos = LinearSVMPegasos(lam=1e-4, max_iters=4000, batch_size=16)
    pegasos.fit(X, y)
    pred_peg = pegasos.predict(X)
    acc_peg = np.mean(pred_peg == y)
    print(f"[Pegasos] Train accuracy: {acc_peg:.3f}")

    # --- Kernel (SMO, RBF) ---
    smo = KernelSVM_SMO(
        C=1.0,
        kernel=lambda u, v: rbf_kernel(u, v, gamma=0.5),  # RBF with fixed width
        tol=1e-3,
        max_passes=5,
    )
    smo.fit(X, y)
    pred_smo = smo.predict(X)
    acc_smo = np.mean(pred_smo == y)
    print(f"[SMO-RBF] Train accuracy: {acc_smo:.3f}")

[Primal] Train accuracy: 1.000
[Pegasos] Train accuracy: 1.000
[SMO-RBF] Train accuracy: 1.000
