In [None]:
# plik do treningu modelu resnet50

import os
import pandas as pd
from PIL import Image
from collections import Counter
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, WeightedRandomSampler
import torchvision.transforms as transforms
import torchvision.models as models
from sklearn.metrics import confusion_matrix
import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns
from torchvision.models import ResNet50_Weights
from datetime import datetime
import csv
from functions import calculate_metrics

# Run configuration read by the cells below.
MODEL_NAME = 'resnet50'  # prefix of the per-run output directory created under models/
BATCH_SIZE = 32  # batch size handed to every DataLoader
LEARNING_RATE = 1e-3  # learning rate handed to the Adam optimizer
NUM_EPOCHS = 50  # upper bound on the number of training epochs
PATIENCE = 10  # epochs without validation-loss improvement tolerated before early stopping

In [None]:
# Create the output folders for this run.
# The timestamp uses '-' between hour/minute/second instead of ':' because a
# colon is not a valid character in Windows file names (and is awkward for
# tools that treat "host:path" specially).
time = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
model_output_path = f"{MODEL_NAME}_{time}"
# os.makedirs creates all intermediate directories, so creating the nested
# score/ folder also creates models/ and the run directory itself.
os.makedirs(f"models/{model_output_path}/score", exist_ok=True)

# Dataset selection: leave exactly one `source_dir` assignment active.
source_dir = 'data/for_test_split'
# source_dir = 'data/2019_images_split'
# source_dir = 'data/2015_2019_split'

# For every subset the split directory is expected to provide a label CSV
# and a folder with the corresponding images.
train_csv, train_dir = f"{source_dir}/train.csv", f"{source_dir}/train_images"
validation_csv, validation_dir = f"{source_dir}/validation.csv", f"{source_dir}/validation_images"
test_csv, test_dir = f"{source_dir}/test.csv", f"{source_dir}/test_images"

# Dataset definition
class RetinaDataset(Dataset):
    """Image dataset backed by a label CSV and a directory of PNG files.

    The first CSV column is used as the image file name (without the
    ``.png`` extension) and the second column as the label.

    Args:
        csv_file: Path of the CSV file, read with ``pandas.read_csv``.
        root_dir: Directory that contains the ``<first column>.png`` files.
        transform: Optional callable applied to the loaded RGB image.
        shuffle: When true, the rows are permuted once at construction time
            (unseeded ``DataFrame.sample(frac=1)``).
    """

    def __init__(self, csv_file, root_dir, transform=None, shuffle=False):
        table = pd.read_csv(csv_file)
        # One-off permutation; the index is rebuilt so it runs 0..n-1 again.
        self.labels_df = table.sample(frac=1).reset_index(drop=True) if shuffle else table
        self.root_dir = root_dir
        self.transform = transform

    def __len__(self):
        """Return the number of rows in the label table."""
        return len(self.labels_df)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for the row at position ``idx``."""
        file_name = self.labels_df.iloc[idx, 0] + '.png'
        picture = Image.open(os.path.join(self.root_dir, file_name)).convert('RGB')
        target = int(self.labels_df.iloc[idx, 1])

        # Without a transform the converted PIL image is returned unchanged.
        if not self.transform:
            return picture, target
        return self.transform(picture), target

# Transforms and augmentations
# Settings shared by the training and evaluation pipelines.
_TARGET_SIZE = (224, 224)
# NOTE(review): presumably the ImageNet channel mean/std expected by the
# pretrained backbone - confirm against the weights' documentation.
_CHANNEL_MEAN = [0.485, 0.456, 0.406]
_CHANNEL_STD = [0.229, 0.224, 0.225]

# Training pipeline: fixed-size resize, random flips and colour jitter
# (augmentation), then tensor conversion and per-channel normalisation.
_colour_jitter = transforms.ColorJitter(brightness=0.1, contrast=0.3, saturation=0.3, hue=0.1)
train_transform = transforms.Compose([
    transforms.Resize(_TARGET_SIZE),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandomVerticalFlip(p=0.5),
    _colour_jitter,
    transforms.ToTensor(),
    transforms.Normalize(_CHANNEL_MEAN, _CHANNEL_STD),
])

# Evaluation pipeline: same resize and normalisation, no random augmentation.
val_test_transform = transforms.Compose([
    transforms.Resize(_TARGET_SIZE),
    transforms.ToTensor(),
    transforms.Normalize(_CHANNEL_MEAN, _CHANNEL_STD),
])

# One RetinaDataset per subset. Only the training set gets the augmenting
# transform; validation and test share the deterministic pipeline.
train_dataset = RetinaDataset(
    csv_file=train_csv,
    root_dir=train_dir,
    transform=train_transform,
    shuffle=True,
)
_eval_dataset_options = dict(transform=val_test_transform, shuffle=True)
validation_dataset = RetinaDataset(csv_file=validation_csv, root_dir=validation_dir, **_eval_dataset_options)
test_dataset = RetinaDataset(csv_file=test_csv, root_dir=test_dir, **_eval_dataset_options)

# Class weighting
# Read the labels straight from the dataset's label table (second CSV column,
# same int() conversion and same row order as RetinaDataset.__getitem__).
# Iterating `train_dataset` itself would open, decode and augment every
# training image only to throw the pixels away.
labels = [int(value) for value in train_dataset.labels_df.iloc[:, 1]]
class_counts = Counter(labels)
# Inverse-frequency weight per class, then one weight per training sample.
class_weights = {cls: 1.0 / count for cls, count in class_counts.items()}
sample_weights = [class_weights[label] for label in labels]

# Draws len(train) indices per epoch with replacement, using the per-sample
# weights above as sampling weights.
sampler = WeightedRandomSampler(
    weights=sample_weights,
    num_samples=len(sample_weights),
    replacement=True
)

print("Wagi klas: ")
for cls, weight in class_weights.items():
    print(f"{cls} : {weight:.4f}")


# Training batches are drawn by the weighted sampler defined above.
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, sampler=sampler)
# Evaluation loaders iterate in a fixed order: reshuffling on every pass
# changes which samples land in the final partial batch, which makes
# batch-averaged validation loss vary between passes for identical weights
# and adds noise to early stopping. Accuracy and the confusion matrix do not
# depend on the order.
validation_loader = DataLoader(validation_dataset, batch_size=BATCH_SIZE, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

# Model
_device_name = 'cuda' if torch.cuda.is_available() else 'cpu'
device = torch.device(_device_name)
model = models.resnet50(weights=ResNet50_Weights.DEFAULT)

# Freeze every pretrained parameter; the layer assigned to `fc` below is
# created afterwards, so its parameters keep requires_grad=True.
for pretrained_param in model.parameters():
    pretrained_param.requires_grad = False

# Replace the classification layer with a new 5-output linear head. The
# Sequential wrapper is kept so parameter names stay `fc.0.*`.
num_ftrs = model.fc.in_features
new_head = nn.Linear(num_ftrs, 5)
model.fc = nn.Sequential(new_head)

model = model.to(device)

print("Dostępne urządzenie: ", device)

In [None]:
# Loss and optimizer
criterion = nn.CrossEntropyLoss()
# Only the parameters of the replacement `fc` head are handed to the optimizer.
optimizer = optim.Adam(model.fc.parameters(), lr=LEARNING_RATE)

num_epochs = NUM_EPOCHS
patience = PATIENCE


def _run_epoch(net, loader, loss_fn, run_device, step_optimizer=None):
    """Run one full pass over ``loader`` and return ``(mean_loss, accuracy_percent)``.

    Args:
        net: Model to run. It is put in train mode when ``step_optimizer`` is
            given and in eval mode otherwise.
        loader: Iterable yielding ``(inputs, targets)`` batches.
        loss_fn: Callable returning the batch-mean loss for ``(outputs, targets)``.
        run_device: Device every batch is moved to before the forward pass.
        step_optimizer: Optimizer stepped after every batch. ``None`` selects
            an evaluation pass with gradient tracking disabled.

    Returns:
        Tuple ``(loss averaged per sample, accuracy in percent)``.

    Raises:
        ValueError: If ``loader`` yields no samples.
    """
    is_training = step_optimizer is not None
    net.train(is_training)
    loss_sum, n_correct, n_samples = 0.0, 0, 0

    with torch.set_grad_enabled(is_training):
        for inputs, targets in loader:
            inputs, targets = inputs.to(run_device), targets.to(run_device)
            if is_training:
                step_optimizer.zero_grad()
            outputs = net(inputs)
            loss = loss_fn(outputs, targets)
            if is_training:
                loss.backward()
                step_optimizer.step()

            batch_size = targets.size(0)
            # `loss` is a batch mean; weight it by the batch size so a smaller
            # final batch does not count as much as a full one.
            loss_sum += loss.item() * batch_size
            _, predicted = torch.max(outputs, 1)
            n_correct += (predicted == targets).sum().item()
            n_samples += batch_size

    if n_samples == 0:
        raise ValueError("DataLoader yielded no samples; cannot compute loss and accuracy.")
    return loss_sum / n_samples, 100 * n_correct / n_samples


# Number of completed epochs; the evaluation cell uses it for the plot x-axis.
num_epochs_plot = 0

# Per-epoch history, one entry per completed epoch.
train_losses, val_losses = [], []
train_accuracies, val_accuracies = [], []

best_val_acc = 0.0
best_val_loss = np.inf
epochs_without_improvement = 0

for epoch in range(num_epochs):
    avg_train_loss, train_acc = _run_epoch(model, train_loader, criterion, device, optimizer)
    avg_val_loss, val_acc = _run_epoch(model, validation_loader, criterion, device)

    train_losses.append(avg_train_loss)
    train_accuracies.append(train_acc)
    val_losses.append(avg_val_loss)
    val_accuracies.append(val_acc)
    # Updated only after all four histories grew, so the counter always equals
    # their length even if the cell is interrupted mid-epoch.
    num_epochs_plot = epoch + 1

    print(f"Epoch {epoch+1}/{num_epochs}, "
          f"Train Loss: {avg_train_loss:.4f}, Train Acc: {train_acc:.2f}%, "
          f"Val Loss: {avg_val_loss:.4f}, Val Acc: {val_acc:.2f}%")

    # Checkpoint with the best validation accuracy (does not drive early stopping).
    if val_acc > best_val_acc:
        best_val_acc = val_acc
        torch.save(model, f"models/{model_output_path}/{model_output_path}_best_val_acc.pth")

    # Early stopping is driven by validation loss: checkpoint on improvement,
    # otherwise count the epoch against the patience budget.
    if avg_val_loss < best_val_loss:
        best_val_loss = avg_val_loss
        epochs_without_improvement = 0
        torch.save(model, f"models/{model_output_path}/{model_output_path}_best_val_loss.pth")
    else:
        epochs_without_improvement += 1

    if epochs_without_improvement >= patience:
        print(f"Early stopping at epoch {epoch+1} as validation loss did not improve.")
        break

# Always keep the weights of the last completed epoch. `num_epochs_plot` equals
# `epoch + 1` whenever the loop ran and stays defined when it did not.
torch.save(model, f"models/{model_output_path}/{model_output_path}_last_epoch_{num_epochs_plot}.pth")

In [None]:
# Model evaluation on the test set
# NOTE(review): in the visible code `model` still holds the weights of the
# last training epoch, not one of the saved best_val_* checkpoints - confirm
# that this is the model meant to be reported.

model.eval()
# NOTE(review): y_probs is collected but not read in the visible code -
# presumably consumed by a later cell; verify.
y_true, y_pred, y_probs = [], [], []

with torch.no_grad():
    # `targets` rather than `labels`, so the notebook-level `labels` list
    # built for class weighting is not overwritten by this loop.
    for inputs, targets in test_loader:
        outputs = model(inputs.to(device))
        probs = torch.softmax(outputs, dim=1)
        _, predicted = torch.max(outputs, 1)
        y_true.extend(targets.cpu().numpy())
        y_pred.extend(predicted.cpu().numpy())
        y_probs.extend(probs.cpu().numpy())

test_accuracy = 100 * np.mean(np.array(y_true) == np.array(y_pred))

# Pin the label set to the five outputs of the classification head. Without
# `labels`, a class that is absent from both y_true and y_pred is dropped and
# the matrix shrinks, so rows/columns no longer line up with class ids 0..4
# (the heat-map below labels its ticks with range(5)).
cm = confusion_matrix(y_true, y_pred, labels=list(range(5)))
epochs_range = range(1, num_epochs_plot + 1)

# Loss curves (training vs. validation), saved to the run's score/ folder.
loss_fig, loss_ax = plt.subplots(figsize=(7, 6))
loss_ax.plot(epochs_range, train_losses, label="Zbiór treningowy")
loss_ax.plot(epochs_range, val_losses, label="Zbiór walidacyjny")
loss_ax.set_xlabel("Liczba epok")
loss_ax.set_ylabel("Funkcja straty")
loss_ax.legend(loc='upper right')
loss_ax.grid()
loss_fig.savefig(f"models/{model_output_path}/score/funkcja_straty_{model_output_path}.png")
plt.show()

# Accuracy curves, with the test-set accuracy drawn as a horizontal reference line.
acc_fig, acc_ax = plt.subplots(figsize=(7, 6))
acc_ax.plot(epochs_range, train_accuracies, label="Zbiór treningowy")
acc_ax.plot(epochs_range, val_accuracies, label="Zbiór walidacyjny")
acc_ax.axhline(y=test_accuracy, color='m', linestyle='--', label=f"Zbiór testowy: {test_accuracy:.2f}%")
acc_ax.set_xlabel("Liczba epok")
acc_ax.set_ylabel("Skuteczność [%]")
acc_ax.legend(loc='lower right')
acc_ax.grid()
acc_fig.savefig(f"models/{model_output_path}/score/skutecznosc_{model_output_path}.png")
plt.show()

# Confusion matrix heat-map (rows: true class, columns: predicted class).
cm_fig, cm_ax = plt.subplots(figsize=(7, 6))
sns.heatmap(cm, annot=True, fmt="d", cmap="Blues", xticklabels=range(5), yticklabels=range(5), ax=cm_ax)
cm_ax.set_xlabel("Wartość predykcji")
cm_ax.set_ylabel("Klasa prawdziwa")
cm_ax.set_title("Macierz pomyłek")
cm_fig.savefig(f"models/{model_output_path}/score/macierz_pomylek_{model_output_path}.png")
plt.show()

# Metrics derived from the confusion matrix by the project helper
# `calculate_metrics`; the result is iterated as a mapping of
# class label -> scores.
metrics = calculate_metrics(cm)
print(metrics)

for class_label, scores in metrics.items():
    print(f"{class_label}: {scores}")

# Save the metrics table.
# NOTE(review): after the transpose the frame's index presumably holds the
# class labels, and index=False leaves it out of the CSV - confirm that
# identifying classes by row order only is intended.
df = pd.DataFrame(metrics).T
df.to_csv(f"models/{model_output_path}/score/metryki_{model_output_path}.csv", index=False)

# Save the per-epoch loss and accuracy history.
# The encoding is pinned to UTF-8 because the header contains non-ASCII
# characters; with the platform default encoding the write can fail or
# produce mojibake on systems whose locale is not UTF-8.
with open(f"models/{model_output_path}/score/metryki_2_{model_output_path}.csv", mode='w', newline='', encoding='utf-8') as file:
    writer = csv.writer(file)
    writer.writerow(['Funkcja straty - dane treningowe (train_losses)', 'Funkcja straty - dane walidacyjne (val_losses)',
                     'Skuteczność - dane treningowe (train_accuracies)', 'Skuteczność - dane walidacyjne val_accuracies'])

    # One row per completed epoch; zip stops at the shortest history, so a
    # partially recorded epoch cannot raise an IndexError here.
    writer.writerows(zip(train_losses, val_losses, train_accuracies, val_accuracies))