In [1]:
import pyspark
from pyspark.sql import SparkSession

In [20]:
spark.version

'3.3.2'

In [2]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


25/03/06 17:09:36 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [7]:
import pandas as pd

In [8]:
from pyspark.sql import types

In [5]:
pyspark.__version__

'3.3.2'

In [6]:
year = 2024
month=10

print(f'processing data for {year}/{month}')
input_path = f'yellow_tripdata_{year}-{month:02d}.parquet'
output_path = f'yellow_ride/{year}/{month:02d}/'
df_yellow = spark.read \
    .parquet(input_path)


df_yellow \
    .repartition(4) \
    .write.parquet(output_path)

processing data for 2024/10


                                                                                

In [9]:
df_yellow.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)



In [10]:
import pyspark.sql.functions as F

In [11]:
(df_yellow.tpep_dropoff_datetime-df_yellow.tpep_pickup_datetime)

Column<'(tpep_dropoff_datetime - tpep_pickup_datetime)'>

In [13]:
df_yellow = df_yellow.withColumn('pickup_date', F.to_date(df_yellow.tpep_pickup_datetime))

In [14]:
df_yellow.filter(F.col('pickup_date') == '2024-10-15').count()

125567

In [15]:
df_yellow.show()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|pickup_date|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+-----------+
|       2| 2024-10-01 02:30:44|  2024-10-01 02:48:26|              1|          3.0|         1|                 N|         162|         246|           1

In [16]:
if 'trip_duration(hr)' in df_yellow.columns:
    df_yellow = df_yellow.drop('trip_duration(hr)')
df_yellow = df_yellow.withColumn('trip_duration_hr',  (F.unix_timestamp(F.col('tpep_dropoff_datetime')) - F.unix_timestamp(F.col('tpep_pickup_datetime'))) / 3600)

In [17]:
df_yellow.select(F.max('trip_duration_hr')).show()

+---------------------+
|max(trip_duration_hr)|
+---------------------+
|   162.61777777777777|
+---------------------+



In [18]:
# Get pickup locations ordered by count (ascending to find least frequent)
pickup_counts = df_yellow.groupBy('PULocationID').count().orderBy('count', ascending=True)
pickup_counts.show(5)  # Show the 5 least frequent pickup locations

+------------+-----+
|PULocationID|count|
+------------+-----+
|         105|    1|
|           5|    2|
|         199|    2|
|         111|    3|
|           2|    3|
+------------+-----+
only showing top 5 rows



In [19]:
# Load the taxi zone lookup table
taxi_lookup_df = pd.read_csv("taxi_zone_lookup.csv", header=0)

# Get the least frequent pickup location ID (first row from the ascending ordered dataframe)
least_frequent_location_id = pickup_counts.first()['PULocationID']

# Find the zone name for the least frequent pickup location
least_frequent_zone = taxi_lookup_df["Zone"].where(
    taxi_lookup_df["LocationID"] == least_frequent_location_id
).dropna().values[0]

print(f"The least frequent pickup location is: {least_frequent_zone} (ID: {least_frequent_location_id})")

The least frequent pickup location is: Governor's Island/Ellis Island/Liberty Island (ID: 105)
