In [1]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

!pip install pyspark
!pip install geopy

import numpy as np # linear algebra
import pyspark.pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, mean, stddev, col, abs, date_format, hour, minute, when, lit, monotonically_increasing_id, round
from pyspark.sql.types import DoubleType
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression, RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import col
from geopy.distance import geodesic

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

def compute_distance_udf(pickup_latitude, pickup_longitude, dropoff_latitude, dropoff_longitude):
    coord1 = (pickup_latitude, pickup_longitude)
    coord2 = (dropoff_latitude, dropoff_longitude)
    return geodesic(coord1, coord2).kilometers

Collecting pyspark
  Downloading pyspark-3.5.2.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m4.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l- \ done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l- \ | done
[?25h  Created wheel for pyspark: filename=pyspark-3.5.2-py2.py3-none-any.whl size=317812366 sha256=c347fa134009ece1ec3c7032395d874e0940dfbfb74dfcf644dbf03c55179e2e
  Stored in directory: /root/.cache/pip/wheels/34/34/bd/03944534c44b677cd5859f248090daa9fb27b3c8f8e5f49574
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.2




/kaggle/input/new-york-city-taxi-fare-prediction/sample_submission.csv
/kaggle/input/new-york-city-taxi-fare-prediction/GCP-Coupons-Instructions.rtf
/kaggle/input/new-york-city-taxi-fare-prediction/train.csv
/kaggle/input/new-york-city-taxi-fare-prediction/test.csv


In [2]:
class TrainTaxiCompetition:
    
    def __init__(self, frac=1, random_state=42):
        self.random_state = random_state
        
        # Initialize Spark session
        self.spark = SparkSession.builder \
            .appName("TaxiCompetition") \
            .config("spark.executor.memory", "4g") \
            .config("spark.driver.memory", "4g") \
            .getOrCreate()

        self.sc = self.spark.sparkContext

        self.sc.setLogLevel("ERROR")

        self.data = self.spark.read.csv('/kaggle/input/new-york-city-taxi-fare-prediction/train.csv', header=True, inferSchema=True)
        self.sample_data = self.data.sample(fraction=frac, seed=self.random_state)
        
    def data_cleaning(self):
        self.sample_data = self.sample_data.filter((col('pickup_longitude') <= 180) & (col('pickup_longitude') >= -180) & (col('dropoff_longitude') <= 180) & (col('dropoff_longitude') >= -180) & (col('pickup_latitude') <= 90) & (col('pickup_latitude') >= -90) & (col('dropoff_latitude') <= 90) & (col('dropoff_latitude') >= -90))

        self.data_wo_dup = self.sample_data.dropDuplicates()

        self.mean = self.data_wo_dup.select(mean("fare_amount")).collect()[0][0]
        self.std = self.data_wo_dup.select(stddev("fare_amount")).collect()[0][0]

        self.data_wo_dup = self.data_wo_dup.withColumn("zscore", 
                                      (col("fare_amount") - self.mean) / self.std)

        self.data_wo_dup_out = self.data_wo_dup.filter(col('zscore') < 3)
        
    def new_columns(self):
        def compute_distance_udf(pickup_latitude, pickup_longitude, dropoff_latitude, dropoff_longitude):
            coord1 = (pickup_latitude, pickup_longitude)
            coord2 = (dropoff_latitude, dropoff_longitude)
            return geodesic(coord1, coord2).kilometers
        
        self.data_wo_dup_out = self.data_wo_dup_out.withColumn('long', abs(col('dropoff_longitude') - col('pickup_longitude')))
        self.data_wo_dup_out = self.data_wo_dup_out.withColumn('lat', abs(col('dropoff_latitude') - col('pickup_latitude')))

        self.data_wo_dup_out = self.data_wo_dup_out.withColumn('weekday', date_format(col('pickup_datetime'), 'EEEE'))

        self.data_wo_dup_out = self.data_wo_dup_out.withColumn('hour_minute', hour(col('pickup_datetime')) + (minute(col('pickup_datetime'))/60))
        
        self.data_wo_dup_out = self.data_wo_dup_out.withColumn("Tuesday", when(col("weekday") == 'Tuesday', 1).otherwise(0))
        self.data_wo_dup_out = self.data_wo_dup_out.withColumn("Wednesday", when(col("weekday") == 'Wednesday', 1).otherwise(0))
        self.data_wo_dup_out = self.data_wo_dup_out.withColumn("Thursday", when(col("weekday") == 'Thursday', 1).otherwise(0))
        self.data_wo_dup_out = self.data_wo_dup_out.withColumn("Friday", when(col("weekday") == 'Friday', 1).otherwise(0))
        self.data_wo_dup_out = self.data_wo_dup_out.withColumn("Saturday", when(col("weekday") == 'Saturday', 1).otherwise(0))
        self.data_wo_dup_out = self.data_wo_dup_out.withColumn("Sunday", when(col("weekday") == 'Sunday', 1).otherwise(0))

        # Define UDF
        distance_udf = udf(compute_distance_udf, DoubleType())
        self.data = self.data_wo_dup_out.withColumn('distance_km', distance_udf(
        col('pickup_latitude'),
        col('pickup_longitude'),
        col('dropoff_latitude'),
        col('dropoff_longitude')
        ))
        
    def train_model(self):

        self.data_selected = self.data.select(
            'fare_amount', 'passenger_count', 'long', 'lat', 'hour_minute', 'distance_km',
            'Friday', 'Saturday', 'Sunday', 'Thursday', 'Tuesday', 'Wednesday'
        )

        self.X_train, self.X_test = self.data_selected.randomSplit([0.8, 0.2], seed=self.random_state)

        self.assembler = VectorAssembler(
            inputCols=[
                'passenger_count', 'long', 'lat', 'hour_minute', 'distance_km',
                'Friday', 'Saturday', 'Sunday',
                'Thursday', 'Tuesday', 'Wednesday'
            ],
            outputCol="features"
        )
        self.train_assembled_df = self.assembler.transform(self.X_train)
        self.test_assembled_df = self.assembler.transform(self.X_test)

        self.rf = RandomForestRegressor(featuresCol="features", labelCol="fare_amount")
        self.rf_model = self.rf.fit(self.train_assembled_df)
        self.rf_predictions = self.rf_model.transform(self.test_assembled_df)

        self.evaluator = RegressionEvaluator(
            labelCol="fare_amount", predictionCol="prediction", metricName="rmse"
        )
        self.rmse = self.evaluator.evaluate(self.rf_predictions)
        print(f"Root Mean Squared Error (RMSE): {self.rmse}")
            
    def run(self):
        self.data_cleaning()
        self.new_columns()
        self.train_model()
        
        return self.rf_model

In [3]:
class TestTaxiCompetition:
    
    def __init__(self, model):
        self.model = model
        
        # Initialize Spark session
        self.spark = SparkSession.builder \
            .appName("TaxiCompetition") \
            .config("spark.executor.memory", "4g") \
            .config("spark.driver.memory", "4g") \
            .getOrCreate()

        self.sc = self.spark.sparkContext

        self.sc.setLogLevel("ERROR")

        self.data = self.spark.read.csv('/kaggle/input/new-york-city-taxi-fare-prediction/test.csv', header=True, inferSchema=True)
        
    def new_columns(self):
        def compute_distance_udf(pickup_latitude, pickup_longitude, dropoff_latitude, dropoff_longitude):
            coord1 = (pickup_latitude, pickup_longitude)
            coord2 = (dropoff_latitude, dropoff_longitude)
            return geodesic(coord1, coord2).kilometers
        
        self.data = self.data.withColumn('long', abs(col('dropoff_longitude') - col('pickup_longitude')))
        self.data = self.data.withColumn('lat', abs(col('dropoff_latitude') - col('pickup_latitude')))

        self.data = self.data.withColumn('weekday', date_format(col('pickup_datetime'), 'EEEE'))

        self.data = self.data.withColumn('hour_minute', hour(col('pickup_datetime')) + (minute(col('pickup_datetime'))/60))
        
        self.data = self.data.withColumn("Tuesday", when(col("weekday") == 'Tuesday', 1).otherwise(0))
        self.data = self.data.withColumn("Wednesday", when(col("weekday") == 'Wednesday', 1).otherwise(0))
        self.data = self.data.withColumn("Thursday", when(col("weekday") == 'Thursday', 1).otherwise(0))
        self.data = self.data.withColumn("Friday", when(col("weekday") == 'Friday', 1).otherwise(0))
        self.data = self.data.withColumn("Saturday", when(col("weekday") == 'Saturday', 1).otherwise(0))
        self.data = self.data.withColumn("Sunday", when(col("weekday") == 'Sunday', 1).otherwise(0))

        # Define UDF
        distance_udf = udf(compute_distance_udf, DoubleType())
        self.data = self.data.withColumn('distance_km', distance_udf(
        col('pickup_latitude'),
        col('pickup_longitude'),
        col('dropoff_latitude'),
        col('dropoff_longitude')
        ))
        
    def test_model(self):

        self.data_selected = self.data.select(
            'passenger_count', 'long', 'lat', 'hour_minute', 'distance_km',
            'Friday', 'Saturday', 'Sunday', 'Thursday', 'Tuesday', 'Wednesday'
        )

        self.assembler = VectorAssembler(
            inputCols=[
                'passenger_count', 'long', 'lat', 'hour_minute', 'distance_km',
                'Friday', 'Saturday', 'Sunday',
                'Thursday', 'Tuesday', 'Wednesday'
            ],
            outputCol="features"
        )
        
        self.assembled_df = self.assembler.transform(self.data_selected)

        self.predictions = self.model.transform(self.assembled_df)
        
    def submission(self):
        
        self.data_key = self.data.select('key')
        self.data.predictions = self.predictions.select('prediction')
        
        self.data_key = self.data_key.withColumn("id", monotonically_increasing_id())
        self.data.predictions = self.data.predictions.withColumn("id", monotonically_increasing_id())
        
        self.submission = self.data_key.join(self.data.predictions, on="id").drop("id")
        
        self.submission = self.submission.withColumn("fare_amount", round(col("prediction"), 2)).drop('prediction')
        
        self.submission.withColumn("key", date_format("key", "yyyy-MM-dd HH:mm:ss"))
            
    def run(self):

        self.new_columns()
        self.test_model()
        self.submission()
        
        return self.submission
        
        

In [4]:
model = TrainTaxiCompetition(frac=0.001).run()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/08/23 04:37:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

Root Mean Squared Error (RMSE): 3.3376992574151116


                                                                                

In [5]:
output = TestTaxiCompetition(model).run()

In [6]:
output.show()

                                                                                

+-------------------+-----------+
|                key|fare_amount|
+-------------------+-----------+
|2015-01-27 13:08:24|        8.4|
|2015-01-27 13:08:24|       8.81|
|2011-10-08 11:53:44|       6.31|
|2012-12-01 21:12:12|        7.6|
|2012-12-01 21:12:12|      15.02|
|2012-12-01 21:12:12|       9.96|
|2011-10-06 12:10:20|       6.33|
|2011-10-06 12:10:20|      26.96|
|2011-10-06 12:10:20|      12.16|
|2014-02-18 15:22:20|       6.31|
|2014-02-18 15:22:20|       8.31|
|2014-02-18 15:22:20|      13.98|
|2010-03-29 20:20:32|       6.31|
|2010-03-29 20:20:32|       8.04|
|2011-10-06 03:59:12|       8.52|
|2011-10-06 03:59:12|      14.45|
|2012-07-15 16:45:04|       6.32|
|2012-07-15 16:45:04|      10.03|
|2012-07-15 16:45:04|       6.32|
|2012-07-15 16:45:04|       6.32|
+-------------------+-----------+
only showing top 20 rows



In [7]:
output.write.csv('submission.csv', header=True, mode='overwrite')
print("Your submission was successfully saved!")

                                                                                

Your submission was successfully saved!
