## Question 1.
### Install Spark, run PySpark, create a local Spark session, and execute spark.version.
#### What's the output?

In [1]:
# Import pyspark library
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

25/03/06 21:42:57 WARN Utils: Your hostname, cutty-sandbox resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
25/03/06 21:42:57 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/06 21:42:58 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/03/06 21:42:59 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [2]:
spark.version

'3.5.5'

## Question 2
### Read the October 2024 Yellow trip data into a Spark DataFrame. Repartition the DataFrame to 4 partitions and save it as Parquet.
#### What is the average size of the Parquet (ending with .parquet extension) Files that were created (in MB)?

In [3]:
# Download the October 2024 Yellow dataset
!wget https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-10.parquet

--2025-03-06 21:42:59--  https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-10.parquet
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 54.230.209.72, 54.230.209.126, 54.230.209.200, ...
Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|54.230.209.72|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 64346071 (61M) [binary/octet-stream]
Saving to: ‘yellow_tripdata_2024-10.parquet’


2025-03-06 21:43:03 (17.9 MB/s) - ‘yellow_tripdata_2024-10.parquet’ saved [64346071/64346071]



In [4]:
# Read the October 2024 Yellow trip data into a Spark DataFrame.
# Parquet files embed their own schema, so no "header" option is needed.
df = spark.read.parquet('yellow_tripdata_2024-10.parquet')

                                                                                

In [5]:
# Repartition the Dataframe to 4 partitions and save it to parquet.
df = df.repartition(4)

In [6]:
df.write.parquet('yellow/2024/10', mode='overwrite')

                                                                                

In [7]:
!ls -lh yellow/2024/10

total 90M
-rw-r--r-- 1 cutty cutty 23M Mar  6 21:43 part-00000-66395b91-795d-46c0-9e44-d995113eb852-c000.snappy.parquet
-rw-r--r-- 1 cutty cutty 23M Mar  6 21:43 part-00001-66395b91-795d-46c0-9e44-d995113eb852-c000.snappy.parquet
-rw-r--r-- 1 cutty cutty 23M Mar  6 21:43 part-00002-66395b91-795d-46c0-9e44-d995113eb852-c000.snappy.parquet
-rw-r--r-- 1 cutty cutty 23M Mar  6 21:43 part-00003-66395b91-795d-46c0-9e44-d995113eb852-c000.snappy.parquet
-rw-r--r-- 1 cutty cutty   0 Mar  6 21:43 _SUCCESS
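
The listing shows four part files of about 23M each, so the average is roughly 23 MB. As a cross-check, the average can be computed instead of read off `ls`. The following is a minimal sketch: the helper name `average_parquet_size_mb` is hypothetical, and it assumes sizes are counted the way `ls -lh` counts them (1M = 1024 × 1024 bytes).

```python
from pathlib import Path


def average_parquet_size_mb(directory: str) -> float:
    """Return the mean size, in units of 1024 * 1024 bytes, of the *.parquet files in `directory`."""
    # Only data files are counted; markers such as _SUCCESS do not match the pattern.
    sizes = [p.stat().st_size for p in Path(directory).glob("*.parquet")]
    if not sizes:
        raise ValueError(f"no .parquet files found in {directory!r}")
    return sum(sizes) / len(sizes) / (1024 * 1024)
```

Calling `average_parquet_size_mb('yellow/2024/10')` in the notebook should give a value close to the 23M shown per file above.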


## Question 3.
### Consider only trips that started on the 15th of October.
#### How many taxi trips were there on the 15th of October?

In [8]:
df = spark.read.parquet('yellow/2024/10')

In [9]:
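# Import only the functions used in this notebook.
# Note: this `max` is Spark's column aggregate and shadows Python's built-in max.
from pyspark.sql.functions import col, max, to_date, to_timestamp
df.filter(to_date(df.tpep_pickup_datetime) == '2024-10-15') \
    .count()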

128893

In [10]:
df.createOrReplaceTempView('yellow_tripdata')

In [11]:
spark.sql("""
    SELECT 
        count(*)
    FROM
        yellow_tripdata
    WHERE
        date(tpep_pickup_datetime) = '2024-10-15'
""").show()

+--------+
|count(1)|
+--------+
|  128893|
+--------+
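
Both queries filter on the pickup timestamp only, so a trip counts if it started on the 15th, regardless of when it ended. The plain-Python sketch below illustrates that rule on a few made-up trips; the `trips` list and the `started_on` helper are hypothetical and are not part of the dataset or of PySpark.

```python
from datetime import date, datetime


def started_on(pickup: datetime, day: date) -> bool:
    """True when the pickup timestamp falls on `day`; the drop-off time is irrelevant."""
    return pickup.date() == day


# Hypothetical (pickup, dropoff) pairs, for illustration only.
trips = [
    (datetime(2024, 10, 14, 23, 55), datetime(2024, 10, 15, 0, 10)),  # started on the 14th
    (datetime(2024, 10, 15, 0, 0), datetime(2024, 10, 15, 0, 20)),    # started at midnight on the 15th
    (datetime(2024, 10, 15, 23, 59), datetime(2024, 10, 16, 0, 15)),  # started on the 15th, ended on the 16th
]

target = date(2024, 10, 15)
trips_on_15th = sum(started_on(pickup, target) for pickup, _ in trips)
print(trips_on_15th)  # 2
```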



## Question 4.
#### What is the length of the longest trip in the dataset in hours?

In [12]:
df2 = df.withColumn('tpep_dropoff_datetime', to_timestamp(col('tpep_dropoff_datetime'))) \
    .withColumn('tpep_pickup_datetime', to_timestamp(col('tpep_pickup_datetime'))) \
    .withColumn('trip_in_hours', (col('tpep_dropoff_datetime').cast('long') - col('tpep_pickup_datetime').cast('long'))/3600)

df2.select(max(df2.trip_in_hours)) \
    .show()



+------------------+
|max(trip_in_hours)|
+------------------+
|162.61777777777777|
+------------------+



                                                                                

In [13]:
spark.sql("""
    WITH datetimestamping AS (
        SELECT
            CAST(tpep_dropoff_datetime AS TIMESTAMP) AS dropoff_datetimestamp,
            CAST(tpep_pickup_datetime AS TIMESTAMP) AS pickup_datetimestamp
        FROM 
            yellow_tripdata
    ),
    tripsinhours AS (
        SELECT
            ((CAST(dropoff_datetimestamp AS BIGINT) - CAST(pickup_datetimestamp AS BIGINT))/3600) AS trip_in_hours
        FROM
           datetimestamping 
    )
    
    SELECT
        MAX(trip_in_hours)
    FROM
        tripsinhours        
""").show()



+------------------+
|max(trip_in_hours)|
+------------------+
|162.61777777777777|
+------------------+
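
Both versions compute the duration as the difference between the two timestamps in epoch seconds, divided by 3600. The sketch below repeats that arithmetic in plain Python. The pickup time is made up, and the duration of 6 days 18:37:04 is chosen only because it equals 585,424 seconds, which is about 162.62 hours and so matches the maximum shown above.

```python
from datetime import datetime, timedelta

# Hypothetical pickup time; only the difference between the two timestamps matters.
pickup = datetime(2024, 10, 1, 0, 0, 0)
dropoff = pickup + timedelta(days=6, hours=18, minutes=37, seconds=4)

# Same arithmetic as the Spark expressions: seconds between the timestamps, divided by 3600.
trip_seconds = (dropoff - pickup).total_seconds()
trip_in_hours = trip_seconds / 3600

print(trip_seconds)             # 585424.0
print(round(trip_in_hours, 2))  # 162.62
```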



                                                                                

## Question 6.
### Load the zone lookup data into a temp view in Spark: 
### wget https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv
#### Using the zone lookup data and the Yellow October 2024 data, what is the name of the LEAST frequent pickup location Zone?

In [14]:
# Download the zone lookup dataset
!wget https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv

--2025-03-06 21:43:28--  https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 54.230.209.200, 54.230.209.72, 54.230.209.126, ...
Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|54.230.209.200|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 12331 (12K) [text/csv]
Saving to: ‘taxi_zone_lookup.csv’


2025-03-06 21:43:29 (2.67 MB/s) - ‘taxi_zone_lookup.csv’ saved [12331/12331]



In [15]:
# Read the zone lookup into a Spark Dataframe
df_zones = spark.read \
    .option("header", "true") \
    .csv('taxi_zone_lookup.csv')

In [16]:
df_zones.write.parquet('zones', mode='overwrite')

In [17]:
df_yellow = spark.read.parquet('yellow/2024/10')

In [18]:
df_zones = spark.read.parquet('zones')

In [19]:
df_zones.createOrReplaceTempView('taxi_zones')

In [20]:
spark.sql("""
    WITH yellow_taxi_zones AS (
        SELECT 
            y.PULocationID,
            t.Zone
        FROM
            yellow_tripdata y
            JOIN taxi_zones t
            ON y.PULocationID = t.LocationID
        WHERE
            service_zone LIKE '%Yell% Zone'
    ),
    zone_pickup_frequency AS (
        SELECT
            Zone as pickup_zone, COUNT(*) AS pickup_frequency
        FROM
            yellow_taxi_zones
        GROUP BY
            1
    ),
    least_frequent_pickup AS (
        SELECT
            MIN(pickup_frequency) least_pickup
        FROM
            zone_pickup_frequency
    )

    SELECT
        z.pickup_zone
    FROM
        zone_pickup_frequency z,
        least_frequent_pickup l
    WHERE
        z.pickup_frequency = l.least_pickup
""").show()



+--------------------+
|         pickup_zone|
+--------------------+
|Governor's Island...|
+--------------------+
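
The query joins trips to zones, counts pickups per zone, and keeps every zone whose count equals the minimum, so tied zones would all be returned. A plain-Python sketch of the same logic on made-up rows follows; the zone names, the IDs, and the `least_frequent_zones` helper are hypothetical, and the `service_zone` filter is left out for brevity.

```python
from collections import Counter


def least_frequent_zones(pickup_ids, zone_names):
    """Return the zone name(s) with the fewest pickups, sorted alphabetically.

    Mirrors an inner join: IDs missing from `zone_names` are dropped, and
    zones with no pickups at all never appear in the counts.
    """
    counts = Counter(zone_names[i] for i in pickup_ids if i in zone_names)
    if not counts:
        return []
    fewest = min(counts.values())
    return sorted(zone for zone, n in counts.items() if n == fewest)


# Hypothetical lookup table and pickup IDs, for illustration only.
zone_names = {1: "Zone A", 2: "Zone B", 3: "Zone C"}
pickup_ids = [1, 1, 1, 2, 3, 3, 99]  # 99 has no matching zone and is ignored

print(least_frequent_zones(pickup_ids, zone_names))  # ['Zone B']
```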



                                                                                