### Question 1: Install Spark and PySpark
- Install Spark
- Run PySpark
- Create a local spark session
- Execute spark.version.
- What's the output?

In [1]:
import pyspark
from pyspark.sql import SparkSession
import os

#     .config("spark.sql.warehouse.dir", "C:/Users/kamen/Documents/Data_Engineering_Course/my_tests/week_5") \
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

In [2]:
# Question 1
spark.version

'3.5.4'

### Question 2: Yellow October 2024
Read the October 2024 Yellow taxi data into a Spark DataFrame.

Repartition the DataFrame to 4 partitions and save it to Parquet.

What is the average size (in MB) of the Parquet files (those ending with the .parquet extension) that were created? Select the answer that most closely matches.

- 6MB
- 25MB
- 75MB
- 100MB

In [5]:
os.makedirs("data_parquet", exist_ok=True)

In [6]:
!curl https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-10.parquet -o data_parquet/yellow_tripdata_2024-10.parquet

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed

  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0
 12 61.3M   12 8063k    0     0  17.1M      0  0:00:03 --:--:--  0:00:03 17.1M
 71 61.3M   71 43.9M    0     0  30.1M      0  0:00:02  0:00:01  0:00:01 30.1M
100 61.3M  100 61.3M    0     0  31.7M      0  0:00:01  0:00:01 --:--:-- 31.7M


In [7]:
df = spark.read.parquet('data_parquet/')
df.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)



In [8]:
df = df.repartition(4)
df.write.mode("overwrite").parquet('data_parquet_output')

In [9]:
# Question 2
from pathlib import Path

folder_path = Path("data_parquet_output")  # Change this to your folder path

for file in folder_path.iterdir():
    # Only the data files end in .parquet; this skips _SUCCESS and the .crc checksum files
    if file.is_file() and file.suffix == '.parquet':
        print(f"{file.name}: {file.stat().st_size/1024/1024} MB")


part-00000-61c497e7-e4a7-45db-88d0-f0e18496805d-c000.snappy.parquet: 22.386794090270996 MB
part-00001-61c497e7-e4a7-45db-88d0-f0e18496805d-c000.snappy.parquet: 22.397467613220215 MB
part-00002-61c497e7-e4a7-45db-88d0-f0e18496805d-c000.snappy.parquet: 22.38918972015381 MB
part-00003-61c497e7-e4a7-45db-88d0-f0e18496805d-c000.snappy.parquet: 22.411956787109375 MB
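The question asks for an average rather than the individual sizes. Below is a minimal stdlib sketch that computes the mean size of the `.parquet` files in an output folder and picks the nearest answer option. `parquet_sizes_mb`, `mean_mb` and `closest_option` are hypothetical helper names, not part of PySpark. Filtering on `Path.suffix` skips the `_SUCCESS` marker and the `.crc` checksum files that Spark's local writer typically places next to the part files.

```python
from pathlib import Path

# Hypothetical helpers (not a PySpark API) for averaging the part-file sizes.


def parquet_sizes_mb(folder):
    """Sizes in MB of the *.parquet files directly inside `folder`, sorted by name."""
    return [
        f.stat().st_size / 1024 / 1024
        for f in sorted(Path(folder).iterdir())
        if f.is_file() and f.suffix == ".parquet"  # skips _SUCCESS and *.crc files
    ]


def mean_mb(sizes):
    """Arithmetic mean of a non-empty list of sizes."""
    if not sizes:
        raise ValueError("no .parquet files found")
    return sum(sizes) / len(sizes)


def closest_option(value, options):
    """Answer option with the smallest absolute distance to `value`."""
    return min(options, key=lambda option: abs(option - value))


if __name__ == "__main__":
    folder = Path("data_parquet_output")  # output folder written by the cell above
    if folder.is_dir():
        avg = mean_mb(parquet_sizes_mb(folder))
        print(f"average: {avg:.2f} MB -> closest option: {closest_option(avg, [6, 25, 75, 100])}MB")
```

With the four sizes printed above, the mean is about 22.4 MB, so the closest option is 25MB.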


### Question 3: Count records
How many taxi trips were there on the 15th of October?

Consider only trips that started on the 15th of October.

- 85,567
- 105,567
- 125,567
- 145,567

In [10]:
# Question 3
from pyspark.sql import functions as F

df \
    .filter(F.to_date(df.tpep_pickup_datetime) == '2024-10-15') \
    .count()

128893
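`F.to_date(...) == '2024-10-15'` keeps every pickup whose timestamp falls in the half-open interval from 2024-10-15 00:00:00 up to, but not including, 2024-10-16 00:00:00. The stdlib sketch below applies that rule to a few hypothetical timestamps chosen to sit on the boundaries; `count_on_day` is an illustrative name. In PySpark, a plain range comparison on `tpep_pickup_datetime` expresses the same interval.

```python
from datetime import date, datetime, time, timedelta


def count_on_day(pickups, day):
    """Count timestamps in [day 00:00, next day 00:00), i.e. those whose date is `day`.

    Illustrative helper only, not a PySpark API.
    """
    start = datetime.combine(day, time.min)
    end = start + timedelta(days=1)
    return sum(1 for ts in pickups if start <= ts < end)


# Hypothetical pickups around the boundaries of 15 October 2024.
sample = [
    datetime(2024, 10, 14, 23, 59, 59),  # previous day: excluded
    datetime(2024, 10, 15, 0, 0, 0),     # midnight: included
    datetime(2024, 10, 15, 13, 30, 0),   # included
    datetime(2024, 10, 15, 23, 59, 59),  # included
    datetime(2024, 10, 16, 0, 0, 0),     # next midnight: excluded
]

print(count_on_day(sample, date(2024, 10, 15)))  # 3
```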

### Question 4: Longest trip
What is the length of the longest trip in the dataset in hours?

- 122
- 142
- 162
- 182

In [11]:
from pyspark.sql import types

In [12]:
# Question 4
df \
    .withColumn('hours', (df.tpep_dropoff_datetime - df.tpep_pickup_datetime).cast("long")/3600) \
    .select('hours', 'tpep_dropoff_datetime', 'tpep_pickup_datetime')\
    .orderBy('hours', ascending=False)\
    .limit(1)\
    .show()

+------------------+---------------------+--------------------+
|             hours|tpep_dropoff_datetime|tpep_pickup_datetime|
+------------------+---------------------+--------------------+
|162.61777777777777|  2024-10-23 07:40:53| 2024-10-16 13:03:49|
+------------------+---------------------+--------------------+
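As a sanity check on the Spark result, the same duration can be recomputed with Python's `datetime` from the two timestamps in the output row. The gap is 6 days, 18 hours, 37 minutes and 4 seconds, which is 585,424 seconds or about 162.6 hours, so the closest option is 162.

```python
from datetime import datetime

# Timestamps copied from the row returned by the Spark query above.
pickup = datetime(2024, 10, 16, 13, 3, 49)
dropoff = datetime(2024, 10, 23, 7, 40, 53)

duration = dropoff - pickup              # timedelta(days=6, seconds=67024)
hours = duration.total_seconds() / 3600  # 585424 s / 3600

options = [122, 142, 162, 182]
closest = min(options, key=lambda option: abs(option - hours))

print(f"{hours:.4f} hours -> closest option: {closest}")  # 162.6178 hours -> closest option: 162
```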



### Question 5: User Interface
Spark's user interface, which shows the application's dashboard, runs on which local port?

- 80
- 443
- 4040
- 8080

# Question 5
![Spark UI screenshot (Question 5)](q5.png)
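Spark serves the UI on port 4040 by default (the `spark.ui.port` setting); if that port is already taken, for example by a second session, it binds to the following ports (4041, 4042, ...). In PySpark, `spark.sparkContext.uiWebUrl` returns the address the UI actually bound to. Without a Spark handle at hand, a rough stdlib probe like the one below can show which of those ports accept connections on localhost. `find_open_port` is a hypothetical helper, the default of 16 attempts is assumed to mirror Spark's `spark.port.maxRetries`, and an open port only means that some service is listening there, not necessarily Spark.

```python
import socket


def find_open_port(host="localhost", start=4040, attempts=16, timeout=0.2):
    """Return the first port in [start, start + attempts) accepting TCP connections, else None.

    Hypothetical helper: an open port only shows that *something* is listening,
    not that it is the Spark UI.
    """
    for port in range(start, start + attempts):
        try:
            with socket.create_connection((host, port), timeout=timeout):
                return port
        except OSError:  # connection refused, timed out, or host unreachable
            continue
    return None


if __name__ == "__main__":
    port = find_open_port()
    print(f"listening on port {port}" if port else "nothing listening on ports 4040-4055")
```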


### Question 6: Least frequent pickup location zone
Load the zone lookup data into a temp view in Spark:

`wget https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv`

Using the zone lookup data and the Yellow October 2024 data, what is the name of the LEAST frequent pickup location Zone?

- Governor's Island/Ellis Island/Liberty Island
- Arden Heights
- Rikers Island
- Jamaica Bay

In [13]:
folder_path = "data_csv"
# Create folder if it doesn't exist
os.makedirs(folder_path, exist_ok=True)

In [14]:
! curl https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv -o data_csv/taxi_zone_lookup.csv

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed

  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0
100 12331  100 12331    0     0  47742      0 --:--:-- --:--:-- --:--:-- 47980


In [15]:
df_zones = spark.read \
    .option("header", "true") \
    .csv('data_csv/taxi_zone_lookup.csv')

In [16]:
df_zones.show()

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [17]:
df_result = df.join(df_zones, df.PULocationID == df_zones.LocationID)


In [18]:
# Question 6
df_result.groupBy('Zone').count().orderBy('count').limit(5).show()

+--------------------+-----+
|                Zone|count|
+--------------------+-----+
|Governor's Island...|    1|
|       Rikers Island|    2|
|       Arden Heights|    2|
|         Jamaica Bay|    3|
| Green-Wood Cemetery|    3|
+--------------------+-----+
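The question mentions a temp view, while the cells above use the DataFrame API. The SQL route in Spark would register both DataFrames with `createOrReplaceTempView` and run the query through `spark.sql`. The query shape is sketched below against an in-memory SQLite database so that it runs without Spark; the table names `trips` and `zones`, the helper `least_frequent_zones`, and the sample rows are all hypothetical. The secondary sort on `Zone` breaks ties deterministically: in the Spark output above, Rikers Island and Arden Heights both have 2 pickups, so their relative order is not guaranteed.

```python
import sqlite3

# Hypothetical sample rows standing in for the zone lookup and the trip data.
ZONES = [(1, "Newark Airport"), (2, "Jamaica Bay"), (5, "Arden Heights"), (7, "Astoria")]
PICKUPS = [7, 7, 7, 2, 2, 5, 1, 1]  # PULocationID of each trip

# Same shape as the Spark query: join, count per zone, least frequent first.
QUERY = """
    SELECT z.Zone, COUNT(*) AS pickups
    FROM trips AS t
    JOIN zones AS z ON t.PULocationID = z.LocationID
    GROUP BY z.Zone
    ORDER BY pickups ASC, z.Zone ASC
    LIMIT 5
"""


def least_frequent_zones(zones, pickups):
    """Run QUERY on throwaway in-memory tables (hypothetical helper)."""
    con = sqlite3.connect(":memory:")
    try:
        con.execute("CREATE TABLE zones (LocationID INTEGER, Zone TEXT)")
        con.execute("CREATE TABLE trips (PULocationID INTEGER)")
        con.executemany("INSERT INTO zones VALUES (?, ?)", zones)
        con.executemany("INSERT INTO trips VALUES (?)", [(p,) for p in pickups])
        return con.execute(QUERY).fetchall()
    finally:
        con.close()


print(least_frequent_zones(ZONES, PICKUPS))
```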

