In [None]:
import numpy as np 
import pandas as pd 
import os

In [None]:
import os
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import numpy as np
import timm
from torch.optim import Adam
from torch.utils.data import Dataset, DataLoader
from torchvision import models, transforms
from sklearn.model_selection import train_test_split
from PIL import Image
from tqdm import tqdm
from transformers import ViTImageProcessor, ViTModel
from albumentations import (HorizontalFlip, RandomBrightnessContrast, ShiftScaleRotate, 
                            Resize, Compose, Normalize)
from albumentations.pytorch import ToTensorV2


In [None]:
!pip install pillow-avif-plugin

In [None]:
import pillow_avif

In [None]:
def get_train_augmentations():
    return Compose([
        Resize(224, 224),  # Размер изображений под модель
        HorizontalFlip(p=0.5),  # Отражение изображений с вероятностью 50%
        RandomBrightnessContrast(p=0.2),  # Изменение яркости и контрастности
        ShiftScaleRotate(shift_limit=0.1, scale_limit=0.1, rotate_limit=15, p=0.5),  # Сдвиг, масштабирование и поворот
        Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),  # Нормализация по стандарту ImageNet
        ToTensorV2()  # Преобразование в тензор
    ])

def get_valid_augmentations():
    return Compose([
        Resize(224, 224),  # Изменение размера изображений
        Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
        ToTensorV2()
    ])

In [None]:
class ImageDataset(torch.utils.data.Dataset):
    def __init__(self, root_dir, transform=None):
        self.root_dir = root_dir
        self.transform = transform
        self.image_paths = []
        self.labels = []

        # Проходим по всем классам (папкам)
        for class_name in sorted(os.listdir(root_dir)):
            class_dir = os.path.join(root_dir, class_name)
            if os.path.isdir(class_dir):
                class_index = int(class_name) - 1  # Преобразуем имя папки в индекс (0-based)
                for image_name in os.listdir(class_dir):
                    image_path = os.path.join(class_dir, image_name)
                    if os.path.isfile(image_path):
                        self.image_paths.append(image_path)
                        self.labels.append(class_index)

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        image_path = self.image_paths[idx]
        label = self.labels[idx]
        image = Image.open(image_path).convert("RGB")
        image = np.array(image)  # Преобразуем в numpy массив

        if self.transform:
            # Применяем трансформации
            augmented = self.transform(image=image)
            image = augmented['image']

        return image, label

In [None]:
class TestImageDataset(Dataset):
    def __init__(self, root_dir, transform=None):
        self.root_dir = root_dir
        self.transform = transform
        self.image_paths = []

        # Проходим по всем файлам в корневой директории
        for image_name in os.listdir(root_dir):
            image_path = os.path.join(root_dir, image_name)
            if os.path.isfile(image_path):
                self.image_paths.append(image_path)

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        image_path = self.image_paths[idx]
        image = Image.open(image_path).convert("RGB")
        image = np.array(image)  # Преобразуем в numpy массив

        # Извлекаем название файла
        file_name = os.path.basename(image_path)

        if self.transform:
            # Применяем трансформации
            augmented = self.transform(image=image)
            image = augmented['image']

        return image, file_name

In [None]:
def create_dataloader(data_dir, batch_size=8, train_transforms=None,shuffle = True):
    train_dataset = ImageDataset(root_dir=data_dir, transform=train_transforms)

    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=shuffle)

    return train_loader

In [None]:
# Директория с данными (где хранятся изображения для всех классов)
data_dir = "/kaggle/input/training/rucode"

# Параметры
batch_size = 16

# Аугментации
train_transforms = get_train_augmentations()

# Создание загрузчика данных для обучения
train_loader = create_dataloader(data_dir, batch_size, train_transforms)

# Пример того, как итерировать по данным
for images, labels in train_loader:
    print(images.shape, labels.shape)
    print(labels)
    break  # Для теста выводим один батч

In [None]:
# Пример того, как итерировать по данным
for images, labels in train_loader:
    print(images.shape, labels.shape)
    print(labels)
    print(images[1].shape)
    break  # Для теста выводим один батч

In [None]:
import matplotlib.pyplot as plt

In [None]:
tensor_image = images[6]

# Убедимся, что это тензор на CPU и преобразуем его в numpy
numpy_image = tensor_image.permute(1, 2, 0).cpu().numpy()

# Если изображение было нормализовано, то нужно денормализовать его:
# Например, если использовались средние и стандартные отклонения для нормализации:
mean = [0.485, 0.456, 0.406]
std = [0.229, 0.224, 0.225]
# denormalize:
numpy_image = numpy_image * std + mean

# Преобразуем значения из диапазона [0, 1] в [0, 255], если они нормализованы
numpy_image = (numpy_image * 255).astype(np.uint8)

# Преобразуем numpy массив обратно в изображение с помощью PIL
image = Image.fromarray(numpy_image)

print(labels[6])
plt.imshow(image)
plt.axis('off')  # Не показывать оси
plt.show()


In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
class SimpleNN(nn.Module):
    def __init__(self):
        super(SimpleNN, self).__init__()
        self.vit = timm.create_model('vit_base_patch8_224.augreg2_in21k_ft_in1k', pretrained=True)
        
        # Полносвязные слои с BatchNorm и Dropout
        self.fc1 = nn.Linear(1000, 512)
        self.bn1 = nn.BatchNorm1d(512)
        self.dropout1 = nn.Dropout(0.3)

        self.fc2 = nn.Linear(512, 256)
        self.bn2 = nn.BatchNorm1d(256)
        self.dropout2 = nn.Dropout(0.2)

        self.fc3 = nn.Linear(256, 128)
        self.bn3 = nn.BatchNorm1d(128)
        self.dropout3 = nn.Dropout(0.1)

        self.fc4 = nn.Linear(128, 16)   # Выходной слой для 16 классов

        self.relu = nn.ReLU()
        
        # Разморозка последних двух слоев ViT
        self._unfreeze_last_two_layers()

    def forward(self, x):
        # Проход через Vision Transformer
        output = self.vit(x)

        # Полносвязные слои с BatchNorm, Dropout и ReLU
        output = self.fc1(output)
        output = self.bn1(output)
        output = self.relu(output)
        output = self.dropout1(output)

        output = self.fc2(output)
        output = self.bn2(output)
        output = self.relu(output)
        output = self.dropout2(output)

        output = self.fc3(output)
        output = self.bn3(output)
        output = self.relu(output)
        output = self.dropout3(output)

        output = self.fc4(output)
        output = F.softmax(output, dim=-1)

        return output
    
    def _unfreeze_last_two_layers(self):
        # Разморозка всех слоев
        for param in self.vit.parameters():
            param.requires_grad = False
        
        # Разморозка последних двух слоев
        # Путь к слоям может отличаться в зависимости от версии и типа модели ViT
        # Проверьте правильные имена параметров, если это не сработает

        for name, param in self.vit.named_parameters():
            if 'blocks.10.' in name or 'blocks.11.' in name:  # Последние два слоя могут отличаться
                param.requires_grad = True

In [None]:
model = SimpleNN()

In [None]:
model = model.to(device)

In [None]:
criterion = nn.CrossEntropyLoss()
optimizer = Adam(model.parameters(), lr=0.01)

In [None]:
from sklearn.metrics import f1_score

In [None]:
def train_model(model, train_loader, criterion, optimizer, device, num_epochs=20):
    model.train()  # Переводим модель в режим обучения

    for epoch in range(num_epochs):
        running_loss = 0.0
        correct = 0
        total = 0
        all_labels = []
        all_predictions = []

        # Проходим по батчам
        for images, labels in tqdm(train_loader):
            images, labels = images.to(device), labels.to(device)

            # Обнуление градиентов
            optimizer.zero_grad()

            # Прямой проход (forward pass)
            outputs = model(images)
            loss = criterion(outputs, labels)

            # Обратный проход (backward pass) и обновление весов
            loss.backward()
            optimizer.step()

            # Статистика
            running_loss += loss.item()
            _, predicted = outputs.max(1)
            total += labels.size(0)
            correct += predicted.eq(labels).sum().item()

            # Собираем все метки и предсказания
            all_labels.extend(labels.cpu().numpy())
            all_predictions.extend(predicted.cpu().numpy())

        # Вычисляем F1-Score
        f1_macro = f1_score(all_labels, all_predictions, average='macro')

        # Выводим статистику за эпоху
        print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss/len(train_loader):.4f}, Accuracy: {100 * correct / total:.2f}%, F1-Score (macro): {f1_macro:.4f}')

In [None]:
train_model(model, train_loader, criterion, optimizer, device, num_epochs=100)

In [None]:
test_dir = '/kaggle/input/testoublic/public_test'

In [None]:
test_aug = get_valid_augmentations()

In [None]:
test_dataset =TestImageDataset(root_dir=test_dir,transform = test_aug)

In [None]:
# Пример того, как итерировать по данным
for images in tqdm(test_dataset):
    print(images)
    
    

In [None]:
def predict_and_collect(model, test_dataset, device, batch_size=16):
    model.eval()  # Переключаем модель в режим оценки
    predictions = []
    image_names = []
    
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
    
    with torch.no_grad():
        for images, paths in test_loader:
            images = images.to(device)
            outputs = model(images)
            _, predicted = torch.max(outputs, dim=1)
            
            # Преобразуем предсказания в список и собираем пути изображений
            predictions.extend(predicted.cpu().numpy() + 1)
            image_names.extend(paths)
    
    # Создаем DataFrame
    df = pd.DataFrame({
        'image_name': image_names,
        'class_number': predictions
    })
    
    return df

In [None]:
df_predictions = predict_and_collect(model, test_dataset, device)

In [None]:
df_predictions

In [None]:
df_predictions.to_csv('100_epoch_seed_732.csv',index=False)