In [1]:
import pyspark
from pyspark.sql import SparkSession

In [2]:
# Local Spark session ("local[*]" = use all available local cores).
# getOrCreate() hands back an already-active session instead of starting another.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('homework')
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


24/03/02 16:38:40 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Version string of the Spark runtime behind this session (displayed as cell output).
spark.sparkContext.version

'3.3.2'

[Stage 0:>                                                          (0 + 1) / 1]                                                                                

In [4]:
from pyspark.sql import types

In [5]:
# Explicit schema for the FHV CSV: the two datetime columns are parsed as
# timestamps, everything else (including the location IDs) stays a string.
# Every field is nullable.
schema = types.StructType([
    types.StructField(column_name, column_type, True)
    for column_name, column_type in (
        ('dispatching_base_num', types.StringType()),
        ('pickup_datetime', types.TimestampType()),
        ('dropOff_datetime', types.TimestampType()),
        ('PUlocationID', types.StringType()),
        ('DOlocationID', types.StringType()),
        ('SR_Flag', types.StringType()),
        ('Affiliated_base_number', types.StringType()),
    )
])

In [24]:
# Raw FHV trips from CSV, typed by the explicit `schema` (no inference);
# the first line of the file is the header row.
df = spark.read.csv('fhv.csv', schema=schema, header=True)

In [25]:
# Shuffle into 6 partitions so the parquet write is split across 6 part files.
df = df.repartition(numPartitions=6)

In [26]:
# Persist the repartitioned trips as parquet under 'fhv/'.
# mode('overwrite') keeps the cell re-runnable: the default save mode
# ('errorifexists') raises if 'fhv/' already exists from an earlier run.
df.write.mode('overwrite').parquet('fhv/')

                                                                                

In [6]:
# Load the parquet copy written to 'fhv/' back as the working DataFrame
# (the schema comes from the parquet files themselves).
fhv_df = spark.read.format('parquet').load('fhv/')

                                                                                

In [7]:
# Print the schema tree of the parquet-backed DataFrame to confirm the column
# names and types defined in `schema` survived the CSV -> parquet round trip.
fhv_df.printSchema()

root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropOff_datetime: timestamp (nullable = true)
 |-- PUlocationID: string (nullable = true)
 |-- DOlocationID: string (nullable = true)
 |-- SR_Flag: string (nullable = true)
 |-- Affiliated_base_number: string (nullable = true)



In [8]:
from pyspark.sql import functions as F

In [9]:
# Preview the first 20 rows; long cell values are truncated in the display.
fhv_df.show(n=20, truncate=True)

[Stage 1:>                                                          (0 + 1) / 1]                                                                                

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B02849|2019-10-03 15:27:55|2019-10-03 16:06:10|         264|         257|   null|                B02849|
|              B01730|2019-10-07 05:48:04|2019-10-07 06:28:12|         264|         223|   null|                B02096|
|              B03060|2019-10-04 00:06:39|2019-10-04 00:23:08|         264|         165|   null|                B02888|
|              B02905|2019-10-05 07:40:02|2019-10-05 07:52:00|         264|          62|   null|                B02905|
|              B02509|2019-10-08 13:16:43|2019-10-08 13:35:42|         264|          18|   null|                B02872|
|     B01711         |2019-10-02 22:38:4

In [10]:
# Trips whose pickup date is 2019-10-15. to_date() without a format is a plain
# cast of the timestamp to a date (the session time zone applies).
filtered_trips = fhv_df.where(F.to_date("pickup_datetime") == "2019-10-15")

In [11]:
# How many trips started on 2019-10-15 (this action triggers a Spark job).
trip_count: int = filtered_trips.count()


                                                                                

In [12]:
# Emit the 2019-10-15 trip count as plain text.
print(f"{trip_count}")

62610


In [30]:
# Add trip_duration_hours = (dropoff - pickup) in hours. unix_timestamp() gives
# whole seconds for each timestamp, so the difference is divided by 3600.
# The column is referenced with the schema's exact spelling ('dropOff_datetime');
# the previous lowercase 'dropoff_datetime' only resolved because
# spark.sql.caseSensitive defaults to false.
fhv_df_with_duration = fhv_df.withColumn(
    "trip_duration_hours",
    (F.unix_timestamp("dropOff_datetime") - F.unix_timestamp("pickup_datetime")) / 3600,
)


In [31]:
# Longest trip duration (hours) over the whole dataset, pulled back to the driver.
max_duration = fhv_df_with_duration.select(F.max("trip_duration_hours")).first()[0]

In [32]:
# Display the longest trip duration in hours.
# NOTE(review): the recorded result, 631152.5 hours (~72 years), is not a
# plausible trip length; it likely reflects bad dropOff_datetime values in the
# raw data. Worth inspecting those rows before trusting this answer.
max_duration

631152.5

In [33]:
!wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv

--2024-03-02 15:17:53--  https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv
Resolving github.com (github.com)... 20.207.73.82
Connecting to github.com (github.com)|20.207.73.82|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/5a2cc2f5-b4cd-4584-9c62-a6ea97ed0e6a?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAVCODYLSA53PQK4ZA%2F20240302%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20240302T151753Z&X-Amz-Expires=300&X-Amz-Signature=fcdfeaff731865d29ffd468663a739fa4991a925cf265cc07893ef3b0c22f1ee&X-Amz-SignedHeaders=host&actor_id=0&key_id=0&repo_id=513814948&response-content-disposition=attachment%3B%20filename%3Dtaxi_zone_lookup.csv&response-content-type=application%2Foctet-stream [following]
--2024-03-02 15:17:53--  https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/5a2cc2f5-b4cd-4584-9c62-a6e

In [10]:
# Zone lookup table. No schema or inference is requested, so every column
# (including LocationID) is read as a string, the same type as PUlocationID.
df_zones = spark.read.option("header", "true").csv('taxi_zone_lookup.csv')

In [11]:
# Preview the first 20 zones; long names are truncated in the display.
df_zones.show(n=20, truncate=True)

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [12]:
# Expose the lookup table to Spark SQL as the temp view `zones`.
# createOrReplaceTempView replaces registerTempTable, which has been deprecated
# since Spark 2.0 and emits a FutureWarning.
df_zones.createOrReplaceTempView('zones')



In [13]:
# Sanity check that the `zones` view is queryable: show its first 5 rows.
spark.sql("\nSELECT * FROM zones limit 5\n").show(n=20, truncate=True)

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
+----------+-------------+--------------------+------------+



In [14]:
# Expose the trips DataFrame to Spark SQL as the temp view `fhv_data`.
# createOrReplaceTempView replaces registerTempTable, which has been deprecated
# since Spark 2.0 and emits a FutureWarning.
fhv_df.createOrReplaceTempView('fhv_data')

In [20]:
# Pickup zones ranked from fewest to most trips.
# - LEFT JOIN keeps trips whose PUlocationID has no match in `zones`; those are
#   counted under a NULL Zone.
# - PUlocationID and LocationID are both strings (explicit schema / header-only
#   read), so the join compares like with like.
# - Zone is a secondary sort key so zones with equal trip counts appear in a
#   deterministic order (the answer is read off the top rows).
spark.sql("""
SELECT
    zones.Zone,
    COUNT(1) AS trips
FROM
    fhv_data
LEFT JOIN
    zones
ON
    fhv_data.PUlocationID = zones.LocationID
GROUP BY
    zones.Zone
ORDER BY
    trips,
    zones.Zone
""").show()



+--------------------+-----+
|                Zone|trips|
+--------------------+-----+
|         Jamaica Bay|    1|
|Governor's Island...|    2|
| Green-Wood Cemetery|    5|
|       Broad Channel|    8|
|     Highbridge Park|   14|
|        Battery Park|   15|
|Saint Michaels Ce...|   23|
|Breezy Point/Fort...|   25|
|Marine Park/Floyd...|   26|
|        Astoria Park|   29|
|    Inwood Hill Park|   39|
|       Willets Point|   47|
|Forest Park/Highl...|   53|
|  Brooklyn Navy Yard|   57|
|        Crotona Park|   62|
|        Country Club|   77|
|     Freshkills Park|   89|
|       Prospect Park|   98|
|     Columbia Street|  105|
|  South Williamsburg|  110|
+--------------------+-----+
only showing top 20 rows



                                                                                