# Widget Parameters

### parameters

No. of Hyperparameter Settings (_max_evals_) : Number of hyperparameter settings to try (the number of models to fit). 




In [0]:
max_evals_widget = int(dbutils.widgets.get("max_evals"))

censor_training_widget = int(dbutils.widgets.get("censor_training"))
censor_validation_widget = int(dbutils.widgets.get("censor_validation"))

hyp_search_algo_widget = dbutils.widgets.get("algo")


# Spark + Hyeropt

Using spark enhanced version of xgboost and hyperparameter tuning with Hyperopt

## load data

In [0]:
def read_hive_table(schema, table_name, version):
    """
    Reads Hive table into a PySpark DataFrame. 
    (lazy eval)

    Args:
        schema (str): database name
        table_name (str): name of the Hive table.
        version (int): table version

    Returns:
        DataFrame: PySpark DataFrame.
    """
    try:
        df = spark.read \
            .format("delta") \
            .option("versionAsOf", version) \
            .option("header", "true") \
            .option("mode", "FAILFAST") \
            .table(schema + "." + table_name)
        return df
    except Exception as e:
        print(f"Error reading table '{table_name}': {str(e)}")
        return None
    
df_train = read_hive_table("sandbox","diabetestrain",2)    
df_test = read_hive_table("sandbox","diabetestest",1)    
df_val = read_hive_table("sandbox","diabetesvalidation",1)   

df_train.cache
df_test.cache
df_val.cache


<bound method DataFrame.cache of DataFrame[Pregnancies: bigint, Glucose: bigint, BloodPressure: bigint, SkinThickness: bigint, Insulin: bigint, BMI: double, DiabetesPedigreeFunction: double, Age: bigint, Outcome: bigint, abc: string]>

## data prep

general data prep such as log transform our response and include a censor function

In [0]:
def log_transform_response(spark_df, response_var):
    """
    converts response variables using log + 1, and returns new col

    Args:
        spark_df : spark dataframe
        response_var (str): numeric response/label/outcome variable

    Returns:
        Dataframe: Pyspark DataFrame containing a log transformed response
    """
    from pyspark.sql.functions import col, log1p

    logTrainDF = spark_df.withColumn(f"log_{response_var}", log1p(col(response_var)))

    return logTrainDF

In [0]:
def censor(spark_df, response_var,  censor_amt=None):
    """
    Replaces values over censor_amt with censor_amt, if no censor amt return dataframe without changes

    Args:
        spark_df : spark dataframe
        response_var (str): numeric response/label/outcome variable
        censor_amt (float/int): numeric value to set censor_amt

    Returns:
        DataFrame: PySpark DataFrame containing the data from the specified table.
    """
    from pyspark.sql.functions import when, col

    if censor_amt is None or censor_amt == 0:
        return spark_df
    
    return spark_df.withColumn(f'{response_var}', when(col(response_var) > censor_amt, censor_amt).otherwise(col(response_var)))

## spark prep


Spark requires all data to be in numeric format and all features fit under a single column. To achieve this we use transformers.

In [0]:

def column_types(spark_df, response_var):
    """
    Identifies categorical and numeric columns. Numeric columns is set to exclude response variable.

    Args:
        spark_df : spark dataframe
        response_var (str): numeric response/label/outcome variable

    Returns:
        2 vectors holding numeric and categorical column names
    """    
    categoricalCols = [field for (field, dataType) in spark_df.dtypes 
                    if dataType == "string"]

    numericCols = [field for (field, dataType) in spark_df.dtypes 
                if ((dataType in ["double","bigint"]) & (field != response_var or field !="log_" + response_var))]

    return categoricalCols, numericCols


Training with distributed processing requires additional steps, we address these steps with components below. Requirements include only numeric type data, and all features must be saved in an array in their own individual column.

Function below creates component templates (transformers) for pipeline. No actual transformations occur in this function, but rather identifies appropriate columns to be fed into transformers. Transformers include both reference to input and output columns.

E.g. categorical/string col -> StringIndexer -> OheEncoder

Fitting data to these requires the transform method. This will be done in the objective function below.


In [0]:


def spark_prep(spark_df, response_var):

    """
    Creates component templates (transformers) for pipeline. Fitting data to these requires the transform method.

    Components: 
    - StringIndexer: replaces categorical values with numeric (non-ordinal)
    - OneHotEncoder: one hot encode / dummy vars
    - VectorAssembler: creates column holding all features

    Args:
        spark_df : spark dataframe
        response_var (str): numeric response/label/outcome variable

    Returns:
        3 transformers to be applied to our dataset
    """       

    from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

    cat_cols, num_cols = column_types(spark_df=spark_df, response_var=response_var)

    indexOutputCols = [x + "Index" for x in cat_cols]

    oheOutputCols = [x + "OHE" for x in cat_cols]

    stringIndexer = StringIndexer(inputCols=cat_cols, 
                                outputCols=indexOutputCols, 
                                handleInvalid="skip")
    oheEncoder = OneHotEncoder(inputCols=indexOutputCols, 
                            outputCols=oheOutputCols)

    # combine untransformed numeric columns with our ohe encoded columns
    assemblerInputs = oheOutputCols + num_cols

    vecAssembler = VectorAssembler(inputCols=assemblerInputs, 
                                outputCol="features")

    return stringIndexer, oheEncoder, vecAssembler         


# define hyperparameter search space

In [0]:
from hyperopt import hp

search_space = {
  'learning_rate': hp.uniform('learning_rate', .01, 1),
  'max_depth': hp.quniform('max_depth', 2, 10, 2),
  'min_child_weight': hp.quniform('min_child_weight', .2, 10, 2),
  'subsample': hp.uniform('subsample', .01, 1),
  'colsample_bytree': hp.uniform('colsample_bytree', .01, 1),
  'gamma': hp.choice('gamma', [0,1]),
  'scale_pos_weight': hp.choice('scale_pos_weight', [1]),
}

## define objective function

This will be fed into hyperopts _fmin()_ function. This step is what ties all our preprocessing (censoring, log transform), transformer components (stringIndexer, oheEncoder, vecAssembler), and model object (xgb_regressor) given different combinations of hyperparameters.

In [0]:
from xgboost.spark import SparkXGBRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from hyperopt import STATUS_OK
from pyspark.sql.functions import col, exp
from pyspark.ml import Pipeline

def objective(params):

    response_var = "BloodPressure"
    # Initialize the XGBoost regressor with current parameters
    xgb_regressor = SparkXGBRegressor(
        features_col="features",
        label_col=f"log_{response_var}",
        prediction_col="log_prediction",
        objective="reg:squarederror", # adjust according to your needs
        numWorkers=4,  # Adjust according to your cluster
        learning_rate=params['learning_rate'],
        max_depth=int(params['max_depth']),
        min_child_weight=params['min_child_weight'],
        subsample=params['subsample'],
        colsample_bytree=params['colsample_bytree'],
        gamma=int(params['gamma']),
        scale_pos_weight=int(params['scale_pos_weight'])        
    )

    # censor - training data
    censored_df_train = censor(spark_df = df_train, response_var = response_var,  censor_amt=censor_training_widget)

    # censor - validation data
    censored_df_val = censor(spark_df = df_val, response_var = response_var,  censor_amt=censor_validation_widget)

    # log transform response - training data
    log_df_train =log_transform_response(spark_df = censored_df_train, response_var = response_var)

    # log transform response - validation data
    log_df_val = log_transform_response(spark_df = censored_df_val, response_var = response_var)   

    # define transformers
    stringIndexer, oheEncoder, vecAssembler = spark_prep(spark_df = log_df_train , response_var = response_var)

    # Define the pipeline
    pipeline = Pipeline(stages=[stringIndexer, oheEncoder, vecAssembler, xgb_regressor])
    
    # Train the model
    model = pipeline.fit(log_df_train)

    # Make predictions
    predictions = model.transform(log_df_val)
    
    # Exponentiate
    exp_predictions = predictions.withColumn("prediction", exp(col("log_prediction"))-1)

    # Evaluate the model
    evaluator = RegressionEvaluator(
        labelCol=response_var, 
        predictionCol="prediction", 
        metricName="mae"
    )
    
    mae = evaluator.evaluate(exp_predictions)
    
    # Hyperopt minimizes the objective, so return RMSE as a loss to minimize
    return {'loss': mae, 'status': STATUS_OK}

## select a search algorithm

In [0]:
from hyperopt import tpe, rand

def hyper_search_algorithm(widget):
    if hyp_search_algo_widget == "bayesian":
        return tpe.suggest
    else:
        return rand.suggest
    
algo = hyper_search_algorithm(widget=hyp_search_algo_widget)    

## run hyperparameter tuning

In [0]:
from hyperopt import fmin, Trials, SparkTrials

# Run the hyperparameter search using the Tree of Parzen Estimators (TPE) algorithm
#trials = SparkTrials(parallelism=1)
trials = Trials() # using Trials because we are using Spark Xgboost

best_params = fmin(
    fn=objective,
    space=search_space,
    algo=algo,
    max_evals=max_evals_widget,  # Adjust based on how long you're willing to wait
    trials=trials,
    #loss_threshold=0.9999999,
    #timeout=60*100
)

print(f"Best hyperparameters:{best_params}")

  0%|          | 0/5 [00:00<?, ?trial/s, best loss=?]




 20%|██        | 1/5 [00:07<00:31,  7.83s/trial, best loss: 1.410969015237947] 40%|████      | 2/5 [00:14<00:21,  7.32s/trial, best loss: 0.5332417943669792] 60%|██████    | 3/5 [00:21<00:14,  7.26s/trial, best loss: 0.5332417943669792] 80%|████████  | 4/5 [00:28<00:06,  6.98s/trial, best loss: 0.5332417943669792]100%|██████████| 5/5 [00:34<00:00,  6.75s/trial, best loss: 0.21200174272823794]100%|██████████| 5/5 [00:34<00:00,  6.97s/trial, best loss: 0.21200174272823794]
Best hyperparameters:{'colsample_bytree': 0.2651387398101446, 'gamma': 1, 'learning_rate': 0.8293317629024581, 'max_depth': 10.0, 'min_child_weight': 0.0, 'scale_pos_weight': 0, 'subsample': 0.20325259608635998}


In [0]:
trials.best_trial

{'state': 2,
 'tid': 4,
 'spec': None,
 'result': {'loss': 0.21200174272823794, 'status': 'ok'},
 'misc': {'tid': 4,
  'cmd': ('domain_attachment', 'FMinIter_Domain'),
  'workdir': None,
  'idxs': {'colsample_bytree': [4],
   'gamma': [4],
   'learning_rate': [4],
   'max_depth': [4],
   'min_child_weight': [4],
   'scale_pos_weight': [4],
   'subsample': [4]},
  'vals': {'colsample_bytree': [0.2651387398101446],
   'gamma': [1],
   'learning_rate': [0.8293317629024581],
   'max_depth': [10.0],
   'min_child_weight': [0.0],
   'scale_pos_weight': [0],
   'subsample': [0.20325259608635998]}},
 'exp_key': None,
 'owner': None,
 'version': 0,
 'book_time': datetime.datetime(2024, 4, 19, 22, 58, 27, 163000),
 'refresh_time': datetime.datetime(2024, 4, 19, 22, 58, 33, 493000)}