In [214]:
import os
import torch
from torch.utils.data import Dataset
from PIL import Image
import pandas as pd
from torch import nn
import random
from torchvision import transforms

In [215]:
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "mps:0" if torch.backends.mps.is_available() else "cpu")
DEVICE

device(type='mps', index=0)

In [216]:
# устанавливаем seed, чтобы результаты не изменялись при не изменение чего-либо
torch.manual_seed(666)
random.seed(666)

In [217]:
class ImageToImageDataset(Dataset):
    """
    Создает набор данных для работы с парами изображений.

    Parameters:
        input_dir (str): Путь к папке с входными изображениями.
        target_dir (str, optional): Путь к папке с целевыми изображениями (по умолчанию: None).
        transform (callable, optional): Преобразование изображений (по умолчанию: None).
        mode (str): Режим работы ('train' или другой) (по умолчанию: 'train').

    Attributes:
        input_dir (str): Путь к папке с входными изображениями.
        target_dir (str): Путь к папке с целевыми изображениями (если указан).
        transform (callable): Функция преобразования изображений.
        mode (str): Режим работы.
        filenames (list): Список имен файлов изображений в папке с входными изображениями.
    """
    def __init__(self, input_dir, target_dir=None, transform=None, mode='train'):
        self.input_dir = input_dir
        self.target_dir = target_dir
        self.transform = transform
        self.mode = mode

        # Получение списка имен файлов в папке с входными изображениями
        self.filenames = [f for f in os.listdir(input_dir) if f.endswith('.png')]

    def __len__(self):
        return len(self.filenames)

    def __getitem__(self, idx):
        # Формирование пути к входному изображению и его открытие
        input_path = os.path.join(self.input_dir, self.filenames[idx])
        input_image = Image.open(input_path).convert('L')

        # Применение преобразования к входному изображению, если оно задано
        if self.transform:
            input_image = self.transform(input_image)

        # Если режим 'train', получаем путь к целевому изображению и его открытие
        if self.mode == 'train':
            target_path = os.path.join(self.target_dir, self.filenames[idx])
            target_image = Image.open(target_path).convert('L')
            # Возвращаем пару изображений (входное и выходной)
            return input_image, target_image
        else:
            # Возвращаем только входное изображение
            return input_image

In [218]:
class ImageToNumDataset(Dataset):
    """
    Создает набор данных изображений.

    Parameters:
        img_dir (str): Путь к папке с изображениями.
        transform (callable, optional): Преобразование изображений (по умолчанию: None).
        answers_file (str, optional): Путь к файлу с ответами (по умолчанию: None).

    Attributes:
        img_dir (str): Путь к папке с изображениями.
        transform (callable): Функция преобразования изображений.
        answers_file (str): Путь к файлу с ответами (если указан).
        img_labels (DataFrame): DataFrame с метками изображений или None, если метки отсутствуют.
        image_filenames (list): Список имен файлов изображений в папке.
    """
    def __init__(self, img_dir, transform=None, answers_file=None):
        # Инициализация класса ImageToNumDataset с указанием директории изображений,
        # возможности трансформации и файла с ответами
        self.img_dir = img_dir
        self.transform = transform
        self.answers_file = answers_file

        # Если указан файл с ответами, загружаем его в виде DataFrame,
        # иначе оставляем метки изображений пустыми
        if self.answers_file is not None:
            self.img_labels = pd.read_csv(answers_file)
        else:
            self.img_labels = None

        # Получение списка имен файлов изображений в указанной директории и их сортировка по номеру
        self.image_filenames = [file for file in os.listdir(img_dir) if file.endswith('.png')]
        self.image_filenames.sort(key=lambda x: int(x.replace("img_", "", 1).replace(".png", "", 1)))

    def __len__(self):
        return len(self.image_filenames)

    def __getitem__(self, idx):
        # Получение имени файла изображения по индексу
        img_name = self.image_filenames[idx]
        # Формирование пути к изображению
        img_path = os.path.join(self.img_dir, img_name)
        # Открытие изображения и преобразование в оттенки серого
        image = Image.open(img_path).convert("L")

        # Применение трансформации (если указана)
        if self.transform:
            image = self.transform(image)

        # Если имеются метки изображений, возвращаем изображение и соответствующую метку
        if self.img_labels is not None:
            label = self.img_labels.iloc[idx, 1]
            return image, label
        else:
            # Если меток нет, возвращаем только изображение
            return image


In [219]:
class NoMaskModel(nn.Module):
    """
    Модель нейронной сети для обработки изображений.

    Attributes:
        dropout (nn.Dropout): Операция dropout для регуляризации.
        input_liner (nn.Linear): Полносвязный слой для обработки входных данных.
        softmax (nn.Softmax): Функция Softmax для получения вероятностного распределения.
        output_liner (nn.Linear): Выходной полносвязный слой модели.
        relu (nn.ReLU): Функция активации ReLU для извлечения признаков.
        pool (nn.MaxPool2d): Операция пулинга для уменьшения размерности данных.
        conv1 (nn.Conv2d): Первый сверточный слой.
        conv2 (nn.Conv2d): Второй сверточный слой.
        b1 (nn.BatchNorm2d): Нормализация для первого сверточного слоя.
        b2 (nn.BatchNorm2d): Нормализация для второго сверточного слоя.
        backend_liner1 (nn.Linear): Внутренний полносвязный слой.
    """

    dropout: nn.Dropout
    input_liner: nn.Linear
    softmax: nn.Softmax
    output_liner: nn.Linear
    relu: nn.ReLU
    pool: nn.MaxPool2d
    conv1: nn.Conv2d
    conv2: nn.Conv2d
    b1: nn.BatchNorm2d
    b2: nn.BatchNorm2d
    backend_liner1: nn.Linear

    def __init__(self):
        super().__init__()
        # Инициализация слоев и операций
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        self.dropout = nn.Dropout(0.2)

        # Определение сверточных слоев с указанием параметров
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=6, kernel_size=3, stride=1, padding=1)
        self.conv2 = nn.Conv2d(in_channels=self.conv1.out_channels, out_channels=12, kernel_size=3, stride=1, padding=1)

        # Инициализация слоев нормализации
        self.bn1 = nn.BatchNorm2d(self.conv1.out_channels)
        self.bn2 = nn.BatchNorm2d(self.conv2.out_channels)

        # Инициализация функции активации ReLU
        self.relu = nn.ReLU()

        # Инициализация полносвязных слоев
        self.input_liner = nn.Linear(self.conv2.out_channels * 64 * 64, 128)
        self.output_liner = nn.Linear(self.input_liner.out_features, 3)

        # Инициализация функции Softmax для получения вероятностного распределения
        self.softmax = nn.Softmax(dim=1)

    def forward(self, x):
        # Свертка, нормализация, применение функции активации и пулинг
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.pool(self.relu(x))

        x = self.conv2(x)
        x = self.bn2(x)
        x = self.pool(self.relu(x))

        # Выравнивание тензора перед подачей на полносвязный слой
        x = x.view(-1, self.conv2.out_channels * 64 * 64)

        # Проход через полносвязные слои с применением активации и dropout
        x = self.relu(self.input_liner(x))
        x = self.dropout(x)

        # Выходной слой
        x = self.output_liner(x)

        # Применение Softmax для получения вероятностного распределения
        x = self.softmax(x)

        return x

Первая модель

In [220]:
import torch
from torchvision.transforms.v2 import ToDtype, Normalize, Compose, PILToTensor

transform = Compose([
    PILToTensor(),
    ToDtype(torch.float32, scale=True),
    Normalize((0.5,), (0.5,))
])

In [221]:
# Создание датасета 
dataset = ImageToNumDataset("data/train_images", answers_file="data/train_answers.csv", transform=transform)

In [222]:
from torch.utils.data import DataLoader, random_split

train_dataset, validation_dataset = random_split(dataset, (0.8, 0.2))

train_dataloader = DataLoader(train_dataset, batch_size=2**5, shuffle=True)
validation_dataloader = DataLoader(validation_dataset, batch_size=2**5, shuffle=False)

In [223]:
model = NoMaskModel()
model = model.to(DEVICE)
# загрузка модели
model.load_state_dict(torch.load("models/model.pt"))

<All keys matched successfully>

In [224]:
from torch import nn, optim

# Определение функции потерь для задачи классификации
criterion = nn.CrossEntropyLoss()
# Инициализация оптимизатора для обновления параметров модели с использованием алгоритма Adam
optimizer = optim.Adam(model.parameters(), lr=1e-6)

In [225]:
from ignite.metrics import Accuracy, Loss
from ignite.engine import create_supervised_trainer, create_supervised_evaluator

trainer = create_supervised_trainer(model, optimizer, criterion, device=DEVICE)
evaluator = create_supervised_evaluator(model, metrics={'accuracy': Accuracy(), 'nll': Loss(criterion)}, device=DEVICE)

In [226]:
# Сбор потерь и метрик для построения графиков
train_loss_values = []
validation_loss_values = []
validation_accuracy_values = []

In [227]:
import logging


logging.basicConfig(level=logging.INFO)
logger = logging.getLogger()
logging.getLogger("ignite.engine.engine.Engine").setLevel(logging.WARNING)

In [228]:
EPOCHS = 50

In [229]:
import datetime
from ignite.engine import Events


@trainer.on(Events.EPOCH_STARTED)
def log_training_start(engine):
    logging.info(f"Starting learning at epoch {engine.state.epoch} in {datetime.datetime.now()}")


@trainer.on(Events.EPOCH_COMPLETED)
def log_validation_results(engine):
    logging.info(f"End learning at epoch {engine.state.epoch} in {datetime.datetime.now()}")
    logging.info(f"Starting validation on epoch {engine.state.epoch}")
    # Запуск оценки модели на валидационном наборе данных
    evaluator.run(validation_dataloader)
    metrics = evaluator.state.metrics
    # Сбор и вывод средней точности и потерь на валидационном наборе
    validation_loss_values.append(metrics['nll'])
    validation_accuracy_values.append(metrics['accuracy'])
    logging.info(
        f"Validation Results - Epoch: {engine.state.epoch}  "
        f"Avg accuracy: {metrics['accuracy']:.3f} "
        f"Avg loss: {metrics['nll']:.3f}"
    )
    logging.info(f"End of validation on epoch {engine.state.epoch}")
    torch.save(model.state_dict(), "models/model.pt")

In [230]:
trainer.run(train_dataloader, max_epochs=EPOCHS)

INFO:root:Starting learning at epoch 1 in 2023-12-22 23:20:50.008104
ERROR:ignite.engine.engine.Engine:Engine run is terminating due to exception: 


KeyboardInterrupt: 

In [None]:
from matplotlib import pyplot as plt

# Графики обучения
plt.figure(figsize=(10, 5))
plt.subplot(1, 2, 1)
plt.plot(train_loss_values, label='Training Loss')
plt.plot(validation_loss_values, label='Validation Loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()

plt.subplot(1, 2, 2)
plt.plot(validation_accuracy_values, label='Validation Accuracy', color='red')
plt.xlabel('Epochs')
plt.ylabel('Accuracy')
plt.legend()

plt.tight_layout()
plt.show()

In [None]:
import torch
import random


# устанавливаем seed, чтобы результаты не изменялись при не изменение чего-либо
torch.manual_seed(666)
random.seed(666)

DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

test_model = NoMaskModel()
test_model = test_model.to(DEVICE)
test_model.load_state_dict(torch.load("models/model.pt"))
test_dataset = ImageToNumDataset("data/test_images", transform=transform)

In [None]:
import csv

# Генерация файлов ответа
test_model.eval()
len_dataset = len(test_dataset)
with open("answer.csv", "w") as file:
    writer = csv.writer(file, delimiter=",")
    writer.writerow(["id", "target_feature"])
    for index, image in enumerate(test_dataset):
        with torch.no_grad():
            pred_y = test_model(image.unsqueeze(0))
        answer = max(((n, i) for i, n in enumerate(pred_y[0])), key=lambda x: x[0])[1]
        writer.writerow([index, answer])
        if index % 10 == 0 or index % 10 == 9:
            print(f"{(index / len_dataset) * 100:.2f}%")
print("100%")

Вторая модель

In [None]:
class ImageToImageDataset(Dataset):
    def __init__(self, input_dir, target_dir=None, transform=None, target_transform=None, mode='train'):
        self.input_dir = input_dir
        self.target_dir = target_dir
        self.transform = transform
        self.target_transform = target_transform or transform
        self.mode = mode
        self.filenames = [f for f in os.listdir(input_dir) if f.endswith('.png')]

    def __len__(self):
        return len(self.filenames)

    def __getitem__(self, idx):
        input_path = os.path.join(self.input_dir, self.filenames[idx])
        input_image = Image.open(input_path).convert('L')

        if self.transform:
            input_image = self.transform(input_image)

        if self.mode == 'train':
            target_path = os.path.join(self.target_dir, self.filenames[idx])
            target_image = Image.open(target_path).convert('L')

            if self.target_transform:
                target_image = self.target_transform(target_image)

            return input_image, target_image
        else:
            return input_image

In [None]:
import torchvision

def save_image(tensor, filename):
    """ Сохраняет тензор как изображение. """
    torchvision.utils.save_image(tensor, filename)

# Определение модели
class SimpleCNN(nn.Module):
    """
    Простая сверточная нейронная сеть.

    Attributes:
        conv1 (nn.Conv2d): Первый сверточный слой.
        conv2 (nn.Conv2d): Второй сверточный слой.
        conv3 (nn.Conv2d): Третий сверточный слой.
        relu (nn.ReLU): Функция активации ReLU для извлечения признаков.
        sigmoid (nn.Sigmoid): Функция активации Sigmoid для создания вероятностного вывода.
    """

    def __init__(self):
        super(SimpleCNN, self).__init__()
        # Инициализация сверточных слоев и функций активации
        self.conv1 = nn.Conv2d(1, 16, 3, padding=1)
        self.conv2 = nn.Conv2d(16, 32, 3, padding=1)
        self.conv3 = nn.Conv2d(32, 1, 3, padding=1)
        self.relu = nn.ReLU()
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        # Проход данных через сверточные слои и функции активации
        x = self.relu(self.conv1(x))
        x = self.relu(self.conv2(x))
        x = self.sigmoid(self.conv3(x))
        return x

# Параметры
num_epochs = 6
batch_size = 4
learning_rate = 0.00001

# Подготовка данных
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5,), (0.5,))  # Нормализация если нужна
])

target_transform = transforms.Compose([
    transforms.ToTensor()  # Только преобразование в тензор для масок
])

train_dataset = ImageToImageDataset(input_dir="data/train_images", target_dir="data/train_lung_masks", transform=transform, target_transform =target_transform)
train_loader = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True)

model = SimpleCNN()
criterion = nn.BCELoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Обучение модели
for epoch in range(num_epochs):
    for i, (inputs, labels) in enumerate(train_loader):
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

    print(f'Epoch [{epoch+1}/{num_epochs}], Step [{i+1}/{len(train_loader)}], Loss: {loss.item():.4f}')
model_path = "model_mask.pth"
torch.save(model.state_dict(), model_path)
print(f"Model saved to {model_path}")

In [None]:
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5,), (0.5,))
])

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using {device} device")
dataset = ImageToNumDataset(img_dir="data/train_images_after_model", transform=transform, answers_file="data/train_answers.csv")
train_size = int(0.8 * len(dataset))
test_size = len(dataset) - train_size
train_dataset, test_dataset = random_split(dataset, [train_size, test_size])

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

In [None]:
import torch.nn as nn
from torchsummary import summary

class EnhancedNoMaskModel(nn.Module):
    """
   Расширенная модель нейронной сети для обработки изображений с увеличенным количеством сверточных слоев.

   Attributes:
       pool (nn.MaxPool2d): Операция пулинга для уменьшения размерности данных.
       dropout (nn.Dropout): Операция dropout для регуляризации.
       conv1 (nn.Conv2d): Первый сверточный слой.
       conv2 (nn.Conv2d): Второй сверточный слой.
       conv3 (nn.Conv2d): Третий сверточный слой.
       conv4 (nn.Conv2d): Четвертый сверточный слой.
       bn1 (nn.BatchNorm2d): Нормализация для первого сверточного слоя.
       bn2 (nn.BatchNorm2d): Нормализация для второго сверточного слоя.
       bn3 (nn.BatchNorm2d): Нормализация для третьего сверточного слоя.
       bn4 (nn.BatchNorm2d): Нормализация для четвертого сверточного слоя.
       relu (nn.ReLU): Функция активации ReLU для извлечения признаков.
       input_liner (nn.Linear): Полносвязный слой для обработки входных данных.
       liner1 (nn.Linear): Полносвязный слой для внутреннего представления.
       output_liner (nn.Linear): Выходной полносвязный слой модели.
       softmax (nn.Softmax): Функция Softmax для получения вероятностного распределения.
   """

    def __init__(self):
        super().__init__()
        # Инициализация сверточных слоев, операций и полносвязных слоев
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        self.dropout = nn.Dropout(0.2)


        self.conv1 = nn.Conv2d(in_channels=1, out_channels=6, kernel_size=3, stride=1, padding=1)
        self.conv2 = nn.Conv2d(in_channels=6, out_channels=12, kernel_size=3, stride=1, padding=1)
        self.conv3 = nn.Conv2d(in_channels=12, out_channels=24, kernel_size=3, stride=1, padding=1)
        self.conv4 = nn.Conv2d(in_channels=24, out_channels=48, kernel_size=3, stride=1, padding=1)

        self.bn1 = nn.BatchNorm2d(6)
        self.bn2 = nn.BatchNorm2d(12)
        self.bn3 = nn.BatchNorm2d(24)
        self.bn4 = nn.BatchNorm2d(48)

        self.relu = nn.ReLU()

        self.input_liner = nn.Linear(48 * 16 * 16, 48 * 16)
        self.liner1 = nn.Linear(48 * 16, 48)
        self.output_liner = nn.Linear(48, 3)

        self.softmax = nn.Softmax(dim=1)

    def forward(self, x):
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.pool(self.relu(x))

        x = self.conv2(x)
        x = self.bn2(x)
        x = self.pool(self.relu(x))

        x = self.conv3(x)
        x = self.bn3(x)
        x = self.pool(self.relu(x))

        x = self.conv4(x)
        x = self.bn4(x)
        x = self.pool(self.relu(x))

        # Выравнивание данных перед подачей на полносвязные слои
        x = x.view(-1, 48 * 16 * 16)

        x = self.relu(self.input_liner(x))
        x = self.dropout(x)
        x = self.liner1(x)
        x = self.output_liner(x)

        # Применение Softmax для получения вероятностного распределения
        x = self.softmax(x)

        return x

model = EnhancedNoMaskModel()

summary(model, (1, 256, 256))

In [None]:
model = EnhancedNoMaskModel()
model.to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

In [None]:
from sklearn.metrics import f1_score


def calculate_f1(loader):
    """
    Расчет метрики F1 для модели на заданном загрузчике данных.

    Args:
        loader (DataLoader): Загрузчик данных для расчета метрики.

    Returns:
        float: Значение метрики F1 для модели на данных из загрузчика.
    """
    model.eval()
    all_labels = []
    all_preds = []
    with torch.no_grad():
        for images, labels in loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            _, predicted = torch.max(outputs, 1)
            all_labels.extend(labels.cpu().numpy())
            all_preds.extend(predicted.cpu().numpy())
    return f1_score(all_labels, all_preds, average='macro')

In [None]:
# Обучение
num_epochs = 1
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0

    for images, labels in train_loader:
        images, labels = images.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()

    f1 = calculate_f1(test_loader)
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss/len(train_loader)}, F1-Score: {f1}")
    model_path = f"output_models/model_main{epoch+1}.pth"
    torch.save(model.state_dict(), model_path)
    print(f"Model saved to {model_path}")

print("Training Complete")
