In [83]:
import torch.nn.functional as F  # импортируем все библиотеки
from sklearn.model_selection import train_test_split
import itertools
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from torchvision import datasets
from torch.utils.data import DataLoader, Dataset
from torch.utils.data import TensorDataset
from torch import nn, optim, tensor, Tensor
from IPython.display import clear_output
from tqdm import tqdm
import torchvision
from torchsummary import summary
from PIL import Image
import os
from torchvision import transforms
from torch.utils.data import random_split
from torchvision.transforms.v2 import Compose, PILToTensor, ToDtype, Normalize
from ignite.engine import create_supervised_trainer, create_supervised_evaluator
from ignite.metrics import Accuracy, Loss, Metric
from sklearn.metrics import f1_score

In [84]:
import torch

# Prefer the GPU when CUDA is usable; otherwise run everything on the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(device)

cuda


In [85]:
class Xray_Mask_Dataloader(Dataset):
    """Dataset of images stored as ``img_<idx>.png`` files in one directory.

    Item ``idx`` is read from ``<root_dir>/<images_dir>/img_<idx>.png``,
    converted to a float32 tensor (``ToDtype(..., scale=True)``), repeated
    three times along dim 0 and moved to the module-level ``device``.
    """

    def __init__(self, root_dir='data', images_dir='test_images'):
        self.root_dir = root_dir
        self.image_dir = os.path.join(root_dir, images_dir)

        # The listing is only used for its length (see __len__); items are
        # addressed by index, not by the listed file names.
        self.image_list = os.listdir(self.image_dir)

        to_tensor = PILToTensor()
        to_float = ToDtype(torch.float32, scale=True)
        self.transform = transforms.Compose([to_tensor, to_float])

    def __len__(self):
        """Return the number of entries found in the image directory."""
        return len(self.image_list)

    def __getitem__(self, idx):
        """Load ``img_<idx>.png`` and return it as a tensor on ``device``."""
        path = os.path.join(self.image_dir, f'img_{idx}.png')
        tensor = self.transform(Image.open(path))
        # assumes the PNG is single-channel, so this yields 3 channels — TODO confirm
        stacked = torch.cat([tensor] * 3, dim=0)
        return stacked.to(device)

In [86]:
class CropByMask(object):
    """Re-weight an image by a binary mask.

    Pixels where the mask equals 1 are multiplied by 1.3, every other pixel
    by 0.7. Despite the name, nothing is cropped: the spatial size of the
    result equals the spatial size of the input.
    """

    def __init__(self):
        pass

    def __call__(self, img, mask):
        """Apply the mask weighting.

        Args:
            img: single-channel image, shaped ``(H, W)`` or ``(1, H, W)``;
                anything ``np.array`` accepts (CPU tensor, ndarray, ...).
            mask: array-like broadcastable to ``img``; entries equal to 1
                mark the emphasised region.

        Returns:
            A float32 ``torch.Tensor`` of shape ``(1, H, W)``.
        """
        img_np = np.array(img)
        mask_np = np.array(mask)
        weighted = np.where(mask_np == 1, img_np * 1.3, img_np * 0.7)
        # Take the spatial size from the data instead of assuming 256x256,
        # so images of any resolution are supported.
        height, width = weighted.shape[-2:]
        return torch.as_tensor(weighted, dtype=torch.float32).reshape(1, height, width)

In [87]:
class Mask_Conclusion_Dataloader(Dataset):
    """Dataset pairing each image with the mask of the same file name.

    Item ``idx`` loads ``img_<idx>.png`` from both the image directory and the
    mask directory, runs both through the same tensor transform, combines
    them with ``CropByMask``, repeats the result three times along dim 0 and
    moves it to the module-level ``device``.
    """

    def __init__(self, root_dir='data', images_dir='train_images', mask_dir='train_lung_masks'):
        self.root_dir = root_dir
        self.image_dir = os.path.join(root_dir, images_dir)
        self.mask_dir = os.path.join(root_dir, mask_dir)

        # Only the image directory is listed; its entry count defines __len__.
        self.image_list = os.listdir(self.image_dir)

        self.cropper = CropByMask()

        to_tensor = PILToTensor()
        to_float = ToDtype(torch.float32, scale=True)
        self.transform = transforms.Compose([to_tensor, to_float])

    def __len__(self):
        """Return the number of entries found in the image directory."""
        return len(self.image_list)

    def __getitem__(self, idx):
        """Return the mask-weighted image ``idx`` as a tensor on ``device``."""
        file_name = f'img_{idx}.png'
        image = self.transform(Image.open(os.path.join(self.image_dir, file_name)))
        mask = self.transform(Image.open(os.path.join(self.mask_dir, file_name)))
        weighted = self.cropper(image, mask)
        return torch.cat([weighted] * 3, dim=0).to(device)

In [88]:
import cv2
import numpy as np
from skimage import measure, color

def connected_component_filter(binary_mask, min_size_threshold=1200):
    """Zero out every labelled region smaller than ``min_size_threshold`` pixels.

    The mask is labelled with ``skimage.measure.label``; each label whose
    pixel count is below the threshold has its pixels set to 0.

    Note: ``binary_mask`` is modified in place and the same array is returned.

    Args:
        binary_mask: numpy array to filter.
        min_size_threshold: minimum pixel count a region needs to survive.

    Returns:
        The (mutated) ``binary_mask``.
    """
    labeled_mask = measure.label(binary_mask)
    labels, sizes = np.unique(labeled_mask, return_counts=True)

    # Collect all undersized labels and clear them with a single boolean mask.
    undersized = labels[sizes < min_size_threshold]
    binary_mask[np.isin(labeled_mask, undersized)] = 0

    return binary_mask

def auto_choose_seed_points(binary_mask):
    """Return one seed point per labelled region of a 2-D mask.

    The mask is labelled with ``skimage.measure.label`` and, for every label
    from 1 upward, the mean row/column of its pixels is truncated to ints.

    Args:
        binary_mask: 2-D array accepted by ``measure.label``.

    Returns:
        List of ``(x, y)`` tuples (column first, row second), one per label,
        in label order. Empty when the mask holds no labelled region.
    """
    labeled_mask = measure.label(binary_mask)
    if labeled_mask.size == 0:
        # .max() raises on an empty array; there is nothing to seed anyway.
        return []

    centers = []
    for label in range(1, labeled_mask.max() + 1):
        rows, cols = np.nonzero(labeled_mask == label)
        # NOTE(review): the centroid of a non-convex region can fall outside
        # the region itself — confirm that is acceptable for the caller.
        centers.append((int(cols.mean()), int(rows.mean())))

    return centers

def flood_fill(binary_mask, seed_point):
    """Flood-fill a copy of ``binary_mask`` with value 1 starting at ``seed_point``.

    Args:
        binary_mask: 2-D array; it is copied, not modified.
        seed_point: start point handed to ``cv2.floodFill`` unchanged.

    Returns:
        The filled copy of ``binary_mask``.
    """
    rows, cols = binary_mask.shape
    # cv2.floodFill takes an operation mask that is two pixels larger than
    # the image in each dimension.
    operation_mask = np.zeros((rows + 2, cols + 2), dtype=np.uint8)

    result = binary_mask.copy()
    cv2.floodFill(result, operation_mask, seed_point, 1)
    return result

In [89]:
class Up(nn.Module):
    """Decoder stage: upsample, align to the skip tensor, concatenate, convolve.

    ``x1`` is upsampled by a stride-2 transposed convolution to
    ``out_channels``, zero-padded to the spatial size of ``x2``, concatenated
    behind ``x2`` on the channel axis and passed through two
    conv/batch-norm/LeakyReLU stages. All layers are created on the
    module-level ``device``.
    """

    def __init__(self, in_channels, out_channels):
        super().__init__()
        self.in_channels = in_channels
        self.out_channels = out_channels

        upsample = nn.ConvTranspose2d(in_channels, out_channels, kernel_size=2, stride=2)
        self.up = upsample.to(device)

        # The first conv consumes the concatenated tensor, hence in_channels inputs.
        layers = [
            nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.LeakyReLU(inplace=True),
            nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.LeakyReLU(),
        ]
        self.double_conv = nn.Sequential(*layers).to(device)

    def forward(self, x1, x2):
        """Upsample ``x1``, merge it with the skip tensor ``x2`` and convolve."""
        upsampled = self.up(x1)

        # Difference in height (dim 2) and width (dim 3) against the skip tensor.
        pad_h = x2.size(2) - upsampled.size(2)
        pad_w = x2.size(3) - upsampled.size(3)
        left, top = pad_w // 2, pad_h // 2

        # F.pad order is [left, right, top, bottom] for the last two dims.
        upsampled = F.pad(upsampled, [left, pad_w - left, top, pad_h - top])
        return self.double_conv(torch.cat([x2, upsampled], dim=1))


class Down(nn.Module):
    """Encoder stage built from conv/batch-norm/LeakyReLU units.

    With ``maxpool=True`` (default) the stage is a 2x2 max-pool followed by a
    single conv unit; with ``maxpool=False`` it is two conv units and no
    pooling. All layers are created on the module-level ``device``.
    """

    def __init__(self, in_channels, out_channels, maxpool=True):
        super().__init__()

        def conv_unit(c_in, c_out):
            # 3x3 convolution with padding 1, then batch norm and in-place LeakyReLU.
            return [
                nn.Conv2d(c_in, c_out, kernel_size=3, padding=1),
                nn.BatchNorm2d(c_out),
                nn.LeakyReLU(inplace=True),
            ]

        if maxpool:  # max pooling requested: pool first, then one conv unit
            layers = [nn.MaxPool2d(2, stride=2)] + conv_unit(in_channels, out_channels)
        else:  # no max pooling: two conv units back to back
            layers = conv_unit(in_channels, out_channels) + conv_unit(out_channels, out_channels)

        self.double_conv = nn.Sequential(*layers).to(device)

    def forward(self, x):
        """Run ``x`` through the stage."""
        return self.double_conv(x)


class Step_Activation(nn.Module):
    """Hard step activation: values >= 0 become 1, values < 0 become 0.

    The input tensor is overwritten in place and returned. Entries that are
    neither >= 0 nor < 0 (NaN) are left untouched.
    """

    def __init__(self):
        super().__init__()

    def forward(self, x):
        """Binarise ``x`` in place and return the same tensor object."""
        x.masked_fill_(x >= 0, 1)
        x.masked_fill_(x < 0, 0)
        return x


class Out(nn.Module):
    """Output head: a 1x1 convolution followed by a sigmoid.

    The layers are created on the module-level ``device``.
    """

    def __init__(self, in_channels, out_channels):
        super().__init__()
        head = [
            nn.Conv2d(in_channels, out_channels, kernel_size=1),
            nn.Sigmoid(),
        ]
        self.out = nn.Sequential(*head).to(device)

    def forward(self, x):
        """Map ``x`` to per-pixel values squashed by the sigmoid."""
        return self.out(x)


class Neural(nn.Module):
    """Encoder/decoder network with skip connections, built from Down/Up/Out.

    The encoder is ``ran`` ``Down`` stages (the first one without pooling,
    taking 3 input channels), followed by three middle ``Down`` stages. The
    decoder is ``ran`` ``Up`` stages, each fed the previous output plus the
    matching encoder output, and an ``Out(64, 1)`` head.

    Args:
        ran: number of encoder (and decoder) blocks.
        is_summary_version: when True, ``forward`` prints the size of every
            intermediate tensor.
    """

    def __init__(self, ran, is_summary_version=False):  # ran - number of blocks
        super().__init__()
        self.ran = ran
        self.is_summary_version = is_summary_version

        # nn.ModuleList instead of a plain list: blocks kept in a plain list
        # are not registered as submodules, so parameters(), state_dict(),
        # .to() and .eval()/.train() would silently skip them. forward() only
        # indexes these containers, so instances unpickled with the earlier
        # plain-list attributes still run.
        self.down = nn.ModuleList([Down(3, 64, maxpool=False)])
        self.up = nn.ModuleList()

        in_f = 32
        out_f = 64

        # Each further encoder stage doubles the channel counts.
        for _ in range(self.ran - 1):
            in_f *= 2
            out_f *= 2
            self.down.append(Down(in_f, out_f))
        in_f *= 2
        out_f *= 2
        self.mid1 = Down(in_f, out_f)
        self.mid2 = Down(out_f, out_f * 2, maxpool=False)
        self.mid3 = Down(out_f * 2, out_f, maxpool=False)
        # Decoder runs the channel progression in reverse.
        in_f, out_f = out_f, in_f

        for _ in range(self.ran):
            self.up.append(Up(in_f, out_f))
            in_f //= 2
            out_f //= 2
        self.out = Out(64, 1)

    def forward(self, x):
        """Run the encoder, the middle stages and the decoder; return the head output."""
        downs = [self.down[0](x)]
        for i in range(self.ran - 1):
            downs.append(self.down[i + 1](downs[i]))
        mid1 = self.mid1(downs[-1])
        mid2 = self.mid2(mid1)
        mid3 = self.mid3(mid2)
        up = self.up[0](mid3, downs[-1])
        ups = [up]
        # Pair each decoder stage with the encoder output of matching depth,
        # walking the encoder outputs from deepest to shallowest.
        for i in range(self.ran - 1):
            ups.append(self.up[i + 1](ups[i], downs[-(i + 2)]))
        out = self.out(ups[-1])

        if self.is_summary_version:
            print(*[i.size() for i in downs], sep='\n')
            print('medium')
            print(mid1.size())
            print(mid2.size())
            print(mid3.size())
            print('medium')
            print(*[i.size() for i in ups], sep='\n')
            print('out')
            print(out.size())

        return out

In [90]:
# Test images as loaded for the segmentation model.
xray_mask_dataset = Xray_Mask_Dataloader(
    root_dir='data',
    images_dir='test_images',
)

# The same test images, paired with the masks under data/mask_test_images.
mask_conclusion_dataset = Mask_Conclusion_Dataloader(
    root_dir='data',
    images_dir='test_images',
    mask_dir='mask_test_images',
)

In [91]:
# SECURITY: torch.load unpickles arbitrary Python objects — only load
# checkpoint files that come from a trusted source.
#
# Both files are used as callable models below, i.e. they hold whole pickled
# modules rather than state dicts, so weights_only must be False (PyTorch 2.6+
# defaults to True and would refuse them). map_location lets a checkpoint
# saved on a GPU load on whichever device was selected above.
#
# NOTE(review): neither model is switched to eval() here, so BatchNorm layers
# run in whatever mode they were saved in. Confirm against the training
# notebook before adding .eval(), since it would change the predictions.
segment_model = torch.load('segment_model.pth', map_location=device, weights_only=False)
class_model = torch.load('class_model.pth', map_location=device, weights_only=False)

In [92]:
# Predict a mask for every test image, binarise it, drop small connected
# components and store the result as an 8-bit PNG for the classification step.
MASK_THRESHOLD = 0.4  # model outputs at or above this value count as foreground
mask_out_dir = os.path.join('data', 'mask_test_images')
os.makedirs(mask_out_dir, exist_ok=True)  # saving fails if the folder is missing

n_images = len(xray_mask_dataset)  # real dataset size for the progress output

with torch.no_grad():
    for i in range(n_images):
        model_ans = segment_model(xray_mask_dataset[i].unsqueeze(0)).squeeze().cpu()
        # 255 for foreground, 0 for background; kept as float32 for the filter.
        filtered_mask = np.where(model_ans.numpy() >= MASK_THRESHOLD, 255, 0).astype(np.float32)
        filtered_mask = connected_component_filter(filtered_mask)
        image = Image.fromarray(filtered_mask.astype(np.uint8), mode='L')
        image.save(os.path.join(mask_out_dir, f'img_{i}.png'))
        if i % 10 == 0:
            print(f'{i}/{n_images}')

0/7000
10/7000
20/7000
30/7000
40/7000
50/7000
60/7000
70/7000
80/7000
90/7000
100/7000
110/7000
120/7000
130/7000
140/7000
150/7000
160/7000
170/7000
180/7000
190/7000
200/7000
210/7000
220/7000
230/7000
240/7000
250/7000
260/7000
270/7000
280/7000
290/7000
300/7000
310/7000
320/7000
330/7000
340/7000
350/7000
360/7000
370/7000
380/7000
390/7000
400/7000
410/7000
420/7000
430/7000
440/7000
450/7000
460/7000
470/7000
480/7000
490/7000
500/7000
510/7000
520/7000
530/7000
540/7000
550/7000
560/7000
570/7000
580/7000
590/7000
600/7000
610/7000
620/7000
630/7000
640/7000
650/7000
660/7000
670/7000
680/7000
690/7000
700/7000
710/7000
720/7000
730/7000
740/7000
750/7000
760/7000
770/7000
780/7000
790/7000
800/7000
810/7000
820/7000
830/7000
840/7000
850/7000
860/7000
870/7000
880/7000
890/7000
900/7000
910/7000
920/7000
930/7000
940/7000
950/7000
960/7000
970/7000
980/7000
990/7000
1000/7000
1010/7000
1020/7000
1030/7000
1040/7000
1050/7000
1060/7000
1070/7000
1080/7000
1090/7000
1100/7000
1

In [93]:
# Classify every mask-weighted test image and collect the predicted class ids.
n_samples = len(mask_conclusion_dataset)  # real dataset size for the progress output
model_answers = []

with torch.no_grad():
    for i in range(n_samples):
        model_ans = class_model(mask_conclusion_dataset[i].unsqueeze(0)).squeeze().cpu()
        # Store plain Python ints (index of the largest output), not 0-d arrays.
        model_answers.append(int(torch.argmax(model_ans)))
        if i % 10 == 0:
            print(f'{i}/{n_samples}')

0/7000
10/7000
20/7000
30/7000
40/7000
50/7000
60/7000
70/7000
80/7000
90/7000
100/7000
110/7000
120/7000
130/7000
140/7000
150/7000
160/7000
170/7000
180/7000
190/7000
200/7000
210/7000
220/7000
230/7000
240/7000
250/7000
260/7000
270/7000
280/7000
290/7000
300/7000
310/7000
320/7000
330/7000
340/7000
350/7000
360/7000
370/7000
380/7000
390/7000
400/7000
410/7000
420/7000
430/7000
440/7000
450/7000
460/7000
470/7000
480/7000
490/7000
500/7000
510/7000
520/7000
530/7000
540/7000
550/7000
560/7000
570/7000
580/7000
590/7000
600/7000
610/7000
620/7000
630/7000
640/7000
650/7000
660/7000
670/7000
680/7000
690/7000
700/7000
710/7000
720/7000
730/7000
740/7000
750/7000
760/7000
770/7000
780/7000
790/7000
800/7000
810/7000
820/7000
830/7000
840/7000
850/7000
860/7000
870/7000
880/7000
890/7000
900/7000
910/7000
920/7000
930/7000
940/7000
950/7000
960/7000
970/7000
980/7000
990/7000
1000/7000
1010/7000
1020/7000
1030/7000
1040/7000
1050/7000
1060/7000
1070/7000
1080/7000
1090/7000
1100/7000
1

In [94]:
import csv

# Write one "id,target_feature" row per prediction; ids are the 0-based
# positions in model_answers.
with open('predict.csv', 'w', newline='') as csvfile:
    csv_writer = csv.writer(csvfile)
    csv_writer.writerow(['id', 'target_feature'])
    csv_writer.writerows([idx, answer] for idx, answer in enumerate(model_answers))