In [2]:
import unet2dmodel
import torch

In [None]:
import os
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader, random_split

# ============================
# 1. Custom Dataset definition
# ============================
class TumorDataset(Dataset):
    def __init__(self, image_dir, mask_dir):
        self.image_dir = image_dir
        self.mask_dir = mask_dir
        self.image_files = sorted(os.listdir(image_dir))
        self.mask_files = sorted(os.listdir(mask_dir))

        assert len(self.image_files) == len(self.mask_files), "Mismatch between image and mask counts."

    def __len__(self):
        return len(self.image_files)

    def __getitem__(self, idx):
        # Load the numpy arrays
        img_path = os.path.join(self.image_dir, self.image_files[idx])
        mask_path = os.path.join(self.mask_dir, self.mask_files[idx])

        image = np.load(img_path).astype(np.float32)
        mask = np.load(mask_path).astype(np.float32)

        # Normalize image and mask between 0 and 1
        image = (image - image.min()) / (image.max() - image.min() + 1e-8)
        mask = (mask - mask.min()) / (mask.max() - mask.min() + 1e-8)

        # Add channel dimension (1, H, W)
        image = np.expand_dims(image, axis=0)
        mask = np.expand_dims(mask, axis=0)

        # Convert to torch tensors
        image = torch.from_numpy(image)
        mask = torch.from_numpy(mask)

        return image, mask


# ============================
# 2. Create dataset
# ============================
image_dir = "/Users/daniel/Documents/CSAI/Internship/CODE/data/LUNA16/images"
mask_dir = "/Users/daniel/Documents/CSAI/Internship/CODE/data/LUNA16/labels"
dataset = TumorDataset(image_dir, mask_dir)

# ============================
# 3. Split into train/val/test
# ============================
total_size = len(dataset)
train_size = int(0.7 * total_size)
val_size = int(0.15 * total_size)
test_size = total_size - train_size - val_size

train_set, val_set, test_set = random_split(dataset, [train_size, val_size, test_size])

# ============================
# 4. Create DataLoaders
# ============================
batch_size = 32

train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_set, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_set, batch_size=batch_size, shuffle=False)

# ============================
# 5. Example check
# ============================
images, masks = next(iter(train_loader))
print("Image batch shape:", images.shape)  # [32, 1, 64, 64]
print("Mask batch shape:", masks.shape)


Image batch shape: torch.Size([32, 1, 64, 64])
Mask batch shape: torch.Size([32, 1, 64, 64])


In [44]:
import torch
import torch.nn as nn

class DiceLoss(nn.Module):
    def __init__(self, smooth=1e-6):
        super(DiceLoss, self).__init__()
        self.smooth = smooth

    def forward(self, preds, targets):
        preds = torch.sigmoid(preds)  # for logits output
        preds = preds.view(preds.size(0), -1)
        targets = targets.view(targets.size(0), -1)

        intersection = (preds * targets).sum(dim=1)
        dice = (2. * intersection + self.smooth) / (preds.sum(dim=1) + targets.sum(dim=1) + self.smooth)
        loss = 1 - dice.mean()
        return loss

In [45]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class DoubleConv(nn.Module):
    """(Conv => BN => ReLU) * 2"""
    def __init__(self, in_channels, out_channels):
        super().__init__()
        self.double_conv = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True)
        )

    def forward(self, x):
        return self.double_conv(x)

class UNet(nn.Module):
    def __init__(self, in_channels=1, out_channels=1):
        super(UNet, self).__init__()

        # Encoder
        self.enc1 = DoubleConv(in_channels, 64)
        self.enc2 = DoubleConv(64, 128)
        self.enc3 = DoubleConv(128, 256)
        self.enc4 = DoubleConv(256, 512)

        # Bottleneck
        self.bottleneck = DoubleConv(512, 1024)

        # Decoder
        self.up1 = nn.ConvTranspose2d(1024, 512, kernel_size=2, stride=2)
        self.dec1 = DoubleConv(1024, 512)
        self.up2 = nn.ConvTranspose2d(512, 256, kernel_size=2, stride=2)
        self.dec2 = DoubleConv(512, 256)
        self.up3 = nn.ConvTranspose2d(256, 128, kernel_size=2, stride=2)
        self.dec3 = DoubleConv(256, 128)
        self.up4 = nn.ConvTranspose2d(128, 64, kernel_size=2, stride=2)
        self.dec4 = DoubleConv(128, 64)

        # Output
        self.out_conv = nn.Conv2d(64, out_channels, kernel_size=1)

        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)

    def forward(self, x):
        # Encoder
        x1 = self.enc1(x)
        x2 = self.pool(x1)
        x3 = self.enc2(x2)
        x4 = self.pool(x3)
        x5 = self.enc3(x4)
        x6 = self.pool(x5)
        x7 = self.enc4(x6)
        x8 = self.pool(x7)

        # Bottleneck
        x9 = self.bottleneck(x8)

        # Decoder
        x = self.up1(x9)
        x = self.dec1(torch.cat([x, x7], dim=1))
        x = self.up2(x)
        x = self.dec2(torch.cat([x, x5], dim=1))
        x = self.up3(x)
        x = self.dec3(torch.cat([x, x3], dim=1))
        x = self.up4(x)
        x = self.dec4(torch.cat([x, x1], dim=1))

        return self.out_conv(x)


In [50]:
import torch.optim as optim

# Initialize model, loss, optimizer
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = UNet(in_channels=1, out_channels=1).to(device)
criterion = DiceLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-4)

num_epochs = 50

for epoch in range(num_epochs):
    model.train()
    train_loss = 0.0

    for images, masks in train_loader:
        images, masks = images.to(device), masks.to(device)

        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, masks)
        loss.backward()
        optimizer.step()

        train_loss += loss.item() * images.size(0)

    train_loss /= len(train_loader.dataset)

    # Validation
    model.eval()
    val_loss = 0.0
    with torch.no_grad():
        for images, masks in val_loader:
            images, masks = images.to(device), masks.to(device)
            outputs = model(images)
            loss = criterion(outputs, masks)
            val_loss += loss.item() * images.size(0)

    val_loss /= len(val_loader.dataset)

    print(f"Epoch [{epoch+1}/{num_epochs}]  Train Loss: {train_loss:.4f}  Val Loss: {val_loss:.4f}")

    if train_loss < 0.2:
        break


Epoch [1/50]  Train Loss: 0.8288  Val Loss: 0.8627
Epoch [2/50]  Train Loss: 0.7773  Val Loss: 0.8620
Epoch [3/50]  Train Loss: 0.7391  Val Loss: 0.8617
Epoch [4/50]  Train Loss: 0.7126  Val Loss: 0.8614
Epoch [5/50]  Train Loss: 0.6938  Val Loss: 0.8606
Epoch [6/50]  Train Loss: 0.6772  Val Loss: 0.8591
Epoch [7/50]  Train Loss: 0.6642  Val Loss: 0.8532
Epoch [8/50]  Train Loss: 0.6532  Val Loss: 0.8240
Epoch [9/50]  Train Loss: 0.6441  Val Loss: 0.7863
Epoch [10/50]  Train Loss: 0.6365  Val Loss: 0.7610
Epoch [11/50]  Train Loss: 0.6304  Val Loss: 0.7498
Epoch [12/50]  Train Loss: 0.6247  Val Loss: 0.7237
Epoch [13/50]  Train Loss: 0.6200  Val Loss: 0.7057
Epoch [14/50]  Train Loss: 0.6158  Val Loss: 0.6896
Epoch [15/50]  Train Loss: 0.6126  Val Loss: 0.6670
Epoch [16/50]  Train Loss: 0.6096  Val Loss: 0.6528
Epoch [17/50]  Train Loss: 0.6074  Val Loss: 0.6471
Epoch [18/50]  Train Loss: 0.6049  Val Loss: 0.6292
Epoch [19/50]  Train Loss: 0.6025  Val Loss: 0.6260
Epoch [20/50]  Train 

In [47]:
import torch

def dice_coeff(preds, targets, smooth=1e-6):
    """Computes Dice coefficient (same idea as Dice loss, but as a score)."""
    preds = torch.sigmoid(preds)  # convert logits to probabilities
    preds = (preds > 0.5).float()  # threshold to binary mask

    preds = preds.view(preds.size(0), -1)
    targets = targets.view(targets.size(0), -1)

    intersection = (preds * targets).sum(dim=1)
    dice = (2. * intersection + smooth) / (preds.sum(dim=1) + targets.sum(dim=1) + smooth)
    return dice.mean().item()


model.eval()
test_dice = 0.0

with torch.no_grad():
    for images, masks in test_loader:
        images, masks = images.to(device), masks.to(device)
        outputs = model(images)
        test_dice += dice_coeff(outputs, masks)

test_dice /= len(test_loader)
print(f"Mean Dice coefficient on test set: {test_dice:.4f}")

Mean Dice coefficient on test set: 0.1342


In [None]:
'''class UNet2DModel(nn.Module):
    def __init__(self, in_channels=1, out_channels=1, init_features=32):
        super(UNet2DModel, self).__init__()
        features = init_features
        self.encoder1 = UNet2DModel._block(in_channels, features, name="enc1")
        self.pool1 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.encoder2 = UNet2DModel._block(features, features * 2, name="enc2")
        self.pool2 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.encoder3 = UNet2DModel._block(features * 2, features * 4, name="enc3")
        self.pool3 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.encoder4 = UNet2DModel._block(features * 4, features * 8, name="enc4")
        self.pool4 = nn.MaxPool2d(kernel_size=2, stride=2)

        self.bottleneck = UNet2DModel._block(features * 8, features * 16, name="bottleneck")

        self.upconv4 = nn.ConvTranspose2d(
            features * 16, features * 8, kernel_size=2, stride=2
        )
        self.decoder4 = UNet2DModel._block((features * 8) * 2, features * 8, name="dec4")
        self.upconv3 = nn.ConvTranspose2d(
            features * 8, features * 4, kernel_size=2, stride=2
        )
        self.decoder3 = UNet2DModel._block((features * 4) * 2, features * 4, name="dec3")
        self.upconv2 = nn.ConvTranspose2d(
            features * 4, features * 2, kernel_size=2, stride=2
        )
        self.decoder2 = UNet2DModel._block((features * 2) * 2, features * 2, name="dec2")
        self.upconv1 = nn.ConvTranspose2d(
            features * 2, features, kernel_size=2, stride=2
        )
        self.decoder1 = UNet2DModel._block(features * 2, features, name="dec1")

    def forward(self, x):
        enc1 = self.encoder1(x)
        enc2 = self.encoder2(self.pool1(enc1))
        enc3 = self.encoder3(self.pool2(enc2))
        enc4 = self.encoder4(self.pool3(enc3))

        bottleneck = self.bottleneck(self.pool4(enc4))

        dec4 = self.upconv4(bottleneck)
        dec4 = torch.cat((dec4, enc4), dim=1)
        dec4 = self.decoder4(dec4)
        dec3 = self.upconv3(dec4)
        dec3 = torch.cat((dec3, enc3), dim=1)
        dec3 = self.decoder3(dec3)
        dec2 = self.upconv2(dec3)
        dec2 = torch.cat((dec2, enc2), dim=1)
        dec2 = self.decoder2(dec2)
        dec1 = self.upconv1(dec2)
        dec1 = torch.cat((dec1, enc1), dim=1)
        dec1 = self.decoder1(dec1)

        return torch.sigmoid(dec1)'''