In [1]:
# NOTE(review): the assignment below turns off HTTPS certificate verification
# for the default urllib / http.client context for the whole kernel. Treat it
# as a temporary workaround that is only acceptable on a trusted network.
import ssl
ssl._create_default_https_context = ssl._create_unverified_context

import torch
import torchvision
import torch.nn as nn
import torchvision.models as models
import numpy as np
import pandas as pd
import clearml
import matplotlib.pyplot as plt
import seaborn

import warnings
# Silences every warning category, deprecation notices from the libraries above included.
warnings.filterwarnings(action='ignore')
%matplotlib inline

# Last expression of the cell: shows whether PyTorch can see a CUDA device.
torch.cuda.is_available()

True

In [2]:
import ssl
import certifi


def _certifi_https_context(*args, **kwargs):
    """Build a verifying SSL context that trusts certifi's CA bundle.

    Assigning a ``cafile`` attribute to ``ssl.create_default_context`` has no
    effect on the contexts it creates, so the bundle has to be passed as an
    argument when the context is built.
    """
    kwargs.setdefault("cafile", certifi.where())
    return ssl.create_default_context(*args, **kwargs)


# urllib / http.client call this hook to obtain their default HTTPS context.
ssl._create_default_https_context = _certifi_https_context

In [3]:
# Register this notebook run as a ClearML task and keep a handle to its logger.
task = clearml.Task.init(
    project_name="Caption generator",
    task_name="Selecting encoder",
    tags=['encoder', 'feature_extraction'],
)
logger = task.get_logger()

ClearML Task: overwriting (reusing) task id=53aa50a075cb417fbe29d42d48ab7d18
ClearML results page: https://app.clear.ml/projects/22856c5ce3ac4602a83e933c56a4600a/experiments/53aa50a075cb417fbe29d42d48ab7d18/output/log
ClearML Monitor: Could not detect iteration reporting, falling back to iterations as seconds-from-start


## Encoders factory

In [4]:
class BaseEncoder(nn.Module):
    """Common base class for the image encoders compared in this notebook.

    Attributes:
        embedding_dim: Value passed to the constructor (``None`` if omitted).
        features_dim: Size of the feature vector produced by ``forward``;
            ``None`` until a subclass sets it.
    """

    def __init__(self, embedding_dim=None):
        super().__init__()
        # Keep the caller's value; it used to be overwritten with None.
        self.embedding_dim = embedding_dim
        self.features_dim = None

    def forward(self, x):
        """Encode ``x``; concrete encoders must override this method."""
        raise NotImplementedError(
            f"{type(self).__name__} must implement forward()"
        )

    def get_features_dim(self):
        """Return the feature size, or ``None`` when it has not been set.

        Falls back to the ``feature_dim`` attribute (singular spelling) so
        encoders that only set that name still report their size.
        """
        if self.features_dim is not None:
            return self.features_dim
        return getattr(self, "feature_dim", None)

In [5]:
class ResNet50(BaseEncoder):
    """ResNet-50 feature extractor built from torchvision's model.

    The last child module of the torchvision network (its classification
    layer) is dropped and the remaining output is flattened per sample.

    Args:
        embedding_dim: Forwarded to ``BaseEncoder``; not used to build layers here.
        pretrained: When true, load the ImageNet weights; otherwise the
            network is randomly initialised.
    """

    def __init__(self, embedding_dim=256, pretrained=True):
        super().__init__(embedding_dim)
        # "IMAGENET1K_V1" is the checkpoint the deprecated ``pretrained=True``
        # flag resolved to, so the loaded weights stay the same.
        weights = 'IMAGENET1K_V1' if pretrained else None
        model = models.resnet50(weights=weights)
        self.features = nn.Sequential(*list(model.children())[:-1])

        # ``features_dim`` is the name BaseEncoder.get_features_dim() reads;
        # ``feature_dim`` is kept as an alias for code using the old spelling.
        self.features_dim = 2048
        self.feature_dim = self.features_dim

    def forward(self, x):
        """Return the flattened features, one row per input sample."""
        features = self.features(x)
        out = torch.flatten(features, 1)

        return out

In [6]:
class ResNet101(BaseEncoder):
    """ResNet-101 feature extractor built from torchvision's model.

    The last child module of the torchvision network (its classification
    layer) is dropped and the remaining output is flattened per sample.

    Args:
        embedding_dim: Forwarded to ``BaseEncoder``; not used to build layers here.
        pretrained: When true, load the ImageNet weights; otherwise the
            network is randomly initialised.
    """

    def __init__(self, embedding_dim=256, pretrained=True):
        super().__init__(embedding_dim)
        # "IMAGENET1K_V1" is the checkpoint the deprecated ``pretrained=True``
        # flag resolved to, so the loaded weights stay the same.
        weights = 'IMAGENET1K_V1' if pretrained else None
        model = models.resnet101(weights=weights)
        self.features = nn.Sequential(*list(model.children())[:-1])

        # ``features_dim`` is the name BaseEncoder.get_features_dim() reads;
        # ``feature_dim`` is kept as an alias for code using the old spelling.
        self.features_dim = 2048
        self.feature_dim = self.features_dim

    def forward(self, x):
        """Return the flattened features, one row per input sample."""
        features = self.features(x)
        # start_dim must be the integer 1 (keep the batch axis); the input
        # tensor itself was being passed here by mistake.
        out = torch.flatten(features, 1)

        return out

In [7]:
class EfficientNet(BaseEncoder):
    """EfficientNet (B0 or B4) feature extractor built from torchvision's model.

    The last child module of the torchvision network is dropped and the
    remaining output is flattened per sample.

    Args:
        embedding_dim: Forwarded to ``BaseEncoder``; not used to build layers here.
        model_name: ``'efficientnet_b0'`` or ``'efficientnet_b4'``.
        pretrained: When true, load torchvision's default weights; otherwise
            the network is randomly initialised.

    Raises:
        ValueError: If ``model_name`` is not one of the supported variants.
    """

    def __init__(self, embedding_dim=256, model_name='efficientnet_b0', pretrained=True):
        super().__init__(embedding_dim)
        weights = 'DEFAULT' if pretrained else None
        if model_name == 'efficientnet_b0':
            model = models.efficientnet_b0(weights=weights)
            features_dim = 1280
        elif model_name == 'efficientnet_b4':
            model = models.efficientnet_b4(weights=weights)
            features_dim = 1792
        else:
            raise ValueError('Unknown model (efficient net)')

        # ``features_dim`` is the name BaseEncoder.get_features_dim() reads;
        # ``feature_dim`` is kept as an alias for code using the old spelling.
        self.features_dim = features_dim
        self.feature_dim = features_dim
        self.features = nn.Sequential(*list(model.children())[:-1])

    def forward(self, x):
        """Return the flattened features, one row per input sample."""
        features = self.features(x)
        out = torch.flatten(features, 1)

        return out

In [8]:
class EncoderFactory:
    """Builds an encoder instance from its registry name."""

    @staticmethod
    def create_encoder(encoder_name, embedding_dim=256, pretrained=True):
        """Instantiate the encoder registered under ``encoder_name``.

        Args:
            encoder_name: One of ``resnet50``, ``resnet101``,
                ``efficientnet_b0`` or ``efficientnet_b4``.
            embedding_dim: Passed through to the encoder constructor.
            pretrained: Passed through to the encoder constructor.

        Raises:
            KeyError: If ``encoder_name`` is not registered.
        """

        def _efficientnet_b0(**kwargs):
            return EfficientNet(model_name="efficientnet_b0", **kwargs)

        def _efficientnet_b4(**kwargs):
            return EfficientNet(model_name="efficientnet_b4", **kwargs)

        builders = dict(
            resnet50=ResNet50,
            resnet101=ResNet101,
            efficientnet_b0=_efficientnet_b0,
            efficientnet_b4=_efficientnet_b4,
        )

        build = builders[encoder_name]
        return build(embedding_dim=embedding_dim, pretrained=pretrained)

In [9]:
import os
import ssl
import urllib.request

# NOTE(review): disables HTTPS certificate verification for the default
# urllib / http.client context — a security risk outside a trusted network.
ssl._create_default_https_context = ssl._create_unverified_context

import urllib.request as ur
# Drop urllib's cached global opener so the next urlopen() call builds a new one.
ur._opener = None
# presumably meant to stop libraries that honour CURL_CA_BUNDLE from using a CA bundle — TODO confirm it is still needed
os.environ['CURL_CA_BUNDLE'] = ''

## Dataset

In [10]:
# Root folder of the dataset, relative to the notebook's working directory.
DATA_PATH = "../data"
# Last expression of the cell: lists the entries found in the dataset folder.
os.listdir(DATA_PATH)

['captions.txt', 'images']

In [11]:
# Peek at the first ten entries of the images folder, printed on a single line.
sample_names = os.listdir(os.path.join(DATA_PATH, "images"))[:10]
for picture in sample_names:
    print(picture, end=' | ')

583087629_a09334e1fb.jpg | 2641770481_c98465ff35.jpg | 530950375_eea665583f.jpg | 2872197070_4e97c3ccfa.jpg | 2369452202_8b0e8e25ca.jpg | 2789937754_5d1fa62e95.jpg | 543326592_70bd4d8602.jpg | 3173461705_b5cdeef1eb.jpg | 528500099_7be78a0ca5.jpg | 353180303_6a24179c50.jpg | 

In [15]:
import yaml
from torch.utils.data import Dataset
from PIL import Image
from nltk.tokenize import word_tokenize

# Path to the notebook's YAML configuration file, relative to the working directory.
CONFIG_FILE = "./config.yaml"

In [42]:
class FlickrDataset(Dataset):
    """Image/caption dataset read from ``<data_path>/images`` and ``<data_path>/captions.txt``.

    Each non-blank line of the captions file is split on its first comma into
    a file name and a caption; the caption is tokenised with ``word_tokenize``.
    When a file name occurs on several lines, the last caption read wins.

    Args:
        dataset_size: Maximum number of (image, caption) pairs to expose;
            ``None`` keeps every pair.
        data_path: Folder containing ``images/`` and ``captions.txt``.
        transform: Optional callable applied to the RGB PIL image.
        config: Optional configuration object stored on ``self.config``.
            It is an explicit argument so the dataset no longer depends on
            whatever a notebook-level ``config`` name happens to hold.
    """

    def __init__(self, dataset_size, data_path=DATA_PATH, transform=None, config=None):
        super().__init__()
        self.config = config
        self.data_path = data_path
        self.transform = transform

        self.images = os.path.join(data_path, "images")
        self.labels = os.path.join(data_path, "captions.txt")
        self.image_caption = {}
        self._preprocess_labels()

        # Dicts preserve insertion order, so both lists follow the file order
        # and stay aligned index by index.
        self.images_idx = list(self.image_caption.keys())[:dataset_size]
        self.labels_idx = list(self.image_caption.values())[:dataset_size]

    def _preprocess_labels(self):
        """Fill ``self.image_caption`` with ``{file name: caption tokens}``."""
        # NOTE(review): a header row, if the file has one, is parsed like any
        # other line — TODO confirm against the actual captions.txt.
        with open(self.labels, 'r', encoding='utf-8') as f:
            for line in f:
                line = line.strip()
                if not line:
                    # A blank (e.g. trailing) line cannot be split into two fields.
                    continue
                filename, label = line.split(",", 1)
                tokens = word_tokenize(label)
                self.image_caption[filename] = tokens

    def __len__(self):
        # Must match the number of indexable samples, not the number of files
        # on disk, otherwise iterating the dataset runs past ``images_idx``.
        return len(self.images_idx)

    def __getitem__(self, idx):
        """Return ``(image, caption_tokens)`` for the sample at ``idx``."""
        image_name = self.images_idx[idx]
        label = self.labels_idx[idx]

        image = Image.open(os.path.join(self.images, image_name)).convert('RGB')
        if self.transform is not None:
            image = self.transform(image)

        return image, label

In [43]:
# Read the configuration from the path declared in CONFIG_FILE instead of a
# second hard-coded literal. The handle keeps the name `config` because it
# stays bound as a notebook global after the block and other cells may read it.
with open(CONFIG_FILE, 'r', encoding='utf-8') as config:
    cfg = yaml.safe_load(config)

In [44]:
# Open the first entry returned for the images folder and show its (width, height).
images_dir = os.path.join(DATA_PATH, 'images')
first_file = os.path.join(images_dir, os.listdir(images_dir)[0])

first_image = Image.open(first_file).convert('RGB')
first_image.size

(500, 375)

In [46]:
from torchvision.transforms import v2
import time

# Number of samples the dataset exposes: train and test portions together.
total_size = cfg['dataset']['train_size'] + cfg['dataset']['test_size']
transform = v2.Compose([
    # FlickrDataset hands over PIL images. v2.ToDtype leaves PIL inputs
    # untouched and v2.Normalize rejects them, so convert to a tensor image first.
    v2.ToImage(),
    v2.RandomResizedCrop(size=(224, 224), antialias=True),
    v2.RandomHorizontalFlip(p=0.5),
    v2.ToDtype(torch.float32, scale=True),
    # ImageNet channel statistics, as commonly used with torchvision's pretrained models.
    v2.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

dataset = FlickrDataset(dataset_size=total_size, data_path=cfg['dataset']['path'], transform=transform)

## Class for all needed tests

In [47]:
class ComparisonTests:
    """Static helpers that collect size, speed and memory figures for a model."""

    @staticmethod
    def num_parameters(model, return_trainable=True):
        """Count the parameters of ``model``.

        Args:
            model: Module whose ``parameters()`` are counted.
            return_trainable: When false, trainable parameters are not
                counted and ``trainable_params`` is reported as 0.

        Returns:
            dict with ``total_params``, ``trainable_params`` and
            ``total_trainable_ratio`` (trainable / total, 0.0 for a model
            without parameters).
        """
        total_sum = 0
        trainable_sum = 0
        for p in model.parameters():
            count = p.numel()
            total_sum += count
            if return_trainable and p.requires_grad:
                trainable_sum += count

        # A parameter-free module would otherwise divide by zero.
        total_trainable_ratio = trainable_sum / total_sum if total_sum else 0.0

        return {
            'total_params': total_sum,
            'trainable_params': trainable_sum,
            'total_trainable_ratio': total_trainable_ratio
        }

    @staticmethod
    def model_inference_time(model, test_loader, device, n_batches):
        """Time forward passes of ``model`` over the first ``n_batches`` batches.

        Args:
            model: Module to benchmark; it is moved to ``device`` and put in eval mode.
            test_loader: Iterable yielding ``(images, labels)`` pairs.
            device: Device (string or ``torch.device``) to run on.
            n_batches: Maximum number of batches to time.

        Returns:
            dict with ``sum_time_ms``, ``mean_time_ms`` (per batch) and
            ``throughput_images_per_sec`` (timed images / total time).

        Raises:
            ValueError: If no batch was timed (empty loader or ``n_batches < 1``).
        """
        device = torch.device(device)
        use_cuda = device.type == 'cuda'
        model = model.to(device)
        model.eval()

        times = []
        n_images = 0
        with torch.no_grad():
            # zip() stops once range() is exhausted, so no extra batch is loaded.
            for _, (images, _labels) in zip(range(n_batches), test_loader):
                # The batch has to live on the same device as the model.
                images = images.to(device)
                if use_cuda:
                    # Flush pending kernels so they are not billed to this batch.
                    torch.cuda.synchronize(device)

                start = time.perf_counter()
                _ = model(images)
                if use_cuda:
                    # CUDA launches are asynchronous; wait before stopping the clock.
                    torch.cuda.synchronize(device)
                end = time.perf_counter()

                times.append(end - start)
                n_images += images.shape[0]

        if not times:
            raise ValueError('No batches were timed: test_loader is empty or n_batches < 1')

        sum_time = sum(times)
        mean_time = float(np.mean(times))
        throughput = n_images / sum_time if sum_time > 0 else float('inf')

        return {
            'sum_time_ms': sum_time * 1000,
            'mean_time_ms': mean_time * 1000,
            'throughput_images_per_sec': throughput
        }

    @staticmethod
    def model_memory_usage(model, device='cuda', input_shape=(32, 3, 224, 224)):
        """Estimate the memory taken by ``model`` and by one forward pass.

        Args:
            model: Module to measure; it is moved to ``device``.
            device: Device (string or ``torch.device``) to measure on.
            input_shape: Shape of the random input used for the forward pass.

        Returns:
            dict with ``param_memory_mb``, ``buffer_memory_mb``,
            ``activation_memory_mb`` and ``total_memory_mb`` (their sum), all
            in units of 1024**2 bytes. ``activation_memory_mb`` is the peak
            CUDA allocation during the forward pass minus what was already
            allocated before it; on a non-CUDA device it cannot be read from
            ``torch.cuda`` and is reported as NaN (and so is the total).
        """
        device = torch.device(device)
        model = model.to(device)
        mib = 1024 ** 2

        # element_size() accounts for the real dtype instead of assuming 4 bytes.
        param_memory = sum(p.numel() * p.element_size() for p in model.parameters()) / mib
        buffer_memory = sum(b.numel() * b.element_size() for b in model.buffers()) / mib
        dummy_input = torch.randn(*input_shape, device=device)

        # Run in eval mode so normalisation layers do not update their running
        # statistics, then restore whichever mode the caller had set.
        was_training = model.training
        model.eval()
        try:
            if device.type == 'cuda':
                torch.cuda.empty_cache()
                torch.cuda.reset_peak_memory_stats(device)
                # Weights and the input are already resident; subtracting them
                # keeps the total below from counting them twice.
                baseline = torch.cuda.memory_allocated(device)
                with torch.no_grad():
                    _ = model(dummy_input)
                torch.cuda.synchronize(device)
                peak = torch.cuda.max_memory_allocated(device)
                activation_memory = (peak - baseline) / mib
            else:
                with torch.no_grad():
                    _ = model(dummy_input)
                activation_memory = float('nan')
        finally:
            model.train(was_training)

        return {
            'param_memory_mb': param_memory,
            'buffer_memory_mb': buffer_memory,
            'activation_memory_mb': activation_memory,
            'total_memory_mb': param_memory + buffer_memory + activation_memory
        }

    @staticmethod
    def compute_main_statistics(model, test_loader, device='cuda', n_batches=10):
        """Placeholder: not implemented yet, returns ``None``."""
        pass

    @classmethod
    def comparion_tests(cls, model, test_loader, device='cuda', return_trainable=True, n_batches=10):
        """Placeholder: not implemented yet, returns ``None``."""
        pass