In [44]:
import os
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader, random_split, Subset
from torchvision import transforms
from PIL import Image
import numpy as np
import torch.nn.functional as F
import re
from sklearn.model_selection import train_test_split
import pydicom
import pandas as pd
import cv2

In [46]:
def clean_and_fill_mask(mask):
    """
    Dilate a mask and then morphologically close it with a 5x5 kernel.

    :param mask: Mask as a NumPy array (or array-like) or a torch.Tensor.
        Tensors are detached and copied to the CPU before conversion, so
        tensors that require grad or live on another device are accepted.
    :return: Processed mask as a uint8 NumPy array
    """
    if isinstance(mask, torch.Tensor):
        # Tensor.numpy() raises for tensors that require grad or that are not
        # on the CPU, so detach and move to host memory first.
        mask = mask.detach().cpu().numpy()

    # The uint8 cast truncates: fractional values below 1 become 0.
    mask = np.asarray(mask).astype(np.uint8)
    kernel = np.ones((5, 5), np.uint8)
    mask = cv2.dilate(mask, kernel, iterations=1)
    mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, kernel)
    return mask

def apply_windowing(dcm_data, window_center, window_width):
    """
    Apply intensity windowing to a DICOM image and scale it to 8 bits.

    Stored pixel values are mapped through the rescale slope/intercept
    (defaulting to 1 and 0 when the tags are absent), clipped to
    [center - width / 2, center + width / 2] and linearly mapped to 0..255.

    :param dcm_data: pydicom Dataset
    :param window_center: Center of the window
    :param window_width: Width of the window; must be greater than zero
    :return: Windowed image as a uint8 numpy array
    :raises ValueError: if window_width is not positive
    """
    if window_width <= 0:
        # A zero width divides by zero below and a negative width inverts the
        # clip bounds; neither yields a meaningful image.
        raise ValueError(f"window_width must be positive, got {window_width}")

    intercept = dcm_data.RescaleIntercept if 'RescaleIntercept' in dcm_data else 0
    slope = dcm_data.RescaleSlope if 'RescaleSlope' in dcm_data else 1

    # Do the rescale in float64 so the result does not depend on the stored
    # integer dtype of the pixel data.
    pixels = np.asarray(dcm_data.pixel_array, dtype=np.float64)
    image = pixels * float(slope) + float(intercept)

    min_intensity = window_center - window_width / 2
    max_intensity = window_center + window_width / 2

    windowed_image = np.clip(image, min_intensity, max_intensity)
    windowed_image = (windowed_image - min_intensity) / (max_intensity - min_intensity) * 255.0
    # astype truncates the fractional part rather than rounding.
    return windowed_image.astype(np.uint8)

class HemorrhagicDataset(Dataset):
    """
    Slice-level dataset of brain CT images with hemorrhage masks and labels.

    Files are looked up under ``root_dir`` as::

        Patients_CT/<patient_number>/brain/<slice_number>.jpg
        Patients_CT/<patient_number>/brain/<slice_number>_HGE_Seg.jpg  (mask, optional)

    The sorted list of patient directories is cut at ``split_ratio``; the
    leading part is the training split and the remainder the validation
    split. Only slices of patients in the selected split are exposed.
    """

    def __init__(self, root_dir, csv_file, transform=None, train=True, split_ratio=0.7):
        """
        :param root_dir: Dataset root containing the ``Patients_CT`` directory
        :param csv_file: CSV read with pandas; the ``PatientNumber``,
            ``SliceNumber`` and ``No_Hemorrhage`` columns are used
        :param transform: Optional callable applied to the image tensor and,
            in a separate call, to the mask tensor
        :param train: If True use the first ``split_ratio`` share of the
            patients, otherwise the remaining patients
        :param split_ratio: Fraction of patients assigned to the training split
        """
        self.root_dir = root_dir
        self.transform = transform
        self.diagnosis = pd.read_csv(csv_file)

        # Gather all patient directories (sorted so the split is reproducible)
        self.all_patients = sorted(os.listdir(os.path.join(self.root_dir, "Patients_CT")))

        # Split the patient directories into train and validation sets
        split_index = int(len(self.all_patients) * split_ratio)
        if train:
            self.patient_set = self.all_patients[:split_index]
        else:
            self.patient_set = self.all_patients[split_index:]

        self.slices = self._gather_slices()

    def _gather_slices(self):
        """
        Collect the image slices of the patients in this split.

        :return: List of ``(brain_dir, patient_number, slice_number)`` tuples
        """
        slices = []
        patients_dir = os.path.join(self.root_dir, "Patients_CT")
        # Walk only the patients of this split. Listing the whole directory
        # here would put every patient into both the train and the
        # validation dataset.
        for patient_number in self.patient_set:
            patient_dir = os.path.join(patients_dir, patient_number, "brain")
            if not os.path.isdir(patient_dir):
                continue
            # Sorted so the index -> slice mapping is the same on every run.
            for file_name in sorted(os.listdir(patient_dir)):
                if file_name.endswith(".jpg") and "_HGE_Seg" not in file_name:
                    slice_number = file_name.split('.')[0]
                    slices.append((patient_dir, patient_number, slice_number))
        return slices

    def __len__(self):
        """Return the number of image slices in this split."""
        return len(self.slices)

    def __getitem__(self, idx):
        """
        Load one slice.

        :param idx: Index into the gathered slice list
        :return: Dict with ``image`` and ``mask`` tensors, a ``label`` string
            (``"hemorrhagic"`` or ``"normal"``) and ``original_type``
        """
        patient_dir, patient_number, slice_number = self.slices[idx]
        image_path = os.path.join(patient_dir, f"{slice_number}.jpg")
        mask_path = os.path.join(patient_dir, f"{slice_number}_HGE_Seg.jpg")

        # Load the image and mask; slices without a mask file get an
        # all-zero mask of the same size.
        image = Image.open(image_path).convert("L")
        if os.path.exists(mask_path):
            mask = Image.open(mask_path).convert("L")
        else:
            mask = Image.new("L", image.size)

        # A slice is "hemorrhagic" only when a matching CSV row exists and its
        # No_Hemorrhage flag is 0; slices without a row fall back to "normal".
        diag_row = self.diagnosis[(self.diagnosis['PatientNumber'] == int(patient_number))
                                  & (self.diagnosis['SliceNumber'] == int(slice_number))]
        label = "hemorrhagic" if not diag_row.empty and diag_row.iloc[0]['No_Hemorrhage'] == 0 else "normal"

        # Convert images and masks to PyTorch tensors. The mask is not
        # thresholded here, so it is not guaranteed to be strictly binary.
        image = transforms.ToTensor()(image)
        mask = transforms.ToTensor()(mask)

        sample = {'image': image, 'mask': mask, 'label': label, 'original_type': 'image'}

        if self.transform:
            # NOTE(review): image and mask are transformed in two independent
            # calls; a transform with random parameters would presumably not
            # stay aligned between them - confirm before adding augmentation.
            sample['image'] = self.transform(sample['image'])
            sample['mask'] = self.transform(sample['mask'])

        return sample

In [56]:
# Dataset root as a relative path; the Patients_CT folder and the diagnosis
# CSV are looked up underneath it.
hemo_root_dir = (
    "computed-tomography-images-for-intracranial-hemorrhage-detection-and-segmentation-1.0.0"
)

In [57]:
# Preprocessing pipeline: a single resize to 512x512.
transform = transforms.Compose([transforms.Resize((512, 512))])

In [58]:
# Slice-level diagnosis table located inside the dataset root.
csv_file = os.path.join(
    hemo_root_dir,
    "hemorrhage_diagnosis.csv",
)


In [59]:
# NOTE(review): this cell repeats the `transform` definition from an earlier
# cell of this notebook; one of the two cells can be deleted.
transform = transforms.Compose([transforms.Resize((512, 512))])

In [60]:
# NOTE(review): this cell repeats the `csv_file` definition from an earlier
# cell of this notebook; one of the two cells can be deleted.
csv_file = os.path.join(
    hemo_root_dir,
    "hemorrhage_diagnosis.csv",
)


In [61]:
# Validation dataset (train=False) with the default split_ratio.
hemo_val_dataset = HemorrhagicDataset(
    root_dir=hemo_root_dir,
    csv_file=csv_file,
    transform=transform,
    train=False,
)