In [1]:
import dask
import time

import cml.workers_v1 as workers
# Launch one CML worker session that runs the Dask scheduler.
# The dashboard is bound to 127.0.0.1:8090 inside that session.
dask_scheduler = workers.launch_workers(
    n=1,
    cpu=2,
    memory=8,
    code="!dask-scheduler --host 0.0.0.0 --dashboard-address 127.0.0.1:8090",
)

# Wait for the scheduler to start.
# NOTE(review): a fixed sleep is a guess at the start-up time; consider polling
# workers.list_workers() until the scheduler reports an IP address.
time.sleep(10)

# Look up the IP address of the scheduler session among all running workers.
scheduler_workers = workers.list_workers()
scheduler_id = dask_scheduler[0]["id"]
scheduler_ip = next(
    (
        worker["ip_address"]
        for worker in scheduler_workers
        if worker["id"] == scheduler_id
    ),
    None,
)
if not scheduler_ip:
    # Fail with a clear message instead of an IndexError or a malformed URL.
    raise RuntimeError(
        f"Could not determine the IP address of scheduler worker {scheduler_id!r}; "
        "it may not have started yet."
    )

# 8786 is the port dask-scheduler listens on when none is given on the command line.
scheduler_url = f"tcp://{scheduler_ip}:8786"

# Launch the Dask worker sessions and point them at the scheduler.
k8s_pods = 5
dask_workers = workers.launch_workers(
    n=k8s_pods,
    cpu=1,
    memory=8,
    code=f"!dask-worker {scheduler_url}",
)

# Wait for the workers to start.
time.sleep(10)

print("\nDask Diagnostic dashboard:")
print(dask_scheduler[0]["app_url"])

Skipping addon with invalid or excluded ID: {'type': 'cmladdon', 'path': '/runtime-addons/cmladdon-2.0.49-b279', 'spec': '\nenv:\n  MLFLOW_TRACKING_URI: cml://localhost\n  MLFLOW_REGISTRY_URI: cml://localhost\n  PYTHONPATH: ${PYTHONPATH}:/opt/cmladdons/python/site-customize\n  R_LIBS_SITE: ${R_LIBS_SITE}:/opt/cmladdons/r/libs\npaths:\n  - /opt/cmladdons', 'version': '', 'id': -1}
Skipping addon with invalid or excluded ID: {'type': 'cmladdon', 'path': '/runtime-addons/cmladdon-2.0.49-b279', 'spec': '\nenv:\n  MLFLOW_TRACKING_URI: cml://localhost\n  MLFLOW_REGISTRY_URI: cml://localhost\n  PYTHONPATH: ${PYTHONPATH}:/opt/cmladdons/python/site-customize\n  R_LIBS_SITE: ${R_LIBS_SITE}:/opt/cmladdons/r/libs\npaths:\n  - /opt/cmladdons', 'version': '', 'id': -1}

Dask Diagnostic dashboard:
https://4dow92pvmcytb8u1.cmlws5.apps.dlee5.cldr.example/


In [2]:
# Train xgboost model with dask
import pandas as pd
import dask.dataframe as dd
from dask.distributed import Client
from xgboost.dask import DaskXGBClassifier
from dask_ml.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix
import time

def feature_engineering_dask(ddf):
    """Build the per-msisdn feature table from raw call records.

    The records are indexed by ``msisdn``, grouped by ``msisdn``, and each
    group is reduced by ``calculate_all_features_for_group``.

    Parameters
    ----------
    ddf : dask.dataframe.DataFrame
        Raw call records. Must contain ``msisdn`` plus every column that
        ``calculate_all_features_for_group`` reads.

    Returns
    -------
    dask.dataframe.DataFrame
        Persisted collection with the columns listed in ``meta`` below.
    """
    print("Performing feature engineering with Dask...")
    ddf = ddf.set_index('msisdn')

    # No helper columns are added here: every ratio is derived from the raw
    # columns inside calculate_all_features_for_group, so extra indicator
    # columns would only be carried through the groupby without being read.

    # Output schema of the groupby-apply. Supplying it explicitly tells Dask
    # the column names and dtypes of the result up front.
    meta = {
        'total_calls': 'int64',
        'outgoing_call_ratio': 'float64',
        'avg_duration': 'float64',
        'std_duration': 'float64',
        'nocturnal_call_ratio': 'float64',
        'mobility': 'int64',
        'is_fraud': 'bool'
    }

    # Reduce each msisdn's records to one set of features, then persist so the
    # result is kept on the cluster and not recomputed by later steps.
    user_features_ddf = ddf.groupby('msisdn').apply(
        calculate_all_features_for_group,
        meta=meta
    ).persist()

    return user_features_ddf

def calculate_all_features_for_group(group):
    """Reduce one group of call records to a single row of features.

    Used as the function applied to each ``msisdn`` group.

    Parameters
    ----------
    group : pandas.DataFrame
        Records of one group. Must contain the columns ``call_direction``,
        ``hour_of_day``, ``duration``, ``cell_tower`` and ``is_fraud``.
        The frame is only read; it is never modified.

    Returns
    -------
    pandas.Series
        Values keyed by ``total_calls``, ``outgoing_call_ratio``,
        ``avg_duration``, ``std_duration``, ``nocturnal_call_ratio``,
        ``mobility`` and ``is_fraud``, in that order.
    """
    # A call counts as nocturnal when hour_of_day is >= 22 or <= 6.
    nocturnal_hours = (group['hour_of_day'] >= 22) | (group['hour_of_day'] <= 6)
    features = {
        'total_calls': len(group),  # Number of records in the group.
        'outgoing_call_ratio': (group['call_direction'] == 'outgoing').mean(),
        'avg_duration': group['duration'].mean(),
        # Sample standard deviation; NaN when the group has a single record.
        'std_duration': group['duration'].std(),
        'nocturnal_call_ratio': nocturnal_hours.mean(),
        'mobility': group['cell_tower'].nunique(),  # Distinct cell towers seen.
        # Label is taken from the first record. Only this scalar is cast to
        # bool, so the caller's frame is not written to.
        'is_fraud': bool(group['is_fraud'].iloc[0]),
    }
    return pd.Series(features)

def train_fraud_detection_model_xgb_dask(features_ddf, client):
    """Fit a distributed XGBoost classifier on the feature table and report on it.

    Missing values are replaced with 0, the data is split 80/20 into train and
    test sets, and the positive class is re-weighted by the negative/positive
    count ratio of the training labels. A confusion matrix, a classification
    report and the split-count feature importances are printed.

    Parameters
    ----------
    features_ddf : dask.dataframe.DataFrame
        Feature table containing an ``is_fraud`` label column.
    client : dask.distributed.Client
        Client used to materialise the test labels and predictions.

    Returns
    -------
    The trained model's underlying Booster object.
    """
    print("\nTraining the XGBoost model with Dask...")
    # Fills NaNs (e.g. std_duration of a single-record group) with 0.
    filled_ddf = features_ddf.fillna(0)

    target = filled_ddf['is_fraud']
    predictors = filled_ddf.drop('is_fraud', axis=1)

    # 80% train / 20% test, shuffled with a fixed seed.
    X_train, X_test, y_train, y_test = train_test_split(
        predictors, target, test_size=0.2, random_state=42, shuffle=True
    )

    # Weight the positive class by the negative/positive ratio.
    print("Calculating scale_pos_weight for class imbalance...")
    label_counts = y_train.value_counts().compute()
    # A label missing from the training split counts as 1, which keeps the
    # ratio defined.
    imbalance_ratio = label_counts.get(False, 1) / label_counts.get(True, 1)
    print(f"scale_pos_weight: {imbalance_ratio:.2f}")

    hyperparameters = dict(
        n_estimators=100,                  # number of boosted trees
        random_state=42,                   # fixed seed
        scale_pos_weight=imbalance_ratio,  # count(negative) / count(positive)
        objective='binary:logistic',       # binary classification
        eval_metric='logloss',
        tree_method='hist',                # histogram-based split finding
    )
    classifier = DaskXGBClassifier(**hyperparameters)
    classifier.fit(X_train, y_train)

    print("\nModel Evaluation on Test Set...")
    predictions = classifier.predict(X_test)
    # Materialise labels and predictions locally for the scikit-learn metrics.
    actual_labels, predicted_labels = client.compute([y_test, predictions], sync=True)

    print(confusion_matrix(actual_labels, predicted_labels))
    print(classification_report(actual_labels, predicted_labels))

    # 'weight' importance, sorted from highest to lowest.
    trained_booster = classifier.get_booster()
    importance_by_feature = pd.Series(
        trained_booster.get_score(importance_type='weight')
    ).sort_values(ascending=False)

    print("\nFeature Importances:")
    print(importance_by_feature)

    return trained_booster

if __name__ == '__main__':
    # Driver: connect to the Dask cluster, build features, train and save the model.
    raw_data_filename = '3G_cdr_data.csv'
    model_output_filename = 'fraud_detection_model_xgb.json'

    # scheduler_url is defined outside this cell (by the cluster-launch code).
    # The context manager closes the client on every exit path, including when
    # feature engineering, training or saving raises.
    with Client(scheduler_url) as client:
        print(f"Dask client connected: {client}")

        try:
            print(f"\nReading '{raw_data_filename}' with Dask...")
            # Partition the CSV into 128MB blocks.
            raw_ddf = dd.read_csv(raw_data_filename, blocksize="128MB")
        except FileNotFoundError:
            # The pipeline lives in the `else` branch, so no exit() is needed.
            # Under IPython/Jupyter, exit() only requests a shutdown and lets the
            # cell keep running, which would reach the code below without raw_ddf.
            print(f"Error: Raw data file not found at '{raw_data_filename}'.")
            print("Please run the data creation script first.")
        else:
            start_time = time.time()

            features_ddf = feature_engineering_dask(raw_ddf)
            fraud_model_booster = train_fraud_detection_model_xgb_dask(features_ddf, client)

            # Save the model in JSON format
            fraud_model_booster.save_model(model_output_filename)

            print(f"\nTrained XGBoost model saved to '{model_output_filename}'")
            print(f"Process complete in {time.time() - start_time:.2f} seconds.")

Dask client connected: <Client: 'tcp://10.42.1.216:8786' processes=5 threads=160, memory=36.78 GiB>

Reading '3G_cdr_data.csv' with Dask...
Performing feature engineering with Dask...

Training the XGBoost model with Dask...
Calculating scale_pos_weight for class imbalance...
scale_pos_weight: 18.96

Model Evaluation on Test Set...
[[45093     4]
 [    0  2352]]
              precision    recall  f1-score   support

       False       1.00      1.00      1.00     45097
        True       1.00      1.00      1.00      2352

    accuracy                           1.00     47449
   macro avg       1.00      1.00      1.00     47449
weighted avg       1.00      1.00      1.00     47449


Feature Importances:
mobility                177.0
total_calls             119.0
avg_duration            100.0
outgoing_call_ratio      85.0
nocturnal_call_ratio     48.0
std_duration              6.0
dtype: float64

Trained XGBoost model saved to 'fraud_detection_model_xgb.json'
Process complete in 507.69