<a href="https://colab.research.google.com/github/NavyaKiran/NYCTaxiFarePrediciton/blob/main/NYC_Taxi_Fare_Prediction.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
! pip install pyspark -q

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m1.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


In [2]:
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder \
    .appName("NYC Taxi Trip Analysis") \
    .getOrCreate()

In [3]:
import google.auth
from google.colab import auth

auth.authenticate_user()

In [4]:
# !pip install google-cloud-bigquery
from google.cloud import bigquery
client = bigquery.Client(project = 'navya-weather-data')

query_weather = """
SELECT wban, date, year, mo, da, temp, count_temp, dewp, count_dewp, slp, count_slp, stp, count_stp,
visib, count_visib, wdsp, count_wdsp, mxpsd, gust, max, min, prcp, sndp, fog, rain_drizzle, snow_ice_pellets, hail, thunder,
tornado_funnel_cloud
FROM `bigquery-public-data.noaa_gsod.gsod2023` weather
where year = '2023' and mo in ('01', '02')
"""

query_stations = """ select usaf, wban, state, country, call
from `bigquery-public-data.noaa_gsod.stations` stations
where stations.country = 'US' and stations.state = 'NY' and stations.call = 'KNYC'
"""

query_weather_job = client.query(query_weather)
query_stations_job = client.query(query_stations)

In [5]:
weather_data = query_weather_job.to_dataframe(max_results = None)
stations_data = query_stations_job.to_dataframe(max_results = None)

In [6]:
weather_data.shape

(659543, 29)

In [7]:
stations_data.shape

(4, 5)

In [8]:
weather_data.head()

Unnamed: 0,wban,date,year,mo,da,temp,count_temp,dewp,count_dewp,slp,...,max,min,prcp,sndp,fog,rain_drizzle,snow_ice_pellets,hail,thunder,tornado_funnel_cloud
0,99999,2023-02-18,2023,2,18,30.5,4,24.2,4,991.4,...,32.5,29.5,0.0,999.9,0,0,0,0,0,0
1,99999,2023-01-27,2023,1,27,42.3,4,39.7,4,9999.9,...,44.6,39.2,99.99,999.9,0,1,0,0,0,0
2,99999,2023-01-12,2023,1,12,39.2,4,25.7,4,978.4,...,40.6,33.1,0.0,999.9,0,0,0,0,0,0
3,99999,2023-01-04,2023,1,4,40.3,4,24.8,4,9999.9,...,41.0,32.7,0.0,999.9,0,0,0,0,0,0
4,99999,2023-01-08,2023,1,8,26.6,4,25.1,4,990.6,...,27.7,26.1,0.0,999.9,0,0,0,0,0,0


In [9]:
stations_data.head()

Unnamed: 0,usaf,wban,state,country,call
0,725033,94728,NY,US,KNYC
1,725060,94728,NY,US,KNYC
2,999999,94728,NY,US,KNYC
3,725053,94728,NY,US,KNYC


In [10]:
weather_nyc = weather_data.merge(stations_data, on = 'wban', how = 'inner')

In [11]:
weather_nyc.head()

Unnamed: 0,wban,date,year,mo,da,temp,count_temp,dewp,count_dewp,slp,...,fog,rain_drizzle,snow_ice_pellets,hail,thunder,tornado_funnel_cloud,usaf,state,country,call
0,94728,2023-02-07,2023,2,7,35.9,24,21.5,24,1025.2,...,0,0,0,0,0,0,725033,NY,US,KNYC
1,94728,2023-02-07,2023,2,7,35.9,24,21.5,24,1025.2,...,0,0,0,0,0,0,725060,NY,US,KNYC
2,94728,2023-02-07,2023,2,7,35.9,24,21.5,24,1025.2,...,0,0,0,0,0,0,999999,NY,US,KNYC
3,94728,2023-02-07,2023,2,7,35.9,24,21.5,24,1025.2,...,0,0,0,0,0,0,725053,NY,US,KNYC
4,94728,2023-01-03,2023,1,3,51.6,24,47.7,24,1017.3,...,1,1,0,0,0,0,725033,NY,US,KNYC


In [12]:
weather_nyc.dtypes

wban                     object
date                     dbdate
year                     object
mo                       object
da                       object
temp                    float64
count_temp                Int64
dewp                    float64
count_dewp                Int64
slp                     float64
count_slp                 Int64
stp                     float64
count_stp                 Int64
visib                   float64
count_visib               Int64
wdsp                     object
count_wdsp               object
mxpsd                    object
gust                    float64
max                     float64
min                     float64
prcp                    float64
sndp                    float64
fog                      object
rain_drizzle             object
snow_ice_pellets         object
hail                     object
thunder                  object
tornado_funnel_cloud     object
usaf                     object
state                    object
country 

In [13]:
weather_nyc.shape

(236, 33)

In [16]:
january_data = spark.read.parquet('/content/yellow_tripdata_2023-01.parquet')
february_data = spark.read.parquet('/content/yellow_tripdata_2023-02.parquet')
zone_data = spark.read.option("header", "true").csv('/content/taxi_zones.csv')

In [17]:
january_data.printSchema()
february_data.printSchema()
zone_data.printSchema()

root
 |-- VendorID: long (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetim

In [18]:
january_data.show(10)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       2| 2023-01-01 00:32:10|  2023-01-01 00:40:36|            1.0|         0.97|       1.0|                 N|         161|         141|           2|        9.3|  1.0|    0.5|       0.

In [19]:
february_data.show(10)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       1| 2023-02-01 00:32:53|  2023-02-01 00:34:34|              2|          0.3|         1|                 N|         142|         163|           2|        4.4|  3.5|    0.5|       0.

In [20]:
zone_data.show(10)

+--------+---------------+--------------------+-----------------+--------------------+----------+-------------+
|OBJECTID|     Shape_Leng|            the_geom|       Shape_Area|                zone|LocationID|      borough|
+--------+---------------+--------------------+-----------------+--------------------+----------+-------------+
|       1| 0.116357453189|MULTIPOLYGON (((-...|  0.0007823067885|      Newark Airport|         1|          EWR|
|       2|  0.43346966679|MULTIPOLYGON (((-...| 0.00486634037837|         Jamaica Bay|         2|       Queens|
|       3|0.0843411059012|MULTIPOLYGON (((-...|0.000314414156821|Allerton/Pelham G...|         3|        Bronx|
|       4|0.0435665270921|MULTIPOLYGON (((-...|0.000111871946192|       Alphabet City|         4|    Manhattan|
|       5|0.0921464898574|MULTIPOLYGON (((-...|0.000497957489363|       Arden Heights|         5|Staten Island|
|       6| 0.150490542523|MULTIPOLYGON (((-...|0.000606460984581|Arrochar/Fort Wad...|         6|Staten 

In [21]:
february_data = february_data.withColumnRenamed("Airport_fee", "airport_fee")

In [22]:
february_data.columns

['VendorID',
 'tpep_pickup_datetime',
 'tpep_dropoff_datetime',
 'passenger_count',
 'trip_distance',
 'RatecodeID',
 'store_and_fwd_flag',
 'PULocationID',
 'DOLocationID',
 'payment_type',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'congestion_surcharge',
 'airport_fee']

In [23]:
yellow_trip_data = january_data.union(february_data)
yellow_trip_data.printSchema()

root
 |-- VendorID: long (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)



In [24]:
print("The number of rows in the dataframe are",yellow_trip_data.count())
print("The number of columns in the dataframe are",len(yellow_trip_data.columns))
print("The number of rows in the zone dataframe are", zone_data.count())
print("The number of columns in the zone dataframe are", len(zone_data.columns))

The number of rows in the dataframe are 5980721
The number of columns in the dataframe are 19
The number of rows in the zone dataframe are 263
The number of columns in the zone dataframe are 7


In [25]:
yellow_trip_data.registerTempTable("yellow_trip_data")
zone_data.registerTempTable("zone_data")



In [26]:
#Now, we combine (join) yellow_trip_data and zone_data

join_query = """select yellow_trip_data.VendorID, TO_DATE(yellow_trip_data.tpep_pickup_datetime) as pickup_date, TO_DATE(yellow_trip_data.tpep_dropoff_datetime) as dropoff_date,
yellow_trip_data.passenger_count, yellow_trip_data.trip_distance, yellow_trip_data.RatecodeID, yellow_trip_data.store_and_fwd_flag,
yellow_trip_data.PULocationID, z1.zone as pickup_zone, z1.borough as pickup_borough, DOLocationID, z2.zone as dropoff_zone,
z2.borough as dropoff_borough, payment_type, fare_amount, extra, mta_tax, tip_amount, tolls_amount, improvement_surcharge, total_amount,
congestion_surcharge, airport_fee
from yellow_trip_data left join zone_data z1 on yellow_trip_data.PULocationID = z1.LocationID
left join zone_data z2 on yellow_trip_data.DOLocationID = z2.LocationID
"""

trip_data = spark.sql(join_query)
trip_data.registerTempTable("trip_data")

In [27]:
trip_data.show()

+--------+-----------+------------+---------------+-------------+----------+------------------+------------+--------------------+--------------+------------+--------------------+---------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|pickup_date|dropoff_date|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|         pickup_zone|pickup_borough|DOLocationID|        dropoff_zone|dropoff_borough|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|
+--------+-----------+------------+---------------+-------------+----------+------------------+------------+--------------------+--------------+------------+--------------------+---------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       2| 2023-01-01|

In [36]:
weather_nyc = spark.createDataFrame(weather_nyc)

ConnectionRefusedError: [Errno 111] Connection refused

In [33]:
query_weather_cab = """ select * from trip_data join weather_nyc on trip_data.pickup_date = weather_nyc.date """

weather_cab = spark.sql(query_weather_cab)
weather_cab.registerTempTable("weather_cab")

In [34]:
weather_cab = spark.createDataFrame(weather_cab.collect())
weather_cab.show()

----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 45376)
ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/IPython/core/interactiveshell.py", line 3553, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-34-39421735548a>", line 1, in <cell line: 1>
    weather_cab = spark.createDataFrame(weather_cab.collect())
  File "/usr/local/lib/python3.10/dist-packages/pyspark/sql/dataframe.py", line 1261, in collect
    sock_info = self._jdf.collectToPython()
  File "/usr/local/lib/python3.10/dist-packages/py4j/java_gateway.py", line 1322, in __call__
    return_value = get_return_value(
  File "/usr/local/lib/python3.10/dist-packages/pyspark/errors/exceptions/captured.py", line 179, in deco
    return f(*a, **kw)
  File "/usr/local/lib/python3.10/dist-packages/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJav

Py4JError: An error occurred while calling None.None