## Week 5 Homework

In this homework we'll put what we learned about Spark into practice.

We'll use the High Volume For-Hire Vehicles (HVFHV) dataset.

### Question 1. Install Spark and PySpark

- Install Spark
- Run PySpark
- Create a local Spark session
- Execute spark.version

What's the output?

In [1]:
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

22/02/28 11:39:49 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [2]:
spark.version

'3.0.3'

### Question 2. HVFHV February 2021

Download the HVFHV data for February 2021:

wget https://nyc-tlc.s3.amazonaws.com/trip+data/fhvhv_tripdata_2021-02.csv

Read it with Spark using the same schema as we did in the lessons.

Repartition it to 24 partitions and save it to parquet.

What's the size of the folder with results (in MB)?

In [3]:
df_hvfhv = spark.read \
    .option("header", "true") \
    .csv('data/raw/hvfhw/')

In [4]:
import pandas as pd
from pyspark.sql import types

In [5]:
df_hvfhv_pd = pd.read_csv('data/raw/hvfhw/fhvhv_tripdata_2021-02.csv', nrows = 1001)

In [8]:
spark.createDataFrame(df_hvfhv_pd).schema

StructType(List(StructField(hvfhs_license_num,StringType,true),StructField(dispatching_base_num,StringType,true),StructField(pickup_datetime,StringType,true),StructField(dropoff_datetime,StringType,true),StructField(PULocationID,LongType,true),StructField(DOLocationID,LongType,true),StructField(SR_Flag,DoubleType,true)))

In [5]:
schema = types.StructType([
    types.StructField("hvfhs_license_num", types.StringType(), True),
    types.StructField("dispatching_base_num", types.StringType(), True),
    types.StructField("pickup_datetime", types.TimestampType(), True),
    types.StructField("dropoff_datetime", types.TimestampType(), True),
    types.StructField("PULocationID", types.IntegerType(), True),
    types.StructField("DOLocationID", types.IntegerType(), True),
    types.StructField("SR_Flag", types.DoubleType(), True)])

In [None]:
input_path = 'data/raw/hvfhw/fhvhv_tripdata_2021-02.csv'
output_path = 'data/pq/hvfhv/'


df_hvfhv = spark.read \
    .option("header", "true") \
    .schema(schema) \
    .csv(input_path)

df_hvfhv \
    .repartition(24) \
    .write.parquet(output_path)

In [15]:
!du -sh ./data/pq/hvfhv

210M	./data/pq/hvfhv
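
`du -sh` reports allocated disk blocks and rounds to a human-readable unit. As a cross-check, the sketch below sums the file sizes under a folder from Python. The helper name `folder_size_mb` is hypothetical, and its result can differ slightly from `du`, which counts blocks rather than bytes.

```python
from pathlib import Path


def folder_size_mb(path):
    """Return the total size of all regular files under `path`, in MiB."""
    total_bytes = sum(
        p.stat().st_size for p in Path(path).rglob("*") if p.is_file()
    )
    return total_bytes / (1024 * 1024)
```

Calling `folder_size_mb('data/pq/hvfhv')` after the write above should give a figure close to the one printed by `du`.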


### Question 3. Count records

How many taxi trips were there on February 15?

Consider only trips that started on February 15.

In [7]:
df_hvfhv_pq = spark.read.parquet('data/pq/hvfhv/*')

In [8]:
# Check the column types before filtering: pickup_datetime is a timestamp, not a date

df_hvfhv_pq.printSchema()

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: double (nullable = true)



In [9]:
import datetime

In [10]:
d = datetime.date(2021, 2, 15)
print(d)

2021-02-15


In [11]:
df_hvfhv_pq.filter(df_hvfhv_pq.pickup_datetime == datetime.date(2021, 2, 15)).count()

                                                                                

5
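
The count of 5 is most likely explained by type promotion: when a timestamp column is compared with a date, the date is treated as a timestamp at midnight, so only trips that started at exactly 00:00:00 match. The plain-Python sketch below mimics that behaviour on a few made-up pickup times (the sample values are illustrative and not taken from the dataset).

```python
import datetime

day = datetime.date(2021, 2, 15)

# Made-up pickup timestamps, for illustration only.
pickups = [
    datetime.datetime(2021, 2, 15, 0, 0, 0),
    datetime.datetime(2021, 2, 15, 8, 30, 12),
    datetime.datetime(2021, 2, 15, 23, 59, 59),
    datetime.datetime(2021, 2, 16, 0, 0, 0),
]

# Promote the date to a timestamp at midnight, then compare for equality.
midnight = datetime.datetime.combine(day, datetime.time.min)
exact_matches = [p for p in pickups if p == midnight]

# Truncate each timestamp to its date, then compare.
same_day = [p for p in pickups if p.date() == day]

print(len(exact_matches), len(same_day))
```

Only the truncating comparison counts every trip that started during the day.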

In [12]:
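# Import only the functions used below instead of shadowing builtins with a wildcard import
from pyspark.sql.functions import coalesce, concat_ws, count, lit, to_date, unix_timestamp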

In [13]:
# Add a date-only column so whole days can be compared
df_hvfhv_pq = df_hvfhv_pq \
    .withColumn("pickup_date", to_date("pickup_datetime"))

In [14]:
df_hvfhv_pq.filter(df_hvfhv_pq.pickup_date == datetime.date(2021, 2, 15)).count()

                                                                                

367170

In [15]:
df_hvfhv_pq.printSchema()

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: double (nullable = true)
 |-- pickup_date: date (nullable = true)



### Question 4. Longest trip for each day

Now calculate the duration for each trip.

On which day did the longest trip start?

In [16]:
df_hvfhv_pq = df_hvfhv_pq \
    .withColumn("trip_duration", unix_timestamp("dropoff_datetime") - unix_timestamp("pickup_datetime"))

In [17]:
df_hvfhv_pq.printSchema()

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: double (nullable = true)
 |-- pickup_date: date (nullable = true)
 |-- trip_duration: long (nullable = true)



In [18]:
# registerTempTable is deprecated; createOrReplaceTempView is its replacement
df_hvfhv_pq.createOrReplaceTempView('trips_data')

In [19]:
df_result = spark.sql("""
SELECT 
    pickup_datetime, 
    trip_duration
FROM trips_data
ORDER BY trip_duration DESC
LIMIT 1
;
""")

In [20]:
df_result.show()

+-------------------+-------------+
|    pickup_datetime|trip_duration|
+-------------------+-------------+
|2021-02-11 13:40:44|        75540|
+-------------------+-------------+
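
`trip_duration` is in seconds, since it is the difference of two `unix_timestamp` values. A small sketch that converts the result above into hours and minutes and derives the drop-off time from it:

```python
import datetime

pickup = datetime.datetime(2021, 2, 11, 13, 40, 44)  # pickup_datetime from the result
duration = datetime.timedelta(seconds=75540)         # trip_duration from the result

print(duration)           # 20:59:00
print(pickup + duration)  # 2021-02-12 10:39:44
```

So the longest trip started on 2021-02-11 and lasted just under 21 hours.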



                                                                                

### Question 5. Most frequent dispatching_base_num

Now find the most frequently occurring dispatching_base_num in this dataset.

How many stages does this Spark job have?

Note: the answer may depend on how you write the query, so there are multiple correct answers. Select the one you have.

In [21]:
df_result = spark.sql("""
SELECT 
    count(*) AS cnt_rows,
    dispatching_base_num
FROM trips_data
GROUP BY dispatching_base_num
ORDER BY cnt_rows DESC
LIMIT 1
;
""")

In [22]:
df_result.show()



+--------+--------------------+
|cnt_rows|dispatching_base_num|
+--------+--------------------+
| 3233664|              B02510|
+--------+--------------------+



                                                                                

### Question 6. Most common locations pair

Find the most common pickup-dropoff pair.

For example:

"Jamaica Bay / Clinton East"

Enter two zone names separated by a slash.

If any of the zone names are unknown (missing), use "Unknown". For example, "Unknown / Clinton East".
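
The naming rule can be sketched in plain Python. The helper `format_pair` is hypothetical and only illustrates the expected answer format, with `None` standing in for a missing zone name.

```python
def format_pair(pickup_zone, dropoff_zone):
    """Join two zone names with ' / ', replacing missing names with 'Unknown'."""
    pickup = pickup_zone if pickup_zone is not None else "Unknown"
    dropoff = dropoff_zone if dropoff_zone is not None else "Unknown"
    return f"{pickup} / {dropoff}"


print(format_pair("Jamaica Bay", "Clinton East"))  # Jamaica Bay / Clinton East
print(format_pair(None, "Clinton East"))           # Unknown / Clinton East
```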

In [23]:
zones_schema = types.StructType([
    types.StructField("LocationID", types.IntegerType(), True),
    types.StructField("Borough", types.StringType(), True),
    types.StructField("Zone", types.StringType(), True),
    types.StructField("service_zone", types.StringType(), True)])

In [24]:
df_zone = spark.read \
    .option("header", "true") \
    .schema(zones_schema) \
    .csv('data/raw/taxi+_zone_lookup.csv')

In [25]:
df_zone.printSchema()

root
 |-- LocationID: integer (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Zone: string (nullable = true)
 |-- service_zone: string (nullable = true)



In [26]:
df_zone.show(5)

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
+----------+-------------+--------------------+------------+
only showing top 5 rows



In [27]:
# concat_ws skips NULLs, so replace missing zone names with "Unknown" before joining them
df_hvfhv_pq = df_hvfhv_pq \
    .join(df_zone.select("LocationID", "Zone") \
          .withColumnRenamed("LocationID", "PULocationID") \
          .withColumnRenamed("Zone", "PUZone"), "PULocationID", "left_outer") \
    .join(df_zone.select("LocationID", "Zone") \
          .withColumnRenamed("LocationID", "DOLocationID") \
          .withColumnRenamed("Zone", "DOZone"), "DOLocationID", "left_outer") \
    .withColumn("pickup_dropoff", concat_ws(' / ',
                                            coalesce("PUZone", lit("Unknown")),
                                            coalesce("DOZone", lit("Unknown"))))

In [41]:
df_hvfhv_pq \
    .cache() \
    .groupBy("pickup_dropoff") \
    .agg(count(lit(1)).alias("cnt_rows")) \
    .sort("cnt_rows", ascending = False) \
    .show(10)

22/02/28 12:54:14 WARN CacheManager: Asked to cache already cached data.
22/02/28 12:54:46 WARN MemoryStore: Not enough space to cache rdd_169_3 in memory! (computed 105.7 MiB so far)
22/02/28 12:54:46 WARN BlockManager: Persisting block rdd_169_3 to disk instead.
22/02/28 12:54:46 WARN MemoryStore: Not enough space to cache rdd_169_2 in memory! (computed 105.8 MiB so far)
22/02/28 12:54:46 WARN BlockManager: Persisting block rdd_169_2 to disk instead.
22/02/28 12:55:06 WARN MemoryStore: Not enough space to cache rdd_169_2 in memory! (computed 38.8 MiB so far)
22/02/28 12:55:07 WARN MemoryStore: Not enough space to cache rdd_169_3 in memory! (computed 38.8 MiB so far)

+--------------------+--------+
|      pickup_dropoff|cnt_rows|
+--------------------+--------+
|East New York / E...|   45041|
|Borough Park / Bo...|   37329|
| Canarsie / Canarsie|   28026|
|Crown Heights Nor...|   25976|
|Bay Ridge / Bay R...|   17934|
|Jackson Heights /...|   14688|
|   Astoria / Astoria|   14688|
|Central Harlem No...|   14481|
|Bushwick South / ...|   14424|
|Flatbush/Ditmas P...|   13976|
+--------------------+--------+
only showing top 10 rows





In [28]:
# Re-register the view so it includes the new zone columns
df_hvfhv_pq.createOrReplaceTempView('trips_data')

In [43]:
df_result = spark.sql("""
SELECT 
    count(*) AS cnt_rows,
    PULocationID,
    PUZone,
    DOLocationID,
    DOZone,
    pickup_dropoff
FROM trips_data
GROUP BY PULocationID,
    PUZone,
    DOLocationID,
    DOZone,
    pickup_dropoff
ORDER BY cnt_rows DESC
LIMIT 10
;
""")

In [44]:
df_result.show(truncate = False)

22/03/01 11:32:45 WARN MemoryStore: Not enough space to cache rdd_169_3 in memory! (computed 38.8 MiB so far)
22/03/01 11:32:45 WARN MemoryStore: Not enough space to cache rdd_169_2 in memory! (computed 38.8 MiB so far)
22/03/01 11:32:45 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/03/01 11:32:45 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/03/01 11:32:45 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/03/01 11:32:45 ERROR Executor: Exception in task 2.0 in stage 41.0 (TID 1466)
org.apache.spark.memory.SparkOutOfMemoryError: No enough memory for aggregation

Py4JJavaError: An error occurred while calling o151.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 41.0 failed 1 times, most recent failure: Lost task 2.0 in stage 41.0 (TID 1466, vm-de-zoomcamp.europe-west1-b.c.datatalks-de-zoomcamp.internal, executor driver): org.apache.spark.memory.SparkOutOfMemoryError: No enough memory for aggregation
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doConsume_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithKeys_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:132)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:463)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:466)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2008)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2007)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:973)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:973)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2114)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2135)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2154)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:472)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:425)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:47)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3627)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2697)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3618)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:767)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3616)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2697)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2904)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:300)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:337)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.spark.memory.SparkOutOfMemoryError: No enough memory for aggregation
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doConsume_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithKeys_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:132)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:463)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:466)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more


22/03/01 11:32:46 WARN TaskSetManager: Lost task 1.0 in stage 41.0 (TID 1465, vm-de-zoomcamp.europe-west1-b.c.datatalks-de-zoomcamp.internal, executor driver): TaskKilled (Stage cancelled)
22/03/01 11:32:46 WARN TaskSetManager: Lost task 0.0 in stage 41.0 (TID 1464, vm-de-zoomcamp.europe-west1-b.c.datatalks-de-zoomcamp.internal, executor driver): TaskKilled (Stage cancelled)
22/03/01 11:32:46 WARN TaskSetManager: Lost task 3.0 in stage 41.0 (TID 1467, vm-de-zoomcamp.europe-west1-b.c.datatalks-de-zoomcamp.internal, executor driver): TaskKilled (Stage cancelled)
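
The run above fails with `SparkOutOfMemoryError` while grouping on six columns of the cached, joined DataFrame. One way to reduce the work, sketched here in plain Python rather than Spark, is to count trips per `(PULocationID, DOLocationID)` pair first and look up zone names only for the few top pairs. The trip and zone values below are made up for illustration, and whether the equivalent Spark query fits in memory on this machine is untested.

```python
from collections import Counter

# Made-up (PULocationID, DOLocationID) pairs; 999 is deliberately missing from the lookup.
trips = [(76, 76), (76, 76), (26, 26), (76, 76), (26, 26), (999, 7)]

# Made-up subset of a zone lookup table: LocationID -> Zone.
zones = {76: "East New York", 26: "Borough Park", 7: "Astoria"}

# Step 1: aggregate on the small integer keys only.
pair_counts = Counter(trips)

# Step 2: attach zone names to the top pairs, using "Unknown" for missing IDs.
top_pairs = [
    (f"{zones.get(pu, 'Unknown')} / {zones.get(do, 'Unknown')}", cnt)
    for (pu, do), cnt in pair_counts.most_common(2)
]

print(top_pairs)
```

Grouping on two integer IDs keeps the aggregation keys small, and the name lookup then touches only a handful of rows.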
