In [None]:
from google.colab import drive

# Mount Google Drive inside the Colab runtime at /content/MyDrive.
drive.mount('/content/MyDrive')

In [12]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import os
import numpy as np
from PIL import Image
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms


import torch.optim as optim
from torch.optim import lr_scheduler
import numpy as np
import torchvision
from torchvision import datasets, models, transforms
import copy
from tqdm.notebook import tqdm


from sklearn.metrics import precision_score, recall_score, f1_score


In [None]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Bare expression so the notebook displays the selected device.
device

In [14]:
# Module-level LeakyReLU activation with a slope of 0.1 for negative inputs.
m = nn.LeakyReLU(negative_slope=0.1)

class ConvBlock(nn.Module):
    """Two 3x3 same-padding convolutions, each followed by a LeakyReLU.

    Args:
        in_channels: Number of channels of the input feature map.
        out_channels: Number of channels produced by both convolutions.
        negative_slope: Slope of the LeakyReLU for negative inputs (default 0.1).

    The spatial size is preserved (kernel_size=3 with padding=1).
    """

    def __init__(self, in_channels, out_channels, negative_slope=0.1):
        super(ConvBlock, self).__init__()
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1)
        # Own the activation as a submodule instead of reading a notebook-level
        # global: the block no longer changes behaviour if that global is
        # re-bound or missing. LeakyReLU has no parameters, so state_dict keys
        # are unaffected.
        self.activation = nn.LeakyReLU(negative_slope)

    def forward(self, x):
        """Apply conv -> LeakyReLU twice and return the resulting feature map."""
        x = self.activation(self.conv1(x))
        x = self.activation(self.conv2(x))
        return x

class EncoderBlock(nn.Module):
    """Encoder stage: a ConvBlock followed by 2x2 max pooling.

    Args:
        in_channels: Channels of the incoming feature map.
        out_channels: Channels produced by the convolution block.
    """

    def __init__(self, in_channels, out_channels):
        super().__init__()
        self.conv_block = ConvBlock(in_channels, out_channels)
        self.pool = nn.MaxPool2d(kernel_size=2)

    def forward(self, x):
        """Return ``(features, pooled)``: the conv output and its max-pooled version."""
        features = self.conv_block(x)
        return features, self.pool(features)

class DecoderBlock(nn.Module):
    """Decoder stage: 2x transposed-conv upsampling, skip concatenation, ConvBlock.

    Args:
        in_channels: Channels of the feature map coming from the deeper stage.
        out_channels: Channels after upsampling and after the convolution block.
    """

    def __init__(self, in_channels, out_channels):
        super().__init__()
        self.upconv = nn.ConvTranspose2d(in_channels, out_channels, kernel_size=2, stride=2)
        # The convolution block sees the upsampled map concatenated with the
        # skip features, hence twice ``out_channels`` on its input.
        self.conv_block = ConvBlock(out_channels * 2, out_channels)

    def forward(self, x, skip_features):
        """Upsample ``x``, concatenate ``skip_features`` on the channel axis, then convolve."""
        upsampled = self.upconv(x)
        merged = torch.cat([upsampled, skip_features], dim=1)
        return self.conv_block(merged)

class UNet(nn.Module):
    """U-Net with four encoder stages, a bridge and four decoder stages.

    Channel widths are 16/32/64/128 in the encoder, 256 in the bridge and
    128/64/32/16 in the decoder. A final 1x1 convolution maps to
    ``output_channels``; no activation is applied to the returned tensor.

    Args:
        input_channels: Channels of the input image.
        output_channels: Channels of the returned map.

    NOTE(review): every encoder halves the spatial size and every decoder
    doubles it, so height/width presumably need to be multiples of 16 for the
    skip concatenations to line up - confirm against the data.
    """

    def __init__(self, input_channels, output_channels):
        super().__init__()
        # Contracting path.
        self.encoder1 = EncoderBlock(input_channels, 16)
        self.encoder2 = EncoderBlock(16, 32)
        self.encoder3 = EncoderBlock(32, 64)
        self.encoder4 = EncoderBlock(64, 128)

        # Bottleneck between the two paths.
        self.bridge = ConvBlock(128, 256)

        # Expanding path.
        self.decoder1 = DecoderBlock(256, 128)
        self.decoder2 = DecoderBlock(128, 64)
        self.decoder3 = DecoderBlock(64, 32)
        self.decoder4 = DecoderBlock(32, 16)

        self.out_conv = nn.Conv2d(16, output_channels, kernel_size=1)

    def forward(self, x):
        """Run the encoder, bridge and decoder and return the 1x1-conv output."""
        skips = []
        for encoder in (self.encoder1, self.encoder2, self.encoder3, self.encoder4):
            skip, x = encoder(x)
            skips.append(skip)

        x = self.bridge(x)

        # Decoders consume the skip features deepest-first.
        for decoder in (self.decoder1, self.decoder2, self.decoder3, self.decoder4):
            x = decoder(x, skips.pop())

        return self.out_conv(x)


In [None]:

def norm_im(image, eps=1e-8):
    """Standardise a tensor to zero mean and unit standard deviation.

    Args:
        image: Tensor to normalise; mean and std are taken over all elements.
        eps: Lower bound for the standard deviation used as the divisor.

    Returns:
        ``(image - mean) / max(std, eps)``. Clamping the divisor means a
        constant image (std == 0) yields zeros instead of NaN.
    """
    std = torch.std(image).clamp_min(eps)
    return (image - torch.mean(image)) / std



class Dataset(torch.utils.data.Dataset):
    """Pairs of ``CT_<id>.jpg`` images and ``gold_<id>.png`` masks.

    The base class is referenced by its fully qualified name so that
    re-running this cell does not subclass the previously defined ``Dataset``.

    Args:
        images_dir: Directory scanned for ``.jpg`` input images.
        golds_dir: Directory holding the matching ``gold_<id>.png`` masks.
        transform: Optional callable applied to the PIL image. It is expected
            to return a tensor; when omitted the image is converted with
            ``transforms.ToTensor()``.
    """

    def __init__(self, images_dir, golds_dir, transform=None):
        self.images_dir = images_dir
        self.golds_dir = golds_dir
        # os.listdir returns entries in arbitrary order; sort so that an index
        # always refers to the same sample across runs and machines.
        self.image_files = sorted(f for f in os.listdir(images_dir) if f.endswith('.jpg'))
        self.transform = transform

    def __len__(self):
        """Number of ``.jpg`` images found in ``images_dir``."""
        return len(self.image_files)

    def __getitem__(self, idx):
        """Return ``(image, gold)`` for sample ``idx``.

        ``image`` is the transformed image standardised with ``norm_im``;
        ``gold`` is a float mask that is 1.0 wherever the mask is non-zero.

        Raises:
            FileNotFoundError: If the matching mask file does not exist.
        """
        img_name = self.image_files[idx]
        image_path = os.path.join(self.images_dir, img_name)
        image = Image.open(image_path)

        # CT_<id>.jpg  ->  gold_<id>.png
        unique_id = img_name.replace('CT_', '').replace('.jpg', '')
        gold_name = f"gold_{unique_id}.png"
        gold_path = os.path.join(self.golds_dir, gold_name)

        try:
            gold = Image.open(gold_path)
        except FileNotFoundError as err:
            raise FileNotFoundError(f"Ground truth image not found: {gold_path}") from err

        if self.transform:
            image = self.transform(image)
        else:
            # norm_im needs a tensor; without this a missing transform would
            # pass a PIL image to torch.mean and fail.
            image = transforms.ToTensor()(image)
        image = norm_im(image)

        gold = transforms.ToTensor()(gold)

        # Binarise: any non-zero mask value counts as foreground.
        gold = (gold != 0).float()
        return image, gold


# Only conversion to a tensor is applied; normalisation happens in the dataset.
transform = transforms.Compose([transforms.ToTensor()])

BASE_PATH = ""

# Load datasets: one per split directory, all sharing the same golds folder.
train_dataset, val_dataset, test_dataset = (
    Dataset(
        images_dir=os.path.join(BASE_PATH, 'images', split),
        golds_dir=os.path.join(BASE_PATH, 'golds'),
        transform=transform,
    )
    for split in ('tr', 'val', 'ts')
)

# Create dataloaders: only the training split is shuffled.
train_loader = DataLoader(train_dataset, batch_size=4, shuffle=True)
val_loader, test_loader = (
    DataLoader(split_dataset, batch_size=4, shuffle=False)
    for split_dataset in (val_dataset, test_dataset)
)




In [None]:
# Estimate the positive-class weight as (#background pixels / #foreground pixels)
# over the WHOLE training set. Using a single shuffled batch gives a noisy,
# run-dependent value and fails outright when that batch contains one class only.
num_neg = 0
num_pos = 0
for inputs, golds in train_loader:
    num_pos += int((golds != 0).sum())
    num_neg += int((golds == 0).sum())

if num_pos == 0:
    raise ValueError("No foreground pixels found in the training masks; cannot compute pos_weight")

# 0-dim float tensor, the form passed to nn.BCEWithLogitsLoss(pos_weight=...).
pos_weight = torch.tensor(num_neg / num_pos, dtype=torch.float32)

print(pos_weight)


In [None]:
def _precision_recall_f1(tp, fp, fn):
    """Binary precision, recall and F1 from confusion counts (0.0 on a zero denominator)."""
    precision = tp / (tp + fp) if (tp + fp) > 0 else 0.0
    recall = tp / (tp + fn) if (tp + fn) > 0 else 0.0
    denom = 2 * tp + fp + fn
    f1 = 2 * tp / denom if denom > 0 else 0.0
    return precision, recall, f1


def _run_epoch(loader, model, criterion, device, optimizer=None):
    """Run one pass over ``loader``; train when ``optimizer`` is given, else evaluate.

    Returns ``(mean_loss, precision, recall, f1)`` where the loss is averaged
    over ``len(loader.dataset)`` and the metrics are pixel-wise.
    """
    training = optimizer is not None
    model.train(training)

    total_loss = 0.0
    tp = fp = fn = 0
    for inputs, labels in loader:
        inputs = inputs.to(device)
        labels = labels.to(device).float()
        if training:
            optimizer.zero_grad()

        with torch.set_grad_enabled(training):
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            if training:
                loss.backward()
                optimizer.step()

        total_loss += loss.item() * inputs.size(0)

        # The model emits raw logits, so probability > 0.5 is logit > 0.
        # Thresholding the logits at 0.5 would under-predict foreground.
        preds = outputs.detach() > 0
        truth = labels > 0.5
        # Stream confusion counts instead of storing every pixel in a list.
        tp += int((preds & truth).sum())
        fp += int((preds & ~truth).sum())
        fn += int((~preds & truth).sum())

    mean_loss = total_loss / len(loader.dataset)
    return (mean_loss,) + _precision_recall_f1(tp, fp, fn)


def train_model(train_loader, val_loader, model, criterion, optimizer, scheduler, num_epochs, device):
    """Train ``model`` and return it loaded with the best-validation-F1 weights.

    Args:
        train_loader: Loader yielding ``(inputs, labels)`` training batches.
        val_loader: Loader yielding ``(inputs, labels)`` validation batches.
        model: Network producing raw logits with the same shape as the labels.
        criterion: Loss taking ``(logits, labels)``, e.g. ``nn.BCEWithLogitsLoss``.
        optimizer: Optimizer over ``model``'s parameters.
        scheduler: LR scheduler stepped once per epoch.
        num_epochs: Number of epochs to run.
        device: Device the model and batches are moved to.

    Returns:
        ``model`` with the weights of the epoch that had the highest validation
        F1 (the initial weights if validation F1 never exceeds 0).
    """
    model.to(device)
    # Keep any loss buffers (such as pos_weight) on the same device as the logits.
    if isinstance(criterion, nn.Module):
        criterion.to(device)
    best_model_wts = copy.deepcopy(model.state_dict())
    best_f1 = 0.0

    for epoch in tqdm(range(num_epochs)):
        print(f"Epoch {epoch+1}/{num_epochs} started")

        # Training phase
        train_loss, train_precision, train_recall, train_f1 = _run_epoch(
            train_loader, model, criterion, device, optimizer
        )
        print(f"Training Loss: {train_loss:.4f}")
        print(f"Training Precision: {train_precision:.4f}, Recall: {train_recall:.4f}, F1 Score: {train_f1:.4f}")

        # Validation phase (no optimizer -> eval mode, gradients disabled)
        valid_loss, valid_precision, valid_recall, valid_f1 = _run_epoch(
            val_loader, model, criterion, device
        )
        print(f"Validation Loss: {valid_loss:.4f}")
        print(f"Validation Precision: {valid_precision:.4f}, Recall: {valid_recall:.4f}, F1 Score: {valid_f1:.4f}")

        # Checking for best model
        if valid_f1 > best_f1:
            best_f1 = valid_f1
            best_model_wts = copy.deepcopy(model.state_dict())

        # Step the scheduler
        scheduler.step()

    # Load best model weights
    model.load_state_dict(best_model_wts)
    return model


# Baseline run: single-channel input, single-channel logit output.
model = UNet(input_channels=1, output_channels=1)
# Re-binds the notebook-level `device` (here with an explicit cuda:0 index).
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
num_epochs = 25

# pos_weight comes from the earlier class-balance cell and scales the loss on positive pixels.
criterion = nn.BCEWithLogitsLoss(pos_weight = pos_weight)
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)
# Multiply the learning rate by 0.1 every 5 epochs.
exp_lr_scheduler = lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.1)

best_model = train_model(train_loader, val_loader, model, criterion, optimizer, exp_lr_scheduler, num_epochs, device)


# NOTE(review): the path is empty here, so torch.save below presumably fails
# until a real file path is filled in - confirm before running.
MODEL_SAVE_PATH = ''

# Only the weights (state_dict) are saved, not the model object.
torch.save(best_model.state_dict(), MODEL_SAVE_PATH)

print(f"Model parameters saved to {MODEL_SAVE_PATH}")



In [18]:
#unet with dropout

class ConvBlock(nn.Module):
    """Two 3x3 same-padding convolutions with LeakyReLU and channel dropout.

    Layout: conv1 -> LeakyReLU -> Dropout2d -> conv2 -> LeakyReLU.

    Args:
        in_channels: Number of channels of the input feature map.
        out_channels: Number of channels produced by both convolutions.
        dropout_p: Probability of zeroing a whole channel between the two
            convolutions (active in training mode only).
        negative_slope: Slope of the LeakyReLU for negative inputs (default 0.1).
    """

    def __init__(self, in_channels, out_channels, dropout_p=0.5, negative_slope=0.1):
        super(ConvBlock, self).__init__()
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1)
        self.dropout = nn.Dropout2d(p=dropout_p)
        # Owned activation instead of a notebook-level global; it has no
        # parameters, so state_dict keys are unchanged.
        self.activation = nn.LeakyReLU(negative_slope)

    def forward(self, x):
        """Return the block output; dropout is a no-op in eval mode."""
        x = self.activation(self.conv1(x))
        # The dropout layer used to be constructed but never called, so
        # dropout_p had no effect on the network.
        x = self.dropout(x)
        x = self.activation(self.conv2(x))
        return x

class EncoderBlock(nn.Module):
    """Encoder stage: a ConvBlock (given ``dropout_p``) followed by 2x2 max pooling.

    Args:
        in_channels: Channels of the incoming feature map.
        out_channels: Channels produced by the convolution block.
        dropout_p: Dropout probability forwarded to the ConvBlock.
    """

    def __init__(self, in_channels, out_channels, dropout_p=0.5):
        super().__init__()
        self.conv_block = ConvBlock(in_channels, out_channels, dropout_p)
        self.pool = nn.MaxPool2d(kernel_size=2)

    def forward(self, x):
        """Return ``(features, pooled)``: the conv output and its max-pooled version."""
        features = self.conv_block(x)
        return features, self.pool(features)

class DecoderBlock(nn.Module):
    """Decoder stage: 2x transposed-conv upsampling, skip concatenation, ConvBlock.

    Args:
        in_channels: Channels of the feature map coming from the deeper stage.
        out_channels: Channels after upsampling and after the convolution block.
        dropout_p: Dropout probability forwarded to the ConvBlock.
    """

    def __init__(self, in_channels, out_channels, dropout_p=0.5):
        super().__init__()
        self.upconv = nn.ConvTranspose2d(in_channels, out_channels, kernel_size=2, stride=2)
        # Input to the convolution block is the upsampled map plus the skip
        # features, hence twice ``out_channels``.
        self.conv_block = ConvBlock(out_channels * 2, out_channels, dropout_p)

    def forward(self, x, skip_features):
        """Upsample ``x``, concatenate ``skip_features`` on the channel axis, then convolve."""
        upsampled = self.upconv(x)
        merged = torch.cat([upsampled, skip_features], dim=1)
        return self.conv_block(merged)

class UNet(nn.Module):
    """U-Net variant whose blocks all receive the same ``dropout_p``.

    Channel widths are 16/32/64/128 in the encoder, 256 in the bridge and
    128/64/32/16 in the decoder. A final 1x1 convolution maps to
    ``output_channels``; no activation is applied to the returned tensor.

    Args:
        input_channels: Channels of the input image.
        output_channels: Channels of the returned map.
        dropout_p: Dropout probability handed to every encoder, bridge and
            decoder block.
    """

    def __init__(self, input_channels, output_channels, dropout_p=0.5):
        super().__init__()
        # Contracting path.
        self.encoder1 = EncoderBlock(input_channels, 16, dropout_p)
        self.encoder2 = EncoderBlock(16, 32, dropout_p)
        self.encoder3 = EncoderBlock(32, 64, dropout_p)
        self.encoder4 = EncoderBlock(64, 128, dropout_p)

        # Bottleneck between the two paths.
        self.bridge = ConvBlock(128, 256, dropout_p)

        # Expanding path.
        self.decoder1 = DecoderBlock(256, 128, dropout_p)
        self.decoder2 = DecoderBlock(128, 64, dropout_p)
        self.decoder3 = DecoderBlock(64, 32, dropout_p)
        self.decoder4 = DecoderBlock(32, 16, dropout_p)

        self.out_conv = nn.Conv2d(16, output_channels, kernel_size=1)

    def forward(self, x):
        """Run the encoder, bridge and decoder and return the 1x1-conv output."""
        skips = []
        for encoder in (self.encoder1, self.encoder2, self.encoder3, self.encoder4):
            skip, x = encoder(x)
            skips.append(skip)

        x = self.bridge(x)

        # Decoders consume the skip features deepest-first.
        for decoder in (self.decoder1, self.decoder2, self.decoder3, self.decoder4):
            x = decoder(x, skips.pop())

        return self.out_conv(x)


In [19]:
def evaluate_model(loader, model, device):
    """Compute pixel-wise binary precision, recall and F1 of ``model`` on ``loader``.

    Args:
        loader: Iterable of ``(inputs, labels)`` batches; labels are expected
            to be 0/1 masks with the same shape as the model output.
        model: Network returning raw logits (no sigmoid applied).
        device: Device the batches are moved to; the model is used as-is.

    Returns:
        ``(precision, recall, f1)`` as floats for the positive class. A ratio
        whose denominator is zero is reported as 0.0.
    """
    model.eval()
    tp = fp = fn = 0

    with torch.no_grad():
        for inputs, labels in loader:
            inputs = inputs.to(device)
            labels = labels.to(device).float()
            outputs = model(inputs)
            # Outputs are logits: probability > 0.5 corresponds to logit > 0.
            # Comparing logits against 0.5 would under-predict the positive class.
            preds = outputs > 0
            truth = labels > 0.5
            # Accumulate confusion counts per batch rather than keeping every
            # pixel in a Python list, which does not scale to full datasets.
            tp += int((preds & truth).sum())
            fp += int((preds & ~truth).sum())
            fn += int((~preds & truth).sum())

    precision = tp / (tp + fp) if (tp + fp) > 0 else 0.0
    recall = tp / (tp + fn) if (tp + fn) > 0 else 0.0
    f1 = 2 * tp / (2 * tp + fp + fn) if (2 * tp + fp + fn) > 0 else 0.0
    return precision, recall, f1


In [20]:
# Score the best baseline model (selected by validation F1) on the test split.
test_precision, test_recall, test_f1 = evaluate_model(test_loader, best_model, device)

In [None]:
# Show the baseline test metrics in the order: precision, recall, F1.
print(test_precision, test_recall, test_f1)

In [None]:
# Dropout probabilities to compare; one full training run per value.
dropout_p_values = [0.3, 0.5, 0.7]

for p in dropout_p_values:
    print(f"Training with dropout p={p}")
    # Fresh model, optimizer and scheduler for every run so results are independent.
    model = UNet(input_channels=1, output_channels=1, dropout_p=p).to(device)
    # NOTE(review): unlike the baseline run, this criterion is built without
    # pos_weight, so the runs are not directly comparable - confirm this is intended.
    criterion = nn.BCEWithLogitsLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=0.01)
    exp_lr_scheduler = lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.1)

    trained_model = train_model(train_loader, val_loader, model, criterion, optimizer, exp_lr_scheduler, num_epochs, device)

    # Evaluate the best-validation-F1 weights of this run on the test split.
    test_precision, test_recall, test_f1 = evaluate_model(test_loader, trained_model, device)
    print(f"Test Precision: {test_precision:.4f}, Recall: {test_recall:.4f}, F1 Score: {test_f1:.4f}")

