<h3>Descripción del notebook</h3>
<p>Desarrollamos este script para poder procesar todas las imágenes que tenemos descargadas en nuestras carpetas y generar unos ficheros csv con todos los datos necesarios para realizar el entrenamiento y la validación de nuestro futuro modelo CNN</p>

<p>Cargamos las librerias necesáreas</p>

In [None]:
import os
import pandas as pd
import pydicom
import numpy as np

from skimage import exposure
import tensorflow as tf
from sklearn.preprocessing import LabelEncoder
from tensorflow.keras import layers
from skimage.transform import resize
from concurrent.futures import ThreadPoolExecutor
from pydicom.pixel_data_handlers import pillow_handler
from tensorflow import keras
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.models import load_model
import matplotlib.pyplot as plt
import ast

<p>Cargamos las imágenes y etiquetas</p>

In [None]:
def load_images_and_labels(df, base_path, batch_size=100):
    images_path = []
    labels = []
    paths = []  # Agrega una lista para almacenar las rutas completas

    with ThreadPoolExecutor(max_workers=5) as executor:
        for _, row in df.iterrows():
            print("*****Estoy en el load_images_and_labels*****",_)
            path_paciente = row['image file path']
            image_path = os.path.join(base_path, path_paciente.split('/')[0])
            image_path = image_path.replace("/", "\\")
            images_path.extend(search_images(image_path))

        # Cargamos imágenes en paralelo
        loaded_images = list(executor.map(load_image, images_path))

        # Filtramos las imágenes que no pudieron ser cargadas
        valid_indices = [i for i, img in enumerate(loaded_images) if img is not None]
        loaded_images = [img for i, img in enumerate(loaded_images) if i in valid_indices]
        paths = [path for i, path in enumerate(images_path) if i in valid_indices]

        # Obtenemos las etiquetas de las imágenes válidas
        labels = df['pathology'].apply(lambda x: 1 if 'MALIGNANT' in x else (2 if 'BENIGN' in x else (3 if 'BENIGN WITHOUT CALLBACK' in x else (4 if 'MALIGNANT WITHOUT CALLBACK' in x else 0))))
        labels = labels.values.astype(np.float32)
        labels = labels[valid_indices]

    return np.array(loaded_images), labels, paths

<p>Función que busca imagenes DICOM y que carga la imagen respectivamente</p>

In [None]:
def search_images(ruta):
    imagenes = []

    # Buscamos archivos de imagen (dcm) en la ruta de origen y subdirectorios
    for root, dirs, files in os.walk(ruta):

        # Este if lo hacemos por como está estructurada la arquitectura de carpetas y dcm, si no está en la raíz que no lo coja.
        # Previamente ya se ha realizado un copiado de todas las imagenes .dcm a la carpeta que nos interesa con un script
        if(len(root.split("\\")) <= 9):
            for file in files:
                if file.endswith('.dcm'):
                    imagenes.append(os.path.join(root, file))

    return imagenes

def load_image(image_path):
    # Para ver la matriz total declaramos estas opciones
    pd.set_option('display.max_rows', None)
    pd.set_option('display.max_columns', None)
    np.set_printoptions(threshold=np.inf)
    print("Cargando imagen", image_path)

    try:
        ds = pydicom.dcmread(image_path)

        # Configuramos PyDicom para que utilice el manejador pillow
        pydicom.config.image_handlers = [pillow_handler]

        image = ds.pixel_array.astype(np.float32)

        # Normalizamos los valores entre 0 y 1
        image = (image - np.min(image)) / (np.max(image) - np.min(image))

        image = exposure.equalize_adapthist(image)
        image_resized = resize(image, (128, 128), mode='reflect', anti_aliasing=True)

        # Convertimos la imagen en escala de grises a imagen RGB
        image_rgb = np.stack((image_resized,) * 3, axis=-1)

        return image_rgb
    except Exception as e:
        print(f"Error al procesar la imagen {image_path}: {e}")
        return None  # Retorna None para indicar que ha ocurrido un error

<p>Función que guarda todos los datos en un csv</p>

In [None]:
def save_to_csv(images, labels, paths, csv_filename):
    # Concatenamos las imágenes en una matriz tridimensional
    images_array = np.stack(images, axis=0)

    # Creamos un DataFrame con las imágenes y etiquetas
    df = pd.DataFrame({'Image': images_array.tolist(), 'Label': labels, 'Path': paths})

    # Guardamos el DataFrame en un archivo CSV
    df.to_csv(csv_filename, index=False, sep=';')

<p>Iniciamos el script definiendo la ruta base donde se encuentran las imágenes y cargamos los datos para generar los csv resultantes</p>

In [None]:
# Definimos la ruta base donde se encuentran las imágenes
base_image_path = "D:/Documentos/0.-Universidad Ingeniería Informática/TFG/proyecto/src/resources/CBIS-DDSM"

# Cargamos datos de cáncer de masa
df_mass_train = pd.read_csv('./resources/csv/mass_case_description_train_set.csv')
df_mass_test = pd.read_csv('./resources/csv/mass_case_description_test_set.csv')

# Cargamos datos de cáncer de calcificación
df_calc_train = pd.read_csv('./resources/csv/calc_case_description_train_set.csv')
df_calc_test = pd.read_csv('./resources/csv/calc_case_description_test_set.csv')

# Combinamos los datos de entrenamiento y prueba
df_train = pd.concat([df_mass_train, df_calc_train], ignore_index=True)
df_test = pd.concat([df_mass_test, df_calc_test], ignore_index=True)


# Cargar imágenes y etiquetas de entrenamiento y prueba y guardarlas en csv
X_train, y_train, paths_train = load_images_and_labels(df_train, base_image_path)
csv_filenameTrain = 'resultadosXytrain.csv'
save_to_csv(X_train, y_train, paths_train, csv_filenameTrain)

X_test, y_test, paths_test = load_images_and_labels(df_test, base_image_path)
csv_filenameTest = 'resultadosXytest.csv'
save_to_csv(X_test, y_test, paths_test, csv_filenameTest)