
[SPARK-26457] Show hadoop configurations in HistoryServer environment tab #23486

Closed · wants to merge 11 commits
core/src/main/resources/org/apache/spark/ui/static/webui.js
@@ -63,6 +63,7 @@ $(function() {
collapseTablePageLoad('collapse-aggregated-finishedDrivers','aggregated-finishedDrivers');
collapseTablePageLoad('collapse-aggregated-runtimeInformation','aggregated-runtimeInformation');
collapseTablePageLoad('collapse-aggregated-sparkProperties','aggregated-sparkProperties');
collapseTablePageLoad('collapse-aggregated-hadoopProperties','aggregated-hadoopProperties');
collapseTablePageLoad('collapse-aggregated-systemProperties','aggregated-systemProperties');
collapseTablePageLoad('collapse-aggregated-classpathEntries','aggregated-classpathEntries');
collapseTablePageLoad('collapse-aggregated-activeJobs','aggregated-activeJobs');
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -2370,8 +2370,8 @@ class SparkContext(config: SparkConf) extends Logging {
val schedulingMode = getSchedulingMode.toString
val addedJarPaths = addedJars.keys.toSeq
val addedFilePaths = addedFiles.keys.toSeq
- val environmentDetails = SparkEnv.environmentDetails(conf, schedulingMode, addedJarPaths,
-   addedFilePaths)
+ val environmentDetails = SparkEnv.environmentDetails(conf, hadoopConfiguration,
+   schedulingMode, addedJarPaths, addedFilePaths)
val environmentUpdate = SparkListenerEnvironmentUpdate(environmentDetails)
listenerBus.post(environmentUpdate)
}
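For context, a minimal sketch (not part of this diff) of how the richer event posted by `postEnvironmentUpdate` can be consumed: any `SparkListener` now sees a "Hadoop Properties" entry in the environment-update event. The listener name below is hypothetical.

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerEnvironmentUpdate}

// Hypothetical listener that prints the Hadoop properties published above.
class HadoopPropsListener extends SparkListener {
  override def onEnvironmentUpdate(event: SparkListenerEnvironmentUpdate): Unit = {
    // Events from older Spark versions never carry this key, hence the getOrElse.
    val hadoopProps = event.environmentDetails.getOrElse("Hadoop Properties", Nil)
    hadoopProps.foreach { case (k, v) => println(s"$k = $v") }
  }
}
// Register with: sc.addSparkListener(new HadoopPropsListener)
```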
7 changes: 7 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -21,10 +21,12 @@ import java.io.File
import java.net.Socket
import java.util.Locale

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.util.Properties

import com.google.common.collect.MapMaker
import org.apache.hadoop.conf.Configuration

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.api.python.PythonWorkerFactory
@@ -401,6 +403,7 @@ object SparkEnv extends Logging {
private[spark]
def environmentDetails(
conf: SparkConf,
hadoopConf: Configuration,
schedulingMode: String,
addedJars: Seq[String],
addedFiles: Seq[String]): Map[String, Seq[(String, String)]] = {
@@ -436,9 +439,13 @@
val addedJarsAndFiles = (addedJars ++ addedFiles).map((_, "Added By User"))
val classPaths = (addedJarsAndFiles ++ classPathEntries).sorted

// Add Hadoop properties; configs that also appear in Spark are not filtered out, and
// Spark conf entries starting with "spark.hadoop" may override them.
val hadoopProperties = hadoopConf.asScala.map(entry => (entry.getKey, entry.getValue)).toSeq
Map[String, Seq[(String, String)]](
"JVM Information" -> jvmInformation,
"Spark Properties" -> sparkProperties,
"Hadoop Properties" -> hadoopProperties,
"System Properties" -> otherProperties,
"Classpath Entries" -> classPaths)
}
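To make the comment above concrete, a sketch under the assumption that Spark copies `spark.hadoop.*` entries into the Hadoop `Configuration` when it is built (simplified from what `SparkHadoopUtil` does; the helper name is illustrative):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkConf

// Illustrative helper: spark.hadoop.fs.s3a.endpoint in the SparkConf ends up as
// fs.s3a.endpoint in the Hadoop Configuration that environmentDetails dumps.
def copySparkHadoopConfigs(conf: SparkConf, hadoopConf: Configuration): Unit = {
  for ((key, value) <- conf.getAll if key.startsWith("spark.hadoop.")) {
    hadoopConf.set(key.stripPrefix("spark.hadoop."), value)
  }
}
```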
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
@@ -147,6 +147,7 @@ private[spark] class AppStatusListener(
val envInfo = new v1.ApplicationEnvironmentInfo(
runtime,
details.getOrElse("Spark Properties", Nil),
details.getOrElse("Hadoop Properties", Nil),
details.getOrElse("System Properties", Nil),
details.getOrElse("Classpath Entries", Nil))

core/src/main/scala/org/apache/spark/status/api/v1/api.scala
@@ -352,6 +352,7 @@ class VersionInfo private[spark](
class ApplicationEnvironmentInfo private[spark] (
val runtime: RuntimeInfo,
val sparkProperties: Seq[(String, String)],
val hadoopProperties: Seq[(String, String)],

Member: I tried running the SHS with previous event logs; it seems that the API change will cause a JSON parse exception.

Contributor (author): Yes, you are right. Thank you! I have fixed it.

Member: Looks like it breaks MiMa, but the constructor is private. Let's exclude it in MiMa; see the Jenkins test messages.

Contributor (author): Thank you! I will try it.

val systemProperties: Seq[(String, String)],
val classpathEntries: Seq[(String, String)])

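Since `ApplicationEnvironmentInfo` backs the v1 REST API, the new field also surfaces in the `/environment` endpoint. A hedged usage sketch; the host and port are the History Server defaults, and the app id is borrowed from the test fixture further down.

```scala
import scala.io.Source

// Expect a "hadoopProperties" array next to "sparkProperties" in the response.
val url = "http://localhost:18080/api/v1/applications/app-20161116163331-0000/environment"
val json = Source.fromURL(url).mkString
println(json)
```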
21 changes: 17 additions & 4 deletions core/src/main/scala/org/apache/spark/ui/env/EnvironmentPage.scala
@@ -42,6 +42,8 @@ private[ui] class EnvironmentPage(
propertyHeader, jvmRow, jvmInformation, fixedWidth = true)
val sparkPropertiesTable = UIUtils.listingTable(propertyHeader, propertyRow,
Utils.redact(conf, appEnv.sparkProperties.toSeq), fixedWidth = true)
val hadoopPropertiesTable = UIUtils.listingTable(propertyHeader, propertyRow,
Utils.redact(conf, appEnv.hadoopProperties.toSeq), fixedWidth = true)
val systemPropertiesTable = UIUtils.listingTable(
propertyHeader, propertyRow, appEnv.systemProperties, fixedWidth = true)
val classpathEntriesTable = UIUtils.listingTable(
@@ -70,26 +72,37 @@
<div class="aggregated-sparkProperties collapsible-table">
{sparkPropertiesTable}
</div>
<span class="collapse-aggregated-hadoopProperties collapse-table"
onClick="collapseTable('collapse-aggregated-hadoopProperties',
'aggregated-hadoopProperties')">
<h4>
<span class="collapse-table-arrow arrow-closed"></span>
<a>Hadoop Properties</a>
</h4>
</span>
<div class="aggregated-hadoopProperties collapsible-table collapsed">
{hadoopPropertiesTable}
</div>
<span class="collapse-aggregated-systemProperties collapse-table"
onClick="collapseTable('collapse-aggregated-systemProperties',
'aggregated-systemProperties')">
<h4>
<span class="collapse-table-arrow arrow-open"></span>
<span class="collapse-table-arrow arrow-closed"></span>
<a>System Properties</a>
</h4>
</span>
<div class="aggregated-systemProperties collapsible-table">
<div class="aggregated-systemProperties collapsible-table collapsed">

Member: Nice, so only the Spark properties and system info are expanded by default? That sounds good.

Contributor (author): Yes, but collapseTablePageLoad in web.js may remember the state once a table has been expanded.

{systemPropertiesTable}
</div>
<span class="collapse-aggregated-classpathEntries collapse-table"
onClick="collapseTable('collapse-aggregated-classpathEntries',
'aggregated-classpathEntries')">
<h4>
<span class="collapse-table-arrow arrow-open"></span>
<span class="collapse-table-arrow arrow-closed"></span>
<a>Classpath Entries</a>
</h4>
</span>
<div class="aggregated-classpathEntries collapsible-table">
<div class="aggregated-classpathEntries collapsible-table collapsed">
{classpathEntriesTable}
</div>
</span>
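Note that the new table goes through `Utils.redact` just like the Spark properties. A minimal sketch of that behavior, assuming the default `spark.redaction.regex`, which matches keys containing "secret" or "password":

```scala
// Simplified redaction: values of keys matching the regex are masked.
val redactionRegex = "(?i)secret|password".r
val Redacted = "*********(redacted)"

def redact(kvs: Seq[(String, String)]): Seq[(String, String)] = kvs.map {
  case (k, _) if redactionRegex.findFirstIn(k).isDefined => (k, Redacted)
  case kv => kv
}

redact(Seq(("hadoop.tmp.dir", "/tmp"), ("fs.s3a.secret.key", "...")))
// -> Seq(("hadoop.tmp.dir", "/tmp"), ("fs.s3a.secret.key", "*********(redacted)"))
```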
6 changes: 6 additions & 0 deletions core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -171,11 +171,13 @@ private[spark] object JsonProtocol {
val environmentDetails = environmentUpdate.environmentDetails
val jvmInformation = mapToJson(environmentDetails("JVM Information").toMap)
val sparkProperties = mapToJson(environmentDetails("Spark Properties").toMap)
val hadoopProperties = mapToJson(environmentDetails("Hadoop Properties").toMap)
val systemProperties = mapToJson(environmentDetails("System Properties").toMap)
val classpathEntries = mapToJson(environmentDetails("Classpath Entries").toMap)
("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.environmentUpdate) ~
("JVM Information" -> jvmInformation) ~
("Spark Properties" -> sparkProperties) ~
("Hadoop Properties" -> hadoopProperties) ~
("System Properties" -> systemProperties) ~
("Classpath Entries" -> classpathEntries)
}
@@ -653,9 +655,13 @@
}

def environmentUpdateFromJson(json: JValue): SparkListenerEnvironmentUpdate = {
// For compatibility with event logs written by older Spark versions
val hadoopProperties = jsonOption(json \ "Hadoop Properties").map(mapFromJson(_).toSeq)
.getOrElse(Seq.empty)
val environmentDetails = Map[String, Seq[(String, String)]](
"JVM Information" -> mapFromJson(json \ "JVM Information").toSeq,
"Spark Properties" -> mapFromJson(json \ "Spark Properties").toSeq,
"Hadoop Properties" -> hadoopProperties,
"System Properties" -> mapFromJson(json \ "System Properties").toSeq,
"Classpath Entries" -> mapFromJson(json \ "Classpath Entries").toSeq)
SparkListenerEnvironmentUpdate(environmentDetails)
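The fallback above is what keeps pre-change event logs readable. A sketch of the compatibility case; since `JsonProtocol` is `private[spark]`, this would live in a Spark test suite.

```scala
import org.json4s.jackson.JsonMethods.parse

// An event written before this change has no "Hadoop Properties" field.
val oldEvent = parse(
  """{"Event": "SparkListenerEnvironmentUpdate", "JVM Information": {},
    |"Spark Properties": {}, "System Properties": {}, "Classpath Entries": {}}""".stripMargin)
val update = JsonProtocol.environmentUpdateFromJson(oldEvent)
assert(update.environmentDetails("Hadoop Properties").isEmpty)
```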
@@ -32,6 +32,11 @@
[ "spark.app.id", "app-20161116163331-0000" ],
[ "spark.task.maxFailures", "4" ]
],
"hadoopProperties" : [
[ "mapreduce.jobtracker.address", "local" ],
[ "yarn.resourcemanager.scheduler.monitor.policies", "org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy" ],
[ "mapreduce.jobhistory.client.thread-count", "10" ]
],
"systemProperties" : [
[ "java.io.tmpdir", "/var/folders/l4/d46wlzj16593f3d812vk49tw0000gp/T/" ],
[ "line.separator", "\n" ],

Large diffs are not rendered by default.

core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -624,6 +624,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
"test", Some("attempt1")),
SparkListenerEnvironmentUpdate(Map(
"Spark Properties" -> properties.toSeq,
"Hadoop Properties" -> Seq.empty,
"JVM Information" -> Seq.empty,
"System Properties" -> Seq.empty,
"Classpath Entries" -> Seq.empty
@@ -882,6 +883,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
SparkListenerApplicationStart("end-event-test", Some("end-event-test"), 1L, "test", None),
SparkListenerEnvironmentUpdate(Map(
"Spark Properties" -> Seq.empty,
"Hadoop Properties" -> Seq.empty,
"JVM Information" -> Seq.empty,
"System Properties" -> Seq.empty,
"Classpath Entries" -> Seq.empty
core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -108,8 +108,9 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
val secretPassword = "secret_password"
val conf = getLoggingConf(testDirPath, None)
.set(key, secretPassword)
+ val hadoopconf = SparkHadoopUtil.get.newConfiguration(new SparkConf())
val eventLogger = new EventLoggingListener("test", None, testDirPath.toUri(), conf)
- val envDetails = SparkEnv.environmentDetails(conf, "FIFO", Seq.empty, Seq.empty)
+ val envDetails = SparkEnv.environmentDetails(conf, hadoopconf, "FIFO", Seq.empty, Seq.empty)
val event = SparkListenerEnvironmentUpdate(envDetails)
val redactedProps = eventLogger.redactEvent(event).environmentDetails("Spark Properties").toMap
assert(redactedProps(key) == "*********(redacted)")
core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -66,6 +66,7 @@ class JsonProtocolSuite extends SparkFunSuite {
val environmentUpdate = SparkListenerEnvironmentUpdate(Map[String, Seq[(String, String)]](
"JVM Information" -> Seq(("GC speed", "9999 objects/s"), ("Java home", "Land of coffee")),
"Spark Properties" -> Seq(("Job throughput", "80000 jobs/s, regardless of job type")),
"Hadoop Properties" -> Seq(("hadoop.tmp.dir", "/usr/local/hadoop/tmp")),
"System Properties" -> Seq(("Username", "guest"), ("Password", "guest")),
"Classpath Entries" -> Seq(("Super library", "/tmp/super_library"))
))
@@ -1761,6 +1762,9 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Spark Properties": {
| "Job throughput": "80000 jobs/s, regardless of job type"
| },
| "Hadoop Properties": {
| "hadoop.tmp.dir": "/usr/local/hadoop/tmp"
| },
| "System Properties": {
| "Username": "guest",
| "Password": "guest"
3 changes: 3 additions & 0 deletions project/MimaExcludes.scala
@@ -227,6 +227,9 @@ object MimaExcludes {
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.SparkContext.setActiveContext"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.SparkContext.markPartiallyConstructed"),

// [SPARK-26457] Show hadoop configurations in HistoryServer environment tab

Member: I wonder if this changes any user-facing API, then? If it's something people might use anywhere, I'd keep the constructor.

Contributor (author): OK, maybe treating hadoopConf as an optional parameter would be better.


ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.status.api.v1.ApplicationEnvironmentInfo.this"),

// Data Source V2 API changes
(problem: Problem) => problem match {
case MissingClassProblem(cls) =>
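For reference, a sketch of the alternative floated in the thread above: keep the old constructor as a secondary overload so existing callers stay binary-compatible, instead of excluding the change in MiMa. Field types are copied from the diff; this is not what the PR ultimately does.

```scala
class ApplicationEnvironmentInfo private[spark] (
    val runtime: RuntimeInfo,
    val sparkProperties: Seq[(String, String)],
    val hadoopProperties: Seq[(String, String)],
    val systemProperties: Seq[(String, String)],
    val classpathEntries: Seq[(String, String)]) {

  // Old four-argument shape, defaulting hadoopProperties to empty.
  private[spark] def this(
      runtime: RuntimeInfo,
      sparkProperties: Seq[(String, String)],
      systemProperties: Seq[(String, String)],
      classpathEntries: Seq[(String, String)]) =
    this(runtime, sparkProperties, Nil, systemProperties, classpathEntries)
}
```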