In [None]:
import torch
from torchvision import datasets, transforms, utils
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import matplotlib.pyplot as plt
import numpy as np
from sklearn.metrics import confusion_matrix, accuracy_score, precision_score, recall_score, f1_score, roc_auc_score
import seaborn as sns
import pandas as pd
from torch.utils.data import random_split
import time
from datetime import datetime

In [None]:
# Prefer the GPU when PyTorch can see one; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(f"using: {device}")

For reference:

Tensor image layout: (batch size, channel count, height, width)

Image information:

Dimensions: 300 x 300 pixels
Bit depth: 24-bit (RGB: 3 x 8 bits)

In [None]:
def get_img_mean_std(loader) -> tuple:
    """
    Approximate the per-channel mean and standard deviation of an image dataset.

    For every image the per-channel mean and std (torch's default estimator) are computed
    over its pixels; those per-image statistics are then averaged over all images. Averaging
    per-image std values is not the exact dataset-wide std, but it needs only a single pass
    and no large intermediate buffers.

    :param loader: iterable yielding (images, labels) batches, images shaped (batch, channels, ...)
    :return: (mean, std) -- two 1-D tensors holding one value per channel
    :raises ValueError: if the loader yields no images
    """

    mean_sum = 0.
    std_sum = 0.
    total_image_count = 0

    for imgs, _ in loader:
        batch_img_count = imgs.size(0)
        # collapse everything after the channel axis so each image is (channels, pixels);
        # reshape (unlike view) also accepts non-contiguous batches
        imgs = imgs.reshape(batch_img_count, imgs.size(1), -1)
        mean_sum += imgs.mean(2).sum(0)
        std_sum += imgs.std(2).sum(0)
        total_image_count += batch_img_count

    if total_image_count == 0:
        # fail with a clear message instead of a bare ZeroDivisionError
        raise ValueError("cannot compute mean/std: loader yielded no images")

    return mean_sum / total_image_count, std_sum / total_image_count

In [None]:
# Baseline preprocessing: force every image to 300x300 and convert it to a tensor.
# No augmentation and no normalisation happen at this stage.
initial_train_transform = transforms.Compose(
    [transforms.Resize(size=(300, 300)), transforms.ToTensor()]
)

In [None]:
# number of images per batch for every DataLoader in this notebook;
# also the number of labels printed under each image preview
desired_batch_size = 7

In [None]:
# load every image found under "traindata" with the baseline (resize + to-tensor) transform
original_train_dataset = datasets.ImageFolder(root="traindata", transform=initial_train_transform)

# un-shuffled loader over the complete dataset, meant for the mean/std estimation
full_train_loader = torch.utils.data.DataLoader(
    dataset=original_train_dataset,
    batch_size=desired_batch_size,
    shuffle=False,
    num_workers=2,
)

# report which classes have been identified
classes = original_train_dataset.classes
print(f"Classes: {classes}")

In [None]:
# Per-channel statistics of the training images. They were measured once with
#     train_mean, train_std = get_img_mean_std(full_train_loader)
# which printed
#     mean: tensor([0.5474, 0.4110, 0.3391]), std: tensor([0.2301, 0.2384, 0.2308])
# and are hard-coded here so the pass over the whole dataset is not repeated on every run.
train_mean, train_std = (
    torch.tensor(channel_stats)
    for channel_stats in ([0.5474, 0.4110, 0.3391], [0.2301, 0.2384, 0.2308])
)

In [None]:
# Augmenting pipeline for training images: random flips, a rotation of up to 45 degrees and a
# random crop rescaled to 150x150, then tensor conversion and per-channel normalisation.
# NOTE(review): the crop produces 150x150 images while initial_train_transform resizes to
# 300x300 -- confirm which input size the models are meant to receive.
train_transform = transforms.Compose(
    [
        transforms.RandomHorizontalFlip(),
        transforms.RandomVerticalFlip(),
        transforms.RandomRotation(degrees=45),
        transforms.RandomResizedCrop(size=(150, 150), scale=(0.4, 0.9)),
        transforms.ToTensor(),
        transforms.Normalize(mean=train_mean, std=train_std),
    ]
)

# Deterministic pipeline (also intended for the validation set): tensor conversion and the
# same normalisation, without any augmentation or resizing.
test_transform = transforms.Compose(
    [
        transforms.ToTensor(),
        transforms.Normalize(mean=train_mean, std=train_std),
    ]
)

In [None]:
# same image folder as original_train_dataset, but with the augmenting/normalising transform
train_dataset = datasets.ImageFolder("traindata", transform=train_transform)

# NOTE(review): train_dataset is not used below -- the split is taken from
# original_train_dataset, so both loaders yield 300x300 un-normalised tensors and
# train_transform / test_transform are never applied. Switching to train_dataset would also
# change the input size the models receive (150x150 crops), so it needs a coordinated change.

# create validation set for better training (and because I don't have the test set).
# 80/20 split derived from the dataset length, so random_split keeps working when images are
# added or removed (the previously hard-coded 3600 / 900 corresponds to 4500 images).
val_size = len(original_train_dataset) // 5
train_size = len(original_train_dataset) - val_size

train_set, val_set = random_split(
    original_train_dataset,
    [train_size, val_size],
    generator=torch.Generator().manual_seed(7),  # fixed seed -> identical split on every run
)


# create loaders for train and val sets
train_loader = torch.utils.data.DataLoader(train_set, batch_size=desired_batch_size, shuffle=True, num_workers=2)
val_loader = torch.utils.data.DataLoader(val_set, batch_size=desired_batch_size, shuffle=False, num_workers=2)

In [None]:
def imshow(img, mean=None, std=None):
    """
    Display a (channels, height, width) image tensor, e.g. a grid built by utils.make_grid.

    The loaders in this notebook apply only ToTensor, so their images are already in [0, 1]
    and are shown as they are. Pass mean and std only for images that went through
    transforms.Normalize(mean, std); the normalisation is then undone before plotting.

    :param img: image tensor laid out as (channels, height, width)
    :param mean: optional per-channel mean the image was normalised with
    :param std: optional per-channel std the image was normalised with
    """
    plt.rcParams["figure.figsize"] = (20, 20)
    plt.rcParams['figure.dpi'] = 100
    # explicitly hide grid lines (visible=None would toggle them instead)
    plt.grid(False)

    img = img.detach().cpu()
    if mean is not None and std is not None:
        # undo Normalize: x * std + mean, broadcast over height and width
        mean = torch.as_tensor(mean, dtype=img.dtype).view(-1, 1, 1)
        std = torch.as_tensor(std, dtype=img.dtype).view(-1, 1, 1)
        img = img * std + mean

    npimg = img.numpy()
    # matplotlib expects (height, width, channels) with float values inside [0, 1]
    plt.imshow(np.clip(np.transpose(npimg, (1, 2, 0)), 0, 1))
    plt.show()


# get some random training images (train_loader shuffles, so this is a random batch)
dataiter = iter(train_loader)
# use the built-in next(): the iterator's .next() method no longer exists in recent PyTorch
images, labels = next(dataiter)

# show images
imshow(utils.make_grid(images))

# iterate over the labels actually returned, which can be fewer than desired_batch_size
print(' '.join(f'{classes[label]:5s}' for label in labels))

First I need to try a simple MLP model

In [None]:
# Switch for the whole MLP experiment: every MLP cell below is wrapped in
# `if enable_mlp_section:`, so leaving this False skips them when re-running for the CNN.
enable_mlp_section = False

In [None]:
if enable_mlp_section:
    class MLP(nn.Module):
        """Fully connected baseline: flattens the image and maps it to 3 class scores."""

        def __init__(self):
            super().__init__()
            # 270000 input features = 3 channels * 300 * 300 pixels
            self.fc1 = nn.Linear(270000, 1000)
            self.fc2 = nn.Linear(1000, 1000)
            self.fc3 = nn.Linear(1000, 800)
            self.fc4 = nn.Linear(800, 800)
            self.fc5 = nn.Linear(800, 3)

        def forward(self, x):
            # keep the batch dimension, flatten everything else
            hidden = x.flatten(start_dim=1)
            for layer in (self.fc1, self.fc2, self.fc3, self.fc4):
                hidden = F.relu(layer(hidden))
            # output layer has no activation: it returns raw class scores
            return self.fc5(hidden)

    mlp_model = MLP()
    mlp_model.to(device)

In [None]:
if enable_mlp_section:
    # cross-entropy on the raw class scores, optimised by SGD with momentum
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(params=mlp_model.parameters(), lr=1e-3, momentum=0.8)

In [None]:
if enable_mlp_section:
    # Train the MLP for up to 100 epochs, validating after every epoch. Whenever the
    # validation accuracy improves, the weights are written to "mlp_model.pth"; training
    # stops early after `early_stop_count` consecutive epochs without improvement.
    best_val_acc = 0.0
    epoch_used = 0  # 0-based index of the epoch whose weights were saved last

    # stop after this many epochs with no validation improvement
    early_stop_count = 8
    epochs_with_no_improv = 0

    mlp_train_history = pd.DataFrame(columns=["train_loss", "train_accuracy", "val_loss", "val_accuracy"])

    for epoch in range(100):
        # train the model
        mlp_model.train()

        train_accuracy = 0.0
        train_loss = 0.0
        total = 0

        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)

            # zero the parameter gradients
            optimizer.zero_grad()

            # forward + backward + optimize
            outputs = mlp_model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            batch_size = labels.size(0)
            # the loss is a per-batch mean (nn.CrossEntropyLoss default); weight it by the
            # batch size so dividing by the sample count gives a true per-sample average
            train_loss += loss.item() * batch_size
            total += batch_size
            train_accuracy += (outputs.argmax(dim=1) == labels).sum().item()

        train_accuracy = train_accuracy / total
        train_loss = train_loss / total

        # evaluate the model
        mlp_model.eval()

        val_accuracy = 0.0
        val_loss = 0.0
        total = 0

        # no gradients are needed for validation; skipping them saves memory and time
        with torch.no_grad():
            for inputs, labels in val_loader:
                inputs, labels = inputs.to(device), labels.to(device)

                outputs = mlp_model(inputs)
                loss = criterion(outputs, labels)

                batch_size = labels.size(0)
                val_loss += loss.item() * batch_size
                total += batch_size
                val_accuracy += (outputs.argmax(dim=1) == labels).sum().item()

        val_accuracy = val_accuracy / total
        val_loss = val_loss / total

        print("Epoch: " + str(epoch+1) + "\n" +
              "Train loss: " + str(train_loss) + ", Train accuracy: " + str(train_accuracy) + "\n" +
              "Val loss: " + str(val_loss) + ", Val Accuracy: " + str(val_accuracy) + "\n")

        mlp_train_history.loc[len(mlp_train_history)] = [train_loss, train_accuracy, val_loss, val_accuracy]

        if val_accuracy > best_val_acc:
            print("Validation accuracy improved: " + str(best_val_acc) + " --> " + str(val_accuracy))
            print("Saving model \n")
            epoch_used = epoch
            best_val_acc = val_accuracy
            torch.save(mlp_model.state_dict(), "mlp_model.pth")
            epochs_with_no_improv = 0
        else:
            epochs_with_no_improv += 1

        if epochs_with_no_improv >= early_stop_count:
            print("Stopping early, no validation improvement in " + str(early_stop_count) + " epochs \n")
            break

    print("Finished Training")

In [None]:
if enable_mlp_section:
    # figure defaults (rcParams stay in effect for figures created later as well)
    plt.rcParams.update({"figure.figsize": (5, 5), "figure.dpi": 100})
    plt.grid(visible=None)

    # one point per recorded epoch, training curve first
    epoch_axis = range(len(mlp_train_history))
    for column, curve_label in (("train_loss", "Train"), ("val_loss", "Validation")):
        plt.plot(epoch_axis, mlp_train_history[column].to_numpy(), label=curve_label)
    # dashed marker at the epoch whose weights were saved
    plt.axvline(x=epoch_used, label="Selected point", color="g", linestyle="--")

    plt.title("MLP Loss", fontsize=18)
    plt.xlabel("Epochs", fontsize=14)
    plt.legend()

    plt.savefig("mlp_loss.png")
    plt.show()


In [None]:
if enable_mlp_section:
    # figure defaults (rcParams stay in effect for figures created later as well)
    plt.rcParams.update({"figure.figsize": (5, 5), "figure.dpi": 100})
    plt.grid(visible=None)

    # one point per recorded epoch, training curve first
    epoch_axis = range(len(mlp_train_history))
    for column, curve_label in (("train_accuracy", "Train"), ("val_accuracy", "Validation")):
        plt.plot(epoch_axis, mlp_train_history[column].to_numpy(), label=curve_label)
    # dashed marker at the epoch whose weights were saved
    plt.axvline(x=epoch_used, label="Selected point", color="g", linestyle="--")

    plt.title("MLP Accuracy", fontsize=18)
    plt.xlabel("Epochs", fontsize=14)
    plt.legend()

    plt.savefig("mlp_accuracy.png")
    plt.show()


In [None]:
if enable_mlp_section:
    # rebuild the architecture and restore the best weights saved during training
    mlp_model = MLP()
    # map_location="cpu" lets a checkpoint written from a GPU model load on a CPU-only
    # machine; the freshly built model lives on the CPU anyway
    mlp_model.load_state_dict(torch.load("mlp_model.pth", map_location="cpu"))
    # evaluation mode, matching how the CNN is prepared before testing
    mlp_model.eval()

In [None]:
if enable_mlp_section:
    # highest-scoring class for each image of the batch currently held in `images`
    outputs = mlp_model(images)
    predicted = outputs.argmax(dim=1)

    predicted_names = (f'{classes[predicted[j]]:5s}' for j in range(desired_batch_size))
    print('Predicted: ', ' '.join(predicted_names))

In [None]:
if enable_mlp_section:
    # overall validation accuracy; inference only, so gradient tracking is switched off
    total = 0
    correct = 0
    with torch.no_grad():
        for images, labels in val_loader:
            outputs = mlp_model(images)
            # predicted class = index of the largest output score
            predicted = outputs.argmax(dim=1)
            correct += (predicted == labels).sum().item()
            total += labels.size(0)

    print(f'Accuracy of the network on the 900 validation images: {100 * correct // total} %')

Accuracy of the network on the 900 validation images: 54 %

In [None]:
if enable_mlp_section:
    # per-class counters: correctly classified images / all images of that class
    correct_pred = {classname: 0 for classname in classes}
    total_pred = {classname: 0 for classname in classes}

    # (true, predicted) class indices of every validation image, for the confusion matrix
    true_labels = []
    pred_labels = []

    # again no gradients needed
    with torch.no_grad():
        for images, labels in val_loader:
            outputs = mlp_model(images)
            predictions = outputs.argmax(dim=1)
            # tolist() yields plain ints, so the table holds class indices rather than
            # strings such as "tensor(0)"
            for label, prediction in zip(labels.tolist(), predictions.tolist()):
                true_labels.append(label)
                pred_labels.append(prediction)

                if label == prediction:
                    correct_pred[classes[label]] += 1
                total_pred[classes[label]] += 1

    # build the table once instead of appending one row per image
    results = pd.DataFrame({"true": true_labels, "pred": pred_labels})

    # print accuracy for each class
    for classname, correct_count in correct_pred.items():
        if total_pred[classname] == 0:
            # class absent from the validation split: avoid dividing by zero
            print(f'Accuracy for class: {classname:5s} is n/a (no validation samples)')
            continue
        accuracy = 100 * float(correct_count) / total_pred[classname]
        print(f'Accuracy for class: {classname:5s} is {accuracy:.1f} %')

Accuracy for class: cherry is 47.1 %
Accuracy for class: strawberry is 54.2 %
Accuracy for class: tomato is 62.4 %

In [None]:
def show_confusion_matrix(true, pred, title):
    """
    Draw an annotated heat-map of the confusion matrix for the given labels.

    Rows and columns are named after the notebook-level `classes` list, so the matrix must
    contain exactly one row/column per class.

    :param true: ground-truth labels, one per sample
    :param pred: predicted labels, aligned with `true`
    :param title: title placed above the heat-map
    """
    counts = confusion_matrix(true, pred)

    # each cell as a fraction of all samples, multiplied by 10
    # NOTE(review): the factor 10 looks arbitrary for a 3-class problem -- confirm the intended scale
    scaled = counts / counts.sum() * 10
    class_names = list(classes)
    df_cm = pd.DataFrame(scaled, index=class_names, columns=class_names)

    fig, ax = plt.subplots()
    sns.heatmap(df_cm, annot=True, ax=ax)
    fig.set_size_inches(12, 8)
    ax.set(xlabel="Predicted", ylabel="Ground truth", title=title)

#plt.savefig('output.png')

In [None]:
#results.info()

In [None]:
if enable_mlp_section:
    # heat-map of the MLP's validation predictions against the true classes
    show_confusion_matrix(true=results["true"], pred=results["pred"], title="MLP cherry Confusion Matrix")

In [None]:
# takes 'inspiration' from vgg architecture

# define our CNN
class CNN(nn.Module):
    """
    VGG-style convolutional classifier producing 3 class scores.

    Twelve blocks of 3x3 convolution + batch norm + ReLU
    (32-32-64-64-128-128-256-256-256-512-512-512 channels); conv2, conv4, conv6, conv9 and
    conv12 additionally end in a 2x2 max-pool. The flattened feature map then passes through
    two 4096-unit fully connected layers (ReLU + dropout) and a final 3-way linear layer.

    :param input_size: (height, width) of the images the network will be fed; it determines
        the input width of the first fully connected layer. Defaults to 300x300, the size
        produced by the notebook's resize transform.
    :raises ValueError: if input_size is too small to survive the five pooling stages
    """

    # number of 2x2 max-pools in the conv stack; each one floor-halves height and width
    _POOL_STAGES = 5
    # channel count produced by the last conv block
    _LAST_CHANNELS = 512

    def __init__(self, input_size=(300, 300)):
        super().__init__()

        # validate the input size before allocating any layers
        flat_features = self._flat_features(input_size)

        self.conv1 = self._conv_block(3, 32)
        self.conv2 = self._conv_block(32, 32, pool=True)
        self.conv3 = self._conv_block(32, 64)
        self.conv4 = self._conv_block(64, 64, pool=True)
        self.conv5 = self._conv_block(64, 128)
        self.conv6 = self._conv_block(128, 128, pool=True)
        self.conv7 = self._conv_block(128, 256)
        self.conv8 = self._conv_block(256, 256)
        self.conv9 = self._conv_block(256, 256, pool=True)
        self.conv10 = self._conv_block(256, 512)
        self.conv11 = self._conv_block(512, 512)
        self.conv12 = self._conv_block(512, 512, pool=True)

        # The ReLU between the linear layers keeps the classifier head non-linear; without
        # it the three Linear layers collapse into a single affine map. ReLU has no
        # parameters and the Linear layers stay at index 0, so state_dict keys are unchanged.
        self.fc1 = nn.Sequential(
            nn.Linear(in_features=flat_features, out_features=4096),
            nn.ReLU(),
            nn.Dropout(p=0.5)
        )

        self.fc2 = nn.Sequential(
            nn.Linear(in_features=4096, out_features=4096),
            nn.ReLU(),
            nn.Dropout(p=0.5)
        )

        self.fc3 = nn.Linear(in_features=4096, out_features=3)

    @staticmethod
    def _conv_block(in_channels, out_channels, pool=False):
        """Build 3x3 same-padding conv -> batch norm -> ReLU, optionally followed by a 2x2 max-pool."""
        layers = [
            nn.Conv2d(in_channels=in_channels, out_channels=out_channels, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(),
        ]
        if pool:
            layers.append(nn.MaxPool2d(kernel_size=2, stride=2))
        return nn.Sequential(*layers)

    @classmethod
    def _flat_features(cls, input_size):
        """Return the number of features left after the conv stack for a (height, width) input."""
        height, width = input_size
        # the convolutions keep the spatial size (padding=1); only the max-pools shrink it
        for _ in range(cls._POOL_STAGES):
            height //= 2
            width //= 2
        if height < 1 or width < 1:
            raise ValueError(
                "input_size " + str(tuple(input_size)) + " is too small for "
                + str(cls._POOL_STAGES) + " pooling stages"
            )
        return cls._LAST_CHANNELS * height * width

    def forward(self, x):
        """Map a (batch, 3, height, width) image batch to (batch, 3) raw class scores."""
        # every conv block is applied exactly once, in order
        conv_stack = (
            self.conv1, self.conv2, self.conv3, self.conv4,
            self.conv5, self.conv6, self.conv7, self.conv8,
            self.conv9, self.conv10, self.conv11, self.conv12,
        )
        for block in conv_stack:
            x = block(x)
        x = torch.flatten(x, 1)  # flatten all dimensions except batch
        x = self.fc1(x)
        x = self.fc2(x)
        x = self.fc3(x)
        return x

# build the CNN with freshly initialised weights
cnn_model = CNN()
# move its parameters to the selected device; as the last expression of the cell,
# the returned module is also what the notebook displays
cnn_model.to(device)

In [None]:
# define the loss function and the optimiser (SGD with momentum) for the CNN
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(params=cnn_model.parameters(), lr=1e-3, momentum=0.5)

In [None]:
# file the best CNN weights (state_dict) are saved to during training
# and loaded from again before testing
model_save_path = "model.pth"

In [None]:
# Train the CNN for up to 200 epochs, validating after every epoch. Whenever the validation
# accuracy improves, the weights are written to model_save_path; training stops early after
# `early_stop_count` consecutive epochs without improvement.
best_val_acc = 0.0
epoch_used = 0  # 0-based index of the epoch whose weights were saved last
training_start_time = datetime.now()  # wall-clock start as a datetime object (not milliseconds)

# stop after this many epochs with no validation improvement
early_stop_count = 25
epochs_with_no_improv = 0

cnn_train_history = pd.DataFrame(columns=["train_loss", "train_accuracy", "val_loss", "val_accuracy"])

for epoch in range(200):
    # train the model
    cnn_model.train()

    train_accuracy = 0.0
    train_loss = 0.0
    total = 0

    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)

        # zero the parameter gradients
        optimizer.zero_grad()

        # forward + backward + optimize
        outputs = cnn_model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        batch_size = labels.size(0)
        # the loss is a per-batch mean (nn.CrossEntropyLoss default); weight it by the batch
        # size so dividing by the sample count gives a true per-sample average
        train_loss += loss.item() * batch_size
        total += batch_size
        train_accuracy += (outputs.argmax(dim=1) == labels).sum().item()

    train_accuracy = train_accuracy / total
    train_loss = train_loss / total

    # evaluate the model (eval mode: dropout off, batch norm uses running statistics)
    cnn_model.eval()

    val_accuracy = 0.0
    val_loss = 0.0
    total = 0

    # no gradients are needed for validation; skipping them saves memory and time
    with torch.no_grad():
        for inputs, labels in val_loader:
            inputs, labels = inputs.to(device), labels.to(device)

            outputs = cnn_model(inputs)
            loss = criterion(outputs, labels)

            batch_size = labels.size(0)
            val_loss += loss.item() * batch_size
            total += batch_size
            val_accuracy += (outputs.argmax(dim=1) == labels).sum().item()

    val_accuracy = val_accuracy / total
    val_loss = val_loss / total

    print("Epoch: " + str(epoch+1) + "\n" +
          "Train loss: " + str(train_loss) + ", Train accuracy: " + str(train_accuracy) + "\n" +
          "Val loss: " + str(val_loss) + ", Val Accuracy: " + str(val_accuracy) + "\n")

    cnn_train_history.loc[len(cnn_train_history)] = [train_loss, train_accuracy, val_loss, val_accuracy]

    if val_accuracy > best_val_acc:
        print("Validation accuracy improved: " + str(best_val_acc) + " --> " + str(val_accuracy))
        print("Saving model \n")
        epoch_used = epoch
        best_val_acc = val_accuracy
        torch.save(cnn_model.state_dict(), model_save_path)
        epochs_with_no_improv = 0
    else:
        epochs_with_no_improv += 1

    if epochs_with_no_improv >= early_stop_count:
        print("Stopping early, no validation improvement in " + str(early_stop_count) + " epochs \n")
        break

In [None]:
# training_start_time is a datetime, so it has to be subtracted from another datetime.
# This measures the time elapsed until this cell runs, so execute it right after training.
train_time = (datetime.now() - training_start_time).total_seconds() / 60  # minutes

print("Finished Training")
print(f"Took {train_time:.1f} minutes")

In [None]:
# figure defaults (rcParams stay in effect for figures created later as well)
plt.rcParams.update({"figure.figsize": (5, 5), "figure.dpi": 100})
plt.grid(visible=None)

# one point per recorded epoch, training curve first
x = range(len(cnn_train_history))
for column, curve_label in (("train_loss", "Train"), ("val_loss", "Validation")):
    plt.plot(x, cnn_train_history[column].to_numpy(), label=curve_label)
# dashed marker at the epoch whose weights were saved
plt.axvline(x=epoch_used, label="Selected point", color="g", linestyle="--")

plt.title("CNN Loss", fontsize=18)
plt.xlabel("Epochs", fontsize=14)
plt.legend()

plt.savefig("cnn_loss.png")
plt.show()

In [None]:
# figure defaults (rcParams stay in effect for figures created later as well)
plt.rcParams.update({"figure.figsize": (5, 5), "figure.dpi": 100})
plt.grid(visible=None)

# one point per recorded epoch, training curve first
x = range(len(cnn_train_history))
for column, curve_label in (("train_accuracy", "Train"), ("val_accuracy", "Validation")):
    plt.plot(x, cnn_train_history[column].to_numpy(), label=curve_label)
# dashed marker at the epoch whose weights were saved
plt.axvline(x=epoch_used, label="Selected point", color="g", linestyle="--")

plt.title("CNN Accuracy", fontsize=18)
plt.xlabel("Epochs", fontsize=14)
plt.legend()

plt.savefig("cnn_accuracy.png")
plt.show()

Now for testing

In [None]:
# take the first validation batch (val_loader is not shuffled)
dataiter = iter(val_loader)
# use the built-in next(): the iterator's .next() method no longer exists in recent PyTorch
images, labels = next(dataiter)

# print images
imshow(utils.make_grid(images))
# iterate over the labels actually returned, which can be fewer than desired_batch_size
print(' '.join(f'{classes[label]:5s}' for label in labels))

In [None]:
# load the model from file: rebuild the architecture, then restore the best saved weights
cnn_model = CNN()
# map_location="cpu" lets a checkpoint written from a GPU model load on a CPU-only machine;
# the freshly built model lives on the CPU anyway
cnn_model.load_state_dict(torch.load(model_save_path, map_location="cpu"))
# evaluation mode: dropout disabled, batch norm uses its running statistics
cnn_model.eval()

In [None]:
# inference only: skip building the autograd graph for the preview batch
with torch.no_grad():
    outputs = cnn_model(images)

In [None]:
# highest-scoring class for each image of the preview batch
predicted = outputs.argmax(dim=1)

predicted_names = (f'{classes[predicted[j]]:5s}' for j in range(desired_batch_size))
print('Predicted: ', ' '.join(predicted_names))

In [None]:
# overall validation accuracy; inference only, so gradient tracking is switched off
total = 0
correct = 0
with torch.no_grad():
    for images, labels in val_loader:
        outputs = cnn_model(images)
        # predicted class = index of the largest output score
        predicted = outputs.argmax(dim=1)
        correct += (predicted == labels).sum().item()
        total += labels.size(0)

print(f'Accuracy of the network on the 900 validation images: {100 * correct // total} %')

output: Accuracy of the network on the 900 validation images: 83 %

In [None]:
# prepare to count predictions for each class
# per-class counters: correctly classified images / all images of that class
correct_pred = {classname: 0 for classname in classes}
total_pred = {classname: 0 for classname in classes}

# (true, predicted) class indices of every validation image, for a confusion matrix
true_labels = []
pred_labels = []

# again no gradients needed
with torch.no_grad():
    for images, labels in val_loader:
        outputs = cnn_model(images)
        predictions = outputs.argmax(dim=1)
        # tolist() yields plain ints, so the table holds class indices rather than
        # strings such as "tensor(0)"
        for label, prediction in zip(labels.tolist(), predictions.tolist()):
            true_labels.append(label)
            pred_labels.append(prediction)

            if label == prediction:
                correct_pred[classes[label]] += 1
            total_pred[classes[label]] += 1

# build the table once instead of appending one row per image
results = pd.DataFrame({"true": true_labels, "pred": pred_labels})

# print accuracy for each class
for classname, correct_count in correct_pred.items():
    if total_pred[classname] == 0:
        # class absent from the validation split: avoid dividing by zero
        print(f'Accuracy for class: {classname:5s} is n/a (no validation samples)')
        continue
    accuracy = 100 * float(correct_count) / total_pred[classname]
    print(f'Accuracy for class: {classname:5s} is {accuracy:.1f} %')

output:
Accuracy for class: cherry is 84.0 %
Accuracy for class: strawberry is 89.2 %
Accuracy for class: tomato is 78.4 %