Skip to content

Commit

Permalink
#345 Refactored EngineMeta to store the command in the command attrib…
Browse files Browse the repository at this point in the history
…ute instead of the spark attribute.
  • Loading branch information
dafreels committed Jan 27, 2023
1 parent 1a6f705 commit 06f4cef
Show file tree
Hide file tree
Showing 50 changed files with 1,071 additions and 226 deletions.
20 changes: 10 additions & 10 deletions metalus-core/src/main/resources/metadata/pipelines/copy-file.json
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@
}
],
"engineMeta": {
"spark": "FileManagerSteps.getFileManager",
"command": "FileManagerSteps.getFileManager",
"pkg": "com.acxiom.pipeline.steps",
"results": {
"primaryType": "com.acxiom.pipeline.fs.FileManager"
Expand All @@ -86,7 +86,7 @@
}
],
"engineMeta": {
"spark": "FileManagerSteps.getFileManager",
"command": "FileManagerSteps.getFileManager",
"pkg": "com.acxiom.pipeline.steps",
"results": {
"primaryType": "com.acxiom.pipeline.fs.FileManager"
Expand Down Expand Up @@ -135,7 +135,7 @@
}
],
"engineMeta": {
"spark": "FileManagerSteps.copy",
"command": "FileManagerSteps.copy",
"pkg": "com.acxiom.pipeline.steps",
"results": {
"primaryType": "com.acxiom.pipeline.steps.CopyResults"
Expand Down Expand Up @@ -185,7 +185,7 @@
}
],
"engineMeta": {
"spark": "FileManagerSteps.compareFileSizes",
"command": "FileManagerSteps.compareFileSizes",
"pkg": "com.acxiom.pipeline.steps",
"results": {
"primaryType": "Int"
Expand Down Expand Up @@ -218,7 +218,7 @@
}
],
"engineMeta": {
"spark": "StringSteps.toString",
"command": "StringSteps.toString",
"pkg": "com.acxiom.pipeline.steps",
"results": {
"primaryType": "String"
Expand Down Expand Up @@ -273,7 +273,7 @@
}
],
"engineMeta": {
"spark": "StringSteps.stringEquals",
"command": "StringSteps.stringEquals",
"pkg": "com.acxiom.pipeline.steps",
"results": {
"primaryType": "Boolean"
Expand All @@ -297,7 +297,7 @@
}
],
"engineMeta": {
"spark": "FileManagerSteps.disconnectFileManager",
"command": "FileManagerSteps.disconnectFileManager",
"pkg": "com.acxiom.pipeline.steps"
},
"nextStepId": "CLOSEDESTINATION",
Expand All @@ -319,7 +319,7 @@
}
],
"engineMeta": {
"spark": "FileManagerSteps.disconnectFileManager",
"command": "FileManagerSteps.disconnectFileManager",
"pkg": "com.acxiom.pipeline.steps"
},
"stepId": "3d1e8519-690c-55f0-bd05-1e7b97fb6633"
Expand Down Expand Up @@ -348,7 +348,7 @@
}
],
"engineMeta": {
"spark": "FileManagerSteps.deleteFile",
"command": "FileManagerSteps.deleteFile",
"pkg": "com.acxiom.pipeline.steps",
"results": {
"primaryType": "Boolean"
Expand Down Expand Up @@ -395,7 +395,7 @@
}
],
"engineMeta": {
"spark": "FlowUtilsSteps.simpleRetry",
"command": "FlowUtilsSteps.simpleRetry",
"pkg": "com.acxiom.pipeline.steps",
"results": {
"primaryType": "com.acxiom.pipeline.PipelineStepResponse"
Expand Down
28 changes: 1 addition & 27 deletions metalus-core/src/main/scala/com/acxiom/metalus/Pipeline.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package com.acxiom.metalus

import com.acxiom.metalus.ExecutionEvaluationResult.ExecutionEvaluationResult
import com.acxiom.metalus.audits.ExecutionAudit
import com.acxiom.metalus.utils.ReflectionUtils
import com.acxiom.metalus.context.ContextManager

import scala.jdk.CollectionConverters._

Expand Down Expand Up @@ -130,32 +130,6 @@ final case class PipelineStateInfo(pipelineId: String,
}
}

/**
* This class maintains a set of contexts that can be used by the running pipelines.
*
* @param contexts Map of class info objects that need to be initialized.
* @param setupParameters A map containing items that were passed to the application at startup.
*/
class ContextManager(contexts: Map[String, ClassInfo], setupParameters: Map[String, Any]) {
private val defaultContexts: Map[String, ClassInfo] = Map[String, ClassInfo](
"json" -> ClassInfo(Some("com.acxiom.metalus.context.Json4sContext"), Some(Map[String, Any]()))
)
private val contextObjects: Map[String, Context] = (defaultContexts ++ contexts)
.map(c => c._1 -> ReflectionUtils.loadClass(c._2.className.get, Some(c._2.parameters.getOrElse(Map()) ++ setupParameters)).asInstanceOf[Context])

/**
* Returns the Context associated with the provided key or None.
* @param key The lookup key for the Context.
* @return A Context for the key or None if it doesn't exist.
*/
def getContext(key: String): Option[Context] = contextObjects.get(key)
}

/**
* Marker trait for objects managed by the ContextManager
*/
trait Context

/**
* Contains information about a class that needs to be instantiated at runtime.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ object PipelineExecutorValidations {
step.`type`.getOrElse("").toLowerCase match {
case s if s == PipelineStepType.PIPELINE || s == PipelineStepType.BRANCH =>
val ps = step.asInstanceOf[PipelineStep]
if(ps.engineMeta.isEmpty || ps.engineMeta.get.spark.getOrElse("") == "") {
if(ps.engineMeta.isEmpty || ps.engineMeta.get.command.getOrElse("") == "") {
throw PipelineException(
message = Some(s"EngineMeta is required for [${step.`type`.get}] step [${step.id.get}] in pipeline [${pipeline.id.get}]"),
pipelineProgress = defaultStateInfo)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,11 +116,11 @@ case class Parameter(`type`: Option[String] = None,
/**
* This class contains the execution information for a Step
*
* @param spark The execution instruction for the Spark engine.
* @param command The execution instruction for the Spark engine.
* @param pkg An optional package location
* @param results The optional StepResult
*/
case class EngineMeta(spark: Option[String] = None, pkg: Option[String] = None, results: Option[Results] = None)
case class EngineMeta(command: Option[String] = None, pkg: Option[String] = None, results: Option[Results] = None)

/**
* This class represents the expected result of a step execution
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.acxiom.metalus.applications

import com.acxiom.metalus._
import com.acxiom.metalus.context.Json4sContext
import com.acxiom.metalus.context.{ContextManager, Json4sContext}
import com.acxiom.metalus.parser.JsonParser
import com.acxiom.metalus.utils.ReflectionUtils
import org.slf4j.LoggerFactory
Expand Down Expand Up @@ -34,7 +34,8 @@ object ApplicationUtils {
}
val validateArgumentTypes = parameters.getOrElse(Map()).getOrElse("validateStepParameterTypes", false).asInstanceOf[Boolean]
// Create the ContextManager
val contextManager = new ContextManager(application.contexts.getOrElse(Map()), parameters.getOrElse(Map()))
val contextManager = new ContextManager(application.contexts.getOrElse(Map()),
parameters.getOrElse(Map()) + ("credentialProvider" -> credentialProvider))
val tempCtx = PipelineContext(globals, List(), contextManager = contextManager)
val globalStepMapper = generateStepMapper(application.stepMapper, Some(PipelineStepMapper()),
validateArgumentTypes, credentialProvider, tempCtx)
Expand All @@ -52,30 +53,6 @@ object ApplicationUtils {
initialContext.copy(globals = defaultGlobals)
}

/** TODO [2.0 Review] IS this still needed?
* Utility method that resets the state on the PipelineExecution.
*
* @param application The Application configuration
* @param rootGlobals The initial set of globals
* @param execution The execution configuration
* @param pipelineExecution The PipelineExecution that needs to be refreshed
* @return An updated PipelineExecution
*/
// def refreshPipelineExecution(application: Application,
// rootGlobals: Option[Map[String, Any]],
// execution: Execution,
// pipelineExecution: PipelineExecution): PipelineExecution = {
// implicit val formats: Formats = getJson4sFormats(application.json4sSerializers)
// val initialContext = pipelineExecution.pipelineContext.copy(globals = rootGlobals)
// val defaultGlobals = generateGlobals(application.globals, rootGlobals.get, rootGlobals, initialContext)
// val globalPipelineParameters = generatePipelineParameters(application.pipelineParameters, Some(PipelineParameters()))
// val ctx = pipelineExecution.pipelineContext
// .copy(globals = generateGlobals(execution.globals, rootGlobals.get, defaultGlobals,
// initialContext, execution.mergeGlobals.getOrElse(false)))
// .copy(parameters = generatePipelineParameters(execution.pipelineParameters, globalPipelineParameters).get)
// pipelineExecution.asInstanceOf[DefaultPipelineExecution].copy(pipelineContext = ctx)
// }

private def generatePipelineManager(pipelineManagerInfo: Option[ClassInfo],
pipelineManager: Option[PipelineManager],
validateArgumentTypes: Boolean,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package com.acxiom.metalus.context

import com.acxiom.metalus.ClassInfo
import com.acxiom.metalus.utils.ReflectionUtils

/**
* This class maintains a set of contexts that can be used by the running pipelines.
*
* @param contexts Map of class info objects that need to be initialized.
* @param setupParameters A map containing items that were passed to the application at startup.
*/
class ContextManager(contexts: Map[String, ClassInfo], setupParameters: Map[String, Any]) {
private val defaultContexts: Map[String, ClassInfo] = Map[String, ClassInfo](
"json" -> ClassInfo(Some("com.acxiom.metalus.context.Json4sContext"), Some(Map[String, Any]()))
)
private val contextObjects: Map[String, Context] = (defaultContexts ++ contexts)
.map(c => c._1 -> ReflectionUtils.loadClass(c._2.className.get, Some(c._2.parameters.getOrElse(Map()) ++ setupParameters)).asInstanceOf[Context])

/**
* Returns the Context associated with the provided key or None.
*
* @param key The lookup key for the Context.
* @return A Context for the key or None if it doesn't exist.
*/
def getContext(key: String): Option[Context] = contextObjects.get(key)
}

/**
* Marker trait for objects managed by the ContextManager
*/
trait Context
Original file line number Diff line number Diff line change
@@ -1,15 +1,7 @@
package com.acxiom.metalus.context

import com.acxiom.metalus.Context
import com.acxiom.metalus.applications.Json4sSerializers
import com.acxiom.metalus.parser.JsonParser
import com.acxiom.metalus.parser.JsonParser.StepSerializer
import com.acxiom.metalus.utils.ReflectionUtils
import org.json4s.ext.{EnumNameSerializer, EnumSerializer}
import org.json4s.native.JsonMethods.parse
import org.json4s.native.Serialization
import org.json4s.reflect.Reflector
import org.json4s.{CustomSerializer, DefaultFormats, Extraction, Formats, FullTypeHints}

/**
* Build a json4s Formats object using the ClassInfo objects in json4sSerializers. If json4sSerializers is not
Expand Down

0 comments on commit 06f4cef

Please sign in to comment.