## Question 1: Install Spark and PySpark

In [61]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType
import os

# Start (or reuse) a local Spark session and report which Spark version it runs.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

print(f'The PySpark {spark.version} version is running...')

The PySpark 3.5.4 version is running...


## Question 2: Yellow October 2024

In [62]:
# Read the yellow-taxi trip records for October 2024.
# The parquet file is expected in the current working directory.
trips_path = 'yellow_tripdata_2024-10.parquet'
df = spark.read.parquet(trips_path)

In [63]:
# Split the trips into 4 partitions and write them out as parquet,
# replacing any output left over from an earlier run.
# NOTE(review): the directory name reads like April 2024 while the data loaded
# above is the 2024-10 file — confirm the intended name before renaming.
main_dir = 'yellow_2024_04'
df = df.repartition(4)
df.write.parquet(main_dir, mode="overwrite")

                                                                                

In [64]:
# Print the on-disk size of each parquet part file in the output directory,
# expressed in megabytes (1024 * 1024 bytes).
bytes_per_mb = 1024 * 1024
parquet_files = list(filter(lambda name: name.endswith('.parquet'), os.listdir(main_dir)))
for part_name in parquet_files:
    part_path = f'{main_dir}/{part_name}'
    size_mb = os.path.getsize(part_path) / bytes_per_mb
    print(size_mb)


22.409375190734863
22.38893699645996
22.388922691345215
22.404443740844727


## Question 3: Count records

In [65]:
# Count the taxi trips whose pickup falls on 15 October 2024.
# The DataFrame is exposed to Spark SQL as the "trips" view first.
df.createOrReplaceTempView("trips")
trips_on_day_sql = """SELECT COUNT(*)
             from trips
             WHERE DATE(tpep_pickup_datetime)  = '2024-10-15'
         """
spark.sql(trips_on_day_sql).show()

+--------+
|count(1)|
+--------+
|  128893|
+--------+



## Question 4: Longest Trip

In [66]:
# Longest trip: convert pickup/dropoff timestamps to epoch seconds, derive the
# trip duration in hours, then keep the single longest trip.
seconds_per_hour = 3600
df_unix = (
    df
    .withColumn('pickup_unix', F.unix_timestamp(F.col('tpep_pickup_datetime')))
    .withColumn('dropoff_unix', F.unix_timestamp(F.col('tpep_dropoff_datetime')))
)
elapsed_seconds = F.col('dropoff_unix') - F.col('pickup_unix')
df_trip_dur = df_unix.withColumn('trip_hour', elapsed_seconds / seconds_per_hour)
df_trip_dur.createOrReplaceTempView("trips_dur")

longest_trip_sql = """
          SELECT VendorID, trip_hour
          FROM trips_dur
          ORDER BY trip_hour DESC
          LIMIT 1
          """
spark.sql(longest_trip_sql).show()

+--------+------------------+
|VendorID|         trip_hour|
+--------+------------------+
|       2|162.61777777777777|
+--------+------------------+



## Question 6: Least frequent pickup location zone

In [68]:
# load lookup data (assumes saved in working directory)
df_lookup = spark.read.option("header", "true").csv('taxi_zone_lookup.csv')
df_lookup_clean = df_lookup \
                        .withColumn("lo_id", F.col("LocationID").cast(IntegerType()) )
df_lookup_clean.createOrReplaceTempView("lookup")

In [70]:
# Inspect the lookup schema to confirm the added lo_id column is integer-typed.
df_lookup_clean.schema

StructType([StructField('LocationID', StringType(), True), StructField('Borough', StringType(), True), StructField('Zone', StringType(), True), StructField('service_zone', StringType(), True), StructField('lo_id', IntegerType(), True)])

In [55]:
# Inspect the trips schema to check the column types (e.g. PULocationID).
df.schema

StructType([StructField('VendorID', IntegerType(), True), StructField('tpep_pickup_datetime', TimestampNTZType(), True), StructField('tpep_dropoff_datetime', TimestampNTZType(), True), StructField('passenger_count', LongType(), True), StructField('trip_distance', DoubleType(), True), StructField('RatecodeID', LongType(), True), StructField('store_and_fwd_flag', StringType(), True), StructField('PULocationID', IntegerType(), True), StructField('DOLocationID', IntegerType(), True), StructField('payment_type', LongType(), True), StructField('fare_amount', DoubleType(), True), StructField('extra', DoubleType(), True), StructField('mta_tax', DoubleType(), True), StructField('tip_amount', DoubleType(), True), StructField('tolls_amount', DoubleType(), True), StructField('improvement_surcharge', DoubleType(), True), StructField('total_amount', DoubleType(), True), StructField('congestion_surcharge', DoubleType(), True), StructField('Airport_fee', DoubleType(), True)])

In [80]:
# Least frequent pickup zone: count trips per zone name and keep the zone with
# the fewest pickups.
# - LEFT JOIN keeps trips whose PULocationID has no lookup row; those trips are
#   grouped under a NULL zone.
# - Zones with no pickups at all never appear in the result.
# - l.zone is a secondary sort key so that LIMIT 1 returns the same row on every
#   run when several zones share the lowest count.
spark.sql("""
         SELECT l.zone, COUNT(*)
         FROM trips as t
         LEFT JOIN lookup as l
         ON t.PULocationID = l.lo_id
         GROUP BY l.zone
         ORDER BY COUNT(*), l.zone
         LIMIT 1
         """).show(truncate = False)

+---------------------------------------------+--------+
|zone                                         |count(1)|
+---------------------------------------------+--------+
|Governor's Island/Ellis Island/Liberty Island|1       |
+---------------------------------------------+--------+

