In [None]:
#IMPORTS
import os
import numpy as np
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision import datasets, transforms
from google.colab import files
from google.colab import drive

# DEVICE CONFIGURATION
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

#DATASET DOWNLOAD and CNN IMPORT
!pip install -q kaggle
upload = files.upload()

# The Kaggle API client expects this file to be in ~/.kaggle,
!mkdir -p ~/.kaggle
!cp kaggle.json ~/.kaggle/

# This permissions change avoids a warning on Kaggle tool startup.
!chmod 600 ~/.kaggle/kaggle.json

# Download and unzip dataset and sort data
!kaggle datasets download -d dev523/leaf-disease-detection-dataset
!unzip -qq /content/leaf-disease-detection-dataset.zip

Saving kaggle.json to kaggle.json
Downloading leaf-disease-detection-dataset.zip to /content
100% 1.33G/1.34G [00:16<00:00, 143MB/s]
100% 1.34G/1.34G [00:16<00:00, 86.0MB/s]


In [None]:
#LOAD DATASETS (images and labels from folder names)
train_dir = r'/content/dataset/train'
test_dir  = r'/content/dataset/test'

train_transforms = transforms.Compose([
    transforms.Resize((224,224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])])

test_transforms = transforms.Compose([
    transforms.Resize((224,224)),
    transforms.ToTensor()])

full_train_dataset = datasets.ImageFolder(train_dir, transform=train_transforms)
test_dataset = datasets.ImageFolder(test_dir, transform=test_transforms)
labels,labeldict = full_train_dataset.find_classes(train_dir)

#create validation set
train_dataset, val_dataset = torch.utils.data.random_split(full_train_dataset, [0.7, 0.3])


trainloader = torch.utils.data.DataLoader(train_dataset, batch_size=64, shuffle=True)
valloader = torch.utils.data.DataLoader(val_dataset, batch_size=64, shuffle=True)
testloader = torch.utils.data.DataLoader(test_dataset, batch_size=64, shuffle=True)

In [None]:
# RESNET-50 ARCHITECTURE

#Re-usable Residual Block that skips connections
class ResidualBlock(nn.Module):
    def __init__(self, in_channels, out_channels, stride = 1, downsample = None):
      super(ResidualBlock, self).__init__()
      self.conv1 = nn.Sequential(
          nn.Conv2d(in_channels, out_channels, kernel_size = 3, stride = stride, padding = 1),
          nn.BatchNorm2d(out_channels),
          nn.ReLU())
      self.conv2 = nn.Sequential(
          nn.Conv2d(out_channels, out_channels, kernel_size = 3, stride = 1, padding = 1),
          nn.BatchNorm2d(out_channels))
      self.downsample = downsample
      self.relu = nn.ReLU()
      self.out_channels = out_channels

    def forward(self, x):
      residual = x
      out = self.conv1(x)
      out = self.conv2(out)
      if self.downsample:
          residual = self.downsample(x)
      out += residual
      out = self.relu(out)
      return out

#CNN architecture
class ResNet50(nn.Module):
    def __init__(self, block, layers, num_classes = 38):
      super(ResNet50, self).__init__()
      self.inplanes = 64
      self.conv1 = nn.Sequential(
          nn.Conv2d(3, 64, kernel_size = 7, stride = 2, padding = 3),
          nn.BatchNorm2d(64),
          nn.ReLU())
      self.maxpool = nn.MaxPool2d(kernel_size = 3, stride = 2, padding = 1)
      self.layer0 = self._make_layer(block, 64, layers[0], stride = 1)
      self.layer1 = self._make_layer(block, 128, layers[1], stride = 2)
      self.layer2 = self._make_layer(block, 256, layers[2], stride = 2)
      self.layer3 = self._make_layer(block, 512, layers[3], stride = 2)
      self.avgpool = nn.AvgPool2d(7, stride=1)
      self.fc = nn.Linear(512, num_classes)

    def _make_layer(self, block, planes, blocks, stride=1): #helper function to create layers for adding
      downsample = None
      if stride != 1 or self.inplanes != planes:
        downsample = nn.Sequential(
          nn.Conv2d(self.inplanes, planes, kernel_size=1, stride=stride),
          nn.BatchNorm2d(planes))
      layers = []
      layers.append(block(self.inplanes, planes, stride, downsample))
      self.inplanes = planes
      for i in range(1, blocks):
          layers.append(block(self.inplanes, planes))
      return nn.Sequential(*layers)


    def forward(self, x):
      x = self.conv1(x)
      x = self.maxpool(x)
      x = self.layer0(x)
      x = self.layer1(x)
      x = self.layer2(x)
      x = self.layer3(x)
      x = self.avgpool(x)
      x = x.view(x.size(0), -1)
      x = self.fc(x)

      return x

In [None]:
# RESNET-152 ARCHITECTURE

import torch.nn.functional as F

class Bottleneck(nn.Module):
    expansion = 4
    def __init__(self, in_channels, out_channels, i_downsample=None, stride=1):
        super(Bottleneck, self).__init__()

        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=1, padding=0)
        self.batch_norm1 = nn.BatchNorm2d(out_channels)

        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=stride, padding=1)
        self.batch_norm2 = nn.BatchNorm2d(out_channels)

        self.conv3 = nn.Conv2d(out_channels, out_channels*self.expansion, kernel_size=1, stride=1, padding=0)
        self.batch_norm3 = nn.BatchNorm2d(out_channels*self.expansion)

        self.i_downsample = i_downsample
        self.stride = stride
        self.relu = nn.ReLU()

    def forward(self, x):
        identity = x.clone()
        x = self.relu(self.batch_norm1(self.conv1(x)))

        x = self.relu(self.batch_norm2(self.conv2(x)))

        x = self.conv3(x)
        x = self.batch_norm3(x)

        #downsample if needed
        if self.i_downsample is not None:
            identity = self.i_downsample(identity)
        #add identity
        x+=identity
        x=self.relu(x)

        return x

class Block(nn.Module):
    expansion = 1
    def __init__(self, in_channels, out_channels, i_downsample=None, stride=1):
        super(Block, self).__init__()


        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1, stride=stride, bias=False)
        self.batch_norm1 = nn.BatchNorm2d(out_channels)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1, stride=stride, bias=False)
        self.batch_norm2 = nn.BatchNorm2d(out_channels)

        self.i_downsample = i_downsample
        self.stride = stride
        self.relu = nn.ReLU()

    def forward(self, x):
      identity = x.clone()

      x = self.relu(self.batch_norm2(self.conv1(x)))
      x = self.batch_norm2(self.conv2(x))

      if self.i_downsample is not None:
          identity = self.i_downsample(identity)
      print(x.shape)
      print(identity.shape)
      x += identity
      x = self.relu(x)
      return x

class ResNet152(nn.Module):
    def __init__(self, ResBlock, layer_list, num_classes=38, num_channels=3):
        super(ResNet152, self).__init__()
        self.in_channels = 64

        self.conv1 = nn.Conv2d(num_channels, 64, kernel_size=7, stride=2, padding=3, bias=False)
        self.batch_norm1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU()
        self.max_pool = nn.MaxPool2d(kernel_size = 3, stride=2, padding=1)

        self.layer1 = self._make_layer(ResBlock, layer_list[0], planes=64)
        self.layer2 = self._make_layer(ResBlock, layer_list[1], planes=128, stride=2)
        self.layer3 = self._make_layer(ResBlock, layer_list[2], planes=256, stride=2)
        self.layer4 = self._make_layer(ResBlock, layer_list[3], planes=512, stride=2)

        self.avgpool = nn.AdaptiveAvgPool2d((1,1))
        self.fc = nn.Linear(512*ResBlock.expansion, num_classes)

    def forward(self, x):
        x = self.relu(self.batch_norm1(self.conv1(x)))
        x = self.max_pool(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        x = self.avgpool(x)
        x = x.reshape(x.shape[0], -1)
        x = self.fc(x)

        return x

    def _make_layer(self, ResBlock, blocks, planes, stride=1):
        ii_downsample = None
        layers = []

        if stride != 1 or self.in_channels != planes*ResBlock.expansion:
            ii_downsample = nn.Sequential(
                nn.Conv2d(self.in_channels, planes*ResBlock.expansion, kernel_size=1, stride=stride),
                nn.BatchNorm2d(planes*ResBlock.expansion)
            )

        layers.append(ResBlock(self.in_channels, planes, i_downsample=ii_downsample, stride=stride))
        self.in_channels = planes*ResBlock.expansion

        for i in range(blocks-1):
            layers.append(ResBlock(self.in_channels, planes))

        return nn.Sequential(*layers)

In [None]:
#MODEL PARAMETERS AND SET UP
num_epochs = 5
model = ResNet50(ResidualBlock, [3, 4, 6, 3]).to(device)
#model = ResNet152(Bottleneck, [3,8,36,3]).to(device)
# Loss and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=1e-2, weight_decay = 0.001, momentum = 0.9)

In [None]:
#MODEL TRAINING
import gc
total_step = len(trainloader)

avg_train_losses = []
avg_val_losses = []
ep_train_losses = []
ep_val_losses = []
accs = []

for epoch in range(num_epochs):

  print('Starting Epoch [{}/{}] ...'.format(epoch+1, num_epochs,))
  train_epoch_loss = 0.0
  for i, (images, labels) in enumerate(trainloader):
    # Move tensors to the configured device
    images = images.to(device)
    labels = labels.to(device)

    # Forward pass
    outputs = model(images)
    train_loss = criterion(outputs, labels)

    # Backward and optimize
    optimizer.zero_grad()
    train_loss.backward()
    optimizer.step()

    # Calculate loss
    train_epoch_loss += outputs.shape[0] * train_loss.item()

    del images, labels, outputs
    torch.cuda.empty_cache()
    gc.collect()


  avg_train_losses.append(train_loss.item())
  ep_train_losses.append(train_epoch_loss/len(trainloader.dataset))

  print('Epoch [{}/{}], Training Loss: {:.4f}'.format(epoch+1, num_epochs, train_epoch_loss/len(trainloader.dataset)))

    # Validation
  with torch.no_grad():
    correct = 0
    total = 0
    val_epoch_loss = 0.0
    for images, labels in valloader:
      images = images.to(device)
      labels = labels.to(device)
      outputs = model(images)
      val_loss = criterion(outputs, labels)
      _, predicted = torch.max(outputs.data, 1)
      total += labels.size(0)
      correct += (predicted == labels).sum().item()
      val_epoch_loss += outputs.shape[0] * val_loss.item()
      del images, labels, outputs

    avg_val_losses.append(val_loss.item())
    ep_val_losses.append(val_epoch_loss/len(valloader.dataset))
    accs.append(100 * correct / total)
    print('Epoch [{}/{}] done, Validation Loss: {:.4f}'.format(epoch+1, num_epochs, val_epoch_loss/len(valloader.dataset)))
    print('Accuracy of the network on the {} validation images: {} %'.format(len(valloader.dataset), 100 * correct / total))

Starting Epoch [1/5] ...


In [None]:
#MODEL TESTING

with torch.no_grad():
  correct = 0
  total = 0
  for images, labels in testloader:
      images = images.to(device)
      labels = labels.to(device)
      outputs = model(images)
      _, predicted = torch.max(outputs.data, 1)
      total += labels.size(0)
      correct += (predicted == labels).sum().item()
      del images, labels, outputs

  print('Accuracy of the network on the {} test images: {} %'.format(len(testloader), 100 * correct / total))

In [None]:
#PLOTTING LOSS FOR EACH BATCH

fig, (ax1, ax2) = plt.subplots(1, 2)

ax1.plot(range(1,(num_epochs+1)), ep_train_losses, label='Training')
ax1.plot(range(1,(num_epochs+1)), ep_val_losses, label='Validation')
ax1.set_title('Standard Loss per Epoch')
ax1.set_xlabel('Epochs')
ax1.set_ylabel('Loss (Standardized)')
ax1.set_xticks(range(1,(num_epochs+1)))
ax1.set_xlim(1, num_epochs)
ax1.legend()

ax2.plot(range(1,(num_epochs+1)), accs)
ax2.set_title('Validation Accuracy per Epoch')
ax2.set_xlabel('Epochs')
ax2.set_ylabel('Accuracy (%)')
ax2.set_xticks(range(1,(num_epochs+1)))
ax2.set_xlim(1, num_epochs)
ax1.grid()
ax2.grid()

fig.tight_layout()