In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms, models
from torch.utils.data import DataLoader, Dataset, random_split, Subset
from torch.optim.lr_scheduler import ReduceLROnPlateau
import os
from PIL import Image
from tqdm.notebook import tqdm
import matplotlib.pyplot as plt
import os


class Config:
    DATA_PATH = "data/dataset_detr_cropped" 
    CLASSES = ["Большой Желтохохлый Какаду",
               "Буроухий Краснохвостый Попугай",
               "Волнистый Попугайчик",
               "Зеленокрылый Ара",
               "Индийский кольчатый попугай",
               "Корелла",
               "Королевский Попугай",
               "Красная Розелла",
               "Краснохвостый Траурный Какаду",
               "Красный Ара",
               "Розовощёкий Неразлучник",
               "Розовый Какаду",
               "Сине-жёлтый Ара",
               "Украшенный Лорикет",
               "Черноголовый Попугай"]
    IMG_SIZE = 224
    
    ARCHITECTURE = "ImprovedAlexNet"     
    PRETRAINED = False   
    MODEL_NAME = "ImprovedAlexNet_DETR_NEW"
    SAVE_PATH = f"results/models/{MODEL_NAME}.pth"           
    LEARNING_CURVES_PATH = f"results/learning_curves/{MODEL_NAME}.png"

    BATCH_SIZE = 64
    EPOCHS = 50
    LR = 0.001
    DEVICE = "cuda" if torch.cuda.is_available() else "cpu"


In [2]:
class CustomResizeTransform:
    def __init__(self, target_size):
        self.target_size = target_size
    
    def __call__(self, img):
        width, height = img.size
        
        if width <= self.target_size and height <= self.target_size:
            return img.resize((self.target_size, self.target_size), Image.BILINEAR)
        
        else:
            if width > height:
                new_width = self.target_size
                new_height = int(height * (self.target_size / width))
            else:
                new_height = self.target_size
                new_width = int(width * (self.target_size / height))
            
            img = img.resize((new_width, new_height), Image.BILINEAR)
            
            if img.size[0] < self.target_size or img.size[1] < self.target_size:
                new_img = Image.new(img.mode, (self.target_size, self.target_size))
                new_img.paste(img, ((self.target_size - img.size[0]) // 2, 
                                   (self.target_size - img.size[1]) // 2))
                return new_img
            else:
                return img

In [3]:
train_transform = transforms.Compose([
    CustomResizeTransform(Config.IMG_SIZE),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandomRotation(degrees=15), 
    transforms.RandomAffine(degrees=0, translate=(0.1, 0.1)),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
])

test_transform = transforms.Compose([
    CustomResizeTransform(Config.IMG_SIZE), 
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
])

In [4]:
class AlexNet(nn.Module):
    def __init__(self):
        super().__init__()
        self.features = nn.Sequential(
            nn.Conv2d(3, 96, 11, stride=4),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(3, 2),
            nn.Conv2d(96, 256, 5, padding=2),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(3, 2),
            nn.Conv2d(256, 384, 3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(384, 384, 3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(384, 256, 3, padding=1),
            nn.ReLU(inplace=True),
            nn.AdaptiveAvgPool2d((6, 6))
        )
        self.classifier = nn.Sequential(
            nn.Dropout(0.5),
            nn.Linear(256*6*6, 4096),
            nn.ReLU(inplace=True),
            nn.Dropout(0.5),
            nn.Linear(4096, 4096),
            nn.ReLU(inplace=True),
            nn.Linear(4096, len(Config.CLASSES)),
        )

    def forward(self, x):
        x = self.features(x)
        x = x.view(x.size(0), 256*6*6)
        return self.classifier(x)

In [5]:
class ImprovedAlexNet(nn.Module):
    def __init__(self):
        super().__init__()
        self.features = nn.Sequential(
            nn.Conv2d(3, 96, 11, stride=4),
            nn.BatchNorm2d(96),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(3, 2),
            
            nn.Conv2d(96, 256, 5, padding=2),
            nn.BatchNorm2d(256),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(3, 2),
            
            nn.Conv2d(256, 384, 3, padding=1),
            nn.BatchNorm2d(384),
            nn.ReLU(inplace=True),
            
            nn.Conv2d(384, 384, 3, padding=1),
            nn.BatchNorm2d(384),
            nn.ReLU(inplace=True),
            
            nn.Conv2d(384, 512, 3, padding=1),
            nn.BatchNorm2d(512),
            nn.ReLU(inplace=True),
            
            nn.AdaptiveAvgPool2d((6, 6))
        )
        self.classifier = nn.Sequential(
            nn.Dropout(0.5),
            nn.Linear(512*6*6, 4096),
            nn.ReLU(inplace=True),
            nn.BatchNorm1d(4096),
            
            nn.Dropout(0.5),
            nn.Linear(4096, 4096),
            nn.ReLU(inplace=True),
            nn.BatchNorm1d(4096),
            
            nn.Linear(4096, len(Config.CLASSES)),
        )

    def forward(self, x):
        x = self.features(x)
        x = x.view(x.size(0), 512*6*6)
        return self.classifier(x)

In [6]:
class CustomResNet18(nn.Module):
    def __init__(self):
        super().__init__()
        resnet = models.resnet18(pretrained=True)

        self.features = nn.Sequential(
            resnet.conv1,
            resnet.bn1,
            resnet.relu,
            resnet.maxpool,
            resnet.layer1,
            resnet.layer2,
            resnet.layer3,
            resnet.layer4,
            resnet.avgpool
        )

        self.classifier = nn.Linear(512, len(Config.CLASSES))
        
        for param in self.features.parameters():
            param.requires_grad = False

    def forward(self, x):
        x = self.features(x)
        x = x.view(x.size(0), -1)
        x = self.classifier(x)
        return x

In [7]:
def load_data():
    train_dataset = datasets.ImageFolder(Config.DATA_PATH, transform=train_transform)
    
    test_dataset = datasets.ImageFolder(Config.DATA_PATH, transform=test_transform)
    
    valid_indices = [
        i for i, (path, label) in enumerate(train_dataset.samples)
        if train_dataset.classes[label] in Config.CLASSES
    ]
    
    train_dataset = Subset(train_dataset, valid_indices)
    test_dataset = Subset(test_dataset, valid_indices)
    
    old_class_to_idx = {cls: idx for idx, cls in enumerate(train_dataset.dataset.classes)}
    new_class_to_idx = {cls: idx for idx, cls in enumerate(Config.CLASSES)}
    
    for i in valid_indices:
        path, old_label = train_dataset.dataset.samples[i]
        cls = train_dataset.dataset.classes[old_label]
        train_dataset.dataset.samples[i] = (path, new_class_to_idx[cls])
        test_dataset.dataset.samples[i] = (path, new_class_to_idx[cls])
    
    train_size = int(0.85 * len(train_dataset))
    test_size = len(train_dataset) - train_size
    train_indices, val_indices = random_split(range(len(train_dataset)), [train_size, test_size])
    
    train_subset = Subset(train_dataset, train_indices)
    val_subset = Subset(test_dataset, val_indices)
    
    return train_subset, val_subset

def init_model():
    if Config.ARCHITECTURE == "AlexNet":
        model = AlexNet()
    elif Config.ARCHITECTURE == "ImprovedAlexNet":
        model = ImprovedAlexNet()
    elif Config.ARCHITECTURE == "CustomResNet18":
        model = CustomResNet18()
    return model.to(Config.DEVICE)

def train():
    train_set, test_set = load_data()
    test_loader = DataLoader(test_set, batch_size=Config.BATCH_SIZE)
    
    model = init_model()
    optimizer = optim.Adam(model.parameters(), lr=Config.LR)
    criterion = nn.CrossEntropyLoss()
    
    scheduler = ReduceLROnPlateau(
        optimizer, 
        mode='max',
        factor=0.5,
        patience=5,
        verbose=True
    )
    
    best_acc = 0.0
    history = {'train_acc': [], 'test_acc': []}
    
    for epoch in range(Config.EPOCHS):
        model.train()
        running_loss = 0.0
        train_correct = 0
        train_total = 0
        
        for inputs, labels in DataLoader(train_set, batch_size=Config.BATCH_SIZE, shuffle=True):
            inputs = inputs.to(Config.DEVICE)
            labels = labels.to(Config.DEVICE)
            
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            
            running_loss += loss.item()
            _, preds = torch.max(outputs, 1)
            train_correct += (preds == labels).sum().item()
            train_total += labels.size(0)
        
        train_acc = train_correct / train_total
        history['train_acc'].append(train_acc)
        
        # Валидация
        model.eval()
        test_correct = 0
        test_total = 0
        test_loss = 0.0
        
        with torch.no_grad():
            for inputs, labels in test_loader:
                inputs = inputs.to(Config.DEVICE)
                labels = labels.to(Config.DEVICE)
                
                outputs = model(inputs)
                loss = criterion(outputs, labels)
                test_loss += loss.item()
                
                _, preds = torch.max(outputs, 1)
                test_correct += (preds == labels).sum().item()
                test_total += labels.size(0)
        
        test_acc = test_correct / test_total
        history['test_acc'].append(test_acc)
        
        # Обновляем шедулер на основе test_acc
        scheduler.step(test_acc)
        
        if test_acc > best_acc:
            best_acc = test_acc
            torch.save(model.state_dict(), Config.SAVE_PATH)
        
        # Выводим текущий LR
        current_lr = optimizer.param_groups[0]['lr']
        print(
            f"Epoch {epoch+1}/{Config.EPOCHS} | "
            f"Train Loss: {running_loss/len(train_set):.4f} | "
            f"Train Acc: {train_acc:.4f} | "
            f"Test Loss: {test_loss/len(test_set):.4f} | "
            f"Test Acc: {test_acc:.4f} | "
            f"LR: {current_lr:.6f}"  # Добавили вывод LR
        )
    
    # Сохранение кривых обучения
    plt.figure(figsize=(12, 5))
    plt.plot(history['train_acc'], label='Train Acc')
    plt.plot(history['test_acc'], label='Test Acc')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy')
    plt.title('Training and Validation Accuracy')
    plt.legend()
    plt.tight_layout()
    os.makedirs(os.path.dirname(Config.LEARNING_CURVES_PATH), exist_ok=True)
    plt.savefig(Config.LEARNING_CURVES_PATH)
    plt.close()
    
    return history

In [8]:
history = train()



Epoch 1/50 | Train Loss: 0.0427 | Train Acc: 0.2170 | Test Loss: 0.0408 | Test Acc: 0.2415 | LR: 0.001000
Epoch 2/50 | Train Loss: 0.0340 | Train Acc: 0.3543 | Test Loss: 0.0295 | Test Acc: 0.3476 | LR: 0.001000
Epoch 3/50 | Train Loss: 0.0307 | Train Acc: 0.4085 | Test Loss: 0.0301 | Test Acc: 0.4454 | LR: 0.001000
Epoch 4/50 | Train Loss: 0.0272 | Train Acc: 0.4654 | Test Loss: 0.0253 | Test Acc: 0.5061 | LR: 0.001000
Epoch 5/50 | Train Loss: 0.0252 | Train Acc: 0.5090 | Test Loss: 0.0233 | Test Acc: 0.5342 | LR: 0.001000
Epoch 6/50 | Train Loss: 0.0225 | Train Acc: 0.5384 | Test Loss: 0.0194 | Test Acc: 0.6096 | LR: 0.001000
Epoch 7/50 | Train Loss: 0.0226 | Train Acc: 0.5543 | Test Loss: 0.0171 | Test Acc: 0.6601 | LR: 0.001000
Epoch 8/50 | Train Loss: 0.0200 | Train Acc: 0.5935 | Test Loss: 0.0187 | Test Acc: 0.6230 | LR: 0.001000
Epoch 9/50 | Train Loss: 0.0198 | Train Acc: 0.6076 | Test Loss: 0.0188 | Test Acc: 0.6435 | LR: 0.001000
Epoch 10/50 | Train Loss: 0.0184 | Train Acc: 