In [1]:
!pip install pyspark



In [2]:
import pyspark
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('testing') \
    .getOrCreate()

In [4]:
!pip install pandas
!pip install pyarrow
!pip install fastparquet



In [5]:
import pandas as pd
import pyarrow
import fastparquet
import urllib.request

In [7]:
url = "https://d37ci6vzurychx.cloudfront.net/trip-data/fhvhv_tripdata_2021-02.parquet"
filename = "fhvhv_tripdata_2021_02.parquet"
urllib.request.urlretrieve(url,filename)

('fhvhv_tripdata_2021-02.parquet', <http.client.HTTPMessage at 0x2475f9afa10>)

In [8]:
df = pd.read_parquet('fhvhv_tripdata_2021-02.parquet')
df.to_csv('fhvhv_tripdata_2021-02.csv')

In [9]:
df.isnull().sum()

hvfhs_license_num              0
dispatching_base_num           0
originating_base_num     3319132
request_datetime               1
on_scene_datetime        3318817
pickup_datetime                0
dropoff_datetime               0
PULocationID                   0
DOLocationID                   0
trip_miles                     0
trip_time                      0
base_passenger_fare            0
tolls                          0
bcf                            0
sales_tax                      0
congestion_surcharge           0
airport_fee             11613181
tips                           0
driver_pay                     0
shared_request_flag            0
shared_match_flag              0
access_a_ride_flag             0
wav_request_flag               0
wav_match_flag                 0
dtype: int64

In [11]:
df["originating_base_num"] = df["originating_base_num"].fillna("Nan Values")
df["on_scene_datetime"] = df["on_scene_datetime"].fillna("Nan Values")
df["airport_fee"] = df["airport_fee"].fillna("Nan Values")
df["request_datetime"] = df["request_datetime"].fillna("Nan Values")
df.isnull().sum()

hvfhs_license_num       0
dispatching_base_num    0
originating_base_num    0
request_datetime        0
on_scene_datetime       0
pickup_datetime         0
dropoff_datetime        0
PULocationID            0
DOLocationID            0
trip_miles              0
trip_time               0
base_passenger_fare     0
tolls                   0
bcf                     0
sales_tax               0
congestion_surcharge    0
airport_fee             0
tips                    0
driver_pay              0
shared_request_flag     0
shared_match_flag       0
access_a_ride_flag      0
wav_request_flag        0
wav_match_flag          0
dtype: int64

In [35]:
df1 = spark.read \
    .option("header", "true") \
    .csv('fhvhv_tripdata_2021-02.csv')

df1.show

<bound method DataFrame.show of DataFrame[_c0: string, hvfhs_license_num: string, dispatching_base_num: string, originating_base_num: string, request_datetime: string, on_scene_datetime: string, pickup_datetime: string, dropoff_datetime: string, PULocationID: string, DOLocationID: string, trip_miles: string, trip_time: string, base_passenger_fare: string, tolls: string, bcf: string, sales_tax: string, congestion_surcharge: string, airport_fee: string, tips: string, driver_pay: string, shared_request_flag: string, shared_match_flag: string, access_a_ride_flag: string, wav_request_flag: string, wav_match_flag: string]>

In [19]:
df.dtypes

hvfhs_license_num               object
dispatching_base_num            object
originating_base_num            object
request_datetime                object
on_scene_datetime               object
pickup_datetime         datetime64[ns]
dropoff_datetime        datetime64[ns]
PULocationID                     int64
DOLocationID                     int64
trip_miles                     float64
trip_time                        int64
base_passenger_fare            float64
tolls                          float64
bcf                            float64
sales_tax                      float64
congestion_surcharge           float64
airport_fee                     object
tips                           float64
driver_pay                     float64
shared_request_flag             object
shared_match_flag               object
access_a_ride_flag              object
wav_request_flag                object
wav_match_flag                  object
dtype: object

In [29]:
df1.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- originating_base_num: string (nullable = true)
 |-- request_datetime: string (nullable = true)
 |-- on_scene_datetime: string (nullable = true)
 |-- pickup_datetime: string (nullable = true)
 |-- dropoff_datetime: string (nullable = true)
 |-- PULocationID: string (nullable = true)
 |-- DOLocationID: string (nullable = true)
 |-- trip_miles: string (nullable = true)
 |-- trip_time: string (nullable = true)
 |-- base_passenger_fare: string (nullable = true)
 |-- tolls: string (nullable = true)
 |-- bcf: string (nullable = true)
 |-- sales_tax: string (nullable = true)
 |-- congestion_surcharge: string (nullable = true)
 |-- airport_fee: string (nullable = true)
 |-- tips: string (nullable = true)
 |-- driver_pay: string (nullable = true)
 |-- shared_request_flag: string (nullable = true)
 |-- shared_match_flag: string (nullable = true)
 |-- ac

In [42]:
from pyspark.sql.functions import col
filter_condition = col('pickup_datetime').contains('2021-02-15')
df_15feb = df1.filter(filter_condition)
df_15feb_count = df_15feb.count()

In [43]:
print(f"How many taxi trips were there on February 15? There are: {df_15feb_count} trips")

How many taxi trips were there on February 15? There are: 367170 trips


In [72]:
from pyspark.sql.functions import col, max

In [73]:
df_max_trip_miles = df1.groupBy(date_format(col('pickup_datetime'), 'yyyy-MM-dd') \
                                .alias('pickup_date')).agg(max(col('trip_miles')) \
                                .alias('max_trip_miles'))
print('Find the longest trip for each day?')
df_max_trip_miles.show()

Find the longest trip for each day?
+-----------+--------------+
|pickup_date|max_trip_miles|
+-----------+--------------+
| 2021-02-01|         99.67|
| 2021-02-02|         98.69|
| 2021-02-03|        99.891|
| 2021-02-04|         99.84|
| 2021-02-05|        99.807|
| 2021-02-06|         99.28|
| 2021-02-07|        99.378|
| 2021-02-08|         99.08|
| 2021-02-09|        99.647|
| 2021-02-10|        98.578|
| 2021-02-11|        99.875|
| 2021-02-12|         99.92|
| 2021-02-13|          99.2|
| 2021-02-14|          99.0|
| 2021-02-15|         99.99|
| 2021-02-16|         99.35|
| 2021-02-17|        99.786|
| 2021-02-18|         97.48|
| 2021-02-19|         98.58|
| 2021-02-20|         98.64|
+-----------+--------------+
only showing top 20 rows



In [65]:
dispatch = df1.groupBy(col('dispatching_base_num')).count()
top5_dispatch = dispatch.orderBy(col('count').desc()).limit(5)
top5_dispatch.show()

+--------------------+-------+
|dispatching_base_num|  count|
+--------------------+-------+
|              B02510|3233664|
|              B02764| 965568|
|              B02872| 882689|
|              B02875| 685390|
|              B02765| 559768|
+--------------------+-------+



In [67]:
from pyspark.sql.functions import col, concat_ws, count

In [70]:
df_with_pair = df1.withColumn('location_pair', concat_ws('->', col('PUlocationID'), col('DOlocationID')))
df_location_pair = df_with_pair.groupBy('location_pair').agg(count('*').alias('count'))
top5_location_pair = df_location_pair.orderBy(col('count').desc()).limit(5)

In [71]:
print('Find Top 5 Most common location pairs (PUlocationID and DOlocationID)')
top5_location_pair.show()

+-------------+-----+
|location_pair|count|
+-------------+-----+
|       76->76|45041|
|       26->26|37329|
|       39->39|28026|
|       61->61|25976|
|       14->14|17934|
+-------------+-----+

