In [1]:
import subprocess, sys, os
import cml.workers_v1 as workers

DASHBOARD_PORT = os.environ['CDSW_READONLY_PORT']
DASHBOARD_IP = os.environ['CDSW_IP_ADDRESS']

command = "ray start --head --block --include-dashboard=true --dashboard-port=$CDSW_READONLY_PORT --num-gpus=0 &" 

subprocess.run(command, shell = True, executable="/bin/bash")

with open("RAY_HEAD_IP", 'w') as output_file:
    output_file.write(DASHBOARD_IP)
            
ray_head_addr = DASHBOARD_IP + ':6379'
ray_url = f"ray://{DASHBOARD_IP}:10001" 
worker_start_cmd = f"!ray start --block --address={ray_head_addr}"

ray_workers = workers.launch_workers(
    n=5, 
    cpu=1, 
    memory=16,
    nvidia_gpu=0,
    code=worker_start_cmd,
)

Skipping addon with invalid or excluded ID: {'type': 'cmladdon', 'path': '/runtime-addons/cmladdon-2.0.49-b279', 'spec': '\nenv:\n  MLFLOW_TRACKING_URI: cml://localhost\n  MLFLOW_REGISTRY_URI: cml://localhost\n  PYTHONPATH: ${PYTHONPATH}:/opt/cmladdons/python/site-customize\n  R_LIBS_SITE: ${R_LIBS_SITE}:/opt/cmladdons/r/libs\npaths:\n  - /opt/cmladdons', 'version': '', 'id': -1}
2025-07-15 11:07:44,460	INFO usage_lib.py:467 -- Usage stats collection is enabled by default without user confirmation because this terminal is detected to be non-interactive. To disable this, add `--disable-usage-stats` to the command that starts the cluster, or run the following command: `ray disable-usage-stats` before starting the cluster. See https://docs.ray.io/en/master/cluster/usage-stats.html for more details.
2025-07-15 11:07:44,460	INFO scripts.py:971 -- [37mLocal node IP[39m: [1m10.42.1.93[22m
2025-07-15 11:07:56,667	SUCC scripts.py:1007 -- [32m--------------------[39m
2025-07-15 11:07:56,66

In [2]:
import ray
from ray.data import Dataset
from ray.train import ScalingConfig
from ray.train.xgboost import XGBoostTrainer
import xgboost as xgb
import pandas as pd
from sklearn.metrics import classification_report, confusion_matrix
import time
import os

from ray.data import Dataset

def calculate_all_features_for_group(group_df: pd.DataFrame) -> pd.DataFrame:
    """Calculates aggregated features for a single user (a group of records)."""
    group_df['is_fraud'] = group_df['is_fraud'].astype(bool)
    nocturnal_hours = (group_df['hour_of_day'] >= 22) | (group_df['hour_of_day'] <= 6)
    features = {
        'total_calls': len(group_df),
        'outgoing_call_ratio': (group_df['call_direction'] == 'outgoing').mean(),
        'avg_duration': group_df['duration'].mean(),
        'std_duration': group_df['duration'].std(),
        'nocturnal_call_ratio': nocturnal_hours.mean(),
        'mobility': group_df['cell_tower'].nunique(),
        'is_fraud': group_df['is_fraud'].iloc[0]
    }
    return pd.DataFrame([features], index=[group_df['msisdn'].iloc[0]])

def feature_engineering_ray(ds: ray.data.Dataset) -> ray.data.Dataset:
    """Performs feature engineering on the raw call data using Ray Data."""
    print("Performing feature engineering...")
    user_features_ds = ds.groupby('msisdn').map_groups(
        calculate_all_features_for_group
    )
    return user_features_ds

def prepare_data(dataset: Dataset) -> tuple[Dataset, Dataset, Dataset]:
    """Splits the dataset into train (70%), validation (15%), and test (15%) sets."""
    print("Splitting data into training, validation, and test sets...")
    seed = 42
    train_dataset, rest = dataset.train_test_split(test_size=0.3, shuffle=True, seed=seed)
    valid_dataset, test_dataset = rest.train_test_split(test_size=0.5, shuffle=True, seed=seed)
    
    print(f"Train set size: {train_dataset.count()}")
    print(f"Validation set size: {valid_dataset.count()}")
    print(f"Test set size: {test_dataset.count()}")
    
    return train_dataset, valid_dataset, test_dataset

def train_fraud_detection_model_xgb_ray(train_ds: ray.data.Dataset, valid_ds: ray.data.Dataset):
    """Trains a fraud detection model using XGBoost with Ray Train."""
    print("\nTraining the XGBoost model with Ray Train...")

    train_ds = train_ds.map_batches(lambda df: df.fillna(0), batch_format="pandas")
    valid_ds = valid_ds.map_batches(lambda df: df.fillna(0), batch_format="pandas")

    print("Calculating scale_pos_weight from training data for class imbalance...")
    num_fraud = train_ds.filter(lambda row: row["is_fraud"] == True).count()
    num_non_fraud = train_ds.filter(lambda row: row["is_fraud"] == False).count()

    if num_fraud > 0 and num_non_fraud > 0:
        scale_pos_weight = num_non_fraud / num_fraud
        print(f"scale_pos_weight determined to be: {scale_pos_weight:.2f}")
    else:
        scale_pos_weight = 1.0
        print(f"Warning: Insufficient classes to calculate scale_pos_weight. Defaulting to 1.0.")

    xgb_params = {
        "objective": "binary:logistic",
        "eval_metric": ["logloss", "error"],
        "tree_method": "hist",
        "scale_pos_weight": scale_pos_weight,
        "random_state": 42,
    }

    label_column = 'is_fraud'
    
    trainer = XGBoostTrainer(
        scaling_config=ScalingConfig(num_workers=5, use_gpu=False),
        label_column=label_column,
        params=xgb_params,
        datasets={"train": train_ds, "valid": valid_ds},
    )

    result = trainer.fit()
    print("\nModel Training Complete.")
    
    # Corrected: Use get_best_checkpoint method
    best_checkpoint = result.get_best_checkpoint(metric="valid-logloss", mode="min")
    
    if best_checkpoint:
        with best_checkpoint.as_directory() as checkpoint_dir:
            #model_path = os.path.join(checkpoint_dir, "model.xgb")
            model_path = os.path.join(checkpoint_dir, "model.ubj")
            if os.path.exists(model_path):
                booster = xgb.Booster()
                booster.load_model(model_path)
                return booster
    print("Could not load a model from checkpoint.")
    return None

def evaluate_model(booster: xgb.Booster, test_ds: ray.data.Dataset):
    """Evaluates the trained model on the unseen test dataset."""
    print("\n--- Model Evaluation on Unseen Test Data ---")

    test_ds = test_ds.map_batches(lambda df: df.fillna(0), batch_format="pandas")

    feature_columns = [col for col in test_ds.columns() if col != 'is_fraud']
    label_column = 'is_fraud'
    
    test_df = test_ds.to_pandas()
    
    if test_df.empty:
        print("Test dataset is empty. Cannot evaluate.")
        return

    X_test = test_df[feature_columns]
    y_test = test_df[label_column]
    dmatrix_test = xgb.DMatrix(X_test)
    
    y_pred_proba = booster.predict(dmatrix_test)
    y_pred = (y_pred_proba > 0.5).astype(int)

    print("\nConfusion Matrix (Test Data):")
    print(confusion_matrix(y_test, y_pred))
    print("\nClassification Report (Test Data):")
    print(classification_report(y_test, y_pred))

    feature_scores = booster.get_score(importance_type='weight')
    if feature_scores:
        feature_importances = pd.Series(feature_scores).sort_values(ascending=False)
        print("\nFeature Importances:")
        print(feature_importances)


if __name__ == '__main__':
    
    raw_data_filename = 'cdr_data.csv'
    model_output_filename = 'fraud_detection_model_xgb_ray.json'

    try:
        print(f"\nReading '{raw_data_filename}' with Ray Data...")
        raw_ds = ray.data.read_csv(
            raw_data_filename,
        )
    except Exception as e:
        print(f"Error reading raw data file: {e}")
        exit()

    start_time = time.time()

    # 1. Split data into three sets
    train_raw_ds, valid_raw_ds, test_raw_ds = prepare_data(raw_ds)

    # 2. Apply feature engineering to each split
    train_features_ds = feature_engineering_ray(train_raw_ds)
    valid_features_ds = feature_engineering_ray(valid_raw_ds)
    test_features_ds = feature_engineering_ray(test_raw_ds)

    # 3. Train the model
    fraud_model_booster = train_fraud_detection_model_xgb_ray(train_features_ds, valid_features_ds)

    # 4. Evaluate and save the final model
    if fraud_model_booster:
        evaluate_model(fraud_model_booster, test_features_ds)
        
        fraud_model_booster.save_model(model_output_filename)
        print(f"\nTrained XGBoost model saved to '{model_output_filename}'")
    else:
        print("Model training failed, so no evaluation or saving was performed.")

    print(f"\nProcess complete in {time.time() - start_time:.2f} seconds.")
    
    # ray.shutdown()

2025-07-15 11:09:40,990	INFO worker.py:1723 -- Connecting to existing Ray cluster at address: 10.42.1.93:6379...
2025-07-15 11:09:41,025	INFO worker.py:1908 -- Connected to Ray cluster. View the dashboard at [1m[32m127.0.0.1:8100 [39m[22m



Reading 'cdr_data.csv' with Ray Data...


2025-07-15 11:09:51,089	INFO logging.py:295 -- Registered dataset logger for dataset dataset_2_0


Splitting data into training, validation, and test sets...


2025-07-15 11:09:51,367	INFO streaming_executor.py:117 -- Starting execution of Dataset dataset_2_0. Full logs are in /tmp/ray/session_2025-07-15_11-07-44_461720_157/logs/ray-data
2025-07-15 11:09:51,368	INFO streaming_executor.py:118 -- Execution plan of Dataset dataset_2_0: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadCSV] -> AllToAllOperator[RandomShuffle] -> AggregateNumRows[AggregateNumRows]


Running 0:   0%|          | 0.00/1.00 [00:00<?, ? row/s]

- ReadCSV->SplitBlocks(384) 1: 0.00 row [00:00, ? row/s]

- RandomShuffle 2: 0.00 row [00:00, ? row/s]

Shuffle Map 3:   0%|          | 0.00/1.00 [00:00<?, ? row/s]

Shuffle Reduce 4:   0%|          | 0.00/1.00 [00:00<?, ? row/s]

- AggregateNumRows 5:   0%|          | 0.00/1.00 [00:00<?, ? row/s]

2025-07-15 11:10:39,489	INFO streaming_executor.py:227 -- ✔️  Dataset dataset_2_0 execution finished in 48.11 seconds
2025-07-15 11:10:39,549	INFO logging.py:295 -- Registered dataset logger for dataset dataset_1_0
2025-07-15 11:10:39,568	INFO streaming_executor.py:117 -- Starting execution of Dataset dataset_1_0. Full logs are in /tmp/ray/session_2025-07-15_11-07-44_461720_157/logs/ray-data
2025-07-15 11:10:39,571	INFO streaming_executor.py:118 -- Execution plan of Dataset dataset_1_0: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadCSV] -> AllToAllOperator[RandomShuffle]


Running 0: 0.00 row [00:00, ? row/s]

- ReadCSV->SplitBlocks(384) 1: 0.00 row [00:00, ? row/s]

- RandomShuffle 2: 0.00 row [00:00, ? row/s]

Shuffle Map 3:   0%|          | 0.00/1.00 [00:00<?, ? row/s]

Shuffle Reduce 4:   0%|          | 0.00/1.00 [00:00<?, ? row/s]

2025-07-15 11:10:59,311	INFO streaming_executor.py:227 -- ✔️  Dataset dataset_1_0 execution finished in 19.74 seconds
2025-07-15 11:10:59,720	INFO logging.py:295 -- Registered dataset logger for dataset dataset_5_0
2025-07-15 11:10:59,725	INFO streaming_executor.py:117 -- Starting execution of Dataset dataset_5_0. Full logs are in /tmp/ray/session_2025-07-15_11-07-44_461720_157/logs/ray-data
2025-07-15 11:10:59,727	INFO streaming_executor.py:118 -- Execution plan of Dataset dataset_5_0: InputDataBuffer[Input] -> AllToAllOperator[RandomShuffle]


Running 0: 0.00 row [00:00, ? row/s]

- RandomShuffle 1: 0.00 row [00:00, ? row/s]

Shuffle Map 2:   0%|          | 0.00/1.00 [00:00<?, ? row/s]

Shuffle Reduce 3:   0%|          | 0.00/1.00 [00:00<?, ? row/s]

2025-07-15 11:11:01,135	INFO streaming_executor.py:227 -- ✔️  Dataset dataset_5_0 execution finished in 1.40 seconds
2025-07-15 11:11:01,315	INFO logging.py:295 -- Registered dataset logger for dataset dataset_17_0
2025-07-15 11:11:01,327	INFO streaming_executor.py:117 -- Starting execution of Dataset dataset_17_0. Full logs are in /tmp/ray/session_2025-07-15_11-07-44_461720_157/logs/ray-data
2025-07-15 11:11:01,330	INFO streaming_executor.py:118 -- Execution plan of Dataset dataset_17_0: InputDataBuffer[Input] -> AllToAllOperator[Sort] -> TaskPoolMapOperator[MapBatches(calculate_all_features_for_group)->MapBatches(<lambda>)->Filter(<lambda>)] -> AggregateNumRows[AggregateNumRows]


Train set size: 3000917
Validation set size: 643054
Test set size: 643054
Performing feature engineering...
Performing feature engineering...
Performing feature engineering...

Training the XGBoost model with Ray Train...
Calculating scale_pos_weight from training data for class imbalance...


Running 0:   0%|          | 0.00/1.00 [00:00<?, ? row/s]

- Sort 1: 0.00 row [00:00, ? row/s]

Sort Sample 2:   0%|          | 0.00/1.00 [00:00<?, ? row/s]

Shuffle Map 3:   0%|          | 0.00/1.00 [00:00<?, ? row/s]

Shuffle Reduce 4:   0%|          | 0.00/1.00 [00:00<?, ? row/s]

- MapBatches(calculate_all_features_for_group)->MapBatches(<lambda>)->Filter(<lambda>) 5: 0.00 row [00:00, ? r…

- AggregateNumRows 6:   0%|          | 0.00/1.00 [00:00<?, ? row/s]

2025-07-15 11:11:11,606	INFO streaming_executor.py:227 -- ✔️  Dataset dataset_17_0 execution finished in 10.27 seconds
2025-07-15 11:11:11,645	INFO logging.py:295 -- Registered dataset logger for dataset dataset_19_0
2025-07-15 11:11:11,654	INFO streaming_executor.py:117 -- Starting execution of Dataset dataset_19_0. Full logs are in /tmp/ray/session_2025-07-15_11-07-44_461720_157/logs/ray-data
2025-07-15 11:11:11,656	INFO streaming_executor.py:118 -- Execution plan of Dataset dataset_19_0: InputDataBuffer[Input] -> AllToAllOperator[Sort] -> TaskPoolMapOperator[MapBatches(calculate_all_features_for_group)->MapBatches(<lambda>)->Filter(<lambda>)] -> AggregateNumRows[AggregateNumRows]


Running 0:   0%|          | 0.00/1.00 [00:00<?, ? row/s]

- Sort 1: 0.00 row [00:00, ? row/s]

Sort Sample 2:   0%|          | 0.00/1.00 [00:00<?, ? row/s]

Shuffle Map 3:   0%|          | 0.00/1.00 [00:00<?, ? row/s]

Shuffle Reduce 4:   0%|          | 0.00/1.00 [00:00<?, ? row/s]

- MapBatches(calculate_all_features_for_group)->MapBatches(<lambda>)->Filter(<lambda>) 5: 0.00 row [00:00, ? r…

- AggregateNumRows 6:   0%|          | 0.00/1.00 [00:00<?, ? row/s]

2025-07-15 11:11:21,358	INFO streaming_executor.py:227 -- ✔️  Dataset dataset_19_0 execution finished in 9.69 seconds
2025-07-15 11:11:21,573	INFO tune.py:616 -- [output] This uses the legacy output and progress reporter, as Jupyter notebooks are not supported by the new engine, yet. For more information, please see https://github.com/ray-project/ray/issues/36949


scale_pos_weight determined to be: 19.00




== Status ==
Current time: 2025-07-15 11:11:22 (running for 00:00:00.20)
Using FIFO scheduling algorithm.
Logical resource usage: 6.0/192 CPUs, 0/0 GPUs
Result logdir: /tmp/ray/session_2025-07-15_11-07-44_461720_157/artifacts/2025-07-15_11-11-21/XGBoostTrainer_2025-07-15_11-11-21/driver_artifacts
Number of trials: 1/1 (1 PENDING)


== Status ==
Current time: 2025-07-15 11:11:27 (running for 00:00:05.25)
Using FIFO scheduling algorithm.
Logical resource usage: 6.0/192 CPUs, 0/0 GPUs
Result logdir: /tmp/ray/session_2025-07-15_11-07-44_461720_157/artifacts/2025-07-15_11-11-21/XGBoostTrainer_2025-07-15_11-11-21/driver_artifacts
Number of trials: 1/1 (1 PENDING)


== Status ==
Current time: 2025-07-15 11:11:32 (running for 00:00:10.31)
Using FIFO scheduling algorithm.
Logical resource usage: 6.0/192 CPUs, 0/0 GPUs
Result logdir: /tmp/ray/session_2025-07-15_11-07-44_461720_157/artifacts/2025-07-15_11-11-21/XGBoostTrainer_2025-07-15_11-11-21/driver_artifacts
Number of trials: 1/1 (1 PENDING)


[36m(XGBoostTrainer pid=4438, ip=10.42.3.72)[0m Started distributed worker processes: 
[36m(XGBoostTrainer pid=4438, ip=10.42.3.72)[0m - (node_id=5b95818d9f030342332c29f4928293bccf49d5216a1cd1fef3aec377, ip=10.42.3.72, pid=4512) world_rank=0, local_rank=0, node_rank=0
[36m(XGBoostTrainer pid=4438, ip=10.42.3.72)[0m - (node_id=5b95818d9f030342332c29f4928293bccf49d5216a1cd1fef3aec377, ip=10.42.3.72, pid=4513) world_rank=1, local_rank=1, node_rank=0
[36m(XGBoostTrainer pid=4438, ip=10.42.3.72)[0m - (node_id=5b95818d9f030342332c29f4928293bccf49d5216a1cd1fef3aec377, ip=10.42.3.72, pid=4511) world_rank=2, local_rank=2, node_rank=0
[36m(XGBoostTrainer pid=4438, ip=10.42.3.72)[0m - (node_id=5b95818d9f030342332c29f4928293bccf49d5216a1cd1fef3aec377, ip=10.42.3.72, pid=4515) world_rank=3, local_rank=3, node_rank=0
[36m(XGBoostTrainer pid=4438, ip=10.42.3.72)[0m - (node_id=5b95818d9f030342332c29f4928293bccf49d5216a1cd1fef3aec377, ip=10.42.3.72, pid=4514) world_rank=4, local_rank=4, nod

== Status ==
Current time: 2025-07-15 11:11:52 (running for 00:00:30.59)
Using FIFO scheduling algorithm.
Logical resource usage: 6.0/192 CPUs, 0/0 GPUs
Result logdir: /tmp/ray/session_2025-07-15_11-07-44_461720_157/artifacts/2025-07-15_11-11-21/XGBoostTrainer_2025-07-15_11-11-21/driver_artifacts
Number of trials: 1/1 (1 RUNNING)


== Status ==
Current time: 2025-07-15 11:11:58 (running for 00:00:35.64)
Using FIFO scheduling algorithm.
Logical resource usage: 6.0/192 CPUs, 0/0 GPUs
Result logdir: /tmp/ray/session_2025-07-15_11-07-44_461720_157/artifacts/2025-07-15_11-11-21/XGBoostTrainer_2025-07-15_11-11-21/driver_artifacts
Number of trials: 1/1 (1 RUNNING)




[36m(RayTrainWorker pid=4512, ip=10.42.3.72)[0m [11:11:59] Task [xgboost.ray-rank=00000000]:9d84c89a15f8a22c00bde6bc01000000 got rank 0
[36m(SplitCoordinator pid=4856, ip=10.42.3.72)[0m Registered dataset logger for dataset train_20_0
[36m(SplitCoordinator pid=4856, ip=10.42.3.72)[0m Starting execution of Dataset train_20_0. Full logs are in /tmp/ray/session_2025-07-15_11-07-44_461720_157/logs/ray-data
[36m(SplitCoordinator pid=4856, ip=10.42.3.72)[0m Execution plan of Dataset train_20_0: InputDataBuffer[Input] -> AllToAllOperator[Sort] -> TaskPoolMapOperator[MapBatches(calculate_all_features_for_group)->MapBatches(<lambda>)] -> OutputSplitter[split(5, equal=True)]


(pid=4856, ip=10.42.3.72) Running 0: 0.00 row [00:00, ? row/s]

(pid=4856, ip=10.42.3.72) - Sort 1: 0.00 row [00:00, ? row/s]

(pid=4856, ip=10.42.3.72) Sort Sample 2:   0%|          | 0.00/1.00 [00:00<?, ? row/s]

(pid=4856, ip=10.42.3.72) Shuffle Map 3:   0%|          | 0.00/1.00 [00:00<?, ? row/s]

(pid=4856, ip=10.42.3.72) Shuffle Reduce 4:   0%|          | 0.00/1.00 [00:00<?, ? row/s]

(pid=4856, ip=10.42.3.72) - MapBatches(calculate_all_features_for_group)->MapBatches(<lambda>) 5: 0.00 row [00…

(pid=4856, ip=10.42.3.72) - split(5, equal=True) 6: 0.00 row [00:00, ? row/s]

== Status ==
Current time: 2025-07-15 11:12:03 (running for 00:00:40.70)
Using FIFO scheduling algorithm.
Logical resource usage: 6.0/192 CPUs, 0/0 GPUs
Result logdir: /tmp/ray/session_2025-07-15_11-07-44_461720_157/artifacts/2025-07-15_11-11-21/XGBoostTrainer_2025-07-15_11-11-21/driver_artifacts
Number of trials: 1/1 (1 RUNNING)


== Status ==
Current time: 2025-07-15 11:12:08 (running for 00:00:45.75)
Using FIFO scheduling algorithm.
Logical resource usage: 6.0/192 CPUs, 0/0 GPUs
Result logdir: /tmp/ray/session_2025-07-15_11-07-44_461720_157/artifacts/2025-07-15_11-11-21/XGBoostTrainer_2025-07-15_11-11-21/driver_artifacts
Number of trials: 1/1 (1 RUNNING)


== Status ==
Current time: 2025-07-15 11:12:13 (running for 00:00:50.80)
Using FIFO scheduling algorithm.
Logical resource usage: 6.0/192 CPUs, 0/0 GPUs
Result logdir: /tmp/ray/session_2025-07-15_11-07-44_461720_157/artifacts/2025-07-15_11-11-21/XGBoostTrainer_2025-07-15_11-11-21/driver_artifacts
Number of trials: 1/1 (1 RUNNING)


[36m(SplitCoordinator pid=4856, ip=10.42.3.72)[0m ✔️  Dataset train_20_0 execution finished in 123.57 seconds
[36m(RayTrainWorker pid=4514, ip=10.42.3.72)[0m [11:11:59] Task [xgboost.ray-rank=00000004]:8c09f59564e2a4d97305afa801000000 got rank 4[32m [repeated 4x across cluster] (Ray deduplicates logs by default. Set RAY_DEDUP_LOGS=0 to disable log deduplication, or see https://docs.ray.io/en/master/ray-observability/user-guides/configure-logging.html#log-deduplication for more options.)[0m


== Status ==
Current time: 2025-07-15 11:14:04 (running for 00:02:42.05)
Using FIFO scheduling algorithm.
Logical resource usage: 6.0/192 CPUs, 0/0 GPUs
Result logdir: /tmp/ray/session_2025-07-15_11-07-44_461720_157/artifacts/2025-07-15_11-11-21/XGBoostTrainer_2025-07-15_11-11-21/driver_artifacts
Number of trials: 1/1 (1 RUNNING)




[36m(RayTrainWorker pid=4512, ip=10.42.3.72)[0m Registered dataset logger for dataset dataset_24_0
[36m(SplitCoordinator pid=4857, ip=10.42.3.72)[0m Starting execution of Dataset valid_21_0. Full logs are in /tmp/ray/session_2025-07-15_11-07-44_461720_157/logs/ray-data
[36m(SplitCoordinator pid=4857, ip=10.42.3.72)[0m Execution plan of Dataset valid_21_0: InputDataBuffer[Input] -> AllToAllOperator[Sort] -> TaskPoolMapOperator[MapBatches(calculate_all_features_for_group)->MapBatches(<lambda>)] -> OutputSplitter[split(5, equal=True)]


(pid=4857, ip=10.42.3.72) Running 0: 0.00 row [00:00, ? row/s]

(pid=4857, ip=10.42.3.72) - Sort 1: 0.00 row [00:00, ? row/s]

(pid=4857, ip=10.42.3.72) Sort Sample 2:   0%|          | 0.00/1.00 [00:00<?, ? row/s]

(pid=4857, ip=10.42.3.72) Shuffle Map 3:   0%|          | 0.00/1.00 [00:00<?, ? row/s]

(pid=4857, ip=10.42.3.72) Shuffle Reduce 4:   0%|          | 0.00/1.00 [00:00<?, ? row/s]

(pid=4857, ip=10.42.3.72) - MapBatches(calculate_all_features_for_group)->MapBatches(<lambda>) 5: 0.00 row [00…

(pid=4857, ip=10.42.3.72) - split(5, equal=True) 6: 0.00 row [00:00, ? row/s]

== Status ==
Current time: 2025-07-15 11:14:09 (running for 00:02:47.11)
Using FIFO scheduling algorithm.
Logical resource usage: 6.0/192 CPUs, 0/0 GPUs
Result logdir: /tmp/ray/session_2025-07-15_11-07-44_461720_157/artifacts/2025-07-15_11-11-21/XGBoostTrainer_2025-07-15_11-11-21/driver_artifacts
Number of trials: 1/1 (1 RUNNING)


== Status ==
Current time: 2025-07-15 11:14:14 (running for 00:02:52.17)
Using FIFO scheduling algorithm.
Logical resource usage: 6.0/192 CPUs, 0/0 GPUs
Result logdir: /tmp/ray/session_2025-07-15_11-07-44_461720_157/artifacts/2025-07-15_11-11-21/XGBoostTrainer_2025-07-15_11-11-21/driver_artifacts
Number of trials: 1/1 (1 RUNNING)




[36m(RayTrainWorker pid=4512, ip=10.42.3.72)[0m Registered dataset logger for dataset dataset_28_0[32m [repeated 6x across cluster][0m
[36m(SplitCoordinator pid=4857, ip=10.42.3.72)[0m ✔️  Dataset valid_21_0 execution finished in 10.24 seconds
[36m(XGBoostTrainer pid=4438, ip=10.42.3.72)[0m [11:14:16] [0]	train-logloss:0.43757	train-error:0.00000	valid-logloss:0.43897	valid-error:0.00005
[36m(XGBoostTrainer pid=4438, ip=10.42.3.72)[0m [11:14:16] [1]	train-logloss:0.29638	train-error:0.00000	valid-logloss:0.29788	valid-error:0.00005
[36m(XGBoostTrainer pid=4438, ip=10.42.3.72)[0m [11:14:16] [2]	train-logloss:0.20741	train-error:0.00000	valid-logloss:0.21508	valid-error:0.00000
[36m(XGBoostTrainer pid=4438, ip=10.42.3.72)[0m [11:14:16] [3]	train-logloss:0.14788	train-error:0.00000	valid-logloss:0.15392	valid-error:0.00000
[36m(XGBoostTrainer pid=4438, ip=10.42.3.72)[0m [11:14:16] [4]	train-logloss:0.10669	train-error:0.00000	valid-logloss:0.11163	valid-error:0.00000
[36m

== Status ==
Current time: 2025-07-15 11:14:18 (running for 00:02:56.05)
Using FIFO scheduling algorithm.
Logical resource usage: 6.0/192 CPUs, 0/0 GPUs
Result logdir: /tmp/ray/session_2025-07-15_11-07-44_461720_157/artifacts/2025-07-15_11-11-21/XGBoostTrainer_2025-07-15_11-11-21/driver_artifacts
Number of trials: 1/1 (1 TERMINATED)



Model Training Complete.

--- Model Evaluation on Unseen Test Data ---


Running 0: 0.00 row [00:00, ? row/s]

- Sort 1: 0.00 row [00:00, ? row/s]

Sort Sample 2:   0%|          | 0.00/1.00 [00:00<?, ? row/s]

Shuffle Map 3:   0%|          | 0.00/1.00 [00:00<?, ? row/s]

Shuffle Reduce 4:   0%|          | 0.00/1.00 [00:00<?, ? row/s]

- MapBatches(calculate_all_features_for_group)->MapBatches(<lambda>) 5: 0.00 row [00:00, ? row/s]

- limit=1 6: 0.00 row [00:00, ? row/s]

2025-07-15 11:14:22,440	INFO streaming_executor.py:227 -- ✔️  Dataset dataset_33_0 execution finished in 3.93 seconds
2025-07-15 11:14:22,461	INFO logging.py:295 -- Registered dataset logger for dataset dataset_32_0
2025-07-15 11:14:22,466	INFO streaming_executor.py:117 -- Starting execution of Dataset dataset_32_0. Full logs are in /tmp/ray/session_2025-07-15_11-07-44_461720_157/logs/ray-data
2025-07-15 11:14:22,466	INFO streaming_executor.py:118 -- Execution plan of Dataset dataset_32_0: InputDataBuffer[Input] -> AllToAllOperator[Sort] -> TaskPoolMapOperator[MapBatches(calculate_all_features_for_group)->MapBatches(<lambda>)]


Running 0: 0.00 row [00:00, ? row/s]

- Sort 1: 0.00 row [00:00, ? row/s]

Sort Sample 2:   0%|          | 0.00/1.00 [00:00<?, ? row/s]

Shuffle Map 3:   0%|          | 0.00/1.00 [00:00<?, ? row/s]

Shuffle Reduce 4:   0%|          | 0.00/1.00 [00:00<?, ? row/s]

- MapBatches(calculate_all_features_for_group)->MapBatches(<lambda>) 5: 0.00 row [00:00, ? row/s]

2025-07-15 11:14:26,937	INFO streaming_executor.py:227 -- ✔️  Dataset dataset_32_0 execution finished in 4.46 seconds



Confusion Matrix (Test Data):
[[18981     0]
 [    0  1000]]

Classification Report (Test Data):
              precision    recall  f1-score   support

       False       1.00      1.00      1.00     18981
        True       1.00      1.00      1.00      1000

    accuracy                           1.00     19981
   macro avg       1.00      1.00      1.00     19981
weighted avg       1.00      1.00      1.00     19981


Feature Importances:
total_calls             9.0
outgoing_call_ratio     3.0
avg_duration            3.0
nocturnal_call_ratio    3.0
dtype: float64

Trained XGBoost model saved to 'fraud_detection_model_xgb_ray.json'

Process complete in 276.25 seconds.
