# 102Flowers Image Classifier

This is the main notebook for the project. See the associated report (WIP) for more information.

### Imports

In [None]:
import torch
from torch import nn
from torch import optim
from torch.utils.data import DataLoader
import torch.nn.functional as F
from torchvision import datasets, transforms, models

import numpy as np
import matplotlib.pyplot as plt
import scipy

from IPython.display import clear_output
import os
import json

### Hyperparameters

In [None]:
# Run identifier: used in checkpoint/model paths and as the register entry name.
NAME = "alex-1"

# Batch sizes -- every split currently shares the same default.
DEFAULT_BATCH_SIZE = 20
TRAINING_BATCH_SIZE = VALIDATION_BATCH_SIZE = TESTING_BATCH_SIZE = DEFAULT_BATCH_SIZE

# Default cap on the number of batches consumed per evaluation pass.
TESTING_BATCH_COUNT = 51

# Optimisation settings.
EPOCHS = 200
LEARNING_RATE = 0.001
DROPOUT_P = 0.5

# Side length (pixels) passed to the resize / centre-crop transforms.
IMAGE_CROP_SIZE = 224

# Training-loop reporting: plot after each report, checkpoint every CP_EVERY epochs.
TRAINING_PLOT = True
CP_EVERY = 10

# Final metrics, overwritten by validate()/train() and written to the model register.
# -1 is presumably a "not measured yet" placeholder.
f_tr_acc = f_tr_loss = f_val_acc = f_val_loss = f_te_acc = f_te_loss = f_ep_loss = -1

### Device

In [None]:
# Pick the compute device: the first CUDA GPU when one is available, else the CPU.
if not torch.cuda.is_available():
	print("No GPUs found, using CPU.")
	DEVICE = torch.device("cpu")
else:
	print(f"Found {torch.cuda.device_count()} GPUs. Using cuda:0.")
	DEVICE = torch.device("cuda:0")

### Load Dataset

In [None]:
def _flowers_transform(augment=False):
    """Build the preprocessing pipeline for one Flowers102 split.

    Every split is resized and centre-cropped with IMAGE_CROP_SIZE and then
    converted to a tensor. With ``augment=True`` random horizontal and vertical
    flips are inserted before the tensor conversion.
    """
    steps = [
        transforms.Resize(IMAGE_CROP_SIZE),
        transforms.CenterCrop(IMAGE_CROP_SIZE),
    ]
    if augment:
        steps.extend([
            transforms.RandomHorizontalFlip(),
            transforms.RandomVerticalFlip(),
        ])
    steps.append(transforms.ToTensor())
    return transforms.Compose(steps)


def _load_flowers_split(split, augment=False):
    """Load (downloading into ./data if needed) the named Flowers102 split."""
    return datasets.Flowers102(
        root="data",
        split=split,
        transform=_flowers_transform(augment),
        download=True,
    )


# Only the training split is augmented; validation and testing are deterministic.
training_data = _load_flowers_split("train", augment=True)
validation_data = _load_flowers_split("val")
testing_data = _load_flowers_split("test")

### DataLoaders

In [None]:
# One shuffled loader per split, each with its own configurable batch size.
training_dataloader = DataLoader(
    dataset=training_data,
    batch_size=TRAINING_BATCH_SIZE,
    shuffle=True,
)
validation_dataloader = DataLoader(
    dataset=validation_data,
    batch_size=VALIDATION_BATCH_SIZE,
    shuffle=True,
)
testing_dataloader = DataLoader(
    dataset=testing_data,
    batch_size=TESTING_BATCH_SIZE,
    shuffle=True,
)

## Model

In [None]:
class F102Classifier(nn.Module):
	"""AlexNet-style convolutional classifier that outputs 102 class scores.

	Each layer is stored under its own numbered attribute (``conv1`` ...
	``fc21``). Those attribute names are the keys of ``state_dict()``, so they
	are kept stable to stay compatible with checkpoints that were already saved.
	"""

	def __init__(self):
		super().__init__()

		# --- Convolutional feature extractor ---
		self.conv1 = nn.Conv2d(3, 64, kernel_size=11, stride=4, padding=2)
		self.relu2 = nn.ReLU(inplace=True)
		self.maxpool3 = nn.MaxPool2d(kernel_size=3, stride=2)
		self.conv4 = nn.Conv2d(64, 192, kernel_size=5, stride=1, padding=2)
		self.relu5 = nn.ReLU(inplace=True)
		self.maxpool6 = nn.MaxPool2d(kernel_size=3, stride=2)
		self.conv7 = nn.Conv2d(192, 384, kernel_size=3, stride=1, padding=1)
		self.relu8 = nn.ReLU(inplace=True)
		self.conv9 = nn.Conv2d(384, 256, kernel_size=3, stride=1, padding=1)
		self.relu10 = nn.ReLU(inplace=True)
		self.conv11 = nn.Conv2d(256, 256, kernel_size=3, stride=1, padding=1)
		self.relu12 = nn.ReLU(inplace=True)
		self.maxpool13 = nn.MaxPool2d(kernel_size=3, stride=2)
		# Forces a 6x6 spatial map so the first linear layer always sees
		# 256 * 6 * 6 = 9216 features.
		self.avgpool14 = nn.AdaptiveAvgPool2d((6, 6))

		# --- Fully connected classifier head ---
		self.dropout15 = nn.Dropout(p=DROPOUT_P)
		self.fc16 = nn.Linear(9216, 4096)
		self.relu17 = nn.ReLU(inplace=True)
		self.dropout18 = nn.Dropout(p=DROPOUT_P)
		self.fc19 = nn.Linear(4096, 4096)
		self.relu20 = nn.ReLU(inplace=True)
		self.fc21 = nn.Linear(4096, 102)

	def forward(self, x):
		"""Run a batch through the network and return one score per class.

		Args:
			x: input batch; assumed to be (batch, 3, H, W) since ``conv1``
				takes 3 input channels -- TODO confirm against the data pipeline.

		Returns:
			Tensor with 102 un-normalised scores per sample (no softmax applied).
		"""
		pre_flatten = (
			self.conv1, self.relu2, self.maxpool3,
			self.conv4, self.relu5, self.maxpool6,
			self.conv7, self.relu8,
			self.conv9, self.relu10,
			self.conv11, self.relu12, self.maxpool13,
			self.avgpool14, self.dropout15,
		)
		for layer in pre_flatten:
			x = layer(x)

		# Collapse everything except the batch dimension before the linear layers.
		x = torch.flatten(x, 1)

		post_flatten = (
			self.fc16, self.relu17, self.dropout18,
			self.fc19, self.relu20,
			self.fc21,
		)
		for layer in post_flatten:
			x = layer(x)

		return x

# Build the classifier and move its parameters onto the selected device.
net = F102Classifier().to(DEVICE)

### Loss Function & Optimiser

In [None]:
# Cross-entropy criterion, placed on the same device as the model.
loss_function = nn.CrossEntropyLoss()
loss_function = loss_function.to(DEVICE)

# Adam over all model parameters (an SGD alternative is kept below for reference).
optimiser = optim.Adam(params=net.parameters(), lr=LEARNING_RATE)
#optimiser = optim.SGD(net.parameters(), lr=LEARNING_RATE, momentum=0.9, weight_decay=0.005)

## Validation, Testing, Training Functions

### Validation, Testing Functions

In [None]:
def validate(model:F102Classifier=net, dataloader:DataLoader=validation_dataloader, loss_fn=loss_function, batches=TESTING_BATCH_COUNT, print_type="Validation"):
    """Evaluate ``model`` on up to ``batches`` batches drawn from ``dataloader``.

    Args:
        model: network to evaluate.
        dataloader: source of ``(inputs, labels)`` batches.
        loss_fn: criterion called as ``loss_fn(output, labels)``.
        batches: maximum number of batches to consume; the loop also stops
            early if the loader runs out first.
        print_type: label used in the printed summary. The values
            "Validation", "Training" and "Testing" additionally select which
            pair of module-level ``f_*_acc`` / ``f_*_loss`` globals is updated.

    Returns:
        ``(mean_loss, accuracy)``: the loss averaged over the batches actually
        processed, and the fraction (0..1) of samples classified correctly.
    """
    # Remember the caller's mode so evaluating mid-training does not leave
    # dropout disabled for subsequent training steps.
    was_training = model.training
    model.eval()

    total_loss, correct, samples_seen, batches_seen = 0.0, 0, 0, 0
    # No gradients are needed for evaluation; skipping them saves memory and time.
    with torch.no_grad():
        for batch, (inputs, labels) in enumerate(dataloader):
            if batch == batches:
                break
            inputs, labels = inputs.to(DEVICE), labels.to(DEVICE)
            output = model(inputs)
            total_loss += loss_fn(output, labels).item()
            # The loss above treats each label as an index into the output
            # scores, so the predicted class is the arg-max index itself
            # (no +1 offset).
            pred = output.argmax(dim=1)
            correct += pred.eq(labels.view_as(pred)).sum().item()
            samples_seen += labels.size(0)
            batches_seen += 1

    model.train(was_training)

    # Normalise by what was actually processed: loss per batch, accuracy per sample.
    test_loss = total_loss / batches_seen if batches_seen else 0.0
    correct = correct / samples_seen if samples_seen else 0.0

    print(f"{print_type} Accuracy: {correct*100:.1f}%\nMean Loss for {print_type}: {test_loss:.3f}")
    if print_type == "Validation":
        global f_val_acc, f_val_loss
        f_val_acc, f_val_loss = correct, test_loss
    elif print_type == "Training":
        global f_tr_acc, f_tr_loss
        f_tr_acc, f_tr_loss = correct, test_loss
    elif print_type == "Testing":
        global f_te_acc, f_te_loss
        f_te_acc, f_te_loss = correct, test_loss
    return test_loss, correct

def test(model:F102Classifier=net, dataloader:DataLoader=testing_dataloader, loss_fn=loss_function, batches=TESTING_BATCH_COUNT, print_type="Testing"):
    """Evaluate on the test split by delegating to ``validate``.

    Every argument is forwarded unchanged, so a caller-supplied ``model``,
    ``loss_fn`` or ``batches`` takes effect instead of being replaced by the
    defaults.

    Returns:
        Whatever ``validate`` returns for the same arguments.
    """
    return validate(model=model, dataloader=dataloader, loss_fn=loss_fn, batches=batches, print_type=print_type)

### Training Function

In [None]:
def _save_checkpoint(model, epoch):
	"""Write ``model``'s state dict to ./models/NAME/NAME-cp<epoch>.pth."""
	checkpoint_dir = f"./models/{NAME}"
	os.makedirs(checkpoint_dir, exist_ok=True)
	checkpoint_path = f"{checkpoint_dir}/{NAME}-cp{epoch}.pth"
	torch.save(model.state_dict(), checkpoint_path)
	print(f"Checkpoint {epoch} saved at {checkpoint_path}")


def _plot_training_progress(status, loss_record, validation_loss_record, training_fit_loss_record, validation_accuracy_record, training_accuracy_record):
	"""Draw the loss curves (left axis) and accuracy curves (right axis) so far."""
	fig, ax1 = plt.subplots()

	# Primary Y axis for loss. loss_record[0] is a placeholder, so index == epoch.
	ax1.plot(loss_record, color='limegreen', label='Epoch Loss')
	ax1.plot([e for e, _ in validation_loss_record], [v for _, v in validation_loss_record], color='orange', label='Validation Set Loss')
	ax1.plot([e for e, _ in training_fit_loss_record], [v for _, v in training_fit_loss_record], color='blue', label='Training Set Loss')
	ax1.set_xlabel('Epoch')
	ax1.set_ylabel('Loss')
	ax1.text(0, -0.1, status, ha='left', va='top', transform=ax1.transAxes)
	ax1.xaxis.set_major_locator(plt.MaxNLocator(integer=True))
	ax1.set_xlim(left=1)
	ax1.set_ylim(bottom=0)
	ax1.set_ylim(top=max(loss_record)*1.1)

	# Secondary Y axis for accuracy
	ax2 = ax1.twinx()
	ax2.plot([e for e, _ in validation_accuracy_record], [v for _, v in validation_accuracy_record], color='sandybrown', label='Validation Set Accuracy (%)')
	ax2.plot([e for e, _ in training_accuracy_record], [v for _, v in training_accuracy_record], color='slateblue', label='Training Set Accuracy (%)')
	ax2.set_ylabel('Accuracy (%)')
	ax2.set_ylim(bottom=0)
	ax2.set_ylim(top=100)
	ax2.set_xlim(left=1)
	ax2.xaxis.set_major_locator(plt.MaxNLocator(integer=True))

	# One combined legend for both axes
	li1, la1 = ax1.get_legend_handles_labels()
	li2, la2 = ax2.get_legend_handles_labels()
	ax2.legend(li1+li2, la1+la2)
	ax2.grid(which='both')
	plt.show()
	# A new figure is created every epoch; close it so they do not pile up in memory.
	plt.close(fig)


def train(model:F102Classifier=net, dataloader:DataLoader=training_dataloader, loss_fn=loss_function, optimiser=optimiser, epochs:int=EPOCHS, validate_every:int=1, training_fit_every:int=5, checkpoint_every:int=CP_EVERY, plot:bool=TRAINING_PLOT):
	"""Train ``model`` for ``epochs`` passes over ``dataloader``.

	After every full epoch the cell output is cleared and, depending on the
	``*_every`` intervals, the model is evaluated through ``validate`` on the
	validation set and on ``dataloader``, a checkpoint is written, and the
	progress plot is redrawn.

	Args:
		model: network to optimise.
		dataloader: source of ``(inputs, labels)`` training batches.
		loss_fn: criterion called as ``loss_fn(outputs, labels)``.
		optimiser: optimiser stepping ``model``'s parameters.
		epochs: number of passes over ``dataloader``.
		validate_every: evaluate on the validation set every this many epochs.
		training_fit_every: evaluate on ``dataloader`` every this many epochs.
		checkpoint_every: save a checkpoint every this many epochs.
		plot: redraw the loss/accuracy figure after each epoch when true.

	Returns:
		``(loss_record, validation_accuracy_record)``: the per-epoch mean
		training loss (index 0 is a placeholder 0), and ``(epoch, value)``
		pairs where value is the accuracy returned by ``validate`` times 100.
	"""
	global f_ep_loss

	print("Started Training")
	# Each record starts with a placeholder entry for "epoch 0".
	loss_record = [0]
	validation_accuracy_record = [(0, 0)]
	training_accuracy_record = [(0, 0)]
	validation_loss_record = [(0, 0)]
	training_fit_loss_record = [(0, 0)]
	batches_per_epoch = len(dataloader)

	for i in range(epochs):
		epoch = i + 1
		# validate() may have switched the model to eval mode; re-enable dropout.
		model.train()
		epoch_loss = 0.0

		for inputs, labels in dataloader:
			inputs, labels = inputs.to(DEVICE), labels.to(DEVICE)

			# Gradients accumulate by default; clear them so each step uses only
			# the gradient of the current batch.
			optimiser.zero_grad()
			outputs = model(inputs)

			# LOSS
			loss = loss_fn(outputs, labels)
			epoch_loss += loss.item()

			# BACKPROP
			loss.backward()
			optimiser.step()

		# ---- End-of-epoch reporting (runs once, after the last batch) ----
		clear_output(wait=True)

		if epoch % validate_every == 0:
			test_loss, accuracy = validate(model=model, loss_fn=loss_fn)
			validation_accuracy_record.append((epoch, accuracy*100))
			validation_loss_record.append((epoch, test_loss))

		if epoch % training_fit_every == 0:
			test_loss, accuracy = validate(model=model, dataloader=dataloader, loss_fn=loss_fn, print_type="Training")
			training_accuracy_record.append((epoch, accuracy*100))
			training_fit_loss_record.append((epoch, test_loss))

		epoch_loss = epoch_loss / batches_per_epoch if batches_per_epoch else 0.0
		loss_record.append(epoch_loss)
		status = f"Epoch: {epoch}/{epochs}\nMean Loss for Epoch: {epoch_loss:.4f}"
		print(status)

		if epoch % checkpoint_every == 0:
			_save_checkpoint(model, epoch)

		if plot:
			_plot_training_progress(status, loss_record, validation_loss_record, training_fit_loss_record, validation_accuracy_record, training_accuracy_record)

	print("Finished Training")
	f_ep_loss = loss_record[-1]
	return loss_record, validation_accuracy_record


# Run Model

In [20]:
# Run the training loop with every default bound in train()'s signature;
# the returned (loss_record, validation_accuracy_record) tuple is the cell output.
train()

KeyboardInterrupt: 

# Test Model

In [None]:
# Evaluate on the test split, asking for as many batches as the test loader holds.
test(batches=len(testing_dataloader))

### Save Model

In [None]:
# Free-text description stored in the "comment" field of this run's register entry.
COMMENT = "AlexNet-like architecture"

In [None]:
# Make sure ./models/<NAME>/ exists (creates ./models too when missing).
model_dir = f"./models/{NAME}"
os.makedirs(model_dir, exist_ok=True)

# Persist the trained weights.
torch.save(net.state_dict(), f"{model_dir}/{NAME}.pth")

# Load the model register, starting a fresh one when the file does not exist yet.
register_path = "./models/_register.json"
if os.path.exists(register_path):
	with open(register_path) as register_file:
		register = json.load(register_file)
else:
	register = []

# Replace any earlier entry recorded under the same name.
register = [entry for entry in register if entry["name"] != NAME]
register.append({
	"name": NAME,
	"comment": COMMENT,
	"batch_size": DEFAULT_BATCH_SIZE,
	"epochs": EPOCHS,
	"learning_rate": LEARNING_RATE,
	"image_crop_size": IMAGE_CROP_SIZE,
	"dropout_p": DROPOUT_P,
	"final_training_accuracy": f_tr_acc,
	"final_training_loss": f_tr_loss,
	"final_validation_accuracy": f_val_acc,
	"final_validation_loss": f_val_loss,
	"final_testing_accuracy": f_te_acc,
	"final_testing_loss": f_te_loss,
	"final_epoch_loss": f_ep_loss
})

# Context manager guarantees the file is flushed and closed after writing.
with open(register_path, "w") as register_file:
	json.dump(register, register_file, indent="\t")

print(f"Model saved at ./models/{NAME}/{NAME}.pth, registered as {NAME}")