# Jupyter/Colab Notebook to Showcase sparkMeasure APIs for Python

### [Run on Google Colab Research: <img src="https://raw.githubusercontent.com/googlecolab/open_in_colab/master/images/icon128.png">](https://colab.research.google.com/github/LucaCanali/sparkMeasure/blob/master/examples/SparkMeasure_Jupyter_Colab_Example.ipynb)

**SparkMeasure is a tool for performance troubleshooting of Apache Spark workloads**  
It simplifies the collection and analysis of Spark performance metrics. It is also intended as a working example of how to use Spark listeners for collecting and processing Spark executor task metrics data.

**References:**
- [https://github.com/LucaCanali/sparkMeasure](https://github.com/LucaCanali/sparkMeasure)  
- sparkmeasure Python docs: [docs/Python_shell_and_Jupyter](https://github.com/LucaCanali/sparkMeasure/blob/master/docs/Python_shell_and_Jupyter.md)  

**Architecture:**
![sparkMeasure architecture diagram](https://github.com/LucaCanali/sparkMeasure/raw/master/docs/sparkMeasure_architecture_diagram.png)

Contact: Luca.Canali@cern.ch, February 2019  

In [None]:
# Install Spark
# Note: this installs Spark version 2.4.3; newer versions do not work with this example.

!pip install pyspark==2.4.3

In [37]:
from pyspark.sql import SparkSession

# Create Spark Session
# This example uses a local cluster; you can modify master to use YARN or K8S if available.
# This example downloads sparkMeasure 0.14 for Scala 2.11 from Maven Central.

spark = SparkSession \
 .builder \
 .master("local[*]") \
 .appName("Test sparkmeasure instrumentation of Python/PySpark code") \
 .config("spark.jars.packages","ch.cern.sparkmeasure:spark-measure_2.11:0.14")  \
 .getOrCreate()
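The `spark.jars.packages` value above is a Maven coordinate of the form `groupId:artifactId:version`, where the artifact name carries the Scala binary version as a suffix (`_2.11` here). The helper below is a hypothetical sketch (`sparkmeasure_coordinate` is not part of sparkMeasure) that shows how the string is assembled, which may help when switching to a Spark build that uses a different Scala version. Which sparkMeasure releases exist for which Scala versions is an assumption you should check on Maven Central.

```python
def sparkmeasure_coordinate(scala_version: str, version: str) -> str:
    """Build the Maven coordinate for sparkMeasure (hypothetical helper).

    scala_version: Scala binary version of the Spark build, e.g. "2.11"
    version:       sparkMeasure release, e.g. "0.14"
    """
    # groupId:artifactId:version, with the Scala suffix appended to the artifactId
    return f"ch.cern.sparkmeasure:spark-measure_{scala_version}:{version}"


# Reproduces the coordinate used in the SparkSession builder above
print(sparkmeasure_coordinate("2.11", "0.14"))
```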

In [38]:
# test that Spark is working OK
spark.sql("select 1 as id, 'Hello world!' as Greeting").show()

+---+------------+
| id|    Greeting|
+---+------------+
|  1|Hello world!|
+---+------------+



In [None]:
# Install the Python wrapper API for spark-measure

!pip install sparkmeasure

In [40]:
# Load the Python API from the sparkmeasure package
# and attach the sparkMeasure listener for stage metrics to the active Spark session

from sparkmeasure import StageMetrics
stagemetrics = StageMetrics(spark)

In [41]:
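# Define line and cell magic to wrap the instrumentation
from IPython import get_ipython
from IPython.core.magic import register_line_cell_magic

@register_line_cell_magic
def sparkmeasure(line, cell=None):
    "Run and measure a Spark workload. Use: %sparkmeasure or %%sparkmeasure"
    val = cell if cell is not None else line
    stagemetrics.begin()
    try:
        # exec (rather than eval) also accepts multi-line cells containing statements;
        # the code runs in the notebook namespace, so names such as `spark` resolve
        exec(val, get_ipython().user_ns)
    finally:
        # always close the measurement, even if the workload raises an error
        stagemetrics.end()
    stagemetrics.print_report()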

In [42]:
# Disable automatic broadcast joins (a threshold of -1 turns them off) for the queries below
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", '-1')

In [43]:
%%sparkmeasure
spark.sql("select count(*) from range(1000) cross join range(1000) cross join range(100)").show()

+---------+
| count(1)|
+---------+
|100000000|
+---------+


Scheduling mode = FIFO
Spark Context default degree of parallelism = 2
Aggregated Spark stage metrics:
numStages => 2
sum(numTasks) => 9
elapsedTime => 9209 (9 s)
sum(stageDuration) => 9208 (9 s)
sum(executorRunTime) => 18066 (18 s)
sum(executorCpuTime) => 11305 (11 s)
sum(executorDeserializeTime) => 38 (38 ms)
sum(executorDeserializeCpuTime) => 17 (17 ms)
sum(resultSerializationTime) => 1 (1 ms)
sum(jvmGCTime) => 171 (0.2 s)
sum(shuffleFetchWaitTime) => 0 (0 ms)
sum(shuffleWriteTime) => 12 (12 ms)
max(resultSize) => 16891 (16.0 KB)
sum(numUpdatedBlockStatuses) => 0
sum(diskBytesSpilled) => 0 (0 Bytes)
sum(memoryBytesSpilled) => 0 (0 Bytes)
max(peakExecutionMemory) => 0
sum(recordsRead) => 8400
sum(bytesRead) => 0 (0 Bytes)
sum(recordsWritten) => 0
sum(bytesWritten) => 0 (0 Bytes)
sum(shuffleTotalBytesRead) => 472 (472 Bytes)
sum(shuffleTotalBlocksFetched) => 8
sum(shuffleLocalBlocksFetched) => 8
sum(shuffleRemoteBlocksFetch

In [44]:
# You can also explicitly wrap your Spark workload with stagemetrics instrumentation,
# as in this example
stagemetrics.begin()

spark.sql("select count(*) from range(1000) cross join range(1000) cross join range(100)").show()

stagemetrics.end()
# Print a summary report
stagemetrics.print_report()

+---------+
| count(1)|
+---------+
|100000000|
+---------+


Scheduling mode = FIFO
Spark Context default degree of parallelism = 2
Aggregated Spark stage metrics:
numStages => 2
sum(numTasks) => 9
elapsedTime => 8833 (9 s)
sum(stageDuration) => 8832 (9 s)
sum(executorRunTime) => 17217 (17 s)
sum(executorCpuTime) => 11414 (11 s)
sum(executorDeserializeTime) => 39 (39 ms)
sum(executorDeserializeCpuTime) => 16 (16 ms)
sum(resultSerializationTime) => 1 (1 ms)
sum(jvmGCTime) => 128 (0.1 s)
sum(shuffleFetchWaitTime) => 0 (0 ms)
sum(shuffleWriteTime) => 12 (12 ms)
max(resultSize) => 16891 (16.0 KB)
sum(numUpdatedBlockStatuses) => 0
sum(diskBytesSpilled) => 0 (0 Bytes)
sum(memoryBytesSpilled) => 0 (0 Bytes)
max(peakExecutionMemory) => 0
sum(recordsRead) => 8400
sum(bytesRead) => 0 (0 Bytes)
sum(recordsWritten) => 0
sum(bytesWritten) => 0 (0 Bytes)
sum(shuffleTotalBytesRead) => 472 (472 Bytes)
sum(shuffleTotalBlocksFetched) => 8
sum(shuffleLocalBlocksFetched) => 8
sum(shuffleRemoteBlocksFetch
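The explicit `begin()`/`end()` calls above can also be packaged as a Python context manager, so that the measurement is closed even when the workload raises an error. This is a minimal sketch: `measured` is a hypothetical helper, not part of the sparkMeasure API, and it only assumes the metrics object exposes `begin()` and `end()`, as `StageMetrics` and `TaskMetrics` do in this notebook.

```python
from contextlib import contextmanager


@contextmanager
def measured(metrics):
    """Hypothetical helper: bracket a block with metrics.begin()/metrics.end().

    Works with any object exposing begin() and end(), such as the
    StageMetrics and TaskMetrics instances used in this notebook.
    """
    metrics.begin()
    try:
        yield metrics
    finally:
        # end() runs even if the workload raises, so collection is always closed
        metrics.end()


# Usage in this notebook (requires `spark` and `stagemetrics` defined above):
# with measured(stagemetrics):
#     spark.sql("select count(*) from range(1000) cross join range(1000) cross join range(100)").show()
# stagemetrics.print_report()
```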

In [45]:
# Another way to encapsulate code and instrumentation in a compact form

result = stagemetrics.runandmeasure(locals(), """
spark.sql("select count(*) from range(1000) cross join range(1000) cross join range(100)").show()
""")

+---------+
| count(1)|
+---------+
|100000000|
+---------+


Scheduling mode = FIFO
Spark Context default degree of parallelism = 2
Aggregated Spark stage metrics:
numStages => 2
sum(numTasks) => 9
elapsedTime => 6323 (6 s)
sum(stageDuration) => 6322 (6 s)
sum(executorRunTime) => 12518 (13 s)
sum(executorCpuTime) => 11666 (12 s)
sum(executorDeserializeTime) => 31 (31 ms)
sum(executorDeserializeCpuTime) => 15 (15 ms)
sum(resultSerializationTime) => 2 (2 ms)
sum(jvmGCTime) => 86 (86 ms)
sum(shuffleFetchWaitTime) => 0 (0 ms)
sum(shuffleWriteTime) => 3 (3 ms)
max(resultSize) => 16891 (16.0 KB)
sum(numUpdatedBlockStatuses) => 0
sum(diskBytesSpilled) => 0 (0 Bytes)
sum(memoryBytesSpilled) => 0 (0 Bytes)
max(peakExecutionMemory) => 0
sum(recordsRead) => 8400
sum(bytesRead) => 0 (0 Bytes)
sum(recordsWritten) => 0
sum(bytesWritten) => 0 (0 Bytes)
sum(shuffleTotalBytesRead) => 472 (472 Bytes)
sum(shuffleTotalBlocksFetched) => 8
sum(shuffleLocalBlocksFetched) => 8
sum(shuffleRemoteBlocksFetched)
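As the output above shows, `runandmeasure` takes a namespace (here `locals()`) and a string of code, runs the code under instrumentation, and prints the report. The function below is a simplified, hypothetical re-creation of that pattern for illustration only; it is not the sparkMeasure source, and `run_and_measure` is an assumed name. It expects an object with `begin()`, `end()` and `print_report()`.

```python
def run_and_measure(metrics, env, code):
    """Hypothetical sketch of a runandmeasure-style helper.

    metrics: object with begin(), end() and print_report(), e.g. StageMetrics
    env:     namespace dict in which `code` is executed, e.g. globals()
    code:    string of Python statements to run under instrumentation
    """
    metrics.begin()
    try:
        # exec (not eval) so that multi-line code containing statements is accepted
        exec(code, env)
    finally:
        metrics.end()
    metrics.print_report()


# Usage in this notebook (requires `spark` and `stagemetrics` defined above):
# run_and_measure(stagemetrics, globals(), """
# spark.sql("select count(*) from range(1000) cross join range(1000) cross join range(100)").show()
# """)
```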

## Example of collecting metrics using Task Metrics
Collecting Spark task metrics at the granularity of each task completion has additional overhead compared to collecting at the stage completion level. Use this option only if you need data at this finer granularity, for example to study skew effects; otherwise, prefer the stagemetrics aggregation.
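One simple way to quantify skew from per-task data is to compare the slowest task with the median task. The sketch below rests on an assumption: it works on a plain Python list of task durations in milliseconds (for example, values you have extracted yourself from the collected task metrics), and `skew_ratio` is a hypothetical helper, not part of sparkMeasure.

```python
from statistics import median


def skew_ratio(task_durations_ms):
    """Ratio of the slowest task to the median task (hypothetical helper).

    A value close to 1 means tasks took similar time; a large value suggests
    that a few straggler tasks (for example, due to data skew) dominate the stage.
    """
    if not task_durations_ms:
        raise ValueError("need at least one task duration")
    med = median(task_durations_ms)
    if med == 0:
        raise ValueError("median task duration is zero")
    return max(task_durations_ms) / med


# Illustrative input, not measured data: three similar tasks and one straggler
print(skew_ratio([100, 110, 90, 400]))
```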


In [46]:
from sparkmeasure import TaskMetrics
taskmetrics = TaskMetrics(spark)

taskmetrics.begin()
spark.sql("select count(*) from range(1000) cross join range(1000) cross join range(100)").show()
taskmetrics.end()
taskmetrics.print_report()

+---------+
| count(1)|
+---------+
|100000000|
+---------+


Scheduling mode = FIFO
Spark Contex default degree of parallelism = 2
Aggregated Spark task metrics:
numtasks => 9
elapsedTime => 6211 (6 s)
sum(duration) => 12354 (12 s)
sum(schedulerDelay) => 28
sum(executorRunTime) => 12304 (12 s)
sum(executorCpuTime) => 11639 (12 s)
sum(executorDeserializeTime) => 22 (22 ms)
sum(executorDeserializeCpuTime) => 9 (9 ms)
sum(resultSerializationTime) => 0 (0 ms)
sum(jvmGCTime) => 94 (94 ms)
sum(shuffleFetchWaitTime) => 0 (0 ms)
sum(shuffleWriteTime) => 0 (0 ms)
sum(gettingResultTime) => 0 (0 ms)
max(resultSize) => 2106 (2.0 KB)
sum(numUpdatedBlockStatuses) => 0
sum(diskBytesSpilled) => 0 (0 Bytes)
sum(memoryBytesSpilled) => 0 (0 Bytes)
max(peakExecutionMemory) => 0
sum(recordsRead) => 8400
sum(bytesRead) => 0 (0 Bytes)
sum(recordsWritten) => 0
sum(bytesWritten) => 0 (0 Bytes)
sum(shuffleTotalBytesRead) => 472 (472 Bytes)
sum(shuffleTotalBlocksFetched) => 8
sum(shuffleLocalBlocksFetched) => 8

In [47]:
report = taskmetrics.report()

In [48]:
print(report)


Scheduling mode = FIFO
Spark Contex default degree of parallelism = 2
Aggregated Spark task metrics:
numtasks => 9
elapsedTime => 6211 (6 s)
sum(duration) => 12354 (12 s)
sum(schedulerDelay) => 28
sum(executorRunTime) => 12304 (12 s)
sum(executorCpuTime) => 11639 (12 s)
sum(executorDeserializeTime) => 22 (22 ms)
sum(executorDeserializeCpuTime) => 9 (9 ms)
sum(resultSerializationTime) => 0 (0 ms)
sum(jvmGCTime) => 94 (94 ms)
sum(shuffleFetchWaitTime) => 0 (0 ms)
sum(shuffleWriteTime) => 0 (0 ms)
sum(gettingResultTime) => 0 (0 ms)
max(resultSize) => 2106 (2.0 KB)
sum(numUpdatedBlockStatuses) => 0
sum(diskBytesSpilled) => 0 (0 Bytes)
sum(memoryBytesSpilled) => 0 (0 Bytes)
max(peakExecutionMemory) => 0
sum(recordsRead) => 8400
sum(bytesRead) => 0 (0 Bytes)
sum(recordsWritten) => 0
sum(bytesWritten) => 0 (0 Bytes)
sum(shuffleTotalBytesRead) => 472 (472 Bytes)
sum(shuffleTotalBlocksFetched) => 8
sum(shuffleLocalBlocksFetched) => 8
sum(shuffleRemoteBlocksFetched) => 0
sum(shuffleBytesWritten
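Because `report()` returns the report as a plain string, it can be post-processed in Python. The sketch below assumes the `name => value (...)` line layout visible in the output above and is not part of the sparkMeasure API; `parse_report` and `derived_ratios` are hypothetical names. Two derived ratios are useful: `sum(executorRunTime) / elapsedTime` approximates the average number of concurrently running tasks, and `sum(executorCpuTime) / sum(executorRunTime)` is the fraction of task run time spent on CPU. For the report above, 12304 / 6211 ≈ 1.98, close to the default degree of parallelism of 2, and 11639 / 12304 ≈ 0.95, so this query is almost entirely CPU-bound.

```python
import re

# Matches report lines such as "sum(executorRunTime) => 12304 (12 s)"
METRIC_LINE = re.compile(r"^(?P<name>\S+) => (?P<value>-?\d+)")


def parse_report(report):
    """Parse a sparkMeasure text report into {metric name: integer value}.

    Lines that do not follow the "name => value" layout (headers, the
    scheduling mode line, truncated lines) are skipped.
    """
    metrics = {}
    for line in report.splitlines():
        match = METRIC_LINE.match(line.strip())
        if match:
            metrics[match.group("name")] = int(match.group("value"))
    return metrics


def derived_ratios(metrics):
    """Compute two ratios from parsed metrics (key names as in the reports above)."""
    elapsed = metrics["elapsedTime"]
    run_time = metrics["sum(executorRunTime)"]
    cpu_time = metrics["sum(executorCpuTime)"]
    return {
        # average number of tasks running concurrently during the measurement
        "avg_active_tasks": run_time / elapsed,
        # share of executor run time spent on CPU (the rest is time off CPU)
        "cpu_fraction": cpu_time / run_time,
    }


# Usage in this notebook (requires `report` from the cell above):
# print(derived_ratios(parse_report(report)))
```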

In [49]:
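def measure(my_job):
    """Run my_job() under task metrics instrumentation and return the report as a string."""
    taskmetrics.begin()
    try:
        my_job()
    finally:
        taskmetrics.end()
    return taskmetrics.report()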
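The `measure` function discards whatever the job returns. A hypothetical variant, `measure_with_result` (an assumed name, not a sparkMeasure function), keeps the job's return value, forwards arguments to the job, and calls `end()` even if the job fails. It only assumes an object with `begin()`, `end()` and `report()`, as `TaskMetrics` provides in this notebook.

```python
def measure_with_result(metrics, job, *args, **kwargs):
    """Run job(*args, **kwargs) under instrumentation (hypothetical helper).

    metrics: object with begin(), end() and report(), e.g. TaskMetrics(spark)
    Returns a (job result, report string) tuple.
    """
    metrics.begin()
    try:
        result = job(*args, **kwargs)
    finally:
        metrics.end()
    return result, metrics.report()


# Usage in this notebook (requires `spark` and `taskmetrics` defined above):
# df = spark.sql("select count(*) from range(1000) cross join range(1000) cross join range(100)")
# rows, report = measure_with_result(taskmetrics, df.collect)
```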

In [50]:
new_report = measure(spark.sql("select count(*) from range(1000) cross join range(1000) cross join range(100)").show)

+---------+
| count(1)|
+---------+
|100000000|
+---------+



In [51]:
print(new_report)


Scheduling mode = FIFO
Spark Contex default degree of parallelism = 2
Aggregated Spark task metrics:
numtasks => 9
elapsedTime => 6294 (6 s)
sum(duration) => 12522 (13 s)
sum(schedulerDelay) => 36
sum(executorRunTime) => 12462 (12 s)
sum(executorCpuTime) => 11628 (12 s)
sum(executorDeserializeTime) => 21 (21 ms)
sum(executorDeserializeCpuTime) => 9 (9 ms)
sum(resultSerializationTime) => 3 (3 ms)
sum(jvmGCTime) => 150 (0.2 s)
sum(shuffleFetchWaitTime) => 0 (0 ms)
sum(shuffleWriteTime) => 0 (0 ms)
sum(gettingResultTime) => 0 (0 ms)
max(resultSize) => 2149 (2.0 KB)
sum(numUpdatedBlockStatuses) => 0
sum(diskBytesSpilled) => 0 (0 Bytes)
sum(memoryBytesSpilled) => 0 (0 Bytes)
max(peakExecutionMemory) => 0
sum(recordsRead) => 8400
sum(bytesRead) => 0 (0 Bytes)
sum(recordsWritten) => 0
sum(bytesWritten) => 0 (0 Bytes)
sum(shuffleTotalBytesRead) => 472 (472 Bytes)
sum(shuffleTotalBlocksFetched) => 8
sum(shuffleLocalBlocksFetched) => 8
sum(shuffleRemoteBlocksFetched) => 0
sum(shuffleBytesWritte

In [52]:
from sparkmeasure import TaskMetrics
from pyspark.sql import SparkSession

spark = SparkSession \
 .builder \
 .master("local[*]") \
 .appName("Test sparkmeasure instrumentation of Python/PySpark code") \
 .config("spark.jars.packages","ch.cern.sparkmeasure:spark-measure_2.11:0.14")  \
 .getOrCreate()

# Variant that creates a new TaskMetrics instance on each call to measure()
def measure(my_job):
    taskmetrics = TaskMetrics(spark)
    taskmetrics.begin()
    my_job()
    taskmetrics.end()
    return taskmetrics.report()

In [53]:
measure(spark.sql("select count(*) from range(1000) cross join range(1000) cross join range(100)").show)

+---------+
| count(1)|
+---------+
|100000000|
+---------+



'\nScheduling mode = FIFO\nSpark Contex default degree of parallelism = 2\nAggregated Spark task metrics:\nnumtasks => 9\nelapsedTime => 6292 (6 s)\nsum(duration) => 12559 (13 s)\nsum(schedulerDelay) => 30\nsum(executorRunTime) => 12505 (13 s)\nsum(executorCpuTime) => 11654 (12 s)\nsum(executorDeserializeTime) => 24 (24 ms)\nsum(executorDeserializeCpuTime) => 8 (8 ms)\nsum(resultSerializationTime) => 0 (0 ms)\nsum(jvmGCTime) => 122 (0.1 s)\nsum(shuffleFetchWaitTime) => 0 (0 ms)\nsum(shuffleWriteTime) => 0 (0 ms)\nsum(gettingResultTime) => 0 (0 ms)\nmax(resultSize) => 2106 (2.0 KB)\nsum(numUpdatedBlockStatuses) => 0\nsum(diskBytesSpilled) => 0 (0 Bytes)\nsum(memoryBytesSpilled) => 0 (0 Bytes)\nmax(peakExecutionMemory) => 0\nsum(recordsRead) => 8400\nsum(bytesRead) => 0 (0 Bytes)\nsum(recordsWritten) => 0\nsum(bytesWritten) => 0 (0 Bytes)\nsum(shuffleTotalBytesRead) => 472 (472 Bytes)\nsum(shuffleTotalBlocksFetched) => 8\nsum(shuffleLocalBlocksFetched) => 8\nsum(shuffleRemoteBlocksFetche