In [1]:
import copy
import time

import numpy as np
import shutil
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import matplotlib.pyplot as plt
from sklearn.metrics import classification_report
from sklearn.metrics import confusion_matrix
from torch.utils.data import DataLoader
from torchsummary import summary
from torchvision import datasets, transforms, models

from google.colab import drive

In [2]:
# Mount Google Drive at /content/drive; the dataset folders read later in this
# notebook are addressed through /content/drive/MyDrive.
drive.mount("/content/drive")

Mounted at /content/drive


In [3]:
# shutil.copytree("/content/drive/MyDrive/data", "/content/data")

### Loading data

In [4]:
def create_validation_set(train_data, train_percentage, test_transform, seed=None):
  '''
  Randomly split a dataset into a training subset and a validation subset.

  Input
  ------
  train_data: dataset with a length and a `transform` attribute
    Full training dataset to split.
  train_percentage: float in [0, 1]
    Fraction of examples kept for training (truncated to an integer count).
  test_transform: callable
    Transform assigned to the validation subset.
  seed: int or None
    When given, the split is drawn from a dedicated generator seeded with
    this value, so the same split is obtained on every run. When None the
    global torch RNG is used (original behaviour).

  Output
  ------
  train_data, valid_data: torch.utils.data.Subset
    The training subset still wraps the original dataset object; the
    validation subset wraps a deep copy that carries `test_transform`.

  Raises
  ------
  ValueError
    If `train_percentage` is outside [0, 1].
  '''
  if not 0 <= train_percentage <= 1:
    raise ValueError(f"train_percentage must be in [0, 1], got {train_percentage}")

  # Split train data into train and validation sets
  num_train_examples = int(len(train_data) * train_percentage)
  num_valid_examples = len(train_data) - num_train_examples

  # Only pass a generator when a seed was requested, so the default call
  # keeps drawing from the global RNG exactly as before.
  split_kwargs = {}
  if seed is not None:
    split_kwargs['generator'] = torch.Generator().manual_seed(seed)

  # Create 'Subset' objects
  train_data, valid_data = torch.utils.data.random_split(
      train_data, [num_train_examples, num_valid_examples], **split_kwargs)

  # Deep-copy before touching the transform: both subsets initially share the
  # same underlying dataset, so without the copy the training subset would
  # receive the test transform too.
  valid_data = copy.deepcopy(valid_data)
  valid_data.dataset.transform = test_transform

  return train_data, valid_data

In [5]:
# create datasets
# Training and evaluation images go through the same preprocessing here:
# resize to 224x224, then convert to a tensor.
train_transform = transforms.Compose(
    [transforms.Resize((224, 224)), transforms.ToTensor()]
)
test_transform = transforms.Compose(
    [transforms.Resize((224, 224)), transforms.ToTensor()]
)

# 80% of the training folder is kept for training, the rest for validation.
train_dataset, valid_dataset = create_validation_set(
    datasets.ImageFolder(
        root='/content/drive/MyDrive/Final Training Data',
        transform=train_transform,
    ),
    0.8,
    test_transform,
)
test_dataset = datasets.ImageFolder(
    root='/content/drive/MyDrive/Vision/grape_dataset/test',
    transform=test_transform,
)

In [6]:
# Report how many examples ended up in each split.
print(f"Number training examples: {len(train_dataset)}")
print(f"Number validation examples: {len(valid_dataset)}")
print(f"Number test examples: {len(test_dataset)}")

Number training examples: 9601
Number validation examples: 2401
Number test examples: 1805


In [7]:
# Shape of the first transformed training image as a plain tuple; reused later
# for the channel count and as the input size passed to torchsummary.
input_size = tuple(train_dataset[0][0].shape)
print(f'Input size: {input_size}')

Input size: (3, 224, 224)


In [8]:
# create iterators
BATCH_SIZE = 64
NUM_WORKERS = 2
PIN_MEMORY = True

# All three loaders share the same settings; only the training loader
# reshuffles its samples.
train_iterator, valid_iterator, test_iterator = (
    DataLoader(
        split,
        shuffle=reshuffle,
        batch_size=BATCH_SIZE,
        num_workers=NUM_WORKERS,
        pin_memory=PIN_MEMORY,
    )
    for split, reshuffle in (
        (train_dataset, True),
        (valid_dataset, False),
        (test_dataset, False),
    )
)

### Support functions

In [9]:
def calculate_accuracy(y_pred, y):
  '''
  Fraction of samples whose highest-scoring class matches the label.

  Input
  ------
  y_pred: torch.Tensor [BATCH_SIZE, N_LABELS]
    Per-class scores; only the position of the maximum along dim 1 is used.
  y: torch.Tensor [BATCH_SIZE]
    Ground-truth class indices.

  Output
  ------
  acc: torch.Tensor (0-dim, float)
    Number of correct predictions divided by y.shape[0].
  '''
  top_class = y_pred.argmax(dim=1, keepdim=True)
  # Reshape the labels to the [BATCH_SIZE, 1] layout of the predictions
  # before comparing element-wise.
  hits = (top_class == y.view_as(top_class)).sum()
  return hits.float() / y.shape[0]

In [10]:
def train(model, iterator, optimizer, criterion, device):
  '''
  Run one training epoch and return the per-sample mean loss and accuracy.

  Metrics are weighted by batch size, so a short final batch does not count
  as much as a full one (averaging per batch would over-weight it).

  Input
  ------
  model: nn.Module
    Model to optimise; switched to train mode.
  iterator: iterable of (x, y) batches
  optimizer: torch optimizer bound to the model parameters
  criterion: callable(y_pred, y) -> scalar loss tensor
    Assumed to return the batch-mean loss (the default reduction of the
    nn.CrossEntropyLoss used in this notebook).
  device: torch.device
    Device each batch is moved to.

  Output
  ------
  epoch_loss: float
  epoch_acc: float

  Raises
  ------
  ValueError
    If the iterator yields no samples.
  '''
  total_loss = 0.0
  total_correct = 0
  total_samples = 0

  # Train mode
  model.train()

  for (x, y) in iterator:
    x = x.to(device)
    y = y.to(device)
    batch_size = y.shape[0]

    # Set gradients to zero
    optimizer.zero_grad()

    # Make Predictions
    y_pred = model(x)

    # Compute loss
    loss = criterion(y_pred, y)

    # Backprop
    loss.backward()

    # Apply optimizer
    optimizer.step()

    # Accumulate per-sample totals: undo the batch-mean for the loss and
    # count the correct predictions directly.
    top_class = y_pred.argmax(dim=1, keepdim=True)
    total_loss += loss.item() * batch_size
    total_correct += top_class.eq(y.view_as(top_class)).sum().item()
    total_samples += batch_size

  if total_samples == 0:
    raise ValueError("iterator yielded no samples")

  return total_loss/total_samples, total_correct/total_samples

In [11]:
def evaluate(model, iterator, criterion, device):
  '''
  Evaluate the model on a dataset and return the per-sample mean loss and
  accuracy, without computing gradients.

  Metrics are weighted by batch size, so a short final batch does not count
  as much as a full one (averaging per batch would over-weight it).

  Input
  ------
  model: nn.Module
    Model to evaluate; switched to eval mode.
  iterator: iterable of (x, y) batches
  criterion: callable(y_pred, y) -> scalar loss tensor
    Assumed to return the batch-mean loss (the default reduction of the
    nn.CrossEntropyLoss used in this notebook).
  device: torch.device
    Device each batch is moved to.

  Output
  ------
  epoch_loss: float
  epoch_acc: float

  Raises
  ------
  ValueError
    If the iterator yields no samples.
  '''
  total_loss = 0.0
  total_correct = 0
  total_samples = 0

  # Evaluation mode
  model.eval()

  # Do not compute gradients
  with torch.no_grad():

    for (x, y) in iterator:

      x = x.to(device)
      y = y.to(device)
      batch_size = y.shape[0]

      # Make Predictions
      y_pred = model(x)

      # Compute loss
      loss = criterion(y_pred, y)

      # Accumulate per-sample totals: undo the batch-mean for the loss and
      # count the correct predictions directly.
      top_class = y_pred.argmax(dim=1, keepdim=True)
      total_loss += loss.item() * batch_size
      total_correct += top_class.eq(y.view_as(top_class)).sum().item()
      total_samples += batch_size

  if total_samples == 0:
    raise ValueError("iterator yielded no samples")

  return total_loss/total_samples, total_correct/total_samples

In [12]:
def model_training(n_epochs, model, train_iterator, valid_iterator, optimizer, criterion, device, model_name='best_model.pt'):
  '''
  Train for a fixed number of epochs, checkpointing the weights whenever the
  validation loss reaches a new minimum.

  Input
  ------
  n_epochs: int
    Number of epochs to run.
  model, optimizer, criterion, device:
    Forwarded unchanged to `train` and `evaluate`.
  train_iterator, valid_iterator:
    Batch iterables for the training and validation passes.
  model_name: str
    Path the best state_dict is written to.

  Output
  ------
  train_losses, train_accs, valid_losses, valid_accs: list
    One entry per completed epoch.
  '''
  history = {'train_loss': [], 'train_acc': [], 'valid_loss': [], 'valid_acc': []}
  lowest_valid_loss = float('inf')

  for epoch_idx in range(n_epochs):
    tic = time.time()

    train_loss, train_acc = train(model, train_iterator, optimizer, criterion, device)
    valid_loss, valid_acc = evaluate(model, valid_iterator, criterion, device)

    # Keep only the weights of the best epoch seen so far on disk.
    if valid_loss < lowest_valid_loss:
      lowest_valid_loss = valid_loss
      torch.save(model.state_dict(), model_name)

    # The reported epoch time includes the checkpoint write.
    toc = time.time()

    print(f"\nEpoch: {epoch_idx+1}/{n_epochs} -- Epoch Time: {toc-tic:.2f} s")
    print("---------------------------------")
    print(f"Train -- Loss: {train_loss:.3f}, Acc: {train_acc * 100:.2f}%")
    print(f"Val -- Loss: {valid_loss:.3f}, Acc: {valid_acc * 100:.2f}%")

    history['train_loss'].append(train_loss)
    history['train_acc'].append(train_acc)
    history['valid_loss'].append(valid_loss)
    history['valid_acc'].append(valid_acc)

  return history['train_loss'], history['train_acc'], history['valid_loss'], history['valid_acc']

In [13]:
def model_training_patience(n_epochs, model, train_iterator, valid_iterator, optimizer, criterion, device, patience, model_name='best_model.pt'):
  '''
  Train with early stopping, checkpointing the weights whenever the
  validation loss reaches a new minimum.

  Input
  ------
  n_epochs: int
    Maximum number of epochs.
  model, optimizer, criterion, device:
    Forwarded unchanged to `train` and `evaluate`; the model is moved to
    `device` once before the first epoch.
  train_iterator, valid_iterator:
    Batch iterables for the training and validation passes.
  patience: int or None
    Training stops at the first non-improving epoch that lies more than
    `patience` epochs after the best one. None disables early stopping.
  model_name: str
    Path the best state_dict is written to.

  Output
  ------
  train_losses, train_accs, valid_losses, valid_accs: list
    One entry per epoch that was actually trained, including the epoch on
    which early stopping triggered; may be shorter than `n_epochs`.
  '''
  # Initialize validation loss
  best_valid_loss = float('inf')
  best_epoch = 0

  # Save output losses, accs
  train_losses = []
  train_accs = []
  valid_losses = []
  valid_accs = []

  # Place the model on the target device before any batch is processed.
  model.to(device)

  # Loop over epochs
  for epoch in range(n_epochs):
    start_time = time.time()
    # Train
    train_loss, train_acc = train(model, train_iterator, optimizer, criterion, device)
    # Validation
    valid_loss, valid_acc = evaluate(model, valid_iterator, criterion, device)

    # Save best model
    improved = valid_loss < best_valid_loss
    if improved:
      best_valid_loss = valid_loss
      best_epoch = epoch
      torch.save(model.state_dict(), model_name)
      # Messages use the same 1-based numbering as the epoch header below.
      print(f"\nModel checkpoint saved at epoch {epoch+1}\n")
    end_time = time.time()

    print(f"\nEpoch: {epoch+1}/{n_epochs} -- Epoch Time: {end_time-start_time:.2f} s")
    print("---------------------------------")
    print(f"Train -- Loss: {train_loss:.3f}, Acc: {train_acc * 100:.2f}%")
    print(f"Val -- Loss: {valid_loss:.3f}, Acc: {valid_acc * 100:.2f}%")

    # Record the epoch before deciding whether to stop, so the history also
    # contains the last epoch that was trained.
    train_losses.append(train_loss)
    train_accs.append(train_acc)
    valid_losses.append(valid_loss)
    valid_accs.append(valid_acc)

    if not improved and patience is not None and epoch - best_epoch > patience:
      print(f"\nEarly stopping at epoch {epoch+1}\n")
      break

  return train_losses, train_accs, valid_losses, valid_accs

In [14]:
def plot_results(n_epochs, train_losses, train_accs, valid_losses, valid_accs):
  '''
  Plot training/validation loss and accuracy curves side by side.

  Input
  ------
  n_epochs: int
    Kept for backward compatibility; the x-axis is derived from the length
    of the recorded history instead, because early stopping or an
    interrupted run yields fewer points than the planned number of epochs.
  train_losses, train_accs, valid_losses, valid_accs: sequence of float
    One value per recorded epoch.
  '''
  # 1-based epoch numbers matching the data that is actually available.
  epochs = np.arange(len(train_losses)) + 1

  plt.figure(figsize=(20, 6))

  # Left panel: loss
  plt.subplot(1, 2, 1)
  plt.plot(epochs, train_losses, linewidth=3)
  plt.plot(epochs, valid_losses, linewidth=3)
  plt.legend(['Train', 'Validation'])
  plt.grid(True)
  plt.xlabel('Epoch')
  plt.ylabel('Loss')

  # Right panel: accuracy
  plt.subplot(1, 2, 2)
  plt.plot(epochs, train_accs, linewidth=3)
  plt.plot(epochs, valid_accs, linewidth=3)
  plt.legend(['Train', 'Validation'])
  plt.grid(True)
  plt.xlabel('Epoch')
  plt.ylabel('Accuracy')

In [15]:
def model_testing(model, test_iterator, criterion, device, model_name='best_model.pt'):
  '''
  Load a saved state_dict into the model, evaluate it and print the result.

  Input
  ------
  model: nn.Module
    Model whose architecture matches the saved weights.
  test_iterator: iterable of (x, y) batches
  criterion, device:
    Forwarded unchanged to `evaluate`.
  model_name: str
    Path of the state_dict file to load.
  '''
  # Load best weights. map_location remaps the stored tensors onto `device`,
  # so a checkpoint written on a GPU can also be loaded on a CPU-only runtime.
  state_dict = torch.load(model_name, map_location=device)
  model.load_state_dict(state_dict)
  # Evaluate model
  test_loss, test_acc = evaluate(model, test_iterator, criterion, device)
  # Print results
  print(f"Test -- Loss: {test_loss:.3f}, Acc: {test_acc * 100:.2f} %")

In [16]:
def predict(model, iterator, device):
  '''
  Collect ground-truth labels and predicted classes over a whole iterator.

  Input
  ------
  model: nn.Module
    Switched to eval mode; run without gradient tracking.
  iterator: iterable of (x, y) batches
  device: torch.device
    Device each input batch is moved to.

  Output
  ------
  labels: torch.Tensor
    All label batches concatenated along dim 0, on the CPU.
  pred: torch.Tensor
    Highest-probability class per sample, concatenated along dim 0, on the
    CPU; keeps a trailing dimension of size 1.
  '''
  model.eval()

  label_batches, pred_batches = [], []

  with torch.no_grad():
    for inputs, targets in iterator:
      scores = model(inputs.to(device))

      # Class with the highest softmax probability
      probabilities = F.softmax(scores, dim = -1)
      best_class = probabilities.argmax(1, keepdim=True)

      label_batches.append(targets.cpu())
      pred_batches.append(best_class.cpu())

  return torch.cat(label_batches, dim=0), torch.cat(pred_batches, dim=0)

In [17]:
def print_report(model, test_iterator, device):
  '''
  Print the confusion matrix, a blank separator and the classification
  report for the model's predictions over `test_iterator`.
  '''
  y_true, y_hat = predict(model, test_iterator, device)

  matrix = confusion_matrix(y_true, y_hat)
  print(matrix)
  print("\n")
  report = classification_report(y_true, y_hat)
  print(report)

### Baseline CNN model

In [20]:
class DenseNet121(nn.Module):
    """DenseNet-121 backbone followed by a new linear classification head.

    The backbone's own classifier is replaced by an identity layer, so the
    backbone outputs its feature vector, which `fc` maps to `output_dim`
    scores.
    """

    def __init__(self, output_dim):
        """Build the backbone and a linear head with `output_dim` outputs."""
        super().__init__()
        # NOTE(review): newer torchvision releases deprecate `pretrained=` in
        # favour of `weights=` - confirm against the installed version.
        self.dense_net = models.densenet121(pretrained=True)
        # Read the feature width BEFORE swapping the head: nn.Identity has no
        # `in_features` attribute, so reading it afterwards raises
        # AttributeError.
        num_features = self.dense_net.classifier.in_features
        self.dense_net.classifier = nn.Identity()
        self.fc = nn.Linear(num_features, output_dim)

    def forward(self, x):
        """Return the class scores for a batch of images."""
        x = self.dense_net(x)
        # Flatten everything but the batch dimension before the linear head.
        x = x.view(x.shape[0], -1)
        x = self.fc(x)
        return x



In [21]:
def conv_block(in_channels, out_channels, kernel_size, padding, pool_size):
    """Build a Conv2d -> ReLU -> BatchNorm2d -> MaxPool2d block.

    `kernel_size` and `padding` configure the convolution, `pool_size` the
    max-pooling window. Returns the four layers wrapped in an nn.Sequential.
    """
    return nn.Sequential(
        nn.Conv2d(in_channels, out_channels, kernel_size=kernel_size, padding=padding),
        nn.ReLU(inplace=True),
        nn.BatchNorm2d(out_channels),
        nn.MaxPool2d(pool_size),
    )

In [22]:
def count_parameters(model):
  '''Return the total number of elements in the model's trainable parameters.'''
  total = 0
  for param in model.parameters():
    # Parameters with requires_grad=False are not counted.
    if param.requires_grad:
      total += param.numel()
  return total

In [35]:
!pip install hiddenlayer

Collecting hiddenlayer
  Downloading hiddenlayer-0.3-py3-none-any.whl (19 kB)
Installing collected packages: hiddenlayer
Successfully installed hiddenlayer-0.3


In [45]:
# Import torch and other modules
import torch
import torch.nn as nn
import torch.nn.functional as F

# Define the CNN model class
class BaselineCNN(nn.Module):
    """Small CNN: four conv stages followed by a two-layer classifier.

    Each stage is Conv2d(3x3, padding 1) -> BatchNorm2d -> ReLU ->
    MaxPool2d(2), with 16, 32, 64 and 64 output channels. The first linear
    layer expects 64 * 14 * 14 features, i.e. a 14x14 map after the four
    poolings (224x224 inputs, as used in this notebook).
    """

    def __init__(self, in_channels, output_dim):
        """Create the layers for `in_channels` input planes and `output_dim` classes."""
        super().__init__()

        self.model_name = 'OurBaselineCNN'

        # Channel width entering and leaving each of the four stages.
        widths = (in_channels, 16, 32, 64, 64)
        stages = []
        for stage_in, stage_out in zip(widths[:-1], widths[1:]):
            stages.extend([
                nn.Conv2d(stage_in, stage_out, 3, padding=1),
                nn.BatchNorm2d(stage_out),
                nn.ReLU(inplace=True),
                nn.MaxPool2d(2),
            ])
        # Flat Sequential so layer indices (conv.0 ... conv.15) stay stable.
        self.conv = nn.Sequential(*stages)

        self.classifier = nn.Sequential(
            nn.Linear(64 * 14 * 14, 512),
            nn.ReLU(inplace=True),
            nn.Dropout(0.3),
            nn.Linear(512, output_dim),
        )

    def forward(self, x):
        """Return the class scores for a batch of images."""
        features = self.conv(x)
        # Collapse channels and spatial dimensions into one feature vector.
        flat = features.view(features.size(0), -1)
        return self.classifier(flat)

# Create a throwaway instance of the model, used only for the visualisation
# below (the instance that gets trained is created separately as `baseline_cnn`).
model = BaselineCNN(in_channels=3, output_dim=4)

# Save the model as a file.
# NOTE: passing the module itself to torch.save stores the whole model object,
# not just its state_dict.
torch.save(model, "cnn_model.pth")

# Install Netron using pip
!pip install netron

# Launch Netron and open the model file in a web browser
# (it serves the file over a local HTTP address, see the cell output).
import netron
netron.start("cnn_model.pth")


Serving 'cnn_model.pth' at http://localhost:17607


('localhost', 17607)

In [40]:
# Use the GPU when one is visible to torch, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
print(f'Device used: {device}')

Device used: cuda


In [41]:
INPUT_CHANNELS = input_size[0]  # channel count of the transformed images
OUTPUT_DIM = 4 # number of classes

# Softmax + CrossEntropy in one module, so the model outputs raw scores
criterion = nn.CrossEntropyLoss().to(device)

baseline_cnn = BaselineCNN(in_channels=INPUT_CHANNELS, output_dim=OUTPUT_DIM).to(device)

In [42]:
print(f"The model has {count_parameters(baseline_cnn):,} trainable parameters.")
# torchsummary's summary() prints the layer table itself and returns None, so
# wrapping it in print() only appends a stray "None" line to the output.
summary(baseline_cnn, input_size=input_size)

The model has 6,485,956 trainable parameters.
----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1         [-1, 16, 224, 224]             448
       BatchNorm2d-2         [-1, 16, 224, 224]              32
              ReLU-3         [-1, 16, 224, 224]               0
         MaxPool2d-4         [-1, 16, 112, 112]               0
            Conv2d-5         [-1, 32, 112, 112]           4,640
       BatchNorm2d-6         [-1, 32, 112, 112]              64
              ReLU-7         [-1, 32, 112, 112]               0
         MaxPool2d-8           [-1, 32, 56, 56]               0
            Conv2d-9           [-1, 64, 56, 56]          18,496
      BatchNorm2d-10           [-1, 64, 56, 56]             128
             ReLU-11           [-1, 64, 56, 56]               0
        MaxPool2d-12           [-1, 64, 28, 28]               0
           Conv2d-13           [-1, 64, 28, 28]          

In [43]:
# Adam with its default hyper-parameters (an explicit lr=5e-4 was tried and
# is currently disabled).
optimizer = optim.Adam(baseline_cnn.parameters())

In [44]:
N_EPOCHS = 25
patience = 8
train_losses, train_accs, valid_losses, valid_accs = model_training_patience(
    N_EPOCHS,
    baseline_cnn,
    train_iterator,
    valid_iterator,
    optimizer,
    criterion,
    device,
    patience=patience,
    model_name='baseline_CNN.pt',
)

# Early stopping can end training before N_EPOCHS, so pass the number of
# epochs that were actually recorded; otherwise the x-axis and the curves
# have different lengths and plotting fails.
plot_results(len(train_losses), train_losses, train_accs, valid_losses, valid_accs)

KeyboardInterrupt: 

### Recreate datasets (with ImageNet normalization, for models pretrained on ImageNet)

In [None]:
# Per-channel mean and standard deviation handed to transforms.Normalize in the
# ImageNet-specific transforms (presumably the standard ImageNet statistics
# expected by the pretrained torchvision weights - confirm).
mean_imagenet = [0.485, 0.456, 0.406]
std_imagenet = [0.229, 0.224, 0.225]

In [None]:
# Same preprocessing as before plus per-channel normalisation with the
# mean/std defined for the ImageNet-pretrained models.
train_transform_imagenet = transforms.Compose(
    [
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(mean_imagenet, std_imagenet),
    ]
)
test_transform_imagenet = transforms.Compose(
    [
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(mean_imagenet, std_imagenet),
    ]
)

# 80% of the training folder is kept for training, the rest for validation.
train_dataset_imagenet, valid_dataset_imagenet = create_validation_set(
    datasets.ImageFolder(
        root='/content/drive/MyDrive/Final Training Data',
        transform=train_transform_imagenet,
    ),
    0.8,
    test_transform_imagenet,
)
test_dataset_imagenet = datasets.ImageFolder(
    root='/content/drive/MyDrive/Vision/grape_dataset/test',
    transform=test_transform_imagenet,
)

In [None]:
# create iterators
BATCH_SIZE = 64
NUM_WORKERS = 2
PIN_MEMORY = True

# All three loaders share the same settings; only the training loader
# reshuffles its samples.
train_iterator_imagenet, valid_iterator_imagenet, test_iterator_imagenet = (
    DataLoader(
        split,
        shuffle=reshuffle,
        batch_size=BATCH_SIZE,
        num_workers=NUM_WORKERS,
        pin_memory=PIN_MEMORY,
    )
    for split, reshuffle in (
        (train_dataset_imagenet, True),
        (valid_dataset_imagenet, False),
        (test_dataset_imagenet, False),
    )
)

### DenseNet121

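Rewrite: the cell below re-declares the constants, transforms, datasets and data loaders, and then defines the DenseNet121 model and its training loop in a single place.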

In [None]:
import torch
from torch import nn, optim
from torchvision import models, datasets, transforms
from torch.utils.data import DataLoader

# Define constants
OUTPUT_DIM = 4  # Change this to your number of classes
BATCH_SIZE = 64
NUM_WORKERS = 2
PIN_MEMORY = True
N_EPOCHS = 25
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Define transforms
mean_imagenet = [0.485, 0.456, 0.406]
std_imagenet = [0.229, 0.224, 0.225]

train_transform_imagenet = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean_imagenet, std_imagenet)
])

test_transform_imagenet = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean_imagenet, std_imagenet)
])

# Load datasets
train_dataset_imagenet = datasets.ImageFolder(root='/content/drive/MyDrive/Final Training Data', transform=train_transform_imagenet)
# Hold out 20% of the training images for validation, so that checkpoint
# selection has its own data and the training loader does not contain the
# validation images.
train_dataset_imagenet, valid_dataset_imagenet = create_validation_set(train_dataset_imagenet, 0.8, test_transform_imagenet)
test_dataset_imagenet = datasets.ImageFolder(root='/content/drive/MyDrive/Vision/grape_dataset/test', transform=test_transform_imagenet)

# Create data loaders
train_iterator_imagenet = DataLoader(train_dataset_imagenet, shuffle=True, batch_size=BATCH_SIZE, num_workers=NUM_WORKERS, pin_memory=PIN_MEMORY)
valid_iterator_imagenet = DataLoader(valid_dataset_imagenet, batch_size=BATCH_SIZE, num_workers=NUM_WORKERS, pin_memory=PIN_MEMORY)
test_iterator_imagenet = DataLoader(test_dataset_imagenet, batch_size=BATCH_SIZE, num_workers=NUM_WORKERS, pin_memory=PIN_MEMORY)

# Define the model
class DenseNet121(nn.Module):
    """DenseNet-121 backbone followed by a new linear classification head."""

    def __init__(self, output_dim):
        """Build the backbone and a linear head with `output_dim` outputs."""
        super().__init__()
        backbone = models.densenet121(pretrained=True)
        # Remember the width of the original head's input, then replace that
        # head by an identity so the backbone returns its features.
        feature_dim = backbone.classifier.in_features
        backbone.classifier = nn.Identity()
        self.dense_net = backbone
        self.fc = nn.Linear(feature_dim, output_dim)

    def forward(self, x):
        """Return the class scores for a batch of images."""
        features = self.dense_net(x)
        # Flatten everything but the batch dimension before the linear head.
        features = features.view(features.shape[0], -1)
        return self.fc(features)

# NOTE(review): the name suggests feature extraction, but no backbone
# parameters are frozen here, so the optimizer updates the whole network -
# confirm that this is intended.
dense_net121_extract = DenseNet121(OUTPUT_DIM)
dense_net121_extract = dense_net121_extract.to(device)

# Define the optimizer (added L2 regularization)
optimizer = optim.Adam(params=dense_net121_extract.parameters(), weight_decay=0.01)

# Define the loss function
criterion = nn.CrossEntropyLoss()

# Training function
def model_training(N_EPOCHS, model, train_iterator, valid_iterator, optimizer, criterion, device, model_name='model.pt'):
    """Train for `N_EPOCHS` epochs, checkpointing on the best validation loss.

    Every epoch runs one optimisation pass over `train_iterator` and one
    gradient-free pass over `valid_iterator`, prints the per-batch mean of
    both losses, and writes the state_dict to `model_name` whenever the
    summed validation loss is lower than in all previous epochs.

    Returns the model itself (holding the weights of the last epoch).
    """
    lowest_valid_total = float('inf')

    for epoch_idx in range(N_EPOCHS):
        # ---- optimisation pass ----
        model.train()
        running_train = 0
        running_valid = 0
        for batch_x, batch_y in train_iterator:
            optimizer.zero_grad()
            batch_x = batch_x.to(device)
            batch_y = batch_y.to(device)
            batch_loss = criterion(model(batch_x), batch_y)
            batch_loss.backward()
            optimizer.step()
            running_train += batch_loss.item()

        # ---- validation pass (no gradients) ----
        model.eval()
        with torch.no_grad():
            for batch_x, batch_y in valid_iterator:
                batch_x = batch_x.to(device)
                batch_y = batch_y.to(device)
                running_valid += criterion(model(batch_x), batch_y).item()

        print(f'Epoch: {epoch_idx+1}/{N_EPOCHS}.. Training Loss: {running_train/len(train_iterator):.3f}.. Validation Loss: {running_valid/len(valid_iterator):.3f}')

        # The summed loss is compared; the number of validation batches is the
        # same in every epoch, so this ranks epochs like the mean would.
        if running_valid < lowest_valid_total:
            lowest_valid_total = running_valid
            torch.save(model.state_dict(), model_name)

    return model

# Train the model
# The checkpoint is selected on the validation loader: choosing the best epoch
# on the test loader would leak test data into model selection and make the
# final test score optimistic. The trailing semicolon keeps the notebook from
# echoing the returned model.
model_training(N_EPOCHS, dense_net121_extract, train_iterator_imagenet, valid_iterator_imagenet, optimizer, criterion, device, model_name='dense_net121.pt');
