Skip to content

Commit

Permalink
Acxiom#345 Added ability for StepGroup pipelineId attribute to contai…
Browse files Browse the repository at this point in the history
…n a mapping in addition to the id. Remove some unused classes and performed additional cleanup.
  • Loading branch information
dafreels committed Feb 10, 2023
1 parent b5c3c4b commit 150ec31
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 54 deletions.
34 changes: 1 addition & 33 deletions metalus-core/src/main/scala/com/acxiom/metalus/Pipeline.scala
@@ -1,6 +1,5 @@
package com.acxiom.metalus

import com.acxiom.metalus.ExecutionEvaluationResult.ExecutionEvaluationResult
import com.acxiom.metalus.audits.ExecutionAudit
import com.acxiom.metalus.context.ContextManager
import com.acxiom.metalus.parser.JsonParser
Expand Down Expand Up @@ -589,46 +588,15 @@ case class RestartPoints(steps: List[StepState])
*/
case class StepState(key: PipelineStateInfo, status: String)

// TODO Reevaluate the classes below to see if they are needed in the new world.

/**
* This class represents the result of executing a list of pipelines.
*
* @param pipelineContext The final pipeline context when execution stopped
* @param success Boolean flag indicating whether pipelines ran to completion (true) or stopped due to an error or message (false)
* @param paused Flag indicating whether the "failure" was actually a pause
* @param exception The original exception
* @param runStatus Status indicating whether the calling execution should continue execution.
*/
case class PipelineExecutionResult(pipelineContext: PipelineContext,
success: Boolean,
paused: Boolean,
exception: Option[Throwable],
runStatus: ExecutionEvaluationResult = ExecutionEvaluationResult.RUN)

// TODO [2.0 Review] Remove in 2.0.0
object ExecutionEvaluationResult extends Enumeration {
type ExecutionEvaluationResult = Value
val RUN, SKIP, STOP = Value
}

///**
// * Contains the current pipeline and step information
// *
// * @param stepId The current step being executed
// * @param pipelineId The current pipeline being executed
// * @param executionId The current execution being executed
// * @param groupId The current group being executed
// */
//case class PipelineExecutionInfo(stepId: Option[String] = None,
// pipelineId: Option[String] = None,
// executionId: Option[String] = None,
// groupId: Option[String] = None) {
// def displayPipelineStepString: String = {
// s"pipeline ${pipelineId.getOrElse("")} step ${stepId.getOrElse("")}"
// }
//
// def displayString: String = {
// s"execution ${executionId.getOrElse("")} group ${groupId.getOrElse("")} pipeline ${pipelineId.getOrElse("")} step ${stepId.getOrElse("")}"
// }
//}
exception: Option[Throwable])
Expand Up @@ -25,29 +25,29 @@ object PipelineExecutor {
logger.warn(s"Stopping pipeline because of a skip exception: ${see.getMessage}")
pipelineListener.applicationStopped(initialContext).getOrElse(initialContext)
sessionContext.completeSession("SKIPPED")
PipelineExecutionResult(see.context.getOrElse(initialContext), success = true, paused = false, None, ExecutionEvaluationResult.SKIP)
PipelineExecutionResult(see.context.getOrElse(initialContext), success = true, paused = false, None)
case fe: ForkedPipelineStepException =>
fe.exceptions.foreach(entry =>
logger.error(s"Execution Id ${entry._1} had an error: ${entry._2.getMessage}", entry._2))
pipelineListener.applicationStopped(initialContext).getOrElse(initialContext)
sessionContext.completeSession("ERROR")
PipelineExecutionResult(fe.context.getOrElse(initialContext), success = false, paused = false, Some(fe), ExecutionEvaluationResult.STOP)
PipelineExecutionResult(fe.context.getOrElse(initialContext), success = false, paused = false, Some(fe))
case se: SplitStepException =>
se.exceptions.foreach(entry =>
logger.error(s"Execution Id ${entry._1} had an error: ${entry._2.getMessage}", entry._2))
pipelineListener.applicationStopped(initialContext).getOrElse(initialContext)
sessionContext.completeSession("ERROR")
PipelineExecutionResult(se.context.getOrElse(initialContext), success = false, paused = false, Some(se), ExecutionEvaluationResult.STOP)
PipelineExecutionResult(se.context.getOrElse(initialContext), success = false, paused = false, Some(se))
case p: PauseException =>
logger.info(s"Paused pipeline flow at ${p.pipelineProgress.getOrElse(PipelineStateInfo("")).displayPipelineStepString}. ${p.message}")
pipelineListener.applicationComplete(initialContext).getOrElse(initialContext)
sessionContext.completeSession("PAUSED")
PipelineExecutionResult(p.context.getOrElse(initialContext), success = false, paused = true, Some(p), ExecutionEvaluationResult.STOP)
PipelineExecutionResult(p.context.getOrElse(initialContext), success = false, paused = true, Some(p))
case pse: PipelineStepException =>
logger.error(s"Stopping pipeline because of an exception", pse)
pipelineListener.applicationStopped(initialContext).getOrElse(initialContext)
sessionContext.completeSession("ERROR")
PipelineExecutionResult(pse.context.getOrElse(initialContext), success = false, paused = false, Some(pse), ExecutionEvaluationResult.STOP)
PipelineExecutionResult(pse.context.getOrElse(initialContext), success = false, paused = false, Some(pse))
}
}
}
Expand Up @@ -3,7 +3,7 @@ package com.acxiom.metalus.audits
import com.acxiom.metalus.PipelineStateInfo
import com.acxiom.metalus.audits.AuditType.AuditType

/** TODO Use the stateinfo as the key and remove the graph
/**
* Creates a new Audit with the appropriate information,
*
* @param key The pipeline state associated with this audit
Expand Down
Expand Up @@ -137,14 +137,20 @@ case class StepGroupFlow(pipeline: Pipeline,
}

private def getPipeline(step: FlowStep, parameterValues: Map[String, Any], pipelineContext: PipelineContext): Pipeline = {
val pipelineId = step match {
case group: PipelineStepGroup if group.pipelineId.isDefined => group.pipelineId
val (pipelineId, pipeline) = step match {
case group: PipelineStepGroup if group.pipelineId.isDefined =>
val p = Parameter(Some("text"), Some("pipelineId"), value = group.pipelineId)
val pipeline = pipelineContext.parameterMapper.mapParameter(p, pipelineContext)
pipeline match {
case pipeline1: Pipeline => (None, Some(pipeline1))
case _ => (Some(pipeline.toString), None)
}
case _ => if (step.params.get.exists(_.name.getOrElse("") == "pipelineId")) {
Some(step.params.get.find(_.name.getOrElse("") == "pipelineId").get.value.getOrElse("").toString)
(Some(step.params.get.find(_.name.getOrElse("") == "pipelineId").get.value.getOrElse("").toString), None)
} else if (parameterValues.contains("pipelineId")) {
parameterValues.get("pipelineId").asInstanceOf[Option[String]]
(parameterValues.get("pipelineId").asInstanceOf[Option[String]], None)
} else {
None
(None, None)
}
}

Expand All @@ -153,6 +159,8 @@ case class StepGroupFlow(pipeline: Pipeline,
.getOrElse(throw PipelineException(message = Some(s"Unable to retrieve required step group id ${pipelineId.get}"),
context = Some(pipelineContext),
pipelineProgress = pipelineContext.currentStateInfo))
} else if (pipeline.isDefined) {
pipeline.get
} else {
parameterValues("pipeline").asInstanceOf[Pipeline]
}
Expand Down
Expand Up @@ -86,14 +86,12 @@ class StepGroupStepTests extends AnyFunSpec {
validateResults(executionResult.pipelineContext, "globalOne", "gtwo", "3")
}

it("Should execute step with pipelineMappings pulled from PipelineManager") {
it("Should execute step with pipeline pulled from PipelineManager") {
TestHelper.pipelineListener = PipelineListener()
val mappingPipelineStepTwo = PipelineStepGroup(Some("PIPELINE_STEP_TWO"), None, None, Some("step-group"),
Some(List(Parameter(Some("text"), Some("pipelineId"),
value = Some("subPipelineId")),
Parameter(Some("object"), Some("pipelineMappings"),
Some(List(Parameter(Some("object"), Some("pipelineMappings"),
value = Some(Map[String, Any]("globalOne" -> "globalOne", "globalTwo" -> "gtwo", "globalThree" -> "3"))))),
pipelineId = None, nextStepId = Some("PIPELINE_STEP_THREE"))
pipelineId = Some("subPipelineId"), nextStepId = Some("PIPELINE_STEP_THREE"))

val context = TestHelper.generatePipelineContext()
.copy(pipelineManager = PipelineManager(List(subPipeline)))
Expand All @@ -103,7 +101,7 @@ class StepGroupStepTests extends AnyFunSpec {
validateResults(executionResult.pipelineContext, "globalOne", "gtwo", "3")
}

it("Should execute step with pipelineMappings pulled from PipelineManager using special character") {
it("Should execute step with pipeline pulled from PipelineManager using special character") {
TestHelper.pipelineListener = PipelineListener()
val mappingPipelineStepTwo = PipelineStepGroup(Some("PIPELINE_STEP_TWO"), None, None, Some("step-group"),
Some(List(Parameter(Some("text"), Some("pipeline"),
Expand All @@ -123,13 +121,11 @@ class StepGroupStepTests extends AnyFunSpec {
it("Should execute step with result parameter") {
TestHelper.pipelineListener = PipelineListener()
val mappingPipelineStepTwo = PipelineStepGroup(Some("PIPELINE_STEP_TWO"), None, None, Some("step-group"),
Some(List(Parameter(Some("text"), Some("pipeline"),
value = Some("!subPipeline")),
Parameter(Some("boolean"), Some("useParentGlobals"), value = Some(true)),
Some(List(Parameter(Some("boolean"), Some("useParentGlobals"), value = Some(true)),
Parameter(Some("object"), Some("pipelineMappings"),
value = Some(Map[String, Any]("globalTwo" -> "gtwo", "globalThree" -> "3"))),
Parameter(Some("result"), Some("output"), None, None, Some("@SUB_PIPELINE_STEP_TWO")))),
pipelineId = None, nextStepId = Some("PIPELINE_STEP_THREE"))
pipelineId = Some("!subPipeline"), nextStepId = Some("PIPELINE_STEP_THREE"))

val context = TestHelper.generatePipelineContext()
.setGlobal("subPipeline", subPipeline)
Expand Down

0 comments on commit 150ec31

Please sign in to comment.