Acxiom#353 Migrated SparkConfigurationSteps to spark project.
dafreels committed Feb 22, 2023
1 parent 7c61d61 commit 8ce48cb
Showing 2 changed files with 28 additions and 37 deletions.
SparkConfigurationSteps.scala

@@ -1,7 +1,8 @@
-package com.acxiom.pipeline.steps
+package com.acxiom.metalus.spark.steps
 
-import com.acxiom.pipeline.PipelineContext
-import com.acxiom.pipeline.annotations.{StepFunction, StepObject, StepParameter, StepParameters}
+import com.acxiom.metalus.PipelineContext
+import com.acxiom.metalus.annotations.{StepFunction, StepObject, StepParameter, StepParameters}
+import com.acxiom.metalus.spark._
 
 import scala.annotation.tailrec
 
@@ -27,7 +28,7 @@ object SparkConfigurationSteps {
     "keySeparator" -> StepParameter(None, Some(false), Some("__"), None, None, None,
       Some("String that will be replaced with a period character"))))
   def setLocalProperties(properties: Map[String, Any], keySeparator: Option[String] = None, pipelineContext: PipelineContext): Unit = {
-    val sc = pipelineContext.sparkSession.get.sparkContext
+    val sc = pipelineContext.sparkSession.sparkContext
     cleanseMap(properties, keySeparator).foreach {
       case (key, Some(value)) => sc.setLocalProperty(key, value.toString)
       case (key, None) => sc.setLocalProperty(key, None.orNull)
@@ -45,7 +46,7 @@ object SparkConfigurationSteps {
       Some("String that will be replaced with a period character"))))
   def setHadoopConfigurationProperties(properties: Map[String, Any], keySeparator: Option[String] = None,
                                        pipelineContext: PipelineContext): Unit = {
-    val hc = pipelineContext.sparkSession.get.sparkContext.hadoopConfiguration
+    val hc = pipelineContext.sparkSession.sparkContext.hadoopConfiguration
     cleanseMap(properties, keySeparator).foreach {
       case (key, Some(value)) => hc.set(key, value.toString)
       case (key, None) => hc.unset(key)
@@ -59,10 +60,8 @@ object SparkConfigurationSteps {
     "Pipeline", "Spark")
   @StepParameters(Map("key" -> StepParameter(None, Some(true), None, None, None, None, Some("The name of the property to set")),
     "value" -> StepParameter(None, Some(true), None, None, None, None, Some("The value to set"))))
-  def setHadoopConfigurationProperty(key: String, value: Any,
-                                     pipelineContext: PipelineContext): Unit = {
+  def setHadoopConfigurationProperty(key: String, value: Any, pipelineContext: PipelineContext): Unit =
     setHadoopConfigurationProperties(Map(key -> value), None, pipelineContext)
-  }
 
   @StepFunction("b7373f02-4d1e-44cf-a9c9-315a5c1ccecc",
     "Set Job Group",
@@ -74,16 +73,14 @@ object SparkConfigurationSteps {
       Some("When true, will trigger Thread.interrupt getting called on executor threads"))))
   def setJobGroup(groupId: String, description: String, interruptOnCancel: Option[Boolean] = None,
                   pipelineContext: PipelineContext): Unit = {
-    pipelineContext.sparkSession.get.sparkContext.setJobGroup(groupId, description, interruptOnCancel.getOrElse(false))
+    pipelineContext.sparkSession.sparkContext.setJobGroup(groupId, description, interruptOnCancel.getOrElse(false))
   }
 
   @StepFunction("7394ff4d-f74d-4c9f-a55c-e0fd398fa264",
     "Clear Job Group",
     "Clear the current thread's job group",
     "Pipeline", "Spark")
-  def clearJobGroup(pipelineContext: PipelineContext): Unit = {
-    pipelineContext.sparkSession.get.sparkContext.clearJobGroup()
-  }
+  def clearJobGroup(pipelineContext: PipelineContext): Unit = pipelineContext.sparkSession.sparkContext.clearJobGroup()
 
 
   @tailrec
@@ -96,7 +93,7 @@ object SparkConfigurationSteps {
 
   private def cleanseMap(map: Map[String, Any], keySeparator: Option[String] = None): Map[String, Any] = {
     val sep = keySeparator.getOrElse("__")
-    map.map { case (key, value) =>
+    map.map { case (key, value) =>
       key.replaceAllLiterally(sep, ".") -> unwrapOptions(value)
     }
   }
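
The change in this file is mechanical but worth spelling out: the steps move from com.acxiom.pipeline.steps to com.acxiom.metalus.spark.steps, and every pipelineContext.sparkSession.get.sparkContext call drops the .get, so the session is no longer accessed as an Option. Given the new import com.acxiom.metalus.spark._, a plausible reading is that the import brings an enrichment into scope that resolves the session from the context manager. The sketch below shows one way such an accessor could be wired; every type here is a hypothetical stand-in, not the actual metalus API.

// Hypothetical sketch (not the actual metalus API): how an import like
// com.acxiom.metalus.spark._ could expose sparkSession directly on a
// PipelineContext via an implicit enrichment.
import org.apache.spark.sql.SparkSession

object SparkImplicitsSketch {
  // Stand-ins for the metalus types this commit references.
  trait ContextManager { def getContext(name: String): Option[AnyRef] }
  trait PipelineContext { def contextManager: ContextManager }
  trait SparkSessionContext { def sparkSession: SparkSession }

  // With this in scope, step code can write pipelineContext.sparkSession
  // instead of pipelineContext.sparkSession.get.
  implicit class SparkPipelineContext(val ctx: PipelineContext) extends AnyVal {
    def sparkSession: SparkSession =
      ctx.contextManager.getContext("spark").get
        .asInstanceOf[SparkSessionContext].sparkSession
  }
}

The test file below supports this reading: it pulls the session out of the ContextManager with exactly this getContext("spark")/SparkSessionContext cast.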
SparkConfigurationStepsTests.scala

@@ -1,49 +1,43 @@
-package com.acxiom.pipeline.steps
+package com.acxiom.metalus.spark.steps
 
-import com.acxiom.pipeline._
+import com.acxiom.metalus.context.ContextManager
+import com.acxiom.metalus.spark.SparkSessionContext
+import com.acxiom.metalus.{ClassInfo, DefaultPipelineListener, PipelineContext, PipelineStepMapper}
 import org.apache.commons.io.FileUtils
 import org.apache.log4j.{Level, Logger}
-import org.apache.spark.SparkConf
 import org.apache.spark.sql.SparkSession
-import org.scalatest.{BeforeAndAfterAll, FunSpec, GivenWhenThen}
+import org.scalatest.funspec.AnyFunSpec
+import org.scalatest.{BeforeAndAfterAll, GivenWhenThen}
 
 import java.nio.file.{Files, Path}
 import scala.language.postfixOps
 
-class SparkConfigurationStepsTests extends FunSpec with BeforeAndAfterAll with GivenWhenThen {
+class SparkConfigurationStepsTests extends AnyFunSpec with BeforeAndAfterAll with GivenWhenThen {
   private val MASTER = "local[2]"
   private val APPNAME = "spark-config-steps-spark"
-  private var sparkConf: SparkConf = _
   private var sparkSession: SparkSession = _
   private val sparkLocalDir: Path = Files.createTempDirectory("sparkLocal")
   private var pipelineContext: PipelineContext = _
 
   override def beforeAll(): Unit = {
-    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
-    Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
-    Logger.getLogger("com.acxiom.pipeline").setLevel(Level.DEBUG)
+    val contextManager = new ContextManager(Map("spark" ->
+      ClassInfo(Some("com.acxiom.metalus.spark.SparkSessionContext"),
+        Some(Map[String, Any]("sparkConfOptions" -> Map[String, Any](
+          "setOptions" -> List(Map("name" -> "spark.local.dir", "value" -> sparkLocalDir.toFile.getAbsolutePath))),
+          "appName" -> APPNAME,
+          "sparkMaster" -> MASTER)))),
+      Map())
+    sparkSession = contextManager.getContext("spark").get.asInstanceOf[SparkSessionContext].sparkSession
 
-    sparkConf = new SparkConf()
-      .setMaster(MASTER)
-      .setAppName(APPNAME)
-      .set("spark.local.dir", sparkLocalDir.toFile.getAbsolutePath)
-    sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
-
-    pipelineContext = PipelineContext(Some(sparkConf), Some(sparkSession), Some(Map[String, Any]()),
-      PipelineSecurityManager(),
-      PipelineParameters(List(PipelineParameter("0", Map[String, Any]()), PipelineParameter("1", Map[String, Any]()))),
-      Some(List("com.acxiom.pipeline.steps")),
-      PipelineStepMapper(),
-      Some(DefaultPipelineListener()),
-      Some(sparkSession.sparkContext.collectionAccumulator[PipelineStepMessage]("stepMessages")))
+    pipelineContext = PipelineContext(Some(Map[String, Any]()),
+      List(), Some(List("com.acxiom.metalus.spark.steps")), PipelineStepMapper(),
+      Some(DefaultPipelineListener()), contextManager = contextManager)
  }
 
   override def afterAll(): Unit = {
-    sparkSession.sparkContext.cancelAllJobs()
-    sparkSession.sparkContext.stop()
     sparkSession.stop()
 
     Logger.getRootLogger.setLevel(Level.INFO)
     // cleanup spark directories
     FileUtils.deleteDirectory(sparkLocalDir.toFile)
   }
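
On the test side, the suite stops building SparkConf and SparkSession by hand: a ContextManager is constructed with a ClassInfo pointing at SparkSessionContext, and the session is pulled out of that context. PipelineContext likewise loses its Spark-specific constructor arguments, and the suite moves from ScalaTest's removed FunSpec to AnyFunSpec. As a rough illustration of the sparkConfOptions contract the test now relies on, here is how such a map could be folded into a SparkConf; the builder logic is an assumption for illustration, not the actual SparkSessionContext implementation.

// Hypothetical sketch (not the actual SparkSessionContext code): folding the
// "sparkConfOptions" map used in the test above into a SparkConf.
import org.apache.spark.SparkConf

object SparkConfOptionsSketch {
  def buildConf(sparkConfOptions: Map[String, Any], appName: String, master: String): SparkConf = {
    val base = new SparkConf().setAppName(appName).setMaster(master)
    // Apply each {"name" -> ..., "value" -> ...} entry from setOptions,
    // e.g. spark.local.dir in the test.
    sparkConfOptions.get("setOptions").toList
      .flatMap(_.asInstanceOf[List[Map[String, String]]])
      .foldLeft(base) { (conf, opt) => conf.set(opt("name"), opt("value")) }
  }
}

In the test, setOptions carries a single spark.local.dir entry pointing at the temp directory, which afterAll deletes once the session is stopped.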