In [1]:
import pyspark
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

25/03/11 21:47:42 WARN Utils: Your hostname, DESKTOP-2TKP1T7 resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
25/03/11 21:47:42 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/11 21:47:42 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [82]:
input_path = 'yellow_tripdata_2024-10.parquet'
output_path = 'data/pq/yellow/2024/10'
df_yellow = spark.read.parquet(input_path)
df_yellow = df_yellow.dropna(how="all")
df_yellow.repartition(4).write.parquet(output_path,mode="overwrite")

                                                                                

In [70]:
df_yellow.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)



In [83]:
df_yellow.createOrReplaceTempView('yellow_tripdata_2024')

In [84]:
trips_count = spark.sql("""WITH day_tripdata AS( 
                            SELECT date_trunc('day', tpep_pickup_datetime) as pickup_date,
                                   date_trunc('day', tpep_dropoff_datetime) as dropoff_date
                             FROM yellow_tripdata_2024
                             ) 
                             SELECT COUNT(*) 
                             FROM day_tripdata 
                             WHERE pickup_date = '2024-10-15 00:00:00'
                        """
                        )
trips_count.show()

+--------+
|count(1)|
+--------+
|  128893|
+--------+



In [85]:
trips_count = spark.sql("""WITH day_tripdata AS( 
                                SELECT date_trunc('hour', tpep_pickup_datetime) as pickup_date,
                                       date_trunc('hour', tpep_dropoff_datetime) as dropoff_date,
                                       trip_distance
                                 FROM yellow_tripdata_2024
                             ),
                             trip_hours AS (
                                 SELECT 
                                     (UNIX_TIMESTAMP(dropoff_date) - UNIX_TIMESTAMP(pickup_date)) / 3600 AS trip_hour
                                 FROM 
                                     day_tripdata
                             )
                             SELECT 
                                 MAX(trip_hour)
                             FROM trip_hours
                        """
                        )
trips_count.show()

+--------------+
|max(trip_hour)|
+--------------+
|         162.0|
+--------------+



                                                                                

In [11]:
df_taxi_zone = spark.read \
            .option("header", "true") \
            .csv('taxi_zone_lookup.csv')
df_taxi_zone.show()

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [14]:
df_taxi_zone.createOrReplaceTempView('taxi_zone_lookup')

In [86]:
df_join = df_yellow.join(df_taxi_zone, df_taxi_zone.LocationID == df_yellow.PULocationID, how='inner')\
                    .select(df_taxi_zone.Zone)
                    

In [87]:
df_join.createOrReplaceTempView('taxi_zone_join')

In [89]:
df_least_frequent_zone = spark.sql("""
                                        WITH taxi_pickup_zone_count AS (
                                            SELECT 
                                                COUNT(*) as pickup_zone_count, 
                                                Zone
                                            FROM
                                                taxi_zone_join
                                            GROUP BY
                                                Zone
                                        )
                                        SELECT 
                                            *
                                        FROM 
                                            taxi_pickup_zone_count
                                        ORDER BY pickup_zone_count ASC
                                    """
                                  ).show()

+-----------------+--------------------+
|pickup_zone_count|                Zone|
+-----------------+--------------------+
|                1|Governor's Island...|
|                2|       Rikers Island|
|                2|       Arden Heights|
|                3|         Jamaica Bay|
|                3| Green-Wood Cemetery|
|                4|Charleston/Totten...|
|                4|   Rossville/Woodrow|
|                4|       West Brighton|
|                4|Eltingville/Annad...|
|                4|       Port Richmond|
|                6|         Great Kills|
|                6|        Crotona Park|
|                7|Heartland Village...|
|                7|     Mariners Harbor|
|                9|Saint George/New ...|
|                9|             Oakwood|
|               10|       Broad Channel|
|               10|New Dorp/Midland ...|
|               12|         Westerleigh|
|               12|     Pelham Bay Park|
+-----------------+--------------------+
only showing top