In [1]:
import pyspark
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.master("local[*]").appName('test').getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/03/04 20:14:12 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


# Question 1 Pyspark Version

In [3]:
pyspark.__version__

'3.5.0'

# Question 2 Repartitioning into Parquet files

In [4]:
!wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhv/fhv_tripdata_2019-10.csv.gz

--2024-03-04 20:24:21--  https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhv/fhv_tripdata_2019-10.csv.gz
Resolving github.com (github.com)... 192.30.255.113
Connecting to github.com (github.com)|192.30.255.113|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/efdfcf82-6d5c-44d1-a138-4e8ea3c3a3b6?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAVCODYLSA53PQK4ZA%2F20240304%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20240304T202421Z&X-Amz-Expires=300&X-Amz-Signature=fa70f0f7f22370dac8a595bd132df5245001dd84e6bb9d21be032cf77e317f5f&X-Amz-SignedHeaders=host&actor_id=0&key_id=0&repo_id=513814948&response-content-disposition=attachment%3B%20filename%3Dfhv_tripdata_2019-10.csv.gz&response-content-type=application%2Foctet-stream [following]
--2024-03-04 20:24:21--  https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/efdfcf82-6

In [6]:
df_fhv_2019_10 = spark.read.option("header","True").option("inferSchema","True").csv('fhv_tripdata_2019-10.csv.gz')

                                                                                

In [8]:
df_fhv_2019_10.repartition(6).write.parquet('data/pq/fhv/2019/10/')

                                                                                

In [23]:
import os
directory = 'data/pq/fhv/2019/10/'
size = 0
count = 0
for filename in os.listdir(directory):
    if filename.endswith('parquet'):
        filepath = os.path.join(directory,filename)
        size += os.path.getsize(filepath)
        count+=1
avg_size_mb = (size/count/1024**2)
print(f'The average size of the parquet files is {round(avg_size_mb,2)} MB')

The average size of the parquet files is 6.35 MB


# Question 3 Longest trip on the 15th

In [30]:
from pyspark.sql.functions import to_date
target_date = '2019-10-15'
df_fhv_2019_10.select('pickup_datetime').filter(to_date(df_fhv_2019_10.pickup_datetime) == target_date).count()

                                                                                

62610

# Question 4 Length of the longest trip

In [46]:
from pyspark.sql.functions import unix_timestamp

In [51]:
df_with_duration = df_fhv_2019_10.select(
    df_fhv_2019_10.pickup_datetime,
    df_fhv_2019_10.dropOff_datetime,
    ((unix_timestamp(df_fhv_2019_10.dropOff_datetime) - unix_timestamp(df_fhv_2019_10.pickup_datetime)) / 60/60).alias('duration_hours')
)
df_with_duration.orderBy(df_with_duration.duration_hours.desc()).show(truncate=False)

[Stage 24:>                                                         (0 + 1) / 1]

+-------------------+-------------------+------------------+
|pickup_datetime    |dropOff_datetime   |duration_hours    |
+-------------------+-------------------+------------------+
|2019-10-11 18:00:00|2091-10-11 18:30:00|631152.5          |
|2019-10-28 09:00:00|2091-10-28 09:30:00|631152.5          |
|2019-10-31 23:46:33|2029-11-01 00:13:00|87672.44083333334 |
|2019-10-01 21:43:42|2027-10-01 21:45:23|70128.02805555557 |
|2019-10-17 14:00:00|2020-10-18 00:00:00|8794.0            |
|2019-10-26 21:26:00|2020-10-26 21:36:00|8784.166666666666 |
|2019-10-30 12:30:04|2019-12-30 13:02:08|1464.5344444444445|
|2019-10-25 07:04:57|2019-12-08 07:54:33|1056.8266666666666|
|2019-10-25 07:04:57|2019-12-08 07:21:11|1056.2705555555556|
|2019-10-01 13:47:17|2019-11-03 15:20:28|793.5530555555556 |
|2019-10-01 07:21:12|2019-11-03 08:44:21|793.3858333333334 |
|2019-10-01 13:41:00|2019-11-03 14:58:51|793.2975          |
|2019-10-01 18:43:20|2019-11-03 19:43:13|792.9980555555555 |
|2019-10-01 18:43:46|201

                                                                                

# Question 5 What Port does spark run on

In [52]:
spark_ui_port = spark.sparkContext.uiWebUrl.split(':')[-1]
print(f"Spark UI is running on port: {spark_ui_port}")

Spark UI is running on port: 4040


# Question 6 Least Frequent Pickup Location zone

In [58]:
df_zones = spark.read.option("header","True").option("inferSchema","True").csv('taxi+_zone_lookup.csv')

In [67]:
df_join =df_fhv_2019_10.join(df_zones,df_fhv_2019_10.PUlocationID == df_zones.LocationID)

In [79]:
df_join.groupBy(df_join.Zone).count().alias('total_count').orderBy("count").show()

[Stage 61:>                                                         (0 + 1) / 1]

+--------------------+-----+
|                Zone|count|
+--------------------+-----+
|         Jamaica Bay|    1|
|Governor's Island...|    2|
| Green-Wood Cemetery|    5|
|       Broad Channel|    8|
|     Highbridge Park|   14|
|        Battery Park|   15|
|Saint Michaels Ce...|   23|
|Breezy Point/Fort...|   25|
|Marine Park/Floyd...|   26|
|        Astoria Park|   29|
|    Inwood Hill Park|   39|
|       Willets Point|   47|
|Forest Park/Highl...|   53|
|  Brooklyn Navy Yard|   57|
|        Crotona Park|   62|
|        Country Club|   77|
|     Freshkills Park|   89|
|       Prospect Park|   98|
|     Columbia Street|  105|
|  South Williamsburg|  110|
+--------------------+-----+
only showing top 20 rows



                                                                                