In [None]:
# ------------------------- Блок 1: Импорты -------------------------
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import transforms, models, datasets
from torch.utils.data import DataLoader, Subset
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix

# Импорт словаря перевода
from animals_sber import animals_sber

# Проверка доступности GPU и установка устройства
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f'Using device: {device}')

# ------------------------- Блок 2: Параметры -------------------------

DATASET_DIR = r'C:/Users/admin/Desktop/work/model_resnet/датасет'

IMG_SIZE = (224, 224)
BATCH_SIZE = 32
EPOCHS = 50
LR = 3e-5
VAL_SPLIT = 0.15
TEST_SPLIT = 0.15  

# ------------------------- Блок 3: Подготовка данных -------------------------

# Трансформации для данных, на train усиленная аугментация 
train_transform = transforms.Compose([
    transforms.Resize(256),
    transforms.RandomResizedCrop(224, scale=(0.8, 1.0)),
    transforms.RandomHorizontalFlip(),
    transforms.ColorJitter(brightness=0.1, contrast=0.1, saturation=0.1),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    transforms.RandomErasing(p=0.2, scale=(0.02, 0.1), ratio=(0.3, 3.3))
])

val_transform = transforms.Compose([
    transforms.Resize(IMG_SIZE),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

test_transform = transforms.Compose([
    transforms.Resize(IMG_SIZE),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

full_dataset = datasets.ImageFolder(DATASET_DIR)

# Вывод информации о количестве изображений
print("\nDataset statistics:")
print(f"Total images: {len(full_dataset)}")
print(f"Number of classes: {len(full_dataset.classes)}")
print("Class distribution:")
for class_name, count in zip(full_dataset.classes, np.bincount(full_dataset.targets)):
    print(f"{class_name}: {count} images")



# Разделение на тренировочную, валидационную и тестовую выборки

from collections import defaultdict
import random
import numpy as np

# Собираем индексы по классам
class_indices = defaultdict(list)
for idx, (_, label) in enumerate(full_dataset.samples):
    class_indices[label].append(idx)

# Перемешиваем внутри каждого класса
for label in class_indices:
    random.shuffle(class_indices[label])

train_idx = []
val_idx = []
test_idx = []

for label, indices in class_indices.items():
    n_total = len(indices)
    n_test = int(TEST_SPLIT * n_total)
    n_val = int(VAL_SPLIT * n_total)
    n_train = n_total - n_test - n_val
    
    train_idx.extend(indices[:n_train])
    val_idx.extend(indices[n_train:n_train + n_val])
    test_idx.extend(indices[n_train + n_val:])

random.shuffle(train_idx)
random.shuffle(val_idx)
random.shuffle(test_idx)



train_dataset = datasets.ImageFolder(
    DATASET_DIR,
    transform=train_transform
)
train_dataset = Subset(train_dataset, train_idx)

val_dataset = datasets.ImageFolder(
    DATASET_DIR,
    transform=val_transform
)
val_dataset = Subset(val_dataset, val_idx)

test_dataset = datasets.ImageFolder(
    DATASET_DIR,
    transform=test_transform
)
test_dataset = Subset(test_dataset, test_idx)

train_loader = DataLoader(
    train_dataset,
    batch_size=BATCH_SIZE,
    shuffle=True,
    num_workers=4,
    pin_memory=True
)

val_loader = DataLoader(
    val_dataset,
    batch_size=BATCH_SIZE,
    shuffle=False,
    num_workers=4,
    pin_memory=True
)

test_loader = DataLoader(
    test_dataset,
    batch_size=BATCH_SIZE,
    shuffle=False,
    num_workers=4,
    pin_memory=True
)

class_names = full_dataset.classes
num_classes = len(class_names)

print("\nDataset splits:")
print(f"Training set: {len(train_dataset)} images")
print(f"Validation set: {len(val_dataset)} images")
print(f"Test set: {len(test_dataset)} images")

# ------------------------- Блок 4: Модель -------------------------

class ResNet50Model(nn.Module):
    def __init__(self, num_classes):
        super(ResNet50Model, self).__init__()
        self.resnet = models.resnet50(pretrained=True)
        
        # Замораживаем все слои
        for param in self.resnet.parameters():
            param.requires_grad = False
            
        # Заменяем последний слой
        num_ftrs = self.resnet.fc.in_features
        self.resnet.fc = nn.Sequential(
            nn.Linear(num_ftrs, 512),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(512, num_classes)
        )
        
    def forward(self, x):
        return self.resnet(x)

model = ResNet50Model(num_classes).to(device)

# Функция потерь и оптимизатор
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=LR)

# ------------------------- Блок 5: Callbacks и утилиты -------------------------

# Ранняя остановка
class EarlyStopping:
    def __init__(self, patience=7, delta=0.001):
        self.patience = patience
        self.delta = delta
        self.counter = 0
        self.best_score = None
        self.early_stop = False
        
    def __call__(self, val_loss):
        score = -val_loss
        
        if self.best_score is None:
            self.best_score = score
        elif score < self.best_score + self.delta:
            self.counter += 1
            if self.counter >= self.patience:
                self.early_stop = True
        else:
            self.best_score = score
            self.counter = 0

early_stopping = EarlyStopping(patience=5)

# Сохранение модели
def save_model(model, path='best_resnet50.pth'):
    torch.save(model.state_dict(), path)

# Планировщик обучения
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, patience=3, factor=0.5)

# Функция для оценки модели
def evaluate_model(model, data_loader, criterion, dataset_name="Validation"):
    model.eval()
    total_loss = 0.0
    correct = 0
    total = 0
    all_preds = []
    all_labels = []
    
    with torch.no_grad():
        for inputs, labels in data_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            
            total_loss += loss.item() * inputs.size(0)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
            
            all_preds.extend(predicted.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())
    
    avg_loss = total_loss / len(data_loader.dataset)
    accuracy = correct / total
    
    print(f"\n{dataset_name} Results:")
    print(f"Loss: {avg_loss:.4f}, Accuracy: {accuracy:.4f}")
    
    # Classification Report
    translated_labels = [animals_sber.get(label, label) for label in class_names]
    print("\nClassification Report:")
    print(classification_report(all_labels, all_preds, target_names=translated_labels))
    
    # Confusion Matrix
    plt.figure(figsize=(10,8))
    sns.heatmap(confusion_matrix(all_labels, all_preds), annot=True, fmt="d", 
                xticklabels=translated_labels, yticklabels=translated_labels, cmap="Blues")
    plt.title(f"{dataset_name} Confusion Matrix")
    plt.xlabel("Predicted")
    plt.ylabel("True")
    plt.show()
    
    return all_preds, all_labels

# ------------------------- Блок 6: Обучение -------------------------

def train_model(model, train_loader, val_loader, epochs, optimizer, criterion):
    best_val_loss = float('inf')
    
    for epoch in range(epochs):
        model.train()
        train_loss = 0.0
        correct_train = 0
        total_train = 0
        
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            
            optimizer.zero_grad()
            
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            
            train_loss += loss.item() * inputs.size(0)
            _, predicted = torch.max(outputs.data, 1)
            total_train += labels.size(0)
            correct_train += (predicted == labels).sum().item()
        
        train_loss = train_loss / len(train_loader.dataset)
        train_acc = correct_train / total_train
        
        model.eval()
        val_loss = 0.0
        correct_val = 0
        total_val = 0
        
        with torch.no_grad():
            for inputs, labels in val_loader:
                inputs, labels = inputs.to(device), labels.to(device)
                
                outputs = model(inputs)
                loss = criterion(outputs, labels)
                
                val_loss += loss.item() * inputs.size(0)
                _, predicted = torch.max(outputs.data, 1)
                total_val += labels.size(0)
                correct_val += (predicted == labels).sum().item()
        
        val_loss = val_loss / len(val_loader.dataset)
        val_acc = correct_val / total_val
        
        scheduler.step(val_loss)
        
        early_stopping(val_loss)
        if early_stopping.early_stop:
            print("Early stopping")
            break
        
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            save_model(model)
        
        print(f'\nEpoch {epoch+1}/{epochs}:')
        print(f'Train Loss: {train_loss:.4f}, Acc: {train_acc:.4f}')
        print(f'Val Loss: {val_loss:.4f}, Acc: {val_acc:.4f}')
    
    return model

# Первый этап обучения (замороженные слои)
print("\nFirst stage training (frozen layers)")
model = train_model(model, train_loader, val_loader, EPOCHS, optimizer, criterion)

# Оценка на валидационной выборке после первого этапа
print("\nEvaluating on validation set after first stage:")
val_preds, val_labels = evaluate_model(model, val_loader, criterion, "Validation")

# ------------------------- Блок 7: Разморозка и дообучение -------------------------

# Размораживаем все слои
for param in model.resnet.parameters():
    param.requires_grad = True

# Новый оптимизатор с меньшим learning rate
optimizer = optim.Adam(model.parameters(), lr=LR/10)

# Второй этап обучения
print("\nSecond stage training (unfrozen layers)")
model = train_model(model, train_loader, val_loader, 20, optimizer, criterion)

# Оценка на валидационной выборке после второго этапа
print("\nEvaluating on validation set after second stage:")
val_preds, val_labels = evaluate_model(model, val_loader, criterion, "Validation")

# ------------------------- Блок 8: Оценка на тестовой выборке -------------------------

# Загрузка лучшей модели
model.load_state_dict(torch.load('best_resnet50.pth', map_location=device))

# Оценка на тестовой выборке
print("\nFinal evaluation on test set:")
test_preds, test_labels = evaluate_model(model, test_loader, criterion, "Test")