In [1]:
import pyspark
from pyspark.sql.functions import year, month, hour, sum as spark_sum, col, dayofweek
from pyspark.sql import SparkSession
import os
# Obtain the notebook's Spark session (getOrCreate reuses an active one if present).
spark = (
    SparkSession.builder
    .appName("Testing")
    .getOrCreate()
)

23/12/02 17:32:03 WARN Utils: Your hostname, pop-os resolves to a loopback address: 127.0.1.1; using 192.168.1.60 instead (on interface enp3s0)
23/12/02 17:32:03 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/12/02 17:32:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


# Loading Data

In [2]:
# Directory that holds the input Parquet files.
parquet_dir = 'db'

# Collect every Parquet file in the directory. os.listdir() returns entries in
# arbitrary order, so sort them to keep the load order stable between runs.
parquet_files = sorted(
    os.path.join(parquet_dir, file)
    for file in os.listdir(parquet_dir)
    if file.endswith('.parquet')
)
if not parquet_files:
    # Fail with a clear message instead of an IndexError further down.
    raise FileNotFoundError(f"No .parquet files found in {parquet_dir!r}")

# Read each file into its own DataFrame.
dataframes = [spark.read.parquet(file) for file in parquet_files]

# Stack the per-file DataFrames into one. unionByName matches columns by name;
# plain union matches by position and would silently misalign values if the
# files ever differed in column order.
origDF = dataframes[0]
for df in dataframes[1:]:
    origDF = origDF.unionByName(df)

# Feature Extraction and Cleaning

In [3]:

# Columns that are neither model inputs nor the label; remove them up front.
unused_columns = [
    "VendorID",
    "passenger_count",
    "RatecodeID",
    "store_and_fwd_flag",
    "payment_type",
    "fare_amount",
    "extra",
    "mta_tax",
    "tip_amount",
    "tolls_amount",
    "improvement_surcharge",
    "congestion_surcharge",
    "Airport_fee",
]
origDF = origDF.drop(*unused_columns)

In [4]:
# Derive calendar features from the pickup timestamp.
pickup_ts = col("tpep_pickup_datetime")
origDF = (
    origDF
    .withColumn("year", year(pickup_ts))
    .withColumn("month", month(pickup_ts))
    .withColumn("hour", hour(pickup_ts))
    .withColumn("day_of_week", dayofweek(pickup_ts))
)

In [5]:
# Trip duration: dropoff minus pickup, cast to an integer.
# NOTE: the values come out in seconds, not minutes (e.g. roughly 1250 for a
# 3.4-mile trip in this data), which is also the scale the later duration
# filter assumes.
df_with_duration = origDF.withColumn(
    "duration",
    (origDF.tpep_dropoff_datetime - origDF.tpep_pickup_datetime).cast("int"),
)
# The raw timestamps are not needed once the duration has been derived.
origDF = df_with_duration.drop("tpep_pickup_datetime", "tpep_dropoff_datetime")

In [6]:
# Discard every row that has a null in any remaining column.
origDF = origDF.dropna(how="any")

In [7]:
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator

In [8]:
# Keep trips whose duration lies strictly between the two bounds.
# Presumably seconds (about 3 minutes to 5.5 hours) -- TODO confirm the unit.
MIN_DURATION = 180
MAX_DURATION = 20000
origDF = origDF.filter(
    (col("duration") > MIN_DURATION) & (col("duration") < MAX_DURATION)
)

In [9]:
# Keep trips with a positive distance below the upper cut-off (both bounds exclusive).
MAX_TRIP_DISTANCE = 50
origDF = origDF.filter(
    (col("trip_distance") > 0) & (col("trip_distance") < MAX_TRIP_DISTANCE)
)

In [10]:
# Keep trips whose total amount is at least the minimum and strictly below the maximum.
MIN_TOTAL_AMOUNT = 3
MAX_TOTAL_AMOUNT = 500
origDF = origDF.filter(
    (col("total_amount") >= MIN_TOTAL_AMOUNT) & (col("total_amount") < MAX_TOTAL_AMOUNT)
)

In [11]:
# Preview the cleaned data; these are show()'s default settings spelled out.
origDF.show(n=20, truncate=True)

+-------------+------------+------------+------------+----+-----+----+-----------+--------+
|trip_distance|PULocationID|DOLocationID|total_amount|year|month|hour|day_of_week|duration|
+-------------+------------+------------+------------+----+-----+----+-----------+--------+
|          3.4|         140|         238|        33.6|2023|    6|   0|          5|    1253|
|          3.4|          50|         151|        23.6|2023|    6|   0|          5|     614|
|         10.2|         138|          97|       60.05|2023|    6|   0|          5|    1123|
|         9.83|         100|         244|       53.28|2023|    6|   0|          5|    1406|
|         1.17|         137|         234|       15.02|2023|    6|   0|          5|     514|
|          3.6|         249|          33|       28.05|2023|    6|   0|          5|     796|
|         3.08|         141|         226|        26.8|2023|    6|   0|          5|    1136|
|          1.1|         246|          50|        18.0|2023|    6|   0|          

# Feature Indexing and Model Training

In [12]:
# Model input columns. total_amount is left out because it is the label the
# regressor is trained to predict.
feature_columns = [
    "trip_distance",
    "PULocationID",
    "DOLocationID",
    "year",
    "month",
    "hour",
    "day_of_week",
    "duration",
]
# Pack the input columns into a single vector column named "features".
assembler = VectorAssembler(outputCol="features", inputCols=feature_columns)
df = assembler.transform(origDF)

In [13]:
# Fit a VectorIndexer on the assembled vectors. Features with at most
# maxCategories distinct values are treated as categorical and index-encoded;
# the remaining features are passed through as continuous.
featureIndexer = VectorIndexer(
    inputCol="features",
    outputCol="indexedFeatures",
    maxCategories=265,
).fit(df)


                                                                                

In [14]:
# Apply the fitted indexer. The result goes into a new name rather than back
# into `df`: overwriting the input made this cell fail when re-run, because
# the "indexedFeatures" output column would already exist.
indexed_df = featureIndexer.transform(df)

# Keep only the label and the two vector columns; the scalar inputs are now
# redundant. Reuse feature_columns instead of repeating the list by hand.
fin_df = indexed_df.drop(*feature_columns)
fin_df.show()

+------------+--------------------+--------------------+
|total_amount|            features|     indexedFeatures|
+------------+--------------------+--------------------+
|        33.6|[3.4,140.0,238.0,...|[3.4,136.0,234.0,...|
|        23.6|[3.4,50.0,151.0,2...|[3.4,49.0,148.0,3...|
|       60.05|[10.2,138.0,97.0,...|[10.2,134.0,96.0,...|
|       53.28|[9.83,100.0,244.0...|[9.83,99.0,240.0,...|
|       15.02|[1.17,137.0,234.0...|[1.17,133.0,230.0...|
|       28.05|[3.6,249.0,33.0,2...|[3.6,243.0,32.0,3...|
|        26.8|[3.08,141.0,226.0...|[3.08,137.0,222.0...|
|        18.0|[1.1,246.0,50.0,2...|[1.1,240.0,49.0,3...|
|       13.22|[0.99,186.0,234.0...|[0.99,181.0,230.0...|
|       33.96|[5.43,234.0,166.0...|[5.43,228.0,163.0...|
|       18.84|[1.68,249.0,170.0...|[1.68,243.0,167.0...|
|        22.2|[2.1,50.0,158.0,2...|[2.1,49.0,155.0,3...|
|        30.6|[3.72,239.0,90.0,...|[3.72,233.0,89.0,...|
|       14.64|[0.98,249.0,125.0...|[0.98,243.0,122.0...|
|        17.4|[1.91,148.0,170.0

In [15]:
# 70/30 train/test split with a fixed seed.
split_weights = [0.7, 0.3]
train_data, test_data = fin_df.randomSplit(split_weights, seed=1)

In [16]:
# Random-forest regressor predicting total_amount from the indexed features.
# (The estimator was previously bound to the name `gbt`, which wrongly
# suggested gradient-boosted trees.)
# maxBins has to be at least the number of categories of any categorical
# feature; 300 covers the maxCategories=265 limit used by the VectorIndexer.
rf_regressor = RandomForestRegressor(
    featuresCol='indexedFeatures',
    labelCol='total_amount',
    seed=42,
    maxBins=300,
)
model = rf_regressor.fit(train_data)

23/12/02 17:33:29 WARN MemoryStore: Not enough space to cache rdd_73_26 in memory! (computed 44.4 MiB so far)
23/12/02 17:33:29 WARN BlockManager: Persisting block rdd_73_26 to disk instead.
23/12/02 17:33:29 WARN MemoryStore: Not enough space to cache rdd_73_30 in memory! (computed 44.4 MiB so far)
23/12/02 17:33:29 WARN BlockManager: Persisting block rdd_73_30 to disk instead.
23/12/02 17:33:32 WARN MemoryStore: Not enough space to cache rdd_73_26 in memory! (computed 44.4 MiB so far)
23/12/02 17:33:32 WARN MemoryStore: Not enough space to cache rdd_73_30 in memory! (computed 66.6 MiB so far)
23/12/02 17:33:38 WARN MemoryStore: Not enough space to cache rdd_73_17 in memory! (computed 44.4 MiB so far)
23/12/02 17:33:38 WARN BlockManager: Persisting block rdd_73_17 to disk instead.
23/12/02 17:33:41 WARN MemoryStore: Not enough space to cache rdd_73_5 in memory! (computed 149.9 MiB so far)
23/12/02 17:33:41 WARN BlockManager: Persisting block rdd_73_5 to disk instead.
23/12/02 17:33:45

In [17]:
# Score the held-out test split with the fitted model.
predictions = model.transform(dataset=test_data)



In [18]:
# Root-mean-squared error of the predictions against the true total_amount.
# predictionCol is spelled out here; "prediction" is the evaluator's default.
rmse_eval = RegressionEvaluator(
    predictionCol="prediction",
    labelCol="total_amount",
    metricName="rmse",
)
rmse = rmse_eval.evaluate(predictions)
rmse

                                                                                

5.913072400062082

In [19]:
# Persist the fitted model. A plain model.save() raises if the target path
# already exists, which breaks any second run of the notebook; going through
# the writer with overwrite() replaces the previous copy instead.
model.write().overwrite().save("model")

In [None]:
# Shut down the Spark session now that the notebook's work is finished.
spark.stop()