diff --git a/metalus-core/src/main/scala/com/acxiom/metalus/context/SessionContext.scala b/metalus-core/src/main/scala/com/acxiom/metalus/context/SessionContext.scala
index 58e8f025..56aa1a09 100644
--- a/metalus-core/src/main/scala/com/acxiom/metalus/context/SessionContext.scala
+++ b/metalus-core/src/main/scala/com/acxiom/metalus/context/SessionContext.scala
@@ -416,7 +416,15 @@ trait SessionConvertor {
 
 case class DefaultSessionConvertor() extends SessionConvertor {
   override def name: String = "DefaultConvertor"
-  override def canConvert(obj: Any): Boolean = obj.isInstanceOf[java.io.Serializable]
+  override def canConvert(obj: Any): Boolean = {
+    try {
+      // A trial serialization is more reliable than checking the Serializable marker interface
+      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
+      true
+    } catch {
+      case _: Throwable => false
+    }
+  }
 
   override def serialize(obj: Any): Array[Byte] = {
     val o = new ByteArrayOutputStream()
diff --git a/metalus-core/src/main/scala/com/acxiom/metalus/sql/InMemoryDataReference.scala b/metalus-core/src/main/scala/com/acxiom/metalus/sql/InMemoryDataReference.scala
index 3545e2ab..059fc651 100644
--- a/metalus-core/src/main/scala/com/acxiom/metalus/sql/InMemoryDataReference.scala
+++ b/metalus-core/src/main/scala/com/acxiom/metalus/sql/InMemoryDataReference.scala
@@ -1,11 +1,10 @@
 package com.acxiom.metalus.sql
 
-import com.acxiom.metalus.connectors.{ConnectorProvider, FileConnector}
-import com.acxiom.metalus.{PipelineContext, PipelineException}
 import com.acxiom.metalus.utils.ReflectionUtils
+import com.acxiom.metalus.{PipelineContext, PipelineException}
 import com.univocity.parsers.csv.{CsvParser, CsvParserSettings, UnescapedQuoteHandling}
 
-import java.io.{BufferedReader, InputStream, InputStreamReader}
+import java.io.InputStream
 import scala.collection.immutable.Queue
 import scala.jdk.CollectionConverters._
 import scala.math.ScalaNumber
@@ -49,7 +48,7 @@ object InMemoryTable {
     val params = options.map(_.mapValues(_.toString).toMap).getOrElse(Map.empty[String, String])
     val settings = new CsvParserSettings()
     val format = settings.getFormat
-    format.setComment('\0')
+    format.setComment('\u0000')
     format.setDelimiter(params.getOrElse("delimiter", params.getOrElse("separator", ",")))
     params.get("quote").foreach(q => format.setQuote(q.head))
     params.get("escape").foreach(e => format.setQuoteEscape(e.head))
diff --git a/metalus-core/src/main/scala/com/acxiom/metalus/steps/FlowUtilsSteps.scala b/metalus-core/src/main/scala/com/acxiom/metalus/steps/FlowUtilsSteps.scala
index 79b9170f..75e3ed8c 100644
--- a/metalus-core/src/main/scala/com/acxiom/metalus/steps/FlowUtilsSteps.scala
+++ b/metalus-core/src/main/scala/com/acxiom/metalus/steps/FlowUtilsSteps.scala
@@ -1,9 +1,15 @@
 package com.acxiom.metalus.steps
 
+import com.acxiom.metalus._
 import com.acxiom.metalus.annotations._
-import com.acxiom.metalus.{Constants, PipelineContext, PipelineStepResponse, RetryPolicy}
+import com.acxiom.metalus.connectors.{Connector, DataStreamOptions, InMemoryDataConnector}
+import com.acxiom.metalus.sql.InMemoryDataReference
+import com.acxiom.metalus.utils.DriverUtils
+import com.acxiom.metalus.utils.DriverUtils.buildPipelineException
 import org.slf4j.{Logger, LoggerFactory}
 
+import scala.annotation.tailrec
+
 @StepObject
 object FlowUtilsSteps {
   val logger: Logger = LoggerFactory.getLogger(getClass)
@@ -20,6 +26,49 @@ object FlowUtilsSteps {
     }
   }
 
+  @StepFunction("ef1028ad-8cc4-4e20-b29d-d5d8e506dbc7",
+    "Stream Data",
+    "Given a connector and pipeline id, this step processes windowed data until the stream completes or a pipeline error occurs",
+    "Pipeline",
"Streaming", List[String]("batch", "streaming")) + @StepParameters(Map("source" -> StepParameter(None, Some(true), None, None, None, None, Some("The connector to use to obtain the source data")), + "pipelineId" -> StepParameter(None, Some(true), None, None, None, None, Some("The id of the pipeline to use for processing")), + "retryPolicy" -> StepParameter(None, Some(false), None, None, None, None, Some("An optional retry policy")), + "options" -> StepParameter(None, Some(false), None, None, None, None, Some("Optional settings to use during the data read")))) + def streamData(source: Connector, pipelineId: String, + retryPolicy: RetryPolicy = RetryPolicy(), + options: Option[DataStreamOptions], pipelineContext: PipelineContext): Unit = { + val readerOpt = source.getReader(options) + val pipeline = pipelineContext.pipelineManager.getPipeline(pipelineId) + if (readerOpt.isEmpty) { + throw DriverUtils.buildPipelineException(Some(s"Connector ${source.name} does not support reading from a stream!"), None, Some(pipelineContext)) + } + if (pipeline.isEmpty) { + throw DriverUtils.buildPipelineException(Some(s"Pipeline $pipelineId does not exist!"), None, Some(pipelineContext)) + } + val reader = readerOpt.get + reader.open() + try { + Iterator.continually(reader.next()).takeWhile(r => r.isDefined).foreach(results => { + if (results.get.nonEmpty) { + val properties = Map("data" -> results.get.map(_.columns), "schema" -> results.get.head.schema) + val dataRef = InMemoryDataConnector("data-chunk") + .createDataReference(Some(properties), pipelineContext) + .asInstanceOf[InMemoryDataReference] + val result = processDataReference(source.name, dataRef, pipeline.get, retryPolicy, 0, pipelineContext) + if (!result.success) { + reader.close() + throw DriverUtils.buildPipelineException(Some("Failed to process streaming data!"), result.exception, Some(pipelineContext)) + } + } + }) + reader.close() + } catch { + case t: Throwable => + reader.close() + throw t + } + } + @StepFunction("6ed36f89-35d1-4280-a555-fbcd8dd76bf2", "Retry (simple)", "Makes a decision to retry or stop based on a named counter", @@ -48,4 +97,25 @@ object FlowUtilsSteps { } PipelineStepResponse(Some(decision), Some(Map[String, Any](s"$$globals.$counterName" -> updateCounter))) } + + @tailrec + private def processDataReference(sourceName: String, + data: InMemoryDataReference, + pipeline: Pipeline, + retryPolicy: RetryPolicy = RetryPolicy(), + retryCount: Int = 0, + pipelineContext: PipelineContext): PipelineExecutionResult = { + try { + val initialContext = pipelineContext.setGlobal("streamedDataReference", data) + PipelineExecutor.executePipelines(pipeline, initialContext) + } catch { + case t: Throwable => + if (retryCount < retryPolicy.maximumRetries.getOrElse(Constants.TEN)) { + DriverUtils.invokeWaitPeriod(retryPolicy, retryCount + 1) + processDataReference(sourceName, data, pipeline, retryPolicy, retryCount, pipelineContext) + } else { + throw buildPipelineException(Some(s"Unable to process records from source stream $sourceName"), Some(t), Some(pipelineContext)) + } + } + } } diff --git a/metalus-core/src/test/resources/metadata/pipelines/in-memory-fail-flow.json b/metalus-core/src/test/resources/metadata/pipelines/in-memory-fail-flow.json new file mode 100644 index 00000000..05852718 --- /dev/null +++ b/metalus-core/src/test/resources/metadata/pipelines/in-memory-fail-flow.json @@ -0,0 +1,22 @@ +{ + "id": "in-memory-flow", + "name": "InMemory Test Pipeline", + "steps": [ + { + "id": "VALIDATE_DATA", + "type": "Pipeline", + 
"params": [ + { + "type": "object", + "name": "dataRef", + "required": true, + "value": "!streamedDataReference" + } + ], + "engineMeta": { + "command": "MockStepObject.mockExceptionStepFunction", + "pkg": "com.acxiom.metalus" + } + } + ] +} diff --git a/metalus-core/src/test/resources/metadata/pipelines/in-memory-flow.json b/metalus-core/src/test/resources/metadata/pipelines/in-memory-flow.json new file mode 100644 index 00000000..299da9fc --- /dev/null +++ b/metalus-core/src/test/resources/metadata/pipelines/in-memory-flow.json @@ -0,0 +1,28 @@ +{ + "id": "in-memory-flow", + "name": "InMemory Test Pipeline", + "steps": [ + { + "id": "VALIDATE_DATA", + "type": "Pipeline", + "params": [ + { + "type": "object", + "name": "dataRef", + "required": true, + "value": "!streamedDataReference" + }, + { + "type": "object", + "name": "results", + "required": true, + "value": "!rowBuffer" + } + ], + "engineMeta": { + "command": "MockStepObject.validateDataReference", + "pkg": "com.acxiom.metalus.steps" + } + } + ] +} diff --git a/metalus-core/src/test/scala/com/acxiom/metalus/steps/FlowUtilsStepsTests.scala b/metalus-core/src/test/scala/com/acxiom/metalus/steps/FlowUtilsStepsTests.scala index e579cf86..e1a7bc15 100644 --- a/metalus-core/src/test/scala/com/acxiom/metalus/steps/FlowUtilsStepsTests.scala +++ b/metalus-core/src/test/scala/com/acxiom/metalus/steps/FlowUtilsStepsTests.scala @@ -1,10 +1,15 @@ package com.acxiom.metalus.steps -import com.acxiom.metalus.{Constants, EngineMeta, Parameter, Pipeline, PipelineExecutor, PipelineListener, PipelineStep, RetryPolicy, TestHelper} +import com.acxiom.metalus.connectors.{DataStreamOptions, LocalFileConnector} +import com.acxiom.metalus.sql.Row +import com.acxiom.metalus._ import org.scalatest.funspec.AnyFunSpec -class FlowUtilsStepsTests extends AnyFunSpec { +import java.io.File +import java.nio.file.{Files, StandardCopyOption} +import scala.collection.mutable.ListBuffer +class FlowUtilsStepsTests extends AnyFunSpec { val STRING_STEP: PipelineStep = PipelineStep(Some("STRINGSTEP"), Some("String Step"), None, Some("Pipeline"), Some(List(Parameter(Some("text"), Some("value"), Some(true), None, Some("lowercase")))), Some("RETRY"), None, None, None, None, None, None, None, Some(EngineMeta(Some("StringSteps.toUpperCase")))) @@ -39,6 +44,32 @@ class FlowUtilsStepsTests extends AnyFunSpec { } } + describe("streamData") { + val source = File.createTempFile("placeholder", ".txt") + source.deleteOnExit() + val dataFilePath = s"${source.getParentFile.getAbsolutePath}/MOCK_DATA.csv" + Files.copy(getClass.getResourceAsStream("/MOCK_DATA.csv"), + new File(dataFilePath).toPath, + StandardCopyOption.REPLACE_EXISTING) + val localFileConnector = LocalFileConnector("my-connector", None, None) + val options = DataStreamOptions(None, + Map("filePath" -> dataFilePath, "fileDelimiter" -> ",", "useHeader" -> true), + Constants.TWELVE) + it ("should process data in chunks") { + val rows = new ListBuffer[Row]() + val initialPipelineContext = TestHelper.generatePipelineContext().setGlobal("rowBuffer", rows) + FlowUtilsSteps.streamData(localFileConnector, "in-memory-flow", RetryPolicy(), Some(options), initialPipelineContext) + assert(rows.length == Constants.ONE_THOUSAND) + } + + it ("should throw an exception when pipeline fails") { + val thrown = intercept[PipelineException] { + FlowUtilsSteps.streamData(localFileConnector, "in-memory-fail-flow", RetryPolicy(), Some(options), TestHelper.generatePipelineContext()) + } + assert(thrown.getMessage == "Failed to process streaming 
data!") + } + } + describe("isEmpty") { it("should determine if object is empty") { assert(FlowUtilsSteps.isEmpty(None)) diff --git a/metalus-core/src/test/scala/com/acxiom/metalus/steps/MockStepObject.scala b/metalus-core/src/test/scala/com/acxiom/metalus/steps/MockStepObject.scala index f010b11a..11347620 100644 --- a/metalus-core/src/test/scala/com/acxiom/metalus/steps/MockStepObject.scala +++ b/metalus-core/src/test/scala/com/acxiom/metalus/steps/MockStepObject.scala @@ -1,9 +1,18 @@ package com.acxiom.metalus.steps +import com.acxiom.metalus.sql.{DataReference, InMemoryDataReference, Row} + +import scala.collection.mutable.ListBuffer + object MockStepObject { def mockStepFunctionAnyResponse(string: String): String = { string } + def validateDataReference(dataRef: DataReference[_], results: ListBuffer[Row]): Unit = { + assert(dataRef.isInstanceOf[InMemoryDataReference]) + val table = dataRef.asInstanceOf[InMemoryDataReference].execute + table.data.foreach(r => results += Row(r, Some(table.schema), None)) + } }