In [1]:
import pyspark
from pyspark.sql import SparkSession

In [2]:
# Start (or reuse) a Spark session named 'test' with the local[*] master.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

In [8]:
# Display the version string of the active Spark session.
spark.version

'3.4.0'

In [3]:
!wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhv/fhv_tripdata_2019-10.csv.gz

--2025-03-05 22:32:44--  https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhv/fhv_tripdata_2019-10.csv.gz
Resolving github.com (github.com)... 140.82.112.3
Connecting to github.com (github.com)|140.82.112.3|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/efdfcf82-6d5c-44d1-a138-4e8ea3c3a3b6?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=releaseassetproduction%2F20250306%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20250306T033244Z&X-Amz-Expires=300&X-Amz-Signature=1442269966b1099fcdbc45227dd4aff36d816ae1e1905ed0dfb7e9477cd933e3&X-Amz-SignedHeaders=host&response-content-disposition=attachment%3B%20filename%3Dfhv_tripdata_2019-10.csv.gz&response-content-type=application%2Foctet-stream [following]
--2025-03-05 22:32:44--  https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/efdfcf82-6d5c-44d1-a138-4e8ea3c3a3b6?X-Amz-Algorit

In [16]:
# !gzip -dc fhv_tripdata_2019-10.csv.gz

In [9]:
# Count the lines in the plain (uncompressed) CSV file.
!wc -l fhv_tripdata_2019-10.csv

 1897494 fhv_tripdata_2019-10.csv


In [11]:
from pyspark.sql import types

In [12]:
# Explicit schema for the FHV trip CSV: two timestamp columns, two integer
# location IDs, and string columns for the rest. Every column is nullable.
fhv_schema = (
    types.StructType()
    .add("dispatching_base_num", types.StringType(), True)
    .add("pickup_datetime", types.TimestampType(), True)
    .add("dropOff_datetime", types.TimestampType(), True)
    .add("PUlocationID", types.IntegerType(), True)
    .add("DOlocationID", types.IntegerType(), True)
    .add("SR_Flag", types.StringType(), True)
    .add("Affiliated_base_number", types.StringType(), True)
)

In [13]:
# Load the FHV trip CSV using the explicit schema; the first line is a header.
df = (
    spark.read
    .option("header", "true")
    .schema(fhv_schema)
    .csv('fhv_tripdata_2019-10.csv')
)

In [14]:
# Redistribute the rows into 6 partitions (rebinds `df` to the repartitioned frame).
df = df.repartition(6)

In [15]:
# Write the DataFrame as Parquet under fhv/2019/10/.
# The default save mode raises an error when the target directory already
# exists, so 'overwrite' is used to keep this cell re-runnable.
df.write.mode('overwrite').parquet('fhv/2019/10/')

## Q3: Number of trips picked up on 2019-10-15

In [17]:
from pyspark.sql import functions as F

In [18]:
# Read the FHV trip CSV again with the explicit schema (header row skipped),
# rebinding `df` to the freshly loaded frame.
df = (
    spark.read
    .option("header", "true")
    .schema(fhv_schema)
    .csv('fhv_tripdata_2019-10.csv')
)

In [19]:
# Expose the DataFrame to Spark SQL as the temporary view `fhv_data`.
# createOrReplaceTempView is the supported replacement for the deprecated
# registerTempTable and can be re-run without error.
df.createOrReplaceTempView('fhv_data')



In [23]:
# Count the rows of fhv_data whose pickup timestamp falls on 2019-10-15.
trips_on_oct_15 = spark.sql("""
SELECT
    count(1)
FROM
    fhv_data
where to_date(pickup_datetime) = '2019-10-15'
""")
trips_on_oct_15.show()

+--------+
|count(1)|
+--------+
|   62610|
+--------+



## Q4: Longest trip duration (in hours) per pickup date

In [29]:
# For each pickup date, compute the longest trip in hours (drop-off minus
# pickup, in seconds, divided by 3600) and show the 10 dates with the largest
# maximum. The drop-off column is spelled exactly as declared in fhv_schema
# (dropOff_datetime) so the query does not rely on Spark's case-insensitive
# column resolution.
spark.sql("""
SELECT
    to_date(pickup_datetime) as pickup_date,
    max((unix_timestamp(dropOff_datetime) - unix_timestamp(pickup_datetime))/3600) as time_diff_hour
FROM
    fhv_data
group by 1
order by time_diff_hour desc
limit 10
""").show()

+-----------+------------------+
|pickup_date|    time_diff_hour|
+-----------+------------------+
| 2019-10-11|          631152.5|
| 2019-10-28|          631152.5|
| 2019-10-31| 87672.44083333333|
| 2019-10-01| 70128.02805555555|
| 2019-10-17|            8794.0|
| 2019-10-26| 8784.166666666666|
| 2019-10-30|1465.5344444444445|
| 2019-10-25|1057.8266666666666|
| 2019-10-02| 770.2313888888889|
| 2019-10-23| 746.6166666666667|
+-----------+------------------+



## Q6: Least frequent pickup zones

In [30]:
!wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv

--2025-03-05 23:06:55--  https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv
Resolving github.com (github.com)... 140.82.113.4
Connecting to github.com (github.com)|140.82.113.4|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/5a2cc2f5-b4cd-4584-9c62-a6ea97ed0e6a?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=releaseassetproduction%2F20250306%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20250306T040655Z&X-Amz-Expires=300&X-Amz-Signature=b3f8fcef385cd81a417fdb8eb83bf4493c6d9d820afe939b79c354e4851d308b&X-Amz-SignedHeaders=host&response-content-disposition=attachment%3B%20filename%3Dtaxi_zone_lookup.csv&response-content-type=application%2Foctet-stream [following]
--2025-03-05 23:06:55--  https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/5a2cc2f5-b4cd-4584-9c62-a6ea97ed0e6a?X-Amz-Algorithm=AWS4-HMAC-

In [31]:
# Explicit schema for the zone lookup CSV: an integer LocationID followed by
# three string columns. Every column is nullable.
zone_schema = (
    types.StructType()
    .add("LocationID", types.IntegerType(), True)
    .add("Borough", types.StringType(), True)
    .add("Zone", types.StringType(), True)
    .add("service_zone", types.StringType(), True)
)

In [32]:
# Load the zone lookup CSV using the explicit schema; the first line is a header.
df_zone = (
    spark.read
    .option("header", "true")
    .schema(zone_schema)
    .csv('taxi_zone_lookup.csv')
)

In [33]:
# Expose the zone lookup to Spark SQL as the temporary view `zones`.
# createOrReplaceTempView is the supported replacement for the deprecated
# registerTempTable and can be re-run without error.
df_zone.createOrReplaceTempView('zones')

In [35]:
# Left-join trips to the zone lookup on the pickup location ID, count trips
# per (location ID, zone name), and show the 10 groups with the fewest trips.
least_used_pickups = spark.sql("""
SELECT
    a.PUlocationID,
    b.Zone as pickup_zone,
    count(*) as frequency
FROM
    fhv_data a
left join zones b on a.PUlocationID = b.LocationID
group by 1,2
order by frequency
limit 10

""")
least_used_pickups.show()

+------------+--------------------+---------+
|PUlocationID|         pickup_zone|frequency|
+------------+--------------------+---------+
|           2|         Jamaica Bay|        1|
|         105|Governor's Island...|        2|
|         111| Green-Wood Cemetery|        5|
|          30|       Broad Channel|        8|
|         120|     Highbridge Park|       14|
|          12|        Battery Park|       15|
|         207|Saint Michaels Ce...|       23|
|          27|Breezy Point/Fort...|       25|
|         154|Marine Park/Floyd...|       26|
|           8|        Astoria Park|       29|
+------------+--------------------+---------+

