
In this homework we'll put what we learned about Spark into practice.

For this homework we will use the FHVHV 2021-06 data: [FHVHV Data](https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhvhv/fhvhv_tripdata_2021-06.csv.gz)
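
The link above points to a gzip-compressed file, while the cells below read `fhvhv_tripdata_2021-06.csv`. A minimal sketch of the decompression step using only the standard library, assuming the archive has already been downloaded into the working directory (the helper name `gunzip_file` is made up for this example):

```python
import gzip
import shutil
from pathlib import Path


def gunzip_file(src, dst=None):
    """Decompress a .gz file and return the path of the decompressed copy."""
    src = Path(src)
    # Default target: the same name without the trailing ".gz".
    dst = Path(dst) if dst is not None else src.with_suffix("")
    with gzip.open(src, "rb") as compressed, open(dst, "wb") as plain:
        # Stream in chunks so the whole file never sits in memory at once.
        shutil.copyfileobj(compressed, plain)
    return dst


if __name__ == "__main__":
    archive = Path("fhvhv_tripdata_2021-06.csv.gz")
    if archive.exists():
        print(gunzip_file(archive))
```

Spark can also read the `.csv.gz` file directly, although a gzip file cannot be split, so it is read by a single task.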


### Question 1: 

**Install Spark and PySpark** 

- Install Spark
- Run PySpark
- Create a local Spark session
- Execute `spark.version`

What's the output?
- 3.3.2
- 2.1.4
- 1.2.3
- 5.4

> <span style="color:red">'3.3.2'</span>

In [1]:
import pyspark
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

spark.version

/usr/local/lib/python3.9/site-packages/pyspark/bin/load-spark-env.sh: line 68: ps: command not found
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/02/27 04:09:06 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


'3.3.2'

### Question 2: 

**FHVHV June 2021**

Read it with Spark using the same schema as we did in the lessons.</br> 
We will use this dataset for all the remaining questions.</br>
Repartition it to 12 partitions and save it to parquet.</br>
What is the average size (in MB) of the Parquet files (those ending with the .parquet extension) that were created? Select the answer that most closely matches.</br>


- 2MB
- 24MB
- 100MB
- 250MB

> <span style="color:red">24 MB</span>

In [5]:
!head -n 1001 fhvhv_tripdata_2021-06.csv > head.csv

In [18]:
def read_df(data_set='fhvhv_tripdata_2021-06.csv', schema=None, test=False):
    # With test=True, read the small sample written to head.csv instead of the full file.
    if test:
        data_set = 'head.csv'
    reader = spark.read.option("header", "true")
    # Apply the explicit schema when one is given; otherwise every column is read as a string.
    if schema is not None:
        reader = reader.schema(schema)
    return reader.csv(data_set)

In [19]:
from pyspark.sql import types

schema = types.StructType([
    types.StructField('dispatching_base_num', types.StringType(), True),
    types.StructField('pickup_datetime', types.TimestampType(), True),
    types.StructField('dropoff_datetime', types.TimestampType(), True),
    types.StructField('PULocationID', types.IntegerType(), True),
    types.StructField('DOLocationID', types.IntegerType(), True),
    types.StructField('SR_Flag', types.StringType(), True),
    types.StructField('Affiliated_base_number', types.StringType(), True)
])

In [20]:
df = read_df(schema=schema)

In [21]:
df.count()

                                                                                

14961892

In [10]:
from pyspark.sql import types

In [23]:
df = df.repartition(12)

In [77]:
df.write.parquet('fhvhv/2021/06/', mode='overwrite')

                                                                                

In [79]:
!ls -ltrh fhvhv/2021/06/*.parquet

-rw-r--r-- 1 user user 23M Feb 26 21:07 fhvhv/2021/06/part-00003-6ace8957-737c-412c-9806-6a9b98affc0e-c000.snappy.parquet
-rw-r--r-- 1 user user 23M Feb 26 21:07 fhvhv/2021/06/part-00001-6ace8957-737c-412c-9806-6a9b98affc0e-c000.snappy.parquet
-rw-r--r-- 1 user user 23M Feb 26 21:07 fhvhv/2021/06/part-00002-6ace8957-737c-412c-9806-6a9b98affc0e-c000.snappy.parquet
-rw-r--r-- 1 user user 23M Feb 26 21:07 fhvhv/2021/06/part-00000-6ace8957-737c-412c-9806-6a9b98affc0e-c000.snappy.parquet
-rw-r--r-- 1 user user 23M Feb 26 21:07 fhvhv/2021/06/part-00004-6ace8957-737c-412c-9806-6a9b98affc0e-c000.snappy.parquet
-rw-r--r-- 1 user user 23M Feb 26 21:07 fhvhv/2021/06/part-00007-6ace8957-737c-412c-9806-6a9b98affc0e-c000.snappy.parquet
-rw-r--r-- 1 user user 23M Feb 26 21:07 fhvhv/2021/06/part-00005-6ace8957-737c-412c-9806-6a9b98affc0e-c000.snappy.parquet
-rw-r--r-- 1 user user 23M Feb 26 21:07 fhvhv/2021/06/part-00006-6ace8957-737c-412c-9806-6a9b98affc0e-c000.snappy.parquet
-rw-r--r-- 1 use
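
The listing shows rounded, human-readable sizes. To compute the average directly, one option is a small helper like the following sketch; the function name `average_parquet_size_mb` is made up for this example, and it assumes the output directory used in the write cell above:

```python
from pathlib import Path


def average_parquet_size_mb(directory):
    """Return the mean size, in MiB, of the *.parquet files directly inside `directory`."""
    # Only names ending in .parquet are counted, so _SUCCESS and .crc files are skipped.
    sizes = [p.stat().st_size for p in Path(directory).glob("*.parquet")]
    if not sizes:
        raise ValueError(f"no .parquet files found in {directory}")
    return sum(sizes) / len(sizes) / (1024 * 1024)


if __name__ == "__main__":
    out_dir = Path("fhvhv/2021/06")
    if out_dir.is_dir():
        print(f"{average_parquet_size_mb(out_dir):.1f} MiB")
```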

### Question 3: 

**Count records**  

How many taxi trips were there on June 15?</br></br>
Consider only trips that started on June 15.</br>

- 308,164
- 12,856
- 452,470
- 50,982

> <span style="color:red">452470</span>

In [24]:
from pyspark.sql import functions as F

In [25]:

df = df \
    .withColumn('pickup_date', F.to_date(df.pickup_datetime)) \
    .withColumn('dropoff_date', F.to_date(df.dropoff_datetime)) 

In [26]:
df.count()

                                                                                

14961892

In [260]:
df \
    .filter(df.pickup_date == '2021-06-15') \
    .count()

                                                                                

452470
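
The filter above looks only at the pickup date, so a trip counts when it starts on June 15, regardless of when it ends. A plain-Python sketch of the same rule, using made-up timestamps purely for illustration (the helper `started_on` is hypothetical):

```python
from datetime import date, datetime


def started_on(pickup, day):
    """True when the pickup timestamp falls on the given calendar day."""
    return pickup.date() == day


# Made-up (pickup, dropoff) pairs, not taken from the dataset.
trips = [
    (datetime(2021, 6, 14, 23, 50), datetime(2021, 6, 15, 0, 10)),  # only ends on the 15th
    (datetime(2021, 6, 15, 0, 0), datetime(2021, 6, 15, 0, 20)),    # starts at midnight on the 15th
    (datetime(2021, 6, 15, 23, 59), datetime(2021, 6, 16, 0, 30)),  # starts on the 15th, ends on the 16th
    (datetime(2021, 6, 16, 0, 0), datetime(2021, 6, 16, 0, 5)),     # starts on the 16th
]

june_15 = date(2021, 6, 15)
count = sum(started_on(pickup, june_15) for pickup, _ in trips)
print(count)  # 2: the second and third trips
```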

### Question 4: 

**Longest trip for each day**  

Now calculate the duration for each trip.</br>
How long was the longest trip, in hours?</br>

- 66.87 Hours
- 243.44 Hours
- 7.68 Hours
- 3.32 Hours

> <span style="color:red">66.87</span>

In [178]:
# from datetime import datetime

# def calc_trip_duration_in_h(pickup_datetime, dropoff_datetime):
# #     delta = datetime.fromisoformat(dropoff_datetime) - datetime.fromisoformat(pickup_datetime)
#     delta = dropoff_datetime - pickup_datetime
#     return (delta.total_seconds()/3600)

# trip_duration_udf = F.udf(calc_trip_duration_in_h, returnType=types.FloatType())

In [168]:
# df = read_df(test=True)
# p = df.select('pickup_datetime').take(1)[0].asDict()['pickup_datetime']
# d = df.select('dropoff_datetime').take(1)[0].asDict()['dropoff_datetime']

In [261]:
# Trip duration in hours: the difference between the two epoch-second values, divided by 3600.
timeFmt = "yyyy-MM-dd HH:mm:ss"
duration = ((F.unix_timestamp('dropoff_datetime', format=timeFmt)
            - F.unix_timestamp('pickup_datetime', format=timeFmt)) / 3600)

# df \
#     .withColumn('pickup_datetime', F.to_timestamp(df.pickup_datetime)) \
#     .withColumn('dropoff_datetime', F.to_timestamp(df.dropoff_datetime)) \

df = df.withColumn("Duration", duration)
# Reduce to a single row holding the maximum duration and bring it to the driver.
max_duration = df.agg({"Duration": "max"}).collect()[0]
max_duration[0]


                                                                                

66.8788888888889
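
The duration is the difference between the dropoff and pickup times in seconds, divided by 3600. The same arithmetic in plain Python, as a sketch: both helper names are made up for this example, and the timestamps passed to `duration_hours` are illustrative rather than taken from the dataset. The second helper breaks the reported maximum into days, hours, minutes and seconds as a sanity check.

```python
from datetime import datetime


def duration_hours(pickup, dropoff):
    """Trip duration in hours, computed from two datetime values."""
    return (dropoff - pickup).total_seconds() / 3600


def split_hours(hours):
    """Break a duration given in hours into whole days, hours, minutes and seconds."""
    total_seconds = round(hours * 3600)
    days, rem = divmod(total_seconds, 86400)
    hrs, rem = divmod(rem, 3600)
    minutes, seconds = divmod(rem, 60)
    return days, hrs, minutes, seconds


# Made-up timestamps: a 90-minute trip.
print(duration_hours(datetime(2021, 6, 1, 8, 0, 0), datetime(2021, 6, 1, 9, 30, 0)))  # 1.5

# The maximum reported above corresponds to 2 days, 18 hours, 52 minutes, 44 seconds.
print(split_hours(66.8788888888889))  # (2, 18, 52, 44)
```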


### Question 5: 

**User Interface**

On which local port does Spark’s User Interface, which shows the application's dashboard, run?</br>

- 80
- 443
- 4040
- 8080

> <span style="color:red">4040</span>
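
Port 4040 is the default value of `spark.ui.port`; when it is already taken, Spark tries the following ports (4041, 4042, and so on). A quick way to check whether anything is listening on a given port, sketched with the standard library only (the helper `is_port_open` is made up for this example):

```python
import socket


def is_port_open(port, host="localhost", timeout=1.0):
    """Return True if something accepts TCP connections on host:port."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Connection refused, timed out, or host unreachable.
        return False


if __name__ == "__main__":
    print("Spark UI reachable on 4040:", is_port_open(4040))
```

Inside a running session, `spark.sparkContext.uiWebUrl` reports the address the UI is actually bound to.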

### Question 6: 

**Most frequent pickup location zone**

Load the zone lookup data into a temp view in Spark.</br>
[Zone Data](https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv)</br>

Using the zone lookup data and the FHVHV June 2021 data, what is the name of the most frequent pickup location zone?</br>

- East Chelsea
- Astoria
- Union Sq
- Crown Heights North

> <span style="color:red">Crown Heights North</span>

In [272]:
!ls

fhvhv			    get_data.sh  taxi+_zone_lookup.csv	tets
fhvhv_tripdata_2021-06.csv  head.csv	 taxi_zone_lookup.csv	week5.ipynb


In [27]:
zones = read_df(data_set='taxi_zone_lookup.csv')

In [28]:
zones.columns

['LocationID', 'Borough', 'Zone', 'service_zone']

In [29]:
df.columns

['dispatching_base_num',
 'pickup_datetime',
 'dropoff_datetime',
 'PULocationID',
 'DOLocationID',
 'SR_Flag',
 'Affiliated_base_number',
 'pickup_date',
 'dropoff_date']

In [30]:
zones.select('LocationID').take(5)

[Row(LocationID='1'),
 Row(LocationID='2'),
 Row(LocationID='3'),
 Row(LocationID='4'),
 Row(LocationID='5')]

In [33]:
zones.take(1)

[Row(LocationID='1', Borough='EWR', Zone='Newark Airport', service_zone='EWR')]

In [31]:
df.select('PULocationID').take(5)

                                                                                

[Row(PULocationID=196),
 Row(PULocationID=196),
 Row(PULocationID=196),
 Row(PULocationID=196),
 Row(PULocationID=196)]

In [38]:
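# LocationID was read as a string and PULocationID as an integer; Spark coerces the types implicitly for this comparison.
df_zones = df.join(zones, zones.LocationID == df.PULocationID)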

In [55]:
df_zones_counts = df_zones.groupBy("Zone").count()

In [57]:
# Compare only the four answer options rather than ranking every zone.
candidates = ["East Chelsea", "Astoria", "Union Sq", "Crown Heights North"]
df_zones_counts.filter(df_zones_counts.Zone.isin(candidates)).show()



+-------------------+------+
|               Zone| count|
+-------------------+------+
|Crown Heights North|231279|
|           Union Sq|158937|
|            Astoria|152493|
|       East Chelsea|147673|
+-------------------+------+
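
The question mentions a temp view, while the cells above use a DataFrame join. A possible Spark SQL equivalent is sketched below. The view names `fhvhv_trips` and `zones` and the helper `top_pickup_zones` are made up for this example, and the helper expects the session and the two DataFrames from the cells above to be passed in. Unlike the filter above, it ranks every zone rather than only the four answer options.

```python
# Hypothetical view names; LocationID is cast because it was read as a string.
TOP_PICKUP_ZONES_SQL = """
SELECT z.Zone AS zone, COUNT(*) AS trips
FROM fhvhv_trips AS t
JOIN zones AS z
  ON t.PULocationID = CAST(z.LocationID AS INT)
GROUP BY z.Zone
ORDER BY trips DESC
LIMIT {limit}
"""


def top_pickup_zones(spark, trips_df, zones_df, limit=5):
    """Register both DataFrames as temp views and return the busiest pickup zones."""
    trips_df.createOrReplaceTempView("fhvhv_trips")
    zones_df.createOrReplaceTempView("zones")
    return spark.sql(TOP_PICKUP_ZONES_SQL.format(limit=int(limit)))
```

With the objects from the cells above, it could be called as `top_pickup_zones(spark, df, zones).show()`.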



                                                                                