[SW-407] Make Configuration consistent #336

Merged: 1 commit, Jul 12, 2017
The commit regroups SharedBackendConf so that all getters precede all setters, and generic parameters are separated from the H2O client parameters. The resulting code:
trait SharedBackendConf {
  self: H2OConf =>

import SharedBackendConf._

/** Getters */

/** Generic parameters */
def backendClusterMode = sparkConf.get(PROP_BACKEND_CLUSTER_MODE._1, PROP_BACKEND_CLUSTER_MODE._2)
def cloudName = sparkConf.getOption(PROP_CLOUD_NAME._1)
def nthreads = sparkConf.getInt(PROP_NTHREADS._1, PROP_NTHREADS._2)
def disableGA = sparkConf.getBoolean(PROP_DISABLE_GA._1, PROP_DISABLE_GA._2)
def isH2OReplEnabled = sparkConf.getBoolean(PROP_REPL_ENABLED._1, PROP_REPL_ENABLED._2)
def scalaIntDefaultNum = sparkConf.getInt(PROP_SCALA_INT_DEFAULT_NUM._1, PROP_SCALA_INT_DEFAULT_NUM._2)
def isClusterTopologyListenerEnabled = sparkConf.getBoolean(PROP_CLUSTER_TOPOLOGY_LISTENER_ENABLED._1, PROP_CLUSTER_TOPOLOGY_LISTENER_ENABLED._2)
def isSparkVersionCheckEnabled = sparkConf.getBoolean(PROP_SPARK_VERSION_CHECK_ENABLED._1, PROP_SPARK_VERSION_CHECK_ENABLED._2)
def isFailOnUnsupportedSparkParamEnabled = sparkConf.getBoolean(PROP_FAIL_ON_UNSUPPORTED_SPARK_PARAM._1, PROP_FAIL_ON_UNSUPPORTED_SPARK_PARAM._2)
def jks = sparkConf.getOption(PROP_JKS._1)
def jksPass = sparkConf.getOption(PROP_JKS_PASS._1)
def hashLogin = sparkConf.getBoolean(PROP_HASH_LOGIN._1, PROP_HASH_LOGIN._2)
def ldapLogin = sparkConf.getBoolean(PROP_LDAP_LOGIN._1, PROP_LDAP_LOGIN._2)
def kerberosLogin = sparkConf.getBoolean(PROP_KERBEROS_LOGIN._1, PROP_KERBEROS_LOGIN._2)
def loginConf = sparkConf.getOption(PROP_LOGIN_CONF._1)
def userName = sparkConf.getOption(PROP_USER_NAME._1)
def sslConf = sparkConf.getOption(PROP_SSL_CONF._1)
def h2oNodeLogLevel = sparkConf.get(PROP_NODE_LOG_LEVEL._1, PROP_NODE_LOG_LEVEL._2)
def h2oNodeLogDir = sparkConf.getOption(PROP_NODE_LOG_DIR._1)
def uiUpdateInterval = sparkConf.getInt(PROP_UI_UPDATE_INTERVAL._1, PROP_UI_UPDATE_INTERVAL._2)
def cloudTimeout = sparkConf.getInt(PROP_CLOUD_TIMEOUT._1, PROP_CLOUD_TIMEOUT._2)


/** H2O Client parameters */
def flowDir = sparkConf.getOption(PROP_FLOW_DIR._1)
def clientIp = sparkConf.getOption(PROP_CLIENT_IP._1)
def clientIcedDir = sparkConf.getOption(PROP_CLIENT_ICED_DIR._1)
def h2oClientLogLevel = sparkConf.get(PROP_CLIENT_LOG_LEVEL._1, PROP_CLIENT_LOG_LEVEL._2)
def h2oClientLogDir = sparkConf.getOption(PROP_CLIENT_LOG_DIR._1)
def clientBasePort = sparkConf.getInt(PROP_CLIENT_PORT_BASE._1, PROP_CLIENT_PORT_BASE._2)
def clientWebPort = sparkConf.getInt(PROP_CLIENT_WEB_PORT._1, PROP_CLIENT_WEB_PORT._2)
def clientVerboseOutput = sparkConf.getBoolean(PROP_CLIENT_VERBOSE._1, PROP_CLIENT_VERBOSE._2)
def clientNetworkMask = sparkConf.getOption(PROP_CLIENT_NETWORK_MASK._1)

def runsInExternalClusterMode: Boolean = backendClusterMode.toLowerCase() == "external"
def runsInInternalClusterMode: Boolean = !runsInExternalClusterMode

/** Setters */

/** Generic parameters */
def setInternalClusterMode() = {
if(runsInExternalClusterMode){
logWarning("Using internal cluster mode!")
}
setBackendClusterMode("internal")
}

def setExternalClusterMode() = {
if(runsInInternalClusterMode){
logWarning("Using external cluster mode!")
}
setBackendClusterMode("external")
}

def setCloudName(cloudName: String) = set(PROP_CLOUD_NAME._1, cloudName)
def setNthreads(numThreads: Int) = set(PROP_NTHREADS._1, numThreads.toString)

def setGAEnabled() = set(PROP_DISABLE_GA._1, false.toString)
def setGADisabled() = set(PROP_DISABLE_GA._1, true.toString)

def setReplEnabled() = set(PROP_REPL_ENABLED._1, true.toString)
def setReplDisabled() = set(PROP_REPL_ENABLED._1, false.toString)

def setDefaultNumReplSessions(numSessions: Int) = set(PROP_SCALA_INT_DEFAULT_NUM._1, numSessions.toString)

def setClusterTopologyListenerEnabled() = set(PROP_CLUSTER_TOPOLOGY_LISTENER_ENABLED._1, true.toString)
def setClusterTopologyListenerDisabled() = set(PROP_CLUSTER_TOPOLOGY_LISTENER_ENABLED._1, false.toString)

def setSparkVersionCheckEnabled() = set(PROP_SPARK_VERSION_CHECK_ENABLED._1, true.toString)
def setSparkVersionCheckDisabled() = set(PROP_SPARK_VERSION_CHECK_ENABLED._1, false.toString)

def setFailOnUnsupportedSparkParamEnabled() = set(PROP_FAIL_ON_UNSUPPORTED_SPARK_PARAM._1, true.toString)
def setFailOnUnsupportedSparkParamDisabled() = set(PROP_FAIL_ON_UNSUPPORTED_SPARK_PARAM._1, false.toString)

def setJks(path: String) = set(PROP_JKS._1, path)
def setJksPass(password: String) = set(PROP_JKS_PASS._1, password)

def setHashLoginEnabled() = set(PROP_HASH_LOGIN._1, true.toString)
def setHashLoginDisabled() = set(PROP_HASH_LOGIN._1, false.toString)

def setLdapLoginEnabled() = set(PROP_LDAP_LOGIN._1, true.toString)
def setLdapLoginDisabled() = set(PROP_LDAP_LOGIN._1, false.toString)

def setKerberosLoginEnabled() = set(PROP_KERBEROS_LOGIN._1, true.toString)
def setKerberosLoginDisabled() = set(PROP_KERBEROS_LOGIN._1, false.toString)

def setLoginConf(file: String) = set(PROP_LOGIN_CONF._1, file)
def setUserName(username: String) = set(PROP_USER_NAME._1, username)
def setSslConf(path: String) = set(PROP_SSL_CONF._1, path)
def setH2ONodeLogLevel(level: String) = set(PROP_NODE_LOG_LEVEL._1, level)
def setH2ONodeLogDir(dir: String) = set(PROP_NODE_LOG_DIR._1, dir)
def setUiUpdateInterval(interval: Int) = set(PROP_UI_UPDATE_INTERVAL._1, interval.toString)
def setCloudTimeout(timeout: Int) = set(PROP_CLOUD_TIMEOUT._1, timeout.toString)

/** H2O Client parameters */
def setFlowDir(dir: String) = set(PROP_FLOW_DIR._1, dir)
def setClientIp(ip: String) = set(PROP_CLIENT_IP._1, ip)
def setClientIcedDir(icedDir: String) = set(PROP_CLIENT_ICED_DIR._1, icedDir)
def setH2OClientLogLevel(level: String) = set(PROP_CLIENT_LOG_LEVEL._1, level)
def setH2OClientLogDir(dir: String) = set(PROP_CLIENT_LOG_DIR._1, dir)
def setClientPortBase(basePort: Int) = set(PROP_CLIENT_PORT_BASE._1, basePort.toString)

def setClientVerboseEnabled() = set(PROP_CLIENT_VERBOSE._1, true.toString)
def setClientVerboseDisabled() = set(PROP_CLIENT_VERBOSE._1, false.toString)

def setClientNetworkMask(mask: String) = set(PROP_CLIENT_NETWORK_MASK._1, mask)

private[this] def setBackendClusterMode(backendClusterMode: String) = {
set(PROP_BACKEND_CLUSTER_MODE._1, backendClusterMode)
}
}
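
For reviewers, a quick sketch of how the regrouped API is exercised. Each setter funnels through `set(...)` on the underlying `SparkConf` and returns the `H2OConf` itself, so calls chain. The `SparkSession` value `spark` and the `H2OContext.getOrCreate` entry point below are the usual Sparkling Water 2.x API, assumed here rather than part of this change:

```scala
import org.apache.spark.h2o.{H2OConf, H2OContext}

// Build a configuration with the fluent setters from SharedBackendConf.
val conf = new H2OConf(spark)
  .setInternalClusterMode()             // spark.ext.h2o.backend.cluster.mode = internal
  .setCloudName("sparkling-water-demo")
  .setNthreads(4)                       // spark.ext.h2o.nthreads = 4
  .setH2ONodeLogLevel("INFO")
  .setH2OClientLogLevel("WARN")
  .setCloudTimeout(120 * 1000)          // wait up to two minutes for the cloud to form

// Start (or reuse) the H2O cluster described by the configuration.
val hc = H2OContext.getOrCreate(spark, conf)
```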

object SharedBackendConf {

/**
 * This option can be set either to "internal" or "external".
 * When set to "external", the H2O Context is created by connecting to an existing H2O cluster; otherwise it creates
 * a new H2O cluster living inside the Spark cluster.
 */
val PROP_BACKEND_CLUSTER_MODE = ("spark.ext.h2o.backend.cluster.mode", "internal")

/** Configuration property - name of H2O cloud */
val PROP_CLOUD_NAME = ("spark.ext.h2o.cloud.name", None)

/** Limit for number of threads used by H2O, default -1 means unlimited */
val PROP_NTHREADS = ("spark.ext.h2o.nthreads", -1)

/** Disable GA tracking */
val PROP_DISABLE_GA = ("spark.ext.h2o.disable.ga", true)

/** Enable/Disable Sparkling-Water REPL. */
val PROP_REPL_ENABLED = ("spark.ext.h2o.repl.enabled", true)

/** Default number of Scala interpreter (REPL) sessions started with the H2O services, by default 1 */
val PROP_SCALA_INT_DEFAULT_NUM = ("spark.ext.scala.int.default.num", 1)

/** Enable/Disable the listener which kills the H2O cluster when there is a change in the underlying cluster's topology. */
val PROP_CLUSTER_TOPOLOGY_LISTENER_ENABLED = ("spark.ext.h2o.topology.change.listener.enabled", true)

/** Enable/Disable check for Spark version. */
val PROP_SPARK_VERSION_CHECK_ENABLED = ("spark.ext.h2o.spark.version.check.enabled", true)

/** Enable/Disable exit on unsupported Spark parameters. */
val PROP_FAIL_ON_UNSUPPORTED_SPARK_PARAM = ("spark.ext.h2o.fail.on.unsupported.spark.param", true)

/** Path to Java KeyStore file. */
val PROP_JKS = ("spark.ext.h2o.jks", None)
// ... (collapsed in the diff: PROP_JKS_PASS, PROP_HASH_LOGIN, PROP_LDAP_LOGIN, PROP_KERBEROS_LOGIN, PROP_LOGIN_CONF)
/** Override user name for cluster. */
val PROP_USER_NAME = ("spark.ext.h2o.user.name", None)

/** Path to the SSL configuration file used to secure H2O internal communication. */
val PROP_SSL_CONF = ("spark.ext.h2o.internal_security_conf", None)

/** H2O internal log level for launched remote nodes. */
val PROP_NODE_LOG_LEVEL = ("spark.ext.h2o.node.log.level", "INFO")

/** Location of log directory for remote nodes. */
val PROP_NODE_LOG_DIR = ("spark.ext.h2o.node.log.dir", None)

/** Interval for updates of Spark UI in milliseconds */
val PROP_UI_UPDATE_INTERVAL = ("spark.ext.h2o.ui.update.interval", 10000)

/** Configuration property - timeout (in milliseconds) for the H2O cloud to form. */
val PROP_CLOUD_TIMEOUT = ("spark.ext.h2o.cloud.timeout", 60 * 1000)

/** Path to flow dir. */
val PROP_FLOW_DIR = ("spark.ext.h2o.client.flow.dir", None)

/** IP of H2O client node */
val PROP_CLIENT_IP = ("spark.ext.h2o.client.ip", None)

/** Location of iced directory for the driver instance. */
val PROP_CLIENT_ICED_DIR = ("spark.ext.h2o.client.iced.dir", None)

/** H2O log level for client running in Spark driver */
val PROP_CLIENT_LOG_LEVEL = ("spark.ext.h2o.client.log.level", "WARN")

/** Location of log directory for the driver instance. */
val PROP_CLIENT_LOG_DIR = ("spark.ext.h2o.client.log.dir", None)

/** Port on which H2O client publishes its API. If already occupied, the next odd port is tried and so on */
val PROP_CLIENT_PORT_BASE = ("spark.ext.h2o.client.port.base", 54321)

/** Exact client port to access web UI.
 * The value `-1` means automatic search for a free port starting at `spark.ext.h2o.client.port.base`. */
val PROP_CLIENT_WEB_PORT = ("spark.ext.h2o.client.web.port", -1)

/** Print detailed messages to client stdout */
val PROP_CLIENT_VERBOSE = ("spark.ext.h2o.client.verbose", false)

/** Subnet selector for the H2O client: if a mask is specified, the Spark network configuration is ignored. */
val PROP_CLIENT_NETWORK_MASK = ("spark.ext.h2o.client.network.mask", None)
}
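
Two side notes on the properties above. First, every `PROP_*` entry is just a `(key, default)` pair layered over `SparkConf`, so the same configuration can be supplied as raw keys (for example via `spark-defaults.conf`) instead of the typed setters. A sketch in which only the keys come from this file; the values are purely illustrative:

```scala
import org.apache.spark.SparkConf

// Raw-key equivalent of the typed setters (example values).
val sparkConf = new SparkConf()
  .set("spark.ext.h2o.backend.cluster.mode", "external")    // PROP_BACKEND_CLUSTER_MODE
  .set("spark.ext.h2o.cloud.name", "my-h2o-cloud")          // PROP_CLOUD_NAME
  .set("spark.ext.h2o.client.port.base", "54321")           // PROP_CLIENT_PORT_BASE
  .set("spark.ext.h2o.client.web.port", "-1")               // -1 = search for a free port
  .set("spark.ext.h2o.client.network.mask", "10.0.0.0/24")  // PROP_CLIENT_NETWORK_MASK
```

Second, the `PROP_CLIENT_PORT_BASE` comment describes a probe-and-retry search over odd ports. A toy illustration of that strategy, not H2O's implementation:

```scala
import java.net.ServerSocket
import scala.util.Try

// Try basePort, basePort + 2, ... until a port can be bound, mirroring
// "if already occupied, the next odd port is tried and so on".
def firstFreePort(basePort: Int): Int =
  Iterator.iterate(basePort)(_ + 2)
    .find(p => Try(new ServerSocket(p)).map(_.close()).isSuccess)
    .get
```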