Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SW-424] Allow to use Sparkling Water tab in the Spark History Server #270

Merged
merged 1 commit into from May 9, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 5 additions & 0 deletions README.md
Expand Up @@ -330,6 +330,11 @@ Follow our [H2O Stream](https://groups.google.com/forum/#!forum/h2ostream).
> **YARN mode**: The executors logs are available via `yarn logs -applicationId <appId>` command. Driver logs are by default printed to console, however, H2O also writes logs into `current_dir/h2ologs`.

> The location of H2O driver logs can be controlled via Spark property `spark.ext.h2o.client.log.dir` (pass via `--conf`) option.

* How to display Sparkling Water information in the Spark History Server?
> Sparkling Water already reports the information; you just need to add the Sparkling Water classes to the classpath of the Spark History Server.
> To see how to configure the Spark application to log events for the History Server, please see [Spark Monitoring Configuration](http://spark.apache.org/docs/latest/monitoring.html)


* Spark is very slow during initialization or H2O does not form a cluster. What should I do?

Expand Down

This file was deleted.

This file was deleted.

@@ -0,0 +1 @@
org.apache.spark.h2o.ui.SparklingWaterHistoryListenerFactory
51 changes: 37 additions & 14 deletions core/src/main/scala/org/apache/spark/h2o/H2OContext.scala
Expand Up @@ -24,7 +24,7 @@ import org.apache.spark.h2o.backends.SparklingBackend
import org.apache.spark.h2o.backends.external.ExternalH2OBackend
import org.apache.spark.h2o.backends.internal.InternalH2OBackend
import org.apache.spark.h2o.converters._
import org.apache.spark.h2o.ui.SparklingWaterUITab
import org.apache.spark.h2o.ui._
import org.apache.spark.h2o.utils.{H2OContextUtils, LogUtil, NodeDesc}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, SQLContext, SparkSession}
Expand Down Expand Up @@ -63,22 +63,15 @@ class H2OContext private (val sparkSession: SparkSession, conf: H2OConf) extends
self =>
val announcementService = AnnouncementServiceFactory.create(conf)
val sparkContext = sparkSession.sparkContext

val sparklingWaterListener = new SparklingWaterListener(sparkContext.conf)
val uiUpdateThread = new H2ORuntimeInfoUIThread(sparkContext, conf)
/** IP of H2O client */
private var localClientIp: String = _
/** REST port of H2O client */
private var localClientPort: Int = _
/** Runtime list of active H2O nodes */
private val h2oNodes = mutable.ArrayBuffer.empty[NodeDesc]
private var stopped = false;
/** Sparkling Water UI extension for Spark UI */
private val sparklingWaterTab: Option[SparklingWaterUITab] = {
if (conf.getBoolean("spark.ui.enabled", true)) {
Some(new SparklingWaterUITab(this))
} else {
None
}
}
private var stopped = false

/** Used backend */
val backend: SparklingBackend = if(conf.runsInExternalClusterMode){
Expand All @@ -105,7 +98,7 @@ class H2OContext private (val sparkSession: SparkSession, conf: H2OConf) extends
s" points to Spark of version ${sparkContext.version}. Please ensure correct Spark is provided and" +
s" re-run Sparkling Water.")
}

sparkContext.addSparkListener(sparklingWaterListener)
// Init the H2O Context in a way provided by used backend and return the list of H2O nodes in case of external
// backend or list of spark executors on which H2O runs in case of internal backend
val nodes = backend.init()
Expand All @@ -115,13 +108,42 @@ class H2OContext private (val sparkSession: SparkSession, conf: H2OConf) extends
localClientIp = sys.env.getOrElse("SPARK_PUBLIC_DNS", sparkContext.env.rpcEnv.address.host)
localClientPort = H2O.API_PORT
// Register UI
sparklingWaterTab.foreach(_.attach())
if (conf.getBoolean("spark.ui.enabled", true)) {
new SparklingWaterUITab(sparklingWaterListener, sparkContext.ui.get)
}
logInfo("Sparkling Water started, status of context: " + this)
// Announce Flow UI location
announcementService.announce(FlowLocationAnnouncement(H2O.ARGS.name, "http", localClientIp, localClientPort))
updateUIAfterStart() // updates the spark UI
uiUpdateThread.start() // start periodical updates of the UI
this
}

/** Publish H2O build and cloud information to the Spark listener bus once the
 * H2O cluster is up, so UI listeners (and the event log read by the Spark
 * History Server) can render the Sparkling Water tab content.
 */
private[this] def updateUIAfterStart(): Unit ={
// H2O build metadata extracted from the embedded H2O build descriptor (ABV).
val h2oBuildInfo = H2OBuildInfo(
H2O.ABV.projectVersion(),
H2O.ABV.branchName(),
H2O.ABV.lastCommitHash(),
H2O.ABV.describe(),
H2O.ABV.compiledBy(),
H2O.ABV.compiledOn()
)
// Snapshot of the current cloud: client endpoint, health flag,
// member nodes as "ip:port" strings, backend UI info, and start time.
val h2oCloudInfo = H2OCloudInfo(
h2oLocalClient,
H2O.CLOUD.healthy(),
H2O.CLOUD.members().map(node => node.getIpPortString),
backend.backendUIInfo,
H2O.START_TIME_MILLIS.get()
)

// Only Sparkling-Water-specific properties (spark.ext.h2o.*) are reported.
val swPropertiesInfo = _conf.getAll.filter(_._1.startsWith("spark.ext.h2o"))

// Posting on the listener bus delivers the event both to the live UI
// listener and to the event log consumed by the Spark History Server.
sparkSession.sparkContext.listenerBus.post(SparkListenerH2OStart(
h2oCloudInfo,
h2oBuildInfo,
swPropertiesInfo
))
}
/**
* Return a copy of this H2OContext's configuration. The configuration ''cannot'' be changed at runtime.
*/
Expand Down Expand Up @@ -221,6 +243,7 @@ class H2OContext private (val sparkSession: SparkSession, conf: H2OConf) extends
def stop(stopSparkContext: Boolean = false): Unit = synchronized {
if(!stopped) {
announcementService.shutdown
uiUpdateThread.interrupt()
backend.stop(stopSparkContext)
H2OContext.stop(this)
stopped = true
Expand Down Expand Up @@ -249,7 +272,7 @@ class H2OContext private (val sparkSession: SparkSession, conf: H2OConf) extends
| Open H2O Flow in browser: http://$h2oLocalClient (CMD + click in Mac OSX)
""".stripMargin
}

// scalastyle:off
// Disable style checker so "implicits" object can start with lowercase i
/** Define implicits available via h2oContext.implicits._*/
Expand Down
Expand Up @@ -64,6 +64,13 @@ trait SharedBackendConf {
def h2oNodeLogLevel = sparkConf.get(PROP_NODE_LOG_LEVEL._1, PROP_NODE_LOG_LEVEL._2)
def h2oNodeLogDir = sparkConf.getOption(PROP_NODE_LOG_DIR._1)

/** Interval (milliseconds) between periodic Sparkling Water UI updates;
 * defaults to PROP_UI_UPDATE_INTERVAL._2. */
def uiUpdateInterval = sparkConf.getInt(PROP_UI_UPDATE_INTERVAL._1, PROP_UI_UPDATE_INTERVAL._2)

/** Set the Spark UI update interval.
 *
 * @param interval update period in milliseconds
 * @return this configuration (fluent API)
 */
def setUiUpdateInterval(interval: Int): H2OConf = {
sparkConf.set(PROP_UI_UPDATE_INTERVAL._1, interval.toString)
self
}

def setH2ONodeLogLevel(level: String): H2OConf = {
sparkConf.set(PROP_NODE_LOG_LEVEL._1, level)
self
Expand Down Expand Up @@ -105,6 +112,8 @@ trait SharedBackendConf {

object SharedBackendConf {

/** Interval for updates of Spark UI in milliseconds */
val PROP_UI_UPDATE_INTERVAL = ("spark.ext.h2o.ui.update.interval", 10000)
/** H2O internal log level for launched remote nodes. */
val PROP_NODE_LOG_LEVEL = ("spark.ext.h2o.node.log.level", "INFO")

Expand Down
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.h2o.ui

import org.apache.spark.SparkContext
import org.apache.spark.h2o.H2OConf
import water.H2O

/**
* Periodically publish info to UI
*/
/**
 * Background thread that periodically posts H2O runtime status
 * (cloud health + timestamp) on the Spark listener bus so the UI stays fresh.
 *
 * The update period is driven by `conf.uiUpdateInterval` (milliseconds).
 * The thread is stopped via `interrupt()` (see `H2OContext.stop`).
 *
 * NOTE(review): the thread is non-daemon; consider `setDaemon(true)` so it
 * cannot keep the JVM alive if `stop()` is never called — confirm desired
 * shutdown semantics before changing.
 */
class H2ORuntimeInfoUIThread(sc: SparkContext, conf: H2OConf) extends Thread {
  override def run(): Unit = {
    try {
      while (!Thread.interrupted()) {
        sc.listenerBus.post(SparkListenerH2ORuntimeUpdate(H2O.CLOUD.healthy(), System.currentTimeMillis()))
        Thread.sleep(conf.uiUpdateInterval)
      }
    } catch {
      // interrupt() while sleeping throws instead of setting the flag;
      // catching it lets the thread terminate cleanly rather than dying
      // with an uncaught InterruptedException stack trace.
      case _: InterruptedException => // requested shutdown — exit quietly
    }
  }
}