In [19]:
import os
import pandas as pd
import numpy as np
import joblib
from glob import glob
from astropy.cosmology import Planck18 as cosmo

from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score
from sklearn.model_selection import train_test_split, StratifiedKFold
from lightgbm import LGBMClassifier, early_stopping, log_evaluation
from imblearn.pipeline import Pipeline as ImbPipeline
from imblearn.over_sampling import SMOTE
from sklearn.base import BaseEstimator, TransformerMixin

# ----------------------
# SETTINGS & DATA LOAD
# ----------------------
# Project root. Defaults to the original workstation path; set the
# IBN_ROOT_DIR environment variable to run the notebook on another machine.
ROOT_DIR = os.environ.get('IBN_ROOT_DIR', r'C:\Users\jgmad\Research\Ibn')
DATA_DIR = os.path.join(ROOT_DIR, "data")

# Summary table; its 'ZTFID', 'type' and 'redshift' columns are read further down.
summary_file = os.path.join(DATA_DIR, "ZTFBTS_summary.csv")
if not os.path.exists(summary_file):
    # Fail with the offending path instead of a tuple-unpacking ValueError.
    raise FileNotFoundError(f"Summary file not found: {summary_file}")
# '-' cells are turned into NaN so that isna()/dropna() can see them.
summary_data = pd.read_csv(summary_file).replace('-', np.nan)

# DAYS_AFTER selects a suffixed parameter file ("..._<DAYS_AFTER>.csv");
# 0 selects the unsuffixed file.
DAYS_AFTER = 0
param_suffix = f"_{DAYS_AFTER}" if DAYS_AFTER else ""
param_file = os.path.join(DATA_DIR, f"SN_interpretable_params{param_suffix}.csv")
if not os.path.exists(param_file):
    raise FileNotFoundError(f"Parameter file not found: {param_file}")
df = pd.read_csv(param_file)

# ----------------------
# INITIAL FILTERING
# ----------------------
# Use 'supernova_name' as the identifier and drop the per-band id columns.
# errors='ignore' makes the drop a no-op when a column is absent, so no
# try/except KeyError is needed.
df = df.rename(columns={'oid': 'supernova_name'})
df = df.drop(columns=['oid_r', 'oid_g'], errors='ignore')

df['first_det_r'] = df['first_det_r'].astype(float)
df['first_det_g'] = df['first_det_g'].astype(float)

# Build labels only from summary rows that actually carry a type. A row whose
# type is NaN (the '-' placeholder was replaced by NaN at load time) compares
# unequal to 'SN Ibn' and would otherwise be used as a confirmed non-Ibn.
classified = summary_data[summary_data['type'].notna()]
lookup = dict(zip(classified['ZTFID'], classified['type'] == 'SN Ibn'))
redshifts = dict(zip(summary_data['ZTFID'], summary_data['redshift']))

# Objects without a label map to NaN and are removed; .copy() gives an
# independent frame so the column assignment below does not act on a slice.
df['Ibn'] = df['supernova_name'].map(lookup)
df = df[df['Ibn'].notna()].copy()

df['redshift'] = df['supernova_name'].map(redshifts).astype(float)

# Drop invalid redshift rows (NaN > 0 is False, so missing redshifts go too)
df = df[df['redshift'] > 0]

# Features to use
slope_features = ['rise_slope_r','rise_slope_g','decline_slope_r','decline_slope_g']
imputed_features = slope_features + ['duration_g','duration_r','peak_epoch_g','peak_epoch_r','rise_time_g','rise_time_r']
other_features = ['peak_mag_r','peak_mag_g','redshift','first_det_r','first_det_g','ndetection_g','ndetection_r']
all_features = imputed_features + other_features

# Drop rows missing critical cut features.
# -9999 is treated as a missing-value sentinel; replace() returns a new frame
# rather than mutating the filtered slice in place.
df = df.replace(-9999, np.nan)
cut_feats = ['peak_mag_r','peak_mag_g','redshift']
df = df.dropna(subset=cut_feats)

# ----------------------
# TRAIN/TEST SPLIT
# ----------------------
full = df.reset_index(drop=True)

# One row per supernova (its first occurrence, in order of appearance).
# This yields the same names and labels as scanning the whole frame once per
# supernova, but in a single pass instead of O(n_names * n_rows).
first_rows = full.drop_duplicates(subset='supernova_name', keep='first')
unique_SN = first_rows['supernova_name'].to_numpy()
types = first_rows['Ibn'].tolist()
SN_to_type = dict(zip(unique_SN, types))

# Split by supernova name (not by row), stratified on the Ibn label, so every
# row of a given object lands on the same side of the split.
train_SN, test_SN = train_test_split(unique_SN, stratify=types, test_size=0.2, random_state=12282005)
mask = full['supernova_name'].isin(train_SN)

X_train_raw = full.loc[mask, all_features]
y_train = full.loc[mask, 'Ibn'].astype(int)
X_test_raw = full.loc[~mask, all_features]
y_test = full.loc[~mask, 'Ibn'].astype(int)

# ----------------------
# PIPELINE
# ----------------------
class CosmologyTransformer(BaseEstimator, TransformerMixin):
    """Shift the peak magnitudes by a redshift-derived term and add a color column.

    Parameters
    ----------
    cosmo : object
        Cosmology whose ``luminosity_distance(z)`` result supports
        ``.to('pc').value`` (the notebook passes astropy's Planck18).
    """

    def __init__(self, cosmo):
        self.cosmo = cosmo

    def fit(self, X, y=None):
        """Stateless: nothing is learned, the transformer itself is returned."""
        return self

    def transform(self, X):
        """Return a copy of ``X`` with adjusted magnitudes and a new 'color' column.

        Reads the 'redshift', 'peak_mag_r' and 'peak_mag_g' columns; the
        input frame is left untouched.
        """
        out = X.copy()
        distance_pc = self.cosmo.luminosity_distance(out['redshift']).to('pc').value
        # mu = 5 * log10(d_L / pc) - 5, subtracted from both bands
        dist_modulus = 5 * np.log10(distance_pc) - 5
        for band_col in ('peak_mag_r', 'peak_mag_g'):
            out[band_col] = out[band_col] - dist_modulus
        # The same term is removed from both bands, so g - r is unchanged by the shift.
        out['color'] = out['peak_mag_g'] - out['peak_mag_r']
        return out

# Column handling: median-impute the features listed in `imputed_features`
# (add_indicator=True appends missing-value indicator columns) and pass the
# remaining features plus 'color' through unchanged. 'color' is not in the raw
# frame; it is added by the 'cosmo' pipeline step that runs first.
preprocessor = ColumnTransformer(
    transformers=[
        ('num_imp', SimpleImputer(strategy='median', add_indicator=True), imputed_features),
        ('pass', 'passthrough', other_features + ['color']),
    ]
)

# LightGBM hyper-parameters (hard-coded; how they were chosen is not shown here).
# NOTE(review): LightGBM documents `subsample` as taking effect only when
# `subsample_freq` > 0 (default 0), so the 0.667 below is presumably inactive
# -- confirm against the tuning run before changing it.
best_params = dict(
    n_estimators=1000,
    learning_rate=0.07,
    num_leaves=26,
    max_depth=4,
    min_child_samples=84,
    reg_alpha=0.266,
    reg_lambda=0.093,
    subsample=0.667,
    colsample_bytree=0.665,
    random_state=12282005,
    verbosity=-1,
)

# Full model: feature engineering -> imputation -> SMOTE oversampling -> classifier.
pipeline = ImbPipeline(
    steps=[
        ('cosmo', CosmologyTransformer(cosmo=cosmo)),
        ('prep', preprocessor),
        ('smote', SMOTE(random_state=12282005, sampling_strategy=0.93)),
        ('clf', LGBMClassifier(**best_params)),
    ]
)

# ----------------------
# TRAINING
# ----------------------
# Features / labels for the complete labeled set. They are prepared here but
# the fit below only uses the training portion.
X_all_raw = full[all_features]
y_all = full['Ibn'].astype(int)

# Fit the whole pipeline on the training split only.
pipeline.fit(X_train_raw, y_train)

# Probability cut used to call an object an Ibn (hard-coded; its derivation
# is not part of this cell).
best_threshold = 0.0628

'''# Save pipeline & threshold
os.makedirs('models', exist_ok=True)
joblib.dump(pipeline, 'models/clf_pipeline.joblib')

joblib.dump(best_threshold, 'models/best_threshold.pkl')'''

# ----------------------
# EVALUATION
# ----------------------
# Column 1 of predict_proba is the positive (Ibn) class probability.
probs = pipeline.predict_proba(X_test_raw)[:, 1]
pred = (probs >= best_threshold).astype(int)

print("Accuracy:", accuracy_score(y_test, pred))
print("Confusion Matrix:\n", confusion_matrix(y_test, pred))
print("Classification Report:\n", classification_report(y_test, pred, digits=4))


Accuracy: 0.9922480620155039
Confusion Matrix:
 [[1657   12]
 [   1    7]]
Classification Report:
               precision    recall  f1-score   support

           0     0.9994    0.9928    0.9961      1669
           1     0.3684    0.8750    0.5185         8

    accuracy                         0.9922      1677
   macro avg     0.6839    0.9339    0.7573      1677
weighted avg     0.9964    0.9922    0.9938      1677





In [17]:
# List the columns of the current training feature frame.
print(X_train_raw.columns)

Index(['rise_slope_r', 'rise_slope_g', 'decline_slope_r', 'decline_slope_g',
       'duration_g', 'duration_r', 'peak_epoch_g', 'peak_epoch_r',
       'rise_time_g', 'rise_time_r', 'peak_mag_r', 'peak_mag_g', 'redshift',
       'first_det_r', 'first_det_g', 'ndetection_g', 'ndetection_r'],
      dtype='object')


In [None]:
import os
import pandas as pd
import numpy as np
import joblib
from glob import glob
from astropy.cosmology import Planck18 as cosmo

from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score
from sklearn.model_selection import train_test_split
from lightgbm import LGBMClassifier
from imblearn.pipeline import Pipeline as ImbPipeline
from imblearn.over_sampling import SMOTE
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.impute import SimpleImputer

# ----------------------
# SETTINGS & DATA LOAD
# ----------------------
# Project root. Defaults to the original workstation path; set the
# IBN_ROOT_DIR environment variable to run the notebook on another machine.
ROOT_DIR = os.environ.get('IBN_ROOT_DIR', r'C:\Users\jgmad\Research\Ibn')
DATA_DIR = os.path.join(ROOT_DIR, "data")

# Summary file contains labels and redshifts
summary_file = os.path.join(DATA_DIR, "ZTFBTS_summary.csv")
if not os.path.exists(summary_file):
    # Fail with the offending path instead of a tuple-unpacking ValueError.
    raise FileNotFoundError(f"Summary file not found: {summary_file}")
# '-' cells are turned into NaN so that isnull()/dropna() can see them.
summary_data = pd.read_csv(summary_file).replace('-', np.nan)

# Parameter file (unlabeled + labeled combined).
# DAYS_AFTER selects a suffixed file ("..._<DAYS_AFTER>.csv"); 0 selects the
# unsuffixed file.
DAYS_AFTER = 0
param_suffix = f"_{DAYS_AFTER}" if DAYS_AFTER else ""
param_file = os.path.join(DATA_DIR, f"SN_interpretable_params{param_suffix}.csv")
if not os.path.exists(param_file):
    raise FileNotFoundError(f"Parameter file not found: {param_file}")
df_full = pd.read_csv(param_file)

# Rename identifier column to 'supernova_name'
if 'oid' in df_full.columns:
    df_full = df_full.rename(columns={'oid':'supernova_name'})
elif 'ZTFID' in df_full.columns:
    df_full = df_full.rename(columns={'ZTFID':'supernova_name'})

# Every later step keys on 'supernova_name'; report a missing identifier here
# rather than as a bare KeyError further down.
if 'supernova_name' not in df_full.columns:
    raise KeyError(
        f"{param_file} has no 'oid', 'ZTFID' or 'supernova_name' column"
    )

# ----------------------
# SEPARATE LABELED & UNLABELED
# ----------------------
# Lookups keyed by ZTFID: Ibn flag (type == 'SN Ibn') and the raw type string.
label_lookup = dict(zip(summary_data['ZTFID'], summary_data['type'] == 'SN Ibn'))
type_dict = dict(zip(summary_data['ZTFID'], summary_data['type']))

# Note: a summary row with a NaN type still maps to Ibn == False here; only
# ids absent from the summary map to NaN.
df_full['Ibn'] = df_full['supernova_name'].map(label_lookup)

# An object counts as labeled when the summary gives it a non-null type.
has_type = df_full['supernova_name'].map(type_dict).notna()
df_labeled   = df_full[has_type]   # For now only labeled dataset
df_unlabeled = df_full[~has_type]
print(len(df_labeled),len(df_unlabeled))
# ----------------------
# INITIAL FILTERING FUNCTION
# ----------------------
def filter_dataframe(df):
    """Clean a parameter table and fill missing peak magnitudes.

    Steps, in order:
      1. rename 'oid' to 'supernova_name' and drop 'oid_r' / 'oid_g'
         (each is skipped when the column is absent);
      2. cast 'first_det_r' / 'first_det_g' to float;
      3. replace the -9999 sentinel with NaN everywhere;
      4. fill missing 'peak_mag_r' / 'peak_mag_g' with that column's median.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain 'first_det_r', 'first_det_g', 'peak_mag_r', 'peak_mag_g'.

    Returns
    -------
    pandas.DataFrame
        A new frame; the frame passed in is not modified.

    Notes
    -----
    The medians are computed over whatever frame is passed in. When this is
    called before a train/test split, the fill values therefore also depend on
    rows that later end up in the test set.
    A column with no valid value at all is left as NaN instead of raising.
    """
    # rename() returns a new frame, so nothing below touches the caller's data.
    df = df.rename(columns={'oid': 'supernova_name'})
    df = df.drop(columns=['oid_r', 'oid_g'], errors='ignore')

    # Ensure numeric types
    df['first_det_r'] = df['first_det_r'].astype(float)
    df['first_det_g'] = df['first_det_g'].astype(float)

    # Redshift mapping and the redshift > 0 cut are intentionally disabled here:
    #red_lookup = dict(zip(summary_data['ZTFID'], summary_data['redshift']))
    #df['redshift'] = df['supernova_name'].map(red_lookup).astype(float)
    #df = df[df['redshift'] > 0]

    # Replace magic missing values, then median-fill the peak magnitudes
    # (rows are kept rather than dropped).
    df = df.replace(-9999, np.nan)
    for mag_col in ('peak_mag_r', 'peak_mag_g'):
        # Series.median() skips NaN; if every value is missing the median is
        # NaN and the fill is a no-op.
        df[mag_col] = df[mag_col].fillna(df[mag_col].median())
    #df = df.dropna(subset=['peak_mag_r','peak_mag_g'])

    return df

# Run the same cleaning on each subset independently (each call computes its
# own median fill values from the frame it receives).
df_labeled = filter_dataframe(df_labeled)
df_unlabeled = filter_dataframe(df_unlabeled)

# ----------------------
# FEATURE LIST
# ----------------------
# Order matters: the same lists select columns for the model at fit and
# predict time.
slope_features = [
    'rise_slope_r',
    'rise_slope_g',
    'decline_slope_r',
    'decline_slope_g',
]
# Features routed through the pipeline's median imputer.
imputed_features = [
    *slope_features,
    'duration_g', 'duration_r',
    'peak_epoch_g', 'peak_epoch_r',
    'rise_time_g', 'rise_time_r',
    'peak_mag_r', 'peak_mag_g',
    'first_det_g', 'first_det_r',
]
# Features passed through without imputation.
other_features = ['ndetection_g', 'ndetection_r']
all_features = [*imputed_features, *other_features]

# ----------------------
# TRAIN/TEST SPLIT (LABELED)
# ----------------------
# One row per supernova (its first occurrence, in order of appearance).
# This gives the same names and labels as filtering the whole frame once per
# supernova, but in a single pass instead of O(n_names * n_rows).
first_rows = df_labeled.drop_duplicates(subset='supernova_name', keep='first')
unique_SN = first_rows['supernova_name'].to_numpy()
SN_to_type = dict(zip(unique_SN, first_rows['Ibn'].astype(int).tolist()))

# Split by supernova name (not by row), stratified on the Ibn label, so every
# row of a given object lands on the same side of the split.
train_SN, test_SN = train_test_split(unique_SN,
                                     stratify=[SN_to_type[sn] for sn in unique_SN],
                                     test_size=0.4,
                                     random_state=12282005)
mask_train = df_labeled['supernova_name'].isin(train_SN)
X_train_raw = df_labeled.loc[mask_train, all_features]
y_train     = df_labeled.loc[mask_train, 'Ibn'].astype(int)
X_test_raw  = df_labeled.loc[~mask_train, all_features]
y_test      = df_labeled.loc[~mask_train, 'Ibn'].astype(int)

# ----------------------
# PIPELINE DEFINITION
# ----------------------
class CosmologyTransformer(BaseEstimator, TransformerMixin):
    """Add a 'color' column and, when a redshift is available, shift the peak magnitudes.

    Parameters
    ----------
    cosmo_model : object
        Cosmology whose ``luminosity_distance(z)`` result supports
        ``.to('pc').value`` (the notebook passes astropy's Planck18). It is
        only used when the input has a 'redshift' column.
    """

    def __init__(self, cosmo_model):
        # scikit-learn's get_params()/clone() read every constructor argument
        # back from an attribute of the same name. Storing it under a different
        # name makes those calls raise AttributeError.
        self.cosmo_model = cosmo_model

    @property
    def cosmo(self):
        """Read-only alias of ``cosmo_model`` for code that used the old attribute name."""
        return self.cosmo_model

    def fit(self, X, y=None):
        """Stateless: nothing is learned, the transformer itself is returned."""
        return self

    def transform(self, X):
        """Return a copy of ``X`` with a 'color' column (peak_mag_g - peak_mag_r).

        If ``X`` has a 'redshift' column, mu = 5 * log10(d_L / pc) - 5 is first
        subtracted from both peak magnitudes; otherwise they are left as given.
        """
        X = X.copy()
        if 'redshift' in X.columns:
            dL = self.cosmo_model.luminosity_distance(X['redshift']).to('pc').value
            mu = 5 * np.log10(dL) - 5
            X['peak_mag_r'] -= mu
            X['peak_mag_g'] -= mu
        # The same shift is applied to both bands, so the color is identical
        # in either branch.
        X['color'] = X['peak_mag_g'] - X['peak_mag_r']
        return X

# Column handling: median-impute the features listed in `imputed_features`
# (add_indicator=True appends missing-value indicator columns) and pass the
# remaining features plus 'color' through unchanged. 'color' is not in the raw
# frame; it is added by the 'cosmo' pipeline step that runs first.
preprocessor = ColumnTransformer(
    transformers=[
        ('imp', SimpleImputer(strategy='median', add_indicator=True), imputed_features),
        ('pass', 'passthrough', other_features + ['color']),
    ]
)

# LightGBM hyper-parameters (hard-coded; how they were chosen is not shown here).
# NOTE(review): LightGBM documents `subsample` as taking effect only when
# `subsample_freq` > 0 (default 0), so the 0.667 below is presumably inactive
# -- confirm against the tuning run before changing it.
best_params = dict(
    n_estimators=1000,
    learning_rate=0.07,
    num_leaves=26,
    max_depth=4,
    min_child_samples=84,
    reg_alpha=0.266,
    reg_lambda=0.093,
    subsample=0.667,
    colsample_bytree=0.665,
    random_state=12282005,
    verbosity=-1,
)

# Full model: feature engineering -> imputation -> SMOTE oversampling -> classifier.
pipeline = ImbPipeline(
    steps=[
        ('cosmo', CosmologyTransformer(cosmo)),
        ('prep', preprocessor),
        ('smote', SMOTE(random_state=12282005, sampling_strategy=0.93)),
        ('clf', LGBMClassifier(**best_params)),
    ]
)

# ----------------------
# TRAINING & EVALUATION
# ----------------------
# Fit on the training split, then score the held-out labeled rows.
pipeline.fit(X_train_raw, y_train)

# Probability cut used to call an object an Ibn (hard-coded; its derivation
# is not part of this cell).
threshold  = 0.0628
# Column 1 of predict_proba is the positive (Ibn) class probability.
probs_test = pipeline.predict_proba(X_test_raw)[:,1]
pred_test  = (probs_test >= threshold).astype(int)

print("=== Labeled Test Set Performance ===")
print("Accuracy:", accuracy_score(y_test, pred_test))
print("Confusion Matrix:\n", confusion_matrix(y_test, pred_test))
print(classification_report(y_test, pred_test, digits=4))

# Save model and threshold under ./models (relative to the working directory).
# NOTE(review): the pipeline contains CosmologyTransformer, which is defined in
# this notebook; loading the file elsewhere presumably needs that class to be
# importable -- confirm before sharing the artifact.
os.makedirs('models', exist_ok=True)
joblib.dump(pipeline, 'models/clf_pipeline.joblib')
joblib.dump(threshold, 'models/best_threshold.pkl')

# ----------------------
# INFERENCE ON UNLABELED
# ----------------------
# Same feature columns, in the same order, as used for training.
X_unlab = df_unlabeled.loc[:, all_features]
print(X_unlab)

# Positive-class probability and the thresholded 0/1 call for each object.
probs_unlab = pipeline.predict_proba(X_unlab)[:,1]
labels_unlab = (probs_unlab >= threshold).astype(int)

# Attach the scores to the unlabeled table and write it next to the input data.
df_unlabeled['Ibn_prob'] = probs_unlab
df_unlabeled['Ibn_pred'] = labels_unlab
unlabeled_out = os.path.join(DATA_DIR, 'SN_unlabeled_with_predictions.csv')
df_unlabeled.to_csv(unlabeled_out, index=False)
print("Unlabeled predictions saved to SN_unlabeled_with_predictions.csv")


8732 3450




=== Labeled Test Set Performance ===
Accuracy: 0.9919839679358717
Confusion Matrix:
 [[3449   27]
 [   1   16]]
              precision    recall  f1-score   support

           0     0.9997    0.9922    0.9960      3476
           1     0.3721    0.9412    0.5333        17

    accuracy                         0.9920      3493
   macro avg     0.6859    0.9667    0.7646      3493
weighted avg     0.9967    0.9920    0.9937      3493

       rise_slope_r  rise_slope_g  ...  ndetection_g  ndetection_r
6         -0.004496     -0.000640  ...             8             8
12              NaN           NaN  ...            11            11
13         0.015177      0.040738  ...             8             8
14         0.106782      0.077486  ...             5             8
17         0.039143      0.050825  ...            11            18
...             ...           ...  ...           ...           ...
12169      0.023857      0.000387  ...             7             8
12175      0.051351      

In [None]:
# Show the unlabeled objects whose predicted Ibn probability exceeds 0.99965.
print(df_unlabeled.loc[df_unlabeled['Ibn_prob'] > 0.99965])

      supernova_name  decline_slope_g  decline_slope_r  duration_g  \
5268    ZTF21aakylsb         0.121511         0.128678    9.044167   
5894    ZTF21aaxbidh         0.124035         0.098193   13.004074   
12029   ZTF22aadgetr         0.088476         0.107720   12.987847   

       duration_r filt_g filt_r   first_det_g   first_det_r  last_nondet_g  \
5268     6.926620      g      r  59260.173611  59260.239699            NaN   
5894    12.995636      g      r  59325.443137  59325.481667            NaN   
12029   12.948113      g      r  59670.496690  59670.476748            NaN   

       ...  rise_slope_r  rise_time_flag_g  rise_time_flag_r  rise_time_g  \
5268   ...           NaN               NaN               NaN          NaN   
5894   ...           NaN               NaN               NaN          NaN   
12029  ...           NaN               NaN               NaN          NaN   

       rise_time_r  s0_g  s0_r    Ibn  Ibn_prob  Ibn_pred  
5268           NaN   NaN   NaN  False

In [26]:
# SEND ALEX UNCLASSIFIED
# Renumber the rows from 0, then export only the object names (no index
# column) to the working directory.
df_unlabeled = df_unlabeled.reset_index(drop=True)
df_unlabeled.loc[:, ['supernova_name']].to_csv('unlabeled_in_parameters.csv',index=False)

In [1]:
import pandas as pd

# Reload the exported name list from the working directory.
# Note: this rebinds `df`, which earlier cells use for the parameter table.
df = pd.read_csv('unlabeled_in_parameters.csv')

# Display the loaded table.
df

Unnamed: 0,supernova_name
0,ZTF25aafocmy
1,ZTF21abkqvdo
2,ZTF19aafmjfw
3,ZTF19acbkanq
4,ZTF20acwyicy
...,...
3445,ZTF23aamzrto
3446,ZTF22abvtvmc
3447,ZTF20aatzcxk
3448,ZTF22abnawii


In [9]:
# Look up sky coordinates for the exported objects in the summary table.
# This path is relative to the working directory.
z = pd.read_csv('data/ZTFBTS_summary.csv')

names = df['supernova_name'].tolist()

# Keep the summary rows whose ZTFID is in the name list, with id and position only.
N = z.loc[z['ZTFID'].isin(names), ['ZTFID','RA','Dec']]

# NOTE(review): this overwrites the file that `df` was read from, replacing the
# 'supernova_name' column with ZTFID/RA/Dec. Re-running the cell that reloads
# the file and then this one would fail on df['supernova_name'] -- confirm the
# overwrite is intended.
N.to_csv('unlabeled_in_parameters.csv',index=False)