# Example Jupyter notebook to showcase sparkMeasure APIs for Python
  
**sparkMeasure is a tool for performance troubleshooting of Apache Spark workloads.**  
It simplifies the collection and analysis of Spark performance metrics. It also serves as a working example of how to use Spark listeners to collect and process Spark executor task metrics.

**References:**
- [https://github.com/LucaCanali/sparkMeasure](https://github.com/LucaCanali/sparkMeasure)  
- sparkMeasure Python docs: [docs/Python_shell_and_Jupyter](https://github.com/LucaCanali/sparkMeasure/blob/master/docs/Python_shell_and_Jupyter.md)  

**Architecture:**
![sparkMeasure architecture diagram](https://github.com/LucaCanali/sparkMeasure/raw/master/docs/sparkMeasure_architecture_diagram.png)

Author and contact: Luca.Canali@cern.ch  
Last updated: April 2023

In [None]:
# Dependencies

# 1. Install PySpark/Spark
# This step is optional if you have already downloaded Spark;
# in that case, see findspark (step 3)
!pip install pyspark

# 2. Install the Python wrapper API for spark-measure
!pip install sparkmeasure

# 3. The use of findspark is optional
# It can be handy if you have to choose among multiple Spark homes
# !pip install findspark
# import findspark
# findspark.init("/home/luca/Spark/spark-3.4.0-bin-hadoop3")

In [None]:
# Start the Spark Session
# This example uses Spark in local mode for simplicity.
# You can change master to use YARN or Kubernetes, if available
# This example uses sparkMeasure 0.23 for Scala 2.12, taken from Maven Central


from pyspark.sql import SparkSession
spark = (SparkSession.builder
         .appName("Test sparkmeasure instrumentation of Python/PySpark code")
         .master("local[*]")
         .config("spark.jars.packages","ch.cern.sparkmeasure:spark-measure_2.12:0.23")
         .getOrCreate()
        )  
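
The package coordinate in the cell above encodes both the Scala version and the sparkMeasure version. As a minimal sketch, the hypothetical helper `sparkmeasure_package` below (not part of sparkMeasure) builds that string. It assumes the artifact name follows the `spark-measure_<scala>:<version>` pattern seen above; check Maven Central for the combinations that are actually published.

```python
def sparkmeasure_package(scala_version: str = "2.12", version: str = "0.23") -> str:
    """Build the Maven coordinate passed to spark.jars.packages.

    Assumption: the artifact follows the spark-measure_<scala>:<version>
    naming pattern used in the session configuration above.
    """
    return f"ch.cern.sparkmeasure:spark-measure_{scala_version}:{version}"


# Reproduces the coordinate used in this notebook
print(sparkmeasure_package())
```

The result can be passed as the value of `spark.jars.packages` when building the session.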


In [2]:
# Initialize sparkMeasure
# Load the Python API for sparkmeasure package
# and attach the sparkMeasure Listener for stagemetrics to the active Spark session

from sparkmeasure import StageMetrics
stagemetrics = StageMetrics(spark)

In [3]:
# The easiest way to start using sparkMeasure is with the "runandmeasure" method
# This will execute your Spark action, return the results, and collect and aggregate execution metrics

stagemetrics.runandmeasure(globals(), """
spark.sql("select count(*) from range(1000) cross join range(1000) cross join range(1000)").show()
""")

+----------+
|  count(1)|
+----------+
|1000000000|
+----------+


Scheduling mode = FIFO
Spark Context default degree of parallelism = 8

Aggregated Spark stage metrics:
numStages => 3
numTasks => 17
elapsedTime => 1372 (1 s)
stageDuration => 1047 (1 s)
executorRunTime => 2753 (3 s)
executorCpuTime => 2311 (2 s)
executorDeserializeTime => 3440 (3 s)
executorDeserializeCpuTime => 1321 (1 s)
resultSerializationTime => 4 (4 ms)
jvmGCTime => 192 (0.2 s)
shuffleFetchWaitTime => 0 (0 ms)
shuffleWriteTime => 29 (29 ms)
resultSize => 16134 (15.8 KB)
diskBytesSpilled => 0 (0 Bytes)
memoryBytesSpilled => 0 (0 Bytes)
peakExecutionMemory => 0
recordsRead => 2000
bytesRead => 0 (0 Bytes)
recordsWritten => 0
bytesWritten => 0 (0 Bytes)
shuffleRecordsRead => 8
shuffleTotalBlocksFetched => 8
shuffleLocalBlocksFetched => 8
shuffleRemoteBlocksFetched => 0
shuffleTotalBytesRead => 472 (472 Bytes)
shuffleLocalBytesRead => 472 (472 Bytes)
shuffleRemoteBytesRead => 0 (0 Bytes)
shuffleRemoteBytesReadToDisk 
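
The aggregated values in the report above can be combined into a few derived figures. The sketch below uses the numbers printed by this run; the function name `derived_metrics` is hypothetical, and reading `executorRunTime / elapsedTime` as the average number of concurrently running tasks is a rule of thumb rather than something sparkMeasure reports itself.

```python
def derived_metrics(elapsed_ms: int, run_ms: int, cpu_ms: int) -> dict:
    """Combine aggregated stage metrics (all in milliseconds)."""
    return {
        # Share of task run time spent on CPU
        "cpu_fraction": cpu_ms / run_ms,
        # Rule-of-thumb average number of tasks running concurrently
        "avg_active_tasks": run_ms / elapsed_ms,
    }


# Values copied from the report above:
# elapsedTime, executorRunTime, executorCpuTime
m = derived_metrics(elapsed_ms=1372, run_ms=2753, cpu_ms=2311)
print(f"cpu_fraction={m['cpu_fraction']:.2f}, "
      f"avg_active_tasks={m['avg_active_tasks']:.2f}")
# cpu_fraction=0.84, avg_active_tasks=2.01
```

A CPU fraction well below 1 suggests that tasks spend time waiting on something other than CPU, such as I/O or garbage collection.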

In [4]:
# sparkMeasure also collects executor metrics
# This is how you can print the memory usage report
# Note: if you receive the error message java.util.NoSuchElementException: key not found,
# wait a few seconds and run the report again.

stagemetrics.print_memory_report()


Additional stage-level executor metrics (memory usage info):

Stage 0 JVMHeapMemory maxVal bytes => 105968664 (101.1 MB)
Stage 0 OnHeapExecutionMemory maxVal bytes => 0 (0 Bytes)
Stage 1 JVMHeapMemory maxVal bytes => 105968664 (101.1 MB)
Stage 1 OnHeapExecutionMemory maxVal bytes => 0 (0 Bytes)
Stage 3 JVMHeapMemory maxVal bytes => 105968664 (101.1 MB)
Stage 3 OnHeapExecutionMemory maxVal bytes => 0 (0 Bytes)
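
Report lines follow a `name => value (human-readable)` layout, so they are easy to turn into numbers for further processing. The sketch below assumes you already have the report text as a string (for example, pasted from the cell output); `parse_report` is a hypothetical helper, not a sparkMeasure function.

```python
import re

# Matches lines such as "elapsedTime => 1372 (1 s)"
_METRIC_LINE = re.compile(r"^(?P<name>.+?)\s*=>\s*(?P<value>\d+)\b")


def parse_report(text: str) -> dict:
    """Return {metric name: integer value} for every 'name => value' line."""
    metrics = {}
    for line in text.splitlines():
        match = _METRIC_LINE.match(line.strip())
        if match:  # lines without a numeric value are skipped
            metrics[match.group("name")] = int(match.group("value"))
    return metrics


# Two lines copied from the memory report above
sample = """
Stage 0 JVMHeapMemory maxVal bytes => 105968664 (101.1 MB)
Stage 0 OnHeapExecutionMemory maxVal bytes => 0 (0 Bytes)
"""
parsed = parse_report(sample)
heap_mb = parsed["Stage 0 JVMHeapMemory maxVal bytes"] / 1024**2
print(f"{heap_mb:.1f} MB")  # 101.1 MB
```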


In [5]:
# Equivalently, you can collect execution metrics by explicitly wrapping your Spark workload
# between stagemetrics.begin() and stagemetrics.end(), as in this example

stagemetrics.begin()

spark.sql("select count(*) from range(1000) cross join range(1000) cross join range(1000)").show()

stagemetrics.end()

# Print a summary report
stagemetrics.print_report()

+----------+
|  count(1)|
+----------+
|1000000000|
+----------+


Scheduling mode = FIFO
Spark Context default degree of parallelism = 8

Aggregated Spark stage metrics:
numStages => 3
numTasks => 17
elapsedTime => 427 (0.4 s)
stageDuration => 350 (0.4 s)
executorRunTime => 2151 (2 s)
executorCpuTime => 1986 (2 s)
executorDeserializeTime => 55 (55 ms)
executorDeserializeCpuTime => 36 (36 ms)
resultSerializationTime => 0 (0 ms)
jvmGCTime => 0 (0 ms)
shuffleFetchWaitTime => 0 (0 ms)
shuffleWriteTime => 19 (19 ms)
resultSize => 16048 (15.7 KB)
diskBytesSpilled => 0 (0 Bytes)
memoryBytesSpilled => 0 (0 Bytes)
peakExecutionMemory => 0
recordsRead => 2000
bytesRead => 0 (0 Bytes)
recordsWritten => 0
bytesWritten => 0 (0 Bytes)
shuffleRecordsRead => 8
shuffleTotalBlocksFetched => 8
shuffleLocalBlocksFetched => 8
shuffleRemoteBlocksFetched => 0
shuffleTotalBytesRead => 472 (472 Bytes)
shuffleLocalBytesRead => 472 (472 Bytes)
shuffleRemoteBytesRead => 0 (0 Bytes)
shuffleRemoteBytesReadToDisk =
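
The begin/end pattern above leaves metrics collection open if the workload raises an exception. One possible way to guard against that is a small context manager. This is a sketch: `measured` is a hypothetical helper, not part of the sparkMeasure API, and it relies only on the `begin()`, `end()` and `print_report()` methods used in this notebook.

```python
from contextlib import contextmanager


@contextmanager
def measured(metrics):
    """Run the body of a `with` block between metrics.begin() and metrics.end().

    `metrics` is any object exposing begin(), end() and print_report(),
    such as the stagemetrics instance created earlier in this notebook.
    """
    metrics.begin()
    try:
        yield metrics
    finally:
        # Always stop collecting, even if the workload raises
        metrics.end()
    # Reached only when the workload completed without an exception
    metrics.print_report()


# Usage (requires the Spark session and stagemetrics from the cells above):
# with measured(stagemetrics):
#     spark.sql("select count(*) from range(1000) cross join range(1000)").show()
```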

In [6]:
# This code is for Jupyter notebooks
# Define cell and line magic to wrap sparkmeasure instrumentation
# See example in the next cell

from IPython.core.magic import register_line_cell_magic

@register_line_cell_magic
def sparkmeasure(line, cell=None):
    "Run and measure a Spark workload. Use: %sparkmeasure or %%sparkmeasure"
    code = cell if cell is not None else line
    stagemetrics.begin()
    try:
        # exec (unlike eval) also accepts statements and multi-line cells
        exec(code, globals())
    finally:
        # Stop collecting metrics even if the workload fails
        stagemetrics.end()
    stagemetrics.print_report()

In [7]:
%%sparkmeasure
spark.sql("select count(*) from range(1000) cross join range(1000) cross join range(1000)").show()

+----------+
|  count(1)|
+----------+
|1000000000|
+----------+


Scheduling mode = FIFO
Spark Context default degree of parallelism = 8

Aggregated Spark stage metrics:
numStages => 3
numTasks => 17
elapsedTime => 473 (0.5 s)
stageDuration => 388 (0.4 s)
executorRunTime => 2365 (2 s)
executorCpuTime => 1860 (2 s)
executorDeserializeTime => 80 (80 ms)
executorDeserializeCpuTime => 39 (39 ms)
resultSerializationTime => 0 (0 ms)
jvmGCTime => 0 (0 ms)
shuffleFetchWaitTime => 0 (0 ms)
shuffleWriteTime => 8 (8 ms)
resultSize => 16048 (15.7 KB)
diskBytesSpilled => 0 (0 Bytes)
memoryBytesSpilled => 0 (0 Bytes)
peakExecutionMemory => 0
recordsRead => 2000
bytesRead => 0 (0 Bytes)
recordsWritten => 0
bytesWritten => 0 (0 Bytes)
shuffleRecordsRead => 8
shuffleTotalBlocksFetched => 8
shuffleLocalBlocksFetched => 8
shuffleRemoteBlocksFetched => 0
shuffleTotalBytesRead => 472 (472 Bytes)
shuffleLocalBytesRead => 472 (472 Bytes)
shuffleRemoteBytesRead => 0 (0 Bytes)
shuffleRemoteBytesReadToDisk => 

## Example of using Task Metrics
Collecting Spark task metrics at the granularity of each task completion adds overhead
compared to collecting at the stage level.
Use this option only if you need the finer granularity, for example to study skew effects;
otherwise, prefer the stage-level aggregation provided by StageMetrics.
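
To illustrate why per-task data matters for skew analysis, the sketch below compares the slowest task with the median task. The durations are hypothetical values chosen for illustration, not measurements from this notebook, and `skew_ratio` is a hypothetical helper rather than a sparkMeasure function.

```python
from statistics import median


def skew_ratio(durations_ms):
    """Ratio of the slowest task to the median task; close to 1.0 means little skew."""
    return max(durations_ms) / median(durations_ms)


# Hypothetical per-task durations in milliseconds (not measured data)
balanced = [120, 130, 125, 118, 127]
skewed = [120, 130, 125, 118, 900]

print(f"balanced: {skew_ratio(balanced):.2f}")  # balanced: 1.04
print(f"skewed:   {skew_ratio(skewed):.2f}")    # skewed:   7.20
```

Stage-level aggregates would report similar totals for the two cases if the work were redistributed, which is why a single straggler task is only visible at task granularity.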


In [8]:
from sparkmeasure import TaskMetrics
taskmetrics = TaskMetrics(spark)

taskmetrics.begin()
spark.sql("select count(*) from range(1000) cross join range(1000) cross join range(1000)").show()
taskmetrics.end()

taskmetrics.print_report()

+----------+
|  count(1)|
+----------+
|1000000000|
+----------+


Scheduling mode = FIFO
Spark Context default degree of parallelism = 8

Aggregated Spark task metrics:
numTasks => 17
successful tasks => 17
speculative tasks => 0
taskDuration => 2415 (2 s)
schedulerDelayTime => 97 (97 ms)
executorRunTime => 2269 (2 s)
executorCpuTime => 1906 (2 s)
executorDeserializeTime => 49 (49 ms)
executorDeserializeCpuTime => 26 (26 ms)
resultSerializationTime => 0 (0 ms)
jvmGCTime => 0 (0 ms)
shuffleFetchWaitTime => 0 (0 ms)
shuffleWriteTime => 0 (0 ms)
gettingResultTime => 0 (0 ms)
resultSize => 2667 (2.6 KB)
diskBytesSpilled => 0 (0 Bytes)
memoryBytesSpilled => 0 (0 Bytes)
peakExecutionMemory => 0
recordsRead => 2000
bytesRead => 0 (0 Bytes)
recordsWritten => 0
bytesWritten => 0 (0 Bytes)
shuffleRecordsRead => 8
shuffleTotalBlocksFetched => 8
shuffleLocalBlocksFetched => 8
shuffleRemoteBlocksFetched => 0
shuffleTotalBytesRead => 472 (472 Bytes)
shuffleLocalBytesRead => 472 (472 Bytes)
shuffleR

In [9]:
taskmetrics.runandmeasure(globals(), """
spark.sql("select count(*) from range(1000) cross join range(1000) cross join range(1000)").show()
""")

+----------+
|  count(1)|
+----------+
|1000000000|
+----------+


Scheduling mode = FIFO
Spark Context default degree of parallelism = 8

Aggregated Spark task metrics:
numTasks => 17
successful tasks => 17
speculative tasks => 0
taskDuration => 2368 (2 s)
schedulerDelayTime => 100 (0.1 s)
executorRunTime => 2203 (2 s)
executorCpuTime => 1939 (2 s)
executorDeserializeTime => 65 (65 ms)
executorDeserializeCpuTime => 30 (30 ms)
resultSerializationTime => 0 (0 ms)
jvmGCTime => 0 (0 ms)
shuffleFetchWaitTime => 0 (0 ms)
shuffleWriteTime => 7 (7 ms)
gettingResultTime => 0 (0 ms)
resultSize => 2667 (2.6 KB)
diskBytesSpilled => 0 (0 Bytes)
memoryBytesSpilled => 0 (0 Bytes)
peakExecutionMemory => 0
recordsRead => 2000
bytesRead => 0 (0 Bytes)
recordsWritten => 0
bytesWritten => 0 (0 Bytes)
shuffleRecordsRead => 8
shuffleTotalBlocksFetched => 8
shuffleLocalBlocksFetched => 8
shuffleRemoteBlocksFetched => 0
shuffleTotalBytesRead => 472 (472 Bytes)
shuffleLocalBytesRead => 472 (472 Bytes)
shuffle

In [10]:
spark.stop()