[SPARK-33476][CORE] Generalize ExecutorSource to expose user-given file system schemes

### What changes were proposed in this pull request?

This PR aims to generalize executor metrics to support user-given file system schemes instead of the fixed `file,hdfs` scheme.
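
For example, the new scheme list can also be set programmatically; a minimal sketch using the public `SparkConf` API (the key and the `file,s3a` value come from this PR, while the surrounding setup is illustrative):

```scala
import org.apache.spark.SparkConf

// Track local-file and S3A metrics instead of the fixed "file,hdfs" pair.
val conf = new SparkConf()
  .setAppName("fs-metrics-demo")
  .set("spark.executor.metrics.fileSystemSchemes", "file,s3a")
```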

### Why are the changes needed?

For users who rely only on cloud storage such as `S3A`, we need to be able to expose `S3A` metrics. We can also skip the unused `hdfs` metrics.

### Does this PR introduce _any_ user-facing change?

Yes, but the change is compatible for existing users who use only the `hdfs` and `file` file system schemes, since those remain the default.

### How was this patch tested?

Manually tested as follows.

```
$ build/sbt -Phadoop-cloud package
$ sbin/start-master.sh; sbin/start-slave.sh spark://$(hostname):7077
$ bin/spark-shell --master spark://$(hostname):7077 -c spark.executor.metrics.fileSystemSchemes=file,s3a -c spark.metrics.conf.executor.sink.jmx.class=org.apache.spark.metrics.sink.JmxSink
scala> spark.read.textFile("s3a://dongjoon/README.md").collect()
```

Separately, launch `jconsole` and check `*.executor.filesystem.s3a.*`. Also, confirm that there is no `*.executor.filesystem.hdfs.*` metric.

```
$ jconsole
```
![Screen Shot 2020-11-17 at 9 26 03 PM](https://user-images.githubusercontent.com/9700541/99487609-94121180-291b-11eb-9ed2-964546146981.png)

Closes #30405 from dongjoon-hyun/SPARK-33476.

Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
dongjoon-hyun committed Nov 18, 2020
1 parent 689c294 commit 594c7c6
Showing 4 changed files with 18 additions and 4 deletions.
**core/src/main/scala/org/apache/spark/executor/Executor.scala** (4 additions & 2 deletions)

```diff
@@ -22,7 +22,7 @@ import java.lang.Thread.UncaughtExceptionHandler
 import java.lang.management.ManagementFactory
 import java.net.{URI, URL}
 import java.nio.ByteBuffer
-import java.util.Properties
+import java.util.{Locale, Properties}
 import java.util.concurrent._
 import java.util.concurrent.atomic.AtomicBoolean
 import javax.annotation.concurrent.GuardedBy
@@ -110,7 +110,9 @@ private[spark] class Executor(
       .build()
     Executors.newCachedThreadPool(threadFactory).asInstanceOf[ThreadPoolExecutor]
   }
-  private val executorSource = new ExecutorSource(threadPool, executorId)
+  private val schemes = conf.get(EXECUTOR_METRICS_FILESYSTEM_SCHEMES)
+    .toLowerCase(Locale.ROOT).split(",").map(_.trim).filter(_.nonEmpty)
+  private val executorSource = new ExecutorSource(threadPool, executorId, schemes)
   // Pool used for threads that supervise task killing / cancellation
   private val taskReaperPool = ThreadUtils.newDaemonCachedThreadPool("Task reaper")
   // For tasks which are in the process of being killed, this map holds the most recently created
```
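
The new `schemes` parsing lower-cases the configured value, splits it on commas, trims each entry, and drops blanks. A standalone sketch of the same behavior (`parseSchemes` is a hypothetical helper name; the body mirrors the diff above):

```scala
import java.util.Locale

// Lower-case, split on commas, trim each entry, and drop empty entries,
// as Executor does with EXECUTOR_METRICS_FILESYSTEM_SCHEMES.
def parseSchemes(value: String): Array[String] =
  value.toLowerCase(Locale.ROOT).split(",").map(_.trim).filter(_.nonEmpty)

// parseSchemes(" File , S3A ") returns Array("file", "s3a")
```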

**core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala** (5 additions & 2 deletions)

```diff
@@ -27,7 +27,10 @@ import org.apache.hadoop.fs.FileSystem
 import org.apache.spark.metrics.source.Source
 
 private[spark]
-class ExecutorSource(threadPool: ThreadPoolExecutor, executorId: String) extends Source {
+class ExecutorSource(
+    threadPool: ThreadPoolExecutor,
+    executorId: String,
+    fileSystemSchemes: Array[String]) extends Source {
 
   private def fileStats(scheme: String) : Option[FileSystem.Statistics] =
     FileSystem.getAllStatistics.asScala.find(s => s.getScheme.equals(scheme))
@@ -70,7 +73,7 @@ class ExecutorSource(threadPool: ThreadPoolExecutor, executorId: String) extends
   })
 
   // Gauge for file system stats of this executor
-  for (scheme <- Array("hdfs", "file")) {
+  for (scheme <- fileSystemSchemes) {
     registerFileSystemStat(scheme, "read_bytes", _.getBytesRead(), 0L)
     registerFileSystemStat(scheme, "write_bytes", _.getBytesWritten(), 0L)
     registerFileSystemStat(scheme, "read_ops", _.getReadOps(), 0)
```
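
For context, `registerFileSystemStat` (whose body is outside this hunk) registers one Dropwizard gauge per scheme and statistic, named `filesystem.<scheme>.<stat>` as in the jconsole check above. A self-contained sketch of that pattern, assuming a plain `MetricRegistry` (the `FsMetricsSketch` object and its `main` are illustrative only):

```scala
import scala.collection.JavaConverters._

import com.codahale.metrics.{Gauge, MetricRegistry}
import org.apache.hadoop.fs.FileSystem

object FsMetricsSketch {
  val metricRegistry = new MetricRegistry

  // Look up Hadoop's per-scheme counters, if any I/O has happened for that scheme.
  private def fileStats(scheme: String): Option[FileSystem.Statistics] =
    FileSystem.getAllStatistics.asScala.find(_.getScheme.equals(scheme))

  // Register a gauge named "filesystem.<scheme>.<name>" that reads one statistic,
  // falling back to a default when the scheme has no statistics yet.
  private def registerFileSystemStat[T](
      scheme: String, name: String, f: FileSystem.Statistics => T, defaultValue: T) =
    metricRegistry.register(MetricRegistry.name("filesystem", scheme, name), new Gauge[T] {
      override def getValue: T = fileStats(scheme).map(f).getOrElse(defaultValue)
    })

  def main(args: Array[String]): Unit = {
    for (scheme <- Array("file", "s3a")) {
      registerFileSystemStat(scheme, "read_bytes", _.getBytesRead(), 0L)
    }
    // Prints [filesystem.file.read_bytes, filesystem.s3a.read_bytes]
    println(metricRegistry.getGauges.keySet())
  }
}
```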

**core/src/main/scala/org/apache/spark/internal/config/package.scala** (7 additions & 0 deletions)

```diff
@@ -271,6 +271,13 @@ package object config {
       .timeConf(TimeUnit.MILLISECONDS)
       .createWithDefaultString("0")
 
+  private[spark] val EXECUTOR_METRICS_FILESYSTEM_SCHEMES =
+    ConfigBuilder("spark.executor.metrics.fileSystemSchemes")
+      .doc("The file system schemes to report in executor metrics.")
+      .version("3.1.0")
+      .stringConf
+      .createWithDefaultString("file,hdfs")
+
   private[spark] val EXECUTOR_JAVA_OPTIONS =
     ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS)
      .withPrepended(SparkLauncher.EXECUTOR_DEFAULT_JAVA_OPTIONS)
```
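
Entries declared with `ConfigBuilder` are read back through Spark's typed config API, as in the `Executor` change above. A sketch of the consuming side (the `org.apache.spark.demo` package and object name are hypothetical; note the constant is `private[spark]`, so this only compiles inside an `org.apache.spark` sub-package):

```scala
package org.apache.spark.demo

import org.apache.spark.SparkConf
import org.apache.spark.internal.config.EXECUTOR_METRICS_FILESYSTEM_SCHEMES

object ConfigReadSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    // An unset key falls back to the ConfigBuilder default, here "file,hdfs".
    println(conf.get(EXECUTOR_METRICS_FILESYSTEM_SCHEMES))
    conf.set("spark.executor.metrics.fileSystemSchemes", "file,s3a")
    println(conf.get(EXECUTOR_METRICS_FILESYSTEM_SCHEMES))  // file,s3a
  }
}
```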

**docs/monitoring.md** (2 additions & 0 deletions)

```diff
@@ -1175,6 +1175,8 @@ This is the component with the largest amount of instrumented metrics
 These metrics are exposed by Spark executors.
 
 - namespace=executor (metrics are of type counter or gauge)
+  - **notes:**
+    - `spark.executor.metrics.fileSystemSchemes` (default: `file,hdfs`) determines the exposed file system metrics.
   - bytesRead.count
   - bytesWritten.count
   - cpuTime.count
```
