# 12. Preprocesado tras incorporar ASAS-SN v band al dataset

In [1]:
# Configuración general para evitar errores de warnings y compatibilidad
import warnings
import os
warnings.filterwarnings("ignore")
os.environ["RICH_NO_RICH"] = "1"
print("Configuración de entorno aplicada.")

Configuración de entorno aplicada.


Dentro del script, este es el preprocesado que se hace por cada curva:

| Paso                                   | ¿Incluido?    | Descripción                                                               |
| -------------------------------------- | ------------- | ------------------------------------------------------------------------- |
| Orden temporal                         | ❌ (implícito) | No se fuerza explícitamente `sort_values('tiempo')`, pero puede añadirse. |
| Sustitución de `NaN`                   | ✅             | Usa `np.nan_to_num` con la mediana.                                       |
| Filtro por longitud mínima             | ✅             | `if len(magnitudes) < MIN_POINTS`                                         |
| Filtro por dispersión mínima (`std`)   | ✅             | `if np.std(magnitudes) < MIN_STD`                                         |
| Normalización robusta (mediana/IQR)    | ✅             | `(magnitudes - median) / iqr`                                             |
| Clip de valores extremos               | ✅             | `np.clip(..., -1000, 1000)`                                               |
| Padding y atención mask                | ✅             | Rellena hasta `seq_length`, y genera `attention_mask`                     |
| Validación de `nan`/`inf` tras normal. | ✅             | Verifica si hay valores no válidos después de normalizar.                 |
| Normalización de clase                 | ✅             | Aplica `normalize_label` a la clase de entrada.                           |

¿Qué se podría mejorar?

1. **Orden temporal explícito**:
   Actualmente **no se aplica `sort_values("tiempo")`** sobre cada curva antes de procesarla. Aunque muchas curvas ya vienen ordenadas, sería más robusto añadir:

   ```python
   df = df.sort_values(by=find_column(df.columns, "tiempo"))
   ```

   ...como primer paso dentro de `process_single_curve()`.

2. **Soporte para features adicionales**:
   Ahora solo se usa `mag` (magnitud), pero si en el futuro deseas usar también `flux`, `mag_err`, etc., habría que adaptar esta función o añadir variantes.

Como está ahora, los **únicos datos de entrada al modelo** son:

* `magnitudes_norm`: vector de `seq_length` elementos (curva preprocesada)
* `attention_mask`: vector binario que indica datos válidos
* `clase`: etiqueta de clase codificada (para entrenamiento o evaluación)

Esto es **lo correcto** para un modelo tipo Transformer que espera curvas de magnitud como secuencia 1D.



In [1]:
import pyarrow.dataset as ds
import pyarrow.parquet as pq
import pyarrow.fs as fs
from src.utils.dataset_paths import DATASET_PATHS_AWS

s3_bucket = "sagemaker-eu-west-3-478344394470"
s3_path = "datasets/dataset_consolidado_optimizado.parquet"

print("Cargando dataset completo en memoria (requiere ~40-60GB)...")
dataset = ds.dataset(DATASET_PATHS_AWS, format="parquet")
tabla = dataset.to_table()

print("Guardando dataset consolidado directamente en S3 (sin pasar por disco local)...")
s3_filesystem = fs.S3FileSystem()
with s3_filesystem.open_output_stream(f"{s3_bucket}/{s3_path}") as s3_file:
    pq.write_table(tabla, s3_file, row_group_size=10_000_000)

print("✅ ¡Dataset optimizado guardado correctamente en S3!")


Cargando dataset completo en memoria (requiere ~40-60GB)...
Guardando dataset consolidado directamente en S3 (sin pasar por disco local)...
✅ ¡Dataset optimizado guardado correctamente en S3!


In [1]:
import pyarrow.parquet as pq

# Ruta local del parquet consolidado
parquet_path = "/mnt/data/datasets/dataset_consolidado_optimizado.parquet"

# Leer solo la metadata (no carga datos)
metadata = pq.read_metadata(parquet_path)

print("✅ Archivo leído correctamente")
print(f"🧮 Número de filas: {metadata.num_rows:,}")
print(f"📊 Número de columnas: {metadata.num_columns}")
print(f"🧾 Columnas: {[metadata.schema.column(i).name for i in range(metadata.num_columns)]}")




✅ Archivo leído correctamente
🧮 Número de filas: 626,189,090
📊 Número de columnas: 13
🧾 Columnas: ['id', 'time', 'mag', 'mag_err', 'flux', 'flux_err', 'clase_variable', 'clase_variable_normalizada', 'mission', 'mission_id', 'source_dataset', 'label_source', 'band']


In [2]:
import pyarrow.parquet as pq
import pandas as pd

# Ruta al parquet
parquet_path = "/mnt/data/datasets/dataset_consolidado_optimizado.parquet"

# Abrir el archivo como ParquetFile
pf = pq.ParquetFile(parquet_path)

# Leer las primeras N filas del primer row group (sin cargar todo)
sample_table = pf.read_row_group(0)
sample_df = sample_table.to_pandas()

# Mostrar algunas filas
pd.set_option("display.max_columns", None)
print(sample_df.head(5))


                             id           time     mag  mag_err     flux  \
0  ASASSN-V J000000.19+320847.2            HJD     mag  mag_err     flux   
1  ASASSN-V J000000.19+320847.2  2458017.73473  15.468  0.05599  2.35900   
2  ASASSN-V J000000.19+320847.2  2458018.75446   15.39  0.05292  2.53600   
3  ASASSN-V J000000.19+320847.2  2458034.87939  15.276  0.04876  2.81700   
4  ASASSN-V J000000.19+320847.2  2458035.92739  15.405  0.05350  2.50100   

   flux_err clase_variable clase_variable_normalizada mission    mission_id  \
0  flux_err             EW           Eclipsing Binary  ASASSN  ASASSN_gband   
1   0.12152             EW           Eclipsing Binary  ASASSN  ASASSN_gband   
2   0.12347             EW           Eclipsing Binary  ASASSN  ASASSN_gband   
3   0.12638             EW           Eclipsing Binary  ASASSN  ASASSN_gband   
4   0.12309             EW           Eclipsing Binary  ASASSN  ASASSN_gband   

  source_dataset    label_source band  
0   asassn_gband  ASASSN_Cat

In [1]:
import pyarrow.dataset as ds
import pyarrow.parquet as pq
import pyarrow.fs as fs
from src.utils.dataset_paths import DATASET_PATHS_AWS

s3_bucket = "sagemaker-eu-west-3-478344394470"
s3_path = "datasets/dataset_consolidado_optimizado.parquet"

print("Cargando dataset completo en memoria (requiere ~40-60GB)...")
dataset = ds.dataset(DATASET_PATHS_AWS, format="parquet")
tabla = dataset.to_table()

print("Guardando dataset consolidado directamente en S3 (sin pasar por disco local)...")
s3_filesystem = fs.S3FileSystem()
with s3_filesystem.open_output_stream(f"{s3_bucket}/{s3_path}") as s3_file:
    pq.write_table(tabla, s3_file, row_group_size=10_000_000)

print("✅ ¡Dataset optimizado guardado correctamente en S3!")


Cargando dataset completo en memoria (requiere ~40-60GB)...
Guardando dataset consolidado directamente en S3 (sin pasar por disco local)...
✅ ¡Dataset optimizado guardado correctamente en S3!


In [1]:
import pyarrow.parquet as pq

# Ruta local del parquet consolidado
parquet_path = "/mnt/data/datasets/dataset_consolidado_optimizado.parquet"

# Leer solo la metadata (no carga datos)
metadata = pq.read_metadata(parquet_path)

print("✅ Archivo leído correctamente")
print(f"🧮 Número de filas: {metadata.num_rows:,}")
print(f"📊 Número de columnas: {metadata.num_columns}")
print(f"🧾 Columnas: {[metadata.schema.column(i).name for i in range(metadata.num_columns)]}")




✅ Archivo leído correctamente
🧮 Número de filas: 626,189,090
📊 Número de columnas: 13
🧾 Columnas: ['id', 'time', 'mag', 'mag_err', 'flux', 'flux_err', 'clase_variable', 'clase_variable_normalizada', 'mission', 'mission_id', 'source_dataset', 'label_source', 'band']


In [2]:
import pyarrow.parquet as pq
import pandas as pd

# Ruta al parquet
parquet_path = "/mnt/data/datasets/dataset_consolidado_optimizado.parquet"

# Abrir el archivo como ParquetFile
pf = pq.ParquetFile(parquet_path)

# Leer las primeras N filas del primer row group (sin cargar todo)
sample_table = pf.read_row_group(0)
sample_df = sample_table.to_pandas()

# Mostrar algunas filas
pd.set_option("display.max_columns", None)
print(sample_df.head(5))


                             id           time     mag  mag_err     flux  \
0  ASASSN-V J000000.19+320847.2            HJD     mag  mag_err     flux   
1  ASASSN-V J000000.19+320847.2  2458017.73473  15.468  0.05599  2.35900   
2  ASASSN-V J000000.19+320847.2  2458018.75446   15.39  0.05292  2.53600   
3  ASASSN-V J000000.19+320847.2  2458034.87939  15.276  0.04876  2.81700   
4  ASASSN-V J000000.19+320847.2  2458035.92739  15.405  0.05350  2.50100   

   flux_err clase_variable clase_variable_normalizada mission    mission_id  \
0  flux_err             EW           Eclipsing Binary  ASASSN  ASASSN_gband   
1   0.12152             EW           Eclipsing Binary  ASASSN  ASASSN_gband   
2   0.12347             EW           Eclipsing Binary  ASASSN  ASASSN_gband   
3   0.12638             EW           Eclipsing Binary  ASASSN  ASASSN_gband   
4   0.12309             EW           Eclipsing Binary  ASASSN  ASASSN_gband   

  source_dataset    label_source band  
0   asassn_gband  ASASSN_Cat

In [None]:
import warnings
import numpy as np

# Ignorar solo los RuntimeWarning de numpy (como overflows en reduce)
warnings.filterwarnings("ignore", category=RuntimeWarning, module="numpy")

import warnings
import numpy as np

# Ignorar solo los RuntimeWarning de numpy (como overflows en reduce)
warnings.filterwarnings("ignore", category=RuntimeWarning, module="numpy")

from src.fase2.script_1_transformer_preprocessing_optimizado_2 import main

max_per_class_override={
    "Irregular": 9000,
    "Rotational": 9000,
    "Eclipsing Binary": 9000,
    "Delta Scuti": None,               # 7.550 → TODAS
    "RR Lyrae": 9000,                    # 41.208 → TODAS NO
    "Young Stellar Object": None,      # 9.809 → TODAS
    "Cataclysmic": None,               # 2.080 → TODAS
    "White Dwarf": None,               # 118 → TODAS
    "Variable": 1000                   # limitada por ser genérica
}

main(
    seq_length=25000,
    max_per_class=None, # usamos override completo
    max_per_class_override=max_per_class_override,
    parquet_batch_size=10_000_000,
    dataloader_batch_size=128,
    num_workers=72,    
)

📂 Cargando datos en lotes con PyArrow...
💾 [INFO] Cargando agrupación de curvas desde cache: data/train/grouped_data.pkl
✅ [INFO] Agrupación cargada desde cache. Total objetos: 88765
⏳ [INFO] Tiempo en agrupación de datos: 17.3 segundos
🚀 Procesando 88765 curvas en paralelo usando 72 CPUs...


  magnitudes_norm = (magnitudes - median) / iqr if iqr != 0 else magnitudes - median
  magnitudes_norm = (magnitudes - median) / iqr if iqr != 0 else magnitudes - median
  magnitudes_norm = (magnitudes - median) / iqr if iqr != 0 else magnitudes - median
  magnitudes_norm = (magnitudes - median) / iqr if iqr != 0 else magnitudes - median
  magnitudes_norm = (magnitudes - median) / iqr if iqr != 0 else magnitudes - median
  magnitudes_norm = (magnitudes - median) / iqr if iqr != 0 else magnitudes - median
  magnitudes_norm = (magnitudes - median) / iqr if iqr != 0 else magnitudes - median
  magnitudes_norm = (magnitudes - median) / iqr if iqr != 0 else magnitudes - median
  magnitudes_norm = (magnitudes - median) / iqr if iqr != 0 else magnitudes - median
  magnitudes_norm = (magnitudes - median) / iqr if iqr != 0 else magnitudes - median
  magnitudes_norm = (magnitudes - median) / iqr if iqr != 0 else magnitudes - median
  magnitudes_norm = (magnitudes - median) / iqr if iqr != 0 else 

⏳ [INFO] Tiempo en procesamiento paralelo: 89.8 segundos
🔋 [INFO] Curvas válidas tras filtrado: 85016
[INFO] Uso de memoria de sequences: 4053.88 MB
[INFO] Uso de memoria de masks: 4053.88 MB
[INFO] Uso de memoria de labels: 0.32 MB
💾 [INFO] Guardando label_encoder.pkl...
📊 Recuento por clase codificada:
 3 (Irregular): 9000
 4 (RR Lyrae): 37569
 2 (Eclipsing Binary): 9000
 5 (Rotational): 9000
 0 (Cataclysmic): 2080
 8 (Young Stellar Object): 9799
 1 (Delta Scuti): 7450
 6 (Variable): 1000
 7 (White Dwarf): 118
[INFO] N curvas: 85016, seq_length: 25000
[INFO] Estimación memoria sequences (float16): 3.96 GB
[INFO] Estimación memoria sequences (float32): 7.92 GB
[INFO] Si tienes problemas de memoria, considera usar almacenamiento en disco y Dataset bajo demanda.
📝 [INFO] Realizando split train/val...
💾 [INFO] Guardando datasets serializados...

📉 Resumen de curvas descartadas:
🔸 All nan                       : 0
🔸 Low std                       : 0
🔸 Short curve                   : 0
🔸 N

(<torch.utils.data.dataloader.DataLoader at 0x7fdfbdcbee00>,
 <torch.utils.data.dataloader.DataLoader at 0x7fdfbdcbfe80>)

(<torch.utils.data.dataloader.DataLoader at 0x7fdfbdcbee00>,
 <torch.utils.data.dataloader.DataLoader at 0x7fdfbdcbfe80>)

Todo ha funcionado correctamente y con métricas excelentes:

#### ✅ **Preprocesado finalizado con éxito**

* 🧪 **Curvas válidas**: `85.016` (tras filtrado y eliminación de clase `Unknown`)
* 💾 **Uso de memoria estimado**:

  * `sequences` (float16): **3.96 GB**
  * `masks` (float16): **3.96 GB**
  * `labels` (int32): **0.32 MB**
* 💡 `float16` ha reducido significativamente el uso de RAM (hubiera sido casi 8 GB con float32).
* 📊 **Distribución final por clase**:

| Clase codificada | Clase                | Nº curvas |
| ---------------- | -------------------- | --------- |
| 0                | Cataclysmic          | 2.080     |
| 1                | Delta Scuti          | 7.450     |
| 2                | Eclipsing Binary     | 9.000     |
| 3                | Irregular            | 9.000     |
| 4                | RR Lyrae             | 37.569    |
| 5                | Rotational           | 9.000     |
| 6                | Variable             | 1.000     |
| 7                | White Dwarf          | 118       |
| 8                | Young Stellar Object | 9.799     |

**Curvas descartadas** (todos los motivos controlados):

```plaintext
🔸 All nan                       : 0
🔸 Low std                       : 0
🔸 Short curve                   : 0
🔸 Nan or inf after norm         : 0
🔸 Unknown class                 : 0
🔸 Ok                            : 0
```

Esto último indica que **todas las curvas procesadas fueron válidas**, y que el filtrado previo fue preciso y efectivo.

#### 📦 **Salidas generadas** (en data/train)

* `label_encoder.pkl`
* `train_dataset.pt`
* `val_dataset.pt`
* `debug_clases_codificadas.csv`
* `debug_descartes.csv`

🕓 **Tiempo total de ejecución**: `3.9 minutos` (en una máquina de alto rendimiento, 72 CPUs, 140 GB RAM).

Este resultado es **óptimo**, y confirma que la configuración actual del pipeline es estable, eficiente y lista para el entrenamiento final del modelo Transformer.

