**DOWNLOAD THE PACS DATASET & SETUP THE ENVIRONMENT**

In [1]:
## Download PACS Dataset Images
#!git clone https://github.com/MachineLearning2020/Homework3-PACS/
#!mv Homework3-PACS/PACS/ .
#!rm -r Homework3-PACS/
#
## Download PACS Dataset Labels
#!git clone https://github.com/silvia1993/DANN_Template/
#!mv DANN_Template/txt_lists/art_painting.txt PACS/
#!mv DANN_Template/txt_lists/cartoon.txt PACS/
#!mv DANN_Template/txt_lists/photo.txt PACS/
#!mv DANN_Template/txt_lists/sketch.txt PACS/
#!rm -r DANN_Template/
#
## Install additional libraries
#!pip install torchmetrics

**GLOBAL VARIABLES**

In [2]:
import torch
import torch.optim as opt

DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'
NUM_CLASSES = 7
BATCH_SIZE = 256
LR = 1e-3            # The initial Learning Rate
MOMENTUM = 0.9       # Hyperparameter for SGD, keep this at 0.9 when using SGD
WEIGHT_DECAY = 5e-5  # Regularization, you can keep this at the default
NUM_EPOCHS = 30      # Total number of training epochs (iterations over dataset)
STEP_SIZE = 20       # How many epochs before decreasing learning rate (if using a step-down policy)
GAMMA = 0.1          # Multiplicative factor for learning rate step-down

LOG_FREQUENCY = 10

**DATASET MANAGEMENT**

In [41]:
from torch.utils.data import Dataset

import os

from PIL import Image, ImageEnhance, ImageFilter

from math import ceil


def pil_loader(path):
    with open(path, 'rb') as f:
        img = Image.open(f)
        return img.convert('RGB')
    
def create_label_dict(labels):

    label_to_int = {}
    int_to_label = {}
    label_set = sorted(list(set(labels)), key= lambda x: x.lower())

    for i, l in enumerate(label_set):

            label_to_int[l] = i
            int_to_label[i] = l
    
    return label_to_int, int_to_label

# Define the Dataset class
class PACSDataset(Dataset):
    def __init__(self, split= 'train', transform= None, target_transform= None):
        super(PACSDataset, self).__init__()

        self.transform = transform
        self.target_transform = target_transform

        self.split = split

        self.images = []
        self.str_labels = []
        self.domains = []


        for dom in ['cartoon', 'sketch']:
            #print(dom)

            for cat in os.listdir('PACS/kfold/' + dom + '/'):
                #print(cat)
                ims = []
                labs = []
                doms = []
                for image in os.listdir('PACS/kfold/' + dom + '/' + cat + '/'):
                    ims.append(pil_loader('PACS/kfold/' + dom + '/' + cat + '/' + image))
                    labs.append(cat)
                    doms.append(0 if dom == 'cartoon' else 1)
                
                rangei = range(ceil(len(ims) * 3 / 4)) if self.split == 'train' else range(ceil(len(ims) * 3 / 4), len(ims))
                for i in rangei:
                    self.images.append(ims[i])
                    self.str_labels.append(labs[i])
                    self.domains.append(doms[i])

                #for image in os.listdir('PACS/kfold/' + dom + '/' + cat + '/'):
                #    #print(ima)
                #    self.images.append(pil_loader('PACS/kfold/' + dom + '/' + cat + '/' + image))
                #    self.str_labels.append(cat)
                #    self.domains.append(0 if dom == 'cartoon' else 1)

        self.label_to_int, self.int_to_label = create_label_dict(self.str_labels)
        self.labels = [self.label_to_int[l] for l in self.str_labels]

    def __len__(self): return len(self.labels)

    def __getitem__(self, index):

        image = self.images[index]
        label = self.labels[index]
        domain = self.domains[index]

        if self.transform is not None:
            image = self.transform(image)

        return image, label, domain
    
train_DS = PACSDataset(split= 'train')

**ARCHITECTURE SETUP**

In [51]:
import torch.nn as nn
from torch.autograd import Function
import torchvision.models as models

class ReverseLayerF(Function):
    @staticmethod
    def forward(ctx, x, alpha):
        ctx.alpha = alpha
        return x.view_as(x)

    @staticmethod
    def backward(ctx, grad_output):
        output = grad_output.neg() * ctx.alpha
        return output, None

# Define AlexNet architecture class
class AlexNet(nn.Module):
    def __init__(self, num_classes=1000, num_domains= 4):
        super(AlexNet, self).__init__()

        self.pretrained_alexnet = models.alexnet(pretrained=True)
        self.pretrained_alexnet.classifier = nn.Sequential(*list(self.pretrained_alexnet.classifier.children())[:-1])
        self.new_classifier = nn.Linear(4096, num_classes)

        self.features = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=11, stride=4, padding=2),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2),
            nn.Conv2d(64, 192, kernel_size=5, padding=2),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2),
            nn.Conv2d(192, 384, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(384, 256, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(256, 256, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2)
        )
        # Category classifier
        self.classifier = nn.Sequential(
            nn.Dropout(),
            nn.Linear(256 * 6 * 6, 4096),
            nn.ReLU(inplace=True),
            nn.Dropout(),
            nn.Linear(4096, 4096),
            nn.ReLU(inplace=True),
            nn.Linear(4096, num_classes)
        )
        # Domain classifier
        #...
        self.domain_classifier = nn.Sequential(
            nn.Dropout(),
            nn.Linear(256 * 6 * 6, 1000),
            nn.ReLU(inplace=True),
            nn.Dropout(),
            nn.Linear(1000, 1000),
            nn.ReLU(inplace=True),
            nn.Linear(1000, num_domains)
        )

    def forward(self, x, alpha= None):
        features = self.features(x)
        features = features.view(features.size(0), -1)
        class_outputs = self.classifier(features)

        #...
        domain_output = None
        if alpha is not None:
            reverse_features = ReverseLayerF.apply(features, alpha)
            domain_output = self.domain_classifier(reverse_features)

        return class_outputs, domain_output
    
    def predict_class(self, x):
        features = self.features(x)
        features = features.view(features.size(0), -1)

        # Domain classification branch (no gradient reversal layer during testing)
        class_output = self.classifier(features)

        return class_output
    
    def predict_domain(self, x):
        features = self.features(x)
        features = features.view(features.size(0), -1)

        # Domain classification branch (no gradient reversal layer during testing)
        domain_output = self.domain_classifier(features)

        return domain_output

**OPTIMIZATION LOOP**

In [52]:
from torch.utils.data import DataLoader
import torchvision.transforms as T
from torchvision.models import AlexNet_Weights
import torch.nn.functional as F
from torchmetrics import Accuracy
import torchvision.models as models

#### DATA SETUP
# Define the transforms to use on images
dataset_transform = T.Compose([
    T.Resize(256),
    T.CenterCrop(224),
    T.ToTensor(),
    T.Normalize([0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# Define the Dataset object for training & testing
train_dataset = PACSDataset(split= 'train', transform= dataset_transform)
test_dataset = PACSDataset(split= 'test', transform= dataset_transform)

# Define the DataLoaders
train_dataloader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=0, drop_last=True)
test_dataloader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=0)


#### ARCHITECTURE SETUP
# Create the Network Architecture object
model = AlexNet(num_classes= 7)
# Load pre-trained weights
#model.load_state_dict(pretrained_alexnet.state_dict()).....
# Overwrite the final classifier layer as we only have 7 classes in PACS
#model.classifier[-1] = nn.Linear(4096, 7).....

criterion_class = nn.CrossEntropyLoss()
criterion_domain = nn.CrossEntropyLoss()

#### TRAINING SETUP
# Move model to device before passing it to the optimizer
model = model.to(DEVICE)

# Create Optimizer & Scheduler objects
optimizer = opt.SGD([
    {'params': model.features.parameters()},
    {'params': model.classifier.parameters()},
    {'params': model.domain_classifier.parameters()}
], lr=0.001, momentum=0.9)
scheduler = opt.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.1)


#### TRAINING LOOP
#... for epoch in range(NUM_EPOCHS)...
current_step = 0
# Start iterating over the epochs
for epoch in range(NUM_EPOCHS):
  print('Starting epoch {}/{}, LR = {}'.format(epoch+1, NUM_EPOCHS, scheduler.get_last_lr()))

  # Iterate over the dataset
  for images, labels, domain_labels in train_dataloader:
    # Bring data over the device of choice
    images = images.to(DEVICE)
    labels = labels.to(DEVICE)
    domain_labels = domain_labels.to(DEVICE)

    model.train() # Sets module in training mode

    # PyTorch, by default, accumulates gradients after each backward pass
    # We need to manually set the gradients to zero before starting a new iteration
    optimizer.zero_grad() # Zero-ing the gradients

    # Forward pass to the network
    class_outputs, domain_outputs = model(images, alpha= 0.1)

    # Compute loss based on output and ground truth
    class_loss = criterion_class(class_outputs, labels)
    domain_loss = criterion_domain(domain_outputs, domain_labels)

    total_loss = class_loss + domain_loss

    # Log loss
    if current_step % LOG_FREQUENCY == 0:
      print('Step {}, Loss {}'.format(current_step, total_loss.item()))

    # Compute gradients for each layer and update weights
    total_loss.backward()  # backward pass: computes gradients
    optimizer.step() # update weights based on accumulated gradients

    current_step += 1

  # Step the scheduler
  scheduler.step()



Starting epoch 1/30, LR = [0.001, 0.001, 0.001]
Step 0, Loss 3.329279899597168
Step 10, Loss 3.3061351776123047
Starting epoch 2/30, LR = [0.001, 0.001, 0.001]
Step 20, Loss 3.265162229537964
Step 30, Loss 3.2161917686462402
Starting epoch 3/30, LR = [0.001, 0.001, 0.001]
Step 40, Loss 3.1619884967803955
Step 50, Loss 3.11891508102417
Starting epoch 4/30, LR = [0.001, 0.001, 0.001]
Step 60, Loss 3.0480637550354004
Step 70, Loss 2.980602264404297
Starting epoch 5/30, LR = [0.001, 0.001, 0.001]
Step 80, Loss 2.9236109256744385
Starting epoch 6/30, LR = [0.001, 0.001, 0.001]
Step 90, Loss 2.893913745880127
Step 100, Loss 2.794426202774048
Starting epoch 7/30, LR = [0.001, 0.001, 0.001]
Step 110, Loss 2.7078022956848145
Step 120, Loss 2.6939802169799805
Starting epoch 8/30, LR = [0.001, 0.001, 0.001]
Step 130, Loss 2.6459264755249023
Step 140, Loss 2.540217161178589
Starting epoch 9/30, LR = [0.001, 0.001, 0.001]
Step 150, Loss 2.5428316593170166
Step 160, Loss 2.496501922607422
Starting e

In [53]:
from tqdm import tqdm

#### TEST LOOP
#...
model = model.to(DEVICE) # this will bring the network to GPU if DEVICE is cuda
model.train(False) # Set Network to evaluation mode

running_corrects = 0
for images, labels, domain in tqdm(test_dataloader):
  images = images.to(DEVICE)
  labels = labels.to(DEVICE)
  domain = domain.to(DEVICE)

  # Forward Pass
  #class_outputs, domain_outputs = model(images)
  domain_outputs = model.predict_domain(images)

  # Get predictions
  _, preds = torch.max(domain_outputs, 1)

  # Update Corrects
  running_corrects += torch.sum(preds == domain).data.item()

# Calculate Accuracy
accuracy = running_corrects / float(len(test_dataset))

print('Test Accuracy: {}'.format(accuracy))

print(f'\nAccuracy on the target domain: {100 * accuracy:.2f}%')

100%|██████████| 7/7 [00:03<00:00,  2.28it/s]

Test Accuracy: 0.6270753512132823

Accuracy on the target domain: 62.71%





In [54]:
from tqdm import tqdm

#### TEST LOOP
#...
model = model.to(DEVICE) # this will bring the network to GPU if DEVICE is cuda
model.train(False) # Set Network to evaluation mode

running_corrects = 0
for images, labels, domain in tqdm(test_dataloader):
  images = images.to(DEVICE)
  labels = labels.to(DEVICE)
  domain = domain.to(DEVICE)

  # Forward Pass
  #class_outputs, domain_outputs = model(images)
  class_outputs = model.predict_class(images)

  # Get predictions
  _, preds = torch.max(class_outputs, 1)

  # Update Corrects
  running_corrects += torch.sum(preds == labels).data.item()

# Calculate Accuracy
accuracy = running_corrects / float(len(test_dataset))

print('Test Accuracy: {}'.format(accuracy))

print(f'\nAccuracy on the target domain: {100 * accuracy:.2f}%')

100%|██████████| 7/7 [00:02<00:00,  2.42it/s]

Test Accuracy: 0.22541507024265645

Accuracy on the target domain: 22.54%



