In [None]:
# Colab-only step: mount Google Drive at /content/drive so that the dataset
# directories configured below (which live under /content/drive/MyDrive)
# are reachable from this runtime.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Directory holding the input images; passed to SegmentationDataset below.
image_dir = '/content/drive/MyDrive/Seg_UKAN/inputs/busi/busi/images'
# Directory holding the masks; SegmentationDataset pairs its files with the
# images by position after sorting both directory listings.
mask_dir = '/content/drive/MyDrive/Seg_UKAN/inputs/busi/busi/masks/0'


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
import torchvision.transforms as transforms
import numpy as np
import os
import cv2
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score, jaccard_score

In [None]:
class UNet(nn.Module):
    """U-Net style encoder/decoder producing a per-pixel probability map.

    The encoder is five double-convolution stages separated by 2x2 max
    pooling. The decoder is four transposed-convolution upsampling steps,
    each followed by a double convolution over the upsampled features
    concatenated (channel-wise) with the matching encoder output. A final
    1x1 convolution followed by a sigmoid yields values in [0, 1], which is
    the form ``nn.BCELoss`` expects.

    Args:
        in_channels: Number of channels of the input image (default 3).
        out_channels: Number of output maps (default 1, a single mask).
        base_channels: Width of the first encoder stage; every deeper stage
            doubles it (default 64, i.e. 64-128-256-512-1024).

    The defaults reproduce the original fixed architecture, and sub-module
    attribute names are unchanged, so ``UNet()`` builds the same network
    with the same ``state_dict`` keys as before.
    """

    # Number of 2x2 poolings in the encoder. Height and width of the input
    # must be multiples of 2 ** _NUM_POOLS for the skip connections to align.
    _NUM_POOLS = 4

    def __init__(self, in_channels=3, out_channels=1, base_channels=64):
        super(UNet, self).__init__()

        # Channel widths of the five encoder stages.
        c1, c2, c3, c4, c5 = (base_channels * 2 ** i for i in range(5))

        # Contracting path
        self.enc1 = self.conv_block(in_channels, c1)
        self.enc2 = self.conv_block(c1, c2)
        self.enc3 = self.conv_block(c2, c3)
        self.enc4 = self.conv_block(c3, c4)
        self.enc5 = self.conv_block(c4, c5)

        # Expansive path. Each decoder block receives the upsampled features
        # concatenated with the skip connection, hence twice the channels
        # produced by the preceding transposed convolution.
        self.up6 = nn.ConvTranspose2d(c5, c4, kernel_size=2, stride=2)
        self.dec6 = self.conv_block(c5, c4)

        self.up7 = nn.ConvTranspose2d(c4, c3, kernel_size=2, stride=2)
        self.dec7 = self.conv_block(c4, c3)

        self.up8 = nn.ConvTranspose2d(c3, c2, kernel_size=2, stride=2)
        self.dec8 = self.conv_block(c3, c2)

        self.up9 = nn.ConvTranspose2d(c2, c1, kernel_size=2, stride=2)
        self.dec9 = self.conv_block(c2, c1)

        # 1x1 convolution mapping features to the requested number of maps.
        self.final = nn.Conv2d(c1, out_channels, kernel_size=1)

    def conv_block(self, in_channels, out_channels):
        """Return two 3x3 same-padding convolutions, each followed by ReLU."""
        block = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1),
            nn.ReLU()
        )
        return block

    def forward(self, x):
        """Run the network on a batch shaped (N, in_channels, H, W).

        Returns:
            Tensor shaped (N, out_channels, H, W) with values in [0, 1].

        Raises:
            ValueError: If H or W is not a positive multiple of 16. Pooling
                floors odd sizes, so the upsampled maps would no longer
                match the skip connections and ``torch.cat`` would fail with
                a far less readable size-mismatch error.
        """
        multiple = 2 ** self._NUM_POOLS
        height, width = x.shape[-2:]
        if min(height, width) < multiple or height % multiple or width % multiple:
            raise ValueError(
                f"UNet input height and width must be positive multiples of "
                f"{multiple}, got {height}x{width}"
            )

        # Contracting path
        enc1 = self.enc1(x)
        enc2 = self.enc2(nn.functional.max_pool2d(enc1, 2))
        enc3 = self.enc3(nn.functional.max_pool2d(enc2, 2))
        enc4 = self.enc4(nn.functional.max_pool2d(enc3, 2))
        enc5 = self.enc5(nn.functional.max_pool2d(enc4, 2))

        # Expansive path: upsample, then fuse with the same-resolution
        # encoder output.
        up6 = self.up6(enc5)
        dec6 = self.dec6(torch.cat([up6, enc4], dim=1))

        up7 = self.up7(dec6)
        dec7 = self.dec7(torch.cat([up7, enc3], dim=1))

        up8 = self.up8(dec7)
        dec8 = self.dec8(torch.cat([up8, enc2], dim=1))

        up9 = self.up9(dec8)
        dec9 = self.dec9(torch.cat([up9, enc1], dim=1))

        # Final layer
        output = torch.sigmoid(self.final(dec9))
        return output

In [None]:
class SegmentationDataset(Dataset):
    """Image/mask pairs read from two directories for segmentation.

    Files are paired by position after sorting each directory listing, so
    the i-th image name must correspond to the i-th mask name.

    Args:
        image_dir: Directory containing the input images.
        mask_dir: Directory containing the masks.
        img_size: Target size handed to ``cv2.resize``, which interprets the
            tuple as (width, height).
        transform: Optional callable applied independently to the image and
            to the mask. Because the two calls are independent, a transform
            with random behaviour would not be synchronised between them.

    Raises:
        ValueError: If the two directories hold a different number of
            entries, since positional pairing would then be wrong.
    """

    def __init__(self, image_dir, mask_dir, img_size=(128, 128), transform=None):
        self.image_files = sorted(os.listdir(image_dir))
        self.mask_files = sorted(os.listdir(mask_dir))
        if len(self.image_files) != len(self.mask_files):
            raise ValueError(
                f"Found {len(self.image_files)} images in {image_dir} but "
                f"{len(self.mask_files)} masks in {mask_dir}; cannot pair them"
            )
        self.image_dir = image_dir
        self.mask_dir = mask_dir
        self.img_size = img_size
        self.transform = transform

    def __len__(self):
        """Return the number of image/mask pairs."""
        return len(self.image_files)

    def __getitem__(self, idx):
        """Load, resize and scale the pair at position ``idx``.

        Returns:
            ``(img, mask)``. Without a transform these are float32 arrays
            scaled to [0, 1]: the image as H x W x 3 in OpenCV's BGR channel
            order, the mask as H x W x 1. With a transform, whatever the
            transform returns for each.

        Raises:
            FileNotFoundError: If OpenCV cannot read the image or the mask.
        """
        img_path = os.path.join(self.image_dir, self.image_files[idx])
        mask_path = os.path.join(self.mask_dir, self.mask_files[idx])

        # cv2.imread signals failure by returning None instead of raising.
        img = cv2.imread(img_path, cv2.IMREAD_COLOR)
        if img is None:
            raise FileNotFoundError(f"Could not read image file: {img_path}")
        img = cv2.resize(img, self.img_size)
        img = img.astype(np.float32) / 255.0

        mask = cv2.imread(mask_path, cv2.IMREAD_GRAYSCALE)
        if mask is None:
            raise FileNotFoundError(f"Could not read mask file: {mask_path}")
        # Nearest-neighbour keeps label values intact; the default bilinear
        # interpolation would blend foreground and background at borders.
        mask = cv2.resize(mask, self.img_size, interpolation=cv2.INTER_NEAREST)
        mask = mask.astype(np.float32) / 255.0
        mask = np.expand_dims(mask, axis=-1)  # add the channel axis: H x W x 1

        if self.transform:
            img = self.transform(img)
            mask = self.transform(mask)

        return img, mask


In [None]:
# ToTensor is applied by the dataset to both the image and the mask arrays.
transform = transforms.ToTensor()
# img_size is left at the dataset's default of (128, 128).
dataset = SegmentationDataset(image_dir, mask_dir, transform=transform)

In [None]:
# Seed PyTorch's global RNG (used for weight initialisation and DataLoader
# shuffling) and give random_split its own seeded generator, so the
# train/validation membership no longer changes on every "Run All".
SEED = 42
torch.manual_seed(SEED)

# 80/20 train/validation split.
train_size = int(0.8 * len(dataset))
val_size = len(dataset) - train_size
train_dataset, val_dataset = torch.utils.data.random_split(
    dataset,
    [train_size, val_size],
    generator=torch.Generator().manual_seed(SEED),
)

# Only the training loader shuffles; validation order stays fixed.
train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=8)

# Instantiate the model, optimizer, and loss function
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = UNet().to(device)
optimizer = optim.Adam(model.parameters(), lr=1e-4)
# BCELoss expects probabilities, which the model's sigmoid output provides.
criterion = nn.BCELoss()

In [None]:
epochs = 100
for epoch in range(epochs):
    # ---- optimisation pass over the training set ----
    model.train()
    train_batch_losses = []
    for images, masks in train_loader:
        images, masks = images.to(device), masks.to(device)

        # Clear stale gradients, then forward, backward and update.
        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, masks)
        loss.backward()
        optimizer.step()

        train_batch_losses.append(loss.item())

    # Reported value is the mean of the per-batch losses.
    epoch_loss = sum(train_batch_losses)
    print(f"Epoch [{epoch+1}/{epochs}], Loss: {epoch_loss / len(train_loader)}")

    # ---- evaluation pass over the validation set (no gradients) ----
    model.eval()
    val_batch_losses = []
    with torch.no_grad():
        for images, masks in val_loader:
            images, masks = images.to(device), masks.to(device)
            outputs = model(images)
            loss = criterion(outputs, masks)
            val_batch_losses.append(loss.item())

    val_loss = sum(val_batch_losses)
    print(f"Validation Loss: {val_loss / len(val_loader)}")

Epoch [1/100], Loss: 0.6984755396842957
Validation Loss: 0.6226633588473002
Epoch [2/100], Loss: 0.48517543645132155
Validation Loss: 0.4626832703749339
Epoch [3/100], Loss: 0.4248088626634507
Validation Loss: 0.45003903408845264
Epoch [4/100], Loss: 0.40970335120246526
Validation Loss: 0.4401915023724238
Epoch [5/100], Loss: 0.38032856015931993
Validation Loss: 0.37842748562494916
Epoch [6/100], Loss: 0.32141913828395663
Validation Loss: 0.33742887278397876
Epoch [7/100], Loss: 0.29856324124903905
Validation Loss: 0.36711379637320835
Epoch [8/100], Loss: 0.29768483695529757
Validation Loss: 0.31640640397866565
Epoch [9/100], Loss: 0.28119711719808127
Validation Loss: 0.3043758049607277
Epoch [10/100], Loss: 0.2626284325406665
Validation Loss: 0.2996428857247035
Epoch [11/100], Loss: 0.2620268130586261
Validation Loss: 0.2965271398425102
Epoch [12/100], Loss: 0.2572320493913832
Validation Loss: 0.31024697174628574
Epoch [13/100], Loss: 0.26075042145592825
Validation Loss: 0.32080110162

In [None]:
def calculate_continuous_metrics(y_true, y_pred, epsilon=1e-6):
    """Compute soft IoU and soft F1 (Dice) over all elements of two arrays.

    Both inputs are flattened and compared element-wise, so the scores are
    aggregated over every pixel of every sample rather than averaged per
    image. For inputs in [0, 1] the element-wise minimum/maximum act as a
    fuzzy intersection/union; for binary inputs they are the usual ones.

    Args:
        y_true: Array-like of ground-truth values.
        y_pred: Array-like of predictions with the same number of elements.
        epsilon: Smoothing term added to numerator and denominator of both
            scores, so two empty masks score 1.0 instead of dividing by zero.

    Returns:
        Tuple ``(iou, f1)`` of scalars.
    """
    # Accumulate in float64 so sums over many float32 pixels stay accurate.
    y_true_flat = np.asarray(y_true, dtype=np.float64).ravel()
    y_pred_flat = np.asarray(y_pred, dtype=np.float64).ravel()

    # Intersection and Union for continuous IoU
    intersection = np.sum(np.minimum(y_true_flat, y_pred_flat))
    union = np.sum(np.maximum(y_true_flat, y_pred_flat))
    iou = (intersection + epsilon) / (union + epsilon)

    # Soft F1 in Dice form: 2*P*R / (P + R) == 2*TP / (sum(pred) + sum(true)).
    # Smoothing it the same way as the IoU keeps the two scores consistent
    # when both masks are empty (previously IoU gave 1.0 but F1 gave 0.0).
    true_positives = np.sum(y_true_flat * y_pred_flat)
    total = np.sum(y_pred_flat) + np.sum(y_true_flat)
    f1 = (2.0 * true_positives + epsilon) / (total + epsilon)

    return iou, f1


In [None]:
# Collect predictions and ground truth for the whole validation set.
model.eval()
y_true = []
y_pred = []
with torch.no_grad():
    for images, masks in val_loader:
        outputs = model(images.to(device))
        y_pred.append(outputs.cpu().numpy())
        # Masks are only needed as NumPy arrays, so they are not sent to the
        # device first.
        y_true.append(masks.cpu().numpy())

# Concatenate all predictions and true masks
y_true = np.concatenate(y_true, axis=0)
y_pred = np.concatenate(y_pred, axis=0)

# Apply a threshold to convert the continuous predictions to binary
y_pred_thresholded = (y_pred > 0.5).astype(np.float32)
# Ground-truth masks may hold fractional values (e.g. from interpolation
# during resizing). Binarise them with the same threshold so hard
# predictions are scored against hard labels.
y_true_binary = (y_true > 0.5).astype(np.float32)

# Now calculate IoU and F1 scores using the thresholded predictions
iou, f1 = calculate_continuous_metrics(y_true_binary, y_pred_thresholded)
print(f"IoU: {iou:.4f}, F1 Score: {f1:.4f}")


IoU: 0.5891, F1 Score: 0.7414


In [None]:
output_dir='/content/drive/MyDrive/busi_UNET_output'
# cv2.imwrite does not create missing directories; it only returns False.
# Create the target directory up front so the writes below can succeed.
os.makedirs(output_dir, exist_ok=True)

for idx in range(y_pred_thresholded.shape[0]):
    pred_mask = y_pred_thresholded[idx, 0]  # Extract the predicted mask (single channel)
    pred_mask = (pred_mask * 255).astype(np.uint8)  # Scale to 0-255

    # Save the mask image, failing loudly instead of reporting success for
    # files that were never written.
    img_save_path = os.path.join(output_dir, f"segmented_{idx}.png")
    if not cv2.imwrite(img_save_path, pred_mask):
        raise OSError(f"Failed to write segmented mask to {img_save_path}")

print(f"Segmented images saved in {output_dir}")

Segmented images saved in /content/drive/MyDrive/busi_UNET_output
