# Part One: Deep Learning Architecture

In [1]:
import numpy as np
from typing import List, Tuple
from copy import deepcopy

In [2]:
# Function that check dimension compatibility between ndarrays
def assert_same_shape(array1: np.ndarray, array2: np.ndarray):
    assert array1.shape == array2.shape, 'The two arrays have incompatible shapes: {} and {}'.format(tuple(array1.shape), tuple(array2.shape))
    return None

## Operation classes

In [3]:
# We first make a class for a basic operation, which comprehends both matrix operations (as matmul) and element-wise ones (as sigmoid)
class Operation(object):

  def __init__(self):
    pass
  
  def _output(self) -> np.ndarray: # This method must be defined for each Operation
    raise NotImplementedError()


  def forward(self, input_: np.ndarray):
    self.input_ = input_ # Store input_ in the self.input_ instance variable
    self.output = self._output() # Call the self._output() function

    return self.output


  def _input_grad(self, output_grad: np.ndarray) -> np.ndarray: # This method must be defined for each Operation
    raise NotImplementedError()


  def backward(self, output_grad: np.ndarray) -> np.ndarray:
    assert_same_shape(self.output, output_grad) # Check that shapes match

    self.input_grad = self._input_grad(output_grad) # Call the self._input_grad() function

    assert_same_shape(self.input_, self.input_grad) # Check that shapes match

    return self.input_grad

In [4]:
# We also define a subclass for operations involving parameters (weights and biases)
class ParametersOperation(Operation):

  def __init__(self, param: np.ndarray) -> np.ndarray: # ParameterOperation Method
    super().__init__()
    self.param = param


  def _param_grad(self, output_grad: np.ndarray) -> np.ndarray: # This method must be defined for each ParameterOperation
    raise NotImplementedError()


  def backward(self, output_grad: np.ndarray) -> np.ndarray:
    assert_same_shape(self.output, output_grad) # Check that shapes match

    self.input_grad = self._input_grad(output_grad) # Call the self._input_grad() function
    self.param_grad = self._param_grad(output_grad) # Call the self._param_grad() function

    assert_same_shape(self.input_, self.input_grad) # Check that shapes match
    assert_same_shape(self.param, self.param_grad)

    return self.input_grad 

In [5]:
# Subclass for matrix multiplication between wieghts and input
class WeightMul(ParametersOperation):

  def __init__(self, W: np.ndarray):
    super().__init__(W) # Initialize operation with self.param = W


  def _output(self) -> np.ndarray: # Compute matrix multiplication
    return np.dot(self.input_, self.param) 


  def _input_grad(self, output_grad: np.ndarray) -> np.ndarray: # Compute the gradient of the input
    return np.dot(output_grad, np.transpose(self.param, (1,0)))


  def _param_grad(self, output_grad: np.ndarray) -> np.ndarray: # Compute the gradient of the parameter
    return np.dot(np.transpose(self.input_, (1,0)), output_grad)

In [6]:
# Subclass for adding bias
class BiasAdd(ParametersOperation):

  def __init__(self, b: np.ndarray):
    assert b.shape[0] == 1 # Check if b has the appropriate shape

    super().__init__(b) # Initialize operation with self.param = b


  def _output(self) -> np.ndarray: # Compute sum with bias
    return self.input_ + self.param

  
  def _input_grad(self, output_grad: np.ndarray) -> np.ndarray: # Compute the gradient of the input
    return np.ones_like(self.input_) * output_grad


  def _param_grad(self, output_grad: np.ndarray) -> np.ndarray: # Compute the gradient of the parameter
      output_grad_reshaped = np.sum(output_grad, axis=0).reshape(1, -1)
      param_grad = np.ones_like(self.param)
      return param_grad * output_grad_reshaped

### Activation functions

In [7]:
# Subclass for the sigmoid activation function
class Sigmoid(Operation):
  def __init__(self) -> None:
    super().__init__()


  def _output(self) -> np.ndarray: # Apply sigmoid element-wise
    return 1.0 / (1.0 + np.exp(-1.0 * self.input_))


  def _input_grad(self, output_grad: np.ndarray) -> np.ndarray: # Compute the gradient of the input
    sigmoid_backward = self.output * (1.0 - self.output)
    input_grad = sigmoid_backward * output_grad
    return input_grad

In [8]:
# Subclass for the Linear activation function (it does nothing)
class Linear(Operation):
    def __init__(self) -> None:        
        super().__init__()

    def _output(self) -> np.ndarray:
        return self.input_

    def _input_grad(self, output_grad: np.ndarray) -> np.ndarray:
        return output_grad

In [9]:
# Subclass for the Rectified Linear Unit activation function
class ReLU(Operation):
  def __init__(self) -> None:
    super().__init__()


  def _output(self) -> np.ndarray: # Apply ReLU element-wise
    return np.clip(self.input_, 0, None)


  def _input_grad(self, output_grad: np.ndarray) -> np.ndarray: # Compute the gradient of the input
    non_neg = self.output >= 0
    return output_grad * non_neg

In [10]:
# Subclass for the Hyperbolic Tangent activation function
class Tanh(Operation):
  def __init__(self) -> None:
    super().__init__()


  def _output(self) -> np.ndarray: # Apply tanh element-wise
    return np.tanh(self.input_)


  def _input_grad(self, output_grad: np.ndarray) -> np.ndarray: # Compute the gradient of the input
    return output_grad * (1 - self.output * self.output)

## Layer and Dense classes

In [11]:
# We define a class for a single layer of our Neural Network
class Layer(object):
  def __init__(self, neurons: int): 
    self.neurons = neurons # The number of units in the layer
    self.first = True # Check if the layer is the first one
    self.params: List[np.ndarray] = [] # List of parameters
    self.param_grads: List[np.ndarray] = [] # List of parameter gradients
    self.operations: List[Operation] = [] # List of operations


  def _setup_layer(self, num_in: int) -> None: # This method must be defined for each Layer
    raise NotImplementedError()


  def _params(self) -> np.ndarray:
    self.params = [] # Empty params

    # Extract the _params from layer's operations with parameters
    for operation in self.operations:
      if issubclass(operation.__class__, ParametersOperation):
        self.params.append(operation.param)


  def forward(self, input_: np.ndarray) -> np.ndarray:
    # Set up the first layer
    if self.first:
      self._setup_layer(input_)
      self.first = False

    self.input_ = input_ # Store input_ variable

    # Perform the operations
    for operation in self.operations:
      input_ = operation.forward(input_)

    self.output = input_ # Store input_ variable after the operations

    return self.output


  def _param_grads(self) -> np.ndarray:
    self.param_grads = [] # Empty param_grads

    # Extract the _param_grads from layer's operations with parameters
    for operation in self.operations:
      if issubclass(operation.__class__, ParametersOperation):
        self.param_grads.append(operation.param_grad)


  def backward(self, output_grad: np.ndarray) -> np.ndarray:
    assert_same_shape(self.output, output_grad) # Check that shapes match

    # Perform operations on gradients
    for operation in reversed(self.operations):
      output_grad = operation.backward(output_grad)

    input_grad = output_grad # Store ouput grad after operations

    self._param_grads() # Use _param_grads method

    return input_grad

In [12]:
# Subclass of a dense layer, i.e. a layer which is full connected
class Dense(Layer):
  def __init__(self, neurons: int, activation: Operation = Sigmoid()) -> None:
    super().__init__(neurons)
    self.activation = activation # Initialize the activation function (default: Sigmoid)


  def _setup_layer(self, input_: np.ndarray) -> None:
    # Use the seed of the Neural Network (if present)
    if self.seed:
      np.random.seed(self.seed)

    self.params = [] # Empty params

    self.params.append(np.random.randn(input_.shape[1], self.neurons)) # Initialize weights
    self.params.append(np.random.randn(1,self.neurons)) # Initialize bias

    self.operations = [WeightMul(self.params[0]), BiasAdd(self.params[1]), self.activation] # Operations of the Dense Layer

    return None

## Loss functions

In [13]:
# We make a class for the loss function
class Loss(object):
  def __init__(self) -> None:
    pass


  def _output(self) -> float: # This method must be defined for each Loss
    raise NotImplementedError()


  def forward(self, prediction: np.ndarray, target: np.ndarray) -> float:
    assert_same_shape(prediction, target) # Check that shapes match

    self.prediction = prediction # Store prediction
    self.target = target # Store target

    loss_value = self._output() # Compute the loss value

    return loss_value


  def _input_grad(self) -> np.ndarray: # This method must be defined for each Loss
    raise NotImplementedError()


  def backward(self) -> np.ndarray:
    self.input_grad = self._input_grad() # Compute the gradient of the loss wrt the input

    assert_same_shape(self.prediction, self.input_grad) # Check that shapes match

    return self.input_grad

In [14]:
# Subclass for Mean Squared Error loss function
class MeanSquaredError(Loss):
  def __init__(self) -> None:
    super().__init__()


  # Compute MSE per-example
  def _output(self) -> float:
    loss = np.sum(np.power(self.prediction - self.target, 2)) / self.prediction.shape[0]

    return loss


  # Compute the loss gradient
  def _input_grad(self) -> np.ndarray:
    return 2.0 * (self.prediction - self.target) / self.prediction.shape[0]

## Neural Network

In [16]:
# We make a class for Neural Networs
class NeuralNetwork(object):
  def __init__(self, layers: List[Layer], loss: Loss, seed: float = 42) -> None:
    self.layers = layers # Initialize number of layers
    self.loss = loss # Initialize loss function
    self.seed = seed # Initialize seed (optional, default: 1)
    if seed:
      for layer in self.layers:
        setattr(layer, "seed", self.seed)

  
  # Go forward through layers
  def forward(self, x_batch: np.ndarray) -> np.ndarray:
    x_out = x_batch
    for layer in self.layers:
      x_out = layer.forward(x_out)

    return x_out


  # Go ackward through layers
  def backward(self, loss_grad: np.ndarray) -> None:
    grad = loss_grad
    for layer in reversed(self.layers):
      grad = layer.backward(grad)

    return None


  # Go forward through layers, compute loss and go backward through layers
  def train_batch(self, x_batch: np.ndarray, y_batch: np.ndarray) -> float:
    predictions = self.forward(x_batch)

    loss = self.loss.forward(predictions, y_batch)

    self.backward(self.loss.backward())

    return loss

  
  # Parameters for the NN
  def params(self):
    for layer in self.layers:
      yield from layer.params
    

  # Loss gradients wrt parameters for the NN
  def param_grads(self):
    for layer in self.layers:
      yield from layer.param_grads

## Optimizer classes

In [17]:
# We make a class for an optimizer to update paramters of a NN
class Optimizer(object):
  def __init__(self, learning_rate: float = 0.01):
    self.learning_rate = learning_rate # Initialize the learning rate (default: 0.01)


  def step(self) -> None:
    pass # This method must be defined for each Optimizer

In [18]:
# Stochastic gradient descent subclass
class SGD(Optimizer):
  def __init__(self, learning_rate: float = 0.01) -> None:
    super().__init__(learning_rate)


  def step(self):
    for (param, param_grad) in zip(self.net.params(), self.net.param_grads()): #self.net comes from Trainer class
      param -= self.learning_rate * param_grad

## Trainer class

In [19]:
# Helper function to shuffle dataset examples
def permute_data(X, y):
    perm = np.random.permutation(X.shape[0])
    return X[perm], y[perm]

In [20]:
# We make a Trainer class to put together NeuralNetwork and Optimizer
class Trainer(object):
  def __init__(self, net: NeuralNetwork, optim: Optimizer):
    self.net = net # Initialize a neural network
    self.optim = optim # Initialize an optimizer
    setattr(self.optim, 'net', self.net) # Set net as an attribute of optim

    self.best_loss = 1e9


  # Generate batches
  def generate_batches(self, X: np.ndarray, y: np.ndarray, size: int = 32) -> Tuple[np.ndarray]:

        assert X.shape[0] == y.shape[0], "Features and target must have the same number of rows, instead features has {} and target has {}".format(X.shape[0], y.shape[0])

        N = X.shape[0]

        for j in range(0, N, size):
            X_batch, y_batch = X[j:j+size], y[j:j+size]

            yield X_batch, y_batch
  

  # Fit the NN on training data for a number of epochs and evaluate it every 'eval_every' epochs
  def fit(self, X_train: np.ndarray, y_train: np.ndarray, X_test: np.ndarray, y_test: np.ndarray,
          epochs: int = 100, eval_every: int = 10, batch_size: int = 32, seed: int = 42, restart: bool = True) -> None:

    np.random.seed(seed) # Set the seed

    if restart:
      for layer in self.net.layers:
        layer.first = True

      self.best_loss = 1e9

    for t in range(epochs):
      # Early stopping
      if (t+1) % eval_every == 0:
        last_model = deepcopy(self.net)

      X_train, y_train = permute_data(X_train,y_train) # Shuffle data

      batch_generator = self.generate_batches(X_train, y_train, batch_size)

      for j, (X_batch, y_batch) in enumerate(batch_generator):
        self.net.train_batch(X_batch, y_batch)
        self.optim.step()

      if (t+1) % eval_every == 0:
        test_preds = self.net.forward(X_test)
        loss = self.net.loss.forward(test_preds, y_test)
        
        if loss < self.best_loss:
          print(f"Validation loss after {t+1} epochs is {loss:.3f}.")
          self.best_loss = loss

        else:
          print(f"Loss increased after epoch {t+1}, the final loss was {self.best_loss:.3f}, using the model from epoch {t+1-eval_every}")
          self.net = last_model
          
          setattr(self.optim, 'net', self.net) # Ensure self.optim still update self.net
          
          break

# Part Two: Application to Data

In [21]:
# Some metrics functions

# Maen Absolute Error
def mae(y_true: np.ndarray, y_pred: np.ndarray):    
    return np.mean(np.abs(y_true - y_pred))

# Root Mean Squared Error
def rmse(y_true: np.ndarray, y_pred: np.ndarray):
    return np.sqrt(np.mean(np.power(y_true - y_pred, 2)))

# Compute MAE and RMSE for a NN
def eval_regression_model(model: NeuralNetwork, X_test: np.ndarray, y_test: np.ndarray):
    preds = model.forward(X_test)
    preds = preds.reshape(-1, 1)
    print("Mean absolute error: {:.2f}".format(mae(preds, y_test)))
    print("\nRoot mean squared error {:.2f}".format(rmse(preds, y_test)))

## Three Neural Networks

In [22]:
# 1 layer nn (aka linear regression)
linear_regression = NeuralNetwork(
    layers = [Dense(neurons = 1, activation = Linear())],
    loss = MeanSquaredError(),
    seed = 42
)

# 2 layer nn
nn = NeuralNetwork(
    layers = [Dense(neurons = 13, activation = Sigmoid()),
              Dense(neurons = 1, activation = Linear())],
    loss = MeanSquaredError(),
    seed = 42
)

# 3 layer nn
deep_nn = NeuralNetwork(
    layers = [Dense(neurons = 13, activation = Sigmoid()),
              Dense(13, activation = Sigmoid()),
              Dense(1, activation = Linear())],
    loss = MeanSquaredError(),
    seed = 42
)

## Import Data

In [23]:
from sklearn.datasets import load_boston

boston = load_boston()
data = boston.data
target = boston.target
features = boston.feature_names


    The Boston housing prices dataset has an ethical problem. You can refer to
    the documentation of this function for further details.

    The scikit-learn maintainers therefore strongly discourage the use of this
    dataset unless the purpose of the code is to study and educate about
    ethical issues in data science and machine learning.

    In this special case, you can fetch the dataset from the original
    source::

        import pandas as pd
        import numpy as np


        data_url = "http://lib.stat.cmu.edu/datasets/boston"
        raw_df = pd.read_csv(data_url, sep="\s+", skiprows=22, header=None)
        data = np.hstack([raw_df.values[::2, :], raw_df.values[1::2, :2]])
        target = raw_df.values[1::2, 2]

    Alternative datasets include the California housing dataset (i.e.
    :func:`~sklearn.datasets.fetch_california_housing`) and the Ames housing
    dataset. You can load the datasets as follows::

        from sklearn.datasets import fetch_california_h

In [24]:
# Scale data
from sklearn.preprocessing import StandardScaler
scaler = StandardScaler()
data = scaler.fit_transform(data)

In [25]:
# Function to turn 1D arrays into 2D arrays 
def to_2d_np(a: np.ndarray, type: str = "col") -> np.ndarray:

    assert a.ndim == 1, "Input tensors must be 1 dimensional"
    
    if type == "col":        
        return a.reshape(-1, 1)
    elif type == "row":
        return a.reshape(1, -1)

In [26]:
# Split train and test and make targets 2D arrays
from sklearn.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(data, target, test_size=0.3, random_state=80718)

y_train, y_test = to_2d_np(y_train), to_2d_np(y_test)

## Train the three NNs

In [27]:
# Train linear regression
trainer = Trainer(linear_regression, SGD(learning_rate = 0.01))

trainer.fit(X_train, y_train, X_test, y_test, epochs = 50, eval_every = 10, seed = 42);
print()
eval_regression_model(linear_regression, X_test, y_test)

Validation loss after 10 epochs is 37.226.
Validation loss after 20 epochs is 29.312.
Validation loss after 30 epochs is 26.731.
Validation loss after 40 epochs is 26.571.
Loss increased after epoch 50, the final loss was 26.571, using the model from epoch 40

Mean absolute error: 3.68

Root mean squared error 5.20


In [28]:
# Train 2 layers nn
trainer = Trainer(nn, SGD(learning_rate = 0.01))

trainer.fit(X_train, y_train, X_test, y_test, epochs = 50, eval_every = 10, seed = 42);
print()
eval_regression_model(nn, X_test, y_test)

Validation loss after 10 epochs is 35.272.
Validation loss after 20 epochs is 20.043.
Validation loss after 30 epochs is 17.688.
Validation loss after 40 epochs is 15.706.
Validation loss after 50 epochs is 14.666.

Mean absolute error: 2.53

Root mean squared error 3.83


In [29]:
# Train 3 layers nn
trainer = Trainer(deep_nn, SGD(learning_rate = 0.01))

trainer.fit(X_train, y_train, X_test, y_test, epochs = 50, eval_every = 10, seed = 42);
print()
eval_regression_model(deep_nn, X_test, y_test)

Validation loss after 10 epochs is 89.403.
Validation loss after 20 epochs is 19.193.
Validation loss after 30 epochs is 16.681.
Validation loss after 40 epochs is 14.424.
Validation loss after 50 epochs is 12.989.

Mean absolute error: 2.37

Root mean squared error 3.60
