In [3]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types

In [4]:
spark = SparkSession.builder \
            .master("local[*]") \
            .appName('test') \
            .getOrCreate()


In [6]:
!wget https://d37ci6vzurychx.cloudfront.net/trip-data/fhvhv_tripdata_2021-06.parquet

--2023-03-07 16:09:26--  https://d37ci6vzurychx.cloudfront.net/trip-data/fhvhv_tripdata_2021-06.parquet
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 18.67.246.186, 18.67.246.47, 18.67.246.176, ...
Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|18.67.246.186|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 394114750 (376M) [application/x-www-form-urlencoded]
Saving to: 'fhvhv_tripdata_2021-06.parquet'

     0K .......... .......... .......... .......... ..........  0% 2,68M 2m20s
    50K .......... .......... .......... .......... ..........  0% 2,30M 2m32s
   100K .......... .......... .......... .......... ..........  0% 12,3M 1m51s
   150K .......... .......... .......... .......... ..........  0% 11,1M 92s
   200K .......... .......... .......... .......... ..........  0% 3,01M 99s
   250K .......... .......... .......... .......... ..........  0% 13,8M 87s
   300K .......... .......... .......... ..

In [57]:
df = spark.read \
    .option("header", "true") \
    .csv("fhv_tripdata_2021-06.csv")

In [58]:
df.schema

StructType([StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', StringType(), True), StructField('dropoff_datetime', StringType(), True), StructField('PULocationID', StringType(), True), StructField('DOLocationID', StringType(), True), StructField('SR_Flag', StringType(), True), StructField('Affiliated_base_number', StringType(), True)])

In [59]:
fhv_schema = types.StructType([
    types.StructField("dispatching_base_num", types.StringType(), True),
    types.StructField("pickup_datetime", types.TimestampType(), True),
    types.StructField("dropoff_datetime", types.TimestampType(), True),
    types.StructField("PULocationID", types.LongType(), True),
    types.StructField("DOLocationID", types.LongType(), True),
    types.StructField("SR_Flag", types.StringType(), True),
    types.StructField("Affiliated_base_number", types.StringType(), True)
])

In [60]:
df_fhv = spark.read \
    .option("header", "true") \
    .schema(fhv_schema) \
    .csv("fhv_tripdata_2021-06.csv")

In [61]:
df_fhv.show(1)

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B02764|2021-06-01 00:02:41|2021-06-01 00:07:46|         174|          18|      N|                B02764|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
only showing top 1 row



In [67]:
df_fhv \
    .repartition(12) \
    .write.parquet("data", mode="overwrite")

In [62]:

from pyspark.sql import functions as F

df_fhv_15_jun = df_fhv \
    .filter(F.date_trunc("day", "pickup_datetime") == "2021-06-15") 

df_fhv_15_jun.count()

452470

In [68]:

df_fhv.createOrReplaceTempView("df_fhv")

df_sql = spark.sql("""
SELECT 
    COUNT(1) AS number_records
FROM
    df_fhv
WHERE date_trunc('day', pickup_datetime) == "2021-06-15"
""")

df_sql.show()

+--------------+
|number_records|
+--------------+
|        452470|
+--------------+



In [126]:
from pyspark.sql import functions as F

df_fhv_15_jun = df_fhv \
     .withColumn("diff", (unix_timestamp("dropoff_datetime")-unix_timestamp("pickup_datetime"))/3600) \
     .orderBy(col("diff").desc())

df_fhv_15_jun.select("pickup_datetime","dropoff_datetime","diff").show(10)

+-------------------+-------------------+------------------+
|    pickup_datetime|   dropoff_datetime|              diff|
+-------------------+-------------------+------------------+
|2021-06-25 13:55:41|2021-06-28 08:48:25|  66.8788888888889|
|2021-06-22 12:09:45|2021-06-23 13:42:44|25.549722222222222|
|2021-06-27 10:32:29|2021-06-28 06:31:20|19.980833333333333|
|2021-06-26 22:37:11|2021-06-27 16:49:01|18.197222222222223|
|2021-06-23 20:40:43|2021-06-24 13:08:44|16.466944444444444|
|2021-06-23 22:03:31|2021-06-24 12:19:39|14.268888888888888|
|2021-06-24 23:11:00|2021-06-25 13:05:35|13.909722222222221|
|2021-06-04 20:56:02|2021-06-05 08:36:14|             11.67|
|2021-06-27 07:45:19|2021-06-27 19:07:16|11.365833333333333|
|2021-06-20 17:05:12|2021-06-21 04:04:16|10.984444444444444|
+-------------------+-------------------+------------------+
only showing top 10 rows



In [125]:

df_longest = spark.sql("""
SELECT 
    pickup_datetime, dropoff_datetime,
    (unix_timestamp(dropoff_datetime)-unix_timestamp(pickup_datetime))/(3600) as diff 
FROM
    df_fhv
    order by 3 desc
""")

df_longest.show(10)

+-------------------+-------------------+------------------+
|    pickup_datetime|   dropoff_datetime|              diff|
+-------------------+-------------------+------------------+
|2021-06-25 13:55:41|2021-06-28 08:48:25|  66.8788888888889|
|2021-06-22 12:09:45|2021-06-23 13:42:44|25.549722222222222|
|2021-06-27 10:32:29|2021-06-28 06:31:20|19.980833333333333|
|2021-06-26 22:37:11|2021-06-27 16:49:01|18.197222222222223|
|2021-06-23 20:40:43|2021-06-24 13:08:44|16.466944444444444|
|2021-06-23 22:03:31|2021-06-24 12:19:39|14.268888888888888|
|2021-06-24 23:11:00|2021-06-25 13:05:35|13.909722222222221|
|2021-06-04 20:56:02|2021-06-05 08:36:14|             11.67|
|2021-06-27 07:45:19|2021-06-27 19:07:16|11.365833333333333|
|2021-06-20 17:05:12|2021-06-21 04:04:16|10.984444444444444|
+-------------------+-------------------+------------------+
only showing top 10 rows



In [137]:
df_zones = spark.read \
    .option("header", "true") \
    .csv('../code/taxi+_zone_lookup.csv')


df_zones = df_zones.withColumnRenamed('LocationID', 'PULocationID') 

df_zones.show(10)

+------------+-------------+--------------------+------------+
|PULocationID|      Borough|                Zone|service_zone|
+------------+-------------+--------------------+------------+
|           1|          EWR|      Newark Airport|         EWR|
|           2|       Queens|         Jamaica Bay|   Boro Zone|
|           3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|           4|    Manhattan|       Alphabet City| Yellow Zone|
|           5|Staten Island|       Arden Heights|   Boro Zone|
|           6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|           7|       Queens|             Astoria|   Boro Zone|
|           8|       Queens|        Astoria Park|   Boro Zone|
|           9|       Queens|          Auburndale|   Boro Zone|
|          10|       Queens|        Baisley Park|   Boro Zone|
+------------+-------------+--------------------+------------+
only showing top 10 rows



In [142]:
df_join = df_fhv.join(df_zones.select("PULocationID","Zone"), on=['PULocationID'], how='inner')
df_join.show(10)

+------------+--------------------+-------------------+-------------------+------------+-------+----------------------+-------------------+
|PULocationID|dispatching_base_num|    pickup_datetime|   dropoff_datetime|DOLocationID|SR_Flag|Affiliated_base_number|               Zone|
+------------+--------------------+-------------------+-------------------+------------+-------+----------------------+-------------------+
|         174|              B02764|2021-06-01 00:02:41|2021-06-01 00:07:46|          18|      N|                B02764|            Norwood|
|          32|              B02764|2021-06-01 00:16:16|2021-06-01 00:21:14|         254|      N|                B02764|          Bronxdale|
|         240|              B02764|2021-06-01 00:27:01|2021-06-01 00:42:11|         127|      N|                B02764| Van Cortlandt Park|
|         127|              B02764|2021-06-01 00:46:08|2021-06-01 00:53:45|         235|      N|                B02764|             Inwood|
|         144|      

In [147]:

df_join.createOrReplaceTempView("df_join")

df_freq_pu_zones = spark.sql("""
SELECT 
    count(1), zone
FROM
    df_join
GROUP BY zone
ORDER BY 1 DESC
""")

df_freq_pu_zones.show(10)

+--------+--------------------+
|count(1)|                zone|
+--------+--------------------+
|  231279| Crown Heights North|
|  221244|        East Village|
|  188867|         JFK Airport|
|  187929|      Bushwick South|
|  186780|       East New York|
|  164344|TriBeCa/Civic Center|
|  161596|   LaGuardia Airport|
|  158937|            Union Sq|
|  154698|        West Village|
|  152493|             Astoria|
+--------+--------------------+
only showing top 10 rows



In [176]:
df_join \
    .groupBy("zone") \
    .agg({ "*": "count"}) \
    .withColumnRenamed("count(1)", "amount") \
    .orderBy(col("amount"), ascending=False) \
    .show()


+--------------------+------+
|                zone|amount|
+--------------------+------+
| Crown Heights North|231279|
|        East Village|221244|
|         JFK Airport|188867|
|      Bushwick South|187929|
|       East New York|186780|
|TriBeCa/Civic Center|164344|
|   LaGuardia Airport|161596|
|            Union Sq|158937|
|        West Village|154698|
|             Astoria|152493|
|     Lower East Side|151020|
|        East Chelsea|147673|
|Central Harlem North|146402|
|Williamsburg (Nor...|143683|
|          Park Slope|143594|
|  Stuyvesant Heights|141427|
|        Clinton East|139611|
|West Chelsea/Huds...|139431|
|             Bedford|138428|
|         Murray Hill|137879|
+--------------------+------+
only showing top 20 rows

