<a href="https://colab.research.google.com/github/MicheleGiambelli/Deep-Learning-Project/blob/tommy/Backup_DeepLearning.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Normalisation statistics handed to data_loader: three values each,
# presumably one per RGB channel of the training images -- confirm in data_loader.
mean = [0.5437, 0.4453, 0.3496]
std = [0.2687, 0.2710, 0.2731]

# data_loader and dataset_path are defined outside this section. The call
# returns the dataset object plus the train / validation / test loaders.
dataset, train_loader, val_loader, test_loader = data_loader(
    dataset_path=dataset_path,
    batch_size=64,
    height=224,
    width=224,
    mean=mean,
    std=std,
)

In [None]:
# Labels dictionary: the `class_to_idx` attribute exposed by the dataset object
# (presumably a class-name -> integer-label mapping -- confirm in data_loader).
class_to_idx = dataset.class_to_idx
print(class_to_idx)

In [None]:
class Inception(nn.Module):
    """Inception block: four parallel paths concatenated along the channel axis.

    Every convolution is followed by a ReLU. Without the activations each
    path would be a purely linear map, so stacking blocks would add no
    representational power beyond the max-pooling steps.

    All paths preserve the spatial size (1x1 convs; 3x3 with padding 1;
    5x5 with padding 2; 3x3 max-pool with stride 1 and padding 1), so the
    output has ``c1 + c2[1] + c3[1] + c4`` channels and the input's
    height and width.
    """

    def __init__(self, input_channels, c1, c2, c3, c4, **kwargs):
        """
        Args:
            input_channels (int): Number of input channels.
            c1 (int): Output channels of path 1 (1x1 conv).
            c2 (tuple): (c2_reduce, c2_out), 1x1 reduction -> 3x3 convolution.
            c3 (tuple): (c3_reduce, c3_out), 1x1 reduction -> 5x5 convolution.
            c4 (int): Output channels of path 4 (max-pooling + 1x1 conv).
            **kwargs: Forwarded unchanged to ``nn.Module.__init__``.
        """
        # nn.Module's constructor must run before any sub-module is assigned.
        super(Inception, self).__init__(**kwargs)

        # Path 1: single 1x1 convolution
        self.path1 = nn.Conv2d(input_channels, c1, kernel_size=1)

        # Path 2: 1x1 -> 3x3 convolution
        self.path2_1 = nn.Conv2d(input_channels, c2[0], kernel_size=1)
        self.path2_2 = nn.Conv2d(c2[0], c2[1], kernel_size=3, padding=1)

        # Path 3: 1x1 -> 5x5 convolution
        self.path3_1 = nn.Conv2d(input_channels, c3[0], kernel_size=1)
        self.path3_2 = nn.Conv2d(c3[0], c3[1], kernel_size=5, padding=2)

        # Path 4: max pooling -> 1x1 convolution
        self.path4_1 = nn.MaxPool2d(kernel_size=3, stride=1, padding=1)
        self.path4_2 = nn.Conv2d(input_channels, c4, kernel_size=1)

    def forward(self, x):
        """Run the four paths on ``x`` and concatenate their outputs on dim 1."""
        # ReLU after every convolution, including the 1x1 reductions.
        p1 = torch.relu(self.path1(x))
        p2 = torch.relu(self.path2_2(torch.relu(self.path2_1(x))))
        p3 = torch.relu(self.path3_2(torch.relu(self.path3_1(x))))
        p4 = torch.relu(self.path4_2(self.path4_1(x)))

        # Concatenate along the channel dimension
        return torch.cat((p1, p2, p3, p4), dim=1)

In [None]:
# Block 1: stem. The first convolution takes 3 input channels because the
# normalisation statistics passed to data_loader have three entries; a
# 1-channel stem would raise a channel-mismatch error on 3-channel batches.
# NOTE(review): assumes data_loader yields 3-channel images -- confirm.
b1 = nn.Sequential(
    nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3),
    nn.ReLU(),
    nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
)

# Block 2: 1x1 conv followed by 3x3 conv, then down-sampling
b2 = nn.Sequential(
    nn.Conv2d(64, 64, kernel_size=1),
    nn.ReLU(),
    nn.Conv2d(64, 192, kernel_size=3, padding=1),
    nn.ReLU(),
    nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
)

# Block 3: two Inception blocks (192 -> 256 -> 480 channels)
b3 = nn.Sequential(
    Inception(192, 64, (96, 128), (16, 32), 32),
    Inception(256, 128, (128, 192), (32, 96), 64),
    nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
)

# Block 4: five Inception blocks (480 -> 512 -> 512 -> 512 -> 528 -> 832 channels)
b4 = nn.Sequential(
    Inception(480, 192, (96, 208), (16, 48), 64),
    Inception(512, 160, (112, 224), (24, 64), 64),
    Inception(512, 128, (128, 256), (24, 64), 64),
    Inception(512, 112, (144, 288), (32, 64), 64),
    Inception(528, 256, (160, 320), (32, 128), 128),
    nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
)

# Block 5: two Inception blocks (832 -> 832 -> 1024 channels), then pooling
b5 = nn.Sequential(
    Inception(832, 256, (160, 320), (32, 128), 128),
    Inception(832, 384, (192, 384), (48, 128), 128),
    nn.AdaptiveAvgPool2d((1, 1)),  # Global average pooling
    nn.Flatten()
)

# Full network: the five blocks followed by the 12-way classification layer
net = nn.Sequential(b1,b2,b3,b4,b5,
    nn.Linear(1024, 12),  # Classification head
)

In [None]:
## Mike's training loop
def accuracy(predictions, targets):
    """Return the fraction of samples whose top-scoring class equals the target.

    Args:
        predictions: 2-D tensor of per-class scores; the predicted label of a
            row is the index of its maximum along dim 1.
        targets: tensor of labels, compared element-wise with the predicted
            labels; its size along dim 0 is the number of samples.

    Returns:
        float: number of matches divided by ``targets.size(0)``.
    """
    predicted_labels = torch.max(predictions, 1).indices
    n_correct = (predicted_labels == targets).sum().item()
    return n_correct / targets.size(0)

def training_loop(network, training_loader, validation_loader,
                  epochs,
                  learning_rate=1e-3, weight_decay=0.0, patience=5,
                  device = "cuda",
                  save_model_name = "My_GoogleLeNet.pth"):
    """Train ``network`` with Adam and cross-entropy, early-stopping on validation loss.

    The weights of every ``nn.Linear`` / ``nn.Conv2d`` layer are re-initialised
    (Xavier uniform) before training. After each epoch the network is evaluated
    on ``validation_loader``; whenever the mean validation loss improves, the
    state dict is saved to ``save_model_name``. When training ends (early or
    not) the best saved weights are loaded back into ``network``.

    Args:
        network: model to train (modified in place).
        training_loader: iterable of ``(image, target)`` training minibatches.
        validation_loader: iterable of ``(image, target)`` validation minibatches.
        epochs: maximum number of epochs.
        learning_rate: Adam learning rate.
        weight_decay: Adam weight decay.
        patience: number of consecutive non-improving epochs after which
            training stops.
        device: requested device; the CPU is used whenever CUDA is unavailable.
        save_model_name: path of the checkpoint file for the best weights.

    Returns:
        tuple: ``(train_losses, val_losses, train_acc, val_acc, best_epoch)``.
        The first four are per-epoch lists of 0-dim tensors (means over the
        minibatches); ``best_epoch`` is the 1-based epoch with the lowest
        validation loss, or 0 if no epoch ever improved on ``inf``.
    """
    device = torch.device(device if torch.cuda.is_available() else 'cpu')

    # Initialize the weights
    def init_weights(m):
        # Exact type match on purpose: subclasses keep their own initialisation.
        if type(m) in (nn.Linear, nn.Conv2d):
            nn.init.xavier_uniform_(m.weight)

    network.apply(init_weights)

    # Move the net to the selected device
    print(f"Training on device {device}\n")
    network.to(device)

    optimizer = torch.optim.Adam(network.parameters(), lr = learning_rate, weight_decay=weight_decay)
    loss_function = nn.CrossEntropyLoss()

    # Per-epoch history of the training and validation losses/accuracies
    train_losses = []
    val_losses = []
    train_acc = []
    val_acc = []
    best_val_loss = float('inf')
    best_val_acc = 0.0
    best_epoch = 0
    # Must exist before the first comparison: a NaN validation loss in the
    # first epoch takes the "no improvement" branch.
    patience_counter = 0

    for epoch in range(epochs):

        # TRAINING PASS
        network.train()
        epoch_train_loss = []
        epoch_train_acc = []

        for image, target in tqdm(training_loader, desc = f"Epoch {epoch+1}/{epochs}"):
            optimizer.zero_grad()
            # Move each minibatch to the selected device
            image, target = image.to(device), target.to(device)
            output = network(image)
            loss = loss_function(output, target)
            loss.backward()
            optimizer.step()

            # Collect minibatch statistics to average over the epoch
            epoch_train_loss.append(loss.item())
            epoch_train_acc.append(accuracy(output, target))

        average_loss_train = torch.mean(torch.tensor(epoch_train_loss))
        train_losses.append(average_loss_train)
        average_train_accuracy = torch.mean(torch.tensor(epoch_train_acc))
        train_acc.append(average_train_accuracy)

        # VALIDATION PASS: one full iteration over the validation data
        network.eval()
        epoch_val_loss = []
        epoch_val_acc = []

        with torch.no_grad():  # no gradients are needed for evaluation
            for image, target in tqdm(validation_loader, desc = f"Epoch {epoch+1}/{epochs}"):
                image, target = image.to(device), target.to(device)
                output = network(image)
                loss = loss_function(output, target)
                epoch_val_loss.append(loss.item())
                epoch_val_acc.append(accuracy(output, target))

        average_loss_val = torch.mean(torch.tensor(epoch_val_loss))
        val_losses.append(average_loss_val)
        average_val_accuracy = torch.mean(torch.tensor(epoch_val_acc))
        val_acc.append(average_val_accuracy)

        print(f"EPOCH: {epoch+1} --- Train loss: {average_loss_train:7.4f} --- Eval loss: {average_loss_val:7.4f}\n")
        print(f"Train accuracy: {average_train_accuracy:7.4f} --- Eval accuracy: {average_val_accuracy:7.4f}\n")

        # Early stopping bookkeeping
        if average_loss_val < best_val_loss:
            best_val_loss = average_loss_val
            best_val_acc = average_val_accuracy
            best_epoch = epoch+1
            patience_counter = 0
            # Save the best model seen so far
            torch.save(network.state_dict(), save_model_name)
        else:
            patience_counter += 1
        if patience_counter >= patience:
            print(f"Early stopping triggered after {epoch+1} epochs\n")
            break

    # Restore the best checkpoint however the loop ended, so the network
    # handed back to the caller matches the reported best_epoch.
    if best_epoch > 0:
        network.load_state_dict(torch.load(save_model_name))
        # best_epoch is already 1-based, so it is printed as is.
        print(f"BEST MODEL at EPOCH: {best_epoch} --- Eval loss: {best_val_loss:7.4f} --- Eval accuracy: {best_val_acc:7.4f}")

    print("--- Training Ended! ---")
    return train_losses, val_losses, train_acc, val_acc, best_epoch

In [None]:
def evaluate_accuracy_gpu(net, data_iter, device=None):
    """Return the accuracy of ``net`` over all minibatches of ``data_iter``.

    Args:
        net: model to evaluate. If it is a ``torch.nn.Module`` it is switched
            to evaluation mode and, when ``device`` is falsy, the device of
            its first parameter is used.
        data_iter: iterable of ``(X, y)`` minibatches; ``X`` may be a tensor
            or a list of tensors.
        device: device the minibatches are moved to.

    Returns:
        The accumulated ``d2l.accuracy`` values divided by the accumulated
        number of label elements.
    """
    if isinstance(net, torch.nn.Module):
        net.eval()  # evaluation mode
        if not device:
            device = next(net.parameters()).device

    # Slot 0: no. of correct predictions, slot 1: no. of predictions
    tally = d2l.Accumulator(2)
    with torch.no_grad():
        for features, labels in data_iter:
            features = (
                [part.to(device) for part in features]
                if isinstance(features, list)
                else features.to(device)
            )
            labels = labels.to(device)
            tally.add(d2l.accuracy(net(features), labels), labels.numel())

    n_correct, n_seen = tally[0], tally[1]
    return n_correct / n_seen


def train(net, train_iter, val_iter, test_iter, num_epochs, lr, weight_decay = 0.0, device=d2l.try_gpu()):
    """
    Train ``net`` with Adam and cross-entropy, plotting progress with a d2l Animator.

    Validation accuracy is evaluated after every epoch; there is no early
    stopping and the test set is not evaluated.

    Args:
        net: model to train; the weights of its Linear/Conv2d layers are
            re-initialised (Xavier uniform) before training.
        train_iter: iterable of ``(X, y)`` training minibatches (must support ``len``).
        val_iter: iterable passed to ``evaluate_accuracy_gpu`` after each epoch.
        test_iter: accepted for interface compatibility; currently unused.
        num_epochs: number of passes over ``train_iter``.
        lr: Adam learning rate.
        weight_decay: Adam weight decay.
        device: device for the model and the minibatches. The default,
            ``d2l.try_gpu()``, is evaluated once, when the function is defined.

    Returns:
        None. Progress is plotted and the final metrics are printed.
    """

    # Xavier initialization (exact type match: subclasses are left untouched)
    def init_weights(m):
        if type(m) in (nn.Linear, nn.Conv2d):
            nn.init.xavier_uniform_(m.weight)

    net.apply(init_weights)
    print('Training on', device)

    net.to(device)  # Move the net to the selected device
    optimizer = optim.Adam(net.parameters(), lr=lr, weight_decay=weight_decay)
    loss = nn.CrossEntropyLoss()

    animator = d2l.Animator(xlabel='epoch', xlim=[1, num_epochs],
                            legend=['train loss', 'train acc', 'val acc'])

    timer, num_batches = d2l.Timer(), len(train_iter)

    progress_bar = tqdm(range(num_epochs))

    metric = None  # stays None when num_epochs < 1
    for epoch in progress_bar:
        # Sum of training loss, sum of training accuracy, no. of examples
        metric = d2l.Accumulator(3)
        net.train()  # Set the network to training mode

        for i, (X, y) in enumerate(train_iter):
            timer.start()
            optimizer.zero_grad()
            X, y = X.to(device), y.to(device)  # Move the minibatch to the device
            y_hat = net(X)
            l = loss(y_hat, y)
            l.backward()
            optimizer.step()

            with torch.no_grad():
                # training loss sum, correct predictions sum, no. of samples processed
                metric.add(l * X.shape[0], d2l.accuracy(y_hat, y), X.shape[0])
            timer.stop()

            # Update the plot about five times per epoch and on the last batch
            if (i + 1) % max(1, (num_batches // 5)) == 0 or i == num_batches - 1:
                train_l = metric[0] / metric[2]
                train_acc = metric[1] / metric[2]
                # Three values, one per legend entry; None leaves 'val acc' untouched.
                animator.add(epoch + (i + 1) / num_batches,
                             (train_l, train_acc, None))

        # Evaluate on the validation set after each epoch
        val_acc = evaluate_accuracy_gpu(net, val_iter, device)
        animator.add(epoch + 1, (None, None, val_acc))

    if metric is None:
        # No epoch was run, so there are no metrics to report.
        return

    # Final metrics after the last epoch
    train_l = metric[0] / metric[2]
    train_acc = metric[1] / metric[2]
    print(f'Final Results: loss {train_l:.3f}, train acc {train_acc:.3f}, val acc {val_acc:.3f}')
    print(f'{metric[2] * num_epochs / timer.sum():.1f} examples/sec on {str(device)}')


Plots of the metrics from Mike's training loop

In [None]:
def plot_training_metrics(train_losses, val_losses, train_acc, val_acc):
    """Show two side-by-side line plots: loss (left) and accuracy (right).

    Each panel draws the training series and the validation series against
    their index, with circle markers, a legend and a grid.

    Args:
        train_losses: per-epoch training losses.
        val_losses: per-epoch validation losses.
        train_acc: per-epoch training accuracies.
        val_acc: per-epoch validation accuracies.
    """
    _fig, axes = plt.subplots(1, 2, figsize=(10, 4))

    # (axis, training series, validation series, quantity name) for each panel
    panels = (
        (axes[0], train_losses, val_losses, 'Loss'),
        (axes[1], train_acc, val_acc, 'Accuracy'),
    )

    for ax, train_series, val_series, quantity in panels:
        ax.plot(train_series, label=f'Training {quantity}', marker='o')
        ax.plot(val_series, label=f'Validation {quantity}', marker='o')
        ax.set_xlabel('Epochs')
        ax.set_ylabel(quantity)
        ax.set_title(f'Training and Validation {quantity}')
        ax.legend()
        ax.grid(True)

    # Adjust layout and show
    plt.tight_layout()
    plt.show()

# Draw the curves. The four arguments are presumably the history lists returned
# by training_loop -- no assignment to them is visible in this section; confirm.
plot_training_metrics(train_losses, val_losses, train_acc, val_acc)

# **Backup ResNet**

In [None]:
class Residual(nn.Module):
    """Residual block: two 3x3 conv + batch-norm layers plus a shortcut connection.

    The shortcut is the input itself or, when ``use_1x1conv`` is set, a 1x1
    convolution of the input (with the same stride as the first conv).
    Dropout is applied to the sum before the final ReLU.
    """

    def __init__(self, in_channels, out_channels, use_1x1conv=False, strides=1, dropout_prob=0.3):
        """
        Args:
            in_channels (int): Channels of the input.
            out_channels (int): Channels produced by both 3x3 convolutions.
            use_1x1conv (bool): Whether the shortcut goes through a 1x1 conv.
            strides (int): Stride of the first 3x3 conv and of the 1x1 conv.
            dropout_prob (float): Dropout probability applied after the sum.
        """
        super().__init__()

        # Main path: conv -> BN -> ReLU -> conv -> BN
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1, stride=strides)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm2d(out_channels)

        # Optional projection for the shortcut
        self.conv3 = (
            nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=strides)
            if use_1x1conv
            else None
        )

        self.dropout = nn.Dropout(p=dropout_prob)

    def forward(self, X):
        """Return ``relu(dropout(main_path(X) + shortcut(X)))``."""
        shortcut = X if self.conv3 is None else self.conv3(X)
        main = self.bn2(self.conv2(F.relu(self.bn1(self.conv1(X)))))
        main += shortcut
        return F.relu(self.dropout(main))

# Smoke test: push a random batch of shape (4, 3, 64, 64) through a 3 -> 3
# channel block that uses the 1x1 shortcut convolution, then print the
# shape of the result.
X = torch.rand(4, 3, 64, 64)
residual_block = Residual(3, 3, use_1x1conv=True)

output = residual_block(X)
print(output.shape)

In [None]:
class ResNet18(nn.Module):
    """ResNet-18-style classifier built from ``Residual`` blocks.

    Layout: 7x7 conv stem (3 -> 64 channels) with batch-norm, ReLU and
    max-pooling; four stages of two residual blocks each (64, 128, 256, 512
    channels); global average pooling; dropout; linear classification layer.
    """

    def __init__(self, num_classes=9, dropout_prob=0.3):
        """
        Args:
            num_classes (int): Size of the output layer.
            dropout_prob (float): Dropout probability used inside every
                residual block and before the final linear layer.
        """
        super().__init__()

        # Stem
        self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU()
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        # Residual stages; stages 2-4 use stride 2 in their first block
        self.layer1 = self.make_layer(64, 64, 2, dropout_prob=dropout_prob)
        self.layer2 = self.make_layer(64, 128, 2, stride=2, dropout_prob=dropout_prob)
        self.layer3 = self.make_layer(128, 256, 2, stride=2, dropout_prob=dropout_prob)
        self.layer4 = self.make_layer(256, 512, 2, stride=2, dropout_prob=dropout_prob)

        # Classification head
        self.dropout = nn.Dropout(p=dropout_prob)
        self.fc = nn.Linear(512, num_classes)

    def make_layer(self, in_channels, out_channels, num_blocks, stride=1, dropout_prob=0.3):
        """Build one stage.

        The first block changes the channel count, applies ``stride`` and
        always uses the 1x1 shortcut convolution; ``num_blocks - 1`` plain
        blocks follow.
        """
        entry_block = Residual(in_channels, out_channels, strides=stride,
                               use_1x1conv=True, dropout_prob=dropout_prob)
        follow_up = [
            Residual(out_channels, out_channels, dropout_prob=dropout_prob)
            for _ in range(num_blocks - 1)
        ]
        return nn.Sequential(entry_block, *follow_up)

    def forward(self, x):
        """Return the ``num_classes`` output scores for the batch ``x``."""
        x = self.maxpool(self.relu(self.bn1(self.conv1(x))))
        for stage in (self.layer1, self.layer2, self.layer3, self.layer4):
            x = stage(x)
        pooled = F.adaptive_avg_pool2d(x, (1, 1))
        features = torch.flatten(pooled, 1)
        return self.fc(self.dropout(features))

In [None]:
# Backup model: ResNet-18 with a 12-way output layer.
model = ResNet18(num_classes=12, dropout_prob=0.3)

# Train the ResNet that was just built. Passing `net` here would train the
# GoogLeNet defined earlier and leave `model` unused.
train(model, train_loader, val_loader, test_loader, num_epochs=50, lr=0.01, weight_decay=1e-4)