## 0. Preparación del entorno
Para empezar, lo primero que vamos a hacer va a ser preparar el entorno en el que se van a hacer las ejecuciones, de donde el programa tiene que cargar las imágenes y las máscaras de las nebulosas planetarias y donde va a guardar los resultados obtenidos.

In [None]:
import os

print("Vamos a cambiar el directorio de trabajo")

# Indicamos la ruta del directorio de trabajo
# route = "C:\\Users\\Lucan\\OneDrive - Universidade da Coruña\\Escritorio\\4_GCEID\\TFG\\test\\PNe_segmentation"
route = os.getcwd() #+ "/TFG/test/PNe_segmentation"
os.chdir(route)

current_directory = os.getcwd()
print("\nEl directorio actual es:", current_directory)

# Listamos el contenido del directorio
files = os.listdir(current_directory)
print("\nContenido del directorio actual:")
for file in files:
    print("\t",file)


### 0.1. Carga de las imágenes junto a sus máscaras


Vamos a hacer una prueba de como cargaríamos una máscara y contorno junto con la imagen a la que pertenece. Pero antes vamos a realizar ciertas operaciones para tener almacenados y clasificados todos los archivos que tenemos disponibles.

In [None]:
# Listamos el contenido del directorio de las máscaras
# masks_directory = "C:\\Users\\Lucan\\OneDrive - Universidade da Coruña\\Escritorio\\4_GCEID\\TFG\\test\\PNe_segmentation\\masks"
masks_directory = os.getcwd() + "/masks"
masksFiles = os.listdir(masks_directory)
masks_files = [file for file in masksFiles if file.endswith(".png")]

# data_directory = "C:\\Users\\Lucan\\OneDrive - Universidade da Coruña\\Escritorio\\4_GCEID\\TFG\\test\\PNe_segmentation\\data"
data_directory = os.getcwd() + "/data"
dataFiles = os.listdir(data_directory)
data_files = [file for file in dataFiles if file.endswith(".fits")]

Creamos un diccionario en el que las claves sean el nombre identificador de la nebulosa y el valor sea una lista con las máscaras y contornos disponibles de esa nebulosa.

In [None]:
masks_dict = {}

for mask_file in masks_files:
    nebula_name = mask_file.split("_")[:-1]
    nebula_name = "_".join(nebula_name)
    if masks_dict.get(nebula_name) is None:
        masks_dict[nebula_name] = [mask_file]
    else: 
        masks_dict[nebula_name].append(mask_file)

masks_dict


Comprobamos que todas las máscaras/contornos disponibles se han asignado a una nebulosa

In [None]:
masks_dict_count = sum(len(files) for files in masks_dict.values())
masks_files_count = len(masks_files)

if masks_dict_count == masks_files_count:
    print("The number of files in masks_dict is the same as the number of files in masks_files.")
else:
    print("The number of files in masks_dict is different from the number of files in masks_files.")
    
    files_not_in_data_dict = set(masks_files) - set(sum(masks_dict.values(), []))
    print("Files not in data_dict:", files_not_in_data_dict)


Ahora creamos un diccionario similar al anterior, con las mismas claves, pero en los valores se van a guardar los archivos fits que se refieren a la nebulosa indicada.

In [None]:
import re
data_dict = {}

for nebula in masks_dict.keys():
    patron = re.compile(r'(?i)' + nebula + r'\D+.*')
    data_dict[nebula] = [file for file in data_files if patron.match(file)]

data_dict

Comprobamos que todos los archivos fits disponibles se han asignado a una nebulosa

In [None]:
data_dict_count = sum(len(files) for files in data_dict.values())
data_files_count = len(data_files)

if data_dict_count == data_files_count:
    print("The number of files in data_dict is the same as the number of files in data_files.")
else:
    print("The number of files in data_dict is different from the number of files in data_files.")
    
    files_not_in_data_dict = set(data_files) - set(sum(data_dict.values(), []))
    print("Files not in data_dict:", files_not_in_data_dict)


Descartamos la nebulosa K3_72 debido a que hemos realizado dos segmentaciones las cuales en un futuro tendremos que decidir con cual nos quedamos o utilizar las dos de alguna manera (como si fueran dos nebulosas independientes).

In [None]:
data_dict.pop('K3_72_big', None)
data_dict.pop('K3_72_small', None)

En este punto tenemos dos diccionarios con los cuales cargar los archivos facilmente y clasificados por el nombre identificador de la nebulosa.
Ahora sí, vamos a cargar todos los canales disponibles de una nebulosa junto a su contorno y máscara (si tiene).

In [None]:
from astropy.io import fits
import imageio as io
import numpy as np
def cargar_canales_nebulosa(nebula, data_dict, masks_dict, data_directory, masks_directory, name_files:bool=False):
    data_files = data_dict[nebula]
    masks_files = masks_dict[nebula]
    data = {}
    masks = {}
    charge_files = {}
    
    for file in masks_files:
        key = file.split("_")[-1].split(".")[0]
        image = io.imread("masks/"+file)
        if len(image.shape) > 2:
            image = image[:,:,0]
        masks[key] = image
        
    for file in data_files:
        image = fits.getdata("data/"+file)
        image = np.flip(image, axis=0)
        if image.shape == masks['contour'].shape:
            key = file.replace(nebula, "").replace("_","")[0].lower()
            data[key] = image
            charge_files[key] = file
        # else:
        #     print(f"Error: {file} has a different shape than the masks.")
    if not name_files:
        return data, masks
    else:
        return data, masks, charge_files

En este punto, ya podríamos cargar todos los canales y máscaras que tenemos, pudiendo distinguir por nebulosa. Por ejemplo, en la siguiente celda de código vamos a cargar todos los canales de una nebulosa en específico junto a sus máscaras.

In [None]:
data_dict['K2_1']

In [None]:
canales, mascaras = cargar_canales_nebulosa("K3_46", data_dict, masks_dict, data_directory, masks_directory)

Vamos a analizar como son las imagenes obtenidas, sus valores mínimos y máximos, sus dimensiones, etc.

In [None]:
def imprimir_info_imagen(image, title):
    min_value = np.min(image)
    max_value = np.max(image)
    dimensions = image.shape
    null_values = np.isnan(image).sum()
    print(title)
    print(f"Minimum value: {min_value}")
    print(f"Maximum value: {max_value}")
    print(f"Null values: {null_values}")
    print(f"Dimensions: {dimensions}\n")

In [None]:
# Analyze canales' images
for key, channel in canales.items():
    imprimir_info_imagen(channel, f"Channel {key}:")

# Analyze mascaras' images
for key, mask in mascaras.items():
    imprimir_info_imagen(mask, f"Mask {key}:")

In [None]:
import matplotlib.pyplot as plt
num_images = len(canales)

fig, ax = plt.subplots(1, num_images, figsize=(10*num_images, 10))
fig.suptitle(f"Canales de la nebulosa A2", fontweight = 'bold', fontsize = 14)

for i, (key, channel) in enumerate(canales.items()):
    ax[i].imshow(channel, cmap = "gray")
    ax[i].set_title(f"Canal {key}")
fig.show()

if 'mask' in mascaras.keys():
    fig, ax = plt.subplots(1, 2, figsize=(20, 10))
    fig.suptitle(f"Contorno y máscara de la nebulosa A2", fontweight = 'bold', fontsize = 14)

    ax[0].imshow(mascaras['contour'], cmap = "gray")
    ax[1].imshow(mascaras['mask'], cmap = "gray")
    fig.show()
else:
    fig, ax = plt.subplots(1, 1, figsize=(10, 10))
    fig.suptitle(f"Contorno de la nebulosa A2", fontweight = 'bold', fontsize = 14)

    ax.imshow(mascaras['contour'], cmap = "gray")
    fig.show()

### 0.2. Preprocesado de los datos

Como parte del preprocesado inicial, vamos a hacer un estudio de cuales son los canales/filtros que comparten todas (o la mayoría de nebulosas) para así empezar a hacer pruebas con un conjunto de imágenes lo más homogéneo posible (para los canales).

In [None]:
for i in data_dict.keys():
    canales, mascaras = cargar_canales_nebulosa(i, data_dict, masks_dict, data_directory, masks_directory)
    if len(canales) != 2:
        print(i, len(canales))
    

En este punto se han eliminado algunas imágenes de nebulosas del conjunto de datos:
- A33: dimensiones descuadradas
- H3_75: dimensiones descuadradas 

Vamos a comprobar que las nebulosas tengan mínimo un canal en común y ver cual es:

In [None]:
cnt = {}
for i in data_dict.keys():
    canales, mascaras = cargar_canales_nebulosa(i, data_dict, masks_dict, data_directory, masks_directory)
    for key in canales.keys():
        if key in cnt.keys():
            cnt[key] += 1
        else:
            cnt[key] = 1
        
        if key == '.':
            print(i, key, canales.keys())

print(cnt)
    
    

Descartamos la nebulosa h2 debido a que no se no sabemos a que canal pertenece (creo que es una combinación en escala de grises de diferentes canales por como se describe en el .fits)

In [None]:
data_dict.pop('h2', None)

Como se puede observar, la mayoría de nebulosas tienen el canal 'h', por lo que nos vamos a quedar con todas las imágenes que contiene ese canal y vamos a empezar a trabajar con ellos.

In [None]:
new_data_dict = {}
for i in data_dict.keys():
    canales, mascaras, charge_files = cargar_canales_nebulosa(i, data_dict, masks_dict, data_directory, masks_directory, True)
    if 'h' in canales.keys():
        if 'n' in canales.keys() or 'o' in canales.keys() or 'b' in canales.keys():
            charge_files.pop('n', None)
            charge_files.pop('o', None)
            charge_files.pop('b', None)
        new_data_dict[i] = charge_files

print(f"El tamaño de data_dict es {len(data_dict)} y el tamaño de new_data_dict es {len(new_data_dict)}")

Vamos a crear el diccionario definitivo con los archivos que vamos a cargar. Para esta prueba inicial, vamos a cargar como máscara única el archivo _mask si la nebulosa lo tiene y si no cargaremos el _contour.

In [None]:
for nebula, files in new_data_dict.items():
    mask = [file for file in masks_dict[nebula] if 'contour' in file][0]
    if len(masks_dict[nebula]) > 1:
        mask = [file for file in masks_dict[nebula] if 'mask' in file][0]
    new_data_dict[nebula].update({'mask': mask})
# Mostramos un ejemplo para ver que todo ha ido correctamente
print(new_data_dict[nebula])

Vamos a transformarlo a un dataframe de pandas para poder exportarlo a .csv y más adelante no tener que volver a hacer todo el procedimiento, además trabajar con dataframes de pandas puede ser mucho más accesible y fácil de visualizar.

In [None]:
import pandas as pd
df = pd.DataFrame.from_dict(new_data_dict, orient='index')

# Quiero cargar el nombre de la nebulosa como una columna y cambiarle el nombre de index a name
df.reset_index(inplace=True)
df.rename(columns={'index':'name'}, inplace=True)
print(df.head())

# Guardar el dataframe en un archivo csv
df.to_csv("data_files_1c.csv", index=False)

Llegados a este punto, a no ser que se amplie el dataset de imágenes, no vamos a tener que volver a ejecutar esta parte inicial.

## 0.3. División de los conjuntos train y test

Vamos a formar dos .csv a partir del principal (creado anteriormente), uno que solo tenga las nebulosas usadas para entrenamiento y otro que solo tenga las nebulosas para test. De este modo, cuando ampliamos el dataset con más nebulosas, solo tenemos que utilizar el 'dataset_info.csv' para que siempre las mismas imágenes se encuentren en entrenamiento y en test, para que a la hora de realizar las pruebas con los distintos algoritmos no haya 'contaminaciones'.

In [6]:
import pandas as pd
df_general = pd.read_csv("data_files_1c.csv")
df_info = pd.read_csv("dataset_info.csv", sep=';')

df_merged = pd.merge(df_general, df_info, on='name', how='left')

In [14]:
missing_rows = df_info[~df_info['name'].isin(df_general['name'])]
missing_rows

Unnamed: 0,name,complete_tag,simple_tag,set
9,A33,R,R,test
99,K3_72,Br,B,train


Hacemos las divisiones de train y de test

In [9]:
df_train = df_merged[df_merged['set'] == 'train']
df_test = df_merged[df_merged['set'] == 'test']

Comprobamos que la suma del número de filas de ambas particiones más las filas que se han perdido por el camino suman el número total de filas en df_info (nebulosas procesadas, aunque algunas se hayan perdido por el camino por diversas razones)

In [19]:
df_train.shape[0] + df_test.shape[0] + missing_rows.shape[0] == df_info.shape[0]

True

Guardamos en dos .csv distintos los conjuntos de train y de test, con sus etiquetas incluidas y todo

In [21]:
df_train = df_train.drop(columns=['set'])
df_test = df_test.drop(columns=['set'])

df_train.to_csv("data_files_1c_train.csv", index=False)
df_test.to_csv("data_files_1c_test.csv", index=False)