# 🌌 Master Notebook – Pipeline Spectroscopie DR5

## Objectif du pipeline

- Ce notebook télécharge, prépare et journalise des spectres .fits.gz de LAMOST DR5 pour entraîner un modèle de classification.
- Le DatasetBuilder garantit qu'aucun spectre ne sera jamais réutilisé en s'appuyant sur un log de fichiers déjà traités.
- Ce pipeline permet d’ajouter progressivement des spectres au jeu d’entraînement, en assurant qu’aucun spectre ne soit traité deux fois. Chaque exécution sélectionne un nouveau lot, l’entraîne, puis marque les spectres comme utilisés.


#

## Étape 0 : SETUP & IMPORTS

In [1]:
# --- Imports des librairies externes ---
import os
import pandas as pd
from IPython.display import display, Markdown
from datetime import datetime, timezone
import json
import hashlib
import subprocess

# --- Imports de TA librairie "astrospectro" ---
from utils import setup_project_env
from tools.dataset_builder import DatasetBuilder
from pipeline.processing import ProcessingPipeline
from pipeline.classifier import SpectralClassifier
# On importe la fonction pour générer le catalogue local
from tools.generate_catalog_from_fits import generate_catalog_from_fits 

# --- Initialisation de l'environnement ---
paths = setup_project_env()

# --- On définit les variables de chemin globales pour la lisibilité ---
RAW_DATA_DIR = paths["RAW_DATA_DIR"]
CATALOG_DIR = paths["CATALOG_DIR"]
PROCESSED_DIR = paths["PROCESSED_DIR"]
MODELS_DIR = paths["MODELS_DIR"]
REPORTS_DIR = paths["REPORTS_DIR"]

# --- Initialisation des outils ---
builder = DatasetBuilder(raw_data_dir=RAW_DATA_DIR, catalog_dir=CATALOG_DIR)

print("\nSetup terminé. Tu es prêt à lancer ton pipeline.")

[INFO] Racine du projet détectée : c:\Users\alexb\Documents\Google_Cloud\alex_labs_google_sprint\astro_spectro_git
[INFO] Dossier 'src' ajouté au sys.path.

Setup terminé. Tu es prêt à lancer ton pipeline.


#

## 1) Téléchargement des spectres
Utilisation du script ``dr5_downloader.py`` encapsulé en fonction.

Cette étape est désormais externalisée dans [01_download_spectra.ipynb](./01_download_spectra.ipynb) pour être exécutée seulement au besoin.

#

## 2) Préparation des données spectrales : Sélection du lot de spectres à traiter
- Le DatasetBuilder sélectionne un lot de nouveaux spectres jamais utilisés.
- Si tous les spectres disponibles ont déjà été utilisés, le pipeline s'arrête proprement.


In [3]:
print("\n=== ÉTAPE 2 : CRÉATION D'UN NOUVEAU LOT DE SPECTRES NON DÉJÀ UTILISÉS ===")

# Pour définir la grosseur du lot modifier la variable de batch_size= par la valeur voulu
new_batch_paths = builder.get_new_training_batch(batch_size=1000, strategy="random")

if new_batch_paths:
    print(f"\n{len(new_batch_paths)} nouveaux spectres proposés pour traitement.")
    print(f"Exemple : {new_batch_paths[0]}")
else:
    print("\nAucun nouveau spectre à traiter : le pipeline est à jour.")


=== ÉTAPE 2 : CRÉATION D'UN NOUVEAU LOT DE SPECTRES NON DÉJÀ UTILISÉS ===
--- Constitution d'un nouveau lot d'entraînement ---
  > 11150 spectres trouvés dans 'c:\Users\alexb\Documents\Google_Cloud\alex_labs_google_sprint\astro_spectro_git\data\raw'
  > 0 spectres déjà utilisés dans des entraînements précédents.
  > 11150 spectres nouveaux et disponibles pour l'entraînement.
  > Sélection d'un échantillon aléatoire de 1000 spectres.

1000 nouveaux spectres proposés pour traitement.
Exemple : B6202/spec-55862-B6202_sp06-207.fits.gz


#

## 3) parcours ce lot de spectres pour générer un CSV à partir des headers des fichiers ``.fits.gz``
C’est ce CSV qui sera ton ``master_catalog`` local, aligné exactement avec les spectres que tu vas traiter dans ce lot.

In [4]:
import os
# On importe la fonction depuis le module où elle se trouve
from tools.generate_catalog_from_fits import generate_catalog_from_fits

print("\n=== ÉTAPE 3 : GÉNÉRATION DU CATALOGUE LOCAL DE HEADERS ===\n")

# On vérifie que le lot de fichiers est disponible
if 'new_batch_paths' in locals() and new_batch_paths:
    # Chemin de sortie du CSV temporaire/local
    output_catalog_path = os.path.join(CATALOG_DIR, "master_catalog_temp.csv")

    # Créer le dossier si nécessaire
    os.makedirs(os.path.dirname(output_catalog_path), exist_ok=True)

    # Préfixer chaque chemin relatif avec le chemin complet vers data/raw
    # On utilise la variable RAW_DATA_DIR définie dans la cellule de SETUP
    full_paths = [os.path.join(RAW_DATA_DIR, path) for path in new_batch_paths]

    # Appel de la fonction
    generate_catalog_from_fits(full_paths, output_catalog_path)

    print(f"\nCatalogue master local créé : {output_catalog_path}")
else:
    print("Veuillez d'abord exécuter la cellule 'SÉLECTION DU LOT DE TRAVAIL' pour définir 'new_batch_paths'.")


=== ÉTAPE 3 : GÉNÉRATION DU CATALOGUE LOCAL DE HEADERS ===

[OK] spec-55862-B6202_sp06-207.fits.gz ajouté au catalogue.
[OK] spec-55863-GAC_105N29_B1_sp01-066.fits.gz ajouté au catalogue.
[OK] spec-55863-M31_011N40_B1_sp09-102.fits.gz ajouté au catalogue.
[OK] spec-55863-GAC_105N29_B1_sp03-083.fits.gz ajouté au catalogue.
[OK] spec-55863-M31_011N40_M1_sp10-009.fits.gz ajouté au catalogue.
[OK] spec-55862-B6210_sp09-026.fits.gz ajouté au catalogue.
[OK] spec-55863-M31_011N40_M1_sp07-138.fits.gz ajouté au catalogue.
[OK] spec-55863-GAC_105N29_B1_sp13-004.fits.gz ajouté au catalogue.
[OK] spec-55863-M31_011N40_M1_sp09-035.fits.gz ajouté au catalogue.
[OK] spec-55862-B6202_sp01-189.fits.gz ajouté au catalogue.
[OK] spec-55863-M31_011N40_B1_sp02-224.fits.gz ajouté au catalogue.
[OK] spec-55863-GAC_105N29_B1_sp14-102.fits.gz ajouté au catalogue.
[OK] spec-55862-B6212_sp09-132.fits.gz ajouté au catalogue.
[OK] spec-55863-GAC_105N29_B1_sp09-061.fits.gz ajouté au catalogue.
[OK] spec-55862-B62

#

### effacer le contenu du master_catalog_temp.csv avant de regénérer un nouveau lot avec de nouveaux spectres

In [None]:
import pandas as pd
import os

print("\n--- Nettoyage du catalogue temporaire ---")

# On utilise la variable CATALOG_DIR définie dans la cellule de SETUP
catalog_path = os.path.join(CATALOG_DIR, "master_catalog_temp.csv")

if os.path.exists(catalog_path):
    # Créer un DataFrame vide avec uniquement l'en-tête
    # Assure-toi que cette liste de colonnes est la même que celle générée par ton script
    columns = [
        'fits_name', 'obsid', 'plan_id', 'mjd', 'class', 'subclass',
        'filename_original', 'author', 'data_version', 'date_creation',
        'telescope', 'longitude_site', 'latitude_site', 'obs_date_utc',
        'jd', 'ra', 'dec', 'fiber_id', 'fiber_type', 'object_name', 'catalog_object_type',
        'magnitude_type', 'magnitude_u', 'magnitude_g', 'magnitude_r', 'magnitude_i', 'magnitude_z',
        'heliocentric_correction', 'radial_velocity_corr', 'seeing',
        'redshift', 'redshift_error', 'snr_u', 'snr_g', 'snr_r', 'snr_i', 'snr_z'
    ]
    empty_df = pd.DataFrame(columns=columns)
    
    # Écraser le fichier existant avec le DataFrame vide
    empty_df.to_csv(catalog_path, sep='|', index=False, encoding='utf-8')
    print(f"Fichier {os.path.basename(catalog_path)} vidé et prêt pour un nouveau lot.")
else:
    print(f"Le fichier {os.path.basename(catalog_path)} n'existe pas encore, pas de nettoyage nécessaire.")

#

## 4) Exploration des features
### Analyse exploratoire des spectres prétraités
Visualiser SNR, distribution de classes, etc.

In [5]:
# On vérifie que la variable 'new_batch_paths' a bien été créée
if 'new_batch_paths' in locals() and new_batch_paths:
    print("\n--- ÉTAPE 4: Lancement du pipeline de traitement ---")

    # --- Étape 4.1 : Charger le catalogue TEMPORAIRE ---
    master_catalog_path = os.path.join(CATALOG_DIR, "master_catalog_temp.csv")
    try:
        master_catalog_df = pd.read_csv(master_catalog_path, sep='|')
        print(f"  > Catalogue temporaire chargé avec succès ({len(master_catalog_df)} entrées).")
    except FileNotFoundError:
        print(f"  > ERREUR CRITIQUE : Le catalogue temporaire est introuvable.")
        master_catalog_df = None

    # --- Étape 4.2 : Initialisation du pipeline ---
    processing_pipeline = ProcessingPipeline(
        raw_data_dir=RAW_DATA_DIR,
        master_catalog_df=master_catalog_df
    )
    
    # --- Étape 4.3 : Lancement du traitement ---
    features_df = processing_pipeline.run(new_batch_paths)
    
    # --- Étape 4.4 : Sauvegarde et affichage des résultats ---
    if not features_df.empty:
        print("\n--- Aperçu du dataset de features généré ---")
        display(features_df.head())
        
        # <<< LA PARTIE MANQUANTE EST ICI >>>
        timestamp = datetime.now(timezone.utc).strftime('%Y%m%dT%H%M%SZ')
        features_filename = f"features_{timestamp}.csv"
        features_path = os.path.join(PROCESSED_DIR, features_filename)
        
        os.makedirs(PROCESSED_DIR, exist_ok=True)
        
        features_df.to_csv(features_path, index=False)
        print(f"\nDataset de features sauvegardé avec succès dans : {features_path}")
        # --- FIN DE LA PARTIE MANQUANTE ---
        
    else:
        print("\n  > Aucun feature n'a pu être extrait.")
else:
    print("Veuillez d'abord exécuter la cellule de sélection de lot ('Étape 2').")


--- ÉTAPE 4: Lancement du pipeline de traitement ---
  > Catalogue temporaire chargé avec succès (1000 entrées).

--- Démarrage du pipeline de traitement pour 1000 spectres ---


Traitement des spectres: 100%|██████████| 1000/1000 [00:02<00:00, 489.71it/s]


Pipeline de traitement terminé. 1000 spectres traités et enrichis.

--- Aperçu du dataset de features généré ---





Unnamed: 0,file_path,feature_Hα,feature_Hβ,feature_CaIIK,feature_CaIIH,feature_ratio_CaK_Hbeta,feature_ratio_Halpha_Hbeta,fits_name,obsid,plan_id,...,heliocentric_correction,radial_velocity_corr,seeing,redshift,redshift_error,snr_u,snr_g,snr_r,snr_i,snr_z
0,B6202/spec-55862-B6202_sp06-207.fits.gz,0.0,0.0,0.558529,0.397959,558528.81074,0.0,spec-55862-B6202_sp06-207.fits.gz,706207,B6202,...,True,UNKNOWN,2.6,-0.000313,1.4e-05,0.0,4.72,23.19,68.05,63.93
1,GAC_105N29_B1/spec-55863-GAC_105N29_B1_sp01-06...,0.806066,1.944298,13.562284,4.740178,6.975412,0.414579,spec-55863-GAC_105N29_B1_sp01-066.fits.gz,1501066,GAC_105N29_B1,...,True,UNKNOWN,2.8,-9999.0,-9999.0,0.23,2.41,4.29,4.9,2.69
2,M31_011N40_B1/spec-55863-M31_011N40_B1_sp09-10...,0.794076,2.122971,6.695568,10.044172,3.153865,0.37404,spec-55863-M31_011N40_B1_sp09-102.fits.gz,1609102,M31_011N40_B1,...,True,UNKNOWN,2.8,-9999.0,-9999.0,0.35,1.64,2.25,2.66,1.63
3,GAC_105N29_B1/spec-55863-GAC_105N29_B1_sp03-08...,0.403506,2.577256,1.368954,1.648279,0.531167,0.156564,spec-55863-GAC_105N29_B1_sp03-083.fits.gz,1503083,GAC_105N29_B1,...,True,UNKNOWN,2.8,2.4e-05,3.5e-05,3.61,18.96,21.56,26.78,16.81
4,M31_011N40_M1/spec-55863-M31_011N40_M1_sp10-00...,0.411169,1.445293,11.58836,8.179425,8.017998,0.284488,spec-55863-M31_011N40_M1_sp10-009.fits.gz,1710009,M31_011N40_M1,...,True,UNKNOWN,2.8,-0.000314,0.00012,0.67,3.12,4.03,5.89,2.7



Dataset de features sauvegardé avec succès dans : c:\Users\alexb\Documents\Google_Cloud\alex_labs_google_sprint\astro_spectro_git\data\processed\features_20250726T015727Z.csv


#

## 5) Entraînement du modèle
### Machine Learning
Sélection des features et entraînement d'un modèle de classification.

#

In [None]:
import pandas as pd
from pipeline.classifier import SpectralClassifier
import os
import glob
import hashlib
import json
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report

# 1) Charger le dernier dataset de features
list_of_feature_files = glob.glob(os.path.join(PROCESSED_DIR, 'features_*.csv'))
if not list_of_feature_files:
    print("ERREUR : Aucun fichier de features trouvé.")
else:
    latest_feature_file = max(list_of_feature_files, key=os.path.getctime)
    print(f"--- Chargement du dataset : {os.path.basename(latest_feature_file)} ---")
    features_df = pd.read_csv(latest_feature_file)

    # 2) Créer la colonne 'label' et nettoyer les données
    if 'subclass' in features_df.columns:
        features_df['label'] = features_df['subclass'].astype(str).str[0]
    else:
        features_df['label'] = 'UNKNOWN'
        
    initial_count = len(features_df)
    df_trainable = features_df[features_df["label"].notnull() & ~features_df["label"].isin(['U', 'N', 'n', 'N'])].copy() # Ajout de 'N' majuscule
    print(f"  > {initial_count - len(df_trainable)} lignes avec des labels invalides ou nuls supprimées.")
    
    label_counts = df_trainable["label"].value_counts()
    rare_labels = label_counts[label_counts < 5].index
    if len(rare_labels) > 0:
        print(f"  > Suppression des classes trop rares : {list(rare_labels)}")
        df_trainable = df_trainable[~df_trainable["label"].isin(rare_labels)]

    # 3) Préparer X et y, puis lancer l'entraînement
    if not df_trainable.empty:
        feature_cols = [col for col in df_trainable.columns if col.startswith('feature_')]
        X = df_trainable[feature_cols].values
        y = df_trainable["label"].values

        print(f"\nFeatures utilisées : {feature_cols}")
        print(f"Nombre d'échantillons final : {X.shape[0]}, Nombre de features : {X.shape[1]}")

        print("\n--- ÉTAPE 5: Entraînement et Évaluation du modèle ---")
        clf = SpectralClassifier(n_estimators=200)
        clf.train_and_evaluate(X, y, test_size=0.25)
        
        # --- Si l'entraînement a réussi, on continue avec la sauvegarde et le rapport ---
        
        # 4) Sauvegarder le modèle
        model_path = os.path.join(MODELS_DIR, "spectral_classifier.pkl")
        os.makedirs(MODELS_DIR, exist_ok=True)
        clf.save_model(model_path)
        
        # 5) Mettre à jour le journal des spectres
        print("\n--- ÉTAPE 6: Mise à jour du Journal des Spectres Utilisés ---")
        processed_files = df_trainable['file_path'].tolist()
        builder.update_trained_log(processed_files)
        
        # 6) Générer le rapport de session
        print("\n--- ÉTAPE 7: Génération du Rapport de Session ---")
        timestamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
        
        model_hash = "N/A"
        if os.path.exists(model_path):
            with open(model_path, "rb") as f:
                model_hash = hashlib.md5(f.read()).hexdigest()
            print(f"  > Hash MD5 du modèle : {model_hash}")
        
        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.25, random_state=42, stratify=y)
        predictions = clf.model.predict(X_test)
        report_dict = classification_report(y_test, predictions, labels=clf.class_labels, zero_division=0, output_dict=True)
        metrics_summary = report_dict.get("weighted avg", {})
        print(f"  > Métriques extraites : Accuracy = {report_dict.get('accuracy', 0):.2f}")

        session_report = {
            "session_id": timestamp,
            "date_utc": datetime.now(timezone.utc).isoformat(),
            "model_path": model_path,
            "model_hash_md5": model_hash,
            "training_set_size": len(X_train),
            "test_set_size": len(X_test),
            "total_spectra_processed": len(processed_files),
            "feature_columns": feature_cols,
            "class_labels": clf.class_labels,
            "metrics": metrics_summary,
            "processed_files_list": processed_files
        }

        report_filename = f"session_report_{timestamp}.json"
        report_path = os.path.join(REPORTS_DIR, report_filename)
        os.makedirs(REPORTS_DIR, exist_ok=True)

        with open(report_path, "w", encoding="utf-8") as f:
            json.dump(session_report, f, indent=4)
        print(f"\nRapport de session sauvegardé dans : {report_path}")
        
        print("\n\nSESSION DE RECHERCHE TERMINÉE")
    else:
        print("\n  > Pas assez de données valides pour lancer l'entraînement.")

#

## **Permettre l'affichage des labels**

In [None]:
print(features_df["label"].value_counts())