
Commit

[SW-1219] Remove support for testing external cluster in manual mode (#1146)

(cherry picked from commit 26a272e)
jakubhava committed Apr 18, 2019
1 parent aa2a65e · commit e37e2ec
Showing 9 changed files with 20 additions and 173 deletions.
@@ -17,7 +17,8 @@
package water.sparkling.itest.local

import org.apache.spark.SparkContext
-import org.apache.spark.h2o.utils.{H2OContextTestHelper, SparkTestContext}
+import org.apache.spark.h2o.utils.SparkTestContext
+import org.apache.spark.h2o.{H2OConf, H2OContext}
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
@@ -37,10 +38,10 @@ class H2OContextLocalClusterSuite extends FunSuite
val conf = defaultSparkConf.setJars(swassembly :: Nil)
sc = new SparkContext("local", "test-local-cluster", conf)

-val hc = H2OContextTestHelper.createH2OContext(sc, 1)
+val hc = H2OContext.getOrCreate(sc, new H2OConf(spark).setNumOfExternalH2ONodes(1))
assert(water.H2O.CLOUD.members().length == 1, "H2O cloud should have 1 member")

-H2OContextTestHelper.stopH2OContext(sc, hc)
+hc.stop()
// Does not reset
resetSparkContext()
}
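The same substitution repeats through the remaining files: the deleted H2OContextTestHelper calls become a direct H2OContext.getOrCreate plus hc.stop(). A minimal, self-contained sketch of that lifecycle, using only the API calls visible in this diff (the SparkSession boilerplate around them is standard Spark, not part of the commit):

```scala
import org.apache.spark.SparkContext
import org.apache.spark.h2o.{H2OConf, H2OContext}
import org.apache.spark.sql.SparkSession

object H2OContextLifecycleSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("h2o-context-lifecycle-sketch")
      .getOrCreate()
    val sc: SparkContext = spark.sparkContext

    // setNumOfExternalH2ONodes is only consulted by the external backend;
    // under the default internal backend it is carried in the conf but unused.
    val hc = H2OContext.getOrCreate(sc, new H2OConf(spark).setNumOfExternalH2ONodes(1))

    // ... test body would run against hc here ...

    hc.stop()    // replaces H2OContextTestHelper.stopH2OContext(sc, hc)
    spark.stop()
  }
}
```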
@@ -17,7 +17,8 @@
package water.sparkling.itest.local

import org.apache.spark.SparkContext
-import org.apache.spark.h2o.utils.{H2OContextTestHelper, SparkTestContext}
+import org.apache.spark.h2o.utils.SparkTestContext
+import org.apache.spark.h2o.{H2OConf, H2OContext}
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
@@ -33,8 +34,8 @@ class H2OContextLocalSuite extends FunSuite

test("verify H2O cloud building on local JVM") {
sc = new SparkContext("local[*]", "test-local", defaultSparkConf)
-val hc = H2OContextTestHelper.createH2OContext(sc, 1)

+val hc = H2OContext.getOrCreate(sc, new H2OConf(spark).setNumOfExternalH2ONodes(1))

// Number of nodes should be on
assert(water.H2O.CLOUD.members().length == 1, "H2O cloud should have 1 members")
@@ -45,8 +46,7 @@ class H2OContextLocalSuite extends FunSuite
DKV.put(Key.make(), icedInt)
assert(water.H2O.store_size() == 1)

-// stop h2o cloud in case of external cluster mode
-H2OContextTestHelper.stopH2OContext(sc, hc)
+hc.stop()
// Reset this context
resetSparkContext()
}

This file was deleted.

This file was deleted.

@@ -20,7 +20,7 @@ package org.apache.spark.h2o.utils
import java.security.Permission

import org.apache.spark.SparkContext
-import org.apache.spark.h2o.H2OContext
+import org.apache.spark.h2o.{H2OConf, H2OContext}
import org.scalatest.Suite

/**
@@ -37,7 +37,7 @@ trait SharedH2OTestContext extends SparkTestContext {
override def beforeAll() {
super.beforeAll()
sc = createSparkContext
-hc = H2OContextTestHelper.createH2OContext(sc, 1)
+hc = H2OContext.getOrCreate(sc, new H2OConf(spark).setNumOfExternalH2ONodes(1))
}

override def afterAll() {
@@ -49,7 +49,7 @@ trait SharedH2OTestContext extends SparkTestContext {
try {
val securityManager = new NoExitCheckSecurityManager
System.setSecurityManager(securityManager)
-H2OContextTestHelper.stopH2OContext(sc, hc)
+hc.stop()
hc = null
resetSparkContext()
super.afterAll()
@@ -61,7 +61,7 @@ trait SparkTestContext extends BeforeAndAfterEach with BeforeAndAfterAll {
.set("spark.scheduler.minRegisteredResourcesRatio", "1")
.set("spark.ext.h2o.backend.cluster.mode", sys.props.getOrElse("spark.ext.h2o.backend.cluster.mode", "internal"))
.set("spark.ext.h2o.client.ip", sys.props.getOrElse("H2O_CLIENT_IP", NetworkInit.findInetAddressForSelf().getHostAddress))
.set("spark.ext.h2o.external.start.mode", sys.props.getOrElse("spark.ext.h2o.external.start.mode", "manual"))
.set("spark.ext.h2o.external.start.mode", "auto")
conf
})
}
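Configuration-wise, the change pins spark.ext.h2o.external.start.mode to "auto" while leaving the backend mode switchable per run. A hedged sketch of how an external-backend run is now selected (the property keys are the ones in this diff; master, app name, and node count are placeholders, and a real auto-start run would additionally need the H2O driver/Hadoop settings that live outside these files):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.h2o.{H2OConf, H2OContext}
import org.apache.spark.sql.SparkSession

object ExternalAutoStartSketch {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .setMaster("local[*]")                                  // placeholder
      .setAppName("external-auto-start-sketch")               // placeholder
      .set("spark.ext.h2o.backend.cluster.mode", "external")  // still overridable per run
      .set("spark.ext.h2o.external.start.mode", "auto")       // the only mode left after this commit

    val spark = SparkSession.builder().config(sparkConf).getOrCreate()

    // Ask the auto-starter for a single external H2O node.
    val hc = H2OContext.getOrCreate(spark.sparkContext,
      new H2OConf(spark).setNumOfExternalH2ONodes(1))

    hc.stop()
    spark.stop()
  }
}
```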
@@ -2,8 +2,6 @@ package water.sparkling.itest

import org.apache.spark.h2o.backends.SharedBackendConf
import org.apache.spark.h2o.backends.external.ExternalBackendConf
-import org.apache.spark.h2o.utils.H2OContextTestHelper
-import org.apache.spark.h2o.utils.H2OContextTestHelper._
import org.scalatest.{BeforeAndAfterEach, Suite, Tag}
import water.init.NetworkInit

@@ -46,7 +44,7 @@ trait IntegTestHelper extends BeforeAndAfterEach {
// Need to disable timeline service which requires Jersey libraries v1, but which are not available in Spark2.0
// See: https://www.hackingnote.com/en/spark/trouble-shooting/NoClassDefFoundError-ClientConfig/
Seq("--conf", "spark.hadoop.yarn.timeline-service.enabled=false") ++
Seq("--conf", s"spark.ext.h2o.external.start.mode=${sys.props.getOrElse("spark.ext.h2o.external.start.mode", "manual")}") ++
Seq("--conf", s"spark.ext.h2o.external.start.mode=auto") ++
Seq("--conf", s"spark.ext.h2o.backend.cluster.mode=${sys.props.getOrElse("spark.ext.h2o.backend.cluster.mode", "internal")}") ++
env.sparkConf.flatMap(p => Seq("--conf", s"${p._1}=${p._2}")) ++
Seq[String](env.itestJar)
@@ -71,25 +69,16 @@ trait IntegTestHelper extends BeforeAndAfterEach {
override protected def beforeEach(): Unit = {
super.beforeEach()
testEnv = new TestEnvironment
-val cloudName = H2OContextTestHelper.uniqueCloudName("integ-tests")
-testEnv.sparkConf += SharedBackendConf.PROP_CLOUD_NAME._1 -> cloudName

testEnv.sparkConf += SharedBackendConf.PROP_CLIENT_IP._1 ->
sys.props.getOrElse("H2O_CLIENT_IP", NetworkInit.findInetAddressForSelf().getHostAddress)

val cloudSize = 1
testEnv.sparkConf += ExternalBackendConf.PROP_EXTERNAL_H2O_NODES._1 -> cloudSize.toString
-if (isExternalClusterUsed() && isManualClusterStartModeUsed()) {
-  testEnv.sparkConf += SharedBackendConf.PROP_BACKEND_CLUSTER_MODE._1 -> "external"
-  startExternalH2OCloud(cloudSize, cloudName, testEnv.sparkConf("spark.ext.h2o.client.ip"), testEnv.assemblyJar)
-}
}

override protected def afterEach(): Unit = {
testEnv = null
-if (isExternalClusterUsed() && isManualClusterStartModeUsed()) {
-  stopExternalH2OCloud()
-}
super.afterEach()
}

@@ -6,7 +6,6 @@ import org.apache.spark.h2o.FunSuiteWithLogging
import org.apache.spark.h2o.backends.SharedBackendConf
import org.apache.spark.h2o.backends.SharedBackendConf._
import org.apache.spark.h2o.backends.external.ExternalBackendConf
-import org.apache.spark.h2o.utils.H2OContextTestHelper._
import org.apache.spark.repl.h2o.{CodeResults, H2OInterpreter}
import org.apache.spark.{SparkConf, SparkContext}
import org.scalatest.{BeforeAndAfterAll, Suite}
@@ -26,28 +25,19 @@ trait ScriptsTestHelper extends FunSuiteWithLogging with BeforeAndAfterAll{
fail("The variable 'sparkling.assembly.jar' is not set! It should point to assembly jar file."))

override protected def beforeAll(): Unit = {
-val cloudName = uniqueCloudName("scripts-tests")
-sparkConf.set(PROP_CLOUD_NAME._1, cloudName)
-sparkConf.set(PROP_CLIENT_IP._1, sys.props.getOrElse("H2O_CLIENT_IP", NetworkInit.findInetAddressForSelf().getHostAddress))

+sparkConf.set(SharedBackendConf.PROP_CLIENT_IP._1,
+  sys.props.getOrElse("H2O_CLIENT_IP", NetworkInit.findInetAddressForSelf().getHostAddress))

val cloudSize = 1
sparkConf.set(ExternalBackendConf.PROP_EXTERNAL_H2O_NODES._1, cloudSize.toString)

-if(isExternalClusterUsed(sparkConf) && isManualClusterStartModeUsed(sparkConf)){
-  startExternalH2OCloud(cloudSize, cloudName, sparkConf.get("spark.ext.h2o.client.ip"), assemblyJar)
-}
sc = new SparkContext(org.apache.spark.h2o.H2OConf.checkSparkConf(sparkConf))
super.beforeAll()
}

override protected def afterAll(): Unit = {
-if(isExternalClusterUsed(sparkConf) && isManualClusterStartModeUsed(sparkConf)) {
-  stopExternalH2OCloud()
-}

if (sc != null){
sc.stop()
@@ -66,7 +56,7 @@ trait ScriptsTestHelper extends FunSuiteWithLogging with BeforeAndAfterAll{
.set("spark.network.timeout", "360s") // Increase network timeout if jenkins machines are busy
.set("spark.worker.timeout", "360") // Increase worker timeout if jenkins machines are busy
.set("spark.ext.h2o.backend.cluster.mode", sys.props.getOrElse("spark.ext.h2o.backend.cluster.mode", "internal"))
.set("spark.ext.h2o.external.start.mode", sys.props.getOrElse("spark.ext.h2o.external.start.mode", "manual"))
.set("spark.ext.h2o.external.start.mode", "auto")
.set("spark.ext.h2o.hadoop.memory", "3G")
.setJars(Array(assemblyJar))
conf
@@ -20,8 +20,8 @@ package org.apache.spark.ml.spark.models
import java.io.{File, PrintWriter}
import java.util.Locale

-import org.apache.spark.h2o.H2OContext
-import org.apache.spark.h2o.utils.{H2OContextTestHelper, SparkTestContext}
+import org.apache.spark.h2o.utils.SparkTestContext
+import org.apache.spark.h2o.{H2OConf, H2OContext}
import org.apache.spark.ml.feature._
import org.apache.spark.ml.h2o.algos.H2OGBM
import org.apache.spark.ml.h2o.features.ColumnPruner
@@ -62,7 +62,8 @@ abstract class PipelinePredictionTestBase extends FunSuite with SparkTestContext
}

def trainedPipelineModel(spark: SparkSession): PipelineModel = {
-implicit val hc: H2OContext = H2OContextTestHelper.createH2OContext(spark.sparkContext, 1)
+implicit val hc: H2OContext = H2OContext.getOrCreate(sc, new H2OConf(spark).setNumOfExternalH2ONodes(1))

implicit val sqlContext: SQLContext = spark.sqlContext
/**
* Define the pipeline stages
@@ -113,7 +114,7 @@ abstract class PipelinePredictionTestBase extends FunSuite with SparkTestContext
val data = load(spark.sparkContext, "smsData.txt")
val model = pipeline.fit(data)

-H2OContextTestHelper.stopH2OContext(spark.sparkContext, hc)
+hc.stop()
// return the trained model
model
}
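For context, the returned PipelineModel is consumed like any other Spark ML model; a hypothetical caller is sketched below (testData and its schema are assumptions for illustration, not part of this diff):

```scala
// Hypothetical usage inside a suite that already has a SparkSession `spark`
// and a DataFrame `testData` matching the columns the pipeline was fitted on.
val model: PipelineModel = trainedPipelineModel(spark)
val predictions = model.transform(testData)
predictions.show(5)
```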
