### Question 1: 

**Install Spark and PySpark** 

- Install Spark
- Run PySpark
- Create a local spark session
- Execute spark.version.

What's the output?


In [2]:
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

In [3]:
spark.version

'3.5.0'

### Question 2: 

**FHV October 2019**

Read the October 2019 FHV into a Spark Dataframe with a schema as we did in the lessons.

Repartition the Dataframe to 6 partitions and save it to parquet.

What is the average size of the Parquet (ending with .parquet extension) Files that were created (in MB)? Select the answer which most closely matches.


In [13]:
! download_fhv_data.sh fhv 2019

In [15]:
df_fhv = spark.read \
        .option("header", "true") \
        .csv('data/raw/fhv/2019/10/')

In [16]:
df_fhv.schema

StructType([StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', StringType(), True), StructField('dropOff_datetime', StringType(), True), StructField('PUlocationID', StringType(), True), StructField('DOlocationID', StringType(), True), StructField('SR_Flag', StringType(), True), StructField('Affiliated_base_number', StringType(), True)])

In [18]:
from pyspark.sql import types
fhv_schema = types.StructType([
    types.StructField('dispatching_base_num', types.StringType(), True), 
    types.StructField('pickup_datetime', types.TimestampType(), True), 
    types.StructField('dropOff_datetime', types.TimestampType(), True),
    types.StructField('PULocationID', types.IntegerType(), True),
    types.StructField('DOLocationID', types.IntegerType(), True),
    types.StructField('SR_Flag', types.StringType(), True), 
    types.StructField('Affiliated_base_number', types.StringType(), True)]
)

In [19]:
df_fhv = spark.read \
        .option("header", "true") \
        .schema(fhv_schema) \
        .csv('data/raw/fhv/2019/10/')

In [20]:
df_fhv = df_fhv.repartition(6)


In [None]:
df_fhv.write.parquet('fhvhv/2019/10/')

In [25]:
df_fhv_parquet = spark.read.parquet('fhvhv/2019/10/')

In [35]:
!dir "fhvhv\2019\10\*.parquet" /s /-c

 Le volume dans le lecteur C n'a pas de nom.
 Le num‚ro de s‚rie du volume est 144C-EFC3

 R‚pertoire de C:\Users\asma-\Documents\Dev\dt_data_engineering_zoomcamp\05-batch\Homework\fhvhv\2019\10

04/03/2024  20:10           6664698 part-00000-76180920-6c87-4dc1-8a63-33b1f806da02-c000.snappy.parquet
04/03/2024  20:10           6657082 part-00001-76180920-6c87-4dc1-8a63-33b1f806da02-c000.snappy.parquet
04/03/2024  20:10           6665057 part-00002-76180920-6c87-4dc1-8a63-33b1f806da02-c000.snappy.parquet
04/03/2024  20:10           6665082 part-00003-76180920-6c87-4dc1-8a63-33b1f806da02-c000.snappy.parquet
04/03/2024  20:10           6665016 part-00004-76180920-6c87-4dc1-8a63-33b1f806da02-c000.snappy.parquet
04/03/2024  20:10           6662655 part-00005-76180920-6c87-4dc1-8a63-33b1f806da02-c000.snappy.parquet
               6 fichier(s)         39979590 octets

     Total des fichiers list‚sÿ:
               6 fichier(s)         39979590 octets
               0 R‚p(s)    140500303872 oc

### Question 3: 

**Count records** 

How many taxi trips were there on the 15th of October?

Consider only trips that started on the 15th of Oc0
- 62,610

In [46]:
from pyspark.sql import functions as F
df_fhv_parquet.filter(F.to_date(df_fhv_parquet.pickup_datetime) == F.lit('2019-10-15')).count()

62610

### Question 4: 

**Longest trip for each day** 

What is the length of the longest trip in the dataset in hors?


In [76]:
df_fhv_parquet.registerTempTable('trips_data')



In [95]:
df_result = spark.sql(
    """
    SELECT 
        pickup_datetime, 
        MAX(TIMESTAMPDIFF(HOUR, pickup_datetime, dropOff_datetime)) AS hour_difference
    FROM trips_data
    GROUP BY 1
    ORDER BY 2 DESC
    """
)

In [96]:
df_result.show()

+-------------------+---------------+
|    pickup_datetime|hour_difference|
+-------------------+---------------+
|2019-10-28 09:00:00|         631152|
|2019-10-11 18:00:00|         631152|
|2019-10-31 23:46:33|          87672|
|2019-10-01 21:43:42|          70128|
|2019-10-17 14:00:00|           8794|
|2019-10-26 21:26:00|           8784|
|2019-10-30 12:30:04|           1464|
|2019-10-25 07:04:57|           1056|
|2019-10-01 07:21:12|            793|
|2019-10-01 13:47:17|            793|
|2019-10-01 13:41:00|            793|
|2019-10-01 06:04:13|            792|
|2019-10-01 06:54:57|            792|
|2019-10-01 10:24:04|            792|
|2019-10-01 12:37:49|            792|
|2019-10-01 09:06:55|            792|
|2019-10-01 14:55:36|            792|
|2019-10-01 17:39:45|            792|
|2019-10-01 05:41:00|            792|
|2019-10-01 12:02:44|            792|
+-------------------+---------------+
only showing top 20 rows



### Question 5: 

**User Interface**

Spark’s User Interface which shows the application's dashboard runs on which local po
- 4040
rt?


### Question 6: 

**Least frequent pickup location zone**

Load the zone lookup data into a temp view in Spark</br>
[Zone Data](https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv)

Using the zone lookup data and the FHV October 2019 data, what is the name of the LEAST frequent pickup location Zone?</br>

In [100]:
df_z = spark.read.parquet('zones/')

In [104]:
df_zones.schema

StructType([StructField('LocationID', StringType(), True), StructField('Borough', StringType(), True), StructField('Zone', StringType(), True), StructField('service_zone', StringType(), True)])

In [101]:
df_zones.registerTempTable('zones')

In [109]:
df_result = spark.sql(
    """
    SELECT 
        Zone, 
        count(*)
    FROM trips_data
    INNER JOIN zones
    on LocationID = PULocationID
    group by 1
    order by 2 asc
    """
)

In [110]:
df_result.show()

+--------------------+--------+
|                Zone|count(1)|
+--------------------+--------+
|         Jamaica Bay|       1|
|Governor's Island...|       2|
| Green-Wood Cemetery|       5|
|       Broad Channel|       8|
|     Highbridge Park|      14|
|        Battery Park|      15|
|Saint Michaels Ce...|      23|
|Breezy Point/Fort...|      25|
|Marine Park/Floyd...|      26|
|        Astoria Park|      29|
|    Inwood Hill Park|      39|
|       Willets Point|      47|
|Forest Park/Highl...|      53|
|  Brooklyn Navy Yard|      57|
|        Crotona Park|      62|
|        Country Club|      77|
|     Freshkills Park|      89|
|       Prospect Park|      98|
|     Columbia Street|     105|
|  South Williamsburg|     110|
+--------------------+--------+
only showing top 20 rows

