Skip to content

Commit

Permalink
Acxiom#368 Added parameters to the copy pipeline and a source verific…
Browse files Browse the repository at this point in the history
…ation section

Acxiom#345 Updated Connectors traits to have a connectorType of either FILE or DATA
  • Loading branch information
dafreels committed Mar 16, 2023
1 parent fab6416 commit 35c0a17
Show file tree
Hide file tree
Showing 6 changed files with 137 additions and 2 deletions.
30 changes: 30 additions & 0 deletions metalus-aws/src/main/resources/dependencies.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,36 @@
"groupId": "com.acxiom",
"artifactId": "metalus-core_${scala.compat.version}",
"version": "${version}"
},
{
"groupId": "software.amazon.awssdk",
"artifactId": "lambda",
"version": "${aws-sdk-version}",
"scope":"extraction"
},
{
"groupId": "software.amazon.awssdk",
"artifactId": "sdk-core",
"version": "${aws-sdk-version}",
"scope":"extraction"
},
{
"groupId": "software.amazon.awssdk",
"artifactId": "utils",
"version": "${aws-sdk-version}",
"scope":"extraction"
},
{
"groupId": "software.amazon.awssdk",
"artifactId": "annotations",
"version": "${aws-sdk-version}",
"scope":"extraction"
},
{
"groupId": "software.amazon.awssdk",
"artifactId": "aws-core",
"version": "${aws-sdk-version}",
"scope":"extraction"
}
]
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,33 @@
"y": 830
}
},
"parameters": {
"inputs": [
{
"name": "sourceConnector",
"global": true,
"required": true
},
{
"name": "destinationConnector",
"global": true,
"required": true
},
{
"name": "sourceCopyPath",
"global": true,
"required": true
},
{
"name": "destinationCopyPath",
"global": true,
"required": true
}
],
"restartableSteps": [
"VERIFY"
]
},
"steps": [
{
"id": "GETSOURCE",
Expand All @@ -67,9 +94,78 @@
"primaryType": "com.acxiom.pipeline.fs.FileManager"
}
},
"nextStepId": "GETDESTINATION",
"nextStepId": "VALIDATE_SOURCE",
"stepId": "259a880a-3e12-4843-9f02-2cfc2a05f576"
},
{
"id": "VALIDATE_SOURCE",
"displayName": "Does File Exist",
"description": "Checks whether a file exists",
"type": "branch",
"params": [
{
"type": "text",
"name": "fileManager",
"required": true,
"value": "@GETSOURCE",
"parameterType": "com.acxiom.pipeline.connectors.FileConnector",
"description": "Maps the value from the sourceConnector global."
},
{
"type": "text",
"name": "path",
"required": true,
"value": "!sourceCopyPath",
"parameterType": "String",
"description": "Maps the value from the sourceConnector global."
},
{
"type": "result",
"name": "true",
"required": true,
"value": "GETDESTINATION",
"parameterType": "String"
},
{
"type": "result",
"name": "false",
"required": true,
"value": "INVALID_SOURCE",
"parameterType": "String"
}
],
"engineMeta": {
"command": "FileManagerSteps.exists",
"pkg": "com.acxiom.pipeline.steps"
},
"stepId": "aec5ebf7-7dac-4132-8d58-3a06b4772f79"
},
{
"id": "INVALID_SOURCE",
"displayName": "Raise Exception",
"type": "Pipeline",
"params": [
{
"type": "text",
"name": "message",
"required": true,
"value": "Source path does not exist: !{sourceCopyPath}",
"parameterType": "String"
},
{
"type": "text",
"name": "stepIdOverride",
"required": true,
"value": "VALIDATE_SOURCE",
"parameterType": "String"
}
],
"engineMeta": {
"command": "ExceptionSteps.throwPipelineException",
"pkg": "com.acxiom.pipeline.steps"
},
"stepId": "fb6c6293-c51d-49ab-a77e-de389610cdd6c"
},
{
"id": "GETDESTINATION",
"displayName": "Create a FileManager",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,14 @@ final case class PipelineStepGroup(override val id: Option[String] = None,
pipelineId: Option[String] = None) extends FlowStep

/**
* Represents a template fora step to be used when creating a pipeline.
* Represents a template for a step to be used when creating a pipeline.
*
* @param id The unique (to the pipeline) id of this step template. This property is used to reference this template within a pipeline step.
* @param displayName A name that can be displayed in logs and errors.
* @param description A long description of this step.
* @param `type` The type of step.
* @param params The step parameters that are used during execution.
* @param category Used for categorizing the template metadata.
* @param engineMeta Contains the instruction for invoking the step function.
* @param restartable Boolean flag indicating whether this step may be started in a flow restart
*/
Expand All @@ -99,6 +100,7 @@ case class StepTemplate(override val id: Option[String] = None,
override val description: Option[String] = None,
override val `type`: Option[String] = None,
override val params: Option[List[Parameter]] = None,
category: Option[String] = None,
engineMeta: Option[EngineMeta] = None,
restartable: Option[Boolean] = Some(false)) extends Step

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ trait Connector {

def credential: Option[Credential]

def connectorType: String

/**
* Using the provided PipelineContext and the optional credentialName and credential, this function will
* attempt to provide a Credential for use by the connector.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,7 @@ import com.acxiom.metalus.PipelineContext
import com.acxiom.metalus.sql.DataReference

trait DataConnector extends Connector {
def connectorType: String = "DATA"

def createDataReference(properties: Option[Map[String, Any]], pipelineContext: PipelineContext): DataReference[_]
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ import com.acxiom.metalus.fs.FileManager
* implementation provides a way to get the FileManager for that file system and can be used by steps.
*/
trait FileConnector extends Connector {

def connectorType: String = "FILE"

/**
* Creates and opens a FileManager.
*
Expand Down

0 comments on commit 35c0a17

Please sign in to comment.