In [1]:
from pyspark.sql import SparkSession, DataFrame

In [3]:
# Create the Spark session for this notebook, or reuse one already running in the kernel.
spark = (
    SparkSession.builder
    .appName('Homework5')
    .getOrCreate()
)

In [4]:
# Spark version the session is running on (same value as SparkSession.version).
spark.sparkContext.version

'3.3.2'

In [8]:
!wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhvhv/fhvhv_tripdata_2021-06.csv.gz

--2023-03-05 10:18:49--  https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhvhv/fhvhv_tripdata_2021-06.csv.gz
Resolving github.com (github.com)... 140.82.121.4
Connecting to github.com (github.com)|140.82.121.4|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/4564ad9e-a6da-4923-ad6f-35ff02446a51?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAIWNJYAX4CSVEH53A%2F20230305%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20230305T101849Z&X-Amz-Expires=300&X-Amz-Signature=4b56f6bbdb3d8ee376efea43684c7f2238779e11f571d4af25d601a4b9f9564f&X-Amz-SignedHeaders=host&actor_id=0&key_id=0&repo_id=513814948&response-content-disposition=attachment%3B%20filename%3Dfhvhv_tripdata_2021-06.csv.gz&response-content-type=application%2Foctet-stream [following]
--2023-03-05 10:18:49--  https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/4564ad9e

In [9]:
!wc -l fhvhv_tripdata_2021-06.csv.gz

651315 fhvhv_tripdata_2021-06.csv.gz


In [10]:
# First look at the file: use the header row for column names but give no schema,
# so every column comes back as a string (see the schema printed below).
df = spark.read.csv('fhvhv_tripdata_2021-06.csv.gz', header=True)

In [None]:
# Without an explicit schema every column is read as StringType (shown in the output below).
df.schema

StructType([StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', StringType(), True), StructField('dropoff_datetime', StringType(), True), StructField('PULocationID', StringType(), True), StructField('DOLocationID', StringType(), True), StructField('SR_Flag', StringType(), True), StructField('Affiliated_base_number', StringType(), True)])

In [17]:
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, IntegerType

In [18]:
# Explicit schema for the FHVHV CSV: pickup/dropoff parsed as timestamps, location IDs as
# integers, everything else kept as strings. Every field is nullable.
schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in (
        ('dispatching_base_num', StringType()),
        ('pickup_datetime', TimestampType()),
        ('dropoff_datetime', TimestampType()),
        ('PULocationID', IntegerType()),
        ('DOLocationID', IntegerType()),
        ('SR_Flag', StringType()),
        ('Affiliated_base_number', StringType()),
    )
])

In [19]:
# Re-read the CSV, this time applying the explicit `schema` instead of all-string columns.
df = spark.read.csv('fhvhv_tripdata_2021-06.csv.gz', header=True, schema=schema)

In [20]:
# Confirm the explicit schema took effect: TimestampType pickup/dropoff, IntegerType location IDs.
df.schema

StructType([StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', TimestampType(), True), StructField('dropoff_datetime', TimestampType(), True), StructField('PULocationID', IntegerType(), True), StructField('DOLocationID', IntegerType(), True), StructField('SR_Flag', StringType(), True), StructField('Affiliated_base_number', StringType(), True)])

In [22]:
# Shuffle the trips into 12 partitions ahead of the parquet write.
df = df.repartition(numPartitions=12)

In [24]:
# Persist the repartitioned trips as parquet. mode('overwrite') keeps this cell re-runnable:
# the default save mode ('errorifexists') raises if the output directory is already there.
df.write.mode('overwrite').parquet('fhvhv_tripdata_2021-06')

In [29]:
from pyspark.sql import functions as F

In [34]:
# Add calendar-date columns derived from the pickup/dropoff timestamps
# (DataFrame.withColumns requires Spark >= 3.3; this session reports 3.3.2).
df = df.withColumns({
    'pickup_date': F.to_date('pickup_datetime'),
    'dropoff_date': F.to_date('dropoff_datetime'),
})

In [35]:
# Number of trips picked up on 2021-06-15.
df.where(F.col('pickup_date') == '2021-06-15').count()

452470

In [42]:
# Trip duration in hours: difference of the long-cast timestamps, divided by 3600
# (i.e. the long values are treated as seconds).
df = df.withColumn(
    'diffInHours',
    (
        F.col('dropoff_datetime').cast('long')
        - F.col('pickup_datetime').cast('long')
    ) / 3600,
)

In [44]:
# Longest trip duration (in hours) over the whole dataset.
df.agg(F.max('diffInHours')).show()

+----------------+
|max(diffInHours)|
+----------------+
|66.8788888888889|
+----------------+



In [46]:
# Expose the trips DataFrame to Spark SQL under the name `df`.
# registerTempTable has been deprecated since Spark 2.0; createOrReplaceTempView replaces it
# and, like it, silently replaces an existing view of the same name on re-run.
df.createOrReplaceTempView('df')

In [47]:
!wget https://d37ci6vzurychx.cloudfront.net/misc/taxi+_zone_lookup.csv

--2023-03-05 11:41:53--  https://d37ci6vzurychx.cloudfront.net/misc/taxi+_zone_lookup.csv
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 65.9.54.206, 65.9.54.100, 65.9.54.106, ...
Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|65.9.54.206|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 12322 (12K) [text/csv]
Saving to: ‘taxi+_zone_lookup.csv’


2023-03-05 11:41:54 (2.22 MB/s) - ‘taxi+_zone_lookup.csv’ saved [12322/12322]



In [48]:
# Zone lookup table. Read without a schema, every column is a string, so cast the join key
# LocationID to int to match PULocationID (IntegerType in the trips schema) rather than
# relying on implicit type coercion inside the join condition.
zones = (
    spark.read.option('header', 'true').csv('taxi+_zone_lookup.csv')
    .withColumn('LocationID', F.col('LocationID').cast('int'))
)

In [49]:
# Expose the zone lookup to Spark SQL under the name `zones`.
# registerTempTable has been deprecated since Spark 2.0; createOrReplaceTempView replaces it.
zones.createOrReplaceTempView('zones')

In [50]:
# Preview the first 20 rows of the zone lookup (long cell values are truncated).
zones.show(n=20, truncate=True)

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [52]:
# Pickup zone with the most trips: join each trip to the zone lookup on its pickup location ID,
# count trips per zone and keep the top one.
spark.sql("""
SELECT z.Zone, count(*)
FROM df AS trips
JOIN zones AS z ON trips.PULocationID = z.LocationID
GROUP BY z.Zone
ORDER BY count(*) DESC
LIMIT 1
""").show()

+-------------------+--------+
|               Zone|count(1)|
+-------------------+--------+
|Crown Heights North|  231279|
+-------------------+--------+

