Skip to content

Commit

Permalink
[SW-1787] Perform version checks via rest api (#1713)
Browse files Browse the repository at this point in the history
  • Loading branch information
jakubhava committed Dec 17, 2019
1 parent e59ed74 commit 0088af6
Showing 1 changed file with 13 additions and 46 deletions.
Expand Up @@ -20,17 +20,16 @@ package org.apache.spark.h2o.backends.external

import java.io.{File, FileInputStream}
import java.util.Properties
import java.util.jar.JarFile

import org.apache.spark.expose.Logging
import org.apache.spark.h2o.backends.{SharedBackendConf, SparklingBackend}
import org.apache.spark.h2o.utils.NodeDesc
import org.apache.spark.h2o.{BuildInfo, H2OConf, H2OContext}
import org.apache.spark.expose.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkEnv, SparkFiles}
import water.api.RestAPIManager
import water.init.{AbstractBuildVersion, NetworkUtils}
import water.{H2O, H2OStarter, MRTask}
import water.init.NetworkUtils
import water.{H2O, H2OStarter}

import scala.io.Source
import scala.util.control.NoStackTrace
Expand Down Expand Up @@ -179,11 +178,6 @@ class ExternalH2OBackend(val hc: H2OContext) extends SparklingBackend with Loggi

override def init(conf: H2OConf): Array[NodeDesc] = {
if (conf.isAutoClusterStartUsed) {
if (!isRestApiBasedClient(hc) && !conf.isBackendVersionCheckDisabled()) {
// For automatic mode we can check the driver version early
verifyVersionFromDriverJAR(conf.h2oDriverPath.get)
}
// start h2o instances on yarn
logInfo("Starting the external H2O cluster on YARN.")
val ipPort = launchH2OOnYarn(conf)
conf.setH2OCluster(ipPort)
Expand All @@ -209,14 +203,14 @@ class ExternalH2OBackend(val hc: H2OContext) extends SparklingBackend with Loggi
lockCloud(conf)
val nodes = getNodes(conf)
verifyWebOpen(nodes, conf)
if (!conf.isBackendVersionCheckDisabled()) {
verifyVersionFromRestCall(nodes)
}
val leaderIpPort = getLeaderNode(conf).ipPort()
if (conf.h2oCluster.get != leaderIpPort){
logInfo(s"Updating spark.ext.h2o.cloud.representative to H2O's leader node $leaderIpPort")
conf.setH2OCluster(leaderIpPort)
}
if (!conf.isBackendVersionCheckDisabled()) {
verifyVersionFromRestCall(nodes)
}
nodes
} catch {
case cause: RestApiException =>
Expand All @@ -236,17 +230,13 @@ class ExternalH2OBackend(val hc: H2OContext) extends SparklingBackend with Loggi

val expectedSize = conf.clusterSize.get.toInt
val discoveredSize = ExternalH2OBackend.waitForCloudSize(expectedSize, clusterBuildTimeout)
if (conf.isManualClusterStartUsed && !conf.isBackendVersionCheckDisabled()) {
verifyVersionFromRuntime()
}
if (discoveredSize < expectedSize) {
if (conf.isAutoClusterStartUsed) {
logError(s"Exiting! External H2O cluster was of size $discoveredSize but expected was $expectedSize!!")
H2O.shutdown(-1)
}
throw new RuntimeException("Cloud size " + discoveredSize + " under " + expectedSize);
}

// Register web API for client
RestAPIManager(hc).registerAll()
H2O.startServingRestApi()
Expand All @@ -273,6 +263,9 @@ class ExternalH2OBackend(val hc: H2OContext) extends SparklingBackend with Loggi
}
}

if (!conf.isBackendVersionCheckDisabled()) {
verifyVersionFromRestCall(cloudMembers)
}
cloudMembers
}
nodes
Expand All @@ -298,47 +291,21 @@ class ExternalH2OBackend(val hc: H2OContext) extends SparklingBackend with Loggi
""
}

private def verifyVersionFromRestCall(nodes: Array[NodeDesc]) = {
private def verifyVersionFromRestCall(nodes: Array[NodeDesc]): Unit = {
val referencedVersion = BuildInfo.H2OVersion
for (node <- nodes) {
val externalVersion = getCloudInfoFromNode(node, hc.getConf).version
if (referencedVersion != externalVersion) {
if (hc.getConf.isAutoClusterStartUsed) {
stopExternalH2OCluster()
}
throw new RuntimeException(
s"""The external H2O node ${node.ipPort()} is of version $externalVersion but Sparkling Water
|is using version of H2O $referencedVersion. Please make sure to use the corresponding assembly H2O JAR.""".stripMargin)
}
}
}

private def verifyVersionFromDriverJAR(driverPath: String): Unit = {
val clientVersion = BuildInfo.H2OVersion
val jarFile = new JarFile(driverPath)
val entry = jarFile.getJarEntry("h2o.version")
val is = jarFile.getInputStream(entry)
val externalVersion = scala.io.Source.fromInputStream(is).mkString
jarFile.close()
throwWrongVersionException(clientVersion, externalVersion, Some(driverPath))
}

private def verifyVersionFromRuntime(): Unit = {
val clientVersion = BuildInfo.H2OVersion
new MRTask() {
override def setupLocal(): Unit = {
val externalVersion = AbstractBuildVersion.getBuildVersion.projectVersion()
throwWrongVersionException(clientVersion, externalVersion)
}
}.doAllNodes()
}

private def throwWrongVersionException(clientVersion: String, externalVersion: String, driverPath: Option[String] = None): Unit = {
val driverPathStr = if (driverPath.isDefined) s"(=$driverPath)" else ""
if (clientVersion != externalVersion) {
throw new RuntimeException(
s"""
|The external H2O cluster$driverPathStr is of version $externalVersion but Sparkling Water
|is using version of H2O $clientVersion. Please make sure to use the corresponding extended H2O JAR.""".stripMargin)
}
}
}

object ExternalH2OBackend extends ExternalBackendUtils {
Expand Down

0 comments on commit 0088af6

Please sign in to comment.