In [11]:
# Notes
# Batch size 32-128
# lr <= 0.0001 for relu
# if we want more data so training > test, can use data augmentation
# experimented with some dropout layers to prevent overfitting, low probability on conv layers, 0.4+ on fc layers
# relu, CrossEntropy and Adam seem to make most sense

In [12]:
import torch, time, gc
import torch.nn as nn
import torch.optim as optim
import torchvision.datasets as datasets
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
import matplotlib.pyplot as plt
import matplotlib.gridspec as grid

# Setup device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f'Using {torch.cuda.get_device_name(0)}')
SAVE_PATH = "./saves/neural_net.pth"

# Training variables
BATCH_SIZE = 16
EPOCHS = 10
LEARNING_RATE = 0.000001

# Define the transforms for the dataset
transform = transforms.Compose([
    transforms.Resize(256, antialias=True),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])
augment_transform = transforms.Compose([
    transforms.Resize(256, antialias=True),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    transforms.RandomApply(
        [transforms.RandomApply([transforms.RandomAffine(degrees=359, translate=(0.2, 0.2), shear=(20, 20, 20, 20)),
                                 transforms.RandomHorizontalFlip(1), transforms.RandomVerticalFlip(1)], p=0.7),
         transforms.ColorJitter(brightness=(0.75, 1.25), contrast=(0.75, 1.25), hue=(-0.15, 0.15),
                                saturation=(0.75, 1.25))
         ]
    )
])

val_accuracies, val_losses, train_accuracies, train_losses = [], [], [], []

# Load the train dataset
train_dataset = datasets.Flowers102(root="./data", split="train", transform=transform, download=True)
all_train_data = [train_dataset]

# Since augment transform has probability/random changes, we can use it multiple times to generate more training data
for i in range(3):
    all_train_data.append(
        datasets.Flowers102(root="./data", split="train", transform=augment_transform, download=False))
train_dataset_extra = torch.utils.data.ConcatDataset(all_train_data)

train_loader = DataLoader(train_dataset_extra, batch_size=BATCH_SIZE, shuffle=True, num_workers=2)

# Load validation dataset
val_dataset = datasets.Flowers102(root="./data", split="val", transform=transform, download=True)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=2)

test_dataset = datasets.Flowers102(root="./data", split="test", transform=transform, download=True)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=2)


# Define the CNN architecture
class CNN(nn.Module):
    def __init__(self):
        super(CNN, self).__init__()
        self.conv1 = nn.Sequential(nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1),
                                   nn.BatchNorm2d(64),
                                   nn.ReLU(),
                                   nn.MaxPool2d(2, 2))  # 112 -> 224
        self.conv2 = nn.Sequential(nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1),
                                   nn.BatchNorm2d(128),
                                   nn.ReLU(),
                                   nn.Dropout(p=0.2))
        self.conv3 = nn.Sequential(nn.Conv2d(128, 128, kernel_size=3, stride=1, padding=1),
                                   nn.BatchNorm2d(128),
                                   nn.ReLU())
        self.conv4 = nn.Sequential(nn.Conv2d(128, 128, kernel_size=3, stride=1, padding=1),
                                   nn.BatchNorm2d(128),
                                   nn.ReLU(),
                                   nn.MaxPool2d(2, 2),  # 56 -> 112
                                   nn.Dropout(p=0.2))
        self.conv5 = nn.Sequential(nn.Conv2d(128, 256, kernel_size=3, stride=1, padding=1),
                                   nn.BatchNorm2d(256),
                                   nn.ReLU())
        self.conv6 = nn.Sequential(nn.Conv2d(256, 256, kernel_size=3, stride=1, padding=1),
                                   nn.BatchNorm2d(256),
                                   nn.ReLU())
        self.conv7 = nn.Sequential(nn.Conv2d(256, 256, kernel_size=3, stride=1, padding=1),
                                   nn.BatchNorm2d(256),
                                   nn.ReLU(),
                                   nn.MaxPool2d(2, 2),  # 28 -> 56
                                   nn.Dropout(p=0.2))
        self.conv8 = nn.Sequential(nn.Conv2d(256, 512, kernel_size=3, stride=1, padding=1),
                                   nn.BatchNorm2d(512),
                                   nn.ReLU())
        self.conv9 = nn.Sequential(nn.Conv2d(512, 512, kernel_size=3, stride=1, padding=1),
                                   nn.BatchNorm2d(512),
                                   nn.ReLU())
        self.conv10 = nn.Sequential(nn.Conv2d(512, 512, kernel_size=3, stride=1, padding=1),
                                    nn.BatchNorm2d(512),
                                    nn.ReLU(),
                                    nn.MaxPool2d(2, 2),  # 14 -> 28
                                    nn.Dropout(p=0.2))
        self.conv11 = nn.Sequential(nn.Conv2d(512, 1024, kernel_size=3, stride=1, padding=1),
                                    nn.BatchNorm2d(1024),
                                    nn.ReLU())
        self.conv12 = nn.Sequential(nn.Conv2d(1024, 1024, kernel_size=3, stride=1, padding=1),
                                    nn.BatchNorm2d(1024),
                                    nn.ReLU())
        self.conv13 = nn.Sequential(nn.Conv2d(1024, 1024, kernel_size=3, stride=1, padding=1),
                                    nn.BatchNorm2d(1024),
                                    nn.ReLU())
        self.conv14 = nn.Sequential(nn.Conv2d(1024, 1024, kernel_size=3, stride=1, padding=1),
                                    nn.BatchNorm2d(1024),
                                    nn.ReLU(),
                                    nn.MaxPool2d(2, 2),  # 7 -> 14
                                    nn.Dropout(p=0.2))
        self.fc1 = nn.Sequential(nn.Linear(7 * 7 * 1024, 8192),
                                 nn.ReLU(),
                                 nn.Dropout())
        self.fc2 = nn.Sequential(nn.Linear(8192, 4096),
                                 nn.ReLU(),
                                 nn.Dropout())
        self.fc3 = nn.Sequential(nn.Linear(4096, 4096),
                                 nn.ReLU(),
                                 nn.Dropout())
        self.fc4 = nn.Sequential(nn.Linear(4096, 102))
        # Final output_features size must be 102 as there are 102 labels in the dataset
        # Pool dimensions: out=(in-kernel)/stride + 1

    def forward(self, x):
        x = self.conv1(x)
        x = self.conv2(x)
        x = self.conv3(x)
        x = self.conv4(x)
        x = self.conv5(x)
        x = self.conv6(x)
        x = self.conv7(x)
        x = self.conv8(x)
        x = self.conv9(x)
        x = self.conv10(x)
        x = self.conv11(x)
        x = self.conv12(x)
        x = self.conv13(x)
        x = self.conv14(x)
        x = x.view(-1, 7 * 7 * 1024)
        x = self.fc1(x)
        x = self.fc2(x)
        x = self.fc3(x)
        x = self.fc4(x)
        return x
        # Final output_features size must be 102 as there are 102 labels in the dataset
        # Pool dimensions: out=(in-kernel)/stride + 1

Using NVIDIA GeForce RTX 3070 Laptop GPU


In [13]:
def train_one_epoch(net: nn.Module = None, training_loss: [] = None, training_acc: [] = None, optimizer=None):
    if optimizer is None:
        optimizer = optim.Adam(net.parameters(), lr=LEARNING_RATE)

    net.train()
    running_loss = 0.0
    correct = 0
    total = 0
    for i, (images, labels) in enumerate(train_loader):
        labels = torch.eye(102)[labels]  # one hot encode
        images = images.to(device)
        labels = labels.to(device)

        outputs = net(images)  # train
        labels = torch.argmax(labels, dim=1)  # one hot decode

        loss = criterion(outputs, labels)
        running_loss += loss.item() * images.size(0)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += torch.sum(predicted == labels).sum().item()

    running_loss /= len(all_train_data)
    train_accuracy = 100 * correct / total
    # save data
    training_loss.append(running_loss)
    training_acc.append(train_accuracy)
    print(f'Training Loss: {running_loss:.4f}, Accuracy: {train_accuracy:.2f}%')
    return training_loss, training_acc


In [14]:
def validate_one_epoch(net: nn.Module = None, valid_loss: [] = None, valid_acc: [] = None):
    net.eval()
    with torch.no_grad():
        running_vloss = 0.0
        vcorrect = 0
        vtotal = 0
        for images, labels in val_loader:
            images = images.to(device)
            labels = labels.to(device)

            outputs = net(images)
            vloss = criterion(outputs, labels)
            running_vloss += vloss.item() * images.size(0)

            _, predicted = torch.max(outputs.data, 1)
            vtotal += labels.size(0)
            vcorrect += (predicted == labels).sum().item()

        vaccuracy = 100 * vcorrect / vtotal

        # save validation data
        valid_loss.append(running_vloss)
        valid_acc.append(vaccuracy)
        print(f'Validation Loss: {running_vloss:.4f}, Accuracy: {vaccuracy:.2f}%')
        return valid_loss, valid_acc

In [15]:
def train_net(net: nn.Module = None, epochs: int = 1, validation_loss=None, validation_accuracy=None,
              training_loss=None, training_accuracy=None, epochs_already_trained: int = 0):
    optimizer = optim.Adam(cnn.parameters(), lr=LEARNING_RATE)

    if training_accuracy is None:
        training_accuracy = []
    if training_loss is None:
        training_loss = []
    if validation_accuracy is None:
        validation_accuracy = []
    if validation_loss is None:
        validation_loss = []

    epochs += epochs_already_trained
    for epoch in range(epochs_already_trained, epochs):
        print(f'Epoch: [{epoch + 1}/{epochs}]')

        training_loss, training_accuracy = train_one_epoch(net, training_loss=training_loss,
                                                           training_acc=training_accuracy, optimizer=optimizer)
        validation_loss, validation_accuracy = validate_one_epoch(net, valid_loss=validation_loss,
                                                                  valid_acc=validation_accuracy, optimizer=optimizer)

        torch.save(net.state_dict(), SAVE_PATH)
        with open("./saves/save_data.txt", "w") as savefile:
            savefile.writelines(str(validation_loss))
            savefile.writelines(str(validation_accuracy))
            savefile.writelines(str(training_loss))
            savefile.writelines(str(training_accuracy))
            savefile.write(str(epoch + 1))
        print("NN saved.")


In [16]:
def train_net_from_save(epochs: int = 1, save_folder_path: str = "./saves"):
    print("Resuming training from save.")
    net = CNN().to(device)
    net.load_state_dict(torch.load(save_folder_path + "/neural_net.pth"))
    with open(save_folder_path + "/save_data.txt", "r") as savefile:
        savelist = savefile.readline().split("]")
        v_losses, v_accs, t_losses, t_accs = [], [], [], []
        for vloss in savelist[0][1::].split(","):
            v_losses.append(float(vloss.strip()))
        for vacc in savelist[1][1::].split(","):
            v_accs.append(float(vacc.strip()))
        for tloss in savelist[2][1::].split(","):
            t_losses.append(float(tloss.strip()))
        for tacc in savelist[3][1::].split(","):
            t_accs.append(float(tacc.strip()))
        epochs_done = int(savelist[4])
    print("Net loaded. " + str(epochs_done) + " epochs previously trained.")

    train_net(net=net, epochs=epochs, validation_loss=v_losses, validation_accuracy=v_accs, training_accuracy=t_accs,
              training_loss=t_losses, epochs_already_trained=epochs_done)

In [17]:
def test_net(net: nn.Module):
    net.eval()
    with torch.no_grad():
        correct = 0
        total = 0
        for images, labels in test_loader:
            images = images.to(device)
            labels = labels.to(device)
            outputs = net(images)
            _, predicted = torch.max(outputs.data, 1)
            # pred_labels = torch.argmax(outputs, axis=1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
        print(f"Test Accuracy: {(100 * correct / total):.2f}%")

In [18]:
def test_net_from_save(save_folder_path: str = "./saves"):
    net = CNN().to(device)
    net.load_state_dict(torch.load(save_folder_path + "/neural_net.pth"))
    test_net(net)

In [19]:
# Clear cuda cache
gc.collect()
torch.cuda.empty_cache()

# Create an instance of the CNN and move it to the device
cnn = CNN().to(device)
print(cnn)

# Define the loss function and the optimizer
criterion = nn.CrossEntropyLoss()

KeyboardInterrupt: 

In [None]:
print("Starting training.")
# Start timer
start_time = time.time()

train_net(cnn, EPOCHS, validation_loss=val_losses, validation_accuracy=val_accuracies, training_loss=train_losses,
          training_accuracy=train_accuracies)

# Stop timer
end_time = time.time()
print("Training Complete in: " + time.strftime("%Hh %Mm %Ss", time.gmtime(end_time - start_time)))

In [None]:
# train_net_from_save()

In [None]:
# Test the CNN
test_net(net=cnn)

In [None]:
# Plot
fig = plt.figure(tight_layout=True)
gs = grid.GridSpec(nrows=2, ncols=1)


# for i in range(len(train_losses)):
#     train_losses[i]*=len(all_train_data)
def plot_epochs(ax, data: [[int]], ylabel: str, datalabels: [str], cs: [str]):
    if len(datalabels) == len(data):
        for line in range(len(data)):
            ax.plot(data[line], label=datalabels[line], c=cs[line])
        ax.set_xlabel("Epoch")
        ax.set_ylabel(ylabel)
        fig.align_labels()
        plt.legend()


line_labels = ["Validation", "Training"]
colours = ["forestgreen", "royalblue"]
plot_epochs(fig.add_subplot(gs[0, 0]), [val_accuracies, train_accuracies], "Accuracy", line_labels, colours)
plot_epochs(fig.add_subplot(gs[1, 0]), [val_losses, train_losses], "Loss", line_labels, colours)
plt.show()
fig.savefig("figs/training_graph.png")