# Import Standard Libraries and Set Up CUDA Environment
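In this cell, we import essential Python libraries and derive the paths to the CUDA libraries from the active conda environment. The environment variables (CUDA_PATH, LD_LIBRARY_PATH, and XLA_FLAGS) are set before TensorFlow is imported so that TensorFlow and its XLA compiler can locate the CUDA toolkit bundled with the environment. Note that the dynamic loader reads LD_LIBRARY_PATH only when a process starts, so changing it here mainly affects subprocesses launched later.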

In [None]:
import os
import sys
import getpass
from datetime import datetime

# Get conda environment path and determine CUDA library paths.
# Assumes the interpreter lives at <env>/bin/python (Linux/macOS conda layout), so two
# dirname() calls yield the environment root.
conda_env_path = os.path.dirname(os.path.dirname(sys.executable))
cuda_path = os.path.join(conda_env_path, "lib")

# Set environment variables to help locate CUDA libraries for TensorFlow. They are set
# before TensorFlow is imported so they are in place when TensorFlow/XLA initialize.
os.environ['CUDA_PATH'] = conda_env_path
# Prepend the env's lib dir, joining only non-empty parts: when LD_LIBRARY_PATH was unset,
# a plain f"{cuda_path}:" would leave a trailing empty entry, which the dynamic loader
# interprets as the current working directory.
# NOTE: glibc's loader reads LD_LIBRARY_PATH at process start, so this mostly affects
# subprocesses started later rather than libraries loaded by this process.
previous_ld_path = os.environ.get('LD_LIBRARY_PATH', '')
os.environ['LD_LIBRARY_PATH'] = os.pathsep.join(
    part for part in (cuda_path, previous_ld_path) if part
)
os.environ['XLA_FLAGS'] = f"--xla_gpu_cuda_data_dir={conda_env_path}"

print(f"Set CUDA_PATH to: {conda_env_path}")
print(f"Set XLA_FLAGS to: {os.environ['XLA_FLAGS']}")


# Import Machine Learning Libraries and Modules

This cell loads the key libraries required for building and training the model. These include TensorFlow (and related modules), MLflow for tracking experiments, and additional libraries for data processing, visualization, and command-line argument parsing. Utility functions from photoz_utils and DataMakerPlus are also imported for custom data handling.

In [None]:
import tensorflow as tf
import tensorflow_probability as tfp
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import h5py
import keras
import tensorboard
import mlflow
import mlflow.tensorflow
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Dense, Conv2D, MaxPooling2D, Flatten, Input, Concatenate
from tensorflow.keras.optimizers import Adam
from sklearn.preprocessing import StandardScaler
from tensorboard.plugins.hparams import api as hp
import argparse

# Import utility functions for handling data and domain-specific processing
from photoz_utils import *
from DataMakerPlus import *

print("TensorFlow version: {}".format(tf.__version__))  # record the TF build in the run output


# Configure GPU Settings
This cell checks for available GPUs and enables memory growth. Enabling memory growth ensures that TensorFlow allocates only as much GPU memory as it needs, rather than grabbing all available memory up front. This is especially useful when running multiple experiments on the same machine. If no GPU is detected, the script prints an error and exits with status 1.

In [None]:
# Enumerate the GPUs TensorFlow can see and enable memory growth on each, so the process
# allocates GPU memory on demand instead of reserving the whole device at once.
gpus = tf.config.list_physical_devices('GPU')
if not gpus:
    # Training is GPU-only here: stop rather than continue without a GPU.
    print("No GPUs found. Please check your TensorFlow installation.")
    sys.exit(1)

print(f"Found {len(gpus)} GPU(s)")
for idx, device in enumerate(gpus):
    print(f"  GPU {idx}: {device}")
    try:
        tf.config.experimental.set_memory_growth(device, True)
    except RuntimeError as err:
        # TensorFlow raises RuntimeError if the runtime was already initialized.
        print(f"  Error setting memory growth: {err}")
    else:
        print(f"  Memory growth enabled for GPU {idx}")


# Parse Command-Line Arguments
This cell utilizes the argparse module to define and parse command-line arguments. These arguments allow you to easily customize training parameters (such as image size, number of epochs, batch size, and learning rate) and MLflow experiment details (experiment and run names) without modifying the code.

In [None]:
# Define command-line arguments to configure hyperparameters and experiment details
parser = argparse.ArgumentParser(description="Train CNN for redshift estimation with MLFlow tracking.")
parser.add_argument('--image_size', type=int, default=64, choices=[64, 127], help="Image size (default: 64)")
parser.add_argument('--epochs', type=int, default=200, help="Number of training epochs (default: 200)")
parser.add_argument('--batch_size', type=int, default=256, help="Batch size (default: 256)")
parser.add_argument('--learning_rate', type=float, default=0.0001, help="Learning rate (default: 0.0001)")
parser.add_argument('--experiment_name', type=str, default="Galaxy_CNN_Redshift_Estimation", help="MLflow experiment name")
parser.add_argument('--run_name', type=str, default=None, help="MLflow run name (default: auto-generated)")
parser.add_argument('--gpu_id', type=int, default=0, help="GPU ID to use (default: 0)")

# Use parse_known_args rather than parse_args: inside a Jupyter kernel sys.argv carries the
# kernel's own arguments (e.g. "-f <connection-file>.json"), which parse_args rejects with
# "unrecognized arguments" and then exits. Unknown arguments are reported instead of fatal.
args, unknown_args = parser.parse_known_args()
if unknown_args:
    print(f"Ignoring unrecognized command-line arguments: {unknown_args}")


# GPU Selection for Multi-GPU Systems

When multiple GPUs are detected, this cell sets the CUDA_VISIBLE_DEVICES environment variable to select the specific GPU (as given by the command-line argument). This ensures that the training runs on the intended GPU, which can be important for resource allocation and reproducibility.

In [None]:
# If multiple GPUs are available, use the one chosen with --gpu_id.
# Setting CUDA_VISIBLE_DEVICES alone is too late at this point: the CUDA runtime reads it
# only when it initializes, and TensorFlow already enumerated the GPUs in the earlier
# list_physical_devices() call. tf.config.set_visible_devices restricts TensorFlow itself;
# the selected card then appears as the logical device /GPU:0.
if len(gpus) > 1:
    if not 0 <= args.gpu_id < len(gpus):
        raise ValueError(f"--gpu_id {args.gpu_id} is out of range: found {len(gpus)} GPU(s)")
    tf.config.set_visible_devices(gpus[args.gpu_id], 'GPU')
    # Still exported so that any child processes inherit the same GPU selection.
    os.environ["CUDA_VISIBLE_DEVICES"] = str(args.gpu_id)
    print(f"Using GPU {args.gpu_id}")


# Confirm GPU Usage for Training
This cell uses TensorFlow’s device context to verify that a GPU is available for training. It confirms that the code is running on a GPU device (here assumed to be /GPU:0), which is critical for taking advantage of hardware acceleration.

In [None]:
# Confirm that TensorFlow can actually place work on a GPU. Merely entering
# `tf.device('/GPU:0')` proves nothing: code inside it like print() is plain Python, and
# with soft device placement (TensorFlow's default) a missing /GPU:0 does not raise.
logical_gpus = tf.config.list_logical_devices('GPU')
if not logical_gpus:
    raise RuntimeError("TensorFlow reports no logical GPU devices; refusing to train on the CPU")
with tf.device('/GPU:0'):
    # Run a tiny op and inspect where TensorFlow really executed it.
    gpu_probe = tf.constant(0.0) + 1.0
if 'GPU' not in gpu_probe.device.upper():
    raise RuntimeError(f"Probe op was placed on {gpu_probe.device} instead of a GPU")
print("GPU is available and will be used for training")


# MLflow Tracking Setup

This cell configures MLflow for experiment tracking. It points the tracking URI at a shared, file-based store in /shared/mlruns and selects the MLflow experiment named by the --experiment_name argument, creating it if it does not already exist. This integration allows you to log hyperparameters, metrics, and artifacts during training.


In [None]:
# All runs are recorded in one shared, file-based MLflow store so that experiments from
# every user can be browsed together.
mlruns_dir = "/shared/mlruns"
tracking_uri = "file://" + mlruns_dir
os.makedirs(mlruns_dir, exist_ok=True)  # create the store's root directory up front
mlflow.set_tracking_uri(tracking_uri)
# Make the experiment named on the command line active (MLflow creates it if missing).
mlflow.set_experiment(experiment_name=args.experiment_name)


# Create Directories for Checkpoints and Logs

In this cell, directories are created to store model checkpoints and training logs. Organizing these outputs helps manage experiment artifacts and makes it easier to review model performance later. Checkpoints are particularly important because they capture the model's state at a given point in training; the checkpoint saved at the end of training is also logged to MLflow as an artifact.

In [None]:
# Layout under the shared experiments root:
#   MLCheckpoints/  saved model files (checkpoint_filepath is built inside it)
#   MLlogs/         training logs
# NOTE(review): log_dir is not passed to any callback in this file — confirm it is still needed.
base_dir = "/shared/experiments"
checkpoint_dir = os.path.join(base_dir, "MLCheckpoints")
log_dir = os.path.join(base_dir, "MLlogs")

for required_dir in (checkpoint_dir, log_dir):
    os.makedirs(required_dir, exist_ok=True)


# Define Hyperparameters and Data Paths

This cell sets the hyperparameters for the model and training process from the parsed command-line arguments. It also stores these values in a dictionary (hparams), which is later logged to MLflow for reproducibility. Finally, the file paths for the training, validation, and test datasets are defined and checked for existence, so that a missing file fails immediately instead of partway through training.

In [None]:
# Define hyperparameters and dataset file paths
image_size = args.image_size
BATCH_SIZE = args.batch_size
NUM_EPOCHS = args.epochs
LEARNING_RATE = args.learning_rate
NUM_DENSE_UNITS = 200  # width of each hidden Dense layer in the auxiliary (non-image) branch
Z_MAX = 4  # NOTE(review): presumably the maximum redshift; only logged in this file — confirm
DATA_FORMAT = 'channels_first'  # Keras layout: (channels, height, width)

# Store hyperparameters in a dictionary for MLflow logging
hparams = {
    'image_size': image_size,
    'num_dense_units': NUM_DENSE_UNITS,
    'batch_size': BATCH_SIZE,
    'num_epochs': NUM_EPOCHS,
    'learning_rate': LEARNING_RATE,
    'z_max': Z_MAX,
    'data_format': DATA_FORMAT,
}

# Dataset file paths for training, validation, and testing. The file names encode the
# cutout geometry (5 x size x size), so derive them from --image_size: hard-coding the
# 64x64 files would let --image_size 127 build a 127x127 input layer that is then fed
# data from the 64x64 files. For the default size (64) these are exactly the original paths.
# NOTE(review): assumes other sizes follow the same naming scheme — confirm on the share.
DATA_DIR = '/shared/astrodata'
_file_prefix = f'5x{image_size}x{image_size}'
TRAIN_PATH = os.path.join(DATA_DIR, f'{_file_prefix}_training_with_morphology.hdf5')
VAL_PATH = os.path.join(DATA_DIR, f'{_file_prefix}_validation_with_morphology.hdf5')
TEST_PATH = os.path.join(DATA_DIR, f'{_file_prefix}_testing_with_morphology.hdf5')

# Verify that each dataset exists before proceeding, so a wrong --image_size or a missing
# file fails here with a clear message instead of during training.
for path in [TRAIN_PATH, VAL_PATH, TEST_PATH]:
    if not os.path.exists(path):
        raise FileNotFoundError(f"Dataset not found: {path}")


# Define Unique Checkpoint File Naming

This cell creates a unique file name for the model checkpoint by incorporating the current user's name and a timestamp. This practice ensures that each training run’s checkpoint is uniquely identifiable, which is useful when tracking and comparing multiple experiments in MLflow.

In [None]:
# Checkpoint file name: <user>_cp_<YYYY-mm-dd_HH-MM-SS>.weights.h5 inside checkpoint_dir,
# so runs by different users, or started at different seconds, never share a file.
username = getpass.getuser()
timestamp = f"{datetime.now():%Y-%m-%d_%H-%M-%S}"
checkpoint_filepath = os.path.join(
    checkpoint_dir,
    "{}_cp_{}.weights.h5".format(username, timestamp),
)


# Define the Model Architecture 

This cell defines the function create_model(), which constructs a Keras model with two input branches. One branch (CNN) processes image data while the other (NN) handles additional numerical features. The branches are concatenated before producing the final output. The model is compiled with the Adam optimizer, mean squared error loss, and RMSE as a metric. This function encapsulates the model architecture, making it reusable in different parts of the notebook.

In [None]:
def create_model():
    """Build and compile the two-branch redshift regression model.

    Image branch: an input of shape ``(5, image_size, image_size)``, interpreted according
    to ``DATA_FORMAT``, passes through two Conv2D(tanh) + MaxPooling2D stages with 32 and
    64 filters. It is then flattened and reduced by tanh Dense layers of 512, 128 and 32 units.

    Auxiliary branch: a length-5 input passes through two ReLU Dense layers of
    ``NUM_DENSE_UNITS`` units each.

    The two branches are concatenated and mapped to a single linear output.

    Returns:
        A Keras ``Model`` compiled with Adam (``LEARNING_RATE``), MSE loss and an
        RMSE metric.
    """
    image_in = Input(shape=(5, image_size, image_size))
    aux_in = Input(shape=(5,))

    # Image branch: conv/pool stages, then a tapering tanh MLP.
    x = image_in
    for n_filters in (32, 64):
        x = Conv2D(n_filters, kernel_size=(3, 3), activation='tanh', padding='same', data_format=DATA_FORMAT)(x)
        x = MaxPooling2D(pool_size=(2, 2), data_format=DATA_FORMAT)(x)
    x = Flatten()(x)
    for n_units in (512, 128, 32):
        x = Dense(n_units, activation='tanh')(x)

    # Auxiliary branch: two equally wide ReLU layers.
    y = aux_in
    for _ in range(2):
        y = Dense(NUM_DENSE_UNITS, activation="relu")(y)

    # Fuse both feature sets and regress a single value.
    merged = Concatenate()([x, y])
    prediction = Dense(1)(merged)

    model = Model(inputs=[image_in, aux_in], outputs=prediction)
    model.compile(
        optimizer=Adam(learning_rate=LEARNING_RATE),
        loss='mse',
        metrics=[tf.keras.metrics.RootMeanSquaredError()],
    )
    return model


# Define the MLflow Callback for Logging Metrics

Here we create a custom Keras callback—MLflowCallback—that logs each metric (e.g., loss and RMSE) to MLflow at the end of every epoch. This integration is crucial for tracking model performance over time and enables detailed experiment logging and later analysis via MLflow’s UI.

In [None]:
class MLflowCallback(tf.keras.callbacks.Callback):
    """Keras callback that copies every epoch-end metric into the active MLflow run."""

    def on_epoch_end(self, epoch, logs=None):
        # Keras passes the epoch's metric dict (training and, if present, validation values).
        if not logs:
            return
        for metric_name in logs:
            mlflow.log_metric(metric_name, logs[metric_name], step=epoch)


# Define the Training Function with MLflow Integration

This cell defines the train_model_with_mlflow() function, which encapsulates the full training process:

* MLflow Integration: Starts an MLflow run, logs hyperparameters, and sets a run tag.
* Model Training: Initializes the model, sets up data generators, and begins training while the custom MLflow callback logs metrics.
* Artifact Logging: After training, the model is saved and the checkpoint is logged as an artifact in MLflow.
* Evaluation: The model is evaluated on a test dataset, and performance metrics (loss and RMSE) are logged.

This comprehensive integration with MLflow ensures that every run is tracked, reproducible, and easily comparable.

In [None]:
def train_model_with_mlflow():
    """Train, save and evaluate the model inside a single MLflow run.

    The run records:
    - the ``username`` tag;
    - every entry of ``hparams`` as parameters;
    - per-epoch metrics via ``MLflowCallback``;
    - the file saved at ``checkpoint_filepath`` as an artifact;
    - ``test_loss`` / ``test_rmse`` from the test set.
    """
    # Fall back to a descriptive run name built from the key hyperparameters.
    if args.run_name:
        run_name = args.run_name
    else:
        run_name = f"GalaxyCNN_Size{image_size}_Batch{BATCH_SIZE}_LR{LEARNING_RATE}_Epochs{NUM_EPOCHS}_{username}"

    with mlflow.start_run(run_name=run_name) as run:
        mlflow.set_tag("username", username)
        mlflow.log_params(hparams)

        model = create_model()

        # NOTE(review): validation also uses mode='train'; confirm in HDF5DataGenerator that
        # this does not enable training-only behaviour on the validation set.
        train_gen = HDF5DataGenerator(TRAIN_PATH, mode='train', batch_size=BATCH_SIZE)
        val_gen = HDF5DataGenerator(VAL_PATH, mode='train', batch_size=BATCH_SIZE)

        model.fit(
            train_gen,
            epochs=NUM_EPOCHS,
            validation_data=val_gen,
            callbacks=[MLflowCallback()],
            verbose=1,
        )

        # NOTE(review): `.weights.h5` is the suffix Keras 3 reserves for `save_weights`, but
        # `model.save` writes a full HDF5 model here — confirm which format consumers expect.
        model.save(checkpoint_filepath)
        mlflow.log_artifact(checkpoint_filepath)

        # Held-out evaluation; the model was compiled with one metric, so evaluate()
        # yields exactly [loss, rmse].
        test_gen = HDF5DataGenerator(TEST_PATH, mode='test', batch_size=BATCH_SIZE)
        test_loss, test_rmse = model.evaluate(test_gen, verbose=1)
        mlflow.log_metric("test_loss", test_loss)
        mlflow.log_metric("test_rmse", test_rmse)

        print(f"Training complete. MLflow Run ID: {run.info.run_id}")


# Main Execution Guard 

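The final cell checks whether the script is being run as the main program and, if so, calls the training function. Because a Jupyter kernel also executes code with `__name__ == "__main__"`, running this cell in the notebook starts training too. This structure allows the notebook to be executed as a standalone script while keeping the code modular and easy to test.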

In [None]:
# Entry point: start one tracked training run. In a Jupyter kernel __name__ is also
# "__main__", so executing this cell in the notebook starts training as well.
if __name__ == "__main__":
    train_model_with_mlflow()
