**Course Name：** DAT341 / DIT867 Applied Machine Learning

**Examiner：** Richard Johansson (richajo@chalmers.se)

**Assignment No.:** Assignment 4 - Software for neural network training

**Due Date:** Mon, 26 Feb 2024 11:59pm

**Group Name:** PA 4 3

**Group Members:**
- Natalia Alvarado (gusalvarsi@student.gu.se)
- Erdem Halil (gushaliler@student.gu.se)
- Xujie Li (guslixuf@student.gu.se)


In [1]:
import pandas as pd
import numpy as np
import torch

In [2]:
!wget --no-check-certificate https://www.cse.chalmers.se/~richajo/dit866/assignments/a4/data/a4_synthetic.csv

zsh:1: command not found: wget


# Task 1

Loading the synthetic dataset.

In [3]:
# You may need to edit the path, depending on where you put the files.
data = pd.read_csv('a4_synthetic.csv')

X = data.drop(columns='y').to_numpy()
Y = data.y.to_numpy()

Training a linear regression model for this synthetic dataset.

In [4]:
np.random.seed(1)

w_init = np.random.normal(size=(2, 1))
b_init = np.random.normal(size=(1, 1))

# We just declare the parameter tensors. Do not use nn.Linear.
w = torch.tensor(w_init, requires_grad=True)
b = torch.tensor(b_init, requires_grad=True)


eta = 1e-2
# A SGD optimizer with a learning rate of eta
opt = torch.optim.SGD([w,b], lr=eta)

t1_mse_per_epoch = []

for i in range(10):

    sum_err = 0

    for row in range(X.shape[0]):
        x = torch.tensor(X[[row], :])
        y = torch.tensor(Y[[row]])

        # Forward pass.
        # Compute predicted value for x
        y_pred = x @ w + b
        # Compute squared error loss
        err = (y - y_pred)**2

        # Backward and update.
        # Compute gradients and then update the model.
        opt.zero_grad()
        err.backward()
        opt.step()

        # For statistics.
        sum_err += err.item()

    mse = sum_err / X.shape[0]
    print(f'Epoch {i+1}: MSE =', mse)
    t1_mse_per_epoch.append(mse)

Epoch 1: MSE = 0.7999661130823179
Epoch 2: MSE = 0.017392390107906875
Epoch 3: MSE = 0.009377418010839892
Epoch 4: MSE = 0.009355326971438456
Epoch 5: MSE = 0.009365440968904256
Epoch 6: MSE = 0.009366989180952533
Epoch 7: MSE = 0.009367207398577986
Epoch 8: MSE = 0.009367238983974489
Epoch 9: MSE = 0.009367243704122537
Epoch 10: MSE = 0.009367244427185763


# Task 2

In [5]:
class Node:
    def __init__(self):
        pass

    def backward(self, grad_output):
        raise NotImplementedError('Unimplemented')

    def __repr__(self):
        return str(type(self))


class AdditionNode(Node):
    def __init__(self, left, right):
        self.left = left
        self.right = right

    def backward(self, grad_output):
        self.left.backward(grad_output)
        self.right.backward(grad_output)

class SubtractionNode(Node):
    def __init__(self, left, right):
        self.left = left
        self.right = right

    def backward(self, grad_output):
        self.left.backward(grad_output)
        self.right.backward(-grad_output)

class MatrixMultiplicationNode(Node):
    def __init__(self, left, right):
        self.left = left
        self.right = right

    def backward(self, grad_output):
        left_grad = grad_output @ self.right.data.T
        right_grad = self.left.data.T @ grad_output
        self.left.backward(left_grad)
        self.right.backward(right_grad)

class PowerNode(Node):
    def __init__(self, base, exponent):
        self.base = base
        self.exponent = exponent

    def backward(self, grad_output):
        base_grad = self.exponent * self.base.data**(self.exponent - 1) * grad_output
        self.base.backward(base_grad)

In [6]:
class Tensor:

    # Constructor. Just store the input values.
    def __init__(self, data, requires_grad=False, grad_fn=None):
        self.data = data
        self.shape = data.shape
        self.grad_fn = grad_fn
        self.requires_grad = requires_grad
        self.grad = None

    # So that we can print the object or show it in a notebook cell.
    def __repr__(self):
        dstr = repr(self.data)
        if self.requires_grad:
            gstr = ', requires_grad=True'
        elif self.grad_fn is not None:
            gstr = f', grad_fn={self.grad_fn}'
        else:
            gstr = ''
        return f'Tensor({dstr}{gstr})'

    # Extract one numerical value from this tensor.
    def item(self):
        return self.data.item()

    # Operator +
    def __add__(self, right):
        # Call the helper function defined below.
        return addition(self, right)

    # Operator -
    def __sub__(self, right):
        # Call the helper function defined below.
        return subtraction(self, right)

    # Operator @
    def __matmul__(self, right):
        # Call the helper function defined below.
        return matmul(self, right)

    # Operator **
    def __pow__(self, right):
        # NOTE! We are assuming that right is an integer here, not a Tensor!
        if not isinstance(right, int):
            raise Exception('only integers allowed')
        if right < 2:
            raise Exception('power must be >= 2')
        # Call the helper function defined below.
        return power(self, right)


    # Backward computations. Will be implemented in Task 4.
    def backward(self, grad_output=None):
        # We first check if this tensor has a grad_fn: that is, one of the
        # nodes that you defined in Task 3.
        if self.grad_fn is not None:
            # If grad_fn is defined, we have computed this tensor using some operation.
            if grad_output is None:
                # This is the starting point of the backward computation.
                # This will typically be the tensor storing the output of
                # the loss function, on which we have called .backward()
                # in the training loop.
                self.grad_fn.backward(grad_output=1.0)
            else:
                # This is an intermediate node in the computational graph.
                # This corresponds to any intermediate computation, such as
                # a hidden layer.
                self.grad = grad_output
                self.grad_fn.backward(self.grad)
        else:
            # If grad_fn is not defined, this is an endpoint in the computational
            # graph: learnable model parameters or input data.
            if self.requires_grad:
                # This tensor *requires* a gradient to be computed. This will
                # typically be a tensor that holds learnable parameters.
                self.grad = grad_output
            else:
                # This tensor *does not require* a gradient to be computed. This
                # will typically be a tensor holding input data.
                pass


# A small utility where we simply create a Tensor object. We use this to
# mimic torch.tensor.
def tensor(data, requires_grad=False):
    return Tensor(data, requires_grad)

# We define helper functions to implement the various arithmetic operations.

# This function takes two tensors as input, and returns a new tensor holding
# the result of an element-wise addition on the two input tensors.
def addition(left, right):
    new_data = left.data + right.data
    grad_fn = AdditionNode(left, right)
    return Tensor(new_data, grad_fn=grad_fn)

def subtraction(left, right):
    new_data = left.data - right.data
    grad_fn = SubtractionNode(left, right)
    return Tensor(new_data, grad_fn=grad_fn)

def matmul(left, right):
    new_data = left.data @ right.data
    grad_fn = MatrixMultiplicationNode(left, right)
    return Tensor(new_data, grad_fn=grad_fn)

def power(left, right):
    new_data = left.data ** right
    grad_fn = PowerNode(left, right)
    return Tensor(new_data, grad_fn=grad_fn)

Some sanity checks.

In [7]:
# Two tensors holding row vectors.
x1 = tensor(np.array([[2.0, 3.0]]))
x2 = tensor(np.array([[1.0, 4.0]]))
# A tensors holding a column vector.
w = tensor(np.array([[-1.0], [1.2]]))

# Test the arithmetic operations.
test_plus = x1 + x2
test_minus = x1 - x2
test_power = x2 ** 2
test_matmul = x1 @ w

print(f'Test of addition: {x1.data} + {x2.data} = {test_plus.data}')
print(f'Test of subtraction: {x1.data} - {x2.data} = {test_minus.data}')
print(f'Test of power: {x2.data} ** 2 = {test_power.data}')
print(f'Test of matrix multiplication: {x1.data} @ {w.data} = {test_matmul.data}')

# Check that the results are as expected. Will crash if there is a miscalculation.
assert(np.allclose(test_plus.data, np.array([[3.0, 7.0]])))
assert(np.allclose(test_minus.data, np.array([[1.0, -1.0]])))
assert(np.allclose(test_power.data, np.array([[1.0, 16.0]])))
assert(np.allclose(test_matmul.data, np.array([[1.6]])))

Test of addition: [[2. 3.]] + [[1. 4.]] = [[3. 7.]]
Test of subtraction: [[2. 3.]] - [[1. 4.]] = [[ 1. -1.]]
Test of power: [[1. 4.]] ** 2 = [[ 1. 16.]]
Test of matrix multiplication: [[2. 3.]] @ [[-1. ]
 [ 1.2]] = [[1.6]]


# Tasks 3 and 4

**NOTE:** Moved Node cell above for easier execution of the whole notebook.

Sanity check for Task 3.

In [8]:
x = tensor(np.array([[2.0, 3.0]]))
w1 = tensor(np.array([[1.0, 4.0]]), requires_grad=True)
w2 = tensor(np.array([[3.0, -1.0]]), requires_grad=True)

test_graph = x + w1 + w2

print('Computational graph top node after x + w1 + w2:', test_graph.grad_fn)

assert(isinstance(test_graph.grad_fn, AdditionNode))
assert(test_graph.grad_fn.right is w2)
assert(test_graph.grad_fn.left.grad_fn.left is x)
assert(test_graph.grad_fn.left.grad_fn.right is w1)

Computational graph top node after x + w1 + w2: <class '__main__.AdditionNode'>


Sanity check for Task 4.

In [9]:
x = tensor(np.array([[2.0, 3.0]]))
w = tensor(np.array([[-1.0], [1.2]]), requires_grad=True)
y = tensor(np.array([[0.2]]))

# We could as well write simply loss = (x @ w - y)**2
# We break it down into steps here if you need to debug.

model_out = x @ w
diff = model_out - y
loss = diff ** 2

loss.backward()

print('Gradient of loss w.r.t. w =\n', w.grad)

assert(np.allclose(w.grad, np.array([[5.6], [8.4]])))
assert(x.grad is None)
assert(y.grad is None)

Gradient of loss w.r.t. w =
 [[5.6]
 [8.4]]


An equivalent cell using PyTorch code. Your implementation should give the same result for `w.grad`.

In [10]:
pt_x = torch.tensor(np.array([[2.0, 3.0]]))
pt_w = torch.tensor(np.array([[-1.0], [1.2]]), requires_grad=True)
pt_y = torch.tensor(np.array([[0.2]]))

pt_model_out = pt_x @ pt_w
pt_model_out.retain_grad() # Keep the gradient of intermediate nodes for debugging.

pt_diff = pt_model_out - pt_y
pt_diff.retain_grad()

pt_loss = pt_diff ** 2
pt_loss.retain_grad()

pt_loss.backward()
pt_w.grad

tensor([[5.6000],
        [8.4000]], dtype=torch.float64)

# Task 5

In [11]:
class Optimizer:
    def __init__(self, params):
        self.params = params

    def zero_grad(self):
        for p in self.params:
            p.grad = np.zeros_like(p.data)

    def step(self):
        raise NotImplementedError('Unimplemented')


class SGD(Optimizer):
    def __init__(self, params, lr):
        super().__init__(params)
        self.lr = lr

    def step(self):
        for p in self.params:
            p.data -= self.lr * p.grad

Sanity check for Task 5.

In [12]:
np.random.seed(1)

w_init = np.random.normal(size=(2, 1))
b_init = np.random.normal(size=(1, 1))

# We just declare the parameter tensors. Do not use nn.Linear.
w = tensor(w_init, requires_grad=True)
b = tensor(b_init, requires_grad=True)


eta = 1e-2
# A SGD optimizer with a learning rate of eta
opt = SGD([w, b], lr=eta)

t5_mse_per_epoch = []

for i in range(10):

    sum_err = 0

    for row in range(X.shape[0]):
        x = tensor(X[[row], :])
        y = tensor(Y[[row]])

        # Forward pass.
        # Compute predicted value for x
        y_pred = x @ w + b
        # Compute squared error loss
        err = (y - y_pred)**2

        # Backward and update.
        # Compute gradients and then update the model.
        opt.zero_grad()
        err.backward()
        opt.step()

        # For statistics.
        sum_err += err.item()

    mse = sum_err / X.shape[0]
    print(f'Epoch {i+1}: MSE =', mse)
    t5_mse_per_epoch.append(mse)

# Check that the results are as expected. Will crash if there is a miscalculation.
#assert(t1_mse_per_epoch == t5_mse_per_epoch)

# The normal assertion will fail due to very small decimals.
np.testing.assert_almost_equal(t1_mse_per_epoch, t5_mse_per_epoch, decimal=15)

Epoch 1: MSE = 0.7999661130823179
Epoch 2: MSE = 0.017392390107906875
Epoch 3: MSE = 0.009377418010839892
Epoch 4: MSE = 0.009355326971438458
Epoch 5: MSE = 0.009365440968904258
Epoch 6: MSE = 0.009366989180952535
Epoch 7: MSE = 0.009367207398577987
Epoch 8: MSE = 0.00936723898397449
Epoch 9: MSE = 0.009367243704122534
Epoch 10: MSE = 0.009367244427185761


# Task 6

In [13]:
!wget --no-check-certificate https://www.cse.chalmers.se/~richajo/dit866/assignments/a4/data/raisins.csv

zsh:1: command not found: wget


In [14]:
from sklearn.preprocessing import scale
from sklearn.model_selection import train_test_split

# You may need to edit the path, depending on where you put the files.
a4data = pd.read_csv('raisins.csv')

X = scale(a4data.drop(columns='Class'))
Y = 1.0*(a4data.Class == 'Besni').to_numpy()

np.random.seed(0)
shuffle = np.random.permutation(len(Y))
X = X[shuffle]
Y = Y[shuffle]

Xtrain, Xtest, Ytrain, Ytest = train_test_split(X, Y, random_state=0, test_size=0.2)

In [15]:
class LinearNode(Node):
    def __init__(self, input, weight, bias):
        self.input = input
        self.weight = weight
        self.bias = bias

    def backward(self, grad_output):
        input_grad = grad_output @ self.weight.data.T
        weight_grad = self.input.data.T @ grad_output
        bias_grad = np.sum(grad_output, axis=0)
        self.input.backward(input_grad)
        self.weight.backward(weight_grad)
        self.bias.backward(bias_grad)

class TanhNode(Node):
    def __init__(self, input):
        self.input = input

    def backward(self, grad_output):
        tanh_grad = 1 - np.tanh(self.input.data)**2
        self.input.backward(grad_output * tanh_grad)


class SigmoidNode(Node):
    def __init__(self, input):
        self.input = input

    def backward(self, grad_output):
        sigmoid_output = 1 / (1 + np.exp(-self.input.data))
        sigmoid_grad = sigmoid_output * (1 - sigmoid_output)
        self.input.backward(grad_output * sigmoid_grad)

class BinaryCrossEntropyLossNode(Node):
    def __init__(self, input, target):
        self.input = input
        self.target = target

    def backward(self, grad_output, epsilon=1e-10):
        input_clamped = np.clip(self.input.data, epsilon, 1 - epsilon)  # to avoid input.data being 0 or 1
        input_grad = -(self.target.data / input_clamped - (1 - self.target.data) / (1 - input_clamped))
        self.input.backward(input_grad)


def linear(input, weight, bias):
    new_data = input.data @ weight.data + bias.data
    grad_fn = LinearNode(input, weight, bias)
    return Tensor(new_data, grad_fn=grad_fn)

def tanh(input):
    new_data = np.tanh(input.data)
    grad_fn = TanhNode(input)
    return Tensor(new_data, grad_fn=grad_fn)

def sigmoid(input):
    new_data = 1 / (1 + np.exp(-input.data))
    grad_fn = SigmoidNode(input)
    return Tensor(new_data, grad_fn=grad_fn)


def binary_cross_entropy(input, target, epsilon=1e-10):
    input_clamped = np.clip(input.data, epsilon, 1 - epsilon)  # to avoid input.data being 0 or 1
    new_data = -target.data * np.log(input_clamped) - (1 - target.data) * np.log(1 - input_clamped)
    grad_fn = BinaryCrossEntropyLossNode(input, target)
    return Tensor(new_data, grad_fn=grad_fn)


class Model:
    def __init__(self, input_dim, hidden_dim):
        self.weight1 = tensor(np.random.randn(input_dim, hidden_dim), requires_grad=True)
        self.bias1 = tensor(np.zeros(hidden_dim), requires_grad=True)
        self.weight2 = tensor(np.random.randn(hidden_dim, 1), requires_grad=True)
        self.bias2 = tensor(np.zeros(1), requires_grad=True)

    def forward(self, input):
        h = tanh(linear(input, self.weight1, self.bias1))
        output = sigmoid(linear(h, self.weight2, self.bias2))
        return output

def train(model, optimizer, Xtrain, Ytrain, epochs):
    for epoch in range(epochs):
        optimizer.zero_grad()
        input = tensor(Xtrain)
        target = tensor(Ytrain.reshape(-1, 1))
        output = model.forward(input)
        loss = binary_cross_entropy(output, target)
        loss.backward()
        optimizer.step()
    return model

# Evaluation function
def evaluate(model, X_test, y_test):
    input = tensor(X_test)
    output = model.forward(input)
    predictions = np.round(output.data)
    y_test = y_test.reshape(-1, 1)
    accuracy = np.mean(predictions == y_test)
    return accuracy

In [16]:
model = Model(Xtrain.shape[1], 10)
optimizer = SGD([model.weight1, model.bias1, model.weight2, model.bias2], lr=0.001)
trained_model = train(model, optimizer, Xtrain, Ytrain, epochs=1000)
accuracy = evaluate(trained_model, Xtest, Ytest)
print(f"Accuracy: {accuracy}")

Accuracy: 0.8833333333333333


In [17]:
# Custom grid search
learning_rates = [0.0001, 0.001, 0.01, 0.1]
hidden_dims = [5, 7, 10, 20]
epochs = [100, 500, 1000, 10000]

def grid_search(learning_rates, hidden_dims, epochs):
    best_accuracy = 0
    best_combination = None
    for lr in learning_rates:
        for hidden_dim in hidden_dims:
            for epoch in epochs:
                model = Model(Xtrain.shape[1], hidden_dim)
                optimizer = SGD([model.weight1, model.bias1, model.weight2, model.bias2], lr=lr)
                trained_model = train(model, optimizer, Xtrain, Ytrain, epochs=epoch)
                accuracy = evaluate(trained_model, Xtest, Ytest)
                if accuracy > best_accuracy:
                    best_accuracy = accuracy
                    best_combination = (lr, hidden_dim, epoch)

    return best_accuracy, best_combination

best_accuracy, best_combination = grid_search(learning_rates, hidden_dims, epochs)
print(f"Best accuracy: {best_accuracy * 100}%")
print(f"Best combination: {best_combination}")

Best accuracy: 88.88888888888889%
Best combination: (0.0001, 5, 500)
