In [1]:
import torch
import open_clip
import albumentations as A
import numpy as np
from albumentations.pytorch import ToTensorV2
from torch import nn
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
from sklearn.metrics import f1_score
import pytorch_lightning as pl
from seed import seed_everything

%matplotlib inline

In [2]:
torch.cuda.empty_cache()

In [3]:
seed_everything(137)

# **PREPROCESSING**

In [4]:
BATCH_SIZE = 32
NUM_WORKERS = 8
LR = 0.001
DR = 0.2
EPOCHS = 10
NUM_CLASSES = 10
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

## Normalisation values

In [5]:
# image_size = (224, 224) 
# transform = transforms.Compose([
#     transforms.Resize(image_size),  # Изменение размера
#     transforms.ToTensor()           # Преобразование в тензор
# ])

# # Датасет без нормализации
# dataset = datasets.ImageFolder(root="./data/train/", transform=transform)
# loader = DataLoader(dataset, batch_size=128, shuffle=False, num_workers=8, persistent_workers=True)

# # Инициализируем тензоры для хранения сумм и квадратов сумм
# mean = torch.zeros(3)
# std = torch.zeros(3)
# total_images = 0

# for images, _ in loader:
#     # Количество изображений в текущем батче
#     batch_size = images.size(0)
#     # Суммируем значения по каналам
#     mean += torch.mean(images, dim=[0, 2, 3]) * batch_size
#     # Суммируем квадраты значений по каналам
#     std += torch.std(images, dim=[0, 2, 3]) * batch_size
#     # Общее количество изображений
#     total_images += batch_size

# # Усредняем по всему датасету
# mean /= total_images
# std /= total_images

# print("Mean:", mean.numpy()) # Mean: [0.48012924 0.4843966  0.49254295]
# print("Std:", std.numpy()) # Std: [0.2613408 0.263237  0.269442 ]

## Input Transformations

In [6]:
import albumentations as A
from albumentations.pytorch import ToTensorV2

# Функция-обёртка для Albumentations
class AlbumentationsTransform:
    def __init__(self, transform):
        self.transform = transform

    def __call__(self, img):
        try:
            img = np.array(img)  # Преобразуем PIL.Image в numpy
            augmented = self.transform(image=img)  # Передаём в albumentations
            return augmented["image"]
        except Exception as e:
            print(f"Error in Albumentations transformation: {e}")
            return img  # Вернем изображение без изменений в случае ошибки


# Определим преобразования с использованием albumentations
transform_train = A.Compose([
    A.HorizontalFlip(p=0.3),
    A.Affine(scale=(0.7, 1.3), translate_percent=(0.1, 0.1), rotate=(-15, 15), p=0.5),
    A.CoarseDropout(max_holes=1, max_height=10, max_width=12, mask_fill_value=0, p=0.5),
    A.RandomBrightnessContrast(p=0.5),
    A.HueSaturationValue(p=0.3),
    A.RandomGamma(p=0.3),
    A.Resize(width=224, height=224),
    A.Normalize(mean=(0.4801, 0.4844, 0.4925), std=(0.261, 0.263, 0.269)),
    ToTensorV2(),
])

transform_test = A.Compose([
    A.Resize(width=224, height=224),
    A.Normalize(mean=(0.4801, 0.4844, 0.4925), std=(0.261, 0.263, 0.269)),
    ToTensorV2(),
])

  A.CoarseDropout(max_holes=1, max_height=10, max_width=12, mask_fill_value=0, p=0.5),


## DataLoaders

In [7]:
from torch.utils.data import random_split

class DataModule(pl.LightningDataModule):
    def __init__(self, batch_size=BATCH_SIZE, train_split=0.8):
        super().__init__()
        self.batch_size = batch_size
        self.train_split = train_split
        self.train_dataset = None
        self.val_dataset = None
        
    def setup(self, stage: str):
        if stage == "fit" or stage is None:
            full_dataset = datasets.ImageFolder(
                root="./data/train/",
                transform=AlbumentationsTransform(transform_train)
            )
            train_size = int(self.train_split * len(full_dataset))
            val_size = len(full_dataset) - train_size
            self.train_dataset, self.val_dataset = random_split(
                full_dataset, [train_size, val_size]
            )

        if stage == "test" or stage is None:
            self.test_dataset = datasets.ImageFolder(
                root="./data/test_upload/",
                transform=AlbumentationsTransform(transform_test)
            )

    def train_dataloader(self):
        return DataLoader(
            self.train_dataset, 
            batch_size=self.batch_size, 
            shuffle=True,
            num_workers=NUM_WORKERS,
            persistent_workers=True,
        ) if self.train_dataset else None
    
    def val_dataloader(self):
        return DataLoader(
            self.val_dataset, 
            batch_size=self.batch_size, 
            shuffle=True,
            num_workers=NUM_WORKERS,
            persistent_workers=True,
        ) if self.val_dataset else None
    
    def test_dataloader(self):
        return DataLoader(
            self.test_dataset,
            batch_size=self.batch_size,
            shuffle=False,
            num_workers=NUM_WORKERS,
            persistent_workers=True,
        ) if self.test_dataset else None

# **BASE MODEL**

In [6]:
import torch
import torch.nn as nn
import torch.optim as optim
import wandb
import pytorch_lightning as pl
from torchmetrics import F1Score, Accuracy
from pytorch_lightning.callbacks.early_stopping import EarlyStopping


class BottleneckBlock(nn.Module):
    def __init__(
        self, in_channels, out_channels, stride=1, expansion=4, dropout_rate=DR
    ):
        super().__init__()
        bottleneck_channels = out_channels // expansion

        self.conv1 = nn.Conv2d(
            in_channels, bottleneck_channels, kernel_size=1, bias=False
        )
        self.bn1 = nn.BatchNorm2d(bottleneck_channels)
        self.dropout1 = nn.Dropout2d(p=dropout_rate)

        self.conv2 = nn.Conv2d(
            bottleneck_channels,
            bottleneck_channels,
            kernel_size=3,
            stride=stride,
            padding=1,
            bias=False,
        )
        self.bn2 = nn.BatchNorm2d(bottleneck_channels)
        self.dropout2 = nn.Dropout2d(p=dropout_rate)

        self.conv3 = nn.Conv2d(
            bottleneck_channels, out_channels, kernel_size=1, bias=False
        )
        self.bn3 = nn.BatchNorm2d(out_channels)
        self.dropout3 = nn.Dropout2d(p=dropout_rate)

        self.relu = nn.ReLU(inplace=True)

        self.shortcut = nn.Sequential()
        if stride != 1 or in_channels != out_channels:
            self.shortcut = nn.Sequential(
                nn.Conv2d(
                    in_channels, out_channels, kernel_size=1, stride=stride, bias=False
                ),
                nn.BatchNorm2d(out_channels),
            )

    def forward(self, x):
        residual = x

        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)
        out = self.dropout1(out)

        out = self.conv2(out)
        out = self.bn2(out)
        out = self.relu(out)
        out = self.dropout2(out)

        out = self.conv3(out)
        out = self.bn3(out)
        out = self.dropout3(out)

        out += self.shortcut(residual)
        return self.relu(out)


class CarClassifier(pl.LightningModule):
    def __init__(self, num_classes=NUM_CLASSES, learning_rate=LR, dropout_rate=DR):
        super().__init__()
        self.save_hyperparameters()
        self.learning_rate = learning_rate
        self.dropout_rate = dropout_rate

        self.criterion = nn.CrossEntropyLoss()
        self.f1_score = F1Score(num_classes=num_classes, task="multiclass")
        # self.val_acc = Accuracy(num_classes=num_classes, task="multiclass")

        # Backbone with dropout
        self.layer1 = BottleneckBlock(3, 64, stride=2, dropout_rate=dropout_rate)
        self.layer2 = BottleneckBlock(64, 128, stride=2, dropout_rate=dropout_rate)
        self.layer3 = BottleneckBlock(128, 256, stride=2, dropout_rate=dropout_rate)

        # Classifier
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.flatten = nn.Flatten()
        self.dropout = nn.Dropout(p=dropout_rate)  # Dropout перед полносвязным слоем
        self.fc = nn.Linear(256, num_classes)

    def forward(self, x):
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.avgpool(x)
        x = self.flatten(x)
        x = self.dropout(x)  # Применяем dropout перед классификацией
        return self.fc(x)

    def training_step(self, batch, batch_idx):
        x, y = batch
        logits = self(x)
        loss = self.criterion(logits, y)
        preds = torch.argmax(logits, dim=1)
        acc = (preds == y).float().mean()

        self.log("train_loss", loss, on_step=False, on_epoch=True, prog_bar=True)
        self.log("train_acc", acc, on_step=False, on_epoch=True, prog_bar=True)

        return loss

    def validation_step(self, batch, batch_idx):
        x, y = batch
        logits = self(x)
        loss = self.criterion(logits, y)
        preds = torch.argmax(logits, dim=1)
        acc = self.val_acc(preds, y)
        f1 = self.f1_score(preds, y)

        self.log("val_loss", loss, on_step=False, on_epoch=True, prog_bar=True)
        self.log("val_f1", f1, on_step=False, on_epoch=True, prog_bar=True)

        return {"val_loss": loss, "val_f1": f1}

    def configure_optimizers(self):
        optimizer = optim.Adam(self.parameters(), lr=self.learning_rate)
        scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.1)
        return {
            "optimizer": optimizer,
            "lr_scheduler": scheduler,
            "monitor": "val_loss",
        }

In [7]:
from pytorch_lightning.loggers import WandbLogger


# В основном скрипте обучения:
def train_model():
    # Инициализация wandb
    wandb_logger = WandbLogger(project='car_classifier', log_model='all')
    
    # Создание модели
    model = CarClassifier()

    data_module = DataModule()
    
    # Создание тренера с wandb logger
    trainer = pl.Trainer(
        max_epochs=10,
        logger=wandb_logger,
        callbacks=[EarlyStopping(monitor='val_loss', patience=3)],
    )
        
    # Обучение модели
    trainer.fit(model, datamodule=data_module)

In [8]:
torch.set_float32_matmul_precision('medium')
train_model()
wandb.finish()

GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
HPU available: False, using: 0 HPUs
[34m[1mwandb[0m: Currently logged in as: [33mmoscowzhuravlev[0m ([33mmoscowzhuravlev-selfemployedalex[0m). Use [1m`wandb login --relogin`[0m to force relogin
[34m[1mwandb[0m: Using wandb-core as the SDK backend.  Please refer to https://wandb.me/wandb-core for more information.


LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]

  | Name      | Type               | Params | Mode 
---------------------------------------------------------
0 | criterion | CrossEntropyLoss   | 0      | train
1 | f1_score  | MulticlassF1Score  | 0      | train
2 | val_acc   | MulticlassAccuracy | 0      | train
3 | layer1    | BottleneckBlock    | 3.9 K  | train
4 | layer2    | BottleneckBlock    | 24.2 K | train
5 | layer3    | BottleneckBlock    | 95.5 K | train
6 | avgpool   | AdaptiveAvgPool2d  | 0      | train
7 | flatten   | Flatten            | 0      | train
8 | dropout   | Dropout            | 0      | train
9 | fc        | Linear             | 2.6 K  | train
---------------------------------------------------------
126 K     Trainable params
0         Non-trainable params
126 K     Total params
0.505     Total estimated model params size (MB)
49        Modules in train mode
0         Modules in eval mode


Sanity Checking: |          | 0/? [00:00<?, ?it/s]

Training: |          | 0/? [00:00<?, ?it/s]

# **FINE TUNED ViT-B-16**

In [8]:
import torch
import torch.nn as nn
import torch.optim as optim
import open_clip
import pytorch_lightning as pl
from torchmetrics.classification import MulticlassF1Score, Accuracy

class CarClassifierViT(pl.LightningModule):
    def __init__(self, num_classes=10, lr=1e-3, freeze_backbone_epochs=3):
        super().__init__()
        self.save_hyperparameters()
        
        # Загружаем предобученную модель
        self.model, _, _ = open_clip.create_model_and_transforms("ViT-B-16", pretrained="openai")
        self.model.visual.proj = None  # Убираем ненужный projection
        
        # Меняем head (классификационный слой)
        embed_dim = self.model.visual.ln_post.normalized_shape[0]
        self.model.visual.head = nn.Linear(embed_dim, num_classes)
        
        self.criterion = nn.CrossEntropyLoss()
        self.f1_score = MulticlassF1Score(num_classes=num_classes, average="macro")
        self.acc = Accuracy(num_classes=num_classes, task="multiclass")
        self.lr = lr
        self.freeze_backbone_epochs = freeze_backbone_epochs
    
    def forward(self, x):
        features = self.model.encode_image(x)  # Получаем эмбеддинги
        logits = self.model.visual.head(features)  # Пропускаем через head
        return logits
    
    def training_step(self, batch, batch_idx):
        x, y = batch
        logits = self(x)
        loss = self.criterion(logits, y)
        preds = torch.argmax(logits, dim=1)
        acc = self.acc(preds, y)
        self.log("train_loss", loss, on_epoch=True, on_step=False, prog_bar=True)
        self.log("train_acc", acc, on_epoch=True, on_step=False, prog_bar=True)
        return loss
    
    def validation_step(self, batch, batch_idx):
        x, y = batch
        logits = self(x)
        loss = self.criterion(logits, y)
        preds = torch.argmax(logits, dim=1)
        f1 = self.f1_score(preds, y)
        self.log("val_loss", loss, on_epoch=True, on_step=False, prog_bar=True)
        self.log("val_f1", f1, on_epoch=True, on_step=False, prog_bar=True)
        return loss
    
    def configure_optimizers(self):
        optimizer = optim.AdamW(self.model.visual.head.parameters(), lr=self.lr)
        scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=10)
        return [optimizer], [scheduler]
    
    def on_train_epoch_start(self):
        if self.current_epoch == self.freeze_backbone_epochs:
            for param in self.model.visual.parameters():
                param.requires_grad = True
            optimizer = self.optimizers()
            backbone_params = set(self.model.visual.parameters())
            existing_params = set(p for group in optimizer.param_groups for p in group['params'])
            new_params = backbone_params - existing_params
            if new_params:
                optimizer.add_param_group({"params": list(new_params), "lr": self.lr * 0.1})


In [9]:
from pytorch_lightning.loggers import WandbLogger
from pytorch_lightning.callbacks.early_stopping import EarlyStopping


# В основном скрипте обучения:
def train_model():

    torch.set_float32_matmul_precision('medium')

    # Инициализация wandb
    wandb_logger = WandbLogger(project='car_classifier', log_model='all')
    
    # Создание модели
    model = CarClassifierViT()

    data_module = DataModule()
    
    # Создание тренера с wandb logger
    trainer = pl.Trainer(
        max_epochs=1,
        logger=wandb_logger,
        callbacks=[EarlyStopping(monitor='val_loss', patience=3)],
    )
        
    # Обучение модели
    trainer.fit(model, datamodule=data_module)

In [10]:
train_model()

GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
HPU available: False, using: 0 HPUs
[34m[1mwandb[0m: Currently logged in as: [33mmoscowzhuravlev[0m ([33mmoscowzhuravlev-selfemployedalex[0m). Use [1m`wandb login --relogin`[0m to force relogin
[34m[1mwandb[0m: Using wandb-core as the SDK backend.  Please refer to https://wandb.me/wandb-core for more information.


LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]

  | Name      | Type               | Params | Mode 
---------------------------------------------------------
0 | model     | CLIP               | 149 M  | train
1 | criterion | CrossEntropyLoss   | 0      | train
2 | f1_score  | MulticlassF1Score  | 0      | train
3 | acc       | MulticlassAccuracy | 0      | train
---------------------------------------------------------
149 M     Trainable params
0         Non-trainable params
149 M     Total params
596.941   Total estimated model params size (MB)
280       Modules in train mode
0         Modules in eval mode


Sanity Checking: |          | 0/? [00:00<?, ?it/s]

/home/alex/miniconda3/envs/pytdml/lib/python3.10/site-packages/pytorch_lightning/trainer/connectors/data_connector.py:476: Your `val_dataloader`'s sampler has shuffling enabled, it is strongly recommended that you turn shuffling off for val/test dataloaders.


Training: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

NameError: name 'wandb' is not defined