diff --git a/core/src/main/scala/org/apache/spark/h2o/H2OContext.scala b/core/src/main/scala/org/apache/spark/h2o/H2OContext.scala index 0bbc2904f7..c903fe3a77 100644 --- a/core/src/main/scala/org/apache/spark/h2o/H2OContext.scala +++ b/core/src/main/scala/org/apache/spark/h2o/H2OContext.scala @@ -311,9 +311,11 @@ abstract class H2OContext private(val sparkSession: SparkSession, private val co if (stopSparkContext) { sparkContext.stop() } - // H2O cluster is managed by the user in case of manual start of external backend. - if (!(conf.isManualClusterStartUsed && conf.runsInExternalClusterMode)) { - H2O.orderlyShutdown() + // Run orderly shutdown only in case of automatic mode of external backend, because: + // In internal backend, Spark takes care of stopping executors automatically + // In manual mode of external backend, the H2O cluster is managed by the user + if (conf.runsInExternalClusterMode && conf.isAutoClusterStartUsed) { + H2O.orderlyShutdown(conf.externalBackendStopTimeout) } H2OContext.instantiatedContext.set(null) stopped = true diff --git a/core/src/main/scala/org/apache/spark/h2o/backends/external/ExternalBackendConf.scala b/core/src/main/scala/org/apache/spark/h2o/backends/external/ExternalBackendConf.scala index 15ff6ffd59..f40ee46767 100644 --- a/core/src/main/scala/org/apache/spark/h2o/backends/external/ExternalBackendConf.scala +++ b/core/src/main/scala/org/apache/spark/h2o/backends/external/ExternalBackendConf.scala @@ -64,6 +64,7 @@ trait ExternalBackendConf extends SharedBackendConf { PROP_EXTERNAL_COMMUNICATION_BLOCK_SIZE._1, PROP_EXTERNAL_COMMUNICATION_BLOCK_SIZE._2) def externalCommunicationBlockSize: String = sparkConf.get(PROP_EXTERNAL_COMMUNICATION_BLOCK_SIZE._1, PROP_EXTERNAL_COMMUNICATION_BLOCK_SIZE._2) + def externalBackendStopTimeout: Int = sparkConf.getInt(PROP_EXTERNAL_BACKEND_STOP_TIMEOUT._1, PROP_EXTERNAL_BACKEND_STOP_TIMEOUT._2) private[backends] def isBackendVersionCheckDisabled() = sparkConf.getBoolean(PROP_EXTERNAL_DISABLE_VERSION_CHECK._1, PROP_EXTERNAL_DISABLE_VERSION_CHECK._2) /** Setters */ @@ -130,6 +131,7 @@ trait ExternalBackendConf extends SharedBackendConf { def setExternalH2ODriverPortRange(portRange: String): H2OConf = set(PROP_EXTERNAL_DRIVER_PORT_RANGE._1, portRange) def setExternalExtraMemoryPercent(memoryPercent: Int): H2OConf = set(PROP_EXTERNAL_EXTRA_MEMORY_PERCENT._1, memoryPercent.toString) def setExternalCommunicationBlockSize(blockSize: String): H2OConf = set(PROP_EXTERNAL_COMMUNICATION_BLOCK_SIZE._1, blockSize) + def setExternalBackendStopTimeout(timeout: Int): H2OConf = set(PROP_EXTERNAL_BACKEND_STOP_TIMEOUT._1, timeout.toString) def externalConfString: String = s"""Sparkling Water configuration: @@ -231,6 +233,12 @@ object ExternalBackendConf { */ val PROP_EXTERNAL_COMMUNICATION_BLOCK_SIZE = ("spark.ext.h2o.external.communication.blockSize", "1m") + /** + * Timeout for confirmation from worker nodes when stopping the external backend. It is also + * possible to pass -1 to ensure the indefinite timeout. The unit is milliseconds. + */ + val PROP_EXTERNAL_BACKEND_STOP_TIMEOUT = ("spark.ext.h2o.external.backend.stop.timeout", 10000) + /** Disable version check of external H2O backend */ val PROP_EXTERNAL_DISABLE_VERSION_CHECK = ("spark.ext.h2o.external.disable.version.check", false) } diff --git a/doc/src/site/sphinx/configuration/configuration_properties.rst b/doc/src/site/sphinx/configuration/configuration_properties.rst index f75e4f6713..849d62f018 100644 --- a/doc/src/site/sphinx/configuration/configuration_properties.rst +++ b/doc/src/site/sphinx/configuration/configuration_properties.rst @@ -364,6 +364,13 @@ External backend configuration properties | | | suffix "k", "m" or "g" (e.g. | | | | ``450k``, ``3m``) | +-------------------------------------------------------+----------------+-------------------------------------+ +| ``spark.ext.h2o.external.backend.stop.timeout`` | ``10000ms`` | Timeout for confirmation from | +| | | worker nodes when stopping the | +| | | external backend. It is also | +| | | possible to pass ``-1`` to ensure | +| | | the indefinite timeout. The unit is | +| | | milliseconds. | ++-------------------------------------------------------+----------------+-------------------------------------+ -------------- diff --git a/py/src/ai/h2o/sparkling/H2OConf.py b/py/src/ai/h2o/sparkling/H2OConf.py index 694ea59810..d7dc26779a 100644 --- a/py/src/ai/h2o/sparkling/H2OConf.py +++ b/py/src/ai/h2o/sparkling/H2OConf.py @@ -464,6 +464,10 @@ def setExternalCommunicationBlockSize(self, blockSize): self._jconf.setExternalCommunicationBlockSize(blockSize) return self + def setExternalBackendStopTimeout(self, timeout): + self._jconf.setExternalBackendStopTimeout(timeout) + return self + # getters independent on backend def backend_cluster_mode(self): @@ -722,7 +726,10 @@ def externalExtraMemoryPercent(self): def externalCommunicationBlockSizeAsBytes(self): return self._jconf.externalCommunicationBlockSizeAsBytes() - + + def externalBackendStopTimeout(self): + return self._jconf.externalBackendStopTimeout() + def set(self, key, value): self._jconf.set(key, value) return self diff --git a/py/src/ai/h2o/sparkling/H2OContext.py b/py/src/ai/h2o/sparkling/H2OContext.py index 37d476fb27..fac879167c 100644 --- a/py/src/ai/h2o/sparkling/H2OContext.py +++ b/py/src/ai/h2o/sparkling/H2OContext.py @@ -180,7 +180,7 @@ def __getClientConnectedField(self): def stop(self): try: - if not (self._conf.is_manual_cluster_start_used() and self._conf.runs_in_external_cluster_mode()): + if self._conf.runs_in_external_cluster_mode() and self._conf.is_auto_cluster_start_used(): h2o.cluster().shutdown() except: pass