# Python Notebook for Testing sparkMeasure

In [1]:
import json
import pandas as pd
import sparkmeasure as spm

# Initialize sparkMeasure metrics:
stagemetrics = spm.StageMetrics(spark)
taskmetrics = spm.TaskMetrics(spark)

In [2]:
# Read metrics from json encoded flight recorder file:
scope = 'stage' # Select 'task' or 'stage'

load_perf_metrics = spark.read.option('inferSchema', True).json(r'/home/ubuntu/notebooks/tpcds/metrics/'+scope.lower()+'Metrics_rec_sample', multiLine=True)
load_perf_metrics.createOrReplaceTempView('Perf'+scope.title()+'Metrics')
load_perf_metrics.printSchema()

root
 |-- bytesRead: long (nullable = true)
 |-- bytesWritten: long (nullable = true)
 |-- completionTime: long (nullable = true)
 |-- diskBytesSpilled: long (nullable = true)
 |-- executorCpuTime: long (nullable = true)
 |-- executorDeserializeCpuTime: long (nullable = true)
 |-- executorDeserializeTime: long (nullable = true)
 |-- executorRunTime: long (nullable = true)
 |-- jobGroup: string (nullable = true)
 |-- jobId: long (nullable = true)
 |-- jvmGCTime: long (nullable = true)
 |-- memoryBytesSpilled: long (nullable = true)
 |-- name: string (nullable = true)
 |-- numTasks: long (nullable = true)
 |-- peakExecutionMemory: long (nullable = true)
 |-- recordsRead: long (nullable = true)
 |-- recordsWritten: long (nullable = true)
 |-- resultSerializationTime: long (nullable = true)
 |-- resultSize: long (nullable = true)
 |-- shuffleBytesWritten: long (nullable = true)
 |-- shuffleFetchWaitTime: long (nullable = true)
 |-- shuffleLocalBlocksFetched: long (nullable = true)
 |-- shu

In [3]:
# Need to apply correct schema to fields. Otherwise aggregation will fail.
load_perf_metrics.describe()

DataFrame[summary: string, bytesRead: string, bytesWritten: string, completionTime: string, diskBytesSpilled: string, executorCpuTime: string, executorDeserializeCpuTime: string, executorDeserializeTime: string, executorRunTime: string, jobGroup: string, jobId: string, jvmGCTime: string, memoryBytesSpilled: string, name: string, numTasks: string, peakExecutionMemory: string, recordsRead: string, recordsWritten: string, resultSerializationTime: string, resultSize: string, shuffleBytesWritten: string, shuffleFetchWaitTime: string, shuffleLocalBlocksFetched: string, shuffleLocalBytesRead: string, shuffleRecordsRead: string, shuffleRecordsWritten: string, shuffleRemoteBlocksFetched: string, shuffleRemoteBytesRead: string, shuffleRemoteBytesReadToDisk: string, shuffleTotalBlocksFetched: string, shuffleTotalBytesRead: string, shuffleWriteTime: string, stageDuration: string, stageId: string, submissionTime: string]

In [4]:
# Read in flight recorder data. Register as spark table (e.g., "PerfStageMetrics", "PerfTaskMetrics").
# Then apply aggretation methods. 

# Summarize metrics for entire application. Results same as stagemetrics.print_report().
if scope.lower() == 'task':
    aggregatedDF = taskmetrics.aggregate_taskmetrics_DF("PerfTaskMetrics")
elif scope.lower() == 'stage':
    aggregatedDF = stagemetrics.aggregate_stagemetrics_DF("PerfStageMetrics")

# aggregatedDF cannot be converted to pandas dataframe: 'SparkSession' object has no attribute '_conf'
# Work around using other operations. Same data as found in stagemetrics.print_report().
perf_summary = dict(zip(aggregatedDF.columns, aggregatedDF.first()))
perf_summary

{'numStages': 0,
 'numTasks': None,
 'elapsedTime': None,
 'stageDuration': None,
 'executorRunTime': None,
 'executorCpuTime': None,
 'executorDeserializeTime': None,
 'executorDeserializeCpuTime': None,
 'resultSerializationTime': None,
 'jvmGCTime': None,
 'shuffleFetchWaitTime': None,
 'shuffleWriteTime': None,
 'resultSize': None,
 'diskBytesSpilled': None,
 'memoryBytesSpilled': None,
 'peakExecutionMemory': None,
 'recordsRead': None,
 'bytesRead': None,
 'recordsWritten': None,
 'bytesWritten': None,
 'shuffleRecordsRead': None,
 'shuffleTotalBlocksFetched': None,
 'shuffleLocalBlocksFetched': None,
 'shuffleRemoteBlocksFetched': None,
 'shuffleTotalBytesRead': None,
 'shuffleLocalBytesRead': None,
 'shuffleRemoteBytesRead': None,
 'shuffleRemoteBytesReadToDisk': None,
 'shuffleBytesWritten': None,
 'shuffleRecordsWritten': None}

In [5]:
aggregatedDF.show()

+---------+--------+-----------+-------------+---------------+---------------+-----------------------+--------------------------+-----------------------+---------+--------------------+----------------+----------+----------------+------------------+-------------------+-----------+---------+--------------+------------+------------------+-------------------------+-------------------------+--------------------------+---------------------+---------------------+----------------------+----------------------------+-------------------+---------------------+
|numStages|numTasks|elapsedTime|stageDuration|executorRunTime|executorCpuTime|executorDeserializeTime|executorDeserializeCpuTime|resultSerializationTime|jvmGCTime|shuffleFetchWaitTime|shuffleWriteTime|resultSize|diskBytesSpilled|memoryBytesSpilled|peakExecutionMemory|recordsRead|bytesRead|recordsWritten|bytesWritten|shuffleRecordsRead|shuffleTotalBlocksFetched|shuffleLocalBlocksFetched|shuffleRemoteBlocksFetched|shuffleTotalBytesRead|shuffle