In [1]:

# imports
import os
import sys
import types
import json
import base64

# figure size/format
fig_width = 5.5
fig_height = 3.5
fig_format = 'pdf'
fig_dpi = 300
interactivity = ''
is_shiny = False
is_dashboard = False
plotly_connected = True

# matplotlib defaults / format
try:
  import matplotlib.pyplot as plt
  plt.rcParams['figure.figsize'] = (fig_width, fig_height)
  plt.rcParams['figure.dpi'] = fig_dpi
  plt.rcParams['savefig.dpi'] = "figure"

  # IPython 7.14 deprecated set_matplotlib_formats from IPython
  try:
    from matplotlib_inline.backend_inline import set_matplotlib_formats
  except ImportError:
    # Fall back to deprecated location for older IPython versions
    from IPython.display import set_matplotlib_formats
    
  set_matplotlib_formats(fig_format)
except Exception:
  pass

# plotly use connected mode
try:
  import plotly.io as pio
  if plotly_connected:
    pio.renderers.default = "notebook_connected"
  else:
    pio.renderers.default = "notebook"
  for template in pio.templates.keys():
    pio.templates[template].layout.margin = dict(t=30,r=0,b=0,l=0)
except Exception:
  pass

# disable itables paging for dashboards
if is_dashboard:
  try:
    from itables import options
    options.dom = 'fiBrtlp'
    options.maxBytes = 1024 * 1024
    options.language = dict(info = "Showing _TOTAL_ entries")
    options.classes = "display nowrap compact"
    options.paging = False
    options.searching = True
    options.ordering = True
    options.info = True
    options.lengthChange = False
    options.autoWidth = False
    options.responsive = True
    options.keys = True
    options.buttons = []
  except Exception:
    pass
  
  try:
    import altair as alt
    # By default, dashboards will have container sized
    # vega visualizations which allows them to flow reasonably
    theme_sentinel = '_quarto-dashboard-internal'
    def make_theme(name):
        nonTheme = alt.themes._plugins[name]    
        def patch_theme(*args, **kwargs):
            existingTheme = nonTheme()
            if 'height' not in existingTheme:
              existingTheme['height'] = 'container'
            if 'width' not in existingTheme:
              existingTheme['width'] = 'container'

            if 'config' not in existingTheme:
              existingTheme['config'] = dict()
            
            # Configure the default font sizes
            title_font_size = 15
            header_font_size = 13
            axis_font_size = 12
            legend_font_size = 12
            mark_font_size = 12
            tooltip = False

            config = existingTheme['config']

            # The Axis
            if 'axis' not in config:
              config['axis'] = dict()
            axis = config['axis']
            if 'labelFontSize' not in axis:
              axis['labelFontSize'] = axis_font_size
            if 'titleFontSize' not in axis:
              axis['titleFontSize'] = axis_font_size  

            # The legend
            if 'legend' not in config:
              config['legend'] = dict()
            legend = config['legend']
            if 'labelFontSize' not in legend:
              legend['labelFontSize'] = legend_font_size
            if 'titleFontSize' not in legend:
              legend['titleFontSize'] = legend_font_size  

            # The header
            if 'header' not in config:
              config['header'] = dict()
            header = config['header']
            if 'labelFontSize' not in header:
              header['labelFontSize'] = header_font_size
            if 'titleFontSize' not in header:
              header['titleFontSize'] = header_font_size    

            # Title
            if 'title' not in config:
              config['title'] = dict()
            title = config['title']
            if 'fontSize' not in title:
              title['fontSize'] = title_font_size

            # Marks
            if 'mark' not in config:
              config['mark'] = dict()
            mark = config['mark']
            if 'fontSize' not in mark:
              mark['fontSize'] = mark_font_size

            # Mark tooltips
            if tooltip and 'tooltip' not in mark:
              mark['tooltip'] = dict(content="encoding")

            return existingTheme
            
        return patch_theme

    # We can only do this once per session
    if theme_sentinel not in alt.themes.names():
      for name in alt.themes.names():
        alt.themes.register(name, make_theme(name))
      
      # register a sentinel theme so we only do this once
      alt.themes.register(theme_sentinel, make_theme('default'))
      alt.themes.enable('default')

  except Exception:
    pass

# enable pandas latex repr when targeting pdfs
try:
  import pandas as pd
  if fig_format == 'pdf':
    pd.set_option('display.latex.repr', True)
except Exception:
  pass

# interactivity
if interactivity:
  from IPython.core.interactiveshell import InteractiveShell
  InteractiveShell.ast_node_interactivity = interactivity

# NOTE: the kernel_deps code is repeated in the cleanup.py file
# (we can't easily share this code b/c of the way it is run).
# If you edit this code also edit the same code in cleanup.py!

# output kernel dependencies
kernel_deps = dict()
for module in list(sys.modules.values()):
  # Some modules play games with sys.modules (e.g. email/__init__.py
  # in the standard library), and occasionally this can cause strange
  # failures in getattr.  Just ignore anything that's not an ordinary
  # module.
  if not isinstance(module, types.ModuleType):
    continue
  path = getattr(module, "__file__", None)
  if not path:
    continue
  if path.endswith(".pyc") or path.endswith(".pyo"):
    path = path[:-1]
  if not os.path.exists(path):
    continue
  kernel_deps[path] = os.stat(path).st_mtime
print(json.dumps(kernel_deps))

# set run_path if requested
run_path = 'L2hvbWUvbW9tby9Eb2N1bWVudHMvU1ZN'
if run_path:
  # hex-decode the path
  run_path = base64.b64decode(run_path.encode("utf-8")).decode("utf-8")
  os.chdir(run_path)

# reset state
%reset

# shiny
# Checking for shiny by using False directly because we're after the %reset. We don't want
# to set a variable that stays in global scope.
if False:
  try:
    import htmltools as _htmltools
    import ast as _ast

    _htmltools.html_dependency_render_mode = "json"

    # This decorator will be added to all function definitions
    def _display_if_has_repr_html(x):
      try:
        # IPython 7.14 preferred import
        from IPython.display import display, HTML
      except:
        from IPython.core.display import display, HTML

      if hasattr(x, '_repr_html_'):
        display(HTML(x._repr_html_()))
      return x

    # ideally we would undo the call to ast_transformers.append
    # at the end of this block whenver an error occurs, we do 
    # this for now as it will only be a problem if the user 
    # switches from shiny to not-shiny mode (and even then likely
    # won't matter)
    import builtins
    builtins._display_if_has_repr_html = _display_if_has_repr_html

    class _FunctionDefReprHtml(_ast.NodeTransformer):
      def visit_FunctionDef(self, node):
        node.decorator_list.insert(
          0,
          _ast.Name(id="_display_if_has_repr_html", ctx=_ast.Load())
        )
        return node

      def visit_AsyncFunctionDef(self, node):
        node.decorator_list.insert(
          0,
          _ast.Name(id="_display_if_has_repr_html", ctx=_ast.Load())
        )
        return node

    ip = get_ipython()
    ip.ast_transformers.append(_FunctionDefReprHtml())

  except:
    pass

def ojs_define(**kwargs):
  import json
  try:
    # IPython 7.14 preferred import
    from IPython.display import display, HTML
  except:
    from IPython.core.display import display, HTML

  # do some minor magic for convenience when handling pandas
  # dataframes
  def convert(v):
    try:
      import pandas as pd
    except ModuleNotFoundError: # don't do the magic when pandas is not available
      return v
    if type(v) == pd.Series:
      v = pd.DataFrame(v)
    if type(v) == pd.DataFrame:
      j = json.loads(v.T.to_json(orient='split'))
      return dict((k,v) for (k,v) in zip(j["index"], j["data"]))
    else:
      return v

  v = dict(contents=list(dict(name=key, value=convert(value)) for (key, value) in kwargs.items()))
  display(HTML('<script type="ojs-define">' + json.dumps(v) + '</script>'), metadata=dict(ojs_define = True))
globals()["ojs_define"] = ojs_define
globals()["__spec__"] = None



In [2]:
# | echo: false
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image
import os

plt.rc("image", cmap="Greys")

In [3]:
# | echo: false
def show_images(images, titles=None):
    n = len(images)
    plt.figure(figsize=(12, 4))
    for i in range(n):
        plt.subplot(1, n, i + 1)
        plt.imshow(images[i], cmap="gray")
        if titles:
            plt.title(titles[i])
        plt.axis("off")
    plt.show()


def show_hyperplane(f, xlim=(-2, 6), ax=None, figsize=(3, 3)):
    if ax is None:
        ax = plt.gca()
    min, max = xlim
    xvals = np.linspace(min, max, 100)
    xx, yy = np.meshgrid(xvals, xvals)
    Z = f(np.c_[xx.ravel(), yy.ravel()])
    Z = Z.reshape(xx.shape)

    ax.contour(
        xx,
        yy,
        Z,
        levels=[-1, 0, 1],
        colors=["gray", "black", "gray"],
        linestyles=["--", "-", "--"],
        linewidths=[1, 2, 1],
    )
    ax.set_xlim(min, max)
    ax.set_ylim(min, max)
    ax.set_aspect("equal")

In [4]:
# | echo: false
np.random.seed(42)

d1 = np.random.randn(18, 2)
d2 = np.random.randn(18, 2) + 4

X_dummy = np.vstack((d1, d2))
y_dummy = np.array([1] * 18 + [-1] * 18)

idx = np.random.permutation(len(X_dummy))
X_dummy = X_dummy[idx]
y_dummy = y_dummy[idx]

X_with_noise = np.append(X_dummy, [[2, 3], [3, 0]], axis=0)
y_with_noise = np.append(y_dummy, [1, -1], axis=0)

In [5]:
#| echo: false
class MyPerceptron:

    def __init__(self, lr=0.1, epochs=10, history=False):
        self.lr = lr
        self.epochs = epochs
        self.history = history
        self.losses = []
        self.records = []

    def f(self, x):
        return x @ self.w + self.b

    def sign(self, x):
        return np.where(x > 0, 1, -1)

    def predict(self, x):
        return self.sign(self.f(x))

    def record(self, w, b, current_loss, i):
        self.records.append([(w, self.w.copy()), (b, self.b), current_loss, i])

    def fit(self, X, y):
        _, n_features = X.shape
        self.w = np.zeros(n_features)
        self.b = 0

        for _ in range(self.epochs):
            for i, (xi, yi) in enumerate(zip(X, y)):
                y_hat = self.predict(xi)

                old_w = self.w.copy()
                old_b = self.b
                if self.history:
                    current_loss = (self.sign(self.f(X)) != y).mean()

                if self.history and current_loss == 0:
                    self.record(old_w, old_b, current_loss, i)
                    return

                if yi * y_hat < 0:
                    self.w += self.lr * yi * xi
                    self.b += self.lr * yi
                    if self.history:
                        self.record(old_w, old_b, current_loss, i)

In [6]:
#| echo: false
def plot_records(X, y, records, show_legend=True, show_previous=True, figsize=(15, 15)):
    n_updates = len(records)

    cols = 3
    rows = int(np.ceil(n_updates / cols))
    _, axs = plt.subplots(rows, cols, figsize=figsize)
    axs = axs.flatten() if n_updates > 1 else [axs]

    x_margin = 2.0
    y_margin = 2.0
    x_min = np.min(X[:, 0]) - x_margin
    x_max = np.max(X[:, 0]) + x_margin
    y_min = np.min(X[:, 1]) - y_margin
    y_max = np.max(X[:, 1]) + y_margin

    for i in range(n_updates):
        ax = axs[i]
        old_w, new_w = records[i][0]
        old_b, new_b = records[i][1]

        pos_idx = y == 1
        neg_idx = y == -1
        ax.scatter(
            X[pos_idx, 0],
            X[pos_idx, 1],
            color="blue",
            alpha=0.6,
            marker="+",
        )
        ax.scatter(
            X[neg_idx, 0],
            X[neg_idx, 1],
            color="red",
            alpha=0.6,
        )

        ax.scatter(
            X[records[i][3], 0],
            X[records[i][3], 1],
            facecolor="yellow",
            edgecolor="black",
            label="Current Point",
        )

        def plot_hp(w, b, color, linestyle, label):
            x_vals = np.linspace(x_min, x_max, 100)
            if abs(w[1]) > 1e-6:
                y_vals = (-w[0] * x_vals - b) / w[1]
                ax.plot(
                    x_vals,
                    y_vals,
                    color=color,
                    linestyle=linestyle,
                    label=label,
                )
            else:
                x_val = -b / w[0] if abs(w[0]) > 1e-6 else 0
                ax.axvline(x_val, color=color, linestyle=linestyle, label=label)
            return x_vals, y_vals if abs(w[1]) > 1e-6 else None

        plot_hp(new_w, new_b, "black", "-", "After Update")

        if show_previous:
            plot_hp(old_w, old_b, "green", "--", "Before Update")
            x0 = 0
            y0 = (-old_w[0] * x0 - old_b) / old_w[1] if old_w[1] != 0 else 0
            norm = np.linalg.norm(old_w)
            if norm != 0:
                ax.arrow(
                    x0,
                    y0,
                    old_w[0] / norm,
                    old_w[1] / norm,
                    head_width=0.5,
                    width=0.1,
                    color="green",
                )
        x0 = 0
        y0 = (-new_w[0] * x0 - new_b) / new_w[1] if new_w[1] != 0 else 0
        norm = np.linalg.norm(new_w)
        if norm != 0:
            ax.arrow(
                x0,
                y0,
                new_w[0] / norm,
                new_w[1] / norm,
                head_width=0.5,
                width=0.1,
                color="black",
            )

        ax.set_xlim(x_min, x_max)
        ax.set_ylim(y_min, y_max)
        ax.set_title(f"Update {i+1}")
        ax.set_aspect("equal")
        if show_legend:
            ax.legend()

    for j in range(n_updates, len(axs)):
        axs[j].axis("off")

    plt.tight_layout()
    plt.show()

In [7]:
#| fig-cap: Perceptron Learning Process
#| echo: false
perceptron = MyPerceptron(lr=0.1, epochs=2, history=True)
perceptron.fit(X_dummy, y_dummy)
plot_records(X_dummy, y_dummy, perceptron.records[:-1], figsize=(11, 8))

<Figure size 3300x2400 with 6 Axes>

In [8]:
# | fig-cap: The Optimal Hyperplane
# | echo: false
plt.figure(figsize=(3, 3))
plt.scatter(d1[0:, 0], d1[0:, 1], color="blue", marker="+")
plt.scatter(d2[0:, 0], d2[0:, 1], color="red", marker="o")
plt.plot(
    np.linspace(-2, 6, 10),
    -np.linspace(-2, 6, 10) + 4.1,
    color="black",
)
plt.gca().set_aspect("equal")
plt.show()

<Figure size 900x900 with 1 Axes>

In [9]:
# | fig-cap: Different Final Hyperplane With Different Data Order
# | echo: false
params = []
for i in range(6):
    idx = np.random.permutation(len(X_dummy))
    X_shuffled = X_dummy[idx]
    y_shuffled = y_dummy[idx]
    perceptron = MyPerceptron(lr=1, epochs=5, history=True)
    perceptron.fit(X_shuffled, y_shuffled)
    params.append(perceptron.records[-1])
plot_records(
    X_shuffled,
    y_shuffled,
    params,
    show_legend=False,
    show_previous=False,
    figsize=(10, 7),
)

<Figure size 3000x2100 with 6 Axes>

In [10]:
# | echo : false
import cvxopt
from cvxopt import matrix, solvers


class HardMarginSVM:
    def __init__(self):
        self.w = None
        self.b = None
        self.support_vectors = None
        self.support_vector_labels = None
        self.alphas = None

    def fit(self, X, y):
        n_samples, _ = X.shape

        P = matrix(np.outer(y, y) * (X @ X.T))

        q = matrix(-np.ones(n_samples))

        G = matrix(-np.eye(n_samples))
        h = matrix(np.zeros(n_samples))

        A = matrix(y.reshape(1, -1).astype(float))
        b_eq = matrix(0.0)

        # Solve the quadratic programming problem
        solvers.options["show_progress"] = False
        solution = solvers.qp(P, q, G, h, A, b_eq)

        # Extract alphas
        alphas = np.array(solution["x"]).flatten()

        # Find support vectors (alphas > threshold)
        sv_threshold = 1e-5
        sv_indices = alphas > sv_threshold

        self.support_vectors = X[sv_indices]
        self.support_vector_labels = y[sv_indices]
        self.alphas = alphas[sv_indices]

        # Calculate w
        self.w = np.sum(
            (self.alphas * self.support_vector_labels).reshape(-1, 1)
            * self.support_vectors,
            axis=0,
        )

        # Calculate b using any support vector
        self.b = self.support_vector_labels[0] - np.dot(self.support_vectors[0], self.w)

        return self

    def predict(self, X):
        return np.sign(self.decision_function(X))

    def decision_function(self, X):
        return X @ self.w + self.b

In [11]:
# | echo: false
# | fig-cap: The Classification Boundary Found by MMC
qp_svm = HardMarginSVM()
qp_svm.fit(X_dummy, y_dummy)
plt.figure(figsize=(3, 3))
plt.scatter(X_dummy[:, 0], X_dummy[:, 1], c=y_dummy, cmap="bwr", alpha=1)
show_hyperplane(qp_svm.decision_function, (-2, 6))

<Figure size 900x900 with 1 Axes>

In [12]:
# | echo: false
class SMOSVM:
    def __init__(self, C=1.0, tol=1e-3, max_iter=100, gamma=1.0):

        self.C = C
        self.tol = tol
        self.max_iter = max_iter

    def _compute_error(self, i):
        return self.f[i] - self.y[i]

    def _update_f(self, i):
        self.f[i] = np.sum(self.alphas * self.y * self.K[:, i]) + self.b

    def _take_step(self, i1, i2):
        """
        Optimize the pair (alpha_i1, alpha_i2).
        Returns 1 if progress was made, 0 otherwise.
        """
        if i1 == i2:
            return 0

        alpha1 = self.alphas[i1]
        alpha2 = self.alphas[i2]
        y1 = self.y[i1]
        y2 = self.y[i2]
        E1 = self._compute_error(i1)
        E2 = self._compute_error(i2)
        s = y1 * y2

        # Compute bounds L and H
        if y1 != y2:
            L = max(0, alpha2 - alpha1)
            H = min(self.C, self.C + alpha2 - alpha1)
        else:
            L = max(0, alpha1 + alpha2 - self.C)
            H = min(self.C, alpha1 + alpha2)

        if L == H:
            return 0

        # Compute eta
        k11 = self.K[i1, i1]
        k12 = self.K[i1, i2]
        k22 = self.K[i2, i2]
        eta = k11 + k22 - 2 * k12

        if eta > 0:
            # Normal case: compute new alpha2
            alpha2_new = alpha2 + y2 * (E1 - E2) / eta

            # Clip alpha2_new to be within bounds
            if alpha2_new >= H:
                alpha2_new = H
            elif alpha2_new <= L:
                alpha2_new = L
        else:
            alpha2_new = L if abs(alpha2 - L) < abs(alpha2 - H) else H

        # Check if the change in alpha2 is significant
        if abs(alpha2_new - alpha2) < 1e-5:
            return 0

        # Compute new alpha1
        alpha1_new = alpha1 + s * (alpha2 - alpha2_new)

        # Update threshold b
        b1 = (
            self.b
            - E1
            - y1 * (alpha1_new - alpha1) * k11
            - y2 * (alpha2_new - alpha2) * k12
        )
        b2 = (
            self.b
            - E2
            - y1 * (alpha1_new - alpha1) * k12
            - y2 * (alpha2_new - alpha2) * k22
        )

        if 0 < alpha1_new < self.C:
            b_new = b1
        elif 0 < alpha2_new < self.C:
            b_new = b2
        else:
            b_new = (b1 + b2) / 2

        # Update alphas and b
        self.alphas[i1] = alpha1_new
        self.alphas[i2] = alpha2_new
        self.b = b_new

        # Update f cache for all points
        for i in range(self.n):
            self.f[i] += (
                y1 * (alpha1_new - alpha1) * self.K[i1, i]
                + y2 * (alpha2_new - alpha2) * self.K[i2, i]
                + (b_new - self.b + (b_new - self.b))
            )  # This effectively adds (b_new - old_b)

        # Correct the b update in f
        self.f = self.f - (b_new - self.b)
        self.b = b_new

        # Recompute f properly
        self.f = np.sum(self.alphas * self.y * self.K.T, axis=1) + self.b

        return 1

    def _examine_example(self, i2):
        """
        Inner loop: select i1 to pair with i2.
        Returns 1 if progress was made, 0 otherwise.
        """
        y2 = self.y[i2]
        alpha2 = self.alphas[i2]
        E2 = self._compute_error(i2)
        r2 = E2 * y2

        # Check KKT conditions with tolerance
        if (r2 < -self.tol and alpha2 < self.C) or (r2 > self.tol and alpha2 > 0):

            # Get non-bound examples (0 < alpha < C)
            non_bound_indices = np.where((self.alphas > 0) & (self.alphas < self.C))[0]

            if len(non_bound_indices) > 1:
                # Choose i1 to maximize |E1 - E2|
                errors = np.array([self._compute_error(i) for i in non_bound_indices])
                if E2 > 0:
                    i1 = non_bound_indices[np.argmin(errors)]
                else:
                    i1 = non_bound_indices[np.argmax(errors)]

                if self._take_step(i1, i2):
                    return 1

                # If no progress, try all non-bound examples in random order
                np.random.shuffle(non_bound_indices)
                for i1 in non_bound_indices:
                    if self._take_step(i1, i2):
                        return 1

            # If still no progress, try all examples in random order
            all_indices = np.random.permutation(self.n)
            for i1 in all_indices:
                if self._take_step(i1, i2):
                    return 1

        return 0

    def _compute_kernel_matrix(self, X):
        return X @ X.T

    def fit(self, X, y):
        """
        Train the SVM using SMO algorithm.

        Parameters:
        -----------
        X : ndarray of shape (n_samples, n_features)
            Training data
        y : ndarray of shape (n_samples,)
            Target values (must be +1 or -1)
        """
        self.X = X
        self.y = y
        self.n, self.d = X.shape

        # Initialize alphas, b, and error cache
        self.alphas = np.zeros(self.n)
        self.b = 0.0

        # Compute kernel matrix
        self.K = self._compute_kernel_matrix(X)

        # Initialize f (decision function values)
        self.f = np.zeros(self.n) + self.b

        # Main loop
        num_changed = 0
        examine_all = True
        iter_count = 0

        while (num_changed > 0 or examine_all) and iter_count < self.max_iter:
            num_changed = 0

            if examine_all:
                # Loop over all training examples
                for i in range(self.n):
                    num_changed += self._examine_example(i)
            else:
                # Loop over non-bound examples
                non_bound_indices = np.where(
                    (self.alphas > 0) & (self.alphas < self.C)
                )[0]
                for i in non_bound_indices:
                    num_changed += self._examine_example(i)

            if examine_all:
                examine_all = False
            elif num_changed == 0:
                examine_all = True

            iter_count += 1

        # Store support vectors
        sv_indices = self.alphas > 1e-5
        self.support_vectors_ = X[sv_indices]
        self.support_vector_labels_ = y[sv_indices]
        self.support_vector_alphas_ = self.alphas[sv_indices]

    def decision_function(self, X):
        """Compute the decision function for samples in X."""
        if not hasattr(self, "support_vectors_"):
            raise ValueError("Model has not been trained yet")

        n_samples = X.shape[0]
        decision = np.zeros(n_samples)

        for i in range(n_samples):
            K = (X[i : i + 1] @ self.support_vectors_.T).flatten()
            decision[i] = (
                np.sum(self.support_vector_alphas_ * self.support_vector_labels_ * K)
                + self.b
            )

        return decision

    def predict(self, X):
        return np.sign(self.decision_function(X))

In [13]:
# | echo: false
# | fig-cap: The Decision Boundary Found by SMO SVM on Noisy Data

smo_svm = SMOSVM(C=0.2, tol=1e-3, max_iter=100)
smo_svm.fit(X_with_noise, y_with_noise)

# Plot the decision boundary
plt.figure(figsize=(3, 3))
plt.scatter(
    X_with_noise[:, 0], X_with_noise[:, 1], c=y_with_noise, cmap="bwr", alpha=0.6, s=100
)

show_hyperplane(smo_svm.decision_function)

plt.show()

<Figure size 900x900 with 1 Axes>

In [14]:
# | fig-cap: The Illustration of Hinge Loss
# | echo: false
plt.figure(figsize=(4, 2))
xvals = np.linspace(-10, 10, 1000)
yvals = np.maximum(0, 1 - xvals)
plt.plot(xvals, yvals, "b-")
plt.grid(True, alpha=0.3)
plt.axhline(y=0, color="black", alpha=0.3)
plt.axvline(color="black", alpha=0.3)
plt.xlim(-3, 5)
plt.ylim(-1, 4)
plt.show()

<Figure size 1200x600 with 1 Axes>

In [15]:
# | echo: false
# | fig-cap: Effects of Different Learning Rates on Convergence
def f(x):
    return np.array(x) ** 2


fig, axes = plt.subplots(1, 2, figsize=(8, 4))

x = np.linspace(-2, 2, 100)
y = f(x)
axes[0].plot(x, y, "b-")
axes[0].axhline(y=0, color="black", alpha=0.3)
axes[0].axvline(color="black", alpha=0.3)
axes[0].set_ylim(-1, 4)
axes[0].set_xlim(-2, 2)
axes[0].scatter([-1.0], f([-1.0]), color="red")
axes[0].plot([-1.0, 1.5], [f(-1), f(-1)], "r--")
axes[0].axvline(x=1.5, color="black", alpha=0.3, linestyle="--")
axes[0].scatter([1.5], f([1.5]), color="green")
axes[0].text(-1.0, 1.2, "Start", fontsize=12, color="red")
axes[0].text(0.4, 2.3, "Overshoot", fontsize=12, color="green")
axes[0].set_title("Too Large, Loss Increases")

axes[1].plot(x, y, "b-")
axes[1].axhline(y=0, color="black", alpha=0.3)
axes[1].axvline(color="black", alpha=0.3)
axes[1].set_ylim(-1, 4)
axes[1].set_xlim(-2, 2)
axes[1].scatter([-1.8], f([-1.8]), color="red")
axes[1].plot([-1.8, -1.7], [f(-1.8), f(-1.8)], "r--")
axes[1].axvline(x=-1.7, color="black", alpha=0.3, linestyle="--")
axes[1].scatter([-1.7], f([-1.7]), color="green")
axes[1].text(-1.6, 3.3, "Start", fontsize=12, color="red")
axes[1].text(-1.5, 2.7, "Too slow", fontsize=12, color="green")
axes[1].set_title("Too Small, Loss Decreases Slowly")
plt.show()

<Figure size 2400x1200 with 2 Axes>

In [16]:
# | echo: false
class SGDSVM:
    def __init__(self, lambda_=0.01, lr=0.01, epochs=10, batch_size=64):
        self.lambda_ = lambda_
        self.lr = lr
        self.epochs = epochs
        self.batch_size = batch_size
        self.loss_history = []

    def loss(self, X, y):
        distances = 1 - y * (X @ self.w + self.b)
        hinge_loss = np.maximum(0, distances).mean()
        reg = self.lambda_ * (self.w @ self.w)
        return hinge_loss + reg

    def gradient(self, X, y):

        # Decision values for batch
        distances = 1 - y * (X @ self.w + self.b)

        # Identify samples with hinge loss > 0
        mask = distances > 0

        # Gradient w.r.t w
        dw = 2 * self.lambda_ * self.w
        if np.any(mask):
            dw += -np.mean((y[mask]).reshape(-1, 1) * X[mask], axis=0)

        # Gradient w.r.t b
        db = 0.0
        if np.any(mask):
            db = -np.mean(y[mask])

        return dw, db

    def fit(self, X, y):
        np.random.seed(42)
        n_samples, n_features = X.shape
        self.w = np.random.normal(0, 0.01, n_features)
        self.b = 0.0

        self.losses_train = []

        for epoch in range(self.epochs):
            indices = np.random.permutation(n_samples)
            X_shuffled, y_shuffled = X[indices], y[indices]

            for i in range(0, n_samples, self.batch_size):

                # numpy array never go out of index range with slicing
                X_batch = X_shuffled[i : i + self.batch_size]
                y_batch = y_shuffled[i : i + self.batch_size]

                dw, db = self.gradient(X_batch, y_batch)
                self.w -= self.lr * dw
                self.b -= self.lr * db

            self.loss_history.append(self.loss(X, y))

    def decision_function(self, X):
        return X @ self.w + self.b

    def predict(self, X):
        return np.sign(self.decision_function(X))

In [17]:
# | echo: false
# | fig-cap: Comparison Between Hard Margin SVM and Soft Margin SVM on Noisy Data
hard_svm = HardMarginSVM()
hard_svm.fit(X_with_noise, y_with_noise)
sgd_svm = SGDSVM(epochs=200, lr=0.01, lambda_=0.1)
sgd_svm.fit(X_with_noise, y_with_noise)

fig, axes = plt.subplots(1, 2, figsize=(6, 3))
axes[0].scatter(
    X_with_noise[:, 0], X_with_noise[:, 1], c=y_with_noise, cmap="bwr", alpha=1
)
axes[0].set_title("Hard Margin SVM")
axes[1].set_title("Soft Margin SVM")
axes[1].scatter(
    X_with_noise[:, 0], X_with_noise[:, 1], c=y_with_noise, cmap="bwr", alpha=1
)
show_hyperplane(hard_svm.decision_function, ax=axes[0])
show_hyperplane(sgd_svm.decision_function, ax=axes[1])
plt.show()

<Figure size 1800x900 with 2 Axes>

In [18]:
# | echo: false
# | fig-cap: Effect of Regularization Parameter on Decision Boundary of Soft Margin SVM
fig, axes = plt.subplots(1, 3, figsize=(15, 5))
Cs = [1000, 1, 0.1]
titles = [r"$C=1000$", r"$C=1$", r"$C=0.1$"]
for ax, C, title in zip(axes, Cs, titles):
    smo_svm = SMOSVM(C, tol=1e-3, max_iter=100)
    smo_svm.fit(X_with_noise, y_with_noise)
    ax.set_title(title)
    ax.scatter(
        X_with_noise[:, 0], X_with_noise[:, 1], c=y_with_noise, cmap="bwr", alpha=1
    )
    show_hyperplane(smo_svm.decision_function, ax=ax)
plt.show()

<Figure size 4500x1500 with 3 Axes>

In [19]:
#| echo: false

import idx2numpy

train_images = idx2numpy.convert_from_file("./mnist/archive/train-images.idx3-ubyte")
train_labels = idx2numpy.convert_from_file("./mnist/archive/train-labels.idx1-ubyte")
test_images = idx2numpy.convert_from_file("./mnist/archive/t10k-images.idx3-ubyte")
test_labels = idx2numpy.convert_from_file("./mnist/archive/t10k-labels.idx1-ubyte")
threes = train_images[train_labels == 3].reshape(-1, 28 * 28).astype(np.float32) / 255.0
sevens = train_images[train_labels == 7].reshape(-1, 28 * 28).astype(np.float32) / 255.0
fives = train_images[train_labels == 5].reshape(-1, 28 * 28).astype(np.float32) / 255.0
fives_valid = (
    test_images[test_labels == 5].reshape(-1, 28 * 28).astype(np.float32) / 255.0
)
threes_valid = (
    test_images[test_labels == 3].reshape(-1, 28 * 28).astype(np.float32) / 255.0
)
sevens_valid = (
    test_images[test_labels == 7].reshape(-1, 28 * 28).astype(np.float32) / 255.0
)

In [20]:
# | echo: false

X = np.vstack([threes, sevens])
y = np.array([1] * len(threes) + [-1] * len(sevens))
X_valid = np.vstack([threes_valid, sevens_valid])
y_valid = np.array([1] * len(threes_valid) + [-1] * len(sevens_valid))

show_images(
    [
        threes[0].reshape(28, 28),
        threes[1].reshape(28, 28),
        threes[2].reshape(28, 28),
        threes[3].reshape(28, 28),
        sevens[0].reshape(28, 28),
        sevens[1].reshape(28, 28),
        sevens[2].reshape(28, 28),
        sevens[3].reshape(28, 28),
    ],
    titles=["3", "3", "3", "3", "7", "7", "7", "7"],
)

<Figure size 3600x1200 with 8 Axes>

In [21]:
sgd_svm = SGDSVM(epochs=200, lr=0.0001, lambda_=0.001)
sgd_svm.fit(X, y)

In [22]:
plt.plot(sgd_svm.loss_history, color="blue", label="Training Loss")

plt.legend()
plt.show()

<Figure size 1650x1050 with 1 Axes>

In [23]:
(sgd_svm.predict(X_valid) == y_valid).mean()

np.float64(0.9828263002944063)

In [24]:
plt.imshow(sgd_svm.w.reshape(28, 28), cmap="RdBu", aspect="equal")

<matplotlib.image.AxesImage at 0x7d35217a9bd0>

<Figure size 1650x1050 with 1 Axes>

In [25]:
from sklearn.metrics import confusion_matrix, classification_report, accuracy_score
import seaborn as sns

# Make predictions on validation set
y_pred = sgd_svm.predict(X_valid)

# Calculate accuracy
accuracy = accuracy_score(y_valid, y_pred)
print(f"SGD SVM Validation Accuracy: {accuracy:.4f}")

# Create confusion matrix
cm = confusion_matrix(y_valid, y_pred)

# Plot confusion matrix
plt.figure(figsize=(8, 6))
sns.heatmap(
    cm,
    annot=True,
    fmt="d",
    cmap="Blues",
    xticklabels=["7", "3"],
    yticklabels=["7", "3"],
)
plt.title("Confusion Matrix - SGD SVM")
plt.xlabel("Predicted")
plt.ylabel("Actual")
plt.show()

# Print detailed classification report
print("\nDetailed Classification Report:")
print(classification_report(y_valid, y_pred, target_names=["7", "3"]))

SGD SVM Validation Accuracy: 0.9828


<Figure size 2400x1800 with 2 Axes>


Detailed Classification Report:
              precision    recall  f1-score   support

           7       0.99      0.98      0.98      1028
           3       0.98      0.99      0.98      1010

    accuracy                           0.98      2038
   macro avg       0.98      0.98      0.98      2038
weighted avg       0.98      0.98      0.98      2038



In [26]:
# Use a smaller subset for SMO (it's computationally more intensive)
n_train_samples = 500
indices = np.random.choice(len(X), n_train_samples, replace=False)
X_train_small = X[indices]
y_train_small = y[indices]

# Train SMO SVM
smo_mnist = SMOSVM(C=1.0, tol=1e-3, max_iter=50)
smo_mnist.fit(X_train_small, y_train_small)

In [27]:
# Evaluate SMO SVM on validation set
y_pred_smo = smo_mnist.predict(X_valid)
accuracy_smo = accuracy_score(y_valid, y_pred_smo)
print(f"SMO SVM Validation Accuracy: {accuracy_smo:.4f}")

# Compare with SGD SVM on the same training subset
sgd_svm_small = SGDSVM(epochs=200, lr=0.0001, lambda_=0.001)
sgd_svm_small.fit(X_train_small, y_train_small)
y_pred_sgd_small = sgd_svm_small.predict(X_valid)
accuracy_sgd_small = accuracy_score(y_valid, y_pred_sgd_small)
print(f"SGD SVM Validation Accuracy (on subset): {accuracy_sgd_small:.4f}")

SMO SVM Validation Accuracy: 0.9720
SGD SVM Validation Accuracy (on subset): 0.9573


In [28]:
# Create confusion matrices for comparison
cm_smo = confusion_matrix(y_valid, y_pred_smo)
cm_sgd = confusion_matrix(y_valid, y_pred_sgd_small)

fig, axes = plt.subplots(1, 2, figsize=(14, 5))

# SMO confusion matrix
sns.heatmap(cm_smo, annot=True, fmt='d', cmap='Blues', 
            xticklabels=['7', '3'], yticklabels=['7', '3'], ax=axes[0])
axes[0].set_title(f'SMO SVM (Acc: {accuracy_smo:.4f})')
axes[0].set_xlabel('Predicted')
axes[0].set_ylabel('Actual')

# SGD confusion matrix
sns.heatmap(cm_sgd, annot=True, fmt='d', cmap='Greens', 
            xticklabels=['7', '3'], yticklabels=['7', '3'], ax=axes[1])
axes[1].set_title(f'SGD SVM (Acc: {accuracy_sgd_small:.4f})')
axes[1].set_xlabel('Predicted')
axes[1].set_ylabel('Actual')

plt.tight_layout()
plt.show()

<Figure size 4200x1500 with 4 Axes>

In [29]:
# Print comparison report
print("=" * 60)
print("COMPARISON: SMO vs SGD SVM")
print("=" * 60)
print(f"\nTraining set size: {n_train_samples} samples")
print(f"\nSMO SVM:")
print(f"  - Validation Accuracy: {accuracy_smo:.4f}")
print(f"  - Number of Support Vectors: {len(smo_mnist.support_vectors_)}")
print(
    f"  - Support Vector Ratio: {len(smo_mnist.support_vectors_)/n_train_samples:.2%}"
)
print(f"\nSGD SVM:")
print(f"  - Validation Accuracy: {accuracy_sgd_small:.4f}")
print(f"  - Non-zero weights: {np.sum(np.abs(sgd_svm_small.w) > 1e-5)}")
print("=" * 60)

COMPARISON: SMO vs SGD SVM

Training set size: 500 samples

SMO SVM:
  - Validation Accuracy: 0.9720
  - Number of Support Vectors: 68
  - Support Vector Ratio: 13.60%

SGD SVM:
  - Validation Accuracy: 0.9573
  - Non-zero weights: 784


In [30]:
# | echo: false
# | fig-cap: Some Samples of Digits 3, 5, and 7 from the MNIST Dataset
X = np.vstack([threes, fives, sevens])
y = np.array([0] * len(threes) + [1] * len(fives) + [2] * len(sevens))
X_valid = np.vstack([threes_valid, fives_valid, sevens_valid])
y_valid = np.array(
    [0] * len(threes_valid) + [1] * len(fives_valid) + [2] * len(sevens_valid)
)

show_images(
    [
        threes[0].reshape(28, 28),
        threes[1].reshape(28, 28),
        threes[2].reshape(28, 28),
        threes[3].reshape(28, 28),
        fives[0].reshape(28, 28),
        fives[1].reshape(28, 28),
        fives[2].reshape(28, 28),
        fives[3].reshape(28, 28),
        sevens[0].reshape(28, 28),
        sevens[1].reshape(28, 28),
        sevens[2].reshape(28, 28),
        sevens[3].reshape(28, 28),
    ],
    titles=["3", "3", "3", "3", "5", "5", "5", "5", "7", "7", "7", "7"],
)

<Figure size 3600x1200 with 12 Axes>

In [31]:
# | echo: false
class OvR_SGD_SVM:
    def __init__(self, n_classes, lambda_=0.01, lr=0.01, epochs=10, batch_size=64):
        self.n_classes = n_classes
        self.models = [
            SGDSVM(lambda_=lambda_, lr=lr, epochs=epochs, batch_size=batch_size)
            for _ in range(n_classes)
        ]

    def fit(self, X, y):
        for i in range(self.n_classes):
            y_binary = np.where(y == i, 1, -1)
            self.models[i].fit(X, y_binary)

    def predict(self, X):
        decision_values = np.array([model.predict(X) for model in self.models])
        return np.argmax(decision_values, axis=0)

In [32]:
# ovr_svm = OvR_SGD_SVM(n_classes=3, epochs=200, lr=0.0001, lambda_=0.001)
# ovr_svm.fit(X, y)
# ovr_svm_pred = ovr_svm.predict(X_valid)
# (ovr_svm_pred == y_valid).mean()

In [33]:
# # Calculate accuracy
# accuracy = accuracy_score(y_valid, ovr_svm_pred)
# print(f"SGD SVM Validation Accuracy: {accuracy:.4f}")

# # Create confusion matrix
# cm = confusion_matrix(y_valid, ovr_svm_pred)

# # Plot confusion matrix
# plt.figure(figsize=(8, 6))
# sns.heatmap(
#     cm,
#     annot=True,
#     fmt="d",
#     cmap="Blues",
#     xticklabels=["3", "5", "7"],
#     yticklabels=["3", "5", "7"],
# )
# plt.title("Confusion Matrix - SGD SVM")
# plt.xlabel("Predicted")
# plt.ylabel("Actual")
# plt.show()

# # Print detailed classification report
# print("\nDetailed Classification Report:")
# print(classification_report(y_valid, ovr_svm_pred, target_names=["3", "5", "7"]))