In [None]:
# df_ride_info = spark.read \
#   .parquet("s3://dsoaws/nyc-taxi-orig-cleaned-split-parquet-per-year-multiple-files/ride-info/") 

# df_ride_fare = spark.read \
#   .parquet("s3://dsoaws/nyc-taxi-orig-cleaned-split-parquet-per-year-multiple-files/ride-fare/")

# df_model_training = df_ride_info.join(df_ride_fare, on="ride_id") \
#                                 .drop(df_ride_fare.ride_id) \
#                                 .drop(df_ride_fare.year)

# df_model_training = df_model_training \
#   .drop("ride_id") \
#   .drop("pickup_at") \
#   .drop("dropoff_at") \
#   .drop("store_and_fwd_flag")

# df_train = df_model_training # for now, we keep them the same as we want all 1 billion rows to be used for training
# df_test = df_model_training # for now, we keep them the same as we are not actually comparing RMSE between the models

# df_train, df_test = df_model_training.randomSplit([0.70, 0.30], seed = 0)

# df_train = spark.read.option("recursiveFileLookup", "true").parquet('s3://dsoaws/gsml-nyc-taxi-full-etl-ml-test-4-custompyspark-export-s3-via-notebook/export-flow-2023-03-02-03-32-10-53926e35/output/training/')

# df_test = spark.read.option("recursiveFileLookup", "true").parquet('s3://dsoaws/gsml-nyc-taxi-full-etl-ml-test-4-custompyspark-export-s3-via-notebook/export-flow-2023-03-02-03-32-10-53926e35/output/validation/')

# print("There are %d training and %d test examples." % (df_train.count(), df_test.count()))  

In [None]:
# Load the full cleaned NYC-taxi dataset. `recursiveFileLookup` makes the
# reader descend into every sub-directory under the S3 prefix.
df_train = (
    spark.read
    .option("recursiveFileLookup", "true")
    .parquet('s3://dsoaws/nyc-taxi-orig-cleaned-dropped-parquet-all-years-multiple-files-1TB/')
)

# Mark the frame for caching; the count() below is the first action on it.
df_train.cache()

# Last expression of the cell: displays the number of training rows.
df_train.count()

In [None]:
# Optional: downsample with randomSplit if the dataset loaded above is too large to train on
# df_train, _ = df_train.randomSplit([0.70, 0.30], seed = 0)

# _, df_test = df_test.randomSplit([0.70, 0.30], seed = 0)

# print("There are %d training and %d test examples." % (df_train.count(), df_test.count()))  

In [None]:
from pyspark.ml.feature import VectorAssembler, VectorIndexer
from pyspark.ml import Pipeline
import xgboost
from xgboost.spark import SparkXGBRegressor

# Feature columns: every column of the training frame except the label.
# `remove` raises ValueError if the label column is missing, which surfaces a
# schema problem before the fit below is launched.
featuresCols = df_train.columns
featuresCols.remove('total_amount')

# Pack the individual feature columns into one vector column.
# handleInvalid="skip" drops rows containing invalid (null/NaN) values.
vectorAssembler = VectorAssembler(inputCols=featuresCols,
                                  outputCol="rawFeatures",
                                  handleInvalid="skip")

# NOTE(review): with maxCategories at Int max, VectorIndexer will presumably
# treat every feature (including continuous ones) as categorical, and with
# handleInvalid="skip" rows holding values unseen during fit would be dropped
# at transform time -- confirm this is intended before scoring new data.
vectorIndexer = VectorIndexer(inputCol="rawFeatures", 
                              outputCol="features", 
                              maxCategories=2147483647, # Max Java Int (limitation of this XGB impl)                              
                              #maxCategories=100, 
                              handleInvalid="skip")

# `n_estimators` is the SparkXGBRegressor parameter that sets the number of
# boosting rounds. The previous `num_round=50` is not a recognised estimator
# parameter, so it was forwarded as an unused booster setting and training ran
# with the default number of rounds instead of 50.
# missing=0.0 tells XGBoost to treat zero-valued entries as missing.
xgb_regressor = SparkXGBRegressor(num_workers=480,
                                  label_col="total_amount", 
                                  missing=0.0,
                                  eta=0.2,
                                  gamma=4,
                                  max_depth=5,
                                  min_child_weight=6,
                                  n_estimators=50,
                                  objective='reg:squarederror',
                                  subsample=0.7)

# Assemble -> index -> regress, fitted end-to-end on the training frame.
pipeline = Pipeline(stages=[vectorAssembler, vectorIndexer, xgb_regressor])      

pipelineModel = pipeline.fit(df_train)

In [None]:
# Display the list of feature column names passed to the VectorAssembler
# (all training columns except the 'total_amount' label).
featuresCols

## Predict

In [None]:
# predictions = pipelineModel.transform(df_test)

# display(predictions.select("total_amount", "prediction", *featuresCols))

## Evaluate root mean squared error (`rmse`)

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator

# The "Predict" cell that would create `predictions` is commented out and no
# held-out `df_test` is loaded in this notebook, so on a fresh kernel this cell
# used to fail with NameError. Build the predictions here instead. They are
# computed on the cached training frame, so the value reported below is a
# training-set RMSE, not an estimate of generalization error.
predictions = pipelineModel.transform(df_train)

# Compare the regressor's prediction column against its label column.
evaluator = RegressionEvaluator(metricName="rmse",
                                labelCol=xgb_regressor.getLabelCol(),
                                predictionCol=xgb_regressor.getPredictionCol())
rmse = evaluator.evaluate(predictions)

# Label the metric accurately: it is measured on the training data.
print("RMSE on our training set: %g" % rmse)

## Save and reload the model

In [None]:
%sh
# Remove pipeline / model artifacts left behind by earlier runs.
# NOTE(review): presumably /dbfs/tmp/xgboost/... is the local mount of the
# /tmp/xgboost/... locations the save cell writes to -- confirm for this cluster.
rm -rf /dbfs/tmp/xgboost/pipeline_001
rm -rf /dbfs/tmp/xgboost/pipelineModel_001

In [None]:
# Save the pipeline that created the model.
# write().overwrite() replaces any artifact already at the target path, so this
# cell can be re-run without the paths having been deleted beforehand
# (a plain save() raises if the destination exists).
pipeline.write().overwrite().save('/tmp/xgboost/pipeline_001')

# Save the fitted model itself, again replacing any previous copy.
pipelineModel.write().overwrite().save('/tmp/xgboost/pipelineModel_001')

# Load the (unfitted) pipeline definition back from storage.
loaded_pipeline = Pipeline.load('/tmp/xgboost/pipeline_001')

## Predict from loaded pipeline

In [None]:
# # Load and use the model
# from pyspark.ml import PipelineModel

# loaded_pipelineModel = PipelineModel.load('/tmp/xgboost/pipelineModel_001')

# # To represent new data, use the first 3 rows of the test dataset
# new_data = df_test.limit(3)

# # Make predictions with the loaded model
# new_preds = loaded_pipelineModel.transform(new_data)
# display(new_preds.select("total_amount", "prediction", *featuresCols))