## WARNING
I tried to implement from scratch LeCun's neural network. Unfortunately, **I wasn't able to do it precisely** (I suspect the gradient calculation in the first *H1* layer might not be entirely properly implemented). Thus, I moved to PyTorch (in the other notebook) and used the framework to recreate a network, similar (but not 100% identical) to LeCun's one.

- - - 

## DEPRECATED

NB: You need the latest version of **dmlearn** in order to run current notebook:

```pip install dmlearn```

In [1]:
import io
import numpy as np
import pandas as pd
import PIL.Image as Image
from dmlearn.signal import convolve_tensor, correlate_tensor

## 1. Data

### 1.1 Load from HF
[USPS](https://huggingface.co/datasets/flwrlabs/usps) is a digit dataset scanned from envelopes by the U.S. Postal Service containing a total of $9,298$ samples of handwritten digits:
* $7,291$ digits are used for training
* $2,007$ digits are used for testing
* each image is $16$ x $16$ pixels grayscale (not binary)
* the images are within $[0,255]$ range, but we will normalize it to $[-1,1]$
* the images are centered
* they show a broad range of font styles

One important feature of these images is that both the training set and the testing set contain numerous examples that are ambiguous, unclassifiable, or even misclassified.

In [2]:
splits = {'train': 'data/train-00000-of-00001.parquet', 'test': 'data/test-00000-of-00001.parquet'}
train_df = pd.read_parquet("hf://datasets/flwrlabs/usps/" + splits["train"])
test_df = pd.read_parquet("hf://datasets/flwrlabs/usps/" + splits["test"])
train_df.head()

Unnamed: 0,image,label
0,{'bytes': b'\x89PNG\r\n\x1a\n\x00\x00\x00\rIHD...,6
1,{'bytes': b'\x89PNG\r\n\x1a\n\x00\x00\x00\rIHD...,5
2,"{'bytes': b""\x89PNG\r\n\x1a\n\x00\x00\x00\rIHD...",4
3,{'bytes': b'\x89PNG\r\n\x1a\n\x00\x00\x00\rIHD...,7
4,{'bytes': b'\x89PNG\r\n\x1a\n\x00\x00\x00\rIHD...,3


### 1.2 Convert bytes to integers

In [3]:
def convert_image_bytes_to_int(img: dict) -> np.ndarray:
    # Convert the image bytes to numpy arrays
    image = Image.open(io.BytesIO(img['bytes']))
    return np.array(image)

def convert_image_df_to_numpy(image_df: pd.DataFrame) -> np.ndarray:
    images = image_df['image'].apply(convert_image_bytes_to_int)
    # Convert to numpy array (consider the shape of all images is the same - 16x16)
    train_images = np.zeros((len(images), 1, 16, 16))
    for i, image in enumerate(images):
        train_images[i][0] = image
    return train_images

train_images = convert_image_df_to_numpy(train_df)
train_images = train_images[:100] # For testing purposes, use only 100 of all 7291 training samples

print(f"=== TRAIN SET ===")
print(f"train_images shape: {train_images.shape}")
print(f"train_image_0 shape: {train_images[0][0].shape}")

=== TRAIN SET ===
train_images shape: (100, 1, 16, 16)
train_image_0 shape: (16, 16)


In [4]:
test_images = convert_image_df_to_numpy(test_df)
print(f"=== TEST SET ===")
print(f"test_images shape: {test_images.shape}")
print(f"test_image_0 shape: {test_images[0][0].shape}")

=== TEST SET ===
test_images shape: (2007, 1, 16, 16)
test_image_0 shape: (16, 16)


## 2. Preprocess

### 2.1 Normalize data [-1, 1]

In order to prevent exploding gradients and slower computation we will scale the data to smaller numbers. 

In the original paper the data was scaled to the range of $[-1,1]$. We will do the same.

In [5]:
# Scale data [-1, 1]. We first scale the data to the range of [0, 2] and then shift it to the range of [-1, 1].
train_images = train_images / 127.5 - 1
test_images = test_images / 127.5 - 1

### 2.2 One-hot encode the labels

In [6]:
### Convert labels to one-hot encoding
def convert_labels_to_one_hot(df: pd.DataFrame) -> np.ndarray:
    labels = df['label'].values
    labels = np.eye(10)[labels]
    return labels

train_labels = convert_labels_to_one_hot(train_df)
# Transform each label from horizontal to vertical
train_labels = train_labels.reshape(train_labels.shape[0], 10, 1)
print(f"=== TRAIN SET ===")
print(f"train_labels.shape: {train_labels.shape}")
print(f"train_labels[0]: {train_labels[0]}")

=== TRAIN SET ===
train_labels.shape: (7291, 10, 1)
train_labels[0]: [[0.]
 [0.]
 [0.]
 [0.]
 [0.]
 [0.]
 [1.]
 [0.]
 [0.]
 [0.]]


In [7]:
test_labels = convert_labels_to_one_hot(test_df)
print(f"=== TEST SET ===")
print(f"test_labels.shape: {test_labels.shape}")
print(f"test_labels[0]: {test_labels[0]}")

=== TEST SET ===
test_labels.shape: (2007, 10)
test_labels[0]: [0. 0. 0. 0. 0. 0. 0. 0. 0. 1.]


## 3. Convolution operation
We need to implement an operation that convolves two 3D matrices. We can't directly use the `scipy.signal` library since, it doesn't fully support **stride**, **padding** and **padding_fill_value** configuration.

Thus, I have developed a private library that provides full flexibility in regard to these parameters.

To install the library run:

```
pip install dmlearn
```


In [8]:
# Test the function with a 3D matrix and a 2D kernel
tensord_3d = np.array([[[1, 2, 2, 3, 3],
                    [4, 5, 5, 6, 6],
                    [7, 8, 8, 9, 9],
                    [10, 11, 11, 12, 12],
                    [13, 14, 14, 15, 15]],
                   [[1, 2, 2, 3, 3],
                    [4, 5, 5, 6, 6],
                    [7, 8, 8, 9, 9],
                    [10, 11, 11, 12, 12],
                    [13, 14, 14, 15, 15]]])

# The filter (kernel) to convolve the tensor with
kernel = np.array([[0, 0, 0],
                   [0, 1, 0],
                   [0, 0, 0]])

# Convolution parameters
stride = 2
padding = 1
padding_value = -1

# Perform the convolution
conv_result = convolve_tensor(tensord_3d, kernel, step_size=stride, padding=padding, fill_value=padding_value)
print(f"Convolution result:\n\n{conv_result}")

Convolution result:

[[[-1 -1 -1]
  [-1 -1 -1]
  [-1 -1 -1]]

 [[ 1  2  3]
  [ 7  8  9]
  [13 14 15]]

 [[ 1  2  3]
  [ 7  8  9]
  [13 14 15]]

 [[-1 -1 -1]
  [-1 -1 -1]
  [-1 -1 -1]]]


## 4. Neural Net Design

### 4.1 Random weights
Weights are initialized with random values within $U[-2.4/F, 2.4/F]$ range, where $F$ is the fan-in (number of inputs, connected to a neuron or a layer). 

**Reasoning**: *"tends to keep total inputs in operating range of sigmoid"*


In [9]:
def random_weights(shape: tuple, fan_in: int) -> np.ndarray:
    min_val = -np.sqrt(2.4 / fan_in)
    max_val = np.sqrt(2.4 / fan_in)
    return np.random.uniform(min_val, max_val, shape)


### 4.2 Activation function - LiSHT
Basically, this function computes linearly scaled hyperbolic tangent:

$$\text{lisht}(x) = x * \text{tanh}(x)$$

#### Why Tanh

In this paper LeCun's team applied **scaled hyperbolic tangent** (i.e. tanh) function to the output of each layer in the neural net. So the question is why this function? Why not simple `sigmoid` which is basically the same function, but within the $[0,1]$ range (`tanh`'s range is $[-1, 1]$).

The only reason I can come up with is that `tanh` has steeper gradients than sigmoid (due to the bigger range), which means faster learning. Of course, we can achieve faster learning with higher `learning rate`. However, latter would increase the risk of divergence.

Read more [HERE >>>](https://stats.stackexchange.com/questions/330559/why-is-tanh-almost-always-better-than-sigmoid-as-an-activation-function) .

LeCun himself wrote following text in another paper:
* *Symmetric functions of that kind are believed to yield **faster convergence**, although the learning can be extremely slow if some weights are too small (LeCun 1987).*

In [10]:
def lisht(x: np.ndarray) -> np.ndarray:
    return x * np.tanh(x)

def lisht_derivative(x: np.ndarray) -> np.ndarray:
    return np.tanh(x) + x * (1 - np.tanh(x)**2)

### 4.3 Cost function - MSE
The output cost function was the mean squared error. 

In [11]:
def loss_mse(y_true: np.ndarray, y_pred: np.ndarray) -> float:
    """
    Mean Squared Error

    Args:
        y_true: np.ndarray - the true labels - one-hot encoded (e.g. [0, 0, 0, 1, 0, 0, 0, 0, 0, 0])
        y_pred: np.ndarray - the predicted labels - one-hot encoded (e.g. [-0.05, -0.5, 0.15, 0.9, 0.25, 0.35, -0.6, 0.1, -0.7, 0])
    Returns:
        float - the mean squared error
    """
    return np.mean((y_true - y_pred)**2)

def loss_mse_derivative(y_true: np.ndarray, y_pred: np.ndarray) -> np.ndarray:
    return 2 * (y_pred - y_true)

### 4.4 Stochastic gradient descent
In this paper SGD is chosen over Batch GD with following argument:
* *The weights were updated according to the so-called stochastic gradient or "on-line" procedure (updating after each presentation of a single pattern) as opposed to the "true" gradient procedure (averaging over the whole training set before updating the weights). From empirical study (supported by theoretical arguments), the stochastic gradient was found to converge much faster than the true gradient, especially on large, redundant data bases. It also finds solutions that are
more robust.*

### 4.5 Neural Net Design

<center><img src="img/lecun_zip_code_nn.png" alt="Neural Network Architecture" width="921" height="468" /></center>
<p style="text-align: center; font-size: small;"><i><b>Figure 1.</b> 1989 LeCun ConvNet per description in the paper</i></p>

In [12]:
class Layer:
    def __init__(self):
        self.input = None
        self.output = None

    def forward(self, input: np.ndarray) -> np.ndarray:
        pass

    def backward(self, output_gradient: np.ndarray, learning_rate: float) -> np.ndarray:
        pass

In [13]:
class Convolutional(Layer):
    def __init__(self):
        self.stride = 2
        self.padding_value = -1         # In this paper, the padding value is -1 (the min value in the dataset)
        self.num_of_kernels = 12        # In all layers, the number of kernels is 12

In [14]:
class H1(Convolutional):
    def __init__(self):
        super().__init__()
        self.input_shape = (1, 16, 16)                      # In first layer the input is basically a 2D 16x16 image.
        self.padding = 2
        self.kernel_shape = (self.num_of_kernels, 5, 5)
        self.output_shape = (self.num_of_kernels, 8, 8)     # In this paper, the output is always 1/2 of the input

        # Generate random weights and biases
        # TODO: use Xavier initialization
        fan_in = self.input_shape[1] * self.input_shape[2]
        self.kernels = random_weights(self.kernel_shape, fan_in)  # (12, 5, 5) - 12 kernels, each with 25 weights (300 params in total)
        self.biases = random_weights(self.output_shape, fan_in)   # (12, 8, 8) - each unit has its own bias (768 params in total)

    def forward(self, input: np.ndarray) -> np.ndarray:
        self.input = input                  # 1 x 16 x 16
        self.output = np.copy(self.biases)  # 12 x 8 x 8
        self.logits = np.zeros((self.num_of_kernels, self.output_shape[1], self.output_shape[2]))

        # For each kernel (we have 12 of them in this layer)
        for i in range(self.num_of_kernels): 
            # For each depth of the input (in this layer it's 1)
            # The formula is:
            # - output[i] = input[j] * kernels[i]
            # where:
            # - output[i] is the output of the i-th kernel
            # - input is basically the input image (16 x 16). We ignore the first dimension, since the image it's 1 (black and white image).
            # - kernels[i] is the i-th kernel
            cur_output = convolve_tensor(self.input[0], self.kernels[i], step_size=self.stride, padding=self.padding, fill_value=self.padding_value)
            # We apply LiSHT (X * tanh(X)) in all units of the layer
            self.logits[i] = cur_output
            self.output[i] = lisht(cur_output)

        # Return the output of the layer
        return self.output

    def backward(self, output_gradient: np.ndarray, learning_rate: float) -> np.ndarray:
        kernels_gradient = np.zeros(self.kernel_shape) # 12 x 4 x 4
        biases_gradient = np.zeros(self.output_shape)  # 12 x 4 x 4
        lisht_gradient = np.zeros((self.output_shape[0], self.output_shape[1], self.output_shape[2])) # 12 x 4 x 4

        # For each output kernel (we have 12 of them in this layer)
        for i in range(self.num_of_kernels):
            # Calculate the gradient of the loss with respect to the input of the j-th feature map
            lisht_gradient[i] = lisht_derivative(self.logits[i]) * output_gradient[i]
            kernels_gradient[i] = correlate_tensor(self.input[0], lisht_gradient[i], step_size=self.stride, padding=0, fill_value=self.padding_value)
            biases_gradient[i] = lisht_gradient[i]
        
        self.kernels -= learning_rate * kernels_gradient
        self.biases -= learning_rate * biases_gradient

In [15]:
# Test the forward pass of H1
h1 = H1()
print(train_images[0].shape)
h1_output = h1.forward(train_images[0])
print(h1_output.shape)

(1, 16, 16)
(12, 8, 8)


In [16]:
class H2(Convolutional):
    def __init__(self):
        super().__init__()
        self.input_shape = (12, 8, 8)                       # In second layer the input is 12 times 8x8 feature maps.
        self.padding = 1
        self.kernel_shape = (self.num_of_kernels, 4, 4)
        self.output_shape = (self.num_of_kernels, 4, 4)     # In this paper, the output is always 1/2 of the input
        
        # Generate random weights and biases
        # TODO: use Xavier initialization
        fan_in = self.input_shape[0] * self.input_shape[1] * self.input_shape[2]
        self.kernels = random_weights(self.kernel_shape, fan_in)  # (12, 4, 4) - 12 kernels, each with 16 weights (192 params in total)
        self.biases = random_weights(self.output_shape, fan_in)   # (12, 4, 4) - each unit has its own bias (192 params in total)

    def forward(self, input: np.ndarray) -> np.ndarray:
        self.input = input                  # 12 x 8 x 8
        self.output = np.copy(self.biases)  # 12 x 4 x 4
        self.logits = np.zeros((self.num_of_kernels, 12, self.output_shape[1], self.output_shape[2]))

        # For each output kernel (we have 12 of them in this layer)
        for i in range(self.num_of_kernels):
            # NB: We pick only 8 out of 12 input feature maps. We want for each `i` the picked 8 input feature maps to be different.
            # We pick the 8 input feature maps by using the `i` index.
            # For example, if `i = 0`, we pick the first 8 input feature maps.
            # If `i = 1`, we pick the second 8 input feature maps.
            # And so on.
            # We do this because we want the picked 8 input feature maps to be different for each kernel.
            # This is important because we want the picked 8 input feature maps to be different for each kernel.
            # In the paper is written: "The eight maps in H1 on which a map in H2 takes its inputs are chosen according a scheme that will not be described here" :)
            feature_maps_range = (np.array([0, 1, 2, 3, 4, 5, 6, 7]) + i) % 12
            # For each of the 8 input feature maps we calculate the convolution with the i-th kernel.
            for j in feature_maps_range:
                # The formula is:
                # - output[i] += input[j] * kernels[i]
                # where:
                # - output[i] is the output of the i-th kernel
                # - input[j] is basically a feature map [8x8]. We ignore the first dimension, since the image it's 1 (black and white image).
                # - kernels[i] is the i-th kernel
                cur_output = convolve_tensor(self.input[j], self.kernels[i], step_size=self.stride, padding=self.padding, fill_value=self.padding_value)
                # We apply LiSHT (X * tanh(X)) in all units of the layer
                self.logits[i, j] = cur_output
                self.output[i] += lisht(cur_output)  # TODO: should we have output[i][j] = cur_output?

        # Return the output of the layer
        # Output = bias + sum(input * kernel)
        return self.output

    def backward(self, output_gradient: np.ndarray, learning_rate: float) -> np.ndarray:
        kernels_gradient = np.zeros(self.kernel_shape) # 12 x 4 x 4
        biases_gradient = np.zeros(self.output_shape)  # 12 x 4 x 4
        input_gradient = np.zeros(self.input_shape)    # 12 x 8 x 8
        lisht_gradient = np.zeros((self.input_shape[0], self.output_shape[0], self.output_shape[1], self.output_shape[2])) # 12 x 12 x 4 x 4

        # For each output kernel (we have 12 of them in this layer)
        for i in range(self.num_of_kernels):
            # Determine the feature maps that were used to calculate the output of the i-th kernel
            feature_maps_range = (np.array([0, 1, 2, 3, 4, 5, 6, 7]) + i) % 12
            dj_dk_i = np.zeros((12, self.output_shape[1], self.output_shape[2])) # 12 x 4 x 4
            input_grads = np.zeros((12, 8, 8))
            for j in feature_maps_range:
                # Calculate the gradient of the loss with respect to the input of the j-th feature map
                lisht_gradient[i][j] = lisht_derivative(self.logits[i, j]) * output_gradient[i]
                dj_dk_i[j] = correlate_tensor(self.input[j], lisht_gradient[i][j], step_size=self.stride, padding=self.padding, fill_value=self.padding_value)
                input_grads[j] = convolve_tensor(lisht_gradient[i][j], self.kernels[i], step_size=self.stride, padding=7, fill_value=self.padding_value)
            kernels_gradient[i] = np.sum(dj_dk_i, axis=0)
            biases_gradient[i] = np.sum(lisht_gradient[i], axis=0)
            input_gradient[i] = np.sum(input_grads, axis=0)
        
        self.kernels -= learning_rate * kernels_gradient
        self.biases -= learning_rate * biases_gradient
        return input_gradient

In [17]:
# Test the forward pass of H2
h2 = H2()
h2_output = h2.forward(h1_output)
print(h2_output.shape)

(12, 4, 4)


In [18]:
class Dense(Layer):
    def __init__(self):
        super().__init__()    

In [19]:
class H3(Dense):
    """
    Dense (fully-connected) layer.
    """
    def __init__(self):
        super().__init__()
        # Initialize random weights and biases
        fan_in = 12 * 4 * 4
        self.weights = random_weights((30, fan_in), fan_in) # 30 units in the output layer, 192*30 parameters in total
        self.bias = random_weights((30, 1), fan_in)

    def forward(self, input: np.ndarray) -> np.ndarray:
        self.input = input  # 12 x 4 x 4
        # Return the standard output of the layer (weights * input + bias)
        self.logits = np.dot(self.weights, self.input.reshape(12*4*4, 1)) + self.bias
        # We apply LiSHT (X * tanh(X)) in all units of the layer
        return lisht(self.logits)

    def backward(self, output_gradient: np.ndarray, learning_rate: float) -> np.ndarray:
        # Firstly, we need to calculate the derivative of the activation function.
        lisht_gradient = lisht_derivative(self.logits) * output_gradient
        # Calculate the weights gradient.
        # NB: We need to reshape the input from (12x4x4) to (192, 1). In other words, we need to flatten the input.
        # (30, 192) = (30, 1) x (1, 192)
        weights_gradient = np.dot(lisht_gradient, self.input.reshape(12*4*4, 1).T)
        # Calculate the input gradient
        input_gradient = np.dot(self.weights.T, lisht_gradient)
        # COnvert (192, 1) to (12, 4, 4)
        input_gradient = input_gradient.reshape(12, 4, 4)
        # Update the weights and bias
        self.weights -= learning_rate * weights_gradient
        self.bias -= learning_rate * lisht_gradient
        return input_gradient  

In [20]:
# Test the forward pass of H2
h3 = H3()
h3_output = h3.forward(h2_output)
print(h3_output.shape)

(30, 1)


In [21]:
class H4(Dense):
    """
    Dense (fully-connected) layer.
    """
    def __init__(self):
        super().__init__()
        # Initialize random weights and biases
        fan_in = 30
        self.weights = random_weights((10, fan_in), fan_in) # 10 units in the output layer, 30*10 parameters in total
        self.bias = random_weights((10, 1), fan_in)

    def forward(self, input: np.ndarray) -> np.ndarray:
        self.input = input  # 30 x 1
        # Return the standard output of the layer (weights * input + bias)
        self.logits = np.dot(self.weights, self.input) + self.bias
        # We apply LiSHT (X * tanh(X)) in all units of the layer
        return lisht(self.logits)
    
    def backward(self, output_gradient: np.ndarray, learning_rate: float) -> np.ndarray:
        # Firstly, we need to calculate the derivative of the activation function.
        lisht_gradient = lisht_derivative(self.logits) * output_gradient
        # Calculate the weights gradient.
        weights_gradient = np.dot(lisht_gradient, self.input.T)
        # Calculate the input gradient
        input_gradient = np.dot(self.weights.T, lisht_gradient)
        # Update the weights and bias
        self.weights -= learning_rate * weights_gradient
        self.bias -= learning_rate * lisht_gradient
        return input_gradient  

In [22]:
# Test the forward pass of H2
h4 = H4()
h4_output = h4.forward(h3_output)
print(h4_output.shape)

(10, 1)


## 5. Train

In [26]:
def train(network, x_train, y_train, epochs = 23, learning_rate = 1.e-5, verbose = True):
    error_history = []
    for e in range(epochs):
        error = 0
        for x, y in zip(x_train, y_train):
            output = x
            # forward
            for layer in network:
                output = layer.forward(output)

            # error
            error += loss_mse(y, output)

            # backward
            grad = loss_mse_derivative(y, output)
            for layer in reversed(network):
                grad = layer.backward(grad, learning_rate)

        error /= len(x_train)
        error_history.append(error)
        if verbose:
            print(f"{e + 1}/{epochs}, error={error}")
    return error_history

network = [H1(), H2(), H3(), H4()]
error_history = train(network, train_images, train_labels)

1/23, error=0.09558457757540668
2/23, error=0.09551593776192796
3/23, error=0.09539353001158467
4/23, error=0.0952071945309306
5/23, error=0.09494942600589401
6/23, error=0.09475451194296466
7/23, error=0.09510345685433641
8/23, error=0.09674857277936397
9/23, error=0.0998480705358504
10/23, error=0.10313045136695857
11/23, error=0.10515618621721469
12/23, error=0.10608429436253167
13/23, error=0.10736460147461813
14/23, error=0.110686745381784
15/23, error=0.11802939773811166
16/23, error=0.13307270140649197
17/23, error=0.16118485431225263
18/23, error=0.20140218516166913
19/23, error=0.24539093375950485
20/23, error=0.27744153934643007
21/23, error=0.3169795618391953
22/23, error=0.34752027708900074
23/23, error=0.3527431798950319
