Skip to content

Commit

Permalink
[SW-424] Allow to use Sparkling Water tab in the Spark History Server (
Browse files Browse the repository at this point in the history
  • Loading branch information
jakubhava committed May 9, 2017
1 parent 423b012 commit a97ed74
Show file tree
Hide file tree
Showing 11 changed files with 367 additions and 133 deletions.
5 changes: 5 additions & 0 deletions README.md
Expand Up @@ -330,6 +330,11 @@ Follow our [H2O Stream](https://groups.google.com/forum/#!forum/h2ostream).
> **YARN mode**: The executors logs are available via `yarn logs -applicationId <appId>` command. Driver logs are by default printed to console, however, H2O also writes logs into `current_dir/h2ologs`.
> The location of H2O driver logs can be controlled via the Spark property `spark.ext.h2o.client.log.dir` (passed via the `--conf` option).
* How to display Sparkling Water information in the Spark History Server?
> Sparkling Water already reports this information; you only need to add the Sparkling Water classes to the classpath of the Spark History Server.
> To see how to configure a Spark application to record events for the History Server, see the [Spark Monitoring Configuration](http://spark.apache.org/docs/latest/monitoring.html) documentation.

* Spark is very slow during initialization or H2O does not form a cluster. What should I do?

Expand Down

This file was deleted.

This file was deleted.

@@ -0,0 +1 @@
org.apache.spark.h2o.ui.SparklingWaterHistoryListenerFactory
51 changes: 37 additions & 14 deletions core/src/main/scala/org/apache/spark/h2o/H2OContext.scala
Expand Up @@ -24,7 +24,7 @@ import org.apache.spark.h2o.backends.SparklingBackend
import org.apache.spark.h2o.backends.external.ExternalH2OBackend
import org.apache.spark.h2o.backends.internal.InternalH2OBackend
import org.apache.spark.h2o.converters._
import org.apache.spark.h2o.ui.SparklingWaterUITab
import org.apache.spark.h2o.ui._
import org.apache.spark.h2o.utils.{H2OContextUtils, LogUtil, NodeDesc}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, SQLContext, SparkSession}
Expand Down Expand Up @@ -63,22 +63,15 @@ class H2OContext private (val sparkSession: SparkSession, conf: H2OConf) extends
self =>
val announcementService = AnnouncementServiceFactory.create(conf)
val sparkContext = sparkSession.sparkContext

val sparklingWaterListener = new SparklingWaterListener(sparkContext.conf)
val uiUpdateThread = new H2ORuntimeInfoUIThread(sparkContext, conf)
/** IP of H2O client */
private var localClientIp: String = _
/** REST port of H2O client */
private var localClientPort: Int = _
/** Runtime list of active H2O nodes */
private val h2oNodes = mutable.ArrayBuffer.empty[NodeDesc]
private var stopped = false;
/** Sparkling Water UI extension for Spark UI */
private val sparklingWaterTab: Option[SparklingWaterUITab] = {
if (conf.getBoolean("spark.ui.enabled", true)) {
Some(new SparklingWaterUITab(this))
} else {
None
}
}
private var stopped = false

/** Used backend */
val backend: SparklingBackend = if(conf.runsInExternalClusterMode){
Expand All @@ -105,7 +98,7 @@ class H2OContext private (val sparkSession: SparkSession, conf: H2OConf) extends
s" points to Spark of version ${sparkContext.version}. Please ensure correct Spark is provided and" +
s" re-run Sparkling Water.")
}

sparkContext.addSparkListener(sparklingWaterListener)
// Init the H2O Context in a way provided by used backend and return the list of H2O nodes in case of external
// backend or list of spark executors on which H2O runs in case of internal backend
val nodes = backend.init()
Expand All @@ -115,13 +108,42 @@ class H2OContext private (val sparkSession: SparkSession, conf: H2OConf) extends
localClientIp = sys.env.getOrElse("SPARK_PUBLIC_DNS", sparkContext.env.rpcEnv.address.host)
localClientPort = H2O.API_PORT
// Register UI
sparklingWaterTab.foreach(_.attach())
if (conf.getBoolean("spark.ui.enabled", true)) {
new SparklingWaterUITab(sparklingWaterListener, sparkContext.ui.get)
}
logInfo("Sparkling Water started, status of context: " + this)
// Announce Flow UI location
announcementService.announce(FlowLocationAnnouncement(H2O.ARGS.name, "http", localClientIp, localClientPort))
updateUIAfterStart() // updates the spark UI
uiUpdateThread.start() // start periodical updates of the UI
this
}

/**
 * Publishes the initial `SparkListenerH2OStart` event on the Spark listener bus so the
 * Sparkling Water UI tab (and the history server listener) can render cluster, build and
 * configuration details right after the H2O cloud is up.
 */
private[this] def updateUIAfterStart(): Unit = {
  // Only Sparkling Water specific properties are shown in the UI.
  val swProperties = _conf.getAll.filter { case (key, _) => key.startsWith("spark.ext.h2o") }

  val cloudInfo = H2OCloudInfo(
    h2oLocalClient,
    H2O.CLOUD.healthy(),
    H2O.CLOUD.members().map(_.getIpPortString),
    backend.backendUIInfo,
    H2O.START_TIME_MILLIS.get()
  )

  val buildInfo = H2OBuildInfo(
    H2O.ABV.projectVersion(),
    H2O.ABV.branchName(),
    H2O.ABV.lastCommitHash(),
    H2O.ABV.describe(),
    H2O.ABV.compiledBy(),
    H2O.ABV.compiledOn()
  )

  val startEvent = SparkListenerH2OStart(cloudInfo, buildInfo, swProperties)
  sparkSession.sparkContext.listenerBus.post(startEvent)
}
/**
* Return a copy of this H2OContext's configuration. The configuration ''cannot'' be changed at runtime.
*/
Expand Down Expand Up @@ -221,6 +243,7 @@ class H2OContext private (val sparkSession: SparkSession, conf: H2OConf) extends
def stop(stopSparkContext: Boolean = false): Unit = synchronized {
if(!stopped) {
announcementService.shutdown
uiUpdateThread.interrupt()
backend.stop(stopSparkContext)
H2OContext.stop(this)
stopped = true
Expand Down Expand Up @@ -249,7 +272,7 @@ class H2OContext private (val sparkSession: SparkSession, conf: H2OConf) extends
| Open H2O Flow in browser: http://$h2oLocalClient (CMD + click in Mac OSX)
""".stripMargin
}

// scalastyle:off
// Disable style checker so "implicits" object can start with lowercase i
/** Define implicits available via h2oContext.implicits._*/
Expand Down
Expand Up @@ -64,6 +64,13 @@ trait SharedBackendConf {
def h2oNodeLogLevel = sparkConf.get(PROP_NODE_LOG_LEVEL._1, PROP_NODE_LOG_LEVEL._2)
def h2oNodeLogDir = sparkConf.getOption(PROP_NODE_LOG_DIR._1)

/** Interval (in milliseconds) at which H2ORuntimeInfoUIThread publishes runtime updates to the Spark UI. */
def uiUpdateInterval = sparkConf.getInt(PROP_UI_UPDATE_INTERVAL._1, PROP_UI_UPDATE_INTERVAL._2)

/** Sets the Spark UI update interval (milliseconds) and returns this configuration for chaining. */
def setUiUpdateInterval(interval: Int): H2OConf = {
  val (propertyName, _) = PROP_UI_UPDATE_INTERVAL
  sparkConf.set(propertyName, interval.toString)
  self
}

def setH2ONodeLogLevel(level: String): H2OConf = {
sparkConf.set(PROP_NODE_LOG_LEVEL._1, level)
self
Expand Down Expand Up @@ -105,6 +112,8 @@ trait SharedBackendConf {

object SharedBackendConf {

/** Interval for updates of the Spark UI, in milliseconds (property name, default: 10000 ms). */
val PROP_UI_UPDATE_INTERVAL = ("spark.ext.h2o.ui.update.interval", 10000)
/** H2O internal log level for launched remote nodes. */
val PROP_NODE_LOG_LEVEL = ("spark.ext.h2o.node.log.level", "INFO")

Expand Down
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.h2o.ui

import org.apache.spark.SparkContext
import org.apache.spark.h2o.H2OConf
import water.H2O

/**
* Periodically publish info to UI
*/
/**
 * Background thread which periodically publishes H2O runtime information
 * (cloud health and a timestamp) to the Spark listener bus so the
 * Sparkling Water UI tab stays up to date.
 *
 * The publishing period is driven by `spark.ext.h2o.ui.update.interval`
 * (see `H2OConf.uiUpdateInterval`). The thread is stopped by interrupting it
 * (H2OContext.stop() does so).
 *
 * @param sc   Spark context whose listener bus receives the update events
 * @param conf Sparkling Water configuration providing the update interval
 */
class H2ORuntimeInfoUIThread(sc: SparkContext, conf: H2OConf) extends Thread {

  // Named thread makes thread dumps / debugging easier to read.
  setName("H2ORuntimeInfoUIThread")

  override def run(): Unit = {
    try {
      while (!Thread.interrupted()) {
        sc.listenerBus.post(SparkListenerH2ORuntimeUpdate(H2O.CLOUD.healthy(), System.currentTimeMillis()))
        Thread.sleep(conf.uiUpdateInterval)
      }
    } catch {
      // H2OContext.stop() interrupts this thread, and the interrupt almost always
      // lands while we are inside Thread.sleep. Without this handler the thread
      // would die with an uncaught InterruptedException; instead we exit cleanly.
      case _: InterruptedException => // graceful shutdown requested
    }
  }
}

0 comments on commit a97ed74

Please sign in to comment.