In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types
from pyspark.sql.functions import col, unix_timestamp

In [2]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/04 20:22:16 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
spark.version

'3.5.5'

In [4]:
df = spark.read \
    .parquet('yellow_tripdata_2024-10.parquet')

                                                                                

In [5]:
df = df.repartition(4)

In [7]:
df.write.parquet('pq/', mode="overwrite")

                                                                                

In [8]:
import os

In [9]:
def get_average_parquet_size(directory):
    parquet_files = [f for f in os.listdir(directory) if f.endswith(".parquet")]
    
    if not parquet_files:
        return 0  
    
    total_size = sum(os.path.getsize(os.path.join(directory, f)) for f in parquet_files)
    average_size = total_size / len(parquet_files)
    
    return average_size

In [10]:
directory = "pq/"
average_size = get_average_parquet_size(directory)
print(f"Average size of Parquet files: {average_size / (1024 * 1024):.2f} MB")

Average size of Parquet files: 23.04 MB


In [13]:
df_date = df \
    .withColumn("pickup_date", F.to_date(df.tpep_pickup_datetime)) 

In [14]:
df_count = df_date\
    .filter(df_date.pickup_date == "2024-10-15") \
    .count()
df_count

128893

In [15]:
df.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)



In [16]:
df_trip = df \
    .withColumn('trip_duration', (unix_timestamp(col("tpep_dropoff_datetime")) - unix_timestamp(col("tpep_pickup_datetime"))) / 3600) \
    .select('trip_duration').show()

                                                                                

+--------------------+
|       trip_duration|
+--------------------+
|  0.5202777777777777|
|0.050555555555555555|
| 0.19777777777777777|
|  0.8927777777777778|
| 0.09944444444444445|
|  0.3713888888888889|
| 0.11777777777777777|
| 0.20666666666666667|
|  0.6480555555555556|
|  0.2452777777777778|
|  0.2747222222222222|
| 0.16944444444444445|
|  0.5566666666666666|
| 0.13361111111111112|
|0.050833333333333335|
|  0.6613888888888889|
|  0.1688888888888889|
|0.001944444444444...|
| 0.12861111111111112|
|              0.1575|
+--------------------+
only showing top 20 rows



In [17]:
df_trip = df \
    .withColumn('trip_duration', (unix_timestamp(col("tpep_dropoff_datetime")) - unix_timestamp(col("tpep_pickup_datetime"))) / 3600) \
    .select('trip_duration') \
    .orderBy(col('trip_duration').desc()) \
    .first()
df_trip

                                                                                

Row(trip_duration=162.61777777777777)

In [42]:
schema = types.StructType([
    types.StructField('LocationID', types.IntegerType(), True),
    types.StructField('Borough', types.StringType(), True),
    types.StructField('Zone', types.StringType(), True),
    types.StructField('service_zone', types.StringType(), True)
])

In [43]:
zones = spark.read \
    .option("header", "true") \
    .schema(schema) \
    .csv('taxi_zone_lookup.csv') 

In [45]:
zones.createOrReplaceTempView('zones')
df.createOrReplaceTempView('df')

In [46]:
spark.sql("""
with df_zones as (
    select * 
    from df
    inner join zones on df.PULocationID = zones.LocationID)
select 
    df_zones.Zone,
    count(*) as zone_count
from df_zones
group by df_zones.Zone
order by zone_count
""").show()



+--------------------+----------+
|                Zone|zone_count|
+--------------------+----------+
|Governor's Island...|         1|
|       Rikers Island|         2|
|       Arden Heights|         2|
| Green-Wood Cemetery|         3|
|         Jamaica Bay|         3|
|Charleston/Totten...|         4|
|   Rossville/Woodrow|         4|
|       West Brighton|         4|
|       Port Richmond|         4|
|Eltingville/Annad...|         4|
|         Great Kills|         6|
|        Crotona Park|         6|
|Heartland Village...|         7|
|     Mariners Harbor|         7|
|Saint George/New ...|         9|
|             Oakwood|         9|
|       Broad Channel|        10|
|New Dorp/Midland ...|        10|
|         Westerleigh|        12|
|     Pelham Bay Park|        12|
+--------------------+----------+
only showing top 20 rows



                                                                                