<a href="https://colab.research.google.com/github/ayushchty/taxi_price_prediction/blob/main/Untitled6.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col,unix_timestamp,hour,dayofweek,dayofmonth,lit

In [3]:
spark = SparkSession.builder.appName('TaxiFarePrediction').getOrCreate()

df = spark.read.csv('sample.csv', header=True, inferSchema=True)

df.show(5)

+-------------------+-----------+-------------------+----------------+---------------+-----------------+----------------+---------------+
|                key|fare_amount|    pickup_datetime|pickup_longitude|pickup_latitude|dropoff_longitude|dropoff_latitude|passenger_count|
+-------------------+-----------+-------------------+----------------+---------------+-----------------+----------------+---------------+
|2011-01-24 18:05:00|        9.3|2011-01-24 18:05:00|      -73.983768|      40.738037|       -73.982185|       40.757298|              1|
|2013-04-23 20:57:00|        5.0|2013-04-23 20:57:00|      -73.967943|      40.799842|        -73.96482|       40.810153|              1|
|2011-01-09 03:01:00|       16.5|2011-01-09 03:01:00|      -73.983637|      40.760198|       -73.942912|       40.824983|              2|
|2010-04-07 04:15:00|        8.1|2010-04-07 04:15:00|      -74.002312|      40.733603|       -73.985807|       40.762793|              2|
|2013-11-05 16:53:00|      51.83|2

In [7]:
# df=df.filter(col('fare_amount')>0)&(col('pickup_longitude').isNotNull())&(col('pickup_latitude').isNotNull())&(col('dropoff_longitude').isNotNull())&(col('dropoff_latitude').isNotNull())

In [6]:
df = df.filter((col('fare_amount') > 0) & (col('pickup_longitude').isNotNull()) & (col('pickup_latitude').isNotNull()) & (col('dropoff_longitude').isNotNull()) & (col('dropoff_latitude').isNotNull()))

In [8]:
df = df.withColumn('pickup_datetime', unix_timestamp('pickup_datetime', 'yyyy-MM-dd HH:mm:ss').cast('timestamp'))
df = df.withColumn('hour', hour('pickup_datetime'))
df = df.withColumn('weekday', dayofweek('pickup_datetime'))

In [14]:
df.show()

+-------------------+-----------+-------------------+------------------+----------------+------------------+----------------+---------------+----+-------+
|                key|fare_amount|    pickup_datetime|  pickup_longitude| pickup_latitude| dropoff_longitude|dropoff_latitude|passenger_count|hour|weekday|
+-------------------+-----------+-------------------+------------------+----------------+------------------+----------------+---------------+----+-------+
|2011-01-24 18:05:00|        9.3|2011-01-24 18:05:00|        -73.983768|       40.738037|        -73.982185|       40.757298|              1|  18|      2|
|2013-04-23 20:57:00|        5.0|2013-04-23 20:57:00|        -73.967943|       40.799842|         -73.96482|       40.810153|              1|  20|      3|
|2011-01-09 03:01:00|       16.5|2011-01-09 03:01:00|        -73.983637|       40.760198|        -73.942912|       40.824983|              2|   3|      1|
|2010-04-07 04:15:00|        8.1|2010-04-07 04:15:00|        -74.00231

In [11]:
import math

In [16]:
# haversine formula to calculate distance in kilometers
def haver(lon1, lat1, lon2, lat2):
    R = 6371
    phi1 = math.radians(lat1)
    phi2 = math.radians(lat2)
    delta_phi = math.radians(lat2 - lat1)
    delta_lambda = math.radians(lon2 - lon1)

    a = math.sin(delta_phi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(delta_lambda / 2) ** 2
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
    return R * c

In [15]:
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

In [17]:
haversine_udf = udf(haver, DoubleType())

In [18]:
df = df.withColumn('distance', haversine_udf(col('pickup_longitude'), col('pickup_latitude'),
                                             col('dropoff_longitude'), col('dropoff_latitude')))

In [20]:
df.show()

+-------------------+-----------+-------------------+------------------+----------------+------------------+----------------+---------------+----+-------+------------------+
|                key|fare_amount|    pickup_datetime|  pickup_longitude| pickup_latitude| dropoff_longitude|dropoff_latitude|passenger_count|hour|weekday|          distance|
+-------------------+-----------+-------------------+------------------+----------------+------------------+----------------+---------------+----+-------+------------------+
|2011-01-24 18:05:00|        9.3|2011-01-24 18:05:00|        -73.983768|       40.738037|        -73.982185|       40.757298|              1|  18|      2|2.1458729965365486|
|2013-04-23 20:57:00|        5.0|2013-04-23 20:57:00|        -73.967943|       40.799842|         -73.96482|       40.810153|              1|  20|      3|1.1762763983398539|
|2011-01-09 03:01:00|       16.5|2011-01-09 03:01:00|        -73.983637|       40.760198|        -73.942912|       40.824983|     

In [23]:
df.describe().show()

+-------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+
|summary|       fare_amount|  pickup_longitude|   pickup_latitude| dropoff_longitude|  dropoff_latitude|   passenger_count|              hour|           weekday|          distance|
+-------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+
|  count|             99992|             99992|             99992|             99992|             99992|             99992|             99992|             99992|             99992|
|   mean|11.315218717497261|-72.57109926737286|39.934360387990374|-72.52166899547261| 39.93337016533233|1.6895751660132812|13.524651972157773|4.1243299463957115|19.366049747204563|
| stddev| 9.666784394358563|11.420298436830718| 6.516786680334995|15.139879574813945|7.09408299

In [24]:
df.printSchema()

root
 |-- key: timestamp (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- weekday: integer (nullable = true)
 |-- distance: double (nullable = true)



In [26]:
df.dtypes

[('key', 'timestamp'),
 ('fare_amount', 'double'),
 ('pickup_datetime', 'timestamp'),
 ('pickup_longitude', 'double'),
 ('pickup_latitude', 'double'),
 ('dropoff_longitude', 'double'),
 ('dropoff_latitude', 'double'),
 ('passenger_count', 'int'),
 ('hour', 'int'),
 ('weekday', 'int'),
 ('distance', 'double')]

In [31]:
df.distinct().count()

99992

In [32]:
df.count()

99992

In [33]:
df = df.withColumn('distance', haversine_udf(col('pickup_longitude'), col('pickup_latitude'),
                                             col('dropoff_longitude'), col('dropoff_latitude')))

In [34]:
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator

In [36]:
assembler = VectorAssembler(inputCols=['hour', 'weekday', 'distance', 'passenger_count'], outputCol='features')
df = assembler.transform(df)

In [37]:
train_data, test_data = df.randomSplit([0.8, 0.2], seed=123)

rf = RandomForestRegressor(featuresCol='features', labelCol='fare_amount')
model = rf.fit(train_data)

In [38]:
predictions = model.transform(test_data)

evaluator = RegressionEvaluator(labelCol='fare_amount', predictionCol='prediction', metricName='rmse')
rmse = evaluator.evaluate(predictions)

In [39]:
rmse

5.229298620718952

AttributeError: 'DataFrame' object has no attribute 'accuracy'

SyntaxError: invalid syntax (<ipython-input-41-f188d5ac1197>, line 1)