diff --git a/metalus-common/src/main/scala/com/acxiom/pipeline/steps/SparkConfigurationSteps.scala b/metalus-spark/src/main/scala/com/acxiom/metalus/spark/steps/SparkConfigurationSteps.scala similarity index 84% rename from metalus-common/src/main/scala/com/acxiom/pipeline/steps/SparkConfigurationSteps.scala rename to metalus-spark/src/main/scala/com/acxiom/metalus/spark/steps/SparkConfigurationSteps.scala index 0eb86b3f..71bd6111 100644 --- a/metalus-common/src/main/scala/com/acxiom/pipeline/steps/SparkConfigurationSteps.scala +++ b/metalus-spark/src/main/scala/com/acxiom/metalus/spark/steps/SparkConfigurationSteps.scala @@ -1,7 +1,8 @@ -package com.acxiom.pipeline.steps +package com.acxiom.metalus.spark.steps -import com.acxiom.pipeline.PipelineContext -import com.acxiom.pipeline.annotations.{StepFunction, StepObject, StepParameter, StepParameters} +import com.acxiom.metalus.PipelineContext +import com.acxiom.metalus.annotations.{StepFunction, StepObject, StepParameter, StepParameters} +import com.acxiom.metalus.spark._ import scala.annotation.tailrec @@ -27,7 +28,7 @@ object SparkConfigurationSteps { "keySeparator" -> StepParameter(None, Some(false), Some("__"), None, None, None, Some("String that will be replaced with a period character")))) def setLocalProperties(properties: Map[String, Any], keySeparator: Option[String] = None, pipelineContext: PipelineContext): Unit = { - val sc = pipelineContext.sparkSession.get.sparkContext + val sc = pipelineContext.sparkSession.sparkContext cleanseMap(properties, keySeparator).foreach { case (key, Some(value)) => sc.setLocalProperty(key, value.toString) case (key, None) => sc.setLocalProperty(key, None.orNull) @@ -45,7 +46,7 @@ object SparkConfigurationSteps { Some("String that will be replaced with a period character")))) def setHadoopConfigurationProperties(properties: Map[String, Any], keySeparator: Option[String] = None, pipelineContext: PipelineContext): Unit = { - val hc = pipelineContext.sparkSession.get.sparkContext.hadoopConfiguration + val hc = pipelineContext.sparkSession.sparkContext.hadoopConfiguration cleanseMap(properties, keySeparator).foreach { case (key, Some(value)) => hc.set(key, value.toString) case (key, None) => hc.unset(key) @@ -59,10 +60,8 @@ object SparkConfigurationSteps { "Pipeline", "Spark") @StepParameters(Map("key" -> StepParameter(None, Some(true), None, None, None, None, Some("The name of the property to set")), "value" -> StepParameter(None, Some(true), None, None, None, None, Some("The value to set")))) - def setHadoopConfigurationProperty(key: String, value: Any, - pipelineContext: PipelineContext): Unit = { + def setHadoopConfigurationProperty(key: String, value: Any, pipelineContext: PipelineContext): Unit = setHadoopConfigurationProperties(Map(key -> value), None, pipelineContext) - } @StepFunction("b7373f02-4d1e-44cf-a9c9-315a5c1ccecc", "Set Job Group", @@ -74,16 +73,14 @@ object SparkConfigurationSteps { Some("When true, will trigger Thread.interrupt getting called on executor threads")))) def setJobGroup(groupId: String, description: String, interruptOnCancel: Option[Boolean] = None, pipelineContext: PipelineContext): Unit = { - pipelineContext.sparkSession.get.sparkContext.setJobGroup(groupId, description, interruptOnCancel.getOrElse(false)) + pipelineContext.sparkSession.sparkContext.setJobGroup(groupId, description, interruptOnCancel.getOrElse(false)) } @StepFunction("7394ff4d-f74d-4c9f-a55c-e0fd398fa264", "Clear Job Group", "Clear the current thread's job group", "Pipeline", "Spark") - def clearJobGroup(pipelineContext: PipelineContext): Unit = { - pipelineContext.sparkSession.get.sparkContext.clearJobGroup() - } + def clearJobGroup(pipelineContext: PipelineContext): Unit = pipelineContext.sparkSession.sparkContext.clearJobGroup() @tailrec @@ -96,7 +93,7 @@ object SparkConfigurationSteps { private def cleanseMap(map: Map[String, Any], keySeparator: Option[String] = None): Map[String, Any] = { val sep = keySeparator.getOrElse("__") - map.map{ case (key, value) => + map.map { case (key, value) => key.replaceAllLiterally(sep, ".") -> unwrapOptions(value) } } diff --git a/metalus-common/src/test/scala/com/acxiom/pipeline/steps/SparkConfigurationStepsTests.scala b/metalus-spark/src/test/scala/com/acxiom/metalus/spark/steps/SparkConfigurationStepsTests.scala similarity index 74% rename from metalus-common/src/test/scala/com/acxiom/pipeline/steps/SparkConfigurationStepsTests.scala rename to metalus-spark/src/test/scala/com/acxiom/metalus/spark/steps/SparkConfigurationStepsTests.scala index 8b877867..940e1b23 100644 --- a/metalus-common/src/test/scala/com/acxiom/pipeline/steps/SparkConfigurationStepsTests.scala +++ b/metalus-spark/src/test/scala/com/acxiom/metalus/spark/steps/SparkConfigurationStepsTests.scala @@ -1,41 +1,36 @@ -package com.acxiom.pipeline.steps +package com.acxiom.metalus.spark.steps -import com.acxiom.pipeline._ +import com.acxiom.metalus.context.ContextManager +import com.acxiom.metalus.spark.SparkSessionContext +import com.acxiom.metalus.{ClassInfo, DefaultPipelineListener, PipelineContext, PipelineStepMapper} import org.apache.commons.io.FileUtils -import org.apache.log4j.{Level, Logger} -import org.apache.spark.SparkConf import org.apache.spark.sql.SparkSession -import org.scalatest.{BeforeAndAfterAll, FunSpec, GivenWhenThen} +import org.scalatest.funspec.AnyFunSpec +import org.scalatest.{BeforeAndAfterAll, GivenWhenThen} import java.nio.file.{Files, Path} import scala.language.postfixOps -class SparkConfigurationStepsTests extends FunSpec with BeforeAndAfterAll with GivenWhenThen { +class SparkConfigurationStepsTests extends AnyFunSpec with BeforeAndAfterAll with GivenWhenThen { private val MASTER = "local[2]" private val APPNAME = "spark-config-steps-spark" - private var sparkConf: SparkConf = _ private var sparkSession: SparkSession = _ private val sparkLocalDir: Path = Files.createTempDirectory("sparkLocal") private var pipelineContext: PipelineContext = _ override def beforeAll(): Unit = { - Logger.getLogger("org.apache.spark").setLevel(Level.WARN) - Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN) - Logger.getLogger("com.acxiom.pipeline").setLevel(Level.DEBUG) + val contextManager = new ContextManager(Map("spark" -> + ClassInfo(Some("com.acxiom.metalus.spark.SparkSessionContext"), + Some(Map[String, Any]("sparkConfOptions" -> Map[String, Any]( + "setOptions" -> List(Map("name" -> "spark.local.dir", "value" -> sparkLocalDir.toFile.getAbsolutePath))), + "appName" -> APPNAME, + "sparkMaster" -> MASTER)))), + Map()) + sparkSession = contextManager.getContext("spark").get.asInstanceOf[SparkSessionContext].sparkSession - sparkConf = new SparkConf() - .setMaster(MASTER) - .setAppName(APPNAME) - .set("spark.local.dir", sparkLocalDir.toFile.getAbsolutePath) - sparkSession = SparkSession.builder().config(sparkConf).getOrCreate() - - pipelineContext = PipelineContext(Some(sparkConf), Some(sparkSession), Some(Map[String, Any]()), - PipelineSecurityManager(), - PipelineParameters(List(PipelineParameter("0", Map[String, Any]()), PipelineParameter("1", Map[String, Any]()))), - Some(List("com.acxiom.pipeline.steps")), - PipelineStepMapper(), - Some(DefaultPipelineListener()), - Some(sparkSession.sparkContext.collectionAccumulator[PipelineStepMessage]("stepMessages"))) + pipelineContext = PipelineContext(Some(Map[String, Any]()), + List(), Some(List("com.acxiom.metalus.spark.steps")), PipelineStepMapper(), + Some(DefaultPipelineListener()), contextManager = contextManager) } override def afterAll(): Unit = { @@ -43,7 +38,6 @@ class SparkConfigurationStepsTests extends FunSpec with BeforeAndAfterAll with G sparkSession.sparkContext.stop() sparkSession.stop() - Logger.getRootLogger.setLevel(Level.INFO) // cleanup spark directories FileUtils.deleteDirectory(sparkLocalDir.toFile) }