In [None]:
# Setting up Pyspark

In [1]:
!pip install -q findspark
!pip install pyspark

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types

Defaulting to user installation because normal site-packages is not writeable


In [19]:
spark = SparkSession.builder \
    .master('local') \
    .appName('SparkAnalytics') \
    .getOrCreate()

22/12/10 17:58:21 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [None]:
# Download Dataset

In [20]:
!wget https://d37ci6vzurychx.cloudfront.net/trip-data/fhv_tripdata_2021-02.parquet

--2022-12-10 17:58:26--  https://d37ci6vzurychx.cloudfront.net/trip-data/fhv_tripdata_2021-02.parquet
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 18.161.108.141, 18.161.108.231, 18.161.108.184, ...
Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|18.161.108.141|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 10645466 (10M) [binary/octet-stream]
Saving to: ‘fhv_tripdata_2021-02.parquet.1’


2022-12-10 17:58:31 (2.44 MB/s) - ‘fhv_tripdata_2021-02.parquet.1’ saved [10645466/10645466]



In [16]:
# Defining Table Schema
## Spark includes the ability to read and write from a large number of data sources using InferSchema, this will automatically guess the data types for each field.
## Howerever, you should use StructType to define the schema while reading a file for more efficient way to improve the Spark performance.

In [3]:
schema = types.StructType(
    [
        types.StructField('dispatching_base_num', types.StringType(), True),
        types.StructField('pickup_datetime', types.TimestampType(), True),
        types.StructField('dropoff_datetime', types.TimestampType(), True),
        types.StructField('PULocationID', types.DoubleType(), True),
        types.StructField('DOLocationID', types.DoubleType(), True),
        types.StructField('SR_Flag', types.IntegerType(), True),
        types.StructField('Affiliated_base_number', types.StringType(), True)
    ]
)

In [4]:
df = spark.read \
    .option("header", "true") \
    .schema(schema) \
    .parquet('fhv_tripdata_2021-02.parquet')

In [5]:
df.show()



+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B00013|2021-02-01 07:01:00|2021-02-01 08:33:00|        null|        null|   null|                B00014|
|     B00021         |2021-02-01 07:55:40|2021-02-01 08:06:20|       173.0|        82.0|   null|       B00021         |
|     B00021         |2021-02-01 07:14:03|2021-02-01 07:28:37|       173.0|        56.0|   null|       B00021         |
|     B00021         |2021-02-01 07:27:48|2021-02-01 07:35:45|        82.0|       129.0|   null|       B00021         |
|              B00037|2021-02-01 07:12:50|2021-02-01 07:26:38|        null|       225.0|   null|                B00037|
|              B00037|2021-02-01 07:00:3

                                                                                

In [6]:
# Data Analysis using Spark SQL

In [9]:
## createOrReplaceTempView() is used when you wanted to store the table for a specific spark session. Once created you can use it to run SQL queries.
df.createOrReplaceTempView("fhv_trip")

In [10]:
### How many taxi trips were there on February 15?

In [11]:
taxi_trips_15_feb = spark.sql("""
with trips_15_feb as 
(SELECT
    *
FROM 
    fhv_trip
WHERE
    to_date(pickup_datetime) = '2021-02-15'
)
SELECT
    COUNT(1) as taxi_trips_15_feb
FROM 
    trips_15_feb
WHERE
    to_date(pickup_datetime) = '2021-02-15'
""").show()



+-----------------+
|taxi_trips_15_feb|
+-----------------+
|            35709|
+-----------------+



                                                                                

In [15]:
### Find the longest trip for each day!

In [27]:
taxi_longest_trips = spark.sql("""
                              SELECT
                                  pickup_datetime, dropoff_datetime,
                                  ROUND(((unix_timestamp(dropoff_datetime) - unix_timestamp(pickup_datetime))/3600),2) AS duration_in_hours
                              FROM fhv_trip
                              SORT BY
                                  duration_in_hours DESC
                              """)

taxi_longest_trips.show()



+-------------------+-------------------+-----------------+
|    pickup_datetime|   dropoff_datetime|duration_in_hours|
+-------------------+-------------------+-----------------+
|2021-02-05 01:45:00|2021-04-23 02:24:00|          1848.65|
|2021-02-01 15:00:00|2021-03-05 18:30:00|            771.5|
|2021-02-01 19:00:00|2021-03-05 18:29:00|           767.48|
|2021-02-25 22:00:00|2021-03-26 00:49:00|           674.82|
|2021-02-23 20:30:00|2021-03-23 21:02:00|           672.53|
|2021-02-04 00:25:00|2021-03-03 19:39:53|           667.25|
|2021-02-04 03:59:00|2021-02-24 04:31:15|           480.54|
|2021-02-04 03:59:00|2021-02-24 04:31:15|           480.54|
|2021-02-04 03:59:00|2021-02-24 04:31:15|           480.54|
|2021-02-27 19:00:00|2021-03-11 15:44:00|           284.73|
|2021-02-28 17:00:00|2021-03-11 15:43:00|           262.72|
|2021-02-28 19:00:00|2021-03-11 15:42:00|            260.7|
|2021-02-01 16:20:08|2021-02-12 00:43:59|            248.4|
|2021-02-15 21:20:52|2021-02-26 01:51:01

                                                                                

In [28]:
### Find Top 5 Most frequent `dispatching_base_num` ?

In [30]:
most_dispatching_base_num = spark.sql("""
    SELECT 
          dispatching_base_num,
          COUNT(1) as amount
    FROM 
          fhv_trip
    GROUP BY
          1
    ORDER BY
          2 DESC
    LIMIT 
          5
""")

most_dispatching_base_num.show()

+--------------------+------+
|dispatching_base_num|amount|
+--------------------+------+
|              B00856| 35077|
|              B01312| 33089|
|              B01145| 31114|
|              B02794| 30397|
|              B03016| 29794|
+--------------------+------+



In [31]:
### Find Top 5 Most common location pairs (PUlocationID and DOlocationID)
## Location name is in another file 'taxi_zone_lookup', we need to import in and join into main table

In [32]:
### Download Dataset

!wget https://d37ci6vzurychx.cloudfront.net/misc/taxi+_zone_lookup.csv

--2022-12-10 18:45:01--  https://d37ci6vzurychx.cloudfront.net/misc/taxi+_zone_lookup.csv
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 18.161.108.77, 18.161.108.231, 18.161.108.141, ...
Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|18.161.108.77|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 12322 (12K) [text/csv]
Saving to: ‘taxi+_zone_lookup.csv’


2022-12-10 18:45:05 (222 KB/s) - ‘taxi+_zone_lookup.csv’ saved [12322/12322]



In [35]:
### Defining Table Schema
taxi_zones_schema = types.StructType([
    types.StructField('LocationID', types.IntegerType(), True),
    types.StructField('Borough', types.StringType(), True),
    types.StructField('Zone', types.StringType(), True),
    types.StructField('service_zone', types.StringType(), True)
])

In [36]:
taxi_zones_df = spark.read.option('header', 'true').schema(taxi_zones_schema).csv('taxi+_zone_lookup.csv')

In [39]:
taxi_zones_df.show()

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [43]:
### Schema for Pick Up Zone
Pickup_table = taxi_zones_df \
    .withColumnRenamed('Zone', 'PickUp_Zone') \
    .withColumnRenamed('LocationID', 'PickUp_Location_ID') \
    .withColumnRenamed('Borough', 'PickUP_Borough') \
    .drop('service_zone')

### Schema for Drop Off Zone
Dropoff_table = taxi_zones_df \
    .withColumnRenamed('Zone', 'DropOff_Zone') \
    .withColumnRenamed('LocationID', 'DropOff_Location_ID') \
    .withColumnRenamed('Borough', 'DropOff_Borough') \
    .drop('service_zone')

In [51]:
# Join fhv_trip table with the table taxi_zone table
join_test = df.join(Pickup_table, df.PULocationID == Pickup_table.PickUp_Location_ID)
df_join = join_test.join(Dropoff_table, join_test.DOLocationID == Dropoff_table.DropOff_Location_ID)

In [52]:
df_join.show(3)

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+------------------+--------------+------------+-------------------+---------------+---------------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|PickUp_Location_ID|PickUP_Borough| PickUp_Zone|DropOff_Location_ID|DropOff_Borough|   DropOff_Zone|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+------------------+--------------+------------+-------------------+---------------+---------------+
|     B00021         |2021-02-01 07:55:40|2021-02-01 08:06:20|       173.0|        82.0|   null|       B00021         |               173|        Queens|North Corona|                 82|         Queens|       Elmhurst|
|     B00021         |2021-02-01 07:14:03|2021-02-01 07:28:37|       173.0|        56.0|   null|       B00021         |     

In [53]:
df_join.createOrReplaceTempView("location_pairs")

In [55]:
location_pairs = spark.sql("""
SELECT
    CONCAT(coalesce(PickUp_Zone, 'Unknown'), '/', coalesce(DropOff_Zone, 'Unknown')) AS zone_pair,
    COUNT(1) as total_count
FROM
    location_pairs
GROUP BY
    1
ORDER BY
    2 DESC
LIMIT
    5
;
""")

location_pairs.show()



+--------------------+-----------+
|           zone_pair|total_count|
+--------------------+-----------+
|Saint George/New ...|       2374|
|Stapleton/Saint G...|       2112|
|Jackson Heights/J...|       1902|
|     Astoria/Astoria|       1829|
|Old Astoria/Old A...|       1736|
+--------------------+-----------+



                                                                                