NASA SPACE APPS HACKATHON - PROJECT NOTEBOOK

Title : Exoplanet Detection using Artificial Intelligence and Machine Learning

Team : Ravenhart Vexmoor 

Track  : Best use of Science / Technology / Data / Global Impact

1: Importing Required Libraries

In [None]:
import pandas as pd
import scipy
from sklearn.metrics import classification_report, roc_auc_score
import astropy
import lightkurve
import matplotlib
import tensorflow
import shap
import os, json
import glob
import numpy as np
from astropy.timeseries import BoxLeastSquares
from scipy.signal import periodogram
from sklearn.model_selection import train_test_split
from xgboost import XGBClassifier
from sklearn.calibration import CalibratedClassifierCV
import joblib
from clean import load_and_preprocess
from imblearn.over_sampling import SMOTE
from sklearn.ensemble import RandomForestClassifier
import argparse
from fastapi import FastAPI, UploadFile, File
from preprocess import prepare_lightcurve_from_df
from features import extract_features
from vetting import vet_flags
from models import build_baseline_model, FEATURE_COLUMNS
from evaluate import evaluate_and_save

2: Loading and Processing of Database

In [None]:
# clean.py

def load_and_preprocess(file_path="dataset.csv"):
    # Load dataset
    df = pd.read_csv(file_path)

    # Drop duplicates (if any)
    df.drop_duplicates(inplace=True)

    # Handle missing values: fill with mean or drop (choose wisely)
    df.fillna(df.mean(numeric_only=True), inplace=True)

    # Select useful features for ML
    features = [
        "teff", "logg", "feh", "radius", "mass", "dens", "kepmag"
    ]
    target = "nconfp"  # Number of confirmed planets (can be binary label)

    # Keep only selected columns if present
    available_features = [f for f in features if f in df.columns]
    if target in df.columns:
        df = df[available_features + [target]]
    else:
        df = df[available_features]

    return df


In [None]:
# preprocess.py

DATASET_PATH = "datasets/"
OUTPUT_FILE = "dataset.csv"

def load_and_merge_datasets(path=DATASET_PATH):
    # Collect all CSV files in datasets/
    csv_files = glob.glob(os.path.join(path, "*.csv"))
    print(f"Found {len(csv_files)} CSV files.")

    dataframes = []
    for file in csv_files:
        try:
            df = pd.read_csv(file, low_memory=False)
            print(f"Loaded {file} with shape {df.shape}")
            dataframes.append(df)
        except Exception as e:
            print(f"❌ Could not load {file}: {e}")

    # Merge datasets (only common columns kept)
    merged_df = pd.concat(dataframes, axis=0, ignore_index=True, sort=False)

    print(f"Total merged shape: {merged_df.shape}")
    return merged_df


def preprocess_data(df):
    # Drop duplicate rows
    df = df.drop_duplicates()

    # Drop completely empty columns
    df = df.dropna(axis=1, how="all")

    # Fill missing values with median (numeric) or mode (categorical)
    for col in df.columns:
        if df[col].dtype in ["float64", "int64"]:
            df[col] = df[col].fillna(df[col].median())
        else:
            df[col] = df[col].fillna(df[col].mode()[0])

    return df


def save_dataset(df, filename=OUTPUT_FILE):
    df.to_csv(filename, index=False)
    print(f"✅ Processed dataset saved to {filename}")


if __name__ == "__main__":
    merged = load_and_merge_datasets()
    processed = preprocess_data(merged)
    save_dataset(processed)


3: Feature Engineering

In [None]:
"""
features.py
Compute transit-search features using Box Least Squares (BLS) and other heuristics.
Designed to produce tabular features for lightweight ML (RandomForest).
"""

def compute_bls(time, flux, min_period=0.3, max_period=30.0, n_periods=2000, q_min=0.005, q_max=0.1):
    # time: days, flux: relative (around 0)
    try:
        model = BoxLeastSquares(time, flux)
        periods = np.linspace(min_period, max_period, n_periods)
        durations = np.linspace(q_min, q_max, 10) * periods[:, None]
        power = model.power(periods, durations)
        idx = np.nanargmax(power.power)
        best_period = power.period[idx]
        # select corresponding best duration & t0 (closest index)
        best_duration = power.duration[idx]
        best_t0 = power.transit_time[idx]
        # depth approximate
        phase_mask = ((time - best_t0 + 0.5*best_period) % best_period) / best_period
        in_transit = (phase_mask > 0.5 - best_duration/(2*best_period)) & (phase_mask < 0.5 + best_duration/(2*best_period))
        depth = np.nanmedian(flux[in_transit]) - np.nanmedian(flux[~in_transit]) if np.sum(in_transit) > 0 else 0.0
        snr = power.power[idx] / (np.nanstd(flux) + 1e-9)
        return {"period": float(best_period), "duration": float(best_duration), "t0": float(best_t0),
                "depth": float(abs(depth)), "snr": float(snr)}, power
    except Exception:
        return {"period": np.nan, "duration": np.nan, "t0": np.nan, "depth": np.nan, "snr": np.nan}, None

def transit_shape_metric(time, flux, period, t0, duration):
    if np.isnan(period) or np.isnan(duration):
        return np.nan
    phase = ((time - t0 + 0.5*period) % period) / period
    in_transit = (phase > 0.5 - duration/(2*period)) & (phase < 0.5 + duration/(2*period))
    if np.sum(in_transit) < 5:
        return np.nan
    depth = np.nanmedian(flux[in_transit]) - np.nanmedian(flux[~in_transit])
    edges = np.gradient(flux.astype(float))
    slope = np.nanmedian(np.abs(edges[in_transit]))
    return np.abs(slope) / (abs(depth) + 1e-9)

def odd_even_depth_diff(time, flux, period, t0, duration):
    if np.isnan(period):
        return np.nan
    phase = ((time - t0) % period) / period
    in_transit = (phase < duration/period) | (phase > 1 - duration/period)
    epochs = np.floor((time - t0) / period + 0.5).astype(int)
    odd = in_transit & (epochs % 2 != 0)
    even = in_transit & (epochs % 2 == 0)
    try:
        d_odd = np.nanmedian(flux[odd]) - np.nanmedian(flux[~odd]) if np.sum(odd) > 0 else 0.0
        d_even = np.nanmedian(flux[even]) - np.nanmedian(flux[~even]) if np.sum(even) > 0 else 0.0
        return abs(d_odd - d_even)
    except Exception:
        return np.nan

def secondary_eclipse_signal(time, flux, period, t0, duration):
    if np.isnan(period):
        return np.nan
    t0_sec = t0 + 0.5 * period
    phase = ((time - t0_sec) % period) / period
    in_sec = (phase < duration/period) | (phase > 1 - duration/period)
    if np.sum(in_sec) < 3:
        return 0.0
    return np.nanmedian(flux[in_sec]) - np.nanmedian(flux[~in_sec])

def periodicity_features(time, flux):
    try:
        fs = 1.0 / np.median(np.diff(time))
        f, pxx = periodogram(flux, fs=fs)
        pxx = pxx / (np.sum(pxx) + 1e-12)
        peak_power = float(np.nanmax(pxx))
        entropy = -float(np.nansum(pxx * np.log(pxx + 1e-12)))
        return {"psd_peak": peak_power, "psd_entropy": entropy}
    except Exception:
        return {"psd_peak": np.nan, "psd_entropy": np.nan}

def extract_features(time, flux):
    feat = {}
    bls, power = compute_bls(time, flux)
    feat.update(bls)
    feat["shape_metric"] = transit_shape_metric(time, flux, feat["period"], feat["t0"], feat["duration"])
    feat["odd_even_diff"] = odd_even_depth_diff(time, flux, feat["period"], feat["t0"], feat["duration"])
    feat["secondary_depth"] = secondary_eclipse_signal(time, flux, feat["period"], feat["t0"], feat["duration"])
    feat.update(periodicity_features(time, flux))
    return feat


5: Model Training

In [None]:
# train_xgb.py

# -----------------------------
# 1️ Load and preprocess dataset
# -----------------------------
df = load_and_preprocess("dataset.csv")

# -----------------------------
# 2️ Binary target: 1 if any planet, 0 if none
# -----------------------------
df["has_planet"] = df["nconfp"].apply(lambda x: 1 if x > 0 else 0)

# -----------------------------
# 3️ Features and target
# -----------------------------
feature_columns = df.drop(columns=["nconfp", "has_planet"]).columns.tolist()
X = df[feature_columns]
y = df["has_planet"]

# Save feature columns for API
joblib.dump(feature_columns, "feature_columns.pkl")
print("✅ feature_columns.pkl saved")

# -----------------------------
# 4️ Train/test split (stratified)
# -----------------------------
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)

# -----------------------------
# 5️ Train XGBoost classifier
# -----------------------------
xgb_model = XGBClassifier(
    n_estimators=200,
    max_depth=5,
    scale_pos_weight=(len(y_train)-y_train.sum())/y_train.sum(),  # handle imbalance
    random_state=42,
    use_label_encoder=False,
    eval_metric="logloss"
)

xgb_model.fit(X_train, y_train)

# -----------------------------
# 6️ Calibrate probabilities
# -----------------------------
calibrated_model = CalibratedClassifierCV(xgb_model, method='isotonic')
calibrated_model.fit(X_train, y_train)

# -----------------------------
# 7️ Evaluate
# -----------------------------
y_pred = calibrated_model.predict(X_test)
print(classification_report(y_test, y_pred))

# -----------------------------
# 8️ Save model
# -----------------------------
joblib.dump(calibrated_model, "exoplanet_model_xgb.pkl")
print("✅ Model saved as exoplanet_model_xgb.pkl")


To train the class imbalances, the following code was executed:

In [None]:
# train.py
# -----------------------------
# 1️ Load and preprocess dataset
# -----------------------------
df = load_and_preprocess("dataset.csv")

# -----------------------------
# 2️ Binary target: 1 if any planet, 0 if none
# -----------------------------
df["has_planet"] = df["nconfp"].apply(lambda x: 1 if x > 0 else 0)

# -----------------------------
# 3️ Features and target
# -----------------------------
feature_columns = df.drop(columns=["nconfp", "has_planet"]).columns.tolist()
X = df[feature_columns]
y = df["has_planet"]

# Save feature columns for FastAPI
joblib.dump(feature_columns, "feature_columns.pkl")
print("✅ feature_columns.pkl saved")

# -----------------------------
# 4️ Train/test split (stratified)
# -----------------------------
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)

# -----------------------------
# 5️ Apply SMOTE oversampling
# -----------------------------
sm = SMOTE(random_state=42)
X_train_res, y_train_res = sm.fit_resample(X_train, y_train)
print(f"✅ Training data balanced: {y_train_res.value_counts().to_dict()}")

# -----------------------------
# 6️ Train RandomForest with balanced class weight
# -----------------------------
model = RandomForestClassifier(
    n_estimators=100,
    random_state=42,
    class_weight="balanced"
)
model.fit(X_train_res, y_train_res)

# -----------------------------
# 7️ Evaluate
# -----------------------------
y_pred = model.predict(X_test)
print(classification_report(y_test, y_pred))

# -----------------------------
# 8️ Save model
# -----------------------------
joblib.dump(model, "exoplanet_model.pkl")
print("✅ Model saved as exoplanet_model.pkl")

In [None]:
"""
features.py
Compute transit-search features using Box Least Squares (BLS) and other heuristics.
Designed to produce tabular features for lightweight ML (RandomForest).
"""

def compute_bls(time, flux, min_period=0.3, max_period=30.0, n_periods=2000, q_min=0.005, q_max=0.1):
    # time: days, flux: relative (around 0)
    try:
        model = BoxLeastSquares(time, flux)
        periods = np.linspace(min_period, max_period, n_periods)
        durations = np.linspace(q_min, q_max, 10) * periods[:, None]
        power = model.power(periods, durations)
        idx = np.nanargmax(power.power)
        best_period = power.period[idx]
        # select corresponding best duration & t0 (closest index)
        best_duration = power.duration[idx]
        best_t0 = power.transit_time[idx]
        # depth approximate
        phase_mask = ((time - best_t0 + 0.5*best_period) % best_period) / best_period
        in_transit = (phase_mask > 0.5 - best_duration/(2*best_period)) & (phase_mask < 0.5 + best_duration/(2*best_period))
        depth = np.nanmedian(flux[in_transit]) - np.nanmedian(flux[~in_transit]) if np.sum(in_transit) > 0 else 0.0
        snr = power.power[idx] / (np.nanstd(flux) + 1e-9)
        return {"period": float(best_period), "duration": float(best_duration), "t0": float(best_t0),
                "depth": float(abs(depth)), "snr": float(snr)}, power
    except Exception:
        return {"period": np.nan, "duration": np.nan, "t0": np.nan, "depth": np.nan, "snr": np.nan}, None

def transit_shape_metric(time, flux, period, t0, duration):
    if np.isnan(period) or np.isnan(duration):
        return np.nan
    phase = ((time - t0 + 0.5*period) % period) / period
    in_transit = (phase > 0.5 - duration/(2*period)) & (phase < 0.5 + duration/(2*period))
    if np.sum(in_transit) < 5:
        return np.nan
    depth = np.nanmedian(flux[in_transit]) - np.nanmedian(flux[~in_transit])
    edges = np.gradient(flux.astype(float))
    slope = np.nanmedian(np.abs(edges[in_transit]))
    return np.abs(slope) / (abs(depth) + 1e-9)

def odd_even_depth_diff(time, flux, period, t0, duration):
    if np.isnan(period):
        return np.nan
    phase = ((time - t0) % period) / period
    in_transit = (phase < duration/period) | (phase > 1 - duration/period)
    epochs = np.floor((time - t0) / period + 0.5).astype(int)
    odd = in_transit & (epochs % 2 != 0)
    even = in_transit & (epochs % 2 == 0)
    try:
        d_odd = np.nanmedian(flux[odd]) - np.nanmedian(flux[~odd]) if np.sum(odd) > 0 else 0.0
        d_even = np.nanmedian(flux[even]) - np.nanmedian(flux[~even]) if np.sum(even) > 0 else 0.0
        return abs(d_odd - d_even)
    except Exception:
        return np.nan

def secondary_eclipse_signal(time, flux, period, t0, duration):
    if np.isnan(period):
        return np.nan
    t0_sec = t0 + 0.5 * period
    phase = ((time - t0_sec) % period) / period
    in_sec = (phase < duration/period) | (phase > 1 - duration/period)
    if np.sum(in_sec) < 3:
        return 0.0
    return np.nanmedian(flux[in_sec]) - np.nanmedian(flux[~in_sec])

def periodicity_features(time, flux):
    try:
        fs = 1.0 / np.median(np.diff(time))
        f, pxx = periodogram(flux, fs=fs)
        pxx = pxx / (np.sum(pxx) + 1e-12)
        peak_power = float(np.nanmax(pxx))
        entropy = -float(np.nansum(pxx * np.log(pxx + 1e-12)))
        return {"psd_peak": peak_power, "psd_entropy": entropy}
    except Exception:
        return {"psd_peak": np.nan, "psd_entropy": np.nan}

def extract_features(time, flux):
    feat = {}
    bls, power = compute_bls(time, flux)
    feat.update(bls)
    feat["shape_metric"] = transit_shape_metric(time, flux, feat["period"], feat["t0"], feat["duration"])
    feat["odd_even_diff"] = odd_even_depth_diff(time, flux, feat["period"], feat["t0"], feat["duration"])
    feat["secondary_depth"] = secondary_eclipse_signal(time, flux, feat["period"], feat["t0"], feat["duration"])
    feat.update(periodicity_features(time, flux))
    return feat


7: Deployment

In [None]:
"""
vetting.py
Use features to create human-interpretable vetting flags:
- odd-even mismatch -> eclipsing binaries
- secondary eclipse -> eclipsing binaries or self-luminous companion
- v-shape metric -> grazing/stellar eclipse
- variability domination -> star spots/ pulsations
- short single events -> moving objects (meteor/asteroid)
"""

def vet_flags(feat):
    flags = {}
    depth = abs(feat.get("depth", 0.0) + 1e-9)
    # Eclipsing binary indicators
    flags["odd_even_flag"] = (feat.get("odd_even_diff", 0.0) > 0.1 * depth)
    flags["secondary_flag"] = (abs(feat.get("secondary_depth", 0.0)) > 0.3 * depth)
    flags["v_shape_flag"] = (feat.get("shape_metric", np.nan) > 0.6)
    # implausible duration fraction (very long duration relative to period)
    period = feat.get("period", np.nan)
    duration = feat.get("duration", np.nan)
    try:
        flags["duration_flag"] = (duration / period) > 0.15
    except Exception:
        flags["duration_flag"] = False
    # variability dominance
    flags["var_flag"] = (feat.get("psd_peak", 0.0) > 0.25) and (feat.get("psd_entropy", 999.0) < 6.0)
    # moving object heuristic (short, single, very asymmetric event) - placeholder: rely on data shape / cadence externally
    flags["moving_object_flag"] = False  # requires event-level analysis; default False here
    flags["any_flag"] = any(flags.get(k, False) for k in flags if k.endswith("_flag"))
    return flags


In [None]:
"""
run_pipeline.py
End-to-end runner. Use --demo to run synthetic data that demonstrates planet vs non-planet signals.
Use --lightcurves to point to CSV files (time,flux).
"""

def simulate_lightcurve(kind="planet", n_points=3000, period=5.0, duration_frac=0.03, depth=0.005, noise=0.001):
    rng = np.random.default_rng()
    time = np.cumsum(np.full(n_points, 0.02))  # uniform cadence ~0.02 day (~30 min)
    flux = rng.normal(1.0, noise, size=n_points)
    if kind == "planet":
        phase = (time % period) / period
        in_tr = (phase < duration_frac) | (phase > 1 - duration_frac)
        flux[in_tr] -= depth
    elif kind == "ebinary":
        phase = (time % period) / period
        in_tr = (phase < duration_frac) | (phase > 1 - duration_frac)
        # deeper, asymmetric
        flux[in_tr] -= depth * (1.5 + 0.5 * np.sin(2*np.pi*phase))
        # secondary
        phase2 = ((time + 0.5*period) % period) / period
        sec = (phase2 < duration_frac) | (phase2 > 1 - duration_frac)
        flux[sec] -= 0.6 * depth
    elif kind == "variable":
        flux += 0.01 * np.sin(2*np.pi*time/period) + 0.003 * np.sin(2*np.pi*time/(0.7*period))
    else:
        # meteor-like short spike
        i = rng.integers(low=100, high=n_points-100)
        flux[i:i+2] -= 0.1  # sharp dip
    return time, flux

def load_lightcurves_from_globs(patterns):
    files = []
    for p in patterns:
        files.extend(glob.glob(p))
    data = []
    for fp in files:
        try:
            df = pd.read_csv(fp)
            t, f = prepare_lightcurve_from_df(df)
            if len(t) > 0:
                data.append((fp, t, f))
        except Exception as e:
            print("Failed to load", fp, ":", e)
    return data

def main():
    ap = argparse.ArgumentParser()
    ap.add_argument("--demo", action="store_true", help="Run synthetic demo")
    ap.add_argument("--lightcurves", nargs="*", help="Glob patterns to CSV files (time,flux[,flux_err])")
    ap.add_argument("--outdir", default="artifacts", help="Save outputs here")
    args = ap.parse_args()
    os.makedirs(args.outdir, exist_ok=True)

    X = []
    y = []
    meta = []

    if args.demo:
        kinds = ["planet","planet","ebinary","variable","planet","ebinary","variable","planet","meteor"]
        for k in kinds:
            t, f = simulate_lightcurve(kind=k)
            feat = extract_features(t, f-1.0)  # features expect relative flux near 0
            row = [feat.get(c, np.nan) for c in FEATURE_COLUMNS]
            X.append(row)
            y.append(1 if k == "planet" else 0)
            meta.append({"kind": k, "feat": feat, "vet": vet_flags(feat)})
    elif args.lightcurves:
        data = load_lightcurves_from_globs(args.lightcurves)
        for fp, t, f in data:
            feat = extract_features(t, f)
            row = [feat.get(c, np.nan) for c in FEATURE_COLUMNS]
            X.append(row)
            # placeholder label: unknown; team should label or use provided training labels
            y.append(0)
            meta.append({"file": fp, "feat": feat, "vet": vet_flags(feat)})
    else:
        ap.print_help()
        return

    X = np.array(X, dtype=float)
    y = np.array(y, dtype=int)

    # Baseline model
    model = build_baseline_model()
    # Safe: if all labels equal, skip fit
    if len(np.unique(y)) > 1:
        model.fit(X, y)
        proba = model.predict_proba(X)[:,1]
    else:
        # trivial case for demo: use depth threshold
        proba = np.array([0.8 if (row[2] > 0.001) else 0.2 for row in X])  # depth index in FEATURE_COLUMNS
    metrics = evaluate_and_save(y, proba, outdir=args.outdir)

    # Save per-object predictions and flags
    rows = []
    for i, m in enumerate(meta):
        rows.append({
            "index": i,
            "pred_proba": float(proba[i]),
            "label": int(y[i]),
            "meta": m
        })
    with open(os.path.join(args.outdir, "predictions.json"), "w") as f:
        json.dump(rows, f, indent=2)

    # Save feature table
    feat_df = pd.DataFrame(X, columns=FEATURE_COLUMNS)
    feat_df["label"] = y
    feat_df["pred_proba"] = proba
    feat_df.to_csv(os.path.join(args.outdir, "features_predictions.csv"), index=False)

    print("Done. Outputs saved to:", args.outdir)
    print("Metrics:", json.dumps(metrics, indent=2))

if __name__ == "__main__":
    main()


In [None]:


app = FastAPI()

# Load trained model and feature columns
model = joblib.load("exoplanet_model.pkl")
feature_columns = joblib.load("feature_columns.pkl")

@app.post("/predict/")
async def predict(file: UploadFile = File(...), threshold: float = 0.5):
    try:
        # Load CSV
        df = pd.read_csv(file.file)

        # Keep only the features used in training
        X = df[feature_columns]

        # Predict probabilities
        probs = model.predict_proba(X)[:, 1]

        # Apply threshold
        preds = (probs >= threshold).astype(int)

        # Return predictions with probability
        results = []
        for p, prob in zip(preds, probs):
            results.append({
                "prediction": int(p),
                "probability": float(prob),
                "label": "Planet detected" if p == 1 else "No planet"
            })

        return {"results": results}

    except Exception as e:
        return {"error": str(e)}



In [None]:
For deployment of our application:

Run: 

uvicorn main:app --reload

In the bash terminal, and then click on the link:

http://127.0.0.1:8000/docs

To access the deployed application.

8: Conclusion

Project Completed Successfully!

This solution addresses NASA Space Apps Hackathon challenges with global impact.