In [47]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import pandas as pd
from PIL import Image
import os
import cv2
import torchvision.transforms as transforms
import numpy as np
from tqdm import tqdm
import matplotlib.pyplot as plt
import random
import torch.nn.functional as F
from sklearn.metrics import precision_score, recall_score
import segmentation_models_pytorch as smp

I = 8

L1 = [0.229, 0.224, 0.225]

L = [0.485, 0.456, 0.406]

_ = (320, 544)

folder_path = "МЫШИПТУ"

seed_value = 52
torch.manual_seed(seed_value)
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(device)

cuda:0


In [48]:
def augmentation():
    path_images = "МЫШИПТУ/images"
    path_masks = "МЫШИПТУ/masks"
    image_files = os.listdir(path_images)

    selected_files_for_vertical = random.sample(image_files, 564)
    selected_files_for_horizontal = random.sample(image_files, 1)

    for files, flip_type in [(selected_files_for_vertical, Image.FLIP_LEFT_RIGHT), (selected_files_for_horizontal, Image.FLIP_TOP_BOTTOM)]:
        for filename in files:
            image = Image.open(os.path.join(path_images, filename))
            rotated_image = image.transpose(flip_type)
            rotated_image.save(os.path.join(path_images, f'revert_{flip_type}_{filename}'))
            image.close()

            mask = Image.open(os.path.join(path_masks, filename))
            rotated_mask = mask.transpose(flip_type)
            rotated_mask.save(os.path.join(path_masks, f'revert_{flip_type}_{filename}'))
            mask.close()

    return selected_files_for_vertical, selected_files_for_horizontal


In [58]:
def make_csv_files(folder_path, folder):
    images = [os.path.join(folder, "images", file) for file in os.listdir(os.path.join(folder_path, folder, "images"))]
    masks = [os.path.join(folder, "masks", file) for file in os.listdir(os.path.join(folder_path, folder, "masks"))]

    csv_file_path = "train_data.csv" if folder == "" else "test_data.csv"
    pd.DataFrame({'orig_image': images, 'mask_image': masks}).to_csv(csv_file_path, index=False)


In [59]:
train_df = pd.read_csv("train_data.csv")
test = pd.read_csv("test_data.csv")

### Подготовка данных

In [60]:
class ImagesDataset(Dataset):
    def __init__(self, folder, data, transform_image, transform_mask):
        self.folder = folder
        self.data = data.copy()
        self.orig_image_paths = [os.path.join(folder, filename) for filename in data['orig_image']]
        self.mask_image_paths = [os.path.join(folder, filename) for filename in data['mask_image']]
        self.transform_image = transform_image
        self.transform_mask = transform_mask

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        orig_image_path = self.orig_image_paths[idx]
        mask_image_path = self.mask_image_paths[idx]

        orig_image = Image.open(orig_image_path).convert('RGB')
        mask_image = Image.open(mask_image_path).convert('L')

        orig_image = self.transform_image(orig_image)
        mask_image = self.transform_mask(mask_image)

        return orig_image, mask_image


In [62]:
from sklearn.model_selection import train_test_split
from torchvision import transforms
from torch.utils.data import DataLoader

train, val = train_test_split(train_df, test_size=0.1, random_state=42)

# Define common parameters
SIZE = _
MEAN = L
STD = L1
BATCH_SIZE = I

transform_image = transforms.Compose([
    transforms.Resize(SIZE),
    transforms.ToTensor(),
    transforms.Normalize(mean=MEAN, std=STD)
])

transform_mask = transforms.Compose([
    transforms.Resize(SIZE),
    transforms.ToTensor()
])

def create_loader(dataset, batch_size, shuffle):
    return DataLoader(dataset, batch_size=batch_size, shuffle=shuffle)

train_dataset = ImagesDataset(folder_path, train, transform_image, transform_mask)
val_dataset = ImagesDataset(folder_path, val, transform_image, transform_mask)
test_dataset = ImagesDataset(folder_path, test, transform_image, transform_mask)

train_loader = create_loader(train_dataset, BATCH_SIZE, shuffle=True)
val_loader = create_loader(val_dataset, BATCH_SIZE, shuffle=False)
test_loader = create_loader(test_dataset, BATCH_SIZE, shuffle=False)


### Обучение

In [63]:
def train_model(epochs, train_loader, val_loader, model, optimizer, criterion, scheduler=None, device='cuda'):
    for epoch in range(epochs):
        model.train()
        for xb, yb in tqdm(train_loader):
            xb, yb = xb.to(device), yb.to(device)
            optimizer.zero_grad()
            loss = criterion(model(xb), yb)
            loss.backward()
            optimizer.step()
        
        if scheduler:
            scheduler.step()

        model.eval()
        with torch.no_grad():
            for xb, yb in tqdm(val_loader):
                loss = criterion(model(xb.to(device)), yb.to(device))

    return model

### Экспериме

In [64]:
model_name = 'efficientnet-b4'
model = smp.UnetPlusPlus(encoder_name=model_name, encoder_weights='imagenet', in_channels=3, classes=1)
model.to(device)

UnetPlusPlus(
  (encoder): EfficientNetEncoder(
    (_conv_stem): Conv2dStaticSamePadding(
      3, 48, kernel_size=(3, 3), stride=(2, 2), bias=False
      (static_padding): ZeroPad2d((0, 1, 0, 1))
    )
    (_bn0): BatchNorm2d(48, eps=0.001, momentum=0.010000000000000009, affine=True, track_running_stats=True)
    (_blocks): ModuleList(
      (0): MBConvBlock(
        (_depthwise_conv): Conv2dStaticSamePadding(
          48, 48, kernel_size=(3, 3), stride=[1, 1], groups=48, bias=False
          (static_padding): ZeroPad2d((1, 1, 1, 1))
        )
        (_bn1): BatchNorm2d(48, eps=0.001, momentum=0.010000000000000009, affine=True, track_running_stats=True)
        (_se_reduce): Conv2dStaticSamePadding(
          48, 12, kernel_size=(1, 1), stride=(1, 1)
          (static_padding): Identity()
        )
        (_se_expand): Conv2dStaticSamePadding(
          12, 48, kernel_size=(1, 1), stride=(1, 1)
          (static_padding): Identity()
        )
        (_project_conv): Conv2dStaticS

In [65]:
import segmentation_models_pytorch as smp

learning_rate = 0.001
optimizer = optim.Adamax(model.parameters(), lr=learning_rate)
criterion = smp.losses.DiceLoss(mode='binary')
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min')

In [66]:
num_epochs = 20
model = train_model(num_epochs, train_loader, val_loader, model, optimizer, criterion,
                                                        scheduler)

  8%|▊         | 13/160 [01:33<17:33,  7.16s/it]


KeyboardInterrupt: 

In [3]:
model = smp.UnetPlusPlus('efficientnet-b4', classes=1).to(device)
model.load_state_dict(torch.load("models/efficientnet-b4_IOU-0.8773694597348415.pth"))

<All keys matched successfully>

In [34]:
import torch
from torchvision import transforms
import numpy as np
from tqdm import tqdm
from sklearn.metrics import precision_score, recall_score

def predict_masks(model, loader, device='cuda', size=(544, 928)):
    model.eval()
    predicted_masks, ground_truth_masks = [], []
    transform = transforms.Resize(size)

    with torch.no_grad():
        for x_batch, y_batch in tqdm(loader):
            x_batch, y_batch = x_batch.to(device), y_batch.to(device)
            outputs = torch.round(torch.sigmoid(model(x_batch)))
            outputs, y_batch = map(transform, [outputs, y_batch])
            predicted_masks.append(outputs.cpu().numpy())
            ground_truth_masks.append(y_batch.cpu().numpy())

    predicted_masks = np.concatenate(predicted_masks).squeeze()
    ground_truth_masks = np.concatenate(ground_truth_masks).squeeze()
    return predicted_masks, ground_truth_masks

def calculate_precision_recall(predicted_masks, ground_truth_masks):
    pred_flat = predicted_masks.flatten().astype(int)
    gt_flat = ground_truth_masks.flatten().astype(int)
    precision = precision_score(gt_flat, pred_flat)
    recall = recall_score(gt_flat, pred_flat)
    return precision, recall

def calculate_iou(predicted_masks, ground_truth_masks):
    intersection = np.logical_and(predicted_masks, ground_truth_masks).sum()
    union = np.logical_or(predicted_masks, ground_truth_masks).sum()
    iou = intersection / union if union != 0 else 0
    return iou

def validate_model(model, loader):
    predicted_masks, ground_truth_masks = predict_masks(model, loader)
    precision, recall = calculate_precision_recall(predicted_masks, ground_truth_masks)
    iou = calculate_iou(predicted_masks, ground_truth_masks)
    print(f"Precision: {precision:.4f}")
    print(f"Recall: {recall:.4f}")
    print(f"IoU: {iou:.4f}")


### Оценка

In [35]:
validate_model(model, val_loader)

100%|██████████| 18/18 [00:04<00:00,  3.72it/s]


Precision: 0.8367
Recall: 0.9922
IoU: 0.8635


In [36]:
validate_model(model, test_loader)

100%|██████████| 2/2 [00:00<00:00,  4.76it/s]


Precision: 0.8223
Recall: 0.9762
IoU: 0.8488
