In [None]:
import pandas as pd
import cv2
import os 
import torch.nn as nn
from torch.utils.data import Dataset ,DataLoader, Subset
import torch.optim as optim
import numpy as np
import torch
from torchvision import transforms
import matplotlib.pyplot as plt
from PIL import Image
from torch.utils.data import random_split
from tqdm import tqdm

In [None]:
# Root folder of the Kaggle competition data (holds train.csv and train_images/)
data_dir = '/kaggle/input/cassava-leaf-disease-classification'

# Hyper-parameters shared by the cells below
batch_size, epochs = 32, 30
num_classes = 5
lr_rate = 0.05

# Use the first CUDA GPU when one is visible, otherwise run on the CPU
device = torch.device("cuda:0") if torch.cuda.is_available() else torch.device("cpu")

In [None]:
# The result of this first listing is discarded (it is not the cell's last
# expression); the call only fails early if data_dir itself is missing.
os.listdir(data_dir)
# Count the files in the training-image folder
train_image_dir = os.path.join(data_dir, "train_images")
print('Train images: %d' % len(os.listdir(train_image_dir)))

In [None]:
# Load the label table that ships with the training images and preview its first rows
train_labels = pd.read_csv(
    os.path.join(data_dir, "train.csv")
)
train_labels.head(n=5)

In [None]:
# Dataset that pairs image files on disk with the labels listed in a CSV file
class CustomDataset(Dataset):
    """Image dataset indexed by a CSV file.

    Rows are read by position: column 0 is used as the image file name
    (relative to ``root_dir``) and column 1 as the label.
    """

    def __init__(self, csv_file, root_dir, transform=None):
        """Read the CSV index and remember the image folder and transform.

        Args:
            csv_file: Path of the CSV file to load with ``pd.read_csv``.
            root_dir: Directory that the file names in the CSV are joined to.
            transform: Optional callable applied to each loaded PIL image.
        """
        self.data_frame = pd.read_csv(csv_file)
        self.root_dir = root_dir
        self.transform = transform

    def __len__(self):
        """Return the number of rows in the CSV index."""
        row_count = len(self.data_frame)
        return row_count

    def __getitem__(self, idx):
        """Return ``(image, label)`` for row ``idx``.

        The image is opened from disk and converted to RGB; the label is
        cast to ``int``. The transform, when one was given, is applied to
        the image only.
        """
        file_name = self.data_frame.iloc[idx, 0]
        picture = Image.open(os.path.join(self.root_dir, file_name)).convert('RGB')

        label = int(self.data_frame.iloc[idx, 1])

        # Without a transform the raw PIL image is handed back unchanged
        sample = self.transform(picture) if self.transform else picture
        return sample, label

# Define data transform: resize every image to 224x224 and convert it to a tensor
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
])
# Read images from local file
dataset = CustomDataset(csv_file=os.path.join(data_dir, "train.csv"), root_dir=os.path.join(data_dir, "train_images"), transform=transform)

# Calculate the dataset size
dataset_size = len(dataset)

# Define the ratio
train_ratio = 0.8
val_ratio = 0.1
test_ratio = 0.1

# Calculate the size of every dataset.
# The test split takes whatever is left after truncating the other two sizes,
# so the three sizes always add up to dataset_size.
train_size = int(train_ratio * dataset_size)
val_size = int(val_ratio * dataset_size)
test_size = dataset_size - train_size - val_size

# Split the dataset.
# A dedicated, seeded generator makes the train/val/test membership identical on
# every run; without it the held-out images change each time the cell is
# executed, so accuracies from different runs are not comparable.
split_seed = 42
split_generator = torch.Generator().manual_seed(split_seed)
train_dataset, val_dataset, test_dataset = random_split(
    dataset, [train_size, val_size, test_size], generator=split_generator
)

# Use DataLoader to load data (only the training batches are shuffled)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# Print the size of dataset
print(f'Train dataset size: {len(train_dataset)}')
print(f'Validation dataset size: {len(val_dataset)}')
print(f'Test dataset size: {len(test_dataset)}')

In [None]:
# Define a simple Convolutional Neural Network (CNN) class
class SimpleCNN(nn.Module):
    """Two conv/ReLU/pool stages followed by a two-layer classifier head.

    ``fc1`` is sized for 64 x 56 x 56 features, which is what a 3-channel
    224 x 224 input produces after the two 2x2 poolings (the 3x3 convolutions
    use padding=1 and keep the spatial size).

    Args:
        num_classes: Number of output logits produced by ``fc2``.
    """

    def __init__(self, num_classes):
        super(SimpleCNN, self).__init__()
        # First convolutional layer
        self.conv1 = nn.Conv2d(3, 32, kernel_size=3, padding=1)
        # Use ReLU as the activation function
        self.relu = nn.ReLU(inplace=True)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        # Second convolutional layer
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        # Fully connected layers for classification
        self.fc1 = nn.Linear(64 * 56 * 56, 64)
        self.fc2 = nn.Linear(64, num_classes)

    def forward(self, x):
        """Return class logits of shape ``(batch, num_classes)`` for image batch ``x``."""
        # Move the input to wherever this model's weights live. Using the
        # model's own device (instead of a notebook-level global) keeps the
        # class usable on its own and avoids a device mismatch when the model
        # was not moved to that global device.
        x = x.to(self.conv1.weight.device)
        # First convolutional stage
        x = self.pool(self.relu(self.conv1(x)))
        # Second convolutional stage
        x = self.pool(self.relu(self.conv2(x)))
        # Flatten the output for the fully connected layers
        x = x.view(x.size(0), -1)
        # Non-linearity between the two linear layers; without it fc1 and fc2
        # collapse into a single linear map.
        x = self.relu(self.fc1(x))
        return self.fc2(x)

In [None]:
# Design ResNet
class BasicBlock(nn.Module):
    """Residual block made of two 3x3 convolutions plus a shortcut connection.

    Args:
        in_channel: Channels of the incoming feature map.
        out_channel: Channels produced by both convolutions.
        stride: Stride of the first convolution (the second always uses 1).
        downsample: Optional module applied to the input to build the
            shortcut; when ``None`` the input itself is the shortcut.
        **kwargs: Accepted and ignored, so the block can be constructed with
            the same keyword arguments as ``Bottleneck``.
    """

    # Ratio between the block's output channels and ``out_channel``
    expansion = 1

    def __init__(self, in_channel, out_channel, stride=1, downsample=None, **kwargs):
        super().__init__()
        # First 3x3 convolution carries the stride; bias is omitted because a
        # batch-norm layer follows directly
        self.conv1 = nn.Conv2d(in_channel, out_channel, kernel_size=3,
                               stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channel)
        self.relu = nn.ReLU()
        # Second 3x3 convolution keeps both resolution and channel count
        self.conv2 = nn.Conv2d(out_channel, out_channel, kernel_size=3,
                               stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channel)
        self.downsample = downsample

    def forward(self, x):
        """Return ``relu(main_branch(x) + shortcut(x))``."""
        # Shortcut branch: the raw input, or its projection when one was supplied
        shortcut = x if self.downsample is None else self.downsample(x)

        # Main branch: conv-bn-relu followed by conv-bn
        main = self.relu(self.bn1(self.conv1(x)))
        main = self.bn2(self.conv2(main))

        # Merge the two branches, then apply the final activation
        main += shortcut
        return self.relu(main)
 
 
# Define the residual structure of ResNet50/101/152
class Bottleneck(nn.Module):
    """Residual block built as 1x1 -> 3x3 -> 1x1 convolutions plus a shortcut.

    Args:
        in_channel: Channels of the incoming feature map.
        out_channel: Base width; the block outputs ``out_channel * expansion``
            channels.
        stride: Stride of the 3x3 convolution.
        downsample: Optional module applied to the input to build the
            shortcut; when ``None`` the input itself is the shortcut.
        groups: Number of groups of the 3x3 convolution.
        width_per_group: Scales the inner width relative to a baseline of 64.
    """

    # The block's output has ``expansion`` times ``out_channel`` channels
    expansion = 4

    def __init__(self, in_channel, out_channel, stride=1, downsample=None,
                 groups=1, width_per_group=64):
        super().__init__()

        # Channel count used by the first two convolutions
        width = int(out_channel * (width_per_group / 64.)) * groups

        # 1x1 convolution: map the input to the inner width
        self.conv1 = nn.Conv2d(in_channel, width, kernel_size=1, stride=1, bias=False)
        self.bn1 = nn.BatchNorm2d(width)
        # 3x3 (optionally grouped) convolution: carries the stride
        self.conv2 = nn.Conv2d(width, width, kernel_size=3, stride=stride,
                               padding=1, groups=groups, bias=False)
        self.bn2 = nn.BatchNorm2d(width)
        # 1x1 convolution: expand to the block's output channels
        expanded = out_channel * self.expansion
        self.conv3 = nn.Conv2d(width, expanded, kernel_size=1, stride=1, bias=False)
        self.bn3 = nn.BatchNorm2d(expanded)
        self.relu = nn.ReLU(inplace=True)
        self.downsample = downsample

    def forward(self, x):
        """Return ``relu(main_branch(x) + shortcut(x))``."""
        # Shortcut branch: the raw input, or its projection when one was supplied
        shortcut = x if self.downsample is None else self.downsample(x)

        # Main branch: two conv-bn-relu stages, then conv-bn
        main = self.relu(self.bn1(self.conv1(x)))
        main = self.relu(self.bn2(self.conv2(main)))
        main = self.bn3(self.conv3(main))

        # Merge the two branches, then apply the final activation
        main += shortcut
        return self.relu(main)
 
 
# Define ResNet class
class ResNet(nn.Module):
    """ResNet assembled from a stem, four residual stages and an optional head.

    Args:
        block: Residual block class. It must expose an ``expansion`` class
            attribute and accept ``(in_channel, out_channel, stride=...,
            downsample=..., groups=..., width_per_group=...)``.
        blocks_num: Sequence of four ints, the number of blocks per stage.
        num_classes: Output size of the final linear layer.
        include_top: When false, neither the pooling nor the linear layer is
            created and ``forward`` returns the last stage's feature map.
        groups: Forwarded to every block.
        width_per_group: Forwarded to every block.
    """

    def __init__(self,
                 block,
                 blocks_num,
                 num_classes=1000,
                 include_top=True,
                 groups=1,
                 width_per_group=64):
        super().__init__()
        self.include_top = include_top
        # Running channel count; _make_layer updates it after every stage
        self.in_channel = 64

        self.groups = groups
        self.width_per_group = width_per_group

        # Stem: 7x7 stride-2 convolution, batch norm, ReLU, 3x3 stride-2 max pool
        self.conv1 = nn.Conv2d(3, self.in_channel, kernel_size=7, stride=2,
                               padding=3, bias=False)
        self.bn1 = nn.BatchNorm2d(self.in_channel)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        # Four residual stages; every stage after the first starts with stride 2
        self.layer1 = self._make_layer(block, 64, blocks_num[0])
        self.layer2 = self._make_layer(block, 128, blocks_num[1], stride=2)
        self.layer3 = self._make_layer(block, 256, blocks_num[2], stride=2)
        self.layer4 = self._make_layer(block, 512, blocks_num[3], stride=2)

        if self.include_top:
            # Pool each channel down to 1x1, then classify
            self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
            self.fc = nn.Linear(512 * block.expansion, num_classes)

        # Re-initialise every convolution in the network (Kaiming normal, fan-out)
        for module in self.modules():
            if isinstance(module, nn.Conv2d):
                nn.init.kaiming_normal_(module.weight, mode='fan_out', nonlinearity='relu')

    def _make_layer(self, block, channel, block_num, stride=1):
        """Build one stage of ``block_num`` blocks and advance ``self.in_channel``.

        Only the first block receives ``stride`` and, when the stride is not 1
        or the channel count changes, a 1x1-convolution + batch-norm shortcut
        projection.
        """
        stage_width = channel * block.expansion

        downsample = None
        if stride != 1 or self.in_channel != stage_width:
            downsample = nn.Sequential(
                nn.Conv2d(self.in_channel, stage_width, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(stage_width))

        # Keyword arguments shared by every block of the stage
        shared = dict(groups=self.groups, width_per_group=self.width_per_group)

        first_block = block(self.in_channel, channel,
                            downsample=downsample, stride=stride, **shared)
        self.in_channel = stage_width

        # Remaining blocks keep the resolution and use an identity shortcut
        later_blocks = [block(self.in_channel, channel, **shared)
                        for _ in range(1, block_num)]
        return nn.Sequential(first_block, *later_blocks)

    def forward(self, x):
        """Run the stem and the four stages, plus the head when ``include_top``."""
        # Stem
        x = self.maxpool(self.relu(self.bn1(self.conv1(x))))
        # Residual stages
        for stage in (self.layer1, self.layer2, self.layer3, self.layer4):
            x = stage(x)

        if not self.include_top:
            return x

        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        return self.fc(x)

# ResNet50
def resnet50(num_classes=num_classes, include_top=True):
    """Build a ResNet of Bottleneck blocks with a 3-4-6-3 stage layout.

    The default for ``num_classes`` is the value the module-level
    ``num_classes`` had when this function was defined.
    """
    stage_depths = [3, 4, 6, 3]
    return ResNet(block=Bottleneck, blocks_num=stage_depths,
                  num_classes=num_classes, include_top=include_top)

# ResNet101
def resnet101(num_classes=num_classes, include_top=True):
    """Build a ResNet of Bottleneck blocks with a 3-4-23-3 stage layout.

    The default for ``num_classes`` is the value the module-level
    ``num_classes`` had when this function was defined.
    """
    stage_depths = [3, 4, 23, 3]
    return ResNet(block=Bottleneck, blocks_num=stage_depths,
                  num_classes=num_classes, include_top=include_top)

# ResNet152
def resnet152(num_classes=num_classes, include_top=True):
    """Build a ResNet of Bottleneck blocks with a 3-8-36-3 stage layout.

    The default for ``num_classes`` is the value the module-level
    ``num_classes`` had when this function was defined.
    """
    stage_depths = [3, 8, 36, 3]
    return ResNet(block=Bottleneck, blocks_num=stage_depths,
                  num_classes=num_classes, include_top=include_top)

In [None]:
# choose the model you want, then place it on the selected device
model = resnet50(num_classes=num_classes)
model = model.to(device)
# design the optimizers: two candidates over the same parameters, both
# starting from lr_rate; one of them is handed to train() later
optimizer1 = optim.Adam(params=model.parameters(), lr=lr_rate)
optimizer2 = optim.SGD(params=model.parameters(), lr=lr_rate, momentum=0.9)

# Validate step
def validate(model, val_loader):
    """Measure the top-1 accuracy of ``model`` over all batches of ``val_loader``.

    The model is evaluated in eval mode with gradients disabled, and is put
    back into whichever mode (train/eval) it was in before the call, so that
    calling this between training epochs does not silently switch the rest of
    training to eval mode.

    Args:
        model: Network to evaluate; batches are moved to the global ``device``.
        val_loader: Iterable of ``(images, labels)`` batches that supports ``len``.

    Returns:
        Fraction of correctly classified samples, or ``0.0`` when the loader
        yields no samples.
    """
    # Remember the current mode so it can be restored afterwards
    was_training = model.training
    model.eval()
    correct = 0
    total = 0
    try:
        with torch.no_grad():
            # tqdm only provides a progress bar over the evaluation batches
            for images, labels in tqdm(val_loader, total=len(val_loader)):
                images = images.to(device)
                labels = labels.to(device)
                outputs = model(images)
                # Predicted class = index of the largest logit
                _, predicted = torch.max(outputs, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()
    finally:
        model.train(was_training)

    # Calculate accuracy and print the result (guard against an empty loader)
    accuracy = correct / total if total else 0.0
    print("\n Evaluation accuracy: {}".format(accuracy))
    return accuracy

# Test step
def test(model, test_loader):
    """Print and return the top-1 accuracy of ``model`` on ``test_loader``.

    The model is switched to eval mode and is left in eval mode afterwards;
    the inference cells that follow training call the model directly and rely
    on that.

    Args:
        model: Network to evaluate; batches are moved to the global ``device``.
        test_loader: Iterable of ``(images, labels)`` batches.

    Returns:
        Fraction of correctly classified samples, or ``0.0`` when the loader
        yields no samples. (The value used to be printed only; returning it
        is harmless for callers that ignore the result.)
    """
    # Set the model to evaluation mode
    model.eval()
    correct = 0
    total = 0

    with torch.no_grad():
        for images, labels in test_loader:
            images = images.to(device)
            labels = labels.to(device)
            outputs = model(images)
            # Predicted class = index of the largest logit
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    # Calculate accuracy and print the result (guard against an empty loader)
    accuracy = correct / total if total else 0.0
    print(f'Test Accuracy: {accuracy * 100:.2f}%')
    return accuracy

# Train step
def train(model, train_loader, val_loader, test_loader, num_epochs=5, optim = optimizer1):
    """Train ``model`` and record its accuracy after every epoch.

    After each epoch the accuracy is measured on the full training loader and
    on the validation loader; once all epochs are done the model is evaluated
    on the test loader.

    Args:
        model: Network to optimise; batches are moved to the global ``device``.
        train_loader: Iterable of ``(images, labels)`` training batches.
        val_loader: Batches used for the per-epoch validation accuracy.
        test_loader: Batches evaluated once, after the last epoch.
        num_epochs: Number of passes over ``train_loader``.
        optim: Optimizer instance that updates ``model``. The default is the
            ``optimizer1`` object that existed when this function was defined.
            Inside this function the name hides the ``torch.optim`` module
            that the notebook imports as ``optim``.

    Returns:
        ``(trn_acc_hist, val_acc_hist)``: two lists holding one accuracy per
        epoch for the training and validation loaders respectively.
    """
    criterion = nn.CrossEntropyLoss()
    optimizer = optim
    # Use a cyclic learning rate scheduler for adaptive learning rates.
    # NOTE(review): the cycle bounds are hard-coded, so the learning rate the
    # optimizer was created with is presumably overridden by the scheduler -
    # confirm this is intended.
    scheduler = torch.optim.lr_scheduler.CyclicLR(optimizer, base_lr=0.005, max_lr=0.05)
    trn_acc_hist = []
    val_acc_hist = []
    num_batches = len(train_loader)

    for epoch in range(num_epochs):
        # Re-enter training mode explicitly: the accuracy passes at the end of
        # the previous epoch call model.eval(), and without this every epoch
        # after the first would train with batch norm in eval mode.
        model.train()
        total_loss = 0.0
        # Use tqdm for a progress bar during training
        progress_bar = tqdm(enumerate(train_loader), total=num_batches, desc=f'Epoch {epoch+1}/{num_epochs}')
        for batch_idx, (images, labels) in progress_bar:
            images = images.to(device)
            labels = labels.to(device)
            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            # The scheduler is stepped once per batch, right after the optimizer
            scheduler.step()
            total_loss += loss.item()
            # Show the running mean loss of the current epoch
            progress_bar.set_postfix({'Loss': total_loss / (batch_idx + 1)})

        # Print average loss at the end of each epoch (guard against an empty loader)
        print(f'Epoch {epoch+1}/{num_epochs}, Average Loss: {total_loss/max(num_batches, 1)}')
        # Record training and validation accuracy history
        trn_acc_hist.append(validate(model, train_loader))
        # Validation is performed at the end of each epoch
        print("\n Evaluate on validation set...")
        val_acc_hist.append(validate(model, val_loader))

    # Test at the end of the training session
    print("\nTraining completed. Starting test evaluation.")
    test(model, test_loader)
    return trn_acc_hist, val_acc_hist

# Start training: run the configured number of epochs with the SGD optimizer
# and keep the per-epoch accuracy histories for the plot below
print("Training started...")
trn_acc_hist, val_acc_hist = train(
    model,
    train_loader,
    val_loader,
    test_loader,
    num_epochs=epochs,
    optim=optimizer2,
)

In [None]:
# Save the trained model: only the weights (state_dict) are written to disk
torch.save(obj=model.state_dict(), f='./model_best.pth')

In [None]:
# Plot the training and validation accuracy histories (one point per epoch)
x = np.arange(epochs)
plt.figure(figsize=(10, 5))
plt.plot(x, trn_acc_hist, label='Training')
plt.plot(x, val_acc_hist, label='Validation')
plt.legend()
plt.title('Classification')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.xticks(x)
# Save the plot as an image (PNG) with a specified DPI (dots per inch)
plt.savefig('classify.png', dpi=300)
plt.show()

In [None]:
# Take the first batch of the test loader
example_data, example_targets = next(iter(test_loader))
# Get predictions through the model. Inference mode is requested explicitly so
# this cell does not depend on which mode an earlier cell left the model in.
model.eval()
with torch.no_grad():
    example_data = example_data.to(device)
    output = model(example_data)
# Predicted class = index of the largest logit of each sample
predictions = output.argmax(dim=1)

fig = plt.figure()
# Show up to 9 examples; the batch may hold fewer than 9 images
num_shown = min(9, example_data.size(0))
for i in range(num_shown):
    plt.subplot(3, 3, i + 1)
    # The tensors are channel-first; move channels last so the full RGB image
    # is drawn instead of a single colour channel rendered with a colormap
    plt.imshow(example_data[i].permute(1, 2, 0).cpu().numpy())
    plt.title("Prediction: {}".format(predictions[i].item()))
    plt.xticks([])
    plt.yticks([])
# Adjust the spacing once, after all panels exist
plt.tight_layout()
plt.show()

In [None]:
# Add your image to predict the class ('xx.jpg' is a placeholder path)
img = Image.open('xx.jpg').convert('RGB')
# Transform the image to fit the input type. The transform returns a single
# channel-first image; the network (its batch-norm layers in particular)
# needs a leading batch dimension, so add one of size 1.
x = transform(img).unsqueeze(0).to(device)
# Inference only: eval mode and no gradient tracking
model.eval()
with torch.no_grad():
    logits = model(x)
pred_class = logits.argmax(dim=1).item()
plt.figure(figsize=(6,8))
plt.imshow(img)
plt.title('Predicted Class: %s' %pred_class)
plt.axis('off')
plt.show()