# **ETL -IMAGES**

In [1]:
# [Config] Librerías
import pandas as pd
import os
import nibabel as nib
import re
import nibabel as nib
import pandas as pd
from datetime import datetime
import subprocess
import numpy as np


In [2]:
# [Config] Rutas
base_dir  = r"C:\Users\Hp\MACHINE\MRI\DATA"

output_dir  = r"C:\Users\Hp\MACHINE\MRI\IMAGES"
dcm2niix_path = r"C:\Users\Hp\dcm2niix_win\dcm2niix.exe"

df_clinical = pd.read_csv(r"C:\Users\Hp\MACHINE\MRI\ADNIMERGE.csv", low_memory=False)

In [3]:
# Atributos clínicos considerados
atributes = [
    # Identificación
    "PTID","VISCODE","EXAMDATE","DX",
    # Demografía
    "AGE","PTGENDER","PTEDUCAT",
    # Genética
    "APOE4",
    # Cognitivas
    "CDRSB","MMSE","ADAS13","FAQ",
    "RAVLT_immediate","RAVLT_learning","RAVLT_forgetting",
    "DIGITSCOR","TRABSCOR",
    # MRI volumétricos
    "Ventricles","Hippocampus","WholeBrain","Entorhinal","Fusiform","MidTemp","ICV",
]
df_clinical = df_clinical[atributes]
df_clinical["EXAMDATE"] = pd.to_datetime(df_clinical["EXAMDATE"], errors="coerce")

````
# [Función] Convertir DICOM -> NifTI
for root, dirs, files in os.walk(base_dir):
    if any(f.lower().endswith('.dcm') for f in files):
        rel_path = os.path.relpath(root, base_dir)
        out_path = os.path.join(output_dir, rel_path)
        os.makedirs(out_path, exist_ok=True)
        cmd = [
            dcm2niix_path,
            "-z", "n",       # ❌ No comprimir (.nii en lugar de .nii.gz)
            "-b", "n",       # ❌ No crear JSON
            "-o", out_path,  # 📁 Carpeta de salida
            "-f", "%p_%s",   # 🧩 Nombre del archivo (protocolo_series)
            root             # 📂 Carpeta de entrada con los DICOM
        ]
        print(f"📦 Convirtiendo: {root}")
        subprocess.run(cmd, check=True)
print("✅ Conversión DICOM → NIfTI completada.")
````

In [6]:
# [] Guardar atributos de las imágenes
records = []

for root, dirs, files in os.walk(output_dir):
    for file in files:
        if file.endswith(".nii"):
            full_path = os.path.join(root, file)

            # 🧩 Extraer ID del sujeto
            match_id = re.search(r'(\d{3}_S_\d{4})', full_path)
            sujeto_id = match_id.group(1) if match_id else None

            # 🗓️ Extraer fecha del estudio (YYYY-MM-DD)
            match_fecha = re.search(r'(\d{4}-\d{2}-\d{2})', full_path)
            fecha = match_fecha.group(1) if match_fecha else None

            # Leer metadatos del NIfTI
            try:
                img = nib.load(full_path)
                data = img.get_fdata()

                header = img.header
                shape = img.shape
                voxel_size = header.get_zooms()
                voxel_volume = np.prod(voxel_size)
                total_volume = voxel_volume*np.prod(shape)
                datatype = str(header.get_data_dtype())
                mean_intensity = np.mean(data)
                std_intensity = np.std(data)
                orientation = nib.aff2axcodes(img.affine)
                units = header.get_xyzt_units()

                records.append([
                    file, fecha, sujeto_id, shape, voxel_size, datatype, full_path,
                    voxel_volume, total_volume, mean_intensity, std_intensity, orientation, units
                ])
            except Exception as e:
                print(f"⚠️ Error leyendo {file}: {e}")

df_images = pd.DataFrame(records, columns=[
    "archivo", "fecha_imagen", "sujeto_id", "shape", "voxel_size", "datatype", "ruta",
    "voxel_volume_mm3", "total_volume", "mean_intensity", "std_intensity", "orientation", "units"
])

# Convertir fecha a tipo datetime
df_images["fecha_imagen"] = pd.to_datetime(df_images["fecha_imagen"], errors="coerce")
df_images.to_csv("Imagenes.csv", index=False)

print(f"En total {len(df_images)} imágenes fueron cargadas correctamente.")

En total 302 imágenes fueron cargadas correctamente.


In [20]:
df_images_sorted = df_images.sort_values(["sujeto_id", "fecha_imagen"])
df_clinical_sorted = df_clinical.sort_values(["PTID", "EXAMDATE"])
# Agrupar por sujeto
grupos_img = df_images_sorted.groupby("sujeto_id")
grupos_clin = df_clinical_sorted.groupby("PTID")

# Lista para almacenar resultados
asignaciones = []

# Iterar por sujeto
for sujeto in df_images_sorted["sujeto_id"].unique():
    imgs = grupos_img.get_group(sujeto).reset_index(drop=True)
    clin = grupos_clin.get_group(sujeto).reset_index(drop=True)
    
    # Asegurar que tienen el mismo número de registros
    n = min(len(imgs), len(clin))
    
    # Asignar VISCODE por posición
    imgs = imgs.iloc[:n].copy()
    clin = clin.iloc[:n].copy()
    imgs["VISCODE"] = clin["VISCODE"]
    
    asignaciones.append(imgs)

# Concatenar todo
df = pd.concat(asignaciones, ignore_index=True)
df = df.merge(
    df_clinical,
    left_on=["sujeto_id", "VISCODE"],
    right_on=["PTID", "VISCODE"],
    how="left"  # Usamos left para conservar todas las imágenes
)

print(f"Se tienen {df.shape[0]} imágenes con su registro.")

Se tienen 298 imágenes con su registro.


In [21]:
print(f"Total de pacientes {df['sujeto_id'].nunique()}")
# Crear una tabla pivote donde las celdas indican si hay o no registro
tabla_visitas = (
    df
    .assign(valor="O")  # marcamos presencia
    .pivot_table(index="sujeto_id", columns="VISCODE", values="valor", aggfunc="first", fill_value="X")
    .sort_index(axis=1)
)
tabla_visitas
conteo_visitas = (tabla_visitas == "O").sum(axis=1)
resumen_visitas = conteo_visitas.value_counts().sort_index()
tabla_resumen = pd.DataFrame({
    "Vistas": resumen_visitas.index,
    "Pacientes": resumen_visitas.values
})
print("Resumen de número de visitas por paciente:")
print(tabla_resumen)


Total de pacientes 54
Resumen de número de visitas por paciente:
   Vistas  Pacientes
0       2          2
1       3          6
2       4          7
3       5         10
4       6         13
5       7         12
6       8          3
7      12          1


In [23]:
df.to_csv('ADNI_Images.csv', index=False)