In [None]:
# Импорты
import os
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image

import torch
import torch.nn as nn
from torch.utils.data import DataLoader
from torchvision import transforms
from tqdm.auto import tqdm

# Подключение Google Drive
from google.colab import drive
drive.mount('/content/drive/')

In [None]:
# Константы и маппинг классов
CLASS_MAPPING = {
    'Potato_healthy': 2,
    'Potato_sick_early': 1,
    'Potato_sick_late': 0
}

# Функция загрузки данных
def download_data(path_dataset):
    data = []
    labels = []
    print(f"Загрузка данных из: {path_dataset}")
    # Проходим по папкам-классам
    for class_name in tqdm(sorted(os.listdir(path=path_dataset)), desc="Классы"):
        if class_name not in CLASS_MAPPING:
            continue # Пропускаем файлы или папки, которых нет в маппинге

        class_path = os.path.join(path_dataset, class_name)
        if not os.path.isdir(class_path):
            continue

        # Проходим по изображениям внутри папки
        for filename in sorted(os.listdir(class_path)):
            try:
                full_path = os.path.join(class_path, filename)
                image = Image.open(full_path).convert('RGB').resize((180, 180))
                data.append(np.array(image))
                labels.append(CLASS_MAPPING[class_name])
            except Exception as e:
                print(f"Ошибка при обработке файла {filename}: {e}")

    return np.array(data), np.array(labels)

In [None]:
# Класс датасета
class SimpleDataset(torch.utils.data.Dataset):
    def __init__(self, data, labels, transform=None):
        self.data = data
        self.labels = labels
        # Переводим в тензор
        self.transform = transform if transform else transforms.ToTensor()

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        image = self.data[idx]
        label = self.labels[idx]
        image = self.transform(image) # Применяем трансформации
        return image, torch.tensor(label).long() # Возвращаем тензор с меткой

In [None]:
# Запуск загрузки и создание датасетов

# Путь к данным
base_path = '/content/drive/MyDrive/my_colab_data/Plants'
train_path = os.path.join(base_path, 'Train')
test_path = os.path.join(base_path, 'Test')

# Загружаем numpy массивы
train_data_np, train_labels_np = download_data(train_path)
test_data_np, test_labels_np = download_data(test_path)

# Экземпляры класса
train_dataset = SimpleDataset(train_data_np, train_labels_np)
test_dataset = SimpleDataset(test_data_np, test_labels_np)

print(f"\nРазмер обучающей выборки: {len(train_dataset)}")
print(f"Размер тестовой выборки: {len(test_dataset)}")
print(f"Размер одного сэмпла (картинка): {train_dataset[0][0].shape}")

In [None]:
# Визуализация для проверки

print("\nПримеры изображений из датасета:")
plt.figure(figsize=(15, 5))
# Отобразим по 3 картинки каждого класса
indices = {0: [], 1: [], 2: []}
for i, label in enumerate(train_labels_np):
    if len(indices[label]) < 3:
        indices[label].append(i)
    if all(len(v) == 3 for v in indices.values()):
        break

plot_idx = 1
for label_id, idx_list in indices.items():
    for idx in idx_list:
        plt.subplot(3, 3, plot_idx)
        # Обратное преобразование для imshow: CHW -> HWC
        plt.imshow(train_dataset[idx][0].permute(1, 2, 0))
        # Находим имя класса по его ID
        class_name = [k for k, v in CLASS_MAPPING.items() if v == label_id][0].replace("Potato_", "")
        plt.title(class_name)
        plt.axis('off')
        plot_idx += 1
plt.tight_layout()
plt.show()

In [None]:
# Определение архитектуры ResNet
class ResNetBlock(nn.Module):
    def __init__(self, in_channels, out_channels, stride=1, downsample=None):
        super(ResNetBlock, self).__init__()
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3,
                               stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3,
                               stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)
        self.downsample = downsample

    def forward(self, x):
        identity = x

        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)

        out = self.conv2(out)
        out = self.bn2(out)

        if self.downsample is not None:
            identity = self.downsample(x)

        out += identity # Сложение выхода с входом
        out = self.relu(out)
        return out

In [None]:
# Версия ResNet
class SimpleResNet(nn.Module):
    def __init__(self, num_classes=3):
        super(SimpleResNet, self).__init__()
        self.in_channels = 64

        # Начальный блок
        self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        # Residual блоки
        self.layer1 = self._make_layer(64, 2)
        self.layer2 = self._make_layer(128, 2, stride=2)
        self.layer3 = self._make_layer(256, 2, stride=2)

        # Классификатор
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(256, num_classes)

    def _make_layer(self, out_channels, blocks, stride=1):
        downsample = None

        if stride != 1 or self.in_channels != out_channels:
            downsample = nn.Sequential(
                nn.Conv2d(self.in_channels, out_channels, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels)
            )

        layers = []
        layers.append(ResNetBlock(self.in_channels, out_channels, stride, downsample))
        self.in_channels = out_channels
        for _ in range(1, blocks):
            layers.append(ResNetBlock(out_channels, out_channels))
        return nn.Sequential(*layers)

    def forward(self, x):
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)

        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        x = self.fc(x)
        return x

print("Архитектура SimpleResNet определена")

In [None]:
# Обучение SimpleResNet с нуля
import torch.optim as optim
# Гиперпараметры
batch_size = 32
num_classes = 3
num_epochs = 10
lr = 0.001

# Загрузчики данных
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model_scratch = SimpleResNet(num_classes=num_classes).to(device)

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model_scratch.parameters(), lr=lr)

# Списки для хранения метрик
history_scratch = {'train_loss': [], 'val_loss': [], 'train_acc': [], 'val_acc': []}

print("Начинаем обучение SimpleResNet с нуля...")
for epoch in range(num_epochs):
    # Фаза обучения
    model_scratch.train()
    running_loss, correct, total = 0.0, 0, 0
    for images, labels in tqdm(train_loader, desc=f"Эпоха {epoch+1}/{num_epochs} [Обучение]"):
        images, labels = images.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model_scratch(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

    epoch_train_loss = running_loss / len(train_loader)
    epoch_train_acc = correct / total
    history_scratch['train_loss'].append(epoch_train_loss)
    history_scratch['train_acc'].append(epoch_train_acc)

    # Фаза валидации
    model_scratch.eval()
    val_loss, val_correct, val_total = 0.0, 0, 0
    with torch.no_grad():
        for images, labels in test_loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model_scratch(images)
            loss = criterion(outputs, labels)
            val_loss += loss.item()
            _, predicted = torch.max(outputs.data, 1)
            val_total += labels.size(0)
            val_correct += (predicted == labels).sum().item()

    epoch_val_loss = val_loss / len(test_loader)
    epoch_val_acc = val_correct / val_total
    history_scratch['val_loss'].append(epoch_val_loss)
    history_scratch['val_acc'].append(epoch_val_acc)

    print(f"Эпоха {epoch+1}/{num_epochs} | "
          f"Train Acc: {epoch_train_acc:.3f} | Val Acc: {epoch_val_acc:.3f} | "
          f"Train Loss: {epoch_train_loss:.3f} | Val Loss: {epoch_val_loss:.3f}")

In [None]:
# Графики для SimpleResNet

def plot_history(history, title):
    plt.figure(figsize=(14, 5))
    epochs_range = range(1, len(history['train_acc']) + 1)

    plt.subplot(1, 2, 1)
    plt.plot(epochs_range, history['train_acc'], 'bo-', label='Точность на обучении')
    plt.plot(epochs_range, history['val_acc'], 'ro-', label='Точность на валидации')
    plt.title('Точность модели')
    plt.xlabel('Эпохи')
    plt.ylabel('Точность')
    plt.legend()

    plt.subplot(1, 2, 2)
    plt.plot(epochs_range, history['train_loss'], 'bo-', label='Потери на обучении')
    plt.plot(epochs_range, history['val_loss'], 'ro-', label='Потери на валидации')
    plt.title('Потери модели')
    plt.xlabel('Эпохи')
    plt.ylabel('Потери')
    plt.legend()

    plt.suptitle(title, fontsize=16)
    plt.show()

plot_history(history_scratch, "Результаты обучения SimpleResNet с нуля")

In [None]:
# Настройка модели для Transfer Learning
from torchvision import models

# Загружаем предобученную модель ResNet-18
model_tl = models.resnet18(pretrained=True)

# "Замораживаем" все слои модели
for param in model_tl.parameters():
    param.requires_grad = False

# Заменяем последний слой (классификатор)
num_ftrs = model_tl.fc.in_features # Узнаем количество входов в последний слой
# Создаем новый классификатор
model_tl.fc = nn.Linear(num_ftrs, num_classes)

# Переносим модель
model_tl = model_tl.to(device)

print("Предобученная модель ResNet-18 подготовлена для Transfer Learning")
print("Заморожены все слои, кроме последнего полносвязного (fc)")

In [None]:
# Обучение с Transfer Learning
import torch.optim as optim

# Гиперпараметры.
num_epochs_tl = 5
lr_tl = 0.001

# Параметры нового слоя
optimizer_tl = optim.Adam(model_tl.fc.parameters(), lr=lr_tl)
criterion_tl = nn.CrossEntropyLoss()

history_tl = {'train_loss': [], 'val_loss': [], 'train_acc': [], 'val_acc': []}

print("\nНачинаем обучение с Transfer Learning")
for epoch in range(num_epochs_tl):
    # Фаза обучения
    model_tl.train()
    running_loss, correct, total = 0.0, 0, 0
    for images, labels in tqdm(train_loader, desc=f"Эпоха {epoch+1}/{num_epochs_tl} [Обучение]"):
        images, labels = images.to(device), labels.to(device)
        optimizer_tl.zero_grad()
        outputs = model_tl(images)
        loss = criterion_tl(outputs, labels)
        loss.backward()
        optimizer_tl.step()

        running_loss += loss.item()
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

    epoch_train_loss = running_loss / len(train_loader)
    epoch_train_acc = correct / total
    history_tl['train_loss'].append(epoch_train_loss)
    history_tl['train_acc'].append(epoch_train_acc)

    # Фаза валидации
    model_tl.eval()
    val_loss, val_correct, val_total = 0.0, 0, 0
    with torch.no_grad():
        for images, labels in test_loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model_tl(images)
            loss = criterion_tl(outputs, labels)
            val_loss += loss.item()
            _, predicted = torch.max(outputs.data, 1)
            val_total += labels.size(0)
            val_correct += (predicted == labels).sum().item()

    epoch_val_loss = val_loss / len(test_loader)
    epoch_val_acc = val_correct / val_total
    history_tl['val_loss'].append(epoch_val_loss)
    history_tl['val_acc'].append(epoch_val_acc)

    print(f"Эпоха {epoch+1}/{num_epochs_tl} | "
          f"Train Acc: {epoch_train_acc:.3f} | Val Acc: {epoch_val_acc:.3f} | "
          f"Train Loss: {epoch_train_loss:.3f} | Val Loss: {epoch_val_loss:.3f}")

# Визуализация результатов Transfer Learning
plot_history(history_tl, "Результаты обучения с Transfer Learning (ResNet-18)")

In [None]:
# Визуализация предсказаний

def visualize_model_predictions(model, dataloader, class_names, num_images=16):

    # Отображает изображения из батча и предсказания модели
    model.eval()
    images_so_far = 0
    fig = plt.figure(figsize=(16, 10))

    with torch.no_grad():
        images, labels = next(iter(dataloader))
        images = images.to(device)
        labels = labels.to(device)
        outputs = model(images)
        _, preds = torch.max(outputs, 1)

        for i in range(num_images):
            images_so_far += 1
            ax = plt.subplot(num_images // 4, 4, images_so_far)
            ax.axis('off')

            # Предсказанная и истинная метки
            pred_class = class_names[preds[i]].replace("Potato_", "")
            true_class = class_names[labels[i]].replace("Potato_", "")

            title_color = "green" if pred_class == true_class else "red"
            ax.set_title(f"Предсказание: {pred_class}\n(Верно: {true_class})", color=title_color)

            # Отображаем изображение
            img_display = images.cpu().data[i].permute(1, 2, 0).numpy()
            ax.imshow(img_display)

            if images_so_far == num_images:
                plt.tight_layout()
                plt.show()
                return

test_loader_viz = DataLoader(test_dataset, batch_size=16, shuffle=True)

class_names_list = ['Potato_sick_late', 'Potato_sick_early', 'Potato_healthy']

visualize_model_predictions(model_tl, test_loader_viz, class_names_list)