[SW-1755] Don't need to stop worker nodes in internal backend; Spark takes care of it as it shuts down the executors #1661

Merged
merged 2 commits on Dec 4, 2019
8 changes: 5 additions & 3 deletions core/src/main/scala/org/apache/spark/h2o/H2OContext.scala
@@ -311,9 +311,11 @@ abstract class H2OContext private(val sparkSession: SparkSession, private val co
if (stopSparkContext) {
sparkContext.stop()
}
- // H2O cluster is managed by the user in case of manual start of external backend.
- if (!(conf.isManualClusterStartUsed && conf.runsInExternalClusterMode)) {
- H2O.orderlyShutdown()
+ // Run orderly shutdown only in case of automatic mode of external backend, because:
+ // In internal backend, Spark takes care of stopping executors automatically
+ // In manual mode of external backend, the H2O cluster is managed by the user
+ if (conf.runsInExternalClusterMode && conf.isAutoClusterStartUsed) {
+ H2O.orderlyShutdown(conf.externalBackendStopTimeout)
}
H2OContext.instantiatedContext.set(null)
stopped = true
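
For context, a minimal sketch of the decision this change encodes (the standalone helper and its `conf` parameter are illustrative only, not part of the PR; the method names are taken from the diff above):

```scala
// Sketch only: restates the new shutdown condition with the rationale per backend mode.
def shouldShutdownH2OCluster(conf: org.apache.spark.h2o.H2OConf): Boolean =
  // Internal backend: H2O nodes live inside Spark executors, so Spark stops them itself.
  // External backend, manual start: the user started the H2O cluster and owns its lifecycle.
  // External backend, automatic start: Sparkling Water started the cluster, so it must stop it,
  // waiting up to conf.externalBackendStopTimeout milliseconds for confirmation.
  conf.runsInExternalClusterMode && conf.isAutoClusterStartUsed
```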
@@ -64,6 +64,7 @@ trait ExternalBackendConf extends SharedBackendConf {
PROP_EXTERNAL_COMMUNICATION_BLOCK_SIZE._1,
PROP_EXTERNAL_COMMUNICATION_BLOCK_SIZE._2)
def externalCommunicationBlockSize: String = sparkConf.get(PROP_EXTERNAL_COMMUNICATION_BLOCK_SIZE._1, PROP_EXTERNAL_COMMUNICATION_BLOCK_SIZE._2)
def externalBackendStopTimeout: Int = sparkConf.getInt(PROP_EXTERNAL_BACKEND_STOP_TIMEOUT._1, PROP_EXTERNAL_BACKEND_STOP_TIMEOUT._2)
private[backends] def isBackendVersionCheckDisabled() = sparkConf.getBoolean(PROP_EXTERNAL_DISABLE_VERSION_CHECK._1, PROP_EXTERNAL_DISABLE_VERSION_CHECK._2)

/** Setters */
@@ -130,6 +131,7 @@ trait ExternalBackendConf extends SharedBackendConf {
def setExternalH2ODriverPortRange(portRange: String): H2OConf = set(PROP_EXTERNAL_DRIVER_PORT_RANGE._1, portRange)
def setExternalExtraMemoryPercent(memoryPercent: Int): H2OConf = set(PROP_EXTERNAL_EXTRA_MEMORY_PERCENT._1, memoryPercent.toString)
def setExternalCommunicationBlockSize(blockSize: String): H2OConf = set(PROP_EXTERNAL_COMMUNICATION_BLOCK_SIZE._1, blockSize)
def setExternalBackendStopTimeout(timeout: Int): H2OConf = set(PROP_EXTERNAL_BACKEND_STOP_TIMEOUT._1, timeout.toString)

def externalConfString: String =
s"""Sparkling Water configuration:
@@ -231,6 +233,12 @@ object ExternalBackendConf {
*/
val PROP_EXTERNAL_COMMUNICATION_BLOCK_SIZE = ("spark.ext.h2o.external.communication.blockSize", "1m")

/**
* Timeout for confirmation from worker nodes when stopping the external backend. Pass -1 to
* wait indefinitely. The unit is milliseconds.
*/
val PROP_EXTERNAL_BACKEND_STOP_TIMEOUT = ("spark.ext.h2o.external.backend.stop.timeout", 10000)

/** Disable version check of external H2O backend */
val PROP_EXTERNAL_DISABLE_VERSION_CHECK = ("spark.ext.h2o.external.disable.version.check", false)
}
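
As a usage illustration (not part of the diff), the new setter and getter can be combined with the existing external-backend configuration roughly as follows; `spark` is an assumed `SparkSession`, and `setExternalClusterMode`/`useAutoClusterStart` are pre-existing H2OConf methods not shown in this change:

```scala
import org.apache.spark.h2o.{H2OConf, H2OContext}

// Sketch under the assumption that `spark` is an already-created SparkSession.
val conf = new H2OConf(spark)
  .setExternalClusterMode()              // run H2O as an external cluster
  .useAutoClusterStart()                 // let Sparkling Water start (and stop) the cluster
  .setExternalBackendStopTimeout(30000)  // wait up to 30 s for shutdown confirmation

val hc = H2OContext.getOrCreate(spark, conf)
println(conf.externalBackendStopTimeout) // 30000
// ...
hc.stop() // in this mode, ends up calling H2O.orderlyShutdown(conf.externalBackendStopTimeout)
```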
@@ -364,6 +364,13 @@ External backend configuration properties
| | | suffix "k", "m" or "g" (e.g. |
| | | ``450k``, ``3m``) |
+-------------------------------------------------------+----------------+-------------------------------------+
| ``spark.ext.h2o.external.backend.stop.timeout`` | ``10000ms`` | Timeout for confirmation from |
| | | worker nodes when stopping the |
| | | external backend. It is also |
| | | possible to pass ``-1`` to ensure |
| | | the indefinite timeout. The unit is |
| | | milliseconds. |
+-------------------------------------------------------+----------------+-------------------------------------+
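
The same property can also be set directly on the Spark configuration before the H2OContext is created (an illustration, not from the diff); the value is in milliseconds, and ``-1`` disables the timeout:

```scala
import org.apache.spark.sql.SparkSession

// Sketch: configure the stop timeout via the raw Spark property at session build time.
val spark = SparkSession.builder()
  .appName("sparkling-water-app")
  .config("spark.ext.h2o.external.backend.stop.timeout", "30000") // milliseconds; -1 = no timeout
  .getOrCreate()
```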

--------------

9 changes: 8 additions & 1 deletion py/src/ai/h2o/sparkling/H2OConf.py
@@ -464,6 +464,10 @@ def setExternalCommunicationBlockSize(self, blockSize):
self._jconf.setExternalCommunicationBlockSize(blockSize)
return self

def setExternalBackendStopTimeout(self, timeout):
self._jconf.setExternalBackendStopTimeout(timeout)
return self

# getters independent on backend

def backend_cluster_mode(self):
@@ -722,7 +726,10 @@ def externalExtraMemoryPercent(self):

def externalCommunicationBlockSizeAsBytes(self):
return self._jconf.externalCommunicationBlockSizeAsBytes()


def externalBackendStopTimeout(self):
return self._jconf.externalBackendStopTimeout()

def set(self, key, value):
self._jconf.set(key, value)
return self
2 changes: 1 addition & 1 deletion py/src/ai/h2o/sparkling/H2OContext.py
@@ -180,7 +180,7 @@ def __getClientConnectedField(self):

def stop(self):
Contributor Author
This is used only for an explicit stop by the user. If the user does not call this stop() method in PySparkling, the shutdown hook is called and the stop() method on the Scala side is invoked.

try:
- if not (self._conf.is_manual_cluster_start_used() and self._conf.runs_in_external_cluster_mode()):
+ if self._conf.runs_in_external_cluster_mode() and self._conf.is_auto_cluster_start_used():
h2o.cluster().shutdown()
except:
pass