In [14]:
import numpy as np
import matplotlib.pyplot as plt
import time
from PIL import Image

import torch
torch.cuda.empty_cache()
from torch import nn
from torch import optim
from torchvision import datasets, transforms, models
import torch.optim.lr_scheduler as lr_scheduler

from collections import OrderedDict

In [15]:
tr_batchsize = 32
val_test_batchsize = 16
epochs = 150
lr = 0.0001

In [16]:
# By defalt, set device to the CPU
deviceFlag = torch.device('cpu')

# Default is CPU, but as long as GPU is avaliable, then use GPU
if torch.cuda.is_available():
    print(f'Found {torch.cuda.device_count()} GPUs.')
    deviceFlag = torch.device('cuda:0') # Manually pick your cuda device. By default is 'cuda:0'

print(f'Now the deivce is set to {deviceFlag}')

Found 1 GPUs.
Now the deivce is set to cuda:0


# Validation Function

In [17]:
def validation(model, validate_loader, val_criterion):
    
    val_loss_running = 0
    acc = 0
    
    # a dataloader object is a generator of batches, each batch contain image & label separately
    for images, labels in iter(validate_loader):
        
        # Send the data onto choosen device
        images = images.to(deviceFlag)
        labels = labels.to(deviceFlag)
        
        output = model.forward(images)
        val_loss_running += val_criterion(output, labels).item() #*images.size(0) # .item() to get a scalar in Torch.tensor out
        
        probabilities = torch.exp(output) # as in the model we use the .LogSoftmax() output layer

        equality = (labels.data == probabilities.max(dim=1)[1])
        acc += equality.type(torch.FloatTensor).mean()
        
    return val_loss_running, acc

# Training Function
(Calls Validation function)

In [18]:
def train_model():
      steps = 0
      print_every = 16

      model.to(deviceFlag)

      for current_epoch in range(epochs):
          model.train()
    
          running_loss = 0
    
          for images, labels in iter(train_loader):
        
              steps += 1
        
              images = images.to(deviceFlag)
              labels = labels.to(deviceFlag)
        
              optimizer.zero_grad()
        
              output = model.forward(images)
              loss = criterion(output, labels)
              loss.backward()
              optimizer.step()
        
              running_loss += loss.item()
        
              if steps % print_every == 0:
                
                  model.eval()
                
                  # Turn off gradients for validation, saves memory and computations
                  with torch.no_grad():
                      validation_loss, accuracy = validation(model, validate_loader, criterion)
            
                  print("Epoch: {}/{}.. ".format(current_epoch+1, epochs),
                        "Training Loss: {:.3f}... ".format(running_loss/print_every),
                        "Validation Loss: {:.3f}... ".format(validation_loss/len(validate_loader)),
                        "Validation Accuracy: {:.3f}".format(accuracy/len(validate_loader)))
                        # f"Current Learning Rate: {optimizer.param_groups[0]['lr']}")
            
                  running_loss = 0
                  model.train()
          # scheduler.step()

# Test Function

In [19]:
def test_acc(model, test_loader):

    # Do validation on the test set
    model.eval()
    model.to(deviceFlag)

    with torch.no_grad():
    
        accuracy = 0
    
        for images, labels in iter(test_loader):
    
            images, labels = images.to(deviceFlag), labels.to(deviceFlag)
    
            output = model.forward(images)

            probabilities = torch.exp(output)
        
            equality = (labels.data == probabilities.max(dim=1)[1])
        
            accuracy += equality.type(torch.FloatTensor).mean()
        
        print("Test Accuracy: {}".format(accuracy/len(test_loader)))    
        
        

In [20]:
class AddGaussianNoise(object):
    def __init__(self, mean=0., std=1.):
        self.std = std
        self.mean = mean
        
    def __call__(self, x):
        return x + torch.randn(x.size()) * self.std + self.mean
    
    def __repr__(self):
        return self.__class__.__name__ + '(mean={0}, std={1})'.format(self.mean, self.std)

# Dataset Loading

In [21]:
training_transforms = transforms.Compose([
    transforms.RandomRotation(90),
    transforms.RandomResizedCrop(256),
    transforms.RandomAdjustSharpness(2, 0.5),
    # transforms.RandomErasing(),
    transforms.RandomHorizontalFlip(p=0.7),
    transforms.RandomVerticalFlip(p=0.2),
    transforms.ToTensor(),
    transforms.Normalize(0, 1)
    # transforms.RandomApply([AddGaussianNoise(0, 1)], p=0.2)
])

validation_transforms = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(256),
    transforms.ToTensor(),
    transforms.Normalize(0, 1)
])

testing_transforms = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(256),
    transforms.ToTensor(),
    transforms.Normalize(0, 1)
])

# Load the datasets with torchvision.datasets.ImageFolder object
train_dataset = datasets.Flowers102(root = './dataset', split = 'train', transform = training_transforms, download = True)
valid_dataset = datasets.Flowers102(root = './dataset', split = 'val', transform = validation_transforms, download = True)
test_dataset = datasets.Flowers102(root = './dataset', split = 'test', transform = testing_transforms, download = True)

# Instantiate loader objects to facilitate processing


# Define the torch.utils.data.DataLoader() object with the ImageFolder object
# Dataloader is a generator to read from ImageFolder and generate them into batch-by-batch
# Only shuffle during trianing, validation and testing no shuffles
# the batchsize for training and tesitng no need to be the same
train_loader = torch.utils.data.DataLoader(dataset = train_dataset,
                                           batch_size = tr_batchsize,
                                           shuffle = True)

validate_loader = torch.utils.data.DataLoader(dataset = valid_dataset,
                                           batch_size = val_test_batchsize)


test_loader = torch.utils.data.DataLoader(dataset = test_dataset,
                                           batch_size = val_test_batchsize)

In [22]:
class VGG16(nn.Module):
    def __init__(self):
        super(VGG16, self).__init__()
        self.layer1 = nn.Sequential(
            nn.Conv2d(3, 32, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size = 2, stride = 2))
        self.layer2 = nn.Sequential(
            nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(), 
            nn.MaxPool2d(kernel_size = 2, stride = 2))
        self.layer3 = nn.Sequential(
            nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(128),
            nn.ReLU(), 
            nn.MaxPool2d(kernel_size = 2, stride = 2))
        self.layer4 = nn.Sequential(
            nn.Conv2d(128, 256, kernel_size=1, stride=1, padding=1),
            nn.BatchNorm2d(256),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size = 2, stride = 2))
        self.layer5 = nn.Sequential(
            nn.Conv2d(256, 256, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(256),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size = 2, stride =2))
        self.layer6 = nn.Sequential(
            nn.Conv2d(256, 128, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(128),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size =2, stride = 2))
        self.layer7 = nn.Sequential(
            nn.Conv2d(128, 128, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(128),
            nn.MaxPool2d(kernel_size = 2, stride = 2))
            #nn.Conv2d(128, 256, kernel_size=3, stride=1, padding=1),
            #nn.BatchNorm2d(256),
            #nn.ReLU(),
            #nn.MaxPool2d(kernel_size = 2, stride = 2))
            #nn.Conv2d(256, 512, kernel_size=3, stride=1, padding=1),
            #nn.BatchNorm2d(512),
            #nn.ReLU())
            #nn.Conv2d(512, 512, kernel_size=3, stride=1, padding=1),
            #nn.BatchNorm2d(512),
            #nn.ReLU())
            #nn.Conv2d(512, 512, kernel_size=3, stride=1, padding=1),
            #nn.BatchNorm2d(512),
            #nn.ReLU(),
            #nn.MaxPool2d(kernel_size = 2, stride = 2))
            #nn.Conv2d(512, 512, kernel_size=3, stride=1, padding=1),
            #nn.BatchNorm2d(512),
            #nn.ReLU())
            #nn.Conv2d(512, 512, kernel_size=3, stride=1, padding=1),
            #nn.BatchNorm2d(512),
            #nn.ReLU())
            #nn.Conv2d(512, 512, kernel_size=3, stride=1, padding=1),
            #nn.BatchNorm2d(512),
            #nn.ReLU(),
            #nn.MaxPool2d(kernel_size = 2, stride = 2))

        self.fc1 = nn.Sequential(
            nn.Flatten(),
            nn.Dropout(p=0.5),
            # first linear must be image size * image size * last Conv2d out channel
            nn.Linear(10368, 5000),
            nn.BatchNorm1d(5000), 
            nn.ReLU())
        self.fc2 = nn.Sequential(
            nn.Dropout(0.2),
            nn.Linear(5000, 1024),
            nn.BatchNorm1d(1024), 
            nn.ReLU())
        self.fc3 = nn.Sequential(
            #final linear to number of classes
            nn.Linear(1024, 102),
            nn.LogSoftmax(dim=1))

        
    def forward(self, x):
        out = self.layer1(x)

        out = self.layer2(out)
        out = self.layer3(out)
        out = self.layer4(out)
        out = self.layer5(out)
        out = self.layer6(out)
        out = self.layer7(out)

        out = self.fc1(out)
        out = self.fc2(out)
        out = out.reshape(out.shape[0], -1)
        out = self.fc3(out)

        return out

In [23]:
model = VGG16()
model.to(deviceFlag)
model

VGG16(
  (layer1): Sequential(
    (0): Conv2d(3, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
    (3): MaxPool2d(kernel_size=8, stride=8, padding=0, dilation=1, ceil_mode=False)
  )
  (layer2): Sequential(
    (0): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
    (3): MaxPool2d(kernel_size=2, stride=1, padding=0, dilation=1, ceil_mode=False)
  )
  (layer3): Sequential(
    (0): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
    (3): MaxPool2d(kernel_size=2, stride=1, padding=0, dilation=1, ceil_mode=False)
  )
  (layer4): Sequential(
    (0): Conv2d(128, 256, kernel_size=(1, 1), stride=(1, 1), padding=(1, 1))
    (1): BatchN

In [24]:
# # Freeze pretrained model parameters to avoid backpropogating through them
# for parameter in model.parameters():
#     parameter.requires_grad = False


# from collections import OrderedDict

# # Build custom classifier
# classifier = nn.Sequential(OrderedDict([('fc1', nn.Linear(25088, 5000)),
#                                         ('relu', nn.ReLU()),
#                                         ('drop', nn.Dropout(p=0.5)),
#                                         ('fc2', nn.Linear(5000, 102)),
#                                         ('output', nn.LogSoftmax(dim=1))]))

# model.classifier = classifier

# Define Loss Function and Optimizer

In [25]:
# Negative Log Likelihood Loss
criterion = nn.NLLLoss()

# Cross Entropy Loss
# criterion = nn.CrossEntropyLoss()

# optimizer 1
optimizer = optim.AdamW(model.parameters(), lr = lr, weight_decay = 0.02)

# optimizer 2
# optimizer = optim.SGD(model.parameters(), lr=lr, momentum = 0.5)

# scheduler
# scheduler = lr_scheduler.LinearLR(optimizer, start_factor=1.0, end_factor=0.1, total_iters=50)

In [26]:
train_model()

Epoch: 1/150..  Training Loss: 4.567...  Validation Loss: 4.703...  Validation Accuracy: 0.010
Epoch: 1/150..  Training Loss: 4.342...  Validation Loss: 4.552...  Validation Accuracy: 0.024
Epoch: 2/150..  Training Loss: 3.957...  Validation Loss: 3.987...  Validation Accuracy: 0.074
Epoch: 2/150..  Training Loss: 3.855...  Validation Loss: 3.679...  Validation Accuracy: 0.136
Epoch: 3/150..  Training Loss: 3.672...  Validation Loss: 3.549...  Validation Accuracy: 0.190
Epoch: 3/150..  Training Loss: 3.610...  Validation Loss: 3.431...  Validation Accuracy: 0.188
Epoch: 4/150..  Training Loss: 3.387...  Validation Loss: 3.384...  Validation Accuracy: 0.197
Epoch: 4/150..  Training Loss: 3.455...  Validation Loss: 3.243...  Validation Accuracy: 0.266
Epoch: 5/150..  Training Loss: 3.259...  Validation Loss: 3.193...  Validation Accuracy: 0.279
Epoch: 5/150..  Training Loss: 3.271...  Validation Loss: 3.147...  Validation Accuracy: 0.256
Epoch: 6/150..  Training Loss: 3.084...  Validatio

KeyboardInterrupt: ignored

In [None]:
test_acc(model, test_loader)

In [None]:
# loadedModel = VGG16()
# model.load_state_dict(torch.load("best-model.pt"))

# torch.save(model.state_dict(), "2023-05-08-best-model.pt")

In [None]:
# total_step = len(train_loader)

# for epoch in range(epochs):
#     for i, (images, labels) in enumerate(train_loader):  
#         # Move tensors to the configured device
#         images = images.to(deviceFlag)
#         labels = labels.to(deviceFlag)
        
#         # Forward pass
#         outputs = model(images)
#         loss = criterion(outputs, labels)
        
#         # Backward and optimize
#         optimizer.zero_grad()
#         loss.backward()
#         optimizer.step()

#     print ('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}' 
#                    .format(epoch+1, epochs, i+1, total_step, loss.item()))
            
#     # Validation
#     with torch.no_grad():
#         correct = 0
#         total = 0
#         for images, labels in validate_loader:
#             images = images.to(deviceFlag)
#             labels = labels.to(deviceFlag)
#             outputs = model(images)
#             _, predicted = torch.max(outputs.data, 1)
#             total += labels.size(0)
#             correct += (predicted == labels).sum().item()
#             del images, labels, outputs
    
#         print('Accuracy of the network on the {} validation images: {} %'.format(total, 100 * correct / total)) 


In [None]:
# with torch.no_grad():
#     correct = 0
#     total = 0
#     for images, labels in test_loader:
#         images = images.to(deviceFlag)
#         labels = labels.to(deviceFlag)
#         outputs = model(images)
#         _, predicted = torch.max(outputs.data, 1)
#         total += labels.size(0)
#         correct += (predicted == labels).sum().item()
#         del images, labels, outputs

#     print('Accuracy of the network on the {} test images: {} %'.format(total, 100 * correct / total))   