SparkMeasure Flight Recorder - File Sink

Use sparkMeasure in flight recorder mode to instrument Spark applications without touching their code. Flight recorder mode attaches a Spark Listener to the Spark Context; the listener collects execution metrics while the application runs and saves them for later processing.
Metrics can be collected at two levels of granularity: stage-level aggregation with FlightRecorderStageMetrics, and task-level metrics with FlightRecorderTaskMetrics.
The collected metrics can be saved to a filesystem for later processing. The available file sinks are locally mounted filesystems and Hadoop-compatible filesystems (HDFS, S3, etc.). Metrics can also be printed to stdout.

Recording metrics using the Flight Recorder mode with Stage-level granularity

To record metrics at stage-level granularity, add these configurations to spark-submit:

--packages ch.cern.sparkmeasure:spark-measure_2.12:0.23
--conf spark.extraListeners=ch.cern.sparkmeasure.FlightRecorderStageMetrics

Flight Recorder mode with Task-level granularity

Use this mode to record metrics at task-level granularity. It can generate large amounts of data in the driver, so consider using stage-level granularity first. Usage is almost the same as for the stage metrics mode described above: replace FlightRecorderStageMetrics with FlightRecorderTaskMetrics.
The configuration parameters for flight recorder mode with task granularity are:

--packages ch.cern.sparkmeasure:spark-measure_2.12:0.23
--conf spark.extraListeners=ch.cern.sparkmeasure.FlightRecorderTaskMetrics

Configuration

Configuration parameters for the flight recorder mode:

--conf spark.sparkmeasure.outputFormat=<format>
    Note: valid values are json, json_to_hadoop, java; the default is "json"
--conf spark.sparkmeasure.outputFilename=<output file>
    Note: default = "/tmp/stageMetrics_flightRecorder"
--conf spark.sparkmeasure.printToStdout=<true|false>
    Note: default is false. Set this to true to print JSON serialized metrics for debug purposes.

Notes:

  • The json and java serialization formats write to the driver's local filesystem
  • json_to_hadoop writes JSON serialized metrics to HDFS or to a Hadoop-compliant filesystem, such as S3A
  • The amount of data generated by FlightRecorderStageMetrics is relatively small in most applications: O(number_of_stages)
  • FlightRecorderTaskMetrics can generate a large amount of data, O(number_of_tasks); use with care
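
As a rough illustration of how the parameters above combine, the following Python sketch assembles the `--conf` arguments for spark-submit and rejects an output format outside the three listed values. The helper name `flight_recorder_args` and its argument names are hypothetical and not part of sparkMeasure; only the configuration keys, listener class names, and valid values come from this document.

```python
# Output formats listed in the configuration section above.
VALID_FORMATS = {"json", "json_to_hadoop", "java"}

# Listener classes for the two granularities described in this document.
LISTENERS = {
    "stage": "ch.cern.sparkmeasure.FlightRecorderStageMetrics",
    "task": "ch.cern.sparkmeasure.FlightRecorderTaskMetrics",
}


def flight_recorder_args(granularity="stage", output_format="json",
                         output_filename=None, print_to_stdout=False):
    """Return spark-submit arguments for flight recorder mode (hypothetical helper)."""
    if granularity not in LISTENERS:
        raise ValueError(f"unknown granularity: {granularity!r}")
    if output_format not in VALID_FORMATS:
        raise ValueError(f"unknown output format: {output_format!r}")

    confs = {
        "spark.extraListeners": LISTENERS[granularity],
        "spark.sparkmeasure.outputFormat": output_format,
        "spark.sparkmeasure.printToStdout": str(print_to_stdout).lower(),
    }
    # Leave outputFilename unset so that the library default applies.
    if output_filename is not None:
        confs["spark.sparkmeasure.outputFilename"] = output_filename

    args = []
    for key, value in confs.items():
        args += ["--conf", f"{key}={value}"]
    return args


# Print the flags for a task-level run with the default json format.
print(" ".join(flight_recorder_args("task", print_to_stdout=True)))
```

The returned list can be appended to a spark-submit command line, together with the `--packages` option shown earlier.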

Examples of how to use FlightRecorderStageMetrics

A Python example

  • This runs the pi.py example script
  • It collects the metrics and saves them to /tmp/stageMetrics_flightRecorder in JSON format:
bin/spark-submit --master local[*] --packages ch.cern.sparkmeasure:spark-measure_2.12:0.23 \
--conf spark.extraListeners=ch.cern.sparkmeasure.FlightRecorderStageMetrics \
examples/src/main/python/pi.py

# Read the flight recorder output file:
more /tmp/stageMetrics_flightRecorder

A Scala example

  • Same example as above, with a custom output filename
  • Metrics are also printed to stdout
bin/spark-submit --master local[*] --packages ch.cern.sparkmeasure:spark-measure_2.12:0.23 \
--class org.apache.spark.examples.SparkPi \
--conf spark.extraListeners=ch.cern.sparkmeasure.FlightRecorderStageMetrics \
--conf spark.sparkmeasure.printToStdout=true \
--conf spark.sparkmeasure.outputFilename="/tmp/myoutput_$(date +%s).json" \
examples/jars/spark-examples_2.12-3.3.1.jar 10

# The metrics are printed on stdout and also saved to a file
# Find and read the flight recorder output file:
ls -ltr /tmp/myoutput*.json
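
Because the output filename in this example embeds a timestamp, repeated runs leave several files behind. A minimal Python sketch for picking the most recent one, equivalent in spirit to reading the last line of `ls -ltr`, could look like this. The helper name `newest_output` is hypothetical; the glob pattern is the one used above.

```python
import glob
import os


def newest_output(pattern):
    """Return the most recently modified file matching pattern, or None if none match."""
    matches = glob.glob(pattern)
    if not matches:
        return None
    return max(matches, key=os.path.getmtime)


latest = newest_output("/tmp/myoutput*.json")
print(latest if latest else "no flight recorder output found")
```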

Example of how to write the metrics output file to HDFS when running in cluster mode on YARN. This example collects metrics with task granularity.
(Note: source the Hadoop environment before running this.)

bin/spark-submit --master yarn --deploy-mode cluster \
--packages ch.cern.sparkmeasure:spark-measure_2.12:0.24 \
--conf spark.extraListeners=ch.cern.sparkmeasure.FlightRecorderTaskMetrics \
--conf spark.sparkmeasure.outputFormat=json_to_hadoop \
--conf spark.sparkmeasure.outputFilename="hdfs://myclustername/user/luca/test/myoutput_$(date +%s).json" \
examples/src/main/python/pi.py

# Find the output file in HDFS
hdfs dfs -ls <path>/myoutput_*.json

Example using Spark 3.3.0 on Kubernetes with Scala 2.12, writing the output to S3:
(Note: export KUBECONFIG=..., set up the Hadoop environment, and configure the s3a keys in the script.)

bin/spark-submit --master k8s://https://XXX.XXX.XXX.XXX --deploy-mode client --conf spark.executor.instances=3 \
--conf spark.executor.cores=2 --executor-memory 6g --driver-memory 8g \
--conf spark.kubernetes.container.image=<registry-URL>/spark:v3.0.0_20190529_hadoop32 \
--packages org.apache.hadoop:hadoop-aws:3.3.2,ch.cern.sparkmeasure:spark-measure_2.12:0.24 \
--conf spark.hadoop.fs.s3a.secret.key="YYY..." \
--conf spark.hadoop.fs.s3a.access.key="ZZZ..." \
--conf spark.hadoop.fs.s3a.endpoint="https://s3.cern.ch" \
--conf spark.hadoop.fs.s3a.impl="org.apache.hadoop.fs.s3a.S3AFileSystem" \
--conf spark.extraListeners=ch.cern.sparkmeasure.FlightRecorderStageMetrics \
--conf spark.sparkmeasure.outputFormat=json_to_hadoop \
--conf spark.sparkmeasure.outputFilename="s3a://test/myoutput_$(date +%s).json" \
--class org.apache.spark.examples.SparkPi \
examples/jars/spark-examples_2.12-3.3.1.jar 10

Metrics post-processing

To post-process the saved metrics you need to deserialize the objects saved by flight recorder mode. This example shows how to do that using the supplied helper object ch.cern.sparkmeasure.IOUtils:

bin/spark-shell --packages ch.cern.sparkmeasure:spark-measure_2.12:0.24

val myMetrics = ch.cern.sparkmeasure.IOUtils.readSerializedStageMetricsJSON("/tmp/stageMetrics_flightRecorder")
// use ch.cern.sparkmeasure.IOUtils.readSerializedStageMetrics("/tmp/stageMetrics.serialized") for java serialization
myMetrics.toDF.show()
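
For a quick look at the JSON output without starting Spark, the file can also be read with plain Python. This is only a sketch under stated assumptions: it assumes the file is a single JSON array with one object per stage, and the field names used here (`stageId`, `numTasks`, `stageDuration`, `executorRunTime`) are assumptions that should be checked against an actual output file. The helper names and the sample values are made up for illustration.

```python
import json


def load_stage_metrics(path):
    """Load flight recorder output, assumed to be a JSON array with one object per stage."""
    with open(path) as f:
        records = json.load(f)
    if not isinstance(records, list):
        raise ValueError("expected a JSON array of stage records")
    return records


def summarize(records, fields=("numTasks", "stageDuration", "executorRunTime")):
    """Sum the chosen numeric fields over all stages; a missing field counts as 0."""
    return {name: sum(r.get(name, 0) for r in records) for name in fields}


# Self-contained demo with made-up values. Real input would come from
# load_stage_metrics("/tmp/stageMetrics_flightRecorder").
sample = [
    {"stageId": 0, "numTasks": 4, "stageDuration": 1200, "executorRunTime": 3100},
    {"stageId": 1, "numTasks": 2, "stageDuration": 300, "executorRunTime": 450},
]
print(summarize(sample))
# prints {'numTasks': 6, 'stageDuration': 1500, 'executorRunTime': 3550}
```

Reading the file this way suits small stage-level outputs; for task-level output, which can be much larger, the Spark DataFrame route shown above is likely the better fit.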

IOUtils API

The following methods of IOUtils are used to read and write serialized task and stage metrics data:

  def writeSerializedJSON(fullPath: String, metricsData: AnyRef): Unit
  def writeSerializedJSONToHadoop(fullPath: String, metricsData: AnyRef, conf: SparkConf): Unit
  def writeToStringSerializedJSON(metricsData: AnyRef): String
  def writeSerialized(fullPath: String, metricsData: Any): Unit

  def readSerializedStageMetricsJSON(stageMetricsFileName: String): List[StageVals]
  def readSerializedStageMetrics(stageMetricsFileName: String): ListBuffer[StageVals]
  def readSerializedTaskMetricsJSON(taskMetricsFileName: String): List[TaskVals]
  def readSerializedTaskMetrics(stageMetricsFileName: String): ListBuffer[TaskVals]

Notes

  • If you deploy applications in cluster mode, note that metrics serialized with json or java are written by the driver to its local filesystem. You could use a network filesystem mounted on the driver/cluster for convenience. You can also use json_to_hadoop serialization to write the metrics to HDFS or any Hadoop-compliant filesystem such as S3.

  • The flight recorder mode is similar to what Spark already does with the event log, which also stores metrics. See the Spark documentation for spark.eventLog.enabled and spark.eventLog.dir, and for details on the Spark History Server.
    See also this note with a few tips on how to read event log files

  • For metrics analysis, see Notes_on_metrics_analysis.md for a few examples.