In [None]:
import torch as th
import torchvision as thv
from torch import nn
from torch.utils.data import DataLoader
from torchvision import datasets, transforms, models
device = 'cuda'

In [None]:
# # Use this block if you are working on Google Colab. Copies the dataset from drive. You should specify the right path

# from google.colab import drive
# drive.mount('/content/drive', force_remount=False)

# !cp /content/drive/MyDrive/HW3_Data.rar HW.rar
# !unrar x HW.rar

In [None]:
# Creates directories to save statistics
!mkdir '/content/saved_losses/'
!mkdir '/content/saved_models/'

In [None]:
# We center the images data with the mean we found in the other script

transforms_center = transforms.Compose([
                        transforms.ToTensor(),
                        transforms.Normalize((0.4831, 0.4053, 0.3707),(1.0,1.0,1.0))
])

# Load datasets for train, validation and test
dataset_centered = datasets.ImageFolder('HW3_Data/HW4_Data/classification_data/train_data',transform=transforms_center)
dataset_validation = datasets.ImageFolder('HW3_Data/HW4_Data/classification_data/val_data',transform=transforms_center)
dataset_test = datasets.ImageFolder('HW3_Data/HW4_Data/classification_data/test_data',transform=transforms_center)

In [None]:
# Implementation of the given classfier model

class FaceIdentifier(nn.Module):
    def __init__(self):
        super(FaceIdentifier, self).__init__()
        self.conv_block = nn.Sequential(
            nn.Conv2d(3, 32, 7),
            nn.ReLU(),
            nn.Conv2d(32, 16, 7),
            nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Conv2d(16, 16, 5, padding=1),
            nn.Dropout2d(),
            nn.Conv2d(16, 16, 5),
            nn.BatchNorm2d(16)
        )

        self.fc_block = nn.Sequential(
            nn.Flatten(),
            nn.Linear(6400,4000),
            #nn.Softmax() #Softmax is implemented in the cross-entropy loss, so no need to put it here
        )

    def forward(self, x):
        x = self.conv_block(x)
        logits = self.fc_block(x)
        return logits

In [None]:
# Validate/Test Model.  Also can be renamed to evaluate.
def validate(model, dataset, batch_size, device='cuda'):
  dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)
  dataset_size = len(dataloader.dataset)
  num_batches = len(dataloader)

  loss_fn = nn.CrossEntropyLoss()

  running_loss = 0
  running_correct = 0

  model.eval()

  with th.no_grad():

    for batch, (images, labels) in enumerate(dataloader):

        labels = labels.to(device)
        predictions = model(images.to(device))
        loss = loss_fn(predictions, labels)

        _, predicted_labels = predictions.max(1)
        running_correct += predicted_labels.eq(labels).sum().item()
        running_loss += loss.item()
  
  model.train()

  val_loss = running_loss/num_batches
  accuracy = running_correct/dataset_size

  print(f"VALIDATION: [loss: {val_loss:>7f}, accuracy: {accuracy}")

  return val_loss, accuracy

In [None]:
# Train the model with given parameters. Run tests on validation dataset at the end of every epoch once

def train_loop(model, dataset, validation_dataset, optimizer_name='SGD', weight_decay=0, num_epochs=50, batch_size=64, device='cuda', learning_rate=1e-5, log=True):

  dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

  loss_fn = nn.CrossEntropyLoss()

  epoch_losses = [] # For plotting
  epoch_accuracies = []
  val_losses = []
  val_accuracies = []

  if optimizer_name == 'SGD':
    optimizer = th.optim.SGD(model.parameters(), lr=learning_rate, weight_decay=weight_decay)
  elif optimizer_name == 'Adam':
    optimizer = th.optim.Adam(model.parameters(), lr=learning_rate, betas=(0.9,0.999))
  elif optimizer_name == 'RMSProp':
    optimizer = th.optim.RMSprop(model.parameters(), lr=learning_rate, momentum=0.9, weight_decay=weight_decay)
  elif optimizer_name == 'AdaGrad':
    optimizer = th.optim.Adagrad(model.parameters(), lr=learning_rate, weight_decay=weight_decay)

  dataset_size = len(dataloader.dataset)
  num_batches = len(dataloader)

  for epoch in range(num_epochs):

    epoch_loss = 0
    correct = 0

    for batch, (images, labels) in enumerate(dataloader):
      labels = labels.to(device)
      predictions = model(images.to(device))
      loss = loss_fn(predictions, labels)

      optimizer.zero_grad()
      loss.backward()
      optimizer.step()

      _, predicted_labels = predictions.max(1)
      correct += predicted_labels.eq(labels).sum().item()
      epoch_loss += loss.item()

      if batch % 10 == 0 and log == True:
        loss, current = loss.item(), batch * len(images)
        correct_guesses = predicted_labels.eq(labels).sum().item()
        print(f"[Batch:{batch} Epoch:{epoch}] loss: {loss:>7f} accuracy: {correct_guesses/len(images)}  [{current:>5d}/{dataset_size:>5d}]")

    val_l, val_acc = validate(model, validation_dataset, batch_size, device)

    epoch_losses.append(epoch_loss/num_batches)
    epoch_accuracies.append(correct/dataset_size)
    val_losses.append(val_l)
    val_accuracies.append(val_acc)

      
  
  return epoch_losses, epoch_accuracies, val_losses, val_accuracies

In [None]:
# Train the model. Also save the statisctics and the model for later use

model = FaceIdentifier().to(device)
train_losses, train_accuracies, val_losses, val_accuracies = train_loop(model, dataset_centered, dataset_validation, optimizer_name='RMSProp', num_epochs=25, batch_size=256, device=device, learning_rate=1e-5)

th.save(model.state_dict(), 'saved_models/model.pth')
th.save(train_losses, 'saved_losses/train_losses_no_regularization.pt')
th.save(train_accuracies, 'saved_losses/train_accuracies_no_regularization.pt')
th.save(val_losses, 'saved_losses/val_losses_no_regularization.pt')
th.save(val_accuracies, 'saved_losses/val_accuracies_no_regularization.pt')

In [None]:
# # If you are working on Colab, downloads files automatically. Give colab permission to perform multiple file downloads.
# from google.colab import files
# files.download('/content/saved_losses/train_losses_no_regularization.pt')
# files.download('/content/saved_losses/train_accuracies_no_regularization.pt')
# files.download('/content/saved_losses/val_losses_no_regularization.pt')
# files.download('/content/saved_losses/val_accuracies_no_regularization.pt')
# files.download('/content/saved_models/model.pth') 

In [None]:
# Modified VGG16 network. Last fully connected layer removed from the pretrained network. All other parameters are frozen.
# An additional layer added to match 4000 classes of our dataset
class ModifiedVGG16(nn.Module):
    def __init__(self, num_classes):
        super(ModifiedVGG16,self).__init__()
        self.net = models.vgg16(pretrained=True)
        
        for p in self.net.parameters(): # freeze parameters
            p.requires_grad=False
        
        num_features = self.net.classifier[6].in_features
        features = list(self.net.classifier.children())[:-1] # Remove last layer
        self.net.classifier = nn.Sequential(*features) # Replace the model classifier
        self.output_layer = nn.Linear(num_features, num_classes) # Add our layer with 4 outputs

        

    def forward(self,x):
        x = self.net(x)
        x = self.output_layer(x)
        return x


In [None]:
vgg16_model = ModifiedVGG16(4000).to(device)

In [None]:
train_losses, train_accuracies, val_losses, val_accuracies = train_loop(vgg16_model, dataset_centered, dataset_validation, optimizer_name='RMSProp', num_epochs=1, batch_size=256, device=device, learning_rate=1e-5)

th.save(model.state_dict(), 'saved_models/model.pth')
th.save(train_losses, 'saved_losses/vgg16_train_losses_no_regularization.pt')
th.save(train_accuracies, 'saved_losses/vgg16_train_accuracies_no_regularization.pt')
th.save(val_losses, 'saved_losses/vgg16_val_losses_no_regularization.pt')
th.save(val_accuracies, 'saved_losses/vgg16_val_accuracies_no_regularization.pt')

In [None]:
# Test the models with the validate function.
loss, accuracy = validate(vgg16_model, dataset_test, batch_size=256, device='cuda')