In [1]:
import torch
print("PyTorch Version:", torch.__version__)
print("CUDA Available:", torch.cuda.is_available())
print("Number of GPUs:", torch.cuda.device_count())
if torch.cuda.is_available():
    print("GPU Name:", torch.cuda.get_device_name(0))
print("Done")

PyTorch Version: 2.6.0+cu118
CUDA Available: True
Number of GPUs: 1
GPU Name: NVIDIA GeForce RTX 4090
Done


In [15]:
import os
import glob
import shutil

# Define paths
input_dir = "E:/Code/ClassPlusSeg/ISIC2018_Task1-2_Validation_Input_Cleaned"
output_dir = "E:/Code/ClassPlusSeg/Final_Dataset/Isic_2018_validation_Images_Cleaned_v1"

# Create output directory if it doesn't exist
os.makedirs(output_dir, exist_ok=True)

# Get all files in the input directory
file_paths = sorted(glob.glob(os.path.join(input_dir, "*.jpg")) + 
                    glob.glob(os.path.join(input_dir, "*.png")))

for file_path in file_paths:
    # Extract the filename from the full path
    file_name = os.path.basename(file_path)
    
    # Check if the filename contains "cleaned_"
    if "cleaned_" in file_name:
        # Remove "cleaned_" from the filename
        new_file_name = file_name.replace("cleaned_", "")
        
        # Define the new file path in the output directory
        new_file_path = os.path.join(output_dir, new_file_name)
        
        # Copy the file to the new location with the updated name
        shutil.copy(file_path, new_file_path)
        print(f"Processed: {file_name} -> {new_file_name}")

print(f"All files processed and saved to {output_dir}")

Processed: cleaned_ISIC_0012255.jpg -> ISIC_0012255.jpg
Processed: cleaned_ISIC_0012346.jpg -> ISIC_0012346.jpg
Processed: cleaned_ISIC_0012576.jpg -> ISIC_0012576.jpg
Processed: cleaned_ISIC_0012585.jpg -> ISIC_0012585.jpg
Processed: cleaned_ISIC_0012623.jpg -> ISIC_0012623.jpg
Processed: cleaned_ISIC_0012627.jpg -> ISIC_0012627.jpg
Processed: cleaned_ISIC_0012633.jpg -> ISIC_0012633.jpg
Processed: cleaned_ISIC_0012643.jpg -> ISIC_0012643.jpg
Processed: cleaned_ISIC_0015294.jpg -> ISIC_0015294.jpg
Processed: cleaned_ISIC_0015351.jpg -> ISIC_0015351.jpg
Processed: cleaned_ISIC_0015370.jpg -> ISIC_0015370.jpg
Processed: cleaned_ISIC_0015462.jpg -> ISIC_0015462.jpg
Processed: cleaned_ISIC_0015480.jpg -> ISIC_0015480.jpg
Processed: cleaned_ISIC_0015492.jpg -> ISIC_0015492.jpg
Processed: cleaned_ISIC_0015518.jpg -> ISIC_0015518.jpg
Processed: cleaned_ISIC_0015552.jpg -> ISIC_0015552.jpg
Processed: cleaned_ISIC_0015590.jpg -> ISIC_0015590.jpg
Processed: cleaned_ISIC_0015634.jpg -> ISIC_0015

In [3]:
import os
import sys
import numpy as np
import cv2
from glob import glob
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import torch.nn.functional as F
import pandas as pd
from tqdm import tqdm
from sklearn.metrics import accuracy_score, f1_score, jaccard_score, precision_score, recall_score

# Data directories (only train and validation)
train_image_dir = "E:/Code/ClassPlusSeg/ISIC2018_Task1-2_Training_Input_Cleaned"
train_mask_dir = "E:/Code/ClassPlusSeg/ISIC2018_Task1_Training_GroundTruth"
val_image_dir = "E:/Code/ClassPlusSeg/ISIC2018_Task1-2_Validation_Input_Cleaned"
val_mask_dir = "E:/Code/ClassPlusSeg/ISIC2018_Task1_Validation_GroundTruth"

# Global parameters
H = 256
W = 256
batch_size = 16
lr = 1e-4
num_epochs = 100
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# SE Block Definition
class SEBlock(nn.Module):
    def __init__(self, channel, reduction=16):
        super(SEBlock, self).__init__()
        self.avg_pool = nn.AdaptiveAvgPool2d(1)
        self.fc = nn.Sequential(
            nn.Linear(channel, channel // reduction, bias=False),
            nn.ReLU(inplace=True),
            nn.Linear(channel // reduction, channel, bias=False),
            nn.Sigmoid()
        )

    def forward(self, x):
        b, c, _, _ = x.size()
        y = self.avg_pool(x).view(b, c)
        y = self.fc(y).view(b, c, 1, 1)
        return x * y.expand_as(x)

# BatchNormReLU
class BatchNormReLU(nn.Module):
    def __init__(self, num_features):
        super(BatchNormReLU, self).__init__()
        self.bn = nn.BatchNorm2d(num_features)
        self.relu = nn.ReLU(inplace=True)

    def forward(self, x):
        x = self.bn(x)
        x = self.relu(x)
        return x

# Residual Block with SE
class ResidualBlock(nn.Module):
    def __init__(self, in_channels, out_channels, strides=1):
        super(ResidualBlock, self).__init__()
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=strides, padding=1, bias=False)
        self.bn_relu1 = BatchNormReLU(out_channels)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn_relu2 = BatchNormReLU(out_channels)
        self.se = SEBlock(out_channels)

        self.shortcut = nn.Sequential()
        if strides != 1 or in_channels != out_channels:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=strides, padding=0, bias=False)
            )

    def forward(self, x):
        identity = self.shortcut(x)
        x = self.conv1(x)
        x = self.bn_relu1(x)
        x = self.conv2(x)
        x = self.bn_relu2(x)
        x = self.se(x)
        x = x + identity
        return x

# Decoder Block with SE
class DecoderBlock(nn.Module):
    def __init__(self, in_channels, skip_channels, out_channels):
        super(DecoderBlock, self).__init__()
        self.upsample = nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True)
        self.concat_channels = in_channels + skip_channels
        self.res_block = ResidualBlock(self.concat_channels, out_channels, strides=1)
        self.se = SEBlock(out_channels)

    def forward(self, x, skip):
        x = self.upsample(x)
        x = torch.cat([x, skip], dim=1)
        x = self.res_block(x)
        x = self.se(x)
        return x

# Dense PPM Bridge
class DensePPMBridge(nn.Module):
    def __init__(self, in_channels=256, out_channels=512):
        super(DensePPMBridge, self).__init__()
        growth_rate = 128

        self.dense1 = nn.Sequential(
            nn.Conv2d(in_channels, growth_rate, kernel_size=3, stride=2, padding=1, bias=False),
            BatchNormReLU(growth_rate)
        )
        self.dense2 = nn.Sequential(
            nn.Conv2d(in_channels + growth_rate, growth_rate, kernel_size=3, stride=1, padding=1, bias=False),
            BatchNormReLU(growth_rate)
        )

        self.ppm_pool1 = nn.Sequential(nn.AdaptiveAvgPool2d(1), nn.Conv2d(in_channels + 2 * growth_rate, 128, 1, bias=False), BatchNormReLU(128))
        self.ppm_pool2 = nn.Sequential(nn.AdaptiveAvgPool2d(2), nn.Conv2d(in_channels + 2 * growth_rate, 128, 1, bias=False), BatchNormReLU(128))
        self.ppm_pool3 = nn.Sequential(nn.AdaptiveAvgPool2d(3), nn.Conv2d(in_channels + 2 * growth_rate, 128, 1, bias=False), BatchNormReLU(128))
        self.ppm_pool6 = nn.Sequential(nn.AdaptiveAvgPool2d(6), nn.Conv2d(in_channels + 2 * growth_rate, 128, 1, bias=False), BatchNormReLU(128))
        
        self.final_conv = nn.Conv2d(in_channels + 2 * growth_rate + 4 * 128, out_channels, kernel_size=1, bias=False)

    def forward(self, x):
        d1 = self.dense1(x)
        d1_cat = torch.cat([F.interpolate(x, scale_factor=0.5, mode='bilinear', align_corners=True), d1], dim=1)
        d2 = self.dense2(d1_cat)
        dense_out = torch.cat([F.interpolate(x, scale_factor=0.5, mode='bilinear', align_corners=True), d1, d2], dim=1)

        ppm1 = self.ppm_pool1(dense_out)
        ppm1 = F.interpolate(ppm1, size=dense_out.size()[2:], mode='bilinear', align_corners=True)
        ppm2 = self.ppm_pool2(dense_out)
        ppm2 = F.interpolate(ppm2, size=dense_out.size()[2:], mode='bilinear', align_corners=True)
        ppm3 = self.ppm_pool3(dense_out)
        ppm3 = F.interpolate(ppm3, size=dense_out.size()[2:], mode='bilinear', align_corners=True)
        ppm6 = self.ppm_pool6(dense_out)
        ppm6 = F.interpolate(ppm6, size=dense_out.size()[2:], mode='bilinear', align_corners=True)

        ppm_out = torch.cat([dense_out, ppm1, ppm2, ppm3, ppm6], dim=1)
        out = self.final_conv(ppm_out)
        return out

# SE-ResUNet Model
class SEResUNet(nn.Module):
    def __init__(self, input_shape=(256, 256, 3)):
        super(SEResUNet, self).__init__()
        self.input_shape = input_shape

        self.conv1 = nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn_relu1 = BatchNormReLU(64)
        self.conv2 = nn.Conv2d(64, 64, kernel_size=3, stride=1, padding=1, bias=False)
        self.se1 = SEBlock(64)
        self.shortcut1 = nn.Conv2d(3, 64, kernel_size=1, stride=1, padding=0, bias=False)

        self.encoder2 = ResidualBlock(64, 128, strides=2)
        self.encoder3 = ResidualBlock(128, 256, strides=2)
        self.bridge = DensePPMBridge(256, 512)

        self.decoder1 = DecoderBlock(512, 256, 256)
        self.decoder2 = DecoderBlock(256, 128, 128)
        self.decoder3 = DecoderBlock(128, 64, 64)

        self.output = nn.Conv2d(64, 1, kernel_size=1, stride=1, padding=0)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        s1 = self.conv1(x)
        s1 = self.bn_relu1(s1)
        s1 = self.conv2(s1)
        s1 = self.se1(s1)
        shortcut = self.shortcut1(x)
        s1 = s1 + shortcut

        s2 = self.encoder2(s1)
        s3 = self.encoder3(s2)
        b = self.bridge(s3)

        d1 = self.decoder1(b, s3)
        d2 = self.decoder2(d1, s2)
        d3 = self.decoder3(d2, s1)

        out = self.output(d3)
        out = self.sigmoid(out)
        return out

# Metrics and Utility Functions
class DiceLoss(nn.Module):
    def __init__(self, smooth=1e-15):
        super(DiceLoss, self).__init__()
        self.smooth = smooth

    def forward(self, y_pred, y_true):
        y_pred = y_pred.view(-1)
        y_true = y_true.view(-1)
        intersection = (y_pred * y_true).sum()
        dice = (2. * intersection + self.smooth) / (y_pred.sum() + y_true.sum() + self.smooth)
        return 1.0 - dice

def dice_coef(y_pred, y_true, smooth=1e-15):
    y_pred = y_pred.view(-1)
    y_true = y_true.view(-1)
    intersection = (y_pred * y_true).sum()
    return (2. * intersection + smooth) / (y_pred.sum() + y_true.sum() + smooth)

def iou(y_pred, y_true, smooth=1e-15):
    y_pred = y_pred.view(-1)
    y_true = y_true.view(-1)
    intersection = (y_pred * y_true).sum()
    union = y_true.sum() + y_pred.sum() - intersection
    return (intersection + smooth) / (union + smooth)

def create_dir(path):
    try:
        if not os.path.exists(path):
            os.makedirs(path)
            print(f"Created directory: {path}")
        else:
            print(f"Directory already exists: {path}")
    except OSError as e:
        print(f"Error creating directory {path}: {e}")
        raise

def load_data():
    train_x = sorted(glob(os.path.join(train_image_dir, "*.jpg")))
    train_y = sorted(glob(os.path.join(train_mask_dir, "*.png")))
    valid_x = sorted(glob(os.path.join(val_image_dir, "*.jpg")))
    valid_y = sorted(glob(os.path.join(val_mask_dir, "*.png")))
    return (train_x, train_y), (valid_x, valid_y)

class ISICDataset(Dataset):
    def __init__(self, images, masks):
        self.images = images
        self.masks = masks

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        img_path = self.images[idx]
        img = cv2.imread(img_path, cv2.IMREAD_COLOR)
        img = cv2.resize(img, (W, H))
        img = img / 255.0
        img = img.astype(np.float32)
        img = np.transpose(img, (2, 0, 1))

        mask_path = self.masks[idx]
        mask = cv2.imread(mask_path, cv2.IMREAD_GRAYSCALE)
        mask = cv2.resize(mask, (W, H))
        mask = mask / 255.0
        mask = mask.astype(np.float32)
        mask = np.expand_dims(mask, axis=0)
        return torch.tensor(img, dtype=torch.float32), torch.tensor(mask, dtype=torch.float32)

# Main Execution
np.random.seed(42)
torch.manual_seed(42)

create_dir("Segmentation_Results")

(train_x, train_y), (valid_x, valid_y) = load_data()

print(f"Train: {len(train_x)} - {len(train_y)}")
print(f"Valid: {len(valid_x)} - {len(valid_y)}")

train_dataset = ISICDataset(train_x, train_y)
valid_dataset = ISICDataset(valid_x, valid_y)

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
valid_loader = DataLoader(valid_dataset, batch_size=batch_size, shuffle=False)

# Model, loss, optimizer
model = SEResUNet(input_shape=(H, W, 3)).to(device)
criterion = DiceLoss()
optimizer = optim.Adam(model.parameters(), lr=lr)

# Training loop
best_val_loss = float('inf')
model_path = os.path.join("files", "segModel.pth")

for epoch in range(num_epochs):
    model.train()
    train_loss = 0.0
    train_bar = tqdm(train_loader, desc=f"Epoch {epoch+1}/{num_epochs} [Train]", leave=False)
    for images, masks in train_bar:
        images, masks = images.to(device), masks.to(device)
        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, masks)
        loss.backward()
        optimizer.step()
        train_loss += loss.item() * images.size(0)
        train_bar.set_postfix({"Train Loss": loss.item()})
    train_loss /= len(train_loader.dataset)

    model.eval()
    val_loss = 0.0
    val_dice = 0.0
    val_iou = 0.0
    val_bar = tqdm(valid_loader, desc=f"Epoch {epoch+1}/{num_epochs} [Val]", leave=False)
    with torch.no_grad():
        for images, masks in val_bar:
            images, masks = images.to(device), masks.to(device)
            outputs = model(images)
            loss = criterion(outputs, masks)
            val_loss += loss.item() * images.size(0)
            outputs = (outputs > 0.5).float()
            val_dice += dice_coef(outputs, masks).item() * images.size(0)
            val_iou += iou(outputs, masks).item() * images.size(0)
            val_bar.set_postfix({"Val Loss": loss.item()})
    
    val_loss /= len(valid_loader.dataset)
    val_dice /= len(valid_loader.dataset)
    val_iou /= len(valid_loader.dataset)

    print(f"Epoch {epoch+1}/{num_epochs}, Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}, Val Dice: {val_dice:.4f}, Val IoU: {val_iou:.4f}")

    if val_loss < best_val_loss:
        best_val_loss = val_loss
        create_dir("files")
        torch.save(model.state_dict(), model_path)
        print(f"Saved best model at epoch {epoch+1}")

Created directory: Segmentation_Results
Train: 2594 - 2594
Valid: 100 - 100


                                                                                                                       

KeyboardInterrupt: 