In [9]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types
from pyspark.sql.functions import *
import pyspark.sql.functions as F

In [10]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()


### Question 1: 

**Install Spark and PySpark** 

- Install Spark
- Run PySpark
- Create a local spark session
- Execute spark.version.

What's the output?

In [11]:
spark.version

'3.3.2'

**Ans:** 3.3.2

### Question 2: 

**HVFHW June 2021**

Read it with Spark using the same schema as we did in the lessons.</br> 
We will use this dataset for all the remaining questions.</br>
Repartition it to 12 partitions and save it to parquet.</br>
What is the average size of the Parquet (ending with .parquet extension) Files that were created (in MB)? Select the answer which most closely matches.</br>

In [12]:
# !wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhvhv/fhvhv_tripdata_2021-06.csv.gz

In [13]:
schema = types.StructType([
    types.StructField('dispatching_base_num', types.StringType(), True), 
    types.StructField('pickup_datetime', types.TimestampType(), True), 
    types.StructField('dropoff_datetime', types.TimestampType(), True), 
    types.StructField('PULocationID', types.IntegerType(), True), 
    types.StructField('DOLocationID', types.IntegerType(), True), 
    types.StructField('SR_Flag', types.DoubleType(), True),
    types.StructField('Affiliated_base_number', types.StringType(), True)
])

In [14]:
df_fhvhv = spark.read \
    .option("header", "true") \
    .schema(schema) \
    .csv('fhvhv_tripdata_2021-06.csv.gz')

In [15]:
df_fhvhv = df_fhvhv.repartition(12)

In [16]:
df_fhvhv.write.parquet('fhvhv/2021/06')

                                                                                

In [17]:
!ls -lh fhvhv/2021/06/

total 284M
-rw-r--r-- 1 andre andre   0 Mar  7 05:15 _SUCCESS
-rw-r--r-- 1 andre andre 24M Mar  7 05:15 part-00000-9c1ecef2-31a3-45ab-92cb-4e16f1099e60-c000.snappy.parquet
-rw-r--r-- 1 andre andre 24M Mar  7 05:15 part-00001-9c1ecef2-31a3-45ab-92cb-4e16f1099e60-c000.snappy.parquet
-rw-r--r-- 1 andre andre 24M Mar  7 05:15 part-00002-9c1ecef2-31a3-45ab-92cb-4e16f1099e60-c000.snappy.parquet
-rw-r--r-- 1 andre andre 24M Mar  7 05:15 part-00003-9c1ecef2-31a3-45ab-92cb-4e16f1099e60-c000.snappy.parquet
-rw-r--r-- 1 andre andre 24M Mar  7 05:15 part-00004-9c1ecef2-31a3-45ab-92cb-4e16f1099e60-c000.snappy.parquet
-rw-r--r-- 1 andre andre 24M Mar  7 05:15 part-00005-9c1ecef2-31a3-45ab-92cb-4e16f1099e60-c000.snappy.parquet
-rw-r--r-- 1 andre andre 24M Mar  7 05:15 part-00006-9c1ecef2-31a3-45ab-92cb-4e16f1099e60-c000.snappy.parquet
-rw-r--r-- 1 andre andre 24M Mar  7 05:15 part-00007-9c1ecef2-31a3-45ab-92cb-4e16f1099e60-c000.snappy.parquet
-rw-r--r-- 1 andre andre 24M Mar  7 05:15 part-00008-9c1ec

**Ans:** 24MB

### Question 3: 

**Count records**  

How many taxi trips were there on June 15?</br>
Consider only trips that started on June 15.</br>

- 308,164
- 12,856
- 452,470
- 50,982

In [19]:
df_fhvhv.filter((df_fhvhv.pickup_datetime >= '2021-06-15') & (df_fhvhv.dropoff_datetime < '2021-06-16')).count()

                                                                                

446828

**Ans:** 452,270

### Question 4: 

**Longest trip for each day**  

Now calculate the duration for each trip.</br>
How long was the longest trip in Hours?</br>

- 66.87 Hours
- 243.44 Hours
- 7.68 Hours
- 3.32 Hours

In [20]:
duration = col("dropoff_datetime").cast("long") - col("pickup_datetime").cast("long")
df_fhvhv.withColumn( "duration", duration/ 3600. ).select("duration").sort("duration", ascending=False).head(10)

                                                                                

[Row(duration=66.8788888888889),
 Row(duration=25.549722222222222),
 Row(duration=19.980833333333333),
 Row(duration=18.197222222222223),
 Row(duration=16.466944444444444),
 Row(duration=14.268888888888888),
 Row(duration=13.909722222222221),
 Row(duration=11.67),
 Row(duration=11.365833333333333),
 Row(duration=10.984444444444444)]

**Ans:** 66.87 hours

### Question 6: 

**Most frequent pickup location zone**

Load the zone lookup data into a temp view in Spark</br>
[Zone Data](https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv)</br>

Using the zone lookup data and the fhvhv June 2021 data, what is the name of the most frequent pickup location zone?</br>

- East Chelsea
- Astoria
- Union Sq
- Crown Heights North
</br></br>

In [None]:
# !wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv

--2023-03-07 05:09:15--  https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv
Resolving github.com (github.com)... 140.82.113.4
Connecting to github.com (github.com)|140.82.113.4|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/5a2cc2f5-b4cd-4584-9c62-a6ea97ed0e6a?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAIWNJYAX4CSVEH53A%2F20230307%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20230307T050915Z&X-Amz-Expires=300&X-Amz-Signature=e7463945f97fb14ea9f34e6f559e11014471ea09107da2714a456129c206a7b4&X-Amz-SignedHeaders=host&actor_id=0&key_id=0&repo_id=513814948&response-content-disposition=attachment%3B%20filename%3Dtaxi_zone_lookup.csv&response-content-type=application%2Foctet-stream [following]
--2023-03-07 05:09:15--  https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/5a2cc2f5-b4cd-4584-9c62-a6e

In [24]:
df_zones = spark.read \
    .option("header", "true") \
    .csv('taxi_zone_lookup.csv')

In [25]:
df_zones.head(10)

[Row(LocationID='1', Borough='EWR', Zone='Newark Airport', service_zone='EWR'),
 Row(LocationID='2', Borough='Queens', Zone='Jamaica Bay', service_zone='Boro Zone'),
 Row(LocationID='3', Borough='Bronx', Zone='Allerton/Pelham Gardens', service_zone='Boro Zone'),
 Row(LocationID='4', Borough='Manhattan', Zone='Alphabet City', service_zone='Yellow Zone'),
 Row(LocationID='5', Borough='Staten Island', Zone='Arden Heights', service_zone='Boro Zone'),
 Row(LocationID='6', Borough='Staten Island', Zone='Arrochar/Fort Wadsworth', service_zone='Boro Zone'),
 Row(LocationID='7', Borough='Queens', Zone='Astoria', service_zone='Boro Zone'),
 Row(LocationID='8', Borough='Queens', Zone='Astoria Park', service_zone='Boro Zone'),
 Row(LocationID='9', Borough='Queens', Zone='Auburndale', service_zone='Boro Zone'),
 Row(LocationID='10', Borough='Queens', Zone='Baisley Park', service_zone='Boro Zone')]

In [26]:
df_result = df_fhvhv.join(df_zones, df_fhvhv.PULocationID == df_zones.LocationID)

In [30]:
df_result.registerTempTable('result')



In [32]:
spark.sql("""
SELECT 
    Zone,
    COUNT(*) as frequency
FROM
    result
GROUP BY
    Zone
SORT BY 
    frequency DESC
""").head(5)

                                                                                

[Row(Zone='Crown Heights North', frequency=231279),
 Row(Zone='East Village', frequency=221244),
 Row(Zone='JFK Airport', frequency=188867),
 Row(Zone='Bushwick South', frequency=187929),
 Row(Zone='East New York', frequency=186780)]

**Ans:** Crown Heights North