In [1]:
import torch
import torch.nn as nn
import torchvision
import torchvision.transforms as transforms
import pytorch_lightning as pl
from torchmetrics.functional import accuracy
import torchmetrics
from torch.utils.data import DataLoader
from torchvision.datasets import ImageFolder
from torchvision.models import resnet50, ResNet50_Weights, swin_t, Swin_T_Weights
# from models import FineTuningModel
from pytorch_lightning.loggers import TensorBoardLogger
import torch.optim as optim
from pytorch_lightning.callbacks import ModelCheckpoint

In [2]:
# ---- Experiment-wide settings used by the cells below ----
data_dir = './data'   # root folder for the downloaded dataset
batch_size = 32       # samples per mini-batch for every DataLoader
num_classes = 37      # width of the replacement classification head
max_epochs = 10       # training epochs per experiment

# Start from CPU and upgrade to CUDA only when a GPU is actually visible.
device = torch.device('cpu')
if torch.cuda.is_available():
    device = torch.device('cuda')

# ResNet-50 preprocessing.
# Evaluation: resize to 232 with bilinear interpolation, centre-crop to 224, rescale to
# [0.0, 1.0] via ToTensor, then normalise with mean=[0.485, 0.456, 0.406] and
# std=[0.229, 0.224, 0.225].
# Training: random-resized crop straight to 224 plus a random horizontal flip, so the
# network is trained at the same 224x224 resolution it is evaluated at (the crop was
# previously 232, which did not match the 224 evaluation crop).
resnet_train_transforms = transforms.Compose([
    transforms.RandomResizedCrop(224, interpolation=transforms.InterpolationMode.BILINEAR),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])
resnet_test_transforms = transforms.Compose([
    transforms.Resize(232, interpolation=transforms.InterpolationMode.BILINEAR),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

# Both splits live under the shared `data_dir` root; download=True fetches them on first use.
resnet_train_dataset = torchvision.datasets.OxfordIIITPet(root = data_dir,
                                             split = 'trainval',
                                             transform = resnet_train_transforms,
                                             download = True)

resnet_test_dataset = torchvision.datasets.OxfordIIITPet(root = data_dir,
                                            split = 'test',
                                            transform = resnet_test_transforms,
                                            download=True)

# Shuffle only the training data; keep evaluation order fixed.
resnet_train_loader = torch.utils.data.DataLoader(dataset = resnet_train_dataset,
                                           batch_size = batch_size,
                                           shuffle = True,
                                           num_workers = 4)


resnet_test_loader = torch.utils.data.DataLoader(dataset = resnet_test_dataset,
                                           batch_size = batch_size,
                                           shuffle = False,
                                           num_workers = 4)

# Swin-T preprocessing.
# Evaluation: resize to 232 with bicubic interpolation, centre-crop to 224, rescale to
# [0.0, 1.0] via ToTensor, then normalise with mean=[0.485, 0.456, 0.406] and
# std=[0.229, 0.224, 0.225].
# Training: random-resized crop straight to 224 plus a random horizontal flip, so the
# network is trained at the same 224x224 resolution it is evaluated at (the crop was
# previously 232, which did not match the 224 evaluation crop).
swin_train_transforms = transforms.Compose([
    transforms.RandomResizedCrop(224, interpolation=transforms.InterpolationMode.BICUBIC),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])
swin_test_transforms = transforms.Compose([
    transforms.Resize(232, interpolation=transforms.InterpolationMode.BICUBIC),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

# Both splits live under the shared `data_dir` root; download=True fetches them on first use.
swin_train_dataset = torchvision.datasets.OxfordIIITPet(root = data_dir,
                                             split = 'trainval',
                                             transform = swin_train_transforms,
                                             download = True)

swin_test_dataset = torchvision.datasets.OxfordIIITPet(root = data_dir,
                                            split = 'test',
                                            transform = swin_test_transforms,
                                            download=True)

# Shuffle only the training data; keep evaluation order fixed.
swin_train_loader = torch.utils.data.DataLoader(dataset = swin_train_dataset,
                                           batch_size = batch_size,
                                           shuffle = True,
                                           num_workers = 4)


swin_test_loader = torch.utils.data.DataLoader(dataset = swin_test_dataset,
                                           batch_size = batch_size,
                                           shuffle = False,
                                           num_workers = 4)



In [3]:
class FineTuningModel(pl.LightningModule):
    """Lightning wrapper that fine-tunes a pretrained backbone for classification.

    The backbone's final linear layer is replaced by a freshly initialised
    ``nn.Linear`` with ``num_classes`` outputs. Which attribute holds that
    layer is selected by ``model``: ``"resnet"`` replaces ``backbone.fc`` and
    ``"swin"`` replaces ``backbone.head``.

    Args:
        backbone: Pretrained network exposing either ``fc`` or ``head`` as its
            final ``nn.Linear``. Freezing parameters (if desired) is the
            caller's job; the new head is always trainable.
        num_classes: Number of output classes for the new head.
        model: ``"resnet"`` or ``"swin"``.

    Raises:
        ValueError: If ``model`` is not one of the supported names. Previously
            an unknown name silently kept the backbone's original head.
    """

    def __init__(self, backbone, num_classes, model):
        super().__init__()
        self.backbone = backbone
        self.num_classes = num_classes

        # Replace the output layer with a new fully connected layer
        if model == "resnet":
            num_features = self.backbone.fc.in_features
            self.backbone.fc = nn.Linear(num_features, num_classes)
        elif model == "swin":
            num_features = self.backbone.head.in_features
            self.backbone.head = nn.Linear(num_features, num_classes)
        else:
            raise ValueError(
                f"Unsupported model {model!r}; expected 'resnet' or 'swin'"
            )

        # Define the loss function
        self.loss_function = nn.CrossEntropyLoss()

    def forward(self, x):
        """Return the backbone's logits for the input batch ``x``."""
        return self.backbone(x)

    def _shared_step(self, batch):
        """Run one batch and return ``(loss, number_correct, batch_size)``."""
        x, y = batch
        y_hat = self(x)
        loss = self.loss_function(y_hat, y)
        prediction = torch.argmax(y_hat, dim=1)
        correct = torch.sum(y == prediction).item()
        return loss, correct, len(y)

    @staticmethod
    def _epoch_summary(outputs, loss_key):
        """Aggregate per-batch dicts into ``(mean_loss, accuracy)``.

        Both figures are weighted by batch size so that a smaller final batch
        does not distort the epoch loss.
        """
        total = sum(x['prediction_length'] for x in outputs)
        weighted_loss = torch.stack(
            [x[loss_key] * x['prediction_length'] for x in outputs]
        ).sum()
        accuracy = sum(x['correct'] for x in outputs) / total
        return weighted_loss / total, accuracy

    def training_step(self, batch, batch_idx):
        """Compute the training loss and per-batch counts for the epoch-end hook."""
        train_loss, correct, batch_len = self._shared_step(batch)
        # The former 'log' entry is omitted: nothing in this class reads it.
        return {'loss': train_loss, "correct": correct, "prediction_length": batch_len}

    def training_epoch_end(self, outputs):
        """Log the epoch's sample-weighted training loss and accuracy."""
        avg_loss, train_accuracy = self._epoch_summary(outputs, 'loss')
        # Logging 'step' as the epoch index; cast to float because self.log works on float values.
        self.log('step', float(self.trainer.current_epoch))
        self.log('train_loss', avg_loss, logger=True, prog_bar=True, on_epoch=True, on_step=False)
        self.log('train_accuracy', train_accuracy, logger=True, prog_bar=True, on_epoch=True, on_step=False)

    def validation_step(self, batch, batch_idx):
        """Compute the validation loss and per-batch counts for the epoch-end hook."""
        test_loss, correct, batch_len = self._shared_step(batch)
        return {'val_loss': test_loss, "correct": correct, "prediction_length": batch_len}

    def validation_epoch_end(self, outputs):
        """Log the epoch's sample-weighted validation loss and accuracy."""
        avg_loss, val_acc = self._epoch_summary(outputs, 'val_loss')
        # Logging 'step' as the epoch index; cast to float because self.log works on float values.
        self.log('step', float(self.trainer.current_epoch))
        self.log('val_loss', avg_loss, logger=True, prog_bar=True, on_epoch=True, on_step=False)
        self.log('val_acc', val_acc, logger=True, prog_bar=True, on_epoch=True, on_step=False)

    def configure_optimizers(self):
        """Return an AdamW optimizer with default hyper-parameters over all parameters."""
        optimizer = optim.AdamW(self.parameters())
        return optimizer
    

## Fine-tune output layer

In [4]:
# Fine-tune ResNet-50 (output layer only)
resnet_output = resnet50(weights=ResNet50_Weights.DEFAULT)
# Freeze every pretrained weight. FineTuningModel then swaps in a brand-new fc layer,
# whose parameters are trainable by default, so no explicit un-freezing is needed.
resnet_output.requires_grad_(False)
resnet_model_1 = FineTuningModel(backbone=resnet_output, num_classes=num_classes, model="resnet")

resnet_output_logger = TensorBoardLogger('logs', name='resnet_output_logger')
# Fall back to CPU instead of crashing when no CUDA device is available.
trainer = pl.Trainer(accelerator='gpu' if torch.cuda.is_available() else 'cpu',
                     max_epochs=max_epochs,
                     logger=resnet_output_logger)
# NOTE: the test split is passed as the validation loader.
trainer.fit(resnet_model_1, resnet_train_loader, resnet_test_loader)



GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]

  | Name          | Type             | Params
---------------------------------------------------
0 | backbone      | ResNet           | 23.6 M
1 | loss_function | CrossEntropyLoss | 0     
---------------------------------------------------
75.8 K    Trainable params
23.5 M    Non-trainable params
23.6 M    Total params
94.335    Total estimated model params size (MB)


Sanity Checking: 0it [00:00, ?it/s]

  rank_zero_warn("Detected KeyboardInterrupt, attempting graceful shutdown...")
Exception ignored in: <function _MultiProcessingDataLoaderIter.__del__ at 0x0000021A294604C0>
Traceback (most recent call last):
  File "C:\Users\23566\AppData\Local\Programs\Python\Python310\lib\site-packages\torch\utils\data\dataloader.py", line 1466, in __del__
    self._shutdown_workers()
  File "C:\Users\23566\AppData\Local\Programs\Python\Python310\lib\site-packages\torch\utils\data\dataloader.py", line 1424, in _shutdown_workers
    if self._persistent_workers or self._workers_status[worker_id]:
AttributeError: '_MultiProcessingDataLoaderIter' object has no attribute '_workers_status'


In [5]:
# Fine-tune Swin-T (output layer only)
swin_output = swin_t(weights=Swin_T_Weights.DEFAULT)
# Freeze every pretrained weight. FineTuningModel then swaps in a brand-new head layer,
# whose parameters are trainable by default, so no explicit un-freezing is needed.
swin_output.requires_grad_(False)
swin_model_1 = FineTuningModel(backbone=swin_output, num_classes=num_classes, model="swin")

swin_output_logger = TensorBoardLogger('logs', name='swin_output_logger')
# Fall back to CPU instead of crashing when no CUDA device is available.
trainer = pl.Trainer(accelerator='gpu' if torch.cuda.is_available() else 'cpu',
                     max_epochs=max_epochs,
                     logger=swin_output_logger)
# NOTE: the test split is passed as the validation loader.
trainer.fit(swin_model_1, swin_train_loader, swin_test_loader)

GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]

  | Name          | Type             | Params
---------------------------------------------------
0 | backbone      | SwinTransformer  | 27.5 M
1 | loss_function | CrossEntropyLoss | 0     
---------------------------------------------------
28.5 K    Trainable params
27.5 M    Non-trainable params
27.5 M    Total params
110.191   Total estimated model params size (MB)


Sanity Checking: 0it [00:00, ?it/s]

Exception ignored in: <function _MultiProcessingDataLoaderIter.__del__ at 0x0000021A294604C0>
Traceback (most recent call last):
  File "C:\Users\23566\AppData\Local\Programs\Python\Python310\lib\site-packages\torch\utils\data\dataloader.py", line 1466, in __del__
    self._shutdown_workers()
  File "C:\Users\23566\AppData\Local\Programs\Python\Python310\lib\site-packages\torch\utils\data\dataloader.py", line 1424, in _shutdown_workers
    if self._persistent_workers or self._workers_status[worker_id]:
AttributeError: '_MultiProcessingDataLoaderIter' object has no attribute '_workers_status'


## Fine-tune all layers

In [6]:
# Fine-tune ResNet-50 (all layers)
# A freshly loaded model has every parameter trainable, so nothing needs un-freezing.
resnet_all = resnet50(weights=ResNet50_Weights.DEFAULT)
resnet_model_2 = FineTuningModel(backbone=resnet_all, num_classes=num_classes, model="resnet")

resnet_all_logger = TensorBoardLogger('logs', name='resnet_all_logger')
# Fall back to CPU instead of crashing when no CUDA device is available.
trainer = pl.Trainer(accelerator='gpu' if torch.cuda.is_available() else 'cpu',
                     max_epochs=max_epochs,
                     logger=resnet_all_logger)
# NOTE: the test split is passed as the validation loader.
trainer.fit(resnet_model_2, resnet_train_loader, resnet_test_loader)



GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]

  | Name          | Type             | Params
---------------------------------------------------
0 | backbone      | ResNet           | 23.6 M
1 | loss_function | CrossEntropyLoss | 0     
---------------------------------------------------
23.6 M    Trainable params
0         Non-trainable params
23.6 M    Total params
94.335    Total estimated model params size (MB)


Sanity Checking: 0it [00:00, ?it/s]

In [7]:
# Fine-tune Swin-T (all layers)
# A freshly loaded model has every parameter trainable, so nothing needs un-freezing.
swin_all = swin_t(weights=Swin_T_Weights.DEFAULT)
swin_model_2 = FineTuningModel(backbone=swin_all, num_classes=num_classes, model="swin")

swin_all_logger = TensorBoardLogger('logs', name='swin_all_logger')
# Fall back to CPU instead of crashing when no CUDA device is available.
trainer = pl.Trainer(accelerator='gpu' if torch.cuda.is_available() else 'cpu',
                     max_epochs=max_epochs,
                     logger=swin_all_logger)
# NOTE: the test split is passed as the validation loader.
trainer.fit(swin_model_2, swin_train_loader, swin_test_loader)

# To inspect the curves, run in a shell: tensorboard --logdir logs

GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]

  | Name          | Type             | Params
---------------------------------------------------
0 | backbone      | SwinTransformer  | 27.5 M
1 | loss_function | CrossEntropyLoss | 0     
---------------------------------------------------
27.5 M    Trainable params
0         Non-trainable params
27.5 M    Total params
110.191   Total estimated model params size (MB)


Sanity Checking: 0it [00:00, ?it/s]



Training: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]



Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

`Trainer.fit` stopped: `max_epochs=10` reached.
