# Predicting Taxi Fare with PySpark

# 1. Import Libraries

In [1]:
!pip install pyspark



In [2]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import hour, dayofweek, month, col, when
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer, StandardScaler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

# 2. Understanding our data

In [3]:
# Initialize Spark Session
spark = SparkSession.builder.appName('NYC-Yellow-Taxi-Trip').getOrCreate()

# Change LogLevel to ERROR
spark.sparkContext.setLogLevel("ERROR")

# Load CSV files into separate DataFrames
df_2015_01 = spark.read.csv('/kaggle/input/nyc-yellow-taxi-trip-data/yellow_tripdata_2015-01.csv', header=True, inferSchema=True)
df_2016_01 = spark.read.csv('/kaggle/input/nyc-yellow-taxi-trip-data/yellow_tripdata_2016-01.csv', header=True, inferSchema=True)
df_2016_02 = spark.read.csv('/kaggle/input/nyc-yellow-taxi-trip-data/yellow_tripdata_2016-02.csv', header=True, inferSchema=True)
df_2016_03 = spark.read.csv('/kaggle/input/nyc-yellow-taxi-trip-data/yellow_tripdata_2016-03.csv', header=True, inferSchema=True)

# Combine all DataFrames into one
df = df_2015_01.union(df_2016_01).union(df_2016_02).union(df_2016_03)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/07/26 17:03:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

In [4]:
# Show the schema of the DataFrame
df.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- RateCodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)



In [5]:
# Display sample rows
df.show(5)

+--------+--------------------+---------------------+---------------+-------------+------------------+------------------+----------+------------------+------------------+------------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|  pickup_longitude|   pickup_latitude|RateCodeID|store_and_fwd_flag| dropoff_longitude|  dropoff_latitude|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|
+--------+--------------------+---------------------+---------------+-------------+------------------+------------------+----------+------------------+------------------+------------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+
|       2| 2015-01-15 19:05:39|  2015-01-15 19:23:42|              1|         1.59|  -73.993896484375|  40.7501106262207|         1|    

In [6]:
df.describe().show()



+-------+------------------+------------------+-----------------+-------------------+------------------+------------------+------------------+------------------+------------------+-------------------+------------------+------------------+--------------------+-----------------+------------------+---------------------+------------------+
|summary|          VendorID|   passenger_count|    trip_distance|   pickup_longitude|   pickup_latitude|        RateCodeID|store_and_fwd_flag| dropoff_longitude|  dropoff_latitude|       payment_type|       fare_amount|             extra|             mta_tax|       tip_amount|      tolls_amount|improvement_surcharge|      total_amount|
+-------+------------------+------------------+-----------------+-------------------+------------------+------------------+------------------+------------------+------------------+-------------------+------------------+------------------+--------------------+-----------------+------------------+---------------------+------

                                                                                

In [7]:
df.show()

+--------+--------------------+---------------------+---------------+-------------+------------------+------------------+----------+------------------+------------------+------------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|  pickup_longitude|   pickup_latitude|RateCodeID|store_and_fwd_flag| dropoff_longitude|  dropoff_latitude|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|
+--------+--------------------+---------------------+---------------+-------------+------------------+------------------+----------+------------------+------------------+------------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+
|       2| 2015-01-15 19:05:39|  2015-01-15 19:23:42|              1|         1.59|  -73.993896484375|  40.7501106262207|         1|    

# 3. Preprocessing

In [8]:
df = df.withColumnRenamed('tpep_pickup_datetime', 'pickup_datetime')
df = df.withColumnRenamed('tpep_dropoff_datetime', 'dropoff_datetime')

In [9]:
df.na.drop()

DataFrame[VendorID: int, pickup_datetime: timestamp, dropoff_datetime: timestamp, passenger_count: int, trip_distance: double, pickup_longitude: double, pickup_latitude: double, RateCodeID: int, store_and_fwd_flag: string, dropoff_longitude: double, dropoff_latitude: double, payment_type: int, fare_amount: double, extra: double, mta_tax: double, tip_amount: double, tolls_amount: double, improvement_surcharge: double, total_amount: double]

In [10]:
df.dropDuplicates()

DataFrame[VendorID: int, pickup_datetime: timestamp, dropoff_datetime: timestamp, passenger_count: int, trip_distance: double, pickup_longitude: double, pickup_latitude: double, RateCodeID: int, store_and_fwd_flag: string, dropoff_longitude: double, dropoff_latitude: double, payment_type: int, fare_amount: double, extra: double, mta_tax: double, tip_amount: double, tolls_amount: double, improvement_surcharge: double, total_amount: double]

In [11]:
df = df.filter(col("trip_distance") > 0)
df = df.filter((df.fare_amount > 0) & (df.fare_amount < 250))
df = df.filter((df.passenger_count > 0) & (df.passenger_count <= 6))

In [12]:
# Extract relevant datetime features from 'pickup_datetime' for modeling
df = df.withColumn("hour", hour(col("pickup_datetime")))
df = df.withColumn("day_of_week", dayofweek(col("pickup_datetime")))  # 1=niedziela, 7=sobota
df = df.withColumn("month", month(col("pickup_datetime")))
df = df.withColumn("is_weekend", when(col("day_of_week").isin([1, 7]), 1).otherwise(0))
df = df.withColumn("is_night", when((col("hour") < 6) | (col("hour") >= 22), 1).otherwise(0))

# 4. Analysing

In [13]:
# Select only relevant columns
feature_cols = ["month", "day_of_week", "is_weekend", "hour", "is_night", "passenger_count",
                "RateCodeID", "trip_distance", "pickup_longitude", "pickup_latitude", 
                "dropoff_longitude", "dropoff_latitude"]

label = "fare_amount"

In [14]:
# Assemble features
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

In [15]:
# Scaller features
#scaler = StandardScaler(inputCol="features", outputCol="features", withStd=True, withMean=False)

In [16]:
# Linear Regression model
lr = LinearRegression(featuresCol="features", labelCol=label)

In [17]:
pipeline = Pipeline(stages=[assembler, lr])

In [18]:
df_sample = df.sample(False, 0.01, seed=21)

In [19]:
# Split data into training and testing sets
train_data, test_data = df_sample.randomSplit([0.8, 0.2], seed=21)

# Train the model
model = pipeline.fit(train_data)
predictions = model.transform(test_data)

                                                                                

In [20]:
evaluator = RegressionEvaluator(labelCol=label, predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print(f"RMSE: {rmse:.2f}")



RMSE: 3.29


                                                                                