# Generalise and modularise the code

Now that I can make features, run the initialising loop and the subsequent loops, and have played with MLflow, it's time to write more general and modular code that can go in `finkvra` and be called from smaller scripts.

I should also be able to use config files to set up experiments more easily.

In [1]:
from glob import glob
import pandas as pd
import numpy as np
import mlflow
import mlflow.sklearn
from sklearn.ensemble import HistGradientBoostingClassifier
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from mlflow.models.signature import infer_signature
from datetime import datetime
import os
from finkvra.utils.features import make_features as fvra_make_features
from finkvra.utils.labels import cli_label_one_object as fvra_cli_label_one_object
import json
from mlflow.tracking import MlflowClient
import logging
from mlflow.models.signature import infer_signature

_LOG_FORMAT = "%(asctime)s [%(levelname)s] %(name)s.%(funcName)s: %(message)s"

# Root handler: timestamp, logger name and calling function; only WARNING and above are shown.
logging.basicConfig(format=_LOG_FORMAT, level=logging.WARNING)

# Logger used by the cells of this notebook.
logger = logging.getLogger(__name__)

# Step-by-Step

In [2]:
### Should go somewhere in finkvra? or as class attributes?

# Label -> binary target, one mapping per model type.
# A NaN target means the label carries no information for that model
# (the labeling loop below does not add samples whose class is NaN).
# Columns:      real    extragal  gal  agn  bogus   varstar
_CLASS_TABLE = {
    'gal':   (np.nan, 0,        1,   0,   np.nan, 1),
    'real':  (1,      1,        1,   1,   0,      1),
    'trans': (np.nan, 1,        1,   0,   np.nan, 0),
}
_LABEL_NAMES = ('real', 'extragal', 'gal', 'agn', 'bogus', 'varstar')

label2class = {}
for _model_type, _classes in _CLASS_TABLE.items():
    _mapping = dict(zip(_LABEL_NAMES, _classes))
    # Missing labels always map to "no information".
    _mapping[None] = np.nan
    _mapping[np.nan] = np.nan
    label2class[_model_type] = _mapping

# Per-model aliases (the same dict objects as the entries of label2class).
label2galclass = label2class['gal']
label2realclass = label2class['real']
label2transclass = label2class['trans']

In [3]:
## CONFIG - TO REPLACE BY READING A JSON FILE - these will be default but should have CL args for e.g parquet_paths
EXPERIMENT = "SEP25gal_model_AL"#"test_experiment" 
SAMPLING_STRATEGY = "uncertainty"
HOST = "127.0.0.1"
PORT = "6969"
PARQUET_GLOB_PATH = '/home/stevance/Data/FinkZTFStream/*.parquet' #None # if not given prompt user 
#PARQUET_GLOB_PATH = '/home/stevance/Data/FinkZTFStream/20250602*.parquet'

# Model type: selects the label -> class mapping in `label2class` ('gal', 'real' or 'trans').
MODEL_TYPE = 'gal'
if MODEL_TYPE not in label2class:
    raise ValueError(f"MODEL_TYPE must be one of {list(label2class)}, got {MODEL_TYPE!r}")
LABELS_PATH = '/home/stevance/Data/FinkZTFStream/labeled.csv'
OUTPUT_ROOT = '/home/stevance/Science/fink-vra-notebooks/test_outputs/'
N_batch_0 = 30 # first batch size
N_batch_i = 5 # subsequent batches

PARAMS={
    "l2_regularization": 10,
    "learning_rate": 0.1,
    "random_state": 42
}

# Create the per-experiment output directory if needed.
# Paths are built by plain concatenation, so OUTPUT_ROOT must end with a path separator.
output_dir = f"{OUTPUT_ROOT}{EXPERIMENT}"
if not os.path.isdir(output_dir):
    logger.warning(f"Output directory {OUTPUT_ROOT}{EXPERIMENT} does not exist. Creating it.")
    # exist_ok avoids a race if the directory appears in the meantime; a *file* at this
    # path still raises FileExistsError instead of failing later on the first write.
    os.makedirs(output_dir, exist_ok=True)

# +++++++++++++++++++++
# Constants from Config
# +++++++++++++++++++++

model_subpath = f"{MODEL_TYPE}_model"
training_ids_path = f"{OUTPUT_ROOT}{EXPERIMENT}/{SAMPLING_STRATEGY}_{MODEL_TYPE}_training_ids.csv"
training_ids_artifact_path = f"{SAMPLING_STRATEGY}_{MODEL_TYPE}_training_ids.csv"
labels = pd.read_csv(LABELS_PATH, index_col=0)




In [4]:
# +++++++++++++++++++++
# ML Flow set up
# +++++++++++++++++++++

# Point MLflow at the tracking server and select (creating if needed) the experiment.
tracking_uri = f"http://{HOST}:{PORT}"
mlflow.set_tracking_uri(tracking_uri)
mlflow.set_experiment(EXPERIMENT)

client = MlflowClient()
experiment = client.get_experiment_by_name(EXPERIMENT)
experiment_id = experiment.experiment_id

# Run ID source: the most recent FINISHED run of this experiment (at most one result;
# an empty result means this is round 0). Its artifacts seed the next round.
search_kwargs = dict(
    experiment_ids=[experiment_id],
    filter_string="attributes.status = 'FINISHED'",
    order_by=["start_time DESC"],
    max_results=1,
)
runs = client.search_runs(**search_kwargs)

mlflow_uri = mlflow.get_tracking_uri()




In [9]:
# NOTE: this overrides `training_ids_artifact_path` from the config cell
# ("{SAMPLING_STRATEGY}_{MODEL_TYPE}_training_ids.csv"). The next cell downloads the
# previous run's training IDs under this artifact name.
# NOTE(review): the training cell logs `training_ids_path` with `mlflow.log_artifact`,
# which presumably stores it under the local file's basename — confirm both names agree,
# otherwise the download in the next round will not find the artifact.
training_ids_artifact_path = "training_ids.csv"

In [10]:
# +++++++++++++++++++++++++++++++++++++
# Load artifacts and set ROUND number
# ++++++++++++++++++++++++++++++++++++

# Round 0 when the experiment has no finished run yet. Otherwise resume from the latest
# finished run: its training IDs give the round count and the already-used candidates,
# and its model is used to score the remaining pool.
if not runs:
    CURRENT_ROUND = 0
    BATCH_SIZE = N_batch_0
    logger.info(f"This is ROUND 0 of EXPERIMENT {EXPERIMENT}. Batch size is {BATCH_SIZE}")
else:
    last_run = runs[0]
    prev_run_id = last_run.info.run_id
    BATCH_SIZE = N_batch_i
    logger.info(f"Found Successful run - ID: {prev_run_id}. Batch size is {BATCH_SIZE}")

    logger.info("Loading artifacts from previous run: Previous training IDs")
    previous_ids_path = client.download_artifacts(prev_run_id, training_ids_artifact_path)
    previous_ids_df = pd.read_csv(previous_ids_path)
    if previous_ids_df.empty:
        raise ValueError(f"Training IDs artifact '{training_ids_artifact_path}' of run {prev_run_id} is empty")
    # Highest recorded round (independent of row order), as a plain int.
    CURRENT_ROUND = int(previous_ids_df['round'].max()) + 1
    logger.info(f"Previous trainings IDs found - CURRENT ROUND: {CURRENT_ROUND}")

    logger.info("Loading artifacts from previous run: Model")
    model_uri = f"runs:/{prev_run_id}/{model_subpath}"
    clf = mlflow.sklearn.load_model(model_uri)
    
    

2025-09-30 15:35:44,647 [INFO] __main__.<module>: Found Successful run - ID: 4e2766df1a5f466f9aab9d5eb9d62c64. Batch size is 5
2025-09-30 15:35:44,648 [INFO] __main__.<module>: Loading artifacts from previous run: Previous training IDs


Downloading artifacts:   0%|          | 0/1 [00:00<?, ?it/s]

2025-09-30 15:35:44,664 [INFO] __main__.<module>: Previous trainings IDs found - CURRENT ROUND: 4
2025-09-30 15:35:44,665 [INFO] __main__.<module>: Loading artifacts from previous run: Model


Downloading artifacts:   0%|          | 0/7 [00:00<?, ?it/s]

In [14]:
# ++++++++++++++++++++++++++++++++++++
# Load Parquet and Make Features
# +++++++++++++++++++++++++++++++++++

files = sorted(glob(PARQUET_GLOB_PATH))
if not files:
    # Without this, pd.concat([]) fails with an unhelpful "No objects to concatenate".
    raise FileNotFoundError(f"No .parquet files match {PARQUET_GLOB_PATH}")
data = pd.concat([pd.read_parquet(f) for f in files], ignore_index=True)
logger.info(f"{len(files)} .parquet files loaded from {PARQUET_GLOB_PATH}")


X, meta = fvra_make_features(data)
# remove samples that have no positive detections
has_detections = X.ndets > 0
X = X[has_detections]
meta = meta.loc[X.index]
logger.info(f"Features made for {X.shape[0]} samples (unique candid) - {meta.objectId.unique().shape[0]} unique objects")

2025-09-30 15:44:31,394 [INFO] __main__.<module>: 408 .parquet files loaded from /home/stevance/Data/FinkZTFStream/*.parquet
2025-09-30 15:47:18,628 [INFO] __main__.<module>: Features made for 6217 samples (unique candid) - 4414 unique objects


In [12]:
    
# REMOVE AFTER TESTING
#X = X.drop('ebv', axis=1)

### The following only runs if this is not round 0

In [15]:
if CURRENT_ROUND > 0:
    # TODO - turn this into a function?
    # The pool is every featurised candidate that is not already in the training set.
    train_ids = previous_ids_df["candid"].tolist()
    X_pool = X.drop(index=train_ids, errors='ignore')

    meta_pool = meta.drop(index=train_ids, errors='ignore')
    logger.info(f"Training pool contains {X_pool.shape[0]} samples ({meta_pool.objectId.unique().shape[0]} unique)")

    logger.info("Predicting class for training pool using previous model")
    y_pred = clf.predict_proba(X_pool)[:, 1]
    # Keep candid as the index and the prediction as a float column; stacking both into
    # one array (as before) silently converted every value to a string.
    y_pred_pool = pd.DataFrame({'pred': y_pred}, index=pd.Index(X_pool.index, name='candid'))

2025-09-30 15:47:55,058 [INFO] __main__.<module>: Training pool contains 6147 samples (4377 unique)
2025-09-30 15:47:55,059 [INFO] __main__.<module>: Predicting class for training pool using previous model


In [16]:
# ----------------------------------------------------
# 6. Active Learning sampling with dynamic labeling
# ----------------------------------------------------

# Sampling score per strategy: LOW means "label this next".
# 'uncertainty' = distance of the predicted probability from the 0.5 decision boundary.
SAMPLING_SCORES = {
    'uncertainty': lambda pred: np.abs(pred - 0.5),
}
if SAMPLING_STRATEGY not in SAMPLING_SCORES:
    raise ValueError(f"Unsupported SAMPLING_STRATEGY {SAMPLING_STRATEGY!r}; "
                     f"available: {sorted(SAMPLING_SCORES)}")

# Round 0 has no model and hence no `y_pred_pool` to rank.
if CURRENT_ROUND > 0:
    y_pred_pool = (
        y_pred_pool
        .assign(sampling_score=SAMPLING_SCORES[SAMPLING_STRATEGY](y_pred_pool["pred"].astype(float)))
        .sort_values('sampling_score', ascending=True)
    )



In [17]:
# Candidates to offer for labeling, best sampling score first. Only available once a
# model exists; round 0 uses all featurised candidates instead (next cell).
if CURRENT_ROUND > 0:
    candid_loop = y_pred_pool.index.astype(np.int64)

In [18]:
# Round 0: there is no model to rank candidates yet, so they are offered in index order
# (could scramble them if you wanted more randomness).
if not CURRENT_ROUND:
    candid_loop = X.index.astype(np.int64)

### The following also runs in round 0

In [19]:
# ----------------------------------------------------
# Labeling
# ----------------------------------------------------

# Label -> target mapping for the model being trained. This must be the same mapping
# used to build y_train (label2class[MODEL_TYPE]), otherwise the samples selected here
# do not match the targets trained on.
label2modelclass = label2class[MODEL_TYPE]


def _label_to_class(label):
    """Map a raw label to this model's class; missing labels (None/NaN) map to NaN.

    Raises KeyError for a label that is not in the mapping.
    """
    # pd.isna instead of a dict lookup on a NaN key, which only matches the exact
    # `np.nan` object.
    if pd.isna(label):
        return np.nan
    return label2modelclass[label]


new_labels = []          # labels obtained interactively in this session
new_label_candid = []    # candids those new labels belong to
new_sample_candid = []   # candids added to the training set this round

N_i = 0

for _candid in candid_loop:
    if _candid in labels.index:
        existing_label = labels.loc[_candid, 'label']
        if isinstance(existing_label, pd.Series):
            # The same candid appears more than once in the labels file.
            logger.error(f"Shit! You got duplicate labels in {LABELS_PATH}")
            logger.error(_candid)
            # Raise rather than exit(): exit() shuts down the Jupyter kernel.
            raise ValueError(f"Duplicate labels for candid {_candid} in {LABELS_PATH}")
        classification = _label_to_class(existing_label)
    else:
        # Not labelled yet: ask the user.
        _objectId = meta.loc[_candid].objectId
        label = fvra_cli_label_one_object(_objectId)

        if label is None:
            # no label returned (presumably skipped by the user) - nothing to record
            continue
        new_labels.append(label)
        new_label_candid.append(_candid)
        classification = _label_to_class(label)

    # Labels with no class for this model are kept in the labels file but not trained on.
    if not np.isnan(classification):
        new_sample_candid.append(_candid)
        N_i += 1

    if N_i == BATCH_SIZE:
        logger.info(f'Batch size ({BATCH_SIZE}) reached.')
        break

if N_i < BATCH_SIZE:
    logger.warning(f'Batch size ({BATCH_SIZE}) not reached: {N_i}')


 Object: ZTF19abiwysm
Opening https://lasair-ztf.lsst.ac.uk/objects/ZTF19abiwysm...
Label [r/x/g/a/b/v] (s=skip, q=quit): x
Labeled as 'extragal'

 Object: ZTF20abaowcx
Opening https://lasair-ztf.lsst.ac.uk/objects/ZTF20abaowcx...
Label [r/x/g/a/b/v] (s=skip, q=quit): a
Labeled as 'agn'

 Object: ZTF20abaowcx
Opening https://lasair-ztf.lsst.ac.uk/objects/ZTF20abaowcx...
Label [r/x/g/a/b/v] (s=skip, q=quit): a
Labeled as 'agn'

 Object: ZTF25abboazq
Opening https://lasair-ztf.lsst.ac.uk/objects/ZTF25abboazq...
Label [r/x/g/a/b/v] (s=skip, q=quit): x
Labeled as 'extragal'

 Object: ZTF25absbrai
Opening https://lasair-ztf.lsst.ac.uk/objects/ZTF25absbrai...
Label [r/x/g/a/b/v] (s=skip, q=quit): x


2025-09-30 15:50:07,242 [INFO] root.<module>: Batch size (5) reached.


Labeled as 'extragal'


In [20]:
# ----------------------------------------------------
# Update label
# ----------------------------------------------------
from datetime import timezone

# Naive UTC timestamp in the same ISO format as before, without the deprecated
# datetime.utcnow().
timestamp = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
new_label_index = np.array(new_label_candid).astype(np.int64)
new_labels_df = pd.DataFrame({
    'objectId': meta.loc[new_label_index].objectId,
    'label': new_labels,
    'timestamp': timestamp,
})
updated_labels = pd.concat((labels, new_labels_df))

# Persist the labels (the cell previously only logged this), so the next round does
# not ask for the same objects again.
updated_labels.to_csv(LABELS_PATH)
logger.info(f"Updated the labels at: {LABELS_PATH}")
updated_labels

2025-09-30 15:50:17,255 [INFO] root.<module>: Updated the labels at: /home/stevance/Data/FinkZTFStream/labeled.csv


Unnamed: 0_level_0,objectId,label,timestamp
candid,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
3069386410515015007,ZTF19aarxyde,varstar,2025-06-02T08:37:22.993787
3069386415315015051,ZTF25aastlyj,real,2025-06-02T08:38:09.167712
3069382582215015014,ZTF25aastlfp,varstar,2025-06-02T08:38:35.964541
3069388402015015013,ZTF25aastlnm,varstar,2025-06-02T08:38:48.162550
3069382583515015017,ZTF25aastmlg,varstar,2025-06-02T09:44:44.227317
...,...,...,...
3181330900815015002,ZTF19abiwysm,extragal,2025-09-30T14:50:17.254649
3116222901915015000,ZTF20abaowcx,agn,2025-09-30T14:50:17.254649
3133242891915015004,ZTF20abaowcx,agn,2025-09-30T14:50:17.254649
3127277780015015036,ZTF25abboazq,extragal,2025-09-30T14:50:17.254649


In [21]:
# ----------------------------------------------------
# Candids for training
# ----------------------------------------------------

In [22]:
# This round's newly selected training candids, each tagged with the current round number.
_new_candids = np.array(new_sample_candid).astype(np.int64)
new_ids_df = pd.DataFrame(
    data={
        'candid': _new_candids,
        'round': CURRENT_ROUND,
    }
)


In [23]:
# Cumulative training set: every previous round's IDs plus this round's new ones.
if CURRENT_ROUND > 0:
    train_ids_df = pd.concat([previous_ids_df, new_ids_df]).reset_index(drop=True)
else:
    train_ids_df = new_ids_df

# The pool excluded previous IDs, so each candid must appear only once.
assert train_ids_df["candid"].is_unique, "Duplicate candids in the training IDs"

train_ids_df.to_csv(training_ids_path, index=False)
logger.info(f'Saved training ids locally to {training_ids_path}')

2025-09-30 15:50:19,553 [INFO] root.<module>: Saved training ids locally to /home/stevance/Science/fink-vra-notebooks/test_outputs/SEP25gal_model_AL/uncertainty_gal_training_ids.csv


In [24]:
# -------------------
# 7. Make the y_train X_train for new round
# -------------------


y_train = updated_labels.loc[train_ids_df.candid].label.map(label2class[MODEL_TYPE])
X_train = X.loc[train_ids_df.candid]

# Sanity checks before anything is written or trained on.
assert y_train.index.equals(X_train.index), "y_train and X_train are not aligned (duplicate labels?)"
assert y_train.notna().all(), f"Training set contains labels with no '{MODEL_TYPE}' class"

y_train.to_csv(f"{OUTPUT_ROOT}{EXPERIMENT}/y_train_R{CURRENT_ROUND}.csv")
X_train.to_csv(f"{OUTPUT_ROOT}{EXPERIMENT}/X_train_R{CURRENT_ROUND}.csv")

logger.info(f"Made y_train and X_train and saved to {OUTPUT_ROOT}{EXPERIMENT}")

# Make the signature for ML Flow
signature = infer_signature(X_train, y_train)

2025-09-30 15:50:20,192 [INFO] __main__.<module>: Made y_train and X_train and saved to /home/stevance/Science/fink-vra-notebooks/test_outputs/SEP25gal_model_AL


In [25]:
# Display this round's training features (rows selected by the cumulative training candids).
X_train

Unnamed: 0_level_0,ra,dec,drb,ndets,nnondets,dets_median,dets_std,sep_arcsec,amplitude,linear_fit_reduced_chi2,...,linear_fit_slope_sigma,median,median_absolute_deviation,amplituder_,linear_fit_reduced_chi2r_,linear_fit_sloper_,linear_fit_slope_sigmar_,medianr_,median_absolute_deviationr_,ebv
candid,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
3069386410515015007,286.972916,60.047391,0.999990,11.0,6.0,20.475401,0.075382,0.42265986534096434,0.070496,0.069703,...,0.004152,19.842197,0.018932,0.070496,0.069703,0.003666,0.004152,19.842197,0.018932,0.075731
3069382582215015014,293.757494,24.732624,0.999734,1.0,17.0,19.765827,0.000000,1.2398935354229732,0.000000,,...,,19.765827,0.000000,0.000000,,,,19.765827,0.000000,1.916294
3069388402015015013,334.137737,46.956847,0.995750,1.0,16.0,19.520535,0.000000,9.0563054064397,0.000000,,...,,19.520535,0.000000,0.000000,,,,19.520535,0.000000,0.159215
3069382583515015017,296.641470,26.630707,0.951255,1.0,16.0,19.625126,0.000000,3.1566587143809444,0.000000,,...,,19.625126,0.000000,0.000000,,,,19.625126,0.000000,2.957513
3069386414215015010,283.759268,62.913423,0.999961,1.0,16.0,20.100019,0.000000,5.758135128184068,0.000000,,...,,19.690671,0.000000,0.000000,,,,19.690671,0.000000,0.079199
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
3181330900815015002,354.538879,-4.852683,0.999998,59.0,17.0,19.170601,0.361792,0.5640928936115532,0.099112,0.568047,...,0.001139,17.887769,0.028824,0.099112,0.568047,0.001392,0.001139,17.887769,0.028824,0.036027
3116222901915015000,266.529418,60.919060,0.999972,51.0,9.0,20.035700,0.129517,0.11000687071166554,0.203163,0.230237,...,0.003956,19.964101,0.049046,0.203163,0.230237,-0.000406,0.003956,19.964101,0.049046,0.047259
3133242891915015004,266.529522,60.919082,0.999981,41.0,6.0,20.034224,0.135806,0.11175345878539097,0.203163,0.248353,...,0.004904,19.984800,0.058260,0.203163,0.248353,-0.000010,0.004904,19.984800,0.058260,0.047255
3127277780015015036,290.458985,-26.727137,0.999925,9.0,35.0,19.520500,0.210759,2.4959839018887138,,,...,,,,,,,,,,0.117575


In [26]:
# (Interactive help lookup on mlflow.log_artifact removed: not needed to run the notebook.)

In [27]:
# Hyper-parameters passed to HistGradientBoostingClassifier in the training cell.
PARAMS

{'l2_regularization': 10, 'learning_rate': 0.1, 'random_state': 42}

In [28]:

# -------------------
# 8. Train!
# -------------------
from datetime import timezone

MODEL_TAG = f"{model_subpath}_round_{CURRENT_ROUND}"

with mlflow.start_run(run_name=f"round_{CURRENT_ROUND}_{SAMPLING_STRATEGY}"):

    # Log metadata straight into the run (no stray meta.json left in the working directory).
    meta_info = {
        "round": int(CURRENT_ROUND),
        # naive UTC ISO timestamp, without the deprecated datetime.utcnow()
        "timestamp": datetime.now(timezone.utc).replace(tzinfo=None).isoformat(),
        "n_train": int(X_train.shape[0]),
        "sampling_strategy": str(SAMPLING_STRATEGY),
        "model_tag": str(MODEL_TAG)
    }
    mlflow.log_dict(meta_info, "meta.json")

    # Train model (on the raw array, as before)
    mlflow.log_params(PARAMS)
    clf_new = HistGradientBoostingClassifier(**PARAMS)
    clf_new.fit(X_train.values, y_train.values)

    # Evaluate on training set - predict once and reuse for every metric.
    y_train_pred = clf_new.predict(X_train.values)

    acc = accuracy_score(y_train, y_train_pred)
    mlflow.log_metric("accuracy", acc)

    # zero_division=0 gives the same value as the default (0.0) when a class is never
    # predicted, without emitting UndefinedMetricWarning.
    prec = precision_score(y_train, y_train_pred, zero_division=0)
    mlflow.log_metric("precision", prec)

    recall = recall_score(y_train, y_train_pred, zero_division=0)
    mlflow.log_metric("recall", recall)

    f1 = f1_score(y_train, y_train_pred, zero_division=0)
    mlflow.log_metric("f1-score", f1)

    # Log model
    signature = infer_signature(X_train, y_train_pred)
    mlflow.sklearn.log_model(
        clf_new,
        artifact_path=model_subpath,
        signature=signature,
        input_example=X_train.iloc[:2]
    )

    # Save training state
    mlflow.log_artifact(training_ids_path)
    # The next round downloads the IDs as `training_ids_artifact_path`, but log_artifact
    # stores the file under its basename: also log it under the expected name if they differ.
    if os.path.basename(training_ids_path) != training_ids_artifact_path:
        with open(training_ids_path) as fh:
            mlflow.log_text(fh.read(), training_ids_artifact_path)
    mlflow.log_artifact(f"{OUTPUT_ROOT}{EXPERIMENT}/y_train_R{CURRENT_ROUND}.csv")
    mlflow.log_artifact(f"{OUTPUT_ROOT}{EXPERIMENT}/X_train_R{CURRENT_ROUND}.csv")




🏃 View run round_4_uncertainty at: http://127.0.0.1:6969/#/experiments/575240719855823327/runs/14edd4ff4a1f4ca1b428c6bffffc295e
🧪 View experiment at: http://127.0.0.1:6969/#/experiments/575240719855823327




# Dev in `finkvra`

In [214]:
# Auto-reload imported modules (e.g. finkvra) before each cell, so edits to the package
# are picked up while developing ALPipeline below.
# NOTE: on re-runs %load_ext reports "already loaded" (see output); %reload_ext avoids that.
%load_ext autoreload
%autoreload 2

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [215]:
from finkvra.alonstream.alpipeline import ALPipeline

In [216]:
# Build the active-learning pipeline from a YAML config. Judging by its log output,
# construction runs the whole round (MLflow setup, features, labeling, training).
ALPIPE_CONFIG = './testconfigfile.yaml'
alpipe = ALPipeline(configfile=ALPIPE_CONFIG)

2025-06-06 07:16:38,572 [INFO] finkvra.alonstream.alpipeline.__init__: Settings loaded from ./testconfigfile.yaml
2025-06-06 07:16:38,585 [INFO] finkvra.alonstream.alpipeline.__mlflow_setup: This is ROUND 0 of EXPERIMENT teeeeest. Batch size is 30
2025-06-06 07:16:38,585 [INFO] finkvra.alonstream.alpipeline.__init__: ML Flow set-up complete.
2025-06-06 07:16:38,585 [INFO] finkvra.alonstream.alpipeline.__load_parquet: Loading parquet files from /home/stevance/Data/FinkZTFStream/*.parquet
2025-06-06 07:16:38,883 [INFO] finkvra.alonstream.alpipeline.__load_parquet: 88 .parquet files loaded from /home/stevance/Data/FinkZTFStream/*.parquet
2025-06-06 07:16:42,373 [INFO] finkvra.alonstream.alpipeline.__init__: Features made for 140 samples (unique candid) - 138 unique objects
2025-06-06 07:16:42,374 [INFO] root.__labeling: Batch size (30) reached.
2025-06-06 07:16:42,376 [INFO] root.__labeling: Updated the labels at: /home/stevance/Data/FinkZTFStream/labeled.csv
2025-06-06 07:16:42,377 [INFO

🏃 View run round_0_uncertainty at: http://127.0.0.1:6969/#/experiments/289037665776575127/runs/59abe7e8978744adb41fee550d6774d5
🧪 View experiment at: http://127.0.0.1:6969/#/experiments/289037665776575127




In [166]:
# Inspect the pipeline's candid_loop (presumably the candidates offered for labeling this round).
alpipe.candid_loop

Index([3072422580115015006, 3070413571415015007, 3070410155015010015,
       3070411124115015013, 3070413103215015021, 3070412070215015010,
       3070412555215015027, 3070413104315015004, 3072421642915015021,
       3070420854615015025, 3070411124115015020, 3070419441815015028,
       3070413103915015007, 3070419442415015034, 3070418963915015013,
       3070419905315015028, 3070421792715015016, 3070420384315015019,
       3070410150515015030, 3070412552515015017, 3072422112215015019,
       3070413574215015021, 3070410643715015019, 3072414510615010007,
       3070422264115015010, 3070414060515015001, 3069382583515015017,
       3070410153515015003, 3072417854015015021, 3072419752615015008],
      dtype='int64', name='candid')