In [1]:
from pyspark.sql import SparkSession

# Create the Spark session used by the rest of the notebook.
# This example runs on a local cluster (master "local[*]"); change `master` to use YARN or K8S if available.
# `spark.jars.packages` makes Spark resolve the sparkMeasure jar from Maven coordinates
# (downloaded from Maven Central when not already cached) at session start-up.
# The artifact suffix (_2.12) must match the Scala version of the Spark build, and the jar
# version should presumably match the installed `sparkmeasure` Python wrapper — TODO confirm.
SPARKMEASURE_PACKAGE = "ch.cern.sparkmeasure:spark-measure_2.12:0.17"

spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("Test sparkmeasure instrumentation of Python/PySpark code")
    .config("spark.jars.packages", SPARKMEASURE_PACKAGE)
    .getOrCreate()
)

In [2]:
# Sanity check: run a trivial query so a broken Spark setup fails here, before any measurement.
(
    spark
    .sql("select 1 as id, 'Hello world!' as Greeting")
    .show()
)

[Stage 0:>                                                          (0 + 1) / 1]

+---+------------+
| id|    Greeting|
+---+------------+
|  1|Hello world!|
+---+------------+



                                                                                

In [3]:
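# Install the Python wrapper API for sparkMeasure (run once; %pip targets this kernel's environment):

# %pip install sparkmeasure

# Alternative, run from a terminal rather than this notebook: start a PySpark shell with the sparkMeasure jar attached:
# pyspark --packages ch.cern.sparkmeasure:spark-measure_2.12:0.17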

In [4]:
# Load the Python API from the sparkmeasure package and attach the sparkMeasure
# stage-metrics listener to the active Spark session.
from sparkmeasure import StageMetrics

stagemetrics = StageMetrics(spark)  # wraps the session created in the first cell

In [5]:
# Measure the Spark stage metrics of loading SUSY.csv.
# inferSchema=True makes Spark do an extra pass over the data to infer column types,
# so that pass is part of the measured interval.
SUSY_CSV_PATH = "SUSY.csv"  # relative to the notebook's working directory

stagemetrics.begin()
try:
    # Name kept as `df2` in case later cells reference it.
    df2 = spark.read.csv(SUSY_CSV_PATH, header=False, inferSchema=True)
finally:
    # Close the measurement interval even if the read raises,
    # so begin() is never left without a matching end().
    stagemetrics.end()

# Print a summary report of the stage metrics collected between begin() and end().
stagemetrics.print_report()


21/07/09 17:06:57 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
21/07/09 17:06:57 WARN StageMetrics: Stage metrics data refreshed into temp view PerfStageMetrics



Scheduling mode = FIFO
Spark Context default degree of parallelism = 4
Aggregated Spark stage metrics:
numStages => 2
numTasks => 19
elapsedTime => 76552 (1.3 min)
stageDuration => 75957 (1.3 min)
executorRunTime => 285541 (4.8 min)
executorCpuTime => 185704 (3.1 min)
executorDeserializeTime => 998 (1.0 s)
executorDeserializeCpuTime => 422 (0.4 s)
resultSerializationTime => 7 (7 ms)
jvmGCTime => 3784 (4 s)
shuffleFetchWaitTime => 0 (0 ms)
shuffleWriteTime => 0 (0 ms)
resultSize => 29437 (28.0 KB)
diskBytesSpilled => 0 (0 Bytes)
memoryBytesSpilled => 0 (0 Bytes)
peakExecutionMemory => 0
recordsRead => 5000001
bytesRead => 2391457208 (2.0 GB)
recordsWritten => 0
bytesWritten => 0 (0 Bytes)
shuffleRecordsRead => 0
shuffleTotalBlocksFetched => 0
shuffleLocalBlocksFetched => 0
shuffleRemoteBlocksFetched => 0
shuffleTotalBytesRead => 0 (0 Bytes)
shuffleLocalBytesRead => 0 (0 Bytes)
shuffleRemoteBytesRead => 0 (0 Bytes)
shuffleRemoteBytesReadToDisk => 0 (0 Bytes)
shuffleBytesWritten => 0 (0 Bytes)

In [6]:
# Report the extra metrics sparkMeasure reads from the stage accumulables
# (internal task metrics plus SQL/non-internal metrics).
stagemetrics.print_accumulables()

21/07/09 17:08:11 WARN StageMetrics: Accumulables metrics data refreshed into temp view AccumulablesStageMetrics


Aggregated Spark accumulables of type internal.metric. Sum of values grouped by metric name
Name => sum(value) [group by name]

executorCpuTime => 185704 (3.1 min)
executorDeserializeCpuTime => 422 (0.4 s)
executorDeserializeTime => 998 (1.0 s)
executorRunTime => 285541 (4.8 min)
input.bytesRead => 2391457208 (2.0 GB)
input.recordsRead => 5000001
jvmGCTime => 3784 (4 s)
resultSerializationTime => 7 (7 ms)
resultSize => 31383 (30.0 KB)

SQL Metrics and other non-internal metrics. Values grouped per accumulatorId and metric name.
Accid, Name => max(value) [group by accId, name]

   39, number of output rows => 1
   40, number of output rows => 1
   69, number of output rows => 5000000


                                                                                