# Set up Environment

In [15]:
# Import Packages
import os
from pathlib import Path

import numpy as np
#import pandas as pd
#import matplotlib

import nibabel as nib
import torch
import torch.nn as nn
import random
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms


In [4]:
SEED = 1

random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
torch.cuda.manual_seed(SEED)
torch.backends.cudnn.deterministic = True

device = device = torch.device("cuda")
print(device)

cuda


In [67]:
data_dir = './data/cleaned_data'

train_dir = Path.cwd() / "data/train"
train_dir.mkdir(exist_ok=True, parents=True)

test_dir = Path.cwd() / "data/test"
test_dir.mkdir(exist_ok=True, parents=True)

# Load Data for Pytorch

In [6]:
# CHECK - correct!
# Total Number of Images in our Data Sets
print(f'Total Number of Images in Cleaned Dataset: {len([f for f in os.listdir(data_dir) if os.path.isfile(os.path.join(data_dir, f))])}')
print(f'Total Number of Images in Trian Dataset: {len([f for f in os.listdir(train_dir) if os.path.isfile(os.path.join(train_dir, f))])}')
print(f'Total Number of Images in Test Dataset: {len([f for f in os.listdir(test_dir) if os.path.isfile(os.path.join(test_dir, f))])}')

Total Number of Images in Cleaned Dataset: 906
Total Number of Images in Trian Dataset: 812
Total Number of Images in Test Dataset: 94


In [71]:
import os
import torch
import nibabel as nib
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

new_size = (256, 256, 256)  # Target size for MRI volumes

class MRIDataset(Dataset):
    def __init__(self, data_dir, transform=None):
        self.data_dir = data_dir
        self.file_list = os.listdir(data_dir)
        self.transform = transform

    def __len__(self):
        return len(self.file_list)

    def __getitem__(self, idx):
        img_name = self.file_list[idx]
        img_path = os.path.join(self.data_dir, img_name)
        
        # Load MRI data
        image = nib.load(img_path).get_fdata()  # Load as NumPy array

        # If shape is (D, H, W, 1), remove the last singleton dimension
        if len(image.shape) == 4 and image.shape[-1] == 1:
            image = image[..., 0]  # Convert (D, H, W, 1) -> (D, H, W)

        image = torch.tensor(image, dtype=torch.float32).unsqueeze(0)  # Convert to (1, D, H, W)

        if self.transform:
            image = self.transform(image)

        # Extract label from filename (modify according to your filename structure)
        label = int(img_name.split('_')[-1].split('.')[0])

        return image, label

# Custom Transform Function for 3D MRI
def custom_transform(img):
    """
    Transforms 3D MRI image:
    - Ensures the shape is (1, D, H, W)
    - Resizes to (256, 256, 256)
    - Normalizes intensity values
    """
    if img.ndimension() == 3:  # Ensure 4D (1, D, H, W) if needed
        img = img.unsqueeze(0)

    img = F.interpolate(img.unsqueeze(0), size=new_size, mode="trilinear", align_corners=False).squeeze(0)  # Resize
    img = (img - img.mean()) / (img.std() + 1e-8)  # Normalize
    return img

# Create datasets
train_dataset = MRIDataset(train_dir, transform=custom_transform)
test_dataset = MRIDataset(test_dir, transform=custom_transform)

# Create DataLoaders
train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True)  # Adjust batch size
test_loader = DataLoader(test_dataset, batch_size=8, shuffle=False)

# Example: Fetch a sample
sample_img, sample_label = next(iter(train_loader))
print(f"Sample image shape: {sample_img.shape}, Label: {sample_label}")


Sample image shape: torch.Size([8, 1, 256, 256, 256]), Label: tensor([1, 2, 1, 1, 2, 1, 1, 2])


In [76]:
import torch
import torch.nn as nn
from torchvision import models

# Load Pretrained DenseNet121
model = models.densenet121(pretrained=True)

# Modify First Layer to Accept 1-Channel MRI Inputs (instead of RGB 3-channels)
model.features.conv0 = nn.Conv2d(
    in_channels=1,  # Keep single-channel input
    out_channels=64, 
    kernel_size=7, 
    stride=2, 
    padding=3, 
    bias=False
)

# Modify Output Layer for Custom Classes (e.g., 2 classes: Tumor vs. Normal)
num_ftrs = model.classifier.in_features
model.classifier = nn.Linear(num_ftrs, 2)  # Change '2' based on number of classes

# Move model to GPU if available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)

# Print Model Summary
#print(model)

# Example usage with a dummy 3D input tensor
input_tensor = torch.randn(8, 1, 256, 256, 256)  # Batch size 8, 1 channel, 256 volume
output = model(input_tensor)
print(output) # Output will be a probability between 0 and 1


RuntimeError: Expected 3D (unbatched) or 4D (batched) input to conv2d, but got input of size: [8, 1, 256, 256, 256]

In [None]:
#https://github.com/Tencent/MedicalNet


In [19]:
output_directory = ".data/outputs"
if not os.path.exists(output_directory):
        os.makedirs(output_directory)

In [18]:
import torchvision.models as models
import torch.optim as optim

# Load pre-trained ResNet50
model = models.resnet50(weights=True).to(device)

# Modify the output layer for binary classification
num_ftrs = model.fc.in_features
model.fc = nn.Linear(num_ftrs, 1)
model = nn.Sequential(model, nn.Sigmoid())

# Define loss function and optimizer
criterion = nn.BCEWithLogitsLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

In [None]:
# Test

# Example input image tensor (batch_size=1, channels=3, height=512, width=512)
input_tensor = torch.randn(224,224)

# Move inputs to the same device as the model
model = model.to(device)
input_tensor = input_tensor.to(device)

# Forward pass
bbox = model(input_tensor)

print("BBox Refinements:", bbox.shape)
print(bbox)

In [20]:
# Training loop
num_epochs = 10
for epoch in range(num_epochs):
    for inputs, labels in train_loader:
        images = [img.to(device) for img in images]
        targets = [{k: v.to(device) for k, v in target.items()} for target in targets]
        # Zero the gradients
        optimizer.zero_grad()

        # Forward pass
        outputs = model(inputs)

        # Calculate the loss
        loss = criterion(outputs, labels.unsqueeze(1))

        # Backward pass and optimization
        loss.backward()
        optimizer.step()

    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}')

TypeError: Unexpected type <class 'tuple'>

In [8]:
# 224 x 224 crop
#conv -> bn -> activation
#SGD with mini batch size 256
#learning rate = 0.1 then 0.1/10 when plateaus
#no dropout
#identity mapping - input dim = output dim

class bottleneck_block(nn.Module): #create "bottleneck building block" for ResNet 50 (expanded from resnet 34) for more efficiency
    def __init__(self, in_channels, out_channels, downsample=None, stride=1):
        super(bottleneck_block, self).__init__()
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=1, padding=0) #1x1, 64
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=stride, padding=1) #3x3, 64 - padding because no longer kernel_size 1
        self.bn2 = nn.BatchNorm2d(out_channels)
        self.conv3 = nn.Conv2d(out_channels, out_channels*4, kernel_size=1, stride=1, padding=0) #1x1, 256 (64 x expansion factor of 4)
        self.bn3 = nn.BatchNorm2d(out_channels*4)
        self.downsample = downsample #use if need to change shape
        self.relu = nn.ReLU()

    def forward(self, x):
        identity = x

        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.conv2(x)
        x = self.bn2(x)
        x = self.relu(x)
        x = self.conv3(x)
        x = self.bn3(x)

        if self.downsample is not None:
            identity = self.downsample(identity)

        x += identity
        x = self.relu(x)
        return x


class ResNet50(nn.Module): # [3,4,6,3] - first layer 3 blocks, second layer 4 blocks, etc.
    def __init__(self, bottleneck_block): # layers = how many times to call block
        super(ResNet50, self).__init__()
        self.in_channels = 64

        #Initial Layer
        self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3) # 7x7, 64, stride 2
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU()
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        #ResNet Layers
        self.layer1 = self.layer(bottleneck_block, num_blocks=3, out_channels=64, stride=1)
        self.layer2 = self.layer(bottleneck_block, num_blocks=4, out_channels=128, stride=2)
        self.layer3 = self.layer(bottleneck_block, num_blocks=6, out_channels=256, stride=2)
        self.layer4 = self.layer(bottleneck_block, num_blocks=3, out_channels=512, stride=2) #512*4 = 2048 channnels

        self.avgpool = nn.AdaptiveAvgPool2d((1,1)) #fix to 1x1 size
        self.fc = nn.Linear(512*4, 120)


    def forward(self, x):
        x = self.conv1(x)
        #print(x.size()) #starts off with stride 2 so should be 112 if input size 224
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)
        #print(x.size()) #56x56

        x = self.layer1(x)
        #print(x.size()) #56x56
        x = self.layer2(x)
        #print(x.size()) #28x28
        x = self.layer3(x)
        #print(x.size()) #14x14
        x = self.layer4(x)
        #print(x.size()) #7x7

        x = self.avgpool(x)
        #print(x.size()) #1x1
        x = x.reshape(x.shape[0], -1)
        #print(x.size()) #2048 outputs
        x = self.fc(x)
        #print(x.size()) #120 outputs

        return x


    def layer(self, block, num_blocks, out_channels, stride):
        downsample = None
        layers = []

        if stride != 1 or self.in_channels != out_channels*4: #identity mapping to overcome degredation problem
            downsample = nn.Sequential(nn.Conv2d(self.in_channels
                                                          , out_channels*4
                                                          , kernel_size=1
                                                          , stride=stride)
                                                          , nn.BatchNorm2d(out_channels*4))
        layers.append(block(self.in_channels, out_channels, downsample, stride)) #changes the # of channels
        self.in_channels = out_channels*4

        for i in range(num_blocks - 1):
            layers.append(block(self.in_channels, out_channels))

        return nn.Sequential(*layers) #*unpacks list

def test():
    net = ResNet50(bottleneck_block)
    x = torch.randn(2,3,224,224)
    y = net(x).to(device)
    print(y.shape)

test()

torch.Size([2, 120])
