In [36]:
import requests
from pyspark.sql import SparkSession
import os
from pyspark.sql.functions import to_date, col, unix_timestamp, max, count, asc

In [7]:
# Display the version string of the active Spark session.
# NOTE(review): `spark` is only assigned by the SparkSession.builder cell further
# down — presumably the kernel pre-defines it or the cells were run out of order;
# confirm this cell survives Restart Kernel -> Run All.
spark.version

'3.5.0'

In [8]:
# Remote location of the yellow-taxi trip data parquet file to download.
url: str = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-10.parquet"
# Local destination the downloaded parquet file is written to.
local_path: str = "/tmp/downloaded_file.parquet"

In [9]:
# Download the trip-data file to `local_path`.
# - The request is issued (and its status checked) BEFORE the target file is
#   opened, so a failed download neither leaves an empty file behind nor saves
#   an HTTP error page under a .parquet name.
# - `stream=True` + `iter_content` writes the body in 1 MiB chunks instead of
#   holding the whole file in memory.
# - `timeout` prevents the cell from hanging forever on a stalled connection.
with requests.get(url, stream=True, timeout=60) as response:
    response.raise_for_status()
    with open(local_path, 'wb') as f:
        for chunk in response.iter_content(chunk_size=1024 * 1024):
            f.write(chunk)

In [10]:
# Obtain a SparkSession named "HomeWork5"; getOrCreate() returns the already
# running session if one exists instead of starting a new one.
spark = (
    SparkSession.builder
    .appName("HomeWork5")
    .getOrCreate()
)

In [11]:
# Load the downloaded parquet file into a Spark DataFrame.
df = spark.read.format("parquet").load(local_path)

In [12]:
# Preview the first five rows of the trip data.
df.show(n=5)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       2| 2024-10-01 00:30:44|  2024-10-01 00:48:26|              1|          3.0|         1|                 N|         162|         246|           1|       18.4|  1.0|    0.5|       1.

In [13]:
# Redistribute the trip data across four partitions; the write below then
# produces one part file per partition.
df_repartitioned = df.repartition(numPartitions=4)

In [14]:
# Directory the repartitioned data is written to (Spark writes a directory of
# part files, not a single file).
output_path = "/tmp/yellow_tripdata.parquet"
# "overwrite" replaces any existing output at this path, keeping the cell re-runnable.
df_repartitioned.write.parquet(output_path, mode="overwrite")

In [18]:
# Collect the parquet part files in the output directory and total their sizes
# in bytes. Entries that do not end in ".parquet" are skipped.
files = []
total_size = 0
for entry_name in os.listdir(output_path):
    if entry_name.endswith(".parquet"):
        files.append(entry_name)
        total_size += os.path.getsize(os.path.join(output_path, entry_name))

# Mean part-file size in bytes; 0 when no parquet files were found.
average_size = 0 if not files else total_size / len(files)

In [24]:
# Report the mean part-file size with two decimals. The byte count is divided
# by 1024 * 1024, so the figure labelled "MB" is in mebibytes.
print(f"Average size of the file: {average_size / (1024 * 1024):.2f} MB")

Average size of the file: 22.39 MB


In [26]:
# Count trips whose pickup timestamp falls on 2024-10-15: the timestamp is
# reduced to its calendar date and compared with the target date string.
pickup_date = to_date(col("tpep_pickup_datetime"))
count_records = df.where(pickup_date == "2024-10-15").count()

In [27]:
# Print the number of trips counted for the 2024-10-15 pickup date.
print(f"Number of records with pickup date 2024-10-15: {count_records}")

Number of records with pickup date 2024-10-15: 128893


In [29]:
# Add a "trip_duration_hours" column: both timestamps are converted to epoch
# seconds, and their difference is divided by 3600 (seconds per hour).
pickup_seconds = unix_timestamp(col("tpep_pickup_datetime"))
dropoff_seconds = unix_timestamp(col("tpep_dropoff_datetime"))
df_trip_duration_hours = df.withColumn(
    "trip_duration_hours",
    (dropoff_seconds - pickup_seconds) / 3600,
)

In [31]:
# Longest trip duration in hours. `max` here is the Spark aggregate imported
# from pyspark.sql.functions (it shadows the Python builtin in this notebook).
# The global aggregation yields a single row whose only field is the maximum.
max_trip_duration = (
    df_trip_duration_hours
    .agg(max("trip_duration_hours"))
    .first()[0]
)

In [32]:
# Print the longest trip duration in hours, formatted to two decimals.
# NOTE(review): the `:.2f` format would raise if max_trip_duration were None
# (e.g. no rows with both timestamps) — confirm that cannot happen here.
print(f"The longest trip lasted: {max_trip_duration:.2f} hours")

The longest trip lasted: 162.62 hours


In [34]:
# Which local port does Spark's web UI (the application dashboard) run on?
# Answer: 4040

In [39]:
# Remote location of the taxi zone lookup table and the local path it is saved to.
zone_lookup_url = "https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv"
zone_lookup_path = "/tmp/taxi_zone_lookup.csv"

# Download the lookup CSV.
# - The request is issued (and its status checked) BEFORE the target file is
#   opened, so a failed download neither leaves an empty file behind nor saves
#   an HTTP error page as the CSV.
# - `stream=True` + `iter_content` writes the body in chunks instead of
#   buffering it all in memory.
# - `timeout` prevents the cell from hanging forever on a stalled connection.
with requests.get(zone_lookup_url, stream=True, timeout=60) as response:
    response.raise_for_status()
    with open(zone_lookup_path, 'wb') as f:
        for chunk in response.iter_content(chunk_size=1024 * 1024):
            f.write(chunk)

In [40]:
# Read the zone lookup CSV, taking column names from its first line.
# No schema inference is requested, so every column is loaded as a string.
zone_lookup_df = spark.read.csv(zone_lookup_path, header=True)

In [41]:
# Number of trips per pickup location, exposed as the "trip_count" column.
pickup_counts = (
    df.groupBy("PULocationID")
    .count()
    .withColumnRenamed("count", "trip_count")
)

In [42]:
# Attach zone details to each pickup location. A left join keeps every pickup
# location even when the lookup table has no matching LocationID.
pickup_counts_with_zones = pickup_counts.join(
    zone_lookup_df,
    on=pickup_counts["PULocationID"] == zone_lookup_df["LocationID"],
    how="left",
)

In [43]:
# Zone with the fewest pickups: sort ascending by trip count and keep the top row.
# The sort key is trip_count only, so when several zones share the minimum
# count, which one is returned is not fixed by this query.
least_frequent_zone = (
    pickup_counts_with_zones
    .sort(col("trip_count").asc())
    .select("Zone", "trip_count")
    .limit(1)
)

In [44]:
# Display the least frequent pickup zone. truncate=False prints the full zone
# name; the default cuts long strings to 20 characters (ending in "..."),
# which hides part of the answer.
least_frequent_zone.show(truncate=False)

+--------------------+----------+
|                Zone|trip_count|
+--------------------+----------+
|Governor's Island...|         1|
+--------------------+----------+

