In [None]:
import torch
import torch.nn as nn
import torchvision.transforms.functional as TF

class DoubleConv(nn.Module):
    """Two consecutive 3x3 convolution stages, each followed by batch norm and ReLU.

    Both convolutions use stride 1 and padding 1, so the spatial size of the
    input is preserved; only the channel count changes from ``in_channels``
    to ``out_channels``. Convolution biases are disabled.
    """

    def __init__(self, in_channels, out_channels):
        super().__init__()
        stages = []
        # First stage maps in_channels -> out_channels, second keeps out_channels.
        for stage_in in (in_channels, out_channels):
            stages.extend(
                [
                    nn.Conv2d(
                        stage_in,
                        out_channels,
                        kernel_size=3,
                        stride=1,
                        padding=1,
                        bias=False,
                    ),
                    nn.BatchNorm2d(out_channels),
                    nn.ReLU(inplace=True),
                ]
            )
        # Attribute name and layer order are kept, so state_dict keys remain
        # "conv.0..." through "conv.4...".
        self.conv = nn.Sequential(*stages)

    def forward(self, feature_map):
        """Apply both conv stages to ``feature_map`` and return the result."""
        return self.conv(feature_map)

class UNET(nn.Module):
    """U-Net style encoder/decoder assembled from ``DoubleConv`` blocks.

    Args:
        in_channels: Number of channels of the input tensor.
        out_channels: Number of channels produced by the final 1x1 convolution.
        features: Channel widths of the encoder levels, shallowest first. The
            decoder mirrors them in reverse order. Any non-empty iterable of
            ints is accepted.

    The forward pass returns the raw output of the final convolution; no
    activation is applied here.
    """

    def __init__(self, in_channels=3, out_channels=1, features=(64, 128, 256, 512)):
        super(UNET, self).__init__()
        # Copy into a list: this avoids a shared mutable default argument and
        # lets callers pass any iterable (it is traversed twice and indexed).
        features = list(features)

        self.ups = nn.ModuleList()
        self.downs = nn.ModuleList()
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)

        # Down part of UNET: one DoubleConv per level, widening the channels.
        for feature in features:
            self.downs.append(DoubleConv(in_channels, feature))
            in_channels = feature

        # Up part of UNET: modules are stored in (upsample, DoubleConv) pairs.
        # The DoubleConv takes feature*2 channels because the upsampled map is
        # concatenated with the skip connection of the same width.
        for feature in reversed(features):
            self.ups.append(
                nn.ConvTranspose2d(
                    feature*2, feature, kernel_size=2, stride=2,
                )
            )
            self.ups.append(DoubleConv(feature*2, feature))

        self.bottleneck = DoubleConv(features[-1], features[-1]*2)
        self.final_conv = nn.Conv2d(features[0], out_channels, kernel_size=1)

    def forward(self, feature_map):
        """Run the encoder, bottleneck and decoder on ``feature_map``."""
        skip_connections = []

        for down in self.downs:
            feature_map = down(feature_map)
            # Keep the pre-pooling activation for the matching decoder level.
            skip_connections.append(feature_map)
            feature_map = self.pool(feature_map)

        feature_map = self.bottleneck(feature_map)
        # Deepest skip first, matching the order of the decoder levels.
        skip_connections = skip_connections[::-1]

        # self.ups holds (upsample, DoubleConv) pairs, hence the step of 2.
        for idx in range(0, len(self.ups), 2):
            feature_map = self.ups[idx](feature_map)
            skip_connection = skip_connections[idx//2]

            # Pooling floors odd sizes, so the upsampled map can be smaller
            # than the skip connection. Only the spatial dims (H, W) are
            # compared because those are the only ones resize can change.
            if feature_map.shape[2:] != skip_connection.shape[2:]:
                feature_map = TF.resize(feature_map, size=skip_connection.shape[2:])

            concat_skip = torch.cat((skip_connection, feature_map), dim=1)
            feature_map = self.ups[idx+1](concat_skip)

        return self.final_conv(feature_map)

In [None]:
import os
from PIL import Image
from torch.utils.data import Dataset
import numpy as np

class GlaucomaDataset(Dataset):
    """Image/mask pairs read from two directories that share file names.

    Args:
        image_dir: Directory containing the input images.
        mask_dir: Directory containing one mask per image, stored under the
            same file name as the image.
        transform: Optional callable invoked as
            ``transform(image=image, mask=mask)``; its result must provide
            ``"image"`` and ``"mask"`` entries.
    """

    def __init__(self, image_dir, mask_dir, transform=None):
        self.image_dir = image_dir
        self.mask_dir = mask_dir
        self.transform = transform
        # os.listdir returns entries in arbitrary order and also lists
        # sub-directories. Keep regular files only and sort them so that an
        # index always refers to the same sample across runs.
        self.images = sorted(
            name
            for name in os.listdir(image_dir)
            if os.path.isfile(os.path.join(image_dir, name))
        )

    def __len__(self):
        """Return the number of image files found in ``image_dir``."""
        return len(self.images)

    def __getitem__(self, index):
        """Return the ``(image, mask)`` pair stored under position ``index``."""
        file_name = self.images[index]
        img_path = os.path.join(self.image_dir, file_name)
        mask_path = os.path.join(self.mask_dir, file_name)

        # The context managers close the underlying files as soon as the
        # pixel data has been copied into NumPy arrays.
        with Image.open(img_path) as image_file:
            image = np.array(image_file.convert("RGB"))
        with Image.open(mask_path) as mask_file:
            mask = np.array(mask_file.convert("L"), dtype=np.float32)
        # Only pure-white pixels (255) are relabelled to 1.0; any other value
        # is left unchanged.
        mask[mask == 255.0] = 1.0

        if self.transform is not None:
            augmentations = self.transform(image=image, mask=mask)
            image = augmentations["image"]
            mask = augmentations["mask"]

        return image, mask

In [None]:
import torch
import torchvision
from torch.utils.data import DataLoader

def save_checkpoint(state, filename="disk_checkpoint.pth.tar"):
    """Serialize ``state`` to ``filename`` with ``torch.save``."""
    torch.save(obj=state, f=filename)

def load_checkpoint(checkpoint, model, optimizer=None):
    """Restore model (and optionally optimizer) state from a checkpoint dict.

    Args:
        checkpoint: Mapping with a ``"state_dict"`` entry holding the model
            weights and, optionally, an ``"optimizer"`` entry.
        model: Module whose parameters are overwritten in place.
        optimizer: Optional optimizer. When given and the checkpoint has an
            ``"optimizer"`` entry, its state is restored too, so resumed
            training keeps the saved optimizer state. Omitting it loads the
            model weights only.

    Raises:
        KeyError: If ``checkpoint`` has no ``"state_dict"`` entry.
    """
    model.load_state_dict(checkpoint["state_dict"])
    if optimizer is not None and "optimizer" in checkpoint:
        optimizer.load_state_dict(checkpoint["optimizer"])

def get_loaders(
    train_dir,
    train_maskdir,
    val_dir,
    val_maskdir,
    batch_size,
    train_transform,
    val_transform,
    num_workers=4,
    pin_memory=True,
):
    """Build the training and validation ``DataLoader`` objects.

    Args:
        train_dir: Directory with the training images.
        train_maskdir: Directory with the training masks.
        val_dir: Directory with the validation images.
        val_maskdir: Directory with the validation masks.
        batch_size: Batch size used by both loaders.
        train_transform: Transform handed to the training dataset.
        val_transform: Transform handed to the validation dataset.
        num_workers: Worker count used by both loaders.
        pin_memory: ``pin_memory`` flag used by both loaders.

    Returns:
        Tuple ``(train_loader, val_loader)``. Only the training loader
        shuffles its samples.
    """
    # Settings that are identical for both loaders.
    shared_options = dict(
        batch_size=batch_size,
        num_workers=num_workers,
        pin_memory=pin_memory,
    )

    train_ds = GlaucomaDataset(
        image_dir=train_dir, mask_dir=train_maskdir, transform=train_transform
    )
    train_loader = DataLoader(train_ds, shuffle=True, **shared_options)

    val_ds = GlaucomaDataset(
        image_dir=val_dir, mask_dir=val_maskdir, transform=val_transform
    )
    val_loader = DataLoader(val_ds, shuffle=False, **shared_options)

    return train_loader, val_loader

def check_accuracy(loader, model, device="cpu"):
    """Print and return the element-wise accuracy of ``model`` over ``loader``.

    Every item of ``loader`` must be an ``(x, y)`` pair. The model output is
    treated as logits: it goes through a sigmoid, is thresholded at 0.5, and
    is compared element-wise with ``y`` after an axis is inserted at dim 1.

    The model is put into eval mode for the pass and switched to train mode
    afterwards, even if evaluation raises.

    Args:
        loader: Iterable of ``(x, y)`` tensor pairs.
        model: Module to evaluate.
        device: Device both tensors are moved to before the forward pass.

    Returns:
        Accuracy in percent as a float, or ``nan`` when ``loader`` yields no
        elements (the unguarded division would raise ``ZeroDivisionError``).
    """
    num_correct = 0
    num_pixels = 0
    model.eval()

    try:
        with torch.no_grad():
            for x, y in loader:
                x = x.to(device)
                y = y.to(device).unsqueeze(1)
                preds = torch.sigmoid(model(x))
                preds = (preds > 0.5).float()
                num_correct += (preds == y).sum()
                num_pixels += torch.numel(preds)

        if num_pixels:
            accuracy = float(num_correct) / num_pixels * 100
        else:
            # Nothing was evaluated; report "not a number" rather than 0 %.
            accuracy = float("nan")

        print(
            f"Accuracy: {accuracy:.2f}"
        )
    finally:
        model.train()

    return accuracy

def save_predictions_as_imgs(loader, model, folder="output_images/", device="cpu"):
    """Write thresholded predictions and their targets as PNG files.

    For batch number ``idx`` the binarised prediction is saved as
    ``pred_{idx}.png`` and the target as ``{idx}.png``, both inside
    ``folder``. The model is put into eval mode while predicting and switched
    to train mode afterwards.

    Args:
        loader: Iterable of ``(x, y)`` tensor pairs.
        model: Module producing logits for ``x``.
        folder: Output directory, with or without a trailing separator. It is
            created if it does not exist.
        device: Device the inputs are moved to before the forward pass.
    """
    # Create the output directory up front so the first save cannot fail on a
    # missing folder. An empty string means "current directory".
    if folder:
        os.makedirs(folder, exist_ok=True)

    model.eval()
    for idx, (x, y) in enumerate(loader):
        x = x.to(device=device)
        with torch.no_grad():
            preds = torch.sigmoid(model(x))
            preds = (preds > 0.5).float()
        # Both files are joined onto the folder the same way, so they land in
        # the same directory whether or not ``folder`` ends with a separator.
        torchvision.utils.save_image(
            preds, os.path.join(folder, f"pred_{idx}.png")
        )
        torchvision.utils.save_image(
            y.unsqueeze(1), os.path.join(folder, f"{idx}.png")
        )

    model.train()

In [None]:
import torch
import albumentations as A
from albumentations.pytorch import ToTensorV2
from tqdm import tqdm
import torch.nn as nn
import torch.optim as optim


# Training configuration read by the rest of this cell.
l_rate = 1e-4  # learning rate passed to the Adam optimizer
currdevice = "cuda" if torch.cuda.is_available() else "cpu"  # use the GPU when one is available
batch_size = 16  # batch size for both data loaders
epochs = 15  # number of passes over the training loader
workers = 2  # forwarded to get_loaders as num_workers
img_height = 160  # resize target height (1376 originally)
img_width = 240  # resize target width (1371 originally)
PIN_MEMORY = True  # forwarded to get_loaders as pin_memory
LOAD_MODEL = False  # when True, weights are loaded from disk_checkpoint.pth.tar before training
# Dataset locations. NOTE(review): absolute Google Drive paths, presumably a
# Colab mount; they need adjusting in any other environment.
image_train_path = "/content/drive/MyDrive/Colab Notebooks/code/trainingimages/"
image_train_mask = "/content/drive/MyDrive/Colab Notebooks/code/training_ground_truth/"
image_validate_path = "/content/drive/MyDrive/Colab Notebooks/code/testingimages/"
image_validate_mask = "/content/drive/MyDrive/Colab Notebooks/code/testing_ground_truth/"

def train_fn(loader, model, optimizer, loss_fn, scaler):
    """Train ``model`` for one pass over ``loader``.

    Args:
        loader: Iterable of ``(data, targets)`` batches.
        model: Module being trained.
        optimizer: Optimizer updating the model parameters.
        loss_fn: Callable computing the loss from ``(predictions, targets)``.
        scaler: Gradient scaler providing ``scale``, ``step`` and ``update``.

    Batches are moved to the module-level ``currdevice``; the loss of the
    current batch is shown in the tqdm progress bar.
    """
    progress = tqdm(loader)

    for images, masks in progress:
        images = images.to(device=currdevice)
        # Cast to float and insert an axis at dim 1 before moving to the device.
        masks = masks.float().unsqueeze(1).to(device=currdevice)

        # Forward pass under autocast.
        with torch.cuda.amp.autocast():
            loss = loss_fn(model(images), masks)

        # Backward pass and optimizer step, both routed through the scaler.
        optimizer.zero_grad()
        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()

        # Report the current batch loss on the progress bar.
        progress.set_postfix(loss=loss.item())



# Training-time pipeline: resize, random rotation and flips, then Normalize
# with mean 0 / std 1 / max_pixel_value 255 (i.e. pixel values divided by 255)
# and conversion to tensors.
train_transform = A.Compose(
    [
        A.Resize(height=img_height, width=img_width),
        A.Rotate(limit=35, p=1.0),
        A.HorizontalFlip(p=0.5),
        A.VerticalFlip(p=0.1),
        A.Normalize(
            mean=[0.0, 0.0, 0.0],
            std=[1.0, 1.0, 1.0],
            max_pixel_value=255.0,
        ),
        ToTensorV2(),
    ],
)

# Validation pipeline: same resize and normalisation, no random augmentation.
val_transforms = A.Compose(
    [
        A.Resize(height=img_height, width=img_width),
        A.Normalize(
            mean=[0.0, 0.0, 0.0],
            std=[1.0, 1.0, 1.0],
            max_pixel_value=255.0,
        ),
        ToTensorV2(),
    ],
)

model = UNET(in_channels=3, out_channels=1).to(currdevice)
# The model outputs logits; BCEWithLogitsLoss applies the sigmoid internally.
loss_fn = nn.BCEWithLogitsLoss()
optimizer = optim.Adam(model.parameters(), lr=l_rate)

train_loader, val_loader = get_loaders(
    image_train_path,
    image_train_mask,
    image_validate_path,
    image_validate_mask,
    batch_size,
    train_transform,
    val_transforms,
    workers,
    PIN_MEMORY,
)

if LOAD_MODEL:
    # map_location remaps the stored tensors onto the current device, so a
    # checkpoint written on a GPU can be restored on a CPU-only runtime.
    load_checkpoint(
        torch.load("disk_checkpoint.pth.tar", map_location=currdevice), model
    )


# Baseline accuracy before any training step of this run.
check_accuracy(val_loader, model, device=currdevice)
scaler = torch.cuda.amp.GradScaler()

for epoch in range(epochs):
    train_fn(train_loader, model, optimizer, loss_fn, scaler)

    # save model
    checkpoint = {
        "state_dict": model.state_dict(),
        "optimizer":optimizer.state_dict(),
    }
    save_checkpoint(checkpoint)

    # check accuracy
    check_accuracy(val_loader, model, device=currdevice)

    # print some examples to a folder
    save_predictions_as_imgs(
        val_loader, model, folder="/content/drive/MyDrive/Colab Notebooks/code/output_images/", device=currdevice
    )


In [None]:
import torch
from PIL import Image
import numpy as np
import torchvision.transforms as transforms

# Load the trained model
checkpoint = torch.load("disk_checkpoint.pth.tar", map_location=torch.device('cpu'))
model = UNET(in_channels=3, out_channels=1)
model.load_state_dict(checkpoint['state_dict'])
model.eval()

# Preprocess the input image
transform = transforms.Compose([
    transforms.Resize((img_height, img_width)),
    transforms.ToTensor(),
    # Add normalization if needed
])
# Force three channels: the model is built with in_channels=3, and a PNG may
# be stored as RGBA, grayscale or palette. The training dataset converts to
# RGB as well. The file is closed once the converted copy exists.
with Image.open("testingimages/96.png") as image_file:
    input_image = image_file.convert("RGB")
input_tensor = transform(input_image).unsqueeze(0)

# Perform inference
with torch.no_grad():
    output = model(input_tensor)

# Convert the output to a numpy array
output = torch.sigmoid(output).squeeze().cpu().numpy()  # Applying sigmoid to convert logits to probabilities
predicted_mask = (output > 0.5).astype(np.uint8)  # Applying thresholding to obtain binary mask

# Load and resize the ground truth mask
# Convert to single-channel "L" first (as the training dataset does) so the
# array is 2-D and lines up with predicted_mask even if the file is RGB/RGBA.
# NOTE(review): this directory is spelled "testing_ground_truths" while the
# training configuration uses ".../testing_ground_truth/" - confirm the path.
with Image.open("testing_ground_truths/96.png") as mask_file:
    ground_truth_mask = mask_file.convert("L").resize(
        (predicted_mask.shape[1], predicted_mask.shape[0]), Image.NEAREST
    )
ground_truth_mask = np.array(ground_truth_mask)
ground_truth_mask = (ground_truth_mask > 0).astype(np.uint8)  # Convert to binary mask

# Calculate metrics
# Counts are converted to Python ints so the arithmetic below does not depend
# on NumPy's unsigned-integer promotion rules.
predicted_positive = int(predicted_mask.sum())
actual_positive = int(ground_truth_mask.sum())

true_positive = int((predicted_mask * ground_truth_mask).sum())
false_positive = predicted_positive - true_positive
precision = true_positive / (true_positive + false_positive + 1e-8)

false_negative = actual_positive - true_positive
recall = true_positive / (true_positive + false_negative + 1e-8)

true_negative = int(((1 - predicted_mask) * (1 - ground_truth_mask)).sum())

# Intersection of the two masks is exactly the true-positive count.
intersection = true_positive
union = predicted_positive + actual_positive - intersection
iou = intersection / (union + 1e-8)

# Save the predicted mask image
predicted_mask_image = Image.fromarray(predicted_mask * 255)
predicted_mask_image.save("output.png")

# Print metrics
print(f"Accuracy: {(true_positive + true_negative) / (true_positive + true_negative + false_positive + false_negative)}")
print(f"Precision: {precision}")
print(f"Recall: {recall}")
print(f"IoU (Intersection over Union): {iou}")
