<a href="https://colab.research.google.com/github/Soutrik-Chakraborty/cardiac-modeling/blob/main/Untitled7.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
pip install pydicom nibabel opencv-python numpy torch torchvision scikit-image vedo

Collecting pydicom
  Downloading pydicom-3.0.1-py3-none-any.whl.metadata (9.4 kB)
Collecting vedo
  Downloading vedo-2025.5.4-py3-none-any.whl.metadata (14 kB)
Collecting vtk (from vedo)
  Downloading vtk-9.5.0-cp312-cp312-manylinux2014_x86_64.manylinux_2_17_x86_64.whl.metadata (5.5 kB)
Downloading pydicom-3.0.1-py3-none-any.whl (2.4 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.4/2.4 MB[0m [31m92.3 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading vedo-2025.5.4-py3-none-any.whl (2.8 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.8/2.8 MB[0m [31m9.2 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading vtk-9.5.0-cp312-cp312-manylinux2014_x86_64.manylinux_2_17_x86_64.whl (112.1 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m112.1/112.1 MB[0m [31m8.5 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: pydicom, vtk, vedo
Successfully installed pydicom-3.0.1 vedo-2025.5.4 vtk-9.5.0


In [2]:
import os
import zipfile
import glob
import shutil
import numpy as np
import pydicom
import nibabel as nib
import cv2
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from scipy.ndimage import zoom
from skimage import measure
from vedo import Plotter, Mesh, Volume

In [3]:
def unzip_rvsc_dataset(zip_path, extract_to):
    if not os.path.isfile(zip_path):
        raise FileNotFoundError(f"Zip file not found at: {zip_path}")
    os.makedirs(extract_to, exist_ok=True)
    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        zip_ref.extractall(extract_to)
    print(f"\u2705 Extracted dataset to: {extract_to}")

In [4]:
def load_dicom_series(dicom_folder):
    slices = []
    for fname in sorted(os.listdir(dicom_folder)):
        if fname.endswith(".dcm"):
            path = os.path.join(dicom_folder, fname)
            dcm = pydicom.dcmread(path)
            slices.append((dcm, int(fname.split('-')[1].split('.')[0])))
    slices.sort(key=lambda x: x[1])
    images = np.stack([s.pixel_array for s, _ in slices])
    spacing = float(slices[0][0].PixelSpacing[0])
    # Calculate slice thickness. Handle potential missing or inconsistent SliceLocation
    thickness = 0.0
    if len(slices) > 1 and hasattr(slices[0][0], 'SliceLocation') and hasattr(slices[1][0], 'SliceLocation'):
        thickness = abs(slices[1][0].SliceLocation - slices[0][0].SliceLocation)
    if thickness == 0.0 and hasattr(slices[0][0], 'SliceThickness'):
         thickness = float(slices[0][0].SliceThickness)
    if thickness == 0.0:
        print("Warning: Could not determine slice thickness from DICOM headers. Using a default value of 1.0.")
        thickness = 1.0 # Default value if thickness cannot be determined


    return images, spacing, thickness, [s[1] for s in slices]

def load_contour_file(file_path):
    with open(file_path, "r") as f:
        coords = [list(map(float, line.strip().split())) for line in f if line.strip()]
    if not coords:
        return np.array([], dtype=np.int32)
    return np.array(coords, dtype=np.int32)

def create_mask(image_shape, contour):
    mask = np.zeros(image_shape, dtype=np.uint8)
    if contour.size > 0:
        contour = contour.reshape((-1, 1, 2)).astype(np.int32)
        cv2.fillPoly(mask, [contour], color=1)
    return mask

def contours_to_mask(contour_folder, dicom_numbers, image_shape, label='i'):
    mask_volume = np.zeros((len(dicom_numbers), *image_shape), dtype=np.uint8)
    for idx, num in enumerate(dicom_numbers):
        fname = f"{label}contour-manual.txt"
        files = glob.glob(os.path.join(contour_folder, f"*{num:04d}*{fname}"))
        if files:
            contour = load_contour_file(files[0])
            mask = create_mask(image_shape, contour)
            mask_volume[idx] = mask
    return mask_volume

def save_as_nifti(volume, output_path, spacing, slice_thickness):
    affine = np.diag([spacing, spacing, slice_thickness, 1])
    # Ensure determinant is not zero for affine decomposition
    if np.linalg.det(affine) == 0:
        print(f"Warning: Affine determinant is zero. Using identity matrix with spacing for NIfTI saving.")
        affine = np.diag([spacing, spacing, slice_thickness, 1]) # Recreate affine just in case, though logic above should prevent det=0
        # Fallback to a robust affine if the above still fails - this might not be ideal for all data but prevents crashing
        if np.linalg.det(affine) == 0:
             affine = np.eye(4)
             affine[0,0] = spacing
             affine[1,1] = spacing
             affine[2,2] = slice_thickness

    nib.save(nib.Nifti1Image(volume.astype(np.int16), affine), output_path)


def convert_all_patients_to_nifti(input_root, output_root):
    os.makedirs(output_root, exist_ok=True)
    patient_dirs = sorted([d for d in os.listdir(input_root) if d.startswith("patient")])
    for patient in patient_dirs:
        p_path = os.path.join(input_root, patient)
        p_num = patient.replace("patient", "")
        dicom_folder = os.path.join(p_path, f"P{p_num}dicom")
        contour_folder = os.path.join(p_path, f"P{p_num}contours-manual")
        output_dir = os.path.join(output_root, f"P{p_num}")
        os.makedirs(output_dir, exist_ok=True)
        try:
            img, spc, thk, nums = load_dicom_series(dicom_folder)
            msk = contours_to_mask(contour_folder, nums, img.shape[1:], label='i')
            save_as_nifti(img, os.path.join(output_dir, "image.nii.gz"), spc, thk)
            save_as_nifti(msk, os.path.join(output_dir, "mask.nii.gz"), spc, thk)
        except Exception as e:
            print(f"❌ Error processing {patient}: {e}")

In [5]:
def convert_nii_gz_to_nii(folder):
    nii_gz_files = glob.glob(os.path.join(folder, "**", "*.nii.gz"), recursive=True)
    for gz in nii_gz_files:
        data = nib.load(gz)
        nii_path = gz.replace(".nii.gz", ".nii")
        nib.save(data, nii_path)
        os.remove(gz)
        print(f"Converted and removed: {gz}")

In [6]:
def organize_dataset_for_training(nifti_root, output_root):
    os.makedirs(os.path.join(output_root, "train", "images"), exist_ok=True)
    os.makedirs(os.path.join(output_root, "train", "masks"), exist_ok=True)
    patient_folders = sorted(glob.glob(os.path.join(nifti_root, "P*")))
    for i, pf in enumerate(patient_folders):
        img = glob.glob(os.path.join(pf, "image.nii"))[0]
        msk = glob.glob(os.path.join(pf, "mask.nii"))[0]
        pid = f"P{str(i+1).zfill(2)}"
        shutil.copy(img, os.path.join(output_root, "train", "images", f"{pid}_image.nii"))
        shutil.copy(msk, os.path.join(output_root, "train", "masks", f"{pid}_mask.nii"))
    print("✅ Dataset organized for training.")

In [7]:
#New Code and run it
class HVSMR3DDataset(Dataset):
    def __init__(self, root, split="train"):
        images = sorted(glob.glob(os.path.join(root, split, "images", "*.nii")))
        masks  = sorted(glob.glob(os.path.join(root, split, "masks", "*.nii")))
        self.pairs = list(zip(images, masks))

    def __len__(self): return len(self.pairs)

    def __getitem__(self, idx):
        img_path, msk_path = self.pairs[idx]
        img = nib.load(img_path).get_fdata().astype(np.float32)
        msk = nib.load(msk_path).get_fdata().astype(np.int64)

        # Resize to a fixed size, e.g., 128x128x128
        target_shape = (128, 128, 128)

        # Ensure zoom factors are calculated correctly
        zoom_factors = np.array(target_shape) / np.array(img.shape)

        # Interpolation: order=1 for image (trilinear), order=0 for mask (nearest neighbor)
        img = zoom(img, zoom_factors, order=1)
        msk = zoom(msk, zoom_factors, order=0)

        # Normalization
        img = (img - img.mean()) / (img.std() + 1e-8) # Add epsilon to avoid division by zero

        # Add channel dimension
        return torch.from_numpy(img).unsqueeze(0), torch.from_numpy(msk)

# Helper module for the U-Net
class DoubleConv(nn.Module):
    """(convolution => [BN] => ReLU) * 2"""
    def __init__(self, in_channels, out_channels, mid_channels=None):
        super().__init__()
        if not mid_channels:
            mid_channels = out_channels
        self.double_conv = nn.Sequential(
            nn.Conv3d(in_channels, mid_channels, kernel_size=3, padding=1, bias=False),
            nn.BatchNorm3d(mid_channels),
            nn.ReLU(inplace=True),
            nn.Conv3d(mid_channels, out_channels, kernel_size=3, padding=1, bias=False),
            nn.BatchNorm3d(out_channels),
            nn.ReLU(inplace=True)
        )

    def forward(self, x):
        return self.double_conv(x)

# Improved U-Net with skip connections and interpolation-based upsampling
class UNet3D(nn.Module):
    def __init__(self, in_ch=1, out_ch=2):
        super(UNet3D, self).__init__()

        # Encoder Path
        self.inc = DoubleConv(in_ch, 64)
        self.down1 = nn.Sequential(nn.MaxPool3d(2), DoubleConv(64, 128))
        self.down2 = nn.Sequential(nn.MaxPool3d(2), DoubleConv(128, 256))

        # Decoder Path (uses interpolation)
        self.up1 = nn.Upsample(scale_factor=2, mode='trilinear', align_corners=True)
        self.conv1 = DoubleConv(256 + 128, 128)

        self.up2 = nn.Upsample(scale_factor=2, mode='trilinear', align_corners=True)
        self.conv2 = DoubleConv(128 + 64, 64)

        # Output layer
        self.outc = nn.Conv3d(64, out_ch, kernel_size=1)

    def forward(self, x):
        # Encoder
        x1 = self.inc(x)
        x2 = self.down1(x1)
        x3 = self.down2(x2)

        # Decoder with skip connections
        x = self.up1(x3)
        # Concatenate skip connection from encoder
        x = torch.cat([x, x2], dim=1)
        x = self.conv1(x)

        x = self.up2(x)
        # Concatenate skip connection from encoder
        x = torch.cat([x, x1], dim=1)
        x = self.conv2(x)

        logits = self.outc(x)
        return logits

def train_model(root):
    train_ds = HVSMR3DDataset(root, "train")
    # Using batch_size > 1 is recommended if GPU memory allows
    train_loader = DataLoader(train_ds, batch_size=1, shuffle=True)

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Using device: {device}")

    model = UNet3D()
    model.to(device)

    # It's good practice to use a slightly more robust optimizer and scheduler
    optim = torch.optim.Adam(model.parameters(), lr=1e-4) # Lower learning rate
    loss_fn = nn.CrossEntropyLoss()

    # Increased number of epochs for better convergence
    for epoch in range(25): # Increased from 10 to 25
        model.train()
        epoch_loss = 0
        for img, msk in train_loader:
            img, msk = img.to(device), msk.to(device)

            pred = model(img)
            loss = loss_fn(pred, msk)

            optim.zero_grad()
            loss.backward()
            optim.step()
            epoch_loss += loss.item()

        avg_loss = epoch_loss / len(train_loader)
        print(f"Epoch {epoch+1}/25 completed. Average Loss: {avg_loss:.4f}")

    torch.save(model.state_dict(), "rvsc_3d_unet.pth")
    print("✅ Model training complete and saved.")
    return model

In [None]:
#Old Code
#Do not run it
class HVSMR3DDataset(Dataset):
    def __init__(self, root, split="train"):
        images = sorted(glob.glob(os.path.join(root, split, "images", "*.nii")))
        masks  = sorted(glob.glob(os.path.join(root, split, "masks", "*.nii")))
        self.pairs = list(zip(images, masks))

    def __len__(self): return len(self.pairs)

    def __getitem__(self, idx):
        img = nib.load(self.pairs[idx][0]).get_fdata().astype(np.float32)
        msk = nib.load(self.pairs[idx][1]).get_fdata().astype(np.int64)
        # Resize to a fixed size, e.g., 128x128x128
        target_shape = (128, 128, 128)
        img = zoom(img, np.array(target_shape) / img.shape, order=1) # Use order=1 for image interpolation
        msk = zoom(msk, np.array(target_shape) / msk.shape, order=0) # Use order=0 for mask (nearest neighbor)
        img = (img - img.mean()) / img.std()
        return torch.unsqueeze(torch.from_numpy(img),0), torch.from_numpy(msk)

class UNet3D(nn.Module):
    def __init__(self, in_ch=1, out_ch=2):
        super().__init__()
        self.enc = nn.Sequential(
            nn.Conv3d(in_ch,16,3,padding=1), nn.ReLU(),
            nn.Conv3d(16,32,3,padding=1), nn.ReLU())
        self.pool = nn.MaxPool3d(2)
        self.dec = nn.Sequential(
            nn.ConvTranspose3d(32,16,2,stride=2), nn.ReLU(),
            nn.Conv3d(16,out_ch,1))

    def forward(self, x):
        x1 = self.enc(x)
        x2 = self.pool(x1)
        x3 = self.dec(x2)
        return x3

def train_model(root):
    train_ds = HVSMR3DDataset(root, "train")
    train_loader = DataLoader(train_ds, batch_size=1, shuffle=True)
    model = UNet3D()
    optim = torch.optim.Adam(model.parameters(), lr=1e-3)
    loss_fn = nn.CrossEntropyLoss()
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)

    for epoch in range(10):
        model.train()
        for img, msk in train_loader:
            img, msk = img.to(device), msk.to(device)
            pred = model(img)
            loss = loss_fn(pred, msk)
            optim.zero_grad(); loss.backward(); optim.step()
        print(f"Epoch {epoch+1} completed.")

    torch.save(model.state_dict(), "rvsc_3d_unet.pth")
    return model

In [8]:
def generate_3d_mesh_from_nii(nii_path, label=1):
    img = nib.load(nii_path).get_fdata()
    verts, faces, normals, _ = measure.marching_cubes(img == label, level=0.5)
    mesh = Mesh([verts, faces]).c("cyan").alpha(0.5)
    plt = Plotter(title="3D Mesh", axes=1, bg='black')
    plt.show(mesh)

In [9]:
def main():
    zip_path = "/content/RVSC.zip"
    extract_to = "/content/RVSC_unzipped"
    unzip_rvsc_dataset(zip_path, extract_to)

    # List contents of the extracted directory to diagnose the error
    print("Contents of extracted directory:")
    print(os.listdir(extract_to))

    convert_all_patients_to_nifti(
        input_root=os.path.join(extract_to, "RVSC", "TrainingSet"),
        output_root="/content/nifti_output"
    )

    convert_nii_gz_to_nii("/content/nifti_output")
    organize_dataset_for_training("/content/nifti_output", "/content/rvsc_dataset")

    model = train_model("/content/rvsc_dataset")

    sample_mask = glob.glob("/content/rvsc_dataset/train/masks/*.nii")[0] # Moved this line
    generate_3d_mesh_from_nii(sample_mask, label=1)
main()

✅ Extracted dataset to: /content/RVSC_unzipped
Contents of extracted directory:
['RVSC']
Converted and removed: /content/nifti_output/P10/mask.nii.gz
Converted and removed: /content/nifti_output/P10/image.nii.gz
Converted and removed: /content/nifti_output/P04/mask.nii.gz
Converted and removed: /content/nifti_output/P04/image.nii.gz
Converted and removed: /content/nifti_output/P07/mask.nii.gz
Converted and removed: /content/nifti_output/P07/image.nii.gz
Converted and removed: /content/nifti_output/P14/mask.nii.gz
Converted and removed: /content/nifti_output/P14/image.nii.gz
Converted and removed: /content/nifti_output/P16/mask.nii.gz
Converted and removed: /content/nifti_output/P16/image.nii.gz
Converted and removed: /content/nifti_output/P05/mask.nii.gz
Converted and removed: /content/nifti_output/P05/image.nii.gz
Converted and removed: /content/nifti_output/P08/mask.nii.gz
Converted and removed: /content/nifti_output/P08/image.nii.gz
Converted and removed: /content/nifti_output/P13/m

In [10]:
# Get the path to a sample image file from the organized dataset
sample_image = glob.glob("/content/rvsc_dataset/train/images/*.nii")[0]

# Get the path to the ground truth mask file for the sample image
# Assumes the sample_image variable is still holding the path to the image file
# We need to find the corresponding mask file.
# Based on the organize_dataset_for_training function, the mask file has the same prefix
# as the image file but is in the 'masks' directory.
sample_image_filename = os.path.basename(sample_image)
sample_mask_filename = sample_image_filename.replace("_image.nii", "_mask.nii")
sample_mask_path = os.path.join("/content/rvsc_dataset/train/masks", sample_mask_filename)

def generate_3d_mesh_from_prediction(model_path, image_path, label=1, save_path=None):
    model = UNet3D()
    model.load_state_dict(torch.load(model_path))
    model.eval()
    img = nib.load(image_path).get_fdata().astype(np.float32)
    # Preprocess the image similar to the dataset class
    target_shape = (128, 128, 128)
    img = zoom(img, np.array(target_shape) / img.shape, order=1)
    img = (img - img.mean()) / img.std()
    img = torch.unsqueeze(torch.unsqueeze(torch.from_numpy(img), 0), 0) # Add batch and channel dims

    with torch.no_grad():
        output = model(img)
        pred = torch.argmax(output, dim=1).squeeze(0).numpy().astype(np.int16) # Cast to int16

    print(f"Prediction data range: min={pred.min()}, max={pred.max()}") # Print the data range

    # Adjust the level for marching cubes based on the range of prediction values
    # The prediction is a mask with values 0 or 1.
    # The level should be between 0 and 1 to create a surface at the boundary.
    # A level of 0.5 is appropriate for binary masks.

    # Check if the target label is present in the prediction
    if label not in np.unique(pred):
        print(f"Warning: Target label {label} not found in the prediction. Cannot generate mesh.")
        return # Exit the function if the label is not found

    verts, faces, normals, _ = measure.marching_cubes(pred == label, level=0.5)

    mesh = Mesh([verts, faces]).c("cyan").alpha(0.5)
    if save_path:
        mesh.write(save_path)
        print(f"Saved mesh to {save_path}")
    plt = Plotter(title="3D Mesh from Prediction", axes=1, bg='black')
    plt.show(mesh)


# Generate the mesh from the ground truth mask
generate_3d_mesh_from_nii(sample_mask_path, label=1)

# Generate the mesh from the model prediction
generate_3d_mesh_from_prediction(
    "/content/rvsc_3d_unet.pth",
    sample_image,
    label=1,
    save_path="/content/predicted_heart.ply" )
# or .stl

Prediction data range: min=0, max=0


In [None]:
#import glob
#import os

# Get the path to a sample image file from the organized dataset
#sample_image = glob.glob("/content/rvsc_dataset/train/images/*.nii")[0]

#generate_3d_mesh_from_prediction(
#    "/content/rvsc_3d_unet.pth",
#    sample_image,
#    label=1,
#    save_path="/content/predicted_heart.ply" )
# or .stl