In [16]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

In [None]:
# Local Spark session using every available core; getOrCreate() reuses an
# already-running session if this cell is executed again.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

In [None]:
!wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhv/fhv_tripdata_2019-10.csv.gz

In [4]:
!gunzip -vc fhv_tripdata_2019-10.csv.gz > fhv_tripdata_2019-10.csv

fhv_tripdata_2019-10.csv.gz:	 83.8%


In [None]:
!ls -la

In [6]:
# Spark version of the running session.
spark.sparkContext.version

'3.3.2'

In [7]:
# Load the raw FHV CSV; the first line holds column names. inferSchema makes
# Spark scan the data to pick column types (e.g. timestamps, ints).
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv('fhv_tripdata_2019-10.csv')
)

                                                                                

In [35]:
# Strip leading/trailing spaces from the base-number columns.
df = (
    df
    .withColumn("dispatching_base_num", F.trim(F.col("dispatching_base_num")))
    .withColumn("Affiliated_base_number", F.trim(F.col("Affiliated_base_number")))
)

In [36]:
# Shuffle into 6 partitions; Spark writes one output file per non-empty
# partition, which controls the number of parquet part files.
df = df.repartition(numPartitions=6)

In [37]:
# Persist as parquet, replacing any previous output at fhv/2019/10.
df.write.mode('overwrite').parquet('fhv/2019/10')

                                                                                

In [38]:
# Preview the first 20 rows (long values truncated).
df.show(n=20, truncate=True)



+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B00256|2019-10-23 08:19:12|2019-10-23 09:12:00|         264|         264|   null|                B00256|
|              B01717|2019-10-03 08:02:26|2019-10-03 08:27:01|         264|         244|   null|                B01717|
|              B01145|2019-10-01 10:55:00|2019-10-01 10:58:18|         264|         174|   null|                B02864|
|              B00900|2019-10-20 17:06:38|2019-10-20 17:27:41|         264|          37|   null|                B00900|
|              B01340|2019-10-11 08:26:05|2019-10-11 09:08:03|         264|         220|   null|                B01340|
|              B00254|2019-10-29 22:39:4

                                                                                

In [40]:
# Number of trips whose pickup date (date part of the timestamp) is 2019-10-15.
# `to_date` must be qualified: only the `F` (pyspark.sql.functions) alias is
# imported at the top, so a bare `to_date` raises NameError on a fresh kernel.
df.filter(F.to_date(F.col("pickup_datetime")) == "2019-10-15").count()

                                                                                

62610

In [39]:
# Longest trip, with duration in hours.
# Each timestamp is cast to long (Unix epoch seconds) before subtracting.
# Subtracting the timestamps first yields an interval type, and casting that
# to long depends on the Spark version / legacy-interval setting.
# NOTE(review): the top row may be a data-entry error (e.g. a dropoff year far
# in the future); consider filtering implausible durations if that matters.
df.withColumn(
    'trip_duration',
    (F.col('dropOff_datetime').cast('long') - F.col('pickup_datetime').cast('long')) / 3600,
).orderBy(F.desc('trip_duration')).first()

                                                                                

Row(dispatching_base_num='B02832', pickup_datetime=datetime.datetime(2019, 10, 11, 19, 0), dropOff_datetime=datetime.datetime(2091, 10, 11, 19, 30), PUlocationID=264, DOlocationID=264, SR_Flag=None, Affiliated_base_number='B02832', trip_duration=631152.5)

In [41]:
!wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv

--2024-03-03 19:54:01--  https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv
Resolving github.com (github.com)... 140.82.121.3
Connecting to github.com (github.com)|140.82.121.3|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/5a2cc2f5-b4cd-4584-9c62-a6ea97ed0e6a?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAVCODYLSA53PQK4ZA%2F20240303%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20240303T175401Z&X-Amz-Expires=300&X-Amz-Signature=27a271dc351f2a71a22f08eaaeb5ebf25717468534bd9f3dfcc7879148bd2931&X-Amz-SignedHeaders=host&actor_id=0&key_id=0&repo_id=513814948&response-content-disposition=attachment%3B%20filename%3Dtaxi_zone_lookup.csv&response-content-type=application%2Foctet-stream [following]
--2024-03-03 19:54:01--  https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/5a2cc2f5-b4cd-4584-9c62-a6e

In [42]:
# Zone lookup: LocationID -> Borough / Zone / service_zone.
df_zones = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv('taxi_zone_lookup.csv')
)

In [43]:
# Preview the first 20 zone rows (long values truncated).
df_zones.show(n=20, truncate=True)

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [49]:
# Least frequent pickup location, with its zone name attached.
# The join condition references the aggregated frame's own column instead of
# the parent `df`, so it is tied to the frame actually being joined.
pickup_counts = df.groupBy('PUlocationID').count()
(
    pickup_counts
    .join(df_zones, pickup_counts.PUlocationID == df_zones.LocationID)
    # Tie-break on LocationID so the result is deterministic when several
    # locations share the minimum count.
    .orderBy(F.asc('count'), F.asc('LocationID'))
    .first()
)

                                                                                

Row(PUlocationID=2, count=1, LocationID=2, Borough='Queens', Zone='Jamaica Bay', service_zone='Boro Zone')