# Week #5. Batch Processing with PySpark

## Home Assignment

In [37]:
import os
import subprocess
from pathlib import Path

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types
from pyspark.sql import functions as F

### Load data from Web

In [21]:
# Remote location of the gzipped FHVHV trip data CSV for 2021-06.
fhvhv_data_url = (
    "https://github.com/DataTalksClub/nyc-tlc-data/releases/download/"
    "fhvhv/fhvhv_tripdata_2021-06.csv.gz"
)

In [19]:
# Directory that holds the downloaded FHVHV trip data.
fhvhv_data_dir = "data/fhvhv_trips"
# Local path of the archive: <fhvhv_data_dir>/<file name taken from the URL>.
fhvhv_data_filepath = Path(fhvhv_data_dir, Path(fhvhv_data_url).name)

# Download only when the file is not already present locally.
if not fhvhv_data_filepath.exists():
    # Argument list (no shell) plus check=True: a failed wget raises
    # CalledProcessError here instead of being silently ignored and
    # surfacing later as a confusing read error.
    subprocess.run(["wget", "-P", fhvhv_data_dir, fhvhv_data_url], check=True)

### Question 1

`3.3.2`

In [22]:
# Create (or reuse) the Spark session on a `local[*]` master.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("test")
    .getOrCreate()
)

In [23]:
# Display the Spark version of the session (answer to Question 1).
spark.version

'3.3.2'

### Question 2

`24M`

In [25]:
# Explicit schema for the FHVHV trips CSV; every column is declared nullable.
schema = types.StructType([
    types.StructField(column, dtype, True)
    for column, dtype in (
        ("dispatching_base_num", types.StringType()),
        ("pickup_datetime", types.TimestampType()),
        ("dropoff_datetime", types.TimestampType()),
        ("PULocationID", types.IntegerType()),
        ("DOLocationID", types.IntegerType()),
        ("SR_Flag", types.StringType()),
        ("Affiliated_base_number", types.StringType()),
    )
])

In [28]:
# Read the downloaded trips CSV (with a header row) using the explicit schema.
df = (
    spark.read
    .schema(schema)
    .option("header", "true")
    .csv(str(fhvhv_data_filepath))
)

In [31]:
# Split the data into 12 partitions and write them as parquet,
# replacing any output left by a previous run.
df = df.repartition(12)
df.write.mode("overwrite").parquet(f"{fhvhv_data_dir}/parquet/2021/06")

[Stage 0:>                                                          (0 + 1) / 1]

23/03/03 18:02:01 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers


                                                                                

In [36]:
ls -lh data/fhvhv_trips/parquet/2021/06/ | grep .parquet

-rw-r--r--  1 vgarist  staff    24M Mar  3 18:02 part-00000-47a0b4b3-047c-4550-8ef3-600d22729ca9-c000.snappy.parquet
-rw-r--r--  1 vgarist  staff    24M Mar  3 18:02 part-00001-47a0b4b3-047c-4550-8ef3-600d22729ca9-c000.snappy.parquet
-rw-r--r--  1 vgarist  staff    24M Mar  3 18:02 part-00002-47a0b4b3-047c-4550-8ef3-600d22729ca9-c000.snappy.parquet
-rw-r--r--  1 vgarist  staff    24M Mar  3 18:02 part-00003-47a0b4b3-047c-4550-8ef3-600d22729ca9-c000.snappy.parquet
-rw-r--r--  1 vgarist  staff    24M Mar  3 18:02 part-00004-47a0b4b3-047c-4550-8ef3-600d22729ca9-c000.snappy.parquet
-rw-r--r--  1 vgarist  staff    24M Mar  3 18:02 part-00005-47a0b4b3-047c-4550-8ef3-600d22729ca9-c000.snappy.parquet
-rw-r--r--  1 vgarist  staff    24M Mar  3 18:02 part-00006-47a0b4b3-047c-4550-8ef3-600d22729ca9-c000.snappy.parquet
-rw-r--r--  1 vgarist  staff    24M Mar  3 18:02 part-00007-47a0b4b3-047c-4550-8ef3-600d22729ca9-c000.snappy.parquet
-rw-r--r--  1 vgarist  staff    24M Mar  3 18:02 part-00

In [39]:
# Sanity check: the written parquet files can be read back.
df = spark.read.parquet("./data/fhvhv_trips/parquet/2021/06/")
df.show(n=5)

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B02875|2021-06-16 22:08:45|2021-06-16 22:38:10|          48|         181|      N|                B02875|
|              B02875|2021-06-27 08:13:22|2021-06-27 08:16:18|          10|          10|      N|                B02875|
|              B02510|2021-06-13 22:34:33|2021-06-13 22:53:03|          89|         189|      N|                  null|
|              B02764|2021-06-15 11:15:06|2021-06-15 11:35:40|          36|          82|      N|                B02764|
|              B02510|2021-06-09 11:41:36|2021-06-09 11:45:32|         254|         254|      N|                  null|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+

### Question 3

`452,470`

In [49]:
# Count the trips whose pickup date is 2021-06-15.
(
    df
    .withColumn("pickup_date", F.to_date(F.col("pickup_datetime")))
    .where("pickup_date == '2021-06-15'")
    .count()
)

452470

### Question 4

`66.87`

In [63]:
# Longest trip in hours: difference between the dropoff and pickup
# unix timestamps (seconds), divided by 3600.
(
    df
    .withColumn(
        "diff_in_hours",
        (F.unix_timestamp("dropoff_datetime") - F.unix_timestamp("pickup_datetime")) / 3600,
    )
    .groupBy()
    .max("diff_in_hours")
    .show()
)

+------------------+
|max(diff_in_hours)|
+------------------+
|  66.8788888888889|
+------------------+



### Question 5

Port: `4040`

Spark jobs can be monitored here:<br>
`http://localhost:4040/jobs/`

### Question 6

`Crown Heights North`

In [74]:
# Download the taxi zone lookup table (skipped when it is already on disk).
taxi_zones_url = "https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv"
# Local path: data/taxi_zones/<file name taken from the URL>.
taxi_zones_filepath = Path("data/taxi_zones", Path(taxi_zones_url).name)

if not taxi_zones_filepath.exists():
    # Fetch the zones URL itself; the previous command passed the FHVHV
    # trips URL, so the lookup file was never downloaded.
    # check=True makes a failed download raise instead of passing silently.
    subprocess.run(
        ["wget", "-P", str(taxi_zones_filepath.parent), taxi_zones_url],
        check=True,
    )

In [82]:
# Explicit schema for the taxi zone lookup CSV.
# LocationID is declared non-nullable; the remaining columns are nullable.
zones_schema = types.StructType([
    types.StructField(column, dtype, nullable=nullable)
    for column, dtype, nullable in (
        ("LocationID", types.IntegerType(), False),
        ("Borough", types.StringType(), True),
        ("Zone", types.StringType(), True),
        ("service_zone", types.StringType(), True),
    )
])

In [83]:
# Read the zone lookup CSV (with a header row) using the explicit schema,
# then preview the first rows.
df_zones = (
    spark.read
    .schema(zones_schema)
    .option("header", "true")
    .csv(str(taxi_zones_filepath))
)

df_zones.show(n=5)

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
+----------+-------------+--------------------+------------+
only showing top 5 rows



In [85]:
# Register both DataFrames as temp views so they can be queried via spark.sql.
for view_name, frame in (("taxi_trips", df), ("taxi_zones", df_zones)):
    frame.createOrReplaceTempView(view_name)

In [87]:
# Ten most frequent pickup zones: join trips to zones on the pickup
# location id, count trips per zone name, and sort by count descending.
pickup_zone_ranking_sql = """
SELECT
    taxi_zones.Zone as zone_name,
    count(*) as trips_cnt
FROM taxi_trips
JOIN taxi_zones
    ON taxi_trips.PULocationID = taxi_zones.LocationID
GROUP BY 1
ORDER BY 2 DESC
LIMIT 10
"""
spark.sql(pickup_zone_ranking_sql).show()

+--------------------+---------+
|           zone_name|trips_cnt|
+--------------------+---------+
| Crown Heights North|   231279|
|        East Village|   221244|
|         JFK Airport|   188867|
|      Bushwick South|   187929|
|       East New York|   186780|
|TriBeCa/Civic Center|   164344|
|   LaGuardia Airport|   161596|
|            Union Sq|   158937|
|        West Village|   154698|
|             Astoria|   152493|
+--------------------+---------+





<hr>