In [1]:
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

In [11]:
# H5Q1
spark.version

'3.3.2'

In [2]:
df_fhvhv = spark.read.parquet('fhvhv/2021/06')

In [3]:
df_fhvhv.show()

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B02765|2021-06-01 08:08:00|2021-06-01 08:23:45|         236|         100|      N|                B02765|
|              B02835|2021-06-03 16:42:36|2021-06-03 16:57:33|         119|          78|      N|                B02835|
|              B02765|2021-06-03 06:28:12|2021-06-03 06:51:04|         231|          42|      N|                B02765|
|              B02887|2021-06-02 17:48:58|2021-06-02 18:05:18|          33|         144|      N|                B02887|
|              B02510|2021-06-04 12:38:39|2021-06-04 13:22:22|         219|          85|      N|                  null|
|              B02866|2021-06-04 01:42:5

In [4]:
df_fhvhv.printSchema()

root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)
 |-- Affiliated_base_number: string (nullable = true)



Useful function if you have two df of similar data (like green and yellow taxi trips) and would like to union them on the common list of columns you can use:
- to see list of columns of each df: df_green.columns
- to compare two df columns: set(df_green.columns) & set(df_yellow.columns)

Below is function to create a common column list:

In [None]:
# common_columns = []

# yellow_columns = set(df.yellow.columns)

# for col in df_green.columns:
#     if col in yellow_columns:
#         common_columns.append(col)

In [5]:
from pyspark.sql import functions as F

In [6]:
# to see distribution by date
df_fhvhv.groupBy(F.to_date('pickup_datetime')).count().show()

+------------------------+------+
|to_date(pickup_datetime)| count|
+------------------------+------+
|              2021-06-22|469568|
|              2021-06-04|538917|
|              2021-06-20|491630|
|              2021-06-27|509437|
|              2021-06-28|425909|
|              2021-06-01|417375|
|              2021-06-17|497133|
|              2021-06-13|509039|
|              2021-06-19|601189|
|              2021-06-02|457339|
|              2021-06-08|462554|
|              2021-06-26|592505|
|              2021-06-09|483353|
|              2021-06-14|426672|
|              2021-06-29|456586|
|              2021-06-07|425771|
|              2021-06-06|522753|
|              2021-06-11|549286|
|              2021-06-18|540056|
|              2021-06-21|419024|
+------------------------+------+
only showing top 20 rows



In [7]:
# To use actual SQL with Spark, we need to first define the table:
df_fhvhv.registerTempTable('trips_data')



In [None]:
# If I would like to create new columns for the date made out of the timestamp, I can use the code below:

# df_fhvhv \
#     .withColumn('pickup_date', F.to_date(df_fhvhv.pickup_datetime)) \
#     .withColumn('dropoff_date', F.to_date(df_fhvhv.dropoff_datetime)) \
#     .select('dispatching_base_num','pickup_datetime', 'pickup_date', 'dropoff_datetime', 'dropoff_date', 'PULocationID', 'DOLocationID''SR_Flag', 'Affiliated_base_number') \
#     .show()

In [12]:
# H5Q3:

df_h5q3 = spark.sql("""
SELECT 
    date_trunc('day', pickup_datetime) AS trip_day, 
    count(*) AS num_rides
FROM
    trips_data
GROUP BY 1
""").show()

+-------------------+---------+
|           trip_day|num_rides|
+-------------------+---------+
|2021-06-03 00:00:00|   521408|
|2021-06-06 00:00:00|   522753|
|2021-06-07 00:00:00|   425771|
|2021-06-27 00:00:00|   509437|
|2021-06-01 00:00:00|   417375|
|2021-06-10 00:00:00|   504108|
|2021-06-21 00:00:00|   419024|
|2021-06-15 00:00:00|   452470|
|2021-06-24 00:00:00|   500596|
|2021-06-23 00:00:00|   474599|
|2021-06-17 00:00:00|   497133|
|2021-06-19 00:00:00|   601189|
|2021-06-05 00:00:00|   604903|
|2021-06-12 00:00:00|   591339|
|2021-06-20 00:00:00|   491630|
|2021-06-28 00:00:00|   425909|
|2021-06-16 00:00:00|   479776|
|2021-06-11 00:00:00|   549286|
|2021-06-13 00:00:00|   509039|
|2021-06-18 00:00:00|   540056|
+-------------------+---------+
only showing top 20 rows



In [53]:
# Create a column representing duration of the ride in seconds

new_df = df_fhvhv \
    .withColumn('hours_between', (df_fhvhv.dropoff_datetime - df_fhvhv.pickup_datetime)/360) \
    .select('dispatching_base_num','pickup_datetime', 'dropoff_datetime', 'hours_between', 'PULocationID', 'DOLocationID', 'SR_Flag', 'Affiliated_base_number') 

In [54]:
new_df.show()

+--------------------+-------------------+-------------------+--------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|       hours_between|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+--------------------+------------+------------+-------+----------------------+
|              B02765|2021-06-01 08:08:00|2021-06-01 08:23:45|INTERVAL '0 00:00...|         236|         100|      N|                B02765|
|              B02835|2021-06-03 16:42:36|2021-06-03 16:57:33|INTERVAL '0 00:00...|         119|          78|      N|                B02835|
|              B02765|2021-06-03 06:28:12|2021-06-03 06:51:04|INTERVAL '0 00:00...|         231|          42|      N|                B02765|
|              B02887|2021-06-02 17:48:58|2021-06-02 18:05:18|INTERVAL '0 00:00...|          33|         144|      N|                B02887|
|            

In [55]:
new_df.registerTempTable('trips_data_new')

In [56]:
# H5Q4:

spark.sql("""
SELECT 
    date_trunc('day', pickup_datetime) AS trip_day, 
    hours_between
FROM
    trips_data_new
ORDER BY 2 DESC
LIMIT 5
""").show()

+-------------------+--------------------+
|           trip_day|       hours_between|
+-------------------+--------------------+
|2021-06-25 00:00:00|INTERVAL '0 00:11...|
|2021-06-22 00:00:00|INTERVAL '0 00:04...|
|2021-06-27 00:00:00|INTERVAL '0 00:03...|
|2021-06-26 00:00:00|INTERVAL '0 00:03...|
|2021-06-23 00:00:00|INTERVAL '0 00:02...|
+-------------------+--------------------+



In [57]:
# H5Q4:

spark.sql("""
SELECT 
    date_trunc('day', pickup_datetime) AS trip_day, 
    timestampdiff(hour, pickup_datetime, dropoff_datetime) AS hours_between
FROM
    trips_data
ORDER BY 2 DESC
LIMIT 5
""").show()

+-------------------+-------------+
|           trip_day|hours_between|
+-------------------+-------------+
|2021-06-25 00:00:00|           66|
|2021-06-22 00:00:00|           25|
|2021-06-27 00:00:00|           19|
|2021-06-26 00:00:00|           18|
|2021-06-23 00:00:00|           16|
+-------------------+-------------+



 Loading the Zone Lookup Data

In [59]:
import pandas as pd

In [60]:
# check what this file is composed of to create correct schema
df_zone=pd.read_csv("https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv")
df_zone

Unnamed: 0,LocationID,Borough,Zone,service_zone
0,1,EWR,Newark Airport,EWR
1,2,Queens,Jamaica Bay,Boro Zone
2,3,Bronx,Allerton/Pelham Gardens,Boro Zone
3,4,Manhattan,Alphabet City,Yellow Zone
4,5,Staten Island,Arden Heights,Boro Zone
...,...,...,...,...
260,261,Manhattan,World Trade Center,Yellow Zone
261,262,Manhattan,Yorkville East,Yellow Zone
262,263,Manhattan,Yorkville West,Yellow Zone
263,264,Unknown,NV,


In [62]:
df_zone.dtypes

LocationID       int64
Borough         object
Zone            object
service_zone    object
dtype: object

In [61]:
from pyspark.sql import types

In [63]:
zones_schema = types.StructType([
    types.StructField('LocationID', types.IntegerType(), True),
    types.StructField('Borough', types.StringType(), True),
    types.StructField('Zone', types.StringType(), True),
    types.StructField('service_zone', types.StringType(), True),
])

In [64]:
!wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv

--2023-03-05 14:32:53--  https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv
Resolving github.com (github.com)... 140.82.121.4
Connecting to github.com (github.com)|140.82.121.4|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/5a2cc2f5-b4cd-4584-9c62-a6ea97ed0e6a?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAIWNJYAX4CSVEH53A%2F20230305%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20230305T133300Z&X-Amz-Expires=300&X-Amz-Signature=cab7ac936489459e7ffa8f7e41bf6cf8080616a78ee296a32af599e37280ca6e&X-Amz-SignedHeaders=host&actor_id=0&key_id=0&repo_id=513814948&response-content-disposition=attachment%3B%20filename%3Dtaxi_zone_lookup.csv&response-content-type=application%2Foctet-stream [following]
--2023-03-05 14:32:53--  https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/5a2cc2f5-b4cd-4584-9c62-a6e

In [65]:
df_zone = spark.read \
    .option("header", "true") \
    .schema(zones_schema) \
    .csv('taxi_zone_lookup.csv')

In [68]:
df_zone.registerTempTable('zone_data')

In [67]:
df_fhvhv.registerTempTable('trips_data')

In [75]:
# H5Q6:

spark.sql("""
SELECT 
    zd.Zone, 
    count(*) AS pickup_freq
FROM
    trips_data td
    left join zone_data zd on td.PULocationID = zd.LocationID
GROUP BY 1 
ORDER BY 2 DESC
LIMIT 5
""").show()

+-------------------+-----------+
|               Zone|pickup_freq|
+-------------------+-----------+
|Crown Heights North|     231279|
|       East Village|     221244|
|        JFK Airport|     188867|
|     Bushwick South|     187929|
|      East New York|     186780|
+-------------------+-----------+

