In [1]:
import pandas as pd
import nibabel as nib
import numpy as np
import os
import ast

# 1. Load and clean data
# `hdr_paths` is parsed from its literal text form into a Python list.
df = pd.read_csv('merged_brain_age_hdr.csv', converters={'hdr_paths': ast.literal_eval})
# Strip whitespace from every path and drop entries that are blank.
df['hdr_paths'] = df['hdr_paths'].apply(lambda x: [p.strip() for p in x if p.strip()])
# The second listed path is the one used; rows with fewer than two paths
# get None here and are removed by the dropna below.
df['selected_hdr'] = df['hdr_paths'].apply(lambda x: x[1] if len(x)>=2 else None)
df = df.dropna(subset=['selected_hdr']).reset_index(drop=True)

# 2. Convert to absolute paths
df['selected_hdr'] = df['selected_hdr'].apply(os.path.abspath)

# 3. Validate paths: keep only rows whose selected file exists on disk
df = df[df['selected_hdr'].apply(os.path.exists)].reset_index(drop=True)

# 4. Verify sample file
if df.empty:
    print("DataFrame is empty after filtering! Check file paths.")
else:
    sample_path = df['selected_hdr'].iloc[0]
    print(f"Sample path: {sample_path}")
    print(f"File exists: {os.path.exists(sample_path)}")

    # Only attempt the load when a sample path was actually selected above;
    # otherwise `sample_path` would be undefined (or stale from an earlier run).
    try:
        sample_img = nib.load(sample_path)
        print(f"Image shape: {sample_img.header.get_data_shape()}")
    except Exception as e:
        print(f"Error loading sample: {str(e)}")

Sample path: E:\Brain Age prediction\Brain Age prediction\disc1\OAS1_0001_MR1\processed\MPRAGE\T88_111\OAS1_0001_MR1_mpr_n4_anon_111_t88_masked_gfc.hdr
File exists: True
Image shape: (176, 208, 176, 1)


In [2]:
def load_3d_volume(hdr_path, age):
    """Read one image volume and pair it with its age label.

    Args:
        hdr_path: Path of the image to load, as ``str`` or UTF-8 ``bytes``.
        age: Age label for the volume.

    Returns:
        ``(volume, age)`` where ``volume`` is a ``float32`` array min-max
        scaled into [0, 1] (a trailing channel axis is appended to 3-D data)
        and ``age`` is a 0-d ``float32`` array. If anything goes wrong the
        error is printed and ``(None, None)`` is returned instead.
    """
    try:
        # Byte paths are decoded so the loader always receives text
        hdr_path = hdr_path if isinstance(hdr_path, str) else hdr_path.decode('utf-8')

        volume = nib.load(hdr_path).get_fdata()

        # Min-max scaling; the epsilon keeps a constant image from dividing by zero
        lowest = volume.min()
        highest = volume.max()
        volume = (volume - lowest) / (highest - lowest + 1e-8)

        # 3-D data gets a trailing singleton channel axis
        if volume.ndim == 3:
            volume = volume[..., np.newaxis]

        label = np.array(age, dtype=np.float32)
        return volume.astype(np.float32), label

    except Exception as e:
        print(f"Error loading {hdr_path}: {e}")
        return None, None

# Smoke-test the loader on the first row of the cleaned table
sample_path = df['selected_hdr'].iloc[0]
sample_age = df['Age'].iloc[0]

img, age = load_3d_volume(sample_path, sample_age)
# load_3d_volume reports failure by returning (None, None), so check before
# reading `.shape` to avoid an AttributeError that hides the real problem.
if img is None:
    print(f"Could not load sample volume: {sample_path}")
else:
    print(f"Image shape: {img.shape} | Age: {age}")

Image shape: (176, 208, 176, 1) | Age: 74.0


In [3]:
import torch
from torch.utils.data import Dataset, DataLoader 
import torchio as tio


class BrainAgeDataset(Dataset):
    """Dataset yielding ``(volume, age)`` pairs listed in a CSV file.

    The CSV must contain an ``hdr_paths`` column (a Python-literal list of
    path strings) and an ``Age`` column. The second path of each list is the
    image that gets loaded; rows with fewer than two paths, or whose selected
    file does not exist on disk, are dropped at construction time.

    Every item is a ``float32`` tensor in channels-first layout
    ``(C, D, H, W)`` plus the age as a ``float32`` scalar tensor. The layout
    is the same whether or not a transform is supplied.

    Args:
        csv_file: Path of the CSV file described above.
        transform: Optional TorchIO transform applied to each volume
            (wrapped in a ``tio.ScalarImage``) after it is loaded.
    """

    def __init__(self, csv_file, transform=None):
        self.data = pd.read_csv(csv_file, converters={'hdr_paths': ast.literal_eval})
        # Strip whitespace and discard blank entries in each path list
        self.data['hdr_paths'] = self.data['hdr_paths'].apply(lambda x: [p.strip() for p in x if p.strip()])
        # Use the second listed path; shorter lists yield None and are dropped
        self.data['selected_hdr'] = self.data['hdr_paths'].apply(lambda x: x[1] if len(x) >= 2 else None)
        self.data = self.data.dropna(subset=['selected_hdr']).reset_index(drop=True)
        self.data['selected_hdr'] = self.data['selected_hdr'].apply(os.path.abspath)
        # Keep only rows whose file exists; reset the index so that positional
        # indices handed to __getitem__ match the `.loc` labels.
        self.data = self.data[self.data['selected_hdr'].apply(os.path.exists)].reset_index(drop=True)

        self.transform = transform

    def __len__(self):
        """Return the number of usable rows."""
        return len(self.data)

    def __getitem__(self, idx):
        """Load, optionally augment, and return the sample at ``idx``.

        Returns:
            ``(image, age)``: a ``(C, D, H, W)`` tensor and a scalar tensor.

        Raises:
            ValueError: If the volume for this row could not be loaded.
        """
        hdr_path = self.data.loc[idx, 'selected_hdr']
        age = self.data.loc[idx, 'Age']

        img, age = self.load_3d_volume(hdr_path, age)

        if img is None:
            raise ValueError(f"Error loading image at index {idx}")

        # Guarantee a trailing channel axis: (D, H, W) -> (D, H, W, 1)
        if img.ndim == 3:
            img = np.expand_dims(img, -1)

        # Move channels first unconditionally: (D, H, W, C) -> (C, D, H, W).
        # Previously this only happened inside the transform branch, so a
        # dataset built with transform=None returned channels-last volumes.
        img_tensor = torch.as_tensor(img).permute(3, 0, 1, 2)

        # Apply augmentation if a transform is provided
        if self.transform is not None:
            img_tensor = self.transform(tio.ScalarImage(tensor=img_tensor)).tensor

        return img_tensor.contiguous(), torch.tensor(age, dtype=torch.float32)

    def load_3d_volume(self, hdr_path, age):
        """Read one volume, min-max scale it to [0, 1] and pair it with ``age``.

        Returns:
            ``(volume, age)`` as a ``float32`` array with a trailing channel
            axis and a 0-d ``float32`` array, or ``(None, None)`` if loading
            or processing raised (the error is printed).
        """
        try:
            img = nib.load(hdr_path).get_fdata()

            # Normalize to [0, 1]; the epsilon avoids division by zero on a
            # constant-valued image
            min_val, max_val = img.min(), img.max()
            img = (img - min_val) / (max_val - min_val + 1e-8)

            if img.ndim == 3:  # (D, H, W) -> (D, H, W, 1)
                img = np.expand_dims(img, -1)

            return img.astype(np.float32), np.array(age, dtype=np.float32)

        except Exception as e:
            print(f"Error loading {hdr_path}: {e}")
            return None, None

In [4]:
import torch.nn as nn
import torch.optim as optim

class My3DBlock(nn.Module):
    """Four 3x3x3 convolutions with ReLU, then 2x max-pooling and dropout.

    Args:
        in_channels: Channel count of the incoming feature map.
        out_channels: Channel count produced by every convolution.
        dropout_prob: Dropout probability applied after pooling.
    """

    def __init__(self, in_channels, out_channels, dropout_prob = .3):
        super().__init__()
        # Kernel 3 with padding 1 leaves the spatial size untouched
        conv_kwargs = dict(kernel_size=3, padding=1)
        self.conv1 = nn.Conv3d(in_channels, out_channels, **conv_kwargs)
        self.conv2 = nn.Conv3d(out_channels, out_channels, **conv_kwargs)
        self.conv3 = nn.Conv3d(out_channels, out_channels, **conv_kwargs)
        self.conv4 = nn.Conv3d(out_channels, out_channels, **conv_kwargs)
        self.relu = nn.ReLU()
        self.maxpool = nn.MaxPool3d(kernel_size=2, stride=2)
        self.dropout = nn.Dropout(p=dropout_prob)

    def forward(self, x):
        """Apply conv+ReLU four times, then pool and drop out."""
        features = x
        for conv in (self.conv1, self.conv2, self.conv3, self.conv4):
            features = self.relu(conv(features))
        return self.dropout(self.maxpool(features))

In [5]:
class BrainAgeModel(nn.Module):
    """3-D CNN regressor: four ``My3DBlock`` stages followed by two linear layers.

    Args:
        input_shape: Spatial size ``(D, H, W)`` of the volumes the model will
            receive. It determines the input width of the first linear layer.
            The default ``(176, 208, 176)`` reproduces the original fixed
            ``128 * 11 * 13 * 11`` layer size.
    """

    def __init__(self, input_shape=(176, 208, 176)):
        super().__init__()

        # Define the 4 My3DBlocks
        self.block1 = My3DBlock(1, 16)    # Input has 1 channel
        self.block2 = My3DBlock(16, 32)
        self.block3 = My3DBlock(32, 64)
        self.block4 = My3DBlock(64, 128)

        # Each block ends in a kernel-2/stride-2 max-pool (floor division by
        # 2), so four blocks shrink every spatial dimension by a factor of 16.
        depth, height, width = (int(size) // 16 for size in input_shape)

        # Fully connected layers
        self.fc1 = nn.Linear(128 * depth * height * width, 128)
        self.fc2 = nn.Linear(128, 1)  # Output: Regression (age prediction)

        self.dropout = nn.Dropout(p=0.4)  # Additional dropout before the final layer
        self.relu = nn.ReLU()

    def forward(self, x):
        """Map a ``(batch, 1, D, H, W)`` tensor to ``(batch, 1)`` predictions."""
        x = self.block1(x)
        x = self.block2(x)
        x = self.block3(x)
        x = self.block4(x)

        # Flatten everything except the batch axis for the linear layers
        x = x.flatten(1)

        x = self.relu(self.fc1(x))
        x = self.dropout(x)
        x = self.fc2(x)

        return x

In [13]:
# Define augmentations using TorchIO (applied to training samples only)
augmentations = tio.Compose([
    tio.RandomAffine(scales=(1, 1), degrees=(0, 0, 15), translation=0),
    tio.RandomBlur(p=0.3),
    tio.RandomNoise(p=0.3)
])

# Validation must not be randomly augmented, otherwise the validation loss is
# noisy and not comparable between epochs. An identity transform (rather than
# None) keeps validation samples on the same tensor-layout path as training.
no_augmentation = tio.Lambda(lambda volume: volume)

# Datasets and DataLoader
from torch.utils.data import random_split, DataLoader, Subset

# Two views over the same CSV: one augmented, one untouched
full_dataset = BrainAgeDataset(csv_file='merged_brain_age_hdr.csv', transform=augmentations)
eval_dataset = BrainAgeDataset(csv_file='merged_brain_age_hdr.csv', transform=no_augmentation)

# Define split ratio (e.g., 80% train, 20% validation)
train_size = int(0.8 * len(full_dataset))
val_size = len(full_dataset) - train_size

# Split dataset with a fixed seed so the partition is reproducible across runs
split_generator = torch.Generator().manual_seed(42)
train_dataset, val_split = random_split(
    full_dataset, [train_size, val_size], generator=split_generator
)
# Reuse the held-out indices, but read them through the non-augmented dataset
val_dataset = Subset(eval_dataset, val_split.indices)

# Create DataLoaders (no shuffling needed for validation)
train_loader = DataLoader(train_dataset, batch_size=4, shuffle=True, num_workers=0)
val_loader = DataLoader(val_dataset, batch_size=4, shuffle=False, num_workers=0)


# Model, Loss, Optimizer
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = BrainAgeModel().to(device)
criterion = nn.MSELoss()  # For regression
optimizer = optim.Adam(model.parameters(), lr=0.001)

print(f"Using device: {device}")

Using device: cuda


In [14]:
# Shape sanity check on a throwaway instance, so the global `model` (already
# moved to `device` and registered with `optimizer`) is not replaced.
shape_check_model = BrainAgeModel()
shape_check_model.eval()

# Test with dummy input (batch_size=1)
sample_input = torch.rand(1, 1, 176, 208, 176)  # Adjust shape as needed
# No gradients are needed just to inspect the output shape
with torch.no_grad():
    output = shape_check_model(sample_input)

print(f"Input shape: {sample_input.shape}")
print(f"Output shape: {output.shape}")  # Expected: (batch_size, 1)

Input shape: torch.Size([1, 1, 176, 208, 176])
Output shape: torch.Size([1, 1])


In [15]:
import torch
import torch.nn as nn
import torch.optim as optim

def _run_epoch(model, loader, criterion, device, optimizer=None):
    """Run one pass over ``loader`` and return the mean of the per-batch losses.

    When ``optimizer`` is given the model is put in training mode and updated
    after every batch; otherwise it is put in evaluation mode and gradients
    are disabled. Returns ``nan`` if the loader yields no batches.
    """
    is_training = optimizer is not None
    model.train(is_training)

    total_loss = 0.0
    num_batches = 0

    with torch.set_grad_enabled(is_training):
        for inputs, targets in loader:
            inputs, targets = inputs.to(device), targets.to(device)

            if is_training:
                optimizer.zero_grad()  # Reset gradients

            outputs = model(inputs)  # Forward pass
            # Match the prediction shape to the targets explicitly. A bare
            # squeeze() turns a batch of one into a 0-d tensor, which then
            # broadcasts against the target.
            loss = criterion(outputs.reshape(targets.shape), targets)

            if is_training:
                loss.backward()  # Backpropagation
                optimizer.step()  # Update weights

            total_loss += loss.item()
            num_batches += 1

    # Count batches ourselves so an empty loader cannot divide by zero
    return total_loss / num_batches if num_batches else float("nan")


def train_model(model, train_loader, val_loader, criterion, optimizer, device, num_epochs=10):
    """
    Trains the model and evaluates on validation set.

    The model is moved to ``device``. Each epoch runs one optimisation pass
    over ``train_loader`` and one gradient-free pass over ``val_loader``, then
    prints the mean per-batch loss of both.

    Args:
        model (nn.Module): The neural network model.
        train_loader (DataLoader): DataLoader for training data.
        val_loader (DataLoader): DataLoader for validation data.
        criterion (nn.Module): Loss function.
        optimizer (Optimizer): Optimization algorithm.
        device (torch.device): Device to run the training (CPU/GPU).
        num_epochs (int): Number of epochs.

    Returns:
        nn.Module: The same ``model`` object, trained in place.
    """
    model.to(device)

    for epoch in range(num_epochs):
        avg_train_loss = _run_epoch(model, train_loader, criterion, device, optimizer)
        avg_val_loss = _run_epoch(model, val_loader, criterion, device)

        # Print loss per epoch
        print(f"Epoch [{epoch+1}/{num_epochs}] - Train Loss: {avg_train_loss:.4f}, Val Loss: {avg_val_loss:.4f}")

    print("Training complete!")
    return model


# ------------------ RUN TRAINING ------------------
# Use the GPU when PyTorch can see one, otherwise fall back to the CPU
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Mean-squared-error objective (age regression) and an Adam optimizer built
# over the parameters of the current `model`
criterion = nn.MSELoss()
optimizer = optim.Adam(params=model.parameters(), lr=0.001)

# Train for 10 epochs with the existing model and dataloaders
trained_model = train_model(
    model=model,
    train_loader=train_loader,
    val_loader=val_loader,
    criterion=criterion,
    optimizer=optimizer,
    device=device,
    num_epochs=10,
)




Epoch [1/10] - Train Loss: 1375.4220, Val Loss: 1077.2207
Epoch [2/10] - Train Loss: 804.0489, Val Loss: 606.1556
Epoch [3/10] - Train Loss: 730.5050, Val Loss: 613.8200
Epoch [4/10] - Train Loss: 750.2138, Val Loss: 605.4150
Epoch [5/10] - Train Loss: 747.0973, Val Loss: 610.4305
Epoch [6/10] - Train Loss: 684.8285, Val Loss: 886.2824
Epoch [7/10] - Train Loss: 763.7273, Val Loss: 604.8235
Epoch [8/10] - Train Loss: 781.2485, Val Loss: 611.8008
Epoch [9/10] - Train Loss: 730.3856, Val Loss: 843.1059
Epoch [10/10] - Train Loss: 736.4224, Val Loss: 654.9553
Training complete!


In [8]:
def quick_test(num_batches=3):
    """Run a few optimisation steps on the training data and print each loss.

    Uses the notebook-level ``model``, ``train_loader``, ``criterion``,
    ``optimizer`` and ``device``. Note that this really updates the model's
    weights; it is not a read-only check.

    Args:
        num_batches: Number of batches to process before stopping.
    """
    if num_batches <= 0:
        return

    # The inputs are moved to `device` below, so make sure the model is there too
    model.to(device)
    model.train()
    for i, (inputs, targets) in enumerate(train_loader):
        inputs, targets = inputs.to(device), targets.to(device)
        optimizer.zero_grad()
        outputs = model(inputs)
        # Reshape to the targets' shape; squeeze() would collapse a batch of
        # one to a 0-d tensor and broadcast against the target.
        loss = criterion(outputs.reshape(targets.shape), targets)
        loss.backward()
        optimizer.step()
        print(f"Batch {i+1} - Loss: {loss.item():.4f} ")
        # Stop after the requested number of batches without fetching another
        if i + 1 >= num_batches:
            break


# Run the short training smoke test. It calls optimizer.step(), so the global
# model's weights are modified by this cell.
quick_test()

Batch 1 - Loss: 3795.8010 
Batch 2 - Loss: 3280.9177 
Batch 3 - Loss: 17971.0859 
