# CNN Inference with NumPy
In the classic CNN behaviour, we are given an image, the kernel passes over each subset of the image and for each subset it computes the sum of element-wise products. If done with vanilla Python, this procedure results in a slow execution. In PyTorch the problem is solved thanks to the framework given by the module itself. The idea of using numpy consists in transforming the given image in a matrix where each row is composed by one of the subset of the image interested in the convolution with the kernel, then vectorize the kernel and then perform the convolution step by simply operating a dot product between the subset and the kernel. This operation is fast in numpy, which is the Python module responsible for numeric computations. This approach is good but it introduces redundancy in data. A tradeoff between memory and cpu

# provare jax

`PLACEHOLDER`

## Theory

### Kernel Multiplication
Convolving an image of dimensions 800 x 600 x 3 where 3 is the number of channels (one for red, one for green and one for blue) with a kernel of size 2x2x3 outputs an image of dimensions 799x599x1. Kernel's number of channels must match the number of channels of the input image. Therefore if i want, for example, to obtain a color image as the result of a convolution, i would need to convolve the input image with three kernels of three channels. Another way to see this is that the number of kernels i use in the convolution will become the number of channels for the output image 

`X = X[::3,:]` means "Please select the first three rows of matrix X and return them"

### Channels Problem: How to consider multiple channels:
taking the previous matrix as an example, and adding a channel where every number is the corresponding of the first channel but *100 we obtain:
01,02,05,06,101,102,105,106
Which means just append the next channel's corresponding flattened window to the previous one

### Batch Problem: How to consider multiple images:
just add the obtained windows at the end of the ones of the previous image, so instead of having a 5 rows 9 columns matrix of windows the result will be a matrix of 10 rows and 9 columns.

### Final input management
The more images can be stacked in the input, the faster the training will be, therefore the stack dimension must be taken into account. If the desire is to manage stacks of 8, 800x600 resolution color images (numbers were chosen so that they are better recognizable) with a 5x4 kernel, the function to create windows will have this shape:

`X = np.random.rand(8*3*800*600).reshape(8,3,800,600)`

`y = np.lib.stride_tricks.sliding_window_view(X,(1,3,5,4)).reshape(-1,(3*5*4))`

Which translates in:

`y = np.lib.stride_tricks.sliding_window_view(X,(1,CHANNEL_SIZE,KERNEL_WIDTH,KERNEL_HEIGHT))`

`y = y.reshape(-1,(CHANNEL_SIZE*KERNEL_WIDTH*KERNEL_HEIGHT))`

## Trials

In [1]:
import numpy as np
trials = True
def tprint(value):
    print(value) if trials else print()

### Inefficient Max Pooling

In [None]:
def MaxPooling2DIneff(batch_of_images,win_size = 2, stride=2):
    # very inefficient... but it works...
    # it gives the same result as PyTorch's nn.MaxPool2d(kernel_size=2, stride=2)
    bs, nc, iw, ih = batch_of_images.shape # batch of images' number of images, number of channels, single image's width, single images's height
    #print(batch_of_images.shape)
    niw = round(iw/2) if iw % 2 == 0 else round(iw/2)-1
    nih = round(ih/2) if ih % 2 == 0 else round(ih/2)-1
    batch = []
    for bb in range(bs):
        channel=[]
        for cc in range(nc):
            y=[]
            for i in range(0,ih,stride):
                for j in range(0,iw,stride):
                    if (j+win_size)>iw:
                        continue
                    if (i+win_size)>ih:
                        continue
                    y.append(batch_of_images[bb,cc,i:(i+win_size),j:(j+win_size)].max())
            y=np.array(y).reshape(niw,nih)
            channel.append(y)
        batch.append(channel)
    return np.array(batch)
## this is an example
#X = np.arange(1,(6*6*2*2)+1).reshape(2,2,6,6)
#print(X)
#r=MaxPooling2D(X)
#print(r)

### Sliding Window View
To implement the im2col convolution approach, the first thing to do is creating the matrix of windows from the given image. This can be done with the helpful function `np.lib.stride_tricks.sliding_window_view(image,kernel_shape)` which exactly returns what this approach needs. In the next panel the functioning of this function will be done.


In [2]:
X = np.arange(1,(2*2*3*3)+1).reshape(2,2,3,3)
tprint(X)
tprint("desired result")
tprint("[1,2,4,5]")
tprint("[2,3,5,6]")
tprint("...")
tprint("correct one:")
y = np.lib.stride_tricks.sliding_window_view(X,(1,1,2,2))
tprint("wrong: after the first window there is the first window of the channel")
y = np.lib.stride_tricks.sliding_window_view(X,(1,1,2,2))

[[[[ 1  2  3]
   [ 4  5  6]
   [ 7  8  9]]

  [[10 11 12]
   [13 14 15]
   [16 17 18]]]


 [[[19 20 21]
   [22 23 24]
   [25 26 27]]

  [[28 29 30]
   [31 32 33]
   [34 35 36]]]]
desired result
[1,2,4,5]
[2,3,5,6]
...
correct one:
wrong: after the first window there is the first window of the channel


### Back from Pooling


In [43]:
X= np.zeros(4*4).reshape(4,4)
values = np.array([1,2,3,4]).reshape(2,2)
indices=np.array([0,1,3,2]).reshape(2,2)
print(X)
np.add.at(X,(indices,indices),values)
print(X)

[[0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]]
[[1. 0. 0. 0.]
 [0. 2. 0. 0.]
 [0. 0. 4. 0.]
 [0. 0. 0. 3.]]


In [40]:
import numpy as np
X = np.arange(1,17).reshape(4,4)
print(X)
stride = winSize = 2
small_X = np.array([6,5,6,7]).reshape(2,2)
small_dX = np.array([7,3,4,5]).reshape(2,2)
H_out,W_out = small_dX.shape
ind = np.array([3,0,2,1])
back_X = np.zeros(X.shape)
row = ind // winSize # if [3,3,3,3] // 2 returns [1,1,1,1]
col = ind % winSize # if [3,3,3,3] // 2 returns [1,1,1,1]
print("row,col")
print(row,col)
# it means, the first element must be at row one and col one starting from zero.
r_out_idx, c_out_idx = np.indices((H_out, W_out))
print("r_out_idx")
print(r_out_idx)
print("c_out_idx")
print(c_out_idx)

vert_start = r_out_idx * stride # Forma (bs, nc, H_out, W_out)
horiz_start = c_out_idx * stride # Forma (bs, nc, H_out, W_out)
print("vert_start")
print(vert_start)
print("horiz_start")
print(horiz_start)
#    c. Calcola le coordinate assolute finali in dA_prev
#       idx_row_in_window e idx_col_in_window devono essere "broadcastabili" o
#       avere la stessa forma di vert_start e horiz_start se non sono già piatte.
#       Poiché indices_flat era (bs*nc*H_out*W_out,), rimodelliamoli:
idx_row_in_window_reshaped = row.reshape(H_out, W_out)
idx_col_in_window_reshaped = col.reshape(H_out, W_out)
print("idx_row_in_window_reshaped")
print(idx_row_in_window_reshaped)
print("idx_col_in_window_reshaped")
print(idx_col_in_window_reshaped)
abs_row_coords = vert_start + idx_row_in_window_reshaped # Forma (bs, nc, H_out, W_out)
abs_col_coords = horiz_start + idx_col_in_window_reshaped # Forma (bs, nc, H_out, W_out)
print("abs_row_coords")
print(abs_row_coords)
print("abs_col_coords")
print(abs_col_coords)
# 4. Usa np.add.at per sommare i gradienti d_out nelle posizioni calcolate di dA_prev.
#    np.add.at(array, indici, valori_da_aggiungere)
#    Gli 'indici' devono essere una tupla di array di indici per ogni dimensione.
#    Tutti gli array in 'indici' e 'valori_da_aggiungere' devono essere broadcastabili
#    a una forma comune, o essere appiattiti in modo consistente.

#    Creiamo gli indici per np.add.at:
#    Tutti questi array (b_idx, ch_idx, abs_row_coords, abs_col_coords, d_out)
#    hanno già la stessa forma (bs, nc, H_out, W_out), quindi NumPy
#    li gestirà elemento per elemento quando usati come indici e valori.

indices_for_add_at = (
    abs_row_coords,         # Indici per la dimensione altezza di dA_prev
    abs_col_coords          # Indici per la dimensione larghezza di dA_prev
)
print("back_X")
print(back_X)
np.add.at(back_X, indices_for_add_at, small_dX)
print("back_X")
print(back_X)


[[ 1  2  3  4]
 [ 5  6  7  8]
 [ 9 10 11 12]
 [13 14 15 16]]
row,col
[1 0 1 0] [1 0 0 1]
r_out_idx
[[0 0]
 [1 1]]
c_out_idx
[[0 1]
 [0 1]]
vert_start
[[0 0]
 [2 2]]
horiz_start
[[0 2]
 [0 2]]
idx_row_in_window_reshaped
[[1 0]
 [1 0]]
idx_col_in_window_reshaped
[[1 0]
 [0 1]]
abs_row_coords
[[1 0]
 [3 2]]
abs_col_coords
[[1 2]
 [0 3]]
back_X
[[0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]]
back_X
[[0. 0. 3. 0.]
 [0. 7. 0. 0.]
 [0. 0. 0. 5.]
 [4. 0. 0. 0.]]


### Striding
The next panel is devoted to explore the striding, achieved using the preceding, function along with the `[:,:,::2,::2]` construct that, respectively, leaves untouched the number of images in the batch and the number of channels, but selects one row every two and one column every two. rows and columns after the sliding window, uniquely identify one window.

In [3]:
X = np.arange(1,16*2+1).reshape(1,2,4,4)
tprint(X)
# basically [::2,::2] selects every two rows and every two columns, where in this case elements are 3 dimensional matrices of 2 by 2 so the 
# selection eliminates the desired elements, resulting in a stride 2 convolution 
y = np.lib.stride_tricks.sliding_window_view(X,(1,2,2,2))[:,:,::2,::2]
#y = np.lib.stride_tricks.as_strided(X,shape=(2,2),strides=[1,2,3,4])
tprint(y.reshape(-1,4))

[[[[ 1  2  3  4]
   [ 5  6  7  8]
   [ 9 10 11 12]
   [13 14 15 16]]

  [[17 18 19 20]
   [21 22 23 24]
   [25 26 27 28]
   [29 30 31 32]]]]
[[ 1  2  5  6]
 [17 18 21 22]
 [ 3  4  7  8]
 [19 20 23 24]
 [ 9 10 13 14]
 [25 26 29 30]
 [11 12 15 16]
 [27 28 31 32]]


## Practice

### Imports

In [7]:
import numpy as np
import struct
import matplotlib.pyplot as plt
from tqdm import tqdm
import math

### Loading images and labels

In [8]:
def load_mnist_images(filename):
    with open(filename, 'rb') as f:
        # Leggi intestazione: magic number, numero immagini, righe, colonne
        magic, num_images, rows, cols = struct.unpack(">IIII", f.read(16))
        # Leggi tutti i pixel e convertili in array numpy
        images = np.frombuffer(f.read(), dtype=np.uint8)
        # Ridimensiona l'array in (num_images, rows, cols)
        images = images.reshape((num_images, rows, cols))
    return images

def load_mnist_labels(filename):
    with open(filename, 'rb') as f:
        magic, num_labels = struct.unpack(">II", f.read(8))
        labels = np.frombuffer(f.read(), dtype=np.uint8)
    return labels

images = load_mnist_images('MNIST/train-images-idx3-ubyte')
labels = load_mnist_labels('MNIST/train-labels-idx1-ubyte')

print(images.shape)  # (60000, 28, 28)
print(labels.shape)  # (60000,)
one_hot_labels = np.zeros(labels.shape[0]*10).reshape((labels.shape[0]),10)
for i in range(len(labels)):
    one_hot_labels[i][labels[i]]=1
labels = one_hot_labels
print(labels.shape) # (60000,10)

(60000, 28, 28)
(60000,)
(60000, 10)


CNN Structure
The goal is achieving over 90% of accuracy with a simple structure, therefore this would be the set of layers:
- Convolutional Layer with 32 2x2 filters and ReLU activation: i: (B x C X 28 x 28), o: (B x 32 x 26 x 26)
- Max Pooling layer with 2x2 filter: i: (B x 32 x 26 x 26), o: (B x 32 x 13 x 13)
- Convolutional Layer with 32 2x2 filters and ReLU activation: i: (B x 13 x 13 x 32), o: (B x 11 x 11 x 64)
- Linear Fully Connected Layer with ReLU activation: i: (B x 7744), o: (B x 250)
- Linear Fully Connected Layer with Softmax activation: i: (B x 250), o: (B x 10)
- Cross-Entropy Loss: -sum(true_probability_distribution*log(predicted_probability_distribution))

where B is the batch_size and C is the number of channels, that for MNIST digits is just one, since they are greyscale images.

The idea is to reduce everything to a matrix-matrix multiplication which is super fast in NumPy, an optimized mathematical module for Python, by taking an image and create a matrix containing for each row the flattened window that would enter the convolution, as an example:
the first 2 flattened windows of this matrix:
01,02,03,04
05,06,07,08
09,10,11,12
13,14,15,16
for a kernel 2x2 of stride 1 are:
01,02,05,06
02,03,06,07



In [16]:
X = np.array([-4,0,4,2]).reshape(2,2)
reluX = np.maximum(0,X)
print(reluX)
BReluX=reluX
BReluX[BReluX>0]=1
print(BReluX)

[[0 0]
 [4 2]]
[[0 0]
 [1 1]]


## CNN - PyTorch

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
import time
from tqdm import tqdm # tqdm standard può funzionare qui, ma tqdm.notebook è per jupyter

# # --- 1. Definizione del Modello ---

class SimpleCNN(nn.Module):
    def __init__(self, num_classes=10):
        super(SimpleCNN, self).__init__()

        # --- Layer Convoluzionali ---
        # Input: (bs, 1, 28, 28)
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=32, kernel_size=3, stride=2, padding=1)
        # Output: (bs, 32, 14, 14) -> floor((28-3+2*1)/2)+1 = 14
        self.relu1 = nn.ReLU()

        self.conv2 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, stride=2, padding=1)
        # Output: (bs, 64, 7, 7) -> floor((14-3+2*1)/2)+1 = 7
        self.relu2 = nn.ReLU()

        self.conv3 = nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3, stride=2, padding=1)
        # Output: (bs, 128, 4, 4) -> floor((7-3+2*1)/2)+1 = 4
        self.relu3 = nn.ReLU()

        # --- Flattening ---
        self.flatten = nn.Flatten() # Converte (bs, C, H, W) in (bs, C*H*W)

        # Calcolo dimensione flatten: 128 * 4 * 4 = 2048
        fc_input_size = 128 * 4 * 4

        # --- Layer Fully Connected (MLP) ---
        self.fc1 = nn.Linear(in_features=fc_input_size, out_features=250)
        self.relu4 = nn.ReLU()
        self.fc2 = nn.Linear(in_features=250, out_features=num_classes)
        # Nota: nn.CrossEntropyLoss applica Softmax internamente, quindi non è necessario qui.

    def forward(self, x):
        # Blocco convoluzionale 1
        x = self.conv1(x)
        x = self.relu1(x)

        # Blocco convoluzionale 2
        x = self.conv2(x)
        x = self.relu2(x)

        # Blocco convoluzionale 3
        x = self.conv3(x)
        x = self.relu3(x)

        # Flatten
        x = self.flatten(x) # Ora x ha shape (bs, 2048)

        # MLP
        x = self.fc1(x)
        x = self.relu4(x)
        x = self.fc2(x) # Output sono i logits (raw scores)

        return x

# # --- 2. Preparazione Dati (MNIST) ---

# # Trasformazioni per il dataset MNIST
# transform = transforms.Compose([
#     transforms.ToTensor(), # Converte l'immagine in Tensor PyTorch e scala a [0, 1]
#     transforms.Normalize((0.1307,), (0.3081,)) # Normalizzazione standard per MNIST
# ])

# # Scarica e carica il dataset di training
# train_dataset = datasets.MNIST(root='./data', train=True, download=True, transform=transform)
# # Scarica e carica il dataset di test (o validazione)
# test_dataset = datasets.MNIST(root='./data', train=False, download=True, transform=transform)

# # Crea i DataLoader per gestire i batch
# batch_size = 64 # Puoi aggiustare il batch size
# train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
# test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# # --- 3. Setup Training ---

# # Seleziona il device (GPU se disponibile, altrimenti CPU)
# device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# print(f"Utilizzo del device: {device}")

# # Istanzia il modello e spostalo sul device
# model = SimpleCNN(num_classes=10).to(device)
# print(model) # Stampa la struttura del modello

# # Definisci la funzione di loss
# criterion = nn.CrossEntropyLoss() # Combina LogSoftmax e NLLLoss

# # Definisci l'ottimizzatore
# learning_rate = 0.001
# optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# # Numero di epoche
# num_epochs = 5 # Puoi aumentarlo per un training più lungo

# # --- 4. Ciclo di Training ---

# print("\nInizio Training...")
# for epoch in range(num_epochs):
#     model.train() # Imposta il modello in modalità training
#     running_loss = 0.0
#     start_time = time.time()

#     # Usa tqdm per la barra di progresso
#     progress_bar = tqdm(train_loader, desc=f"Epoch {epoch+1}/{num_epochs}", leave=False)

#     for inputs, labels in progress_bar:
#         # Sposta i dati sul device
#         inputs, labels = inputs.to(device), labels.to(device)

#         # Azzera i gradienti dell'ottimizzatore
#         optimizer.zero_grad()

#         # Forward pass
#         outputs = model(inputs) # Ottieni i logits

#         # Calcola la loss
#         loss = criterion(outputs, labels)

#         # Backward pass (calcolo gradienti)
#         loss.backward()

#         # Aggiorna i pesi
#         optimizer.step()

#         # Aggiorna la loss corrente per il logging
#         running_loss += loss.item() * inputs.size(0) # Moltiplica per il batch size per la media corretta

#         # Aggiorna la descrizione della barra di progresso
#         progress_bar.set_postfix(loss=f"{loss.item():.4f}")

#     # Calcola la loss media dell'epoca
#     epoch_loss = running_loss / len(train_loader.dataset)
#     epoch_time = time.time() - start_time

#     print(f"Epoch {epoch+1}/{num_epochs} - Tempo: {epoch_time:.2f}s - Training Loss: {epoch_loss:.4f}")

#     # --- Valutazione sul Test Set (dopo ogni epoca) ---
#     model.eval() # Imposta il modello in modalità valutazione
#     test_loss = 0.0
#     correct = 0
#     total = 0

#     with torch.no_grad(): # Disabilita il calcolo dei gradienti per la valutazione
#         for inputs, labels in test_loader:
#             inputs, labels = inputs.to(device), labels.to(device)
#             outputs = model(inputs) # Logits
#             loss = criterion(outputs, labels)
#             test_loss += loss.item() * inputs.size(0)

#             _, predicted = torch.max(outputs.data, 1) # Ottieni l'indice della classe con probabilità massima
#             total += labels.size(0)
#             correct += (predicted == labels).sum().item()

#     avg_test_loss = test_loss / len(test_loader.dataset)
#     accuracy = 100 * correct / total
#     print(f"Epoch {epoch+1}/{num_epochs} - Test Loss: {avg_test_loss:.4f} - Test Accuracy: {accuracy:.2f}%")


# print("\nTraining Completato.")

# # (Opzionale) Salvare il modello addestrato
# torch.save(model.state_dict(), 'simple_cnn_mnist.pth')

In [None]:
# Assumi che 'SimpleCNN' sia la classe del modello definita precedentemente
# e 'model' sia un'istanza addestrata di SimpleCNN.

# Esempio: Carica un modello addestrato (se l'hai salvato)
model = SimpleCNN(num_classes=10)
model.load_state_dict(torch.load('simple_cnn_mnist.pth', map_location=torch.device('cpu'))) # Carica su CPU

# Assicurati che il modello sia in modalità valutazione (non strettamente necessario
# per estrarre i pesi, ma buona pratica)
model.eval()

# --- Estrazione Parametri e Conversione in NumPy ---

# Dizionario per contenere i pesi NumPy
numpy_weights = {}

# Sposta il modello su CPU se è su GPU, prima di chiamare .numpy()
model.to('cpu')

print("Estrazione Pesi e Bias...\n")

# Layer Conv1
# PyTorch weight shape: (out_channels, in_channels, kernel_height, kernel_width)
# NumPy atteso (basato sul codice precedente): (in_channels, out_channels, kernel_width, kernel_height) -> (1, 32, 3, 3)
pyt_k1_w = model.conv1.weight.data.detach().numpy()
# Trasponi: (out, in, kH, kW) -> (in, out, kW, kH)
numpy_weights['k1'] = pyt_k1_w.transpose(1, 0, 3, 2)
# PyTorch bias shape: (out_channels,)
numpy_weights['b_conv1'] = model.conv1.bias.data.detach().numpy() # Shape (32,)
print(f"k1: PyTorch Shape={pyt_k1_w.shape}, NumPy Shape={numpy_weights['k1'].shape}")
print(f"b_conv1: NumPy Shape={numpy_weights['b_conv1'].shape}")

# Layer Conv2
# PyTorch weight shape: (64, 32, 3, 3)
# NumPy atteso: (32, 64, 3, 3)
pyt_k2_w = model.conv2.weight.data.detach().numpy()
numpy_weights['k2'] = pyt_k2_w.transpose(1, 0, 3, 2)
numpy_weights['b_conv2'] = model.conv2.bias.data.detach().numpy() # Shape (64,)
print(f"k2: PyTorch Shape={pyt_k2_w.shape}, NumPy Shape={numpy_weights['k2'].shape}")
print(f"b_conv2: NumPy Shape={numpy_weights['b_conv2'].shape}")

# Layer Conv3
# PyTorch weight shape: (128, 64, 3, 3)
# NumPy atteso: (64, 128, 3, 3)
pyt_k3_w = model.conv3.weight.data.detach().numpy()
numpy_weights['k3'] = pyt_k3_w.transpose(1, 0, 3, 2)
numpy_weights['b_conv3'] = model.conv3.bias.data.detach().numpy() # Shape (128,)
print(f"k3: PyTorch Shape={pyt_k3_w.shape}, NumPy Shape={numpy_weights['k3'].shape}")
print(f"b_conv3: NumPy Shape={numpy_weights['b_conv3'].shape}")

# Layer FC1
# PyTorch weight shape: (out_features, in_features) -> (250, 2048)
# NumPy atteso (per input @ W): (in_features, out_features) -> (2048, 250)
pyt_w1 = model.fc1.weight.data.detach().numpy()
numpy_weights['w1'] = pyt_w1.T # Trasponi
# PyTorch bias shape: (out_features,) -> (250,)
# NumPy atteso (per aggiunta diretta): (1, out_features) -> (1, 250)
pyt_b1 = model.fc1.bias.data.detach().numpy()
numpy_weights['b1'] = pyt_b1.reshape(1, -1) # Rendi (1, 250)
print(f"w1: PyTorch Shape={pyt_w1.shape}, NumPy Shape={numpy_weights['w1'].shape}")
print(f"b1: PyTorch Shape={pyt_b1.shape}, NumPy Shape={numpy_weights['b1'].shape}")

# Layer FC2
# PyTorch weight shape: (num_classes, 250) -> (10, 250)
# NumPy atteso: (250, num_classes) -> (250, 10)
pyt_w2 = model.fc2.weight.data.detach().numpy()
numpy_weights['w2'] = pyt_w2.T # Trasponi
# PyTorch bias shape: (num_classes,) -> (10,)
# NumPy atteso: (1, num_classes) -> (1, 10)
pyt_b2 = model.fc2.bias.data.detach().numpy()
numpy_weights['b2'] = pyt_b2.reshape(1, -1) # Rendi (1, 10)
print(f"w2: PyTorch Shape={pyt_w2.shape}, NumPy Shape={numpy_weights['w2'].shape}")
print(f"b2: PyTorch Shape={pyt_b2.shape}, NumPy Shape={numpy_weights['b2'].shape}")

print("\nEstrazione completata. I pesi NumPy sono nel dizionario 'numpy_weights'.")

# Ora puoi usare numpy_weights['k1'], numpy_weights['k2'], ecc. nella tua
# implementazione NumPy per l'inferenza.

# Esempio di accesso:
np_k1 = numpy_weights['k1']
np_k2 = numpy_weights['k2']
np_k3 = numpy_weights['k3']
np_w1 = numpy_weights['w1']
np_b1 = numpy_weights['b1']
np_w2 = numpy_weights['w2']
np_b2 = numpy_weights['b2']


# !!! ATTENZIONE AI BIAS CONVOLUZIONALI !!!
# Ho estratto anche i bias dei layer convoluzionali (b_conv1, b_conv2, b_conv3).
# La tua implementazione NumPy originale di ReLU_ConvolutionS NON includeva
# l'aggiunta del bias dopo la somma pesata.
# Se vuoi usare questi bias, dovrai modificare la tua funzione NumPy
# ReLU_ConvolutionS per aggiungere il bias corrispondente a ogni canale di output
# PRIMA di applicare la ReLU. Esempio (pseudo-codice all'interno di ReLU_ConvolutionS):
# ... dopo aver calcolato current_sum per la posizione (i_bs, i_kc, i_nih, i_niw) ...
# current_sum_with_bias = current_sum + bias_conv[i_kc] # bias_conv sarebbero b_conv1/2/3
# ni[i_bs, i_kc, i_nih, i_niw] = np.maximum(0, current_sum_with_bias)

Estrazione Pesi e Bias...

k1: PyTorch Shape=(32, 1, 3, 3), NumPy Shape=(1, 32, 3, 3)
b_conv1: NumPy Shape=(32,)
k2: PyTorch Shape=(64, 32, 3, 3), NumPy Shape=(32, 64, 3, 3)
b_conv2: NumPy Shape=(64,)
k3: PyTorch Shape=(128, 64, 3, 3), NumPy Shape=(64, 128, 3, 3)
b_conv3: NumPy Shape=(128,)
w1: PyTorch Shape=(250, 2048), NumPy Shape=(2048, 250)
b1: PyTorch Shape=(250,), NumPy Shape=(1, 250)
w2: PyTorch Shape=(10, 250), NumPy Shape=(250, 10)
b2: PyTorch Shape=(10,), NumPy Shape=(1, 10)

Estrazione completata. I pesi NumPy sono nel dizionario 'numpy_weights'.


In [67]:
numpy_weights["b1"].shape

(1, 250)

## CNN - Slow Implementation

This implementation utilizes four nested loops to compute the convolutions. Each one represents one dimension: batch, channels, width and height

### Convolution Layer

#### With Bias

In [None]:
import numpy as np

def ReLU_ConvolutionS(boi, k, b_conv, p=0, s=1):
    """
    Calcola la convoluzione 2D + Bias + ReLU con cicli for.

    Args:
        boi (np.ndarray): Batch di immagini input (bs, nc, ih, iw).
        k (np.ndarray):   Kernel (ac, kc, kw, kh). Assumendo ac=nc.
        b_conv (np.ndarray): Bias per i canali di output. Shape (kc,).
        p (int):           Padding.
        s (int):           Stride.

    Returns:
        np.ndarray: Output attivato (bs, kc, nih, niw).
    """
    ac, kc, kw, kh = k.shape
    bs, nc, ih, iw = boi.shape

    if b_conv.shape[0] != kc:
        raise ValueError(f"La dimensione del bias ({b_conv.shape[0]}) non corrisponde al numero di canali di output ({kc})")
    if ac != nc:
         raise ValueError(f"Il numero di canali del kernel ({ac}) deve corrispondere al numero di canali dell'immagine ({nc})")

    # Usa int() per le dimensioni output
    nih = int(((ih - kh + (2 * p)) / s) + 1) # new image height
    niw = int(((iw - kw + (2 * p)) / s) + 1) # new image width
    ni = np.zeros((bs, kc, nih, niw)) # new image

    for i_bs in range(bs):
        for i_kc in range(kc):
            # Ottieni il bias per questo canale di output
            current_bias = b_conv[i_kc]
            for i_nih in range(nih):
                for i_niw in range(niw):
                    current_sum = 0.0 # Somma solo convoluzione
                    # Cicli per la convoluzione
                    for i_nc in range(nc):
                        for i_kh in range(kh):
                            input_y = (i_nih * s) - p + i_kh
                            for i_kw in range(kw):
                                input_x = (i_niw * s) - p + i_kw
                                # Controlla limiti per padding implicito
                                if 0 <= input_y < ih and 0 <= input_x < iw:
                                    input_val = boi[i_bs, i_nc, input_y, input_x]
                                    kernel_val = k[i_nc, i_kc, i_kw, i_kh]
                                    current_sum += input_val * kernel_val

                    # --- Aggiungi Bias e Applica ReLU ---
                    activation_input = current_sum + current_bias
                    ni[i_bs, i_kc, i_nih, i_niw] = np.maximum(0, activation_input)

    return ni

# ----------------------------------------------------------

def ReLU_ConvolutionS_backward(d_ni, boi, k, b_conv, ni_forward, p=0, s=1):
    """
    Calcola la backpropagation per ReLU_ConvolutionS (con Bias).

    Args:
        d_ni (np.ndarray): Gradiente Loss rispetto output (bs, kc, nih, niw).
        boi (np.ndarray):  Input batch originale (bs, nc, ih, iw).
        k (np.ndarray):    Kernel originali (ac, kc, kw, kh).
        b_conv (np.ndarray): Bias originali. Shape (kc,).
        ni_forward (np.ndarray): Output originale forward pass (bs, kc, nih, niw).
        p (int):           Padding.
        s (int):           Stride.

    Returns:
        tuple[np.ndarray, np.ndarray, np.ndarray]: Una tupla contenente:
            - d_boi (np.ndarray): Gradiente Loss rispetto input 'boi'.
            - d_k (np.ndarray):   Gradiente Loss rispetto kernel 'k'.
            - d_b_conv (np.ndarray): Gradiente Loss rispetto bias 'b_conv'.
    """
    ac, kc, kw, kh = k.shape
    bs, nc, ih, iw = boi.shape
    bs_out, kc_out, nih, niw = d_ni.shape

    if b_conv.shape[0] != kc:
        raise ValueError(f"La dimensione del bias ({b_conv.shape[0]}) non corrisponde al numero di canali di output ({kc})")
    if (bs, kc, nih, niw) != ni_forward.shape:
        raise ValueError("Le dimensioni di d_ni non corrispondono a quelle di ni_forward.")
    if ac != nc:
         raise ValueError(f"Il numero di canali del kernel ({ac}) deve corrispondere al numero di canali dell'immagine ({nc})")

    # Inizializza i gradienti
    d_boi = np.zeros_like(boi)
    d_k = np.zeros_like(k)
    d_b_conv = np.zeros_like(b_conv) # Gradiente per il bias, shape (kc,)

    # Backprop attraverso ReLU: dL/d(somma+bias)
    d_activation_input = d_ni * (ni_forward > 0) # Shape (bs, kc, nih, niw)

    # Calcolo gradienti
    for i_bs in range(bs):
        for i_kc in range(kc):
            for i_nih in range(nih):
                for i_niw in range(niw):
                    # Gradiente che arriva a questo punto (prima della somma e bias)
                    grad_curr = d_activation_input[i_bs, i_kc, i_nih, i_niw]

                    # Se il gradiente è zero, non c'è contributo
                    if grad_curr == 0:
                        continue

                    # --- Accumula Gradiente per il Bias ---
                    # dL/db = dL/d(somma+bias) * d(somma+bias)/db
                    # d(somma+bias)/db = 1
                    # Quindi dL/db = dL/d(somma+bias) = grad_curr
                    # Sommiamo su bs, nih, niw per ogni kc
                    d_b_conv[i_kc] += grad_curr

                    # --- Accumula Gradienti per Kernel (d_k) e Input (d_boi) ---
                    # Questi calcoli rimangono invariati perché il bias
                    # viene aggiunto *dopo* la moltiplicazione input*kernel.
                    # Usiamo lo stesso grad_curr (dL/d(somma+bias)).
                    for i_nc in range(nc): # nc == ac
                        for i_kh in range(kh):
                            input_y = (i_nih * s) - p + i_kh
                            for i_kw in range(kw):
                                input_x = (i_niw * s) - p + i_kw
                                # Controlla limiti
                                if 0 <= input_y < ih and 0 <= input_x < iw:
                                    # dL/dk += dL/d(somma+bias) * d(somma)/dk
                                    d_k[i_nc, i_kc, i_kw, i_kh] += grad_curr * boi[i_bs, i_nc, input_y, input_x]
                                    # dL/dboi += dL/d(somma+bias) * d(somma)/dboi
                                    d_boi[i_bs, i_nc, input_y, input_x] += grad_curr * k[i_nc, i_kc, i_kw, i_kh]

    # Restituisci tutti i gradienti calcolati
    return d_boi, d_k, d_b_conv

#### With Comments

In [None]:
def ReLU_ConvolutionS(boi,k,p=0,s=1):
    #boi stands for batch of images and has these dimensions: Batch_size, Number of Channels, Width, Height.
    #k stands for kernel and has these dimensions: Input images' number of channels, number of kernels, Width, Height
    ac, kc, kw, kh = k.shape
    bs, nc, iw, ih = boi.shape
    niw = round(((iw-kw+(2*p))/s)+1) # new image width
    nih = round(((ih-kh+(2*p))/s)+1) # new image height
    ni= np.zeros(bs*kc*niw*nih).reshape(bs,kc,niw,nih) # new image

    for i_bs in range(bs):
        for i_kc in range(kc):
            for i_nih in range(nih):
                for i_niw in range(niw):
                    current_sum = 0.0
                    for i_nc in range(nc):
                        for i_kh in range(kh):
                            input_y = (i_nih * s) - p + i_kh
                            for i_kw in range(kw):
                                input_x = (i_niw * s) - p + i_kw
                                if 0 <= input_y < ih and 0 <= input_x < iw:
                                    input_val = boi[i_bs, i_nc, input_y, input_x]
                                    # Ottieni il peso corrispondente dal kernel
                                    # Indici kernel: [canale_input, indice_kernel_output, indice_larghezza, indice_altezza]
                                    # secondo l'unpack kw, kh = k.shape[2], k.shape[3]
                                    kernel_val = k[i_nc, i_kc, i_kw, i_kh]
                                    # Moltiplica e accumula
                                    current_sum += input_val * kernel_val
                    ni[i_bs, i_kc, i_nih, i_niw] = np.maximum(0, current_sum)
    return ni

def ReLU_ConvolutionS_backward(d_ni, boi, k, ni_forward, p=0, s=1):
    """
    Calcola la backpropagation per la funzione ReLU_ConvolutionS.

    Args:
        d_ni (np.ndarray): Gradiente della Loss rispetto all'output della convoluzione (ni).
                           Dimensioni: (bs, kc, nih, niw).
        boi (np.ndarray):  Input batch originale della forward pass.
                           Dimensioni: (bs, nc, ih, iw).
        k (np.ndarray):    Kernel originali della forward pass.
                           Dimensioni: (ac, kc, kw, kh) dove ac=nc.
        ni_forward (np.ndarray): Output originale della forward pass (prima del gradiente).
                                 Necessario per il gradiente della ReLU.
                                 Dimensioni: (bs, kc, nih, niw).
        p (int):           Padding usato nella forward pass.
        s (int):           Stride usato nella forward pass.

    Returns:
        tuple[np.ndarray, np.ndarray]: Una tupla contenente:
            - d_boi (np.ndarray): Gradiente della Loss rispetto all'input batch 'boi'.
                                  Dimensioni: (bs, nc, ih, iw).
            - d_k (np.ndarray):   Gradiente della Loss rispetto ai kernel 'k'.
                                  Dimensioni: (ac, kc, kw, kh).
    """
    ac, kc, kw, kh = k.shape
    bs, nc, ih, iw = boi.shape
    bs_out, kc_out, nih, niw = d_ni.shape # Le dimensioni di d_ni devono corrispondere a ni_forward

    if (bs, kc, nih, niw) != ni_forward.shape:
        raise ValueError("Le dimensioni di d_ni non corrispondono a quelle di ni_forward.")
    if ac != nc:
         raise ValueError(f"Il numero di canali del kernel ({ac}) deve corrispondere al numero di canali dell'immagine ({nc})")

    # Inizializza i gradienti a zero
    d_boi = np.zeros_like(boi)
    d_k = np.zeros_like(k)

    # --- Backpropagation attraverso ReLU ---
    # Il gradiente passa solo dove l'output della ReLU (ni_forward) era > 0.
    # dL/d(current_sum) = dL/dni * dni/d(current_sum)
    #                   = d_ni * (1 if current_sum > 0 else 0)
    #                   = d_ni * (1 if ni_forward > 0 else 0)
    d_current_sum = d_ni * (ni_forward > 0) # Element-wise multiplication

    # --- Calcolo dei Gradienti d_boi e d_k ---
    # Iteriamo attraverso gli elementi del gradiente d_current_sum (o d_ni dopo ReLU)
    # e propaghiamo il gradiente indietro a d_boi e d_k.

    for i_bs in range(bs):
        for i_kc in range(kc):
            for i_nih in range(nih):
                for i_niw in range(niw):
                    # Gradiente locale per questa posizione dell'output
                    grad_curr = d_current_sum[i_bs, i_kc, i_nih, i_niw]

                    # Se il gradiente è zero, non contribuisce ai gradienti precedenti
                    if grad_curr == 0:
                        continue

                    # Propagazione indietro: iteriamo sulla finestra di input/kernel
                    # che ha contribuito a questo output
                    for i_nc in range(nc): # nc == ac
                        for i_kh in range(kh):
                            input_y = (i_nih * s) - p + i_kh
                            for i_kw in range(kw):
                                input_x = (i_niw * s) - p + i_kw

                                # Verifica se l'input corrispondente era valido (dentro i limiti)
                                if 0 <= input_y < ih and 0 <= input_x < iw:
                                    # --- Calcolo d_k ---
                                    # dL/dk = dL/d(current_sum) * d(current_sum)/dk
                                    # d(current_sum)/dk[c,kc,kw,kh] = boi[bs,c,y_in,x_in]
                                    # L'indice del kernel è (i_nc, i_kc, i_kw, i_kh)
                                    # L'indice dell'input è (i_bs, i_nc, input_y, input_x)
                                    d_k[i_nc, i_kc, i_kw, i_kh] += grad_curr * boi[i_bs, i_nc, input_y, input_x]

                                    # --- Calcolo d_boi ---
                                    # dL/dboi = dL/d(current_sum) * d(current_sum)/dboi
                                    # d(current_sum)/dboi[bs,c,y_in,x_in] = k[c,kc,kw,kh]
                                    # L'indice dell'input è (i_bs, i_nc, input_y, input_x)
                                    # L'indice del kernel è (i_nc, i_kc, i_kw, i_kh)
                                    d_boi[i_bs, i_nc, input_y, input_x] += grad_curr * k[i_nc, i_kc, i_kw, i_kh]

    return d_boi, d_k



# X=np.arange(1,2*4*4+1).reshape(1,2,4,4)
# k = np.ones(1*2*2*2).reshape(2,1,2,2)
# print("X")
# print(X)
# print("k")
# print(k)
# X_c = ReLU_ConvolutionS(X,k,s=2)
# print("X_c")
# print(X_c)

#### No Comments

In [None]:
def ReLU_ConvolutionS(boi,k,p=0,s=1):
    ac, kc, kw, kh = k.shape
    bs, nc, iw, ih = boi.shape
    niw = round(((iw-kw+(2*p))/s)+1) # new image width
    nih = round(((ih-kh+(2*p))/s)+1) # new image height
    ni= np.zeros(bs*kc*niw*nih).reshape(bs,kc,niw,nih) # new image

    for i_bs in range(bs):
        for i_kc in range(kc):
            for i_nih in range(nih):
                for i_niw in range(niw):
                    current_sum = 0.0
                    for i_nc in range(nc):
                        for i_kh in range(kh):
                            input_y = (i_nih * s) - p + i_kh
                            for i_kw in range(kw):
                                input_x = (i_niw * s) - p + i_kw
                                if 0 <= input_y < ih and 0 <= input_x < iw:
                                    input_val = boi[i_bs, i_nc, input_y, input_x]
                                    kernel_val = k[i_nc, i_kc, i_kw, i_kh]
                                    current_sum += input_val * kernel_val
                    ni[i_bs, i_kc, i_nih, i_niw] = np.maximum(0, current_sum)
    return ni

def ReLU_ConvolutionS_backward(d_ni, boi, k, ni_forward, p=0, s=1):
    ac, kc, kw, kh = k.shape
    bs, nc, ih, iw = boi.shape
    bs_out, kc_out, nih, niw = d_ni.shape 

    if (bs, kc, nih, niw) != ni_forward.shape:
        raise ValueError("Le dimensioni di d_ni non corrispondono a quelle di ni_forward.")
    if ac != nc:
         raise ValueError(f"Il numero di canali del kernel ({ac}) deve corrispondere al numero di canali dell'immagine ({nc})")

    d_boi = np.zeros_like(boi)
    d_k = np.zeros_like(k)

    d_current_sum = d_ni * (ni_forward > 0)

    for i_bs in range(bs):
        for i_kc in range(kc):
            for i_nih in range(nih):
                for i_niw in range(niw):
                    grad_curr = d_current_sum[i_bs, i_kc, i_nih, i_niw]
                    if grad_curr == 0:
                        continue
                    for i_nc in range(nc): # nc == ac
                        for i_kh in range(kh):
                            input_y = (i_nih * s) - p + i_kh
                            for i_kw in range(kw):
                                input_x = (i_niw * s) - p + i_kw
                                if 0 <= input_y < ih and 0 <= input_x < iw:
                                    d_k[i_nc, i_kc, i_kw, i_kh] += grad_curr * boi[i_bs, i_nc, input_y, input_x]
                                    d_boi[i_bs, i_nc, input_y, input_x] += grad_curr * k[i_nc, i_kc, i_kw, i_kh]

    return d_boi, d_k

### MLP Layer

In [10]:
def softmax(x):
    e_x = np.exp(x - np.max(x,axis=-1,keepdims=True))  # for numerical stability
    return e_x / np.sum(e_x,axis=-1,keepdims=True)

def ReLU_SoftMax_FullyConnected(input_array,w1,b1,w2,b2):
    fl = (input_array @ w1)+b1 # first layer
    fa = np.maximum(0,fl) # first activation: ReLU
    sl = (fa @ w2)+b2 # second layer
    sa = softmax(sl) # second activation: SoftMax
    return fl,fa,sl,sa

#print(softmax([1,2,3,100000]))
#print(softmax_no_NS([1,2,3,1000]))
#r = np.array(np.array([1,2,777,2]))
#print(softmax(r))
#r = np.array((np.array([1,2,777,2]),np.array([1,2,777,2]),np.array([1,2,777,2])))
#print(softmax(r))

### Loss Function: Categorical Cross-Entropy

In [11]:
def crossEntropy(p,t):
    # p stands for prediction and t stands for true label
    # p = [0,0,1] and t = [1,0,0]
    p = p+(1/100000) # for numerical stability
    return -np.dot(t,np.log(p).T)

#c = [1,1000000000000000,1,1]
#c = softmax(c)
#print(c)
#c = crossEntropy(c,[0,1,0,0])
#print(c)

### Inference

### Training

#### Parameters

In [45]:
import numpy as np
import time
from tqdm import tqdm


# --- Parametri Architettura ---
num_classes = 10
bs = 1 # Batch size (mantenuto a 1 come nell'esempio)
input_channels = 1 # Scala di grigi
kh, kw = 3, 3      # Dimensione Kernel (Height, Width)
s = 2              # Stride per tutte le convoluzioni
p = 1              # Padding per tutte le convoluzioni (per cercare di mantenere le dimensioni dimezzate)

# Layer 1
kc1 = 32           # Numero kernel/canali output Layer 1
# Layer 2
kc2 = 64           # Numero kernel/canali output Layer 2
# Layer 3
kc3 = 128          # Numero kernel/canali output Layer 3

# Calcolo dimensioni output convoluzioni (per FC layer)
# Input: 28x28
h_in, w_in = 28, 28
h_out1 = int(((h_in - kh + 2 * p) / s) + 1) # (28-3+2)/2 + 1 = 14
w_out1 = int(((w_in - kw + 2 * p) / s) + 1) # (28-3+2)/2 + 1 = 14
h_out2 = int(((h_out1 - kh + 2 * p) / s) + 1) # (14-3+2)/2 + 1 = 7
w_out2 = int(((w_out1 - kw + 2 * p) / s) + 1) # (14-3+2)/2 + 1 = 7
h_out3 = int(((h_out2 - kh + 2 * p) / s) + 1) # (7-3+2)/2 + 1 = 4
w_out3 = int(((w_out2 - kw + 2 * p) / s) + 1) # (7-3+2)/2 + 1 = 4

# Dimensione input per FC layer
fc_input_size = kc3 * h_out3 * w_out3 # 128 * 4 * 4 = 2048
fc_hidden_size = 250 # Dimensione layer nascosto FC

# --- Inizializzazione Pesi ---
# Kernel Convoluzionali (Formato: InputChannels, OutputChannels, KernelHeight, KernelWidth)
# -> NOTA: Adatto il formato a (in_channels, out_channels, kh, kw) che è più standard
#         Se le tue funzioni USANO (in_channels, kw, kh, out_channels) come nel codice
#         originale, DEVI aggiustare l'ordine qui sotto E nelle funzioni!
#         Assumiamo ora (in_channels, out_channels, kh, kw)

# k1: 1 -> 32 channels, kernel 3x3
k1 = np.random.randn(input_channels, kc1, kh, kw) * 0.01
# k2: 32 -> 64 channels, kernel 3x3
k2 = np.random.randn(kc1, kc2, kh, kw) * 0.01
# k3: 64 -> 128 channels, kernel 3x3
k3 = np.random.randn(kc2, kc3, kh, kw) * 0.01

# Pesi Fully Connected
# w1: Da output conv flattenato (2048) a hidden layer (250)
w1 = np.random.randn(fc_input_size, fc_hidden_size) * 0.01
b1 = np.zeros((1, fc_hidden_size))
# w2: Da hidden layer (250) a output classes (10)
w2 = np.random.randn(fc_hidden_size, num_classes) * 0.01
b2 = np.zeros((1, num_classes))

# --- Parametri Training ---
length = images.shape[0] # Numero totale di immagini
lr = 0.001               # Learning rate (ridotto rispetto all'originale)
num_epochs = 3           # Aumentato per vedere qualche cambiamento


#### Cicle

In [None]:
# --- Ciclo di Training ---
print("Inizio Training...")
for epoch in range(num_epochs):
    epoch_loss = []
    start_time = time.time()
    # Usiamo tqdm per la barra di progresso sull'intero dataset per epoca
    loop = tqdm(range(0, length, bs), desc=f"Epoch {epoch+1}/{num_epochs}")

    for i in loop:
        # --- Mini-batch ---
        images_batch = images[i:(i+bs)] # Shape (bs, 1, 28, 28)
        labels_batch = labels[i:(i+bs)] # Shape (bs, 10) - One-hot
        images_batch = images_batch.reshape(1,1,28,28)
        # --- Forward Pass ---
        # Layer 1: Conv + ReLU
        # Input: (bs, 1, 28, 28), k1: (1, 32, 3, 3) -> Output: (bs, 32, 14, 14)
        rs1 = ReLU_ConvolutionS(images_batch, k1, p=p, s=s)

        # Layer 2: Conv + ReLU
        # Input: (bs, 32, 14, 14), k2: (32, 64, 3, 3) -> Output: (bs, 64, 7, 7)
        rs2 = ReLU_ConvolutionS(rs1, k2, p=p, s=s)

        # Layer 3: Conv + ReLU
        # Input: (bs, 64, 7, 7), k3: (64, 128, 3, 3) -> Output: (bs, 128, 4, 4)
        rs3 = ReLU_ConvolutionS(rs2, k3, p=p, s=s)

        # Flatten l'output dell'ultimo layer convoluzionale
        # Input: (bs, 128, 4, 4) -> Output: (bs, 2048)
        i_mlp = rs3.reshape(bs, -1)

        # Layer 4 & 5: Fully Connected + ReLU -> Fully Connected + Softmax
        fl, fa, sl, pred = ReLU_SoftMax_FullyConnected(i_mlp, w1, b1, w2, b2)

        # --- Loss ---
        loss = crossEntropy(pred, labels_batch)
        epoch_loss.append(loss)

        # --- Backward Pass ---
        # Backprop attraverso Softmax + CrossEntropy (semplificato)
        # dL/dsl = pred - labels_batch (gradiente dell'output prima di Softmax)
        dL_dsl = pred - labels_batch # Shape (bs, 10)

        # Backprop attraverso FC2 (Layer 5)
        # dL/dw2 = dL/dsl * dsl/dw2 = fa.T @ dL_dsl
        dL_dw2 = fa.T @ dL_dsl # Shape (250, 10)
        # dL/db2 = sum(dL/dsl)
        dL_db2 = np.sum(dL_dsl, axis=0, keepdims=True) # Shape (1, 10)
        # dL/dfa = dL/dsl * dsl/dfa = dL_dsl @ w2.T
        dL_dfa = dL_dsl @ w2.T # Shape (bs, 250)

        # Backprop attraverso ReLU (Layer 4)
        # dL/dfl = dL/dfa * dfa/dfl
        dReLU = (fl > 0).astype(float) # Gradiente ReLU
        dL_dfl = dL_dfa * dReLU # Shape (bs, 250)

        # Backprop attraverso FC1 (Layer 4)
        # dL/dw1 = dL/dfl * dfl/dw1 = i_mlp.T @ dL_dfl
        dL_dw1 = i_mlp.T @ dL_dfl # Shape (2048, 250)
        # dL/db1 = sum(dL/dfl)
        dL_db1 = np.sum(dL_dfl, axis=0, keepdims=True) # Shape (1, 250)
        # dL/di_mlp = dL/dfl * dfl/di_mlp = dL_dfl @ w1.T
        dL_di_mlp = dL_dfl @ w1.T # Shape (bs, 2048)

        # Backprop attraverso Flatten (Layer 3 output)
        # Reshape del gradiente per matchare l'output del conv layer 3
        # Input grad: (bs, 2048) -> Output grad: (bs, 128, 4, 4)
        d_rs3 = dL_di_mlp.reshape(rs3.shape)

        # Backprop attraverso Conv3 (Layer 3)
        # Input grad: d_rs3 (bs, 128, 4, 4)
        # Restituisce d_rs2 (grad per l'input rs2) e d_k3 (grad per kernel k3)
        d_rs2, d_k3 = ReLU_ConvolutionS_backward(d_rs3, rs2, k3, rs3, p=p, s=s)
        # d_rs2 shape: (bs, 64, 7, 7), d_k3 shape: (64, 128, 3, 3)

        # Backprop attraverso Conv2 (Layer 2)
        # Input grad: d_rs2 (bs, 64, 7, 7)
        # Restituisce d_rs1 (grad per l'input rs1) e d_k2 (grad per kernel k2)
        d_rs1, d_k2 = ReLU_ConvolutionS_backward(d_rs2, rs1, k2, rs2, p=p, s=s)
        # d_rs1 shape: (bs, 32, 14, 14), d_k2 shape: (32, 64, 3, 3)

        # Backprop attraverso Conv1 (Layer 1)
        # Input grad: d_rs1 (bs, 32, 14, 14)
        # Restituisce d_boi (non usato qui) e d_k1 (grad per kernel k1)
        _, d_k1 = ReLU_ConvolutionS_backward(d_rs1, images_batch, k1, rs1, p=p, s=s)
        # d_k1 shape: (1, 32, 3, 3)

        # --- Aggiornamento Pesi ---
        w1 -= lr * dL_dw1
        b1 -= lr * dL_db1
        w2 -= lr * dL_dw2
        b2 -= lr * dL_db2
        k1 -= lr * d_k1
        k2 -= lr * d_k2
        k3 -= lr * d_k3

        # Aggiorna la barra di progresso con la loss media finora
        loop.set_postfix(loss=f"{np.mean(epoch_loss):.4f}")

    # Fine Epoch
    end_time = time.time()
    avg_epoch_loss = np.mean(epoch_loss)
    print(f"Epoch {epoch+1} completata in {end_time - start_time:.2f}s - Loss Media: {avg_epoch_loss:.4f}")

print("Training Completato.")
np.savez("CNNSlow_w1b1w2b2k1k2k3.npz",w1,b1,w2,b2,k1,k2,k3)

Inizio Training...


Epoch 1/3:   1%|          | 682/60000 [35:44<51:48:39,  3.14s/it, loss=2.3018]


KeyboardInterrupt: 

In [None]:
np.savez("CNNSlow_w1b1w2b2k1k2k3.npz",w1,b1,w2,b2,k1,k2,k3)


('arr_0', 'arr_1', 'arr_2', 'arr_3', 'arr_4', 'arr_5', 'arr_6')


## CNN - Fast Implementation

### Layers description

#### Convolution Layer

In [6]:
def ReLU_Convolution(batch_of_images,kernel,p=0,s=1):
    if len(kernel.shape)==2:
        kw, kh = kernel.shape # kernel width, height and number of channels
        iw, ih = batch_of_images.shape # batch of images' number of images, number of channels, single image's width, single images's height
        nc = 1
        window_m = np.lib.stride_tricks.sliding_window_view(batch_of_images,(kw,kh))[::s,::s].reshape(-1,(kw*kh*nc)) # window matrix
    else:
        ac, kc, kw, kh = kernel.shape # kernel width, height and number of channels
        bs, nc, iw, ih = batch_of_images.shape # batch of images' number of images, number of channels, single image's width, single images's height
        #p = 0 # padding
        #s = 1 # stride
        # im2col: Window creation
        window_m = np.lib.stride_tricks.sliding_window_view(batch_of_images,(1,nc,kw,kh))[:,:,::s,::s].reshape(-1,(kw*kh*nc)) # window matrix
    # Convolution
    kernel = kernel.reshape((kw*kh*nc),-1)
    c_m = window_m @ kernel # convolved image matrix
    # ReLU activation
    r_c_m = np.maximum(0,c_m) # convolved image matrix after ReLU activation
    
    niw = round(((iw-kw+(2*p))/s)+1) # new image width
    nih = round(((ih-kh+(2*p))/s)+1) # new image height
    
    # First operate a reshape keeping spatial ordering, which has channels at the end
    if len(kernel.shape)==2:
        reshaped_correct_order = r_c_m.reshape(nih, niw)
    else:
        output_temp = r_c_m.reshape(bs, nih, niw, kc)
        # Transpose to have input in shapes (batch, canali_output, height, width)
        reshaped_correct_order = output_temp.transpose(0, 3, 1, 2)
    return reshaped_correct_order


def ReLU_Convolution_Backward(batch_of_images,kernel,dX,p=0,s=1):
    print("This is the backward of the ReLU Convolution")
    if len(kernel.shape)==2:
        kw, kh = kernel.shape # kernel width, height and number of channels
        iw, ih = batch_of_images.shape # batch of images' number of images, number of channels, single image's width, single images's height
        nc = 1
        window_m = np.lib.stride_tricks.sliding_window_view(batch_of_images,(kw,kh))[::s,::s].reshape(-1,(kw*kh*nc)) # window matrix
        window_m_dx = np.lib.stride_tricks.sliding_window_view(dX,(kw,kh))[::s,::s].reshape(-1,(kw*kh*nc)) # window matrix of dX
    else:
        ac, kc, kw, kh = kernel.shape # kernel width, height and number of channels
        bs, nc, iw, ih = batch_of_images.shape # batch of images' number of images, number of channels, single image's width, single images's height
        #p = 0 # padding
        #s = 1 # stride
        # im2col: Window creation
        window_m = np.lib.stride_tricks.sliding_window_view(batch_of_images,(1,nc,kw,kh))[:,:,::s,::s].reshape(-1,(kw*kh*nc)) # window matrix
        window_m_dx = np.lib.stride_tricks.sliding_window_view(dX,(1,nc,kw,kh))[:,:,::s,::s].reshape(-1,(kw*kh*nc)) # window matrix of dX
    # Convolution
    kernel = kernel.reshape((kw*kh*nc),-1)
    c_m = window_m @ kernel # convolved image matrix
    # ReLU activation
    r_c_m = np.maximum(0,c_m) # convolved image matrix after ReLU activation
    r_c_m[r_c_m>0]=1 # Backward ReLU
    print("dX,X,kernel shapes")
    print(window_m_dx.shape)
    print(window_m.shape)
    print(kernel.shape)


X=np.arange(1,4*4+1).reshape(4,4)
k = np.ones(1*2*2).reshape(2,2)
print("X")
print(X)
print("k")
print(k)
X_c = ReLU_Convolution(X,k,s=2)
print("X_c")
print(X_c)
ReLU_Convolution_Backward(X,k,X_c)


X
[[ 1  2  3  4]
 [ 5  6  7  8]
 [ 9 10 11 12]
 [13 14 15 16]]
k
[[1. 1.]
 [1. 1.]]
X_c
[[14. 22.]
 [46. 54.]]
This is the backward of the ReLU Convolution
dX,X,kernel shapes
(1, 4)
(9, 4)
(4, 1)


#### Max Pooling Layer

In [None]:
def MaxPooling(boi, winSize=2, stride=2):
    bs, nc, H_in, W_in = boi.shape

    H_out = math.floor((H_in - winSize) / stride) + 1
    W_out = math.floor((W_in - winSize) / stride) + 1

    output_window_shape = (bs, nc, H_out, W_out, winSize, winSize)
    
    y_windows = np.lib.stride_tricks.as_strided(boi, shape=output_window_shape)

    reshaped_y_for_max = y_windows.reshape(bs * nc * H_out * W_out, winSize * winSize)
    
    indices = np.argmax(reshaped_y_for_max, axis=1) # Indici piatti (0 a winSize*winSize-1)
    max_values = reshaped_y_for_max.max(axis=1)
    
    pooled_output = max_values.reshape(bs, nc, H_out, W_out)
    
    cache = (boi.shape, indices, winSize, stride) # indices è 1D
    return pooled_output, cache

def BackwardMaxPooling(d_out, cache):

    A_prev_shape, indices_flat, winSize, stride = cache
    bs, nc, H_prev, W_prev = A_prev_shape
    _, _, H_out, W_out = d_out.shape

    dA_prev = np.zeros(A_prev_shape)

    idx_row_in_window = indices_flat // winSize
    idx_col_in_window = indices_flat % winSize  


    b_idx, ch_idx, r_out_idx, c_out_idx = np.indices((bs, nc, H_out, W_out))

    vert_start = r_out_idx * stride 
    horiz_start = c_out_idx * stride 

    idx_row_in_window_reshaped = idx_row_in_window.reshape(bs, nc, H_out, W_out)
    idx_col_in_window_reshaped = idx_col_in_window.reshape(bs, nc, H_out, W_out)
    
    abs_row_coords = vert_start + idx_row_in_window_reshaped 
    abs_col_coords = horiz_start + idx_col_in_window_reshaped 
    
    indices_for_add_at = (
        b_idx,                  
        ch_idx,                 
        abs_row_coords,         
        abs_col_coords        
    )
    
    np.add.at(dA_prev, indices_for_add_at, d_out)
    
    return dA_prev

#X = np.arange(1,33).reshape(2,1,4,4)
#X[0,0,0,0]=100
#
#pooled,cache = MaxPooling2D(X)
#print(cache)
#dX = np.array([1,5,2,9,4,3,2,8]).reshape(2,1,2,2)
#new_X = backward_maxpool_vectorized(dX,cache)
#print(new_X)


#### MLP Layer

In [None]:
def softmax(x):
    e_x = np.exp(x - np.max(x,axis=-1,keepdims=True))  # for numerical stability
    return e_x / np.sum(e_x,axis=-1,keepdims=True)

def ReLU_SoftMax_FullyConnected(input_array,w1,b1,w2,b2):
    fl = (input_array @ w1)+b1 # first layer
    fa = np.maximum(0,fl) # first activation: ReLU
    sl = (fa @ w2)+b2 # second layer
    sa = softmax(sl) # second activation: SoftMax
    return fl,fa,sl,sa

#print(softmax([1,2,3,100000]))
#print(softmax_no_NS([1,2,3,1000]))
#r = np.array(np.array([1,2,777,2]))
#print(softmax(r))
#r = np.array((np.array([1,2,777,2]),np.array([1,2,777,2]),np.array([1,2,777,2])))
#print(softmax(r))

#### Loss function
For this classification problem, the best loss function is the cross-entropy

In [None]:
def crossEntropy(p,t):
    # p stands for prediction and t stands for true label
    # p = [0,0,1] and t = [1,0,0]
    p = p+(1/100000) # for numerical stability
    return -np.dot(t,np.log(p).T)

#c = [1,1000000000000000,1,1]
#c = softmax(c)
#print(c)
#c = crossEntropy(c,[0,1,0,0])
#print(c)

### Inference

In [71]:
import time
print(numpy_weights)
weights_available = True
if weights_available:
    k1 = numpy_weights['k1']
    k2 = numpy_weights['k2']
    k3 = numpy_weights['k3']
    w1 = numpy_weights['w1']
    w2 = numpy_weights['w2']
    b1 = numpy_weights['b1']
    b2 = numpy_weights['b2']
    for i in [k1,k2,k3,w1,w2,b1,b2]:
        print(i.shape)
else:
    #ac is the adaptive channel, the number that corresponds to the amount of channels that the image has
    ac, kw, kh, kc = [1,3,3,32]
    k1 = np.random.rand(ac*kw*kh*kc).reshape(ac,kw,kh,kc)
    kc2 = 64
    ac2 = 32
    k2 = np.random.rand(ac2*kw*kh*kc2).reshape(ac2,kw,kh,kc2)
    h1 = 250
    w1 = np.random.rand(1600*250).reshape(1600,250)
    b1 = np.random.rand(250).reshape(1,250)
    w2 = np.random.rand(250*10).reshape(250,10)
    b2 = np.random.rand(10).reshape(1,10)

bs=1 # batch size
length = 1 #labels.shape[0]
lr = 0.01
num_epochs = 1
correct = 0
total = 0
loop= tqdm(range(0,num_epochs,bs))
for epoch in loop:
    avg_loss = []
    start_time = time.time()
    for i in range(0,length,bs):
        boi = images[i:(i+bs)].reshape(bs,1,28,28)
        rs1 = ReLU_Convolution(boi,k1) 
        rs2 = ReLU_Convolution(rs1,k2)
        rs3 = ReLU_Convolution(rs2,k3)
        i_mlp = rs3.flatten()
        fl,fa,sl,pred = ReLU_SoftMax_FullyConnected(i_mlp,w1,b1,w2,b2)

        # Loss
        loss = crossEntropy(pred,labels[i:(i+bs)])
        avg_loss.append(loss)
        if pred == labels[i:(i+bs)]:
            correct+= 1
        total += 1
        # # Backward
        # dL_dz2 = pred-labels[i:(i+bs)]
        # dL_dw2 = fa.T @ dL_dz2
        # dL_db2 = np.sum(dL_dz2, axis=0)
        # dL_dfa = dL_dz2 @ w2.T
        # dReLU = (fl > 0).astype(float)
        # dL_dfl = dL_dfa * dReLU
        # print(i_mlp.shape)
        # dL_dw1 = i_mlp.reshape(bs, -1).T @ dL_dfl
        # dL_db1 = np.sum(dL_dfl, axis=0)
        # dL_i_mlp = dL_dfl @ w1.T

        


        # w1 -= lr*dL_dw1
        # b1 -= lr*dL_db1
        # w2 -= lr*dL_dw2
        # b2 -= lr*dL_db2
    loop.set_postfix(average_loss=sum(avg_loss)/len(avg_loss),state=f"{round(100*i/length,2)}%",correctness=100*correct/total)
    end_time = time.time()

{'k1': array([[[[ 0.25444922, -0.30893782, -0.15066256],
         [ 0.0795004 , -0.17761572, -0.22886144],
         [ 0.37606657, -0.15557797,  0.08715349]],

        [[-0.07796349, -0.1744645 ,  0.32142672],
         [ 0.06785692, -0.16283797, -0.14963722],
         [-0.22427133, -0.17791815,  0.39172918]],

        [[-0.37847936, -0.35440397,  0.19926855],
         [-0.46100956,  0.23623335,  0.07524733],
         [ 0.00875574,  0.44175863,  0.11696613]],

        [[ 0.12216508,  0.04435208,  0.0567803 ],
         [-0.0141854 ,  0.22433724,  0.21419233],
         [ 0.17017817,  0.02683515, -0.41551712]],

        [[-0.04691067, -0.3580202 ,  0.19769657],
         [ 0.21854089, -0.08835822,  0.14769632],
         [-0.19616129,  0.0176739 ,  0.13939613]],

        [[-0.2733364 , -0.35128504, -0.25045168],
         [ 0.2601626 ,  0.06937825, -0.41099164],
         [ 0.36583582,  0.35990372, -0.12154322]],

        [[ 0.16774106,  0.07883533,  0.06790711],
         [-0.05339774,  0.00086

  0%|          | 0/1 [00:00<?, ?it/s]

  0%|          | 0/1 [00:00<?, ?it/s]


ValueError: cannot reshape array of size 21632 into shape (26,26)

## Tests

### Inefficient Max Pooling Layer VS Efficient Max Pooling layer for Inference

In [None]:
#import time
################## PARAMETERS ###########################
#
#bs=1 # batch size
##ac is the adaptive channel, the number that corresponds to the amount of channels that the image has
#ac, kw, kh, kc = [1,3,3,32]
#k1 = np.random.rand(ac*kw*kh*kc).reshape(ac,kw,kh,kc)
#kc2 = 64
#ac2 = 32
#k2 = np.random.rand(ac2*kw*kh*kc2).reshape(ac2,kw,kh,kc2)
#h1 = 250
#w1 = np.random.rand(1600*250).reshape(1600,250)
#b1 = np.random.rand(250).reshape(1,250)
#w2 = np.random.rand(250*10).reshape(250,10)
#b2 = np.random.rand(10).reshape(1,10)
#
################## INFERENCE #############################
#length = 1000 # images.shape[0]
#start_time = time.time()
#for i in tqdm(range(0,length,bs)):
#    continue
#    rs1 = ReLU_Convolution(images[i:(i+bs)].reshape(bs,1,28,28),k1) 
#    # For convolution only, these are the times for processing all the images
#    # 11.0 seconds with bs = 10000
#    # 10.0 seconds with bs = 1000
#    # 09.9 seconds with bs = 100
#    # 08.0 seconds with bs = 10
#    # 06.0 seconds with bs = 1
#    # Why ? the window creation scales as O(N*W) where W is the window size and N is the dimensions of the image.
#    # Since images are stacked, they end up resulting as a single very big image which may cause problems.
#    mp1 = MaxPooling2D(rs1)
#    rs2 = ReLU_Convolution(mp1,k2)
#    mp2 = MaxPooling2D(rs2)
#    i_mlp = mp2.flatten()
#    fl,fa,sl,pred = ReLU_SoftMax_FullyConnected(i_mlp,w1,b1,w2,b2) #softmax doesn't work properly if batch_size > 1
#end_time = time.time()
#print(f"Loop with inefficient MaxPooling took: {round(end_time-start_time,2)}")
#start_time = time.time()
#for i in tqdm(range(0,length,bs)):
#    continue
#    rs1 = ReLU_Convolution(images[i:(i+bs)].reshape(bs,1,28,28),k1) 
#    # For convolution only, these are the times for processing all the images
#    # 11.0 seconds with bs = 10000
#    # 10.0 seconds with bs = 1000
#    # 09.9 seconds with bs = 100
#    # 08.0 seconds with bs = 10
#    # 06.0 seconds with bs = 1
#    # Why ? the window creation scales as O(N*W) where W is the window size and N is the dimensions of the image.
#    # Since images are stacked, they end up resulting as a single very big image which may cause problems.
#    mp1 = MaxPooling2D_Ef(rs1)
#    rs2 = ReLU_Convolution(mp1,k2)
#    mp2 = MaxPooling2D_Ef(rs2)
#    i_mlp = mp2.flatten()
#    fl,fa,sl,pred = ReLU_SoftMax_FullyConnected(i_mlp,w1,b1,w2,b2)
#    #loss = crossEntropy
#end_time = time.time()
#print(f"Loop with efficient MaxPooling took: {round(end_time-start_time,2)} seconds")

100%|██████████| 1000/1000 [00:00<?, ?it/s]


Loop with inefficient MaxPooling took: 0.05


100%|██████████| 1000/1000 [00:00<?, ?it/s]

Loop with efficient MaxPooling took: 0.0 seconds



