# Step 1: Load the Data

In [0]:
# Import libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, expr, dayofweek, weekofyear, date_format, asc, udf, lit, month
from pyspark.sql import functions as F
from pyspark.sql.window import Window
import pandas as pd
from pyspark.sql.types import IntegerType
import datetime
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator

In [0]:
import os

# Specify Azure Blob Storage account details.
# The account access key is deliberately NOT stored in the notebook: a storage
# account key grants access to the whole account, and a key committed to a
# notebook/repo must be considered leaked. The previously hardcoded key should
# be rotated in the Azure portal.
storage_account_name = "qlintaxi"
container_name = "taxi"

# Read the key from the environment. On Databricks, a secret scope is the usual
# alternative: dbutils.secrets.get(scope="<scope>", key="<key>").
storage_account_access_key = os.environ.get("AZURE_STORAGE_ACCOUNT_KEY")
if not storage_account_access_key:
    raise RuntimeError(
        "Azure storage key not found: set the AZURE_STORAGE_ACCOUNT_KEY environment "
        "variable (or load the key from a Databricks secret scope) before running this notebook."
    )

# Configure the Azure Blob Storage account credentials for wasbs:// access
spark.conf.set(
    f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net",
    storage_account_access_key
)

# Build the storage file path
taxi_file_path = f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/clean_data.parquet"
# Read the Parquet data
taxi_df = spark.read.parquet(taxi_file_path)

# Step 2: Join the Data

In [0]:
# Taxi zone lookup table: a CSV with a header row, stored in the same container as the trip data
zone_file_path = "wasbs://{}@{}.blob.core.windows.net/taxi-zone-lookup.csv".format(container_name, storage_account_name)

# Load it, letting Spark infer each column's type from the file contents
zone_df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(zone_file_path)
)

In [0]:
# Attach the zone lookup to each row of taxi_df by matching its pickup location
# (puLocationId) to the lookup's LocationID. This is an inner join, so rows whose
# puLocationId has no entry in the lookup are dropped.
# The join keys and Borough are then removed; Zone remains as the location feature.
final_df = (
    taxi_df
    .join(zone_df, taxi_df.puLocationId == zone_df.LocationID)
    .drop("puLocationId", "LocationID", "Borough")
)

In [0]:
from pyspark.sql.functions import year, month, dayofmonth, hour, to_timestamp
# Parse 'Date_Hour' into a timestamp (pattern: ISO-8601 with milliseconds and a zone
# offset), derive Year/Month/Day/Hour from it, and sort rows chronologically.
final_df = (
    final_df
    .withColumn("Date_Hour", to_timestamp(col("Date_Hour"), "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"))
    .withColumn("Year", year(col("Date_Hour")))
    .withColumn("Month", month(col("Date_Hour")))
    .withColumn("Day", dayofmonth(col("Date_Hour")))
    .withColumn("Hour", hour(col("Date_Hour")))
    .orderBy("Date_Hour")
)


In [0]:
# Preview only the first rows. final_df is globally sorted by Date_Hour; with a
# limit, Spark can compute a top-N instead of materializing the whole sort just
# to render a preview.
display(final_df.limit(20))

# Step 3: Modeling

## 3.1 Train/Validation/Test Split

In [0]:

# Ensure the DataFrame is ordered by time
final_df = final_df.orderBy("Date_Hour")

# Calculate split dates
total_rows = final_df.count()
train_split_index = int(total_rows * 0.7)
val_split_index = train_split_index + int(total_rows * 0.15)

# Use Window function to assign row numbers
windowSpec = Window.orderBy("Date_Hour")
final_df = final_df.withColumn("row_number", F.row_number().over(windowSpec))

# Find the dates corresponding to split indices
train_split_date = final_df.where(col("row_number") == train_split_index).select("Date_Hour").collect()[0]["Date_Hour"]
val_split_date = final_df.where(col("row_number") == val_split_index).select("Date_Hour").collect()[0]["Date_Hour"]

# Remove the temporary column
final_df = final_df.drop("row_number")


In [0]:
# Partition rows by the boundary timestamps:
#   train      -> Date_Hour <= train_split_date
#   validation -> train_split_date < Date_Hour <= val_split_date
#   test       -> Date_Hour > val_split_date
# Rows whose Date_Hour is null satisfy none of these predicates and are excluded.
date_hour = col("Date_Hour")
train_data = final_df.filter(date_hour <= train_split_date)
validation_data = final_df.filter((date_hour > train_split_date) & (date_hour <= val_split_date))
test_data = final_df.filter(date_hour > val_split_date)

# To sanity-check the split, inspect each frame's date range, e.g.
#   train_data.select(F.min("Date_Hour"), F.max("Date_Hour")).show()


In [0]:
# Map each Zone name to a numeric index.
# handleInvalid='keep' gives zones that were not seen when the indexer was fitted,
# and null zones, an extra index instead of raising at transform time. With a
# time-based split, a zone can appear in the validation/test period without
# appearing in training.
zone_indexer = StringIndexer(inputCol='Zone', outputCol='ZoneIndexed', handleInvalid='keep')

# One-hot encode the indexed zone. With handleInvalid='keep', the indexer's output
# metadata includes the extra category, and the encoder sizes its vectors from that
# metadata.
zone_encoder = OneHotEncoder(inputCols=['ZoneIndexed'], outputCols=['ZoneVec'])

# Assemble the feature vector consumed by the regressor.
# NOTE(review): day_of_week, week_number, is_weekend and is_holiday are not created in
# this notebook; presumably they already exist in clean_data.parquet. Confirm.
assembler = VectorAssembler(
    inputCols=['Year', 'Month', 'Day', 'Hour', 'day_of_week', 'week_number', 'is_weekend', 'is_holiday', 'ZoneVec'],
    outputCol='features'
)


## 3.2 XGBoost with Hyperopt

Step 1: Define the Search Space for Hyperopt

In [0]:
from hyperopt import hp, fmin, tpe, STATUS_OK, Trials, SparkTrials
from xgboost.spark import SparkXGBRegressor
from pyspark.ml.evaluation import RegressionEvaluator
import numpy as np

def get_or_create_spark_session():
    """Return the active SparkSession, creating one if none exists yet."""
    session_builder = SparkSession.builder
    return session_builder.getOrCreate()
# Reuse (or start) the notebook's Spark session
spark = get_or_create_spark_session()

# Hyperopt search space for SparkXGBRegressor. hp.quniform yields floats (e.g. 7.0),
# so max_depth and n_estimators are cast with int() where they are consumed.
space = dict(
    max_depth=hp.quniform('max_depth', 3, 10, 1),                              # whole numbers 3..10
    learning_rate=hp.loguniform('learning_rate', np.log(0.01), np.log(0.2)),   # log-uniform on [0.01, 0.2]
    subsample=hp.uniform('subsample', 0.5, 1),                                 # continuous ratio in [0.5, 1]
    colsample_bytree=hp.uniform('colsample_bytree', 0.5, 1),                   # continuous ratio in [0.5, 1]
    gamma=hp.uniform('gamma', 0, 5),                                           # continuous in [0, 5]
    n_estimators=hp.quniform('n_estimators', 100, 1000, 1),                    # whole numbers 100..1000
    min_child_weight=hp.uniform('min_child_weight', 1, 10),                    # continuous in [1, 10]
)

Step 2: Define the Objective Function for Hyperopt

In [0]:
import mlflow

def objective(hyperparams):
    """Hyperopt objective: fit the zone/feature stages plus a SparkXGBRegressor on
    ``train_data`` with one hyperparameter sample, then score it on ``validation_data``.

    Each call opens an MLflow run with ``nested=True`` and logs the sampled
    hyperparameters and the resulting validation RMSE.

    Args:
        hyperparams: dict with keys max_depth, learning_rate, subsample,
            colsample_bytree, gamma, n_estimators and min_child_weight (as drawn
            from ``space``). max_depth and n_estimators are cast to int here.

    Returns:
        dict: ``{'loss': validation RMSE, 'status': STATUS_OK}``; fmin minimizes 'loss'.
    """
    # Every trial logs into the tuning experiment
    mlflow.set_experiment('/Users/qifan.lin@vanderbilt.edu/XGBoost Tuning')

    with mlflow.start_run(nested=True):
        # Log the sampled values as-is (before the int casts below)
        mlflow.log_params(hyperparams)

        regressor = SparkXGBRegressor(
            features_col="features",
            label_col="demand",
            max_depth=int(hyperparams['max_depth']),
            learning_rate=hyperparams['learning_rate'],
            subsample=hyperparams['subsample'],
            colsample_bytree=hyperparams['colsample_bytree'],
            gamma=hyperparams['gamma'],
            n_estimators=int(hyperparams['n_estimators']),
            min_child_weight=hyperparams['min_child_weight'],
            verbosity=0,
            objective='reg:squarederror',
        )

        # Fresh pipeline per trial: the shared feature stages followed by this trial's regressor
        fitted_pipeline = Pipeline(stages=[zone_indexer, zone_encoder, assembler, regressor]).fit(train_data)

        rmse_evaluator = RegressionEvaluator(labelCol="demand", predictionCol="prediction", metricName="rmse")
        val_rmse = rmse_evaluator.evaluate(fitted_pipeline.transform(validation_data))

        mlflow.log_metric("validation_rmse", val_rmse)

    return {'loss': val_rmse, 'status': STATUS_OK}

Step 3: Run Hyperopt for Hyperparameter Tuning

In [0]:
import mlflow
import numpy as np

# Seed for hyperopt's TPE sampler so the sequence of sampled hyperparameters is
# reproducible across re-runs.
# NOTE: hyperopt >= 0.2.7 expects a numpy Generator for rstate; older releases
# expect np.random.RandomState instead.
HYPEROPT_SEED = 42
MAX_EVALS = 128

mlflow.set_experiment('/Users/qifan.lin@vanderbilt.edu/XGBoost Tuning')

trials = Trials()
# Parent run for the tuning process; each objective() call opens its own run with nested=True
with mlflow.start_run(run_name='Hyperparameter Tuning Parent Run'):
    best_hyperparams = fmin(fn=objective,
                            space=space,
                            algo=tpe.suggest,
                            max_evals=MAX_EVALS,
                            trials=trials,
                            rstate=np.random.default_rng(HYPEROPT_SEED))
    # Log the best sample at the parent-run level (quniform values are still floats here)
    mlflow.log_params(best_hyperparams)


In [0]:
# Raw fmin result; quniform-sampled entries (max_depth, n_estimators) are floats at this point
print(str(best_hyperparams))

Step 4: Train the Final Model and Evaluate on Test Data


In [0]:
# Combine training and validation sets for final model training
final_train_data = train_data.union(validation_data)

# Normalize the hyperopt result: quniform yields floats, but max_depth and
# n_estimators must be integers. (int() of an int is a no-op, so re-running is safe.)
best_hyperparams = {
    'max_depth': int(best_hyperparams['max_depth']),
    'learning_rate': best_hyperparams['learning_rate'],
    'subsample': best_hyperparams['subsample'],
    'colsample_bytree': best_hyperparams['colsample_bytree'],
    'gamma': best_hyperparams['gamma'],
    'n_estimators': int(best_hyperparams['n_estimators']),
    'min_child_weight': best_hyperparams['min_child_weight'],
}

# Re-create the model with the best hyperparameters, using the SAME parameter names
# as objective(). xgboost.spark does not recognize maxDepth / colsampleBytree /
# numRound / minChildWeight: setParams puts unknown keys into arbitrary_params_dict,
# XGBoost ignores them, and the defaults are used silently (e.g. 100 boosting rounds).
final_xgbRegressor = SparkXGBRegressor(
    features_col="features",
    label_col="demand",
    max_depth=best_hyperparams['max_depth'],
    learning_rate=best_hyperparams['learning_rate'],
    subsample=best_hyperparams['subsample'],
    colsample_bytree=best_hyperparams['colsample_bytree'],
    gamma=best_hyperparams['gamma'],
    n_estimators=best_hyperparams['n_estimators'],
    min_child_weight=best_hyperparams['min_child_weight'],
    objective='reg:squarederror'
)

final_pipeline = Pipeline(stages=[zone_indexer, zone_encoder, assembler, final_xgbRegressor])

# Train the final model on the combined training and validation dataset
final_model = final_pipeline.fit(final_train_data)

# Evaluate the final model on the held-out test set
test_predictions = final_model.transform(test_data)
evaluator = RegressionEvaluator(labelCol="demand", predictionCol="prediction", metricName="rmse")
test_rmse = evaluator.evaluate(test_predictions)
print("Test RMSE:", test_rmse)

# Step 4: Save the Model

In [0]:
%sh
# Delete output left by a previous run (pipeline definition and fitted pipeline model) in one command
rm -rf /dbfs/tmp/xgboost/pipeline_001 /dbfs/tmp/xgboost/pipelineModel_001

In [0]:
# Save the unfitted pipeline definition and the fitted pipeline model.
# write().overwrite() replaces output from a previous run. Plain save() errors when
# the target path already exists, so this cell can now be re-run without relying on
# a separate shell cleanup step.
final_pipeline.write().overwrite().save('/tmp/xgboost/pipeline_001')

# Save the fitted model itself
final_model.write().overwrite().save('/tmp/xgboost/pipelineModel_001')