In [2]:
# --- Cell 1: Imports ---
print("--- Ray Tune Core API Implementation ---")

# Core Libraries
import pandas as pd
import numpy as np
import os
import joblib

# Scikit-learn for pipeline, splitting, metrics
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.metrics import recall_score
# Optional: If you added StandardScaler or other preprocessing in the pipeline
# from sklearn.preprocessing import StandardScaler

# XGBoost
import xgboost as xgb

# Imbalanced-learn for SMOTE
from imblearn.over_sampling import SMOTE

# Ray Tune
import ray
from ray import tune

print("Imports successful.")

--- Ray Tune Core API Implementation ---
Imports successful.


In [3]:
# Combined Data Loading, Preprocessing, Splitting, Feature Engineering, and Pipeline Creation

import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
import xgboost as xgb
import gc # For memory management

# --- Load and Sample Data ---
hf_csv_url = "hf://datasets/MatrixIA/FraudData/FraudData.csv"
df_full = pd.read_csv(hf_csv_url) # Requires huggingface_hub
sample_size = 1000000
df = df_full.head(sample_size).copy()
del df_full
gc.collect()

# --- Preprocessing (Selection, Encoding, Define X/y) ---
features_to_keep = ['step', 'type', 'amount', 'oldbalanceOrg', 'newbalanceOrig', 'oldbalanceDest', 'newbalanceDest']
target = 'isFraud'
df_processed = df[features_to_keep + [target]].copy()
df_processed = pd.get_dummies(df_processed, columns=['type'], drop_first=True, dtype=int)
X = df_processed.drop(target, axis=1)
y = df_processed[target]
del df # Clean up intermediate dataframe
gc.collect()

# --- Train/Test Split ---
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)
del X, y # Clean up intermediate variables
gc.collect()

# --- Feature Engineering ---
X_train_fe = X_train.copy()
X_test_fe = X_test.copy()
epsilon = 1e-6
X_train_fe['amt_ratio_orig'] = (X_train_fe['amount'] / (X_train_fe['oldbalanceOrg'] + epsilon)).fillna(0)
X_test_fe['amt_ratio_orig'] = (X_test_fe['amount'] / (X_test_fe['oldbalanceOrg'] + epsilon)).fillna(0)
del X_train, X_test # Clean up original splits if only feature-engineered ones are needed next
gc.collect()

# --- Build Pipeline Object ---
pipeline_steps = [
    # Add scaler here if needed: e.g., ('scaler', StandardScaler()),
    ('xgboost', xgb.XGBClassifier(random_state=42, use_label_encoder=False, eval_metric='logloss'))
]
pipeline_obj = Pipeline(steps=pipeline_steps)

# --- Verification (Optional - Can be commented out after confirmation) ---
print(f"Data Preparation Complete.")
print(f"X_train_fe shape: {X_train_fe.shape}")
print(f"y_train shape: {y_train.shape}")
print(f"X_test_fe shape: {X_test_fe.shape}")
print(f"y_test shape: {y_test.shape}")
print(f"Pipeline object created: {pipeline_obj}")

Data Preparation Complete.
X_train_fe shape: (800000, 11)
y_train shape: (800000,)
X_test_fe shape: (200000, 11)
y_test shape: (200000,)
Pipeline object created: Pipeline(steps=[('xgboost',
                 XGBClassifier(base_score=None, booster=None, callbacks=None,
                               colsample_bylevel=None, colsample_bynode=None,
                               colsample_bytree=None, device=None,
                               early_stopping_rounds=None,
                               enable_categorical=False, eval_metric='logloss',
                               feature_types=None, feature_weights=None,
                               gamma=None, grow_policy=None,
                               importance_type=None,
                               interaction_constraints=None, learning_rate=None,
                               max_bin=None, max_cat_threshold=None,
                               max_cat_to_onehot=None, max_delta_step=None,
                               max_

In [4]:
# --- Step 3: Define Ray Tune Training Function ---

import xgboost as xgb
from sklearn.model_selection import train_test_split
from imblearn.over_sampling import SMOTE
from sklearn.metrics import recall_score
from ray import tune

def train_fraud_model_ray(config):
    """Trains and validates one trial for Ray Tune."""
    # Assumes X_train_fe and y_train are accessible in the outer scope

    # 1. Internal Train/Validation Split
    try:
        X_tune_train, X_tune_val, y_tune_train, y_tune_val = train_test_split(
            X_train_fe, y_train, test_size=0.2, random_state=42, stratify=y_train
        )
    except NameError:
         print("ERROR in train_fraud_model_ray: Could not access X_train_fe or y_train.")
         tune.report(recall=0.0, error="Data loading failed")
         return

    # 2. Apply SMOTE to Internal Training Split
    smote = SMOTE(random_state=42)
    X_tune_train_res, y_tune_train_res = smote.fit_resample(X_tune_train, y_tune_train)

    # 3. Prepare DMatrix
    dtrain = xgb.DMatrix(X_tune_train_res, label=y_tune_train_res)
    dval = xgb.DMatrix(X_tune_val, label=y_tune_val)

    # 4. Train using xgb.train API
    evals_result = {}
    try:
        bst = xgb.train(
            params=config,
            dtrain=dtrain,
            num_boost_round=config.get("n_estimators", 100), # Use n_estimators from config
            evals=[(dval, "eval")],
            evals_result=evals_result,
            verbose_eval=False,
            early_stopping_rounds=10
        )

        # 5. Evaluate on Internal Validation Set
        y_pred_val_proba = bst.predict(dval)
        y_pred_val_labels = (y_pred_val_proba > 0.5).astype(int) # Threshold probabilities
        validation_recall = recall_score(y_tune_val, y_pred_val_labels, pos_label=1, zero_division=0)

        # 6. Report Results to Ray Tune
        tune.report(recall=validation_recall, done=True)

    except Exception as e:
        print(f"ERROR during training/evaluation in trial: {e}")
        tune.report(recall=0.0, error=str(e), done=True) # Report failure

print("Ray Tune training function 'train_fraud_model_ray' defined.")

Ray Tune training function 'train_fraud_model_ray' defined.


In [5]:
# --- Step 4: Define Ray Tune Search Space and Tuner ---
from ray import tune # Ensure tune is imported

print("\nDefining Ray Tune search space and tuner...")

# Define parameter search space using tune.* functions
# These keys MUST match the parameters expected by xgb.train within your training function
param_space = {
    # XGBoost Training Parameters (params argument for xgb.train)
    "objective": "binary:logistic",
    "eval_metric": ["logloss", "recall"], # Track multiple metrics if desired
    "eta": tune.loguniform(1e-4, 1e-1),  # Learning rate (log scale)
    "max_depth": tune.randint(4, 12), # Integer between 4 and 11 (exclusive upper bound for randint)
    "min_child_weight": tune.choice([1, 2, 3, 4, 5]), # Choose from discrete values
    "subsample": tune.uniform(0.6, 1.0), # Float between 0.6 and 1.0
    "colsample_bytree": tune.uniform(0.6, 1.0), # Float between 0.6 and 1.0
    # Explicitly include n_estimators here for the training function to access
    "n_estimators": tune.randint(150, 501), # Integer between 150 and 500 (exclusive upper bound)
    "random_state": 42 # Fixed seed for XGBoost internal randomness (passed in config)
}

# Configure the Tuner
tuner = tune.Tuner(
    train_fraud_model_ray, # The trainable function defined in the previous step
    tune_config=tune.TuneConfig(
        metric="recall",       # Optimize based on the 'recall' key reported by tune.report
        mode="max",            # We want to maximize recall
        num_samples=15,       # Number of different hyperparameter combinations to try
        # Optional: Add scheduler for early stopping (uncomment to use)
        # from ray.tune.schedulers import ASHAScheduler
        # scheduler=ASHAScheduler(metric="recall", mode="max", grace_period=5, reduction_factor=2),
    ),
    param_space=param_space, # The search space defined above
    # Optional: Add run_config for naming experiment, storage etc.
    # run_config=ray.air.RunConfig(name="fraud_xgb_tune")
)

print("Ray Tune Tuner configured successfully.")


Defining Ray Tune search space and tuner...
Ray Tune Tuner configured successfully.


In [None]:
# --- REVISED Step 5: Run Ray Tune Experiment ---
import ray
import time

print("\nAttempting to start Ray Tune experiment (tuner.fit())...")

# --- Ensure Ray is shutdown before initializing ---
if ray.is_initialized():
    print("Shutting down existing Ray instance...")
    ray.shutdown()
    time.sleep(1) # Short pause

# --- Simplified Ray Initialization ---
print("Initializing Ray...")
try:
    ray.init(ignore_reinit_error=True)
    print("Ray initialized.")
except Exception as init_e:
    print(f"!!! CRITICAL ERROR DURING ray.init(): {init_e}")
    # If init fails, no point proceeding
    raise init_e # Stop execution here

start_tune_time = time.time()
best_result = None
results = None # Initialize results

try:
    print("Attempting tuner.fit()...")
    # This starts the hyperparameter tuning process
    results = tuner.fit() # <<< The actual tuning call >>>
    end_tune_time = time.time()
    print(f"\nRay Tune experiment tuner.fit() call finished. Total time: {end_tune_time - start_tune_time:.2f} seconds") # Note: This finishing doesn't guarantee success

    # --- Check Results AFTER tuner.fit() finishes ---
    if results is None:
         print("\nERROR: Tuner fit call completed but returned None. Tuning likely failed silently.")
    else:
        # Check if any trials resulted in errors
        if results.errors:
            print("\nWARNING: Some trials encountered errors:")
            for i, trial_result in enumerate(results):
                if trial_result.error:
                    trial_id = trial_result.trial_id if hasattr(trial_result, 'trial_id') else f"Trial_{i}"
                    print(f"- {trial_id}: {trial_result.error}")

        # Get the best result
        try:
             best_result = results.get_best_result(metric="recall", mode="max")
             if best_result:
                 print("\n--- Best Trial Information ---")
                 print(f"Best trial config: {best_result.config}")
                 print(f"Best trial final validation recall: {best_result.metrics.get('recall', 'N/A')}")
             else:
                  print("\nWARNING: No successful trials found or best result could not be determined.")
                  print("Check individual trial errors or tuning configuration.")
        except Exception as e_best:
             print(f"\nERROR retrieving best result: {e_best}")
             print("Possibly no trials completed successfully.")

except Exception as e:
    print(f"\n!!! UNEXPECTED ERROR during tuner.fit() call: {e}")
    import traceback
    traceback.print_exc() # Print full traceback for unexpected errors during fit
finally:
    # Always try to shut down Ray
    if ray.is_initialized():
        print("\nShutting down Ray instance...")
        ray.shutdown()
        print("Ray runtime shut down.")

# --- End of REVISED Cell 5 ---


Attempting to start Ray Tune experiment (tuner.fit())...
Initializing Ray...


2025-05-03 04:38:52,052	ERROR services.py:1362 -- Failed to start the dashboard , return code 3221226505
2025-05-03 04:38:52,053	ERROR services.py:1387 -- Error should be written to 'dashboard.log' or 'dashboard.err'. We are printing the last 20 lines for you. See 'https://docs.ray.io/en/master/ray-observability/user-guides/configure-logging.html#logging-directory-structure' to find where the log file is.
2025-05-03 04:38:52,061	ERROR services.py:1431 -- 
The last 20 lines of C:\Users\amiru\AppData\Local\Temp\ray\session_2025-05-03_04-38-50_215388_13868\logs\dashboard.log (it contains the error message from the dashboard): 
Traceback (most recent call last):
  File "E:\AI Prep\Projects\Project 1\realtime-fraud-detection-api\p1env\Lib\site-packages\ray\dashboard\dashboard.py", line 247, in <module>
    logging_utils.redirect_stdout_stderr_if_needed(
  File "E:\AI Prep\Projects\Project 1\realtime-fraud-detection-api\p1env\Lib\site-packages\ray\_private\logging_utils.py", line 47, in redi

In [1]:
import ray
if ray.is_initialized():
    ray.shutdown()
    print("Attempted Ray shutdown.")
else:
    print("Ray was not initialized or already shut down.")

Ray was not initialized or already shut down.


In [2]:
!pip install "ray[tune,scikit-learn]"




[notice] A new release of pip is available: 24.0 -> 25.1
[notice] To update, run: python.exe -m pip install --upgrade pip


In [5]:
!pip install scikit-optimize hyperopt

Collecting scikit-optimize
  Using cached scikit_optimize-0.10.2-py2.py3-none-any.whl.metadata (9.7 kB)
Collecting hyperopt
  Using cached hyperopt-0.2.7-py2.py3-none-any.whl.metadata (1.7 kB)
Collecting pyaml>=16.9 (from scikit-optimize)
  Using cached pyaml-25.1.0-py3-none-any.whl.metadata (12 kB)
Collecting networkx>=2.2 (from hyperopt)
  Using cached networkx-3.4.2-py3-none-any.whl.metadata (6.3 kB)
Collecting future (from hyperopt)
  Using cached future-1.0.0-py3-none-any.whl.metadata (4.0 kB)
Collecting tqdm (from hyperopt)
  Using cached tqdm-4.67.1-py3-none-any.whl.metadata (57 kB)
Collecting cloudpickle (from hyperopt)
  Using cached cloudpickle-3.1.1-py3-none-any.whl.metadata (7.1 kB)
Collecting py4j (from hyperopt)
  Using cached py4j-0.10.9.9-py2.py3-none-any.whl.metadata (1.3 kB)
Using cached scikit_optimize-0.10.2-py2.py3-none-any.whl (107 kB)
Using cached hyperopt-0.2.7-py2.py3-none-any.whl (1.6 MB)
Using cached networkx-3.4.2-py3-none-any.whl (1.7 MB)
Using cached pyaml


[notice] A new release of pip is available: 24.0 -> 25.1
[notice] To update, run: python.exe -m pip install --upgrade pip


In [1]:
from ray import tune
from ray.tune.sklearn import TuneGridSearchCV  # For Ray ≥2.10
from xgboost import XGBClassifier
from sklearn.pipeline import Pipeline
from sklearn.model_selection import StratifiedKFold

# Define parameter search space
param_distributions = {
    "xgboost__n_estimators": tune.randint(150, 500),
    "xgboost__max_depth": tune.randint(4, 12),
    "xgboost__learning_rate": tune.uniform(0.01, 0.2),
    "xgboost__subsample": tune.uniform(0.6, 1.0),
    "xgboost__colsample_bytree": tune.uniform(0.6, 1.0),
}

# Setup TuneGridSearchCV (Ray ≥2.10)
tune_search = TuneGridSearchCV(
    estimator=Pipeline([("xgboost", XGBClassifier(tree_method="hist"))]),
    param_grid=param_distributions,
    scoring="recall",
    cv=StratifiedKFold(3).split(X_train_fe_resampled, y_train_fe_resampled),
    n_jobs=-1,
    verbose=1,
    max_iters=15  # Number of trials
)

# Run tuning
tune_search.fit(X_train_fe_resampled, y_train_fe_resampled)
print("Best params:", tune_search.best_params_)

ModuleNotFoundError: No module named 'ray.tune.sklearn'

In [2]:
!pip show tune-sklearn

Name: tune-sklearn
Version: 0.5.0
Summary: A drop-in replacement for Scikit-Learn's GridSearchCV / RandomizedSearchCV with cutting edge hyperparameter tuning techniques.
Home-page: https://github.com/ray-project/tune-sklearn
Author: Michael Chau, Anthony Yu, and Ray Team
Author-email: ray-dev@googlegroups.com
License: Apache 2.0
Location: E:\AI Prep\Projects\Project 1\realtime-fraud-detection-api\p1renv\Lib\site-packages
Requires: numpy, ray, scikit-learn, scipy
Required-by: 


In [3]:
import ray
print(ray.__version__)  # Should be ≥2.0

2.45.0
