In [1]:
#import pyarrow.parquet as pq
from pyspark.sql import SparkSession
from pyspark.sql.functions import when
from pyspark.sql.functions import explode, col, lit
from pyspark.sql.functions import month, year, sum as spark_sum
import seaborn as sns
import matplotlib.pyplot as plt

In [2]:
spark = SparkSession.builder \
    .appName("Stack Parquet Files") \
    .getOrCreate()

file_paths = [
    "C:/Users/Tarun/Documents/BDP_Project/fhvhv_tripdata_2021-01.parquet"
]

dfs = []
for file_path in file_paths:
    df = spark.read.parquet(file_path)
    dfs.append(df)

stacked_df = dfs[0]
for df in dfs[1:]:
    stacked_df = stacked_df.union(df)

stacked_df = stacked_df.withColumn("airport_fee", when(stacked_df["airport_fee"].isNull(), 0).otherwise(stacked_df["airport_fee"]))
stacked_df.show(5)

main_df = stacked_df.select(
    "hvfhs_license_num",
    "pickup_datetime",
    "dropoff_datetime",
    "PULocationID",
    "DOLocationID",
    "trip_miles",
    "trip_time",
    "base_passenger_fare",
    "tolls",
    "bcf",
    "sales_tax",
    "congestion_surcharge",
    "airport_fee",
    "tips",
    "driver_pay"
)
main_df.show(5)

main_df = main_df.na.fill(0, subset=[
    'base_passenger_fare',
    'tolls',
    'bcf',
    'sales_tax',
    'congestion_surcharge',
    'airport_fee',
    'tips'
])

comp_code = {
    "HV0003": "Uber",
    "HV0004": "Via",
    "HV0005": "Lyft"
}

df_uber = main_df.filter(main_df['hvfhs_license_num'] == 'HV0003')
df_via = main_df.filter(main_df['hvfhs_license_num'] == 'HV0004')
df_lyft = main_df.filter(main_df['hvfhs_license_num'] == 'HV0005')

df_uber.show(5)
df_via.show(5)
df_lyft.show(5)


#spark.stop()

+-----------------+--------------------+--------------------+-------------------+-------------------+-------------------+-------------------+------------+------------+----------+---------+-------------------+-----+----+---------+--------------------+-----------+----+----------+-------------------+-----------------+------------------+----------------+--------------+
|hvfhs_license_num|dispatching_base_num|originating_base_num|   request_datetime|  on_scene_datetime|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|trip_miles|trip_time|base_passenger_fare|tolls| bcf|sales_tax|congestion_surcharge|airport_fee|tips|driver_pay|shared_request_flag|shared_match_flag|access_a_ride_flag|wav_request_flag|wav_match_flag|
+-----------------+--------------------+--------------------+-------------------+-------------------+-------------------+-------------------+------------+------------+----------+---------+-------------------+-----+----+---------+--------------------+-----------+--

In [3]:
# Start Spark session
"""spark = SparkSession.builder \
    .appName("WeatherDataProcessing") \
    .getOrCreate()"""

# Read the CSV file into a PySpark DataFrame
weather_df = spark.read.csv("C:/Users/Tarun/Documents/BDP_Project/nyc 2021-01-01 to 2021-12-31.csv", header=True, inferSchema=True)

# Show the first few rows of the weather DataFrame
weather_df.show(5)

# Remove specified columns from the DataFrame
columns_to_drop = ["name", "address", "resolvedAddress", "severerisk"]
weather_df = weather_df.drop(*columns_to_drop)

# Replace null values in the windgust column with 0
weather_df = weather_df.fillna({'windgust': 0})

# Show the modified DataFrame
weather_df.show(5)

# Stop Spark session
#spark.stop()


+----+-------+--------------------+----------+----+---------+----+--------+------+----------+----------+----+---------+--------+---------+-------+----------------+----------+----------+-------+----------+
|name|address|     resolvedAddress|  datetime|temp|feelslike| dew|humidity|precip|precipprob|preciptype|snow|snowdepth|windgust|windspeed|winddir|sealevelpressure|cloudcover|visibility|uvindex|severerisk|
+----+-------+--------------------+----------+----+---------+----+--------+------+----------+----------+----+---------+--------+---------+-------+----------------+----------+----------+-------+----------+
| nyc|    nyc|New York, NY, Uni...|2021-01-01| 2.5|     -0.2|-3.0|    67.8| 15.33|       100|      rain| 0.0|      0.0|    NULL|     15.5|   69.8|          1028.9|      50.6|      14.0|      3|      NULL|
| nyc|    nyc|New York, NY, Uni...|2021-01-02| 5.8|      3.6| 1.2|    74.0|  2.38|       100|      rain| 1.9|      0.6|    54.6|     25.5|  246.9|          1012.4|      63.9|      

In [4]:
data = df_uber

In [5]:
type(data)

pyspark.sql.dataframe.DataFrame

In [6]:
from pyspark.sql.functions import col, unix_timestamp

data = data.withColumn('pickup_timestamp', unix_timestamp('pickup_datetime'))
data = data.withColumn('dropoff_timestamp', unix_timestamp('dropoff_datetime'))
data = data.withColumn('trip_duration', (col('dropoff_timestamp') - col('pickup_timestamp')) / 60)
data = data.withColumn('driver_pay_per_mile', col('driver_pay') / col('trip_miles'))
data = data.withColumn('driver_pay_per_minute', col('driver_pay') / col('trip_duration'))

data.show()

+-----------------+-------------------+-------------------+------------+------------+----------+---------+-------------------+-----+----+---------+--------------------+-----------+----+----------+----------------+-----------------+------------------+-------------------+---------------------+
|hvfhs_license_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|trip_miles|trip_time|base_passenger_fare|tolls| bcf|sales_tax|congestion_surcharge|airport_fee|tips|driver_pay|pickup_timestamp|dropoff_timestamp|     trip_duration|driver_pay_per_mile|driver_pay_per_minute|
+-----------------+-------------------+-------------------+------------+------------+----------+---------+-------------------+-----+----+---------+--------------------+-----------+----+----------+----------------+-----------------+------------------+-------------------+---------------------+
|           HV0003|2021-01-01 00:33:44|2021-01-01 00:49:07|         230|         166|      5.26|      923|              2

In [7]:
data.columns

['hvfhs_license_num',
 'pickup_datetime',
 'dropoff_datetime',
 'PULocationID',
 'DOLocationID',
 'trip_miles',
 'trip_time',
 'base_passenger_fare',
 'tolls',
 'bcf',
 'sales_tax',
 'congestion_surcharge',
 'airport_fee',
 'tips',
 'driver_pay',
 'pickup_timestamp',
 'dropoff_timestamp',
 'trip_duration',
 'driver_pay_per_mile',
 'driver_pay_per_minute']

In [8]:
weather_df.columns

['datetime',
 'temp',
 'feelslike',
 'dew',
 'humidity',
 'precip',
 'precipprob',
 'preciptype',
 'snow',
 'snowdepth',
 'windgust',
 'windspeed',
 'winddir',
 'sealevelpressure',
 'cloudcover',
 'visibility',
 'uvindex']

In [9]:
type(weather_df)

pyspark.sql.dataframe.DataFrame

In [10]:
from pyspark.sql.functions import col, to_date
data = data.withColumn('pickup_date', to_date(col('pickup_datetime')))

merged_df = data.join(weather_df, data.pickup_date == weather_df.datetime, 'inner')

merged_df = merged_df.drop('pickup_date')

merged_df.show()

+-----------------+-------------------+-------------------+------------+------------+----------+---------+-------------------+-----+----+---------+--------------------+-----------+----+----------+----------------+-----------------+------------------+-------------------+---------------------+----------+----+---------+----+--------+------+----------+----------+----+---------+--------+---------+-------+----------------+----------+----------+-------+
|hvfhs_license_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|trip_miles|trip_time|base_passenger_fare|tolls| bcf|sales_tax|congestion_surcharge|airport_fee|tips|driver_pay|pickup_timestamp|dropoff_timestamp|     trip_duration|driver_pay_per_mile|driver_pay_per_minute|  datetime|temp|feelslike| dew|humidity|precip|precipprob|preciptype|snow|snowdepth|windgust|windspeed|winddir|sealevelpressure|cloudcover|visibility|uvindex|
+-----------------+-------------------+-------------------+------------+------------+----------+--

In [11]:
weather_df.printSchema()

root
 |-- datetime: date (nullable = true)
 |-- temp: double (nullable = true)
 |-- feelslike: double (nullable = true)
 |-- dew: double (nullable = true)
 |-- humidity: double (nullable = true)
 |-- precip: double (nullable = true)
 |-- precipprob: integer (nullable = true)
 |-- preciptype: string (nullable = true)
 |-- snow: double (nullable = true)
 |-- snowdepth: double (nullable = true)
 |-- windgust: double (nullable = false)
 |-- windspeed: double (nullable = true)
 |-- winddir: double (nullable = true)
 |-- sealevelpressure: double (nullable = true)
 |-- cloudcover: double (nullable = true)
 |-- visibility: double (nullable = true)
 |-- uvindex: integer (nullable = true)



In [12]:
weather_df.head(1)

[Row(datetime=datetime.date(2021, 1, 1), temp=2.5, feelslike=-0.2, dew=-3.0, humidity=67.8, precip=15.33, precipprob=100, preciptype='rain', snow=0.0, snowdepth=0.0, windgust=0.0, windspeed=15.5, winddir=69.8, sealevelpressure=1028.9, cloudcover=50.6, visibility=14.0, uvindex=3)]

In [13]:
data.head(1)

[Row(hvfhs_license_num='HV0003', pickup_datetime=datetime.datetime(2021, 1, 1, 0, 33, 44), dropoff_datetime=datetime.datetime(2021, 1, 1, 0, 49, 7), PULocationID=230, DOLocationID=166, trip_miles=5.26, trip_time=923, base_passenger_fare=22.28, tolls=0.0, bcf=0.67, sales_tax=1.98, congestion_surcharge=2.75, airport_fee=0.0, tips=0.0, driver_pay=14.99, pickup_timestamp=1609461224, dropoff_timestamp=1609462147, trip_duration=15.383333333333333, driver_pay_per_mile=2.8498098859315593, driver_pay_per_minute=0.9744312026002168, pickup_date=datetime.date(2021, 1, 1))]

In [14]:
data.printSchema()

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- pickup_datetime: timestamp_ntz (nullable = true)
 |-- dropoff_datetime: timestamp_ntz (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- trip_miles: double (nullable = true)
 |-- trip_time: long (nullable = true)
 |-- base_passenger_fare: double (nullable = false)
 |-- tolls: double (nullable = false)
 |-- bcf: double (nullable = false)
 |-- sales_tax: double (nullable = false)
 |-- congestion_surcharge: double (nullable = false)
 |-- airport_fee: double (nullable = false)
 |-- tips: double (nullable = false)
 |-- driver_pay: double (nullable = true)
 |-- pickup_timestamp: long (nullable = true)
 |-- dropoff_timestamp: long (nullable = true)
 |-- trip_duration: double (nullable = true)
 |-- driver_pay_per_mile: double (nullable = true)
 |-- driver_pay_per_minute: double (nullable = true)
 |-- pickup_date: date (nullable = true)



In [15]:
weather_df.printSchema()

root
 |-- datetime: date (nullable = true)
 |-- temp: double (nullable = true)
 |-- feelslike: double (nullable = true)
 |-- dew: double (nullable = true)
 |-- humidity: double (nullable = true)
 |-- precip: double (nullable = true)
 |-- precipprob: integer (nullable = true)
 |-- preciptype: string (nullable = true)
 |-- snow: double (nullable = true)
 |-- snowdepth: double (nullable = true)
 |-- windgust: double (nullable = false)
 |-- windspeed: double (nullable = true)
 |-- winddir: double (nullable = true)
 |-- sealevelpressure: double (nullable = true)
 |-- cloudcover: double (nullable = true)
 |-- visibility: double (nullable = true)
 |-- uvindex: integer (nullable = true)



In [16]:
merged_df.columns

['hvfhs_license_num',
 'pickup_datetime',
 'dropoff_datetime',
 'PULocationID',
 'DOLocationID',
 'trip_miles',
 'trip_time',
 'base_passenger_fare',
 'tolls',
 'bcf',
 'sales_tax',
 'congestion_surcharge',
 'airport_fee',
 'tips',
 'driver_pay',
 'pickup_timestamp',
 'dropoff_timestamp',
 'trip_duration',
 'driver_pay_per_mile',
 'driver_pay_per_minute',
 'datetime',
 'temp',
 'feelslike',
 'dew',
 'humidity',
 'precip',
 'precipprob',
 'preciptype',
 'snow',
 'snowdepth',
 'windgust',
 'windspeed',
 'winddir',
 'sealevelpressure',
 'cloudcover',
 'visibility',
 'uvindex']

In [17]:
merged_df.count()

8704128

In [18]:
data.count()

8704128

In [19]:
weather_df.count()

365

In [20]:
merged_df.printSchema()

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- pickup_datetime: timestamp_ntz (nullable = true)
 |-- dropoff_datetime: timestamp_ntz (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- trip_miles: double (nullable = true)
 |-- trip_time: long (nullable = true)
 |-- base_passenger_fare: double (nullable = false)
 |-- tolls: double (nullable = false)
 |-- bcf: double (nullable = false)
 |-- sales_tax: double (nullable = false)
 |-- congestion_surcharge: double (nullable = false)
 |-- airport_fee: double (nullable = false)
 |-- tips: double (nullable = false)
 |-- driver_pay: double (nullable = true)
 |-- pickup_timestamp: long (nullable = true)
 |-- dropoff_timestamp: long (nullable = true)
 |-- trip_duration: double (nullable = true)
 |-- driver_pay_per_mile: double (nullable = true)
 |-- driver_pay_per_minute: double (nullable = true)
 |-- datetime: date (nullable = true)
 |-- temp: double (nullable = true)
 |-- feelsli

In [21]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator



In [22]:
# 1. Prepare the features
feature_columns = ['trip_miles','trip_time', 'base_passenger_fare','tolls', 'bcf',
                   'sales_tax', 'congestion_surcharge', 'airport_fee', 'tips', 'temp',
                   'feelslike', 'dew', 'humidity', 'precip', 'precipprob', 
                   'snow', 'snowdepth', 'windgust', 'windspeed', 'winddir', 
                   'sealevelpressure', 'cloudcover', 'visibility', 'uvindex']

assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
data_with_features = assembler.transform(merged_df)



In [23]:
# 2. Split the dataset into training and testing sets
(train_data, test_data) = data_with_features.randomSplit([0.8, 0.2], seed=42)



In [24]:
# 3. Train the Random Forest model
rf = RandomForestRegressor(featuresCol="features", labelCol="driver_pay", seed=42)


In [25]:

model = rf.fit(train_data.limit(1000))



In [26]:
# 4. Make predictions
predictions = model.transform(test_data)



In [27]:
# 5. Evaluate the model
evaluator = RegressionEvaluator(labelCol="driver_pay", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)

print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

Root Mean Squared Error (RMSE) on test data = 5.09902
