In [None]:
import torch
import torch.nn as nn
from torch.optim import Adam
from torchsummary import summary
import numpy as np
import matplotlib.pyplot as plt
import time
import progressbar

# Get our VGG and Resnet models
from torchvision.models import vgg16, vgg19, VGG16_Weights, VGG19_Weights
from torchvision.models import resnet50, resnet152, ResNet50_Weights, ResNet152_Weights

# Imports for Data
from torchvision.datasets import CIFAR10
from torch.utils.data import Dataset, DataLoader
from torchvision.transforms import transforms, Resize
from torchvision.transforms.functional import to_pil_image
from torchvision.io import read_image

# ---- Notebook configuration ------------------------------------------------
DIMENSION = 224    # Side length (pixels) every image is resized to
BATCH_SIZE = 64    # Mini-batch size used by the data loaders
DATA_SIZE = 24000  # Number of CIFAR-10 training images kept for training
SEED = 42          # Fixed seed for repeatable runs

# Seed the RNGs this notebook relies on (data shuffling, initialization of the
# new classifier heads, dropout) so Restart & Run All gives comparable results.
# Some GPU kernels are still nondeterministic, so runs may differ slightly.
torch.manual_seed(SEED)
np.random.seed(SEED)

# Run on the GPU when one is visible to PyTorch, otherwise fall back to the CPU.
device = 'cuda' if torch.cuda.is_available() else 'cpu'

# Widget layout shared by every progress bar in the notebook:
# " [elapsed time: ...] ****** (ETA ...) "
widgets = [
    ' [',
    progressbar.Timer(format= 'elapsed time: %(elapsed)s'),
    '] ',
    progressbar.Bar('*'),' (',
    progressbar.ETA(), ') ',
]
print(device)

In [None]:
# Set Up Data
# Channel statistics used for normalization. Kept as named constants so the
# preview below can undo the normalization before plotting.
IMAGENET_MEAN = [0.485, 0.456, 0.406]
IMAGENET_STD = [0.229, 0.224, 0.225]

# Resize to the network input size, convert to a float tensor, then normalize.
preprocess = transforms.Compose([
        transforms.Resize((DIMENSION, DIMENSION)),
        transforms.ToTensor(),
        transforms.Normalize(mean=IMAGENET_MEAN, std=IMAGENET_STD),])

# Load data from CIFAR and trim the training split down to DATA_SIZE samples.
# The size is clamped so a DATA_SIZE larger than the dataset cannot produce
# out-of-range indices.
train = CIFAR10('~/data/CIFAR', download=True, train=True, transform=preprocess)
test = CIFAR10('~/data/CIFAR', download=True, train=False, transform=preprocess)
train = torch.utils.data.Subset(train, list(range(0, min(DATA_SIZE, len(train)))))

# Create data loader objects. Only the training data needs shuffling; the
# accuracy over the test set does not depend on sample order.
train_dl = DataLoader(train, batch_size=BATCH_SIZE, shuffle=True)
test_dl = DataLoader(test, batch_size=BATCH_SIZE, shuffle=False)

# Take a peek at the data
print(type(train[0][0]))

# Undo the normalization before plotting; the normalized tensor has values
# outside [0, 1], which matplotlib would clip and render with wrong colors.
sample_image = train[1][0].cpu()
channel_mean = torch.tensor(IMAGENET_MEAN).view(3, 1, 1)
channel_std = torch.tensor(IMAGENET_STD).view(3, 1, 1)
plt.imshow((sample_image * channel_std + channel_mean).clamp(0, 1).permute(1, 2, 0))
print(f"Train length x: {len(train)}, y: {len(test)}")
print(f"Current Device: {device}")


In [None]:
# Training code
def train_batch(model, X_train, Y_train, opt, loss_func):
  """Perform one optimization step on a single mini-batch.

  Args:
    model: Callable network; invoked as ``model(X_train)``.
    X_train: Batch of inputs fed to the model.
    Y_train: Targets passed to ``loss_func`` together with the predictions.
    opt: Optimizer whose gradients are cleared and which is then stepped.
    loss_func: Callable returning the loss for ``(predictions, targets)``.

  Returns:
    The batch loss, detached from the autograd graph, moved to the CPU and
    converted to NumPy.
  """
  # Clear gradients left over from the previous step
  opt.zero_grad()

  # Forward pass and loss in one go
  loss_value = loss_func(model(X_train), Y_train)

  # Backpropagate, then let the optimizer update the weights
  loss_value.backward()
  opt.step()

  return loss_value.detach().cpu().numpy()


def accuracy(model, test_loader):
  """Measure top-1 accuracy of ``model`` over every batch in ``test_loader``.

  The model is switched to evaluation mode and is left in that mode when the
  function returns. Batches are moved to the notebook-level ``device`` before
  the forward pass, and progress is shown with the notebook-level ``widgets``.

  Args:
    model: Network to evaluate; called as ``model(inputs)`` and expected to
      return per-class scores along dimension 1.
    test_loader: Sized iterable yielding ``(inputs, labels)`` batches.

  Returns:
    Tuple ``(accuracy_percent, elapsed_seconds)``. ``accuracy_percent`` is
    ``0.0`` when the loader yields no samples.
  """
  correct = 0
  total = 0

  # Set the model to evaluation mode
  model.eval()

  print("Computing accuracy...")
  start = time.time()
  bar = progressbar.ProgressBar(max_value=len(test_loader), widgets=widgets).start()

  # Disable gradient calculation; nothing is trained here
  with torch.no_grad():
      # Count from 1 so the bar reports completed batches and reaches 100%
      for i, (inputs, labels) in enumerate(test_loader, start=1):
          inputs = inputs.to(device)  # Send inputs to the device (CPU or GPU)
          labels = labels.to(device)  # Send labels to the device

          # Forward pass
          outputs = model(inputs)

          # Get predicted class labels
          _, predicted = torch.max(outputs, 1)

          # Update counts
          total += labels.size(0)
          correct += (predicted == labels).sum().item()
          bar.update(i)
  bar.finish()

  # Calculate accuracy; guard against an empty loader (would divide by zero)
  percent_correct = (correct / total) * 100.0 if total else 0.0
  testing_time = time.time() - start
  return percent_correct, testing_time


def train_model(model, epochs, optimizer, loss):
  """Train ``model`` on the notebook-level ``train_dl`` loader.

  Every epoch iterates over all batches of ``train_dl``, moves each batch to
  the notebook-level ``device``, runs one ``train_batch`` step, and prints the
  mean batch loss for the epoch.

  Args:
    model: Network to train; put into training mode before the first epoch.
    epochs: Number of passes over ``train_dl``.
    optimizer: Optimizer forwarded to ``train_batch``.
    loss: Loss function forwarded to ``train_batch``.

  Returns:
    Wall-clock training time in seconds.
  """
  # Put model in training mode
  model.train()
  start = time.time()
  for epoch in range(epochs):
      print(f"Running epoch {epoch + 1} of {epochs}")
      bar = progressbar.ProgressBar(max_value=len(train_dl), widgets=widgets).start()
      epoch_losses = []

      # Train on each batch and record its loss. Counting from 1 makes the
      # bar report completed batches so it reaches 100%.
      for i, (x, y) in enumerate(train_dl, start=1):
          x, y = x.to(device), y.to(device)
          epoch_losses.append(train_batch(model, x, y, optimizer, loss))
          bar.update(i)
      bar.finish()

      # Log epoch progress
      epoch_loss = np.mean(epoch_losses)
      print()
      print(f"Epoch {epoch + 1} had loss of {epoch_loss}")

  training_time = time.time() - start
  return training_time

In [None]:

# Code we want to run to test each model
def test_model(model):
  """Evaluate ``model`` on the notebook-level ``test_dl`` and print the results.

  Prints a blank line followed by the test accuracy and the inference time
  returned by ``accuracy``. Returns nothing.
  """
  test_accuracy, inference_time = accuracy(model, test_dl)

  # A single print call; the leading empty string yields the blank line
  print(
      "",
      f"Test Accuracy: {test_accuracy}",
      f"Inference time: {inference_time}",
      sep="\n",
  )


In [None]:
# Define and train the vgg models
# Both networks start from ImageNet weights and live on `device`.
vgg_models = [
    vgg16(weights=VGG16_Weights.IMAGENET1K_V1).to(device),
    vgg19(weights=VGG19_Weights.IMAGENET1K_V1).to(device),
]

for model in vgg_models:

  # Disable training for VGG's feature layers
  for param in model.features.parameters():
    param.requires_grad = False

  # Set up the model with our classifier: pool the feature maps down to 1x1
  # (512 values) and map them to the 10 CIFAR-10 classes.
  # The head ends in a plain Linear layer on purpose: nn.CrossEntropyLoss
  # expects raw, unnormalized logits and applies log-softmax itself. A trailing
  # Sigmoid would squash the logits into (0, 1) and cripple training.
  model.avgpool = nn.AdaptiveAvgPool2d(output_size=(1,1)).to(device)
  model.classifier = nn.Sequential(
                                   nn.Linear(512,128),
                                   nn.ReLU(),
                                   nn.Dropout(0.2),
                                   nn.Linear(128,10)).to(device)
  summary(model, (3, DIMENSION, DIMENSION))

  # Train the model; only hand the optimizer the parameters that are
  # actually trainable (the new classifier head).
  loss_func = nn.CrossEntropyLoss().to(device)
  trainable_params = [p for p in model.parameters() if p.requires_grad]
  opt = Adam(trainable_params, lr=1e-3)
  training_time = train_model(model, 5, opt, loss_func)

  # Print some training stats
  print(f"Total training time: {training_time}")

  # Test the model
  test_model(model)

In [None]:
# Define and train the resnet models
# Both networks start from ImageNet weights and live on `device`.
res_models = [
    resnet50(weights=ResNet50_Weights.IMAGENET1K_V1).to(device),
    resnet152(weights=ResNet152_Weights.IMAGENET1K_V1).to(device)
]

for model in res_models:

  # Disable training for ResNet's feature layers. Every existing parameter is
  # frozen here; the replacement `fc` head created below is trainable.
  # NOTE(review): train_model() calls model.train(), so BatchNorm layers in
  # the frozen backbone still update their running statistics during
  # training -- confirm this is intended.
  for param in model.parameters():
    param.requires_grad = False

  # Set up the model with our classifier (2048 pooled features -> 10 classes).
  # The head ends in a plain Linear layer on purpose: nn.CrossEntropyLoss
  # expects raw, unnormalized logits and applies log-softmax itself. A trailing
  # Sigmoid would squash the logits into (0, 1) and cripple training.
  model.fc = nn.Sequential(nn.Flatten(),
                           nn.Linear(2048,128),
                           nn.ReLU(),
                           nn.Dropout(0.2),
                           nn.Linear(128,10)).to(device)
  summary(model, (3, DIMENSION, DIMENSION))

  # Train the model; only hand the optimizer the parameters that are
  # actually trainable (the new `fc` head).
  loss_func = nn.CrossEntropyLoss().to(device)
  trainable_params = [p for p in model.parameters() if p.requires_grad]
  opt = Adam(trainable_params, lr=1e-3)
  training_time = train_model(model, 5, opt, loss_func)

  # Print some training stats
  print(f"Total training time: {training_time}")

  # Test the model
  test_model(model)


In [None]:
# Test Some Images from Google
# 2 frogs, 2 ships, 2 airplanes

# CIFAR-10 label index -> human-readable class name
classes = {
    0: "airplane",
    1: "automobile",
    2: "bird",
    3: "cat",
    4: "deer",
    5: "dog",
    6: "frog",
    7: "horse",
    8: "ship",
    9: "truck"
}
paths = ["frog1.png", "frog2.png", "ship1.png", "ship2.png", "plane1.png", "plane2.png"]

# The model list does not change per image, so build it and switch every
# model to evaluation mode once, up front.
models = vgg_models + res_models
for model in models:
  model.eval()

# Statistics used to undo the normalization for display only. They must match
# the mean/std passed to transforms.Normalize in `preprocess`.
display_mean = torch.tensor([0.485, 0.456, 0.406]).view(3, 1, 1)
display_std = torch.tensor([0.229, 0.224, 0.225]).view(3, 1, 1)

for path in paths:

  # Get tensor from file; keep the first three channels (drops an alpha
  # channel when present) and convert to PIL for the transform pipeline.
  img = read_image(path)
  img = img[:3, :, :]
  img = to_pil_image(img)

  # Preprocess the image
  img = preprocess(img)

  # Plot image on its own figure so each picture is shown instead of every
  # imshow call overwriting the same axes. The normalization is undone so the
  # colors are displayed correctly.
  fig, ax = plt.subplots()
  ax.imshow((img * display_std + display_mean).clamp(0, 1).permute(1, 2, 0))
  ax.set_title(path)
  ax.axis("off")
  plt.show()

  # Check the predicted class. Inference only, so skip autograd bookkeeping.
  batch = img[None, :, :, :].to(device)
  with torch.no_grad():
    for model in models:
      prediction = model(batch)
      prediction = classes[torch.argmax(prediction, 1).item()]
      print(f"Prediction for {path}: {prediction}")
