In [2]:
import numpy as np
import pandas as pd

#import torch
#import torch.nn as nn
#import torch.optim as optim
#from torch.utils.data import Dataset, DataLoader

from scipy.ndimage import rotate, shift,zoom , map_coordinates, gaussian_filter
import random
import os 
import glob
import ast

import matplotlib.pyplot as plt
#import pydicom

from tqdm import tqdm

In [3]:
def get_instance_number(patient_path):

    numero_patient=os.path.basename(patient_path)
    numero_coupe=df_loc[df_loc['SeriesInstanceUID']==numero_patient]['SOPInstanceUID'].iloc[0]
    exemple_path = os.path.join(series_path,
                                  f"{numero_patient}/{numero_coupe}.dcm")
    #print('Récupération des coordonnées du voxel...')
    ds = pydicom.dcmread(exemple_path)
    
    InstanceNumber=ds.InstanceNumber

    return int(InstanceNumber)

In [4]:
def coordonnee_z(patient_path,InstanceNumber=163):
    
    dicom_files = sorted(glob.glob(patient_path+'/*.dcm'))
    slices = [pydicom.dcmread(f) for f in dicom_files]
    #tri des slices par instance number
    slices.sort(key=lambda s: int(s.InstanceNumber))

    # Trouver l’indice z dans le volume
    z_index = [i for i, s in enumerate(slices) if int(s.InstanceNumber) == InstanceNumber][0]

    return z_index

In [5]:
def get_center(series_path,patient_path,df_loc):

    numero_patient=os.path.basename(patient_path)
    numero_coupe=df_loc[df_loc['SeriesInstanceUID']==numero_patient]['SOPInstanceUID'].iloc[0]
    
    InstanceNumber=get_instance_number(patient_path)
    z=coordonnee_z(patient_path,InstanceNumber)

    coord_str = df_loc[df_loc['SOPInstanceUID'] == numero_coupe]['coordinates'].iloc[0]
    coord_dict = ast.literal_eval(coord_str)
    x = coord_dict['y']
    y = coord_dict['x'] #fait exprès

    center=np.array([x,y,z])
    return center

In [6]:
def get_pixelspacing(path):
    """
    Récupère l'espacement voxel (mm) depuis un fichier DICOM.
    Retourne (row_spacing, col_spacing, slice_thickness)
    """
    dcm = pydicom.dcmread(path)

    # Espacement dans le plan (mm/pixel)
    if "PixelSpacing" in dcm:
        row_spacing, col_spacing = [float(x) for x in dcm.PixelSpacing]
    else:
        row_spacing, col_spacing = None, None

    # Épaisseur de coupe (mm)
    slice_thickness = float(getattr(dcm, "SliceThickness", 1.0))

    return row_spacing, col_spacing, slice_thickness

In [None]:
def get_patient_ID(patient_path):
    return str(os.path.basename(patient_path))

In [None]:
def get_position(df_train,patient_path):
    positions=['Left Infraclinoid Internal Carotid Artery',
       'Right Infraclinoid Internal Carotid Artery',
       'Left Supraclinoid Internal Carotid Artery',
       'Right Supraclinoid Internal Carotid Artery',
       'Left Middle Cerebral Artery', 'Right Middle Cerebral Artery',
       'Anterior Communicating Artery', 'Left Anterior Cerebral Artery',
       'Right Anterior Cerebral Artery', 'Left Posterior Communicating Artery',
       'Right Posterior Communicating Artery', 'Basilar Tip',
       'Other Posterior Circulation']
    row=df_train[df_train["SeriesInstanceUID"] == get_patient_ID(patient_path)][positions].values.flatten()
    return row

In [None]:
def show_middle_slices(volume):
    # volume shape : (X, Y, Z)

    mid_x = volume.shape[0] // 2
    mid_y = volume.shape[1] // 2

    mid_z = volume.shape[2] // 2


    fig, axes = plt.subplots(1, 3, figsize=(12, 4))

    # Coupe axiale (XY plane à profondeur z)
    axes[0].imshow(volume[:, :, mid_z].T, cmap='gray')  # transpose pour que X horizontal, Y vertical
    axes[0].set_title(f'Axiale (z={mid_z})')
    axes[0].axis('on')

    # Coupe coronale (XZ plane à coordonnée y)
    axes[1].imshow(volume[:, mid_y, :].T, cmap='gray')  # transpose pour X horizontal, Z vertical
    axes[1].set_title(f'Coronale (y={mid_y})')
    axes[1].axis('on')

    # Coupe sagittale (YZ plane à coordonnée x)
    axes[2].imshow(volume[mid_x, :, :].T, cmap='gray')  # transpose pour Y horizontal, Z vertical
    axes[2].set_title(f'Sagittale (x={mid_x})')
    axes[2].axis('on')

    plt.tight_layout()
    plt.show()

In [None]:
def show_slice_with_point(volume, coord, plane="axial"):
    x, y, z = coord.astype(int)
    fig, ax = plt.subplots(1, 2, figsize=(10, 5))

    if plane == "axial":       # plan XY à profondeur z

        img = volume[:, :, z].T
        ax[0].imshow(img, cmap="gray")
        ax[1].imshow(img, cmap="gray")
        ax[1].scatter(x, y, c="r", s=40, marker="x")
        title = f"Axial z={z}"

    elif plane == "sagittal":  # plan YZ à abscisse x
        img = volume[x, :, :].T
        ax[0].imshow(img, cmap="gray")
        ax[1].imshow(img, cmap="gray")
        ax[1].scatter(y, z, c="r", s=40, marker="x")

        title = f"Sagittal x={x}"

    elif plane == "coronal":   # plan XZ à ordonnée y
        img = volume[:, y, :].T
        ax[0].imshow(img, cmap="gray")
        ax[1].imshow(img, cmap="gray")
        ax[1].scatter(x, z, c="r", s=40, marker="x")
        title = f"Coronal y={y}"

    else:
        raise ValueError("plane doit être 'axial', 'sagittal' ou 'coronal'")

    for a in ax:
        a.set_title(title)
        a.axis("on")
    plt.tight_layout()
    plt.show()

In [None]:
def ajouter_Modality(df_main,df_info):
    df_merged = df_main.merge(
        df_info[['SeriesInstanceUID', 'Modality']],
        on='SeriesInstanceUID',
        how='left'  # conserve toutes les lignes de df_main
    )
    return df_merged

In [None]:
def dicom_to_numpy(patient_path):
    
    dicom_files = sorted(glob.glob(patient_path+'/*.dcm'))
    slices = [pydicom.dcmread(f) for f in dicom_files]

    #tri des slices par instance number
    slices.sort(key=lambda s: int(s.InstanceNumber))

    
    # On empile les pixel_array en un volume 3D NumPy (X,Y,Z)
    target_shape = slices[0].pixel_array.shape

    slices = [s for s in slices if s.pixel_array.shape == target_shape]
    volume = np.stack([s.pixel_array for s in slices], axis=-1)

    # Récupération du spacing réel
    pixel_spacing = slices[0].PixelSpacing
    dx, dy = pixel_spacing
    dz = getattr(slices[0], 'SliceThickness', 1.0)  # fallback si manquant
    
    return volume, (dx,dy,dz)

def resample(volume, spacing, target_spacing=(0.4, 0.4, 0.4)):
    zoom_factors = [s / t for s, t in zip(spacing, target_spacing)]
    new_volume = zoom(volume, zoom_factors, order=1)

    return new_volume

def crop(volume):

    # On crée un masque des voxels non nuls
    mask = volume > (volume.max() * 0.1)
    if not mask.any():
        return volume  # rien à couper
    
    # On récupère les indices min/max pour chaque dimension
    x_min, x_max = mask.any(axis=(1,2)).nonzero()[0][[0, -1]] #axe_x
    y_min, y_max = mask.any(axis=(0,2)).nonzero()[0][[0, -1]] #axe_y
    z_min, z_max = mask.any(axis=(0,1)).nonzero()[0][[0, -1]] #axe_z
    
    # Crop
    cropped = volume[x_min:x_max+1, y_min:y_max+1, z_min:z_max+1]
    return cropped, (x_min,y_min,z_min)

def normalization(volume):
    
    v_min, v_max = volume.min(), volume.max()
    if v_max > v_min:  # éviter la division par zéro
        volume = (volume - v_min) / (v_max - v_min)
    else:
        volume = np.zeros_like(volume)
    return volume

In [None]:
def resample_coordonnees(spacing, coords, target_spacing=0.4):
    x,y,z=coords
    if isinstance(target_spacing, (int, float)):
        target_spacing = (target_spacing, target_spacing, target_spacing)
    
    # coord physiques
    coords_mm = np.array([x*spacing[0], y*spacing[1], z*spacing[2]])
    # nouveaux indices
    new_voxel = coords_mm / np.array(target_spacing)
    return new_voxel

In [None]:
def preprocessing_volume_and_coords(series_path,patient_path,df_loc):
    
    coords=get_center(series_path,patient_path,df_loc)
    volume,spacing = dicom_to_numpy(patient_path)
    
    resample_volume = resample(volume, spacing,target_spacing=(0.4, 0.4, 0.4))
    resample_coords=resample_coordonnees(spacing, coords,target_spacing=0.4)
    
    crop_volume, crop_indices = crop(resample_volume)
    crop_coords=resample_coords - crop_indices
    
    norm_volume = normalization(crop_volume)

    return norm_volume,crop_coords

In [None]:
## Pour la fonction d'inférence
def preprocessing_volume(patient_path):
    
    volume,spacing = dicom_to_numpy(patient_path)
    resample_volume = resample(volume, spacing,target_spacing=(0.4, 0.4, 0.4))
    crop_volume, crop_indices = crop(resample_volume)
    new_volume = normalization(crop_volume)

    return new_volume

In [None]:
def random_deformation(volume, grid_size=3, max_displacement=3):

    shape = volume.shape
    assert len(shape) == 3, "Le volume doit être 3D"


    # Coordonnées originales du volume
    dz, dy, dx = shape
    z, y, x = np.meshgrid(

        np.linspace(0, dz-1, shape[0]),
        np.linspace(0, dy-1, shape[1]),
        np.linspace(0, dx-1, shape[2]),
        indexing="ij"
    )

    # Génère une grille de points de contrôle (par ex. 3x3x3)
    grid_z = np.linspace(0, dz-1, grid_size)
    grid_y = np.linspace(0, dy-1, grid_size)
    grid_x = np.linspace(0, dx-1, grid_size)
    control_points = np.meshgrid(grid_z, grid_y, grid_x, indexing="ij")

    # Crée un champ de déplacements aléatoires
    # (même taille que la grille de contrôle)
    displacement = [
        np.random.uniform(-max_displacement, max_displacement, size=(grid_size,grid_size,grid_size))
        for _ in range(3)
    ]

    # Interpolation spline cubique du champ de déplacement


    from scipy.interpolate import RegularGridInterpolator
    disp_interp = [
        RegularGridInterpolator((grid_z, grid_y, grid_x), d, bounds_error=False, fill_value=0)
        for d in displacement
    ]

    # Calcule le champ de coordonnées déformées
    coords = np.array([z, y, x])
    dz_new = disp_interp[0](np.array([z.flatten(), y.flatten(), x.flatten()]).T).reshape(shape)
    dy_new = disp_interp[1](np.array([z.flatten(), y.flatten(), x.flatten()]).T).reshape(shape)
    dx_new = disp_interp[2](np.array([z.flatten(), y.flatten(), x.flatten()]).T).reshape(shape)

    z_new = z + dz_new
    y_new = y + dy_new
    x_new = x + dx_new

    # Applique la déformation au volume
    deformed = map_coordinates(volume, [z_new, y_new, x_new], order=3, mode='reflect')

    return deformed

In [None]:
def data_augmentation(volume,n_aug):

    augmented_volumes = []
    for i in range(n_aug):
        vol_def = random_deformation(volume)
        augmented_volumes.append(vol_def)
    
    augmented_volumes = np.stack(augmented_volumes)  # shape = (12, 48, 48, 48)
    return augmented_volumes

In [None]:
def dataset_augmented(liste_volumes,n_aneurysm):
    liste_cubes = []
     
    for i in tqdm(range(len(liste_volumes))):
        volume = liste_volumes[i]  # shape (48,48,48)

        # data_augmentation doit renvoyer (n, 48, 48, 48)
        aug = data_augmentation(volume,n_aneurysm)
        
        liste_cubes.append(aug)

    # Concaténer tous les résultats en un seul tableau
    liste_cubes = np.concatenate(liste_cubes, axis=0)  # shape (N*n, 48, 48, 48)
    return liste_cubes