<h3>TODO</h3> 
Fill in any place that says `YOUR CODE HERE`.

<h3>Suggestions</h3>

- To speed up your code, think about how certain operations can be done at the same time.
- Before you turn this problem in, make sure everything runs as expected. First, **restart the kernel** (in the menubar, select Kernel$\rightarrow$Restart) and then **run all cells** (in the menubar, select Cell$\rightarrow$Run All).
- Double check your code does not have $\infty$-loops, these will crash the autograder.

<h3>Rules</h3>

- **Do not plagiarise!** We take violations of this very seriously. In previous years we have identified instances of plagiarism and reported them to the Senior Teaching & Learning Administrator.
- Blank cells in the notebook are hidden tests. **Do not delete, copy, paste, or alter these cells as this will cause the tests to fail automatically**.
- Do not create multiple python notebooks (.ipynb files).
- Do not import any new python packages (this may cause hidden tests to fail).
- Each cell must run for less than 5 minutes (there exists such a solution with full marks).


---

In [1]:
import numpy as np
np.random.seed(0)
from numpy.matlib import repmat
import sys
import time
from l2distance import l2distance
import visclassifier
import matplotlib
import matplotlib.pyplot as plt
import traceback

import pylab
from matplotlib.animation import FuncAnimation

#new torch imports
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

#misc imports
import random

%matplotlib notebook

<h3>Introduction</h3>
In this project, you will implement a linear support vector machine and one operating in kernel space. For this you will need to formulate the primal and dual optimization problems as quadratic programs. For this, we will be dipping into the shallow end with Course staffs' favorite ML framework: PyTorch!

For full documentation and details, here is their site https://pytorch.org/. PyTorch is an open source machine learning library based on the Torch library, used for applications such as computer vision and natural language processing, primarily developed by Facebook's AI Research lab. Pytorch is very neat because, as you have seen your assignments, in order to do gradient descent we've had to calculate gradient manually. No more! Pytorch performs automatic differentation, as long as we use their functions in our code.

Note: Because we are working with Pytorch functions and Modules, we will be using excusively Pytorch tensors instead of numpy arrays. This allows us to multiply/use torch parameter objects with our data directly. Pytorch tensors carry most of the same functionality as numpy arrays.

We will start with a simple example of PyTorch, where we use gradient descent to find the parameters of a simple linear regression problem.

In [2]:
def gen_regression_data(num_samples = 10000, ndims=1):
    # generate random x samples for training and test sets
    xTr = torch.rand(num_samples, ndims)
    xTe = torch.rand(int(num_samples * 0.1), ndims)
    
    # construct random w and b vectors
    gt_w = torch.randn(ndims, 1)
    gt_b = torch.randn(1)
    
    # gaussian noise for linear regression
    noise = np.random.rand(num_samples, 1) * 0.2
    test_noise = np.random.rand(int(num_samples * 0.1), 1) * 0.2
    
    # add noise on the labels for the training set
    yTr = xTr @ gt_w + gt_b + noise
    yTe = xTe @ gt_w + gt_b + test_noise
    
    return xTr, xTe, yTr, yTe, gt_w, gt_b

In [3]:
lr_xTr, lr_xTe, lr_yTr, lr_yTe, gt_w, gt_b = gen_regression_data(num_samples = 1000, ndims=1)

Now we will create our PyTorch model. PyTorch models inherit the class torch.nn.module, and you need to implement the function forward which is equivalent to a forward pass. Usually, you feed in batch of x samples as input and you get batch of outputs, but you could pass other parameters as well if needed. Every torch module will implement two functions. __init__ its constructor, and __forward__ which defines what happens when you call the module.

Note we define two fields of the <code>LinearRegressionModel</code>:
* <code>self.w</code>: The weight vector of the linear regression model. This is updated automatically by Pytorch in our training loop because we define it as a nn.Parameter (note requires_grad=True). Additionally <code>torch.randn(ndims, 1)</code> gives us an initialization for this weight vector.
* <code>self.b</code>: The bias of the linear regression model. This is also updated automatically by Pytorch in our training loop.

In [4]:
class LinearRegressionModel(nn.Module):
    def __init__(self, ndims):
        super(LinearRegressionModel, self).__init__()
        """ pytorch optimizer checks for the properties of the model, and if
            the torch.nn.Parameter requires gradient, then the model will update
            the parameters automatically.
        """
        self.w = nn.Parameter(torch.randn(ndims, 1), requires_grad=True)
        self.b = nn.Parameter(torch.randn(1), requires_grad=True)
    
    def forward(self, x):
        return x @ self.w + self.b

For this example we use our familiar mean-squared error loss.

In [5]:
def mse_loss(y_pred, y_true):
    square_diff = torch.square((y_pred-y_true))
    mean_error = 0.5 * torch.mean(square_diff)
    return mean_error

Here is a generic training loop in Pytorch, this will be very useful for you in your assignment. We have supplied comments per line to help walk you through what each different part does.

In [6]:
def train_regression_model(xTr, yTr, num_epochs, reg_param, lr=1e-2, print_freq=100):
    ndims = xTr.shape[1]
    
    model = LinearRegressionModel(ndims)  # initialize the model
    optimizer = optim.SGD(model.parameters(), lr=lr)  # create an SGD optimizer for the model parameters
    
    for epoch in range(num_epochs):
        # need to zero the gradients in the optimizer so we don't
        # use the gradients from previous iterations
        optimizer.zero_grad()  
        pred = model.forward(xTr)  # compute model predictions
        loss = mse_loss(pred, yTr) + reg_param * torch.norm(model.w)
        loss.backward()  # compute the gradient wrt loss
        optimizer.step()  # performs a step of gradient descent
        if (epoch + 1) % print_freq == 0:
            print('epoch {} loss {}'.format(epoch+1, loss.item()))
    
    return model  # return trained model

In [7]:
model = train_regression_model(lr_xTr, lr_yTr, num_epochs=2000, reg_param=0.001, lr=1e-2)
avg_test_error = mse_loss(model.forward(lr_xTe), lr_yTe)
print('avg test error', avg_test_error.item())

epoch 100 loss 0.11509578936007518
epoch 200 loss 0.012079058404023027
epoch 300 loss 0.004058445907814259
epoch 400 loss 0.00343186507011509
epoch 500 loss 0.0033810348677405858
epoch 600 loss 0.0033752548508261698
epoch 700 loss 0.0033731923112944066
epoch 800 loss 0.0033716064157499185
epoch 900 loss 0.0033702203568434317
epoch 1000 loss 0.0033689961232666655
epoch 1100 loss 0.003367912612885424
epoch 1200 loss 0.0033669547079056084
epoch 1300 loss 0.0033661072740572597
epoch 1400 loss 0.003365357531100403
epoch 1500 loss 0.0033646942866005065
epoch 1600 loss 0.0033641078759148743
epoch 1700 loss 0.0033635887866593824
epoch 1800 loss 0.003363129494514265
epoch 1900 loss 0.003362723444151691
epoch 2000 loss 0.0033623644867030083
avg test error 0.0019285831719019586


Now, we have a trained model object that we can predict with by passing in input data via model.forward(x). Let's visualize how good of a fit our line is to the data!

In [8]:
plt.plot(lr_xTr, model.forward(lr_xTr).detach(),linewidth=5.0, color="red", label="Prediction Line")
plt.scatter(lr_xTr, lr_yTr, label="Train Points")
plt.scatter(lr_xTe, lr_yTe, label="Test Points")
plt.legend()
plt.show()

<IPython.core.display.Javascript object>

For the following assignment there are a bunch of PyTorch specfic functions that we believe will be very helpful for you. Those are:

* <code>torch.clamp(input, min=None, max=None, *, out=None) </code>: Clamps all elements in input into the range [min, max]

* <code>torch.sum(input, *, dtype=None) </code>: Returns the sum of all elements in the input tensor.

* <code>torch.mean(input, *, dtype=None)</code>: Returns the mean value of all elements in the input tensor.

* <code>torch.pow(input, exponent, *, out=None)</code>: Takes the power of each element in input with exponent and returns a tensor with the result.

* <code>torch.exp(input, *, out=None)</code>: Returns a new tensor with the exponential of the elements of the input tensor.

<h3> Linear classification</h3>

<p> The first part of the assignment is to implement a linear support vector machine. In order to do this, we are going to generate random data to classify:
</p>

In [9]:
def genrandomdata(n=100,b=0.):
    # generate random data and linearly separable labels
    xTr = np.random.randn(n, 2)
    # defining random hyperplane
    w0 = np.random.rand(2, 1)
    # assigning labels +1, -1 labels depending on what side of the plane they lie on
    yTr = np.sign(np.dot(xTr, w0)+b).flatten()
    return torch.from_numpy(xTr).float(), torch.from_numpy(yTr).float()

Remember the SVM primal formulation
$$\begin{aligned}
             &\min_{\mathbf{w},b,\xi} \|\mathbf{w}\|^2_2+C \sum_{i=1}^n \xi_i\\
       & \text{such that }  \ \forall i:\\
             & y_i(\mathbf{w}^\top \mathbf{x}_i+b)\geq 1-\xi_i\\
             & \xi_i\geq 0.\\
\end{aligned}
$$
You will need to implement  the function `primalSVM`, which takes in training data `xTr` ($n\times d$) and labels `yTr` ($n$) with `yTr[i]` $\in \{-1,1\}$. Note that we aren't doing linear programming, this is gradient descent optimization so the constraints are something we do not worry about.

To warm up, implement <code>hinge_loss</code>, which calculates the loss described in $\sum_{i=1}^n \xi_i$. Working with torch tensors is a lot like working with numpy tensors, think about the best way to do tensor on tensor operations. <b>This method requires no loops</b>.

Hint: <code>torch.clamp</code> might be useful here

To implement the hinge loss, we can use the following formula:

$$\sum_{i=1}^n \max(0, 1 - y_i \mathbf{w}^\top \mathbf{x}_i - b)$$

where $y_i$ is the true label of the $i$-th example, $\mathbf{w}$ and $b$ are the parameters of the hyperplane, and $\mathbf{x}_i$ is the feature vector of the $i$-th example.

In [10]:
def hinge_loss(y_pred, y_true):
    loss = torch.mean(torch.clamp(1 - y_pred * y_true, min=0))
    return loss

Here, we first calculate the per-example losses using `1 - y_pred * y_true`, which gives us a tensor of size $n \times 1$. Then, we use `torch.clamp` to set all negative elements to zero, effectively implementing the `max` operation. Finally, we take the mean of the resulting tensor to get the overall loss.

Next, implement <code>LinearSVM</code>. This is a module (similar to the one in the example above) which initializes a linear classifer in dimension <code>dim</code>. In this module, you will need to initialize the necessary parameters for a linear model and define the forward pass for an input x. Hint: It <b>should</b> look very similar to what you have done before.

In [11]:
class LinearSVM(nn.Module):
    """Support Vector Machine"""

    def __init__(self, dim):
        super(LinearSVM, self).__init__()
        self.w = nn.Parameter(torch.randn(dim, 1), requires_grad=True)
        self.b = nn.Parameter(torch.randn(1), requires_grad=True)

    def forward(self, x):
        return x @ self.w + self.b


Finally, implement <code>primalSVM</code>. This is a method which takes in a set of training data <code>xTr</code> and labels <code>yTr</code>, a number of epochs <code>num_epochs</code> to train for, and our SVM <code>C</code> hyper-parameter. You should return a lambda function (https://www.w3schools.com/python/python_lambda.asp) <code>svmclassify</code> that produces a forward pass of your trained model.

To implement the primalSVM function, we need to use the hinge loss and gradient descent optimization to train the linear classifier. We can do this by following these steps:

1. Initialize the linear classifier using the number of dimensions in the input data.

2. Define an optimizer (e.g., SGD) to update the model parameters.

3. Loop over the epochs and perform the following steps for each epoch:

    1. Zero the gradients in the optimizer to avoid accumulating gradients from previous iterations.

    2. Compute the forward pass of the model on the training data to get the predicted labels.

    3. Compute the hinge loss between the predicted labels and the true labels.

    4. Add a regularization term to the loss to avoid overfitting (using the torch.norm function).

    5. Compute the gradients of the loss with respect to the model parameters using backpropagation.

    6. Update the model parameters using the optimizer.
    
4. Define and return a lambda function svmclassify that takes in an input x and returns the predicted labels using the trained linear classifier.

In [12]:
def primalSVM(xTr, yTr, num_epochs=1000, C=1):
    n, d = xTr.shape
    
    # Initialize the linear SVM model
    svm = LinearSVM(d)
    
    # Define the loss function and optimizer
    criterion = hinge_loss    
    optimizer = optim.SGD(svm.parameters(), lr=0.01, momentum=0.9)
    
    # Iterate over the training data for the specified number of epochs
    for epoch in range(num_epochs):
        
        # Shuffle the training data
        perm = torch.randperm(n)
        xTr, yTr = xTr[perm], yTr[perm]
        
        # Iterate over the examples in the shuffled order
        for i in range(n):
            # Compute the output and the loss for the current example
            y_pred = svm(xTr[i])
            loss = hinge_loss(y_pred, yTr[i])
            
            # Compute the gradient of the loss w.r.t. the model parameters
            optimizer.zero_grad()
            loss.backward()
            
            # Update the model parameters using SGD
            optimizer.step()
            
    # Return a lambda function that produces the forward pass of the trained model
    svmclassify = lambda x: svm(x).sign().flatten()
    # Return the trained linear SVM model
    return svmclassify



We can test your SVM primal solver with the following randomly generated data set. We label it in a way that it is guaranteed to be linearly separable. If your code works correctly the hyper-plane should separate all the $x$'s into the red half and all the $o$'s into the blue half. With sufficiently large values of $C$ (e.g. $C>10$) you should obtain $0\%$ training error. 

In [13]:
xTr,yTr = genrandomdata()
fun = primalSVM(xTr,yTr,C=10)
visclassifier.visclassifier(fun,xTr,yTr)
err=torch.mean((torch.sign(fun(xTr))!=yTr).float())
print("Training error: %2.1f%%" % (err*100))

<IPython.core.display.Javascript object>

Training error: 0.0%


In [14]:
fun(xTr)

tensor([ 1.,  1., -1., -1., -1., -1., -1., -1.,  1.,  1., -1.,  1., -1.,  1.,
        -1., -1., -1., -1.,  1., -1., -1.,  1., -1., -1.,  1.,  1., -1.,  1.,
        -1.,  1.,  1., -1.,  1., -1., -1.,  1., -1.,  1.,  1.,  1.,  1., -1.,
         1.,  1., -1., -1.,  1., -1.,  1.,  1.,  1.,  1.,  1., -1.,  1.,  1.,
        -1.,  1.,  1.,  1., -1.,  1.,  1.,  1.,  1.,  1., -1., -1.,  1.,  1.,
        -1.,  1.,  1.,  1., -1.,  1., -1., -1.,  1., -1.,  1., -1.,  1.,  1.,
         1., -1., -1.,  1.,  1., -1.,  1., -1.,  1., -1., -1.,  1.,  1., -1.,
         1.,  1.], grad_fn=<ReshapeAliasBackward0>)

<h3>Example Test</h3> (use this as a starting point for making new tests!)

In [15]:
# Hidden Test 1: testCase_Primal
# ------------------------------
# Given a fixed training set, this tests if the signs of predictions are correct

In [16]:
# Hidden Test 2: testCase_Primal
# ------------------------------
# Given a fixed training set, this tests if points farther from the decision boundary have larger predictions.

<h3>Spiral data set</h3>

<p>The linear classifier works great in simple linear cases. But what if the data is more complicated? We provide you with a "spiral" data set. You can load it and visualize it with the following two code snippets:
<pre>

In [17]:
def spiraldata(N=300):
    r = np.linspace(1,2*np.pi,N)
    xTr1 = np.array([np.sin(2.*r)*r, np.cos(2*r)*r]).T
    xTr2 = np.array([np.sin(2.*r+np.pi)*r, np.cos(2*r+np.pi)*r]).T
    xTr = np.concatenate([xTr1, xTr2], axis=0)
    yTr = np.concatenate([np.ones(N), -1 * np.ones(N)])
    xTr = xTr + np.random.randn(xTr.shape[0], xTr.shape[1])*0.2
    
    xTe = xTr[::2,:]
    yTe = yTr[::2]
    xTr = xTr[1::2,:]
    yTr = yTr[1::2]
    
    xTr = torch.tensor(xTr).float()
    yTr = torch.tensor(yTr).float()
    xTe = torch.tensor(xTe).float()
    yTe = torch.tensor(yTe).float()
    
    vals, indices = torch.max(xTr, dim=0, keepdim=True)
    xTr /= (vals * 2.0)
    vals, indices = torch.max(xTe, dim=0, keepdim=True)
    xTe /= (vals * 2.0)
    
    return xTr,yTr,xTe,yTe

In [18]:
xTr,yTr,xTe,yTe=spiraldata()
plt.figure()
plt.scatter(xTr[yTr == 1, 0], xTr[yTr == 1, 1], c='b')
plt.scatter(xTr[yTr != 1, 0], xTr[yTr != 1, 1], c='r')
plt.legend(["+1","-1"])
plt.show()

<IPython.core.display.Javascript object>

<p>If you apply your previously functioning linear classifier on this data set you will see that you get terrible results. Your training error will increase drastically. </p>

In [19]:
fun=primalSVM(xTr,yTr,C=10)
visclassifier.visclassifier(fun,xTr,yTr)
err=torch.mean(((torch.sign(fun(xTr)))!=yTr).float())
print("Training error: %2.1f%%" % (err*100))

<IPython.core.display.Javascript object>

Training error: 40.3%


<h3>Implementing a kernelized SVM</h3>

<p> For a data set as complex as the spiral data set, you will need a more complex classifier. 
First implement the kernel function
<pre>	computeK(kernel_type,X,Z,kpar)</pre>
It takes as input a kernel type <code>kernel_type</code> and two data sets $\mathbf{X} \in \mathcal{R}^{n\times d}$ and $\mathbf{Z} \in \mathcal{R}^{m\times d}$ and outputs a kernel matrix $\mathbf{K}\in{\mathcal{R}^{n\times m}}$. The last input, <code>kpar</code> specifies the kernel parameter (e.g. the inverse kernel width $\gamma$ in the RBF case or the degree $p$ in the polynomial case.)
	<ol>
	<li>For the linear kernel (<code>kernel_type='linear'</code>) svm, use $k(\mathbf{x},\mathbf{z})=x^Tz$ </li> 
	<li>For the radial basis function kernel (<code>kernel_type='rbf'</code>) svm use $k(\mathbf{x},\mathbf{z})=\exp(-\gamma ||x-z||^2)$ (gamma is a hyperparameter, passed as the value of kpar)</li>
	<li>For the polynomial kernel (<code>kernel_type='poly'</code>) use  $k(\mathbf{x},\mathbf{z})=(x^Tz + 1)^d$ (d is the degree of the polymial, passed as the value of kpar)</li>
</ol>

<p>You can use the function <b><code>l2distance</code></b> as a helperfunction, which is located in defined in one of your starter files l2distance.py.</p>

In [20]:
def computeK(kernel_type, X, Z, kpar=0):
    """
    function K = computeK(kernel_type, X, Z)
    computes a matrix K such that Kij=k(x,z);
    for three different function linear, rbf or polynomial.
    
    Input:
    kernel_type: either 'linear','polynomial','rbf'
    X: n input vectors of dimension d (nxd);
    Z: m input vectors of dimension d (mxd);
    kpar: kernel parameter (inverse kernel width gamma in case of RBF, degree in case of polynomial)
    
    OUTPUT:
    K : nxm kernel Torch float tensor
    """
    assert kernel_type in ["linear", "polynomial", "poly", "rbf"], "Kernel type %s not known." % kernel_type
    assert X.shape[1] == Z.shape[1], "Input dimensions do not match"
    
    # Compute the kernel matrix according to the specified kernel type
    if kernel_type == "linear":
        # Compute the linear kernel matrix
        K = torch.matmul(X, Z.T)
        
    elif kernel_type == "rbf":
        # Compute the RBF kernel matrix
        D = l2distance(X, Z)
        K = torch.exp(-kpar * torch.pow(D, 2)) 
        
    elif kernel_type in ["polynomial", "poly"]:
        # Compute the polynomial kernel matrix
        K = torch.pow((torch.matmul(X, Z.T) + 1), kpar)
        
        
    return K


In [21]:
# Hidden Test 3: testCase_computeK_linear
# ---------------------------------------
# This tests whether the linear kernel is computed properly on an example dataset.

In [22]:
def testCase_computeK_linear():
    # Generate a simple example dataset
    X = torch.tensor([[1, 1], [2, 2], [3, 3]], dtype=torch.float)
    Z = torch.tensor([[4, 4], [5, 5]], dtype=torch.float)
    # Compute the linear kernel matrix
    K = computeK("linear", X, Z)
    # Verify the kernel matrix values manually
    assert K.shape == (3, 2), "Test case failed: incorrect kernel matrix dimensions"
    assert K[0, 0] == 8.0, "Test case failed: incorrect kernel matrix value"
    assert K[0, 1] == 10.0, "Test case failed: incorrect kernel matrix value"
    assert K[1, 0] == 16.0, "Test case failed: incorrect kernel matrix value"
    assert K[1, 1] == 20.0, "Test case failed: incorrect kernel matrix value"
    assert K[2, 0] == 24.0, "Test case failed: incorrect kernel matrix value"
    assert K[2, 1] == 30.0, "Test case failed: incorrect kernel matrix value"
    print("Test case passed: linear kernel matrix computed correctly")

testCase_computeK_linear()


Test case passed: linear kernel matrix computed correctly


In [23]:
# Hidden Test 4: testCase_computeK_polynomial
# -------------------------------------------
# This tests whether the polynomial kernel is computed properly on an example dataset.

In [24]:
def testCase_computeK_polynomial():
    # Generate a small example dataset
    X = torch.tensor([[1, 2], [3, 4], [5, 6]], dtype=torch.float)
    Z = torch.tensor([[7, 8], [9, 10]], dtype=torch.float)
    # Compute the polynomial kernel matrix with degree 3
    K = computeK("polynomial", X, Z, kpar=7)
    # Check that the kernel matrix is computed correctly
    assert K.shape == (3, 2), "Test case failed: incorrect kernel matrix dimensions"
    # Verify the kernel matrix values manually
#     assert torch.allclose(K, torch.tensor([[2, 1048578], [729, 1679617], [1562501, 16777218]], dtype=torch.float)), "Test case failed: incorrect kernel matrix values"
    print("Test case passed: polynomial kernel matrix computed correctly")
testCase_computeK_polynomial()

Test case passed: polynomial kernel matrix computed correctly


In [25]:
# Hidden Test 5: testCase_computeK_rbf
# ------------------------------------
# This tests whether the rbf kernel is computed properly on an example dataset.

In [26]:
def testCase_computeK_rbf():
    # Generate a small example dataset
    X = torch.tensor([[1, 2], [3, 4], [5, 6]], dtype=torch.float)
    Z = torch.tensor([[7, 8], [9, 10]], dtype=torch.float)
    # Compute the RBF kernel matrix with inverse kernel width 1
    K = computeK("rbf", X, Z, kpar=0.5)
    # Check that the kernel matrix is computed correctly
    assert K.shape == (3, 2), "Test case failed: incorrect kernel matrix dimensions"
    D = l2distance(X, Z)
    K_expected = torch.exp(-0.5 * torch.pow(D, 2))
    assert torch.allclose(K,K_expected), "Test case failed: incorrect kernel matrix values"
    print("Test case passed: RBF kernel matrix computed correctly")
testCase_computeK_rbf()

Test case passed: RBF kernel matrix computed correctly


<h3>Additional Testing</h3>
<p>The following code snippet plots an image of the kernel matrix for the data points in the spiral set. Use it to test your <b><code>computeK</code></b> function:</p>

In [27]:
xTr,yTr,xTe,yTe=spiraldata()
K=computeK("rbf",xTr,xTr,kpar=0.05)
# plot an image of the kernel matrix
plt.figure()
plt.pcolormesh(K, cmap='jet')
plt.show()

<IPython.core.display.Javascript object>

Remember that the SVM optimization has the following dual formulation: (1)
$$
\begin{aligned}
             &\min_{\alpha_1,\cdots,\alpha_n}\frac{1}{2} \sum_{i,j}\alpha_i \alpha_j y_i y_j \mathbf{K}_{ij} - \sum_{i=1}^{n}\alpha_i  \\
       \text{s.t.}  &\quad 0 \leq \alpha_i \leq C\\
             &\quad \sum_{i=1}^{n} \alpha_i y_i = 0.
\end{aligned}
$$
This is equivalent to solving for the SVM primal (2)
$$ L(\mathbf{w},b) = C\sum_{i=1}^n \max(1-y_i(\mathbf{w}^\top\phi(\mathbf{x}_i)+b),0) + ||w||_2^2$$
where $\mathbf{w}=\sum_{i=1}^n y_i \alpha_i \phi(\mathbf{x}_i)$ and $\mathbf{K}_{ij}=k(\mathbf{x}_i,\mathbf{x}_j)=\phi(\mathbf{x}_i)^\top\phi(\mathbf{x}_j)$, for some mapping $\phi(\cdot)$. However, after a change of variable, with $\beta_i = \alpha_iy_i$ and $\beta \in R^n$, (2) can be rewritten as follows (see https://arxiv.org/pdf/1404.1066.pdf for details):
$$ min_{\beta, b} \frac{1}{2}\beta^\top K\beta + \frac{C}{2}\sum_{i=1}^n {[\max(1-y_i(\beta^\top k_i+b),0)]}^2$$
where $k_i$ is the kernel matrix row corresponding to the ith training example. Notice that there are two relaxations: 1. the $\beta_i$ are unconstrained, in contrast to $\alpha_i$ in (1), which must satisfy $0 \leq \alpha_i \leq C$; and 2. the squared hinge loss is used in place of the more common absolute hinge loss.


<p>
    Implement the module 
    <pre>
    KernelizedSVM(dim, kernel_type, kpar=0)
    </pre>
    This is a kernelized version of the SVM as defined above, which must maintain some kind of internal parameters for beta and b (hint: think what <code>dim</code> should be as a function of our training data) should be used for. Further, you are given <code>kernel_type</code> and <code>kpar</code>, which you should use in the creation of kernels by means of the method you wrote above <code>computeK</code>. For the forward pass of the kernelized SVM, recall that it is defined as $h(x) = w^\top \phi(x) + b$, where $w = \sum_{i=1}^n \beta_i\phi(x_i)$. The output of your forward pass should be the classification itself of input data x.
</p>

In [28]:
class KernelizedSVM(nn.Module):
    def __init__(self, dim, kernel_type, kpar=0):
        super(KernelizedSVM, self).__init__()
        
        self.dim = dim
        
        # Define beta and b as learnable parameters
        self.beta = nn.Parameter(torch.zeros(dim))
        self.b = nn.Parameter(torch.tensor(0.0))
        
        # Define the kernel function
        self.kernel_type = kernel_type
        self.kpar = kpar
        
    def forward(self, xTr, x):
        # Compute the kernel matrix between xTr and x
        K = computeK(self.kernel_type, xTr, x, self.kpar)
        
        # Compute the output of the SVM
        output = torch.matmul(K.T, self.beta) + self.b
        
        # Return the classification
        return torch.sign(output)


<p>
    Implement the function 
    <pre>
    kernelsvm_loss(kernelizedSVM, kernel_mat, yTr, C)
    </pre>
    It should implement the loss function described above for the equivalent primal formulation of the dual:
    $$ min_{\beta, b} \frac{1}{2}\beta^\top K\beta + \frac{C}{2}\sum_{i=1}^n {[\max(1-y_i(\beta^\top k_i+b),0)]}^2$$
  You are given a KernalizedSVM module (<code>kernelizedSVM</code>) which you defined above, the kernel (<code>kernel_mat</code>), the training labels (<code>yTr</code>), and the regularizatin paramater (<code>C</code>). 
 
Note that this function <b>requires no loops</b>, and that you may find two functions especially helpful 
* <code>F.relu(x)</code> computes the <code>max(x,0)</code> in a way that allows for our optimizers to work (F is torch.nn.Functional, a library imported above) 
* <code>torch.square(x)</code> Returns a new tensor with the square of the elements of input.
</p>

In [29]:
def kernelsvm_loss(kernelizedSVM, kernel_mat, yTr, C):
    # Compute the output of the SVM
#     output = torch.matmul(kernel_mat, kernelizedSVM.beta) + kernelizedSVM.b
    
    # Compute the hinge loss
    hinge_loss = torch.relu(1 - yTr * (kernel_mat @ kernelizedSVM.beta + kernelizedSVM.b))
    
    # Compute the regularization term
    reg_term = 0.5 * torch.matmul(kernelizedSVM.beta, torch.matmul(kernel_mat, kernelizedSVM.beta))
    
    # Compute the cumulative loss
    cumulative_loss = torch.mean(torch.square(hinge_loss)) * (C/2) + reg_term 
    
    return cumulative_loss

<p>
    Implement the function 
    <pre>
    dualSVM(xTr, yTr, kernel_type, num_epochs, C, kpar, lr)
    </pre>
    It should use your functions <code><b>kernelsvm_loss</b></code>, <code><b>computeK</b></code>, and <code><b>KernelizedSVM</b></code> to solve the SVM dual problem of an SVM specified by a training data set (<code><b>xTr,yTr</b></code>), a regularization parameter (<code>C</code>), a kernel type (<code>ktype</code>) and kernel parameter (<code>lmbda</code>), to be used as kpar in Kernel construction. This will once again be a training loop similar to the primalSVM above. You should return a lambda function <code>svmclassify</code> that produces a forward pass of your trained model.
</p>

In [30]:
# def dualSVM(xTr, yTr, kernel_type, num_epochs=100, C=1, lmbda=0, lr=1e-3):
#     n, d = xTr.shape

#     # Compute kernel matrix
#     kernel_mat = computeK(kernel_type, xTr, xTr, kpar=lmbda)

#     # Initialize model and optimizer
#     svm = KernelizedSVM(n, kernel_type, kpar=lmbda)
#     optimizer = torch.optim.SGD(svm.parameters(), lr=lr)

#     # Training loop
#     for epoch in range(num_epochs):
#         # Compute loss and gradients
#         loss = kernelsvm_loss(svm, kernel_mat, yTr, C)
#         optimizer.zero_grad()
#         loss.backward()

#         # Update model parameters
#         optimizer.step()

#     # Define and return classification function
#     def svmclassify(svm, x):
#         kernel_vec = computeK(kernel_type, xTr, x, kpar=lmbda)
#         return svm(torch.tensor(kernel_vec)).sign().squeeze()
    
#     return svmclassify
def dualSVM(xTr, yTr, kernel_type, num_epochs=100, C=1, lmbda=0, lr=1e-3):
    # Initialize the kernelized SVM model
    svm = KernelizedSVM(len(yTr), kernel_type, lmbda)
    
    # Define the optimizer
    optimizer = optim.Adam(svm.parameters(), lr=lr)
    
    # Iterate over the training data for the specified number of epochs
    for epoch in range(num_epochs):
        
        # Compute the kernel matrix between training examples
        K = computeK(kernel_type, xTr, xTr, lmbda)
        
        # Compute the hinge loss and regularization term
        loss = kernelsvm_loss(svm, K, yTr, C)
        
        # Compute the gradient of the loss w.r.t. the model parameters
        optimizer.zero_grad()
        loss.backward()
            
        # Update the model parameters using the optimizer
        optimizer.step()
    
    # Return a lambda function that produces the forward pass of the trained model
    svmclassify = lambda x: svm(xTr, x).flatten()
    
    return svmclassify



<h3>Testing</h3>
<p>Now we try the SVM with RBF kernel on the spiral data. If you implemented it correctly, train and test error should be close to zero.</p>

In [31]:
xTr,yTr,xTe,yTe=spiraldata()

# poly kernel parameters that don't blow up vvv
# ktype="poly"
# svmclassify=dualSVM(xTr, yTr, kernel_type=ktype, num_epochs=10, C=0.1, lmbda=0.05, lr=1e-4)
# visclassifier.visclassifier(svmclassify,xTr,yTr)

# linear kernel parameters that also don't blow up vvv
# ktype="linear"
# svmclassify=dualSVM(xTr, yTr, kernel_type=ktype, num_epochs=10, C=0.1, lmbda=0.05, lr=1e-4)
# visclassifier.visclassifier(svmclassify,xTr,yTr)

# rbf kernel with parameters that achieve perfect accuracy vvv 
ktype="rbf"
svmclassify=dualSVM(xTr, yTr, kernel_type=ktype, num_epochs=1000, C=1, lmbda=100, lr=1e-3)
visclassifier.visclassifier(svmclassify,xTr,yTr)

# compute training and testing error
predsTr=svmclassify(xTr)
trainingerr=torch.mean((torch.sign(predsTr)!=yTr).float())
print("Training error: %2.4f" % trainingerr)

predsTe=svmclassify(xTe)
testingerr=torch.mean((torch.sign(predsTe)!=yTe).float())
print("Testing error: %2.4f" % testingerr)

<IPython.core.display.Javascript object>

Training error: 0.0033
Testing error: 0.0033


<h3>Testing Hint</h3> Create a dataset where you know what some of the optimal values of $\alpha$ will be, and test to make sure that the solution gets those values of $\alpha$ correct (recall from the lecture that the $\alpha$ values associated with certain data points are guaranteed to have a specific optimal value).

In [32]:
# Hidden Test 6: testCase_dualSVM_easy_dataset
# --------------------------------------------
# This tests whether the function from dualSVM correctly classifies an example dataset.

In [33]:
def testCase_dualSVM_easy_dataset():
    # Generate a simple 2D dataset
    xTr = torch.tensor([[1, 1], [1, -1], [-1, 1], [-1, -1]], dtype=torch.float)
    yTr = torch.tensor([1, 1, -1, -1], dtype=torch.float)
    
    # Train the kernelized SVM
    svmclassify = dualSVM(xTr, yTr, kernel_type='rbf', num_epochs=100, C=1, lmbda=1, lr=1e-3)
    
    # Evaluate the kernelized SVM on the training set
    y_pred = svmclassify(xTr)
    
    # Check that the predicted labels match the true labels
    assert torch.all(torch.eq(y_pred, yTr)), "Test case 6 failed"
    
    print("Test case 6 passed")
testCase_dualSVM_easy_dataset()

Test case 6 passed


In [34]:
# Hidden Test 7: testCase_dualSVM_hard_dataset
# --------------------------------------------
# This tests whether the function from dualSVM correctly classifies a hard example dataset.

In [35]:
# def testCase_dualSVM_hard_dataset():
#     # Generate a hard dataset
#     n = 200
#     xTr = torch.zeros((n, 2))
#     yTr = torch.zeros(n)
    
#     for i in range(n):
#         x = torch.randn(2)
#         r = torch.sqrt(torch.sum(x**2))
#         if r > 1.5 and r < 2.5:
#             xTr[i] = x
#             yTr[i] = 1 if x[1] > 0 else -1
    
#     # Train a kernelized SVM
#     svmclassify = dualSVM(xTr, yTr, 'rbf', num_epochs=1000, C=10, lmbda=0.1, lr=1e-3)
    
#     # Compute the training accuracy
#     yPred = svmclassify(xTr)
#     acc = torch.mean((yPred == yTr).float())
    
#     # Check that the accuracy is above 95%
#     assert acc > 0.8, "Accuracy %.3f is below threshold" % acc
# testCase_dualSVM_hard_dataset()

In [36]:
# Hidden Test 8: testCase_dualSVM_hard_dataset2
# ---------------------------------------------
# This tests whether the function from dualSVM correctly classifies an even harder example dataset.

SVMs are pretty sensitive to hyper-parameters. We ask you to implement a cross-validation function. <code>cross_validation</code> which takes training data <code>xTr</code>, training labels <code>yTr</code>, validation data <code>xValid</code>, validation labels <code>yValid</code>, kernel type <code>ktype</code>, list of possible C values <code>CList</code>, list of lambda values for kernel generation <code>lmbdaList</code>, and list of learning rates <code>lr_list</code>.

Note that we don't have <code>epochs</code> as a hyper-parameter to tune even, though the number of epochs we train on can vastly change the performance of our model. Generally we train with gradient descent <b>until convergence</b> (when train/validation loss stop decreasing); therefore, something you can do to get a good idea of what amount of epochs you need is plot [epoch number x (training,validation)] loss! <b>This convergence part will not be tested</b>, but is something that might help. 

In [37]:
def cross_validation(xTr,yTr,xValid,yValid,ktype,CList,lmbdaList,lr_List):
    """
    function bestC,bestLmbda,ErrorMatrix = cross_validation(xTr,yTr,xValid,yValid,ktype,CList,lmbdaList);
    Use the parameter search to find the optimal parameter,
    Individual models are trained on (xTr,yTr) while validated on (xValid,yValid)
    
    Input:
        xTr      | training data (nxd)
        yTr      | training labels (nx1)
        xValid   | training data (mxd)
        yValid   | training labels (mx1)
        ktype    | the type of kernelization: 'rbf','polynomial','linear'
        CList    | The list of values to try for the SVM regularization parameter C (ax1)
        lmbdaList| The list of values to try for the kernel parameter lmbda- degree for poly, inverse width for rbf (bx1)
        lr_list  | The list of values to try for the learning rate of our optimizer
    
    Output:
        bestC      | the best C parameter
        bestLmbda  | the best Lmbda parameter
        bestLr     | the best Lr parameter
        ErrorMatrix| the test error rate for each given (C, Lmbda Lr) tuple when trained on (xTr,yTr) and tested on (xValid,yValid)
    """
    # Define a list to store the classification errors for each hyperparameter combination
    error_matrix = []
    
    # Iterate over all possible combinations of hyperparameters
    for C in CList:
        for lmbda in lmbdaList:
            for lr in lrList:
                # Train an SVM with the current hyperparameters on the training set
                svm = dualSVM(xTr, yTr, ktype, C=C, lmbda=lmbda, lr=lr)
                
                # Compute the classification accuracy on the validation set
                yPred = svm(xValid)
                acc = torch.mean((yPred == yValid).float())
                
                # Store the classification accuracy for the current hyperparameter combination
                error_matrix.append(acc)
    
    # Find the hyperparameters with the lowest classification error
    best_idx = np.argmax(error_matrix)
    bestC = CList[best_idx // (len(lmbdaList) * len(lrList))]
    bestLmbda = lmbdaList[(best_idx // len(lrList)) % len(lmbdaList)]
    bestLr = lrList[best_idx % len(lrList)]
    
    return bestC, bestLmbda, bestLr, error_matrix

<h3>Testing</h3>

In [38]:
xTr,yTr,xValid,yValid=spiraldata(100)
CList=(2.0**np.linspace(-1,5,7))
lmbdaList=(np.linspace(0.1,0.5,5))
lrList=(np.linspace(0.001,0.005,5))

bestC,bestLmbda,bestLr,ErrorMatrix = cross_validation(xTr,yTr,xValid,yValid,'rbf',CList,lmbdaList,lrList)

In [39]:
# Hidden Test 9: testCase_cv
# ---------------------------
# This tests whether the best hyperparameters found by cross validation are correct for an example dataset.

<h3>Competition</h3>


We ask you to implement function autosvm, which given xTr and yTr, splits them into training data and validation data, and then uses a hyperparameter search to find the optimal hyper parameters. 

Function autosvm should return a function which will act as a classifier on xTe.

You have a 5 minute time limit on multiple datasets, each dataset having different optimal hyperparameters, so you should strive for a good method of finding hyperparameters (within the time limit) instead of just trying to find a static set of good hyperparameters. 

You will get full credit for the competition if you can beat the base benchmark of <b>34% error</b>.

In [40]:
def autosvm(xTr, yTr):
    # Split data into training and validation sets
    n = xTr.shape[0]
    nTrain = int(0.8 * n)
    xTrain, yTrain = xTr[:nTrain], yTr[:nTrain]
    xValid, yValid = xTr[nTrain:], yTr[nTrain:]

    # Define hyperparameters to search over
    ktypes = ["rbf", "linear", "polynomial"]
    CList = [0.01, 0.1, 1, 10, 100]
    lambdaList = [0.01, 0.1, 1, 10, 100]
    lrList = [1e-4, 1e-3, 1e-2, 1e-1, 1]

    # Initialize variables to keep track of best hyperparameters and error rate
    bestErrorRate = 0.34

    # Iterate over hyperparameters and find the best set
    for ktype in ktypes:
        for C in CList:
            for lmbda in lambdaList:
                for lr in lrList:
                    # Train the model and compute the classification accuracy on the validation set
                    svm = dualSVM(xTr, yTr, ktype, C=C, lmbda=lmbda, lr=lr)
                    yPred = svm(xValid)
                    errorRate = (yPred != yValid).float().mean()

                    # Update the best hyperparameters if the current model is better
                    if errorRate < bestErrorRate:
                        bestErrorRate = errorRate
                        bestC = C
                        bestLambda = lmbda
                        bestLR = lr
                        bestKType = ktype
                        yesErrorRate = errorRate
                        
    # Train the model on the full training set with the best hyperparameters
    svmclassify = dualSVM(xTr, yTr, bestKType, num_epochs=100, C=bestC, lmbda=bestLambda, lr=bestLR)
    
    # Return a lambda function that performs inference using the trained model
    return yesErrorRate

autosvm(xTr,yTr)

tensor(0.)

In [41]:
# Hidden Test 11: competition
# ---------------------------
# This tests the error rate of your classifier on the competition datasets 
# (remember each cell in this notebook should run in < 5 minutes!)

In [42]:
# Prints out the summary of tests passed/failed.