In [1]:
import pandas as pd
import xgboost as xgb

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.ml.feature import  StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.regression import RandomForestRegressor, LinearRegression, GBTRegressor
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.evaluation import RegressionEvaluator

# from pyspark.ml.torch.distributor import TorchDistributor

import mlflow
import mlflow.spark
from hyperopt import fmin, tpe, hp, STATUS_OK, Trials
from hyperopt.pyll import scope

# from utils import read_process_df, prepare_data, evaluate_model, train_model

In [2]:
import warnings
warnings.filterwarnings("ignore")

In [3]:
# Setting default log level to "WARN".
# To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).

In [4]:
spark = SparkSession.builder.appName('modelling').config("spark.driver.memory", "2g").config("spark.executor.memory", "6g")\
    .config('spark.executor.cores',2).config('spark.default.parallelism',4).config("spark.jars.packages", "org.mlflow:mlflow-spark:2.2.0")\
       .getOrCreate() # .config("spark.sql.shuffle.partitions",10)
spark

your 131072x1 screen size is bogus. expect trouble
23/09/09 22:23:34 WARN Utils: Your hostname, Bhaiyu resolves to a loopback address: 127.0.1.1; using 172.17.120.207 instead (on interface eth0)
23/09/09 22:23:34 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/home/abhishek-wsl/miniconda3/envs/mlops/lib/python3.9/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/abhishek-wsl/.ivy2/cache
The jars for the packages stored in: /home/abhishek-wsl/.ivy2/jars
org.mlflow#mlflow-spark added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-e42da452-aa6b-4da8-9732-153723f9a5f6;1.0
	confs: [default]
	found org.mlflow#mlflow-spark;2.2.0 in central
	found org.slf4j#slf4j-api;1.7.25 in central
:: resolution report :: resolve 178ms :: artifacts dl 8ms
	:: modules in use:
	org.mlflow#mlflow-spark;2.2.0 from central in [default]
	org.slf4j#slf4j-api;1.7.25 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   2   |   0   |   0   |   0   ||   2   |   0   |
	-------------------------------------------------------------------

In [5]:
#mlflow.spark.autolog()

In [6]:
mlflow.set_tracking_uri("sqlite:///mlflow.db")
mlflow.set_experiment(experiment_name="modelling_duration_exp")

<Experiment: artifact_location='/home/abhishek-wsl/codes/MLops_project/final_codes/mlruns/1', creation_time=1693228766019, experiment_id='1', last_update_time=1693228766019, lifecycle_stage='active', name='modelling_duration_exp', tags={}>

In [7]:
spark.sparkContext.setLogLevel("ERROR")

In [8]:
# spark = SparkSession.builder.appName('modelling').config("spark.driver.memory", "5g").config("spark.rapids.force.caller.classloader","false" )\
#     .config("spark.executor.extraClassPath", r"\home\abhishek-wsl\rapid_plugin\rapids-4-spark_2.12-23.08.1.jar")\
#     .config("spark.plugins","com.nvidia.spark.SQLPlugin")\
#         .config("spark.rapids.sql.enabled","true").config("spark.driver.resource.gpu.discoveryScript","./discover_gpu.sh")\
#     .config("spark.executor.resource.gpu.amount", "2")\
#     .config("spark.driver.resource.gpu.amount", "1").config("spark.sql.shuffle.partitions",10).getOrCreate()
# spark

In [9]:
spark.conf.get("spark.driver.memory")

'2g'

In [10]:
#df.printSchema()
#df.show(5)
#df.select('VendorID').distinct().show()
#df.select('duration').summary("count", "min", "1%","25%", "50%", "75%", "95%", "98%",  "99%","max").show()

- minimum duration is -54
- max looks like heavily skewed
- lets consider minimum as 0.05 min (1 percentile) and max as 82 mins (99 percentile)

In [11]:
def read_process_df(path):
    df = spark.read.format('parquet').load(path)
    df = df.select('VendorID','lpep_pickup_datetime','lpep_dropoff_datetime','PULocationID','DOLocationID','trip_distance')
    df = df.withColumn('duration',\
        round((col('lpep_dropoff_datetime')-col('lpep_pickup_datetime'))\
        .cast("long")/60,2))
    df = df.filter(col('duration')>=0.05).filter(col('duration')<=82)
    df = df.withColumn('PU_DO',concat(col('PULocationID'),lit('_'),col('DOLocationID')))
    df = df.withColumn('pu_hour',hour(col('lpep_pickup_datetime')))
    df = df.withColumn('pu_weekday',dayofweek(col('lpep_pickup_datetime')))
    
    df = df.select('VendorID','pu_hour','pu_weekday','PU_DO', 'trip_distance','duration')
    # y = df.select('')

    print(df.count(), len(df.columns))
    return df

In [12]:
# pdf = sdf.toPandas() #.describe()
# sdf = spark.createDataFrame(pdf)

In [13]:
def prepare_data(df_processed,categorical_cols,indexer_final=None,encoder_final=None, is_test=True):
    indexers = [StringIndexer(inputCol=col,outputCol=col+'_index').fit(df_processed) for col in categorical_cols]
    if is_test:
        df_processed = df_processed.dropna(subset='duration')
        [indexer.setHandleInvalid("keep") for indexer in indexers]
    indexer_pipeline = Pipeline(stages=indexers)
    if indexer_final==None:
        indexer_final = indexer_pipeline.fit(df_processed)
    
    indexed_df = indexer_final.transform(df_processed)

    encoder = [OneHotEncoder(inputCol=col+'_index',outputCol=col+'_onehot') for col in categorical_cols]
    encoder_pipeline = Pipeline(stages = encoder)
    if encoder_final==None:
        encoder_final = encoder_pipeline.fit(indexed_df)
    encoded_df = encoder_final.transform(indexed_df)
    
    return encoded_df, indexer_final, encoder_final
    

In [14]:
def train_model(encoded_df,feature_cols,label_col,regressor = LinearRegression,**kwargs):
    assembler = VectorAssembler(inputCols = feature_cols, outputCol = 'features')
    regressor = regressor(featuresCol = 'features', labelCol= label_col,**kwargs )
    pipeline = Pipeline(stages = [assembler,regressor])
    model = pipeline.fit(encoded_df)
    return model

In [15]:
def evaluate_model(model,encoded_df,label_col,metric='rmse'):
    predictions = model.transform(encoded_df)
    evaluator = RegressionEvaluator(labelCol=label_col,predictionCol='prediction',metricName=metric)
    out = evaluator.evaluate(predictions)
    print(f"{metric}  : {out}")
    
    return out


In [16]:
df_train_processed = read_process_df('/home/abhishek-wsl/codes/MLops_project/data/*.parquet')
df_test_processed1 = read_process_df('/home/abhishek-wsl/codes/MLops_project/data/test_data/green_tripdata_2022-01.parquet')
df_test_processed2 = read_process_df('/home/abhishek-wsl/codes/MLops_project/data/test_data/green_tripdata_2023-01.parquet')

categorical_cols = ['VendorID','pu_hour','pu_weekday','PU_DO']
encoded_df_train, indexer_final, encoder_final = prepare_data(df_train_processed,categorical_cols)
encoded_df_test1, _, _ = prepare_data(df_test_processed1,categorical_cols,indexer_final,encoder_final,is_test=True)
encoded_df_test2, _, _ = prepare_data(df_test_processed2,categorical_cols,indexer_final,encoder_final,is_test=True)


                                                                                

333262 6
61753 6
67613 6


                                                                                

In [17]:
feature_cols = ['trip_distance','VendorID_onehot','pu_hour_onehot','pu_weekday_onehot','PU_DO_onehot']
label_col = 'duration'

### Experimentation

In [18]:
# feature_cols = ['trip_distance','VendorID_onehot','pu_hour_onehot','pu_weekday_onehot','PU_DO_onehot']
# label_col = 'duration'
# def objective(params):

#     with mlflow.start_run(run_name="linear regression hyperopt"):
#         mlflow.set_tag("model","linear regression")

#         mlflow.log_params(params)
#         reg = train_model(encoded_df_train,feature_cols,label_col,regressor = LinearRegression,maxIter=100,**params)

#         rmse = evaluate_model(reg,encoded_df_test1,label_col,metric='rmse')
#         mlflow.log_metric("val_rmse",rmse)
#     return {'loss':rmse, 'status':STATUS_OK}

In [19]:
# search_space = {
#     'regParam': hp.loguniform('regParam',-2,-1),
#     'fitIntercept': hp.choice('fitIntercept',[True,False]),
#     'elasticNetParam': hp.uniform('elasticNetParam',0,1),
#     # 'objective': 'reg:linear',
#     # 'seed':7
# }
# best_result = fmin(
#     fn=objective,
#     space=search_space,
#     algo= tpe.suggest,
#     max_evals=50,
#     trials=Trials()
# )


In [20]:
# feature_cols = ['trip_distance','VendorID_onehot','pu_hour_onehot','pu_weekday_onehot','PU_DO_onehot']
# label_col = 'duration'
# def objective(params):

#     with mlflow.start_run(run_name="random forest hyperopt"):
#         mlflow.set_tag("model","random forest")

#         mlflow.log_params(params)
#         reg = train_model(encoded_df_train,feature_cols,label_col,regressor = RandomForestRegressor,**params)

#         rmse = evaluate_model(reg,encoded_df_test1,label_col,metric='rmse')
#         mlflow.log_metric("val_rmse",rmse)
#     return {'loss':rmse, 'status':STATUS_OK}

In [21]:
# feature_cols = ['trip_distance','VendorID_onehot','pu_hour_onehot','pu_weekday_onehot','PU_DO_onehot']
# label_col = 'duration'
# def objective(params):

#     with mlflow.start_run(run_name="random forest hyperopt"):
#         mlflow.spark.autolog()
#         reg = train_model(encoded_df_train,feature_cols,label_col,regressor = RandomForestRegressor,**params)

#         rmse = evaluate_model(reg,encoded_df_test1,label_col,metric='rmse')
#         #mlflow.log_metric("val_rmse",rmse)
#     return {'loss':rmse, 'status':STATUS_OK}

In [22]:
# search_space = {
#     'maxDepth': scope.int(hp.quniform('maxDepth', 4, 40, 1)),
#     'numTrees': scope.int(hp.quniform("numTrees", 5, 60, 5)),
#     'minInfoGain': hp.uniform('minInfoGain', 0.4, 1),
#     'minInstancesPerNode': scope.int(hp.quniform("minInstancesPerNode", 500, 10000, 10)),
#     # 'objective': 'reg:linear',
#     'seed': 7
# }
# best_result = fmin(
#     fn=objective,
#     space=search_space,
#     algo= tpe.suggest,
#     max_evals=50,
#     trials=Trials()
# )


In [23]:
# feature_cols = ['trip_distance','VendorID_onehot','pu_hour_onehot','pu_weekday_onehot','PU_DO_onehot']
# label_col = 'duration'
# with mlflow.start_run(run_name="gbt_model_default_params"):
#     mlflow.set_tag("developer","Abhishek Singh")

#     mlflow.log_param("training_data_path","/home/abhishek-wsl/codes/MLops_project/data/*.parquet")
#     mlflow.log_param("val_data_path","/home/abhishek-wsl/codes/MLops_project/data/test_data/green_tripdata_2022-01.parquet")

#     #reg_name = GBTRegressor
#     fitIntercept = True
#     standardization = True
#     # mlflow.log_params({'fitIntercept':fitIntercept,'standardization':standardization})
#     lr_model = train_model(encoded_df_train,feature_cols,label_col,fitIntercept=fitIntercept,standardization=standardization)
#     # rf_model = train_model(encoded_df_train,feature_cols,label_col,regressor=RandomForestRegressor)
#     #gbt_model = train_model(encoded_df_train,feature_cols,label_col,regressor=GBTRegressor)
#     # mlflow.reg_name.log_model(lr_model,'model')

#     rmse = evaluate_model(lr_model,encoded_df_test1,label_col,metric='rmse')

#     mlflow.log_metric("val_rmse",rmse)

- Note: autolog() in mlflow-spark currently does not support Spark ml models: mlflow.spark.autolog()

- The proper way would be to train and experiment models using sklearn/tensorflow 
    - and log model as mlflow model
        - Then deploy using spark

In [24]:
# params = {'elasticNetParam':	0.07666740311526124,
#             'fitIntercept':	True,
#             'regParam':	0.1452221778539822}



# with mlflow.start_run(run_name="gbt regression"):
#     mlflow.set_tag("developer","Abhishek Singh")
#     mlflow.set_tag("model","GBT")
#     # mlflow.log_params(params)

#     reg = train_model(encoded_df_train,feature_cols,label_col,regressor = GBTRegressor)
#     rmse = evaluate_model(reg,encoded_df_test1,label_col,metric='rmse')
#     mlflow.log_metric("val_rmse",rmse)

#     # reg.write().overwrite().save( "/home/abhishek-wsl/codes/MLops_project/trained_models/lr_PipelineModel")
#     # mlflow.log_artifact(local_path="/home/abhishek-wsl/codes/MLops_project/trained_models/lr_PipelineModel",artifact_path='lr1_spark/')
#     mlflow.log_artifact("/home/abhishek-wsl/codes/MLops_project/trained_models/stringindexer_PipelineModel", artifact_path='preprocessors')
#     mlflow.log_artifact("/home/abhishek-wsl/codes/MLops_project/trained_models/encoderindexer_PipelineModel", artifact_path='preprocessors')

#     mlflow.spark.log_model(reg,artifact_path='gbt_spark_final')

In [25]:


params = {'elasticNetParam':	0.07666740311526124,
            'fitIntercept':	True,
            'regParam':	0.1452221778539822}



with mlflow.start_run(run_name="linear regression final"):
    mlflow.set_tag("developer","Abhishek Singh")
    mlflow.set_tag("model","linear regression")
    mlflow.log_params(params)

    reg = train_model(encoded_df_train,feature_cols,label_col,regressor = LinearRegression,maxIter=100,**params)
    rmse = evaluate_model(reg,encoded_df_test1,label_col,metric='rmse')
    mlflow.log_metric("val_rmse",rmse)

    # reg.write().overwrite().save( "/home/abhishek-wsl/codes/MLops_project/trained_models/lr_PipelineModel")
    # mlflow.log_artifact(local_path="/home/abhishek-wsl/codes/MLops_project/trained_models/lr_PipelineModel",artifact_path='lr1_spark/')
    # mlflow.log_artifact("/home/abhishek-wsl/codes/MLops_project/trained_models/stringindexer_PipelineModel", artifact_path='preprocessors')
    # mlflow.log_artifact("/home/abhishek-wsl/codes/MLops_project/trained_models/encoderindexer_PipelineModel", artifact_path='preprocessors')

    mlflow.spark.log_model(reg,artifact_path='lr_spark_final')
    mlflow.spark.log_model(indexer_final,artifact_path='indexer')
    mlflow.spark.log_model(encoder_final,artifact_path='encoder')

In [26]:
# mlflow.spark.autolog(disable=True)

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/home/abhishek-wsl/miniconda3/envs/mlops/lib/python3.9/site-packages/py4j/clientserver.py", line 516, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/abhishek-wsl/miniconda3/envs/mlops/lib/python3.9/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/home/abhishek-wsl/miniconda3/envs/mlops/lib/python3.9/site-packages/py4j/clientserver.py", line 539, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving


In [37]:
import mlflow
logged_model = 'runs:/eff1613c78114ce39c6b0b622b9d294e/lr_spark_final'

# Load model
loaded_model = mlflow.spark.load_model(logged_model)

2023/09/02 21:35:08 INFO mlflow.spark: 'runs:/eff1613c78114ce39c6b0b622b9d294e/lr_spark_final' resolved as '/home/abhishek-wsl/codes/MLops_project/final_codes/mlruns/1/eff1613c78114ce39c6b0b622b9d294e/artifacts/lr_spark_final'
2023/09/02 21:35:08 INFO mlflow.spark: URI 'runs:/eff1613c78114ce39c6b0b622b9d294e/lr_spark_final/sparkml' does not point to the current DFS.
2023/09/02 21:35:08 INFO mlflow.spark: File 'runs:/eff1613c78114ce39c6b0b622b9d294e/lr_spark_final/sparkml' not found on DFS. Will attempt to upload the file.


In [38]:
loaded_model

PipelineModel_e9fe1b0caf0a

In [41]:
# Perform inference via model.transform()
loaded_model.transform(encoded_df_test1).show(5)

+--------+-------+----------+-------+-------------+--------+--------------+-------------+----------------+-----------+---------------+---------------+-----------------+-------------------+--------------------+------------------+
|VendorID|pu_hour|pu_weekday|  PU_DO|trip_distance|duration|VendorID_index|pu_hour_index|pu_weekday_index|PU_DO_index|VendorID_onehot| pu_hour_onehot|pu_weekday_onehot|       PU_DO_onehot|            features|        prediction|
+--------+-------+----------+-------+-------------+--------+--------------+-------------+----------------+-----------+---------------+---------------+-----------------+-------------------+--------------------+------------------+
|       2|      0|         7|  42_42|         0.44|     1.2|           0.0|         17.0|             5.0|       25.0|  (3,[0],[1.0])|(24,[17],[1.0])|    (7,[5],[1.0])| (14288,[25],[1.0])|(14323,[0,1,21,33...|1.8948750528738465|
|       1|      0|         7| 116_41|          2.1|    8.72|           1.0|         

In [45]:
rmse = evaluate_model(loaded_model,encoded_df_test2,label_col,metric='rmse')

rmse  : 6.238286150945589


In [None]:
save_models=False
if save_models:
    lr_model.save( "/home/abhishek-wsl/codes/MLops_project/trained_models/lr_PipelineModel")
    #rf_model.save("/home/abhishek-wsl/codes/MLops_project/trained_models/rf_PipelineModel")
    indexer_final.save("/home/abhishek-wsl/codes/MLops_project/trained_models/stringindexer_PipelineModel")
    encoder_final.save("/home/abhishek-wsl/codes/MLops_project/trained_models/encoderindexer_PipelineModel")

In [None]:
print('training rmse using linear regresion model:')
_ = evaluate_model(lr_model,encoded_df_train,label_col,metric='rmse')
# print('training rmse using random forest regresion model:')
# _ = evaluate_model(rf_model,encoded_df_train,label_col,metric='rmse')

training rmse using linear regresion model:


NameError: name 'lr_model' is not defined

In [None]:
print('test rmse Jan 2022 using linear regresion model:')
_ = evaluate_model(lr_model,encoded_df_test1,label_col,metric='rmse')
# print('test rmse Jan 2022 using random forest regresion model:')
# _ = evaluate_model(rf_model,encoded_df_test1,label_col,metric='rmse')

test rmse Jan 2022 using linear regresion model:


23/08/27 00:03:06 WARN DAGScheduler: Broadcasting large task binary with size 1646.3 KiB


rmse  : 7.664661368974521


In [None]:
print('test rmse Jan 2023 using linear regresion model:')
_ = evaluate_model(lr_model,encoded_df_test2,label_col,metric='rmse')
# print('test rmse Jan 2023 using random forest regresion model:')
# _ = evaluate_model(rf_model,encoded_df_test2,label_col,metric='rmse')

test rmse Jan 2023 using linear regresion model:


23/08/27 00:03:22 WARN DAGScheduler: Broadcasting large task binary with size 1646.3 KiB


rmse  : 6.429551134903919


- test rmse Jan 2022 using linear regresion model:
rmse  : 7.664661368978054
- test rmse Jan 2022 using random forest regresion model:
rmse  : 7.
- test rmse Jan 2023 using linear regresion model:
rmse  : 6.429551134910861
- test rmse Jan 2023 using random forest regresion model:
rmse  : 6.67
##### By looking at the test results we can say linear regression is performing better in this case

In [None]:
stop_session = False
if stop_session:
    spark.stop()

In [None]:
#df_processed.show(5)

In [None]:
#encoded_df.show(5)

In [None]:
# print(encoded_df.rdd.getNumPartitions())
# encoded_df = encoded_df.repartition(100)

In [None]:
# spark.conf.get("spark.storage.memoryFraction")
# from pyspark import StorageLevel
# encoded_df = encoded_df.rdd.persist(StorageLevel.MEMORY_AND_DISK)
# encoded_df.count()


In [None]:
# print("RDD Storage Level:", encoded_df.getStorageLevel())

In [None]:
# feature_cols = ['trip_distance','VendorID_onehot','pu_hour_onehot','pu_weekday_onehot','PU_DO_onehot']
# label_col = 'duration'
# assembler = VectorAssembler(inputCols = feature_cols, outputCol = 'features')
# regressor = RandomForestRegressor(featuresCol = 'features', labelCol= 'duration' )
# pipeline = Pipeline(stages = [assembler,regressor])
# model = pipeline.fit(encoded_df)

In [None]:
import tensorflow as tf

ModuleNotFoundError: No module named 'tensorflow'