The project consists in comparing at least two different methods for unsupervised domain adaptation and showing the results in a table. The results shall show performance on the source dataset, and performance on the target dataset. Moreover, the upper bound will be evaluated by training and testing the models on the target dataset as a sort of oracle predictor.


Google Colab was used for the project for computational reasons.

---
## Initialization

Since I'm using Google Colab for computational reasons, I will structure this notebook by putting in these first cells all relevant hyperparameters and libraries.


In [1]:
import os
import torch
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
from torchvision.datasets import ImageFolder
import torch.nn as nn
import torch.optim as optim
import torchvision.models as models
from coral_pytorch.losses import CoralLoss
from tqdm import tqdm

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
#device = torch.device("cpu")
print(torch.cuda.get_device_name(0))
print(device)


NVIDIA GeForce RTX 3070 Laptop GPU
cuda:0


Parameters:

In [2]:
batch_size = 64
num_epochs = 30
learning_rate=0.0001
source_train_dir='AFE\\train'
source_test_dir='AFE\\test'
target_train_dir='FER2013\\train'
target_test_dir='FER2013\\test'

---

## Dataset preparation

In [3]:
# Define the transformations for resizing, converting to grayscale

transform = transforms.Compose([
    transforms.Resize((48, 48)),
    transforms.Grayscale(num_output_channels=3),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5], std=[0.5])
])


# Create instances of the source and target datasets
source_train_data = datasets.ImageFolder(source_train_dir, transform=transform)
source_test_data = datasets.ImageFolder(source_test_dir, transform=transform)
target_train_data = datasets.ImageFolder(target_train_dir, transform=transform)
target_test_data = datasets.ImageFolder(target_test_dir, transform=transform)
print("imagefolder done")


# Create data loaders for batching the data
source_train_loader = DataLoader(source_train_data, batch_size=batch_size, shuffle=True,num_workers=8)
source_test_loader = DataLoader(source_test_data, batch_size=batch_size, shuffle=True,num_workers=8)
target_train_loader = DataLoader(target_train_data, batch_size=batch_size, shuffle=True,num_workers=8)
target_test_loader = DataLoader(target_test_data, batch_size=batch_size, shuffle=True,num_workers=8)
print("DataLoader done")

imagefolder done
DataLoader done


In [4]:
print(f'Type of first instance in source train data: {type(source_train_data[0][0])}')
print(f'Type of first instance in target train data: {type(target_train_data[0][0])}')


Type of first instance in source train data: <class 'torch.Tensor'>
Type of first instance in target train data: <class 'torch.Tensor'>


---

## Network architecture


In [5]:
model=models.mobilenet_v3_small(weights='DEFAULT')
model = model.to(device)

#for param in model.parameters():
  #param.requires_grad = False

model.classifier[3] = nn.Linear(1024, 7).to(device)
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
print(model.classifier)


Sequential(
  (0): Linear(in_features=576, out_features=1024, bias=True)
  (1): Hardswish()
  (2): Dropout(p=0.2, inplace=True)
  (3): Linear(in_features=1024, out_features=7, bias=True)
)


In [6]:
model=models.efficientnet_v2_s(weights='DEFAULT')
model = model.to(device)
print(model.classifier)
#for param in model.parameters():
  #param.requires_grad = False

model.classifier[1] = nn.Linear(1280, 7).to(device)

#model.classifier[1] =torch.nn.Sequential(
#    torch.nn.Dropout(0.5),
#    torch.nn.Linear(1280, 7).to(device)
#)
optimizer = optim.Adam(model.parameters(), lr=learning_rate,weight_decay=0.0001)
print(model.classifier)


Sequential(
  (0): Dropout(p=0.2, inplace=True)
  (1): Linear(in_features=1280, out_features=1000, bias=True)
)
Sequential(
  (0): Dropout(p=0.2, inplace=True)
  (1): Linear(in_features=1280, out_features=7, bias=True)
)


---
---
---

# Coral

## Coral Loss


In [7]:
# Initialize the CORAL loss
def coral_loss(source, target):
    # Source and target are the feature representations of source and target datasets.
    # They should be 2D Tensors, where the number of rows should be the number of instances
    # and the number of columns should be the number of features.
    # So, source.size() = (num_source_instances, num_features)
    # and target.size() = (num_target_instances, num_features)

    # Compute the mean of source and target
    source_mean = torch.mean(source, dim=0, keepdim=True)
    target_mean = torch.mean(target, dim=0, keepdim=True)

    # Compute the covariance of source and target
    source_cov = (source - source_mean).t() @ (source - source_mean) / (source.size(0) - 1)
    target_cov = (target - target_mean).t() @ (target - target_mean) / (target.size(0) - 1)

    # Compute the Frobenius norm of the covariance difference
    loss = torch.norm(source_cov - target_cov, p='fro')

    return loss

# Define the task-specific loss
cross_entropy_loss  = nn.CrossEntropyLoss()

In [8]:
model.train()
total_loss = 0
correct = 0
total = 0

progress_bar = tqdm(enumerate(zip(source_train_loader, target_train_loader)), total=len(source_train_loader))
for epoch in range(num_epochs):
    progress_bar.reset()

    for i, ((source_inputs, source_labels), (target_inputs, _)) in enumerate(zip(source_train_loader, target_train_loader)):

        source_inputs = source_inputs.to(device)
        source_labels = source_labels.to(device)
        target_inputs = target_inputs.to(device)
        # Forward pass
        source_outputs = model(source_inputs)
        target_outputs = model(target_inputs)

        # Compute the loss
        cross_entropy_loss_value = cross_entropy_loss(source_outputs, source_labels)
        coral_loss_value  = coral_loss(source_outputs, target_outputs)
        loss = (coral_loss_value*0.7) + (cross_entropy_loss_value*0.3)


        # Backward and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        

        # Compute accuracy
        _, predicted = torch.max(source_outputs.data, 1)
        total += source_labels.size(0)
        correct += (predicted == source_labels).sum().item()

        total_loss += loss.item()

        progress_bar.set_description(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {total_loss / (i + 1)}, Accuracy: {correct / total * 100}%')
        progress_bar.update(1)

    progress_bar.set_description(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {total_loss / len(source_train_loader)}, Accuracy: {correct / total * 100}%')
    progress_bar.refresh()

progress_bar.close()

Epoch [30/30], Loss: 8.670728371698907, Accuracy: 80.16362615087796%: 100%|██████████| 427/427 [01:44<00:00,  4.09it/s]  


In [9]:
# Evaluation
model.eval()  # set the model to evaluation mode

# Compute target accuracy using test_target_dataloader
correct = 0
total = 0

with torch.no_grad():
    for i, (inputs, labels) in enumerate(target_test_loader):
        inputs = inputs.to(device)
        labels = labels.to(device)

        # Forward pass
        outputs = model(inputs)

        # Compute accuracy
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

    print(f'Test Target Accuracy: {correct / total * 100}%')

correct = 0
total = 0

with torch.no_grad():  # disable gradient computation
    for i, (inputs, labels) in enumerate(source_test_loader):
        inputs = inputs.to(device)
        labels = labels.to(device)

        # Forward pass
        outputs = model(inputs)

        # Compute accuracy
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

    print(f'source Test Accuracy: {correct / total * 100}%')

correct = 0
total = 0

with torch.no_grad():  # disable gradient computation
    for i, (inputs, labels) in enumerate(target_train_loader):
        inputs = inputs.to(device)
        labels = labels.to(device)

        # Forward pass
        outputs = model(inputs)

        # Compute accuracy
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

    print(f'Target train Accuracy: {correct / total * 100}%')

Test Target Accuracy: 44.23237670660351%
source Test Accuracy: 85.97223748306047%
Target train Accuracy: 44.07677035076109%


---

# MMD


In [None]:
model=models.mobilenet_v3_small(weights='DEFAULT')
model = model.to(device)

#for param in model.parameters():
  #param.requires_grad = False

model.classifier[3] = nn.Linear(1024, 7).to(device)
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
print(model.classifier)


Sequential(
  (0): Linear(in_features=576, out_features=1024, bias=True)
  (1): Hardswish()
  (2): Dropout(p=0.2, inplace=True)
  (3): Linear(in_features=1024, out_features=7, bias=True)
)


In [None]:
def gaussian_kernel_matrix(x, y, sigma):
    x_size = x.size(0)
    y_size = y.size(0)
    dim = x.size(1)
    x = x.unsqueeze(1)  # (x_size, 1, dim)
    y = y.unsqueeze(0)  # (1, y_size, dim)
    tiled_x = x.expand(x_size, y_size, dim)
    tiled_y = y.expand(x_size, y_size, dim)
    return torch.exp(-((tiled_x - tiled_y)**2).sum(2) / (2.0 * sigma**2))

def compute_mmd(x, y, sigma):
    x_kernel = gaussian_kernel_matrix(x, x, sigma)
    y_kernel = gaussian_kernel_matrix(y, y, sigma)
    xy_kernel = gaussian_kernel_matrix(x, y, sigma)
    mmd = x_kernel.mean() + y_kernel.mean() - 2*xy_kernel.mean()
    return mmd

cross_entropy_loss  = nn.CrossEntropyLoss()

In [None]:
model.train()
total_loss = 0
correct = 0
total = 0

progress_bar = tqdm(enumerate(zip(source_train_loader, target_train_loader)), total=len(source_train_loader))
for epoch in range(num_epochs):
    progress_bar.reset()

    for i, ((source_inputs, source_labels), (target_inputs, _)) in enumerate(zip(source_train_loader, target_train_loader)):
        source_inputs = source_inputs.to(device)
        source_labels = source_labels.to(device)
        target_inputs = target_inputs.to(device)
        # Forward pass
        source_outputs = model(source_inputs)
        target_outputs = model(target_inputs)

        # Compute the loss
        cross_entropy_loss_value = cross_entropy_loss(source_outputs, source_labels)
        mmd_loss_value  = compute_mmd(source_outputs, target_outputs,1)
        loss = (mmd_loss_value*0.5) + (cross_entropy_loss_value*0.5)


        # Backward and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        

        # Compute accuracy
        _, predicted = torch.max(source_outputs.data, 1)
        total += source_labels.size(0)
        correct += (predicted == source_labels).sum().item()

        total_loss += loss.item()

        progress_bar.set_description(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {total_loss / (i + 1)}, Accuracy: {correct / total * 100}%')
        progress_bar.update(1)

    progress_bar.set_description(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {total_loss / len(source_train_loader)}, Accuracy: {correct / total * 100}%')
    progress_bar.refresh()

progress_bar.close()

Epoch [20/20], Loss: 5.234048975745011, Accuracy: 82.73426624661148%: 100%|██████████| 427/427 [00:48<00:00,  8.80it/s] 


In [None]:
# Evaluation
model.eval()  # set the model to evaluation mode

# Compute target accuracy using test_target_dataloader
correct = 0
total = 0

with torch.no_grad():
    for i, (inputs, labels) in enumerate(target_test_loader):
        inputs = inputs.to(device)
        labels = labels.to(device)

        # Forward pass
        outputs = model(inputs)

        # Compute accuracy
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

    print(f'Test Target Accuracy: {correct / total * 100}%')

correct = 0
total = 0

with torch.no_grad():  # disable gradient computation
    for i, (inputs, labels) in enumerate(source_test_loader):
        inputs = inputs.to(device)
        labels = labels.to(device)

        # Forward pass
        outputs = model(inputs)

        # Compute accuracy
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

    print(f'Source Test Accuracy: {correct / total * 100}%')

correct = 0
total = 0

with torch.no_grad():  # disable gradient computation
    for i, (inputs, labels) in enumerate(target_train_loader):
        inputs = inputs.to(device)
        labels = labels.to(device)

        # Forward pass
        outputs = model(inputs)

        # Compute accuracy
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

    print(f'Target train Accuracy: {correct / total * 100}%')

Test Target Accuracy: 31.192532738924488%
Source Test Accuracy: 71.11306449840676%
Target train Accuracy: 32.25469365007489%


---

# Oracle Predictor

In [None]:
model=models.mobilenet_v3_small(weights='DEFAULT')
model = model.to(device)

#for param in model.parameters():
  #param.requires_grad = False

model.classifier[3] = nn.Linear(1024, 7).to(device)
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
print(model.classifier)


Sequential(
  (0): Linear(in_features=576, out_features=1024, bias=True)
  (1): Hardswish()
  (2): Dropout(p=0.2, inplace=True)
  (3): Linear(in_features=1024, out_features=7, bias=True)
)


In [None]:
cross_entropy_loss = nn.CrossEntropyLoss()
model.train()
progress_bar = tqdm(total=len(target_train_loader))
for epoch in range(num_epochs):
    
    total_loss = 0
    correct = 0
    total = 0
    progress_bar.reset()

    for i, (source_inputs, source_labels) in enumerate(target_train_loader):

        source_inputs = source_inputs.to(device)
        source_labels = source_labels.to(device)
        
        # Forward pass
        source_outputs = model(source_inputs)

        # Compute the loss
        loss = cross_entropy_loss(source_outputs, source_labels)

        # Backward and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Compute accuracy
        _, predicted = torch.max(source_outputs.data, 1)
        total += source_labels.size(0)
        correct += (predicted == source_labels).sum().item()

        total_loss += loss.item()

        progress_bar.set_description(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {total_loss / (i + 1)}, Accuracy: {correct / total * 100}%')
        progress_bar.update(1)

    progress_bar.set_description(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {total_loss / len(target_train_loader)}, Accuracy: {correct / total * 100}%')
    progress_bar.refresh()

progress_bar.close()


Epoch [20/20], Loss: 0.3916057722605682, Accuracy: 86.2900135845902%: 100%|██████████| 449/449 [00:25<00:00, 17.42it/s]  


In [None]:
# Evaluation
model.eval()  # set the model to evaluation mode

# Compute target accuracy using test_target_dataloader
correct = 0
total = 0

with torch.no_grad():
    for i, (inputs, labels) in enumerate(target_test_loader):
        inputs = inputs.to(device)
        labels = labels.to(device)

        # Forward pass
        outputs = model(inputs)

        # Compute accuracy
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

    print(f'Test Target Accuracy: {correct / total * 100}%')


Test Target Accuracy: 46.517135692393424%
