In [24]:
import json
import urllib
import torch
import torch.nn as nn
import torch.optim as optim
import os
from skimage import io, transform
from torchvision import models
from torch.utils.data import Dataset, SubsetRandomSampler
from torchvision import transforms
from PIL import Image
import numpy as np
import matplotlib.pyplot as plt
device = torch.device("cuda:0")
print(f"Using device: {device}")

class HotdogRecognitionDataset(Dataset):
    def __init__(self, folder, transform=None):
        self.folder = folder
        self.transform = transform
        self.files = os.listdir(folder)
        self.hotdog_prefixes = ['chili-dog', 'frankfurter', 'hotdog']

    def __len__(self):
        return len(self.files)

    def __getitem__(self, index):
        img_name = self.files[index]
        img_path = os.path.join(self.folder, img_name)
        image = Image.open(img_path)
        y = 0
        for prefix in self.hotdog_prefixes:
            if img_name.startswith(prefix):
                y = 1
                break
        if self.transform:
            image = self.transform(image)
        return image, y, img_name

def visualize_samples(dataset, indices, title=None):
    n = len(indices)
    fig, axes = plt.subplots(1, n, figsize=(20, 3))
    if title:
        plt.suptitle(title, fontsize=16)
    
    for i, index in enumerate(indices):
        x, y, img_name = dataset[index]
        ax = axes[i]
        ax.imshow(x)
        ax.set_title(f"Label: {y}", fontsize=12)
        ax.grid(False)
        ax.axis('off')
    plt.tight_layout()
    plt.show()

# Путь к обучающим данным
train_path = '/kaggle/input/hotdog-dataset-zip/Задание 5. Нейронные сети/train/train_kaggle'
test_path = '/kaggle/input/hotdog-dataset-zip/Задание 5. Нейронные сети/test/test_kaggle'

train_dataset = HotdogRecognitionDataset(train_path,
                       transform=transforms.Compose([
                           transforms.Resize((224, 224)),
                           transforms.ToTensor(),
                           # Use mean and std for pretrained models
                           # https://pytorch.org/docs/stable/torchvision/models.html
                           transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                 std=[0.229, 0.224, 0.225])
                       ])
                      )
test_dataset = HotdogRecognitionDataset(test_path,
                       transform=transforms.Compose([
                           transforms.Resize((224, 224)),
                           transforms.ToTensor(),
                           # Use mean and std for pretrained models
                           # https://pytorch.org/docs/stable/torchvision/models.html
                           transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                 std=[0.229, 0.224, 0.225])
                       ])
                      )
batch_size = 64

data_size = len(train_dataset)
validation_fraction = .2


val_split = int(np.floor((validation_fraction) * data_size))
indices = list(range(data_size))
np.random.seed(42)
np.random.shuffle(indices)

val_indices, train_indices = indices[:val_split], indices[val_split:]

train_sampler = SubsetRandomSampler(train_indices)
val_sampler = SubsetRandomSampler(val_indices)

train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size,
                                           sampler=train_sampler)
val_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size,
                                         sampler=val_sampler)

test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=batch_size)

def compute_accuracy(model, loader):
    model.eval()  # Enter evaluation mode
    correct_samples = 0
    total_samples = 0

    with torch.no_grad():
        for x, y, _ in loader:
            x = x.to(device)
            y = y.to(device)

            predictions = model(x)
            correct_samples += (predictions.argmax(dim=1) == y).sum().item()
            total_samples += y.size(0)

    return correct_samples / total_samples

def train_model_with_early_stopping(model, train_loader, val_loader, loss, optimizer, num_epochs, patience=5):
    best_val_acc = 0
    patience_counter = 0
    best_model_state = None
    
    loss_history = []
    train_history = []
    val_history = []
    
    for epoch in range(num_epochs):
        model.train()
        loss_accum = 0
        correct_samples = 0
        total_samples = 0
        
        for i_step, (x, y, _) in enumerate(train_loader):
            x_gpu = x.to(device)
            y_gpu = y.to(device)
            
            prediction = model(x_gpu)
            loss_value = loss(prediction, y_gpu)
            
            optimizer.zero_grad()
            loss_value.backward()
            optimizer.step()
            
            _, indices = torch.max(prediction, 1)
            correct_samples += torch.sum(indices == y_gpu).item()
            total_samples += y.shape[0]
            loss_accum += loss_value.item()
        
        scheduler.step()
        
        ave_loss = loss_accum / len(train_loader)
        train_accuracy = correct_samples / total_samples
        val_accuracy = compute_accuracy(model, val_loader)
        
        loss_history.append(ave_loss)
        train_history.append(train_accuracy)
        val_history.append(val_accuracy)
        
        print(f"Epoch {epoch+1}: Loss: {ave_loss:.4f}, Train Acc: {train_accuracy:.4f}, Val Acc: {val_accuracy:.4f}")
        
        # Ранняя остановка
        if val_accuracy > best_val_acc:
            best_val_acc = val_accuracy
            best_model_state = model.state_dict().copy()
            patience_counter = 0
        else:
            patience_counter += 1
            
        if patience_counter >= patience:
            print(f"Early stopping at epoch {epoch+1}")
            break
            
    model.load_state_dict(best_model_state)
    return loss_history, train_history, val_history

# Thanks to https://discuss.pytorch.org/t/imagenet-classes/4923/2
def load_imagenet_classes():
    classes_json = urllib.request.urlopen('https://s3.amazonaws.com/deep-learning-models/image-models/imagenet_class_index.json').read()
    classes = json.loads(classes_json)

    #Process it to return dict of class index to name
    return { int(k): v[-1] for k, v in classes.items()}

def visualize_resnet_predictions(model, dataset, num_samples=10):
    """
    Тестирует ResNet18 на случайных изображениях и визуализирует предсказания
    """
    model.eval()
    # Загружаем классы ImageNet
    imagenet_classes = load_imagenet_classes()
    indices = np.random.choice(len(dataset), num_samples, replace=False)
    plt.figure(figsize=(20, 8))

    for i, idx in enumerate(indices):
        image_tensor, true_label, filename = dataset[idx]
        
        with torch.no_grad():
            # Добавляем размерность батча и перемещаем на устройство
            prediction = model(image_tensor.unsqueeze(0).to(device))
            values, indices = torch.max(prediction, 1)
        
        pred_class_idx = indices.item()
        pred_class_name = imagenet_classes[pred_class_idx]
        confidence = torch.softmax(prediction, dim=1)[0][pred_class_idx].item()
        
        # Денормализуем изображение для отображения
        mean = torch.tensor([0.485, 0.456, 0.406]).view(3, 1, 1)
        std = torch.tensor([0.229, 0.224, 0.225]).view(3, 1, 1)
        image_denorm = image_tensor * std + mean
        image_denorm = torch.clamp(image_denorm, 0, 1)
        image_np = image_denorm.permute(1, 2, 0).numpy()

        plt.subplot(2, 5, i + 1)
        plt.imshow(image_np)
        plt.title(f"True: {'Hotdog' if true_label == 1 else 'Not Hotdog'}\nPred: {pred_class_name}\nConf: {confidence:.3f}")
        plt.axis('off')
    plt.tight_layout()
    plt.show()
    
#Тренируем только последний слой
# for param in model.parameters():
#     param.requires_grad = False
# model.fc = nn.Linear(512, 2)
# model.fc = model.fc.to(device)
# loss = nn.CrossEntropyLoss()
# optimizer = optim.SGD( model.parameters(), lr=0.001, momentum=0.9)
# loss_history, train_history, val_history = train_model(model, train_loader, val_loader, loss, optimizer, 2)

model = models.resnet18(pretrained=True)
model = model.to(device)
#Тренируем всю модель
num_ftrs = model.fc.in_features
model.fc = nn.Linear(num_ftrs, 2)
model.fc = model.fc.to(device)
# Разные learning rates для разных частей
parameters = [
    {'params': model.fc.parameters(), 'lr': 0.001},  # Быстрее обучаем новый слой
    {'params': [p for n, p in model.named_parameters() if 'fc' not in n], 'lr': 0.0001}]
loss = nn.CrossEntropyLoss()
optimizer = optim.AdamW(parameters, lr=0.001, weight_decay=1e-4)
scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=15)
loss_history, train_history, val_history = train_model_with_early_stopping(
    model, train_loader, val_loader, loss, optimizer, 20, patience=5
)

from torch.utils.data.sampler import Sampler
from torch.utils.data import DataLoader
import sklearn.metrics as metrics

class SubsetSampler(Sampler):

    def __init__(self, indices):
        self.indices = indices

    def __iter__(self):
        return (self.indices[i] for i in range(len(self.indices)))

    def __len__(self):
        return len(self.indices)
        
def evaluate_model(model, dataset, indices):
    sampler = SubsetSampler(indices)
    loader = DataLoader(dataset, batch_size=32, sampler=sampler)
    all_predictions = []
    all_ground_truth = []
    with torch.no_grad():
        for batch in loader:
            x, y, _ = batch
            x = x.to(device)
            y = y.to(device)
            outputs = model(x)
            _, predicted = torch.max(outputs, 1)
            all_predictions.extend(predicted.cpu().numpy())
            all_ground_truth.extend(y.cpu().numpy())
            
    predictions = np.array(all_predictions, dtype=bool)
    ground_truth = np.array(all_ground_truth, dtype=bool)
    return predictions, ground_truth




def binary_classification_metrics(prediction, ground_truth):
    prediction_int = prediction.astype(int)
    ground_truth_int = ground_truth.astype(int)
    precision = metrics.precision_score(ground_truth_int, prediction_int)
    recall = metrics.recall_score(ground_truth_int, prediction_int)
    f1 = metrics.f1_score(ground_truth_int, prediction_int)
    return precision, recall, f1

predictions, ground_truth = evaluate_model(model, train_dataset, val_indices)
precision, recall, f1 = binary_classification_metrics(predictions, ground_truth)
print("F1: %4.3f, P: %4.3f, R: %4.3f" % (precision, recall, f1))

Using device: cuda:0
Epoch 1: Loss: 0.1950, Train Acc: 0.9169, Val Acc: 0.9598
Epoch 2: Loss: 0.0331, Train Acc: 0.9929, Val Acc: 0.9609
Epoch 3: Loss: 0.0123, Train Acc: 0.9967, Val Acc: 0.9533
Epoch 4: Loss: 0.0057, Train Acc: 0.9992, Val Acc: 0.9641
Epoch 5: Loss: 0.0024, Train Acc: 0.9997, Val Acc: 0.9576
Epoch 6: Loss: 0.0011, Train Acc: 1.0000, Val Acc: 0.9630
Epoch 7: Loss: 0.0006, Train Acc: 1.0000, Val Acc: 0.9641
Epoch 8: Loss: 0.0008, Train Acc: 1.0000, Val Acc: 0.9641
Epoch 9: Loss: 0.0003, Train Acc: 1.0000, Val Acc: 0.9630
Early stopping at epoch 9
F1: 0.948, P: 0.928, R: 0.938
