# TESS Exoplanet Modeling (Flask-ready)

**Notebook:** `TESS_Exoplanet_Modeling_FlaskReady.ipynb`

**Purpose:** Full ML workflow for TESS dataset — feature selection, outlier handling, training (RandomForest, XGBoost, LogisticRegression),
evaluation, extraction of top-5 features, and saving artifacts (models, plots, JSON) for a Flask web backend.

**Notes:**
- Dataset expected at `/mnt/data/TESS.csv`.
- Update mapping or column names in the Feature Engineering step if your TESS file uses different headers.


## Step 1 — Imports & Global Config
Import libraries, suppress warnings, and define directories.

In [45]:
# -*- coding: utf-8 -*-
import warnings
warnings.filterwarnings('ignore')

import os
import json
import uuid
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import (accuracy_score, precision_score, recall_score, f1_score, roc_auc_score,
                             confusion_matrix, roc_curve)
import joblib

# optional: XGBoost may not be installed in every environment; wrapped import
try:
    from xgboost import XGBClassifier
    XGB_AVAILABLE = True
except Exception:
    XGB_AVAILABLE = False

# Directories for Flask/static artifacts
BASE_MODEL_DIR = r'static\models'
PLOTS_DIR = r'static\plots'
RESULTS_DIR = r'static\results'
os.makedirs(BASE_MODEL_DIR, exist_ok=True)
os.makedirs(PLOTS_DIR, exist_ok=True)
os.makedirs(RESULTS_DIR, exist_ok=True)

RANDOM_STATE = 42
sns.set(style='whitegrid')
print("Directories prepared:", BASE_MODEL_DIR, PLOTS_DIR, RESULTS_DIR)


Directories prepared: static\models static\plots static\results


In [46]:
# Fixed get_dataset_path function - corrected "Note books" to "Notebooks"
def get_dataset_path_fixed(dataset_name='TESS'):
    mapping = {
        'TESS': r'Data Sources\TESS.csv',
        # add others if needed
    }
    return mapping.get(dataset_name)

# Use the fixed function
DATA_PATH = get_dataset_path_fixed('TESS')
if DATA_PATH is None or not os.path.exists(DATA_PATH):
    raise FileNotFoundError(f"TESS.csv not found at {DATA_PATH}. Please check the file path in the get_dataset_path function.")
df_raw = pd.read_csv(DATA_PATH)
print('Loaded TESS dataset, shape:', df_raw.shape)
display(df_raw.head())


Loaded TESS dataset, shape: (7703, 27)


Unnamed: 0,rowid,toi,toipfx,tid,ctoi_alias,pl_pnum,tfopwg_disp,rastr,ra,decstr,...,pl_rade,pl_insol,pl_eqt,st_tmag,st_dist,st_teff,st_logg,st_rad,toi_created,rowupdate
0,1,1000.01,1000,50365310,50365310.0,1,FP,07h29m25.85s,112.357708,-12d41m45.46s,...,5.818163,22601.94858,3127.204052,9.604,485.735,10249.0,4.19,2.16986,24/07/2019 15:58,09/09/2024 10:08
1,2,1001.01,1001,88863718,88863720.0,1,PC,08h10m19.31s,122.580465,-05d30m49.87s,...,11.2154,44464.5,4045.0,9.42344,295.862,7070.0,4.03,2.01,24/07/2019 15:58,03/04/2023 14:31
2,3,1002.01,1002,124709665,124709700.0,1,FP,06h58m54.47s,104.726966,-10d34m49.64s,...,23.7529,2860.61,2037.0,9.299501,943.109,8924.0,,5.73,24/07/2019 15:58,11/07/2022 16:02
3,4,1003.01,1003,106997505,106997500.0,1,FP,07h22m14.39s,110.559945,-25d12m25.26s,...,,1177.36,1631.0,9.3003,7728.17,5388.5,4.15,,24/07/2019 15:58,23/02/2022 10:10
4,5,1004.01,1004,238597883,238597900.0,1,FP,08h08m42.77s,122.178195,-48d48m10.12s,...,11.3113,54679.3,4260.0,9.1355,356.437,9219.0,4.14,2.15,24/07/2019 15:58,09/09/2024 10:08


In [None]:
# Fixed directory paths - corrected "Note books" to "Notebooks"
BASE_MODEL_DIR = r'static\models'
PLOTS_DIR = r'static\plots'
RESULTS_DIR = r'static\results'

# Create directories if they don't exist
os.makedirs(BASE_MODEL_DIR, exist_ok=True)
os.makedirs(PLOTS_DIR, exist_ok=True)
os.makedirs(RESULTS_DIR, exist_ok=True)

print(f"Directories prepared: {BASE_MODEL_DIR} {PLOTS_DIR} {RESULTS_DIR}")


Directories prepared: C:\Users\Abdelrahman Bakr\Desktop\me\project\Nasa\Exoplanets-Detection-Using-Machine-Learning\Backend\Notebooks\static\models C:\Users\Abdelrahman Bakr\Desktop\me\project\Nasa\Exoplanets-Detection-Using-Machine-Learning\Backend\Notebooks\static\plots C:\Users\Abdelrahman Bakr\Desktop\me\project\Nasa\Exoplanets-Detection-Using-Machine-Learning\Backend\Notebooks\static\results


## Step 2 — Dataset Path Helper
Map dataset name to path (TESS is default).

In [None]:
def get_dataset_path(dataset_name='TESS'):
    mapping = {
        'TESS': r"Data Sources\TESS.csv",
        # add others if needed
    }
    return mapping.get(dataset_name)


## Step 3 — Load & Inspect Data
Load TESS.csv and show basic info.

In [49]:
import os

def get_dataset_path(dataset_name: str) -> str:
    """
    Return the full path to a dataset CSV file.
    
    Args:
        dataset_name (str): Name of the dataset (e.g., 'Kepler', 'TESS').
    
    Returns:
        str: Full path to the dataset file.
    """
    # Base data folder (adjust if needed)
    base_dir = r"C:\Users\Abdelrahman Bakr\Desktop\me\project\Nasa\Exoplanets-Detection-Using-Machine-Learning\Backend\Notebooks\Data Sources"
    
    # Map dataset names to filenames
    dataset_files = {
        "Kepler": "Kepler.csv",
        "TESS": "TESS.csv"
    }
    
    # Return path if exists
    if dataset_name in dataset_files:
        return os.path.join(base_dir, dataset_files[dataset_name])
    else:
        return None


## Step 4 — Feature Engineering & Renaming
Select relevant columns and rename to readable names. Update mapping if your CSV uses different columns.

In [50]:
# Make a copy and normalize column names
df = df_raw.copy()
df.columns = [c.strip() for c in df.columns]

# TESS dataset column selection & rename dictionary
selected = [
    # physical / transit properties
    "pl_rade", "pl_trandep", "pl_orbper", "pl_trandurh", 
    "pl_insol", "pl_eqt",
    # stellar properties
    "st_teff", "st_logg", "st_rad", "st_tmag", "st_dist",
    # coordinates
    "ra", "dec",
    # disposition
    "tfopwg_disp"
]

rename_map = {
    "pl_rade": "planet_radius_earth",
    "pl_trandep": "transit_depth_ppm", 
    "pl_orbper": "orbital_period_days",
    "pl_trandurh": "transit_duration_hrs",
    "pl_insol": "insolation_flux_earth",
    "pl_eqt": "equilibrium_temp_k",
    "st_teff": "stellar_temp_k",
    "st_logg": "stellar_logg",
    "st_rad": "stellar_radius_solar",
    "st_tmag": "stellar_magnitude",
    "st_dist": "stellar_distance_pc",
    "ra": "RA_deg",
    "dec": "Dec_deg",
    "tfopwg_disp": "disposition"
}

# Keep only those columns that exist in the dataframe
cols = [c for c in selected if c in df.columns]
df = df[cols].rename(columns=rename_map)

# Create Target column based on TESS disposition values
# PC (Planet Candidate), CP (Confirmed Planet), KP (Kepler Planet), APC (Ambiguous Planet Candidate) = 1 (positive)
# FP (False Positive), FA (False Alarm) = 0 (negative)
if 'disposition' in df.columns:
    # Filter to keep only valid disposition values
    valid_dispositions = ['PC', 'FP', 'CP', 'KP', 'APC', 'FA']
    df = df[df['disposition'].isin(valid_dispositions)].copy()
    
    # Create Target: 1 for planet candidates/confirmed, 0 for false positives
    df['Target'] = df['disposition'].isin(['PC', 'CP', 'KP', 'APC']).astype(int)
    print(f"Target distribution: {df['Target'].value_counts().to_dict()}")
else:
    print('Warning: disposition column not found — please verify column name and edit the notebook accordingly.')

print('After selection & renaming, shape:', df.shape)
display(df.head())


Target distribution: {1: 6408, 0: 1295}
After selection & renaming, shape: (7703, 15)


Unnamed: 0,planet_radius_earth,transit_depth_ppm,orbital_period_days,transit_duration_hrs,insolation_flux_earth,equilibrium_temp_k,stellar_temp_k,stellar_logg,stellar_radius_solar,stellar_magnitude,stellar_distance_pc,RA_deg,Dec_deg,disposition,Target
0,5.818163,656.886099,2.171348,2.01722,22601.94858,3127.204052,10249.0,4.19,2.16986,9.604,485.735,112.357708,-12.69596,FP,0
1,11.2154,1286.0,1.931646,3.166,44464.5,4045.0,7070.0,4.03,2.01,9.42344,295.862,122.580465,-5.513852,PC,1
2,23.7529,1500.0,1.867557,1.408,2860.61,2037.0,8924.0,,5.73,9.299501,943.109,104.726966,-10.580455,FP,0
3,,383.41,2.74323,3.167,1177.36,1631.0,5388.5,4.15,,9.3003,7728.17,110.559945,-25.207017,FP,0
4,11.3113,755.0,3.573014,3.37,54679.3,4260.0,9219.0,4.14,2.15,9.1355,356.437,122.178195,-48.802811,FP,0


## Step 5 — Missing Value Handling & Conservative Outlier Removal

In [51]:
# Fill missing numeric with median, categorical with mode
numeric_cols = df.select_dtypes(include=[np.number]).columns.tolist()
categorical_cols = df.select_dtypes(include=['object']).columns.tolist()

for c in numeric_cols:
    if df[c].isnull().any():
        df[c].fillna(df[c].median(), inplace=True)
for c in categorical_cols:
    if df[c].isnull().any():
        df[c].fillna(df[c].mode().iloc[0], inplace=True)

print('Nulls after imputation:', df.isnull().sum().sum())

# Conservative outlier removal using 1st/99th percentiles and 3*IQR
def remove_extreme_outliers(df, numeric_cols, lower_q=0.01, upper_q=0.99, multiplier=3.0):
    q_low = df[numeric_cols].quantile(lower_q)
    q_high = df[numeric_cols].quantile(upper_q)
    iqr = q_high - q_low
    lower = q_low - multiplier * iqr
    upper = q_high + multiplier * iqr
    mask = ~((df[numeric_cols] < lower) | (df[numeric_cols] > upper)).any(axis=1)
    return df[mask]

num_cols = [c for c in numeric_cols if c != 'Target']
if len(num_cols) > 0:
    df_clean = remove_extreme_outliers(df, num_cols, lower_q=0.01, upper_q=0.99, multiplier=3.0)
    print('Shape before outlier removal:', df.shape, 'after:', df_clean.shape)
else:
    df_clean = df.copy()
    print('No numeric columns to apply outlier removal.')

df = df_clean.copy()


Nulls after imputation: 0
Shape before outlier removal: (7703, 15) after: (7666, 15)


## Step 6 — Train/Val/Test Split & Preprocessor

In [52]:
# Ensure Target exists
if 'Target' not in df.columns:
    raise ValueError("Target column not found — set Target creation logic in the Feature Engineering step.")

X = df.drop(columns=['Target'])
y = df['Target'].astype(int)

# train/val/test split (60/20/20)
X_train_val, X_test, y_train_val, y_test = train_test_split(X, y, test_size=0.2, random_state=RANDOM_STATE, stratify=y)
X_train, X_val, y_train, y_val = train_test_split(X_train_val, y_train_val, test_size=0.25, random_state=RANDOM_STATE, stratify=y_train_val)

print('Shapes -> train:', X_train.shape, 'val:', X_val.shape, 'test:', X_test.shape)

numeric_features = X_train.select_dtypes(include=[np.number]).columns.tolist()
categorical_features = X_train.select_dtypes(include=['object']).columns.tolist()

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer

def build_preprocessor(numeric_features, categorical_features):
    num_pipe = Pipeline([('scaler', StandardScaler())])
    cat_pipe = Pipeline([('onehot', OneHotEncoder(handle_unknown='ignore', sparse_output=False))]) if len(categorical_features)>0 else None
    transformers = []
    if len(numeric_features)>0:
        transformers.append(('num', num_pipe, numeric_features))
    if cat_pipe is not None:
        transformers.append(('cat', cat_pipe, categorical_features))
    preprocessor = ColumnTransformer(transformers, remainder='drop')
    return preprocessor

preprocessor = build_preprocessor(numeric_features, categorical_features)
print('Preprocessor built. Numeric features:', len(numeric_features), 'Categorical:', len(categorical_features))


Shapes -> train: (4599, 14) val: (1533, 14) test: (1534, 14)
Preprocessor built. Numeric features: 13 Categorical: 1


## Step 7 — Train Models (RandomForest, XGBoost if available, LogisticRegression)

In [53]:
models = {
    'RandomForest': RandomForestClassifier(n_estimators=200, random_state=RANDOM_STATE),
    'LogisticRegression': LogisticRegression(max_iter=1000, random_state=RANDOM_STATE)
}
if XGB_AVAILABLE:
    models['XGBoost'] = XGBClassifier(eval_metric='logloss', use_label_encoder=False, random_state=RANDOM_STATE)

trained_models = {}
for name, clf in models.items():
    pipe = Pipeline([('preprocessor', preprocessor), ('clf', clf)])
    print('Training', name)
    pipe.fit(X_train, y_train)
    path = os.path.join(BASE_MODEL_DIR, f'{name}_pipeline.pkl')
    joblib.dump(pipe, path)
    trained_models[name] = pipe
    print('Saved', name, '->', path)


Training RandomForest


Saved RandomForest -> C:\Users\Abdelrahman Bakr\Desktop\me\project\Nasa\Exoplanets-Detection-Using-Machine-Learning\Backend\Notebooks\static\models\RandomForest_pipeline.pkl
Training LogisticRegression
Saved LogisticRegression -> C:\Users\Abdelrahman Bakr\Desktop\me\project\Nasa\Exoplanets-Detection-Using-Machine-Learning\Backend\Notebooks\static\models\LogisticRegression_pipeline.pkl
Training XGBoost
Saved XGBoost -> C:\Users\Abdelrahman Bakr\Desktop\me\project\Nasa\Exoplanets-Detection-Using-Machine-Learning\Backend\Notebooks\static\models\XGBoost_pipeline.pkl


## Step 8 — Evaluate Models & Save Metrics/Plots

In [54]:
def evaluate_models(models_dict, X_eval, y_eval, dataset_name='TESS'):
    results = {}
    for name, model in models_dict.items():
        y_pred = model.predict(X_eval)
        y_proba = model.predict_proba(X_eval)[:,1] if hasattr(model, 'predict_proba') else None
        metrics = {
            'accuracy': float(accuracy_score(y_eval, y_pred)),
            'precision': float(precision_score(y_eval, y_pred, zero_division=0)),
            'recall': float(recall_score(y_eval, y_pred, zero_division=0)),
            'f1': float(f1_score(y_eval, y_pred, zero_division=0)),
            'auc': float(roc_auc_score(y_eval, y_proba)) if y_proba is not None else None
        }
        results[name] = metrics

        # confusion matrix
        cm = confusion_matrix(y_eval, y_pred)
        plt.figure(figsize=(4,3))
        sns.heatmap(cm, annot=True, fmt='d', cbar=False)
        plt.title(f'{dataset_name} - {name} Confusion')
        cm_path = os.path.join(PLOTS_DIR, f'{dataset_name}_{name}_confusion.png')
        plt.savefig(cm_path, bbox_inches='tight')
        plt.close()

        # ROC
        if y_proba is not None:
            fpr, tpr, _ = roc_curve(y_eval, y_proba)
            plt.figure(figsize=(5,4))
            plt.plot(fpr, tpr, label=f'AUC={metrics["auc"]:.3f}')
            plt.plot([0,1],[0,1],'--')
            plt.xlabel('FPR'); plt.ylabel('TPR'); plt.title(f'{dataset_name} - {name} ROC')
            roc_path = os.path.join(PLOTS_DIR, f'{dataset_name}_{name}_roc.png')
            plt.savefig(roc_path, bbox_inches='tight')
            plt.close()

    out_path = os.path.join(RESULTS_DIR, f'{dataset_name}_metrics.json')
    with open(out_path, 'w') as f:
        json.dump(results, f, indent=2)
    print('Saved metrics to', out_path)
    return results

metrics = evaluate_models(trained_models, X_val, y_val, dataset_name='TESS')

# Comparison barplot
res_df = pd.DataFrame(metrics).T.reset_index().rename(columns={'index':'model'})
melted = res_df.melt(id_vars='model', value_vars=[c for c in ['accuracy','precision','recall','f1','auc'] if c in res_df.columns],
                     var_name='metric', value_name='value')
plt.figure(figsize=(10,5))
sns.barplot(data=melted, x='model', y='value', hue='metric')
plt.xticks(rotation=45)
plt.tight_layout()
comp_path = os.path.join(PLOTS_DIR, 'TESS_model_comparison.png')
plt.savefig(comp_path, bbox_inches='tight')
plt.close()
print('Saved comparison plot to', comp_path)


Saved metrics to C:\Users\Abdelrahman Bakr\Desktop\me\project\Nasa\Exoplanets-Detection-Using-Machine-Learning\Backend\Notebooks\static\results\TESS_metrics.json
Saved comparison plot to C:\Users\Abdelrahman Bakr\Desktop\me\project\Nasa\Exoplanets-Detection-Using-Machine-Learning\Backend\Notebooks\static\plots\TESS_model_comparison.png


## Step 9 — Extract Top-5 Features per Model & Save Medians

In [55]:
def extract_top_features_and_save(models_dict, X_train_df, dataset_name='TESS', top_k=5):
    feature_names = X_train_df.columns.tolist()
    medians = X_train_df.median(numeric_only=True).to_dict()
    with open(os.path.join(RESULTS_DIR, f'{dataset_name}_feature_medians.json'), 'w') as f:
        json.dump(medians, f, indent=2)
    # save training columns order
    with open(os.path.join(RESULTS_DIR, f'{dataset_name}_training_columns.json'), 'w') as f:
        json.dump(feature_names, f, indent=2)

    all_top = {}
    for name, model in models_dict.items():
        clf = model.named_steps['clf']
        imp = None
        if hasattr(clf, 'feature_importances_'):
            imp = np.array(clf.feature_importances_)
        elif hasattr(clf, 'coef_'):
            arr = np.array(clf.coef_)
            imp = np.abs(arr.ravel())[:len(feature_names)]
        else:
            print('No importances for', name)
            continue

        if len(imp) != len(feature_names):
            print(f'Warning: importance length {len(imp)} != feature len {len(feature_names)} for {name}')
            # try to skip or map where possible
            continue

        s = pd.Series(imp, index=feature_names).sort_values(ascending=False)
        top_feats = s.head(top_k).index.tolist()
        all_top[name] = top_feats
        with open(os.path.join(RESULTS_DIR, f'{dataset_name}_{name}_top_features.json'), 'w') as f:
            json.dump(top_feats, f, indent=2)

        # plot top features
        plt.figure(figsize=(6, max(2, len(top_feats)*0.5)))
        sns.barplot(x=s.head(top_k).values, y=s.head(top_k).index)
        plt.title(f'{name} Top {top_k} features')
        plt.tight_layout()
        plt.savefig(os.path.join(PLOTS_DIR, f'{dataset_name}_{name}_topk.png'), bbox_inches='tight')
        plt.close()
        print('Saved top features for', name)
    return all_top

top_features = extract_top_features_and_save(trained_models, X_train, dataset_name='TESS', top_k=5)
print('Top features saved:', top_features)


Saved top features for LogisticRegression
Top features saved: {'LogisticRegression': ['disposition', 'equilibrium_temp_k', 'stellar_radius_solar', 'planet_radius_earth', 'stellar_magnitude']}


## Step 10 — Prediction Helpers for Flask

In [56]:
def predict_from_full_vector(model_name, input_vector):
    model_path = os.path.join(BASE_MODEL_DIR, f'{model_name}_pipeline.pkl')
    if not os.path.exists(model_path):
        raise FileNotFoundError(f'Model not found: {model_path}')
    model = joblib.load(model_path)
    X = np.array(input_vector).reshape(1, -1)
    pred = int(model.predict(X)[0])
    proba = float(model.predict_proba(X)[0][1]) if hasattr(model, 'predict_proba') else None
    return {'prediction': pred, 'probability': proba}

def predict_from_top5(model_name, dataset_name, top5_mapping):
    # top5_mapping: dict {feature_name: value} for the model's top5 order
    med_path = os.path.join(RESULTS_DIR, f'{dataset_name}_feature_medians.json')
    cols_path = os.path.join(RESULTS_DIR, f'{dataset_name}_training_columns.json')
    top_path = os.path.join(RESULTS_DIR, f'{dataset_name}_{model_name}_top_features.json')
    if not (os.path.exists(med_path) and os.path.exists(cols_path) and os.path.exists(top_path)):
        raise FileNotFoundError('Required artifacts (medians/cols/top_features) missing. Run training workflow first.')
    with open(med_path,'r') as f:
        medians = json.load(f)
    with open(cols_path,'r') as f:
        cols = json.load(f)
    with open(top_path,'r') as f:
        top_feats = json.load(f)
    # start with medians
    row = {c: medians.get(c, 0.0) for c in cols}
    # overwrite with provided top5
    for k,v in top5_mapping.items():
        if k not in row:
            raise ValueError(f'Feature {k} not in training columns')
        row[k] = v
    df_row = pd.DataFrame([row], columns=cols)
    model = joblib.load(os.path.join(BASE_MODEL_DIR, f'{model_name}_pipeline.pkl'))
    pred = int(model.predict(df_row)[0])
    proba = float(model.predict_proba(df_row)[0][1]) if hasattr(model, 'predict_proba') else None
    return {'prediction': pred, 'probability': proba}


## Step 11 — Run Full Workflow (example)
Run `run_full_workflow()` to execute the end-to-end pipeline for TESS. It will save artifacts for Flask consumption.

## Step 13 — Model Results Analysis Function

Comprehensive function to analyze and display modeling results with visualizations and detailed metrics.


In [57]:
def run_full_workflow(dataset_name='TESS'):
    path = get_dataset_path(dataset_name)
    if path is None or not os.path.exists(path):
        raise FileNotFoundError(f'Dataset not found at {path}')
    df0 = pd.read_csv(path)
    print('Initial shape:', df0.shape)
    # reuse logic above: normalize column names and select/rename
    df0.columns = [c.strip() for c in df0.columns]
    # apply same selection & rename as earlier in notebook
    # to keep notebook compact, call the steps we defined earlier in cells: we will rebuild minimal selection here
    df1 = df0.copy()
    # attempt to recreate same processing: only keep columns present in df
    cols_present = [c for c in ['pl_rade','pl_trandep','pl_orbper','pl_trandurh','pl_insol','pl_eqt','st_teff','st_logg','st_rad','st_tmag','st_dist','ra','dec','tfopwg_disp'] if c in df1.columns]
    df_sel = df1[cols_present].copy()
    # rename where possible
    df_sel = df_sel.rename(columns={k: v for k,v in {
        'pl_rade': 'planet_radius_earth','pl_trandep':'transit_depth_ppm','pl_orbper':'orbital_period_days','pl_trandurh':'transit_duration_hrs',
        'pl_insol':'insolation_flux_earth','pl_eqt':'equilibrium_temp_k','st_teff':'stellar_temp_k','st_logg':'stellar_logg',
        'st_rad':'stellar_radius_solar','st_tmag':'stellar_magnitude','st_dist':'stellar_distance_pc','ra':'RA_deg','dec':'Dec_deg','tfopwg_disp':'disposition'
    }.items() if k in df1.columns})
    # create Target if possible
    if 'disposition' in df_sel.columns:
        valid_dispositions = ['PC', 'FP', 'CP', 'KP', 'APC', 'FA']
        df_sel = df_sel[df_sel['disposition'].isin(valid_dispositions)].copy()
        df_sel['Target'] = df_sel['disposition'].isin(['PC', 'CP', 'KP', 'APC']).astype(int)
    else:
        raise ValueError('disposition not found in dataset; cannot create Target automatically.')
    print('Selected & filtered shape:', df_sel.shape)
    # proceed with imputation and outlier removal (reuse above)
    numeric_cols = df_sel.select_dtypes(include=[np.number]).columns.tolist()
    for c in numeric_cols:
        df_sel[c].fillna(df_sel[c].median(), inplace=True)
    categorical_cols = df_sel.select_dtypes(include=['object']).columns.tolist()
    for c in categorical_cols:
        df_sel[c].fillna(df_sel[c].mode().iloc[0], inplace=True)
    df_clean = remove_extreme_outliers(df_sel, [c for c in numeric_cols if c!='Target'], lower_q=0.01, upper_q=0.99, multiplier=3.0)
    df_clean = df_clean.dropna(subset=['Target'])
    print('After cleaning shape:', df_clean.shape)
    # save training columns and medians
    training_cols = [c for c in df_clean.columns if c!='Target']
    with open(os.path.join(RESULTS_DIR, 'TESS_training_columns.json'), 'w') as f:
        json.dump(training_cols, f, indent=2)
    medians = df_clean[training_cols].median(numeric_only=True).to_dict()
    with open(os.path.join(RESULTS_DIR, 'TESS_feature_medians.json'), 'w') as f:
        json.dump(medians, f, indent=2)
    # split & train
    X = df_clean[training_cols]
    y = df_clean['Target'].astype(int)
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=RANDOM_STATE, stratify=y)
    preprocessor = build_preprocessor(X_train.select_dtypes(include=[np.number]).columns.tolist(), X_train.select_dtypes(include=['object']).columns.tolist())
    # train models
    trained = {}
    model_defs = {'RandomForest': RandomForestClassifier(n_estimators=200, random_state=RANDOM_STATE), 'LogisticRegression': LogisticRegression(max_iter=1000, random_state=RANDOM_STATE)}
    if XGB_AVAILABLE:
        model_defs['XGBoost'] = XGBClassifier(eval_metric='logloss', use_label_encoder=False, random_state=RANDOM_STATE)
    for name, clf in model_defs.items():
        pipe = Pipeline([('preprocessor', preprocessor), ('clf', clf)])
        pipe.fit(X_train, y_train)
        joblib.dump(pipe, os.path.join(BASE_MODEL_DIR, f'{name}_pipeline.pkl'))
        trained[name] = pipe
    # evaluate
    metrics = evaluate_models(trained, X_test, y_test, dataset_name='TESS')
    # extract top features
    extract_top_features_and_save(trained, X_train, dataset_name='TESS', top_k=5)
    print('Workflow finished for TESS. Artifacts in /mnt/data/static/')
    return metrics

# Note: Running run_full_workflow() will train models and save artifacts.


In [58]:
def analyze_model_results(trained_models, X_test, y_test, X_train, y_train, dataset_name='TESS'):
    """
    Comprehensive analysis of model results with detailed metrics, visualizations, and comparisons.
    
    Parameters:
    - trained_models: dict of trained model pipelines
    - X_test, y_test: test set for evaluation
    - X_train, y_train: training set for analysis
    - dataset_name: name for saving files
    """
    import matplotlib.pyplot as plt
    import seaborn as sns
    from sklearn.metrics import (classification_report, confusion_matrix, 
                                roc_curve, precision_recall_curve, 
                                average_precision_score)
    import pandas as pd
    import numpy as np
    
    print("="*80)
    print(f"COMPREHENSIVE MODEL ANALYSIS FOR {dataset_name.upper()}")
    print("="*80)
    
    # Initialize results storage
    results = {}
    predictions = {}
    probabilities = {}
    
    # 1. INDIVIDUAL MODEL ANALYSIS
    print("\n" + "="*50)
    print("INDIVIDUAL MODEL PERFORMANCE")
    print("="*50)
    
    for model_name, model in trained_models.items():
        print(f"\n--- {model_name.upper()} ---")
        
        # Get predictions
        y_pred = model.predict(X_test)
        y_proba = model.predict_proba(X_test)[:, 1] if hasattr(model, 'predict_proba') else None
        
        predictions[model_name] = y_pred
        probabilities[model_name] = y_proba
        
        # Calculate metrics
        from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score
        
        metrics = {
            'accuracy': accuracy_score(y_test, y_pred),
            'precision': precision_score(y_test, y_pred, zero_division=0),
            'recall': recall_score(y_test, y_pred, zero_division=0),
            'f1': f1_score(y_test, y_pred, zero_division=0),
            'auc': roc_auc_score(y_test, y_proba) if y_proba is not None else None
        }
        
        results[model_name] = metrics
        
        # Print detailed metrics
        print(f"Accuracy:  {metrics['accuracy']:.4f}")
        print(f"Precision: {metrics['precision']:.4f}")
        print(f"Recall:    {metrics['recall']:.4f}")
        print(f"F1-Score:  {metrics['f1']:.4f}")
        if metrics['auc']:
            print(f"AUC-ROC:   {metrics['auc']:.4f}")
        
        # Classification report
        print(f"\nClassification Report:")
        print(classification_report(y_test, y_pred, target_names=['False Positive', 'Exoplanet']))
        
        # Confusion Matrix
        cm = confusion_matrix(y_test, y_pred)
        print(f"\nConfusion Matrix:")
        print(f"True Negatives:  {cm[0,0]:4d} | False Positives: {cm[0,1]:4d}")
        print(f"False Negatives: {cm[1,0]:4d} | True Positives:  {cm[1,1]:4d}")
        
        # Feature importance (if available)
        if hasattr(model.named_steps['clf'], 'feature_importances_'):
            feature_names = X_train.columns.tolist()
            importances = model.named_steps['clf'].feature_importances_
            feature_importance = pd.Series(importances, index=feature_names).sort_values(ascending=False)
            print(f"\nTop 5 Most Important Features:")
            for i, (feature, importance) in enumerate(feature_importance.head().items()):
                print(f"{i+1}. {feature}: {importance:.4f}")
    
    # 2. MODEL COMPARISON
    print("\n" + "="*50)
    print("MODEL COMPARISON")
    print("="*50)
    
    # Create comparison DataFrame
    comparison_df = pd.DataFrame(results).T
    print("\nPerformance Comparison:")
    print(comparison_df.round(4))
    
    # Find best model for each metric
    print(f"\nBest Models:")
    for metric in ['accuracy', 'precision', 'recall', 'f1', 'auc']:
        if metric in comparison_df.columns and not comparison_df[metric].isna().all():
            best_model = comparison_df[metric].idxmax()
            best_score = comparison_df.loc[best_model, metric]
            print(f"Best {metric.upper()}: {best_model} ({best_score:.4f})")
    
    # 3. VISUALIZATIONS
    print("\n" + "="*50)
    print("GENERATING VISUALIZATIONS")
    print("="*50)
    
    # Set up the plotting style
    plt.style.use('default')
    sns.set_palette("husl")
    
    # 3.1 Model Comparison Bar Plot
    fig, axes = plt.subplots(2, 2, figsize=(15, 12))
    fig.suptitle(f'{dataset_name} - Model Performance Comparison', fontsize=16, fontweight='bold')
    
    # Accuracy comparison
    comparison_df['accuracy'].plot(kind='bar', ax=axes[0,0], title='Accuracy Comparison')
    axes[0,0].set_ylabel('Accuracy')
    axes[0,0].tick_params(axis='x', rotation=45)
    
    # F1-Score comparison
    comparison_df['f1'].plot(kind='bar', ax=axes[0,1], title='F1-Score Comparison')
    axes[0,1].set_ylabel('F1-Score')
    axes[0,1].tick_params(axis='x', rotation=45)
    
    # Precision comparison
    comparison_df['precision'].plot(kind='bar', ax=axes[1,0], title='Precision Comparison')
    axes[1,0].set_ylabel('Precision')
    axes[1,0].tick_params(axis='x', rotation=45)
    
    # Recall comparison
    comparison_df['recall'].plot(kind='bar', ax=axes[1,1], title='Recall Comparison')
    axes[1,1].set_ylabel('Recall')
    axes[1,1].tick_params(axis='x', rotation=45)
    
    plt.tight_layout()
    comparison_plot_path = os.path.join(PLOTS_DIR, f'{dataset_name}_model_comparison_detailed.png')
    plt.savefig(comparison_plot_path, dpi=300, bbox_inches='tight')
    plt.show()
    print(f"Saved detailed comparison plot: {comparison_plot_path}")
    
    # 3.2 ROC Curves
    plt.figure(figsize=(10, 8))
    for model_name, y_proba in probabilities.items():
        if y_proba is not None:
            fpr, tpr, _ = roc_curve(y_test, y_proba)
            auc_score = roc_auc_score(y_test, y_proba)
            plt.plot(fpr, tpr, label=f'{model_name} (AUC = {auc_score:.3f})', linewidth=2)
    
    plt.plot([0, 1], [0, 1], 'k--', label='Random Classifier')
    plt.xlabel('False Positive Rate', fontsize=12)
    plt.ylabel('True Positive Rate', fontsize=12)
    plt.title(f'{dataset_name} - ROC Curves Comparison', fontsize=14, fontweight='bold')
    plt.legend(fontsize=11)
    plt.grid(True, alpha=0.3)
    roc_plot_path = os.path.join(PLOTS_DIR, f'{dataset_name}_roc_curves.png')
    plt.savefig(roc_plot_path, dpi=300, bbox_inches='tight')
    plt.show()
    print(f"Saved ROC curves plot: {roc_plot_path}")
    
    # 3.3 Confusion Matrices
    n_models = len(trained_models)
    cols = min(3, n_models)
    rows = (n_models + cols - 1) // cols
    
    fig, axes = plt.subplots(rows, cols, figsize=(5*cols, 4*rows))
    if n_models == 1:
        axes = [axes]
    elif rows == 1:
        axes = axes.reshape(1, -1)
    
    for i, (model_name, y_pred) in enumerate(predictions.items()):
        row, col = i // cols, i % cols
        ax = axes[row, col] if rows > 1 else axes[col]
        
        cm = confusion_matrix(y_test, y_pred)
        sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', ax=ax,
                   xticklabels=['False Positive', 'Exoplanet'],
                   yticklabels=['False Positive', 'Exoplanet'])
        ax.set_title(f'{model_name} Confusion Matrix', fontweight='bold')
        ax.set_xlabel('Predicted')
        ax.set_ylabel('Actual')
    
    # Hide empty subplots
    for i in range(n_models, rows * cols):
        row, col = i // cols, i % cols
        ax = axes[row, col] if rows > 1 else axes[col]
        ax.set_visible(False)
    
    plt.tight_layout()
    cm_plot_path = os.path.join(PLOTS_DIR, f'{dataset_name}_confusion_matrices.png')
    plt.savefig(cm_plot_path, dpi=300, bbox_inches='tight')
    plt.show()
    print(f"Saved confusion matrices plot: {cm_plot_path}")
    
    # 4. DATASET ANALYSIS
    print("\n" + "="*50)
    print("DATASET ANALYSIS")
    print("="*50)
    
    print(f"Dataset: {dataset_name}")
    print(f"Total samples: {len(X_train) + len(X_test)}")
    print(f"Training samples: {len(X_train)}")
    print(f"Test samples: {len(X_test)}")
    print(f"Features: {X_train.shape[1]}")
    
    # Class distribution
    train_dist = pd.Series(y_train).value_counts().sort_index()
    test_dist = pd.Series(y_test).value_counts().sort_index()
    
    print(f"\nClass Distribution:")
    print(f"Training set - False Positive: {train_dist[0]}, Exoplanet: {train_dist[1]}")
    print(f"Test set - False Positive: {test_dist[0]}, Exoplanet: {test_dist[1]}")
    
    # Feature analysis
    print(f"\nFeature Analysis:")
    numeric_features = X_train.select_dtypes(include=[np.number]).columns
    categorical_features = X_train.select_dtypes(include=['object']).columns
    print(f"Numeric features: {len(numeric_features)}")
    print(f"Categorical features: {len(categorical_features)}")
    
    if len(numeric_features) > 0:
        print(f"\nNumeric features: {list(numeric_features)}")
    if len(categorical_features) > 0:
        print(f"Categorical features: {list(categorical_features)}")
    
    # 5. SAVE RESULTS
    print("\n" + "="*50)
    print("SAVING RESULTS")
    print("="*50)
    
    # Save detailed results
    detailed_results = {
        'dataset_info': {
            'name': dataset_name,
            'total_samples': len(X_train) + len(X_test),
            'train_samples': len(X_train),
            'test_samples': len(X_test),
            'features': X_train.shape[1],
            'numeric_features': len(numeric_features),
            'categorical_features': len(categorical_features)
        },
        'class_distribution': {
            'train': train_dist.to_dict(),
            'test': test_dist.to_dict()
        },
        'model_performance': results,
        'best_models': {
            metric: comparison_df[metric].idxmax() 
            for metric in comparison_df.columns 
            if not comparison_df[metric].isna().all()
        }
    }
    
    results_path = os.path.join(RESULTS_DIR, f'{dataset_name}_detailed_analysis.json')
    with open(results_path, 'w') as f:
        json.dump(detailed_results, f, indent=2, default=str)
    
    print(f"Saved detailed analysis: {results_path}")
    print(f"Saved plots in: {PLOTS_DIR}")
    
    print("\n" + "="*80)
    print("ANALYSIS COMPLETE!")
    print("="*80)
    
    return detailed_results, comparison_df


In [59]:
# Example usage of the analysis function
# Run this after training your models to get comprehensive results

# detailed_results, comparison_df = analyze_model_results(
#     trained_models=trained_models,
#     X_test=X_test, 
#     y_test=y_test,
#     X_train=X_train,
#     y_train=y_train,
#     dataset_name='TESS'
# )

print("Analysis function ready! Uncomment the lines above to run comprehensive model analysis.")


Analysis function ready! Uncomment the lines above to run comprehensive model analysis.


## Step 12 — Final Summary
After running, models will be saved to `/mnt/data/static/models`, plots to `/mnt/data/static/plots`, and metrics/top-features to `/mnt/data/static/results`. Use these artifacts in your Flask backend.