In [1]:
import pandas as pd
import numpy as np
import torch

In [2]:
print(torch.__version__)

2.2.0+cu121


# Task 1

Loading the synthetic dataset.

In [3]:
# You may need to edit the path, depending on where you put the files.
data = pd.read_csv('a4_synthetic.csv')

X = data.drop(columns='y').to_numpy()
Y = data.y.to_numpy()

Training a linear regression model for this synthetic dataset.

In [4]:
# Initialize parameters
np.random.seed(69)
w_init = np.random.normal(size=(2, 1))
b_init = np.random.normal(size=(1, 1))

# Declare parameter tensors
w = torch.tensor(w_init, requires_grad=True)
b = torch.tensor(b_init, requires_grad=True)

eta = 1e-2
opt = torch.optim.SGD([w, b], lr=eta)  # Define SGD optimizer

# Training loop
for i in range(10):
    sum_err = 0
    for row in range(X.shape[0]):
        x = torch.tensor(X[[row], :],)
        y = torch.tensor(Y[[row]])

        # Forward pass
        y_pred = x @ w + b  # Matrix multiplication for prediction
        err = torch.square((y_pred - y))  # Compute squared error loss

        # Backward pass and update
        
        err.backward()  # Compute gradients
        opt.step()  # Update parameters
        opt.zero_grad()  # Clear gradients

        # For statistics
        sum_err += err.item()

    mse = sum_err / X.shape[0]
    print(f'Epoch {i+1}: MSE =', mse)


Epoch 1: MSE = 0.39164808632788506
Epoch 2: MSE = 0.016984872842749174
Epoch 3: MSE = 0.009564297806691957
Epoch 4: MSE = 0.009377840166444317
Epoch 5: MSE = 0.009368557944496174
Epoch 6: MSE = 0.009367457445536662
Epoch 7: MSE = 0.009367280149523092
Epoch 8: MSE = 0.00936725047479511
Epoch 9: MSE = 0.009367245535839945
Epoch 10: MSE = 0.009367244720749514


# Task 2, 3 & 4

In [5]:
class Node:
    def __init__(self):
        pass

    def backward(self, grad_output):
        if self.grad_fn is not None:
            self.grad_fn.backward(grad_output)

    def __repr__(self):
        return str(type(self))


class AdditionNode(Node):
    def __init__(self, left, right):
        self.left = left
        self.right = right

    def backward(self, grad_output):
        self.left.backward(grad_output)
        self.right.backward(grad_output)
        
        
class SubtractionNode(Node):
    def __init__(self, left, right):
        self.left = left
        self.right = right
        
    def backward(self, grad_output):
        self.left.backward(grad_output)
        self.right.backward(grad_output)
    
        
class MatrixMultiplicationNode(Node):
    def __init__(self, left, right):
        self.left = left
        self.right = right
    
    def backward(self, grad_output):
        self.left.backward(grad_output @ self.right.data.T)
        self.right.backward(self.left.data.T @ grad_output)
        
        
class ExponentiationNode(Node):
    def __init__(self, tensor, exponent):
        self.tensor = tensor
        self.exponent = exponent
        
    def backward(self, grad_output):
        grad_input = self.exponent * self.tensor.data ** (self.exponent - 1) * grad_output
        self.tensor.backward(grad_input)
        
        
class TanhNode(Node):
    def __init__(self, tensor):
        self.tensor = tensor

    def backward(self, grad_output):
        grad_input = grad_output * (1 - np.tanh(self.tensor.data))**2
        self.tensor.backward(grad_input)
        

class BinaryCrossEntropyLossNode(Node):
    def __init__(self, prediction, target):
        self.prediction = prediction
        self.target = target

    def backward(self, grad_output):
        sig = 1 / (1 + np.exp(-self.prediction.data))  # Apply sigmoid function to prediction data
        sig_neg_x = 1 / (1 + np.exp(self.prediction.data))
    
        # Calculate gradient of loss with respect to prediction
        grad_loss = (-self.target.data * sig_neg_x + (1 - self.target.data) * sig) * grad_output
        
        # Backward pass to propagate gradient
        self.prediction.backward(grad_loss)

In [6]:
class Tensor:

    # Constructor. Just store the input values.
    def __init__(self, data, requires_grad=False, grad_fn=None):
        self.data = data
        self.shape = data.shape
        self.grad_fn = grad_fn
        self.requires_grad = requires_grad
        self.grad = None

    # So that we can print the object or show it in a notebook cell.
    def __repr__(self):
        dstr = repr(self.data)
        if self.requires_grad:
            gstr = ', requires_grad=True'
        elif self.grad_fn is not None:
            gstr = f', grad_fn={self.grad_fn}'
        else:
            gstr = ''
        return f'Tensor({dstr}{gstr})'

    # Extract one numerical value from this tensor.
    def item(self):
        return self.data.item()

    # For Task 2:

    # Operator +
    def __add__(self, right):
        # Call the helper function defined below.
        return addition(self, right)

    # Operator -
    def __sub__(self, right):
        return subtraction(self, right)

    # Operator @
    def __matmul__(self, right):
        return matmul(self, right)

    # Operator **
    def __pow__(self, right):
        # NOTE! We are assuming that right is an integer here, not a Tensor!
        if not isinstance(right, int):
            raise Exception('only integers allowed')
        if right < 2:
            raise Exception('power must be >= 2')
        return exponentiation(self, right)


    def backward(self, grad_output=None):
        if self.grad_fn is not None:
            # If grad_fn is defined, we have computed this tensor using some operation.
            if grad_output is None:
                #raise an error if gradient of the loss function is required
                self.grad_fn.backward(np.ones(self.shape))
            else:
                # This is an intermediate node in the computational graph.
                self.grad_fn.backward(grad_output)
        else:
            # If grad_fn is not defined, this is an endpoint in the computational
            # graph: learnable model parameters or input data.
            if self.requires_grad:
                self.grad = grad_output
            else:
                # This tensor *does not require* a gradient to be computed. This
                # will typically be a tensor holding input data.
                return
            
            # Tanh activation function
    def tanh(self):
        new_data = np.tanh(self.data)
        grad_fn = TanhNode(self)
        return Tensor(new_data, grad_fn=grad_fn)


# A small utility where we simply create a Tensor object. We use this to
# mimic torch.tensor.
def tensor(data, requires_grad=False):
    return Tensor(data, requires_grad)

# We define helper functions to implement the various arithmetic operations.

# This function takes two tensors as input, and returns a new tensor holding
# the result of an element-wise addition on the two input tensors.
def addition(left, right):
    new_data = left.data + right.data
    grad_fn = AdditionNode(left, right)
    
    return Tensor(new_data, grad_fn=grad_fn)

def subtraction(left, right):
    new_data = left.data - right.data
    grad_fn = SubtractionNode(left, right)
    
    return Tensor(new_data, grad_fn=grad_fn)

def exponentiation(left, right):
    # Check if the exponent is an integer and >= 2
    if not isinstance(right, int):
        raise ValueError("Exponent must be an integer")
    if right < 2:
        raise ValueError("Exponent must be >= 2")
    new_data = left.data ** right
    grad_fn = ExponentiationNode(left, right)
    
    return Tensor(new_data, grad_fn=grad_fn)

def matmul(left, right):
    # Check if the shapes of the tensors are compatible for matrix multiplication
    if left.shape[1] != right.shape[0]:
        raise ValueError("Shapes are not compatible for matrix multiplication")
    new_data = left.data @ right.data
    grad_fn = MatrixMultiplicationNode(left, right)
    return Tensor(new_data, grad_fn=grad_fn)

Some sanity checks.

In [7]:
# Two tensors holding row vectors.
x1 = tensor(np.array([[2.0, 3.0]]))
x2 = tensor(np.array([[1.0, 4.0]]))
# A tensors holding a column vector.
w = tensor(np.array([[-1.0], [1.2]]))

# Test the arithmetic operations.
test_plus = x1 + x2
test_minus = x1 - x2
test_power = x2 ** 2
test_matmul = x1 @ w

print(f'Test of addition: {x1.data} + {x2.data} = {test_plus.data}')
print(f'Test of subtraction: {x1.data} - {x2.data} = {test_minus.data}')
print(f'Test of power: {x2.data} ** 2 = {test_power.data}')
print(f'Test of matrix multiplication: {x1.data} @ {w.data} = {test_matmul.data}')

# Check that the results are as expected. Will crash if there is a miscalculation.
assert(np.allclose(test_plus.data, np.array([[3.0, 7.0]])))
assert(np.allclose(test_minus.data, np.array([[1.0, -1.0]])))
assert(np.allclose(test_power.data, np.array([[1.0, 16.0]])))
assert(np.allclose(test_matmul.data, np.array([[1.6]])))

Test of addition: [[2. 3.]] + [[1. 4.]] = [[3. 7.]]
Test of subtraction: [[2. 3.]] - [[1. 4.]] = [[ 1. -1.]]
Test of power: [[1. 4.]] ** 2 = [[ 1. 16.]]
Test of matrix multiplication: [[2. 3.]] @ [[-1. ]
 [ 1.2]] = [[1.6]]


Sanity check for Task 3.

In [8]:
x = tensor(np.array([[2.0, 3.0]]))
w1 = tensor(np.array([[1.0, 4.0]]), requires_grad=True)
w2 = tensor(np.array([[3.0, -1.0]]), requires_grad=True)

test_graph = x + w1 + w2

print('Computational graph top node after x + w1 + w2:', test_graph.grad_fn)

assert(isinstance(test_graph.grad_fn, AdditionNode))
assert(test_graph.grad_fn.right is w2)
assert(test_graph.grad_fn.left.grad_fn.left is x)
assert(test_graph.grad_fn.left.grad_fn.right is w1)

Computational graph top node after x + w1 + w2: <class '__main__.AdditionNode'>


Sanity check for Task 4.

In [9]:
x = tensor(np.array([[2.0, 3.0]]))
w = tensor(np.array([[-1.0], [1.2]]), requires_grad=True)
y = tensor(np.array([[0.2]]))

# We could as well write simply loss = (x @ w - y)**2
# We break it down into steps here if you need to debug.

model_out = x @ w
diff = model_out - y
loss = diff ** 2
loss.backward()

print('Gradient of loss w.r.t. w =\n', w.grad)

assert(np.allclose(w.grad, np.array([[5.6], [8.4]])))
assert(x.grad is None)
assert(y.grad is None)

Gradient of loss w.r.t. w =
 [[5.6]
 [8.4]]


An equivalent cell using PyTorch code. Your implementation should give the same result for `w.grad`.

In [10]:
pt_x = torch.tensor(np.array([[2.0, 3.0]]))
pt_w = torch.tensor(np.array([[-1.0], [1.2]]), requires_grad=True)
pt_y = torch.tensor(np.array([[0.2]]))

pt_model_out = pt_x @ pt_w
pt_model_out.retain_grad()

pt_diff = pt_model_out - pt_y
pt_diff.retain_grad()

pt_loss = pt_diff ** 2
pt_loss.retain_grad()

pt_loss.backward()
pt_w.grad

tensor([[5.6000],
        [8.4000]], dtype=torch.float64)

# Task 5

In [11]:
class Optimizer:
    def __init__(self, params):
        self.params = params

    def zero_grad(self):
        for p in self.params:
            p.grad = np.zeros_like(p.data)

    def step(self):
        # This method does nothing in the base optimizer class.
        pass


class SGD(Optimizer):
    def __init__(self, params, lr):
        super().__init__(params)
        self.lr = lr

    def step(self):
        for param in self.params:

            # Update parameter values based on the gradient and learning rate
            param.data -= self.lr * param.grad
            

In [12]:
# Initialize parameters
np.random.seed(69)
w_init = np.random.normal(size=(2, 1))
b_init = np.random.normal(size=(1, 1))

# Declare parameter tensors
w = tensor(w_init, requires_grad=True)
b = tensor(b_init, requires_grad=True)

eta = 1e-2
opt = SGD([w, b], lr=eta)  # Use your own SGD optimizer

# Training loop
for i in range(10):
    sum_err = 0
    for row in range(X.shape[0]):
        x = tensor(X[[row], :])
        y = tensor(Y[[row]])

        # Forward pass
        y_pred = x @ w + b  # Matrix multiplication for prediction
        err = (y_pred - y) ** 2  # Compute squared error loss

        # Backward pass and update
        err.backward()  # Compute gradients
        opt.step()  # Update parameters
        opt.zero_grad()  # Clear gradients

        # For statistics
        sum_err += err.item()

    mse = sum_err / X.shape[0]
    print(f'Epoch {i+1}: MSE =', mse)

Epoch 1: MSE = 0.3916480863278851
Epoch 2: MSE = 0.016984872842749174
Epoch 3: MSE = 0.009564297806691956
Epoch 4: MSE = 0.009377840166444315
Epoch 5: MSE = 0.009368557944496175
Epoch 6: MSE = 0.009367457445536664
Epoch 7: MSE = 0.009367280149523092
Epoch 8: MSE = 0.009367250474795108
Epoch 9: MSE = 0.009367245535839947
Epoch 10: MSE = 0.00936724472074951


# Task 6

In [13]:
from sklearn.preprocessing import scale
from sklearn.model_selection import train_test_split

# You may need to edit the path, depending on where you put the files.
a4data = pd.read_csv('raisins.csv')

X = scale(a4data.drop(columns='Class'))
Y = 1.0*(a4data.Class == 'Besni').to_numpy()

np.random.seed(0)
shuffle = np.random.permutation(len(Y))
X = X[shuffle]
Y = Y[shuffle]

Xtrain, Xtest, Ytrain, Ytest = train_test_split(X, Y, random_state=0, test_size=0.2)

In [14]:
Xtrain.shape, Ytrain.shape

((720, 7), (720,))

In [15]:
Xtrain

array([[-0.24935912, -0.30801062, -0.00223273, ..., -0.26753351,
         0.06541223, -0.25074774],
       [-0.49779143, -0.58138353, -0.09820911, ..., -0.44678758,
         0.49097416, -0.31407094],
       [-0.18899551,  0.15973552, -0.44359745, ..., -0.16507106,
        -0.47292075,  0.03800403],
       ...,
       [-0.69660911, -0.91892504, -0.24538342, ..., -0.70204792,
         0.41234054, -0.82021434],
       [ 0.65057947,  0.48147022,  0.85310643, ...,  0.62201236,
         0.74952635,  0.53793425],
       [ 0.24907038,  0.48264127,  0.05568096, ...,  0.22112035,
        -0.19833912,  0.39097442]])

Class for the NN so we cna split up functions easier.

In [16]:
class NeuralNetwork:
    def __init__(self, input_size, hidden_size, output_size, lr=0.01):
        # Initialize weights and biases, inout layer need to match input to next layer etc.
        self.w1 = tensor(np.random.normal(size=(input_size, hidden_size)), requires_grad=True)
        self.b1 = tensor(np.random.normal(size=(1, hidden_size)), requires_grad=True)  # Adjusted shape
        self.w2 = tensor(np.random.normal(size=(hidden_size, output_size)), requires_grad=True)
        self.b2 = tensor(np.random.normal(size=(1, output_size)), requires_grad=True)  # Adjusted shape
        self.optimizer = SGD([self.w1, self.b1, self.w2, self.b2], lr=lr)

    
    def tanh(self, tensor):
        new_data = np.tanh(tensor.data)
        grad_fn = TanhNode(tensor)
        return Tensor(new_data, grad_fn=grad_fn)
    
    def binary_cross_entropy_loss(self, prediction, target):
        
        sig = 1/(1+np.exp(-prediction.data))
        result = -target.data * np.log(sig) - (1-target.data) * np.log(1-sig)
        
        return Tensor(result, grad_fn=BinaryCrossEntropyLossNode(prediction, target))

    
    def sigmoid(self, tensor):
        new_data = 1 / (1 + np.exp(-tensor.data))
        grad_fn = Sigmoid(tensor)
        return Tensor(new_data, grad_fn=grad_fn)
    
    def forward(self, X):
        
        hidden = X @ self.w1 + self.b1
        hidden_activation = self.tanh(hidden)
        output = hidden_activation @ self.w2 + self.b2
        return output

    
    def train(self, X, Y, epochs=1000):
        for epoch in range(epochs):
            for row in range(X.shape[0]):
                x_row = X[row, :] 
                y_true = Y[row]    

                x_row_tensor = Tensor(x_row[np.newaxis, :])  #
                y_true_tensor = Tensor(np.array([[y_true]]))  #

                y_pred = self.forward(x_row_tensor)

                #BCE loss
                loss = self.binary_cross_entropy_loss(y_pred, y_true_tensor)

                #backward
                loss.backward()

                #update
                self.optimizer.step()
                self.optimizer.zero_grad()

            if epoch % 10 == 0:
                print(f'Epoch {epoch}, Loss: {loss.item()}')

    def evaluate(self, X_test, Y_test):
        correct = 0
        total = len(Y_test)
        for i in range(total):
            x_test_row = X_test[i]
            y_test_true = Y_test[i]
            x_test_row_tensor = Tensor(x_test_row[np.newaxis, :])
            y_test_true_tensor = Tensor(np.array([[y_test_true]]))

            y_test_pred = self.forward(x_test_row_tensor).item()
            if (y_test_pred >= 0.5 and y_test_true == 1) or (y_test_pred < 0.5 and y_test_true == 0):
                correct += 1

        accuracy = correct / total
        return accuracy

    def predict(self, X):
        predictions = []
        for row in range(X.shape[0]):
            x_row = X[row, :]
            x_row_tensor = Tensor(x_row[np.newaxis, :])
            y_pred = self.forward(x_row_tensor)
            predictions.append(y_pred.item())
        return np.array(predictions)

In [None]:
input_size = Xtrain.shape[1]
hidden_size = 7
output_size = 1

#init NN
model = NeuralNetwork(input_size, hidden_size, output_size, lr=0.01)

#train model
model.train(Xtrain, Ytrain, epochs=100)

accuracy = model.evaluate(Xtest, Ytest)
print(f"Test Accuracy: {accuracy}")

Epoch 0, Loss: 0.08280642395616747
Epoch 10, Loss: 0.06792364654262412
Epoch 20, Loss: 0.08064875144371783
Epoch 30, Loss: 0.0644046645570351
Epoch 40, Loss: 0.059926449917948266
Epoch 50, Loss: 0.04087231089806514
Epoch 60, Loss: 0.049329726428653325
Epoch 70, Loss: 0.054449015583669734
Epoch 80, Loss: 0.05584741861078787
