In [47]:
import os
import numpy as np
import torchvision.models as models
from PIL import Image
import matplotlib.pyplot as plt


import torch
import torchvision
import torchinfo
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader
import torchvision.transforms as transforms
from torchvision.io import read_image
from torchvision.transforms import functional as TVF
from torchvision.datasets import ImageFolder
from torch.utils.data import random_split
from tqdm import tqdm
from torchinfo import summary
from torch.utils.data import ConcatDataset


data_dir = "MY_data"
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(device)

cuda


In [48]:
# Define multiple augmentation strategies
aug1 = transforms.Compose([
    transforms.Resize((40, 40)),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

aug2 = transforms.Compose([
    transforms.Resize((40, 40)),
    transforms.RandomVerticalFlip(),
    transforms.RandomRotation(15),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

aug3 = transforms.Compose([
    transforms.Resize((40, 40)),
    transforms.RandomHorizontalFlip(),
    transforms.RandomVerticalFlip(),
    transforms.RandomRotation(30),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.2),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# Load the original dataset
original_dataset = ImageFolder(data_dir + "/train", transform=aug1)

# Create augmented datasets
augmented_datasets = [
    ImageFolder(data_dir + "/train", transform=aug1),
    ImageFolder(data_dir + "/train", transform=aug2),
    ImageFolder(data_dir + "/train", transform=aug3)
]

# Concatenate the original and augmented datasets
full_dataset = ConcatDataset([original_dataset] + augmented_datasets)

# Split into training and validation sets
train_size = int(0.8 * len(full_dataset))
val_size = len(full_dataset) - train_size
train_subset, val_subset = random_split(full_dataset, [train_size, val_size])

# Create data loaders
train_loader = DataLoader(train_subset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_subset, batch_size=32, shuffle=False)

# Print dataset details
num_classes = len(original_dataset.classes)
print(f'Number of classes: {num_classes}')
print(f'Number of images in  trainset: {len(full_dataset)}')


Number of classes: 10
Number of images in augmented trainset: 9204


In [49]:
import matplotlib.pyplot as plt

def plot_training_results(train_losses, val_losses, train_accuracies, val_accuracies):
    
    epochs = range(1, len(train_losses) + 1)

    plt.figure(figsize=(14, 6))

    plt.subplot(1, 2, 1)
    plt.plot(epochs, train_losses, label='Training Loss')
    plt.plot(epochs, val_losses, label='Validation Loss')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.title('Training and Validation Loss')
    plt.legend()

    # Plotting accuracy
    plt.subplot(1, 2, 2)
    plt.plot(epochs, train_accuracies, label='Training Accuracy')
    plt.plot(epochs, val_accuracies, label='Validation Accuracy')
    plt.xlabel('Epochs')
    plt.ylabel('Accuracy')
    plt.title('Training and Validation Accuracy')
    plt.legend()

    plt.show()


In [50]:
def train(net, trainloader, valloader, device, num_epochs, lr=0.01, weight_decay = 0.8, step_size = 5, gamma = 0.1):
    train_losses = []
    val_losses = []
    train_accuracies = []
    val_accuracies = []
    
    # Transfer model to GPU
    net = net.to(device)

    # Set the optimizer using the lr and momentum settings passed by the user
    optimizer = optim.Adam(net.parameters(), lr=lr, weight_decay=weight_decay)
    scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=step_size, gamma=gamma)
    criterion = nn.CrossEntropyLoss()

    for epoch in range(num_epochs):
        running_loss = 0
        running_corrects = 0
        total_samples = 0

        # Training phase
        net.train()  # Ensure the model is in training mode
        for i, (inputs, labels) in enumerate(trainloader):
            optimizer.zero_grad()

            # Transfer data to GPU
            inputs = inputs.to(device)
            labels = labels.to(device)

            # Forward propagation to get outputs
            outputs = net(inputs)

            # Compute loss
            loss = criterion(outputs, labels)

            # Backpropagation to get gradients of all parameters
            loss.backward()

            # Update parameters
            optimizer.step()

            # Accumulate loss
            running_loss += loss.item() * inputs.size(0)

            # Calculate accuracy for the batch
            _, preds = torch.max(outputs, 1)
            running_corrects += torch.sum(preds == labels)
            total_samples += inputs.size(0)

        # Calculate average training loss and accuracy
        train_loss = running_loss / total_samples
        train_acc = running_corrects.float() / total_samples
        train_losses.append(train_loss)
        train_accuracies.append(train_acc.cpu().numpy())

        # Validation phase
        net.eval()  # Set model to evaluation mode
        val_loss = 0
        val_corrects = 0
        val_samples = 0

        with torch.no_grad():  # Disable gradient computation
            for inputs, labels in valloader:
                inputs = inputs.to(device)
                labels = labels.to(device)

                outputs = net(inputs)
                loss = criterion(outputs, labels)
                

                val_loss += loss.item() * inputs.size(0)

                _, preds = torch.max(outputs, 1)
                val_corrects += torch.sum(preds == labels)
                val_samples += inputs.size(0)

        val_loss /= val_samples
        val_acc = val_corrects.double() / val_samples
        val_losses.append(val_loss)
        val_accuracies.append(val_acc.cpu().numpy())

        print(f'[Epoch {epoch+1:2d}]: train_loss = {train_loss:.4f}, train_acc = {train_acc:.4f}, '
              f'validation_loss = {val_loss:.4f}, validation_acc = {val_acc:.4f}')

        scheduler.step()  # Step the learning rate scheduler

    print("Training completed.")
    return train_losses, val_losses, train_accuracies, val_accuracies

In [51]:
import torch
from sklearn.metrics import precision_score, recall_score, f1_score

def evaluate(net, dataloader, device):
    net.eval()  # Set model to evaluation mode
    all_targets = []
    all_predictions = []

    with torch.no_grad():  # Disable gradient computation
        for inputs, targets in dataloader:
            inputs = inputs.to(device)
            targets = targets.to(device)

            outputs = net(inputs)
            _, predicted = torch.max(outputs, 1)  # Extract the predicted class indices
            
            all_targets.extend(targets.cpu().numpy())  # Store the true labels
            all_predictions.extend(predicted.cpu().numpy())  # Store the predicted labels

    # Calculate accuracy
    accuracy = sum(np.array(all_predictions) == np.array(all_targets)) / len(all_targets)
    print(f'Test accuracy: {accuracy:.4f}')
    
    # Calculate Precision, Recall, and F1-Score
    precision = precision_score(all_targets, all_predictions, average='weighted')
    recall = recall_score(all_targets, all_predictions, average='weighted')
    f1 = f1_score(all_targets, all_predictions, average='weighted')

    print(f'Precision: {precision:.4f}')
    print(f'Recall: {recall:.4f}')
    print(f'F1-Score: {f1:.4f}')

In [52]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class SEBlock(nn.Module):
    def __init__(self, in_channels, reduction=16):
        super(SEBlock, self).__init__()
        self.fc1 = nn.Linear(in_channels, in_channels // reduction)
        self.fc2 = nn.Linear(in_channels // reduction, in_channels)

    def forward(self, x):
        batch, channels, _, _ = x.size()
        squeeze = F.adaptive_avg_pool2d(x, 1).view(batch, channels)
        excitation = F.relu(self.fc1(squeeze))
        excitation = torch.sigmoid(self.fc2(excitation)).view(batch, channels, 1, 1)
        return x * excitation

class SimplifiedModel(nn.Module):
    def __init__(self):
        super(SimplifiedModel, self).__init__()

        # Convolutional Layers
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=32, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, padding=1)
        self.conv3 = nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3, padding=1)
        self.conv4 = nn.Conv2d(in_channels=128, out_channels=256, kernel_size=3, padding=1)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        self.batchnorm1 = nn.BatchNorm2d(64)
        self.batchnorm2 = nn.BatchNorm2d(128)
        self.batchnorm3 = nn.BatchNorm2d(256)
        self.avgpool = nn.AdaptiveAvgPool2d(1)

        # Squeeze-and-Excitation Block
        self.se = SEBlock(in_channels=256)

        # Fully Connected Layers
        self.fc1 = nn.Linear(256, 64)
        self.fc2 = nn.Linear(64, 32)
        self.fc3 = nn.Linear(32, 36)
        self.batchnorm4 = nn.BatchNorm1d(64)
        self.batchnorm5 = nn.BatchNorm1d(32)
        self.dropout = nn.Dropout(0.3)  # Reduced dropout

    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = self.pool(x)
        
        x = F.relu(self.conv2(x))
        x = self.batchnorm1(x)
        x = self.pool(x)
        
        x = F.relu(self.conv3(x))
        x = self.batchnorm2(x)
        
        x = F.relu(self.conv4(x))
        x = self.batchnorm3(x)
        x = self.avgpool(x)
        
        # Apply Squeeze-and-Excitation block
        x = self.se(x)
        
        x = torch.flatten(x, 1)
        x = F.relu(self.fc1(x))
        x = self.batchnorm4(x)
        x = self.dropout(x)
        
        x = F.relu(self.fc2(x))
        x = self.batchnorm5(x)
        x = self.dropout(x)
        
        x = self.fc3(x)
    
        return x


net = SimplifiedModel()
summary(net, input_size=(64, 3, 40, 40))


Layer (type:depth-idx)                   Output Shape              Param #
SimplifiedModel                          [64, 36]                  --
├─Conv2d: 1-1                            [64, 32, 40, 40]          896
├─MaxPool2d: 1-2                         [64, 32, 20, 20]          --
├─Conv2d: 1-3                            [64, 64, 20, 20]          18,496
├─BatchNorm2d: 1-4                       [64, 64, 20, 20]          128
├─MaxPool2d: 1-5                         [64, 64, 10, 10]          --
├─Conv2d: 1-6                            [64, 128, 10, 10]         73,856
├─BatchNorm2d: 1-7                       [64, 128, 10, 10]         256
├─Conv2d: 1-8                            [64, 256, 10, 10]         295,168
├─BatchNorm2d: 1-9                       [64, 256, 10, 10]         512
├─AdaptiveAvgPool2d: 1-10                [64, 256, 1, 1]           --
├─SEBlock: 1-11                          [64, 256, 1, 1]           --
│    └─Linear: 2-1                       [64, 16]                  4

In [56]:
class CNN(nn.Module):
    def __init__(self):
        super().__init__()
        # Increase the number of filters
        self.conv1 = nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1)  # 32 -> 64
        self.conv2 = nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1)  # 64 -> 128
        self.batchnorm1 = nn.BatchNorm2d(128)
        
        self.conv3 = nn.Conv2d(128, 256, kernel_size=3, stride=1, padding=1)  # 128 -> 256
        self.batchnorm2 = nn.BatchNorm2d(256)
        
        self.conv4 = nn.Conv2d(256, 512, kernel_size=3, stride=1, padding=1)  # 256 -> 512
        self.batchnorm3 = nn.BatchNorm2d(512)
        
        self.maxpool = nn.MaxPool2d(2, 2)
        
        # Dropout layer after convolutional layers
        self.dropout_conv = nn.Dropout(p=0.5)  # Dropout after convolutional layers
        
        # Fully connected layers
        self.fc1 = nn.Linear(512 * 5 * 5, 1024)  # Adjusted to match new conv4 filters
        self.dropout_fc1 = nn.Dropout(p=0.5)
        
        self.fc2 = nn.Linear(1024, 10)  # More units in fc1

    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = self.batchnorm1(x)
        x = self.maxpool(x)
        
        x = F.relu(self.conv3(x))
        x = self.batchnorm2(x)
        x = self.maxpool(x)
        
        x = F.relu(self.conv4(x))
        x = self.batchnorm3(x)
        x = self.maxpool(x)
        
        x = self.dropout_conv(x)  # Apply dropout to convolutional features
        
        x = x.view(-1, 512 * 5 * 5)  # Adjust to match the new filters
        x = F.relu(self.fc1(x))
        x = self.dropout_fc1(x)
        
        x = self.fc2(x)
        return F.log_softmax(x, dim=1)

net = CNN()
summary(net, input_size=(64, 3, 40, 40))


Layer (type:depth-idx)                   Output Shape              Param #
CNN                                      [64, 10]                  --
├─Conv2d: 1-1                            [64, 64, 40, 40]          1,792
├─Conv2d: 1-2                            [64, 128, 40, 40]         73,856
├─BatchNorm2d: 1-3                       [64, 128, 40, 40]         256
├─MaxPool2d: 1-4                         [64, 128, 20, 20]         --
├─Conv2d: 1-5                            [64, 256, 20, 20]         295,168
├─BatchNorm2d: 1-6                       [64, 256, 20, 20]         512
├─MaxPool2d: 1-7                         [64, 256, 10, 10]         --
├─Conv2d: 1-8                            [64, 512, 10, 10]         1,180,160
├─BatchNorm2d: 1-9                       [64, 512, 10, 10]         1,024
├─MaxPool2d: 1-10                        [64, 512, 5, 5]           --
├─Dropout: 1-11                          [64, 512, 5, 5]           --
├─Linear: 1-12                           [64, 1024]          

In [None]:
train_losses, val_losses, train_accuracies, val_accuracies = train(net, train_loader, val_loader, device, num_epochs=60, lr=0.001, weight_decay=1e-4,step_size=5, gamma=0.7)

[Epoch  1]: train_loss = 2.4614, train_acc = 0.2991, validation_loss = 1.7233, validation_acc = 0.3895
[Epoch  2]: train_loss = 1.7499, train_acc = 0.3982, validation_loss = 1.5729, validation_acc = 0.4753
[Epoch  3]: train_loss = 1.6304, train_acc = 0.4478, validation_loss = 1.6212, validation_acc = 0.4405
[Epoch  4]: train_loss = 1.5281, train_acc = 0.4865, validation_loss = 1.4846, validation_acc = 0.5225
[Epoch  5]: train_loss = 1.4342, train_acc = 0.5355, validation_loss = 1.1955, validation_acc = 0.6095
[Epoch  6]: train_loss = 1.2074, train_acc = 0.5983, validation_loss = 1.0773, validation_acc = 0.6475
[Epoch  7]: train_loss = 1.1398, train_acc = 0.6290, validation_loss = 0.9887, validation_acc = 0.6676
[Epoch  8]: train_loss = 1.0823, train_acc = 0.6436, validation_loss = 0.9450, validation_acc = 0.6752
[Epoch  9]: train_loss = 1.0258, train_acc = 0.6660, validation_loss = 0.9282, validation_acc = 0.6860
[Epoch 10]: train_loss = 0.9527, train_acc = 0.6878, validation_loss = 0.

In [None]:
plot_training_results(train_losses, val_losses, train_accuracies, val_accuracies)

In [55]:
test_transform = transforms.Compose([
    transforms.Resize((40, 40)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

testset = ImageFolder(data_dir + "/Test", transform=test_transform)
test_loader = DataLoader(testset, batch_size=32, shuffle=False)
evaluate(net, test_loader, device)

Test accuracy: 0.7385
Precision: 0.7380
Recall: 0.7385
F1-Score: 0.7340


In [19]:
def predict_image(image_path, model, transform=None):
    model.eval()
    
    device = next(model.parameters()).device
    
    image = Image.open(image_path)
    
    img = transform(image)
    
    img = img.to(device)
    
    xb = img.unsqueeze(0)
    
    with torch.no_grad():
        yb = model(xb)
        
    _, preds = torch.max(yb, dim=1)
    
    return preds.item()