<a href="https://colab.research.google.com/github/Hitika-Jain/ParkInsight/blob/main/mri_model_main.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [4]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
!pip install pydicom

In [None]:
import os
import pydicom
import numpy as np
from glob import glob
from tqdm import tqdm

def load_dicom_volume(dicom_dir):
    """
    Loads and stacks DICOM slices from a given directory into a 3D volume.
    Normalizes the volume to [0, 1].
    """
    dicom_files = glob(os.path.join(dicom_dir, "*.dcm"))
    slices = []

    for f in dicom_files:
        try:
            ds = pydicom.dcmread(f)
            slices.append(ds)
        except:
            continue

    if not slices:
        return None

    # Sort slices by InstanceNumber to maintain correct order
    try:
        slices.sort(key=lambda s: int(s.InstanceNumber))
    except:
        return None

    try:
        images = [s.pixel_array for s in slices]
        volume = np.stack(images, axis=0).astype(np.float32)
        # Normalize to [0, 1]
        volume -= np.min(volume)
        volume /= (np.max(volume) + 1e-5)
        return volume
    except:
        return None

def process_all_scans(root_dir, save_dir, label):
    """
    Recursively processes all iso-level folders in the given root_dir,
    converts them into 3D NumPy volumes, and saves them with global numbering.
    """
    if not os.path.exists(save_dir):
        os.makedirs(save_dir)

    scan_counter = 1

    for patient in tqdm(os.listdir(root_dir), desc=f"Processing {label}"):
        patient_path = os.path.join(root_dir, patient)
        if not os.path.isdir(patient_path):
            continue

        for subdir, _, _ in os.walk(patient_path):
            dicom_files = glob(os.path.join(subdir, "*.dcm"))
            if dicom_files:
                volume = load_dicom_volume(subdir)
                if volume is not None:
                    filename = f"{label}_subject{scan_counter}.npy"
                    save_path = os.path.join(save_dir, filename)
                    np.save(save_path, volume)
                    scan_counter += 1

# Example usage
if __name__ == "__main__":
    pd_root = "/content/drive/MyDrive/parkison's"
    hc_root = "/content/drive/MyDrive/Health Controlled"
    save_path = "/content/drive/MyDrive/3D_Numpy_Volumes"

    process_all_scans(pd_root, os.path.join(save_path, "pd"), label="pd")
    process_all_scans(hc_root, os.path.join(save_path, "hc"), label="hc")

In [None]:
import os

def count_npy_files(directory):
    return len([f for f in os.listdir(directory) if f.endswith(".npy")])

# Paths to your saved directories
pd_dir = "/content/drive/MyDrive/3D_Numpy_Volumes/pd"
hc_dir = "/content/drive/MyDrive/3D_Numpy_Volumes/hc"

pd_count = count_npy_files(pd_dir)
hc_count = count_npy_files(hc_dir)

print(f"Parkinson's scans (pd): {pd_count}")
print(f"Healthy scans (hc): {hc_count}")

In [None]:
import matplotlib.pyplot as plt
import ipywidgets as widgets
from IPython.display import display
import numpy as np

volume = np.load("/content/drive/MyDrive/3D_Numpy_Volumes/hc/hc_subject1.npy")
def show_views(slice_index):
    fig, axes = plt.subplots(1, 3, figsize=(18, 6))

    # Sagittal view (slice along axis=0)
    axes[0].imshow(volume[slice_index, :, :], cmap='gray')
    axes[0].set_title(f"Sagittal (X={slice_index})")
    axes[0].axis('off')

    # Coronal view (slice along axis=1)
    axes[1].imshow(volume[:, slice_index, :], cmap='gray')
    axes[1].set_title(f"Coronal (Y={slice_index})")
    axes[1].axis('off')

    # Axial view (slice along axis=2)
    axes[2].imshow(volume[:, :, slice_index], cmap='gray')
    axes[2].set_title(f"Axial / Top-Down (Z={slice_index})")
    axes[2].axis('off')

    plt.show()

widgets.interact(
    show_views,
    slice_index=widgets.IntSlider(min=0, max=255, step=1, value=128)
)

In [None]:
import matplotlib.pyplot as plt
import ipywidgets as widgets
from IPython.display import display
import numpy as np

# Load and squeeze to ensure 3D volume
volume = np.load("/content/drive/MyDrive/3D_Numpy_Volumes/pd/pd_subject1.npy")
volume = np.squeeze(volume)  # Ensures shape is (D, H, W)

# Choose the smallest axis size as slider max
min_dim = min(volume.shape)

def show_views(slice_index):
    fig, axes = plt.subplots(1, 3, figsize=(18, 6))

    axes[0].imshow(volume[slice_index, :, :], cmap='gray')
    axes[0].set_title(f"Sagittal (X={slice_index})")
    axes[0].axis('off')

    axes[1].imshow(volume[:, slice_index, :], cmap='gray')
    axes[1].set_title(f"Coronal (Y={slice_index})")
    axes[1].axis('off')

    axes[2].imshow(volume[:, :, slice_index], cmap='gray')
    axes[2].set_title(f"Axial / Top-Down (Z={slice_index})")
    axes[2].axis('off')

    plt.show()

widgets.interact(
    show_views,
    slice_index=widgets.IntSlider(min=0, max=min_dim - 1, step=1, value=min_dim // 2)
)

Extracting the voxels from .npy files now for following regions:

*   5: Left Caudate
*   6: Left Putamen
*   7: Left Pallidum
*   8: Brain-Stem
*   16: Right Caudate
*   17: Right Putamen
*   18: Right Pallidum

In [None]:
!pip install nibabel nilearn

In [None]:
import os
import numpy as np
import nibabel as nib
from nilearn import datasets
from nilearn.image import resample_to_img, new_img_like, math_img
import gc  # Garbage collection
from tqdm import tqdm

# ---------------- CONFIG ----------------
data_root = "/content/drive/MyDrive/3D_Numpy_Volumes"
save_root = "/content/drive/MyDrive/Extracted_Regions"
region_indices = [5, 6, 7, 8, 16, 17, 18]
affine = np.array([
    [2, 0, 0, -90],
    [0, 2, 0, -126],
    [0, 0, 2, -72],
    [0, 0, 0, 1]
])
# ----------------------------------------

# Load Harvard-Oxford atlas once
atlas = datasets.fetch_atlas_harvard_oxford('sub-maxprob-thr50-1mm')
atlas_img = atlas.maps
atlas_labels = atlas.labels
atlas_data = atlas_img.get_fdata()

# Traverse both classes: pd and hc
for label in ['pd', 'hc']:
    folder_path = os.path.join(data_root, label)
    npy_files = [f for f in os.listdir(folder_path) if f.endswith('.npy')]

    for npy_file in tqdm(npy_files, desc=f"Processing {label}"):
        subject_id = npy_file.replace('.npy', '')
        npy_path = os.path.join(folder_path, npy_file)

        # Load and preprocess brain array
        brain_array = np.load(npy_path)
        if brain_array.ndim == 4 and brain_array.shape[0] == 1:
          brain_array = brain_array[0]
        elif brain_array.ndim == 4:
          raise ValueError(f"{npy_path} has unexpected 4D shape: {brain_array.shape}")

        # Wrap into Nifti image
        brain_img = nib.Nifti1Image(brain_array, affine)


        # Resample to atlas space
        resampled_brain = resample_to_img(brain_img, atlas_img, interpolation='continuous', force_resample=True)

        for region_index in region_indices:
            region_label = atlas_labels[region_index]
            region_mask_data = (atlas_data == region_index).astype(np.uint8)
            region_mask = new_img_like(atlas_img, region_mask_data)

            # Masking
            region_volume_img = math_img("img1 * img2", img1=resampled_brain, img2=region_mask)
            region_volume = region_volume_img.get_fdata()

            if np.count_nonzero(region_volume) == 0:
                continue

            # Bounding box
            coords = np.nonzero(region_volume)
            zmin, zmax = np.min(coords[2]), np.max(coords[2])
            ymin, ymax = np.min(coords[1]), np.max(coords[1])
            xmin, xmax = np.min(coords[0]), np.max(coords[0])
            region_crop = region_volume[xmin:xmax+1, ymin:ymax+1, zmin:zmax+1]

            # Save
            region_folder = os.path.join(save_root, f"region_{region_index}")
            os.makedirs(region_folder, exist_ok=True)
            save_path = os.path.join(region_folder, f"{label}_{subject_id}.npy")
            np.save(save_path, region_crop)

        # Clean up memory
        del brain_array, brain_img, resampled_brain, region_volume_img, region_volume,
        gc.collect()

visualization of voxels

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import os

region_paths = {
    "Left Caudate": "/content/drive/MyDrive/Extracted_Regions/region_5",
    "Right Caudate": "/content/drive/MyDrive/Extracted_Regions/region_16",
    "Left Putamen": "/content/drive/MyDrive/Extracted_Regions/region_6",
    "Right Putamen": "/content/drive/MyDrive/Extracted_Regions/region_17",
    "Left Pallidum": "/content/drive/MyDrive/Extracted_Regions/region_7",
    "Right Pallidum": "/content/drive/MyDrive/Extracted_Regions/region_18",
    "Brain-Stem": "/content/drive/MyDrive/Extracted_Regions/region_8"
}

plt.figure(figsize=(12, 8))

for i, (region_name, path) in enumerate(region_paths.items()):
    # Pick one .npy file from the region
    sample_file = sorted(os.listdir(path))[0]
    data = np.load(os.path.join(path, sample_file))

    # Choose the middle axial slice
    mid_slice = data.shape[2] // 2

    plt.subplot(3, 3, i + 1)
    plt.imshow(data[:, :, mid_slice], cmap='gray')
    plt.title(f"{region_name}\n{sample_file}")
    plt.axis('off')

plt.tight_layout()
plt.show()

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from ipywidgets import interact, IntSlider

# Load a single subject's region file (e.g., Brain-Stem)
region_path = "/content/drive/MyDrive/Extracted_Regions/region_8/hc_hc_subject1.npy"
data = np.load(region_path)

# Viewer
def scroll_slices(slice_idx):
    plt.figure(figsize=(5, 5))
    plt.imshow(data[:, :, slice_idx], cmap='gray')
    plt.title(f"Slice {slice_idx}")
    plt.axis('off')
    plt.show()

# Slider
interact(scroll_slices, slice_idx=IntSlider(min=0, max=data.shape[2]-1, step=1, value=data.shape[2]//2));

3d vit model

In [None]:
import os
import torch
import numpy as np
from torch.utils.data import Dataset, DataLoader
from torch import nn
import torch.nn.functional as F
from einops import rearrange, repeat
from torchvision import transforms

In [None]:
from collections import Counter

class BrainRegionDataset(Dataset):
    def __init__(self, root_dir, regions, transform=None, target_size=(64, 64, 64)):
        """
        Args:
            root_dir (str): Directory with all the brain region MRI files.
            regions (list): List of brain region folder names.
            transform (callable, optional): Optional transform to be applied on a sample.
            target_size (tuple): The target size to resize the MRI images.
        """
        self.root_dir = root_dir
        self.regions = regions
        self.transform = transform
        self.target_size = target_size
        self.files = []

        # Traverse the directory and collect all MRI files
        for region in regions:
            region_path = os.path.join(root_dir, region)
            # print(f"Checking region: {region_path}")  # Debugging
            for filename in os.listdir(region_path):
                if filename.endswith('.npy'):  # Assuming .npy files
                    # print(f"Found file: {filename}")  # Debugging
                    label = 0 if filename.startswith('hc') else 1  # Label extraction based on filename prefix
                    self.files.append((os.path.join(region_path, filename), label))

        # Print the label distribution
        # labels = [label for _, label in self.files]
        # print("🧠 Dataset label distribution:", Counter(labels))

    def __len__(self):
        return len(self.files)

    def __getitem__(self, idx):
        file_path, label = self.files[idx]
        data = np.load(file_path)

        # Resize the data
        data = self.resize_volume(data, self.target_size)

        # Normalize the data (standardization)
        data = (data - np.mean(data)) / (np.std(data) + 1e-5)

        # Add channel dimension
        data = np.expand_dims(data, axis=0)  # Shape: (1, D, H, W)
        data = torch.tensor(data, dtype=torch.float32)

        # Optional transform
        if self.transform:
            data = self.transform(data)

        return data, label

    def resize_volume(self, volume, target_size):
        """
        Resize the 3D MRI volume to the target size using trilinear interpolation.
        """
        volume = torch.tensor(volume).unsqueeze(0).unsqueeze(0)  # Add batch and channel dimensions
        resized = F.interpolate(volume, size=target_size, mode='trilinear', align_corners=False)
        return resized.squeeze(0).squeeze(0).numpy()  # Remove batch and channel dimensions

In [None]:
for data, labels in dataloader:
    print("Data shape:", data.shape)
    print("Label sample:", labels[:5])
    break

In [None]:
# Transformer Model Definition
class Residual(nn.Module):
    def __init__(self, fn):
        super().__init__()
        self.fn = fn

    def forward(self, x, **kwargs):
        return self.fn(x, **kwargs) + x


class PreNorm(nn.Module):
    def __init__(self, dim, fn):
        super().__init__()
        self.norm = nn.LayerNorm(dim)
        self.fn = fn

    def forward(self, x, **kwargs):
        return self.fn(self.norm(x), **kwargs)


class FeedForward(nn.Module):
    def __init__(self, dim, hidden_dim, dropout=0.):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(dim, hidden_dim),
            nn.GELU(),
            nn.Dropout(dropout),
            nn.Linear(hidden_dim, dim),
            nn.Dropout(dropout)
        )

    def forward(self, x):
        return self.net(x)


class Attention(nn.Module):
    def __init__(self, dim, heads=8, dim_head=64, dropout=0.):
        super().__init__()
        inner_dim = dim_head * heads
        self.heads = heads
        self.scale = dim ** -0.5

        self.to_qkv = nn.Linear(dim, inner_dim * 3, bias=False)
        self.to_out = nn.Sequential(
            nn.Linear(inner_dim, dim),
            nn.Dropout(dropout)
        )

    def forward(self, x, mask=None):
        b, n, _, h = *x.shape, self.heads
        qkv = self.to_qkv(x).chunk(3, dim=-1)
        q, k, v = map(lambda t: rearrange(t, 'b n (h d) -> b h n d', h=h), qkv)

        dots = torch.einsum('bhid,bhjd->bhij', q, k) * self.scale
        mask_value = -torch.finfo(dots.dtype).max

        if mask is not None:
            mask = F.pad(mask.flatten(1), (1, 0), value=True)
            assert mask.shape[-1] == dots.shape[-1], 'mask has incorrect dimensions'
            mask = mask[:, None, :] * mask[:, :, None]
            dots.masked_fill_(~mask, mask_value)
            del mask

        attn = dots.softmax(dim=-1)

        out = torch.einsum('bhij,bhjd->bhid', attn, v)
        out = rearrange(out, 'b h n d -> b n (h d)')
        out = self.to_out(out)
        return out


class Transformer(nn.Module):
    def __init__(self, dim, depth, heads, dim_head, mlp_dim, dropout):
        super().__init__()
        self.layers = nn.ModuleList([])
        for _ in range(depth):
            self.layers.append(nn.ModuleList([
                Residual(PreNorm(dim, Attention(dim, heads=heads, dim_head=dim_head, dropout=dropout))),
                Residual(PreNorm(dim, FeedForward(dim, mlp_dim, dropout=dropout)))
            ]))

    def forward(self, x, mask=None):
        for attn, ff in self.layers:
            x = attn(x, mask=mask)
            x = ff(x)
        return x


# class ViT3D(nn.Module):
#     MIN_NUM_PATCHES=16
#     def __init__(self, *, image_size, patch_size, num_classes, dim, depth, heads, mlp_dim, pool='cls', channels=1, dim_head=64, dropout=0., emb_dropout=0.):
#         super().__init__()
#         assert all([each_dimension % patch_size ==
#                     0 for each_dimension in image_size])
#         num_patches = (image_size[0] // patch_size) * \
#             (image_size[1] // patch_size)*(image_size[2] // patch_size)
#         patch_dim = channels * patch_size ** 3
#         assert num_patches > MIN_NUM_PATCHES, f'your number of patches ({num_patches}) is way too small for attention to be effective (at least 16). Try decreasing your patch size'
#         assert pool in {
#             'cls', 'mean'}, 'pool type must be either cls (cls token) or mean (mean pooling)'

#         self.patch_size = patch_size

#         self.pos_embedding = nn.Parameter(torch.randn(1, num_patches + 1, dim))
#         self.patch_to_embedding = nn.Linear(patch_dim, dim)
#         self.cls_token = nn.Parameter(torch.randn(1, 1, dim))
#         self.dropout = nn.Dropout(emb_dropout)

#         self.transformer = Transformer(
#             dim, depth, heads, dim_head, mlp_dim, dropout)

#         self.pool = pool
#         self.to_latent = nn.Identity()

#         self.mlp_head = nn.Sequential(
#             nn.LayerNorm(dim),
#             nn.Linear(dim, num_classes)
#         )

#     def forward(self, img, mask=None):
#         p = self.patch_size

#         # Ensure the input tensor has the correct shape: (batch, channels, depth, height, width)
#         x = rearrange(
#             img, 'b c (x p1) (y p2) (z p3) -> b (x y z) (p1 p2 p3 c)', p1=p, p2=p, p3=p)
#         x = self.patch_to_embedding(x)
#         b, n, _ = x.shape

#         cls_tokens = repeat(self.cls_token, '() n d -> b n d', b=b)
#         x = torch.cat((cls_tokens, x), dim=1)
#         x += self.pos_embedding[:, :(n + 1)]
#         x = self.dropout(x)

#         x = self.transformer(x, mask)

#         x = x.mean(dim=1) if self.pool == 'mean' else x[:, 0]

#         x = self.to_latent(x)
#         return self.mlp_head(x)

In [None]:
class ViT3D(nn.Module):
    MIN_NUM_PATCHES = 16  # Define the minimum number of patches

    def __init__(self, *, image_size, patch_size, num_classes, dim, depth, heads, mlp_dim, pool='cls', channels=1, dim_head=64, dropout=0., emb_dropout=0.):
        super().__init__()
        assert all([each_dimension % patch_size ==
                    0 for each_dimension in image_size])
        num_patches = (image_size[0] // patch_size) * \
            (image_size[1] // patch_size)*(image_size[2] // patch_size)
        patch_dim = channels * patch_size ** 3
        assert num_patches > self.MIN_NUM_PATCHES, f'your number of patches ({num_patches}) is way too small for attention to be effective (at least 16). Try decreasing your patch size'
        assert pool in {
            'cls', 'mean'}, 'pool type must be either cls (cls token) or mean (mean pooling)'

        self.patch_size = patch_size

        self.pos_embedding = nn.Parameter(torch.randn(1, num_patches + 1, dim))
        self.patch_to_embedding = nn.Linear(patch_dim, dim)
        self.cls_token = nn.Parameter(torch.randn(1, 1, dim))
        self.dropout = nn.Dropout(emb_dropout)

        self.transformer = Transformer(
            dim, depth, heads, dim_head, mlp_dim, dropout)

        self.pool = pool
        self.to_latent = nn.Identity()

        self.mlp_head = nn.Sequential(
            nn.LayerNorm(dim),
            nn.Linear(dim, num_classes)
        )

    def forward(self, img, mask=None):
        p = self.patch_size

        # Ensure the input tensor has the correct shape: (batch, channels, depth, height, width)
        x = rearrange(
            img, 'b c (x p1) (y p2) (z p3) -> b (x y z) (p1 p2 p3 c)', p1=p, p2=p, p3=p)
        x = self.patch_to_embedding(x)
        b, n, _ = x.shape

        cls_tokens = repeat(self.cls_token, '() n d -> b n d', b=b)
        x = torch.cat((cls_tokens, x), dim=1)
        x += self.pos_embedding[:, :(n + 1)]
        x = self.dropout(x)

        x = self.transformer(x, mask)

        x = x.mean(dim=1) if self.pool == 'mean' else x[:, 0]

        x = self.to_latent(x)
        return self.mlp_head(x)

In [None]:

# Evaluation Code
def evaluate(model, dataloader, criterion, device):
    model.eval()  # Set the model to evaluation mode
    correct = 0
    total = 0
    loss_sum = 0

    with torch.no_grad():
        for data, labels in dataloader:
            data, labels = data.to(device), labels.to(device)
            outputs = model(data)
            loss = criterion(outputs, labels)
            loss_sum += loss.item()

            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    accuracy = 100 * correct / total
    avg_loss = loss_sum / len(dataloader)

    return accuracy, avg_loss


# Training and Evaluation Code
def train(model, dataloader, optimizer, criterion, device, num_epochs=10):
    for epoch in range(num_epochs):
        model.train()  # Set the model to training mode
        running_loss = 0

        for data, labels in dataloader:
            data, labels = data.to(device), labels.to(device)

            optimizer.zero_grad()  # Zero gradients
            outputs = model(data)
            loss = criterion(outputs, labels)
            loss.backward()  # Backpropagation
            optimizer.step()  # Update model weights

            running_loss += loss.item()

        avg_train_loss = running_loss / len(dataloader)

        # Evaluate after each epoch
        accuracy, avg_loss = evaluate(model, dataloader, criterion, device)

        print(f"Epoch {epoch+1}/{num_epochs}")
        print(f"Training Loss: {avg_train_loss:.4f}")
        print(f"Evaluation Accuracy: {accuracy:.2f}%")
        print(f"Evaluation Loss: {avg_loss:.4f}\n")

In [None]:
def evaluate(model, dataloader, criterion, device):
    model.eval()
    correct = 0
    total = 0
    loss_sum = 0

    with torch.no_grad():
        for data, labels in dataloader:
            data, labels = data.to(device), labels.to(device)
            outputs = model(data)
            loss = criterion(outputs, labels)
            loss_sum += loss.item()

            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    accuracy = 100 * correct / total
    avg_loss = loss_sum / len(dataloader)
    return accuracy, avg_loss

# Updated Training Function
def train(model, train_loader, val_loader, optimizer, criterion, device, num_epochs=50):
    for epoch in range(num_epochs):
        model.train()
        correct_train = 0
        total_train = 0
        train_loss_sum = 0

        for data, labels in train_loader:
            data, labels = data.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(data)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            train_loss_sum += loss.item()
            _, predicted = torch.max(outputs, 1)
            total_train += labels.size(0)
            correct_train += (predicted == labels).sum().item()

        train_accuracy = 100 * correct_train / total_train
        avg_train_loss = train_loss_sum / len(train_loader)

        # Evaluate on validation set
        val_accuracy, val_loss = evaluate(model, val_loader, criterion, device)

        print(f"Epoch {epoch+1}/{num_epochs}")
        print(f"Train Loss: {avg_train_loss:.4f} | Train Acc: {train_accuracy:.2f}%")
        print(f"Val Loss: {val_loss:.4f} | Val Acc: {val_accuracy:.2f}%\n")

In [None]:
from torch.utils.data import random_split, DataLoader
# Main Setup
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Dataset and DataLoader
root_dir = '/content/drive/MyDrive/Extracted_Regions'  # Replace with the actual path
regions = ['region_5','region_6','region_7','region_8','region_16','region_17','region_18']
transform = None

# dataset = BrainRegionDataset(root_dir=root_dir, regions=regions, transform=transform, target_size=(64, 64, 64))
# dataloader = DataLoader(dataset, batch_size=4, shuffle=True)
dataset_size = len(dataset)
train_size = int(0.8 * dataset_size)
val_size = dataset_size - train_size

train_dataset, val_dataset = random_split(dataset, [train_size, val_size])

train_loader = DataLoader(train_dataset, batch_size=4, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=4, shuffle=False)
# Initialize the model
image_size = (64, 64, 64)  # Adjust according to your MRI scan size
patch_size = 4
num_classes = 2  # Healthy Control (0) or Parkinson's Disease (1)
dim = 64
depth = 4
heads = 4
mlp_dim = 128
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model = ViT3D(image_size=image_size, patch_size=patch_size, num_classes=num_classes, dim=dim, depth=depth, heads=heads, mlp_dim=mlp_dim).to(device)

# Loss and Optimizer
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)

# Train the model
train(model, train_loader, val_loader, optimizer, criterion, device, num_epochs=10)

In [None]:
import torch
import torch.nn as nn

class LightweightViT3D(nn.Module):
    def __init__(self, image_size=64, patch_size=4, num_classes=2, dim=64, depth=4, heads=4, mlp_dim=128, dropout=0.1):
        super(LightweightViT3D, self).__init__()

        assert all(i % patch_size == 0 for i in image_size), "Image dimensions must be divisible by patch size"
        num_patches_per_dim = [i // patch_size for i in image_size]
        self.num_patches = np.prod(num_patches_per_dim)  # total number of patches
        self.patch_dim = patch_size ** 3  # assuming one channel


        # Patch embedding via 3D Conv
        self.patch_embedding = nn.Conv3d(1, dim, kernel_size=patch_size, stride=patch_size)
        self.cls_token = nn.Parameter(torch.randn(1, 1, dim))
        self.pos_embedding = nn.Parameter(torch.randn(1, self.num_patches + 1, dim))
        self.dropout = nn.Dropout(dropout)

        # Transformer encoder
        encoder_layer = nn.TransformerEncoderLayer(d_model=dim, nhead=heads, dim_feedforward=mlp_dim, dropout=dropout)
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=depth)

        self.norm = nn.LayerNorm(dim)
        self.head = nn.Linear(dim, num_classes)

    def forward(self, x):
        x = self.patch_embedding(x)  # (B, dim, D', H', W')
        x = x.flatten(2).transpose(1, 2)  # (B, num_patches, dim)

        cls_tokens = self.cls_token.expand(x.shape[0], -1, -1)  # (B, 1, dim)
        x = torch.cat((cls_tokens, x), dim=1)
        x = x + self.pos_embedding[:, :x.size(1), :]
        x = self.dropout(x)

        x = self.transformer(x)
        x = self.norm(x[:, 0])  # Use CLS token
        return self.head(x)

In [None]:
from torch.utils.data import random_split, DataLoader
# Main Setup
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Dataset and DataLoader
root_dir = '/content/drive/MyDrive/Extracted_Regions'  # Replace with the actual path
regions = ['region_5','region_6','region_7','region_8','region_16','region_17','region_18']
transform = None

# Initialize the dataset
dataset = BrainRegionDataset(root_dir=root_dir, regions=regions, transform=transform, target_size=(64, 64, 64))

# Dataset split
dataset_size = len(dataset)
train_size = int(0.8 * dataset_size)
val_size = dataset_size - train_size

train_dataset, val_dataset = random_split(dataset, [train_size, val_size])

# DataLoader setup
train_loader = DataLoader(train_dataset, batch_size=4, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=4, shuffle=False)
# Initialize the model
image_size = (64, 64, 64)  # Adjust according to your MRI scan size
patch_size = 4
num_classes = 2  # Healthy Control (0) or Parkinson's Disease (1)
dim = 64
depth = 4
heads = 4
mlp_dim = 128
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model = LightweightViT3D(image_size=image_size, patch_size=patch_size, num_classes=num_classes, dim=dim, depth=depth, heads=heads, mlp_dim=mlp_dim).to(device)

# Loss and Optimizer
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)

# Train the model
train(model, train_loader, val_loader, optimizer, criterion, device, num_epochs=10)

In [None]:
from sklearn.utils.class_weight import compute_class_weight

# example:
weights = compute_class_weight('balanced', classes=[0,1], y=labels)
# then use in loss:
criterion = nn.CrossEntropyLoss(weight=torch.tensor(weights).float().to(device))

3d cnn

In [None]:
import os
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

# Dataset class
class MultiRegionVoxelDataset(Dataset):
    def __init__(self, region_dirs):
        self.file_paths = []
        for region in region_dirs:
            files = [f for f in os.listdir(region) if f.endswith('.npy')]
            self.file_paths += [os.path.join(region, f) for f in files]

    def __len__(self):
        return len(self.file_paths)

    def __getitem__(self, idx):
      path = self.file_paths[idx]
      voxel = np.load(path)

      # Get current shape
      x, y, z = voxel.shape

      # Crop if too big
      voxel = voxel[:64, :64, :64]

      # Pad if too small
      padded = np.zeros((64, 64, 64), dtype=np.float32)
      padded[:voxel.shape[0], :voxel.shape[1], :voxel.shape[2]] = voxel
      voxel = padded

      voxel = torch.tensor(voxel).unsqueeze(0)  # Add channel dim

      label = 0 if os.path.basename(path).startswith('hc') else 1
      return voxel, label

In [None]:
region_folders = [
    '/content/drive/MyDrive/Extracted_Regions/region_5',
    '/content/drive/MyDrive/Extracted_Regions/region_6',
    '/content/drive/MyDrive/Extracted_Regions/region_7',
    '/content/drive/MyDrive/Extracted_Regions/region_8',
    '/content/drive/MyDrive/Extracted_Regions/region_16',
    '/content/drive/MyDrive/Extracted_Regions/region_17',
    '/content/drive/MyDrive/Extracted_Regions/region_18',
]

dataset = MultiRegionVoxelDataset(region_folders)
dataloader = DataLoader(dataset, batch_size=4, shuffle=True)

In [None]:
class Simple3DCNN(nn.Module):
    def __init__(self):
        super(Simple3DCNN, self).__init__()
        self.conv1 = nn.Conv3d(1, 16, kernel_size=3, padding=1)
        self.pool = nn.MaxPool3d(2)
        self.conv2 = nn.Conv3d(16, 32, kernel_size=3, padding=1)
        self.conv3 = nn.Conv3d(32, 64, kernel_size=3, padding=1)
        self.fc1 = nn.Linear(64 * 8 * 8 * 8, 128)
        self.fc2 = nn.Linear(128, 2)

    def forward(self, x):
        x = self.pool(F.relu(self.conv1(x)))  # [16, 32, 32, 32]
        x = self.pool(F.relu(self.conv2(x)))  # [32, 16, 16, 16]
        x = self.pool(F.relu(self.conv3(x)))  # [64, 8, 8, 8]
        x = x.view(x.size(0), -1)             # flatten
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return x

In [None]:
def train_eval(model, dataloader, optimizer, criterion, device, num_epochs=10):
    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0

        for x, y in dataloader:
            x, y = x.to(device), y.to(device)
            optimizer.zero_grad()
            output = model(x)
            loss = criterion(output, y)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()

        avg_train_loss = running_loss / len(dataloader)

        # Evaluation
        model.eval()
        correct, total, eval_loss = 0, 0, 0
        with torch.no_grad():
            for x, y in dataloader:
                x, y = x.to(device), y.to(device)
                output = model(x)
                loss = criterion(output, y)
                eval_loss += loss.item()
                pred = output.argmax(dim=1)
                correct += (pred == y).sum().item()
                total += y.size(0)

        eval_acc = 100 * correct / total
        print(f"Epoch {epoch+1}/{num_epochs}")
        print(f"Train Loss: {avg_train_loss:.4f} | Eval Acc: {eval_acc:.2f}% | Eval Loss: {eval_loss / len(dataloader):.4f}")


In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Root folder where all region folders are stored
root_dir = '/content/drive/MyDrive/Extracted_Regions'

# Automatically collect full paths of all subfolders inside root_dir
region_folders = [os.path.join(root_dir, d) for d in os.listdir(root_dir) if os.path.isdir(os.path.join(root_dir, d))]

# Now pass this list to the dataset
dataset = MultiRegionVoxelDataset(region_folders)
dataloader = DataLoader(dataset, batch_size=4, shuffle=True)

model = Simple3DCNN().to(device)
optimizer = optim.Adam(model.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss()

train_eval(model, dataloader, optimizer, criterion, device, num_epochs=10)


In [None]:
class Lightweight3DCNN(nn.Module):
    def __init__(self):
        super(Lightweight3DCNN, self).__init__()
        self.features = nn.Sequential(
            nn.Conv3d(1, 16, kernel_size=3, stride=1, padding=1),  # (B, 16, 64, 64, 64)
            nn.BatchNorm3d(16),
            nn.ReLU(),
            nn.MaxPool3d(2),  # (B, 16, 32, 32, 32)

            nn.Conv3d(16, 32, kernel_size=3, stride=1, padding=1),  # (B, 32, 32, 32, 32)
            nn.BatchNorm3d(32),
            nn.ReLU(),
            nn.MaxPool3d(2),  # (B, 32, 16, 16, 16)

            nn.Conv3d(32, 64, kernel_size=3, stride=1, padding=1),  # (B, 64, 16, 16, 16)
            nn.BatchNorm3d(64),
            nn.ReLU(),
            nn.AdaptiveAvgPool3d((4, 4, 4))  # (B, 64, 4, 4, 4)
        )
        self.classifier = nn.Sequential(
            nn.Flatten(),  # (B, 64*4*4*4)
            nn.Linear(64 * 4 * 4 * 4, 128),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(128, 2)
        )

    def forward(self, x):
        x = self.features(x)
        x = self.classifier(x)
        return x

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Root folder where all region folders are stored
root_dir = '/content/drive/MyDrive/Extracted_Regions'

# Automatically collect full paths of all subfolders inside root_dir
region_folders = [os.path.join(root_dir, d) for d in os.listdir(root_dir) if os.path.isdir(os.path.join(root_dir, d))]

# Now pass this list to the dataset
dataset = MultiRegionVoxelDataset(region_folders)
dataloader = DataLoader(dataset, batch_size=4, shuffle=True)

model = Lightweight3DCNN().to(device)
optimizer = optim.Adam(model.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss()

train_eval(model, dataloader, optimizer, criterion, device, num_epochs=10)

In [None]:
from sklearn.utils.class_weight import compute_class_weight

# After loading your dataset
labels = [int(f.split('_')[-1].split('.')[0]) for f in all_file_names]
weights = compute_class_weight('balanced', classes=np.unique(labels), y=labels)
weights = torch.tensor(weights, dtype=torch.float).to(device)
criterion = nn.CrossEntropyLoss(weight=weights)


#3d resnet model

In [None]:
import os
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import torch.nn.functional as F
from sklearn.model_selection import train_test_split

In [None]:
# ✅ RUN THIS ONCE to preprocess and save resized volumes
def resize_volume(volume, target_shape=(64, 64, 64)):
    tensor = torch.tensor(volume, dtype=torch.float32).unsqueeze(0).unsqueeze(0)  # [1, 1, D, H, W]
    resized = F.interpolate(tensor, size=target_shape, mode='trilinear', align_corners=False)
    return resized.squeeze().numpy()

input_root = "/content/drive/MyDrive/Extracted_Regions"
output_root = "/content/drive/MyDrive/Resized_Regions"
os.makedirs(output_root, exist_ok=True)

region_names = ['region_5', 'region_6', 'region_7', 'region_8', 'region_16', 'region_17', 'region_18']

for region in region_names:
    in_path = os.path.join(input_root, region)
    out_path = os.path.join(output_root, region)
    os.makedirs(out_path, exist_ok=True)

    for fname in os.listdir(in_path):
        if fname.endswith(".npy"):
            vol = np.load(os.path.join(in_path, fname))
            resized = resize_volume(vol)
            np.save(os.path.join(out_path, fname), resized)

print("✅ Preprocessing done.")

In [None]:
from torch.utils.data import Subset
import torchvision.transforms as transforms

# You can add transformations like random rotations and flips
transform = transforms.Compose([
    transforms.RandomRotation(30),
    transforms.RandomHorizontalFlip(),
    transforms.RandomVerticalFlip(),
])

# Add it to your dataset:
class MultiRegionDataset(Dataset):
    def __init__(self, root_dir, region_names, transform=None):
        self.root_dir = root_dir
        self.region_names = region_names
        self.transform = transform
        self.samples = self._collect_samples()

    def __getitem__(self, idx):
        sample = self.samples[idx]
        regions = []
        for region in self.region_names:
            path = os.path.join(self.root_dir, region, sample["name"])
            vol = np.load(path)
            vol_tensor = torch.tensor(vol, dtype=torch.float32).unsqueeze(0)  # [1, D, H, W]
            regions.append(vol_tensor)
        x = torch.stack(regions)  # [R, 1, D, H, W]
        label = 0 if name.startswith('hc') else 1

        if self.transform:
            x = self.transform(x)

        return x, label

# Dataset
# class MultiRegionDataset(Dataset):
#     def __init__(self, root_dir, region_names):
#         self.root_dir = root_dir
#         self.region_names = region_names
#         self.samples = self._collect_samples()

#     def _collect_samples(self):
#         sample_names = os.listdir(os.path.join(self.root_dir, self.region_names[0]))
#         samples = []
#         for name in sample_names:
#             if name.endswith(".npy"):
#                 label = 0 if name.startswith('hc') else 1
#                 samples.append({"name": name, "label": label})
#         return samples

#     def __len__(self):
#         return len(self.samples)

#     def __getitem__(self, idx):
#         sample = self.samples[idx]
#         regions = []
#         for region in self.region_names:
#             path = os.path.join(self.root_dir, region, sample["name"])
#             vol = np.load(path)
#             vol_tensor = torch.tensor(vol, dtype=torch.float32).unsqueeze(0)  # [1, D, H, W]
#             regions.append(vol_tensor)
#         x = torch.stack(regions)  # [R, 1, D, H, W]
#         label = torch.tensor(sample["label"], dtype=torch.long)
#         return x, label

In [None]:
# Basic 3D ResNet block
class BasicBlock3D(nn.Module):
    def __init__(self, in_channels, out_channels):
        super().__init__()
        self.conv1 = nn.Conv3d(in_channels, out_channels, 3, padding=1)
        self.bn1 = nn.BatchNorm3d(out_channels)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = nn.Conv3d(out_channels, out_channels, 3, padding=1)
        self.bn2 = nn.BatchNorm3d(out_channels)
        self.skip = nn.Conv3d(in_channels, out_channels, 1) if in_channels != out_channels else nn.Identity()

    def forward(self, x):
        identity = self.skip(x)
        x = self.relu(self.bn1(self.conv1(x)))
        x = self.bn2(self.conv2(x))
        return self.relu(x + identity)

# Small ResNet Model for each region
class RegionResNet3D(nn.Module):
    def __init__(self):
        super().__init__()
        self.block1 = BasicBlock3D(1, 16)
        self.block2 = BasicBlock3D(16, 32)
        self.pool = nn.AdaptiveAvgPool3d(1)

    def forward(self, x):
        x = self.block1(x)
        x = self.block2(x)
        x = self.pool(x).flatten(1)
        return x  # shape: [B, 32]

In [None]:
# Main Ensemble Model
class MultiRegionResNetEnsemble(nn.Module):
    def __init__(self, num_regions):
        super().__init__()
        self.region_models = nn.ModuleList([RegionResNet3D() for _ in range(num_regions)])
        self.classifier = nn.Sequential(
            nn.Linear(32 * num_regions, 64),
            nn.ReLU(),
            nn.Linear(64, 2)
        )

    def forward(self, x):
        features = [self.region_models[i](x[:, i]) for i in range(x.size(1))]
        combined = torch.cat(features, dim=1)
        return self.classifier(combined)

In [None]:
# Training & Eval
def train(model, loader, optimizer, criterion, device):
    model.train()
    total_loss, correct = 0, 0
    for x, y in loader:
        x, y = x.to(device), y.to(device)
        optimizer.zero_grad()
        out = model(x)
        loss = criterion(out, y)
        loss.backward()
        optimizer.step()
        total_loss += loss.item() * x.size(0)
        correct += (out.argmax(1) == y).sum().item()
    return total_loss / len(loader.dataset), correct / len(loader.dataset)

def evaluate(model, loader, criterion, device):
    model.eval()
    total_loss, correct = 0, 0
    with torch.no_grad():
        for x, y in loader:
            x, y = x.to(device), y.to(device)
            out = model(x)
            loss = criterion(out, y)
            total_loss += loss.item() * x.size(0)
            correct += (out.argmax(1) == y).sum().item()
    return total_loss / len(loader.dataset), correct / len(loader.dataset)

In [None]:
# Setup
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
region_names = ['region_5', 'region_6', 'region_7', 'region_8', 'region_16', 'region_17', 'region_18']
dataset = MultiRegionDataset("/content/drive/MyDrive/Resized_Regions", region_names)

# Split
indices = list(range(len(dataset)))
train_idx, val_idx = train_test_split(indices, test_size=0.2, random_state=42)
train_loader = DataLoader(Subset(dataset, train_idx), batch_size=4, shuffle=True, num_workers=2)
val_loader = DataLoader(Subset(dataset, val_idx), batch_size=4, shuffle=False, num_workers=2)

# Model
model = MultiRegionResNetEnsemble(num_regions=len(region_names)).to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)  # Lower learning rate
criterion = nn.CrossEntropyLoss()

# Train
for epoch in range(10):
    train_loss, train_acc = train(model, train_loader, optimizer, criterion, device)
    val_loss, val_acc = evaluate(model, val_loader, criterion, device)
    print(f"Epoch {epoch+1} | Train Loss: {train_loss:.4f} | Acc: {train_acc*100:.2f}% | Val Loss: {val_loss:.4f} | Acc: {val_acc*100:.2f}%")

deeper resnet

In [None]:
pip install torchio


In [None]:
import torchio as tio

transform = tio.Compose([
    tio.RandomFlip(axes=(0, 1, 2)),
    tio.RandomAffine(scales=0.1, degrees=10),
    tio.RandomNoise(mean=0, std=0.01),
])

In [None]:
import os
import numpy as np
import torch
from torch.utils.data import Dataset

class MultiRegionDataset(Dataset):
    def __init__(self, root_dir, region_names):
        self.region_names = region_names
        self.root_dir = root_dir
        self.data = {region: [] for region in region_names}
        self.labels = []

        # Assume all regions have same filenames, just in their respective folders
        filenames = sorted(os.listdir(os.path.join(root_dir, region_names[0])))

        for file in filenames:
            if not file.endswith(".npy"):
                continue

            # Label from filename
            if file.startswith("hc_hc"):
                label = 0
            elif file.startswith("pd_pd"):
                label = 1
            else:
                raise ValueError(f"Unknown label prefix in file: {file}")

            self.labels.append(label)

            for region in region_names:
                full_path = os.path.join(root_dir, region, file)
                self.data[region].append(full_path)

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        sample = []
        for region in self.region_names:
            arr = np.load(self.data[region][idx])
            arr = torch.tensor(arr, dtype=torch.float32).unsqueeze(0)  # Shape: [1, D, H, W]
            sample.append(arr)
        x = torch.cat(sample, dim=0)  # Shape: [num_regions, D, H, W]
        y = torch.tensor(self.labels[idx], dtype=torch.long)
        return x, y

In [None]:
import os
import numpy as np
import torch
from torch.utils.data import Dataset

class MultiRegionDataset(Dataset):
    def __init__(self, root_dir, region_names):
        self.region_names = region_names
        self.root_dir = root_dir
        self.data = {region: [] for region in region_names}
        self.labels = []

        # Assume all regions have same filenames, just in their respective folders
        filenames = sorted(os.listdir(os.path.join(root_dir, region_names[0])))

        for file in filenames:
            if not file.endswith(".npy"):
                continue

            # Label from filename
            if file.startswith("hc_hc"):
                label = 0
            elif file.startswith("pd_pd"):
                label = 1
            else:
                raise ValueError(f"Unknown label prefix in file: {file}")

            self.labels.append(label)

            for region in region_names:
                full_path = os.path.join(root_dir, region, file)
                self.data[region].append(full_path)

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        sample = []
        for region in self.region_names:
            arr = np.load(self.data[region][idx])
            arr = torch.tensor(arr, dtype=torch.float32).unsqueeze(0)  # Shape: [1, D, H, W]
            sample.append(arr)
        x = torch.cat(sample, dim=0)  # Shape: [num_regions, D, H, W]
        y = torch.tensor(self.labels[idx], dtype=torch.long)
        return x, y

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

# -------------------------------
# 3D Basic Residual Block
# -------------------------------
class BasicBlock3D(nn.Module):
    expansion = 1

    def __init__(self, inplanes, planes, stride=1, downsample=None):
        super(BasicBlock3D, self).__init__()
        self.conv1 = nn.Conv3d(inplanes, planes, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm3d(planes)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = nn.Conv3d(planes, planes, kernel_size=3, padding=1, bias=False)
        self.bn2 = nn.BatchNorm3d(planes)
        self.downsample = downsample
        self.stride = stride

    def forward(self, x):
        identity = x

        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)

        out = self.conv2(out)
        out = self.bn2(out)

        if self.downsample is not None:
            identity = self.downsample(x)

        out += identity
        out = self.relu(out)

        return out

# -------------------------------
# 3D ResNet Architecture
# -------------------------------
class ResNet3D(nn.Module):
    def __init__(self, block, layers, in_channels=1, num_classes=2):
        super(ResNet3D, self).__init__()
        self.inplanes = 64

        self.conv1 = nn.Conv3d(in_channels, 64, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn1 = nn.BatchNorm3d(64)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool3d(kernel_size=3, stride=2, padding=1)

        self.layer1 = self._make_layer(block, 64, layers[0])
        self.layer2 = self._make_layer(block, 128, layers[1], stride=2)
        self.layer3 = self._make_layer(block, 256, layers[2], stride=2)
        self.layer4 = self._make_layer(block, 512, layers[3], stride=2)

        self.avgpool = nn.AdaptiveAvgPool3d((1, 1, 1))
        self.fc = nn.Linear(512 * block.expansion, num_classes)

        self.apply(self._weights_init)

    def _make_layer(self, block, planes, blocks, stride=1):
        downsample = None
        if stride != 1 or self.inplanes != planes * block.expansion:
            downsample = nn.Sequential(
                nn.Conv3d(self.inplanes, planes * block.expansion, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm3d(planes * block.expansion),
            )

        layers = [block(self.inplanes, planes, stride, downsample)]
        self.inplanes = planes * block.expansion
        for _ in range(1, blocks):
            layers.append(block(self.inplanes, planes))

        return nn.Sequential(*layers)

    def _weights_init(self, m):
        if isinstance(m, nn.Conv3d):
            nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
        elif isinstance(m, nn.BatchNorm3d):
            nn.init.constant_(m.weight, 1)
            nn.init.constant_(m.bias, 0)

    def forward(self, x):
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        x = self.fc(x)

        return x

In [None]:
# Training function
def train_model(model, train_loader, optimizer, criterion, device):
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        _, predicted = torch.max(outputs, 1)
        correct += (predicted == labels).sum().item()
        total += labels.size(0)

    epoch_loss = running_loss / len(train_loader)
    epoch_acc = 100 * correct / total
    return epoch_loss, epoch_acc

# Evaluation function
def evaluate_model(model, val_loader, criterion, device):
    model.eval()
    running_loss = 0.0
    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, labels in val_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            loss = criterion(outputs, labels)

            running_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            correct += (predicted == labels).sum().item()
            total += labels.size(0)

    val_loss = running_loss / len(val_loader)
    val_acc = 100 * correct / total
    return val_loss, val_acc

In [None]:
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, Subset
from sklearn.model_selection import train_test_split

# Setup
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# If using a single-region dataset like Brainstem only:
# region_names = ['region_5','region_6','region_7','region_8','region_16','region_17','region_18']  # or any one region, since you're not using ensemble now
region_names = ['region_8']
dataset = MultiRegionDataset("/content/drive/MyDrive/Resized_Regions", region_names)

# Split
indices = list(range(len(dataset)))
train_idx, val_idx = train_test_split(indices, test_size=0.2, random_state=42)
train_loader = DataLoader(Subset(dataset, train_idx), batch_size=4, shuffle=True, num_workers=2)
val_loader = DataLoader(Subset(dataset, val_idx), batch_size=4, shuffle=False, num_workers=2)
for x, y in train_loader:
    print(x.shape)  # Should be [batch_size, channels, D, H, W]
    break
# Model
def resnet18_3d(in_channels=1, num_classes=2):
    return ResNet3D(BasicBlock3D, [2, 2, 2, 2], in_channels=in_channels, num_classes=num_classes)
# Example: Single region
# Create model instance
model = resnet18_3d(in_channels=1, num_classes=2)
model = model.to('cuda' if torch.cuda.is_available() else 'cpu')

# Print model summary (optional)
from torchsummary import summary
summary(model, input_size=(1, 64, 64, 64))  # assuming single-channel 64³ input

optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)
criterion = nn.CrossEntropyLoss()

# Training loop
def train(model, dataloader, optimizer, criterion, device):
    model.train()
    total_loss, correct = 0.0, 0
    for inputs, labels in dataloader:
        inputs = inputs.unsqueeze(1)  # inputs is a tuple of 1 tensor (since len(region_names) = 1)
        labels = labels.to(device)

        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        total_loss += loss.item() * labels.size(0)
        preds = outputs.argmax(dim=1)
        correct += (preds == labels).sum().item()

    avg_loss = total_loss / len(dataloader.dataset)
    acc = correct / len(dataloader.dataset)
    return avg_loss, acc

# Evaluation loop
def evaluate(model, dataloader, criterion, device):
    model.eval()
    total_loss, correct = 0.0, 0
    with torch.no_grad():
        for inputs, labels in dataloader:
            inputs = inputs[0].to(device)
            labels = labels.to(device)

            outputs = model(inputs)
            loss = criterion(outputs, labels)

            total_loss += loss.item() * labels.size(0)
            preds = outputs.argmax(dim=1)
            correct += (preds == labels).sum().item()

    avg_loss = total_loss / len(dataloader.dataset)
    acc = correct / len(dataloader.dataset)
    return avg_loss, acc

# Train
for epoch in range(10):
    train_loss, train_acc = train(model, train_loader, optimizer, criterion, device)
    val_loss, val_acc = evaluate(model, val_loader, criterion, device)
    print(f"Epoch {epoch+1} | Train Loss: {train_loss:.4f} | Acc: {train_acc*100:.2f}% | Val Loss: {val_loss:.4f} | Acc: {val_acc*100:.2f}%")

#2d resnet model

In [None]:
import os
import numpy as np
import torch
from torch.utils.data import Dataset
from PIL import Image
import torchvision.transforms as transforms

class MultiSlice2DDataset(Dataset):
    def __init__(self, root_dir, num_slices=5, max_files_per_class=20):
        self.samples = []
        self.num_slices = num_slices
        self.transform = transforms.Compose([
            transforms.ToTensor(),  # Converts to [C, H, W] (assuming single channel)
            transforms.Resize((224, 224)),  # Resize to 224x224
        ])

        hc_folder = os.path.join(root_dir, 'hc')
        pd_folder = os.path.join(root_dir, 'pd')

        hc_files = [os.path.join(hc_folder, f) for f in os.listdir(hc_folder) if f.endswith('.npy')][:max_files_per_class]
        for file in hc_files:
            self.samples.append((file, 0))

        pd_files = [os.path.join(pd_folder, f) for f in os.listdir(pd_folder) if f.endswith('.npy')][:max_files_per_class]
        for file in pd_files:
            self.samples.append((file, 1))

        self.slice_samples = []
        for file_path, label in self.samples:
            volume = np.load(file_path)
            mid = volume.shape[0] // 2
            half = num_slices // 2
            selected_slices = range(mid - half, mid + half + 1)
            for slice_idx in selected_slices:
                if 0 <= slice_idx < volume.shape[0]:
                    self.slice_samples.append((file_path, slice_idx, label))

    def __len__(self):
        return len(self.slice_samples)

    def __getitem__(self, idx):
        file_path, slice_idx, label = self.slice_samples[idx]
        volume = np.load(file_path)
        slice_2d = volume[slice_idx]

        # Normalize the slice between 0 and 1
        slice_2d = (slice_2d - np.min(slice_2d)) / (np.max(slice_2d) - np.min(slice_2d) + 1e-8)

        # Convert the 2D numpy slice to a PIL image for transformation
        slice_2d_pil = Image.fromarray(slice_2d)

        # Apply transformations
        slice_2d = self.transform(slice_2d_pil)  # [1, 224, 224]

        label = torch.tensor(label).long()
        return slice_2d, label

In [None]:
import torchvision.models as models
import torch.nn as nn

def get_resnet18_2d():
    model = models.resnet18(pretrained=False)

    # Modify the first convolution layer to accept 1-channel (grayscale) input
    model.conv1 = nn.Conv2d(1, 64, kernel_size=7, stride=2, padding=3, bias=False)

    # Modify the final fully connected layer to output 2 classes for binary classification
    model.fc = nn.Linear(model.fc.in_features, 2)

    return model

In [None]:
import torch
import torchvision.models as models
import torch.nn as nn
from torch.utils.data import DataLoader, random_split

# Set device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Load dataset
dataset = MultiSlice2DDataset('/content/drive/MyDrive/3D_Numpy_Volumes')
train_len = int(0.8 * len(dataset))
val_len = len(dataset) - train_len
train_set, val_set = random_split(dataset, [train_len, val_len])
train_loader = DataLoader(train_set, batch_size=4, shuffle=True)
val_loader = DataLoader(val_set, batch_size=4, shuffle=False)

# Initialize model
model = get_resnet18_2d().to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)
criterion = nn.CrossEntropyLoss()

# Training loop
def train(model, dataloader):
    model.train()
    total_loss, correct = 0, 0
    for x, y in dataloader:
        x, y = x.to(device), y.to(device)
        optimizer.zero_grad()
        out = model(x)
        loss = criterion(out, y)
        loss.backward()
        optimizer.step()
        total_loss += loss.item() * y.size(0)
        correct += (out.argmax(1) == y).sum().item()
    return total_loss / len(dataloader.dataset), correct / len(dataloader.dataset)

# Evaluation loop
def evaluate(model, dataloader):
    model.eval()
    total_loss, correct = 0, 0
    with torch.no_grad():
        for x, y in dataloader:
            x, y = x.to(device), y.to(device)
            out = model(x)
            loss = criterion(out, y)
            total_loss += loss.item() * y.size(0)
            correct += (out.argmax(1) == y).sum().item()
    return total_loss / len(dataloader.dataset), correct / len(dataloader.dataset)

# Run training
for epoch in range(20):
    train_loss, train_acc = train(model, train_loader)
    val_loss, val_acc = evaluate(model, val_loader)
    print(f"Epoch {epoch+1}: Train Loss={train_loss:.4f}, Acc={train_acc*100:.2f}% | Val Loss={val_loss:.4f}, Acc={val_acc*100:.2f}%")

#3d mid brain region

In [None]:
class BrainRegionDataset(Dataset):
    def __init__(self, root_dir, midbrain_center, region_size=(30, 30, 30), transform=None):
        self.samples = []
        self.midbrain_center = midbrain_center
        self.region_size = region_size
        self.transform = transform

        hc_folder = os.path.join(root_dir, 'hc')
        pd_folder = os.path.join(root_dir, 'pd')

        hc_files = [os.path.join(hc_folder, f) for f in os.listdir(hc_folder) if f.endswith('.npy')]
        for file in hc_files:
            self.samples.append((file, 0))  # Label 0 for Healthy controls

        pd_files = [os.path.join(pd_folder, f) for f in os.listdir(pd_folder) if f.endswith('.npy')]
        for file in pd_files:
            self.samples.append((file, 1))  # Label 1 for Parkinson's patients

    def extract_midbrain_region(self, volume, center, size=(30, 30, 30)):
        x_center, y_center, z_center = center
        d, h, w = size
        x_start, y_start, z_start = max(x_center - d//2, 0), max(y_center - h//2, 0), max(z_center - w//2, 0)
        x_end, y_end, z_end = min(x_center + d//2, volume.shape[0]), min(y_center + h//2, volume.shape[1]), min(z_center + w//2, volume.shape[2])
        return volume[x_start:x_end, y_start:y_end, z_start:z_end]

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        file_path, label = self.samples[idx]
        volume = np.load(file_path)

        # Step 2: Extract 3D midbrain region
        midbrain_region = self.extract_midbrain_region(volume, self.midbrain_center, self.region_size)

        # Ensure midbrain_region is 3D (if it has a single channel, don't squeeze it)
        if midbrain_region.ndim != 3:
            raise ValueError(f"Expected a 3D array, but got {midbrain_region.ndim}D")

        # Step 3: Resize the extracted region to a fixed size
        resized_region = scipy.ndimage.zoom(midbrain_region, np.array((64, 64, 64)) / np.array(midbrain_region.shape), order=1)

        # Step 4: Normalize and apply transformations
        resized_region = (resized_region - np.min(resized_region)) / (np.max(resized_region) - np.min(resized_region) + 1e-8)  # Normalize to [0, 1]
        resized_region = torch.tensor(resized_region, dtype=torch.float32).unsqueeze(0)  # Add channel dimension (1, 64, 64, 64)

        if self.transform:
            resized_region = self.transform(resized_region)

        label = torch.tensor(label).long()
        return resized_region, label

In [None]:
# Step 2: Define the 3D ResNet model
class ResNet3D(nn.Module):
    def __init__(self, num_classes=2):
        super(ResNet3D, self).__init__()
        model = models.resnet18(pretrained=False)
        model.conv1 = nn.Conv3d(1, 64, kernel_size=7, stride=2, padding=3, bias=False)  # 3D convolution
        model.fc = nn.Linear(model.fc.in_features, num_classes)
        self.model = model

    def forward(self, x):
        return self.model(x)


In [None]:
# Step 3: Define the 3D ViT model
class ViT3D(nn.Module):
    def __init__(self, num_classes=2):
        super(ViT3D, self).__init__()
        # You can implement a 3D Vision Transformer model here or use a pre-existing one
        # You may need a library like `timm` for ViT models if available
        pass

    def forward(self, x):
        # Implement the forward pass for 3D Vision Transformer
        pass

In [None]:
# Step 4: Train and evaluate the model
def train(model, dataloader, criterion, optimizer, device):
    model.train()
    total_loss, correct = 0, 0
    for x, y in dataloader:
        x, y = x.to(device), y.to(device)
        optimizer.zero_grad()
        out = model(x)
        loss = criterion(out, y)
        loss.backward()
        optimizer.step()
        total_loss += loss.item() * y.size(0)
        correct += (out.argmax(1) == y).sum().item()
    return total_loss / len(dataloader.dataset), correct / len(dataloader.dataset)


def evaluate(model, dataloader, criterion, device):
    model.eval()
    total_loss, correct = 0, 0
    with torch.no_grad():
        for x, y in dataloader:
            x, y = x.to(device), y.to(device)
            out = model(x)
            loss = criterion(out, y)
            total_loss += loss.item() * y.size(0)
            correct += (out.argmax(1) == y).sum().item()
    return total_loss / len(dataloader.dataset), correct / len(dataloader.dataset)

In [None]:
# Step 5: Set up data loaders and training
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
root_dir = "/content/drive/MyDrive/3D_Numpy_Volumes"  # Replace with your dataset path
midbrain_center = (100, 120, 150)  # Replace with actual center coordinates of the midbrain
transform = transforms.Compose([transforms.Resize((64, 64))])

# Initialize dataset and dataloaders
dataset = BrainRegionDataset(root_dir, midbrain_center, region_size=(30, 30, 30), transform=transform)
train_len = int(0.8 * len(dataset))
val_len = len(dataset) - train_len
train_set, val_set = torch.utils.data.random_split(dataset, [train_len, val_len])

train_loader = DataLoader(train_set, batch_size=4, shuffle=True)
val_loader = DataLoader(val_set, batch_size=4, shuffle=False)

# Initialize model, optimizer, and loss function
model = ResNet3D(num_classes=2).to(device)  # Change to ViT3D(num_classes=2) if using ViT
optimizer = Adam(model.parameters(), lr=1e-4)
criterion = nn.CrossEntropyLoss()

# Step 6: Training and evaluation loop
for epoch in range(20):
    train_loss, train_acc = train(model, train_loader, criterion, optimizer, device)
    val_loss, val_acc = evaluate(model, val_loader, criterion, device)
    print(f"Epoch {epoch+1}: Train Loss={train_loss:.4f}, Acc={train_acc*100:.2f}% | Val Loss={val_loss:.4f}, Acc={val_acc*100:.2f}%")


In [None]:
import torch
import torch.nn as nn
from torchvision.models import resnet18
from torch.utils.data import Dataset, DataLoader
import numpy as np
import scipy.ndimage
import os

import numpy as np
import torch
import scipy.ndimage

import numpy as np
import torch
import scipy.ndimage

from sklearn.model_selection import train_test_split
from torch.utils.data import Subset
class BrainRegionDataset(Dataset):
    def __init__(self, root_dir, midbrain_center, region_size=(30, 30, 30), transform=None):
        self.samples = []
        self.midbrain_center = midbrain_center
        self.region_size = region_size
        self.transform = transform

        hc_folder = os.path.join(root_dir, 'hc')
        pd_folder = os.path.join(root_dir, 'pd')

        hc_files = [os.path.join(hc_folder, f) for f in os.listdir(hc_folder) if f.endswith('.npy')]
        for file in hc_files:
            self.samples.append((file, 0))  # Label 0 for Healthy controls

        pd_files = [os.path.join(pd_folder, f) for f in os.listdir(pd_folder) if f.endswith('.npy')]
        for file in pd_files:
            self.samples.append((file, 1))  # Label 1 for Parkinson's patients

    def extract_midbrain_region(self, volume, center, size=(30, 30, 30)):
        # Extract a 3D region based on the given center and size
        x_center, y_center, z_center = center
        d, h, w = size
        x_start, y_start, z_start = max(x_center - d//2, 0), max(y_center - h//2, 0), max(z_center - w//2, 0)
        x_end, y_end, z_end = min(x_center + d//2, volume.shape[0]), min(y_center + h//2, volume.shape[1]), min(z_center + w//2, volume.shape[2])
        return volume[x_start:x_end, y_start:y_end, z_start:z_end]

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        file_path, label = self.samples[idx]
        volume = np.load(file_path)  # Load the 3D MRI volume

        # Step 1: Ensure volume is 3D (if it's 4D, remove the channel dimension)
        if volume.ndim == 4:
            volume = volume.squeeze(0)  # Drop the channel dimension if present (e.g., shape: (1, d, h, w) -> (d, h, w))

        # Step 2: Extract 3D midbrain region (ensure it's 3D)
        midbrain_region = self.extract_midbrain_region(volume, self.midbrain_center, self.region_size)

        # Ensure midbrain_region is 3D (should not be 4D)
        if midbrain_region.ndim != 3:
            raise ValueError(f"Expected a 3D array, but got {midbrain_region.ndim}D")

        # Step 3: Resize the extracted region to a fixed size (64x64x64)
        resized_region = scipy.ndimage.zoom(midbrain_region, np.array((64, 64, 64)) / np.array(midbrain_region.shape), order=1)

        # Ensure resized_region is 3D after resizing
        if resized_region.ndim != 3:
            raise ValueError(f"Expected a 3D array after resizing, but got {resized_region.ndim}D")

        # Step 4: Normalize to range [0, 1]
        resized_region = (resized_region - np.min(resized_region)) / (np.max(resized_region) - np.min(resized_region) + 1e-8)  # Normalize to [0, 1]

        # Step 5: Add channel dimension (1, 64, 64, 64)
        resized_region = torch.tensor(resized_region, dtype=torch.float32).unsqueeze(0)  # Add the channel dimension

        # Apply any additional transformations (e.g., data augmentation)
        if self.transform:
            resized_region = self.transform(resized_region)

        # Convert label to tensor
        label = torch.tensor(label).long()

        return resized_region, label

# Modify the first convolution layer of the model to accept 1 input channel
class CustomResNet3D(nn.Module):
    def __init__(self, original_model):
        super(CustomResNet3D, self).__init__()
        # Access and modify the first convolution layer (conv1) inside the BasicStem
        original_model.stem[0] = nn.Conv3d(1, original_model.stem[0].out_channels, kernel_size=7, stride=2, padding=3, bias=False)
        self.model = original_model

    def forward(self, x):
        return self.model(x)

# Load a pre-trained 3D ResNet model
original_model = models.video.r3d_18(pretrained=True)
model = CustomResNet3D(original_model)

# Move the model to the appropriate device (e.g., CUDA if available)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)
criterion = nn.CrossEntropyLoss()

# Initialize dataset and dataloaders
# Initialize your dataset
root_dir = '/content/drive/MyDrive/3D_Numpy_Volumes'
train_set = BrainRegionDataset(root_dir=root_dir, midbrain_center=(64, 64, 64))

# Split the dataset into training and validation sets (80% train, 20% validation)
train_indices, val_indices = train_test_split(range(len(train_set)), test_size=0.2, random_state=42)

# Create training and validation subsets using the indices
train_subset = Subset(train_set, train_indices)
val_subset = Subset(train_set, val_indices)

# Define the batch size
batch_size = 4

# Create data loaders for training and validation
train_loader = DataLoader(train_subset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_subset, batch_size=batch_size, shuffle=False)

# Check the dataset sizes
print(f"Training set size: {len(train_loader.dataset)}")
print(f"Validation set size: {len(val_loader.dataset)}")

# Training loop
# def train(model, dataloader, criterion, optimizer, device):
#     model.train()
#     total_loss, correct = 0, 0
#     for x, y in dataloader:
#         x, y = x.to(device), y.to(device)
#         optimizer.zero_grad()
#         out = model(x)
#         loss = criterion(out, y)
#         loss.backward()
#         optimizer.step()
#         total_loss += loss.item() * y.size(0)
#         correct += (out.argmax(1) == y).sum().item()
#     return total_loss / len(dataloader.dataset), correct / len(dataloader.dataset)

# # Run training
# for epoch in range(20):
#     train_loss, train_acc = train(model, train_loader, criterion, optimizer, device)
#     print(f"Epoch {epoch+1}: Train Loss={train_loss:.4f}, Acc={train_acc*100:.2f}%")

def train_and_validate(model, train_loader, val_loader, criterion, optimizer, device, num_epochs=20):
    for epoch in range(num_epochs):
        # Training phase
        model.train()
        train_loss, train_correct = 0.0, 0
        train_total = 0
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)

            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            train_loss += loss.item() * inputs.size(0)
            _, predicted = torch.max(outputs, 1)
            train_correct += (predicted == labels).sum().item()
            train_total += labels.size(0)

        train_loss /= train_total
        train_accuracy = train_correct / train_total

        # Validation phase
        model.eval()  # Set the model to evaluation mode
        val_loss, val_correct = 0.0, 0
        val_total = 0
        with torch.no_grad():  # Disable gradient calculation for validation
            for inputs, labels in val_loader:
                inputs, labels = inputs.to(device), labels.to(device)

                outputs = model(inputs)
                loss = criterion(outputs, labels)

                val_loss += loss.item() * inputs.size(0)
                _, predicted = torch.max(outputs, 1)
                val_correct += (predicted == labels).sum().item()
                val_total += labels.size(0)

        val_loss /= val_total
        val_accuracy = val_correct / val_total

        # Print metrics for this epoch
        print(f"Epoch {epoch+1}/{num_epochs}, "
              f"Train Loss: {train_loss:.4f}, Train Accuracy: {train_accuracy*100:.2f}%, "
              f"Validation Loss: {val_loss:.4f}, Validation Accuracy: {val_accuracy*100:.2f}%")

train_and_validate(model, train_loader, val_loader, criterion, optimizer, device, num_epochs=20)

In [None]:
import torch
import torch.nn as nn
from torchvision.models import resnet50
from torch.utils.data import Dataset, DataLoader
import numpy as np
import scipy.ndimage
import os

import numpy as np
import torch
import scipy.ndimage

import numpy as np
import torch
import scipy.ndimage

from sklearn.model_selection import train_test_split
from torch.utils.data import Subset
class BrainRegionDataset(Dataset):
    def __init__(self, root_dir, midbrain_center, region_size=(30, 30, 30), transform=None):
        self.samples = []
        self.midbrain_center = midbrain_center
        self.region_size = region_size
        self.transform = transform

        hc_folder = os.path.join(root_dir, 'hc')
        pd_folder = os.path.join(root_dir, 'pd')

        hc_files = [os.path.join(hc_folder, f) for f in os.listdir(hc_folder) if f.endswith('.npy')]
        for file in hc_files:
            self.samples.append((file, 0))  # Label 0 for Healthy controls

        pd_files = [os.path.join(pd_folder, f) for f in os.listdir(pd_folder) if f.endswith('.npy')]
        for file in pd_files:
            self.samples.append((file, 1))  # Label 1 for Parkinson's patients

    def extract_midbrain_region(self, volume, center, size=(30, 30, 30)):
        # Extract a 3D region based on the given center and size
        x_center, y_center, z_center = center
        d, h, w = size
        x_start, y_start, z_start = max(x_center - d//2, 0), max(y_center - h//2, 0), max(z_center - w//2, 0)
        x_end, y_end, z_end = min(x_center + d//2, volume.shape[0]), min(y_center + h//2, volume.shape[1]), min(z_center + w//2, volume.shape[2])
        return volume[x_start:x_end, y_start:y_end, z_start:z_end]

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        file_path, label = self.samples[idx]
        volume = np.load(file_path)  # Load the 3D MRI volume

        # Step 1: Ensure volume is 3D (if it's 4D, remove the channel dimension)
        if volume.ndim == 4:
            volume = volume.squeeze(0)  # Drop the channel dimension if present (e.g., shape: (1, d, h, w) -> (d, h, w))

        # Step 2: Extract 3D midbrain region (ensure it's 3D)
        midbrain_region = self.extract_midbrain_region(volume, self.midbrain_center, self.region_size)

        # Ensure midbrain_region is 3D (should not be 4D)
        if midbrain_region.ndim != 3:
            raise ValueError(f"Expected a 3D array, but got {midbrain_region.ndim}D")

        # Step 3: Resize the extracted region to a fixed size (64x64x64)
        resized_region = scipy.ndimage.zoom(midbrain_region, np.array((64, 64, 64)) / np.array(midbrain_region.shape), order=1)

        # Ensure resized_region is 3D after resizing
        if resized_region.ndim != 3:
            raise ValueError(f"Expected a 3D array after resizing, but got {resized_region.ndim}D")

        # Step 4: Normalize to range [0, 1]
        resized_region = (resized_region - np.min(resized_region)) / (np.max(resized_region) - np.min(resized_region) + 1e-8)  # Normalize to [0, 1]

        # Step 5: Add channel dimension (1, 64, 64, 64)
        resized_region = torch.tensor(resized_region, dtype=torch.float32).unsqueeze(0)  # Add the channel dimension

        # Apply any additional transformations (e.g., data augmentation)
        if self.transform:
            resized_region = self.transform(resized_region)

        # Convert label to tensor
        label = torch.tensor(label).long()

        return resized_region, label

# Modify the first convolution layer of the model to accept 1 input channel
import torch
import torch.nn as nn
from torchvision.models import resnet50, ResNet50_Weights

class CustomResNet3D(nn.Module):
    def __init__(self, original_model):
        super(CustomResNet3D, self).__init__()

        # Modify the first convolution layer (conv1) to use Conv3d
        original_model.conv1 = nn.Conv3d(1, original_model.conv1.out_channels,
                                         kernel_size=(7, 7, 7), stride=(2, 2, 2), padding=(3, 3, 3), bias=False)

        # Modify the max pooling layer to handle 3D input
        original_model.maxpool = nn.MaxPool3d(kernel_size=(3, 3, 3), stride=(2, 2, 2), padding=(1, 1, 1))

        # Replace BatchNorm2d with BatchNorm3d for all batchnorm layers in the model
        for name, module in original_model.named_modules():
            if isinstance(module, nn.BatchNorm2d):
                in_channels = module.num_features
                # Create a new BatchNorm3d and assign it
                batch_norm_3d = nn.BatchNorm3d(in_channels)
                setattr(original_model, name, batch_norm_3d)

        # Now assign the model
        self.model = original_model

    def forward(self, x):
        return self.model(x)

# Move the model to the appropriate device (e.g., CUDA if available)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

optimizer = torch.optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-5)  # Adding weight decay
criterion = nn.CrossEntropyLoss()

# Initialize dataset and dataloaders
# Initialize your dataset
root_dir = '/content/drive/MyDrive/3D_Numpy_Volumes'
train_set = BrainRegionDataset(root_dir=root_dir, midbrain_center=(64, 64, 64))

# Split the dataset into training and validation sets (80% train, 20% validation)
train_indices, val_indices = train_test_split(range(len(train_set)), test_size=0.2, random_state=42)

# Create training and validation subsets using the indices
train_subset = Subset(train_set, train_indices)
val_subset = Subset(train_set, val_indices)

# Define the batch size
batch_size = 4

# Create data loaders for training and validation
train_loader = DataLoader(train_subset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_subset, batch_size=batch_size, shuffle=False)

# Check the dataset sizes
print(f"Training set size: {len(train_loader.dataset)}")
print(f"Validation set size: {len(val_loader.dataset)}")

import torch
import torch.nn as nn
from sklearn.model_selection import KFold
from torch.utils.data import DataLoader, Subset
from torchvision.models import resnet50  # Example of using ResNet50, you can choose others like resnet101

# Assuming BrainRegionDataset is your custom dataset
# Assuming CustomResNet3D is your modified model class

def k_fold_validation(model_class, dataset, k=5, num_epochs=20, batch_size=4, device='cuda'):
    kfold = KFold(n_splits=k, shuffle=True)

    # Prepare an array to store results
    fold_accuracies = []

    for fold, (train_idx, val_idx) in enumerate(kfold.split(dataset)):
        print(f"Training fold {fold + 1}/{k}...")

        # Create train and validation subsets
        train_subsampler = Subset(dataset, train_idx)
        val_subsampler = Subset(dataset, val_idx)

        # DataLoaders for train and validation sets
        train_loader = DataLoader(train_subsampler, batch_size=batch_size, shuffle=True)
        val_loader = DataLoader(val_subsampler, batch_size=batch_size, shuffle=False)

        # Initialize model
        original_model = resnet50(pretrained=True)  # You can use resnet34, resnet101, resnet152, etc.
        model = model_class(original_model).to(device)  # Custom ResNet3D model

        # Loss function and optimizer
        criterion = nn.CrossEntropyLoss()
        optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)

        # Training and validation loop
        for epoch in range(num_epochs):
            model.train()  # Set the model to training mode
            train_loss, train_correct = 0.0, 0
            train_total = 0

            # Training phase
            for inputs, labels in train_loader:
                inputs, labels = inputs.to(device), labels.to(device)

                optimizer.zero_grad()
                outputs = model(inputs)
                loss = criterion(outputs, labels)
                loss.backward()
                optimizer.step()

                train_loss += loss.item() * inputs.size(0)
                _, predicted = torch.max(outputs, 1)
                train_correct += (predicted == labels).sum().item()
                train_total += labels.size(0)

            # Calculate training metrics
            train_loss /= train_total
            train_accuracy = train_correct / train_total

            # Validation phase
            model.eval()  # Set the model to evaluation mode
            val_loss, val_correct = 0.0, 0
            val_total = 0

            with torch.no_grad():  # Disable gradient calculation during validation
                for inputs, labels in val_loader:
                    inputs, labels = inputs.to(device), labels.to(device)

                    outputs = model(inputs)
                    loss = criterion(outputs, labels)

                    val_loss += loss.item() * inputs.size(0)
                    _, predicted = torch.max(outputs, 1)
                    val_correct += (predicted == labels).sum().item()
                    val_total += labels.size(0)

            # Calculate validation metrics
            val_loss /= val_total
            val_accuracy = val_correct / val_total

            print(f"Epoch {epoch + 1}/{num_epochs}, "
                  f"Train Loss: {train_loss:.4f}, Train Accuracy: {train_accuracy*100:.2f}%, "
                  f"Validation Loss: {val_loss:.4f}, Validation Accuracy: {val_accuracy*100:.2f}%")

        # Store the accuracy for each fold
        fold_accuracies.append(val_accuracy)

    # Calculate average validation accuracy across all folds
    avg_accuracy = sum(fold_accuracies) / k
    print(f"Average Validation Accuracy across {k} folds: {avg_accuracy * 100:.2f}%")

# Example usage
k_fold_validation(CustomResNet3D, train_set, k=5, num_epochs=20, batch_size=4, device='cuda')

# def train_and_validate(model, train_loader, val_loader, criterion, optimizer, device, num_epochs=20):
#     for epoch in range(num_epochs):
#         # Training phase
#         model.train()
#         train_loss, train_correct = 0.0, 0
#         train_total = 0
#         for inputs, labels in train_loader:
#             inputs, labels = inputs.to(device), labels.to(device)

#             optimizer.zero_grad()
#             outputs = model(inputs)
#             loss = criterion(outputs, labels)
#             loss.backward()
#             optimizer.step()

#             train_loss += loss.item() * inputs.size(0)
#             _, predicted = torch.max(outputs, 1)
#             train_correct += (predicted == labels).sum().item()
#             train_total += labels.size(0)

#         train_loss /= train_total
#         train_accuracy = train_correct / train_total

#         # Validation phase
#         model.eval()  # Set the model to evaluation mode
#         val_loss, val_correct = 0.0, 0
#         val_total = 0
#         with torch.no_grad():  # Disable gradient calculation for validation
#             for inputs, labels in val_loader:
#                 inputs, labels = inputs.to(device), labels.to(device)

#                 outputs = model(inputs)
#                 loss = criterion(outputs, labels)

#                 val_loss += loss.item() * inputs.size(0)
#                 _, predicted = torch.max(outputs, 1)
#                 val_correct += (predicted == labels).sum().item()
#                 val_total += labels.size(0)

#         val_loss /= val_total
#         val_accuracy = val_correct / val_total

#         # Print metrics for this epoch
#         print(f"Epoch {epoch+1}/{num_epochs}, "
#               f"Train Loss: {train_loss:.4f}, Train Accuracy: {train_accuracy*100:.2f}%, "
#               f"Validation Loss: {val_loss:.4f}, Validation Accuracy: {val_accuracy*100:.2f}%")

# train_and_validate(model, train_loader, val_loader, criterion, optimizer, device, num_epochs=20)

In [None]:
import torch
import torch.nn as nn
from torchvision.models import resnet50, ResNet50_Weights
import numpy as np
import scipy.ndimage
import os
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from torch.utils.data import Subset

# Dataset class
class BrainRegionDataset(Dataset):
    def __init__(self, root_dir, midbrain_center, region_size=(30, 30, 30), transform=None):
        self.samples = []
        self.midbrain_center = midbrain_center
        self.region_size = region_size
        self.transform = transform

        hc_folder = os.path.join(root_dir, 'hc')
        pd_folder = os.path.join(root_dir, 'pd')

        hc_files = [os.path.join(hc_folder, f) for f in os.listdir(hc_folder) if f.endswith('.npy')]
        for file in hc_files:
            self.samples.append((file, 0))  # Label 0 for Healthy controls

        pd_files = [os.path.join(pd_folder, f) for f in os.listdir(pd_folder) if f.endswith('.npy')]
        for file in pd_files:
            self.samples.append((file, 1))  # Label 1 for Parkinson's patients

    def extract_midbrain_region(self, volume, center, size=(30, 30, 30)):
        # Extract a 3D region based on the given center and size
        x_center, y_center, z_center = center
        d, h, w = size
        x_start, y_start, z_start = max(x_center - d//2, 0), max(y_center - h//2, 0), max(z_center - w//2, 0)
        x_end, y_end, z_end = min(x_center + d//2, volume.shape[0]), min(y_center + h//2, volume.shape[1]), min(z_center + w//2, volume.shape[2])
        return volume[x_start:x_end, y_start:y_end, z_start:z_end]

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        file_path, label = self.samples[idx]
        volume = np.load(file_path)  # Load the 3D MRI volume

        # Ensure volume is 3D (if it's 4D, remove the channel dimension)
        if volume.ndim == 4:
            volume = volume.squeeze(0)  # Drop the channel dimension if present (e.g., shape: (1, d, h, w) -> (d, h, w))

        # Extract 3D midbrain region
        midbrain_region = self.extract_midbrain_region(volume, self.midbrain_center, self.region_size)

        # Resize the extracted region to a fixed size (64x64x64)
        resized_region = scipy.ndimage.zoom(midbrain_region, np.array((64, 64, 64)) / np.array(midbrain_region.shape), order=1)

        # Normalize to range [0, 1]
        resized_region = (resized_region - np.min(resized_region)) / (np.max(resized_region) - np.min(resized_region) + 1e-8)

        # Add channel dimension (1, 64, 64, 64)
        resized_region = torch.tensor(resized_region, dtype=torch.float32).unsqueeze(0)  # Add the channel dimension

        # Apply any additional transformations (e.g., data augmentation)
        if self.transform:
            resized_region = self.transform(resized_region)

        # Convert label to tensor
        label = torch.tensor(label).long()

        return resized_region, label

import torch
import torch.nn as nn
from torchvision.models import resnet50, ResNet50_Weights

# Custom ResNet3D Model Class
class CustomResNet3D(nn.Module):
    def __init__(self, original_model):
        super(CustomResNet3D, self).__init__()

        # Modify the first convolution layer (conv1) to use Conv3d
        self.conv1 = nn.Conv3d(1, original_model.conv1.out_channels,
                               kernel_size=(7, 7, 7), stride=(2, 2, 2), padding=(3, 3, 3), bias=False)

        # BatchNorm3d should now expect 5D input (batch_size, channels, depth, height, width)
        self.bn1 = original_model.bn1
        self.relu = original_model.relu
        self.maxpool = nn.MaxPool3d(kernel_size=(3, 3, 3), stride=(2, 2, 2), padding=(1, 1, 1))

        # Rest of the layers
        self.layer1 = original_model.layer1
        self.layer2 = original_model.layer2
        self.layer3 = original_model.layer3
        self.layer4 = original_model.layer4

        # Final fully connected layer
        self.fc = original_model.fc

    def forward(self, x):
        # Ensure the input is 5D: (batch_size, 1, depth, height, width)
        # Conv3D and BatchNorm3D expect this shape
        x = self.conv1(x)  # (batch_size, channels, depth, height, width)
        x = self.bn1(x)  # (batch_size, channels, depth, height, width)
        x = self.relu(x)
        x = self.maxpool(x)
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)
        x = torch.flatten(x, 1)
        x = self.fc(x)
        return x

# Example usage:
# Load ResNet50 with pre-trained weights
original_model = resnet50(weights=ResNet50_Weights.DEFAULT)

# Create the custom 3D model
model = CustomResNet3D(original_model).to('cuda')  # Move to device (e.g., 'cuda')

# Check the model
print(model)

# Set device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

optimizer = torch.optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-5)  # Adding weight decay
criterion = nn.CrossEntropyLoss()

# Dataset initialization
root_dir = '/content/drive/MyDrive/3D_Numpy_Volumes'
train_set = BrainRegionDataset(root_dir=root_dir, midbrain_center=(64, 64, 64))

# Split dataset into training and validation sets (80% train, 20% validation)
train_indices, val_indices = train_test_split(range(len(train_set)), test_size=0.2, random_state=42)
train_subset = Subset(train_set, train_indices)
val_subset = Subset(train_set, val_indices)

# DataLoader initialization
batch_size = 4
train_loader = DataLoader(train_subset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_subset, batch_size=batch_size, shuffle=False)

# Check the dataset sizes
print(f"Training set size: {len(train_loader.dataset)}")
print(f"Validation set size: {len(val_loader.dataset)}")

# Training loop
def train(model, train_loader, val_loader, criterion, optimizer, num_epochs=20):
    for epoch in range(num_epochs):
        model.train()
        train_loss, train_correct = 0.0, 0
        train_total = 0

        # Training phase
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)

            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            train_loss += loss.item() * inputs.size(0)
            _, predicted = torch.max(outputs, 1)
            train_correct += (predicted == labels).sum().item()
            train_total += labels.size(0)

        train_loss /= train_total
        train_accuracy = train_correct / train_total

        # Validation phase
        model.eval()
        val_loss, val_correct = 0.0, 0
        val_total = 0

        with torch.no_grad():
            for inputs, labels in val_loader:
                inputs, labels = inputs.to(device), labels.to(device)

                outputs = model(inputs)
                loss = criterion(outputs, labels)

                val_loss += loss.item() * inputs.size(0)
                _, predicted = torch.max(outputs, 1)
                val_correct += (predicted == labels).sum().item()
                val_total += labels.size(0)

        val_loss /= val_total
        val_accuracy = val_correct / val_total

        print(f"Epoch {epoch + 1}/{num_epochs}, "
              f"Train Loss: {train_loss:.4f}, Train Accuracy: {train_accuracy*100:.2f}%, "
              f"Validation Loss: {val_loss:.4f}, Validation Accuracy: {val_accuracy*100:.2f}%")

# Train the model
train(model, train_loader, val_loader, criterion, optimizer, num_epochs=20)

In [None]:
import torch
import torch.nn as nn
from torchvision.models import resnet18
from torch.utils.data import Dataset, DataLoader
import numpy as np
import scipy.ndimage
import os
from sklearn.model_selection import train_test_split
from torch.utils.data import Subset
from torchvision import models

class BrainRegionDataset(Dataset):
    def __init__(self, root_dir, midbrain_center, region_size=(30, 30, 30), transform=None):
        self.samples = []
        self.midbrain_center = midbrain_center
        self.region_size = region_size
        self.transform = transform

        hc_folder = os.path.join(root_dir, 'hc')
        pd_folder = os.path.join(root_dir, 'pd')

        hc_files = [os.path.join(hc_folder, f) for f in os.listdir(hc_folder) if f.endswith('.npy')]
        for file in hc_files:
            self.samples.append((file, 0))  # Label 0 for Healthy controls

        pd_files = [os.path.join(pd_folder, f) for f in os.listdir(pd_folder) if f.endswith('.npy')]
        for file in pd_files:
            self.samples.append((file, 1))  # Label 1 for Parkinson's patients

    def extract_midbrain_region(self, volume, center, size=(30, 30, 30)):
        # Extract a 3D region based on the given center and size
        x_center, y_center, z_center = center
        d, h, w = size
        x_start, y_start, z_start = max(x_center - d//2, 0), max(y_center - h//2, 0), max(z_center - w//2, 0)
        x_end, y_end, z_end = min(x_center + d//2, volume.shape[0]), min(y_center + h//2, volume.shape[1]), min(z_center + w//2, volume.shape[2])
        return volume[x_start:x_end, y_start:y_end, z_start:z_end]

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        file_path, label = self.samples[idx]
        volume = np.load(file_path)  # Load the 3D MRI volume

        # Step 1: Ensure volume is 3D (if it's 4D, remove the channel dimension)
        if volume.ndim == 4:
            volume = volume.squeeze(0)  # Drop the channel dimension if present (e.g., shape: (1, d, h, w) -> (d, h, w))

        # Step 2: Extract 3D midbrain region (ensure it's 3D)
        midbrain_region = self.extract_midbrain_region(volume, self.midbrain_center, self.region_size)

        # Ensure midbrain_region is 3D (should not be 4D)
        if midbrain_region.ndim != 3:
            raise ValueError(f"Expected a 3D array, but got {midbrain_region.ndim}D")

        # Step 3: Resize the extracted region to a fixed size (64x64x64)
        resized_region = scipy.ndimage.zoom(midbrain_region, np.array((64, 64, 64)) / np.array(midbrain_region.shape), order=1)

        # Ensure resized_region is 3D after resizing
        if resized_region.ndim != 3:
            raise ValueError(f"Expected a 3D array after resizing, but got {resized_region.ndim}D")

        # Step 4: Normalize to range [0, 1]
        resized_region = (resized_region - np.min(resized_region)) / (np.max(resized_region) - np.min(resized_region) + 1e-8)  # Normalize to [0, 1]

        # Step 5: Add channel dimension (1, 64, 64, 64)
        resized_region = torch.tensor(resized_region, dtype=torch.float32).unsqueeze(0)  # Add the channel dimension

        # Apply any additional transformations (e.g., data augmentation)
        if self.transform:
            resized_region = self.transform(resized_region)

        # Convert label to tensor
        label = torch.tensor(label).long()

        return resized_region, label

# Modify the first convolution layer of the model to accept 1 input channel
class CustomResNet3D(nn.Module):
    def __init__(self, original_model):
        super(CustomResNet3D, self).__init__()
        # Access and modify the first convolution layer (conv1) inside the BasicStem
        original_model.stem[0] = nn.Conv3d(1, original_model.stem[0].out_channels, kernel_size=7, stride=2, padding=3, bias=False)
        self.model = original_model

    def forward(self, x):
        return self.model(x)

# Load a pre-trained 3D ResNet model
original_model = models.video.r3d_18(pretrained=True)
model = CustomResNet3D(original_model)

# Move the model to the appropriate device (e.g., CUDA if available)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)
criterion = nn.CrossEntropyLoss()

# K-Fold Cross Validation
from sklearn.model_selection import KFold

def k_fold_train_and_validate(model_class, dataset, k=5, num_epochs=20):
    kfold = KFold(n_splits=k, shuffle=True, random_state=42)
    results = []

    for fold, (train_idx, val_idx) in enumerate(kfold.split(dataset)):
        print(f"Training fold {fold+1}/{k}...")

        # Create train and validation subsets
        train_subset = Subset(dataset, train_idx)
        val_subset = Subset(dataset, val_idx)

        # DataLoader for training and validation
        train_loader = DataLoader(train_subset, batch_size=4, shuffle=True)
        val_loader = DataLoader(val_subset, batch_size=4, shuffle=False)

        # Initialize the model
        model = model_class(original_model)
        model.to(device)

        # Training and validation loop
        for epoch in range(num_epochs):
            model.train()
            train_loss, train_correct = 0.0, 0
            train_total = 0
            for inputs, labels in train_loader:
                inputs, labels = inputs.to(device), labels.to(device)
                optimizer.zero_grad()
                outputs = model(inputs)
                loss = criterion(outputs, labels)
                loss.backward()
                optimizer.step()

                train_loss += loss.item() * inputs.size(0)
                _, predicted = torch.max(outputs, 1)
                train_correct += (predicted == labels).sum().item()
                train_total += labels.size(0)

            train_loss /= train_total
            train_accuracy = train_correct / train_total

            # Validation phase
            model.eval()
            val_loss, val_correct = 0.0, 0
            val_total = 0
            with torch.no_grad():
                for inputs, labels in val_loader:
                    inputs, labels = inputs.to(device), labels.to(device)
                    outputs = model(inputs)
                    loss = criterion(outputs, labels)

                    val_loss += loss.item() * inputs.size(0)
                    _, predicted = torch.max(outputs, 1)
                    val_correct += (predicted == labels).sum().item()
                    val_total += labels.size(0)

            val_loss /= val_total
            val_accuracy = val_correct / val_total

            print(f"Epoch {epoch+1}/{num_epochs}, "
                  f"Train Loss: {train_loss:.4f}, Train Accuracy: {train_accuracy*100:.2f}%, "
                  f"Validation Loss: {val_loss:.4f}, Validation Accuracy: {val_accuracy*100:.2f}%")

        results.append((train_loss, train_accuracy, val_loss, val_accuracy))

    return results

# Run K-Fold cross-validation
k = 5
results = k_fold_train_and_validate(CustomResNet3D, dataset, k=k, num_epochs=20)
print("K-Fold Results:", results)

In [None]:
import torch
import torch.nn as nn
from torchvision.models import resnet18
from torch.utils.data import Dataset, DataLoader
import numpy as np
import scipy.ndimage
import os
from sklearn.model_selection import train_test_split
from torch.utils.data import Subset
from torchvision import models
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_curve, auc
import matplotlib.pyplot as plt
from sklearn.model_selection import KFold
from sklearn.preprocessing import LabelBinarizer

class BrainRegionDataset(Dataset):
    def __init__(self, root_dir, midbrain_center, region_size=(30, 30, 30), transform=None):
        self.samples = []
        self.midbrain_center = midbrain_center
        self.region_size = region_size
        self.transform = transform

        hc_folder = os.path.join(root_dir, 'hc')
        pd_folder = os.path.join(root_dir, 'pd')

        hc_files = [os.path.join(hc_folder, f) for f in os.listdir(hc_folder) if f.endswith('.npy')]
        for file in hc_files:
            self.samples.append((file, 0))  # Label 0 for Healthy controls

        pd_files = [os.path.join(pd_folder, f) for f in os.listdir(pd_folder) if f.endswith('.npy')]
        for file in pd_files:
            self.samples.append((file, 1))  # Label 1 for Parkinson's patients

    def extract_midbrain_region(self, volume, center, size=(30, 30, 30)):
        x_center, y_center, z_center = center
        d, h, w = size
        x_start, y_start, z_start = max(x_center - d//2, 0), max(y_center - h//2, 0), max(z_center - w//2, 0)
        x_end, y_end, z_end = min(x_center + d//2, volume.shape[0]), min(y_center + h//2, volume.shape[1]), min(z_center + w//2, volume.shape[2])
        return volume[x_start:x_end, y_start:y_end, z_start:z_end]

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        file_path, label = self.samples[idx]
        volume = np.load(file_path)  # Load the 3D MRI volume

        if volume.ndim == 4:
            volume = volume.squeeze(0)  # Drop the channel dimension if present

        midbrain_region = self.extract_midbrain_region(volume, self.midbrain_center, self.region_size)

        if midbrain_region.ndim != 3:
            raise ValueError(f"Expected a 3D array, but got {midbrain_region.ndim}D")

        resized_region = scipy.ndimage.zoom(midbrain_region, np.array((64, 64, 64)) / np.array(midbrain_region.shape), order=1)

        if resized_region.ndim != 3:
            raise ValueError(f"Expected a 3D array after resizing, but got {resized_region.ndim}D")

        resized_region = (resized_region - np.min(resized_region)) / (np.max(resized_region) - np.min(resized_region) + 1e-8)

        resized_region = torch.tensor(resized_region, dtype=torch.float32).unsqueeze(0)

        if self.transform:
            resized_region = self.transform(resized_region)

        label = torch.tensor(label).long()

        return resized_region, label

class CustomResNet3D(nn.Module):
    def __init__(self, original_model):
        super(CustomResNet3D, self).__init__()
        original_model.stem[0] = nn.Conv3d(1, original_model.stem[0].out_channels, kernel_size=7, stride=2, padding=3, bias=False)
        self.model = original_model

    def forward(self, x):
        return self.model(x)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Load pre-trained model
original_model = models.video.r3d_18(pretrained=True)
model = CustomResNet3D(original_model)
model.to(device)

optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)
criterion = nn.CrossEntropyLoss()

def k_fold_train_and_validate(model_class, dataset, k=5, num_epochs=20):
    kfold = KFold(n_splits=k, shuffle=True, random_state=42)
    results = []

    for fold, (train_idx, val_idx) in enumerate(kfold.split(dataset)):
        print(f"Training fold {fold+1}/{k}...")

        # Create train and validation subsets
        train_subset = Subset(dataset, train_idx)
        val_subset = Subset(dataset, val_idx)

        # DataLoader for training and validation
        train_loader = DataLoader(train_subset, batch_size=4, shuffle=True)
        val_loader = DataLoader(val_subset, batch_size=4, shuffle=False)

        # Initialize the model
        model = model_class(original_model)
        model.to(device)

        # Initialize accumulators for metrics
        total_train_loss, total_train_correct = 0.0, 0
        total_train_samples = 0
        total_val_loss, total_val_correct = 0.0, 0
        total_val_samples = 0

        # Training and validation loop
        for epoch in range(num_epochs):
            model.train()
            epoch_train_loss, epoch_train_correct = 0.0, 0
            epoch_train_samples = 0
            for inputs, labels in train_loader:
                inputs, labels = inputs.to(device), labels.to(device)
                optimizer.zero_grad()
                outputs = model(inputs)
                loss = criterion(outputs, labels)
                loss.backward()
                optimizer.step()

                epoch_train_loss += loss.item() * inputs.size(0)
                _, predicted = torch.max(outputs, 1)
                epoch_train_correct += (predicted == labels).sum().item()
                epoch_train_samples += labels.size(0)

            total_train_loss += epoch_train_loss
            total_train_correct += epoch_train_correct
            total_train_samples += epoch_train_samples

            # Validation phase
            model.eval()
            epoch_val_loss, epoch_val_correct = 0.0, 0
            epoch_val_samples = 0
            with torch.no_grad():
                for inputs, labels in val_loader:
                    inputs, labels = inputs.to(device), labels.to(device)
                    outputs = model(inputs)
                    loss = criterion(outputs, labels)

                    epoch_val_loss += loss.item() * inputs.size(0)
                    _, predicted = torch.max(outputs, 1)
                    epoch_val_correct += (predicted == labels).sum().item()
                    epoch_val_samples += labels.size(0)

            total_val_loss += epoch_val_loss
            total_val_correct += epoch_val_correct
            total_val_samples += epoch_val_samples

        # Compute final metrics after all epochs
        avg_train_loss = total_train_loss / total_train_samples
        avg_train_accuracy = total_train_correct / total_train_samples * 100
        avg_val_loss = total_val_loss / total_val_samples
        avg_val_accuracy = total_val_correct / total_val_samples * 100

        print(f"Fold {fold+1} results: ")
        print(f"Train Loss: {avg_train_loss:.4f}, Train Accuracy: {avg_train_accuracy:.2f}%")
        print(f"Validation Loss: {avg_val_loss:.4f}, Validation Accuracy: {avg_val_accuracy:.2f}%")

        results.append((avg_train_loss, avg_train_accuracy, avg_val_loss, avg_val_accuracy))

    return results

# Run K-Fold cross-validation
data_dir = '/content/drive/MyDrive/3D_Numpy_Volumes'  # Replace with your actual data directory

# Assuming you have the center of the midbrain defined, for example:
midbrain_center = (64,64,64)  # Replace with your actual midbrain center coordinates

# Create the dataset instance
dataset = BrainRegionDataset(root_dir=data_dir, midbrain_center=midbrain_center)

# Set k for K-Fold cross-validation
k = 5

# Run K-Fold cross-validation with the custom model
results = k_fold_train_and_validate(CustomResNet3D, dataset, k=k, num_epochs=20)


In [None]:
import torch
import torch.nn as nn
from torchvision.models import resnet18
from torch.utils.data import Dataset, DataLoader
import numpy as np
import scipy.ndimage
import os
from sklearn.model_selection import train_test_split
from torch.utils.data import Subset
from torchvision import models
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_curve, auc
import matplotlib.pyplot as plt
from sklearn.model_selection import KFold
from sklearn.preprocessing import LabelBinarizer
import torch.optim as optim
from torch.optim.lr_scheduler import ReduceLROnPlateau

class BrainRegionDataset(Dataset):
    def __init__(self, root_dir, midbrain_center, region_size=(30, 30, 30), transform=None):
        self.samples = []
        self.midbrain_center = midbrain_center
        self.region_size = region_size
        self.transform = transform

        hc_folder = os.path.join(root_dir, 'hc')
        pd_folder = os.path.join(root_dir, 'pd')

        hc_files = [os.path.join(hc_folder, f) for f in os.listdir(hc_folder) if f.endswith('.npy')]
        for file in hc_files:
            self.samples.append((file, 0))  # Label 0 for Healthy controls

        pd_files = [os.path.join(pd_folder, f) for f in os.listdir(pd_folder) if f.endswith('.npy')]
        for file in pd_files:
            self.samples.append((file, 1))  # Label 1 for Parkinson's patients

    def extract_midbrain_region(self, volume, center, size=(30, 30, 30)):
        x_center, y_center, z_center = center
        d, h, w = size
        x_start, y_start, z_start = max(x_center - d//2, 0), max(y_center - h//2, 0), max(z_center - w//2, 0)
        x_end, y_end, z_end = min(x_center + d//2, volume.shape[0]), min(y_center + h//2, volume.shape[1]), min(z_center + w//2, volume.shape[2])
        return volume[x_start:x_end, y_start:y_end, z_start:z_end]

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        file_path, label = self.samples[idx]
        volume = np.load(file_path)  # Load the 3D MRI volume

        if volume.ndim == 4:
            volume = volume.squeeze(0)  # Drop the channel dimension if present

        midbrain_region = self.extract_midbrain_region(volume, self.midbrain_center, self.region_size)

        if midbrain_region.ndim != 3:
            raise ValueError(f"Expected a 3D array, but got {midbrain_region.ndim}D")

        resized_region = scipy.ndimage.zoom(midbrain_region, np.array((64, 64, 64)) / np.array(midbrain_region.shape), order=1)

        if resized_region.ndim != 3:
            raise ValueError(f"Expected a 3D array after resizing, but got {resized_region.ndim}D")

        resized_region = (resized_region - np.min(resized_region)) / (np.max(resized_region) - np.min(resized_region) + 1e-8)

        resized_region = torch.tensor(resized_region, dtype=torch.float32).unsqueeze(0)

        if self.transform:
            resized_region = self.transform(resized_region)

        label = torch.tensor(label).long()

        return resized_region, label

class CustomResNet3D(nn.Module):
    def __init__(self, original_model, dropout_rate=0.5):
        super(CustomResNet3D, self).__init__()
        original_model.stem[0] = nn.Conv3d(1, original_model.stem[0].out_channels, kernel_size=7, stride=2, padding=3, bias=False)
        self.model = original_model
        self.dropout = nn.Dropout3d(p=dropout_rate)

    def forward(self, x):
      x = self.model(x)
      x = self.dropout(x)
      return x

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Load pre-trained model
original_model = models.video.r3d_18(pretrained=True)
model = CustomResNet3D(original_model)
model.to(device)

optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)
criterion = nn.CrossEntropyLoss()

class EarlyStopping:
    def __init__(self, patience=5, delta=0):
        self.patience = patience
        self.delta = delta
        self.best_loss = None
        self.counter = 0
        self.early_stop = False

    def __call__(self, val_loss):
        if self.best_loss is None:
            self.best_loss = val_loss
        elif val_loss > self.best_loss + self.delta:
            self.counter += 1
            if self.counter >= self.patience:
                self.early_stop = True
        else:
            self.best_loss = val_loss
            self.counter = 0

def k_fold_train_and_validate(model_class, dataset, k=5, num_epochs=20, patience=5):
    kfold = KFold(n_splits=k, shuffle=True, random_state=42)
    results = []

    early_stopping = EarlyStopping(patience=patience)

    for fold, (train_idx, val_idx) in enumerate(kfold.split(dataset)):
        print(f"Training fold {fold+1}/{k}...")

        # Create train and validation subsets
        train_subset = Subset(dataset, train_idx)
        val_subset = Subset(dataset, val_idx)

        # DataLoader for training and validation
        train_loader = DataLoader(train_subset, batch_size=4, shuffle=True)
        val_loader = DataLoader(val_subset, batch_size=4, shuffle=False)

        # Initialize the model
        model = model_class(original_model)
        model.to(device)

        # Initialize optimizer, loss function, and scheduler
        optimizer = optim.Adam(model.parameters(), lr=0.001)
        scheduler = ReduceLROnPlateau(optimizer, 'min', patience=3, factor=0.5)  # Learning rate scheduler
        criterion = nn.CrossEntropyLoss()

        # Initialize accumulators for metrics
        total_train_loss, total_train_correct = 0.0, 0
        total_train_samples = 0
        total_val_loss, total_val_correct = 0.0, 0
        total_val_samples = 0

        # Initialize lists to accumulate true labels and predictions for metrics
        all_train_labels = []
        all_train_preds = []
        all_val_labels = []
        all_val_preds = []

        # Training and validation loop
        for epoch in range(num_epochs):
            model.train()
            epoch_train_loss, epoch_train_correct = 0.0, 0
            epoch_train_samples = 0
            for inputs, labels in train_loader:
                inputs, labels = inputs.to(device), labels.to(device)
                optimizer.zero_grad()
                outputs = model(inputs)
                loss = criterion(outputs, labels)
                loss.backward()
                optimizer.step()

                epoch_train_loss += loss.item() * inputs.size(0)
                _, predicted = torch.max(outputs, 1)
                epoch_train_correct += (predicted == labels).sum().item()
                epoch_train_samples += labels.size(0)

                all_train_labels.extend(labels.cpu().numpy())
                all_train_preds.extend(predicted.cpu().numpy())

            total_train_loss += epoch_train_loss
            total_train_correct += epoch_train_correct
            total_train_samples += epoch_train_samples

            # Validation phase
            model.eval()
            epoch_val_loss, epoch_val_correct = 0.0, 0
            epoch_val_samples = 0
            with torch.no_grad():
                for inputs, labels in val_loader:
                    inputs, labels = inputs.to(device), labels.to(device)
                    outputs = model(inputs)
                    loss = criterion(outputs, labels)

                    epoch_val_loss += loss.item() * inputs.size(0)
                    _, predicted = torch.max(outputs, 1)
                    epoch_val_correct += (predicted == labels).sum().item()
                    epoch_val_samples += labels.size(0)

                    all_val_labels.extend(labels.cpu().numpy())
                    all_val_preds.extend(predicted.cpu().numpy())

            total_val_loss += epoch_val_loss
            total_val_correct += epoch_val_correct
            total_val_samples += epoch_val_samples

            # Early stopping check
            early_stopping(total_val_loss / total_val_samples)

            if early_stopping.early_stop:
                print(f"Early stopping at epoch {epoch+1}")
                break

            # Learning rate scheduler step
            scheduler.step(total_val_loss / total_val_samples)

        # Compute final metrics after all epochs
        avg_train_loss = total_train_loss / total_train_samples
        avg_train_accuracy = total_train_correct / total_train_samples * 100
        avg_val_loss = total_val_loss / total_val_samples
        avg_val_accuracy = total_val_correct / total_val_samples * 100

        # Calculate additional metrics (precision, recall, F1, etc.)
        train_precision = precision_score(all_train_labels, all_train_preds, average='macro')
        train_recall = recall_score(all_train_labels, all_train_preds, average='macro')
        train_f1 = f1_score(all_train_labels, all_train_preds, average='macro')
        val_precision = precision_score(all_val_labels, all_val_preds, average='macro')
        val_recall = recall_score(all_val_labels, all_val_preds, average='macro')
        val_f1 = f1_score(all_val_labels, all_val_preds, average='macro')

        print(f"Fold {fold+1} results: ")
        print(f"Train Loss: {avg_train_loss:.4f}, Train Accuracy: {avg_train_accuracy:.2f}%")
        print(f"Train Precision: {train_precision:.4f}, Train Recall: {train_recall:.4f}, Train F1: {train_f1:.4f}")
        print(f"Validation Loss: {avg_val_loss:.4f}, Validation Accuracy: {avg_val_accuracy:.2f}%")
        print(f"Validation Precision: {val_precision:.4f}, Validation Recall: {val_recall:.4f}, Validation F1: {val_f1:.4f}")

        # Store the metrics for later analysis
        results.append({
            'train_loss': avg_train_loss,
            'train_accuracy': avg_train_accuracy,
            'train_precision': train_precision,
            'train_recall': train_recall,
            'train_f1': train_f1,
            'val_loss': avg_val_loss,
            'val_accuracy': avg_val_accuracy,
            'val_precision': val_precision,
            'val_recall': val_recall,
            'val_f1': val_f1
        })

    return results

# Run K-Fold cross-validation with early stopping, dropout, and learning rate scheduler
data_dir = '/content/drive/MyDrive/3D_Numpy_Volumes'  # Replace with your actual data directory
midbrain_center = (64, 64, 64)  # Replace with your actual midbrain center coordinates
dataset = BrainRegionDataset(root_dir=data_dir, midbrain_center=midbrain_center)

k = 5
results = k_fold_train_and_validate(CustomResNet3D, dataset, k=k, num_epochs=50, patience=5)

# Print the final K-Fold results
print("K-Fold Results:", results)

In [None]:
import torch
import torch.nn as nn
from torchvision.models import resnet18
from torch.utils.data import Dataset, DataLoader
import numpy as np
import scipy.ndimage
import os
from sklearn.model_selection import train_test_split
from torch.utils.data import Subset
from torchvision import models
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_curve, auc
import matplotlib.pyplot as plt
from sklearn.model_selection import KFold
from sklearn.preprocessing import LabelBinarizer

class BrainRegionDataset(Dataset):
    def __init__(self, root_dir, midbrain_center, region_size=(50, 50, 50), transform=None):
        self.samples = []
        self.midbrain_center = midbrain_center
        self.region_size = region_size
        self.transform = transform

        hc_folder = os.path.join(root_dir, 'hc')
        pd_folder = os.path.join(root_dir, 'pd')

        hc_files = [os.path.join(hc_folder, f) for f in os.listdir(hc_folder) if f.endswith('.npy')]
        for file in hc_files:
            self.samples.append((file, 0))  # Label 0 for Healthy controls

        pd_files = [os.path.join(pd_folder, f) for f in os.listdir(pd_folder) if f.endswith('.npy')]
        for file in pd_files:
            self.samples.append((file, 1))  # Label 1 for Parkinson's patients

    def extract_midbrain_region(self, volume, center, size=(30, 30, 30)):
        x_center, y_center, z_center = center
        d, h, w = size
        x_start, y_start, z_start = max(x_center - d//2, 0), max(y_center - h//2, 0), max(z_center - w//2, 0)
        x_end, y_end, z_end = min(x_center + d//2, volume.shape[0]), min(y_center + h//2, volume.shape[1]), min(z_center + w//2, volume.shape[2])
        return volume[x_start:x_end, y_start:y_end, z_start:z_end]

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        file_path, label = self.samples[idx]
        volume = np.load(file_path)  # Load the 3D MRI volume

        if volume.ndim == 4:
            volume = volume.squeeze(0)  # Drop the channel dimension if present

        midbrain_region = self.extract_midbrain_region(volume, self.midbrain_center, self.region_size)

        if midbrain_region.ndim != 3:
            raise ValueError(f"Expected a 3D array, but got {midbrain_region.ndim}D")

        resized_region = scipy.ndimage.zoom(midbrain_region, np.array((64, 64, 64)) / np.array(midbrain_region.shape), order=1)

        if resized_region.ndim != 3:
            raise ValueError(f"Expected a 3D array after resizing, but got {resized_region.ndim}D")

        resized_region = (resized_region - np.min(resized_region)) / (np.max(resized_region) - np.min(resized_region) + 1e-8)

        resized_region = torch.tensor(resized_region, dtype=torch.float32).unsqueeze(0)

        if self.transform:
            resized_region = self.transform(resized_region)

        label = torch.tensor(label).long()

        return resized_region, label

class CustomResNet3D(nn.Module):
    def __init__(self, original_model):
        super(CustomResNet3D, self).__init__()
        original_model.stem[0] = nn.Conv3d(1, original_model.stem[0].out_channels, kernel_size=7, stride=2, padding=3, bias=False)
        self.model = original_model

    def forward(self, x):
        return self.model(x)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Load pre-trained model
original_model = models.video.r3d_18(pretrained=True)
model = CustomResNet3D(original_model)
model.to(device)

optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)
criterion = nn.CrossEntropyLoss()

def k_fold_train_and_validate(model_class, dataset, k=5, num_epochs=20):
    kfold = KFold(n_splits=k, shuffle=True, random_state=42)
    results = []

    for fold, (train_idx, val_idx) in enumerate(kfold.split(dataset)):
        print(f"Training fold {fold+1}/{k}...")

        # Create train and validation subsets
        train_subset = Subset(dataset, train_idx)
        val_subset = Subset(dataset, val_idx)

        # DataLoader for training and validation
        train_loader = DataLoader(train_subset, batch_size=4, shuffle=True)
        val_loader = DataLoader(val_subset, batch_size=4, shuffle=False)

        # Initialize the model
        model = model_class(original_model)
        model.to(device)

        # Initialize accumulators for metrics
        total_train_loss, total_train_correct = 0.0, 0
        total_train_samples = 0
        total_val_loss, total_val_correct = 0.0, 0
        total_val_samples = 0

        # Training and validation loop
        for epoch in range(num_epochs):
            model.train()
            epoch_train_loss, epoch_train_correct = 0.0, 0
            epoch_train_samples = 0
            for inputs, labels in train_loader:
                inputs, labels = inputs.to(device), labels.to(device)
                optimizer.zero_grad()
                outputs = model(inputs)
                loss = criterion(outputs, labels)
                loss.backward()
                optimizer.step()

                epoch_train_loss += loss.item() * inputs.size(0)
                _, predicted = torch.max(outputs, 1)
                epoch_train_correct += (predicted == labels).sum().item()
                epoch_train_samples += labels.size(0)

            total_train_loss += epoch_train_loss
            total_train_correct += epoch_train_correct
            total_train_samples += epoch_train_samples

            # Validation phase
            model.eval()
            epoch_val_loss, epoch_val_correct = 0.0, 0
            epoch_val_samples = 0
            with torch.no_grad():
                for inputs, labels in val_loader:
                    inputs, labels = inputs.to(device), labels.to(device)
                    outputs = model(inputs)
                    loss = criterion(outputs, labels)

                    epoch_val_loss += loss.item() * inputs.size(0)
                    _, predicted = torch.max(outputs, 1)
                    epoch_val_correct += (predicted == labels).sum().item()
                    epoch_val_samples += labels.size(0)

            total_val_loss += epoch_val_loss
            total_val_correct += epoch_val_correct
            total_val_samples += epoch_val_samples

        # Compute final metrics after all epochs
        avg_train_loss = total_train_loss / total_train_samples
        avg_train_accuracy = total_train_correct / total_train_samples * 100
        avg_val_loss = total_val_loss / total_val_samples
        avg_val_accuracy = total_val_correct / total_val_samples * 100

        print(f"Fold {fold+1} results: ")
        print(f"Train Loss: {avg_train_loss:.4f}, Train Accuracy: {avg_train_accuracy:.2f}%")
        print(f"Validation Loss: {avg_val_loss:.4f}, Validation Accuracy: {avg_val_accuracy:.2f}%")

        results.append((avg_train_loss, avg_train_accuracy, avg_val_loss, avg_val_accuracy))

    return results

# Run K-Fold cross-validation
data_dir = '/content/drive/MyDrive/3D_Numpy_Volumes'  # Replace with your actual data directory

# Assuming you have the center of the midbrain defined, for example:
midbrain_center = (64,64,64)  # Replace with your actual midbrain center coordinates

# Create the dataset instance
dataset = BrainRegionDataset(root_dir=data_dir, midbrain_center=midbrain_center)

# Set k for K-Fold cross-validation
k = 5

# Run K-Fold cross-validation with the custom model
results = k_fold_train_and_validate(CustomResNet3D, dataset, k=k, num_epochs=25)

In [None]:
import torch
import torch.nn as nn
from torchvision.models import resnet18
from torch.utils.data import Dataset, DataLoader
import numpy as np
import scipy.ndimage
import os
from sklearn.model_selection import train_test_split
from torch.utils.data import Subset
from torchvision import models
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_curve, auc
import matplotlib.pyplot as plt
from sklearn.model_selection import KFold
from sklearn.preprocessing import LabelBinarizer
from torch.utils.data import Dataset

class BrainRegionDataset(Dataset):
    def __init__(self, root_dir, midbrain_center, region_size=(50, 50, 50), transform=None):
        self.samples = []
        self.midbrain_center = midbrain_center
        self.region_size = region_size
        self.transform = transform

        hc_folder = os.path.join(root_dir, 'hc')
        pd_folder = os.path.join(root_dir, 'pd')

        hc_files = [os.path.join(hc_folder, f) for f in os.listdir(hc_folder) if f.endswith('.npy')]
        for file in hc_files:
            self.samples.append((file, 0))  # Label 0 for Healthy controls

        pd_files = [os.path.join(pd_folder, f) for f in os.listdir(pd_folder) if f.endswith('.npy')]
        for file in pd_files:
            self.samples.append((file, 1))  # Label 1 for Parkinson's patients

    def extract_midbrain_region(self, volume, center, size=(30, 30, 30)):
        x_center, y_center, z_center = center
        d, h, w = size
        x_start, y_start, z_start = max(x_center - d//2, 0), max(y_center - h//2, 0), max(z_center - w//2, 0)
        x_end, y_end, z_end = min(x_center + d//2, volume.shape[0]), min(y_center + h//2, volume.shape[1]), min(z_center + w//2, volume.shape[2])
        return volume[x_start:x_end, y_start:y_end, z_start:z_end]

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        file_path, label = self.samples[idx]
        volume = np.load(file_path)

        if volume.ndim == 4:
            volume = volume.squeeze(0)

        midbrain_region = self.extract_midbrain_region(volume, self.midbrain_center, self.region_size)

        if midbrain_region.ndim != 3:
            raise ValueError(f"Expected a 3D array, but got {midbrain_region.ndim}D")

        resized_region = scipy.ndimage.zoom(midbrain_region, np.array((64, 64, 64)) / np.array(midbrain_region.shape), order=1)

        if resized_region.ndim != 3:
            raise ValueError(f"Expected a 3D array after resizing, but got {resized_region.ndim}D")

        resized_region = (resized_region - np.min(resized_region)) / (np.max(resized_region) - np.min(resized_region) + 1e-8)

        resized_region = torch.tensor(resized_region, dtype=torch.float32).unsqueeze(0)

        if self.transform:
            resized_region = self.transform(resized_region)

        label = torch.tensor(label).long()

        return resized_region, label

class CustomResNet3D(nn.Module):
    def __init__(self, original_model):
        super(CustomResNet3D, self).__init__()
        original_model.stem[0] = nn.Conv3d(1, original_model.stem[0].out_channels, kernel_size=7, stride=2, padding=3, bias=False)
        self.model = original_model
        # Add Dropout after the stem
        self.dropout = nn.Dropout3d(p=0.3)

    def forward(self, x):
        x = self.model.stem(x)
        x = self.dropout(x)   # Apply dropout early
        x = self.model.layer1(x)
        x = self.model.layer2(x)
        x = self.model.layer3(x)
        x = self.model.layer4(x)
        x = self.model.avgpool(x)
        x = x.flatten(1)
        x = self.model.fc(x)
        return x

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Load pre-trained model
original_model = models.video.r3d_18(pretrained=True)

def k_fold_train_and_validate(model_class, dataset, k=5, num_epochs=20, patience=5):
    kfold = KFold(n_splits=k, shuffle=True, random_state=42)
    results = []

    for fold, (train_idx, val_idx) in enumerate(kfold.split(dataset)):
        print(f"Training fold {fold+1}/{k}...")

        train_subset = Subset(dataset, train_idx)
        val_subset = Subset(dataset, val_idx)

        train_loader = DataLoader(train_subset, batch_size=4, shuffle=True)
        val_loader = DataLoader(val_subset, batch_size=4, shuffle=False)

        model = model_class(models.video.r3d_18(pretrained=True))
        model.to(device)

        # Apply weight decay (L2 regularization)
        optimizer = torch.optim.Adam(model.parameters(), lr=1e-4, weight_decay=1e-4)
        criterion = nn.CrossEntropyLoss()

        best_val_loss = np.inf
        early_stop_counter = 0

        for epoch in range(num_epochs):
            model.train()
            epoch_train_loss, epoch_train_correct = 0.0, 0
            epoch_train_samples = 0
            for inputs, labels in train_loader:
                inputs, labels = inputs.to(device), labels.to(device)
                optimizer.zero_grad()
                outputs = model(inputs)
                loss = criterion(outputs, labels)
                loss.backward()
                optimizer.step()

                epoch_train_loss += loss.item() * inputs.size(0)
                _, predicted = torch.max(outputs, 1)
                epoch_train_correct += (predicted == labels).sum().item()
                epoch_train_samples += labels.size(0)

            model.eval()
            epoch_val_loss, epoch_val_correct = 0.0, 0
            epoch_val_samples = 0
            with torch.no_grad():
                for inputs, labels in val_loader:
                    inputs, labels = inputs.to(device), labels.to(device)
                    outputs = model(inputs)
                    loss = criterion(outputs, labels)

                    epoch_val_loss += loss.item() * inputs.size(0)
                    _, predicted = torch.max(outputs, 1)
                    epoch_val_correct += (predicted == labels).sum().item()
                    epoch_val_samples += labels.size(0)

            avg_train_loss = epoch_train_loss / epoch_train_samples
            avg_train_accuracy = epoch_train_correct / epoch_train_samples * 100
            avg_val_loss = epoch_val_loss / epoch_val_samples
            avg_val_accuracy = epoch_val_correct / epoch_val_samples * 100

            print(f"Epoch [{epoch+1}/{num_epochs}] Train Loss: {avg_train_loss:.4f} | Train Acc: {avg_train_accuracy:.2f}% | Val Loss: {avg_val_loss:.4f} | Val Acc: {avg_val_accuracy:.2f}%")

            # Early Stopping Check
            if avg_val_loss < best_val_loss:
                best_val_loss = avg_val_loss
                early_stop_counter = 0
            else:
                early_stop_counter += 1
                if early_stop_counter >= patience:
                    print("Early stopping triggered.")
                    break

        results.append((avg_train_loss, avg_train_accuracy, avg_val_loss, avg_val_accuracy))

    return results

# Example usage:
data_dir = '/content/drive/MyDrive/3D_Numpy_Volumes'
midbrain_center = (64, 64, 64)

dataset = BrainRegionDataset(root_dir=data_dir, midbrain_center=midbrain_center)

k = 5
results = k_fold_train_and_validate(CustomResNet3D, dataset, k=k, num_epochs=35, patience=5)

In [None]:
import torch
import torch.nn as nn
from torchvision import models
from torch.utils.data import Dataset, DataLoader, Subset
import numpy as np
import scipy.ndimage
import os
from sklearn.model_selection import KFold
import random

# Reproducibility
torch.manual_seed(42)
np.random.seed(42)
random.seed(42)

class BrainRegionDataset(Dataset):
    def __init__(self, root_dir, midbrain_center, region_size=(64, 64, 64), transform=None):
        self.samples = []
        self.midbrain_center = midbrain_center
        self.region_size = region_size
        self.transform = transform

        hc_folder = os.path.join(root_dir, 'hc')
        pd_folder = os.path.join(root_dir, 'pd')

        hc_files = [os.path.join(hc_folder, f) for f in os.listdir(hc_folder) if f.endswith('.npy')]
        pd_files = [os.path.join(pd_folder, f) for f in os.listdir(pd_folder) if f.endswith('.npy')]

        # Shuffle the samples (to prevent any loading bias)
        self.samples = [(f, 0) for f in hc_files] + [(f, 1) for f in pd_files]
        random.shuffle(self.samples)

    def extract_midbrain_region(self, volume, center, size=(30, 30, 30)):
        x_center, y_center, z_center = center
        d, h, w = size
        x_start, y_start, z_start = max(x_center - d//2, 0), max(y_center - h//2, 0), max(z_center - w//2, 0)
        x_end, y_end, z_end = min(x_center + d//2, volume.shape[0]), min(y_center + h//2, volume.shape[1]), min(z_center + w//2, volume.shape[2])
        return volume[x_start:x_end, y_start:y_end, z_start:z_end]

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        file_path, label = self.samples[idx]
        volume = np.load(file_path)

        if volume.ndim == 4:
            volume = volume.squeeze(0)

        midbrain_region = self.extract_midbrain_region(volume, self.midbrain_center, self.region_size)
        resized_region = scipy.ndimage.zoom(midbrain_region, np.array((64, 64, 64)) / np.array(midbrain_region.shape), order=1)

        resized_region = (resized_region - np.min(resized_region)) / (np.max(resized_region) - np.min(resized_region) + 1e-8)
        resized_region = torch.tensor(resized_region, dtype=torch.float32).unsqueeze(0)

        if self.transform:
            resized_region = self.transform(resized_region)

        label = torch.tensor(label).long()
        return resized_region, label

class CustomResNet3D(nn.Module):
    def __init__(self, original_model):
        super(CustomResNet3D, self).__init__()
        original_model.stem[0] = nn.Conv3d(1, original_model.stem[0].out_channels, kernel_size=7, stride=2, padding=3, bias=False)
        self.model = original_model
        self.dropout = nn.Dropout3d(p=0.3)

    def forward(self, x):
        x = self.model.stem(x)
        x = self.dropout(x)
        x = self.model.layer1(x)
        x = self.model.layer2(x)
        x = self.model.layer3(x)
        x = self.model.layer4(x)
        x = self.model.avgpool(x)
        x = x.flatten(1)
        x = self.model.fc(x)
        return x

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

def k_fold_train_and_validate(model_class, dataset, k=5, num_epochs=20, patience=5):
    kfold = KFold(n_splits=k, shuffle=True, random_state=42)
    results = []

    for fold, (train_idx, val_idx) in enumerate(kfold.split(dataset)):
        print(f"\n--- Training Fold {fold+1}/{k} ---")

        train_subset = Subset(dataset, train_idx)
        val_subset = Subset(dataset, val_idx)

        train_loader = DataLoader(train_subset, batch_size=4, shuffle=True)
        val_loader = DataLoader(val_subset, batch_size=4, shuffle=False)

        model = model_class(models.video.r3d_18(pretrained=True)).to(device)

        optimizer = torch.optim.Adam(model.parameters(), lr=1e-4, weight_decay=1e-4)
        scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=2, verbose=True)
        criterion = nn.CrossEntropyLoss()

        best_val_loss = np.inf
        early_stop_counter = 0

        for epoch in range(num_epochs):
            model.train()
            running_train_loss, train_correct, train_total = 0.0, 0, 0

            for inputs, labels in train_loader:
                inputs, labels = inputs.to(device), labels.to(device)
                optimizer.zero_grad()
                outputs = model(inputs)
                loss = criterion(outputs, labels)
                loss.backward()
                optimizer.step()

                running_train_loss += loss.item() * inputs.size(0)
                _, preds = torch.max(outputs, 1)
                train_correct += (preds == labels).sum().item()
                train_total += labels.size(0)

            train_loss = running_train_loss / train_total
            train_acc = train_correct / train_total * 100

            model.eval()
            running_val_loss, val_correct, val_total = 0.0, 0, 0

            with torch.no_grad():
                for inputs, labels in val_loader:
                    inputs, labels = inputs.to(device), labels.to(device)
                    outputs = model(inputs)
                    loss = criterion(outputs, labels)

                    running_val_loss += loss.item() * inputs.size(0)
                    _, preds = torch.max(outputs, 1)
                    val_correct += (preds == labels).sum().item()
                    val_total += labels.size(0)

            val_loss = running_val_loss / val_total
            val_acc = val_correct / val_total * 100

            scheduler.step(val_loss)

            print(f"Epoch [{epoch+1}/{num_epochs}] | Train Loss: {train_loss:.4f}, Acc: {train_acc:.2f}% | Val Loss: {val_loss:.4f}, Acc: {val_acc:.2f}%")

            if val_loss < best_val_loss:
                best_val_loss = val_loss
                best_model_wts = model.state_dict().copy()
                early_stop_counter = 0
            else:
                early_stop_counter += 1
                if early_stop_counter >= patience:
                    print(f"Early stopping triggered at epoch {epoch+1}")
                    break

        model.load_state_dict(best_model_wts)
        results.append((train_loss, train_acc, val_loss, val_acc))

    return results

# Example usage:
data_dir = '/content/drive/MyDrive/3D_Numpy_Volumes'
midbrain_center = (64, 64, 64)

dataset = BrainRegionDataset(root_dir=data_dir, midbrain_center=midbrain_center)

k = 5
results = k_fold_train_and_validate(CustomResNet3D, dataset, k=k, num_epochs=35, patience=5)


In [None]:
import torch
import torch.nn as nn
from torchvision.models import resnet18
from torch.utils.data import Dataset, DataLoader
import numpy as np
import scipy.ndimage
import os
from sklearn.model_selection import train_test_split
from torch.utils.data import Subset
from torchvision import models
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_curve, auc
import matplotlib.pyplot as plt
from sklearn.model_selection import KFold
from sklearn.preprocessing import LabelBinarizer
import torch.optim as optim
from torch.optim.lr_scheduler import ReduceLROnPlateau

class BrainRegionDataset(Dataset):
    def __init__(self, root_dir, midbrain_center, region_size=(64, 64, 64), transform=None):
        self.samples = []
        self.midbrain_center = midbrain_center
        self.region_size = region_size
        self.transform = transform

        hc_folder = os.path.join(root_dir, 'hc')
        pd_folder = os.path.join(root_dir, 'pd')

        hc_files = [os.path.join(hc_folder, f) for f in os.listdir(hc_folder) if f.endswith('.npy')]
        for file in hc_files:
            self.samples.append((file, 0))  # Label 0 for Healthy controls

        pd_files = [os.path.join(pd_folder, f) for f in os.listdir(pd_folder) if f.endswith('.npy')]
        for file in pd_files:
            self.samples.append((file, 1))  # Label 1 for Parkinson's patients

    def extract_midbrain_region(self, volume, center, size=(30, 30, 30)):
        x_center, y_center, z_center = center
        d, h, w = size
        x_start, y_start, z_start = max(x_center - d//2, 0), max(y_center - h//2, 0), max(z_center - w//2, 0)
        x_end, y_end, z_end = min(x_center + d//2, volume.shape[0]), min(y_center + h//2, volume.shape[1]), min(z_center + w//2, volume.shape[2])
        return volume[x_start:x_end, y_start:y_end, z_start:z_end]

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        file_path, label = self.samples[idx]
        volume = np.load(file_path)  # Load the 3D MRI volume

        if volume.ndim == 4:
            volume = volume.squeeze(0)  # Drop the channel dimension if present

        midbrain_region = self.extract_midbrain_region(volume, self.midbrain_center, self.region_size)

        if midbrain_region.ndim != 3:
            raise ValueError(f"Expected a 3D array, but got {midbrain_region.ndim}D")

        resized_region = scipy.ndimage.zoom(midbrain_region, np.array((64, 64, 64)) / np.array(midbrain_region.shape), order=1)

        if resized_region.ndim != 3:
            raise ValueError(f"Expected a 3D array after resizing, but got {resized_region.ndim}D")

        resized_region = (resized_region - np.min(resized_region)) / (np.max(resized_region) - np.min(resized_region) + 1e-8)

        resized_region = torch.tensor(resized_region, dtype=torch.float32).unsqueeze(0)

        if self.transform:
            resized_region = self.transform(resized_region)

        label = torch.tensor(label).long()

        return resized_region, label

class CustomResNet3D(nn.Module):
    def __init__(self, original_model, dropout_rate=0.5):
        super(CustomResNet3D, self).__init__()
        original_model.stem[0] = nn.Conv3d(1, original_model.stem[0].out_channels, kernel_size=7, stride=2, padding=3, bias=False)
        self.model = original_model
        self.dropout = nn.Dropout3d(p=dropout_rate)

    def forward(self, x):
      x = self.model(x)
      x = self.dropout(x)
      return x

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Load pre-trained model
original_model = models.video.r3d_18(pretrained=True)
model = CustomResNet3D(original_model)
model.to(device)

optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)
criterion = nn.CrossEntropyLoss()

class EarlyStopping:
    def __init__(self, patience=5, delta=0):
        self.patience = patience
        self.delta = delta
        self.best_loss = None
        self.counter = 0
        self.early_stop = False

    def __call__(self, val_loss):
        if self.best_loss is None:
            self.best_loss = val_loss
        elif val_loss > self.best_loss + self.delta:
            self.counter += 1
            if self.counter >= self.patience:
                self.early_stop = True
        else:
            self.best_loss = val_loss
            self.counter = 0

def k_fold_train_and_validate(model_class, dataset, k=5, num_epochs=20, patience=5):
    kfold = KFold(n_splits=k, shuffle=True, random_state=42)
    results = []

    early_stopping = EarlyStopping(patience=patience)

    for fold, (train_idx, val_idx) in enumerate(kfold.split(dataset)):
        print(f"Training fold {fold+1}/{k}...")

        # Create train and validation subsets
        train_subset = Subset(dataset, train_idx)
        val_subset = Subset(dataset, val_idx)

        # DataLoader for training and validation
        train_loader = DataLoader(train_subset, batch_size=4, shuffle=True)
        val_loader = DataLoader(val_subset, batch_size=4, shuffle=False)

        # Initialize the model
        model = model_class(original_model)
        model.to(device)

        # Initialize optimizer, loss function, and scheduler
        optimizer = optim.Adam(model.parameters(), lr=0.001)
        scheduler = ReduceLROnPlateau(optimizer, 'min', patience=3, factor=0.5)  # Learning rate scheduler
        criterion = nn.CrossEntropyLoss()

        # Initialize accumulators for metrics
        total_train_loss, total_train_correct = 0.0, 0
        total_train_samples = 0
        total_val_loss, total_val_correct = 0.0, 0
        total_val_samples = 0

        # Initialize lists to accumulate true labels and predictions for metrics
        all_train_labels = []
        all_train_preds = []
        all_val_labels = []
        all_val_preds = []

        # Training and validation loop
        for epoch in range(num_epochs):
            model.train()
            epoch_train_loss, epoch_train_correct = 0.0, 0
            epoch_train_samples = 0
            for inputs, labels in train_loader:
                inputs, labels = inputs.to(device), labels.to(device)
                optimizer.zero_grad()
                outputs = model(inputs)
                loss = criterion(outputs, labels)
                loss.backward()
                optimizer.step()

                epoch_train_loss += loss.item() * inputs.size(0)
                _, predicted = torch.max(outputs, 1)
                epoch_train_correct += (predicted == labels).sum().item()
                epoch_train_samples += labels.size(0)

                all_train_labels.extend(labels.cpu().numpy())
                all_train_preds.extend(predicted.cpu().numpy())

            total_train_loss += epoch_train_loss
            total_train_correct += epoch_train_correct
            total_train_samples += epoch_train_samples

            # Validation phase
            model.eval()
            epoch_val_loss, epoch_val_correct = 0.0, 0
            epoch_val_samples = 0
            with torch.no_grad():
                for inputs, labels in val_loader:
                    inputs, labels = inputs.to(device), labels.to(device)
                    outputs = model(inputs)
                    loss = criterion(outputs, labels)

                    epoch_val_loss += loss.item() * inputs.size(0)
                    _, predicted = torch.max(outputs, 1)
                    epoch_val_correct += (predicted == labels).sum().item()
                    epoch_val_samples += labels.size(0)

                    all_val_labels.extend(labels.cpu().numpy())
                    all_val_preds.extend(predicted.cpu().numpy())

            total_val_loss += epoch_val_loss
            total_val_correct += epoch_val_correct
            total_val_samples += epoch_val_samples

            # Early stopping check
            early_stopping(total_val_loss / total_val_samples)

            if early_stopping.early_stop:
                print(f"Early stopping at epoch {epoch+1}")
                break

            # Learning rate scheduler step
            scheduler.step(total_val_loss / total_val_samples)

        # Compute final metrics after all epochs
        avg_train_loss = total_train_loss / total_train_samples
        avg_train_accuracy = total_train_correct / total_train_samples * 100
        avg_val_loss = total_val_loss / total_val_samples
        avg_val_accuracy = total_val_correct / total_val_samples * 100

        # Calculate additional metrics (precision, recall, F1, etc.)
        train_precision = precision_score(all_train_labels, all_train_preds, average='binary')
        train_recall = recall_score(all_train_labels, all_train_preds, average='binary')
        train_f1 = f1_score(all_train_labels, all_train_preds, average='binary')
        val_precision = precision_score(all_val_labels, all_val_preds, average='binary')
        val_recall = recall_score(all_val_labels, all_val_preds, average='binary')
        val_f1 = f1_score(all_val_labels, all_val_preds, average='binary')

        print(f"Fold {fold+1} results: ")
        print(f"Train Loss: {avg_train_loss:.4f}, Train Accuracy: {avg_train_accuracy:.2f}%")
        print(f"Train Precision: {train_precision:.4f}, Train Recall: {train_recall:.4f}, Train F1: {train_f1:.4f}")
        print(f"Validation Loss: {avg_val_loss:.4f}, Validation Accuracy: {avg_val_accuracy:.2f}%")
        print(f"Validation Precision: {val_precision:.4f}, Validation Recall: {val_recall:.4f}, Validation F1: {val_f1:.4f}")

        # Store the metrics for later analysis
        results.append({
            'train_loss': avg_train_loss,
            'train_accuracy': avg_train_accuracy,
            'train_precision': train_precision,
            'train_recall': train_recall,
            'train_f1': train_f1,
            'val_loss': avg_val_loss,
            'val_accuracy': avg_val_accuracy,
            'val_precision': val_precision,
            'val_recall': val_recall,
            'val_f1': val_f1
        })

    return results

# Run K-Fold cross-validation with early stopping, dropout, and learning rate scheduler
data_dir = '/content/drive/MyDrive/3D_Numpy_Volumes'  # Replace with your actual data directory
midbrain_center = (64, 64, 64)  # Replace with your actual midbrain center coordinates
dataset = BrainRegionDataset(root_dir=data_dir, midbrain_center=midbrain_center)

k = 5
results = k_fold_train_and_validate(CustomResNet3D, dataset, k=k, num_epochs=25, patience=5)

# Print the final K-Fold results
print("K-Fold Results:", results)

In [None]:
import numpy as np

# Replace with your fold results:
fold_accuracies = [88.89, 100, 77.78, 77.78, 75.00]
fold_losses = [1.0395, 0.1230, 1.6371, 1.3380, 0.8132]

mean_accuracy = np.mean(fold_accuracies)
std_accuracy = np.std(fold_accuracies)

mean_loss = np.mean(fold_losses)
std_loss = np.std(fold_losses)

print(f"Mean Validation Accuracy: {mean_accuracy:.4f}")
print(f"Std Dev of Validation Accuracy: {std_accuracy:.4f}")
print(f"Mean Validation Loss: {mean_loss:.4f}")
print(f"Std Dev of Validation Loss: {std_loss:.4f}")

#finding the mid brain region and estimating coordinated for substantia nigra

In [None]:
import numpy as np

# Load the .npy file
volume = np.load("/content/drive/MyDrive/mri_hc/hc_subject1.npy")  # should be shape like (H, W, D)
print(volume.shape)

standard mni coordinates=(10,15,10) hence they can be assumed as (x,y,z)=(101,94,81) based on mni dimensions of (182,218,182)

In [None]:
def scale_coords(mni_coord, mni_shape, target_shape):
    return tuple(
        int(mni_coord[i] * target_shape[i] / mni_shape[i])
        for i in range(3)
    )

mni_shape = (182, 218, 182)          # standard MNI shape (X, Y, Z)
target_shape = (256, 256, 170)[::-1]  # (Z, Y, X) → flip to (X, Y, Z)

mni_coord = (101, 94, 81)  # approximate SN center in MNI space
scaled_center = scale_coords(mni_coord, mni_shape, target_shape)
print("Approx SN center in your volume:", scaled_center[::-1])  # convert back to (Z, Y, X)

In [None]:
import numpy as np

volume = np.load("/content/drive/MyDrive/mri_hc/hc_subject1.npy")
z, y, x = scaled_center[::-1]  # since your volume is (Z, Y, X)
half = 32

patch = volume[
    z - half:z + half,
    y - half:y + half,
    x - half:x + half
]

print("Patch shape:", patch.shape)  # should be (64, 64, 64)

In [None]:
import numpy as np
import matplotlib.pyplot as plt

# Load the patch if not already loaded
# patch = np.load("your_64x64x64_patch.npy")  # or use the patch variable from before

# Get center indices
center = patch.shape[0] // 2  # 32 for 64x64x64

# Plot central slices
fig, axes = plt.subplots(1, 3, figsize=(12, 4))

# Axial (Z)
axes[0].imshow(patch[center, :, :], cmap='gray')
axes[0].set_title("Axial Slice (Z)")

# Coronal (Y)
axes[1].imshow(patch[:, center, :], cmap='gray')
axes[1].set_title("Coronal Slice (Y)")

# Sagittal (X)
axes[2].imshow(patch[:, :, center], cmap='gray')
axes[2].set_title("Sagittal Slice (X)")

for ax in axes:
    ax.axis("off")

plt.tight_layout()
plt.show()

In [None]:
z, y, x = 85,130,110  # Midbrain region
half = 32

patch = volume[
    z - half:z + half,
    y - half:y + half,
    x - half:x + half
]

In [None]:
import numpy as np
import matplotlib.pyplot as plt

# Load the patch if not already loaded
# patch = np.load("your_64x64x64_patch.npy")  # or use the patch variable from before

# Get center indices
center = patch.shape[0] // 2  # 32 for 64x64x64

# Plot central slices
fig, axes = plt.subplots(1, 3, figsize=(12, 4))

# Axial (Z)
axes[0].imshow(patch[center, :, :], cmap='gray')
axes[0].set_title("Axial Slice (Z)")

# Coronal (Y)
axes[1].imshow(patch[:, center, :], cmap='gray')
axes[1].set_title("Coronal Slice (Y)")

# Sagittal (X)
axes[2].imshow(patch[:, :, center], cmap='gray')
axes[2].set_title("Sagittal Slice (X)")

for ax in axes:
    ax.axis("off")

plt.tight_layout()
plt.show()

In [None]:
import numpy as np

# Load the .npy file
volume = np.load("/content/drive/MyDrive/mri_pd/pd_subject1.npy")  # should be shape like (H, W, D)
volume=volume[0]
print(volume.shape)

In [None]:
z,y,x=85,130,110
half=32
patch = volume[
    z - half:z + half,
    y - half:y + half,
    x - half:x + half
]

In [None]:
import numpy as np
import matplotlib.pyplot as plt

# Load the patch if not already loaded
# patch = np.load("your_64x64x64_patch.npy")  # or use the patch variable from before

# Get center indices
center = patch.shape[0] // 2  # 32 for 64x64x64

# Plot central slices
fig, axes = plt.subplots(1, 3, figsize=(12, 4))

# Axial (Z)
axes[0].imshow(patch[center, :, :], cmap='gray')
axes[0].set_title("Axial Slice (Z)")

# Coronal (Y)
axes[1].imshow(patch[:, center, :], cmap='gray')
axes[1].set_title("Coronal Slice (Y)")

# Sagittal (X)
axes[2].imshow(patch[:, :, center], cmap='gray')
axes[2].set_title("Sagittal Slice (X)")

for ax in axes:
    ax.axis("off")

plt.tight_layout()
plt.show()

In [None]:
import torch
import torch.nn as nn
from torchvision import models
from torch.utils.data import Dataset, DataLoader, Subset
import numpy as np
import scipy.ndimage
import os
from sklearn.model_selection import StratifiedKFold
import random
import matplotlib.pyplot as plt

# Reproducibility
torch.manual_seed(42)
np.random.seed(42)
random.seed(42)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False

class BrainRegionDataset(Dataset):
    def __init__(self, root_dir, midbrain_center, region_size=(64, 64, 64), transform=None):
        """
        Dataset for brain region extraction focused on the substantia nigra.

        Args:
            root_dir: Directory containing 'hc' and 'pd' subdirectories
            midbrain_center: Coordinates of the midbrain center (x, y, z)
            region_size: Size of the region to extract
            transform: Optional transforms to apply
        """
        self.samples = []
        self.midbrain_center = midbrain_center
        self.region_size = region_size
        self.transform = transform

        hc_folder = os.path.join(root_dir, 'hc')
        pd_folder = os.path.join(root_dir, 'pd')

        # Check if directories exist
        if not os.path.exists(hc_folder) or not os.path.exists(pd_folder):
            raise FileNotFoundError(f"HC or PD folder not found in {root_dir}")

        hc_files = [os.path.join(hc_folder, f) for f in os.listdir(hc_folder) if f.endswith('.npy')]
        pd_files = [os.path.join(pd_folder, f) for f in os.listdir(pd_folder) if f.endswith('.npy')]

        print(f"Found {len(hc_files)} healthy control samples and {len(pd_files)} Parkinson's disease samples")

        # Shuffle the samples
        self.samples = [(f, 0) for f in hc_files] + [(f, 1) for f in pd_files]
        random.shuffle(self.samples)

    def extract_midbrain_region(self, volume, center=None, size=None):
        """
        Extract region around substantia nigra from a brain volume.

        Args:
            volume: 3D numpy array of brain MRI
            center: Center coordinates (x, y, z), defaults to self.midbrain_center
            size: Size of region to extract (d, h, w), defaults to self.region_size

        Returns:
            3D numpy array of extracted region
        """
        if center is None:
            center = self.midbrain_center
        if size is None:
            size = self.region_size

        x_center, y_center, z_center = center
        d, h, w = size

        # Ensure center coordinates are within volume boundaries
        x_center = min(max(x_center, d//2), volume.shape[0] - d//2)
        y_center = min(max(y_center, h//2), volume.shape[1] - h//2)
        z_center = min(max(z_center, w//2), volume.shape[2] - w//2)

        x_start, y_start, z_start = x_center - d//2, y_center - h//2, z_center - w//2
        x_end, y_end, z_end = x_start + d, y_start + h, z_start + w

        # Double-check boundaries
        x_start, y_start, z_start = max(x_start, 0), max(y_start, 0), max(z_start, 0)
        x_end = min(x_end, volume.shape[0])
        y_end = min(y_end, volume.shape[1])
        z_end = min(z_end, volume.shape[2])

        return volume[x_start:x_end, y_start:y_end, z_start:z_end]

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        file_path, label = self.samples[idx]

        try:
            volume = np.load(file_path)

            # Handle different volume formats
            if volume.ndim == 4 and volume.shape[0] == 1:
                volume = volume.squeeze(0)  # Remove batch dimension if present
            elif volume.ndim > 3:
                raise ValueError(f"Unexpected volume shape: {volume.shape}")

            # Extract region around substantia nigra
            midbrain_region = self.extract_midbrain_region(volume)

            # Check if region was extracted properly
            if 0 in midbrain_region.shape:
                raise ValueError(f"Extracted region has invalid shape: {midbrain_region.shape}")

            # Resize to consistent dimensions
            target_shape = (64, 64, 64)
            scale_factors = np.array(target_shape) / np.array(midbrain_region.shape)
            resized_region = scipy.ndimage.zoom(midbrain_region, scale_factors, order=1)

            # Normalize to [0, 1]
            if np.max(resized_region) > np.min(resized_region):
                resized_region = (resized_region - np.min(resized_region)) / (np.max(resized_region) - np.min(resized_region))

            # Convert to tensor with channel dimension
            resized_region = torch.tensor(resized_region, dtype=torch.float32).unsqueeze(0)

            if self.transform:
                resized_region = self.transform(resized_region)

            return resized_region, torch.tensor(label, dtype=torch.long)

        except Exception as e:
            print(f"Error loading {file_path}: {str(e)}")
            # Return a placeholder and keep track of errors
            # In a production system, you'd want better error handling
            placeholder = torch.zeros((1, 64, 64, 64), dtype=torch.float32)
            return placeholder, torch.tensor(label, dtype=torch.long)


class CustomResNet3D(nn.Module):
    def __init__(self, pretrained=True, num_classes=2):
        """
        3D ResNet model adapted for brain MRI classification.

        Args:
            pretrained: Whether to use pretrained weights
            num_classes: Number of output classes
        """
        super(CustomResNet3D, self).__init__()

        # Start with pretrained R3D_18 model
        base_model = models.video.r3d_18(pretrained=pretrained)

        # Modify first conv layer to accept single-channel input
        base_model.stem[0] = nn.Conv3d(
            1,
            base_model.stem[0].out_channels,
            kernel_size=7,
            stride=2,
            padding=3,
            bias=False
        )

        # Modify final FC layer for binary classification
        num_ftrs = base_model.fc.in_features
        base_model.fc = nn.Linear(num_ftrs, num_classes)

        self.model = base_model

        # Add regularization
        self.dropout = nn.Dropout3d(p=0.3)

    def forward(self, x):
        x = self.model.stem(x)
        x = self.dropout(x)
        x = self.model.layer1(x)
        x = self.model.layer2(x)
        x = self.model.layer3(x)
        x = self.model.layer4(x)
        x = self.model.avgpool(x)
        x = x.flatten(1)
        x = self.model.fc(x)
        return x


def train_epoch(model, train_loader, criterion, optimizer, device):
    """Run one training epoch"""
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0

    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)

        # Zero gradients
        optimizer.zero_grad()

        # Forward pass
        outputs = model(inputs)
        loss = criterion(outputs, labels)

        # Backward pass and optimize
        loss.backward()
        optimizer.step()

        # Statistics
        running_loss += loss.item() * inputs.size(0)
        _, preds = torch.max(outputs, 1)
        correct += (preds == labels).sum().item()
        total += labels.size(0)

    return running_loss / total, correct / total


def validate(model, val_loader, criterion, device):
    """Validate the model"""
    model.eval()
    running_loss = 0.0
    correct = 0
    total = 0
    all_preds = []
    all_labels = []

    with torch.no_grad():
        for inputs, labels in val_loader:
            inputs, labels = inputs.to(device), labels.to(device)

            # Forward pass
            outputs = model(inputs)
            loss = criterion(outputs, labels)

            # Statistics
            running_loss += loss.item() * inputs.size(0)
            _, preds = torch.max(outputs, 1)
            correct += (preds == labels).sum().item()
            total += labels.size(0)

            # Save predictions and labels for metrics
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    return running_loss / total, correct / total, np.array(all_preds), np.array(all_labels)


def k_fold_train_and_validate(dataset, k=5, num_epochs=20, batch_size=4,
                             learning_rate=1e-4, weight_decay=1e-4, patience=5):
    """
    Train and validate using k-fold cross validation.

    Args:
        dataset: BrainRegionDataset instance
        k: Number of folds
        num_epochs: Maximum number of training epochs
        batch_size: Batch size for training
        learning_rate: Initial learning rate
        weight_decay: L2 regularization strength
        patience: Early stopping patience

    Returns:
        Dictionary of results for each fold
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Using device: {device}")

    # Extract labels for stratified splitting
    labels = [label for _, label in dataset.samples]

    # Initialize StratifiedKFold
    skf = StratifiedKFold(n_splits=k, shuffle=True, random_state=42)

    # Store results
    results = {
        'train_losses': [],
        'train_accs': [],
        'val_losses': [],
        'val_accs': [],
        'best_models': []
    }

    # Class weights for imbalanced data
    class_counts = np.bincount(labels)
    class_weights = torch.tensor(len(labels) / (len(class_counts) * class_counts), dtype=torch.float32)

    # Train and validate for each fold
    for fold, (train_idx, val_idx) in enumerate(skf.split(np.zeros(len(labels)), labels)):
        print(f"\n{'='*40}\nTraining Fold {fold+1}/{k}\n{'='*40}")

        # Create data subsets and loaders
        train_subset = Subset(dataset, train_idx)
        val_subset = Subset(dataset, val_idx)

        train_loader = DataLoader(train_subset, batch_size=batch_size, shuffle=True,
                                 num_workers=4, pin_memory=True)
        val_loader = DataLoader(val_subset, batch_size=batch_size, shuffle=False,
                               num_workers=4, pin_memory=True)

        # Initialize model, optimizer, and criterion
        model = CustomResNet3D(pretrained=True).to(device)

        optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate, weight_decay=weight_decay)
        scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
            optimizer, mode='min', factor=0.5, patience=2, verbose=True
        )

        # Use weighted loss for imbalanced data
        criterion = nn.CrossEntropyLoss(weight=class_weights.to(device))

        # Track best model and metrics
        best_val_loss = float('inf')
        early_stop_counter = 0

        # Training history
        fold_train_losses = []
        fold_train_accs = []
        fold_val_losses = []
        fold_val_accs = []

        # Training loop
        for epoch in range(num_epochs):
            # Train
            train_loss, train_acc = train_epoch(model, train_loader, criterion, optimizer, device)
            fold_train_losses.append(train_loss)
            fold_train_accs.append(train_acc)

            # Validate
            val_loss, val_acc, val_preds, val_labels = validate(model, val_loader, criterion, device)
            fold_val_losses.append(val_loss)
            fold_val_accs.append(val_acc)

            # Update learning rate
            scheduler.step(val_loss)

            # Print metrics
            print(f"Epoch [{epoch+1}/{num_epochs}] | "
                  f"Train Loss: {train_loss:.4f}, Acc: {train_acc*100:.2f}% | "
                  f"Val Loss: {val_loss:.4f}, Acc: {val_acc*100:.2f}%")

            # Check if this is the best model
            if val_loss < best_val_loss:
                print(f"Validation loss improved from {best_val_loss:.4f} to {val_loss:.4f}")
                best_val_loss = val_loss
                best_model_wts = model.state_dict().copy()
                early_stop_counter = 0
            else:
                early_stop_counter += 1
                print(f"Validation loss did not improve. Early stopping counter: {early_stop_counter}/{patience}")
                if early_stop_counter >= patience:
                    print(f"Early stopping triggered after epoch {epoch+1}")
                    break

        # Load best model weights
        model.load_state_dict(best_model_wts)

        # Final validation with best model
        final_val_loss, final_val_acc, final_preds, final_labels = validate(model, val_loader, criterion, device)
        print(f"\nFinal validation - Loss: {final_val_loss:.4f}, Acc: {final_val_acc*100:.2f}%")

        # Save fold results
        results['train_losses'].append(fold_train_losses)
        results['train_accs'].append(fold_train_accs)
        results['val_losses'].append(fold_val_losses)
        results['val_accs'].append(fold_val_accs)
        results['best_models'].append(model.state_dict().copy())

        # Save model for this fold
        torch.save({
            'fold': fold,
            'model_state_dict': model.state_dict(),
            'val_acc': final_val_acc,
            'val_loss': final_val_loss
        }, f'best_model_fold_{fold}.pth')

    # Calculate average metrics across folds
    avg_val_acc = np.mean([results['val_accs'][i][-1] for i in range(k)])
    print(f"\nAverage validation accuracy across {k} folds: {avg_val_acc*100:.2f}%")

    return results


def visualize_region(dataset, idx=0, slice_dim=0, slice_pos=32):
    """
    Visualize a brain region from the dataset.

    Args:
        dataset: BrainRegionDataset instance
        idx: Index of sample to visualize
        slice_dim: Dimension to slice (0=sagittal, 1=coronal, 2=axial)
        slice_pos: Position of slice
    """
    volume, label = dataset[idx]
    volume = volume.squeeze(0).numpy()  # Remove channel dimension

    class_name = "Parkinson's Disease" if label.item() == 1 else "Healthy Control"

    # Get a slice
    if slice_dim == 0:
        slice_name = "Sagittal"
        img_slice = volume[slice_pos, :, :]
    elif slice_dim == 1:
        slice_name = "Coronal"
        img_slice = volume[:, slice_pos, :]
    else:
        slice_name = "Axial"
        img_slice = volume[:, :, slice_pos]

    plt.figure(figsize=(6, 6))
    plt.imshow(img_slice, cmap='gray')
    plt.title(f"{slice_name} Slice - {class_name}")
    plt.colorbar()
    plt.tight_layout()
    plt.show()


# Example usage
if __name__ == "__main__":
    # Set data directory and substantia nigra center coordinates
    data_dir = '/content/drive/MyDrive/3D_Numpy_Volumes'

    # These should be the coordinates of the substantia nigra in your dataset
    # You might need to adjust these based on your specific dataset
    substantia_nigra_center = (96, 128, 128)  # Example coordinates (x, y, z)
    region_size = (64, 64, 64)  # Size of region to extract

    # Create dataset
    dataset = BrainRegionDataset(
        root_dir=data_dir,
        midbrain_center=substantia_nigra_center,
        region_size=region_size
    )

    # Visualize a few samples to verify
    for i in range(min(3, len(dataset))):
        print(f"\nVisualizing sample {i}")
        # View axial slice
        visualize_region(dataset, idx=i, slice_dim=2, slice_pos=32)

    # Set training parameters
    params = {
        'k': 5,
        'num_epochs': 35,
        'batch_size': 4,
        'learning_rate': 1e-4,
        'weight_decay': 1e-4,
        'patience': 5
    }

    # Run k-fold cross-validation
    results = k_fold_train_and_validate(dataset, **params)

    # Plot learning curves
    plt.figure(figsize=(12, 5))

    plt.subplot(1, 2, 1)
    for fold in range(params['k']):
        plt.plot(results['train_losses'][fold], label=f'Fold {fold+1} Train')
        plt.plot(results['val_losses'][fold], label=f'Fold {fold+1} Val')
    plt.title('Loss vs. Epoch')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()

    plt.subplot(1, 2, 2)
    for fold in range(params['k']):
        plt.plot(np.array(results['train_accs'][fold])*100, label=f'Fold {fold+1} Train')
        plt.plot(np.array(results['val_accs'][fold])*100, label=f'Fold {fold+1} Val')
    plt.title('Accuracy vs. Epoch')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy (%)')
    plt.legend()

    plt.tight_layout()
    plt.show()

In [None]:
import torch
import torch.nn as nn
from torchvision import models
from torch.utils.data import Dataset, DataLoader, Subset
import numpy as np
import scipy.ndimage
import os
from sklearn.model_selection import StratifiedKFold
import random
import matplotlib.pyplot as plt

# Reproducibility
torch.manual_seed(42)
np.random.seed(42)
random.seed(42)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False

class BrainRegionDataset(Dataset):
    def __init__(self, root_dir, midbrain_center, region_size=(64, 64, 64), transform=None):
        """
        Dataset for brain region extraction focused on the substantia nigra.

        Args:
            root_dir: Directory containing 'hc' and 'pd' subdirectories
            midbrain_center: Coordinates of the midbrain center (x, y, z)
            region_size: Size of the region to extract
            transform: Optional transforms to apply
        """
        self.samples = []
        self.midbrain_center = midbrain_center
        self.region_size = region_size
        self.transform = transform

        hc_folder = os.path.join(root_dir, 'hc')
        pd_folder = os.path.join(root_dir, 'pd')

        # Check if directories exist
        if not os.path.exists(hc_folder) or not os.path.exists(pd_folder):
            raise FileNotFoundError(f"HC or PD folder not found in {root_dir}")

        hc_files = [os.path.join(hc_folder, f) for f in os.listdir(hc_folder) if f.endswith('.npy')]
        pd_files = [os.path.join(pd_folder, f) for f in os.listdir(pd_folder) if f.endswith('.npy')]

        print(f"Found {len(hc_files)} healthy control samples and {len(pd_files)} Parkinson's disease samples")

        # Shuffle the samples
        self.samples = [(f, 0) for f in hc_files] + [(f, 1) for f in pd_files]
        random.shuffle(self.samples)

    def extract_midbrain_region(self, volume, center=None, size=None):
        """
        Extract region around substantia nigra from a brain volume.

        Args:
            volume: 3D numpy array of brain MRI
            center: Center coordinates (x, y, z), defaults to self.midbrain_center
            size: Size of region to extract (d, h, w), defaults to self.region_size

        Returns:
            3D numpy array of extracted region
        """
        if center is None:
            center = self.midbrain_center
        if size is None:
            size = self.region_size

        x_center, y_center, z_center = center
        d, h, w = size

        # Ensure center coordinates are within volume boundaries
        x_center = min(max(x_center, d//2), volume.shape[0] - d//2)
        y_center = min(max(y_center, h//2), volume.shape[1] - h//2)
        z_center = min(max(z_center, w//2), volume.shape[2] - w//2)

        x_start, y_start, z_start = x_center - d//2, y_center - h//2, z_center - w//2
        x_end, y_end, z_end = x_start + d, y_start + h, z_start + w

        # Double-check boundaries
        x_start, y_start, z_start = max(x_start, 0), max(y_start, 0), max(z_start, 0)
        x_end = min(x_end, volume.shape[0])
        y_end = min(y_end, volume.shape[1])
        z_end = min(z_end, volume.shape[2])

        return volume[x_start:x_end, y_start:y_end, z_start:z_end]

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        file_path, label = self.samples[idx]

        try:
            volume = np.load(file_path)

            # Handle different volume formats
            if volume.ndim == 4 and volume.shape[0] == 1:
                volume = volume.squeeze(0)  # Remove batch dimension if present
            elif volume.ndim > 3:
                raise ValueError(f"Unexpected volume shape: {volume.shape}")

            # Extract region around substantia nigra
            midbrain_region = self.extract_midbrain_region(volume)

            # Check if region was extracted properly
            if 0 in midbrain_region.shape:
                raise ValueError(f"Extracted region has invalid shape: {midbrain_region.shape}")

            # Resize to consistent dimensions
            target_shape = (64, 64, 64)
            scale_factors = np.array(target_shape) / np.array(midbrain_region.shape)
            resized_region = scipy.ndimage.zoom(midbrain_region, scale_factors, order=1)

            # Normalize to [0, 1]
            if np.max(resized_region) > np.min(resized_region):
                resized_region = (resized_region - np.min(resized_region)) / (np.max(resized_region) - np.min(resized_region))

            # Convert to tensor with channel dimension
            resized_region = torch.tensor(resized_region, dtype=torch.float32).unsqueeze(0)

            if self.transform:
                resized_region = self.transform(resized_region)

            return resized_region, torch.tensor(label, dtype=torch.long)

        except Exception as e:
            print(f"Error loading {file_path}: {str(e)}")
            # Return a placeholder and keep track of errors
            # In a production system, you'd want better error handling
            placeholder = torch.zeros((1, 64, 64, 64), dtype=torch.float32)
            return placeholder, torch.tensor(label, dtype=torch.long)


class CustomResNet3D(nn.Module):
    def __init__(self, pretrained=True, num_classes=2):
        """
        3D ResNet model adapted for brain MRI classification.

        Args:
            pretrained: Whether to use pretrained weights
            num_classes: Number of output classes
        """
        super(CustomResNet3D, self).__init__()

        # Start with pretrained R3D_18 model
        base_model = models.video.r3d_18(pretrained=pretrained)

        # Modify first conv layer to accept single-channel input
        base_model.stem[0] = nn.Conv3d(
            1,
            base_model.stem[0].out_channels,
            kernel_size=7,
            stride=2,
            padding=3,
            bias=False
        )

        # Modify final FC layer for binary classification
        num_ftrs = base_model.fc.in_features
        base_model.fc = nn.Linear(num_ftrs, num_classes)

        self.model = base_model

        # Add regularization
        self.dropout = nn.Dropout3d(p=0.3)

    def forward(self, x):
        x = self.model.stem(x)
        x = self.dropout(x)
        x = self.model.layer1(x)
        x = self.model.layer2(x)
        x = self.model.layer3(x)
        x = self.model.layer4(x)
        x = self.model.avgpool(x)
        x = x.flatten(1)
        x = self.model.fc(x)
        return x


def train_epoch(model, train_loader, criterion, optimizer, device):
    """Run one training epoch"""
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0

    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)

        # Zero gradients
        optimizer.zero_grad()

        # Forward pass
        outputs = model(inputs)
        loss = criterion(outputs, labels)

        # Backward pass and optimize
        loss.backward()
        optimizer.step()

        # Statistics
        running_loss += loss.item() * inputs.size(0)
        _, preds = torch.max(outputs, 1)
        correct += (preds == labels).sum().item()
        total += labels.size(0)

    return running_loss / total, correct / total


def validate(model, val_loader, criterion, device):
    """Validate the model"""
    model.eval()
    running_loss = 0.0
    correct = 0
    total = 0
    all_preds = []
    all_labels = []

    with torch.no_grad():
        for inputs, labels in val_loader:
            inputs, labels = inputs.to(device), labels.to(device)

            # Forward pass
            outputs = model(inputs)
            loss = criterion(outputs, labels)

            # Statistics
            running_loss += loss.item() * inputs.size(0)
            _, preds = torch.max(outputs, 1)
            correct += (preds == labels).sum().item()
            total += labels.size(0)

            # Save predictions and labels for metrics
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    return running_loss / total, correct / total, np.array(all_preds), np.array(all_labels)


def k_fold_train_and_validate(dataset, k=5, num_epochs=20, batch_size=4,
                             learning_rate=1e-4, weight_decay=1e-4, patience=5):
    """
    Train and validate using k-fold cross validation.

    Args:
        dataset: BrainRegionDataset instance
        k: Number of folds
        num_epochs: Maximum number of training epochs
        batch_size: Batch size for training
        learning_rate: Initial learning rate
        weight_decay: L2 regularization strength
        patience: Early stopping patience

    Returns:
        Dictionary of results for each fold
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Using device: {device}")

    # Extract labels for stratified splitting
    labels = [label for _, label in dataset.samples]

    # Initialize StratifiedKFold
    skf = StratifiedKFold(n_splits=k, shuffle=True, random_state=42)

    # Store results
    results = {
        'train_losses': [],
        'train_accs': [],
        'val_losses': [],
        'val_accs': [],
        'best_models': []
    }

    # Class weights for imbalanced data
    class_counts = np.bincount(labels)
    class_weights = torch.tensor(len(labels) / (len(class_counts) * class_counts), dtype=torch.float32)

    # Train and validate for each fold
    for fold, (train_idx, val_idx) in enumerate(skf.split(np.zeros(len(labels)), labels)):
        print(f"\n{'='*40}\nTraining Fold {fold+1}/{k}\n{'='*40}")

        # Create data subsets and loaders
        train_subset = Subset(dataset, train_idx)
        val_subset = Subset(dataset, val_idx)

        train_loader = DataLoader(train_subset, batch_size=batch_size, shuffle=True,
                                 num_workers=4, pin_memory=True)
        val_loader = DataLoader(val_subset, batch_size=batch_size, shuffle=False,
                               num_workers=4, pin_memory=True)

        # Initialize model, optimizer, and criterion
        model = CustomResNet3D(pretrained=True).to(device)

        optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate, weight_decay=weight_decay)
        scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
            optimizer, mode='min', factor=0.5, patience=2, verbose=True
        )

        # Use weighted loss for imbalanced data
        criterion = nn.CrossEntropyLoss(weight=class_weights.to(device))

        # Track best model and metrics
        best_val_loss = float('inf')
        early_stop_counter = 0

        # Training history
        fold_train_losses = []
        fold_train_accs = []
        fold_val_losses = []
        fold_val_accs = []

        # Training loop
        for epoch in range(num_epochs):
            # Train
            train_loss, train_acc = train_epoch(model, train_loader, criterion, optimizer, device)
            fold_train_losses.append(train_loss)
            fold_train_accs.append(train_acc)

            # Validate
            val_loss, val_acc, val_preds, val_labels = validate(model, val_loader, criterion, device)
            fold_val_losses.append(val_loss)
            fold_val_accs.append(val_acc)

            # Update learning rate
            scheduler.step(val_loss)

            # Print metrics
            print(f"Epoch [{epoch+1}/{num_epochs}] | "
                  f"Train Loss: {train_loss:.4f}, Acc: {train_acc*100:.2f}% | "
                  f"Val Loss: {val_loss:.4f}, Acc: {val_acc*100:.2f}%")

            # Check if this is the best model
            if val_loss < best_val_loss:
                print(f"Validation loss improved from {best_val_loss:.4f} to {val_loss:.4f}")
                best_val_loss = val_loss
                best_model_wts = model.state_dict().copy()
                early_stop_counter = 0
            else:
                early_stop_counter += 1
                print(f"Validation loss did not improve. Early stopping counter: {early_stop_counter}/{patience}")
                if early_stop_counter >= patience:
                    print(f"Early stopping triggered after epoch {epoch+1}")
                    break

        # Load best model weights
        model.load_state_dict(best_model_wts)

        # Final validation with best model
        final_val_loss, final_val_acc, final_preds, final_labels = validate(model, val_loader, criterion, device)
        print(f"\nFinal validation - Loss: {final_val_loss:.4f}, Acc: {final_val_acc*100:.2f}%")

        # Save fold results
        results['train_losses'].append(fold_train_losses)
        results['train_accs'].append(fold_train_accs)
        results['val_losses'].append(fold_val_losses)
        results['val_accs'].append(fold_val_accs)
        results['best_models'].append(model.state_dict().copy())

        # Save model for this fold
        torch.save({
            'fold': fold,
            'model_state_dict': model.state_dict(),
            'val_acc': final_val_acc,
            'val_loss': final_val_loss
        }, f'best_model2_fold_{fold}.pth')

    # Calculate average metrics across folds
    avg_val_acc = np.mean([results['val_accs'][i][-1] for i in range(k)])
    print(f"\nAverage validation accuracy across {k} folds: {avg_val_acc*100:.2f}%")

    return results


def visualize_region(dataset, idx=0, slice_dim=0, slice_pos=32):
    """
    Visualize a brain region from the dataset.

    Args:
        dataset: BrainRegionDataset instance
        idx: Index of sample to visualize
        slice_dim: Dimension to slice (0=sagittal, 1=coronal, 2=axial)
        slice_pos: Position of slice
    """
    volume, label = dataset[idx]
    volume = volume.squeeze(0).numpy()  # Remove channel dimension

    class_name = "Parkinson's Disease" if label.item() == 1 else "Healthy Control"

    # Get a slice
    if slice_dim == 0:
        slice_name = "Sagittal"
        img_slice = volume[slice_pos, :, :]
    elif slice_dim == 1:
        slice_name = "Coronal"
        img_slice = volume[:, slice_pos, :]
    else:
        slice_name = "Axial"
        img_slice = volume[:, :, slice_pos]

    plt.figure(figsize=(6, 6))
    plt.imshow(img_slice, cmap='gray')
    plt.title(f"{slice_name} Slice - {class_name}")
    plt.colorbar()
    plt.tight_layout()
    plt.show()


# Example usage
if __name__ == "__main__":
    # Set data directory and substantia nigra center coordinates
    data_dir = '/content/drive/MyDrive/3D_Numpy_Volumes'

    # These should be the coordinates of the substantia nigra in your dataset
    # You might need to adjust these based on your specific dataset
    substantia_nigra_center = (85, 130, 110)  # Example coordinates (x, y, z)
    region_size = (64, 64, 64)  # Size of region to extract

    # Create dataset
    dataset = BrainRegionDataset(
        root_dir=data_dir,
        midbrain_center=substantia_nigra_center,
        region_size=region_size
    )

    # Visualize a few samples to verify
    for i in range(min(3, len(dataset))):
        print(f"\nVisualizing sample {i}")
        # View axial slice
        visualize_region(dataset, idx=i, slice_dim=2, slice_pos=32)

    # Set training parameters
    params = {
        'k': 5,
        'num_epochs': 35,
        'batch_size': 4,
        'learning_rate': 1e-4,
        'weight_decay': 1e-4,
        'patience': 5
    }

    # Run k-fold cross-validation
    results = k_fold_train_and_validate(dataset, **params)

    # Plot learning curves
    plt.figure(figsize=(12, 5))

    plt.subplot(1, 2, 1)
    for fold in range(params['k']):
        plt.plot(results['train_losses'][fold], label=f'Fold {fold+1} Train')
        plt.plot(results['val_losses'][fold], label=f'Fold {fold+1} Val')
    plt.title('Loss vs. Epoch')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()

    plt.subplot(1, 2, 2)
    for fold in range(params['k']):
        plt.plot(np.array(results['train_accs'][fold])*100, label=f'Fold {fold+1} Train')
        plt.plot(np.array(results['val_accs'][fold])*100, label=f'Fold {fold+1} Val')
    plt.title('Accuracy vs. Epoch')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy (%)')
    plt.legend()

    plt.tight_layout()
    plt.show()

In [None]:
import torch
import torch.nn as nn
from torchvision import models
from torch.utils.data import Dataset, DataLoader, Subset
import numpy as np
import scipy.ndimage
import os
from sklearn.model_selection import StratifiedKFold
import random
import matplotlib.pyplot as plt

# Reproducibility
torch.manual_seed(42)
np.random.seed(42)
random.seed(42)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False

class BrainRegionDataset(Dataset):
    def __init__(self, root_dir, midbrain_center, region_size=(64, 64, 64), transform=None):
        """
        Dataset for brain region extraction focused on the substantia nigra.

        Args:
            root_dir: Directory containing 'hc' and 'pd' subdirectories
            midbrain_center: Coordinates of the midbrain center (x, y, z)
            region_size: Size of the region to extract
            transform: Optional transforms to apply
        """
        self.samples = []
        self.midbrain_center = midbrain_center
        self.region_size = region_size
        self.transform = transform

        hc_folder = os.path.join(root_dir, 'hc')
        pd_folder = os.path.join(root_dir, 'pd')

        # Check if directories exist
        if not os.path.exists(hc_folder) or not os.path.exists(pd_folder):
            raise FileNotFoundError(f"HC or PD folder not found in {root_dir}")

        hc_files = [os.path.join(hc_folder, f) for f in os.listdir(hc_folder) if f.endswith('.npy')]
        pd_files = [os.path.join(pd_folder, f) for f in os.listdir(pd_folder) if f.endswith('.npy')]

        print(f"Found {len(hc_files)} healthy control samples and {len(pd_files)} Parkinson's disease samples")

        # Shuffle the samples
        self.samples = [(f, 0) for f in hc_files] + [(f, 1) for f in pd_files]
        random.shuffle(self.samples)

    def extract_midbrain_region(self, volume, center=None, size=None):
        """
        Extract region around substantia nigra from a brain volume.

        Args:
            volume: 3D numpy array of brain MRI
            center: Center coordinates (x, y, z), defaults to self.midbrain_center
            size: Size of region to extract (d, h, w), defaults to self.region_size

        Returns:
            3D numpy array of extracted region
        """
        if center is None:
            center = self.midbrain_center
        if size is None:
            size = self.region_size

        x_center, y_center, z_center = center
        d, h, w = size

        # Ensure center coordinates are within volume boundaries
        x_center = min(max(x_center, d//2), volume.shape[0] - d//2)
        y_center = min(max(y_center, h//2), volume.shape[1] - h//2)
        z_center = min(max(z_center, w//2), volume.shape[2] - w//2)

        x_start, y_start, z_start = x_center - d//2, y_center - h//2, z_center - w//2
        x_end, y_end, z_end = x_start + d, y_start + h, z_start + w

        # Double-check boundaries
        x_start, y_start, z_start = max(x_start, 0), max(y_start, 0), max(z_start, 0)
        x_end = min(x_end, volume.shape[0])
        y_end = min(y_end, volume.shape[1])
        z_end = min(z_end, volume.shape[2])

        return volume[x_start:x_end, y_start:y_end, z_start:z_end]

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        file_path, label = self.samples[idx]

        try:
            volume = np.load(file_path)

            # Handle different volume formats
            if volume.ndim == 4 and volume.shape[0] == 1:
                volume = volume.squeeze(0)  # Remove batch dimension if present
            elif volume.ndim > 3:
                raise ValueError(f"Unexpected volume shape: {volume.shape}")

            # Extract region around substantia nigra
            midbrain_region = self.extract_midbrain_region(volume)

            # Check if region was extracted properly
            if 0 in midbrain_region.shape:
                raise ValueError(f"Extracted region has invalid shape: {midbrain_region.shape}")

            # Resize to consistent dimensions
            target_shape = (64, 64, 64)
            scale_factors = np.array(target_shape) / np.array(midbrain_region.shape)
            resized_region = scipy.ndimage.zoom(midbrain_region, scale_factors, order=1)

            # Normalize to [0, 1]
            if np.max(resized_region) > np.min(resized_region):
                resized_region = (resized_region - np.min(resized_region)) / (np.max(resized_region) - np.min(resized_region))

            # Convert to tensor with channel dimension
            resized_region = torch.tensor(resized_region, dtype=torch.float32).unsqueeze(0)

            if self.transform:
                resized_region = self.transform(resized_region)

            return resized_region, torch.tensor(label, dtype=torch.long)

        except Exception as e:
            print(f"Error loading {file_path}: {str(e)}")
            # Return a placeholder and keep track of errors
            # In a production system, you'd want better error handling
            placeholder = torch.zeros((1, 64, 64, 64), dtype=torch.float32)
            return placeholder, torch.tensor(label, dtype=torch.long)


class CustomResNet3D(nn.Module):
    def __init__(self, pretrained=True, num_classes=2):
        """
        3D ResNet model adapted for brain MRI classification.

        Args:
            pretrained: Whether to use pretrained weights
            num_classes: Number of output classes
        """
        super(CustomResNet3D, self).__init__()

        # Start with pretrained R3D_18 model
        base_model = models.video.r3d_18(pretrained=pretrained)

        # Modify first conv layer to accept single-channel input
        base_model.stem[0] = nn.Conv3d(
            1,
            base_model.stem[0].out_channels,
            kernel_size=7,
            stride=2,
            padding=3,
            bias=False
        )

        # Modify final FC layer for binary classification
        num_ftrs = base_model.fc.in_features
        base_model.fc = nn.Linear(num_ftrs, num_classes)

        self.model = base_model

        # Add regularization
        self.dropout = nn.Dropout3d(p=0.45)

    def forward(self, x):
        x = self.model.stem(x)
        x = self.dropout(x)
        x = self.model.layer1(x)
        x = self.model.layer2(x)
        x = self.model.layer3(x)
        x = self.model.layer4(x)
        x = self.model.avgpool(x)
        x = x.flatten(1)
        x = self.model.fc(x)
        return x


def train_epoch(model, train_loader, criterion, optimizer, device):
    """Run one training epoch"""
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0

    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)

        # Zero gradients
        optimizer.zero_grad()

        # Forward pass
        outputs = model(inputs)
        loss = criterion(outputs, labels)

        # Backward pass and optimize
        loss.backward()
        optimizer.step()

        # Statistics
        running_loss += loss.item() * inputs.size(0)
        _, preds = torch.max(outputs, 1)
        correct += (preds == labels).sum().item()
        total += labels.size(0)

    return running_loss / total, correct / total


def validate(model, val_loader, criterion, device):
    """Validate the model"""
    model.eval()
    running_loss = 0.0
    correct = 0
    total = 0
    all_preds = []
    all_labels = []

    with torch.no_grad():
        for inputs, labels in val_loader:
            inputs, labels = inputs.to(device), labels.to(device)

            # Forward pass
            outputs = model(inputs)
            loss = criterion(outputs, labels)

            # Statistics
            running_loss += loss.item() * inputs.size(0)
            _, preds = torch.max(outputs, 1)
            correct += (preds == labels).sum().item()
            total += labels.size(0)

            # Save predictions and labels for metrics
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    return running_loss / total, correct / total, np.array(all_preds), np.array(all_labels)


def k_fold_train_and_validate(dataset, k=5, num_epochs=20, batch_size=8,
                             learning_rate=5e-5, weight_decay=1e-4, patience=5):
    """
    Train and validate using k-fold cross validation.

    Args:
        dataset: BrainRegionDataset instance
        k: Number of folds
        num_epochs: Maximum number of training epochs
        batch_size: Batch size for training
        learning_rate: Initial learning rate
        weight_decay: L2 regularization strength
        patience: Early stopping patience

    Returns:
        Dictionary of results for each fold
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Using device: {device}")

    # Extract labels for stratified splitting
    labels = [label for _, label in dataset.samples]

    # Initialize StratifiedKFold
    skf = StratifiedKFold(n_splits=k, shuffle=True, random_state=42)

    # Store results
    results = {
        'train_losses': [],
        'train_accs': [],
        'val_losses': [],
        'val_accs': [],
        'best_models': []
    }

    # Class weights for imbalanced data
    class_counts = np.bincount(labels)
    class_weights = torch.tensor(len(labels) / (len(class_counts) * class_counts), dtype=torch.float32)

    # Train and validate for each fold
    for fold, (train_idx, val_idx) in enumerate(skf.split(np.zeros(len(labels)), labels)):
        print(f"\n{'='*40}\nTraining Fold {fold+1}/{k}\n{'='*40}")

        # Create data subsets and loaders
        train_subset = Subset(dataset, train_idx)
        val_subset = Subset(dataset, val_idx)

        train_loader = DataLoader(train_subset, batch_size=batch_size, shuffle=True,
                                 num_workers=4, pin_memory=True)
        val_loader = DataLoader(val_subset, batch_size=batch_size, shuffle=False,
                               num_workers=4, pin_memory=True)

        # Initialize model, optimizer, and criterion
        model = CustomResNet3D(pretrained=True).to(device)

        optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate, weight_decay=weight_decay)
        scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
            optimizer, mode='min', factor=0.5, patience=3, verbose=True
        )

        # Use weighted loss for imbalanced data
        criterion = nn.CrossEntropyLoss(weight=class_weights.to(device))

        # Track best model and metrics
        best_val_loss = float('inf')
        early_stop_counter = 0

        # Training history
        fold_train_losses = []
        fold_train_accs = []
        fold_val_losses = []
        fold_val_accs = []

        # Training loop
        for epoch in range(num_epochs):
            # Train
            train_loss, train_acc = train_epoch(model, train_loader, criterion, optimizer, device)
            fold_train_losses.append(train_loss)
            fold_train_accs.append(train_acc)

            # Validate
            val_loss, val_acc, val_preds, val_labels = validate(model, val_loader, criterion, device)
            fold_val_losses.append(val_loss)
            fold_val_accs.append(val_acc)

            # Update learning rate
            scheduler.step(val_loss)

            # Print metrics
            print(f"Epoch [{epoch+1}/{num_epochs}] | "
                  f"Train Loss: {train_loss:.4f}, Acc: {train_acc*100:.2f}% | "
                  f"Val Loss: {val_loss:.4f}, Acc: {val_acc*100:.2f}%")

            # Check if this is the best model
            if val_loss < best_val_loss:
                print(f"Validation loss improved from {best_val_loss:.4f} to {val_loss:.4f}")
                best_val_loss = val_loss
                best_model_wts = model.state_dict().copy()
                early_stop_counter = 0
            else:
                early_stop_counter += 1
                print(f"Validation loss did not improve. Early stopping counter: {early_stop_counter}/{patience}")
                if early_stop_counter >= patience:
                    print(f"Early stopping triggered after epoch {epoch+1}")
                    break

        # Load best model weights
        model.load_state_dict(best_model_wts)

        # Final validation with best model
        final_val_loss, final_val_acc, final_preds, final_labels = validate(model, val_loader, criterion, device)
        print(f"\nFinal validation - Loss: {final_val_loss:.4f}, Acc: {final_val_acc*100:.2f}%")

        # Save fold results
        results['train_losses'].append(fold_train_losses)
        results['train_accs'].append(fold_train_accs)
        results['val_losses'].append(fold_val_losses)
        results['val_accs'].append(fold_val_accs)
        results['best_models'].append(model.state_dict().copy())

        # Save model for this fold
        torch.save({
            'fold': fold,
            'model_state_dict': model.state_dict(),
            'val_acc': final_val_acc,
            'val_loss': final_val_loss
        }, f'best_model2_fold_{fold}.pth')

    # Calculate average metrics across folds
    avg_val_acc = np.mean([results['val_accs'][i][-1] for i in range(k)])
    print(f"\nAverage validation accuracy across {k} folds: {avg_val_acc*100:.2f}%")

    return results


def visualize_region(dataset, idx=0, slice_dim=0, slice_pos=32):
    """
    Visualize a brain region from the dataset.

    Args:
        dataset: BrainRegionDataset instance
        idx: Index of sample to visualize
        slice_dim: Dimension to slice (0=sagittal, 1=coronal, 2=axial)
        slice_pos: Position of slice
    """
    volume, label = dataset[idx]
    volume = volume.squeeze(0).numpy()  # Remove channel dimension

    class_name = "Parkinson's Disease" if label.item() == 1 else "Healthy Control"

    # Get a slice
    if slice_dim == 0:
        slice_name = "Sagittal"
        img_slice = volume[slice_pos, :, :]
    elif slice_dim == 1:
        slice_name = "Coronal"
        img_slice = volume[:, slice_pos, :]
    else:
        slice_name = "Axial"
        img_slice = volume[:, :, slice_pos]

    plt.figure(figsize=(6, 6))
    plt.imshow(img_slice, cmap='gray')
    plt.title(f"{slice_name} Slice - {class_name}")
    plt.colorbar()
    plt.tight_layout()
    plt.show()


# Example usage
if __name__ == "__main__":
    # Set data directory and substantia nigra center coordinates
    data_dir = '/content/drive/MyDrive/3D_Numpy_Volumes'

    # These should be the coordinates of the substantia nigra in your dataset
    # You might need to adjust these based on your specific dataset
    substantia_nigra_center = (85, 130, 110)  # Example coordinates (x, y, z)
    region_size = (64, 64, 64)  # Size of region to extract

    # Create dataset
    dataset = BrainRegionDataset(
        root_dir=data_dir,
        midbrain_center=substantia_nigra_center,
        region_size=region_size
    )

    # Visualize a few samples to verify
    for i in range(min(3, len(dataset))):
        print(f"\nVisualizing sample {i}")
        # View axial slice
        visualize_region(dataset, idx=i, slice_dim=2, slice_pos=32)

    # Set training parameters
    params = {
        'k': 5,
        'num_epochs': 35,
        'batch_size': 4,
        'learning_rate': 1e-4,
        'weight_decay': 1e-4,
        'patience': 5
    }

    # Run k-fold cross-validation
    results = k_fold_train_and_validate(dataset, **params)

    # Plot learning curves
    plt.figure(figsize=(12, 5))

    plt.subplot(1, 2, 1)
    for fold in range(params['k']):
        plt.plot(results['train_losses'][fold], label=f'Fold {fold+1} Train')
        plt.plot(results['val_losses'][fold], label=f'Fold {fold+1} Val')
    plt.title('Loss vs. Epoch')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()

    plt.subplot(1, 2, 2)
    for fold in range(params['k']):
        plt.plot(np.array(results['train_accs'][fold])*100, label=f'Fold {fold+1} Train')
        plt.plot(np.array(results['val_accs'][fold])*100, label=f'Fold {fold+1} Val')
    plt.title('Accuracy vs. Epoch')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy (%)')
    plt.legend()

    plt.tight_layout()
    plt.show()

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision import models
from torch.utils.data import Dataset, DataLoader, Subset
import numpy as np
import scipy.ndimage
import os
from sklearn.model_selection import StratifiedKFold
import random
import matplotlib.pyplot as plt
from torch.utils.data.sampler import WeightedRandomSampler
from tqdm import tqdm
import torchvision.transforms as transforms
import time
import copy

# Reproducibility
torch.manual_seed(42)
np.random.seed(42)
random.seed(42)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False

class BrainRegionDataset(Dataset):
    def __init__(self, root_dir, midbrain_center, region_size=(64, 64, 64), transform=None, augment=False):
        """
        Dataset for brain region extraction focused on the substantia nigra.

        Args:
            root_dir: Directory containing 'hc' and 'pd' subdirectories
            midbrain_center: Coordinates of the midbrain center (x, y, z)
            region_size: Size of the region to extract
            transform: Optional transforms to apply
            augment: Whether to apply data augmentation
        """
        self.samples = []
        self.midbrain_center = midbrain_center
        self.region_size = region_size
        self.transform = transform
        self.augment = augment

        # For multi-region extraction
        self.additional_regions = [
            (-3, 0, 0),   # Left shift
            (3, 0, 0),    # Right shift
            (0, -3, 0),   # Anterior shift
            (0, 3, 0),    # Posterior shift
            (0, 0, -3),   # Superior shift
            (0, 0, 3)     # Inferior shift
        ]

        hc_folder = os.path.join(root_dir, 'hc')
        pd_folder = os.path.join(root_dir, 'pd')

        # Check if directories exist
        if not os.path.exists(hc_folder) or not os.path.exists(pd_folder):
            raise FileNotFoundError(f"HC or PD folder not found in {root_dir}")

        hc_files = [os.path.join(hc_folder, f) for f in os.listdir(hc_folder) if f.endswith('.npy')]
        pd_files = [os.path.join(pd_folder, f) for f in os.listdir(pd_folder) if f.endswith('.npy')]

        print(f"Found {len(hc_files)} healthy control samples and {len(pd_files)} Parkinson's disease samples")

        # Shuffle the samples
        self.samples = [(f, 0) for f in hc_files] + [(f, 1) for f in pd_files]
        random.shuffle(self.samples)

    def extract_midbrain_region(self, volume, center=None, size=None):
        """
        Extract region around substantia nigra from a brain volume.

        Args:
            volume: 3D numpy array of brain MRI
            center: Center coordinates (x, y, z), defaults to self.midbrain_center
            size: Size of region to extract (d, h, w), defaults to self.region_size

        Returns:
            3D numpy array of extracted region
        """
        if center is None:
            center = self.midbrain_center
        if size is None:
            size = self.region_size

        x_center, y_center, z_center = center
        d, h, w = size

        # Ensure center coordinates are within volume boundaries
        x_center = min(max(x_center, d//2), volume.shape[0] - d//2)
        y_center = min(max(y_center, h//2), volume.shape[1] - h//2)
        z_center = min(max(z_center, w//2), volume.shape[2] - w//2)

        x_start, y_start, z_start = x_center - d//2, y_center - h//2, z_center - w//2
        x_end, y_end, z_end = x_start + d, y_start + h, z_start + w

        # Double-check boundaries
        x_start, y_start, z_start = max(x_start, 0), max(y_start, 0), max(z_start, 0)
        x_end = min(x_end, volume.shape[0])
        y_end = min(y_end, volume.shape[1])
        z_end = min(z_end, volume.shape[2])

        return volume[x_start:x_end, y_start:y_end, z_start:z_end]

    def apply_augmentation(self, volume):
        """Apply random augmentations to the volume"""
        # Random flip
        if random.random() > 0.5:
            volume = np.flip(volume, axis=0).copy()

        # Random rotation ±10 degrees around each axis
        for axis in range(3):
            if random.random() > 0.5:
                angle = random.uniform(-10, 10)
                volume = scipy.ndimage.rotate(volume, angle, axes=((axis+1)%3, (axis+2)%3),
                                            reshape=False, order=1, mode='constant')

        # Random brightness adjustment
        if random.random() > 0.5:
            factor = random.uniform(0.9, 1.1)
            volume = volume * factor
            volume = np.clip(volume, 0, 1)

        # Random gaussian noise
        if random.random() > 0.5:
            sigma = random.uniform(0.01, 0.03)
            noise = np.random.normal(0, sigma, volume.shape)
            volume = volume + noise
            volume = np.clip(volume, 0, 1)

        # Random contrast adjustment
        if random.random() > 0.5:
            factor = random.uniform(0.9, 1.1)
            mean = volume.mean()
            volume = (volume - mean) * factor + mean
            volume = np.clip(volume, 0, 1)

        # Random gamma adjustment
        if random.random() > 0.5:
            gamma = random.uniform(0.8, 1.2)
            volume = np.power(volume, gamma)

        return volume

    def preprocess_volume(self, volume):
        """Preprocess the volume"""
        # Check dimensions
        if volume.ndim == 4 and volume.shape[0] == 1:
            volume = volume.squeeze(0)  # Remove batch dimension if present
        elif volume.ndim > 3:
            raise ValueError(f"Unexpected volume shape: {volume.shape}")

        # Extract region around substantia nigra
        midbrain_region = self.extract_midbrain_region(volume)

        # Check if region was extracted properly
        if 0 in midbrain_region.shape:
            raise ValueError(f"Extracted region has invalid shape: {midbrain_region.shape}")

        # Resize to consistent dimensions
        target_shape = (64, 64, 64)
        scale_factors = np.array(target_shape) / np.array(midbrain_region.shape)
        resized_region = scipy.ndimage.zoom(midbrain_region, scale_factors, order=1)

        # Normalize to [0, 1]
        if np.max(resized_region) > np.min(resized_region):
            resized_region = (resized_region - np.min(resized_region)) / (np.max(resized_region) - np.min(resized_region))

        return resized_region

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        file_path, label = self.samples[idx]

        try:
            volume = np.load(file_path)

            # Preprocess volume
            processed_region = self.preprocess_volume(volume)

            # Apply data augmentation if specified
            if self.augment:
                processed_region = self.apply_augmentation(processed_region)

            # Convert to tensor with channel dimension
            processed_tensor = torch.tensor(processed_region, dtype=torch.float32).unsqueeze(0)

            if self.transform:
                processed_tensor = self.transform(processed_tensor)

            return processed_tensor, torch.tensor(label, dtype=torch.long)

        except Exception as e:
            print(f"Error loading {file_path}: {str(e)}")
            placeholder = torch.zeros((1, 64, 64, 64), dtype=torch.float32)
            return placeholder, torch.tensor(label, dtype=torch.long)

class CustomResNet3D(nn.Module):
    """Custom 3D ResNet model for brain MRI classification"""
    def __init__(self, pretrained=True, num_classes=2):
        super(CustomResNet3D, self).__init__()

        # Start with pretrained R3D_18 model
        base_model = models.video.r3d_18(pretrained=pretrained)

        # Modify first conv layer to accept single-channel input
        base_model.stem[0] = nn.Conv3d(
            1,
            base_model.stem[0].out_channels,
            kernel_size=7,
            stride=2,
            padding=3,
            bias=False
        )

        # Remove the final fc layer
        self.features = nn.Sequential(*list(base_model.children())[:-1])

        # Create a new fully connected layer
        self.classifier = nn.Sequential(
            nn.Linear(512, 256),
            nn.ReLU(inplace=True),
            nn.Dropout(0.5),
            nn.Linear(256, 128),
            nn.ReLU(inplace=True),
            nn.Dropout(0.3),
            nn.Linear(128, num_classes)
        )

    def forward(self, x):
        # Feature extraction
        x = self.features(x)

        # Flatten
        x = x.view(x.size(0), -1)

        # Classification
        x = self.classifier(x)

        return x

class SELayer3D(nn.Module):
    """Squeeze-and-Excitation attention block for 3D data"""
    def __init__(self, channel, reduction=16):
        super(SELayer3D, self).__init__()
        self.avg_pool = nn.AdaptiveAvgPool3d(1)
        self.fc = nn.Sequential(
            nn.Linear(channel, channel // reduction, bias=False),
            nn.ReLU(inplace=True),
            nn.Linear(channel // reduction, channel, bias=False),
            nn.Sigmoid()
        )

    def forward(self, x):
        b, c, _, _, _ = x.size()
        y = self.avg_pool(x).view(b, c)
        y = self.fc(y).view(b, c, 1, 1, 1)
        return x * y.expand_as(x)


class SelfAdaptiveBlock(nn.Module):
    """Self-adaptive block with dynamic filter adjustment"""
    def __init__(self, in_channels, out_channels, kernel_size=3, stride=1, reduction=16):
        super(SelfAdaptiveBlock, self).__init__()

        self.conv = nn.Conv3d(in_channels, out_channels, kernel_size,
                           stride=stride, padding=kernel_size//2, bias=False)
        self.bn = nn.BatchNorm3d(out_channels)
        self.se = SELayer3D(out_channels, reduction)

        # Adaptive parameters layer
        self.adaptive_params = nn.Sequential(
            nn.AdaptiveAvgPool3d(1),
            nn.Conv3d(in_channels, out_channels, 1, bias=False),
            nn.Sigmoid()
        )

    def forward(self, x):
        # Generate adaptive parameters based on input
        params = self.adaptive_params(x)

        # Apply convolution
        out = self.conv(x)

        # Apply batch normalization
        out = self.bn(out)

        # Dynamic scale the features using adaptive parameters
        out = out * params

        # Apply squeeze-excitation attention
        out = self.se(out)

        # Apply activation
        out = F.relu(out, inplace=True)

        return out


class SelfAdaptiveCNN(nn.Module):
    """Self-adaptive CNN model for 3D brain MRI classification"""
    def __init__(self, in_channels=1, num_classes=2, base_filters=32):
        super(SelfAdaptiveCNN, self).__init__()

        self.input_conv = nn.Conv3d(in_channels, base_filters, kernel_size=7,
                                   stride=2, padding=3, bias=False)
        self.bn1 = nn.BatchNorm3d(base_filters)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool3d(kernel_size=3, stride=2, padding=1)

        # Self-adaptive blocks
        self.sa_block1 = SelfAdaptiveBlock(base_filters, base_filters*2)
        self.sa_block2 = SelfAdaptiveBlock(base_filters*2, base_filters*4, stride=2)
        self.sa_block3 = SelfAdaptiveBlock(base_filters*4, base_filters*8, stride=2)
        self.sa_block4 = SelfAdaptiveBlock(base_filters*8, base_filters*16, stride=2)

        # Global attention pooling
        self.gap = nn.AdaptiveAvgPool3d(1)

        # Fully connected layers
        self.fc1 = nn.Linear(base_filters*16, 512)
        self.dropout1 = nn.Dropout(0.5)
        self.fc2 = nn.Linear(512, 128)
        self.dropout2 = nn.Dropout(0.3)
        self.fc3 = nn.Linear(128, num_classes)

    def forward(self, x):
        # Initial convolution
        x = self.input_conv(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)

        # Self-adaptive blocks
        x = self.sa_block1(x)
        x = self.sa_block2(x)
        x = self.sa_block3(x)
        x = self.sa_block4(x)

        # Global pooling
        x = self.gap(x)
        x = x.view(x.size(0), -1)

        # Fully connected layers
        x = self.fc1(x)
        x = self.relu(x)
        x = self.dropout1(x)
        x = self.fc2(x)
        x = self.relu(x)
        x = self.dropout2(x)
        x = self.fc3(x)

        return x


class AutoAdaptCNNClassifier(nn.Module):
    """Advanced model combining ResNet3D backbone with self-adaptive mechanisms"""
    def __init__(self, pretrained=True, num_classes=2):
        super(AutoAdaptCNNClassifier, self).__init__()

        # Start with pretrained R3D_18 model
        base_model = models.video.r3d_18(pretrained=pretrained)

        # Modify first conv layer to accept single-channel input
        base_model.stem[0] = nn.Conv3d(
            1,
            base_model.stem[0].out_channels,
            kernel_size=7,
            stride=2,
            padding=3,
            bias=False
        )

        # Remove the final fc layer
        self.backbone = nn.Sequential(*list(base_model.children())[:-1])

        # Add self-adaptive blocks to enhance feature extraction
        self.sa_layer1 = SELayer3D(64, reduction=8)
        self.sa_layer2 = SELayer3D(128, reduction=16)
        self.sa_layer3 = SELayer3D(256, reduction=16)
        self.sa_layer4 = SELayer3D(512, reduction=16)

        # Adaptive pooling
        self.adaptive_pool = nn.AdaptiveAvgPool3d(1)

        # Feature dimension after backbone and pooling
        self.feature_dim = 512

        # Classifier head
        self.classifier = nn.Sequential(
            nn.Linear(self.feature_dim, 256),
            nn.LeakyReLU(0.2),
            nn.Dropout(0.5),
            nn.Linear(256, 128),
            nn.LeakyReLU(0.2),
            nn.Dropout(0.4),
            nn.Linear(128, num_classes)
        )

        # For adaptive learning rate scaling
        self.confidence_estimator = nn.Sequential(
            nn.Linear(self.feature_dim, 64),
            nn.ReLU(inplace=True),
            nn.Linear(64, 1),
            nn.Sigmoid()
        )

    def extract_features(self, x):
        features = []

        # Extract features from backbone while applying attention at each layer
        x = self.backbone[0](x)  # stem

        x = self.backbone[1](x)  # layer1
        x = self.sa_layer1(x)
        features.append(x)

        x = self.backbone[2](x)  # layer2
        x = self.sa_layer2(x)
        features.append(x)

        x = self.backbone[3](x)  # layer3
        x = self.sa_layer3(x)
        features.append(x)

        x = self.backbone[4](x)  # layer4
        x = self.sa_layer4(x)
        features.append(x)

        x = self.backbone[5](x)  # avg_pool

        # Flatten
        x = x.view(x.size(0), -1)

        return x, features

    def forward(self, x):
        # Extract features
        x, _ = self.extract_features(x)

        # Get confidence score
        confidence = self.confidence_estimator(x)

        # Classification
        logits = self.classifier(x)

        return logits, confidence


def train_epoch(model, train_loader, criterion, optimizer, device, adaptive_learning=True):
    """Run one training epoch with self-adaptive learning rate scaling"""
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0

    progress_bar = tqdm(train_loader, desc="Training")

    for inputs, labels in progress_bar:
        inputs, labels = inputs.to(device), labels.to(device)

        # Zero gradients
        optimizer.zero_grad()

        # Forward pass
        if adaptive_learning:
            outputs, confidence = model(inputs)

            # Adaptive loss weighting based on confidence
            loss = criterion(outputs, labels)

            # Scale loss inversely with confidence for hard samples
            confidence_factor = 1.0 - 0.5 * confidence.mean()
            loss = loss * confidence_factor
        else:
            outputs = model(inputs)
            loss = criterion(outputs, labels)

        # Backward pass and optimize
        loss.backward()

        # Gradient clipping to prevent exploding gradients
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)

        optimizer.step()

        # Statistics
        running_loss += loss.item() * inputs.size(0)
        _, preds = torch.max(outputs if isinstance(outputs, torch.Tensor) else outputs[0], 1)
        correct += (preds == labels).sum().item()
        total += labels.size(0)

        # Update progress bar
        progress_bar.set_postfix({
            'loss': f"{loss.item():.4f}",
            'acc': f"{100 * correct / total:.2f}%"
        })

    return running_loss / total, correct / total


def validate(model, val_loader, criterion, device):
    """Validate the model"""
    model.eval()
    running_loss = 0.0
    correct = 0
    total = 0
    all_preds = []
    all_labels = []
    all_probs = []

    with torch.no_grad():
        for inputs, labels in val_loader:
            inputs, labels = inputs.to(device), labels.to(device)

            # Forward pass
            if isinstance(model, AutoAdaptCNNClassifier):
                outputs, _ = model(inputs)
                probs = F.softmax(outputs, dim=1)
            else:
                outputs = model(inputs)
                probs = F.softmax(outputs, dim=1)

            loss = criterion(outputs, labels)

            # Statistics
            running_loss += loss.item() * inputs.size(0)
            _, preds = torch.max(outputs, 1)
            correct += (preds == labels).sum().item()
            total += labels.size(0)

            # Save predictions and labels for metrics
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())
            all_probs.extend(probs.cpu().numpy())

    return running_loss / total, correct / total, np.array(all_preds), np.array(all_labels), np.array(all_probs)


def create_dataloaders(dataset, train_idx, val_idx, batch_size, balance_classes=True):
    """Create balanced data loaders for training and validation"""
    train_subset = Subset(dataset, train_idx)
    val_subset = Subset(dataset, val_idx)

    # Balance classes for training if requested
    if balance_classes:
        # Get labels for training subset
        train_labels = [dataset.samples[i][1] for i in train_idx]
        class_counts = np.bincount(train_labels)
        class_weights = 1.0 / class_counts

        # Assign weights to each sample
        sample_weights = [class_weights[label] for label in train_labels]
        sample_weights = torch.DoubleTensor(sample_weights)

        # Create weighted sampler
        sampler = WeightedRandomSampler(
            weights=sample_weights,
            num_samples=len(train_subset),
            replacement=True
        )

        train_loader = DataLoader(
            train_subset,
            batch_size=batch_size,
            sampler=sampler,
            num_workers=4,
            pin_memory=True
        )
    else:
        train_loader = DataLoader(
            train_subset,
            batch_size=batch_size,
            shuffle=True,
            num_workers=4,
            pin_memory=True
        )

    val_loader = DataLoader(
        val_subset,
        batch_size=batch_size,
        shuffle=False,
        num_workers=4,
        pin_memory=True
    )

    return train_loader, val_loader


def k_fold_train_and_validate(dataset, k=5, num_epochs=40, batch_size=8,
                             learning_rate=1e-4, weight_decay=2e-4, patience=8,
                             model_type="auto_adapt"):
    """
    Train and validate using k-fold cross validation with improved handling.

    Args:
        dataset: BrainRegionDataset instance
        k: Number of folds
        num_epochs: Maximum number of training epochs
        batch_size: Batch size for training
        learning_rate: Initial learning rate
        weight_decay: L2 regularization strength
        patience: Early stopping patience
        model_type: Type of model to use ("auto_adapt", "self_adaptive", or "resnet3d")

    Returns:
        Dictionary of results for each fold
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Using device: {device}")

    # Extract labels for stratified splitting
    labels = [label for _, label in dataset.samples]

    # Initialize StratifiedKFold
    skf = StratifiedKFold(n_splits=k, shuffle=True, random_state=42)

    # Store results
    results = {
        'train_losses': [],
        'train_accs': [],
        'val_losses': [],
        'val_accs': [],
        'best_models': [],
        'fold_metrics': []
    }

    # Class weights for imbalanced data
    class_counts = np.bincount(labels)
    class_weights = torch.tensor(len(labels) / (len(class_counts) * class_counts), dtype=torch.float32)

    # Train and validate for each fold
    for fold, (train_idx, val_idx) in enumerate(skf.split(np.zeros(len(labels)), labels)):
        print(f"\n{'='*40}\nTraining Fold {fold+1}/{k}\n{'='*40}")

        # Create data loaders
        train_loader, val_loader = create_dataloaders(
            dataset, train_idx, val_idx, batch_size, balance_classes=True
        )

        # Initialize model based on type
        if model_type == "auto_adapt":
            model = AutoAdaptCNNClassifier(pretrained=True).to(device)
            adaptive_learning = True
        elif model_type == "self_adaptive":
            model = SelfAdaptiveCNN(in_channels=1, num_classes=2).to(device)
            adaptive_learning = True
        else:  # resnet3d
            model = CustomResNet3D(pretrained=True).to(device)
            adaptive_learning = False

        # Optimizer with weight decay
        optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate, weight_decay=weight_decay)

        # Learning rate scheduler
        scheduler = torch.optim.lr_scheduler.CosineAnnealingWarmRestarts(
            optimizer, T_0=5, T_mult=2, eta_min=learning_rate/10
        )

        # Use weighted loss for imbalanced data
        criterion = nn.CrossEntropyLoss(weight=class_weights.to(device))

        # Track best model and metrics
        best_val_loss = float('inf')
        early_stop_counter = 0
        best_model_wts = None
        best_epoch = 0

        # Training history
        fold_train_losses = []
        fold_train_accs = []
        fold_val_losses = []
        fold_val_accs = []

        # Training loop
        for epoch in range(num_epochs):
            # Train
            start_time = time.time()
            train_loss, train_acc = train_epoch(
                model, train_loader, criterion, optimizer, device, adaptive_learning
            )
            train_time = time.time() - start_time

            fold_train_losses.append(train_loss)
            fold_train_accs.append(train_acc)

            # Validate
            val_loss, val_acc, val_preds, val_labels, val_probs = validate(
                model, val_loader, criterion, device
            )
            fold_val_losses.append(val_loss)
            fold_val_accs.append(val_acc)

            # Update learning rate
            scheduler.step()
            current_lr = optimizer.param_groups[0]['lr']

            # Print metrics
            print(f"Epoch [{epoch+1}/{num_epochs}] | "
                  f"Train Loss: {train_loss:.4f}, Acc: {train_acc*100:.2f}% | "
                  f"Val Loss: {val_loss:.4f}, Acc: {val_acc*100:.2f}% | "
                  f"LR: {current_lr:.6f} | "
                  f"Time: {train_time:.1f}s")

            # Check if this is the best model
            if val_loss < best_val_loss:
                print(f"Validation loss improved from {best_val_loss:.4f} to {val_loss:.4f}")
                best_val_loss = val_loss
                best_model_wts = copy.deepcopy(model.state_dict())
                best_epoch = epoch
                early_stop_counter = 0
            else:
                early_stop_counter += 1
                print(f"Validation loss did not improve. Early stopping counter: {early_stop_counter}/{patience}")
                if early_stop_counter >= patience:
                    print(f"Early stopping triggered after epoch {epoch+1}")
                    break

        # Load best model weights
        model.load_state_dict(best_model_wts)

        # Final validation with best model
        final_val_loss, final_val_acc, final_preds, final_labels, final_probs = validate(
            model, val_loader, criterion, device
        )
        print(f"\nFinal validation - Loss: {final_val_loss:.4f}, Acc: {final_val_acc*100:.2f}%")
        print(f"Best model was from epoch {best_epoch+1}")

        # Calculate confusion matrix
        from sklearn.metrics import confusion_matrix, classification_report, roc_auc_score
        conf_matrix = confusion_matrix(final_labels, final_preds)

        # Calculate sensitivity and specificity
        tn, fp, fn, tp = conf_matrix.ravel()
        sensitivity = tp / (tp + fn)
        specificity = tn / (tn + fp)

        # Calculate AUC
        if final_probs.shape[1] > 1:  # Use probability of positive class
            auc = roc_auc_score(final_labels, final_probs[:, 1])
        else:
            auc = roc_auc_score(final_labels, final_probs)

        print(f"Sensitivity: {sensitivity:.4f}, Specificity: {specificity:.4f}, AUC: {auc:.4f}")
        print("\nClassification Report:")
        print(classification_report(final_labels, final_preds))

        # Store fold metrics
        fold_metrics = {
            'fold': fold,
            'val_acc': final_val_acc,
            'val_loss': final_val_loss,
            'sensitivity': sensitivity,
            'specificity': specificity,
            'auc': auc,
            'conf_matrix': conf_matrix,
            'best_epoch': best_epoch
        }

        # Save fold results
        results['train_losses'].append(fold_train_losses)
        results['train_accs'].append(fold_train_accs)
        results['val_losses'].append(fold_val_losses)
        results['val_accs'].append(fold_val_accs)
        results['best_models'].append(model.state_dict().copy())
        results['fold_metrics'].append(fold_metrics)

        # Save model for this fold
        torch.save({
            'fold': fold,
            'model_state_dict': model.state_dict(),
            'val_acc': final_val_acc,
            'val_loss': final_val_loss,
            'metrics': fold_metrics
        }, f'best_model_improved_fold_{fold}.pth')

    # Calculate average metrics across folds
    avg_val_acc = np.mean([metrics['val_acc'] for metrics in results['fold_metrics']])
    avg_auc = np.mean([metrics['auc'] for metrics in results['fold_metrics']])
    avg_sensitivity = np.mean([metrics['sensitivity'] for metrics in results['fold_metrics']])
    avg_specificity = np.mean([metrics['specificity'] for metrics in results['fold_metrics']])

    print(f"\n{'='*40}\nAverage metrics across {k} folds:\n{'='*40}")
    print(f"Accuracy: {avg_val_acc*100:.2f}%")
    print(f"AUC: {avg_auc:.4f}")
    print(f"Sensitivity: {avg_sensitivity:.4f}")
    print(f"Specificity: {avg_specificity:.4f}")

    # Find best fold based on validation accuracy
    best_fold_idx = np.argmax([metrics['val_acc'] for metrics in results['fold_metrics']])
    best_fold = results['fold_metrics'][best_fold_idx]

    print(f"\nBest fold: {best_fold['fold']+1} with validation accuracy: {best_fold['val_acc']*100:.2f}%")

    # Plot training history
    plt.figure(figsize=(12, 10))

    # Plot training & validation loss
    plt.subplot(2, 1, 1)
    for fold in range(k):
        plt.plot(results['train_losses'][fold], label=f'Fold {fold+1} Train')
        plt.plot(results['val_losses'][fold], label=f'Fold {fold+1} Val', linestyle='--')
    plt.title('Loss per Epoch')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()
    plt.grid(True)

    # Plot training & validation accuracy
    plt.subplot(2, 1, 2)
    for fold in range(k):
        plt.plot(np.array(results['train_accs'][fold]) * 100, label=f'Fold {fold+1} Train')
        plt.plot(np.array(results['val_accs'][fold]) * 100, label=f'Fold {fold+1} Val', linestyle='--')
    plt.title('Accuracy per Epoch')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy (%)')
    plt.legend()
    plt.grid(True)
    plt.tight_layout()

    # Save plot
    plt.savefig('training_history.png')
    plt.close()

    # Save the best model
    torch.save({
        'fold': best_fold['fold'],
        'model_state_dict': results['best_models'][best_fold['fold']],
        'val_acc': best_fold['val_acc'],
        'val_loss': best_fold['val_loss'],
        'metrics': best_fold
    }, 'best_model_overall.pth')

    return results
def main():
    """Main function to run the training and evaluation process"""
    # Set fixed random seeds for reproducibility
    torch.manual_seed(42)
    np.random.seed(42)
    random.seed(42)

    # Define data directory and parameters
    data_root = "/content/drive/MyDrive/3D_Numpy_Volumes"  # Update this path to your data location

    # Define midbrain center coordinates (these would be determined based on your specific dataset)
    # These coordinates should point to the substantia nigra region
    midbrain_center = (91, 109, 91)  # Example coordinates, adjust based on your data

    # Create dataset
    print("Creating dataset...")
    try:
        dataset = BrainRegionDataset(
            root_dir=data_root,
            midbrain_center=midbrain_center,
            region_size=(64, 64, 64),
            augment=True  # Enable data augmentation for training
        )
    except FileNotFoundError as e:
        print(f"Error: {e}")
        print("Please make sure the data directory exists with 'hc' and 'pd' subdirectories.")
        return

    # Set training parameters
    params = {
        'k': 5,                  # Number of folds
        'num_epochs': 50,        # Maximum number of epochs
        'batch_size': 8,         # Batch size
        'learning_rate': 1e-4,   # Initial learning rate
        'weight_decay': 2e-4,    # Weight decay for regularization
        'patience': 8,           # Early stopping patience
        'model_type': "auto_adapt"  # Model type (options: "auto_adapt", "self_adaptive", "resnet3d")
    }

    # Run k-fold cross-validation
    print(f"Starting {params['k']}-fold cross-validation with {params['model_type']} model...")
    results = k_fold_train_and_validate(dataset, **params)

    # These variables are calculated in k_fold_train_and_validate and returned in results
    avg_val_acc = np.mean([metrics['val_acc'] for metrics in results['fold_metrics']])
    avg_auc = np.mean([metrics['auc'] for metrics in results['fold_metrics']])
    avg_sensitivity = np.mean([metrics['sensitivity'] for metrics in results['fold_metrics']])
    avg_specificity = np.mean([metrics['specificity'] for metrics in results['fold_metrics']])

    print(f"\n{'='*40}\nOverall Results\n{'='*40}")
    print(f"Average validation accuracy: {avg_val_acc*100:.2f}%")
    print(f"Average AUC: {avg_auc:.4f}")
    print(f"Average sensitivity: {avg_sensitivity:.4f}")
    print(f"Average specificity: {avg_specificity:.4f}")

    # Save results
    torch.save(results, 'training_results.pth')

    # Load best model for inference
    print("Loading best model for inference...")
    best_model_data = torch.load('best_model_overall.pth')
    best_fold = best_model_data['fold']

    # Initialize the model with the same architecture
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    if params['model_type'] == "auto_adapt":
        model = AutoAdaptCNNClassifier(pretrained=False).to(device)
    elif params['model_type'] == "self_adaptive":
        model = SelfAdaptiveCNN(in_channels=1, num_classes=2).to(device)
    else:  # resnet3d
        model = CustomResNet3D(pretrained=False).to(device)

    # Load best model weights
    model.load_state_dict(best_model_data['model_state_dict'])
    model.eval()

    print(f"Best model loaded from fold {best_fold+1} with validation accuracy: {best_model_data['val_acc']*100:.2f}%")

    # Example of how to use the model for inference
    def predict_sample(model, sample_path, midbrain_center, device):
        """Make prediction on a single sample"""
        try:
            # Load the volume
            volume = np.load(sample_path)

            # Create a temporary dataset with same preprocessing
            temp_dataset = BrainRegionDataset(
                root_dir="./",  # Not used here
                midbrain_center=midbrain_center,
                region_size=(64, 64, 64),
                augment=False
            )

            # Preprocess the volume
            processed_region = temp_dataset.preprocess_volume(volume)
            processed_tensor = torch.tensor(processed_region, dtype=torch.float32).unsqueeze(0).unsqueeze(0)
            processed_tensor = processed_tensor.to(device)

            # Make prediction
            with torch.no_grad():
                if isinstance(model, AutoAdaptCNNClassifier):
                    outputs, confidence = model(processed_tensor)
                    probabilities = F.softmax(outputs, dim=1)
                    _, predicted = torch.max(outputs, 1)
                    return predicted.item(), probabilities[0, 1].item(), confidence.item()
                else:
                    outputs = model(processed_tensor)
                    probabilities = F.softmax(outputs, dim=1)
                    _, predicted = torch.max(outputs, 1)
                    return predicted.item(), probabilities[0, 1].item(), None

        except Exception as e:
            print(f"Error during prediction: {str(e)}")
            return None, None, None

    # Example test for visualization (if you have test samples)
    print("\nTesting model on example cases:")
    test_samples = [
        # Add paths to test samples here
        # ("/content/drive/MyDrive/3D_Numpy_Volumes/hc/hc_subject13.npy", "HC"),
        # ("/content/drive/MyDrive/3D_Numpy_Volumes/pd/pd_subject13.npy", "PD")
    ]

    for sample_path, true_label in test_samples:
        pred_class, prob, conf = predict_sample(model, sample_path, midbrain_center, device)

        if pred_class is not None:
            pred_label = "PD" if pred_class == 1 else "HC"
            print(f"Sample: {sample_path}")
            print(f"True label: {true_label}, Predicted: {pred_label}")
            print(f"Probability of PD: {prob:.4f}")
            if conf is not None:
                print(f"Model confidence: {conf:.4f}")
            print("---")

    print("Training and evaluation completed successfully!")


if __name__ == "__main__":
    main()

Creating dataset...
Found 49 healthy control samples and 49 Parkinson's disease samples
Starting 5-fold cross-validation with auto_adapt model...
Using device: cpu

Training Fold 1/5


Training: 100%|██████████| 10/10 [06:17<00:00, 37.74s/it, loss=0.4919, acc=55.13%]


Epoch [1/50] | Train Loss: 0.5098, Acc: 55.13% | Val Loss: 0.6947, Acc: 50.00% | LR: 0.000091 | Time: 377.4s
Validation loss improved from inf to 0.6947


Training:  30%|███       | 3/10 [01:59<04:24, 37.78s/it, loss=0.4882, acc=58.33%]

In [None]:
def main():
    """Main function to run the training and evaluation process"""
    # Set fixed random seeds for reproducibility
    torch.manual_seed(42)
    np.random.seed(42)
    random.seed(42)

    # Define data directory and parameters
    data_root = "/content/drive/MyDrive/3D_Numpy_Volumes"  # Update this path to your data location

    # Define midbrain center coordinates (these would be determined based on your specific dataset)
    # These coordinates should point to the substantia nigra region
    midbrain_center = (91, 109, 91)  # Example coordinates, adjust based on your data

    # Create dataset
    print("Creating dataset...")
    try:
        dataset = BrainRegionDataset(
            root_dir=data_root,
            midbrain_center=midbrain_center,
            region_size=(64, 64, 64),
            augment=True  # Enable data augmentation for training
        )
    except FileNotFoundError as e:
        print(f"Error: {e}")
        print("Please make sure the data directory exists with 'hc' and 'pd' subdirectories.")
        return

    # Set training parameters
    params = {
        'k': 5,                  # Number of folds
        'num_epochs': 50,        # Maximum number of epochs
        'batch_size': 8,         # Batch size
        'learning_rate': 1e-4,   # Initial learning rate
        'weight_decay': 2e-4,    # Weight decay for regularization
        'patience': 8,           # Early stopping patience
        'model_type': "auto_adapt"  # Model type (options: "auto_adapt", "self_adaptive", "resnet3d")
    }

    # Run k-fold cross-validation
    print(f"Starting {params['k']}-fold cross-validation with {params['model_type']} model...")
    results = k_fold_train_and_validate(dataset, **params)

    # These variables are calculated in k_fold_train_and_validate and returned in results
    avg_val_acc = np.mean([metrics['val_acc'] for metrics in results['fold_metrics']])
    avg_auc = np.mean([metrics['auc'] for metrics in results['fold_metrics']])
    avg_sensitivity = np.mean([metrics['sensitivity'] for metrics in results['fold_metrics']])
    avg_specificity = np.mean([metrics['specificity'] for metrics in results['fold_metrics']])

    print(f"\n{'='*40}\nOverall Results\n{'='*40}")
    print(f"Average validation accuracy: {avg_val_acc*100:.2f}%")
    print(f"Average AUC: {avg_auc:.4f}")
    print(f"Average sensitivity: {avg_sensitivity:.4f}")
    print(f"Average specificity: {avg_specificity:.4f}")

    # Save results
    torch.save(results, 'training_results.pth')

    # Load best model for inference
    print("Loading best model for inference...")
    best_model_data = torch.load('best_model_overall.pth')
    best_fold = best_model_data['fold']

    # Initialize the model with the same architecture
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    if params['model_type'] == "auto_adapt":
        model = AutoAdaptCNNClassifier(pretrained=False).to(device)
    elif params['model_type'] == "self_adaptive":
        model = SelfAdaptiveCNN(in_channels=1, num_classes=2).to(device)
    else:  # resnet3d
        model = CustomResNet3D(pretrained=False).to(device)

    # Load best model weights
    model.load_state_dict(best_model_data['model_state_dict'])
    model.eval()

    print(f"Best model loaded from fold {best_fold+1} with validation accuracy: {best_model_data['val_acc']*100:.2f}%")

    # Example of how to use the model for inference
    def predict_sample(model, sample_path, midbrain_center, device):
        """Make prediction on a single sample"""
        try:
            # Load the volume
            volume = np.load(sample_path)

            # Create a temporary dataset with same preprocessing
            temp_dataset = BrainRegionDataset(
                root_dir="./",  # Not used here
                midbrain_center=midbrain_center,
                region_size=(64, 64, 64),
                augment=False
            )

            # Preprocess the volume
            processed_region = temp_dataset.preprocess_volume(volume)
            processed_tensor = torch.tensor(processed_region, dtype=torch.float32).unsqueeze(0).unsqueeze(0)
            processed_tensor = processed_tensor.to(device)

            # Make prediction
            with torch.no_grad():
                if isinstance(model, AutoAdaptCNNClassifier):
                    outputs, confidence = model(processed_tensor)
                    probabilities = F.softmax(outputs, dim=1)
                    _, predicted = torch.max(outputs, 1)
                    return predicted.item(), probabilities[0, 1].item(), confidence.item()
                else:
                    outputs = model(processed_tensor)
                    probabilities = F.softmax(outputs, dim=1)
                    _, predicted = torch.max(outputs, 1)
                    return predicted.item(), probabilities[0, 1].item(), None

        except Exception as e:
            print(f"Error during prediction: {str(e)}")
            return None, None, None

    # Example test for visualization (if you have test samples)
    print("\nTesting model on example cases:")
    test_samples = [
        # Add paths to test samples here
        # ("/content/drive/MyDrive/3D_Numpy_Volumes/hc/hc_subject13.npy", "HC"),
        # ("/content/drive/MyDrive/3D_Numpy_Volumes/pd/pd_subject13.npy", "PD")
    ]

    for sample_path, true_label in test_samples:
        pred_class, prob, conf = predict_sample(model, sample_path, midbrain_center, device)

        if pred_class is not None:
            pred_label = "PD" if pred_class == 1 else "HC"
            print(f"Sample: {sample_path}")
            print(f"True label: {true_label}, Predicted: {pred_label}")
            print(f"Probability of PD: {prob:.4f}")
            if conf is not None:
                print(f"Model confidence: {conf:.4f}")
            print("---")

    print("Training and evaluation completed successfully!")


if __name__ == "__main__":
    main()

Creating dataset...
Found 49 healthy control samples and 49 Parkinson's disease samples
Starting 5-fold cross-validation with auto_adapt model...
Using device: cpu

Training Fold 1/5


Training: 100%|██████████| 10/10 [05:16<00:00, 31.64s/it, loss=0.4919, acc=55.13%]


Epoch [1/50] | Train Loss: 0.5098, Acc: 55.13% | Val Loss: 0.6947, Acc: 50.00% | LR: 0.000091 | Time: 316.4s
Validation loss improved from inf to 0.6947


Training:  60%|██████    | 6/10 [03:16<02:03, 30.75s/it, loss=0.5152, acc=50.00%]

In [None]:
import torch
from torch.utils.data import DataLoader
import time

def train(model, train_dataset, val_dataset, batch_size=16, num_epochs=50, learning_rate=5e-5, device='cuda'):
    """
    Train the 3D ResNet model on the provided training and validation datasets.

    Args:
        model: The model to train.
        train_dataset: The training dataset.
        val_dataset: The validation dataset.
        batch_size: The batch size.
        num_epochs: The number of training epochs.
        learning_rate: The learning rate.
        device: The device to use ('cuda' or 'cpu').

    Returns:
        The trained model.
    """

    # DataLoaders
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

    # Loss function and optimizer
    criterion = torch.nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

    # Move model to device
    model.to(device)

    # Tracking metrics
    best_val_acc = 0
    best_model_wts = copy.deepcopy(model.state_dict())

    # Training loop
    for epoch in range(num_epochs):
        model.train()  # Set model to training mode
        running_loss = 0.0
        correct_preds = 0
        total_preds = 0

        # Training phase
        for inputs, labels in tqdm(train_loader, desc=f'Epoch {epoch+1}/{num_epochs}'):
            inputs = inputs.to(device)
            labels = labels.to(device)

            optimizer.zero_grad()  # Zero the gradients

            # Forward pass
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            running_loss += loss.item()

            # Backward pass and optimize
            loss.backward()
            optimizer.step()

            # Calculate accuracy
            _, preds = torch.max(outputs, 1)
            correct_preds += torch.sum(preds == labels.data)
            total_preds += labels.size(0)

        # Calculate training accuracy and loss
        train_loss = running_loss / len(train_loader)
        train_acc = correct_preds.double() / total_preds

        # Validation phase
        model.eval()  # Set model to evaluation mode
        val_loss = 0.0
        val_correct_preds = 0
        val_total_preds = 0

        with torch.no_grad():
            for inputs, labels in val_loader:
                inputs = inputs.to(device)
                labels = labels.to(device)

                # Forward pass
                outputs = model(inputs)
                loss = criterion(outputs, labels)
                val_loss += loss.item()

                # Calculate accuracy
                _, preds = torch.max(outputs, 1)
                val_correct_preds += torch.sum(preds == labels.data)
                val_total_preds += labels.size(0)

        # Calculate validation accuracy and loss
        val_loss = val_loss / len(val_loader)
        val_acc = val_correct_preds.double() / val_total_preds

        # Print metrics
        print(f"Epoch [{epoch+1}/{num_epochs}]")
        print(f"Train Loss: {train_loss:.4f}, Train Accuracy: {train_acc:.4f}")
        print(f"Validation Loss: {val_loss:.4f}, Validation Accuracy: {val_acc:.4f}")

        # Save best model
        if val_acc > best_val_acc:
            best_val_acc = val_acc
            best_model_wts = copy.deepcopy(model.state_dict())

    # Load the best model weights
    model.load_state_dict(best_model_wts)
    print(f"Training complete. Best validation accuracy: {best_val_acc:.4f}")

    return model

# Example usage
# train(model, train_dataset, val_dataset, batch_size=16, num_epochs=50, learning_rate=1e-4, device='cuda')

In [None]:
import torch
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import roc_curve, auc

# # Import your modules
# from improved_model import(
#     BrainRegionDataset,
#     AutoAdaptCNNClassifier,
#     SelfAdaptiveCNN,
#     CustomResNet3D,
# )

def main():
    # Set data directory and substantia nigra center coordinates
    data_dir = '/content/drive/MyDrive/3D_Numpy_Volumes'  # Update this to your data path

    # These should be the coordinates of the substantia nigra in your dataset
    # You might need to adjust these based on your specific dataset
    substantia_nigra_center = (85, 130, 110)  # Example coordinates (x, y, z)
    region_size = (64, 64, 64)  # Size of region to extract

    # Create dataset with augmentation for training
    dataset = BrainRegionDataset(
        root_dir=data_dir,
        midbrain_center=substantia_nigra_center,
        region_size=region_size,
        augment=True  # Enable data augmentation
    )

    # Set improved training parameters
    params = {
        'k': 5,
        'num_epochs': 50,
        'batch_size': 8,
        'learning_rate': 5e-5,
        'weight_decay': 2e-4,
        'patience': 8,
        'model_type': "auto_adapt"  # Use the self-adaptive model
    }

    # Print dataset information
    print(f"Dataset size: {len(dataset)}")
    print(f"Sample shape: {dataset[0][0].shape}")

    # Print model architecture
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = AutoAdaptCNNClassifier(pretrained=True).to(device)
    sample_input = torch.zeros((1, 1, 64, 64, 64)).to(device)
    _ = model(sample_input)  # Forward pass to initialize

    # Print model parameters
    total_params = sum(p.numel() for p in model.parameters())
    trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
    print(f"Total parameters: {total_params:,}")
    print(f"Trainable parameters: {trainable_params:,}")

    # Load a pre-trained model if available
    try:
        checkpoint = torch.load('best_model_improved_fold_0.pth', map_location=device)
        model.load_state_dict(checkpoint['model_state_dict'])
        print(f"Loaded pre-trained model with validation accuracy: {checkpoint['val_acc']*100:.2f}%")

        # Make predictions on a few samples
        model.eval()
        with torch.no_grad():
            for i in range(min(5, len(dataset))):
                sample, label = dataset[i]
                sample = sample.unsqueeze(0).to(device)
                output, confidence = model(sample)
                pred = torch.argmax(output, dim=1).item()
                conf_score = confidence.item()

                class_names = ["Healthy Control", "Parkinson's Disease"]
                print(f"Sample {i}: True: {class_names[label.item()]}, "
                      f"Predicted: {class_names[pred]}, Confidence: {conf_score:.4f}")

                if i < 3:  # Visualize first 3 samples
                    visualize_region(dataset, idx=i)

    except FileNotFoundError:
        print("No pre-trained model found. Please train the model first.")


def compare_models(dataset, device=None):
    """
    Compare different model architectures on the same dataset
    """
    if device is None:
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Create the models
    models = {
        "Self-Adaptive CNN": AutoAdaptCNNClassifier(pretrained=True).to(device),
        "Simple CNN": SelfAdaptiveCNN(in_channels=1, num_classes=2).to(device),
        "ResNet3D": CustomResNet3D(pretrained=True).to(device)
    }

    # Create a small validation set
    from torch.utils.data import random_split
    val_size = min(int(len(dataset) * 0.2), 20)  # 20% or max 20 samples
    train_size = len(dataset) - val_size
    train_dataset, val_dataset = random_split(dataset, [train_size, val_size])

    # Create validation dataloader
    from torch.utils.data import DataLoader
    val_loader = DataLoader(val_dataset, batch_size=4, shuffle=False)

    # Compare inference speed and memory usage
    results = {}

    for name, model in models.items():
        # Set to evaluation mode
        model.eval()

        # Measure memory usage
        start_mem = torch.cuda.memory_allocated() if torch.cuda.is_available() else 0

        # Measure inference time
        import time
        start_time = time.time()

        all_preds = []
        all_labels = []
        all_probs = []

        with torch.no_grad():
            for inputs, labels in val_loader:
                inputs, labels = inputs.to(device), labels.to(device)

                # Forward pass
                if name == "Self-Adaptive CNN":
                    outputs, _ = model(inputs)
                    probs = torch.softmax(outputs, dim=1)
                else:
                    outputs = model(inputs)
                    probs = torch.softmax(outputs, dim=1)

                _, preds = torch.max(outputs, 1)

                all_preds.extend(preds.cpu().numpy())
                all_labels.extend(labels.cpu().numpy())
                if probs.shape[1] > 1:
                    all_probs.extend(probs[:, 1].cpu().numpy())  # Probability of class 1
                else:
                    all_probs.extend(probs.cpu().numpy())

        # Calculate metrics
        from sklearn.metrics import accuracy_score, roc_auc_score
        accuracy = accuracy_score(all_labels, all_preds)
        auc_score = roc_auc_score(all_labels, all_probs)

        inference_time = time.time() - start_time
        mem_used = (torch.cuda.memory_allocated() - start_mem) / 1024**2 if torch.cuda.is_available() else 0

        results[name] = {
            'accuracy': accuracy,
            'auc': auc_score,
            'inference_time': inference_time,
            'memory_mb': mem_used
        }

    # Display results
    print("\nModel Comparison Results:")
    print(f"{'Model':<20} {'Accuracy':<10} {'AUC':<10} {'Time (s)':<10} {'Memory (MB)':<15}")
    print("-" * 65)
    for name, metrics in results.items():
        print(f"{name:<20} {metrics['accuracy']*100:.2f}% {metrics['auc']:.4f} {metrics['inference_time']:.4f}s {metrics['memory_mb']:.2f}")

    # Plot ROC curves
    plt.figure(figsize=(10, 8))

    for name, model in models.items():
        model.eval()
        all_labels = []
        all_probs = []

        with torch.no_grad():
            for inputs, labels in val_loader:
                inputs, labels = inputs.to(device), labels.to(device)

                # Forward pass
                if name == "Self-Adaptive CNN":
                    outputs, _ = model(inputs)
                    probs = torch.softmax(outputs, dim=1)
                else:
                    outputs = model(inputs)
                    probs = torch.softmax(outputs, dim=1)

                all_labels.extend(labels.cpu().numpy())
                if probs.shape[1] > 1:
                    all_probs.extend(probs[:, 1].cpu().numpy())  # Probability of class 1
                else:
                    all_probs.extend(probs.cpu().numpy())

        # Calculate ROC curve
        fpr, tpr, _ = roc_curve(all_labels, all_probs)
        roc_auc = auc(fpr, tpr)

        # Plot ROC curve
        plt.plot(fpr, tpr, label=f'{name} (AUC = {roc_auc:.4f})')

    plt.plot([0, 1], [0, 1], 'k--')
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('Receiver Operating Characteristic (ROC) Curves')
    plt.legend(loc="lower right")
    plt.show()


if __name__ == "__main__":
    main()


Overall Results
Found 49 healthy control samples and 49 Parkinson's disease samples


NameError: name 'avg_val_acc' is not defined