In [1]:
import os
import glob
import cv2
import numpy as np
from tqdm import tqdm
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torch.utils.tensorboard import SummaryWriter
from torchvision.models import resnet34, ResNet34_Weights
from torchvision import transforms
from sklearn.metrics import jaccard_score, recall_score, precision_score, f1_score, roc_auc_score
import logging
from datetime import datetime
import random
import albumentations as A
from albumentations.pytorch import ToTensorV2
import matplotlib.pyplot as plt
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
import pandas as pd
import segmentation_models_pytorch as smp
import matplotlib.pyplot as plt

  check_for_updates()


In [2]:

# Release cached, currently unused GPU memory held by PyTorch's caching
# allocator so a re-run of the notebook starts from a clean GPU state.
torch.cuda.empty_cache()

In [3]:
# Compute device used for the model and every batch: the GPU when CUDA is
# usable, the CPU otherwise.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

In [4]:
# Dedicated logger that writes the test-set results to "test_results.log".
test_logger = logging.getLogger("test_logger")
test_logger.setLevel(logging.INFO)

# logging.getLogger returns the same object on every call, so re-running this
# cell would otherwise stack one more FileHandler per run and duplicate every
# record. Drop (and close) whatever a previous run attached first.
for stale_handler in list(test_logger.handlers):
    test_logger.removeHandler(stale_handler)
    stale_handler.close()

# The log messages contain Cyrillic text; an explicit UTF-8 encoding avoids
# depending on the platform's default code page.
file_handler = logging.FileHandler("test_results.log", mode="w", encoding="utf-8")
formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
file_handler.setFormatter(formatter)
test_logger.addHandler(file_handler)

In [5]:

# Training hyper-parameters.
num_epochs = 20        # upper bound on epochs; early stopping may end training sooner
batch_size = 8         # samples per DataLoader batch
learning_rate = 1e-4   # optimizer step size

# Seed the RNGs of the standard library, NumPy and PyTorch so that shuffling,
# weight initialisation and random augmentations are repeatable between runs.
# NOTE(review): some augmentation library versions keep their own RNG state;
# verify that the installed albumentations version honours these seeds.
seed = 42
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)



In [6]:
def get_train_augmentations():
    """Build the albumentations pipeline used for training image/mask pairs.

    The pipeline resizes to 256x256, applies random geometric transforms,
    then random noise / blur / dropout degradations, and finally normalises
    the image with the ImageNet mean and standard deviation before converting
    to a tensor.

    Returns:
        An ``A.Compose`` object to be called as ``pipeline(image=..., mask=...)``.
    """
    # Spatial transforms.
    geometric = [
        A.Resize(256, 256),
        A.HorizontalFlip(p=0.5),
        A.VerticalFlip(p=0.5),
        A.Rotate(limit=180, border_mode=cv2.BORDER_CONSTANT, value=0, mask_value=0, p=0.6),
    ]

    # Pixel-level degradations.
    degradations = [
        A.GaussNoise(var_limit=(1.0, 3.0), mean=0.0, p=0.1),
        A.MultiplicativeNoise(multiplier=(0.9, 1.1), per_channel=True, p=0.2),
        A.GaussianBlur(blur_limit=(3, 5), p=0.3),
        A.CoarseDropout(
            max_holes=2,
            max_height=16,
            max_width=16,
            min_holes=1,
            min_height=8,
            min_width=8,
            fill_value=0,
            p=0.1
        ),
    ]

    # ImageNet statistics, then conversion to a torch tensor.
    to_model_input = [
        A.Normalize(mean=[0.485, 0.456, 0.406],
                    std=[0.229, 0.224, 0.225]),
        ToTensorV2(),
    ]

    return A.Compose(
        geometric + degradations + to_model_input,
        additional_targets={'mask': 'mask'},
    )


In [7]:
class PolypDataset(Dataset):
    """Paired image / binary-mask dataset read from two directories.

    Images and masks are matched purely by position after sorting the file
    names of each directory, so both directories must contain the same number
    of files with names that sort identically.

    Every item is ``(image, mask, filename)`` where ``mask`` is a float32
    tensor of shape ``(1, H, W)`` holding only 0.0 and 1.0, regardless of
    whether a transform is supplied.

    Args:
        image_dir: Directory containing the input images.
        mask_dir: Directory containing the grayscale masks.
        transform: Optional callable invoked as ``transform(image=..., mask=...)``
            that returns a dict with ``"image"`` and ``"mask"`` entries. When
            omitted, images are resized to 256x256 and normalised with the
            ImageNet mean / standard deviation.
    """

    def __init__(self, image_dir, mask_dir, transform=None):
        self.image_paths = sorted(glob.glob(os.path.join(image_dir, "*.*")))
        self.mask_paths = sorted(glob.glob(os.path.join(mask_dir, "*.*")))
        assert len(self.image_paths) == len(self.mask_paths), \
            f"Number of images ({len(self.image_paths)}) and masks ({len(self.mask_paths)}) don't match!"
        self.transform = transform

    def __len__(self):
        """Return the number of image/mask pairs."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Load, preprocess and return the ``idx``-th ``(image, mask, filename)``.

        Raises:
            FileNotFoundError: If OpenCV cannot read the image or the mask.
        """
        image_path = self.image_paths[idx]
        mask_path = self.mask_paths[idx]

        # cv2.imread returns None instead of raising on an unreadable file;
        # fail here with the offending path rather than later inside cvtColor.
        image = cv2.imread(image_path)
        if image is None:
            raise FileNotFoundError(f"Could not read image: {image_path}")
        mask = cv2.imread(mask_path, cv2.IMREAD_GRAYSCALE)
        if mask is None:
            raise FileNotFoundError(f"Could not read mask: {mask_path}")
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        if self.transform:
            transformed = self.transform(image=image, mask=mask)
            image = transformed["image"]
            # Binarise with the same >127 rule as the no-transform branch so
            # anti-aliased or compressed masks never yield fractional targets.
            # Assumes the transform leaves the mask in the 0..255 range
            # (TODO confirm for custom transforms).
            mask = (torch.as_tensor(transformed["mask"]) > 127).float()
            # Add the channel axis when the transform returns a 2-D mask, so
            # both branches produce (1, H, W).
            if mask.ndim == 2:
                mask = mask.unsqueeze(0)
        else:
            # TODO: double-check the 256x256 size.
            image = cv2.resize(image, (256, 256)) / 255.0
            mask = (cv2.resize(mask, (256, 256)) > 127).astype(np.float32)
            image = (image - [0.485, 0.456, 0.406]) / [0.229, 0.224, 0.225]
            image = torch.from_numpy(image.transpose(2, 0, 1)).float()
            mask = torch.from_numpy(mask).unsqueeze(0).float()

        filename = os.path.basename(image_path)
        return image, mask, filename

def metrics(y_true, y_pred, y_prob):
    """Compute pixel-level and image-level segmentation metrics.

    Pixel level (all pixels of all images pooled): IoU and Dice.
    Image level (an image is positive when its mask has at least one
    foreground pixel): recall, precision, F1 and ROC AUC.

    Args:
        y_true: Ground-truth masks, first axis = image index. Values above
            0.5 count as foreground.
        y_pred: Predicted masks, same layout and the same 0.5 rule.
        y_prob: Predicted probability maps, same layout.

    Returns:
        Tuple ``(iou, recall, precision, f1, auc, dice)``. ``auc`` is NaN when
        the images all share one image-level label, because ROC AUC is
        undefined in that case.
    """
    # Accept tensors (possibly on the GPU or attached to a graph) as well as arrays.
    if isinstance(y_true, torch.Tensor):
        y_true = y_true.detach().cpu().numpy()
    if isinstance(y_pred, torch.Tensor):
        y_pred = y_pred.detach().cpu().numpy()
    if isinstance(y_prob, torch.Tensor):
        y_prob = y_prob.detach().cpu().numpy()

    # Threshold rather than truncate: astype(int) would turn every value
    # below 1.0 into background.
    true_mask = np.asarray(y_true) > 0.5
    pred_mask = np.asarray(y_pred) > 0.5
    y_prob = np.asarray(y_prob)
    n_images = len(true_mask)

    # Image-level labels: does the mask contain any foreground pixel?
    y_true_binar = true_mask.reshape(n_images, -1).any(axis=1).astype(int)
    y_pred_binar = pred_mask.reshape(n_images, -1).any(axis=1).astype(int)
    # Image-level score: the highest pixel probability. This matches the
    # "any pixel above the threshold" rule used for the predicted label.
    y_prob_binar = y_prob.reshape(n_images, -1).max(axis=1).astype(float)

    # Pixel-level metrics on the flattened masks.
    y_true_flat = true_mask.reshape(-1).astype(int)
    y_pred_flat = pred_mask.reshape(-1).astype(int)

    iou = jaccard_score(y_true_flat, y_pred_flat, zero_division=0)
    # The small epsilon keeps Dice defined when both masks are empty.
    dice = (2. * np.sum(y_true_flat * y_pred_flat)) / (np.sum(y_true_flat) + np.sum(y_pred_flat) + 1e-8)

    # Image-level metrics.
    rec = recall_score(y_true_binar, y_pred_binar, zero_division=0)
    prec = precision_score(y_true_binar, y_pred_binar, zero_division=0)
    f1 = f1_score(y_true_binar, y_pred_binar, zero_division=0)
    # ROC AUC needs both classes; a split in which every image contains (or
    # lacks) a polyp must not abort the whole run.
    if np.unique(y_true_binar).size > 1:
        auc = roc_auc_score(y_true_binar, y_prob_binar)
    else:
        auc = float("nan")

    return iou, rec, prec, f1, auc, dice

class DiceBCELoss(nn.Module):
    """Sum of a soft Dice loss and binary cross-entropy.

    Both terms are computed over all elements of the flattened prediction and
    target. ``pred`` is passed to ``nn.BCELoss`` and therefore has to contain
    probabilities, not logits.
    """

    def __init__(self):
        super().__init__()
        self.bce = nn.BCELoss()

    def forward(self, pred, target):
        """Return ``(1 - dice) + bce`` for ``pred`` against ``target``."""
        # Flatten both tensors; the target is cast to float for BCE.
        flat_pred = pred.contiguous().view(-1)
        flat_target = target.float().contiguous().view(-1)

        cross_entropy = self.bce(flat_pred, flat_target)

        # Soft Dice coefficient; the additive constant keeps it defined when
        # prediction and target are both all zero.
        smooth = 1.0
        overlap = (flat_pred * flat_target).sum()
        total = flat_pred.sum() + flat_target.sum() + smooth
        dice_coeff = (2. * overlap + smooth) / total

        return (1 - dice_coeff) + cross_entropy

def train_one_epoch(model, dataloader, criterion, optimizer, device, writer, epoch, dataset_name, treshold):
    """Train ``model`` for one pass over ``dataloader`` and log loss and metrics.

    The model output is compared directly against ``treshold``, so it is
    treated as a probability map. The notebook builds the model with
    ``activation="sigmoid"`` and uses a BCE-based criterion. For that reason
    the output is used as-is for the AUC computation; applying
    ``torch.sigmoid`` again would squash every probability into a narrow
    range above 0.5.

    Args:
        model: Network to optimise; switched to training mode.
        dataloader: Yields ``(images, masks, filenames)`` batches.
        criterion: Loss called as ``criterion(outputs, masks)``.
        optimizer: Optimiser stepped once per batch.
        device: Device the batches are moved to.
        writer: TensorBoard ``SummaryWriter`` receiving the scalars.
        epoch: Global step used for the logged scalars.
        dataset_name: Suffix of the scalar tags.
        treshold: Probability cut-off for the binary prediction (the spelling
            is kept because callers pass it by keyword).

    Returns:
        Tuple ``(epoch_loss, iou, recall, precision, f1, auc, dice)``.
    """
    model.train()
    running_loss = 0.0

    all_y_true = []
    all_y_pred = []
    all_y_prob = []
    for images, masks, _ in tqdm(dataloader, desc="Training", leave=False):
        images = images.to(device)
        masks = masks.to(device)

        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, masks)
        loss.backward()
        optimizer.step()

        # Weight by batch size so a smaller last batch is averaged correctly.
        running_loss += loss.item() * images.size(0)

        # Outputs are already probabilities: detach, then threshold.
        probs = outputs.detach()
        preds = (probs > treshold).float()
        all_y_true.append(masks.cpu().numpy())
        all_y_pred.append(preds.cpu().numpy())
        all_y_prob.append(probs.cpu().numpy())

    epoch_loss = running_loss / len(dataloader.dataset)
    all_y_true = np.concatenate(all_y_true, axis=0)
    all_y_pred = np.concatenate(all_y_pred, axis=0)
    all_y_prob = np.concatenate(all_y_prob, axis=0)

    iou, recall, precision, f1, auc, dice = metrics(all_y_true, all_y_pred, all_y_prob)

    writer.add_scalar(f"Loss/Train/{dataset_name}", epoch_loss, epoch)
    writer.add_scalar(f"Metrics/Train_IoU/{dataset_name}", iou, epoch)
    writer.add_scalar(f"Metrics/Train_Recall/{dataset_name}", recall, epoch)
    writer.add_scalar(f"Metrics/Train_Precision/{dataset_name}", precision, epoch)
    writer.add_scalar(f"Metrics/Train_F1/{dataset_name}", f1, epoch)
    writer.add_scalar(f"Metrics/Train_AUC/{dataset_name}", auc, epoch)
    writer.add_scalar(f"Metrics/Train_DICE/{dataset_name}", dice, epoch)
    torch.cuda.empty_cache()
    return epoch_loss, iou, recall, precision, f1, auc, dice

def validate(model, dataloader, criterion, device, writer, epoch, dataset_name, treshold):
    """Evaluate ``model`` on ``dataloader`` without gradients and log the results.

    As in training, the model output is treated as a probability map: it is
    thresholded directly and passed unchanged to the AUC computation, with no
    second sigmoid.

    Args:
        model: Network to evaluate; switched to eval mode.
        dataloader: Yields ``(images, masks, filenames)`` batches.
        criterion: Loss called as ``criterion(outputs, masks)``.
        device: Device the batches are moved to.
        writer: TensorBoard ``SummaryWriter`` receiving the scalars.
        epoch: Global step used for the logged scalars.
        dataset_name: Suffix of the scalar tags.
        treshold: Probability cut-off for the binary prediction (the spelling
            is kept because callers pass it by keyword).

    Returns:
        Tuple ``(epoch_loss, iou, recall, precision, f1, auc, dice)``.
    """
    model.eval()
    running_loss = 0.0

    all_y_true = []
    all_y_pred = []
    all_y_prob = []

    # The whole loop runs without autograd bookkeeping.
    with torch.no_grad():
        for images, masks, _ in tqdm(dataloader, desc="Validation", leave=False):
            images = images.to(device)
            masks = masks.to(device)

            outputs = model(images)
            loss = criterion(outputs, masks)
            running_loss += loss.item() * images.size(0)

            # Outputs are already probabilities.
            probs = outputs
            preds = (probs > treshold).float()
            all_y_true.append(masks.cpu().numpy())
            all_y_pred.append(preds.cpu().numpy())
            all_y_prob.append(probs.cpu().numpy())

    epoch_loss = running_loss / len(dataloader.dataset)
    all_y_true = np.concatenate(all_y_true, axis=0)
    all_y_pred = np.concatenate(all_y_pred, axis=0)
    all_y_prob = np.concatenate(all_y_prob, axis=0)

    iou, recall, precision, f1, auc, dice = metrics(all_y_true, all_y_pred, all_y_prob)

    writer.add_scalar(f"Loss/Val/{dataset_name}", epoch_loss, epoch)
    writer.add_scalar(f"Metrics/Val_IoU/{dataset_name}", iou, epoch)
    writer.add_scalar(f"Metrics/Val_Recall/{dataset_name}", recall, epoch)
    writer.add_scalar(f"Metrics/Val_Precision/{dataset_name}", precision, epoch)
    writer.add_scalar(f"Metrics/Val_F1/{dataset_name}", f1, epoch)
    writer.add_scalar(f"Metrics/Val_AUC/{dataset_name}", auc, epoch)
    # Tag follows the Val_* pattern of the other validation scalars (it was
    # previously logged as "Val_Train_DICE", a copy-paste slip).
    writer.add_scalar(f"Metrics/Val_DICE/{dataset_name}", dice, epoch)

    return epoch_loss, iou, recall, precision, f1, auc, dice

def test_model(model, dataloader, criterion, device, save_folder=".", treshold = 0.502):
    """Evaluate ``model`` on the test set, save predicted masks and report metrics.

    Each predicted binary mask is written to ``save_folder`` as an 8-bit image
    (0 / 255) under the file name of its input image. Loss and metrics are
    printed and written to ``test_logger``.

    The model output is treated as a probability map: it is thresholded
    directly and passed unchanged to the AUC computation, with no second
    sigmoid.

    Args:
        model: Network to evaluate; switched to eval mode.
        dataloader: Yields ``(images, masks, filenames)`` batches.
        criterion: Loss called as ``criterion(outputs, masks)``.
        device: Device the batches are moved to.
        save_folder: Directory for the predicted masks; created if missing.
        treshold: Probability cut-off for the binary prediction (the spelling
            is kept because callers pass it by keyword).

    Returns:
        Tuple ``(test_loss, iou, recall, precision, f1, auc, dice)``.
    """
    test_logger.info("Начало тестирования...")
    model.eval()
    running_loss = 0.0

    all_y_true = []
    all_y_pred = []
    all_y_prob = []
    # exist_ok avoids the check-then-create race and handles nested paths.
    os.makedirs(save_folder, exist_ok=True)

    with torch.no_grad():
        for images, masks, filenames in tqdm(dataloader, desc="Testing"):
            images = images.to(device)
            masks = masks.to(device)

            outputs = model(images)
            loss = criterion(outputs, masks)
            running_loss += loss.item() * images.size(0)

            # Outputs are already probabilities.
            probs = outputs
            preds = (probs > treshold).float()

            preds_np = preds.cpu().numpy()
            all_y_true.append(masks.cpu().numpy())
            all_y_pred.append(preds_np)
            all_y_prob.append(probs.cpu().numpy())

            for i in range(preds_np.shape[0]):
                # Channel 0 of the prediction, scaled to a 0/255 image.
                mask_img = (preds_np[i, 0] * 255).astype(np.uint8)
                save_path = os.path.join(save_folder, filenames[i])
                # cv2.imwrite reports failure through its return value only.
                if not cv2.imwrite(save_path, mask_img):
                    test_logger.warning(f"Could not write predicted mask: {save_path}")

    test_loss = running_loss / len(dataloader.dataset)
    all_y_true = np.concatenate(all_y_true, axis=0)
    all_y_pred = np.concatenate(all_y_pred, axis=0)
    all_y_prob = np.concatenate(all_y_prob, axis=0)

    iou, recall, precision, f1, auc, dice = metrics(all_y_true, all_y_pred, all_y_prob)

    print(f"Test Loss: {test_loss:.4f}")
    print(f"IoU: {iou:.4f} | Recall: {recall:.4f} | Precision: {precision:.4f} | F1 Score: {f1:.4f} | AUC Score: {auc:.4f} | DICE: {dice:.4f}")

    test_logger.info(f"Test Loss: {test_loss:.4f}")
    test_logger.info(f"IoU: {iou:.4f} | Recall: {recall:.4f} | Precision: {precision:.4f} | F1 Score: {f1:.4f} | AUC Score: {auc:.4f} | DICE: {dice:.4f}")

    return test_loss, iou, recall, precision, f1, auc, dice


In [8]:
# Root folder that holds the Images/ and Masks/ trees. Set the POLYP_DATA_ROOT
# environment variable to run the notebook on another machine; the default is
# the original location.
data_root = os.environ.get(
    "POLYP_DATA_ROOT", "C:/Users/Dara/Desktop/Final_thesis/data_choosing"
).rstrip("/\\")

# Directories of the train / validation / test splits for the "40_60" subset.
dataset = {
    "name": "40_60",
    "train_image_dir": f"{data_root}/Images/train/40_60",
    "train_mask_dir": f"{data_root}/Masks/train/40_60",
    "val_image_dir": f"{data_root}/Images/val",
    "val_mask_dir": f"{data_root}/Masks/val",
    "test_image_dir": f"{data_root}/Images/test",
    "test_mask_dir": f"{data_root}/Masks/test",
}


     


In [9]:


results = []
encoder_name = "resnet34"
# Probability cut-off that turns the model output into a binary mask; one
# constant keeps training, validation and testing consistent.
pred_threshold = 0.502


dataset_name = dataset["name"]
print(f"Training on dataset: {dataset_name}")

# Only the training split is augmented; validation and test are created
# without a transform.
train_dataset = PolypDataset(dataset["train_image_dir"], dataset["train_mask_dir"], transform=get_train_augmentations())
val_dataset = PolypDataset(dataset["val_image_dir"], dataset["val_mask_dir"])
test_dataset = PolypDataset(dataset["test_image_dir"], dataset["test_mask_dir"])

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=0)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, num_workers=0)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=0)

# U-Net++ with an ImageNet-pretrained encoder. The sigmoid activation makes
# the model emit probabilities, which is what the BCE term of the loss needs.
model = smp.UnetPlusPlus(
    encoder_name=encoder_name,
    encoder_weights="imagenet",
    in_channels=3,
    classes=1,
    activation="sigmoid"
)
model = model.to(device)

criterion = DiceBCELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

log_dir = f"logs/{dataset_name}_{encoder_name}"
writer = SummaryWriter(log_dir=log_dir)

checkpoint_path = f"unetplusplus_{dataset_name}_best_model.pth"

# Early stopping on validation Dice.
patience = 10
# Start below every reachable Dice so the first epoch always writes a
# checkpoint. With 0.0, a run whose Dice never rose above zero would save
# nothing and the reload below would fail or pick up a stale file.
best_val_dice = float("-inf")
best_epoch = -1
epochs_without_improvement = 0

for epoch in range(num_epochs):
    print(f"Эпоха {epoch+1}/{num_epochs} для датасета {dataset_name}")

    train_loss, train_iou, train_recall, train_precision, train_f1, train_auc, train_dice = train_one_epoch(
        model, train_loader, criterion, optimizer, device, writer, epoch, dataset_name, treshold=pred_threshold)
    print(f"Train Loss: {train_loss:.4f} | IoU: {train_iou:.4f} | Recall: {train_recall:.4f} | Precision: {train_precision:.4f} | F1: {train_f1:.4f}| AUC Score: {train_auc:.4f} | DICE: {train_dice:.4f}")

    val_loss, val_iou, val_recall, val_precision, val_f1, val_auc, val_dice = validate(
        model, val_loader, criterion, device, writer, epoch, dataset_name, treshold=pred_threshold)
    print(f"Val Loss: {val_loss:.4f} | IoU: {val_iou:.4f} | Recall: {val_recall:.4f} | Precision: {val_precision:.4f} | F1: {val_f1:.4f} | AUC Score: {val_auc:.4f} | DICE: {val_dice:.4f}")

    if val_dice > best_val_dice:
        best_val_dice = val_dice
        best_epoch = epoch
        torch.save(model.state_dict(), checkpoint_path)
        print("Новая лучшая модель сохранена.")
        epochs_without_improvement = 0
    else:
        epochs_without_improvement += 1
        if epochs_without_improvement >= patience:
            print(f"Ранняя остановка на эпохе {epoch+1}")
            break

print(f"Обучение завершено для датасета {dataset_name}. Лучшая модель на эпохе {best_epoch+1} с  DICE: {best_val_dice:.4f}")

# Evaluate the best checkpoint, not the weights of the last epoch.
# map_location keeps the reload working if the device differs from the one
# the checkpoint was saved on.
model.load_state_dict(torch.load(checkpoint_path, map_location=device))
test_loss, test_iou, test_recall, test_precision, test_f1, test_auc, test_dice = test_model(
    model, test_loader, criterion, device, save_folder=f"predicted_maskss_{dataset_name}", treshold=pred_threshold)

results.append({
    "Dataset": dataset_name,
    "Best Epoch": best_epoch + 1,
    "Test IoU": test_iou,
    "Test Recall": test_recall,
    "Test Precision": test_precision,
    "Test F1": test_f1,
    "Test Loss": test_loss,
    "Test AUC": test_auc,
    "Test DICE": test_dice

})

writer.close()

results_df = pd.DataFrame(results)
print(results_df)

Training on dataset: 40_60


  A.CoarseDropout(


Эпоха 1/20 для датасета 40_60


Training:   0%|          | 0/831 [00:00<?, ?it/s]