In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

from tqdm.notebook import tqdm, trange
from time import sleep
import os
import pandas as pd
from torchvision.io import read_image
from torchvision.transforms import ToTensor

from torch.utils.data import DataLoader
from torchvision.datasets import ImageFolder


In [2]:
def get_default_device():
    """Pick GPU if available, else CPU"""
    if torch.cuda.is_available():
        return torch.device('cuda')
    else:
        return torch.device('cpu')
    
def to_device(data, device):
    """Move tensor(s) to chosen device"""
    if isinstance(data, (list,tuple)):
        return [to_device(x, device) for x in data]
    return data.to(device, non_blocking=True)

class DeviceDataLoader():
    """Wrap a dataloader to move data to a device"""
    def __init__(self, dl, device):
        self.dl = dl
        self.device = device
        
    def __iter__(self):
        """Yield a batch of data after moving it to device"""
        for b in self.dl: 
            yield to_device(b, self.device)

    def __len__(self):
        """Number of batches"""
        return len(self.dl)

In [3]:
device = get_default_device()

In [4]:
ds_train_src = os.path.abspath("D:\Documenti\GitHub\Advanced_DL\datasets\Fer2013\\train")
ds_test_src = os.path.abspath("D:\Documenti\GitHub\Advanced_DL\datasets\Fer2013\\test")

In [5]:
df_train = ImageFolder(root=ds_train_src, transform=ToTensor())
df_test = ImageFolder(root=ds_test_src, transform=ToTensor())

In [6]:
classes = df_train.classes
classes

['angry', 'disgust', 'fear', 'happy', 'neutral', 'sad', 'surprise']

In [7]:
train_dl = DeviceDataLoader(DataLoader(df_train, batch_size=64, shuffle=True),device)
test_dl = DeviceDataLoader(DataLoader(df_test, batch_size=64, shuffle=True),device)

In [8]:
# dataiter = iter(train_dataloader)
# images, labels = dataiter.next()
# print(type(images))
# print(images.shape)
# print(labels.shape)

In [9]:
class Metric():
    def __init__(self, name):
        self.name = name
    
    def eval(self, outputs, labels):
        pass

In [10]:
class Accuracy(Metric):
    
    def eval(self, outputs, labels):
        _, preds = torch.max(outputs, dim=1)
        return torch.tensor(torch.sum(preds == labels).item() / len(preds))

In [11]:
class ImageClassificationBase(nn.Module):
    
    def __init__(self, loss_function, metrics):
        super().__init__()
        self.loss_function = loss_function
        self.metrics = metrics
    
    def training_step(self, batch):
        images, labels = batch 
        out = self(images)                  # Generate predictions
        loss = self.loss_function(out, labels) # Calculate loss
        return loss
    
    def validation_step(self, batch):
        images, labels = batch 
        out = self(images)                    # Generate predictions
        loss = self.loss_function(out, labels)   # Calculate loss
        result = {'val_loss': loss.detach()}
        
        for m in self.metrics:
            result[m.name] = m.eval(out, labels)           # Calculate metrics
            
        return result
        
    def validation_epoch_end(self, outputs):
        batch_losses = [x['val_loss'] for x in outputs]
        epoch_loss = torch.stack(batch_losses).mean()   # Combine losses
        
        result = {'val_loss': epoch_loss.item()}
        
        for m in self.metrics:
            batch = [x[m.name] for x in outputs]
            epoch = torch.stack(batch).mean()      # Combine metrics
            result[m.name] = epoch.item()
            
        return result
    
    def epoch_end(self, epoch, result):
        out = f"Epoch [{epoch}]"
        vals = list(result.keys())
        for v in vals:
            out += f", {v}: {result[v]:.4f}"
        print(out)
        

In [12]:
class Net(ImageClassificationBase):
    
    def __init__(self, loss_function, metrics, out_size):
        super().__init__(loss_function, metrics)
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=12, kernel_size=5, stride=1) # (12, 44, 44 )
        self.bn1 = nn.BatchNorm2d(12)
        self.conv2 = nn.Conv2d(in_channels=12, out_channels=12, kernel_size=5, stride=1) # (12, 40, 40 )
        self.bn2 = nn.BatchNorm2d(12)
        self.pool = nn.MaxPool2d(2,2)                                                    # (12, 20, 20 )
        self.conv4 = nn.Conv2d(in_channels=12, out_channels=24, kernel_size=5, stride=1) # (24, 16, 16 )
        self.bn4 = nn.BatchNorm2d(24)
        self.conv5 = nn.Conv2d(in_channels=24, out_channels=24, kernel_size=5, stride=1) # (24, 12, 12 )
        self.bn5 = nn.BatchNorm2d(24)
        self.fc1 = nn.Linear(24*12*12, out_size)

    def forward(self, input):
        output = F.relu(self.bn1(self.conv1(input)))      
        output = F.relu(self.bn2(self.conv2(output)))     
        output = self.pool(output)                        
        output = F.relu(self.bn4(self.conv4(output)))     
        output = F.relu(self.bn5(self.conv5(output)))    
        output = output.view(-1, 24*12*12)
        output = self.fc1(output)

        return output

In [13]:
loss_function = nn.CrossEntropyLoss()
metrics = [Accuracy("val_acc")]
optimizer = optim.Adam
lr =0.001
num_epochs = 10

In [14]:
net = Net(loss_function, metrics, len(classes))

In [15]:
net.to(device)

Net(
  (loss_function): CrossEntropyLoss()
  (conv1): Conv2d(3, 12, kernel_size=(5, 5), stride=(1, 1))
  (bn1): BatchNorm2d(12, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (conv2): Conv2d(12, 12, kernel_size=(5, 5), stride=(1, 1))
  (bn2): BatchNorm2d(12, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (pool): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (conv4): Conv2d(12, 24, kernel_size=(5, 5), stride=(1, 1))
  (bn4): BatchNorm2d(24, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (conv5): Conv2d(24, 24, kernel_size=(5, 5), stride=(1, 1))
  (bn5): BatchNorm2d(24, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (fc1): Linear(in_features=3456, out_features=7, bias=True)
)

In [16]:
@torch.no_grad()
def evaluate(model, val_loader):
    model.eval()
    outputs = [model.validation_step(batch) for batch in val_loader]
    return model.validation_epoch_end(outputs)

def fit(epochs, lr, model, train_loader, val_loader, opt_func=torch.optim.SGD):
    history = []
    optimizer = opt_func(model.parameters(), lr)
    for epoch in tqdm(range(epochs), desc = "Current Epoch"):
        # Training Phase 
        model.train()
        train_losses = []
        for batch in tqdm(train_loader, desc = f"Epoch: {epoch}", leave= False):
            loss = model.training_step(batch)
            train_losses.append(loss)
            loss.backward()
            optimizer.step()
            optimizer.zero_grad()
        # Validation phase
        result = evaluate(model, val_loader)
        result['train_loss'] = torch.stack(train_losses).mean().item()
        model.epoch_end(epoch, result)
        history.append(result)
    return history

In [17]:
history = fit(num_epochs, lr, net, train_dl, test_dl, optimizer)

Current Epoch:   0%|          | 0/10 [00:00<?, ?it/s]

Epoch: 0:   0%|          | 0/449 [00:00<?, ?it/s]

Epoch [0], val_loss: 1.3991, val_acc: 0.4583, train_loss: 1.5336


Epoch: 1:   0%|          | 0/449 [00:00<?, ?it/s]

Epoch [1], val_loss: 1.3316, val_acc: 0.4916, train_loss: 1.2987


Epoch: 2:   0%|          | 0/449 [00:00<?, ?it/s]

Epoch [2], val_loss: 1.2684, val_acc: 0.5143, train_loss: 1.1956


Epoch: 3:   0%|          | 0/449 [00:00<?, ?it/s]

Epoch [3], val_loss: 1.2802, val_acc: 0.5183, train_loss: 1.1223


Epoch: 4:   0%|          | 0/449 [00:00<?, ?it/s]

Epoch [4], val_loss: 1.3032, val_acc: 0.5218, train_loss: 1.0620


Epoch: 5:   0%|          | 0/449 [00:00<?, ?it/s]

Epoch [5], val_loss: 1.2587, val_acc: 0.5291, train_loss: 1.0054


Epoch: 6:   0%|          | 0/449 [00:00<?, ?it/s]

Epoch [6], val_loss: 1.3314, val_acc: 0.5121, train_loss: 0.9536


Epoch: 7:   0%|          | 0/449 [00:00<?, ?it/s]

Epoch [7], val_loss: 1.2749, val_acc: 0.5510, train_loss: 0.8962


Epoch: 8:   0%|          | 0/449 [00:00<?, ?it/s]

Epoch [8], val_loss: 1.2854, val_acc: 0.5425, train_loss: 0.8440


Epoch: 9:   0%|          | 0/449 [00:00<?, ?it/s]

Epoch [9], val_loss: 1.3393, val_acc: 0.5382, train_loss: 0.7935


In [18]:
evaluate(net, test_dl)

{'val_loss': 1.33816397190094, 'val_acc': 0.5381637215614319}

In [19]:
for epoch in range(10):  # loop over the dataset multiple times

    running_loss = 0.0
    for i, data in tqdm(enumerate(train_dataloader, 0),desc='2nd loop'):
        # get the inputs; data is a list of [inputs, labels]
        inputs, labels = data[0].to(device), data[1].to(device)

        # zero the parameter gradients
        optimizer.zero_grad()

        # forward + backward + optimize
        outputs = net(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

print('Finished Training')

NameError: name 'train_dataloader' is not defined

In [None]:
correct = 0
total = 0
# since we're not training, we don't need to calculate the gradients for our outputs
with torch.no_grad():
    for data in tqdm(test_dataloader):
        images, labels = data[0].to(device), data[1].to(device)
        # calculate outputs by running images through the network
        outputs = net(images)
        # the class with the highest energy is what we choose as prediction
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

print(f'Accuracy of the network on the 10000 test images: {100 * correct // total} %')

In [None]:
correct_pred = {classname: 0 for classname in classes}
total_pred = {classname: 0 for classname in classes}

# again no gradients needed
with torch.no_grad():
    for data in tqdm(test_dataloader):
        images, labels = data[0].to(device), data[1].to(device)
        outputs = net(images)
        _, predictions = torch.max(outputs, 1)
        # collect the correct predictions for each class
        for label, prediction in zip(labels, predictions):
            if label == prediction:
                correct_pred[classes[label]] += 1
            total_pred[classes[label]] += 1


# print accuracy for each class
for classname, correct_count in correct_pred.items():
    accuracy = 100 * float(correct_count) / total_pred[classname]
    print(f'Accuracy for class: {classname:5s} is {accuracy:.1f} %')