
Merge pull request #259 from djfreels/develop
Adding SparkConfigurationSteps
dafreels committed Oct 15, 2021
2 parents 278bbbe + eeab70f commit efd0354
Showing 13 changed files with 375 additions and 51 deletions.
2 changes: 1 addition & 1 deletion manual_tests/testData/metalus-common/steps.json

Large diffs are not rendered by default.

@@ -42,17 +42,21 @@ object S3Utilities {
role: Option[String] = None,
partition: Option[String] = None,
pipelineContext: PipelineContext): Unit = {
if (accessKeyId.isDefined && secretAccessKey.isDefined) {
val keyAndSecret = accessKeyId.isDefined && secretAccessKey.isDefined
val roleBased = role.isDefined && accountId.isDefined
if (keyAndSecret || roleBased) {
logger.debug(s"Setting up S3 authorization for $path")
val protocol = S3Utilities.deriveProtocol(path)
val sc = pipelineContext.sparkSession.get.sparkContext
if (accessKeyId.isDefined && secretAccessKey.isDefined) {
sc.hadoopConfiguration.unset("spark.hadoop.fs.s3a.aws.credentials.provider")
sc.hadoopConfiguration.unset("fs.s3a.aws.credentials.provider")
sc.hadoopConfiguration.set(s"fs.$protocol.awsAccessKeyId", accessKeyId.get)
sc.hadoopConfiguration.set(s"fs.$protocol.awsSecretAccessKey", secretAccessKey.get)
sc.hadoopConfiguration.set(s"fs.$protocol.access.key", accessKeyId.get)
sc.hadoopConfiguration.set(s"fs.$protocol.secret.key", secretAccessKey.get)
}
if(role.isDefined && accountId.isDefined && protocol == "s3a") {
if(roleBased && protocol == "s3a") {
sc.hadoopConfiguration.set("fs.s3a.assumed.role.arn", buildARN(accountId.get, role.get, partition))
sc.hadoopConfiguration.setStrings("spark.hadoop.fs.s3a.aws.credentials.provider",
s"org.apache.hadoop.fs.s3a.AssumedRoleCredentialProvider",
39 changes: 39 additions & 0 deletions metalus-common/docs/sparkconfigurationstepds.md
@@ -0,0 +1,39 @@
[Documentation Home](../../docs/readme.md) | [Common Home](../readme.md)

# SparkConfigurationSteps
This object exposes basic functions for setting configurations on the Spark context at run time.

## Set Spark Local Property
Set a local property on the current thread.

* **key** - The key of the property to set.
* **value** - The value to set. Use None to remove the property.
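
For illustration, a minimal sketch of calling this step directly from Scala, assuming a `pipelineContext` whose `sparkSession` is already populated (the property name and value are illustrative):

```scala
import com.acxiom.pipeline.steps.SparkConfigurationSteps

// Tag jobs submitted from this thread with a scheduler pool
SparkConfigurationSteps.setLocalProperty("spark.scheduler.pool", "reporting", pipelineContext)

// Passing None removes the property again
SparkConfigurationSteps.setLocalProperty("spark.scheduler.pool", None, pipelineContext)
```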

## Set Spark Local Properties
Set a local property on the current thread for each entry in the properties map.

* **properties** - A Map where each entry will be set as a key/value pair.
* **keySeparator** - All occurrences of this string in the keys are replaced with periods. Default is `__`.
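
A sketch of the separator behavior with the default `__` separator (same assumed `pipelineContext`; the dotted names are standard Spark local properties):

```scala
import com.acxiom.pipeline.steps.SparkConfigurationSteps

// Sets the local properties "spark.scheduler.pool" and "spark.job.description"
SparkConfigurationSteps.setLocalProperties(
  Map("spark__scheduler__pool" -> "etl", "spark__job__description" -> "nightly load"),
  None, // keySeparator defaults to "__"
  pipelineContext)
```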

## Set Hadoop Configuration Property
Set a property on the Hadoop configuration.

* **key** - The key of the property to set.
* **value** - The value to set. Use None to remove the property.
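
A brief sketch showing set and unset, with the same assumed `pipelineContext` (the S3A property name is illustrative):

```scala
import com.acxiom.pipeline.steps.SparkConfigurationSteps

// Set a Hadoop configuration value on the active SparkContext
SparkConfigurationSteps.setHadoopConfigurationProperty("fs.s3a.connection.maximum", 100, pipelineContext)

// Passing None unsets the key instead
SparkConfigurationSteps.setHadoopConfigurationProperty("fs.s3a.connection.maximum", None, pipelineContext)
```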

## Set Hadoop Configuration Properties
Set a property on the Hadoop configuration for each entry in the properties map.

* **properties** - A Map where each entry will be set as a key/value pair.
* **keySeparator** - All occurrences of this string in the keys are replaced with periods. Default is `__`.
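
A sketch using a custom separator, where each `_` in a key becomes a period (the S3A property names are illustrative):

```scala
import com.acxiom.pipeline.steps.SparkConfigurationSteps

// With Some("_") each underscore becomes a period:
// fs.s3a.endpoint=http://localhost:9000, fs.s3a.path.style.access=true
SparkConfigurationSteps.setHadoopConfigurationProperties(
  Map("fs_s3a_endpoint" -> "http://localhost:9000", "fs_s3a_path_style_access" -> true),
  Some("_"),
  pipelineContext)
```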

## Set Job Group
Set a job group id and description to group all upcoming jobs on the current thread.

* **groupId** - The name of the group.
* **description** - Description of the group.
* **interruptOnCancel** - When true, job cancellation will result in Thread.interrupt() being called on the job's executor threads.

## Clear Job Group
Clears the current job group.
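
A short sketch combining the job group steps, loosely mirroring the unit tests added in this change (group name, description, and job are illustrative):

```scala
import com.acxiom.pipeline.steps.SparkConfigurationSteps

SparkConfigurationSteps.setJobGroup("nightly-load", "Nightly ingest jobs", Some(true), pipelineContext)

val spark = pipelineContext.sparkSession.get
spark.range(100).count() // this job is reported under the "nightly-load" group
val jobIds = spark.sparkContext.statusTracker.getJobIdsForGroup("nightly-load")

// Stop tagging subsequent jobs on this thread
SparkConfigurationSteps.clearJobGroup(pipelineContext)
```
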
1 change: 1 addition & 0 deletions metalus-common/readme.md
@@ -20,6 +20,7 @@ using Spark.
* [QuerySteps](docs/querysteps.md)
* [ScalaSteps](docs/scalascriptsteps.md)
* [SFTPSteps](docs/sftpsteps.md)
* [SparkConfigurationSteps](docs/sparkconfigurationstepds.md)
* [StringSteps](docs/stringsteps.md)
* [TransformationSteps](docs/transformationsteps.md)

@@ -0,0 +1,104 @@
package com.acxiom.pipeline.steps

import com.acxiom.pipeline.PipelineContext
import com.acxiom.pipeline.annotations.{StepFunction, StepObject, StepParameter, StepParameters}

import scala.annotation.tailrec

@StepObject
object SparkConfigurationSteps {

@StepFunction("5c4d2d01-da85-4e2e-a551-f5a65f83653a",
"Set Spark Local Property",
"Set a property on the spark context.",
"Pipeline", "Spark")
@StepParameters(Map("key" -> StepParameter(None, Some(true), None, None, None, None, Some("The name of the property to set")),
"value" -> StepParameter(None, Some(true), None, None, None, None, Some("The value to set"))))
def setLocalProperty(key: String, value: Any, pipelineContext: PipelineContext): Unit = {
setLocalProperties(Map(key -> value), None, pipelineContext)
}

@StepFunction("0b86b314-2657-4392-927c-e555af56b415",
"Set Spark Local Properties",
"Set each property on the spark context.",
"Pipeline", "Spark")
@StepParameters(Map("properties" -> StepParameter(None, Some(true), None, None, None, None,
Some("Map representing local properties to set")),
"keySeparator" -> StepParameter(None, Some(false), Some("__"), None, None, None,
Some("String that will be replaced with a period character"))))
def setLocalProperties(properties: Map[String, Any], keySeparator: Option[String] = None, pipelineContext: PipelineContext): Unit = {
val sc = pipelineContext.sparkSession.get.sparkContext
cleanseMap(properties, keySeparator).foreach {
case (key, Some(value)) => sc.setLocalProperty(key, value.toString)
case (key, None) => sc.setLocalProperty(key, None.orNull)
case (key, value) => sc.setLocalProperty(key, value.toString)
}
}

@StepFunction("c8c82365-e078-4a2a-99b8-0c0e20d8102d",
"Set Hadoop Configuration Properties",
"Set each property on the hadoop configuration.",
"Pipeline", "Spark")
@StepParameters(Map("properties" -> StepParameter(None, Some(true), None, None, None, None,
Some("Map representing local properties to set")),
"keySeparator" -> StepParameter(None, Some(false), Some("__"), None, None, None,
Some("String that will be replaced with a period character"))))
def setHadoopConfigurationProperties(properties: Map[String, Any], keySeparator: Option[String] = None,
pipelineContext: PipelineContext): Unit = {
val hc = pipelineContext.sparkSession.get.sparkContext.hadoopConfiguration
cleanseMap(properties, keySeparator).foreach {
case (key, Some(value)) => hc.set(key, value.toString)
case (key, None) => hc.unset(key)
case (key, value) => hc.set(key, value.toString)
}
}

@StepFunction("ea7ea3e0-d1c2-40a2-b2b7-3488489509ca",
"Set Hadoop Configuration Property",
"Set a property on the hadoop configuration.",
"Pipeline", "Spark")
@StepParameters(Map("key" -> StepParameter(None, Some(true), None, None, None, None, Some("The name of the property to set")),
"value" -> StepParameter(None, Some(true), None, None, None, None, Some("The value to set"))))
def setHadoopConfigurationProperty(key: String, value: Any,
pipelineContext: PipelineContext): Unit = {
setHadoopConfigurationProperties(Map(key -> value), None, pipelineContext)
}

@StepFunction("b7373f02-4d1e-44cf-a9c9-315a5c1ccecc",
"Set Job Group",
"Set the current thread's group id and description that will be associated with any jobs.",
"Pipeline", "Spark")
@StepParameters(Map("groupId" -> StepParameter(None, Some(true), None, None, None, None, Some("The name of the group")),
"description" -> StepParameter(None, Some(true), None, None, None, None, Some("Description of the job group")),
"interruptOnCancel" -> StepParameter(None, Some(false), Some("false"), None, None, None,
Some("When true, will trigger Thread.interrupt getting called on executor threads"))))
def setJobGroup(groupId: String, description: String, interruptOnCancel: Option[Boolean] = None,
pipelineContext: PipelineContext): Unit = {
pipelineContext.sparkSession.get.sparkContext.setJobGroup(groupId, description, interruptOnCancel.getOrElse(false))
}

@StepFunction("7394ff4d-f74d-4c9f-a55c-e0fd398fa264",
"Clear Job Group",
"Clear the current thread's job group",
"Pipeline", "Spark")
def clearJobGroup(pipelineContext: PipelineContext): Unit = {
pipelineContext.sparkSession.get.sparkContext.clearJobGroup()
}


// Collapses nested Options (e.g. Some(Some(value))) down to a single Option so the
// setters above can match on Some(value) or None.
@tailrec
private def unwrapOptions(value: Any): Any = {
value match {
case Some(v: Option[_]) => unwrapOptions(v)
case v => v
}
}

// Replaces the key separator (default "__") with periods in each key and unwraps Option values.
private def cleanseMap(map: Map[String, Any], keySeparator: Option[String] = None): Map[String, Any] = {
val sep = keySeparator.getOrElse("__")
map.map{ case (key, value) =>
key.replaceAllLiterally(sep, ".") -> unwrapOptions(value)
}
}

}
@@ -12,7 +12,7 @@ import org.scalatest.{BeforeAndAfterAll, FunSpec}
class CSVStepsTests extends FunSpec with BeforeAndAfterAll {

private val MASTER = "local[2]"
private val APPNAME = "json-steps-spark"
private val APPNAME = "csv-steps-spark"
private var sparkConf: SparkConf = _
private var sparkSession: SparkSession = _
private val sparkLocalDir: Path = Files.createTempDirectory("sparkLocal")
@@ -0,0 +1,118 @@
package com.acxiom.pipeline.steps

import java.nio.file.{Files, Path}

import com.acxiom.pipeline._
import org.apache.commons.io.FileUtils
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.scalatest.{BeforeAndAfterAll, FunSpec, GivenWhenThen}

class SparkConfigurationStepsTests extends FunSpec with BeforeAndAfterAll with GivenWhenThen {
private val MASTER = "local[2]"
private val APPNAME = "spark-config-steps-spark"
private var sparkConf: SparkConf = _
private var sparkSession: SparkSession = _
private val sparkLocalDir: Path = Files.createTempDirectory("sparkLocal")
private var pipelineContext: PipelineContext = _

override def beforeAll(): Unit = {
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
Logger.getLogger("com.acxiom.pipeline").setLevel(Level.DEBUG)

sparkConf = new SparkConf()
.setMaster(MASTER)
.setAppName(APPNAME)
.set("spark.local.dir", sparkLocalDir.toFile.getAbsolutePath)
sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

pipelineContext = PipelineContext(Some(sparkConf), Some(sparkSession), Some(Map[String, Any]()),
PipelineSecurityManager(),
PipelineParameters(List(PipelineParameter("0", Map[String, Any]()), PipelineParameter("1", Map[String, Any]()))),
Some(List("com.acxiom.pipeline.steps")),
PipelineStepMapper(),
Some(DefaultPipelineListener()),
Some(sparkSession.sparkContext.collectionAccumulator[PipelineStepMessage]("stepMessages")))
}

override def afterAll(): Unit = {
sparkSession.sparkContext.cancelAllJobs()
sparkSession.sparkContext.stop()
sparkSession.stop()

Logger.getRootLogger.setLevel(Level.INFO)
// cleanup spark directories
FileUtils.deleteDirectory(sparkLocalDir.toFile)
}

describe("SparkConfigurationSteps - Basic") {
it("should set a local property") {
try {
SparkConfigurationSteps.setLocalProperty("moo", "moo2", pipelineContext)
assert(sparkSession.sparkContext.getLocalProperty("moo") == "moo2")
} finally {
sparkSession.sparkContext.setLocalProperty("moo", None.orNull)
}
}

it("should unset a local property") {
sparkSession.sparkContext.setLocalProperty("unset", "moo")
SparkConfigurationSteps.setLocalProperty("unset", None, pipelineContext)
assert(Option(sparkSession.sparkContext.getLocalProperty("unset")).isEmpty)
}

it ("should set a local properties") {
try {
SparkConfigurationSteps.setLocalProperties(Map("moo_m1" -> "m1", "moo_m2" -> "m2"), Some("_"), pipelineContext)
assert(sparkSession.sparkContext.getLocalProperty("moo.m1") == "m1")
assert(sparkSession.sparkContext.getLocalProperty("moo.m2") == "m2")
} finally {
sparkSession.sparkContext.setLocalProperty("moo.m1", None.orNull)
sparkSession.sparkContext.setLocalProperty("moo.m2", None.orNull)
}
}

it ("should unset a local properties") {
try {
sparkSession.sparkContext.setLocalProperty("moo.m1", "m1")
sparkSession.sparkContext.setLocalProperty("moo.m2", "m2")
SparkConfigurationSteps.setLocalProperties(Map("moo_m1" -> None, "moo_m2" -> None), Some("_"), pipelineContext)
assert(Option(sparkSession.sparkContext.getLocalProperty("moo.m1")).isEmpty)
assert(Option(sparkSession.sparkContext.getLocalProperty("moo.m2")).isEmpty)
} finally {
sparkSession.sparkContext.setLocalProperty("moo.m1", None.orNull)
sparkSession.sparkContext.setLocalProperty("moo.m2", None.orNull)
}
}
}

describe("SparkConfigurationSteps - Job Group") {
it("should set a job group") {
SparkConfigurationSteps.setJobGroup("group1", "test1", None, pipelineContext)
val df = sparkSession.range(2)
df.count()
df.head()
val group1Ids = sparkSession.sparkContext.statusTracker.getJobIdsForGroup("group1")
assert(group1Ids.length == 2)
SparkConfigurationSteps.setJobGroup("group2", "test2", None, pipelineContext)
df.count()
val group2Ids = sparkSession.sparkContext.statusTracker.getJobIdsForGroup("group2")
assert(group2Ids.length == 1)
}

it("should clear a job group") {
SparkConfigurationSteps.setJobGroup("clear1", "test1", None, pipelineContext)
val df = sparkSession.range(2)
df.count()
df.head()
val group1Ids = sparkSession.sparkContext.statusTracker.getJobIdsForGroup("clear1")
assert(group1Ids.length == 2)
SparkConfigurationSteps.clearJobGroup(pipelineContext)
df.count()
assert(sparkSession.sparkContext.statusTracker.getJobIdsForGroup("clear1").length == 2)
}
}

}
@@ -67,21 +67,14 @@ object PipelineExecutorValidations {
pipelineProgress = Some(PipelineExecutionInfo(step.id, pipeline.id)))
}
val forkMethod = step.params.get.find(p => p.name.getOrElse("") == "forkMethod")
if(forkMethod.isDefined && forkMethod.get.value.nonEmpty){
val method = forkMethod.get.value.get.asInstanceOf[String]
if(!(method == "serial" || method == "parallel")){
throw PipelineException(
message = Some(s"Unknown value [$method] for parameter [forkMethod]." +
s" Value must be either [serial] or [parallel] for fork step [${step.id.get}] in pipeline [${pipeline.id.get}]."),
pipelineProgress = Some(PipelineExecutionInfo(step.id, pipeline.id)))
}
} else {

if (forkMethod.flatMap(_.value).isEmpty) {
throw PipelineException(
message = Some(s"Parameter [forkMethod] is required for fork step [${step.id.get}] in pipeline [${pipeline.id.get}]."),
pipelineProgress = Some(PipelineExecutionInfo(step.id, pipeline.id)))
}
val forkByValues = step.params.get.find(p => p.name.getOrElse("") == "forkByValues")
if(forkByValues.isEmpty || forkByValues.get.value.isEmpty){
if(forkByValues.flatMap(_.value).isEmpty){
throw PipelineException(
message = Some(s"Parameter [forkByValues] is required for fork step [${step.id.get}] in pipeline [${pipeline.id.get}]."),
pipelineProgress = Some(PipelineExecutionInfo(step.id, pipeline.id)))
@@ -218,8 +218,7 @@ trait PipelineStepMapper {
def mapParameter(parameter: Parameter, pipelineContext: PipelineContext): Any = {
// Get the value/defaultValue for this parameter
val value = getParamValue(parameter)
val returnValue = if (value.isDefined) {
removeOptions(value) match {
val returnValue = value.map(removeOptions).flatMap {
case s: String =>
parameter.`type`.getOrElse("none").toLowerCase match {
case "script" =>
@@ -239,13 +238,11 @@
case b: Boolean => Some(b)
case i: Int => Some(i)
case i: BigInt => Some(i.toInt)
case d: Double => Some(d)
case l: List[_] => handleListParameter(l, parameter, pipelineContext)
case m: Map[_, _] => handleMapParameter(m, parameter, pipelineContext)
case t => // Handle other types - This function may need to be reworked to support this so that it can be overridden
throw new RuntimeException(s"Unsupported value type ${t.getClass} for ${parameter.name.getOrElse("unknown")}!")
}
} else {
None
}

// use the first valid (non-empty) value found
