In [4]:
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [14]:
 !wget wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhv/fhv_tripdata_2019-10.csv.gz

--2024-03-04 22:02:29--  http://wget/
Resolving wget (wget)... failed: Name or service not known.
wget: unable to resolve host address ‘wget’
--2024-03-04 22:02:30--  https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhv/fhv_tripdata_2019-10.csv.gz
Resolving github.com (github.com)... 140.82.121.4
Connecting to github.com (github.com)|140.82.121.4|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/efdfcf82-6d5c-44d1-a138-4e8ea3c3a3b6?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAVCODYLSA53PQK4ZA%2F20240304%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20240304T200230Z&X-Amz-Expires=300&X-Amz-Signature=4aa36e61772619c258c58dca7a8293ea378bb727872d5db7db72528b76104461&X-Amz-SignedHeaders=host&actor_id=0&key_id=0&repo_id=513814948&response-content-disposition=attachment%3B%20filename%3Dfhv_tripdata_2019-10.csv.gz&response-content-type=application%2Foctet-str

In [30]:
from pyspark.sql import types
schema = types.StructType(
    [
        types.StructField('dispatching_base_num', types.StringType(), True), 
    types.StructField('pickup_datetime', types.TimestampType(), True), 
    types.StructField('dropOff_datetime', types.TimestampType(), True), 
    types.StructField('PUlocationID', types.IntegerType(), True), 
    types.StructField('DOlocationID', types.IntegerType(), True), 
    types.StructField('SR_Flag', types.StringType(), True), 
    types.StructField('Affiliated_base_number', types.StringType(), True)
    ]
    )
df_data = spark.read \
    .option("header", "true") \
    .schema(schema) \
    .csv('fhv_tripdata_2019-10.csv.gz')

df_data.show()

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B00009|2019-10-01 00:23:00|2019-10-01 00:35:00|         264|         264|   null|                B00009|
|              B00013|2019-10-01 00:11:29|2019-10-01 00:13:22|         264|         264|   null|                B00013|
|              B00014|2019-10-01 00:11:43|2019-10-01 00:37:20|         264|         264|   null|                B00014|
|              B00014|2019-10-01 00:56:29|2019-10-01 00:57:47|         264|         264|   null|                B00014|
|              B00014|2019-10-01 00:23:09|2019-10-01 00:28:27|         264|         264|   null|                B00014|
|     B00021         |2019-10-01 00:00:4

In [16]:
df_data = df_data.repartition(6)
df_data.write.parquet("fhv_parquet")

                                                                                

In [23]:
# get average size of parquet files
import os

sizes = []

# Iterate over each file
dir = os.path.join(os.getcwd(), "fhv_parquet")
for filename in os.listdir(dir):
    # Check if the file has the desired extension
    if filename.endswith('.parquet'):
        # Get the full file path
        file_path = os.path.join(dir, filename)
        # Get the file size in bytes
        file_size_bytes = os.path.getsize(file_path)
        # Convert the size to megabytes
        file_size_mb = file_size_bytes / (1024 * 1024)
        # Append the file name and size to the list
        sizes.append(file_size_mb)

print(sum(sizes)/len(sizes))


6.450909614562988


In [34]:
# count trip data at 15th Oct

df_data.registerTempTable("trips")

spark.sql("""
select count(*) from trips 
where trips.pickup_datetime 
between '2019-10-15 00:00:00' and '2019-10-15 23:59:59'
""").show()

[Stage 19:>                                                         (0 + 1) / 1]

+--------+
|count(1)|
+--------+
|   62610|
+--------+



                                                                                

In [40]:
# get longest trip
spark.sql("""
select max(unix_timestamp(trips.dropOff_datetime) - unix_timestamp(trips.pickup_datetime)) / 3600 from trips 

""").show()

[Stage 34:>                                                         (0 + 1) / 1]

+----------------------------------------------------------------------------------------------------------------------------+
|(max((unix_timestamp(dropOff_datetime, yyyy-MM-dd HH:mm:ss) - unix_timestamp(pickup_datetime, yyyy-MM-dd HH:mm:ss))) / 3600)|
+----------------------------------------------------------------------------------------------------------------------------+
|                                                                                                                    631152.5|
+----------------------------------------------------------------------------------------------------------------------------+



                                                                                

In [41]:
!wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv

--2024-03-04 22:35:51--  https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv
Resolving github.com (github.com)... 140.82.121.4
Connecting to github.com (github.com)|140.82.121.4|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/5a2cc2f5-b4cd-4584-9c62-a6ea97ed0e6a?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAVCODYLSA53PQK4ZA%2F20240304%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20240304T203551Z&X-Amz-Expires=300&X-Amz-Signature=fbffb0034e9613d241fa77f33a86f708f686951070e22a8de4c9b298581d7abe&X-Amz-SignedHeaders=host&actor_id=0&key_id=0&repo_id=513814948&response-content-disposition=attachment%3B%20filename%3Dtaxi_zone_lookup.csv&response-content-type=application%2Foctet-stream [following]
--2024-03-04 22:35:52--  https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/5a2cc2f5-b4cd-4584-9c62-a6e

In [42]:
df_zones = spark.read \
    .option("header", "true") \
    .csv('taxi_zone_lookup.csv')

In [43]:
df_zones.show()

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [44]:
df_joined = df_data.join(df_zones, df_data.PUlocationID == df_zones.LocationID)

In [46]:
df_zones.registerTempTable("zones")



In [58]:
spark.sql("""
select cnt, loc_id , zones.Zone from (
select count(*) as cnt, trips.PUlocationID as loc_id 
from trips
group by trips.PUlocationID
) as data
left join zones on data.loc_id = zones.LocationID
order by cnt
limit 1

""").show()

[Stage 56:>                                                         (0 + 1) / 1]

+---+------+-----------+
|cnt|loc_id|       Zone|
+---+------+-----------+
|  1|     2|Jamaica Bay|
+---+------+-----------+



                                                                                