Skip to content
Permalink
Branch: master
Find file Copy path
1 contributor

Users who have contributed to this file

266 lines (237 sloc) 13.8 KB

Spark Event Log

You can find in this note a few examples on how to read SparkEventlog files to extract SQL workload/performance metrics using Spark SQL. Some of the topics addressed are:

  • What is Spark EventLog and what info you can find there
  • How to read them using Spark SQL reader
  • Relevant SQL to extract and run aggregation on the data, notably working with nested structures present in the Event Log

Motivations

This is useful if you want to analyze the performance of your applications by processing the eventLog data beyond what is available using Spark history server. For example you want to process data to perform custom aggregations and/or use notebook-style tools. Another scenario is that you want to automate the analysis of multiple eventLog files.


Some background on Spark EventLog/applicationHistory files

  • The Spark driver logs into job workload/perf metrics in the spark.evenLog.dir directory as JSON files.
  • There is one file per application, the file names contains the application id (therefore including a timestamp) application_1502789566015_17671.
  • While the application is running the file as a suffix .inprogress, the suffix is removed if the application gracefully stops. This means that the .inprogress suffix can stick to the file in certains cases, such as driver crashes.
  • Typically these files are read with the Web UI and the history server.
  • EventLog JSON files can also be read directly.

Config

Spark Event Log records info on processed jobs/stages/tasks. See details at [https://spark.apache.org/docs/latest/monitoring.html]
This feature is activated and configured with spark config options. This is an example:

spark.eventLog.enabled=true
spark.eventLog.dir=hdfs:///user/spark/applicationHistory

Example of how to read Event Log JSON files with Spark SQL

This is an example of how to read and do basic processing with Spark Dataframes/SQL:

val df = spark.read.json("/user/spark/applicationHistory/application_1502789566015_25541")
df.printSchema

The JSON file has many attributes and a rich schema, see df.printSchema.
One useful source of info is about task metrics and events (generated by executors and handled via the Spark Listener infrastruture). Here an

// show the type of events and the number of entries for each
df.select("Event").groupBy("Event").count.show(20,false)

scala> df.select("Event").groupBy("Event").count.show(20,false)
+----------------------------------------------------------------+-----+
|Event                                                           |count|
+----------------------------------------------------------------+-----+
|org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd  |3    |
|SparkListenerTaskStart                                          |249  |
|SparkListenerBlockManagerAdded                                  |4    |
|SparkListenerJobStart                                           |2    |
|SparkListenerStageCompleted                                     |5    |
|SparkListenerJobEnd                                             |2    |
|SparkListenerLogStart                                           |1    |
|SparkListenerExecutorAdded                                      |4    |
|org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart|3    |
|SparkListenerEnvironmentUpdate                                  |1    |
|SparkListenerStageSubmitted                                     |5    |
|SparkListenerTaskEnd                                            |249  |
|SparkListenerApplicationStart                                   |1    |
+----------------------------------------------------------------+-----+

// Note use this SQL code if you prefer
df.createOrReplaceTempView("t1")
sql("select Event,count(*) from t1 group by Event").show(30,false)

Example analysis using Stage metrics:

  • This extract Stage Info values from the event log JSON
  • Metrics are stored in an array "Accumulables"
  • the SQL lateral view explode(Accumulables) is used to join the nested data in Accumulates with the main dataframe
val df = spark.read.json("/user/spark/applicationHistory/application_1502789566015_25541")

val df2 = df.filter("Event='SparkListenerStageCompleted'").select("`Stage Info`.*")
df2.createOrReplaceTempView("t2")

val df4 = sql("select 'Submission Time','Completion Time', 'Number of Tasks', 'Stage ID', t3.col.* from t2 lateral view explode(Accumulables) t3")
df4.show(20,false)
df4.createOrReplaceTempView("t4")

Examples:

// aggregate stage info metrics values
scala> sql("select Name, sum(Value) as value from t4 group by Name order by Name").show(40,false)

+---------------------------------------------------+----------------+
|Name                                               |value           |
+---------------------------------------------------+----------------+
|aggregate time total (min, med, max)               |1230038.0       |
|avg hash probe (min, med, max)                     |1240.0          |
|data size total (min, med, max)                    |5.6000205E7     |
|duration total (min, med, max)                     |3202872.0       |
|internal.metrics.executorCpuTime                   |1.46231111372E11|
|internal.metrics.executorDeserializeCpuTime        |3.445626341E9   |
|internal.metrics.executorDeserializeTime           |27622.0         |
|internal.metrics.executorRunTime                   |857185.0        |
|internal.metrics.input.bytesRead                   |1.3991536E7     |
|internal.metrics.input.recordsRead                 |1000224.0       |
|internal.metrics.jvmGCTime                         |100728.0        |
|internal.metrics.peakExecutionMemory               |1.2690128896E10 |
|internal.metrics.resultSerializationTime           |103.0           |
|internal.metrics.resultSize                        |1114448.0       |
|internal.metrics.shuffle.read.fetchWaitTime        |522274.0        |
|internal.metrics.shuffle.read.localBlocksFetched   |996.0           |
|internal.metrics.shuffle.read.localBytesRead       |4224628.0       |
|internal.metrics.shuffle.read.recordsRead          |2000016.0       |
|internal.metrics.shuffle.read.remoteBlocksFetched  |2988.0          |
|internal.metrics.shuffle.read.remoteBytesRead      |1.2722302E7     |
|internal.metrics.shuffle.read.remoteBytesReadToDisk|0.0             |
|internal.metrics.shuffle.write.bytesWritten        |1.694693E7      |
|internal.metrics.shuffle.write.recordsWritten      |2000016.0       |
|internal.metrics.shuffle.write.writeTime           |5.894307225E9   |
|number of output rows                              |2.504759806E9   |
|peak memory total (min, med, max)                  |1.2690128092E10 |
|sort time total (min, med, max)                    |221.0           |
+---------------------------------------------------+----------------+

scala> df2.printSchema
root
 |-- Accumulables: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- Count Failed Values: boolean (nullable = true)
 |    |    |-- ID: long (nullable = true)
 |    |    |-- Internal: boolean (nullable = true)
 |    |    |-- Metadata: string (nullable = true)
 |    |    |-- Name: string (nullable = true)
 |    |    |-- Value: string (nullable = true)
 |-- Completion Time: long (nullable = true)
 |-- Details: string (nullable = true)
 |-- Number of Tasks: long (nullable = true)
 |-- Parent IDs: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- RDD Info: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- Callsite: string (nullable = true)
 |    |    |-- Disk Size: long (nullable = true)
 |    |    |-- Memory Size: long (nullable = true)
 |    |    |-- Name: string (nullable = true)
 |    |    |-- Number of Cached Partitions: long (nullable = true)
 |    |    |-- Number of Partitions: long (nullable = true)
 |    |    |-- Parent IDs: array (nullable = true)
 |    |    |    |-- element: long (containsNull = true)
 |    |    |-- RDD ID: long (nullable = true)
 |    |    |-- Scope: string (nullable = true)
 |    |    |-- Storage Level: struct (nullable = true)
 |    |    |    |-- Deserialized: boolean (nullable = true)
 |    |    |    |-- Replication: long (nullable = true)
 |    |    |    |-- Use Disk: boolean (nullable = true)
 |    |    |    |-- Use Memory: boolean (nullable = true)
 |-- Stage Attempt ID: long (nullable = true)
 |-- Stage ID: long (nullable = true)
 |-- Stage Name: string (nullable = true)
 |-- Submission Time: long (nullable = true)


scala> sql("select `Submission Time`,`Completion Time`,`Number of Tasks`,`Stage ID`, t3.col.* from t2 lateral view explode(Accumulables) t3").show
+---------------+---------------+---------------+--------+-------------------+---+--------+--------+--------------------+---------+
|Submission Time|Completion Time|Number of Tasks|Stage ID|Count Failed Values| ID|Internal|Metadata|                Name|    Value|
+---------------+---------------+---------------+--------+-------------------+---+--------+--------+--------------------+---------+
|  1507552351523|  1507552352748|              4|       0|               true| 23|    true|    null|internal.metrics....|     2035|
|  1507552351523|  1507552352748|              4|       0|               true| 26|    true|    null|internal.metrics....|       44|
|  1507552351523|  1507552352748|              4|       0|               true| 20|    true|     sql|number of output ...|     1000|
|  1507552351523|  1507552352748|              4|       0|               true| 22|    true|    null|internal.metrics....|717091615|
|  1507552351523|  1507552352748|              4|       0|               true| 25|    true|    null|internal.metrics....|     6368|
|  1507552351523|  1507552352748|              4|       0|               true| 19|    true|     sql|duration total (m...|       15|


Example analysis using Task metrics:

val df = spark.read.json("/user/spark/applicationHistory/application_1502789566015_25541")
val df2 = df.filter("Event='SparkListenerTaskEnd'").select("Stage ID", "Task Info.*", "Task Metrics.*")


Examples:
scala> df2.printSchema
root
 |-- Stage ID: long (nullable = true)
 |-- Accumulables: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- Count Failed Values: boolean (nullable = true)
 |    |    |-- ID: long (nullable = true)
 |    |    |-- Internal: boolean (nullable = true)
 |    |    |-- Metadata: string (nullable = true)
 |    |    |-- Name: string (nullable = true)
 |    |    |-- Update: string (nullable = true)
 |    |    |-- Value: string (nullable = true)
 |-- Attempt: long (nullable = true)
 |-- Executor ID: string (nullable = true)
 |-- Failed: boolean (nullable = true)
 |-- Finish Time: long (nullable = true)
 |-- Getting Result Time: long (nullable = true)
 |-- Host: string (nullable = true)
 |-- Index: long (nullable = true)
 |-- Killed: boolean (nullable = true)
 |-- Launch Time: long (nullable = true)
 |-- Locality: string (nullable = true)
 |-- Speculative: boolean (nullable = true)
 |-- Task ID: long (nullable = true)
 |-- Disk Bytes Spilled: long (nullable = true)
 |-- Executor CPU Time: long (nullable = true)
 |-- Executor Deserialize CPU Time: long (nullable = true)
 |-- Executor Deserialize Time: long (nullable = true)
 |-- Executor Run Time: long (nullable = true)
 |-- Input Metrics: struct (nullable = true)
 |    |-- Bytes Read: long (nullable = true)
 |    |-- Records Read: long (nullable = true)
 |-- JVM GC Time: long (nullable = true)
 |-- Memory Bytes Spilled: long (nullable = true)
 |-- Output Metrics: struct (nullable = true)
 |    |-- Bytes Written: long (nullable = true)
 |    |-- Records Written: long (nullable = true)
 |-- Result Serialization Time: long (nullable = true)
 |-- Result Size: long (nullable = true)
 |-- Shuffle Read Metrics: struct (nullable = true)
 |    |-- Fetch Wait Time: long (nullable = true)
 |    |-- Local Blocks Fetched: long (nullable = true)
 |    |-- Local Bytes Read: long (nullable = true)
 |    |-- Remote Blocks Fetched: long (nullable = true)
 |    |-- Remote Bytes Read: long (nullable = true)
 |    |-- Total Records Read: long (nullable = true)
 |-- Shuffle Write Metrics: struct (nullable = true)
 |    |-- Shuffle Bytes Written: long (nullable = true)
 |    |-- Shuffle Records Written: long (nullable = true)
 |    |-- Shuffle Write Time: long (nullable = true)
 |-- Updated Blocks: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- Block ID: string (nullable = true)
 |    |    |-- Status: struct (nullable = true)
 |    |    |    |-- Disk Size: long (nullable = true)
 |    |    |    |-- Memory Size: long (nullable = true)
 |    |    |    |-- Storage Level: struct (nullable = true)
 |    |    |    |    |-- Deserialized: boolean (nullable = true)
 |    |    |    |    |-- Replication: long (nullable = true)
 |    |    |    |    |-- Use Disk: boolean (nullable = true)
 |    |    |    |    |-- Use Memory: boolean (nullable = true)


scala> df2.select("Input Metrics.*","Executor CPU Time","Finish Time","Locality").show
+----------+------------+-----------------+-------------+-------------+
|Bytes Read|Records Read|Executor CPU Time|  Finish Time|     Locality|
+----------+------------+-----------------+-------------+-------------+
|         0|         250|        299716929|1507552352226|PROCESS_LOCAL|
|         0|         250|         27238324|1507552352230|PROCESS_LOCAL|
|         0|         250|        470049212|1507552352739|PROCESS_LOCAL|
|         0|         250|        180483811|1507552352744|PROCESS_LOCAL|
|         0|         250|        125423947|1507552353197|PROCESS_LOCAL|
|         0|         250|         97179093|1507552353197|PROCESS_LOCAL|
|         0|         250|        211535781|1507552353505|PROCESS_LOCAL|
|         0|         250|        242424956|1507552353509|PROCESS_LOCAL|
|         0|           0|        100401996|1507552353694|   NODE_LOCAL|
+----------+------------+-----------------+-------------+-------------+
You can’t perform that action at this time.