In [0]:
# Start a throwaway streaming query so that the MBeans for streaming metrics are created
# (this only happens when spark.sql.streaming.metricsEnabled=true).
# Source: the built-in "rate" format; sink: every row is simply handed to print().
rate_stream = spark.readStream.format("rate").load()
rate_stream.writeStream.foreach(print).start()

Out[1]: <pyspark.sql.streaming.query.StreamingQuery at 0x7f41e4ab31f0>

In [0]:
%sh
# Write a script that dumps, for every Spark-related Java process on the node it runs on,
# the JMX MBeans of the "java.lang" and "metrics" domains together with their attributes.
# The dumps are the raw material for a JSON snippet compatible with the "jmxMetrics" field
# of the applicationinsights.json file (Application Insights Java agent configuration file).
# See https://learn.microsoft.com/azure/azure-monitor/app/java-jmx-metrics-configuration
#
# Generated script usage: dump-jmx-metrics.sh [PREFIX]
#   One file per process is written to /dbfs/metrics/result/<PREFIX><SPARK_LOCAL_IP>-<pid>.txt
#
# NOTE: the heredoc delimiter below is unquoted, so every dollar sign that must reach the
# generated script is backslash-escaped (and backslashes meant for sed are doubled).

rm -rf /dbfs/metrics/
mkdir -p /dbfs/metrics/result

cat <<EOF >/dbfs/metrics/dump-jmx-metrics.sh

test -e jmxterm-1.0.4-uber.jar || wget -q https://github.com/jiaqi/jmxterm/releases/download/v1.0.4/jmxterm-1.0.4-uber.jar
# pgrep prints one bare PID per line, so the result does not depend on how many
# padding spaces ps puts in front of the PID column (which varies with the PID width).
for pid in \$(pgrep java); do

  echo "PID: \$pid"

  # Output available domains to notebook
  echo "open \$pid" > jmxterm-commands.txt
  echo "domains" >> jmxterm-commands.txt
  java -jar jmxterm-1.0.4-uber.jar -i jmxterm-commands.txt

  # List MBean names in desired domains
  echo "open \$pid" > jmxterm-commands.txt
  for domain in "java.lang" "metrics"; do
    echo "beans -d \$domain" >> jmxterm-commands.txt
  done
  java -jar jmxterm-1.0.4-uber.jar -i jmxterm-commands.txt > jmxterm-output.txt

  # Skip java processes that are not related to Spark
  grep --quiet name=spark jmxterm-output.txt || continue

  # Generate jmxterm commands file to list attributes for each MBean
  echo "open \$pid" > jmxterm-commands-detailed.txt
  python -c 'import sys, re; [sys.stdout.write(f"bean {line}\ninfo\n") for line in sys.stdin]' < jmxterm-output.txt >> jmxterm-commands-detailed.txt

  # Spaces in MBean names must be escaped in jmxterm command
  # see https://github.com/jiaqi/jmxterm/issues/41
  # This sed command replaces each space starting with the second occurrence
  sed -i 's/ /\\\\ /2g' jmxterm-commands-detailed.txt

  # Output MBeans detailed information to output file
  java -jar jmxterm-1.0.4-uber.jar -i jmxterm-commands-detailed.txt > /dbfs/metrics/result/\$1\$SPARK_LOCAL_IP-\$pid.txt 2>&1
done
EOF

In [0]:
%sh 
# Run the script for the driver.
# The argument "driver-" is the script's first positional parameter; the script puts it
# in front of the result file name, so driver dumps are named driver-<ip>-<pid>.txt.
bash /dbfs/metrics/dump-jmx-metrics.sh driver-

PID: 477
Welcome to JMX terminal. Type "help" for available commands.
#Connection to 477 is opened
#following domains are available
JMImplementation
com.sun.management
java.lang
java.nio
java.util.logging
jdk.management.jfr
org.apache.logging.log4j2
Welcome to JMX terminal. Type "help" for available commands.
#Connection to 477 is opened
#domain = java.lang:
#IllegalArgumentException: Domain metrics doesn't exist, check your spelling
PID: 577
Welcome to JMX terminal. Type "help" for available commands.
#Connection to 577 is opened
#following domains are available
JMImplementation
com.sun.management
java.lang
java.nio
java.util.logging
jdk.management.jfr
org.apache.logging.log4j2
Welcome to JMX terminal. Type "help" for available commands.
#Connection to 577 is opened
#domain = java.lang:
#IllegalArgumentException: Domain metrics doesn't exist, check your spelling
PID: 808
Welcome to JMX terminal. Type "help" for available commands.
#Connection to 808 is opened
#following domains are av

In [0]:
%scala
// Run the JMX dump script for each executor (no file-name prefix argument is passed here).
// The closure returns the script's standard output; `.!!` throws if the script exits non-zero.
// The second argument (100 seconds) is presumably the timeout for the call — TODO confirm.

import scala.concurrent.duration._

var res = sc.runOnEachExecutor[String](
  () => {
    import scala.sys.process._
    Seq("bash", "-c", "/dbfs/metrics/dump-jmx-metrics.sh").!!
  },
  100.seconds
)

In [0]:
%ls /dbfs/metrics/result

[0m[01;32m10.139.64.4-823.txt[0m*  [01;32mdriver-10.139.64.5-808.txt[0m*


In [0]:
# Parse the detailed output of jmxterm (text file with mbean names and attributes)
# into the JSON format of "jmxMetrics" field of the applicationinsights.json file.

import os, re, json
from collections import Counter, defaultdict

# Define the directory path where the jmxterm output files are located.
# Output JSON files are also written in this location.
directory = '/dbfs/metrics/result'

# An attribute line inside a "# attributes" section: captures the attribute name.
_ATTRIBUTE_LINE = re.compile(r'  - (\S*).*,')
# Executor metric names carry a numeric segment (the executor ID) after "spark.".
_EXECUTOR_PREFIX = re.compile(r'^metrics:name=spark\.\d+\.')
# Streaming metric names carry a UUID-shaped segment after "spark.driver.spark.streaming.".
_STREAMING_PREFIX = re.compile(
    r'^metrics:name=spark\.driver\.spark\.streaming\.[0-9a-f]{8}-[0-9a-f]{4}-[1-5][0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}\.'
)


def _parse_jmxterm_info(lines: list[str]) -> list[dict[str, str]]:
    """Collect every (MBean, attribute) combination found in jmxterm output lines.

    A line starting with '#mbean = ' sets the current MBean; a line starting with
    '# attributes' opens an attributes section, and any other line starting with '#'
    closes it. Inside a section, each line matching _ATTRIBUTE_LINE yields one entry.

    Returns a list of dicts with the keys "name" (object name up to the first comma),
    "objectName" (full object name) and "attribute", in file order.
    """
    entries = []
    in_attributes_section = False
    # Reset per file so that a malformed file can never inherit the last MBean of the previous one.
    mbean_name = mbean_name_short = None
    for line in lines:
        if line.startswith('#'):
            in_attributes_section = line.startswith('# attributes')
            if line.startswith('#mbean = '):
                mbean_name = line.rstrip().removeprefix('#mbean = ')
                mbean_name_short = mbean_name.split(',')[0]
        elif in_attributes_section and mbean_name is not None:
            match = _ATTRIBUTE_LINE.search(line)
            if match is None:
                # Blank or otherwise unparseable line inside the section: ignore it instead of crashing.
                continue
            entries.append({"name": mbean_name_short, "objectName": mbean_name, "attribute": match.group(1)})
    return entries


def _build_jmx_metrics(entries: list[dict[str, str]]) -> list[dict[str, str]]:
    """Turn parsed (MBean, attribute) entries into sorted, de-duplicated jmxMetrics records.

    All per-bean decisions (name suffixing, Value/Number exclusion) are taken on the
    ORIGINAL object names, before those are generalized into wildcard patterns, so the
    outcome does not depend on the order in which attributes appear. `entries` is not modified.
    """
    entries_per_bean = Counter(entry["objectName"] for entry in entries)
    attributes_per_bean = defaultdict(set)
    for entry in entries:
        attributes_per_bean[entry["objectName"]].add(entry["attribute"])

    metrics = []
    for entry in entries:
        object_name = entry["objectName"]
        attribute = entry["attribute"]
        bean_attributes = attributes_per_bean[object_name]

        # skip deprecated metrics (redundant with other metrics)
        if 'blacklist' in object_name.lower():
            continue

        # Exclude weird object names from nested/anonymous classes
        if '$' in object_name:
            continue

        # Exclude "Value" attributes when a "Number" attribute exists, since their values are then identical
        if attribute == "Value" and "Number" in bean_attributes:
            continue

        # Add the attribute name to the metric name, except when there is only one attribute
        # (or this attribute is "Number" and the bean exposes nothing besides "Value"/"Number").
        name = entry["name"]
        only_value_and_number = bean_attributes <= {"Value", "Number"}
        if entries_per_bean[object_name] > 1 and not (attribute == "Number" and only_value_and_number):
            name = f'{name}.{attribute}'

        # Use JMX MBean Object Name Patterns for executor metric names which are dependent of the executor ID
        object_name_pattern = _EXECUTOR_PREFIX.sub('metrics:name=spark.*.', object_name)

        # Streaming metrics: make independent of the UUID segment, simplify prefix
        object_name_pattern = _STREAMING_PREFIX.sub('metrics:name=spark.driver.spark.streaming.*.', object_name_pattern)
        name = _STREAMING_PREFIX.sub('metrics:name=spark.streaming.', name)

        # Make Spark metric names shorter and independent of the executor ID
        name = _EXECUTOR_PREFIX.sub('spark.worker.', name)
        name = re.sub(r'^metrics:name=spark\.', 'spark.', name)

        metrics.append({"name": name, "objectName": object_name_pattern, "attribute": attribute})

    # Generalizing object names can make several records identical: de-duplicate, then sort.
    unique_metrics = [dict(items) for items in {tuple(metric.items()) for metric in metrics}]
    return sorted(unique_metrics, key=lambda x: (x['name'], x['objectName'], x['attribute']))


# Convert each jmxterm .txt dump in the directory into a sibling <file>.txt.json file.
for filename in os.listdir(directory):
    if not filename.endswith('.txt'):
        continue

    filepath = os.path.join(directory, filename)

    # read the file
    with open(filepath, 'r') as file:
        lines = file.readlines()

    # read all mbean/attribute combinations, then filter, rename, sort
    results = _parse_jmxterm_info(lines)
    sorted_results = _build_jmx_metrics(results)

    with open(filepath + '.json', 'w') as f:
        json.dump(sorted_results, f, indent=4)

In [0]:
%sh
# Show some sample lines: grep the generated JSON files for a few metrics of interest,
# one pattern at a time so the output stays grouped by pattern.
for pattern in processingRate activeTask ProcessCpuLoad OnHeapStorageMemory CollectionUsageThresholdExceeded; do
  grep "$pattern" /dbfs/metrics/result/*.json
done

/dbfs/metrics/result/driver-10.139.64.5-808.txt.json:        "name": "spark.streaming.processingRate-total",
/dbfs/metrics/result/driver-10.139.64.5-808.txt.json:        "objectName": "metrics:name=spark.driver.spark.streaming.*.processingRate-total,type=gauges",
/dbfs/metrics/result/10.139.64.4-823.txt.json:        "name": "spark.worker.executor.threadpool.activeTasks",
/dbfs/metrics/result/10.139.64.4-823.txt.json:        "objectName": "metrics:name=spark.*.executor.threadpool.activeTasks,type=gauges",
/dbfs/metrics/result/10.139.64.4-823.txt.json:        "name": "java.lang:type=OperatingSystem.ProcessCpuLoad",
/dbfs/metrics/result/10.139.64.4-823.txt.json:        "attribute": "ProcessCpuLoad"
/dbfs/metrics/result/driver-10.139.64.5-808.txt.json:        "name": "java.lang:type=OperatingSystem.ProcessCpuLoad",
/dbfs/metrics/result/driver-10.139.64.5-808.txt.json:        "attribute": "ProcessCpuLoad"
/dbfs/metrics/result/10.139.64.4-823.txt.json:        "name": "spark.worker.ExecutorMe

In [0]:
%sh 
# Bundle the raw dumps and the generated JSON files into one archive and publish it
# under /dbfs/FileStore. The archive is built in /tmp first and moved afterwards
# (presumably because zip needs a local filesystem to write to — TODO confirm).
archive=/tmp/metrics.zip
zip "$archive" /dbfs/metrics/result/*
mkdir -p /dbfs/FileStore
mv "$archive" /dbfs/FileStore

  adding: dbfs/metrics/result/10.139.64.4-823.txt (deflated 94%)
  adding: dbfs/metrics/result/10.139.64.4-823.txt.json (deflated 94%)
  adding: dbfs/metrics/result/driver-10.139.64.5-808.txt (deflated 95%)
  adding: dbfs/metrics/result/driver-10.139.64.5-808.txt.json (deflated 96%)


After running the notebook, download the template JSON files at [metrics.zip](/files/metrics.zip).