# Week 5 Homework

## Question 1:
Install Spark and PySpark

* Install Spark
* Run PySpark
* Create a local spark session
* Execute spark.version.


What's the output?

In [1]:
import os
import sys

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

In [2]:
import pyspark
from pyspark.sql import SparkSession
import urllib.request
from pyspark.sql import types

In [None]:
urllib.request.urlretrieve('https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhv/fhv_tripdata_2019-10.csv.gz', 'fhv_tripdata_2019-10.csv.gz') 

In [3]:
spark = (SparkSession.builder
    .master("local[*]") 
    .appName('test') 
    .getOrCreate())

In [4]:
spark.version

'3.5.0'

## Question 2:
FHV October 2019

<br>Read the October 2019 FHV into a Spark Dataframe with a schema as we did in the lessons.
<br>Repartition the Dataframe to 6 partitions and save it to parquet.
<br>What is the average size of the Parquet (ending with .parquet extension) Files that were created (in MB)? Select the answer which most closely matches.

In [6]:
%%sh
ls -lh fhv_tripdata_2019-10.csv.gz

-rw-r--r-- 1 oheld 197609 19M Feb 19 20:46 fhv_tripdata_2019-10.csv.gz


In [33]:
%%sh 
zcat fhv_tripdata_2019-10.csv.gz | head -n 5 > head.csv

In [34]:
%%sh
cat head.csv

dispatching_base_num,pickup_datetime,dropOff_datetime,PUlocationID,DOlocationID,SR_Flag,Affiliated_base_number
B00009,2019-10-01 00:23:00,2019-10-01 00:35:00,264,264,,B00009
B00013,2019-10-01 00:11:29,2019-10-01 00:13:22,264,264,,B00013
B00014,2019-10-01 00:11:43,2019-10-01 00:37:20,264,264,,B00014
B00014,2019-10-01 00:56:29,2019-10-01 00:57:47,264,264,,B00014


In [35]:
schema = types.StructType([
    types.StructField('dispatching_base_num', types.StringType(), True),
    types.StructField('pickup_datetime', types.TimestampType(), True),
    types.StructField('dropoff_datetime', types.TimestampType(), True),
    types.StructField('PULocationID', types.IntegerType(), True),
    types.StructField('DOLocationID', types.IntegerType(), True),
    types.StructField('SR_Flag', types.BooleanType(), True),
    types.StructField('Affiliated_base_number', types.StringType(), True)
])

In [37]:
df = (spark.read
.option("header", "true")
.schema(schema) 
.csv('fhv_tripdata_2019-10.csv.gz'))

df = df.repartition(6)

df.write.parquet('hw/fhv/2019/10/', compression="snappy")

In [38]:
%%sh
ls hw/fhv/2019/10 -lh

total 39M
-rw-r--r-- 1 oheld 197609    0 Feb 19 21:03 _SUCCESS
-rw-r--r-- 1 oheld 197609 6.4M Feb 19 21:03 part-00000-223f2dfe-312a-4910-aa9e-6ab17e78af88-c000.snappy.parquet
-rw-r--r-- 1 oheld 197609 6.4M Feb 19 21:03 part-00001-223f2dfe-312a-4910-aa9e-6ab17e78af88-c000.snappy.parquet
-rw-r--r-- 1 oheld 197609 6.4M Feb 19 21:03 part-00002-223f2dfe-312a-4910-aa9e-6ab17e78af88-c000.snappy.parquet
-rw-r--r-- 1 oheld 197609 6.4M Feb 19 21:03 part-00003-223f2dfe-312a-4910-aa9e-6ab17e78af88-c000.snappy.parquet
-rw-r--r-- 1 oheld 197609 6.4M Feb 19 21:03 part-00004-223f2dfe-312a-4910-aa9e-6ab17e78af88-c000.snappy.parquet
-rw-r--r-- 1 oheld 197609 6.4M Feb 19 21:03 part-00005-223f2dfe-312a-4910-aa9e-6ab17e78af88-c000.snappy.parquet


In [39]:
df = spark.read.parquet('hw/fhv/2019/10/')

## Question 3:
Count records
<br>How many taxi trips were there on the 15th of October?
<br>Consider only trips that started on the 15th of October.

In [13]:
from pyspark.sql import functions as F

In [40]:
(df
    .withColumn('pickup_date', F.to_date(df.pickup_datetime)) 
    .filter("pickup_date = '2019-10-15'") 
    .count())

62610

In [41]:
df.createOrReplaceTempView('fhv_2019_10')

In [42]:
spark.sql("""
SELECT
    COUNT(1)
FROM 
    fhv_2019_10
WHERE
    to_date(pickup_datetime) = '2019-10-15';
""").show()

+--------+
|count(1)|
+--------+
|   62610|
+--------+



## Question 4:
Longest trip for each day
<br>What is the length of the longest trip in the dataset in hours?

In [43]:
df.columns

['dispatching_base_num',
 'pickup_datetime',
 'dropoff_datetime',
 'PULocationID',
 'DOLocationID',
 'SR_Flag',
 'Affiliated_base_number']

In [57]:
(df 
    .withColumn('duration', (df.dropoff_datetime.cast('long') - df.pickup_datetime.cast('long')) / F.lit(60) / F.lit(60) )
    .withColumn('pickup_date', F.to_date(df.pickup_datetime)) 
    .groupBy('pickup_date') 
        .max('duration') 
    .orderBy('max(duration)', ascending=False)
    .limit(5) 
    .show())

+-----------+-----------------+
|pickup_date|    max(duration)|
+-----------+-----------------+
| 2019-10-28|         631152.5|
| 2019-10-11|         631152.5|
| 2019-10-31|87672.44083333334|
| 2019-10-01|70128.02805555557|
| 2019-10-17|           8794.0|
+-----------+-----------------+



In [70]:
spark.sql("""
SELECT
    to_date(pickup_datetime) AS pickup_date,
    MAX((CAST(dropoff_datetime AS LONG) - CAST(pickup_datetime AS LONG)) / 60 / 60 ) AS duration
FROM 
    fhv_2019_10
GROUP BY
    1
ORDER BY
    2 DESC
LIMIT 5;
""").show()

+-----------+-----------------+
|pickup_date|         duration|
+-----------+-----------------+
| 2019-10-28|         631152.5|
| 2019-10-11|         631152.5|
| 2019-10-31|87672.44083333334|
| 2019-10-01|70128.02805555557|
| 2019-10-17|           8794.0|
+-----------+-----------------+



## Question 5: User Interface
Spark’s User Interface which shows the application's dashboard runs on which local port?

* 80
* 443
* **4040**
* 8080

## Question 6: Least frequent pickup location zone
Load the zone lookup data into a temp view in Spark (zone data)
<br> Using the zone lookup data and the FHV October 2019 data, what is the name of the LEAST frequent pickup location Zone?

In [58]:
urllib.request.urlretrieve('https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv', 'taxi_zone_lookup.csv') 

('taxi_zone_lookup.csv', <http.client.HTTPMessage at 0x1a0365cc4a0>)

In [73]:
taxi_zone_df = (spark.read
.option("header", "true")
.option("inferSchema", "true")
.csv('taxi_zone_lookup.csv'))

In [62]:
taxi_zone_df.createOrReplaceTempView('taxi_zone_df')

In [69]:
spark.sql("""
SELECT
    CONCAT(pul.Zone) AS pu_do_pair,
    COUNT(1)
FROM 
    fhv_2019_10 fhv LEFT JOIN taxi_zone_df pul ON fhv.PULocationID = pul.LocationID
GROUP BY 
    1
ORDER BY
    2 ASC
LIMIT 5;
""").show()
## Jamaica Bay

+--------------------+--------+
|          pu_do_pair|count(1)|
+--------------------+--------+
|         Jamaica Bay|       1|
|Governor's Island...|       2|
| Green-Wood Cemetery|       5|
|       Broad Channel|       8|
|     Highbridge Park|      14|
+--------------------+--------+

