# Imports

In [8]:
import numpy as np
import torch
from torchvision import transforms as T
import torch.optim as optim
import torch.nn as nn
import torch.nn.functional as F

from tqdm.notebook import tqdm
from torch.utils.tensorboard import SummaryWriter

import sys
sys.path.append("..")

from data import pacs
import models.resnet_ms as resnet_ms

# Config

## Regarding Dataset

In [None]:
# Object categories of the PACS dataset.
# NOTE(review): the list order presumably matches the integer label ids produced by the data loaders — confirm.
CLASSES = [
    "dog",
    "elephant",
    "giraffe",
    "guitar",
    "horse",
    "house",
    "person",
]

# Visual domains; the experiment loop uses each one in turn as the target domain.
DOMAINS = [
    "art_painting",
    "cartoon",
    "photo",
    "sketch",
]

# Number of classes handed to the model constructor (7 for the list above).
NUM_CLASSES = len(CLASSES)

## Hyperparameters

In [None]:
# --- Training schedule -------------------------------------------------------
EPOCHS = 25                     # upper bound on epochs per target domain
BATCH_SIZE = 32                 # training batch size passed to the data loaders
NUM_SEEDS = 3                   # number of repetitions of the whole experiment

# --- Model ---------------------------------------------------------------------
MODEL = resnet_ms.resnet18_fc512_ms12_a0d2
USE_PRETRAINED = True           # also selects which normalization statistics are used

# --- Optimizer -----------------------------------------------------------------
LEARNING_RATE = 1e-4
REGULARIZATION = 1e-2
OPTIMIZER = optim.AdamW
OPTIMIZER_KWARGS = dict(
    lr=LEARNING_RATE,             # Standard value for AdamW: 1e-3
    weight_decay=REGULARIZATION,  # Standard value for AdamW: 1e-2
)

# --- Learning-rate scheduler ---------------------------------------------------
SCHEDULER = optim.lr_scheduler.ReduceLROnPlateau
SCHEDULER_KWARGS = dict(mode="min", patience=3)

# --- Early stopping ------------------------------------------------------------
EARLY_STOPPING_PATIENCE = 5     # tolerated number of non-improving epochs
EARLY_STOPPING_DELTA = 1e-5     # minimum loss decrease that counts as an improvement

# --- Data augmentation ---------------------------------------------------------
# Extra torchvision transforms that are prepended to the preprocessing pipeline.
AUGMENTATIONS = ()

## Image Normalization

In [11]:
# Preprocessing used together with pretrained ResNet weights: any configured
# augmentations first, then resize / center-crop to 224x224, tensor conversion and
# per-channel normalization with the statistics expected by the pretrained weights.
pretrained_image_transform = T.Compose(
    list(AUGMENTATIONS)
    + [
        T.Resize(256),
        T.CenterCrop(224),
        T.ToTensor(),
        T.Normalize(
            mean=[0.485, 0.456, 0.406],
            std=[0.229, 0.224, 0.225],
        ),
    ]
)

## Device

In [12]:
# Pick the compute device in order of preference: CUDA GPU, then Apple MPS, then CPU.
# The MPS check is only evaluated when CUDA is unavailable.
device = torch.device(
    "cuda" if torch.cuda.is_available()
    else "mps" if torch.backends.mps.is_available()
    else "cpu"
)
print(f"Device: {device}")

Device: cuda


## Abstract model building, optimizer and scheduler

In [13]:
def build_model():
    """Create a fresh model instance from the configured architecture.

    Calls ``MODEL`` with ``NUM_CLASSES``, ``loss='softmax'`` and
    ``pretrained=USE_PRETRAINED``. The configuration globals are looked up at
    call time, so changing them and re-calling picks up the new values.
    """
    return MODEL(NUM_CLASSES, loss='softmax', pretrained=USE_PRETRAINED)


def build_optimizer(model):
    """Create the configured optimizer over all parameters of ``model``.

    Args:
        model: module whose ``parameters()`` are handed to ``OPTIMIZER``.

    Returns:
        An ``OPTIMIZER`` instance constructed with ``OPTIMIZER_KWARGS``.
    """
    return OPTIMIZER(model.parameters(), **OPTIMIZER_KWARGS)


def build_scheduler(optimizer):
    """Create the configured learning-rate scheduler for ``optimizer``.

    Args:
        optimizer: optimizer whose learning rate the scheduler controls.

    Returns:
        A ``SCHEDULER`` instance constructed with ``SCHEDULER_KWARGS``.
    """
    return SCHEDULER(optimizer, **SCHEDULER_KWARGS)

# Set seed for reproducibility

In [14]:
# seed = 42
# torch.manual_seed(42)
# if device == torch.device("cuda"):
#     torch.cuda.manual_seed(seed)
#     torch.backends.cudnn.deterministic = True

# Training

In [15]:
# TensorBoard event writer shared by the training and experiment cells below.
# It is created without arguments; per the PyTorch docs the default log directory
# lives under ./runs, which is the directory the dashboard below is pointed at.
writer = SummaryWriter()
# Load the TensorBoard notebook extension and open the dashboard inline.
%load_ext tensorboard
%tensorboard --logdir ./runs

Launching TensorBoard...

## Training Helpers

In [16]:
class AverageMeter:
    """Accumulates a weighted running mean of a scalar (e.g. loss or accuracy).

    Attributes (all reset to 0 by ``reset``):
        sum:   weighted sum of every value seen so far.
        count: total weight (number of samples) seen so far.
        avg:   ``sum / count``, refreshed on every ``update``.
    """

    def __init__(self):
        self.reset()

    def reset(self):
        """Forget everything accumulated so far."""
        self.avg = self.sum = self.count = 0

    def update(self, val, n):
        """Fold in a value ``val`` that represents ``n`` samples."""
        self.count += n
        self.sum += val * n
        self.avg = self.sum / self.count

In [17]:
def accuracy(target, output):
    """Return the fraction of samples whose top-scoring class equals the label.

    Args:
        target: tensor of ground-truth class indices; its first dimension is
            taken as the batch size.
        output: tensor of per-class scores; the prediction is the index of the
            maximum along the last dimension.

    Returns:
        Python float ``num_correct / batch_size``.
    """
    predicted = output.max(dim=-1).indices
    num_correct = (predicted == target).sum().item()
    return num_correct / target.shape[0]

In [18]:
def train(
    epoch: int,
    target_domain: str,
    data_loader: torch.utils.data.DataLoader,
    model: nn.Module,
    optimizer: optim.Optimizer,
) -> tuple[float, float]:
    """Run one training epoch and log per-batch metrics to TensorBoard.

    Args:
        epoch: 1-based epoch number; used to compute the global logging step.
        target_domain: name used in the TensorBoard tags of this run.
        data_loader: yields ``(data, target)`` batches.
        model: network to optimize; switched to training mode here. Its forward
            output is fed directly to the cross-entropy loss.
        optimizer: optimizer that updates ``model``'s parameters.

    Returns:
        ``(mean_loss, mean_accuracy)`` over the epoch, weighted by batch size.
    """
    model.train()
    loss_meter = AverageMeter()
    acc_meter = AverageMeter()

    for batch_no, (data, target) in enumerate(data_loader, start=1):
        # Global step: batches from all previous epochs plus the 1-based position in this one.
        step = (epoch - 1) * len(data_loader) + batch_no

        data, target = data.to(device), target.to(device)

        out = model(data)
        loss = F.cross_entropy(out, target)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        loss_value = loss.item()
        batch_acc = accuracy(target, out)
        num_samples = out.shape[0]
        loss_meter.update(loss_value, num_samples)
        acc_meter.update(batch_acc, num_samples)

        writer.add_scalar(f'Loss/Train/target={target_domain}', loss_value, step)
        writer.add_scalar(f'Accuracy/Train/target={target_domain}', batch_acc, step)

    return loss_meter.avg, acc_meter.avg

## Evaluation

In [19]:
def evaluate(data_loader: torch.utils.data.DataLoader, model: nn.Module, phase="val") -> tuple[float, float]:
    """Compute mean loss and accuracy of ``model`` over ``data_loader`` without gradients.

    Args:
        data_loader: yields ``(data, target)`` batches.
        model: network to evaluate; switched to eval mode here. It must expose a
            ``classifier`` attribute that maps its eval-mode output to class logits.
        phase: label of the evaluation phase; currently not used by the function.

    Returns:
        ``(mean_loss, mean_accuracy)``, weighted by batch size.
    """
    model.eval()

    loss_meter = AverageMeter()
    acc_meter = AverageMeter()

    with torch.no_grad():
        for data, target in data_loader:
            data, target = data.to(device), target.to(device)

            # The implementation returns only the feature vector rather than the
            # classification logits, so the classification layer is applied manually
            # before comparing against the labels.
            features = model(data)
            logits = model.classifier(features)

            batch_loss = F.cross_entropy(logits, target)
            batch_acc = accuracy(target, logits)

            num_samples = logits.shape[0]
            loss_meter.update(batch_loss.item(), num_samples)
            acc_meter.update(batch_acc, num_samples)

    return loss_meter.avg, acc_meter.avg

## Training Loop

In [None]:
# Aggregated results over all seeds:
#   all_results[<domain>] -> list with one final accuracy per seed
#   all_results['avgs']   -> per-seed mean accuracy across the domains
#   all_results['worst']  -> per-seed minimum accuracy across the domains
all_results = {d: [] for d in DOMAINS}
all_results['avgs'] = []
all_results['worst'] = []

for seed in tqdm(range(NUM_SEEDS), desc="Seeds"):
    # Seed the RNGs so every repetition is reproducible yet differs from the others.
    torch.manual_seed(seed)
    np.random.seed(seed)

    results = {}  # target domain -> final accuracy for this seed

    for target_domain in tqdm(DOMAINS, desc="Target Domain"):
        model = build_model()
        model = model.to(device)

        optimizer = build_optimizer(model)
        scheduler = build_scheduler(optimizer)

        if not USE_PRETRAINED:
            # Without pretrained weights, normalize with statistics computed for this target domain.
            img_mean, img_std = pacs.get_nomalization_stats(target_domain)
            print(f"Normalization values excluding domain {target_domain}:\n\tmean: {img_mean}\n\tstd: {img_std}")
            image_transform = T.Compose([
                *AUGMENTATIONS,
                T.Resize(256),
                T.CenterCrop(224),
                T.ToTensor(),
                T.Normalize(mean=img_mean, std=img_std)
            ])
        else:
            image_transform = pretrained_image_transform

        # NOTE(review): if drop_last=True also applies to the test loader, the reported
        # accuracy silently omits the final partial batch — confirm in pacs.get_data_loaders.
        train_loader, test_loader = pacs.get_data_loaders(target_domain, train_batch_size=BATCH_SIZE, transform=image_transform, shuffle_test=True, drop_last=True)

        checkpoint_path = f"../checkpoints/mixstyle/best_{target_domain}.pt"
        has_checkpoint = False  # guards against loading a stale file from an earlier run
        best_loss = float('inf')
        patience_counter = 0
        for epoch in tqdm(range(1, EPOCHS + 1), desc=f"Epoch ({target_domain})"):
            train_loss, train_acc = train(epoch, target_domain, train_loader, model, optimizer)
            test_loss, test_acc = evaluate(test_loader, model)

            writer.add_scalar(f"Loss/Test/target={target_domain}", test_loss, epoch)
            writer.add_scalar(f"Accuracy/Test/target={target_domain}", test_acc, epoch)

            # NOTE(review): LR scheduling, early stopping and checkpoint selection all use the
            # loss on test_loader; if that is the held-out target domain this leaks test
            # information into model selection — confirm this is intended.
            scheduler.step(test_loss)

            if best_loss - test_loss > EARLY_STOPPING_DELTA:
                # Meaningful improvement: remember it, reset patience and keep the weights.
                best_loss = test_loss
                patience_counter = 0
                torch.save(model.state_dict(), checkpoint_path)
                has_checkpoint = True
            else:
                patience_counter += 1
                if patience_counter > EARLY_STOPPING_PATIENCE:
                    break

        # Evaluate the best checkpoint; fall back to the final weights if none was
        # written (e.g. the loss was never finite).
        if has_checkpoint:
            model.load_state_dict(torch.load(checkpoint_path, map_location=device))
        _, acc = evaluate(test_loader, model, phase="final")

        results[target_domain] = acc

    seed_accs = [results[d] for d in DOMAINS]
    avg_acc = np.mean(seed_accs)
    worst_case_acc = np.min(seed_accs)

    for d in DOMAINS:
        all_results[d].append(results[d])
    all_results['avgs'].append(avg_acc)
    all_results['worst'].append(worst_case_acc)

# Make sure all logged scalars reach disk before reporting.
writer.flush()

print(
    "Average Accuracy:\n"
    + "".join(f"\t{d}: {np.mean(all_results[d])}, std: {np.std(all_results[d])}\n" for d in DOMAINS)
    + f"\ttotal: {np.mean(all_results['avgs'])}, std: {np.std(all_results['avgs'])}\n"
    + "Worst-case Accuracy:\n"
    + "".join(f"\t{d}: {np.min(all_results[d])}\n" for d in DOMAINS)
    + f"\ttotal: {np.mean(all_results['worst'])}, std: {np.std(all_results['worst'])}"
)

Target Domain:   0%|          | 0/4 [00:00<?, ?it/s]

Insert MixStyle after the following layers: ['layer1', 'layer2']


Epoch (cartoon):   0%|          | 0/25 [00:00<?, ?it/s]

KeyboardInterrupt: 