In [None]:
import os
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset, random_split
from torchvision import transforms, models, datasets
from PIL import Image
from sklearn.preprocessing import LabelEncoder
from tqdm import tqdm
import re


# Kaggle dataset locations and the compute device used by every later cell.
train_dir = "/kaggle/input/butterflies/train_butterflies/"
test_dir = "/kaggle/input/butterflies/test_butterflies/"
# Prefer the GPU when one is visible to PyTorch, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

In [None]:
# Preliminary exploration: class balance and the set of image resolutions.
# Only sub-directories count as classes (the rule ImageFolder applies as well),
# so a stray file inside train_dir no longer crashes os.listdir(class_path).
classes = sorted(
    d for d in os.listdir(train_dir)
    if os.path.isdir(os.path.join(train_dir, d))
)

total = 0
balance_ = dict()
resolutions = set()

for class_ in classes:
    class_path = os.path.join(train_dir, class_)
    class_images = os.listdir(class_path)

    # Every directory entry is counted as one image of this class.
    balance_[class_] = len(class_images)
    total += len(class_images)

    for image in class_images:
        # Opening the file reads the header, which is enough to obtain the size;
        # the context manager closes the file handle right away.
        with Image.open(os.path.join(class_path, image)) as img:
            resolutions.add(img.size)  # (width, height)

# (class name, image count) pairs ordered from the rarest class upwards.
balance = sorted(balance_.items(), key=lambda item: item[1])
print("Всего изображений", total) # 4955
print("Разрешение:", resolutions) # 224 * 224 

In [None]:
import matplotlib.pyplot as plt

# Split the (class name, image count) pairs of `balance` into two parallel lists.
classes = [name for name, _ in balance]
values = [count for _, count in balance]

# One marker per class; class names go on the x axis, rotated so they stay readable.
plt.figure(figsize=(14, 6))
plt.plot(classes, values, marker='o')
plt.title('Количество изображений в классе')
plt.xlabel('Класс')
plt.ylabel('Количество')
plt.xticks(rotation=90)
plt.grid(True)
plt.tight_layout()
plt.show()

In [None]:
# Helper that draws the learning curves.
def plot_training_curves(train_loss, val_loss, train_acc, val_acc):
    """Plot per-epoch loss (left panel) and accuracy (right panel).

    Each argument is a sequence with one value per epoch; the train and
    validation series of a metric are drawn on the same axes.
    """
    fig, axes = plt.subplots(1, 2, figsize=(12, 5))

    # (axes, train series, train label, val series, val label, title, y label, legend position)
    panels = (
        (axes[0], train_loss, 'Train Loss', val_loss, 'Val Loss',
         'Loss per Epoch', 'Loss', 'upper right'),
        (axes[1], train_acc, 'Train Accuracy', val_acc, 'Val Accuracy',
         'Accuracy per Epoch', 'Accuracy', 'lower right'),
    )

    for ax, train_series, train_label, val_series, val_label, title, y_label, legend_loc in panels:
        ax.plot(train_series, label=train_label, linewidth=2)
        ax.plot(val_series, label=val_label, linewidth=2)
        ax.set_title(title, fontsize=14)
        ax.set_xlabel('Epoch', fontsize=12)
        ax.set_ylabel(y_label, fontsize=12)
        ax.legend(loc=legend_loc, fontsize=10)

    plt.show()

In [None]:
# Data preparation: convert images to tensors and normalise with ImageNet statistics.
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

train_dataset = datasets.ImageFolder(
    root=train_dir,
    transform=transform
)

# A dedicated, seeded generator makes the 80/20 partition identical on every
# run; without it the validation set changes each time the cell is executed
# and validation scores of different runs are not comparable.
split_generator = torch.Generator().manual_seed(42)
train_ds, val_ds = random_split(train_dataset, [0.8, 0.2], generator=split_generator)

batch_size = 16
# Only the training loader shuffles; validation order does not matter.
train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_ds, batch_size=batch_size, shuffle=False)

In [None]:
# Dataset for the unlabeled test images.
class TestDataset(Dataset):
    """Unlabeled image dataset that yields ``(image, file name)`` pairs.

    Files are ordered by the first integer found in their name (``2.jpg``
    before ``10.jpg``). Image files whose name contains no digits are kept
    and placed after the numbered ones, ordered by name. Entries that do not
    have an image extension (notes, hidden files, sub-directories, ...) are
    ignored instead of crashing the sort or the image loader.

    Args:
        root_dir: directory that directly contains the image files.
        transform: optional callable applied to the RGB ``PIL.Image``.
    """

    # Extension whitelist; the same set torchvision's ImageFolder accepts.
    IMG_EXTENSIONS = ('.jpg', '.jpeg', '.png', '.ppm', '.bmp',
                      '.pgm', '.tif', '.tiff', '.webp')

    def __init__(self, root_dir, transform=None):
        self.root_dir = root_dir
        self.transform = transform
        candidates = [
            os.path.join(root_dir, f)
            for f in os.listdir(root_dir)
            if f.lower().endswith(self.IMG_EXTENSIONS)
        ]
        self.image_files = sorted(candidates, key=self._sort_key)

    @staticmethod
    def _sort_key(path):
        """Return a key that sorts by the first number in the file name."""
        name = os.path.basename(path)
        match = re.search(r'(\d+)', name)  # first run of digits in the file name
        if match is None:
            # No number in the name: sort after all numbered files, by name.
            return (1, 0, name)
        # The name is a tie-breaker so equal numbers give a deterministic order.
        return (0, int(match.group(1)), name)

    def __getitem__(self, idx):
        img_path = self.image_files[idx]
        # convert() returns an independent image, so the file handle opened by
        # Image.open can be closed immediately.
        with Image.open(img_path) as img:
            image = img.convert('RGB')
        if self.transform:
            image = self.transform(image)
        return image, os.path.basename(img_path)

    def __len__(self):
        return len(self.image_files)

# Test images are served in file-number order, so the loader must not shuffle.
test_dataset = TestDataset(test_dir, transform=transform)
test_loader = DataLoader(dataset=test_dataset, batch_size=batch_size, shuffle=False)

In [None]:
# First model: an attempt to train a network from scratch.
class BasicBlock1(nn.Module):
    """Residual block that changes the channel count and spatial resolution.

    The main path is conv3x3(stride) -> BN -> ReLU -> conv3x3 -> BN; the skip
    path is a 1x1 convolution with the same stride followed by BN, so both
    paths produce tensors of the same shape before they are summed.

    Args:
        in_channels: channels of the input feature map (default 64).
        out_channels: channels of the output feature map (default 128).
        stride: stride of the first convolution and of the skip projection
            (default 2, i.e. the resolution is halved).

    The defaults reproduce the original fixed 64 -> 128, stride-2 block, and
    the sub-module names are unchanged, so existing checkpoints still load.
    """

    def __init__(self, in_channels=64, out_channels=128, stride=2):
        super().__init__()
        self.block = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, 3, stride=stride, padding=1, bias=False),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_channels, out_channels, 3, stride=1, padding=1, bias=False),
            nn.BatchNorm2d(out_channels),
        )
        # Projection shortcut: matches channels and resolution of the main path.
        self.skip = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, 1, stride=stride, bias=False),
            nn.BatchNorm2d(out_channels)
        )

    def forward(self, x):
        """Return ReLU(skip(x) + block(x))."""
        return (self.skip(x) + self.block(x)).relu()
    
class BasicBlock2(nn.Module):
    """Shape-preserving residual block with an identity shortcut.

    The main path is conv3x3 -> BN -> ReLU -> conv3x3 -> BN at stride 1 and a
    constant channel width, so the input can be added to it unchanged.

    Args:
        channels: number of input and output channels (default 128, the
            original fixed width; the sub-module name is unchanged, so
            existing checkpoints still load).
    """

    def __init__(self, channels=128):
        super().__init__()
        self.block = nn.Sequential(
            nn.Conv2d(channels, channels, 3, stride=1, padding=1, bias=False),
            nn.BatchNorm2d(channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(channels, channels, 3, stride=1, padding=1, bias=False),
            nn.BatchNorm2d(channels),
        )

    def forward(self, x):
        """Return ReLU(x + block(x))."""
        return (x + self.block(x)).relu()

# A deliberately shallow network: stem convolution, two residual blocks,
# global average pooling and a linear classifier with 50 outputs.
# Layer order is part of the checkpoint format (state_dict keys are indices).
model = nn.Sequential(
    nn.Conv2d(in_channels=3, out_channels=64, kernel_size=7, stride=2, padding=3, bias=False),
    nn.BatchNorm2d(num_features=64),
    nn.MaxPool2d(kernel_size=3, stride=2, padding=1),
    BasicBlock1(),
    BasicBlock2(),
    nn.AdaptiveAvgPool2d(output_size=1),
    nn.Flatten(),
    nn.Linear(in_features=128, out_features=50),
).to(device)


In [None]:
# Model training routine.
def _run_epoch(model, loader, criterion, optimizer=None, desc=None):
    """Run one pass over ``loader`` and return ``(mean batch loss, accuracy %)``.

    With an ``optimizer`` the model is put in training mode and updated after
    every batch; without one the model is put in eval mode and gradients are
    disabled. ``desc``, when given, wraps the loader in a tqdm progress bar.
    """
    training = optimizer is not None
    model.train(training)

    loss_sum = 0.0
    correct = 0
    total = 0
    batches = tqdm(loader, desc=desc) if desc is not None else loader

    with torch.set_grad_enabled(training):
        for inputs, labels in batches:
            inputs, labels = inputs.to(device), labels.to(device)

            if training:
                optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            if training:
                loss.backward()
                optimizer.step()

            loss_sum += loss.item()
            _, predicted = outputs.max(1)  # index of the largest logit per sample
            total += labels.size(0)
            correct += predicted.eq(labels).sum().item()

    return loss_sum / len(loader), 100. * correct / total


def train_model(model, num_epochs, optimizer, criterion, name, train_dl=None, val_dl=None):
    """Train ``model``, checkpoint the best epoch and plot the learning curves.

    Args:
        model: network to optimise; it must already be on ``device``.
        num_epochs: number of full passes over the training data.
        optimizer: optimiser bound to ``model``'s parameters.
        criterion: loss called as ``criterion(outputs, labels)``.
        name: path the best ``state_dict`` is saved to.
        train_dl: training loader; defaults to the notebook-level
            ``train_loader`` as it is bound when the function is called.
        val_dl: validation loader; defaults to the notebook-level
            ``val_loader`` as it is bound when the function is called.

    After each epoch the train/validation loss and accuracy are printed.
    Whenever the validation accuracy improves, the weights are written to
    ``name``. Returns ``None``.
    """
    if train_dl is None:
        train_dl = train_loader
    if val_dl is None:
        val_dl = val_loader

    # Start below any reachable accuracy so the first epoch always writes a
    # checkpoint; starting at 0 would leave no file if accuracy stayed at 0.
    best_acc = float("-inf")

    train_loss_list = []
    val_loss_list = []
    train_acc_list = []
    val_acc_list = []

    for epoch in range(num_epochs):
        train_loss, train_acc = _run_epoch(
            model, train_dl, criterion, optimizer,
            desc=f"Epoch {epoch+1}/{num_epochs}",
        )
        val_loss, val_acc = _run_epoch(model, val_dl, criterion)

        print(f"Train Loss: {train_loss:.4f} | Acc: {train_acc:.2f}%")
        print(f"Val Loss: {val_loss:.4f} | Acc: {val_acc:.2f}%")

        train_loss_list.append(train_loss)
        val_loss_list.append(val_loss)
        train_acc_list.append(train_acc)
        val_acc_list.append(val_acc)

        if val_acc > best_acc:
            best_acc = val_acc
            torch.save(model.state_dict(), name)

    plot_training_curves(train_loss_list, val_loss_list, train_acc_list, val_acc_list)

In [None]:
# Train the first model.
loss_func = nn.CrossEntropyLoss()
opt = optim.Adam(model.parameters(), lr=0.001, weight_decay=0.001)
# The epoch count is deliberately generous so the run can be interrupted by hand when needed.
train_model(model, num_epochs=50, optimizer=opt, criterion=loss_func, name="first_model.pth")

In [None]:
# Prediction on the test data.
def form_prediction(name_of_data, model_to_predict, name_of_prediction):
    """Load a checkpoint, classify the test set and write a CSV of predictions.

    Args:
        name_of_data: path of a ``state_dict`` file compatible with
            ``model_to_predict``.
        model_to_predict: model instance that receives the weights.
        name_of_prediction: path of the CSV file to write.

    The CSV has the columns ``index`` (position of the image in
    ``test_loader`` order) and ``label`` (the integer after the first ``_``
    in the predicted class name). Reads the notebook-level ``test_loader``,
    ``train_dataset`` and ``device``.
    """
    # map_location lets a checkpoint saved on the GPU be loaded on a CPU-only
    # session (and vice versa) instead of failing while deserialising.
    state_dict = torch.load(name_of_data, map_location=device)
    model_to_predict.load_state_dict(state_dict)
    model_to_predict.eval()

    all_preds = []
    with torch.no_grad():
        for inputs, _ in test_loader:  # file names are not needed for the output
            outputs = model_to_predict(inputs.to(device))
            _, preds = torch.max(outputs, 1)
            all_preds.extend(preds.cpu().tolist())

    class_names = train_dataset.classes
    results = [
        {'index': i, 'label': int(class_names[p].split('_')[1])}
        for i, p in enumerate(all_preds)
    ]

    # Explicit columns keep the header present even when there are no rows.
    df = pd.DataFrame(results, columns=['index', 'label'])
    df.to_csv(name_of_prediction, index=False)

In [None]:
# The first model reaches the 0.8 threshold on the validation set;
# most likely it is simply not deep enough.

form_prediction(
    name_of_data="first_model.pth",
    model_to_predict=model,
    name_of_prediction="first_model_prediction.csv",
)

In [None]:
# Second model: a fine-tuned EfficientNet-B3.
class ButterflyClassifier(nn.Module):
    """EfficientNet-B3 with ImageNet weights and a new classification head.

    All layers stay trainable; per the author's experiments, fine-tuning
    without freezing layers worked better.

    Args:
        num_classes: size of the output layer (default 50, the original
            hard-coded value).
    """

    def __init__(self, num_classes=50):
        super().__init__()
        # `pretrained=True` is deprecated in torchvision; IMAGENET1K_V1 is the
        # checkpoint that flag resolved to, so the starting weights are unchanged.
        self.base_model = models.efficientnet_b3(
            weights=models.EfficientNet_B3_Weights.IMAGENET1K_V1
        )
        # Read the feature width from the stock head before replacing it.
        in_features = self.base_model.classifier[1].in_features
        self.base_model.classifier = nn.Sequential(
            nn.Dropout(p=0.4, inplace=True),
            nn.Linear(in_features, num_classes)
        )

    def forward(self, x):
        """Return the class logits produced by the backbone and the new head."""
        return self.base_model(x)

In [None]:
# Fine-tune the EfficientNet-B3 classifier and keep the best checkpoint.
model = ButterflyClassifier()
model = model.to(device)

opt = optim.Adam(model.parameters(), lr=0.001)
loss_func = nn.CrossEntropyLoss()

train_model(model, num_epochs=15, optimizer=opt, criterion=loss_func, name="second_model.pth")

In [None]:
# Write the test-set predictions of the best second-model checkpoint.
form_prediction(
    name_of_data="second_model.pth",
    model_to_predict=model,
    name_of_prediction="second_model_prediction.csv",
)

In [None]:
# Variation of the previous model, used for the run with augmentations.
class ButterflyClassifierUpdated(nn.Module):
    """EfficientNet-B3 with ImageNet weights and a dropout + linear head.

    Per the author's experiments, other head designs were tried and this one
    performed best.

    Args:
        num_classes: size of the output layer (default 50, the original
            hard-coded value).
    """

    def __init__(self, num_classes=50):
        super().__init__()
        # `pretrained=True` is deprecated in torchvision; IMAGENET1K_V1 is the
        # checkpoint that flag resolved to, so the starting weights are unchanged.
        self.base_model = models.efficientnet_b3(
            weights=models.EfficientNet_B3_Weights.IMAGENET1K_V1
        )
        # Read the feature width from the stock head before replacing it.
        in_features = self.base_model.classifier[1].in_features
        self.base_model.classifier = nn.Sequential(
            nn.Dropout(p=0.4, inplace=True),
            nn.Linear(in_features, num_classes)
        )

    def forward(self, x):
        """Return the class logits produced by the backbone and the new head."""
        return self.base_model(x)

In [None]:
# Add augmentations (intended for the training split only).
aug_transform = transforms.Compose([
    # Random geometric and photometric perturbations.
    transforms.RandomAffine(degrees=(-20, 20), translate=(0.1, 0.1)),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
    transforms.RandomResizedCrop(224, scale=(0.8, 1.)),
    transforms.RandomApply([transforms.GaussianBlur(3)], p=0.1),
    transforms.RandomAdjustSharpness(1.5, p=0.3),

    # Tensor conversion and ImageNet normalisation.
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# The datasets have to be rebuilt: the base dataset carries no transform so
# that each split can later be wrapped with its own pipeline.
train_dataset = datasets.ImageFolder(
    root=train_dir,
    transform=None
)

# A dedicated, seeded generator makes the 80/20 partition identical on every
# run; without it the validation set changes each time the cell is executed.
split_generator = torch.Generator().manual_seed(42)
train_tmp, val_tmp = random_split(train_dataset, [0.8, 0.2], generator=split_generator)

In [None]:
class TransformSubset(torch.utils.data.Dataset):
    """Wrap an indexable collection of ``(sample, target)`` pairs with a transform.

    The optional ``transform`` is applied to the sample only; the target is
    passed through untouched. A falsy ``transform`` leaves the sample as is.
    """

    def __init__(self, subset, transform=None):
        self.subset, self.transform = subset, transform

    def __len__(self):
        return len(self.subset)

    def __getitem__(self, index):
        sample, target = self.subset[index]
        if not self.transform:
            return sample, target
        return self.transform(sample), target

# Augment the training split only; validation keeps the plain preprocessing.
train_ds = TransformSubset(train_tmp, transform=aug_transform)
val_ds = TransformSubset(val_tmp, transform=transform)

# Rebind the loaders so later training runs use the new splits.
train_loader = DataLoader(dataset=train_ds, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(dataset=val_ds, batch_size=batch_size, shuffle=False)

In [None]:
# Third run: same architecture, augmented data, lower learning rate with AMSGrad.
model = ButterflyClassifierUpdated()
model = model.to(device)
loss_func = nn.CrossEntropyLoss()
opt = optim.Adam(model.parameters(), lr=0.0001, amsgrad=True)
# More epochs, since the model is more complex.
train_model(model, num_epochs=25, optimizer=opt, criterion=loss_func, name="third_model.pth")

In [None]:
# Write the test-set predictions of the best third-model checkpoint.
form_prediction(
    name_of_data="third_model.pth",
    model_to_predict=model,
    name_of_prediction="third_model_prediction.csv",
)