Issue 75 - Configurable Spark prefix in case class configuration parser (#77)

* Initial changes to include spark prefix and additional map

* Fix app for prefix change

* Update Scala docs
alexjbush committed May 8, 2019
1 parent a07fbd9 commit 7edaeec
Showing 5 changed files with 92 additions and 13 deletions.
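
In short: `CaseClassConfigParser.apply` now strips a configurable prefix (default `spark.`, controlled by the `spark.waimak.config.sparkConfPropertyPrefix` property) from keys it reads out of the Spark conf, and it accepts an optional map of additional properties that is consulted after the Spark conf. A minimal usage sketch, assuming a `SparkSession` named `spark` and reusing the `PrefixTest` case class defined in the tests below:

import com.coxautodata.waimak.configuration.CaseClassConfigParser
import com.coxautodata.waimak.dataflow.spark.SparkFlowContext

case class PrefixTest(arg1: String, arg2: String, arg3: String)

// Keys in the Spark conf carry the usual "spark." prefix...
spark.conf.set("spark.arg1", "1")
spark.conf.set("spark.arg2", "2")

// ...but are looked up with that prefix stripped, so arg1 and arg2 come from the
// Spark conf while arg3 falls back to the additional map. Spark conf wins on conflicts.
val parsed = CaseClassConfigParser[PrefixTest](SparkFlowContext(spark), "", Map("arg3" -> "3"))
// parsed == PrefixTest("1", "2", "3")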
@@ -4,13 +4,22 @@ import java.util.Properties

import com.coxautodata.waimak.dataflow.spark.SparkFlowContext
import com.coxautodata.waimak.log.Logging
import org.apache.spark.sql.RuntimeConfig

import scala.annotation.StaticAnnotation
import scala.util.Try

object CaseClassConfigParser extends Logging {

val configParamPrefix: String = "spark.waimak.config"

/**
* Prefix to add to parameters when looking in the Spark conf. For example, if looking for parameter
* `args.arg1` then the parser will look for `spark.args.arg1` in the Spark conf by default.
* This can be disabled by setting this property to an empty string.
*/
val SPARK_CONF_PROPERTY_PREFIX: String = s"$configParamPrefix.sparkConfPropertyPrefix"
val SPARK_CONF_PROPERTY_PREFIX_DEFAULT: String = "spark."
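// Illustration (not part of this diff): prefixing can be switched off entirely, so keys are
// looked up in the Spark conf exactly as written, e.g.
//   sparkSession.conf.set("spark.waimak.config.sparkConfPropertyPrefix", "")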
/**
* Comma separated list of property provider builder object names to instantiate.
* Set this to have the config parser use the custom objects to search for configuration
@@ -140,6 +149,10 @@ object CaseClassConfigParser extends Logging {
.map(_.getPropertyProvider(context))
}

def getStrippedSparkProperties(conf: RuntimeConfig, prefix: String): Map[String, String] = conf.getAll.collect {
case (k, v) if k.startsWith(prefix) => k.stripPrefix(prefix) -> v
}
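// Sketch (illustration, not part of this diff): if the session conf contains
//   "spark.args.arg1" -> "1" and "other.key" -> "x",
// then getStrippedSparkProperties(spark.conf, "spark.") returns Map("args.arg1" -> "1");
// keys that do not start with the given prefix are dropped from the result.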

/**
* Populate a Case Class from an instance of SparkConf. It will attempt to cast the
* configuration values to the correct types, and most primitive, Option[primitive],
@@ -152,18 +165,23 @@
* The parameters keys that are looked up will be of the form: {prefix}{parameter},
* e.g. for case class Ex(key: String) and prefix="example.prefix.",
* then the key will have the form "example.prefix.key"
* By default, properties in the SparkConf will be looked up with an additional prefix (see [[SPARK_CONF_PROPERTY_PREFIX]]).
* The order in which properties are looked up is: 1) Spark Conf, 2) Additional conf map, 3) Property Providers (in the order they were specified, if multiple were given)
*
* @param context Instance of [[SparkFlowContext]] containing a spark session with configuration
* @param prefix Prefix to assign to a Key when looking in SparkConf
* @param context Instance of [[SparkFlowContext]] containing a spark session with configuration
* @param prefix Prefix to assign to a Key when looking in SparkConf
* @param additionalConf An additional set of properties to search. Preference is given to SparkConf values if the key exists in
* both this additionalConf and SparkConf
* @tparam A Case class type to construct
* @return An instantiated case class populated from the SparkConf instance and default arguments
*/
@throws(classOf[NoSuchElementException])
@throws(classOf[UnsupportedOperationException])
@throws(classOf[NumberFormatException])
@throws(classOf[IllegalArgumentException])
def apply[A: TypeTag](context: SparkFlowContext, prefix: String): A = {
fromMap[A](context.spark.conf.getAll, prefix, getPropertyProviders(context))
def apply[A: TypeTag](context: SparkFlowContext, prefix: String, additionalConf: Map[String, String] = Map.empty): A = {
val fromSparkConf = getStrippedSparkProperties(context.spark.conf, context.getString(SPARK_CONF_PROPERTY_PREFIX, SPARK_CONF_PROPERTY_PREFIX_DEFAULT))
fromMap[A](additionalConf ++ fromSparkConf, prefix, getPropertyProviders(context))
}

def fromMap[A: TypeTag](conf: Map[String, String], prefix: String = "", properties: Seq[PropertyProvider] = Seq.empty): A = {
@@ -225,6 +243,6 @@ trait PropertyProvider {
/**
* A property provider implementation that simply wraps around a [[java.util.Properties]] object
*/
class JavaPropertiesPropertyProvider(properties: Properties) extends PropertyProvider{
class JavaPropertiesPropertyProvider(properties: Properties) extends PropertyProvider {
override def get(key: String): Option[String] = Option(properties.getProperty(key))
}
}
@@ -42,6 +42,8 @@ object TestCaseClasses {

case class PropertiesTest(string: String, optionString: Option[String] = None)

case class PrefixTest(arg1: String, arg2: String, arg3: String)

}

class TestCaseClassConfigParser extends SparkSpec {
@@ -55,6 +57,7 @@ class TestCaseClassConfigParser extends SparkSpec {
it("parse a case class with all supported primitive types") {
val context = SparkFlowContext(sparkSession)
val conf: RuntimeConfig = sparkSession.conf
conf.set(SPARK_CONF_PROPERTY_PREFIX, "")
conf.set("string", "string")
conf.set("byte", "100")
conf.set("short", "1")
@@ -66,9 +69,27 @@
CaseClassConfigParser[PrimTypesTest](context, "") should be(PrimTypesTest("string", 100.toByte, 1, 2, 3, 4.1f, 5.2, true))
}

it("parse a case class taking off the prefix by default for spark config") {
val context = SparkFlowContext(sparkSession)
val conf: RuntimeConfig = sparkSession.conf
conf.set("spark.arg1", "1")
conf.set("spark.arg2", "2")
conf.set("spark.arg3", "3")
CaseClassConfigParser[PrefixTest](context, "") should be(PrefixTest("1", "2", "3"))
}

it("parse a case class taking preference for values in spark conf") {
val context = SparkFlowContext(sparkSession)
val conf: RuntimeConfig = sparkSession.conf
conf.set("spark.arg1", "1")
conf.set("spark.arg2", "2")
CaseClassConfigParser[PrefixTest](context, "", Map("arg2" -> "0", "arg3" -> "0")) should be(PrefixTest("1", "2", "0"))
}

it("parse a case class with all supported primitive option types set with values") {
val context = SparkFlowContext(sparkSession)
val conf: RuntimeConfig = sparkSession.conf
conf.set(SPARK_CONF_PROPERTY_PREFIX, "")
conf.set("string", "string")
conf.set("byte", "100")
conf.set("short", "1")
@@ -84,12 +105,14 @@ class TestCaseClassConfigParser extends SparkSpec {
it("parse a case class with default arguments set and having no config set in SparkConf") {
val context = SparkFlowContext(sparkSession)
val conf: RuntimeConfig = sparkSession.conf
conf.set(SPARK_CONF_PROPERTY_PREFIX, "")
CaseClassConfigParser[DefArgTest](context, "") should be(DefArgTest(false, None))
}

it("parse a case class with default arguments set and config set in SparkConf") {
val context = SparkFlowContext(sparkSession)
val conf: RuntimeConfig = sparkSession.conf
conf.set(SPARK_CONF_PROPERTY_PREFIX, "")
conf.set("boolean", "true")
conf.set("optionInt", "1")
CaseClassConfigParser[DefArgTest](context, "") should be(DefArgTest(true, Some(1)))
@@ -98,6 +121,7 @@
it("parse two case classes with different prefixes") {
val context = SparkFlowContext(sparkSession)
val conf: RuntimeConfig = sparkSession.conf
conf.set(SPARK_CONF_PROPERTY_PREFIX, "")
conf.set("spark.pref1.boolean", "true")
conf.set("spark.pref2.optionInt", "1")
CaseClassConfigParser[Pref1Test](context, "spark.pref1.") should be(Pref1Test(true, None))
@@ -107,6 +131,7 @@
it("parse a list of strings and ints") {
val context = SparkFlowContext(sparkSession)
val conf: RuntimeConfig = sparkSession.conf
conf.set(SPARK_CONF_PROPERTY_PREFIX, "")
conf.set("string", "one,two,three")
conf.set("int", "1")
CaseClassConfigParser[SeqTest](context, "") should be(SeqTest(Seq("one", "two", "three"), Seq(1)))
@@ -115,6 +140,7 @@
it("parse a list of strings and ints with custom separator") {
val context = SparkFlowContext(sparkSession)
val conf: RuntimeConfig = sparkSession.conf
conf.set(SPARK_CONF_PROPERTY_PREFIX, "")
conf.set("string", "one,two,three")
conf.set("int", "1==2==3")
CaseClassConfigParser[SeqSeparatorTest](context, "") should be(SeqSeparatorTest(Seq("one", "two", "three"), Seq(1, 2, 3)))
@@ -123,6 +149,7 @@
it("parse a list of strings and ints with custom and string value as empty") {
val context = SparkFlowContext(sparkSession)
val conf: RuntimeConfig = sparkSession.conf
conf.set(SPARK_CONF_PROPERTY_PREFIX, "")
conf.set("string", "")
conf.set("int", "1")
CaseClassConfigParser[SeqSeparatorTest](context, "") should be(SeqSeparatorTest(Seq(""), Seq(1)))
@@ -131,13 +158,15 @@
it("parse a list type") {
val context = SparkFlowContext(sparkSession)
val conf: RuntimeConfig = sparkSession.conf
conf.set(SPARK_CONF_PROPERTY_PREFIX, "")
conf.set("string", "")
CaseClassConfigParser[ListTest](context, "") should be(ListTest(List("")))
}

it("parse all supported collection types") {
val context = SparkFlowContext(sparkSession)
val conf: RuntimeConfig = sparkSession.conf
conf.set(SPARK_CONF_PROPERTY_PREFIX, "")
conf.set("list", "one,two,three")
conf.set("seq", "1.1,2.2,3.3")
conf.set("vector", "true,false")
@@ -153,6 +182,7 @@
it("parsing a missing configuration should throw an exception") {
val context = SparkFlowContext(sparkSession)
val conf: RuntimeConfig = sparkSession.conf
conf.set(SPARK_CONF_PROPERTY_PREFIX, "")
intercept[NoSuchElementException] {
CaseClassConfigParser[MissingArgTest](context, "")
}
@@ -161,6 +191,7 @@
it("parsing a configuration with a wrong prefix should throw an exception") {
val context = SparkFlowContext(sparkSession)
val conf: RuntimeConfig = sparkSession.conf
conf.set(SPARK_CONF_PROPERTY_PREFIX, "")
conf.set("spark.missing", "value")
// Check for false positive result
CaseClassConfigParser[MissingArgTest](context, "spark.") should be(MissingArgTest("value"))
@@ -172,6 +203,7 @@
it("parsing a configuration with wrong type should throw a parsing error") {
val context = SparkFlowContext(sparkSession)
val conf: RuntimeConfig = sparkSession.conf
conf.set(SPARK_CONF_PROPERTY_PREFIX, "")
conf.set("intError", "wrong")
conf.set("optionIntError", "wrong")
conf.set("booleanError", "wrong")
@@ -189,6 +221,7 @@
it("parsing a case class with an unsupported type should throw an exception") {
val context = SparkFlowContext(sparkSession)
val conf: RuntimeConfig = sparkSession.conf
conf.set(SPARK_CONF_PROPERTY_PREFIX, "")
conf.set("unknown", "wrong")
intercept[UnsupportedOperationException] {
CaseClassConfigParser[ParseUnknownTypeTest](context, "")
@@ -198,6 +231,7 @@
it("parsing a case class nested in a class should throw an exception with a helpful error") {
val context = SparkFlowContext(sparkSession)
val conf: RuntimeConfig = sparkSession.conf
conf.set(SPARK_CONF_PROPERTY_PREFIX, "")
intercept[UnsupportedOperationException] {
CaseClassConfigParser[ParseNestedClassTest](context, "")
}
@@ -206,6 +240,7 @@
it("parsing a case class with an unsupported collection type should throw an exception") {
val context = SparkFlowContext(sparkSession)
val conf: RuntimeConfig = sparkSession.conf
conf.set(SPARK_CONF_PROPERTY_PREFIX, "")
conf.set("unsupported", "1,2")
intercept[UnsupportedOperationException] {
CaseClassConfigParser[ParseUnsupportedCollectionTest](context, "")
@@ -219,6 +254,7 @@
it("get parameters from properties file when it is defined") {
val context = SparkFlowContext(sparkSession)
val conf: RuntimeConfig = sparkSession.conf
conf.set(SPARK_CONF_PROPERTY_PREFIX, "")
conf.set(CONFIG_PROPERTY_PROVIDER_BUILDER_MODULES, "com.coxautodata.waimak.configuration.TestPropertyProvider")
TestPropertyProvider.props.setProperty("test.string", "test1")
TestPropertyProvider.getPropertyProvider(context).get("test.string") should be(Some("test1"))
@@ -228,6 +264,7 @@
it("get parameters from properties file when it is defined but the property doesn't exist") {
val context = SparkFlowContext(sparkSession)
val conf: RuntimeConfig = sparkSession.conf
conf.set(SPARK_CONF_PROPERTY_PREFIX, "")
conf.set(CONFIG_PROPERTY_PROVIDER_BUILDER_MODULES, "com.coxautodata.waimak.configuration.TestPropertyProvider")
TestPropertyProvider.props.clear()
TestPropertyProvider.getPropertyProvider(context).get("test.string") should be(None)
@@ -239,6 +276,7 @@
it("define a property provider module that doesn't exist") {
val context = SparkFlowContext(sparkSession)
val conf: RuntimeConfig = sparkSession.conf
conf.set(SPARK_CONF_PROPERTY_PREFIX, "")
conf.set(CONFIG_PROPERTY_PROVIDER_BUILDER_MODULES, "com.coxautodata.waimak.configuration.MissingModule")
intercept[ScalaReflectionException] {
CaseClassConfigParser[PropertiesTest](context, "test.")
@@ -249,6 +287,7 @@

val context = SparkFlowContext(sparkSession)
val conf: RuntimeConfig = sparkSession.conf
conf.set(SPARK_CONF_PROPERTY_PREFIX, "")
conf.set(CONFIG_PROPERTY_PROVIDER_BUILDER_MODULES, "com.coxautodata.waimak.configuration.TestPropertyProvider,com.coxautodata.waimak.configuration.TestPropertyProvider2")

intercept[NoSuchElementException] {
@@ -265,6 +304,27 @@
CaseClassConfigParser[PropertiesTest](context, "test.") should be(PropertiesTest("0"))
}

it("strip spark prefix off parameter but not of map and properties") {
val context = SparkFlowContext(sparkSession)
val conf: RuntimeConfig = sparkSession.conf
conf.set("spark.arg1", "1")
conf.set(CONFIG_PROPERTY_PROVIDER_BUILDER_MODULES, "com.coxautodata.waimak.configuration.TestPropertyProvider")
TestPropertyProvider.props.setProperty("arg3", "3")
TestPropertyProvider.getPropertyProvider(context).get("arg3") should be(Some("3"))
CaseClassConfigParser[PrefixTest](context, "", Map("arg2" -> "2")) should be(PrefixTest("1", "2", "3"))
}

it("strip custom spark prefix off parameter but not of map and properties") {
val context = SparkFlowContext(sparkSession)
val conf: RuntimeConfig = sparkSession.conf
conf.set(SPARK_CONF_PROPERTY_PREFIX, "spark.test.")
conf.set("spark.test.args.arg1", "1")
conf.set(CONFIG_PROPERTY_PROVIDER_BUILDER_MODULES, "com.coxautodata.waimak.configuration.TestPropertyProvider")
TestPropertyProvider.props.setProperty("args.arg3", "3")
TestPropertyProvider.getPropertyProvider(context).get("args.arg3") should be(Some("3"))
CaseClassConfigParser[PrefixTest](context, "args.", Map("args.arg2" -> "2")) should be(PrefixTest("1", "2", "3"))
}

}

case class ParseNestedClassTest(string: String = "")
@@ -24,11 +24,11 @@ object EnvironmentManager {
}

def performEnvironmentAction(sparkSession: SparkSession): Unit = {
val environmentAction = CaseClassConfigParser[EnvironmentAction](SparkFlowContext(sparkSession), "spark.waimak.environment.")
val environmentAction = CaseClassConfigParser[EnvironmentAction](SparkFlowContext(sparkSession), "waimak.environment.")
val app = MultiAppRunner.instantiateApp(environmentAction.appClassName)
environmentAction.action.toLowerCase() match {
case "create" => app.createEnv(sparkSession, "spark.waimak.environment.")
case "cleanup" => app.cleanupEnv(sparkSession, "spark.waimak.environment.")
case "create" => app.createEnv(sparkSession, "waimak.environment.")
case "cleanup" => app.cleanupEnv(sparkSession, "waimak.environment.")
case _ => throw new UnsupportedOperationException(s"Unsupported environment action: ${environmentAction.action}")
}
}
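Because the parser now applies the `spark.` Spark-conf prefix itself, the prefixes passed around in application code drop their leading `spark.`; the keys users actually set are unchanged. A small sketch of the resulting lookup (assuming a `SparkSession` named `spark`):

// The user-facing configuration key is unchanged:
spark.conf.set("spark.waimak.environment.action", "create")
// performEnvironmentAction now parses with prefix "waimak.environment."; the parser
// strips the default "spark." prefix from Spark conf keys before matching, so
// "spark.waimak.environment.action" still resolves the `action` field as before.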
@@ -47,9 +47,9 @@ object MultiAppRunner {
}

def runAll(sparkSession: SparkSession): Unit = {
val allApps = CaseClassConfigParser[AllApps](SparkFlowContext(sparkSession), "spark.waimak.apprunner.")
val allApps = CaseClassConfigParser[AllApps](SparkFlowContext(sparkSession), "waimak.apprunner.")
val allAppsConfig = allApps.apps.map(appName => appName ->
CaseClassConfigParser[SingleAppConfig](SparkFlowContext(sparkSession), s"spark.waimak.apprunner.$appName."))
CaseClassConfigParser[SingleAppConfig](SparkFlowContext(sparkSession), s"waimak.apprunner.$appName."))
val executor = Waimak.sparkExecutor()
val finalFlow = allAppsConfig.foldLeft(Waimak.sparkFlow(sparkSession))((flow, appConfig) => addAppToFlow(flow, appConfig._1, appConfig._2))
executor.execute(finalFlow)
@@ -64,7 +64,7 @@
def addAppToFlow(flow: SparkDataFlow, appName: String, appConfig: SingleAppConfig): SparkDataFlow = {
val instantiatedApp = instantiateApp(appConfig.appClassName)
flow.executeApp(appConfig.dependencies: _*)(
instantiatedApp.runSparkApp(_, s"spark.waimak.environment.$appName."), appName)
instantiatedApp.runSparkApp(_, s"waimak.environment.$appName."), appName)
}

}
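The app-runner prefixes follow the same pattern: configuration is still supplied under `spark.waimak.apprunner.*`; only the in-code prefix loses the leading `spark.`. A sketch, with the application class name purely hypothetical:

spark.conf.set("spark.waimak.apprunner.apps", "myapp")
spark.conf.set("spark.waimak.apprunner.myapp.appClassName", "com.example.MyWaimakApp") // hypothetical class
// runAll(spark) resolves these through CaseClassConfigParser using the prefixes
// "waimak.apprunner." and "waimak.apprunner.myapp.", with "spark." stripped automatically.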
@@ -1,6 +1,7 @@
package com.coxautodata.waimak.spark.app

import com.coxautodata.waimak.configuration.CaseClassConfigParser
import com.coxautodata.waimak.dataflow.spark.SparkFlowContext
import org.apache.spark.sql.SparkSession

import scala.reflect.runtime.universe.TypeTag
@@ -69,7 +70,7 @@ abstract class SparkApp[E <: Env : TypeTag] {
* @param envPrefix the prefix for keys in the SparkConf needed by the [[Env]] implementation
* @return a parsed case class of type [[E]]
*/
def parseEnv(sparkSession: SparkSession, envPrefix: String): E = CaseClassConfigParser.fromMap[E](sparkSession.conf.getAll, envPrefix)
def parseEnv(sparkSession: SparkSession, envPrefix: String): E = CaseClassConfigParser(SparkFlowContext(sparkSession), envPrefix)

/**
* Default Spark configuration values to use for the application
