## 4.2 Experiment: Adding an Attention Mechanism to a CNN Model

⚠️⚠️⚠️ *Please open this notebook in Google Colab* by clicking the link below ⚠️⚠️⚠️<br><br>
<a href="https://colab.research.google.com/github/Muhammad-Yunus/Belajar-Image-Classification/blob/main/Pertemuan%204/4.1%20cnn_with_attention_mechanism.ipynb" target="_blank"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a><br><br><br>
- Click the `Connect` button at the top right of the Google Colab notebook,<br>
<img src="resource/cl-connect-gpu.png" width="250px">
- Once the connection is established, the button will look something like this<br>
<img src="resource/cl-connect-gpu-success.png" width="250px">

- Check that the GPU connected to the Colab environment is active

In [None]:
!nvidia-smi
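
The same check can also be made from Python. The following is a minimal sketch, assuming PyTorch is already available in the runtime; it only reports what PyTorch can see and falls back to the CPU when no GPU is attached.

```python
import torch

# Pick the GPU when CUDA is visible to PyTorch, otherwise fall back to the CPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"device : {device}")

if device.type == 'cuda':
    # Name of the first visible GPU
    print(f"GPU    : {torch.cuda.get_device_name(0)}")
```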

- Load libraries

In [None]:
!pip install gdown

import os
import cv2
import gdown
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader, random_split

import torchvision
from torchvision import transforms

from IPython import display

# clear output cell
display.clear_output()

print(f"torch : {torch.__version__}")
print(f"torch vision : {torchvision.__version__}")

- Download MNIST Dataset

In [None]:
DATASET_NAME = 'MNIST' # the dataset name
DATASET_NUM_CLASS = 10 # number of classes in the dataset

In [None]:
# default using gdrive_id Dataset `mnist_dataset.zip` (1-FfwJrllyHofQwIbMb_IxAkxnfMGSFmR)
gdrive_id = '1-FfwJrllyHofQwIbMb_IxAkxnfMGSFmR' # <-----  ⚠️⚠️⚠️ USE YOUR OWN GDrive ID FOR CUSTOM DATASET ⚠️⚠️⚠️

# download zip from GDrive
url = f'https://drive.google.com/uc?id={gdrive_id}'
gdown.download(url, DATASET_NAME + ".zip", quiet=False)

# unzip dataset
!unzip {DATASET_NAME}.zip -d {DATASET_NAME}

# clear output cell
display.clear_output()

- Load MNIST Dataset
    - <font color="orange">DON'T FLATTEN THE INPUT IMAGE IN THE DATA LOADER</font>,
    - WE WILL FEED THE 2D 28x28 MNIST DIGIT IMAGES INTO THE MODEL

In [None]:
# Define a custom Dataset class
# it's just a helper that loads each image with OpenCV and converts it to a PyTorch tensor
# it also encodes the label using one-hot encoding
class CustomDataset(Dataset):
    def __init__(self, root_dir):
        self.root_dir = root_dir
        self.image_files = sorted([file for file in os.listdir(root_dir) if file.lower().endswith('.png')])

    def __len__(self):
        return len(self.image_files)

    def __getitem__(self, idx):
        # Read image from corresponding .png file
        image_path = os.path.join(self.root_dir, self.image_files[idx])
        image = cv2.imread(image_path)  # Load image using OpenCV
        image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)  # Convert BGR to GRAY
        image = torch.from_numpy(image).to(torch.float32)  # Convert NumPy array to PyTorch tensor
        image = image.view(1, 28, 28) # reshape loaded MNIST image into 1x28x28 format (required by the model)

        # Read label from corresponding .txt file
        label_path = os.path.splitext(image_path)[0] + ".txt"
        with open(label_path, 'r') as label_file:
            label = int(label_file.read().strip())  # Assuming labels are integers

        # Apply one-hot encoding into label
        labels_tensor = torch.tensor(label)
        one_hot_encoded = F.one_hot(labels_tensor, num_classes=DATASET_NUM_CLASS).to(torch.float32)

        return image, one_hot_encoded



# instantiate the datasets
# at this point the images are not loaded yet
# we only read the image file names in the dataset folder
all_train_dataset = CustomDataset(root_dir=f'{DATASET_NAME}/dataset/train')
test_dataset = CustomDataset(root_dir=f'{DATASET_NAME}/dataset/test')
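
The label encoding done inside `__getitem__` can be looked at in isolation. This is a small sketch with a made-up label value; `NUM_CLASS` is an assumed stand-in for `DATASET_NUM_CLASS`.

```python
import torch
import torch.nn.functional as F

NUM_CLASS = 10  # assumed number of classes, standing in for DATASET_NUM_CLASS

label = 3  # hypothetical integer label, as it would be read from a .txt file
one_hot = F.one_hot(torch.tensor(label), num_classes=NUM_CLASS).to(torch.float32)
print(one_hot)  # a length-10 vector with a single 1.0 at index 3

# argmax turns the one-hot vector back into the integer class
recovered = int(one_hot.argmax())
print(recovered)  # 3
```

This is also why the training and evaluation code later calls `argmax` on the labels before comparing them with the predictions.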

In [None]:
print(f"All Train Dataset : {len(all_train_dataset)} data")
print(f"Test Dataset : {len(test_dataset)} data")

In [None]:
# Split 'all_train_dataset' into 'train' and 'validation' set using `random_split()` function
train_dataset, validation_dataset = random_split(all_train_dataset, [50000, 10000])

print(f"Train Dataset : {len(train_dataset)} data")
print(f"Validation Dataset : {len(validation_dataset)} data")
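
`random_split()` draws a new random partition every time the cell runs. If a repeatable split is wanted, one option is to pass a seeded `torch.Generator`. The sketch below uses a small toy dataset and a hypothetical helper `split_with_seed`; the seed value is arbitrary.

```python
import torch
from torch.utils.data import TensorDataset, random_split

# Small stand-in dataset (100 items) instead of the MNIST training images
toy_dataset = TensorDataset(torch.arange(100))

def split_with_seed(dataset, lengths, seed):
    """Split `dataset` using a generator seeded with `seed` (hypothetical helper)."""
    generator = torch.Generator().manual_seed(seed)
    return random_split(dataset, lengths, generator=generator)

train_a, val_a = split_with_seed(toy_dataset, [80, 20], seed=42)
train_b, val_b = split_with_seed(toy_dataset, [80, 20], seed=42)

# The same seed gives the same partition on every run
print(val_a.indices == val_b.indices)  # True
```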

In [None]:
# Create data loaders
BATCH_SIZE = 128

train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
validation_loader = DataLoader(validation_dataset, batch_size=BATCH_SIZE, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)
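
To see what a loader hands to the model, the shapes of each batch can be inspected. The sketch below uses synthetic random tensors in place of the MNIST files (an assumption made so it runs without the download), with the same 1x28x28 image layout and one-hot labels.

```python
import torch
import torch.nn.functional as F
from torch.utils.data import DataLoader, TensorDataset

# Synthetic stand-in: 300 random "images" of shape 1x28x28 with one-hot labels
images = torch.rand(300, 1, 28, 28)
labels = F.one_hot(torch.randint(0, 10, (300,)), num_classes=10).to(torch.float32)

loader = DataLoader(TensorDataset(images, labels), batch_size=128, shuffle=False)

# Collect (image batch shape, label batch shape) for every batch
batch_shapes = [(tuple(x.shape), tuple(y.shape)) for x, y in loader]
for x_shape, y_shape in batch_shapes:
    print(x_shape, y_shape)
```

With 300 items and a batch size of 128, the last batch holds only 44 items, which is worth keeping in mind when averaging per-batch metrics.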

____________
#### <font color="orange">Define Channel Attention : Squeeze and Excitation (SE) Block Here!!!</font>
- We'll implement Squeeze and Excitation (SE) channel attention as an <font color="cyan">SE block</font> in a <font color="orange">separate class</font> and embed it in the model <font color="orange">after each convolutional layer</font>.
- The SE block <font color="cyan">recalibrates</font> each <font color="orange">feature map</font> by learning to emphasize the more <font color="orange">informative channels</font>.
    - It <font color="orange">squeezes</font> each feature map into a single value using <font color="cyan">global average pooling</font>,
    - Then <font color="orange">excites</font> the channels using two <font color="cyan">fully connected layers</font> (with a ReLU in between) followed by a sigmoid activation, which yields one weight between 0 and 1 per channel.<br><br>
        <table cellspacing="0" cellpadding="0" style="border:none;">
            <tbody>
                <tr>
                    <td>
                        <img src="resource/attention-se-block.png" width="250px">
                    </td>
                    <td>
                        GAP = global average pooling<br>
                        FC = fully-connected layer<br><br>
                        <img src="resource/GAP.png" width="250px"><br>
                        <img src="resource/attention-se-block-2.png" width="400px"><br>
                    </td>
                </tr>
            </tbody>
        </table>
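
The squeeze, excite, and scale steps described above can be traced with plain tensor operations. This is only an illustrative sketch: the tensor sizes and the random weights `w_reduce` and `w_expand` are made up, whereas in a real SE block the weights are learned.

```python
import torch

torch.manual_seed(0)
x = torch.randn(2, 8, 5, 5)            # (batch, channels, height, width)

# Squeeze: global average pooling collapses each 5x5 feature map to one number
squeezed = x.mean(dim=(2, 3))          # shape (2, 8)

# Excite: bottleneck 8 -> 2 -> 8 with hypothetical random weights, then sigmoid
w_reduce = torch.randn(8, 2)
w_expand = torch.randn(2, 8)
gates = torch.sigmoid(torch.relu(squeezed @ w_reduce) @ w_expand)   # shape (2, 8)

# Scale: multiply every channel of x by its gate (broadcast over height and width)
out = x * gates[:, :, None, None]

print(squeezed.shape, gates.shape, out.shape)
```

The output has the same shape as the input; only the relative strength of the channels changes.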

In [None]:
class SEBlock(nn.Module):
    def __init__(self, in_channels, reduction=16):
        super(SEBlock, self).__init__()
        
        # Global average pooling to produce channel-wise statistics
        self.avg_pool = nn.AdaptiveAvgPool2d(1)
        
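        # Bottleneck of two fully connected layers: the first reduces the channel
        # dimension by `reduction` and applies ReLU, the second restores it and
        # applies a sigmoid to produce per-channel weights in (0, 1).
        # max(1, ...) keeps at least one hidden unit when in_channels < reduction
        # (e.g. 12 // 16 would otherwise be 0)
        hidden_channels = max(1, in_channels // reduction)
        self.fc = nn.Sequential(
            nn.Linear(in_channels, hidden_channels, bias=False),
            nn.ReLU(inplace=True),
            nn.Linear(hidden_channels, in_channels, bias=False),
            nn.Sigmoid()
        )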

    def forward(self, x):
        # Get the batch size and number of channels
        b, c, _, _ = x.size()
        
        # Perform global average pooling and reshape to (batch_size, channels)
        y = self.avg_pool(x).view(b, c)
        
        # Pass through the fully connected layers
        y = self.fc(y).view(b, c, 1, 1)
        
        # Scale the input feature maps with the recalibrated channel-wise statistics
        return x * y.expand_as(x)
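
The reduction factor decides how narrow the bottleneck inside the SE block is. With the channel counts used in this notebook (12, 24, 32) and `reduction=16`, plain integer division gives a width of 0 for the 12-channel case, so some lower bound is needed. The sketch below uses a hypothetical helper `bottleneck_width` (not part of PyTorch) to show one way to clamp it; picking a smaller reduction factor is another option.

```python
def bottleneck_width(in_channels, reduction=16):
    """Hidden width of the SE bottleneck, clamped to at least one unit."""
    return max(1, in_channels // reduction)

# Compare plain integer division with the clamped width
for channels in (12, 24, 32):
    print(channels, channels // 16, bottleneck_width(channels))
# 12 0 1
# 24 1 1
# 32 2 2
```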

- Here we use the model defined in <font color="orange">'3.3 cnn_with_batch_normalization.ipynb'</font>,
    - with an additional <font color="orange">SE Block</font> after each convolution layer.<br>

In [None]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Define the model using nn.Sequential
model = nn.Sequential(
    # Convolutional layers with Batch Normalization and SE Attention
    nn.Conv2d(in_channels=1, out_channels=12, kernel_size=3, padding=2, bias=False),
    nn.BatchNorm2d(num_features=12, affine=True),
    nn.ReLU(),
    SEBlock(in_channels=12), # set SE Block in here
    
    nn.Conv2d(in_channels=12, out_channels=24, kernel_size=6, stride=2, padding=2, bias=False),
    nn.BatchNorm2d(num_features=24, affine=True),
    nn.ReLU(),
    SEBlock(in_channels=24), # set SE Block in here
    
    nn.Conv2d(in_channels=24, out_channels=32, kernel_size=6, stride=2, padding=2, bias=False),
    nn.BatchNorm2d(num_features=32, affine=True),
    nn.ReLU(),
    SEBlock(in_channels=32), # set SE Block in here
    
    # Flatten layer
    nn.Flatten(),
    
    # Fully connected layers with Batch Normalization
    nn.Linear(in_features=32 * 7 * 7, out_features=200, bias=False),
    nn.BatchNorm1d(num_features=200, affine=True),
    nn.ReLU(),
    nn.Dropout(0.6),
    nn.Linear(in_features=200, out_features=10),
    nn.LogSoftmax(dim=1)
).to(device)

# Iterate over model to find BatchNorm layers and modify them
for layer in model:
    if isinstance(layer, nn.BatchNorm2d) or isinstance(layer, nn.BatchNorm1d):
        # Set weight to 1 (disabling scaling)
        with torch.no_grad():
            layer.weight.fill_(1.0)
        # Freeze the weight from being updated
        layer.weight.requires_grad = False
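
The `32 * 7 * 7` input size of the first linear layer follows from the convolution settings. For dilation 1, the output size is `floor((size + 2 * padding - kernel_size) / stride) + 1`; batch normalization, ReLU, and the SE block leave the spatial size unchanged. The sketch below applies that formula with a hypothetical helper `conv_out_size` and cross-checks it against PyTorch using a dummy input.

```python
import torch
import torch.nn as nn

def conv_out_size(size, kernel_size, stride=1, padding=0):
    """Output height/width of a convolution with dilation 1 (hypothetical helper)."""
    return (size + 2 * padding - kernel_size) // stride + 1

# (kernel_size, stride, padding) of the three convolutions in the model above
conv_settings = [(3, 1, 2), (6, 2, 2), (6, 2, 2)]

size = 28
sizes = []
for kernel_size, stride, padding in conv_settings:
    size = conv_out_size(size, kernel_size, stride, padding)
    sizes.append(size)
print(sizes)  # [30, 15, 7]

# Cross-check with the same convolution settings on a dummy 1x1x28x28 input
convs = nn.Sequential(
    nn.Conv2d(1, 12, kernel_size=3, padding=2),
    nn.Conv2d(12, 24, kernel_size=6, stride=2, padding=2),
    nn.Conv2d(24, 32, kernel_size=6, stride=2, padding=2),
)
out_shape = tuple(convs(torch.zeros(1, 1, 28, 28)).shape)
print(out_shape)  # (1, 32, 7, 7)
```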

In [None]:
# setup optimizer, loss function & metric
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
loss_function = nn.CrossEntropyLoss()
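
Two details of this loss setup are worth checking. `nn.CrossEntropyLoss` accepts either integer class indices or class probabilities such as one-hot vectors (probability targets require a reasonably recent PyTorch, 1.10 or later). It also applies log-softmax internally, so the `nn.LogSoftmax` at the end of the model is redundant but harmless, because log-softmax applied twice gives the same result as applied once. The sketch below checks both points on made-up logits.

```python
import torch
import torch.nn as nn
import torch.nn.functional as F

torch.manual_seed(0)
logits = torch.randn(4, 10)                   # hypothetical raw scores for 4 samples
class_idx = torch.tensor([3, 0, 7, 1])        # integer class labels
one_hot = F.one_hot(class_idx, num_classes=10).to(torch.float32)

loss_fn = nn.CrossEntropyLoss()
loss_from_idx = loss_fn(logits, class_idx)        # index targets
loss_from_one_hot = loss_fn(logits, one_hot)      # one-hot (probability) targets

# Feeding log-probabilities instead of raw logits gives the same loss
log_probs = F.log_softmax(logits, dim=1)
loss_from_log_probs = loss_fn(log_probs, one_hot)

print(loss_from_idx.item(), loss_from_one_hot.item(), loss_from_log_probs.item())
```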

- To run the training process, we can use the following code

In [None]:
!pip install tqdm

from tqdm import tqdm

In [None]:
def train(model, train_loader, optimizer, loss_function):
    model.train()
    running_loss = 0.0
    correct_predictions = 0
    total_predictions = 0

    # Add progress bar for training loop
    progress_bar = tqdm(train_loader, desc='Training', leave=False)

    for inputs, labels in progress_bar:
        inputs = inputs.to(device) # move inputs to device
        labels = labels.to(device) # move labels to device

        # resets the gradients of all the model's parameters before the backward pass
        optimizer.zero_grad()
        # pass 2D 28x28 input tensor to CNN model
        outputs = model(inputs)
        # calc loss value
        loss = loss_function(outputs, labels)
        # computes the gradient of the loss with respect to each parameter in model
        loss.backward()
        # adjust model parameter
        optimizer.step()
        # sum loss value
        running_loss += loss.item()

        # Calculate correct & total prediction
        _, predicted = torch.max(outputs, 1)
        correct_predictions += (predicted == labels.argmax(1)).sum().item()
        total_predictions += labels.size(0)

        # Update progress bar description with current loss
        progress_bar.set_postfix(loss=loss.item())

    # Calculate average training loss
    # (loss.item() is already the mean over a batch, so divide by the number of batches)
    average_train_loss = running_loss / len(train_loader)
    # Calculate training accuracy
    train_accuracy = correct_predictions / total_predictions
    return average_train_loss, train_accuracy

def validate(model, val_loader, loss_function):
    model.eval()
    running_loss = 0.0
    correct_predictions = 0
    total_predictions = 0

    # Add progress bar for validation loop
    progress_bar = tqdm(val_loader, desc='Validating', leave=False)

    with torch.no_grad():
        for inputs, labels in progress_bar:
            inputs = inputs.to(device) # move inputs to device
            labels = labels.to(device) # move labels to device

            # pass 2D 28x28 input tensor to CNN model
            outputs = model(inputs)
            # calc loss value
            loss = loss_function(outputs, labels)
            # sum loss value
            running_loss += loss.item()

            # Calculate correct & total prediction
            _, predicted = torch.max(outputs, 1)
            correct_predictions += (predicted == labels.argmax(1)).sum().item()
            total_predictions += labels.size(0)

            # Update progress bar description with loss
            progress_bar.set_postfix(loss=loss.item())

    # Calculate average validation loss
    # (loss.item() is already the mean over a batch, so divide by the number of batches)
    average_val_loss = running_loss / len(val_loader)
    # Calculate validation accuracy
    val_accuracy = correct_predictions / total_predictions
    return average_val_loss, val_accuracy





# Training loop for the selected number of epochs
# each epoch processes the whole training and validation set in mini-batches,
# then records the loss & accuracy of both sets
NUM_EPOCH = 10      # you can change this value

train_losses = []
val_losses = []
train_accuracies = []
val_accuracies = []

for epoch in range(NUM_EPOCH):
    print(f"Epoch {epoch+1}/{NUM_EPOCH}")

    train_loss, train_accuracy = train(model, train_loader, optimizer, loss_function)
    val_loss, val_accuracy = validate(model, validation_loader, loss_function)

    train_losses.append(train_loss)
    val_losses.append(val_loss)
    train_accuracies.append(train_accuracy * 100)  # convert to percentage
    val_accuracies.append(val_accuracy * 100)  # convert to percentage

    print(f"Train Loss = {train_loss:.4f}, Val Loss = {val_loss:.4f}, Train Accuracy = {train_accuracy:.4f}, Val Accuracy = {val_accuracy:.4f}\n")
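
`loss.item()` returns the mean loss of one batch (the default `reduction='mean'` of `nn.CrossEntropyLoss`), so an epoch-level average has to account for how the batches are combined. The small sketch below uses made-up numbers to show the difference between averaging the batch means and computing a per-sample average when the last batch is smaller.

```python
# Hypothetical per-batch mean losses and batch sizes (the last batch is smaller)
batch_mean_losses = [1.0, 2.0, 4.0]
batch_sizes = [4, 4, 2]

# Average of batch means: every batch counts equally
mean_of_batches = sum(batch_mean_losses) / len(batch_mean_losses)

# Per-sample average: weight each batch mean by the number of samples it covers
weighted_sum = sum(loss * n for loss, n in zip(batch_mean_losses, batch_sizes))
per_sample_mean = weighted_sum / sum(batch_sizes)

print(round(mean_of_batches, 4))  # 2.3333
print(per_sample_mean)            # 2.0
```

With 128-sample batches and only one short batch per epoch the two values are usually close, but the weighted form is the exact per-sample figure.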

- Plot Loss and Accuracy of Training vs Validation Set 

In [None]:
# visualize Loss & Accuracy
import matplotlib.pyplot as plt

epochs = list(range(1, NUM_EPOCH + 1))

# Plotting loss
plt.figure(figsize=(12, 5))

plt.subplot(1, 2, 1)
plt.plot(epochs, train_losses, 'b', label='Training Loss')
plt.plot(epochs, val_losses, 'r', label='Validation Loss')
plt.title('Training and Validation Loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()
plt.grid(True)

# Plotting accuracy
plt.subplot(1, 2, 2)
plt.plot(epochs, train_accuracies, 'b', label='Training Accuracy')
plt.plot(epochs, val_accuracies, 'r', label='Validation Accuracy')
plt.title('Training and Validation Accuracy')
plt.xlabel('Epochs')
plt.ylabel('Accuracy (%)')
plt.legend()
plt.grid(True)

plt.tight_layout()
plt.show()


- Evaluate the model: compute precision and recall for each class, measure accuracy, and compute the confusion matrix

In [None]:
from sklearn.metrics import confusion_matrix
from sklearn.metrics import classification_report
import seaborn as sns
import numpy as np

# define evaluate function for test set
def evaluate(model, test_loader):
    model.eval()
    all_labels = []
    all_preds = []

    # Add progress bar for evaluation loop
    progress_bar = tqdm(test_loader, desc='Evaluating', leave=False)

    with torch.no_grad():
        # iterate over all batched test set
        for inputs, labels in progress_bar:
            inputs = inputs.to(device) # move inputs to device
            labels = labels.to(device) # move labels to device

            # pass 2D 28x28 input tensor to CNN model
            outputs = model(inputs)
            # get prediction
            _, preds = torch.max(outputs, 1)
            # collect all labels & preds
            all_labels.extend(labels.cpu().numpy())
            all_preds.extend(preds.cpu().numpy())

    return all_labels, all_preds

# Evaluation on test set
all_labels, all_preds = evaluate(model, test_loader)
all_labels = np.argmax(all_labels, axis=1)

# Calculate classification report
labels = [str(i) for i in range(DATASET_NUM_CLASS)]
print(classification_report(all_labels, all_preds, target_names=labels))

# Confusion Matrix
conf_matrix = confusion_matrix(all_labels, all_preds)

# Plotting the confusion matrix
plt.figure(figsize=(10, 7))
sns.heatmap(conf_matrix, annot=True, fmt="d", cmap="Blues")
plt.xlabel('Predicted Class')
plt.ylabel('Actual Class')
plt.title('Confusion Matrix')
plt.show()
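
The per-class precision and recall in the classification report can be read directly off a confusion matrix. The sketch below uses a made-up 3-class matrix laid out the way `sklearn.metrics.confusion_matrix` returns it (rows are actual classes, columns are predicted classes).

```python
import numpy as np

# Hypothetical 3-class confusion matrix (rows = actual class, columns = predicted class)
conf = np.array([
    [9, 1, 0],
    [2, 6, 2],
    [1, 1, 8],
])

true_positive = np.diag(conf)
precision = true_positive / conf.sum(axis=0)   # column sums: everything predicted as the class
recall = true_positive / conf.sum(axis=1)      # row sums: everything that truly is the class
accuracy = true_positive.sum() / conf.sum()

print(precision)  # [0.75 0.75 0.8 ]
print(recall)     # [0.9 0.6 0.8]
print(accuracy)   # 23 correct out of 30
```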

- Download Model 

In [None]:
# Save the model
torch.save(model.state_dict(), 'trained_cnn_model.pt')

# Download the model file
from google.colab import files
files.download('trained_cnn_model.pt')
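
Because only the `state_dict` is saved, loading the file later requires rebuilding the same architecture first and then copying the weights into it. The following is a minimal sketch of that round trip; `build_model` is a hypothetical tiny stand-in, and for the real model the full `nn.Sequential` (including `SEBlock`) would have to be defined again before loading.

```python
import os
import tempfile

import torch
import torch.nn as nn

def build_model():
    # Tiny stand-in architecture (hypothetical); the real model must be rebuilt identically
    return nn.Sequential(nn.Flatten(), nn.Linear(28 * 28, 10))

trained = build_model()

with tempfile.TemporaryDirectory() as tmp_dir:
    path = os.path.join(tmp_dir, 'toy_model.pt')
    torch.save(trained.state_dict(), path)

    # Rebuild the architecture, then load the saved weights into it
    restored = build_model()
    state_dict = torch.load(path, map_location='cpu')
    restored.load_state_dict(state_dict)

restored.eval()  # inference mode (matters for models with dropout / batch norm)

# Both models should now produce identical outputs
x = torch.rand(2, 1, 28, 28)
with torch.no_grad():
    same_output = torch.equal(trained(x), restored(x))
print(same_output)  # True
```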

>
>## Discussion
>- It looks like dropout does not help much in reducing overfitting.
>- It also has a negative impact: it reduces training accuracy.
>- Next, we will explore another regularization technique,
>- called <font color="cyan">Batch Normalization</font>.
>

- Open <font color="orange">'3.3 cnn_with_batch_normalization.ipynb'</font> in Google Colab to learn more...<br> 
<a href="https://colab.research.google.com/github/Muhammad-Yunus/Belajar-Image-Classification/blob/main/Pertemuan%203/3.3%20cnn_with_batch_normalization.ipynb" target="_blank"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

_________________________________________________________________________
<br><br><br>
# Source
- https://lilianweng.github.io/posts/2018-06-24-attention/?ref=blog.paperspace.com
- https://link.springer.com/content/pdf/10.1007/s41095-022-0271-y.pdf
- https://www.researchgate.net/figure/Before-inputting-the-SE-attention-mechanism-left-colorless-figure-C-the-importance-of_fig1_366512193
- https://www.digitalocean.com/community/tutorials/attention-mechanisms-in-computer-vision-cbam
- https://www.researchgate.net/figure/Diagram-of-the-channel-attention-module-and-spatial-attention-module-for-the_fig3_347669937
- https://arxiv.org/pdf/1805.08318
- https://arxiv.org/pdf/1811.12006v2
- https://medium.com/@shravankoninti/transformers-attention-is-all-you-need-overview-on-multi-headed-attention-379eb8d095dc