In [141]:
import pyspark
from pyspark.sql import SparkSession, types

<h1>Question 1. Install Spark and PySpark</h1>

- Install Spark
- Run PySpark
- Create a local spark session
- Execute spark.version

What's the output?

In [142]:
spark = SparkSession.builder.master("local[*]").appName('homework').getOrCreate()

In [8]:
spark.version

'3.3.1'

<h1>Question 2. HVFHW February 2021</h1>

Download the HVFHV data for february 2021:

```
wget https://nyc-tlc.s3.amazonaws.com/trip+data/fhvhv_tripdata_2021-02.csv
```

Read it with Spark using the same schema as we did in the lessons. We will use this dataset for all the remaining questions.

Repartition it to 24 partitions and save it to parquet.

What's the size of the folder with results (in MB)?

In [12]:
!wget https://d37ci6vzurychx.cloudfront.net/trip-data/fhv_tripdata_2021-02.parquet

--2022-12-02 12:26:20--  https://d37ci6vzurychx.cloudfront.net/trip-data/fhv_tripdata_2021-02.parquet
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 52.85.6.219, 52.85.6.129, 52.85.6.201, ...
Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|52.85.6.219|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 10645466 (10M) [binary/octet-stream]
Saving to: ‘fhv_tripdata_2021-02.parquet’


2022-12-02 12:26:21 (24.0 MB/s) - ‘fhv_tripdata_2021-02.parquet’ saved [10645466/10645466]



In [143]:
schema = types.StructType([
    types.StructField('dispatching_base_num', types.StringType(), True),
    types.StructField('Affiliated_base_number', types.StringType(), True),
    types.StructField('pickup_datetime', types.TimestampType(), True),
    types.StructField('dropOff_datetime', types.TimestampType(), True),
    types.StructField('PULocationID', types.DoubleType(), True),
    types.StructField('DOLocationID', types.DoubleType(), True),
    types.StructField('SR_Flag', types.IntegerType(), True),
])

In [144]:
df = spark.read.schema(schema).parquet("fhv_tripdata_2021-02.parquet")

In [145]:
df.show()

+--------------------+----------------------+-------------------+-------------------+------------+------------+-------+
|dispatching_base_num|Affiliated_base_number|    pickup_datetime|   dropOff_datetime|PULocationID|DOLocationID|SR_Flag|
+--------------------+----------------------+-------------------+-------------------+------------+------------+-------+
|              B00013|                B00014|2021-02-01 01:01:00|2021-02-01 02:33:00|        null|        null|   null|
|     B00021         |       B00021         |2021-02-01 01:55:40|2021-02-01 02:06:20|       173.0|        82.0|   null|
|     B00021         |       B00021         |2021-02-01 01:14:03|2021-02-01 01:28:37|       173.0|        56.0|   null|
|     B00021         |       B00021         |2021-02-01 01:27:48|2021-02-01 01:35:45|        82.0|       129.0|   null|
|              B00037|                B00037|2021-02-01 01:12:50|2021-02-01 01:26:38|        null|       225.0|   null|
|              B00037|                B0

In [40]:
df.schema

StructType([StructField('dispatching_base_num', StringType(), True), StructField('Affiliated_base_number', StringType(), True), StructField('pickup_datetime', TimestampType(), True), StructField('dropOff_datetime', TimestampType(), True), StructField('PULocationID', DoubleType(), True), StructField('DOLocationID', DoubleType(), True), StructField('SR_Flag', IntegerType(), True)])

In [146]:
df_types_cast = df \
.withColumn("PULocationID", df["PULocationID"].cast(types.IntegerType())) \
.withColumn("DOLocationID", df["DOLocationID"].cast(types.IntegerType()))

In [147]:
df_types_cast.schema

StructType([StructField('dispatching_base_num', StringType(), True), StructField('Affiliated_base_number', StringType(), True), StructField('pickup_datetime', TimestampType(), True), StructField('dropOff_datetime', TimestampType(), True), StructField('PULocationID', IntegerType(), True), StructField('DOLocationID', IntegerType(), True), StructField('SR_Flag', IntegerType(), True)])

In [148]:
df_types_cast.show()

+--------------------+----------------------+-------------------+-------------------+------------+------------+-------+
|dispatching_base_num|Affiliated_base_number|    pickup_datetime|   dropOff_datetime|PULocationID|DOLocationID|SR_Flag|
+--------------------+----------------------+-------------------+-------------------+------------+------------+-------+
|              B00013|                B00014|2021-02-01 01:01:00|2021-02-01 02:33:00|        null|        null|   null|
|     B00021         |       B00021         |2021-02-01 01:55:40|2021-02-01 02:06:20|         173|          82|   null|
|     B00021         |       B00021         |2021-02-01 01:14:03|2021-02-01 01:28:37|         173|          56|   null|
|     B00021         |       B00021         |2021-02-01 01:27:48|2021-02-01 01:35:45|          82|         129|   null|
|              B00037|                B00037|2021-02-01 01:12:50|2021-02-01 01:26:38|        null|         225|   null|
|              B00037|                B0

In [46]:
output_path = "fhv_tripdata_2021_02_repartitioned"
df_types_cast.repartition(24).write.parquet(output_path)



22/12/02 12:44:47 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers
22/12/02 12:44:47 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 84.47% for 8 writers


[Stage 20:>                                                        (0 + 8) / 24]

22/12/02 12:44:48 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers
22/12/02 12:44:48 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers
22/12/02 12:44:48 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 84.47% for 8 writers




22/12/02 12:44:49 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers




22/12/02 12:44:49 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers
22/12/02 12:44:49 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 84.47% for 8 writers
22/12/02 12:44:49 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers


                                                                                

In [52]:
import os
from pathlib import Path

In [50]:
os.path.getsize("fhv_tripdata_2021-02.parquet") / 1024 / 1024

10.152307510375977

In [54]:
sum(file.stat().st_size for file in Path(output_path).rglob('*')) / 1024 / 1024

23.33998203277588

<h1>Question 3. Count records</h1>

How many taxi trips were there on February 15?

Consider only trips that started on February 15.

In [58]:
from pyspark.sql import functions as F

In [63]:
df_types_cast.select().filter(F.date_trunc('day', df_types_cast.pickup_datetime) == "2021-02-15 00:00:00").count()

35001

In [149]:
df_types_cast.createOrReplaceTempView("fhv_feb")

In [150]:
spark.sql("""
SELECT 
    count(*) as count
FROM
    fhv_feb
WHERE
    date_trunc('day', fhv_feb.pickup_datetime) == '2021-02-15 00:00:00'
""").show()

+-----+
|count|
+-----+
|35001|
+-----+



<h1>Question 4. Longest trip for each day</h1>

Now calculate the duration for each trip.

Trip starting on which day was the longest?

In [151]:
df_duration = df_types_cast \
.withColumn('pickup_datetime_day', F.date_trunc('day', df_types_cast.pickup_datetime)) \
.withColumn('pickup_datetime_ts', F.to_timestamp(F.col('pickup_datetime'))) \
.withColumn('dropOff_datetime_ts', F.to_timestamp(F.col('dropOff_datetime'))) \
.withColumn('trip_duration_seconds', (F.col("dropOff_datetime_ts").cast("long") - F.col("pickup_datetime_ts").cast("long")))

In [152]:
df_duration.show()

+--------------------+----------------------+-------------------+-------------------+------------+------------+-------+-------------------+-------------------+-------------------+---------------------+
|dispatching_base_num|Affiliated_base_number|    pickup_datetime|   dropOff_datetime|PULocationID|DOLocationID|SR_Flag|pickup_datetime_day| pickup_datetime_ts|dropOff_datetime_ts|trip_duration_seconds|
+--------------------+----------------------+-------------------+-------------------+------------+------------+-------+-------------------+-------------------+-------------------+---------------------+
|              B00013|                B00014|2021-02-01 01:01:00|2021-02-01 02:33:00|        null|        null|   null|2021-02-01 00:00:00|2021-02-01 01:01:00|2021-02-01 02:33:00|                 5520|
|     B00021         |       B00021         |2021-02-01 01:55:40|2021-02-01 02:06:20|         173|          82|   null|2021-02-01 00:00:00|2021-02-01 01:55:40|2021-02-01 02:06:20|             

In [153]:
df_duration.createOrReplaceTempView("fhv_trip_duration")

In [154]:
spark.sql("""
SELECT 
    pickup_datetime, dropOff_datetime, pickup_datetime_day, (trip_duration_seconds / 3600) as trip_duration_hours
FROM
    fhv_trip_duration
WHERE
    trip_duration_seconds == (SELECT max(trip_duration_seconds) FROM fhv_trip_duration)
""").show()

+-------------------+-------------------+-------------------+-------------------+
|    pickup_datetime|   dropOff_datetime|pickup_datetime_day|trip_duration_hours|
+-------------------+-------------------+-------------------+-------------------+
|2021-02-04 19:45:00|2021-04-22 21:24:00|2021-02-04 00:00:00|            1848.65|
+-------------------+-------------------+-------------------+-------------------+



<h1>Question 5. Most frequent dispatching_base_num</h1>

Now find the most frequently occurring dispatching_base_num in this dataset.

How many stages this spark job has?

Note: the answer may depend on how you write the query, so there are multiple correct answers. Select the one you have.

In [156]:
spark.sql("""
SELECT 
    dispatching_base_num, count(*) as records_count
FROM
    fhv_feb
GROUP BY
    fhv_feb.dispatching_base_num
ORDER BY records_count DESC
LIMIT 1
""").show()

+--------------------+-------------+
|dispatching_base_num|records_count|
+--------------------+-------------+
|              B00856|        35077|
+--------------------+-------------+



Two stages.

<h1>Question 6. Most common locations pair</h1>

Find the most common pickup-dropoff pair.

For example:

"Jamaica Bay / Clinton East"

Enter two zone names separated by a slash

If any of the zone names are unknown (missing), use "Unknown". For example, "Unknown / Clinton East".

In [162]:
spark.sql("""
SELECT 
    PULocationID, DOLocationID, count(*) as records_count
FROM
    fhv_feb
GROUP BY
    fhv_feb.PULocationID, fhv_feb.DOLocationID
ORDER BY records_count DESC
LIMIT 100
""").show()

+------------+------------+-------------+
|PULocationID|DOLocationID|records_count|
+------------+------------+-------------+
|        null|        null|       139681|
|        null|          76|        23303|
|        null|         265|        17386|
|        null|         217|        16437|
|        null|         254|        16071|
|        null|          61|        15020|
|        null|          17|        14646|
|        null|         244|        13113|
|        null|          35|        12310|
|        null|          26|        12116|
|        null|         174|        11537|
|        null|          69|        11254|
|        null|         242|        11188|
|        null|         169|        11082|
|        null|         168|        10191|
|        null|         235|         9390|
|        null|          51|         9174|
|        null|          18|         9129|
|        null|         188|         9113|
|        null|         197|         9075|
+------------+------------+-------

In [163]:
!wget https://d37ci6vzurychx.cloudfront.net/misc/taxi+_zone_lookup.csv

--2022-12-02 14:01:11--  https://d37ci6vzurychx.cloudfront.net/misc/taxi+_zone_lookup.csv
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 13.32.11.166, 13.32.11.188, 13.32.11.148, ...
Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|13.32.11.166|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 12322 (12K) [text/csv]
Saving to: ‘taxi+_zone_lookup.csv’


2022-12-02 14:01:12 (9.00 MB/s) - ‘taxi+_zone_lookup.csv’ saved [12322/12322]



In [167]:
df_zones = spark.read.option("header", "true").csv("taxi+_zone_lookup.csv")

In [168]:
df_zones.show()

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [169]:
df_zones.createOrReplaceTempView("df_zones")

In [179]:
spark.sql("""
SELECT 
    df_zones.Zone as pickup_zone, zones.Zone as dropoff_zone, records_count 
FROM
    (SELECT PULocationID, DOLocationID, count(*) as records_count FROM fhv_feb GROUP BY fhv_feb.PULocationID, fhv_feb.DOLocationID) AS group_result
    LEFT JOIN df_zones ON group_result.PULocationID == df_zones.LocationID LEFT JOIN df_zones as zones ON group_result.DOLocationID == zones.LocationID
ORDER BY records_count DESC
LIMIT 10
""").show()

+-----------+--------------------+-------------+
|pickup_zone|        dropoff_zone|records_count|
+-----------+--------------------+-------------+
|       null|                null|       139681|
|       null|       East New York|        23303|
|       null|                  NA|        17386|
|       null|  South Williamsburg|        16437|
|       null|Williamsbridge/Ol...|        16071|
|       null| Crown Heights North|        15020|
|       null|             Bedford|        14646|
|       null|Washington Height...|        13113|
|       null|         Brownsville|        12310|
|       null|        Borough Park|        12116|
+-----------+--------------------+-------------+



<h1>Bonus question. Join type</h1>

(not graded)

For finding the answer to Q6, you'll need to perform a join.

What type of join is it?

And how many stages your spark job has?

LEFT JOIN / LEFT OUTER JOIN