Skip to content

Commit

Permalink
Acxiom#345 Moved FlowUtilsSteps to core
Browse files Browse the repository at this point in the history
  • Loading branch information
dafreels committed Feb 22, 2023
1 parent 96e71ef commit b6326b4
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 169 deletions.

This file was deleted.

This file was deleted.

Expand Up @@ -2,7 +2,7 @@ package com.acxiom.pipeline.streaming

import com.acxiom.pipeline._
import com.acxiom.pipeline.connectors.{DataConnectorUtilities, HDFSDataConnector}
import com.acxiom.pipeline.steps.{DataFrameReaderOptions, DataFrameWriterOptions, FlowUtilsSteps, Schema}
import com.acxiom.pipeline.steps.{DataFrameReaderOptions, DataFrameWriterOptions, Schema}
import org.apache.commons.io.FileUtils
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.hdfs.{HdfsConfiguration, MiniDFSCluster}
Expand Down
@@ -0,0 +1,45 @@
package com.acxiom.metalus.steps

import com.acxiom.metalus.annotations._
import com.acxiom.metalus.{PipelineContext, PipelineStepResponse}
import org.slf4j.{Logger, LoggerFactory}

@StepObject
object FlowUtilsSteps {
val logger: Logger = LoggerFactory.getLogger(getClass)

@StepFunction("cc8d44ad-5049-460f-87c4-e250b9fa53f1",
"Empty Check",
"Determines if the provided value is defined. Returns true if the value is not defined.",
"branch", "Utilities", List[String]("batch"))
@BranchResults(List("true", "false"))
def isEmpty(value: Any): Boolean = {
value match {
case o: Option[_] => o.isEmpty
case _ => Option(value).isEmpty
}
}

@StepFunction("6ed36f89-35d1-4280-a555-fbcd8dd76bf2",
"Retry (simple)",
"Makes a decision to retry or stop based on a named counter",
"branch", "RetryLogic", List[String]("batch"))
@BranchResults(List("retry", "stop"))
@StepParameters(Map("counterName" -> StepParameter(None, Some(true), None, None, None, None, Some("The name of the counter to use for tracking")),
"maxRetries" -> StepParameter(None, Some(true), None, None, None, None, Some("The maximum number of retries allowed"))))
@StepResults(primaryType = "String", secondaryTypes = Some(Map("$globals.$counterName" -> "Int")))
def simpleRetry(counterName: String, maxRetries: Int, pipelineContext: PipelineContext): PipelineStepResponse = {
val currentCounter = pipelineContext.getGlobalAs[Int](counterName)
val decision = if (currentCounter.getOrElse(0) < maxRetries) {
"retry"
} else {
"stop"
}
val updateCounter = if (decision == "retry") {
currentCounter.getOrElse(0) + 1
} else {
currentCounter.getOrElse(0)
}
PipelineStepResponse(Some(decision), Some(Map[String, Any](s"$$globals.$counterName" -> updateCounter)))
}
}
@@ -1,6 +1,6 @@
package com.acxiom.metalus.parser

import com.acxiom.metalus.PipelineDefs.{BASIC_PIPELINE, RETRY_PIPELINE, TWO_PIPELINE}
import com.acxiom.metalus.PipelineDefs.{BASIC_PIPELINE, RETRY_PIPELINE}
import com.acxiom.metalus._
import org.scalatest.funspec.AnyFunSpec

Expand Down
@@ -0,0 +1,49 @@
package com.acxiom.metalus.steps

import com.acxiom.metalus.{Constants, EngineMeta, Parameter, Pipeline, PipelineExecutor, PipelineListener, PipelineStep, TestHelper}
import org.scalatest.funspec.AnyFunSpec

class FlowUtilsStepsTests extends AnyFunSpec {

val STRING_STEP: PipelineStep = PipelineStep(Some("STRINGSTEP"), Some("String Step"), None, Some("Pipeline"),
Some(List(Parameter(Some("text"), Some("value"), Some(true), None, Some("lowercase")))),
Some("RETRY"), None, None, None, None, None, None, None, Some(EngineMeta(Some("StringSteps.toUpperCase"))))

val RETRY_STEP: PipelineStep = PipelineStep(Some("RETRY"), Some("Retry Step"), None, Some("branch"),
Some(List(Parameter(Some("text"), Some("counterName"), Some(true), None, Some("TEST_RETRY_COUNTER")),
Parameter(Some("int"), Some("maxRetries"), Some(true), None, Some(Constants.FIVE)),
Parameter(Some("result"), Some("retry"), Some(true), None, Some("STRINGSTEP")))),
None, None, None, None, None, None, None, None, Some(EngineMeta(Some("FlowUtilsSteps.simpleRetry"))))

describe("FlowUtilsSteps") {
describe("Retry") {
it("should handle retry") {
TestHelper.pipelineListener = PipelineListener()
val initialPipelineContext = TestHelper.generatePipelineContext().setGlobal("testCounter", 0)
val response = FlowUtilsSteps.simpleRetry("testCounter", 1, initialPipelineContext)
assert(response.primaryReturn.get.toString == "retry")
val stopResponse = FlowUtilsSteps.simpleRetry("testCounter", 1, initialPipelineContext.setGlobal("testCounter", 1))
assert(stopResponse.primaryReturn.get.toString == "stop")
}

it("Should retry and trigger stop") {
TestHelper.pipelineListener = PipelineListener()
val pipeline = Pipeline(Some("testPipeline"), Some("retryPipeline"), Some(List(STRING_STEP, RETRY_STEP)))
val initialPipelineContext = TestHelper.generatePipelineContext()
val result = PipelineExecutor.executePipelines(pipeline, initialPipelineContext)
val counter = result.pipelineContext.getGlobalAs[Int]("TEST_RETRY_COUNTER")
assert(counter.isDefined)
assert(counter.get == Constants.FIVE)
}
}

describe("isEmpty") {
it("should determine if object is empty") {
assert(FlowUtilsSteps.isEmpty(None))
assert(FlowUtilsSteps.isEmpty(None.orNull))
assert(!FlowUtilsSteps.isEmpty(Some("string")))
assert(!FlowUtilsSteps.isEmpty("test"))
}
}
}
}

0 comments on commit b6326b4

Please sign in to comment.