Skip to content

Commit

Permalink
[SW-1755] Don't need to stop worker nodes in internal backend, spark …
Browse files Browse the repository at this point in the history
…takes care of it as it shutdowns the executors
  • Loading branch information
jakubhava committed Dec 4, 2019
1 parent c35de8b commit 019827a
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 5 deletions.
8 changes: 5 additions & 3 deletions core/src/main/scala/org/apache/spark/h2o/H2OContext.scala
Expand Up @@ -311,9 +311,11 @@ abstract class H2OContext private(val sparkSession: SparkSession, private val co
if (stopSparkContext) {
sparkContext.stop()
}
// H2O cluster is managed by the user in case of manual start of external backend.
if (!(conf.isManualClusterStartUsed && conf.runsInExternalClusterMode)) {
H2O.orderlyShutdown()
// Run orderly shutdown only in case of automatic mode of external backend, because:
// In internal backend, Spark takes care of stopping executors automatically
// In manual mode of external backend, the H2O cluster is managed by the user
if (conf.runsInExternalClusterMode && conf.isAutoClusterStartUsed) {
H2O.orderlyShutdown(conf.externalBackendStopTimeout)
}
H2OContext.instantiatedContext.set(null)
stopped = true
Expand Down
Expand Up @@ -64,6 +64,7 @@ trait ExternalBackendConf extends SharedBackendConf {
PROP_EXTERNAL_COMMUNICATION_BLOCK_SIZE._1,
PROP_EXTERNAL_COMMUNICATION_BLOCK_SIZE._2)
def externalCommunicationBlockSize: String = sparkConf.get(PROP_EXTERNAL_COMMUNICATION_BLOCK_SIZE._1, PROP_EXTERNAL_COMMUNICATION_BLOCK_SIZE._2)
/** Stop timeout of the external backend in milliseconds, read from `sparkConf` using the key and default of `PROP_EXTERNAL_BACKEND_STOP_TIMEOUT`. */
def externalBackendStopTimeout: Int = sparkConf.getInt(PROP_EXTERNAL_BACKEND_STOP_TIMEOUT._1, PROP_EXTERNAL_BACKEND_STOP_TIMEOUT._2)
private[backends] def isBackendVersionCheckDisabled() = sparkConf.getBoolean(PROP_EXTERNAL_DISABLE_VERSION_CHECK._1, PROP_EXTERNAL_DISABLE_VERSION_CHECK._2)

/** Setters */
Expand Down Expand Up @@ -130,6 +131,7 @@ trait ExternalBackendConf extends SharedBackendConf {
def setExternalH2ODriverPortRange(portRange: String): H2OConf = set(PROP_EXTERNAL_DRIVER_PORT_RANGE._1, portRange)
def setExternalExtraMemoryPercent(memoryPercent: Int): H2OConf = set(PROP_EXTERNAL_EXTRA_MEMORY_PERCENT._1, memoryPercent.toString)
def setExternalCommunicationBlockSize(blockSize: String): H2OConf = set(PROP_EXTERNAL_COMMUNICATION_BLOCK_SIZE._1, blockSize)
/** Stores `timeout` (as a string) under the key of `PROP_EXTERNAL_BACKEND_STOP_TIMEOUT`; the property is documented in milliseconds, with -1 meaning wait indefinitely. */
def setExternalBackendStopTimeout(timeout: Int): H2OConf = set(PROP_EXTERNAL_BACKEND_STOP_TIMEOUT._1, timeout.toString)

def externalConfString: String =
s"""Sparkling Water configuration:
Expand Down Expand Up @@ -231,6 +233,13 @@ object ExternalBackendConf {
*/
val PROP_EXTERNAL_COMMUNICATION_BLOCK_SIZE = ("spark.ext.h2o.external.communication.blockSize", "1m")

/**
* How long we wait for the automatic mode of the external backend to stop properly before we give
* up on confirmation from worker nodes. Pass -1 to wait indefinitely.
* The unit is milliseconds.
*/
val PROP_EXTERNAL_BACKEND_STOP_TIMEOUT = ("spark.ext.h2o.external.backend.stop.timeout", 10000)

/** Disable version check of external H2O backend */
val PROP_EXTERNAL_DISABLE_VERSION_CHECK = ("spark.ext.h2o.external.disable.version.check", false)
}
Expand Up @@ -364,6 +364,14 @@ External backend configuration properties
| | | suffix "k", "m" or "g" (e.g. |
| | | ``450k``, ``3m``) |
+-------------------------------------------------------+----------------+-------------------------------------+
| ``spark.ext.h2o.external.backend.stop.timeout`` | ``10000ms`` | How long we wait for proper |
| | | stopping of automatic mode of |
| | | external backend before we give |
| | | up on confirmation from worker |
| | | nodes. It is also possible to |
| | | pass ``-1`` to ensure we wait |
| | | indefinitely. The unit is |
| | | milliseconds. |
+-------------------------------------------------------+----------------+-------------------------------------+

--------------

Expand Down
9 changes: 8 additions & 1 deletion py/src/ai/h2o/sparkling/H2OConf.py
Expand Up @@ -464,6 +464,10 @@ def setExternalCommunicationBlockSize(self, blockSize):
self._jconf.setExternalCommunicationBlockSize(blockSize)
return self

# Sets the external backend stop timeout by forwarding `timeout` to the wrapped `_jconf` object.
# The underlying property is documented in milliseconds, with -1 meaning wait indefinitely.
# Returns self so that setter calls can be chained.
def setExternalBackendStopTimeout(self, timeout):
self._jconf.setExternalBackendStopTimeout(timeout)
return self

# getters independent on backend

def backend_cluster_mode(self):
Expand Down Expand Up @@ -722,7 +726,10 @@ def externalExtraMemoryPercent(self):

def externalCommunicationBlockSizeAsBytes(self):
return self._jconf.externalCommunicationBlockSizeAsBytes()


# Returns the external backend stop timeout as reported by the wrapped `_jconf` object
# (the underlying property is documented in milliseconds).
def externalBackendStopTimeout(self):
return self._jconf.externalBackendStopTimeout()

def set(self, key, value):
self._jconf.set(key, value)
return self
Expand Down
2 changes: 1 addition & 1 deletion py/src/ai/h2o/sparkling/H2OContext.py
Expand Up @@ -180,7 +180,7 @@ def __getClientConnectedField(self):

def stop(self):
try:
if not (self._conf.is_manual_cluster_start_used() and self._conf.runs_in_external_cluster_mode()):
if self._conf.runs_in_external_cluster_mode() and self._conf.is_auto_cluster_start_used():
h2o.cluster().shutdown()
except:
pass
Expand Down

0 comments on commit 019827a

Please sign in to comment.