In [1]:
import dask
import time

import cml.workers_v1 as workers
# Launch one CML worker session that runs the Dask scheduler.
# The dashboard is bound to 127.0.0.1:8090 inside that session; the session's
# "app_url" (printed at the end of this cell) is used to reach it.
dask_scheduler = workers.launch_workers(
    n=1,
    cpu=2,
    memory=8,
    code="!dask-scheduler --host 0.0.0.0 --dashboard-address 127.0.0.1:8090",
)

# Wait for the scheduler to start.
# NOTE(review): this is a fixed pause, not a readiness check — the lookup below
# fails loudly if the scheduler session is not listed with an IP by then.
time.sleep(10)

# Find the scheduler session among all listed workers to learn its IP address.
scheduler_workers = workers.list_workers()
scheduler_id = dask_scheduler[0]["id"]
scheduler_ip = next(
    (
        worker["ip_address"]
        for worker in scheduler_workers
        if worker["id"] == scheduler_id
    ),
    None,
)
if not scheduler_ip:
    # Without this guard the failure would be a bare IndexError, or dask-worker
    # processes pointed at an unusable "tcp://None:8786" address.
    raise RuntimeError(
        f"Dask scheduler worker {scheduler_id!r} has no IP address yet; "
        "it may still be starting or may have failed to launch."
    )

# No --port is passed to dask-scheduler above, so it listens on Dask's default port 8786.
scheduler_url = f"tcp://{scheduler_ip}:8786"

# Launch the Dask worker sessions and point each of them at the scheduler.
k8s_pods = 5
dask_workers = workers.launch_workers(
    n=k8s_pods,
    cpu=1,
    memory=8,
    code=f"!dask-worker {scheduler_url}",
)

# Wait for the workers to start.
time.sleep(10)

print("\nDask Diagnostic dashboard:")
print(dask_scheduler[0]["app_url"])

Skipping addon with invalid or excluded ID: {'type': 'cmladdon', 'path': '/runtime-addons/cmladdon-2.0.49-b279', 'spec': '\nenv:\n  MLFLOW_TRACKING_URI: cml://localhost\n  MLFLOW_REGISTRY_URI: cml://localhost\n  PYTHONPATH: ${PYTHONPATH}:/opt/cmladdons/python/site-customize\n  R_LIBS_SITE: ${R_LIBS_SITE}:/opt/cmladdons/r/libs\npaths:\n  - /opt/cmladdons', 'version': '', 'id': -1}
Skipping addon with invalid or excluded ID: {'type': 'cmladdon', 'path': '/runtime-addons/cmladdon-2.0.49-b279', 'spec': '\nenv:\n  MLFLOW_TRACKING_URI: cml://localhost\n  MLFLOW_REGISTRY_URI: cml://localhost\n  PYTHONPATH: ${PYTHONPATH}:/opt/cmladdons/python/site-customize\n  R_LIBS_SITE: ${R_LIBS_SITE}:/opt/cmladdons/r/libs\npaths:\n  - /opt/cmladdons', 'version': '', 'id': -1}

Dask Diagnostic dashboard:
https://qnsesx5st7vpkzjv.cmlws5.apps.dlee5.cldr.example/


In [2]:
import dask.dataframe as dd
from dask.distributed import Client
from dask_ml.model_selection import train_test_split
from xgboost.dask import DaskXGBRegressor
import xgboost as xgb
from sklearn.metrics import mean_absolute_error, r2_score
import joblib
import pandas as pd

def train_underwriting_model_dask(data_file='underwriting_data.csv', model_bundle_output_file='underwriting_bundle.joblib', scheduler_address=None):
    """
    Loads large-scale underwriting data using Dask, trains an XGBoost model,
    and saves the model and its required columns into a single file.

    Steps: connect to the Dask scheduler, read the CSV in partitions, one-hot
    encode 'vehicle_type', split 80/20, fit a DaskXGBRegressor, dump the
    bundle with joblib, then print R-squared and MAE on the test split.

    Args:
        data_file: Path of the CSV to read. It must contain the columns
            'vehicle_type', 'annual_premium_quote' and 'calculated_risk_score'.
        model_bundle_output_file: Path the joblib bundle is written to. The
            bundle is a dict with keys 'model' (the trained Booster) and
            'columns' (the feature columns used for training).
        scheduler_address: Address of the Dask scheduler to connect to. When
            None (the default), the notebook-level `scheduler_url` defined by
            the cluster-setup cell is used.

    Returns:
        None. Progress and metrics are printed; the bundle is written to disk.
    """
    print("--- Starting Dask-based Underwriting Model Training Process ---")

    # --- Set up a Dask Client ---
    # Connect to the already-running scheduler; no local cluster is created here.
    if scheduler_address is None:
        scheduler_address = scheduler_url
    client = Client(scheduler_address)

    # try/finally guarantees the client connection is closed even when loading,
    # training, saving or evaluation raises.
    try:
        print(f"Dask client created. Dashboard at: {client}")

        # --- Load Data with Dask ---
        # Dask reads the data in partitions instead of loading it all into memory.
        # blocksize specifies how large each chunk should be.
        print(f"Loading data from '{data_file}' with Dask...")
        ddf = dd.read_csv(data_file, blocksize="64MB")

        # --- Feature Engineering and Preprocessing with Dask ---
        print("Performing Dask feature engineering (One-Hot Encoding)...")
        # Dask's get_dummies requires the column to be of 'category' dtype first.
        ddf = ddf.categorize(columns=['vehicle_type'])
        ddf_processed = dd.get_dummies(ddf, columns=['vehicle_type'], drop_first=True)

        # --- Define Features (X) and Target (y) ---
        # We drop the calculated_risk_score as the model should learn this relationship itself.
        X = ddf_processed.drop(['annual_premium_quote', 'calculated_risk_score'], axis=1)
        y = ddf_processed['annual_premium_quote']

        # Persist the Dask collections in memory across the cluster to speed up subsequent computations
        X = X.persist()
        y = y.persist()

        # Keep the feature column layout for the inference script.
        # The columns object is stored as-is (no conversion to a plain list).
        model_columns = X.columns

        # --- Split Data ---
        # Dask-ML provides its own train_test_split function.
        # We add shuffle=True to address the FutureWarning and adopt the recommended behavior.
        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42, shuffle=True)
        print("Data split into training and testing sets.")

        # --- Initialize and Train the Dask-XGBoost Model ---
        print("\nTraining Dask-XGBoost Regressor model for premium quotation...")
        # Use the DaskXGBRegressor class from the official xgboost.dask module
        model = DaskXGBRegressor(
            objective='reg:squarederror',
            n_estimators=1000,
            learning_rate=0.05,
            max_depth=6,
            subsample=0.7,
            colsample_bytree=0.7,
            random_state=42
        )

        # The .fit() method will now trigger the Dask computation graph.
        model.fit(X_train, y_train)

        print("Model training complete.")

        # --- Create a single bundle containing the standard Booster model and columns ---
        # Extract the core, non-Dask Booster object for portability
        booster = model.get_booster()

        model_bundle = {
            'model': booster,
            'columns': model_columns
        }

        # --- Save the bundle to a single file ---
        # The bundle is written before evaluation, so it exists on disk even if
        # the evaluation step below fails.
        joblib.dump(model_bundle, model_bundle_output_file)
        print(f"\nModel and column info bundled and saved to '{model_bundle_output_file}'.")

        # --- Evaluate the Model ---
        print("\n--- Evaluating Model Performance on Test Set ---")
        # We need to compute the results since they are lazy Dask collections.
        y_pred = model.predict(X_test)
        y_test_computed, y_pred_computed = client.compute([y_test, y_pred], sync=True)

        mae = mean_absolute_error(y_test_computed, y_pred_computed)
        r2 = r2_score(y_test_computed, y_pred_computed)

        print(f"R-squared (R²): {r2:.4f}")
        print(f"Mean Absolute Error (MAE): ${mae:,.2f}")
    finally:
        client.close()

# Entry point: run the training pipeline with its default input and output file names.
if __name__ == "__main__":
    train_underwriting_model_dask()

--- Starting Dask-based Underwriting Model Training Process ---
Dask client created. Dashboard at: <Client: 'tcp://10.42.1.124:8786' processes=5 threads=160, memory=36.78 GiB>
Loading data from 'underwriting_data.csv' with Dask...
Performing Dask feature engineering (One-Hot Encoding)...
Data split into training and testing sets.

Training Dask-XGBoost Regressor model for premium quotation...
Model training complete.

Model and column info bundled and saved to 'underwriting_bundle.joblib'.

--- Evaluating Model Performance on Test Set ---
R-squared (R²): 0.9963
Mean Absolute Error (MAE): $126.23
