Hey, I'm Jobert Gutierrez, and below you'll find the logic and code I used to answer the fifth assignment of the Data Engineering Zoomcamp program offered by Data Talks Club.

# __Module 5 Homework: Batch processing__

In this homework we'll put what we learned about Spark into practice.

For this homework we will be using the Yellow 2024-10 data from the official website:

> wget https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-10.parquet

### __Question 1. Install Spark and PySpark__
- Install Spark
- Run PySpark
- Create a local spark session
- Execute spark.version.
- What's the output?

Note: to install PySpark, follow this [guide](https://github.com/DataTalksClub/data-engineering-zoomcamp/blob/main/05-batch/setup/pyspark.md).

### Answer: 
Using the code:

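```python
import pyspark
from pyspark.sql import SparkSession

print(pyspark.__version__)  # version of the installed PySpark package
spark = SparkSession.builder.master("local[*]").appName("homework").getOrCreate()
print(spark.version)  # version reported by the local Spark session
```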

The installed PySpark version is __'3.3.2'__.
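As a quick sanity check that doesn't start a JVM, the installed version can also be read from the package metadata. This is a minimal sketch using the standard library's `importlib.metadata` (Python 3.8+); the helper name `installed_pyspark_version` is my own. If PySpark is only reachable through `PYTHONPATH` (for example, pointed at Spark's own `python` folder) instead of being pip-installed, there is no package metadata and the helper returns `None`.

```python
from importlib.metadata import PackageNotFoundError, version


def installed_pyspark_version():
    """Return the installed PySpark version string, or None if it has no package metadata."""
    try:
        # Reads the package metadata only; no JVM or Spark session is started
        return version("pyspark")
    except PackageNotFoundError:
        return None


print(installed_pyspark_version())
```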

### __Question 2. Yellow October 2024__
Read the October 2024 Yellow taxi data into a Spark DataFrame.

Repartition the DataFrame to 4 partitions and save it to Parquet.

What is the average size (in MB) of the Parquet files (ending with the .parquet extension) that were created? Select the answer that most closely matches.

- 6MB
- 25MB
- 75MB
- 100MB

### Answer: 
I downloaded the Yellow taxi trips for October 2024 (*yellow_tripdata_2024-10.parquet*), read the file into Spark, repartitioned it into 4 partitions, and saved it as Parquet:

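```python
from pyspark.sql import SparkSession

# Start a local Spark session using all available cores
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("homework") \
    .getOrCreate()

# Jupyter shell escape: download the October 2024 Yellow taxi data
!wget https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-10.parquet

df = spark.read.parquet('yellow_tripdata_2024-10.parquet')

# Redistribute the rows into 4 partitions so the write produces 4 Parquet files
df = df.repartition(4)

df.write.parquet('yellow/2024/10/')
```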

Spark wrote 4 Parquet files of about __24.2 MB__ each, so the closest answer is __25MB__.
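To compute the average instead of reading file sizes by eye, a small helper can sum the sizes of the `.parquet` files in the output folder. This is a minimal sketch; the function name is hypothetical, and it assumes "MB" means 1024 × 1024 bytes, which the question doesn't specify. It skips the `_SUCCESS` marker and `.crc` checksum files that Spark writes next to the data.

```python
import os
from pathlib import Path


def average_parquet_size_mb(folder):
    """Average size in MB of the *.parquet files directly inside `folder`."""
    # Only data files match; _SUCCESS and *.crc files are ignored
    files = list(Path(folder).glob("*.parquet"))
    if not files:
        raise ValueError(f"no .parquet files found in {folder}")
    total_bytes = sum(os.path.getsize(f) for f in files)
    # Assumption: 1 MB = 1024 * 1024 bytes (MiB)
    return total_bytes / len(files) / (1024 * 1024)
```

After the write step, calling `average_parquet_size_mb('yellow/2024/10/')` should return the per-file average for the 4 partitions.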

### __Question 3. Count records__
How many taxi trips were there on the 15th of October?

Consider only trips that started on the 15th of October.

- 85,567
- 105,567
- 125,567
- 145,567

### Answer: 
Next, I read the partitioned files back and printed the schema:

```python
# Read the 4 partitioned files back and inspect the inferred column types
df = spark.read.parquet('yellow/2024/10/')

df.printSchema()
```

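Then I defined an explicit schema to correct the column types that were interpreted wrongly, re-read the data with it, and counted the trips whose pickup date is October 15: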

```python
from pyspark.sql import types
from pyspark.sql import functions as F

# Explicit schema with the intended column types
schema = types.StructType([
    types.StructField('VendorID', types.IntegerType(), True),
    types.StructField('tpep_pickup_datetime', types.TimestampType(), True),
    types.StructField('tpep_dropoff_datetime', types.TimestampType(), True),
    types.StructField('passenger_count', types.LongType(), True),
    types.StructField('trip_distance', types.DoubleType(), True),
    types.StructField('RatecodeID', types.LongType(), True),
    types.StructField('store_and_fwd_flag', types.StringType(), True),
    types.StructField('PULocationID', types.IntegerType(), True),
    types.StructField('DOLocationID', types.IntegerType(), True),
    types.StructField('payment_type', types.IntegerType(), True),
    types.StructField('fare_amount', types.DoubleType(), True),
    types.StructField('extra', types.DoubleType(), True),
    types.StructField('mta_tax', types.DoubleType(), True),
    types.StructField('tip_amount', types.DoubleType(), True),
    types.StructField('tolls_amount', types.DoubleType(), True),
    types.StructField('improvement_surcharge', types.DoubleType(), True),
    types.StructField('total_amount', types.DoubleType(), True),
    types.StructField('congestion_surcharge', types.DoubleType(), True),
    types.StructField('Airport_fee', types.DoubleType(), True)
])

df = spark.read \
    .schema(schema).parquet('yellow/2024/10/')

# Derive the pickup date (time of day dropped) from the pickup timestamp
df = df.withColumn('tpep_pickup_date', F.to_date(df.tpep_pickup_datetime))

# Count the trips that started on October 15, 2024
df.filter(df.tpep_pickup_date == '2024-10-15').count()
```

So __128,893 trips__ started on October 15, which is closest to the 125,567 option.
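The `to_date(...) == '2024-10-15'` filter keeps every trip whose pickup timestamp falls on that calendar day. The plain-Python sketch below (not Spark; the helper names and timestamps are hypothetical) shows that this is equivalent to a half-open range from midnight to the next midnight, which is the form to use when filtering on the raw timestamp column instead of a derived date column.

```python
from datetime import date, datetime, timedelta


def count_trips_on(pickups, day):
    """Count pickup timestamps whose calendar date equals `day` (like to_date(...) == day)."""
    return sum(1 for ts in pickups if ts.date() == day)


def count_trips_in_range(pickups, day):
    """Same count using the half-open range [day 00:00, next day 00:00)."""
    start = datetime.combine(day, datetime.min.time())
    end = start + timedelta(days=1)
    return sum(1 for ts in pickups if start <= ts < end)


# Made-up timestamps, only to show that both filters agree at the day boundaries
sample = [
    datetime(2024, 10, 14, 23, 59, 59),
    datetime(2024, 10, 15, 0, 0, 0),
    datetime(2024, 10, 15, 12, 30, 0),
    datetime(2024, 10, 15, 23, 59, 59),
    datetime(2024, 10, 16, 0, 0, 0),
]
print(count_trips_on(sample, date(2024, 10, 15)))        # 3
print(count_trips_in_range(sample, date(2024, 10, 15)))  # 3
```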

### __Question 4. Longest trip__

What is the length of the longest trip in the dataset in hours?

- 122
- 142
- 162
- 182

### Answer: 
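To find the longest trip in hours, I computed each trip's duration as the difference between the dropoff and pickup Unix timestamps divided by 3600, keeping only trips that start and end in October 2024: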

```python
df.createOrReplaceTempView('yellow')

# Duration in hours = (dropoff - pickup) in seconds / 3600.
# tpep_pickup_date was added above; the dropoff date is derived inline with to_date.
df_result = spark.sql("""
    select PULocationID, DOLocationID,
        (unix_timestamp(tpep_dropoff_datetime) - unix_timestamp(tpep_pickup_datetime)) / 3600 as duration
    from yellow
    where
        to_date(tpep_dropoff_datetime) >= '2024-10-01' and
        to_date(tpep_dropoff_datetime) < '2024-11-01' and
        tpep_pickup_date >= '2024-10-01' and
        tpep_pickup_date < '2024-11-01'
    order by duration desc
    limit 3
""")
df_result.show()
```

So the longest trip lasted __162.61 hours__, which matches the 162 option.
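The duration formula in the query is just "seconds between the two timestamps, divided by 3600". A plain-Python sketch of the same arithmetic, with a hypothetical helper name and made-up times, is shown below. One difference to keep in mind: Spark's `unix_timestamp` works in whole seconds, while `timedelta.total_seconds()` keeps fractions of a second.

```python
from datetime import datetime


def duration_hours(pickup, dropoff):
    """Trip length in hours, like (unix_timestamp(dropoff) - unix_timestamp(pickup)) / 3600."""
    return (dropoff - pickup).total_seconds() / 3600


# Made-up trip: 2 hours 45 minutes
print(duration_hours(datetime(2024, 10, 1, 8, 0), datetime(2024, 10, 1, 10, 45)))  # 2.75
```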

### __Question 5. User Interface__

Spark’s User Interface which shows the application's dashboard runs on which local port?

### Answer: 
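By default, Spark's User Interface runs on port __4040__ (http://localhost:4040). If that port is already taken, for example by another running Spark application, Spark tries the next ports (4041, 4042, ...).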
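From a running session, `spark.sparkContext.uiWebUrl` reports the exact UI address. Without a session at hand, a small standard-library probe can show which ports in the usual range accept connections. This is a sketch under assumptions: the range of 17 ports assumes the default `spark.port.maxRetries` of 16, the function name is hypothetical, and any service listening there is reported, not only Spark.

```python
import socket


def listening_ports(host="localhost", first_port=4040, attempts=17):
    """Return the ports in [first_port, first_port + attempts) that accept a TCP connection."""
    open_ports = []
    for port in range(first_port, first_port + attempts):
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(0.2)
            # connect_ex returns 0 when something is listening on the port
            if sock.connect_ex((host, port)) == 0:
                open_ports.append(port)
    return open_ports


print(listening_ports())
```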

### __Question 6. Least frequent pickup location zone__

Load the zone lookup data into a temp view in Spark:

> wget https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv

Using the zone lookup data and the Yellow October 2024 data, what is the name of the LEAST frequent pickup location Zone?

- Governor's Island/Ellis Island/Liberty Island
- Arden Heights
- Rikers Island
- Jamaica Bay


### Answer: 
After downloading the zone lookup file with *!wget https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv*, I read it into Spark and registered it as a temporary view:

```python
# Read the zone lookup CSV (first row holds the column names) and expose it to Spark SQL
zones = spark.read.csv('taxi_zone_lookup.csv', header=True)

zones.createOrReplaceTempView('zones')
```
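Since `header=True` is set without `inferSchema`, Spark reads every column of the lookup as a string, including `LocationID`; the join below still works because Spark implicitly casts the types when comparing. For comparison, a plain-Python sketch of the same lookup that converts the ID to an integer is shown here. The helper name is hypothetical, and it assumes the file's header contains the `LocationID` and `Zone` columns that the query below relies on.

```python
import csv


def load_zone_lookup(path):
    """Map LocationID (int) -> Zone name from a taxi_zone_lookup.csv-style file."""
    with open(path, newline="") as fh:
        # DictReader takes column names from the header row, like header=True in Spark
        return {int(row["LocationID"]): row["Zone"] for row in csv.DictReader(fh)}
```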

Then I used Spark SQL to count the pickups per zone, ordered from least to most frequent:

```python
# Join trips to zone names and list the zones with the fewest pickups first
spark.sql("""
SELECT
    y.PULocationID, z.Zone, COUNT(*) AS count
FROM
    yellow y
INNER JOIN
    zones z
ON
    y.PULocationID = z.LocationID
GROUP BY
    y.PULocationID, z.Zone
ORDER BY
    count
LIMIT 7
""").show()
```

Therefore, the least frequent pickup location zone is __Governor's Island/Ellis Island/Liberty Island__.
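To make the join-and-count logic of the query explicit, here is a plain-Python sketch of what it computes, using hypothetical inputs: a list of pickup location IDs and a LocationID to Zone dictionary. Like the SQL, it counts per location ID, so different IDs that share a zone name stay as separate rows, and IDs missing from the lookup are dropped as in an inner join. Ties are returned in arbitrary order, just as `ORDER BY count` does not fix their order.

```python
from collections import Counter


def least_frequent_zones(pickup_ids, zone_by_id, n=3):
    """Return the n (location_id, zone, count) rows with the fewest pickups."""
    counts = Counter(pickup_ids)
    # Inner join: keep only IDs that exist in the zone lookup
    joined = [(loc_id, zone_by_id[loc_id], c) for loc_id, c in counts.items() if loc_id in zone_by_id]
    # Order by count ascending, then keep the first n rows (like ORDER BY count LIMIT n)
    return sorted(joined, key=lambda row: row[2])[:n]
```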