In [None]:
from PIL import Image
import os

In [None]:
import numpy as np
from glob import glob
from sklearn.utils import shuffle
from sklearn.model_selection import train_test_split
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from torchmetrics import Precision, Recall
from pytorch_unet import UNet
import cv2

In [None]:
# Report whether PyTorch can see a CUDA device and, if so, what it is called
cuda_available = torch.cuda.is_available()

# Only device index 0 (the first visible GPU) is queried; without CUDA a
# human-readable placeholder is stored instead of a device name
gpu_name = (
    torch.cuda.get_device_name(0)
    if cuda_available
    else "CUDA not available. Running on CPU."
)

# Show the outcome of the check
print(f"CUDA Available: {cuda_available}")
print(f"GPU Name: {gpu_name}")


In [None]:
# Training-set folders (absolute Windows paths on the author's machine).
# The names suggest TIFF originals in `images` are converted to PNG files in
# `images_png`; the conversion step is not visible in this notebook - TODO confirm.
tiff_directory = 'D:\\Code\\PyTorch_Segmentation\\ImageSegmentation\\kmms_training\\kmms_training\\images'
png_directory = 'D:\\Code\\PyTorch_Segmentation\\ImageSegmentation\\kmms_training\\kmms_training\\images_png'

In [None]:
# Test-set folders (absolute Windows paths on the author's machine).
# NOTE(review): both variables point at the same `images` folder, unlike the
# training pair, which uses a separate `images_png` folder - confirm this is intended.
tiff_directory_2 = 'D:\\Code\\PyTorch_Segmentation\\ImageSegmentation\\kmms_test\\kmms_test\\images'
png_directory_2 = 'D:\\Code\\PyTorch_Segmentation\\ImageSegmentation\\kmms_test\\kmms_test\\images'

In [None]:
# Make sure both PNG folders exist before anything is read from or written to
# them; folders that are already present are left untouched
for _png_folder in (png_directory_2, png_directory):
    os.makedirs(_png_folder, exist_ok=True)

In [None]:
# Height and width (in pixels) that every image and mask is resized to
h, w = 256, 256

In [None]:
class CustomDataset(Dataset):
    """Dataset of (image, mask) pairs read from two parallel lists of file paths.

    Every image is read as a 3-channel colour image and every mask as a
    single-channel greyscale image. Both are resized to the notebook-level
    ``(w, h)`` size and scaled from [0, 255] to [0, 1].

    Args:
        images: Sequence of image file paths.
        masks: Sequence of mask file paths; ``masks[i]`` is paired with ``images[i]``.
        transform: Optional callable applied to the image only (never to the mask).
    """

    def __init__(self, images, masks, transform=None):
        self.images = images
        self.masks = masks
        self.transform = transform

    def __len__(self):
        """Return the number of samples, taken from the image list."""
        return len(self.images)

    @staticmethod
    def _read(path, flag):
        """Read ``path`` with OpenCV and fail loudly when it cannot be decoded.

        Raises:
            FileNotFoundError: If OpenCV could not read the file.
        """
        array = cv2.imread(path, flag)
        if array is None:
            # cv2.imread reports a missing or unreadable file by returning None,
            # which would otherwise surface later as an opaque cv2.resize error
            raise FileNotFoundError(f"Could not read image file: {path}")
        return array

    def __getitem__(self, idx):
        """Load, resize and normalise sample ``idx``.

        Returns:
            tuple: ``(image, mask)``. ``mask`` is a float32 array with a trailing
            channel axis of size 1 and values in [0, 1]. ``image`` is whatever
            ``transform`` returns, or the normalised float array when no
            transform is set.

        Raises:
            FileNotFoundError: If the image or the mask file cannot be read.
        """
        # NOTE: OpenCV returns colour images in BGR channel order and no
        # conversion is done here, so downstream code sees BGR data.
        image = self._read(self.images[idx], cv2.IMREAD_COLOR)
        mask = self._read(self.masks[idx], cv2.IMREAD_GRAYSCALE)

        # cv2.resize takes the target size as (width, height)
        image = cv2.resize(image, (w, h))
        mask = cv2.resize(mask, (w, h))

        image = image / 255.0
        # Keep the target in float32 so it matches the dtype of a float32
        # model output instead of silently promoting the loss to float64
        mask = (mask / 255.0).astype(np.float32)
        mask = np.expand_dims(mask, axis=-1)

        if self.transform:
            image = self.transform(image)

        return image, mask

In [None]:
def create_dir(path):
    """Create directory ``path`` (including missing parents) if it does not exist.

    Calling this for a directory that already exists is a no-op.

    Args:
        path: Directory path to create.

    Raises:
        FileExistsError: If ``path`` exists but is not a directory.
    """
    # exist_ok=True avoids the check-then-create race of a separate
    # os.path.exists() test and still reports a regular file blocking the path
    os.makedirs(path, exist_ok=True)

In [None]:
def load_data(train_path, test_path):
    """Collect sorted PNG paths from the ``images``/``masks`` subfolders of two roots.

    NOTE: a later cell defines another ``load_data(images_folder, masks_folder)``
    with a different signature; once that cell runs, it replaces this function.

    Args:
        train_path: Root folder containing ``images`` and ``masks`` subfolders.
        test_path: Root folder containing ``images`` and ``masks`` subfolders.

    Returns:
        tuple: ``((train_images, train_masks), (test_images, test_masks))``,
        each element a sorted list of ``*.png`` paths.
    """
    def _sorted_pngs(root, subfolder):
        # Alphabetical order is what pairs images with masks by position
        pattern = os.path.join(root, subfolder, "*.png")
        return sorted(glob(pattern))

    train_split = (_sorted_pngs(train_path, "images"), _sorted_pngs(train_path, "masks"))
    test_split = (_sorted_pngs(test_path, "images"), _sorted_pngs(test_path, "masks"))
    return train_split, test_split

In [None]:
def _run_epoch(model, loader, criterion, device, optimizer=None):
    """Run one pass over ``loader`` and return the mean per-batch loss.

    When ``optimizer`` is given, each batch is followed by a backward pass and
    an optimizer step; otherwise the batches are only evaluated. The caller is
    responsible for ``model.train()`` / ``model.eval()`` and for ``no_grad``.

    Returns:
        float: Mean of the batch losses, or NaN when the loader yields nothing.
    """
    total_loss = 0.0
    num_batches = 0
    for images, masks in loader:
        images, masks = images.to(device), masks.to(device)

        if optimizer is not None:
            optimizer.zero_grad()

        # Drop the channel axis of the prediction and the trailing axis of the target
        outputs = model(images).squeeze(1)
        # Align the target dtype with the prediction so the loss is not
        # computed with mixed dtypes (e.g. float64 masks coming from NumPy)
        masks = masks.squeeze(3).to(outputs.dtype)
        loss = criterion(outputs, masks)

        if optimizer is not None:
            loss.backward()
            optimizer.step()

        total_loss += loss.item()
        num_batches += 1

    # Batches are counted while iterating, so no len(loader) is required and
    # an empty loader yields NaN instead of a ZeroDivisionError
    return total_loss / num_batches if num_batches else float("nan")


def train(model, train_loader, valid_loader, criterion, optimizer, num_epochs, model_path):
    """Train ``model``, report losses every epoch and save the final weights.

    The model is moved to ``cuda:0`` when CUDA is available, otherwise it stays
    on the CPU. After the last epoch the model's ``state_dict`` is written to
    ``model_path``.

    Args:
        model: Network whose output has a size-1 axis at dim 1.
        train_loader: Iterable of ``(images, masks)`` batches used for optimisation;
            masks carry a size-1 axis at dim 3.
        valid_loader: Iterable of ``(images, masks)`` batches used for evaluation only.
            May be empty, in which case the validation loss is reported as NaN.
        criterion: Loss callable taking ``(outputs, masks)``.
        optimizer: Optimizer updating ``model``'s parameters.
        num_epochs: Number of passes over ``train_loader``.
        model_path: Destination file for the saved ``state_dict``.
    """
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    model.to(device)

    for epoch in range(num_epochs):
        model.train()
        average_loss = _run_epoch(model, train_loader, criterion, device, optimizer)

        model.eval()
        with torch.no_grad():
            average_val_loss = _run_epoch(model, valid_loader, criterion, device)

        print(f"Epoch [{epoch + 1}/{num_epochs}], Loss: {average_loss:.6f}, Val Loss: {average_val_loss:.3f}")

    torch.save(model.state_dict(), model_path)

In [None]:
class ConvBlock(nn.Module):
    """Two consecutive 3x3 convolution -> batch norm -> ReLU stages.

    Both convolutions use ``padding=1``, so height and width are unchanged;
    only the channel count goes from ``in_channels`` to ``out_channels``.

    Args:
        in_channels: Channels of the incoming feature map.
        out_channels: Channels produced by both convolutions.
    """

    def __init__(self, in_channels, out_channels):
        super().__init__()
        # Attribute names and creation order are kept as they are: they define
        # the state_dict keys and the order in which weights are initialised
        conv_options = dict(kernel_size=3, padding=1)
        self.conv1 = nn.Conv2d(in_channels, out_channels, **conv_options)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.relu1 = nn.ReLU(inplace=True)
        self.conv2 = nn.Conv2d(out_channels, out_channels, **conv_options)
        self.bn2 = nn.BatchNorm2d(out_channels)
        self.relu2 = nn.ReLU(inplace=True)

    def forward(self, x):
        """Apply both conv/norm/activation stages to ``x`` and return the result."""
        first_stage = self.relu1(self.bn1(self.conv1(x)))
        return self.relu2(self.bn2(self.conv2(first_stage)))

class EncoderBlock(nn.Module):
    """Encoder stage: a ``ConvBlock`` followed by 2x2 max pooling with stride 2.

    Args:
        in_channels: Channels of the incoming feature map.
        out_channels: Channels produced by the convolution block.
    """

    def __init__(self, in_channels, out_channels):
        super().__init__()
        self.conv_block = ConvBlock(in_channels, out_channels)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)

    def forward(self, x):
        """Return ``(skip, pooled)``.

        ``skip`` is the convolution block output before pooling (kept for the
        decoder's skip connection); ``pooled`` is the same tensor after pooling.
        """
        features = self.conv_block(x)
        return features, self.pool(features)

class DecoderBlock(nn.Module):
    """Decoder stage: upsample, concatenate the skip connection, then convolve.

    The transposed convolution doubles height and width and halves the channel
    count. Its output is concatenated with ``skip`` along the channel axis and
    passed through a ``ConvBlock``.

    Args:
        in_channels: Channels of the tensor coming from the previous (deeper) stage.
        skip_channels: Channels of the skip-connection tensor.
        out_channels: Channels produced by this stage.
    """

    def __init__(self, in_channels, skip_channels, out_channels):
        super(DecoderBlock, self).__init__()
        self.upconv = nn.ConvTranspose2d(in_channels, in_channels // 2, kernel_size=2, stride=2)
        # The concatenated tensor holds the upsampled channels plus the skip
        # channels. Sizing the block from both keeps it correct even when
        # skip_channels differs from in_channels // 2 (for the symmetric U-Net
        # in this notebook the sum equals in_channels, so nothing changes).
        self.conv_block = ConvBlock(in_channels // 2 + skip_channels, out_channels)
        self.skip_channels = skip_channels

    def forward(self, x, skip):
        """Upsample ``x``, join it with ``skip`` on dim 1 and apply the conv block.

        ``skip`` must have the same height and width as the upsampled ``x``.
        """
        x = self.upconv(x)
        x = torch.cat([x, skip], dim=1)
        x = self.conv_block(x)
        return x

class UNet(nn.Module):
    """U-Net with four encoder stages, a bottleneck and four decoder stages.

    Channel widths go 64 -> 128 -> 256 -> 512 in the encoder, 1024 in the
    bottleneck, and back down to 64 in the decoder. A final 1x1 convolution
    maps to ``out_channels``; no activation is applied to the output.

    NOTE: this class reuses the name ``UNet`` that the import cell also brings
    in from ``pytorch_unet``, so this definition replaces the imported one.

    Args:
        in_channels: Channels of the input image.
        out_channels: Channels of the output map.
    """

    def __init__(self, in_channels, out_channels):
        super().__init__()

        # Contracting path
        self.encoder1 = EncoderBlock(in_channels, 64)
        self.encoder2 = EncoderBlock(64, 128)
        self.encoder3 = EncoderBlock(128, 256)
        self.encoder4 = EncoderBlock(256, 512)

        self.bottleneck = ConvBlock(512, 1024)

        # Expanding path; arguments are (in_channels, skip_channels, out_channels)
        self.decoder1 = DecoderBlock(1024, 512, 512)
        self.decoder2 = DecoderBlock(512, 256, 256)
        self.decoder3 = DecoderBlock(256, 128, 128)
        self.decoder4 = DecoderBlock(128, 64, 64)

        self.output_conv = nn.Conv2d(64, out_channels, kernel_size=1)

    def forward(self, x):
        """Run the encoder, bottleneck and decoder and return the raw output map."""
        # Encoders push their pre-pooling features; decoders pop them again,
        # so the deepest skip tensor is consumed first
        skips = []
        for encoder in (self.encoder1, self.encoder2, self.encoder3, self.encoder4):
            skip, x = encoder(x)
            skips.append(skip)

        x = self.bottleneck(x)

        for decoder in (self.decoder1, self.decoder2, self.decoder3, self.decoder4):
            x = decoder(x, skips.pop())

        return self.output_conv(x)

In [None]:
def load_data(images_folder, masks_folder):
    """Return the sorted ``*.png`` paths found directly inside two folders.

    Args:
        images_folder: Folder containing the image PNG files.
        masks_folder: Folder containing the mask PNG files.

    Returns:
        tuple: ``(image_paths, mask_paths)``, each an alphabetically sorted list.
        Images and masks are paired purely by their position in these lists.
    """
    def _sorted_pngs(folder):
        return sorted(glob(os.path.join(folder, "*.png")))

    return _sorted_pngs(images_folder), _sorted_pngs(masks_folder)

In [None]:
if __name__ == "__main__":
    # Seed NumPy and PyTorch so weight initialisation and batch shuffling are repeatable
    np.random.seed(42)
    torch.manual_seed(42)

    # Training hyper-parameters and the file the trained weights are written to
    batch_size = 16
    learning_rate = 1e-4
    num_epochs = 50
    model_path = "D:\\Code\\PyTorch_Segmentation\\ImageSegmentation\\model.pth"

    train_img_path = "D:\\Code\\PyTorch_Segmentation\\ImageSegmentation\\kmms_training\\kmms_training\\images_png"
    test_img_path = "D:\\Code\\PyTorch_Segmentation\\ImageSegmentation\\kmms_test\\kmms_test\\images"
    train_mask_path = "D:\\Code\\PyTorch_Segmentation\\ImageSegmentation\\kmms_training\\kmms_training\\masks"
    test_mask_path = "D:\\Code\\PyTorch_Segmentation\\ImageSegmentation\\kmms_test\\kmms_test\\masks"

    train_images, train_masks = load_data(train_img_path, train_mask_path)
    test_images, test_masks = load_data(test_img_path, test_mask_path)

    # Images and masks are paired by position in the sorted lists, so an empty
    # or unequal pair of lists means the data would be missing or mismatched
    for split_name, split_images, split_masks in (
        ("train", train_images, train_masks),
        ("test", test_images, test_masks),
    ):
        if not split_images:
            raise FileNotFoundError(f"No PNG images found for the {split_name} split")
        if len(split_images) != len(split_masks):
            raise ValueError(
                f"{split_name} split has {len(split_images)} images but {len(split_masks)} masks"
            )

    # ToTensor turns the HWC array into a CHW tensor; the Lambda casts it to float32
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Lambda(lambda x: x.float()),
    ])

    train_dataset = CustomDataset(train_images, train_masks, transform=transform)
    test_dataset = CustomDataset(test_images, test_masks, transform=transform)

    # Reshuffle the training samples every epoch; the file list is sorted, so
    # without shuffling every epoch would see identical batches in the same order.
    # The evaluation loader keeps a fixed order.
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

    input_size = (h, w, 3)
    model = UNet(in_channels=input_size[2], out_channels=1)
    # The model outputs raw logits; BCEWithLogitsLoss applies the sigmoid internally
    criterion = nn.BCEWithLogitsLoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    # NOTE: the test loader doubles as the validation loader here
    train(model, train_loader, test_loader, criterion, optimizer, num_epochs, model_path)

In [None]:
def tensor_to_numpy(tensor):
    """Return ``tensor`` as a NumPy array, detached from autograd and on the CPU."""
    cpu_tensor = tensor.detach().cpu()
    return cpu_tensor.numpy()

In [None]:
import matplotlib.pyplot as plt
import numpy as np
from sklearn.metrics import jaccard_score

# One IoU score per test sample, appended exactly once per sample
iou_scores = []

# Cut-off applied to the predicted probabilities (sigmoid of the logits) and
# to the ground-truth masks when turning them into binary maps
threshold = 0.5

# Run inference on the device the trained model lives on, so this cell works
# whether training happened on the GPU or on the CPU
device = next(model.parameters()).device
model.eval()

# Iterate over the test loader, plot every sample and score every prediction
for images, masks in test_loader:
    with torch.no_grad():
        logits = model(images.to(device))

    # The model emits raw logits (it is trained with BCEWithLogitsLoss), so
    # convert them to probabilities before plotting and thresholding
    probabilities = torch.sigmoid(logits).squeeze(1).cpu()
    masks = masks.squeeze(3)

    # NumPy copies for plotting
    images_np = tensor_to_numpy(images)
    probabilities_np = tensor_to_numpy(probabilities)
    masks_np = tensor_to_numpy(masks)

    # Plot the original image, the predicted probability map and the ground truth
    for i in range(len(images)):
        plt.figure(figsize=(12, 4))

        # (C, H, W) -> (H, W, C) keeps rows and columns in place (a plain .T
        # would swap them); the channel axis is reversed because the dataset
        # loads images with cv2.imread, which yields BGR order
        plt.subplot(1, 3, 1)
        plt.imshow(np.transpose(images_np[i], (1, 2, 0))[..., ::-1])
        plt.title("Original Image")

        plt.subplot(1, 3, 2)
        plt.imshow(probabilities_np[i], cmap='gray')
        plt.title("Segmented Result")

        plt.subplot(1, 3, 3)
        plt.imshow(masks_np[i], cmap='gray')
        plt.title("Ground Truth Mask")

        plt.show()  # Display the figure

        # Close the current figure to avoid the open-figure warning
        plt.close()

    # Score each sample of the batch once (this loop is deliberately NOT nested
    # in the plotting loop, which would append every score once per plotted image)
    for i in range(len(probabilities)):
        pred_flat = (probabilities[i] > threshold).reshape(-1).numpy()
        mask_flat = (masks[i] > threshold).reshape(-1).cpu().numpy()

        # Intersection over union of the two binary maps
        iou = jaccard_score(mask_flat, pred_flat)
        iou_scores.append(iou)

print(iou_scores)
# Average IoU over all samples
average_iou = np.mean(iou_scores)

print(f'Average IoU: {average_iou}')
