Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SW-2496] Fix the Flow link for Databricks Azure #2417

Merged
merged 2 commits into from
Dec 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
23 changes: 19 additions & 4 deletions core/src/main/scala/ai/h2o/sparkling/H2OContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ class H2OContext private[sparkling] (private val conf: H2OConf) extends H2OConte
cloudV3.compiled_on)
val h2oClusterInfo = H2OClusterInfo(
s"$flowIp:$flowPort",
visibleFlowURL,
cloudV3.cloud_healthy,
cloudV3.internal_security_enabled,
nodes.map(_.ipPort()),
Expand Down Expand Up @@ -270,15 +271,29 @@ class H2OContext private[sparkling] (private val conf: H2OConf) extends H2OConte
}

def flowURL(): String = {
if (AzureDatabricksUtils.isRunningOnAzureDatabricks(conf)) {
AzureDatabricksUtils.flowURL(conf)
} else if (conf.clientFlowBaseurlOverride.isDefined) {
if (conf.clientFlowBaseurlOverride.isDefined) {
conf.clientFlowBaseurlOverride.get + conf.contextPath.getOrElse("")
} else {
"%s://%s:%d%s".format(conf.getScheme(), flowIp, flowPort, conf.contextPath.getOrElse(""))
}
}

/** Flow URL suitable for presenting to the user.
  *
  * On Azure Databricks this returns the relative driver-proxy path produced by
  * [[AzureDatabricksUtils.relativeFlowURL]]; on every other platform it is the
  * regular absolute [[flowURL]].
  */
def visibleFlowURL(): String =
  if (AzureDatabricksUtils.isRunningOnAzureDatabricks(conf)) AzureDatabricksUtils.relativeFlowURL(conf)
  else flowURL()

/** Builds the one-line hint printed in the startup banner telling the user how to
  * open H2O Flow. On Azure Databricks the user is directed to the Spark UI tab
  * (the link there is rewritten client-side); elsewhere the full Flow URL is shown.
  */
private def getFlowUIHint() = {
  val onAzureDatabricks = AzureDatabricksUtils.isRunningOnAzureDatabricks(conf)
  val hint =
    if (onAzureDatabricks) "Go to Spark UI > Sparkling Water tab > click Flow UI link"
    else s"${flowURL()} (CMD + click in Mac OSX)"
  hint
}

/** Open H2O Flow running in this client. */
def openFlow(): Unit = openURI(flowURL())

Expand All @@ -295,7 +310,7 @@ class H2OContext private[sparkling] (private val conf: H2OConf) extends H2OConte
| ${nodes.mkString("\n ")}
| ------------------------
|
| Open H2O Flow in browser: ${flowURL()} (CMD + click in Mac OSX)
| Open H2O Flow in browser: ${getFlowUIHint}
|
""".stripMargin
val sparkYarnAppId = if (sparkContext.master.toLowerCase.startsWith("yarn")) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,9 @@

package ai.h2o.sparkling.backend.utils

import java.io.FileNotFoundException

import ai.h2o.sparkling.H2OConf
import ai.h2o.sparkling.backend.SharedBackendConf
import org.apache.spark.SparkConf
import org.apache.spark.expose.Logging

object AzureDatabricksUtils extends Logging {
Expand All @@ -43,34 +42,15 @@ object AzureDatabricksUtils extends Logging {
conf.clientCheckRetryTimeout
}

def isRunningOnAzureDatabricks(conf: H2OConf): Boolean = {
def isRunningOnAzureDatabricks(conf: H2OConf): Boolean = isRunningOnAzureDatabricks(conf.sparkConf)

def isRunningOnAzureDatabricks(conf: SparkConf): Boolean = {
conf.getOption("spark.databricks.cloudProvider").contains("Azure")
}

def flowURL(conf: H2OConf): String = {
def relativeFlowURL(conf: H2OConf): String = {
val clusterId = conf.get("spark.databricks.clusterUsageTags.clusterId")
val orgId = conf.get("spark.databricks.clusterUsageTags.clusterOwnerOrgId")
val azureHost = s"https://${azureRegion()}.azuredatabricks.net"
s"$azureHost/driver-proxy/o/$orgId/$clusterId/${conf.clientWebPort}/flow/index.html"
}

private def azureRegion(): String = {
val substRegion = "YOUR_AZURE_REGION"
val region = try {
val confFile = scala.io.Source.fromFile("/databricks/common/conf/deploy.conf")
confFile.getLines
.find(_.contains("databricks.region.name"))
.map(_.trim.split("=")(1).trim().replaceAll("\"", ""))
.getOrElse(substRegion)
} catch {
case e: FileNotFoundException => substRegion
}

if (region == substRegion) {
logWarning(
"Azure region could not be determined automatically, please replace " +
s"'$substRegion' in the provided flow URL with your region.")
}
region
s"/driver-proxy/o/$orgId/$clusterId/${conf.clientWebPort}/flow/index.html"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.spark.h2o.ui

case class H2OClusterInfo(
localClientIpPort: String,
flowURL: String,
cloudHealthy: Boolean,
cloudSecured: Boolean,
cloudNodes: Array[String],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@

package org.apache.spark.h2o.ui

import ai.h2o.sparkling.backend.utils.AzureDatabricksUtils
import javax.servlet.http.HttpServletRequest
import org.apache.spark.h2o.SparkSpecificUtils
import org.apache.spark.ui.{UIUtils, WebUIPage}

import scala.xml.Node
import scala.xml.{Attribute, Node, Null, Text}

/**
* Sparkling Water info page.
Expand All @@ -41,7 +42,7 @@ case class SparklingWaterInfoPage(parent: SparklingWaterUITab) extends WebUIPage
("H2O Build On", h2oBuildInfo.h2oBuildOn))
}

private def flowUrl(): String = s"http://${provider.localIpPort}"
private def flowUrl(): String = parent.provider.H2OClusterInfo.flowURL

private def swProperties(): Seq[(String, String, String)] = provider.sparklingWaterProperties

Expand All @@ -58,7 +59,7 @@ case class SparklingWaterInfoPage(parent: SparklingWaterUITab) extends WebUIPage

val content = if (provider.isSparklingWaterStarted) {

val swInfoTable = UIUtils.listingTable(propertyHeader, h2oRow, swInfo(), fixedWidth = true)
val swInfoTable = UIUtils.listingTable(propertyHeader, h2oRowWithId, swInfo(), fixedWidth = true)
val swPropertiesTable =
UIUtils.listingTable(Seq("Name", "Value", "Documentation"), propertiesRow, swProperties(), fixedWidth = true)
val h2oInfoTable = UIUtils.listingTable(propertyHeader, h2oRow, h2oInfo(), fixedWidth = true)
Expand Down Expand Up @@ -88,18 +89,17 @@ case class SparklingWaterInfoPage(parent: SparklingWaterUITab) extends WebUIPage
<strong>Memory Info:</strong>{memoryInfo}
</li>
<li>
<a href={flowUrl()}>
<a id="Flow UI Link" target="_blank" href={flowUrl()}>
<strong>Flow UI</strong>
</a>
</li>
</ul>
</div>
<span>
<h4>Sparkling Water</h4>{swInfoTable}<h4>Sparkling Water Properties</h4>{swPropertiesTable}<h4>H2O Build Information</h4>{
h2oInfoTable
}
</span>

<span>
<h4>Sparkling Water</h4>{swInfoTable}<h4>Sparkling Water Properties</h4>{swPropertiesTable}<h4>H2O Build Information</h4>
{h2oInfoTable}
{additionalScript()}
</span>
} else {
<div>
<h4>Sparkling Water UI not ready yet!</h4>
Expand All @@ -108,6 +108,19 @@ case class SparklingWaterInfoPage(parent: SparklingWaterUITab) extends WebUIPage
SparkSpecificUtils.headerSparkPage(request, "Sparkling Water", content, parent, helpText)
}

/** Emits a small inline &lt;script&gt; on Azure Databricks that rewrites the Flow link
  * client-side: it sets the text of the element with id "Flow UI" (presumably the
  * value cell produced by h2oRowWithId — confirm against swInfo's row keys) and the
  * href of the "Flow UI Link" anchor to an absolute URL built from the browser's
  * current protocol/host plus the relative driver-proxy path from flowUrl().
  * Returns an empty node sequence on all other platforms.
  */
private def additionalScript(): Seq[Node] = {
if (AzureDatabricksUtils.isRunningOnAzureDatabricks(parent.parent.conf)) {
// Unparsed keeps the JavaScript verbatim — quotes and slashes must not be XML-escaped.
val javaScript = scala.xml.Unparsed(
s"""document.getElementById("Flow UI").innerHTML = window.location.protocol + "//" + window.location.hostname + "${flowUrl()}";
|document.getElementById("Flow UI Link").href = "${flowUrl()}";""".stripMargin)
<script type="text/javascript">{
javaScript
}</script>
} else {
Seq.empty
}
}

private def propertyHeader = Seq("Name", "Value")

private def propertiesRow(kv: (String, String, String)) = <tr>
Expand All @@ -127,4 +140,12 @@ case class SparklingWaterInfoPage(parent: SparklingWaterUITab) extends WebUIPage
{kv._2}
</td>
</tr>

/** Renders a two-column (name, value) table row, additionally stamping the value
  * cell with an `id` attribute equal to the property name (kv._1) via scala-xml's
  * `%` operator, so client-side JavaScript (see additionalScript) can locate and
  * rewrite that cell's content by id.
  */
private def h2oRowWithId(kv: (String, String)) = <tr>
<td>
{kv._1}
</td> {<td>
{kv._2}
</td> % Attribute(None, "id", Text(kv._1), Null)}
</tr>
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import org.apache.spark.SparkContext
import org.apache.spark.h2o.ui.{AppStatusListener, AppStatusStore, CrossSparkUtils, SparklingWaterUITab}
import org.apache.spark.status.ElementTrackingStore
import org.apache.spark.ui.{SparkUITab, UIUtils}
import java.lang.Boolean._

import scala.xml.Node

Expand All @@ -32,7 +33,17 @@ object SparkSpecificUtils extends CrossSparkUtils {
content: => Seq[Node],
activeTab: SparkUITab,
helpText: String): Seq[Node] = {
UIUtils.headerSparkPage(request, "Sparkling Water", content, activeTab, helpText = Some(helpText))
val method = UIUtils.getClass.getMethods.find(m => m.getName == "headerSparkPage").get
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one is obviously fragile and can break if an overloaded method is added to spark.ui

But I guess that you can live with this?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Otherwise LGTM

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, if the method doesn't exist the Spark UI shows an exception if the Sparkling Water tab is clicked. For Apache Spark we know that there is such a method. This could happen if a user deploys SW to an obscure platform with a custom build of Spark which we haven't tested. IMHO, we can live with that.

val arguments = Seq[AnyRef](request, title, () => content, activeTab, Some(helpText), FALSE, FALSE)
val result = if (arguments.length == method.getParameterCount) {
method.invoke(UIUtils, arguments: _*)
} else if (arguments.length + 1 == method.getParameterCount) {
method.invoke(UIUtils, (arguments ++ Seq[AnyRef](FALSE)): _*)
} else {
throw new RuntimeException(
s"UIUtils.headerSparkPage has ${method.getParameterCount} parameters which is unexpected!")
}
result.asInstanceOf[Seq[Node]]
}

override def addSparklingWaterTab(sc: SparkContext): Unit = {
Expand Down