diff --git a/metalus-aws/src/main/resources/dependencies.json b/metalus-aws/src/main/resources/dependencies.json index 5b9af203..2bf22492 100644 --- a/metalus-aws/src/main/resources/dependencies.json +++ b/metalus-aws/src/main/resources/dependencies.json @@ -5,6 +5,36 @@ "groupId": "com.acxiom", "artifactId": "metalus-core_${scala.compat.version}", "version": "${version}" + }, + { + "groupId": "software.amazon.awssdk", + "artifactId": "lambda", + "version": "${aws-sdk-version}", + "scope":"extraction" + }, + { + "groupId": "software.amazon.awssdk", + "artifactId": "sdk-core", + "version": "${aws-sdk-version}", + "scope":"extraction" + }, + { + "groupId": "software.amazon.awssdk", + "artifactId": "utils", + "version": "${aws-sdk-version}", + "scope":"extraction" + }, + { + "groupId": "software.amazon.awssdk", + "artifactId": "annotations", + "version": "${aws-sdk-version}", + "scope":"extraction" + }, + { + "groupId": "software.amazon.awssdk", + "artifactId": "aws-core", + "version": "${aws-sdk-version}", + "scope":"extraction" } ] } diff --git a/metalus-core/src/main/resources/metadata/pipelines/copy-file.json b/metalus-core/src/main/resources/metadata/pipelines/copy-file.json index 54d59917..d0e8faf5 100644 --- a/metalus-core/src/main/resources/metadata/pipelines/copy-file.json +++ b/metalus-core/src/main/resources/metadata/pipelines/copy-file.json @@ -44,6 +44,33 @@ "y": 830 } }, + "parameters": { + "inputs": [ + { + "name": "sourceConnector", + "global": true, + "required": true + }, + { + "name": "destinationConnector", + "global": true, + "required": true + }, + { + "name": "sourceCopyPath", + "global": true, + "required": true + }, + { + "name": "destinationCopyPath", + "global": true, + "required": true + } + ], + "restartableSteps": [ + "VERIFY" + ] + }, "steps": [ { "id": "GETSOURCE", @@ -67,9 +94,78 @@ "primaryType": "com.acxiom.pipeline.fs.FileManager" } }, - "nextStepId": "GETDESTINATION", + "nextStepId": "VALIDATE_SOURCE", "stepId": "259a880a-3e12-4843-9f02-2cfc2a05f576" }, + { + "id": "VALIDATE_SOURCE", + "displayName": "Does File Exist", + "description": "Checks whether a file exists", + "type": "branch", + "params": [ + { + "type": "text", + "name": "fileManager", + "required": true, + "value": "@GETSOURCE", + "parameterType": "com.acxiom.pipeline.connectors.FileConnector", + "description": "Maps the value from the sourceConnector global." + }, + { + "type": "text", + "name": "path", + "required": true, + "value": "!sourceCopyPath", + "parameterType": "String", + "description": "Maps the value from the sourceConnector global." + }, + { + "type": "result", + "name": "true", + "required": true, + "value": "GETDESTINATION", + "parameterType": "String" + }, + { + "type": "result", + "name": "false", + "required": true, + "value": "INVALID_SOURCE", + "parameterType": "String" + } + ], + "engineMeta": { + "command": "FileManagerSteps.exists", + "pkg": "com.acxiom.pipeline.steps" + }, + "stepId": "aec5ebf7-7dac-4132-8d58-3a06b4772f79" + }, + { + "id": "INVALID_SOURCE", + "displayName": "Raise Exception", + "type": "Pipeline", + "params": [ + { + "type": "text", + "name": "message", + "required": true, + "value": "Source path does not exist: !{sourceCopyPath}", + "parameterType": "String" + }, + { + "type": "text", + "name": "stepIdOverride", + "required": true, + "value": "VALIDATE_SOURCE", + "parameterType": "String" + } + ], + "engineMeta": { + "command": "ExceptionSteps.throwPipelineException", + "pkg": "com.acxiom.pipeline.steps" + }, + "stepId": "fb6c6293-c51d-49ab-a77e-de389610cdd6c" + }, { "id": "GETDESTINATION", "displayName": "Create a FileManager", diff --git a/metalus-core/src/main/scala/com/acxiom/metalus/PipelineStep.scala b/metalus-core/src/main/scala/com/acxiom/metalus/PipelineStep.scala index 0070f061..8d77a3fa 100644 --- a/metalus-core/src/main/scala/com/acxiom/metalus/PipelineStep.scala +++ b/metalus-core/src/main/scala/com/acxiom/metalus/PipelineStep.scala @@ -84,13 +84,14 @@ final case class PipelineStepGroup(override val id: Option[String] = None, pipelineId: Option[String] = None) extends FlowStep /** - * Represents a template fora step to be used when creating a pipeline. + * Represents a template for a step to be used when creating a pipeline. * * @param id The unique (to the pipeline) id of this step template. This property is used to reference this template within a pipeline step. * @param displayName A name that can be displayed in logs and errors. * @param description A long description of this step. * @param `type` The type of step. * @param params The step parameters that are used during execution. + * @param category Used for categorizing the template metadata. * @param engineMeta Contains the instruction for invoking the step function. * @param restartable Boolean flag indicating whether this step may be started in a flow restart */ @@ -99,6 +100,7 @@ case class StepTemplate(override val id: Option[String] = None, override val description: Option[String] = None, override val `type`: Option[String] = None, override val params: Option[List[Parameter]] = None, + category: Option[String] = None, engineMeta: Option[EngineMeta] = None, restartable: Option[Boolean] = Some(false)) extends Step diff --git a/metalus-core/src/main/scala/com/acxiom/metalus/connectors/Connector.scala b/metalus-core/src/main/scala/com/acxiom/metalus/connectors/Connector.scala index adc1ae4b..43095e43 100644 --- a/metalus-core/src/main/scala/com/acxiom/metalus/connectors/Connector.scala +++ b/metalus-core/src/main/scala/com/acxiom/metalus/connectors/Connector.scala @@ -9,6 +9,8 @@ trait Connector { def credential: Option[Credential] + def connectorType: String + /** * Using the provided PipelineContext and the optional credentialName and credential, this function will * attempt to provide a Credential for use by the connector. diff --git a/metalus-core/src/main/scala/com/acxiom/metalus/connectors/DataConnector.scala b/metalus-core/src/main/scala/com/acxiom/metalus/connectors/DataConnector.scala index ef35e267..496257c3 100644 --- a/metalus-core/src/main/scala/com/acxiom/metalus/connectors/DataConnector.scala +++ b/metalus-core/src/main/scala/com/acxiom/metalus/connectors/DataConnector.scala @@ -4,5 +4,7 @@ import com.acxiom.metalus.PipelineContext import com.acxiom.metalus.sql.DataReference trait DataConnector extends Connector { + def connectorType: String = "DATA" + def createDataReference(properties: Option[Map[String, Any]], pipelineContext: PipelineContext): DataReference[_] } diff --git a/metalus-core/src/main/scala/com/acxiom/metalus/connectors/FileConnector.scala b/metalus-core/src/main/scala/com/acxiom/metalus/connectors/FileConnector.scala index e6c0a960..bc9c2a9b 100644 --- a/metalus-core/src/main/scala/com/acxiom/metalus/connectors/FileConnector.scala +++ b/metalus-core/src/main/scala/com/acxiom/metalus/connectors/FileConnector.scala @@ -8,6 +8,9 @@ import com.acxiom.metalus.fs.FileManager * implementation provides a way to get the FileManager for that file system and can be used by steps. */ trait FileConnector extends Connector { + + def connectorType: String = "FILE" + /** * Creates and opens a FileManager. *