In [0]:
#reading file from dbfs
from pyspark.sql.functions import lit
from pyspark.sql import SparkSession

df_path= "/dbfs/mnt/bde2/combined_df"
combined_df= spark.read.parquet(df_path)

In [0]:
#number of columns 
combined_df.columns

Out[30]: ['VendorID',
 'tpep_pickup_datetime',
 'tpep_dropoff_datetime',
 'store_and_fwd_flag',
 'RatecodeID',
 'PULocationID',
 'DOLocationID',
 'passenger_count',
 'trip_distance',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'ehail_fee',
 'improvement_surcharge',
 'total_amount',
 'payment_type',
 'trip_type',
 'congestion_surcharge',
 'year',
 'weekday',
 'hour',
 'month',
 'day_of_year',
 'trip_duration_sec',
 'trip_distance_km',
 'trip_duration_min',
 'speed',
 'Borough_pu',
 'Zone_pu',
 'service_zone_pu',
 'Borough_do',
 'Zone_do',
 'service_zone_do',
 'airport',
 'airport_fee',
 'colour']

# Data Prepration

In [0]:
#Copy df into new df called combined_df_cleaned
combined_df_cleaned = combined_df

In [0]:
# List of columns to drop
columns_to_drop = ['fare_amount', 'trip_duration_sec', 'trip_distance']

# Drop the specified columns using the `select` method
combined_df_cleaned = combined_df_cleaned.select([col for col in combined_df_cleaned.columns if col not in columns_to_drop])

In [0]:
# wanted columns
cols_list=[
  'weekday',
 'hour',
 'year',
 'month',
 'trip_distance_km',
 'trip_duration_min',
 'total_amount',
 'speed',
 'Zone_pu']



In [0]:
#subset df
combined_df_cleaned = combined_df_cleaned.select(cols_list)

In [0]:
######################################################################################
###Start small to build and validate your code before running on the entire dataset.##
######################################################################################

num_cols = [
 'weekday',
 'hour',
 'year',
 'month',
 'trip_distance_km',
 'trip_duration_min',
 'speed']



cat_cols =[
'Zone_pu'
]

In [0]:
# Subset the dataframe with the columns list cols_list
combined_df=combined_df.select(cols_list)
combined_df.show()

+-------+----+----+-----+------------------+------------------+------------+------------------+--------------------+
|weekday|hour|year|month|  trip_distance_km| trip_duration_min|total_amount|             speed|             Zone_pu|
+-------+----+----+-----+------------------+------------------+------------+------------------+--------------------+
|      6|   0|2015|    5|1.7541806000000002| 5.083333333333333|         7.8| 20.70508249180328|Williamsburg (Nor...|
|      6|   0|2015|    5|         9.2215182|28.816666666666666|       27.96|19.200384916136496|Williamsburg (Nor...|
|      6|   0|2015|    5|          3.781949|             10.35|        10.8| 21.92434202898551|   East Williamsburg|
|      6|   0|2015|    5|         4.3130312|             12.65|       15.38|20.457064980237153|      Bushwick South|
|      6|   0|2015|    5| 7.628271600000001|25.083333333333332|        20.8|18.247028411960137|Williamsburg (Nor...|
|      6|   0|2015|    5|        10.4124298| 42.56666666666667| 

In [0]:
from pyspark.sql.functions import col
#split the data
train_df = combined_df_cleaned.filter(
    (col("tpep_pickup_datetime") >= "2020-01-01") &
    (col("tpep_pickup_datetime") < "2022-12-31")
)

# only keep data after 2020

In [0]:
#import libraries
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import col
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml import Pipeline

#create empty stage
stages = []

#Iterate through cat_cols and instantiate StringIndexer and OneHotEncoder for each column and them to stages
for cat_col in cat_cols:
    col_indexer = StringIndexer(inputCol=cat_col, outputCol=f"{cat_col}_ind")
    col_encoder = OneHotEncoder(inputCols=[f"{cat_col}_ind"], outputCols=[f"{cat_col}_ohe"])
    stages += [col_indexer, col_encoder]

#Create a new list called cat_cols_ohe that will add the suffix _ohe to each element of cat_cols
cat_cols_ohe = [f"{cat_col}_ohe" for cat_col in cat_cols]

#initiate vector assembler
assembler = VectorAssembler(inputCols=cat_cols_ohe + num_cols, outputCol="features")

#Add assembler to stages
stages += [assembler]

#pipeline

pipeline = Pipeline(stages=stages)

In [0]:
#Fit the pipeline with dataframes
pipeline_model_train = pipeline.fit(train_df)


In [0]:
#Apply pipeline to dataframes
train_df= pipeline_model_train.transform(train_df)


In [0]:
from pyspark.sql.functions import col
#split the data Time period 1: From '2020-01-01' to '2022-10-01'
train_data = train_df.filter(
    (col("tpep_pickup_datetime") >= "2020-01-01") &
    (col("tpep_pickup_datetime") < "2022-10-01")
)

# # Time period 2: From '2022-10-01' to '2022-12-31'
test_data = train_df.filter(
    (col("tpep_pickup_datetime") >= "2022-10-01") &
    (col("tpep_pickup_datetime") <= "2022-12-31"))

In [0]:
#train LinearRegression
from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(featuresCol='features', labelCol='total_amount')

In [0]:
#fit the model on training set
lr_model = lr.fit(train_data)

In [0]:
#make pred on train set
train_preds = lr_model.transform(train_data)

In [0]:
#show columns vs pred train
train_preds.select(cols_list + ['prediction']).show(10)

+-------+----+----+-----+------------------+------------------+------------+------------------+--------------------+------------------+
|weekday|hour|year|month|  trip_distance_km| trip_duration_min|total_amount|             speed|             Zone_pu|        prediction|
+-------+----+----+-----+------------------+------------------+------------+------------------+--------------------+------------------+
|      6|  23|2022|    9|         1.6415268|               7.1|       12.36|13.872057464788734| Little Italy/NoLiTa|12.454592664907238|
|      6|  23|2022|    9|          2.092142| 9.666666666666666|        11.8|12.985708965517242|      Midtown Center|14.237301304978644|
|      6|  23|2022|    9|1.6737136000000001|               4.4|       11.62|22.823367272727275|Upper West Side S...|11.145139989509516|
|      6|  23|2022|    9|          1.448406|              6.45|         9.8|13.473544186046514|                SoHo| 12.05646622503383|
|      6|  23|2022|    9|         5.8579976|15.1

In [0]:
#make pred on test set
test_preds = lr_model.transform(test_data)

In [0]:
#show columns vs pred train
test_preds.select(["total_amount"] + ['prediction']).show(10)

+------------+------------------+
|total_amount|        prediction|
+------------+------------------+
|       15.95|15.701707831443684|
|         9.3|  9.56646138557312|
|       12.36|14.273423028490015|
|       12.35|11.865978531893234|
|         9.1|10.849942833968157|
|        29.3|  33.9369646988892|
|        14.3|15.480508290320756|
|        47.8|  54.3483558177748|
|       31.01| 35.36487269476879|
|        19.8|32.443657485823906|
+------------+------------------+
only showing top 10 rows



In [0]:
#evaluate with RMSE
from pyspark.sql import SparkSession
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(metricName='rmse',labelCol='total_amount',predictionCol='prediction')

In [0]:
#print rmse train
rmse_train =evaluator.evaluate(train_preds)
print('rmse_train=',rmse_train)

rmse_train= 188.3974532124786


In [0]:
rmse_test =evaluator.evaluate(test_preds)
print('rmse_test=',rmse_test)

rmse_test= 6.059042097969246
