From 26a272e2b5cc8db030e5fb95ba817d2df144f554 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jakub=20H=C3=A1va?=
Date: Thu, 18 Apr 2019 17:29:57 +0200
Subject: [PATCH] [SW-1219] Remove support for testing external cluster in manual mode (#1146)

---
 .../local/H2OContextLocalClusterSuite.scala   |  7 +-
 .../itest/local/H2OContextLocalSuite.scala    | 10 +--
 .../ExternalBackendManualTestStarter.scala    | 69 -------------------
 .../h2o/utils/H2OContextTestHelper.scala      | 65 -----------------
 .../h2o/utils/SharedH2OTestContext.scala      |  6 +-
 .../spark/h2o/utils/SparkTestContext.scala    |  2 +-
 .../sparkling/itest/IntegTestHelper.scala     | 13 +---
 .../sparkling/scripts/ScriptTestHelper.scala  | 12 +---
 .../spark/models/PipelinePredictionTest.scala |  9 +--
 9 files changed, 20 insertions(+), 173 deletions(-)
 delete mode 100644 core/src/test/scala/org/apache/spark/h2o/utils/ExternalBackendManualTestStarter.scala
 delete mode 100644 core/src/test/scala/org/apache/spark/h2o/utils/H2OContextTestHelper.scala

diff --git a/core/src/integTest/scala/water/sparkling/itest/local/H2OContextLocalClusterSuite.scala b/core/src/integTest/scala/water/sparkling/itest/local/H2OContextLocalClusterSuite.scala
index dcf8f0ba86..deebefdff9 100644
--- a/core/src/integTest/scala/water/sparkling/itest/local/H2OContextLocalClusterSuite.scala
+++ b/core/src/integTest/scala/water/sparkling/itest/local/H2OContextLocalClusterSuite.scala
@@ -17,7 +17,8 @@ package water.sparkling.itest.local
 
 import org.apache.spark.SparkContext
-import org.apache.spark.h2o.utils.{H2OContextTestHelper, SparkTestContext}
+import org.apache.spark.h2o.utils.SparkTestContext
+import org.apache.spark.h2o.{H2OConf, H2OContext}
 import org.junit.runner.RunWith
 import org.scalatest.junit.JUnitRunner
 import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
@@ -37,10 +38,10 @@ class H2OContextLocalClusterSuite extends FunSuite
 
     val conf = defaultSparkConf.setJars(swassembly :: Nil)
     sc = new SparkContext("local", "test-local-cluster", conf)
-    val hc = H2OContextTestHelper.createH2OContext(sc, 1)
+    val hc = H2OContext.getOrCreate(sc, new H2OConf(spark).setNumOfExternalH2ONodes(1))
 
     assert(water.H2O.CLOUD.members().length == 1, "H2O cloud should have 1 member")
-    H2OContextTestHelper.stopH2OContext(sc, hc)
+    hc.stop()
     // Does not reset
     resetSparkContext()
   }
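
The suite above now exercises the same public API that users call, instead of the deleted test helper. A minimal self-contained sketch of that pattern (assuming the Sparkling Water 2.x API used in this diff; the real suite gets spark and defaultSparkConf from SparkTestContext, while this sketch builds its own SparkContext):

    import org.apache.spark.SparkContext
    import org.apache.spark.h2o.{H2OConf, H2OContext}

    object LocalClusterSmoke {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext("local", "h2o-smoke")
        // setNumOfExternalH2ONodes only matters for the external backend;
        // with spark.ext.h2o.backend.cluster.mode=internal it is effectively a no-op.
        val hc = H2OContext.getOrCreate(sc, new H2OConf(sc).setNumOfExternalH2ONodes(1))
        assert(water.H2O.CLOUD.members().length == 1, "H2O cloud should have 1 member")
        hc.stop()
      }
    }
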
diff --git a/core/src/integTest/scala/water/sparkling/itest/local/H2OContextLocalSuite.scala b/core/src/integTest/scala/water/sparkling/itest/local/H2OContextLocalSuite.scala
index dbf1ef2433..f891049ca6 100644
--- a/core/src/integTest/scala/water/sparkling/itest/local/H2OContextLocalSuite.scala
+++ b/core/src/integTest/scala/water/sparkling/itest/local/H2OContextLocalSuite.scala
@@ -17,7 +17,8 @@ package water.sparkling.itest.local
 
 import org.apache.spark.SparkContext
-import org.apache.spark.h2o.utils.{H2OContextTestHelper, SparkTestContext}
+import org.apache.spark.h2o.utils.SparkTestContext
+import org.apache.spark.h2o.{H2OConf, H2OContext}
 import org.junit.runner.RunWith
 import org.scalatest.junit.JUnitRunner
 import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
@@ -33,8 +34,8 @@ class H2OContextLocalSuite extends FunSuite
 
   test("verify H2O cloud building on local JVM") {
     sc = new SparkContext("local[*]", "test-local", defaultSparkConf)
-
-    val hc = H2OContextTestHelper.createH2OContext(sc, 1)
+
+    val hc = H2OContext.getOrCreate(sc, new H2OConf(spark).setNumOfExternalH2ONodes(1))
 
     // Number of nodes should be one
     assert(water.H2O.CLOUD.members().length == 1, "H2O cloud should have 1 members")
@@ -45,8 +46,7 @@ class H2OContextLocalSuite extends FunSuite
     DKV.put(Key.make(), icedInt)
     assert(water.H2O.store_size() == 1)
 
-    // stop h2o cloud in case of external cluster mode
-    H2OContextTestHelper.stopH2OContext(sc, hc)
+    hc.stop()
     // Reset this context
     resetSparkContext()
   }

diff --git a/core/src/test/scala/org/apache/spark/h2o/utils/ExternalBackendManualTestStarter.scala b/core/src/test/scala/org/apache/spark/h2o/utils/ExternalBackendManualTestStarter.scala
deleted file mode 100644
index 92cc9b2ae6..0000000000
--- a/core/src/test/scala/org/apache/spark/h2o/utils/ExternalBackendManualTestStarter.scala
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.spark.h2o.utils
-
-import org.apache.spark.SparkConf
-import org.apache.spark.h2o.backends.SharedBackendConf._
-
-import scala.sys.process.Process
-import scala.util.Random
-
-/**
- * Used to start H2O nodes from scala code
- */
-trait ExternalBackendManualTestStarter {
-  @transient var nodeProcesses: Seq[Process] = _
-
-  lazy val swJar = sys.props.getOrElse("sparkling.assembly.jar", sys.env.getOrElse("sparkling.assembly.jar",
-    throw new IllegalArgumentException("sparkling.assembly.jar environment variable is not set! It should point to the location of sparkling-water" +
-      " assembly JAR")))
-
-  lazy val h2oExtendedJar = sys.props.getOrElse("H2O_EXTENDED_JAR", sys.env.getOrElse("H2O_EXTENDED_JAR",
-    throw new IllegalArgumentException("H2O_EXTENDED_JAR environment variable is not set! It should point to the location of H2O assembly jar file")))
-
-  lazy val clusterStartTimeout = sys.props.getOrElse("cluster.start.timeout", sys.env.getOrElse("cluster.start.timeout", "6000")).toInt
-
-  def uniqueCloudName(customPart: String) = s"sparkling-water-$customPart-${Random.nextInt()}"
-
-  private def launchSingleExternalH2ONode(cloudName: String, ip: String, additionalCp: String*): Process = {
-    // Since some tests requires additional classes to be present at H2O classpath we add them here
-    // instead of extending h2o jar by another classes
-    // The best solution would be to implement distributed classloading for H2O
-    val jarList = List(h2oExtendedJar) ++ additionalCp.toList
-    val cmdToLaunch = Seq[String]("java", "-ea", "-cp", jarList.mkString(":"), "water.H2OApp", "-name", cloudName, "-ip", ip)
-    Process(cmdToLaunch).run()
-  }
-
-  def startExternalH2OCloud(cloudSize: Int, cloudName: String, ip: String, additionalCp: String*): Unit = {
-    // do not start h2o nodes if this property is set, they will be started on yarn automatically
-    nodeProcesses = (1 to cloudSize).map { _ => launchSingleExternalH2ONode(cloudName, ip, additionalCp: _*) }
-    // Wait to ensure that h2o nodes are created earlier than h2o client
-    Thread.sleep(clusterStartTimeout)
-  }
-
-  def startExternalH2OCloud(cloudSize: Int, sparkConf: SparkConf, additionalCp: String*): Unit = {
-    startExternalH2OCloud(cloudSize, sparkConf.get(PROP_CLOUD_NAME._1), sparkConf.get(PROP_CLIENT_IP._1), additionalCp: _*)
-  }
-
-  def stopExternalH2OCloud(): Unit = {
-    if (nodeProcesses != null) {
-      nodeProcesses.foreach(_.destroy())
-      nodeProcesses = null
-    }
-  }
-}
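
ExternalBackendManualTestStarter was the heart of the removed manual mode: it forked plain water.H2OApp JVMs and slept for a fixed timeout so the nodes could form a cloud before the H2O client attached. A condensed standalone sketch of that approach, under the assumption that someone still wants to start a manual cluster outside the test suites (the cloud name, node count, and IP here are illustrative):

    import scala.sys.process.Process

    object ManualCloudLauncher {
      // Same command line the deleted trait built: a bare H2O node, no Spark involved.
      def launchNode(h2oJar: String, cloudName: String, ip: String): Process =
        Process(Seq("java", "-ea", "-cp", h2oJar, "water.H2OApp", "-name", cloudName, "-ip", ip)).run()

      def main(args: Array[String]): Unit = {
        val h2oJar = sys.env.getOrElse("H2O_EXTENDED_JAR", sys.error("H2O_EXTENDED_JAR is not set"))
        val nodes = (1 to 2).map(_ => launchNode(h2oJar, "manual-cloud", "127.0.0.1"))
        Thread.sleep(6000) // same fixed wait the deleted helper used
        try {
          // attach an H2O client or run work against the cloud here
        } finally {
          nodes.foreach(_.destroy())
        }
      }
    }

The fixed Thread.sleep illustrates why the manual path was brittle in tests: nothing actually verifies that the cloud formed before the client connects.
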
diff --git a/core/src/test/scala/org/apache/spark/h2o/utils/H2OContextTestHelper.scala b/core/src/test/scala/org/apache/spark/h2o/utils/H2OContextTestHelper.scala
deleted file mode 100644
index cfbe98c8ea..0000000000
--- a/core/src/test/scala/org/apache/spark/h2o/utils/H2OContextTestHelper.scala
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.spark.h2o.utils
-
-import org.apache.spark.{SparkConf, SparkContext}
-import org.apache.spark.h2o.{H2OConf, H2OContext}
-import org.apache.spark.h2o.backends.SharedBackendConf.PROP_BACKEND_CLUSTER_MODE
-import org.apache.spark.h2o.backends.external.ExternalBackendConf
-
-object H2OContextTestHelper extends ExternalBackendManualTestStarter {
-
-  def isManualClusterStartModeUsed(conf: Option[SparkConf] = None): Boolean = {
-    if (conf.isDefined) {
-      conf.get.getOption(ExternalBackendConf.PROP_EXTERNAL_CLUSTER_START_MODE._1).getOrElse(ExternalBackendConf.PROP_EXTERNAL_CLUSTER_START_MODE._2) == "manual"
-    } else {
-      sys.props.getOrElse(ExternalBackendConf.PROP_EXTERNAL_CLUSTER_START_MODE._1, ExternalBackendConf.PROP_EXTERNAL_CLUSTER_START_MODE._1) == "manual"
-    }
-  }
-
-  def isManualClusterStartModeUsed(conf: SparkConf): Boolean = isManualClusterStartModeUsed()
-
-  def isAutoClusterStartModeUsed(conf: Option[SparkConf] = None): Boolean = !isManualClusterStartModeUsed(conf)
-  def isAutoClusterStartModeUsed(): Boolean = !isManualClusterStartModeUsed()
-
-  def isExternalClusterUsed(conf: Option[SparkConf] = None): Boolean = {
-    if (conf.isDefined) {
-      conf.get.getOption(PROP_BACKEND_CLUSTER_MODE._1).getOrElse(PROP_BACKEND_CLUSTER_MODE._2) == "external"
-    } else {
-      sys.props.getOrElse(PROP_BACKEND_CLUSTER_MODE._1, PROP_BACKEND_CLUSTER_MODE._2) == "external"
-    }
-  }
-
-  def isExternalClusterUsed(conf: SparkConf): Boolean = isExternalClusterUsed(Option(conf))
-
-
-  def createH2OContext(sc: SparkContext, numH2ONodes: Int): H2OContext = {
-    val h2oConf = new H2OConf(sc)
-    if (isExternalClusterUsed(sc.getConf) && isManualClusterStartModeUsed(sc.getConf)) {
-      startExternalH2OCloud(numH2ONodes, sc.getConf)
-    }
-    H2OContext.getOrCreate(sc, h2oConf.setNumOfExternalH2ONodes(numH2ONodes))
-  }
-
-  def stopH2OContext(sc: SparkContext, hc: H2OContext): Unit = {
-    if (isExternalClusterUsed(sc.getConf) && isManualClusterStartModeUsed(sc.getConf)) {
-      stopExternalH2OCloud()
-    }
-    hc.stop()
-  }
-}
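
Two latent bugs disappear with H2OContextTestHelper. First, the SparkConf overload of isManualClusterStartModeUsed ignored its argument and fell through to the system-property path. Second, that path used the property key (._1) as the getOrElse default where the default value (._2) was clearly intended, as the sibling isExternalClusterUsed shows. A corrected sketch of the same predicates, with the configuration keys inlined for illustration:

    import org.apache.spark.SparkConf

    object BackendMode {
      // Keys as they appear elsewhere in this patch; defaults follow the
      // library's behaviour at the time ("internal" backend, "manual" start mode).
      private val BackendModeKey = "spark.ext.h2o.backend.cluster.mode"
      private val StartModeKey = "spark.ext.h2o.external.start.mode"

      def isExternalClusterUsed(conf: SparkConf): Boolean =
        conf.get(BackendModeKey, "internal") == "external"

      def isManualClusterStartModeUsed(conf: SparkConf): Boolean =
        conf.get(StartModeKey, "manual") == "manual"
    }
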
diff --git a/core/src/test/scala/org/apache/spark/h2o/utils/SharedH2OTestContext.scala b/core/src/test/scala/org/apache/spark/h2o/utils/SharedH2OTestContext.scala
index c9e5e49dde..fbfdfff5e3 100644
--- a/core/src/test/scala/org/apache/spark/h2o/utils/SharedH2OTestContext.scala
+++ b/core/src/test/scala/org/apache/spark/h2o/utils/SharedH2OTestContext.scala
@@ -20,7 +20,7 @@ package org.apache.spark.h2o.utils
 import java.security.Permission
 
 import org.apache.spark.SparkContext
-import org.apache.spark.h2o.H2OContext
+import org.apache.spark.h2o.{H2OConf, H2OContext}
 import org.scalatest.Suite
 
 /**
@@ -37,7 +37,7 @@ trait SharedH2OTestContext extends SparkTestContext {
   override def beforeAll() {
     super.beforeAll()
     sc = createSparkContext
-    hc = H2OContextTestHelper.createH2OContext(sc, 1)
+    hc = H2OContext.getOrCreate(sc, new H2OConf(spark).setNumOfExternalH2ONodes(1))
   }
 
   override def afterAll() {
@@ -49,7 +49,7 @@ trait SharedH2OTestContext extends SparkTestContext {
     try {
       val securityManager = new NoExitCheckSecurityManager
      System.setSecurityManager(securityManager)
-      H2OContextTestHelper.stopH2OContext(sc, hc)
+      hc.stop()
       hc = null
       resetSparkContext()
       super.afterAll()

diff --git a/core/src/test/scala/org/apache/spark/h2o/utils/SparkTestContext.scala b/core/src/test/scala/org/apache/spark/h2o/utils/SparkTestContext.scala
index 3419da1ea2..fcba4e5f31 100644
--- a/core/src/test/scala/org/apache/spark/h2o/utils/SparkTestContext.scala
+++ b/core/src/test/scala/org/apache/spark/h2o/utils/SparkTestContext.scala
@@ -61,7 +61,7 @@ trait SparkTestContext extends BeforeAndAfterEach with BeforeAndAfterAll {
       .set("spark.scheduler.minRegisteredResourcesRatio", "1")
       .set("spark.ext.h2o.backend.cluster.mode", sys.props.getOrElse("spark.ext.h2o.backend.cluster.mode", "internal"))
       .set("spark.ext.h2o.client.ip", sys.props.getOrElse("H2O_CLIENT_IP", NetworkInit.findInetAddressForSelf().getHostAddress))
-      .set("spark.ext.h2o.external.start.mode", sys.props.getOrElse("spark.ext.h2o.external.start.mode", "manual"))
+      .set("spark.ext.h2o.external.start.mode", "auto")
     conf
   })
 }
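
SharedH2OTestContext is the fixture most suites inherit: one SparkContext plus one H2OContext per suite, torn down in afterAll. Together with SparkTestContext now pinning spark.ext.h2o.external.start.mode to "auto", every suite gets an automatically started backend with no manual bookkeeping. A minimal sketch of that shared-fixture pattern (it uses new H2OConf(sc) where the diff uses the session-based constructor, and omits the security-manager workaround the real trait needs around hc.stop()):

    import org.apache.spark.SparkContext
    import org.apache.spark.h2o.{H2OConf, H2OContext}
    import org.scalatest.{BeforeAndAfterAll, Suite}

    trait SharedH2OFixture extends BeforeAndAfterAll { self: Suite =>
      @transient var sc: SparkContext = _
      @transient var hc: H2OContext = _

      def createSparkContext: SparkContext

      override def beforeAll(): Unit = {
        super.beforeAll()
        sc = createSparkContext
        hc = H2OContext.getOrCreate(sc, new H2OConf(sc).setNumOfExternalH2ONodes(1))
      }

      override def afterAll(): Unit = {
        try {
          if (hc != null) hc.stop()
          if (sc != null) sc.stop()
        } finally {
          super.afterAll()
        }
      }
    }
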
diff --git a/examples/src/integTest/scala/water/sparkling/itest/IntegTestHelper.scala b/examples/src/integTest/scala/water/sparkling/itest/IntegTestHelper.scala
index 582819d35a..35e11c5707 100644
--- a/examples/src/integTest/scala/water/sparkling/itest/IntegTestHelper.scala
+++ b/examples/src/integTest/scala/water/sparkling/itest/IntegTestHelper.scala
@@ -2,8 +2,6 @@ package water.sparkling.itest
 
 import org.apache.spark.h2o.backends.SharedBackendConf
 import org.apache.spark.h2o.backends.external.ExternalBackendConf
-import org.apache.spark.h2o.utils.H2OContextTestHelper
-import org.apache.spark.h2o.utils.H2OContextTestHelper._
 import org.scalatest.{BeforeAndAfterEach, Suite, Tag}
 import water.init.NetworkInit
 
@@ -46,7 +44,7 @@
         // Need to disable timeline service which requires Jersey libraries v1, but which are not available in Spark2.0
         // See: https://www.hackingnote.com/en/spark/trouble-shooting/NoClassDefFoundError-ClientConfig/
         Seq("--conf", "spark.hadoop.yarn.timeline-service.enabled=false") ++
-        Seq("--conf", s"spark.ext.h2o.external.start.mode=${sys.props.getOrElse("spark.ext.h2o.external.start.mode", "manual")}") ++
+        Seq("--conf", s"spark.ext.h2o.external.start.mode=auto") ++
         Seq("--conf", s"spark.ext.h2o.backend.cluster.mode=${sys.props.getOrElse("spark.ext.h2o.backend.cluster.mode", "internal")}") ++
         env.sparkConf.flatMap(p => Seq("--conf", s"${p._1}=${p._2}")) ++
         Seq[String](env.itestJar)
@@ -71,25 +69,16 @@ trait IntegTestHelper extends BeforeAndAfterEach {
   override protected def beforeEach(): Unit = {
     super.beforeEach()
     testEnv = new TestEnvironment
-    val cloudName = H2OContextTestHelper.uniqueCloudName("integ-tests")
-    testEnv.sparkConf += SharedBackendConf.PROP_CLOUD_NAME._1 -> cloudName
     testEnv.sparkConf += SharedBackendConf.PROP_CLIENT_IP._1 ->
       sys.props.getOrElse("H2O_CLIENT_IP", NetworkInit.findInetAddressForSelf().getHostAddress)
 
     val cloudSize = 1
     testEnv.sparkConf += ExternalBackendConf.PROP_EXTERNAL_H2O_NODES._1 -> cloudSize.toString
-
-    if (isExternalClusterUsed() && isManualClusterStartModeUsed()) {
-      testEnv.sparkConf += SharedBackendConf.PROP_BACKEND_CLUSTER_MODE._1 -> "external"
-      startExternalH2OCloud(cloudSize, cloudName, testEnv.sparkConf("spark.ext.h2o.client.ip"), testEnv.assemblyJar)
-    }
   }
 
   override protected def afterEach(): Unit = {
     testEnv = null
-    if (isExternalClusterUsed() && isManualClusterStartModeUsed()) {
-      stopExternalH2OCloud()
-    }
     super.afterEach()
   }
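
IntegTestHelper builds the spark-submit argument list for the integration jobs, so the start mode is now forced to "auto" on the command line as well, while the backend mode stays overridable through a system property. One nit the diff leaves behind: the start-mode string is now a constant, so its s interpolator is redundant. The H2O-related arguments, as a compact sketch (the helper method name is illustrative):

    def h2oSubmitArgs(): Seq[String] =
      Seq("--conf", "spark.hadoop.yarn.timeline-service.enabled=false") ++
        Seq("--conf", "spark.ext.h2o.external.start.mode=auto") ++
        Seq("--conf",
          s"spark.ext.h2o.backend.cluster.mode=${sys.props.getOrElse("spark.ext.h2o.backend.cluster.mode", "internal")}")
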
diff --git a/examples/src/scriptsTest/scala/water/sparkling/scripts/ScriptTestHelper.scala b/examples/src/scriptsTest/scala/water/sparkling/scripts/ScriptTestHelper.scala
index 91299a84a8..f85b3fb360 100644
--- a/examples/src/scriptsTest/scala/water/sparkling/scripts/ScriptTestHelper.scala
+++ b/examples/src/scriptsTest/scala/water/sparkling/scripts/ScriptTestHelper.scala
@@ -6,7 +6,6 @@ import org.apache.spark.h2o.FunSuiteWithLogging
 import org.apache.spark.h2o.backends.SharedBackendConf
 import org.apache.spark.h2o.backends.SharedBackendConf._
 import org.apache.spark.h2o.backends.external.ExternalBackendConf
-import org.apache.spark.h2o.utils.H2OContextTestHelper._
 import org.apache.spark.repl.h2o.{CodeResults, H2OInterpreter}
 import org.apache.spark.{SparkConf, SparkContext}
 import org.scalatest.{BeforeAndAfterAll, Suite}
@@ -26,28 +25,19 @@ trait ScriptsTestHelper extends FunSuiteWithLogging with BeforeAndAfterAll{
     fail("The variable 'sparkling.assembly.jar' is not set! It should point to assembly jar file."))
 
   override protected def beforeAll(): Unit = {
-    val cloudName = uniqueCloudName("scripts-tests")
-    sparkConf.set(PROP_CLOUD_NAME._1, cloudName)
     sparkConf.set(PROP_CLIENT_IP._1, sys.props.getOrElse("H2O_CLIENT_IP", NetworkInit.findInetAddressForSelf().getHostAddress))
-    sparkConf.set(SharedBackendConf.PROP_CLIENT_IP._1, sys.props.getOrElse("H2O_CLIENT_IP", NetworkInit.findInetAddressForSelf().getHostAddress))
 
     val cloudSize = 1
     sparkConf.set(ExternalBackendConf.PROP_EXTERNAL_H2O_NODES._1, cloudSize.toString)
-
-    if(isExternalClusterUsed(sparkConf) && isManualClusterStartModeUsed(sparkConf)){
-      startExternalH2OCloud(cloudSize, cloudName, sparkConf.get("spark.ext.h2o.client.ip"), assemblyJar)
-    }
 
     sc = new SparkContext(org.apache.spark.h2o.H2OConf.checkSparkConf(sparkConf))
     super.beforeAll()
   }
 
   override protected def afterAll(): Unit = {
-    if(isExternalClusterUsed(sparkConf) && isManualClusterStartModeUsed(sparkConf)) {
-      stopExternalH2OCloud()
-    }
 
     if (sc != null){
       sc.stop()
@@ -66,7 +56,7 @@ trait ScriptsTestHelper extends FunSuiteWithLogging with BeforeAndAfterAll{
       .set("spark.network.timeout", "360s") // Increase network timeout if jenkins machines are busy
       .set("spark.worker.timeout", "360") // Increase worker timeout if jenkins machines are busy
       .set("spark.ext.h2o.backend.cluster.mode", sys.props.getOrElse("spark.ext.h2o.backend.cluster.mode", "internal"))
-      .set("spark.ext.h2o.external.start.mode", sys.props.getOrElse("spark.ext.h2o.external.start.mode", "manual"))
+      .set("spark.ext.h2o.external.start.mode", "auto")
       .set("spark.ext.h2o.hadoop.memory", "3G")
       .setJars(Array(assemblyJar))
     conf
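
With the helper gone, ScriptsTestHelper also stops minting per-run cloud names; in auto start mode Sparkling Water manages the cloud it launches, so the tests no longer need them. For ad-hoc scripts that still want collision-free names, the deleted one-liner is easy to keep around as a reference (a sketch, not part of this patch):

    import scala.util.Random

    def uniqueCloudName(customPart: String): String =
      s"sparkling-water-$customPart-${Random.nextInt()}"
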
diff --git a/ml/src/test/scala/org/apache/spark/ml/spark/models/PipelinePredictionTest.scala b/ml/src/test/scala/org/apache/spark/ml/spark/models/PipelinePredictionTest.scala
index e35442aeb5..88c63ecc3e 100644
--- a/ml/src/test/scala/org/apache/spark/ml/spark/models/PipelinePredictionTest.scala
+++ b/ml/src/test/scala/org/apache/spark/ml/spark/models/PipelinePredictionTest.scala
@@ -20,8 +20,8 @@ package org.apache.spark.ml.spark.models
 import java.io.{File, PrintWriter}
 import java.util.Locale
 
-import org.apache.spark.h2o.H2OContext
-import org.apache.spark.h2o.utils.{H2OContextTestHelper, SparkTestContext}
+import org.apache.spark.h2o.utils.SparkTestContext
+import org.apache.spark.h2o.{H2OConf, H2OContext}
 import org.apache.spark.ml.feature._
 import org.apache.spark.ml.h2o.algos.H2OGBM
 import org.apache.spark.ml.h2o.features.ColumnPruner
@@ -62,7 +62,8 @@ abstract class PipelinePredictionTestBase extends FunSuite with SparkTestContext
   }
 
   def trainedPipelineModel(spark: SparkSession): PipelineModel = {
-    implicit val hc: H2OContext = H2OContextTestHelper.createH2OContext(spark.sparkContext, 1)
+    implicit val hc: H2OContext = H2OContext.getOrCreate(sc, new H2OConf(spark).setNumOfExternalH2ONodes(1))
+    implicit val sqlContext: SQLContext = spark.sqlContext
 
     /**
       * Define the pipeline stages
@@ -113,7 +114,7 @@ abstract class PipelinePredictionTestBase extends FunSuite with SparkTestContext
     val data = load(spark.sparkContext, "smsData.txt")
     val model = pipeline.fit(data)
 
-    H2OContextTestHelper.stopH2OContext(spark.sparkContext, hc)
+    hc.stop()
     // return the trained model
     model
   }
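
One wrinkle in the last hunk: trainedPipelineModel now calls H2OContext.getOrCreate(sc, new H2OConf(spark)...), mixing the sc field inherited from SparkTestContext with the spark parameter of the method. That compiles only because the fixture exposes both handles for the same application. A tidier loan-pattern sketch of the same lifecycle (a hypothetical helper, not part of this patch):

    import org.apache.spark.h2o.{H2OConf, H2OContext}
    import org.apache.spark.sql.{SQLContext, SparkSession}

    // Creates the H2OContext from the session, makes it and the SQLContext
    // implicitly available to the body, and always stops it afterwards.
    def withH2OContext[T](spark: SparkSession)(body: H2OContext => T): T = {
      implicit val hc: H2OContext =
        H2OContext.getOrCreate(spark.sparkContext, new H2OConf(spark).setNumOfExternalH2ONodes(1))
      implicit val sqlContext: SQLContext = spark.sqlContext
      try body(hc) finally hc.stop()
    }
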