Spark-Tracing is a tool that allows users of Apache Spark to inspect its internal state as it runs jobs, for performance analysis and debugging. It consists of three stages:

- First, a Java bytecode instrumentation package is attached to Spark and the desired job is run. This produces several trace files containing logs of Spark's internal events.
- Next, a trace processing job is run on Spark with the trace files from step 1 as input. This consolidates and filters the data and produces a single output file with processed event logs and statistics.
- Finally, the processed output file is provided as input to a Jupyter notebook that displays statistics about the run and an interactive time sequence diagram of the events that occurred during it.

These steps are described in more detail in the following sections.
## Building

This project is written in Scala and requires SBT to compile. There are two SBT subprojects: the instrumentation in `instrument` and the processor in `process`. Running `sbt assembly` in the project root will compile and package both of these, placing JARs in `<subproject>/target/scala-2.11/<subproject>-assembly-<version>.jar`.
## Configuration

The first two steps of the tracing process are configured using a HOCON configuration file. (The third stage does not require configuration.) This file describes what to instrument, how to display the results, and where to place files. A default configuration file is available at `config/default.conf`; aside from the paths, this configuration should be sufficient for most jobs. The configuration has up to four sections, described individually below.
### Basic options

This section holds basic options affecting the overall run of the program, like file paths. Valid keys are:

- `traceout` (string): The directory where the trace files generated by the instrumentation will be placed. The Hadoop `FileSystem` library is used for writing these files, so this must be a URL that can be passed to that library. For writing to HDFS, use the `hdfs://` scheme; for writing to the local filesystem, use `file://`. Trace output files are generated by each JVM participating in the Spark job, so if you write to the local filesystem, be sure that the provided path is writable on all nodes in the cluster. The files here are no longer needed after the processing job has been run, but they will be necessary if you want to re-analyze the data or change filters.
- `result` (string): The path to the file where the results of processing should be written. Unlike `traceout`, this is not a URL; it must be a writable path on the submitting machine. This file will be fed to the visualization notebook.
- `overhead` (boolean): If true, the instrumentation will track its own overhead during the run, which will be reported in the visualization notebook's statistics. Enabling this adds some additional overhead to the instrumentation, so you may want to disable it if you are not interested in this measurement.
- `mode` (string): This affects the operation of the processor. By default, it is set to `process`, which results in the behavior described in the "Running" section below. If it is instead set to `dump`, the processor will not output the processed data; instead, it will dump the filtered and transformed trace events to the file specified in the `result` parameter in a format designed for human consumption. This can be used to aid in configuring and debugging additions to the configuration file.
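For illustration, these options might be set as follows. The paths and values here are placeholders, not defaults; check `config/default.conf` for the authoritative layout.

    traceout = "hdfs://namenode:8020/tmp/spark-trace"  # URL readable by the Hadoop FileSystem library
    result = "/home/user/spark-trace-result"           # plain local path on the submitting machine
    overhead = true                                    # report instrumentation overhead in the notebook
    mode = "process"                                   # or "dump" for human-readable filtered events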
### targets

This section is the crux of the tracing configuration. It describes which function calls should be instrumented and reported in the trace files. The defaults are good for getting a high-level overview of Spark's internal operations when running on YARN, but for debugging specific areas of the code (mllib, SQL, etc.) or running on a different resource manager, it may be desirable to modify this section.

`targets` is a map from package names to arrays of tracers -- let's call each package the "bounding package" of the tracers in its array. Each tracer implements a specific type of instrumentation that captures and records certain actions. Tracers are limited in scope to instrumenting functions in their bounding package. The currently available tracers are:
- `main`: This logs the JVM start and shutdown processes. It is always enabled and cannot be specified in the configuration file.
- `rpc`: This instruments `NettyRpcEnv` and `NettyRpcEnvFactory` to capture RPCs that are sent between Spark components. It does not take any arguments, and its bounding package must be `org.apache.spark.rpc.netty` (or one of its superpackages). This tracer is required to resolve JVM UUIDs to service names, and by extension to filter services. If it is disabled, UUIDs will be used in place of service names in the visualization.
- `event` and `span`: These instrument a specified function with its arguments and return value. The `event` tracer logs the moment a function was called, which will appear as a single point on the sequence plot. The `span` tracer also logs the duration of the function call, which will be displayed as a line with start and end markers. This can be useful for evaluating how long it takes to complete various processes, such as creating a Spark session or transferring files.

  Both tracers take the same arguments, all of which are strings. `class` is the path to the class where the function is defined, relative to the block's bounding package. `method` is the name of the method within that class that is being instrumented. `format` is optional; if defined and not empty, it tells the processor to format the event using that string rather than the fully qualified function name and all of its arguments. The formatting string determines what is displayed when hovering over events in the sequence diagram, so adding simple, descriptive names here can make the visualization much easier to understand. The function's arguments can be inserted into the format string using `$1` for the first argument, `$2` for the second, and so on. If the tracer is a `span`, the return value can also be accessed with `$r` (otherwise `null` will be substituted). (You can actually descend into the arguments' event trees using `$1.1` and similar; see the "Event Trees" section for more information.)
The only bounding package in the default configuration is `org.apache.spark`. The instrumentation will iterate over all classes defined in each bounding package as they are loaded, searching for ones to instrument. Thus, it is important not to set the bounding packages too broadly. In fact, including certain portions of `java.lang` here will cause the instrumentation to try to introspect itself even if you don't define any targets within that package, causing bad things to happen. In general, choose the packages as restrictively as possible, for both safety and performance reasons.
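As a purely illustrative sketch, a `targets` section combining these tracers might look like the following. The per-tracer wrapper syntax (`type = ...`), the class, and the format string are assumptions for illustration, not the real defaults; `config/default.conf` is the authoritative reference for the exact form.

    targets = {
      "org.apache.spark.rpc.netty" = [
        { type = rpc }   # needed to resolve JVM UUIDs to service names
      ]
      "org.apache.spark" = [
        { type = span                          # log call duration as a line on the plot
          class = "sql.SparkSession$Builder"   # hypothetical; relative to the bounding package
          method = "getOrCreate"
          format = "Create Spark session" }    # shown when hovering in the diagram
      ]
    }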
### filters

The processor allows certain events to be excluded from the sequence diagram. This is done by specifying conditions in the `filters` section. Filtering relies heavily on event tree extraction, so read the "Event Trees" section before continuing.

Each element in the filters list is a string specifying a filter condition, mapped either to a boolean indicating whether matching events should be included (true) or excluded (false), or to a nested map of further conditions that will be applied on top of the current one. The condition string consists of the item to match, the type of match to perform, and the value to match against, separated by spaces. The item to match is a tree extraction path (see "Event Trees") without the preceding `$`. The operation is `=` for equality and `!=` for inequality. The value to match against is a literal string. For example, the default configuration contains the following section:
    filters = {
      "3.0 = RPC" {
        "3.3.0 = Heartbeat" = false
        "3.3.0 = HeartbeatResponse" = false
      }
    }
This will select lines where `$3.0` is `RPC` and `$3.3.0` is either `Heartbeat` or `HeartbeatResponse`, and exclude them (`false`) from the output. In other words, it filters out heartbeats and their responses.
To specify more complicated cascading rules, you can use the key `default` to set the default behavior for events matched at a given level, and then override it with deeper levels of comparisons. For example, the following configuration will exclude all RPCs from the output except for status updates with a status of `RUNNING`:
    filters = {
      "3.0 = RPC" {
        default = false
        "3.3.0 = StatusUpdate" {
          "3.3.3 = RUNNING" = true
        }
      }
    }
### services

This section allows services (driver, executor, allocation manager) to be excluded from the output. For example, the `driverPropsFetcher` that is launched before each executor is generally not of interest for tracing. This section is a list of strings specifying regular expressions matching services to be removed. The string each expression is matched against is the IP address, port number, and name of the service, separated by spaces -- for example, `192.168.0.10 56892 SparkExecutor`. Keep in mind that when a service is removed, all events and spans that occur on it and all RPCs sent or received by it are also dropped.
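For instance, a section removing the property fetchers and one particular executor might look like the following sketch. The key name and patterns are illustrative assumptions, not defaults.

    services = [
      ".* driverPropsFetcher"                  # drop the props fetcher started before each executor
      "192\\.168\\.0\\.10 .* SparkExecutor"    # drop the executor running on one node
    ]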
### case-parse

This section allows specifying that certain strings should be parsed as Scala `Product` strings of the form `name(arg1, arg2)`. Case classes and other product types are automatically broken down into event trees (see below) by the instrumentation before dumping them, but regular classes with `toString` methods imitating this formatting will not be split. This section allows the user to selectively parse such strings into event trees, treating them as if they had been case classes all along.

The syntax is the same as that of the `filters` section, except that the leaf values are arrays of strings rather than booleans. Each such string is the path of a subtree that should be parsed as a case class. For example, say you have the following event tree:
    0.......Fn
    1.0.......myFunction
    1.1.........MyCaseClass
    2.0.......TraversableOnce
    2.1.........MySubclass(1,123,cat)
    3.........null
Element `$2.1` is clearly a compound type, but it is treated as a single element. The following `case-parse` section will match trees where `$0` is `Fn` and `$1.0` is `myFunction`, and parse `$2.1` into an event tree:
    case-parse = {
      "0 = Fn" = {
        "1.0 = myFunction" = ["2.1"]
      }
    }
This will result in the tree being amended to the following:
    0.......Fn
    1.0.......myFunction
    1.1.........MyCaseClass
    2.0.......TraversableOnce
    2.1.0.......MySubclass
    2.1.1.........1
    2.1.2.........123
    2.1.3.........cat
    3.........null
## Running

The first step of the tracing process is to instrument Spark and run a job. Before doing this, the instrumentation JAR from `instrument/target/scala-2.11` and the tracing configuration file need to be placed in the same location on each node of the cluster that will be running a Spark process. Be sure to set the `traceout` property in the config file to an appropriate location for the output files. To apply the instrumentation to Spark, in `spark-defaults.conf` (or on the command line) set `spark.driver.extraJavaOptions`, `spark.executor.extraJavaOptions`, and (if running on YARN) `spark.yarn.am.extraJavaOptions` to `-javaagent:/path/to/instrumentation.jar -Dinstrument.config=/path/to/instrumentation.conf`. If you want Spark listener events to be logged as well, you will also need to set `spark.extraListeners` to `org.apache.spark.SparkFirehoseListener` (and uncomment the `SparkFirehoseListener` line in the default config, if you are using it). Then run a Spark job using `spark-submit` or `spark-shell` as usual. Several trace files should be generated in `traceout`.
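Concretely, the relevant `spark-defaults.conf` entries might look like the following, assuming the JAR and config file were copied to `/opt/spark-tracing` on every node (that path is a placeholder):

    spark.driver.extraJavaOptions    -javaagent:/opt/spark-tracing/instrumentation.jar -Dinstrument.config=/opt/spark-tracing/instrumentation.conf
    spark.executor.extraJavaOptions  -javaagent:/opt/spark-tracing/instrumentation.jar -Dinstrument.config=/opt/spark-tracing/instrumentation.conf
    spark.yarn.am.extraJavaOptions   -javaagent:/opt/spark-tracing/instrumentation.jar -Dinstrument.config=/opt/spark-tracing/instrumentation.conf
    spark.extraListeners             org.apache.spark.SparkFirehoseListener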
To process the results and obtain the single processed output file, simply run `spark-submit /path/to/processor.jar /path/to/instrumentation.conf`. Generally, the same configuration should be used as was used by the instrumentation. This will read trace files from `traceout` and write the output file to `result`. It is important that the Spark job be able to read the trace files; for this reason, it is recommended to write the trace files to HDFS rather than to the local filesystem. Under almost all circumstances, you should disable tracing before running the processor; otherwise, the processor will try to consume its own partially written trace output.

It is possible to combine the trace files from multiple runs of Spark, which will display the sequence diagrams for the jobs side by side for easy comparison. To do this, place the trace files for each comparison run in a separate directory, and provide these extra directories as additional arguments (as Hadoop `FileSystem` URLs) to the `spark-submit` invocation. The "primary" trace at `traceout`, as well as the comparison traces in the additional arguments, will be read and processed into the single output file.
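For a comparison run, the invocation might look like the following sketch (all paths and the HDFS URL are placeholders):

    # The primary trace is read from traceout in instrumentation.conf; the extra
    # argument names a directory of trace files from a second run to compare against.
    spark-submit process/target/scala-2.11/process-assembly-<version>.jar \
        instrumentation.conf \
        hdfs://namenode:8020/tmp/spark-trace-run2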
Once the output file has been obtained, open the provided notebook, `Spark Tracing.ipynb`, in Jupyter and edit the first line of the last cell to reference the output file. Then run all cells in sequence. It may be necessary to install Python's `pandas` and `bokeh` packages beforehand. Tables with various statistics will be generated, followed by an interactive time sequence diagram.

The vertical axis is time, progressing from top to bottom. Each tick on the horizontal axis is a Spark service, identified by IP address, port number, and RPC environment name. The red dots are events. The blue lines are RPCs sent between services, and the blue dot indicates the sender of each RPC. The vertical green lines are spans, each of which runs from a light green triangle indicating the start of a function call down to a dark green triangle indicating that call's completion. Hovering over any dot or triangle with the mouse will show the description of the corresponding event. The plot can be panned and zoomed vertically by clicking and dragging and by scrolling with the mouse wheel, respectively.
## Event Trees

Each line in the trace files output by the instrumentation is the recursive decomposition of a Scala `Product` type, usually a case class or tuple. Each product may contain primitives, non-product classes, and other products, which contain more products, and so on. This can be represented with a tree of product types, where the branch nodes are products and the leaf nodes are non-products. The tree can then be traversed to analyze or modify events. Each event in the processor is handled as such a tree, which I call an "event tree". While these are largely internal, understanding them is necessary for anyone who wants to modify Spark-Tracing. Thus, a quick introduction follows.
Each node in an event tree has one or more children. The zeroth child, denoted `$0`, is the name of the node; it has itself as its zeroth child and has no other children. This name is the name of a case class, or `TupleX` for a tuple of size `X`. The following children, `$1`, `$2`, and so on, are the product type's comma-separated arguments. If the type is not a product or is a product with no arguments, only `$0` will exist. Each of these children, in turn, can have only a name or can have further arguments of its own. Nested children are denoted by separating indices with periods: thus, `$1.0` is the name of the first argument, and `$1.1` is the first argument of the first argument.
For illustration, let's look at a typical event tree as formatted by the processor's "dump" mode:
    0.......Tuple3
    1.........e6b1e0ff-9c8f-41cd-a59c-e7d8857a7153
    2.........6444
    3.0.......Fn
    3.1.0.......org.apache.spark.storage.BlockManagerMaster.registerBlockManager
    3.1.1.........org.apache.spark.storage.BlockManagerId
    3.1.2.........long
    3.1.3.........long
    3.1.4.........org.apache.spark.rpc.RpcEndpointRef
    3.2.0.......TraversableOnce
    3.2.1.0.......BlockManagerId
    3.2.1.1.........driver
    3.2.1.2.........example.com
    3.2.1.3.........40510
    3.2.1.4.........None
    3.2.2.........384093388
    3.2.3.........0
    3.2.4.0.......NettyRpcEndpointRef
    3.2.4.1.........spark://BlockManagerEndpoint1@example.com:43285
    3.3.........null
For all lines in the trace file, `$0` is `Tuple3`, `$1` is a UUID identifying the current JVM invocation, `$2` is the Java timestamp of the event, and `$3` is the event body. Note that since `$1` and `$2` are plain strings, I could also denote them as `$1.0` or `$2.0.0.0`. In this case, the payload is a `Fn` (that's `$3.0`), indicating that this tree represents a function call event logged by the `event` tracer.
A function call has `$0` = `Fn`, the fully qualified name of the function as `$1` (with the types of its arguments as children), a list of the function's arguments as `$2`, and its return value as `$3`. So, we can extract the type of our function call's second argument with `$3.1.2` (`long`) and its value with `$3.2.2` (`384093388`). Note that the first argument, of type `BlockManagerId`, is a case class and is broken down into its components (`$3.2.1.0` through `$3.2.1.4`) in the tree. These same concepts apply to other lines in the file, and the code makes heavy use of such "extraction paths" in formatting and calculating the values that are ultimately plotted.
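The extraction-path semantics can be made concrete with a small, self-contained sketch. This is not Spark-Tracing's actual code (the processor is Scala; the tree representation here is invented for illustration), but it follows the same rules, using the dump example's tree:

```python
# Minimal illustration of event trees and extraction paths. A node is a list
# whose first element is its name and whose remaining elements are argument
# subtrees; a bare string is a leaf node.

def extract(node, path):
    """Follow a period-separated extraction path like '3.2.1' through a tree.

    Index 0 selects a node's name; index i >= 1 selects its i-th argument.
    Returns None if the path runs off the tree.
    """
    for index in (int(p) for p in path.split(".")):
        if isinstance(node, str):   # leaf: it is its own name, so $x == $x.0
            if index != 0:
                return None
            continue
        if index == 0:
            node = node[0]          # the node's name
        elif index < len(node):
            node = node[index]      # the index-th argument subtree
        else:
            return None
    # For a branch node, report its name (a simplification for display).
    return node if isinstance(node, str) else node[0]

# The function-call event tree from the dump example above, as nested lists:
event = ["Tuple3",
         "e6b1e0ff-9c8f-41cd-a59c-e7d8857a7153",   # $1: JVM invocation UUID
         "6444",                                   # $2: timestamp
         ["Fn",
          ["org.apache.spark.storage.BlockManagerMaster.registerBlockManager",
           "org.apache.spark.storage.BlockManagerId", "long", "long",
           "org.apache.spark.rpc.RpcEndpointRef"],
          ["TraversableOnce",
           ["BlockManagerId", "driver", "example.com", "40510", "None"],
           "384093388", "0",
           ["NettyRpcEndpointRef",
            "spark://BlockManagerEndpoint1@example.com:43285"]],
          "null"]]

print(extract(event, "0"))        # Tuple3
print(extract(event, "3.1.2"))    # long       (type of the second argument)
print(extract(event, "3.2.2"))    # 384093388  (value of the second argument)
print(extract(event, "2.0.0.0"))  # 6444       (plain strings name themselves)
```

The same paths appear, minus the `$`, on the left-hand side of `filters` conditions and inside `format` strings.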
## License

This project is developed primarily by Matthew Schauer at IBM. All code and assets are copyright 2017 IBM Corp. and released under the terms of the Apache License, version 2.0. For more information, see the `LICENSE` file in the root directory of this project.