
Commit

Acxiom#345 Added FlowUtils.readStream step to allow windowing data from a source to a pipeline.
dafreels committed Mar 23, 2023
1 parent bf353f9 commit 5b169da
Showing 7 changed files with 174 additions and 8 deletions.
@@ -416,7 +416,14 @@ trait SessionConvertor {
case class DefaultSessionConvertor() extends SessionConvertor {
  override def name: String = "DefaultConvertor"

  override def canConvert(obj: Any): Boolean = obj.isInstanceOf[java.io.Serializable]
  override def canConvert(obj: Any): Boolean = {
    try {
      // Trial serialization: also rejects objects that claim Serializable but
      // hold non-serializable fields.
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
      true
    } catch {
      case _: Throwable => false
    }
  }

  override def serialize(obj: Any): Array[Byte] = {
    val o = new ByteArrayOutputStream()
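The change above replaces a type check with a trial serialization. The stricter check matters because an object can satisfy isInstanceOf[java.io.Serializable] yet still fail real serialization when one of its fields is not itself serializable. A minimal, self-contained sketch (the Handle and Wrapper types are hypothetical, not part of this commit):

import java.io.{ByteArrayOutputStream, ObjectOutputStream}

object CanConvertDemo extends App {
  class Handle                       // hypothetical: does not extend Serializable
  case class Wrapper(handle: Handle) // case classes are Serializable by default

  def canConvert(obj: Any): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
      true
    } catch {
      case _: Throwable => false
    }

  val w = Wrapper(new Handle)
  println(w.isInstanceOf[java.io.Serializable]) // true: the old check would accept it
  println(canConvert(w))                        // false: writeObject throws NotSerializableException
}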
@@ -1,11 +1,10 @@
package com.acxiom.metalus.sql

import com.acxiom.metalus.connectors.{ConnectorProvider, FileConnector}
import com.acxiom.metalus.{PipelineContext, PipelineException}
import com.acxiom.metalus.utils.ReflectionUtils
import com.acxiom.metalus.{PipelineContext, PipelineException}
import com.univocity.parsers.csv.{CsvParser, CsvParserSettings, UnescapedQuoteHandling}

import java.io.{BufferedReader, InputStream, InputStreamReader}
import java.io.InputStream
import scala.collection.immutable.Queue
import scala.jdk.CollectionConverters._
import scala.math.ScalaNumber
@@ -49,7 +48,7 @@ object InMemoryTable {
    val params = options.map(_.mapValues(_.toString).toMap).getOrElse(Map.empty[String, String])
    val settings = new CsvParserSettings()
    val format = settings.getFormat
    format.setComment('\0')
    format.setComment('\u0000')
    format.setDelimiter(params.getOrElse("delimiter", params.getOrElse("separator", ",")))
    params.get("quote").foreach(q => format.setQuote(q.head))
    params.get("escape").foreach(e => format.setQuoteEscape(e.head))
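Context for the one-character change above: '\0' is an octal escape, which newer Scala compilers reject, so the NUL character has to be written as '\u0000'. Parser behavior is unchanged; since no CSV line begins with NUL, the setting effectively disables univocity's comment-line handling. A minimal sketch:

import com.univocity.parsers.csv.CsvParserSettings

val settings = new CsvParserSettings()
// NUL as the comment marker: no input line can begin with '\u0000',
// so no line is ever skipped as a comment.
settings.getFormat.setComment('\u0000')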
@@ -1,9 +1,15 @@
package com.acxiom.metalus.steps

import com.acxiom.metalus._
import com.acxiom.metalus.annotations._
import com.acxiom.metalus.{Constants, PipelineContext, PipelineStepResponse, RetryPolicy}
import com.acxiom.metalus.connectors.{Connector, DataStreamOptions, InMemoryDataConnector}
import com.acxiom.metalus.sql.InMemoryDataReference
import com.acxiom.metalus.utils.DriverUtils
import com.acxiom.metalus.utils.DriverUtils.buildPipelineException
import org.slf4j.{Logger, LoggerFactory}

import scala.annotation.tailrec

@StepObject
object FlowUtilsSteps {
  val logger: Logger = LoggerFactory.getLogger(getClass)
@@ -20,6 +26,49 @@ object FlowUtilsSteps {
    }
  }

@StepFunction("ef1028ad-8cc4-4e20-b29d-d5d8e506dbc7",
"Stream Data",
"Given a connector and pipeline id, this step will process windowed data until complete or a pipeline error occurs",
"Pipeline", "Streaming", List[String]("batch", "streaming"))
@StepParameters(Map("source" -> StepParameter(None, Some(true), None, None, None, None, Some("The connector to use to obtain the source data")),
"pipelineId" -> StepParameter(None, Some(true), None, None, None, None, Some("The id of the pipeline to use for processing")),
"retryPolicy" -> StepParameter(None, Some(false), None, None, None, None, Some("An optional retry policy")),
"options" -> StepParameter(None, Some(false), None, None, None, None, Some("Optional settings to use during the data read"))))
def streamData(source: Connector, pipelineId: String,
retryPolicy: RetryPolicy = RetryPolicy(),
options: Option[DataStreamOptions], pipelineContext: PipelineContext): Unit = {
val readerOpt = source.getReader(options)
val pipeline = pipelineContext.pipelineManager.getPipeline(pipelineId)
if (readerOpt.isEmpty) {
throw DriverUtils.buildPipelineException(Some(s"Connector ${source.name} does not support reading from a stream!"), None, Some(pipelineContext))
}
if (pipeline.isEmpty) {
throw DriverUtils.buildPipelineException(Some(s"Pipeline $pipelineId does not exist!"), None, Some(pipelineContext))
}
val reader = readerOpt.get
reader.open()
try {
Iterator.continually(reader.next()).takeWhile(r => r.isDefined).foreach(results => {
if (results.get.nonEmpty) {
val properties = Map("data" -> results.get.map(_.columns), "schema" -> results.get.head.schema)
val dataRef = InMemoryDataConnector("data-chunk")
.createDataReference(Some(properties), pipelineContext)
.asInstanceOf[InMemoryDataReference]
val result = processDataReference(source.name, dataRef, pipeline.get, retryPolicy, 0, pipelineContext)
if (!result.success) {
reader.close()
throw DriverUtils.buildPipelineException(Some("Failed to process streaming data!"), result.exception, Some(pipelineContext))
}
}
})
reader.close()
} catch {
case t: Throwable =>
reader.close()
throw t
}
}

@StepFunction("6ed36f89-35d1-4280-a555-fbcd8dd76bf2",
"Retry (simple)",
"Makes a decision to retry or stop based on a named counter",
@@ -48,4 +97,25 @@ object FlowUtilsSteps {
    }
    PipelineStepResponse(Some(decision), Some(Map[String, Any](s"$$globals.$counterName" -> updateCounter)))
  }

  @tailrec
  private def processDataReference(sourceName: String,
                                   data: InMemoryDataReference,
                                   pipeline: Pipeline,
                                   retryPolicy: RetryPolicy = RetryPolicy(),
                                   retryCount: Int = 0,
                                   pipelineContext: PipelineContext): PipelineExecutionResult = {
    try {
      val initialContext = pipelineContext.setGlobal("streamedDataReference", data)
      PipelineExecutor.executePipelines(pipeline, initialContext)
    } catch {
      case t: Throwable =>
        if (retryCount < retryPolicy.maximumRetries.getOrElse(Constants.TEN)) {
          DriverUtils.invokeWaitPeriod(retryPolicy, retryCount + 1)
          // Increment the count on recursion so the retry loop can terminate.
          processDataReference(sourceName, data, pipeline, retryPolicy, retryCount + 1, pipelineContext)
        } else {
          throw buildPipelineException(Some(s"Unable to process records from source stream $sourceName"), Some(t), Some(pipelineContext))
        }
    }
  }
}
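For orientation, a hedged sketch of calling the new step directly, mirroring the test further down; the connector name, file path, and 100-row window are illustrative, and it assumes a pipelineContext whose pipelineManager has "in-memory-flow" registered:

import com.acxiom.metalus.{PipelineContext, RetryPolicy}
import com.acxiom.metalus.connectors.{DataStreamOptions, LocalFileConnector}
import com.acxiom.metalus.steps.FlowUtilsSteps

// Sketch: stream a CSV through the registered "in-memory-flow" pipeline in
// 100-row windows; each window reaches that pipeline as the
// "streamedDataReference" global.
def streamCsv(pipelineContext: PipelineContext): Unit = {
  val connector = LocalFileConnector("csv-source", None, None)
  val options = DataStreamOptions(None,
    Map("filePath" -> "/tmp/data.csv", "fileDelimiter" -> ",", "useHeader" -> true), 100)
  FlowUtilsSteps.streamData(connector, "in-memory-flow", RetryPolicy(), Some(options), pipelineContext)
}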
@@ -0,0 +1,22 @@
{
  "id": "in-memory-fail-flow",
  "name": "InMemory Test Pipeline",
  "steps": [
    {
      "id": "VALIDATE_DATA",
      "type": "Pipeline",
      "params": [
        {
          "type": "object",
          "name": "dataRef",
          "required": true,
          "value": "!streamedDataReference"
        }
      ],
      "engineMeta": {
        "command": "MockStepObject.mockExceptionStepFunction",
        "pkg": "com.acxiom.metalus"
      }
    }
  ]
}
@@ -0,0 +1,28 @@
{
  "id": "in-memory-flow",
  "name": "InMemory Test Pipeline",
  "steps": [
    {
      "id": "VALIDATE_DATA",
      "type": "Pipeline",
      "params": [
        {
          "type": "object",
          "name": "dataRef",
          "required": true,
          "value": "!streamedDataReference"
        },
        {
          "type": "object",
          "name": "results",
          "required": true,
          "value": "!rowBuffer"
        }
      ],
      "engineMeta": {
        "command": "MockStepObject.validateDataReference",
        "pkg": "com.acxiom.metalus.steps"
      }
    }
  ]
}
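In both pipeline definitions the ! prefix marks a Metalus global lookup: !streamedDataReference resolves to the InMemoryDataReference that streamData stores per window, and !rowBuffer to the ListBuffer the test seeds before running. A rough programmatic equivalent (treat getGlobal and its Option[Any] return as assumptions about the PipelineContext API):

// Hypothetical equivalent of the "!streamedDataReference" parameter mapping:
val dataRef = pipelineContext.getGlobal("streamedDataReference")
  .map(_.asInstanceOf[InMemoryDataReference])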
@@ -1,10 +1,15 @@
package com.acxiom.metalus.steps

import com.acxiom.metalus.{Constants, EngineMeta, Parameter, Pipeline, PipelineExecutor, PipelineListener, PipelineStep, RetryPolicy, TestHelper}
import com.acxiom.metalus.connectors.{DataStreamOptions, LocalFileConnector}
import com.acxiom.metalus.sql.Row
import com.acxiom.metalus._
import org.scalatest.funspec.AnyFunSpec

class FlowUtilsStepsTests extends AnyFunSpec {
import java.io.File
import java.nio.file.{Files, StandardCopyOption}
import scala.collection.mutable.ListBuffer

class FlowUtilsStepsTests extends AnyFunSpec {
  val STRING_STEP: PipelineStep = PipelineStep(Some("STRINGSTEP"), Some("String Step"), None, Some("Pipeline"),
    Some(List(Parameter(Some("text"), Some("value"), Some(true), None, Some("lowercase")))),
    Some("RETRY"), None, None, None, None, None, None, None, Some(EngineMeta(Some("StringSteps.toUpperCase"))))
@@ -39,6 +44,32 @@ class FlowUtilsStepsTests extends AnyFunSpec {
    }
  }

describe("streamData") {
val source = File.createTempFile("placeholder", ".txt")
source.deleteOnExit()
val dataFilePath = s"${source.getParentFile.getAbsolutePath}/MOCK_DATA.csv"
Files.copy(getClass.getResourceAsStream("/MOCK_DATA.csv"),
new File(dataFilePath).toPath,
StandardCopyOption.REPLACE_EXISTING)
val localFileConnector = LocalFileConnector("my-connector", None, None)
val options = DataStreamOptions(None,
Map("filePath" -> dataFilePath, "fileDelimiter" -> ",", "useHeader" -> true),
Constants.TWELVE)
it ("should process data in chunks") {
val rows = new ListBuffer[Row]()
val initialPipelineContext = TestHelper.generatePipelineContext().setGlobal("rowBuffer", rows)
FlowUtilsSteps.streamData(localFileConnector, "in-memory-flow", RetryPolicy(), Some(options), initialPipelineContext)
assert(rows.length == Constants.ONE_THOUSAND)
}

it ("should throw an exception when pipeline fails") {
val thrown = intercept[PipelineException] {
FlowUtilsSteps.streamData(localFileConnector, "in-memory-fail-flow", RetryPolicy(), Some(options), TestHelper.generatePipelineContext())
}
assert(thrown.getMessage == "Failed to process streaming data!")
}
}

describe("isEmpty") {
it("should determine if object is empty") {
assert(FlowUtilsSteps.isEmpty(None))
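Some arithmetic behind the first assertion, assuming MOCK_DATA.csv holds Constants.ONE_THOUSAND data rows and that the third DataStreamOptions argument (Constants.TWELVE) is the rows-per-window buffer size:

// 1,000 rows in 12-row windows: 83 full windows plus a 4-row remainder,
// so the child pipeline should run 84 times and rowBuffer ends at 1,000.
val windows = (1000 + 12 - 1) / 12 // 84
val lastWindow = 1000 % 12         // 4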
@@ -1,9 +1,18 @@
package com.acxiom.metalus.steps

import com.acxiom.metalus.sql.{DataReference, InMemoryDataReference, Row}

import scala.collection.mutable.ListBuffer

object MockStepObject {

  def mockStepFunctionAnyResponse(string: String): String = {
    string
  }

  def validateDataReference(dataRef: DataReference[_], results: ListBuffer[Row]): Unit = {
    assert(dataRef.isInstanceOf[InMemoryDataReference])
    val table = dataRef.asInstanceOf[InMemoryDataReference].execute
    table.data.foreach(r => results += Row(r, Some(table.schema), None))
  }
}
