diff --git a/src/main/scala/io/epiphanous/flinkrunner/FlinkRunner.scala b/src/main/scala/io/epiphanous/flinkrunner/FlinkRunner.scala
index 95f80383..db682a5d 100644
--- a/src/main/scala/io/epiphanous/flinkrunner/FlinkRunner.scala
+++ b/src/main/scala/io/epiphanous/flinkrunner/FlinkRunner.scala
@@ -10,6 +10,7 @@ import org.apache.avro.generic.GenericRecord
 import org.apache.flink.api.common.JobExecutionResult
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.formats.avro.utils.AvroKryoSerializerUtils.AvroSchemaSerializer
+import org.apache.flink.streaming.api.graph.StreamGraph
 import org.apache.flink.streaming.api.scala._
 import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
 import org.apache.flink.table.data.RowData
@@ -37,7 +38,8 @@ import scala.reflect.runtime.{universe => ru}
 abstract class FlinkRunner[ADT <: FlinkEvent: TypeInformation](
     val config: FlinkConfig,
     val checkResultsOpt: Option[CheckResults[ADT]] = None,
-    val generatorFactoryOpt: Option[GeneratorFactory[ADT]] = None)
+    val generatorFactoryOpt: Option[GeneratorFactory[ADT]] = None,
+    val executeJob: Boolean = true)
     extends LazyLogging {
 
   val env: StreamExecutionEnvironment =
@@ -57,6 +59,20 @@ abstract class FlinkRunner[ADT <: FlinkEvent: TypeInformation](
     */
   def getExecutionPlan: String = env.getExecutionPlan
 
+  /** Get the stream graph for the configured job. This is primarily useful
+    * for testing the stream jobs constructed in flinkrunner. It will throw
+    * an exception if you call it before running a job against this runner.
+    * If you are only interested in the stream graph and don't need the job
+    * to be executed, you can set executeJob = false when constructing the
+    * FlinkRunner instance.
+    * @return
+    *   StreamGraph
+    */
+  def getStreamGraph: StreamGraph = env.getStreamGraph(false)
+
+  def getStreamNodesInfo: Seq[StreamNodeInfo] =
+    StreamNodeInfo.from(getStreamGraph)
+
   /** Executes the job graph.
     * @return
     *   JobExecutionResult
@@ -232,7 +248,7 @@ abstract class FlinkRunner[ADT <: FlinkEvent: TypeInformation](
     *
     * @param sourceConfig
     *   the source config
-    * @param fromRow
+    * @param fromRowData
     *   an implicit method to convert a Row into an event of type E
     * @tparam E
     *   the event data type
@@ -266,17 +282,7 @@ abstract class FlinkRunner[ADT <: FlinkEvent: TypeInformation](
   def addSink[E <: ADT: TypeInformation](
       stream: DataStream[E],
       sinkName: String): Unit =
-    getSinkConfig(sinkName) match {
-      case s: CassandraSinkConfig[ADT]     => s.addSink[E](stream)
-      case s: ElasticsearchSinkConfig[ADT] => s.addSink[E](stream)
-      case s: FileSinkConfig[ADT]          => s.addSink[E](stream)
-      case s: JdbcSinkConfig[ADT]          => s.addSink[E](stream)
-      case s: KafkaSinkConfig[ADT]         => s.addSink[E](stream)
-      case s: KinesisSinkConfig[ADT]       => s.addSink[E](stream)
-      case s: RabbitMQSinkConfig[ADT]      => s.addSink[E](stream)
-      case s: SocketSinkConfig[ADT]        => s.addSink[E](stream)
-      case s: IcebergSinkConfig[ADT]       => s.addSink[E](stream)
-    }
+    getSinkConfig(sinkName).addSink[E](stream)
 
   def addAvroSink[
       E <: ADT with EmbeddedAvroRecord[A]: TypeInformation,
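The new executeJob flag and the two graph accessors above combine as follows. A minimal sketch, not part of the patch; MyADT and MyJob are hypothetical stand-ins for a user-defined ADT and job:

    val runner = new FlinkRunner[MyADT](config, executeJob = false) {
      override def invoke(jobName: String): Unit = new MyJob(this).run()
    }
    runner.process() // builds the stream graph; with executeJob = false, skips execution
    runner.getStreamNodesInfo.foreach { n =>
      println(s"${n.nodeKind}: ${n.name} (parallelism=${n.parallelism})")
    }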
diff --git a/src/main/scala/io/epiphanous/flinkrunner/flink/StreamJob.scala b/src/main/scala/io/epiphanous/flinkrunner/flink/StreamJob.scala
index edd28f9c..eca0f33a 100644
--- a/src/main/scala/io/epiphanous/flinkrunner/flink/StreamJob.scala
+++ b/src/main/scala/io/epiphanous/flinkrunner/flink/StreamJob.scala
@@ -15,7 +15,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.streaming.api.scala._
 import org.apache.flink.streaming.api.windowing.time.Time
 import org.apache.flink.streaming.api.windowing.windows.Window
-import org.apache.flink.table.data.RowData
+import org.apache.flink.table.data.{GenericRowData, RowData}
 import org.apache.flink.util.Collector
 import squants.Quantity
 
@@ -58,9 +58,12 @@ abstract class StreamJob[
       IN <: ADT with EmbeddedRowType: TypeInformation](
       seq: Seq[IN] = Seq.empty,
       name: Option[String] = None)(implicit
-      fromRowData: RowData => IN): DataStream[IN] = if (seq.nonEmpty)
-    runner.env.fromCollection[IN](seq)
-  else singleRowSource[IN](name.getOrElse(runner.getDefaultSourceName))
+      fromRowData: RowData => IN): DataStream[IN] = if (seq.nonEmpty) {
+    val rd = seq.zipWithIndex.map { case (_, i) =>
+      GenericRowData.of(Integer.valueOf(i))
+    }
+    runner.env.fromCollection(rd).map(g => seq(g.getInt(0)))
+  } else singleRowSource[IN](name.getOrElse(runner.getDefaultSourceName))
 
   /** Configure a single input source stream.
     * @param name
@@ -330,6 +333,8 @@ abstract class StreamJob[
   def maybeSink(out: DataStream[OUT]): Unit =
     if (runner.writeToSink) sink(out)
 
+  def buildJobGraph: DataStream[OUT] = transform |# maybeSink
+
   /** Runs the job, meaning it constructs the flow and executes it.
     */
   def run(): Unit = {
@@ -339,27 +344,31 @@
     )
 
     // build the job graph
-    val stream = transform |# maybeSink
+    val stream = buildJobGraph
 
     if (config.showPlan)
       logger.info(s"\nPLAN:\n${runner.getExecutionPlan}\n")
 
-    runner.checkResultsOpt match {
-
-      case Some(checkResults) =>
-        logger.info(
-          s"routing job ${config.jobName} results back through CheckResults<${checkResults.name}>"
-        )
-        checkResults.checkOutputEvents[OUT](
-          stream.executeAndCollect(
-            config.jobName,
-            checkResults.collectLimit
-          )
-        )
-
-      case _ =>
-        val result = runner.execute
-        logger.info(result.toString)
+    if (runner.executeJob) {
+      runner.checkResultsOpt match {
+
+        case Some(checkResults) =>
+          logger.info(
+            s"routing job ${config.jobName} results back through CheckResults<${checkResults.name}>.checkOutputEvents"
+          )
+          checkResults.checkOutputEvents[OUT](
+            stream.executeAndCollect(
+              config.jobName,
+              checkResults.collectLimit
+            )
+          )
+
+        case _ =>
+          val result = runner.execute
+          logger.info(result.toString)
+      }
+    } else {
+      logger.info("NOT EXECUTING JOB GRAPH")
     }
   }
 }
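The rewritten seqOrSingleRowSource no longer passes seq to fromCollection directly; each element is represented in the collection source as a single-field GenericRowData holding its index, then mapped back to the original element. A standalone sketch of that round-trip (illustrative values, outside any Flink job):

    import org.apache.flink.table.data.GenericRowData

    val seq  = Seq("a", "b", "c")
    val rows = seq.indices.map(i => GenericRowData.of(Integer.valueOf(i)))
    val back = rows.map(g => seq(g.getInt(0))) // Seq("a", "b", "c")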
diff --git a/src/main/scala/io/epiphanous/flinkrunner/model/FlinkConnectorName.scala b/src/main/scala/io/epiphanous/flinkrunner/model/FlinkConnectorName.scala
index ddd8405f..e0e64897 100644
--- a/src/main/scala/io/epiphanous/flinkrunner/model/FlinkConnectorName.scala
+++ b/src/main/scala/io/epiphanous/flinkrunner/model/FlinkConnectorName.scala
@@ -10,6 +10,7 @@ sealed trait FlinkConnectorName extends EnumEntry with Snakecase
 object FlinkConnectorName extends Enum[FlinkConnectorName] {
   val values: immutable.IndexedSeq[FlinkConnectorName] = findValues
 
+  case object Empty extends FlinkConnectorName
   case object Hybrid extends FlinkConnectorName
 
   case object Kinesis extends FlinkConnectorName
diff --git a/src/main/scala/io/epiphanous/flinkrunner/model/StreamNodeInfo.scala b/src/main/scala/io/epiphanous/flinkrunner/model/StreamNodeInfo.scala
new file mode 100644
index 00000000..0e362203
--- /dev/null
+++ b/src/main/scala/io/epiphanous/flinkrunner/model/StreamNodeInfo.scala
@@ -0,0 +1,44 @@
+package io.epiphanous.flinkrunner.model
+
+import org.apache.flink.streaming.api.graph.StreamGraph
+
+import scala.collection.JavaConverters._
+
+case class StreamNodeInfo(
+    id: Int,
+    name: String,
+    parallelism: Int,
+    inClasses: List[String],
+    outClass: Option[String]) {
+  val isSource: Boolean    = inClasses.isEmpty
+  val isSink: Boolean      = outClass.isEmpty
+  val isTransform: Boolean = !isSource && !isSink
+  val nodeKind: String     =
+    if (isSource) "source" else if (isTransform) "transform" else "sink"
+  val simpleInClasses: List[String] =
+    inClasses.map(_.split("\\.").last)
+  val simpleOutClass: Option[String] = outClass.map(_.split("\\.").last)
+}
+
+object StreamNodeInfo {
+  def from(sg: StreamGraph): Seq[StreamNodeInfo] = {
+    sg.getStreamNodes.asScala.map { sn =>
+      val id          = sn.getId
+      val name        = sn.getOperatorName
+      val parallelism = sn.getParallelism
+      val inClasses   = sn.getTypeSerializersIn.toList.map(
+        _.createInstance().getClass.getCanonicalName
+      )
+      val outClass    =
+        Option(sn.getTypeSerializerOut)
+          .map(_.createInstance().getClass.getCanonicalName)
+      StreamNodeInfo(
+        id = id,
+        name = name,
+        parallelism = parallelism,
+        inClasses = inClasses,
+        outClass = outClass
+      )
+    }.toSeq
+  }
+}
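How StreamNodeInfo classifies a node follows directly from the case class above; a small sketch with illustrative values (not taken from a real graph):

    val node = StreamNodeInfo(
      id = 1,
      name = "Source: my-source",
      parallelism = 1,
      inClasses = Nil, // no input serializers
      outClass = Some("io.epiphanous.flinkrunner.model.SimpleA")
    )
    node.isSource       // true: a node with no inputs is a source
    node.nodeKind       // "source"
    node.simpleOutClass // Some("SimpleA")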
diff --git a/src/main/scala/io/epiphanous/flinkrunner/model/source/EmptySourceConfig.scala b/src/main/scala/io/epiphanous/flinkrunner/model/source/EmptySourceConfig.scala
new file mode 100644
index 00000000..a173c568
--- /dev/null
+++ b/src/main/scala/io/epiphanous/flinkrunner/model/source/EmptySourceConfig.scala
@@ -0,0 +1,42 @@
+package io.epiphanous.flinkrunner.model.source
+
+import io.epiphanous.flinkrunner.model._
+import org.apache.avro.generic.GenericRecord
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala.createTypeInformation
+import org.apache.flink.streaming.api.scala.{
+  DataStream,
+  StreamExecutionEnvironment
+}
+import org.apache.flink.table.data.{GenericRowData, RowData}
+
+case class EmptySourceConfig[ADT <: FlinkEvent](
+    name: String,
+    config: FlinkConfig)
+    extends SourceConfig[ADT] {
+  override def connector: FlinkConnectorName = FlinkConnectorName.Empty
+
+  def _emptySource[E: TypeInformation](
+      env: StreamExecutionEnvironment): DataStream[E] = {
+    val x = env.fromCollection(Seq.empty[E])
+    x
+  }
+
+  override def getSourceStream[E <: ADT: TypeInformation](
+      env: StreamExecutionEnvironment): DataStream[E] =
+    _emptySource[E](env)
+
+  override def getAvroSourceStream[
+      E <: ADT with EmbeddedAvroRecord[A]: TypeInformation,
+      A <: GenericRecord: TypeInformation](
+      env: StreamExecutionEnvironment)(implicit
+      fromKV: EmbeddedAvroRecordInfo[A] => E): DataStream[E] =
+    _emptySource[E](env)
+
+  override def getRowSourceStream[
+      E <: ADT with EmbeddedRowType: TypeInformation](
+      env: StreamExecutionEnvironment)(implicit
+      fromRowData: RowData => E): DataStream[E] =
+    _emptySource[GenericRowData](env).map(fromRowData)
+
+}
diff --git a/src/main/scala/io/epiphanous/flinkrunner/model/source/SourceConfig.scala b/src/main/scala/io/epiphanous/flinkrunner/model/source/SourceConfig.scala
index d210f814..ec1f5884 100644
--- a/src/main/scala/io/epiphanous/flinkrunner/model/source/SourceConfig.scala
+++ b/src/main/scala/io/epiphanous/flinkrunner/model/source/SourceConfig.scala
@@ -265,6 +265,7 @@ object SourceConfig {
       config.jobName,
       config.getStringOpt(s"sources.$name.connector")
     ) match {
+      case Empty  => EmptySourceConfig[ADT](name, config)
       case File   => FileSourceConfig[ADT](name, config)
       case Hybrid => HybridSourceConfig[ADT](name, config)
       case Kafka  => KafkaSourceConfig[ADT](name, config)
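With the Empty connector wired into SourceConfig, a source stanza such as sources{empty-source{}} (the default used by the test trait below) resolves to an EmptySourceConfig whose streams complete without emitting anything. A sketch of direct use, assuming a FlinkConfig named config declaring that stanza and a StreamExecutionEnvironment named env:

    val sourceConfig = EmptySourceConfig[MySimpleADT]("empty-source", config)
    // sourceConfig.connector == FlinkConnectorName.Empty
    val stream = sourceConfig.getSourceStream[SimpleA](env) // zero elements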
diff --git a/src/test/scala/io/epiphanous/flinkrunner/FlinkRunnerSpec.scala b/src/test/scala/io/epiphanous/flinkrunner/FlinkRunnerSpec.scala
index 40e205d3..d786c8f7 100644
--- a/src/test/scala/io/epiphanous/flinkrunner/FlinkRunnerSpec.scala
+++ b/src/test/scala/io/epiphanous/flinkrunner/FlinkRunnerSpec.scala
@@ -11,17 +11,28 @@ import scala.reflect.runtime.{universe => ru}
 
 trait FlinkRunnerSpec {
 
+  val DEFAULT_CONFIG_STR: String =
+    """
+      |sources{empty-source{}}
+      |sinks{print-sink{}}
+      |""".stripMargin
+
   def getRunner[
       IN <: ADT: TypeInformation,
       OUT <: ADT: TypeInformation,
       JF <: StreamJob[OUT, ADT],
       ADT <: FlinkEvent: TypeInformation](
-      configStr: String,
+      configStr: String = DEFAULT_CONFIG_STR,
       jobFactory: JobFactory[JF, IN, OUT, ADT],
       checkResultsOpt: Option[CheckResults[ADT]] = None,
-      args: Array[String] = Array("testJob")): FlinkRunner[ADT] = {
+      args: Array[String] = Array("testJob"),
+      executeJob: Boolean = true): FlinkRunner[ADT] = {
     val config = new FlinkConfig(args, Some(configStr))
-    new FlinkRunner[ADT](config, checkResultsOpt) {
+    new FlinkRunner[ADT](
+      config,
+      checkResultsOpt = checkResultsOpt,
+      executeJob = executeJob
+    ) {
       override def invoke(jobName: String): Unit =
         jobFactory.getJob(this).run()
     }
@@ -31,31 +42,35 @@ trait FlinkRunnerSpec {
       IN <: ADT: TypeInformation,
       OUT <: ADT: TypeInformation,
       ADT <: FlinkEvent: TypeInformation](
-      configStr: String,
+      configStr: String = DEFAULT_CONFIG_STR,
       transformer: MapFunction[IN, OUT],
       input: Seq[IN] = Seq.empty,
       checkResultsOpt: Option[CheckResults[ADT]] = None,
-      args: Array[String] = Array("testJob")): FlinkRunner[ADT] = {
+      args: Array[String] = Array("testJob"),
+      executeJob: Boolean = true): FlinkRunner[ADT] = {
     getRunner(
       configStr,
       new StreamJobFactory[IN, OUT, ADT](transformer, input),
       checkResultsOpt,
-      args
+      args,
+      executeJob
     )
   }
 
   def getIdentityStreamJobRunner[
       OUT <: ADT: TypeInformation,
       ADT <: FlinkEvent: TypeInformation](
-      configStr: String,
+      configStr: String = DEFAULT_CONFIG_STR,
       input: Seq[OUT] = Seq.empty,
       checkResultsOpt: Option[CheckResults[ADT]] = None,
-      args: Array[String] = Array("testJob")): FlinkRunner[ADT] =
+      args: Array[String] = Array("testJob"),
+      executeJob: Boolean = true): FlinkRunner[ADT] =
     getRunner(
       configStr,
       new IdentityStreamJobFactory(input),
       checkResultsOpt,
-      args
+      args,
+      executeJob
     )
 
   def getAvroStreamJobRunner[
@@ -64,11 +79,12 @@ trait FlinkRunnerSpec {
       IN <: ADT with EmbeddedAvroRecord[INA]: TypeInformation,
       INA <: GenericRecord: TypeInformation,
       OUT <: ADT with EmbeddedAvroRecord[OUTA]: TypeInformation,
       OUTA <: GenericRecord: TypeInformation,
       ADT <: FlinkEvent: TypeInformation](
-      configStr: String,
+      configStr: String = DEFAULT_CONFIG_STR,
       transformer: MapFunction[IN, OUT],
       input: Seq[IN] = Seq.empty,
       checkResultsOpt: Option[CheckResults[ADT]] = None,
-      args: Array[String] = Array("testJob"))(implicit
+      args: Array[String] = Array("testJob"),
+      executeJob: Boolean = true)(implicit
       fromKV: EmbeddedAvroRecordInfo[INA] => IN): FlinkRunner[ADT] =
@@ -77,96 +93,61 @@ trait FlinkRunnerSpec {
     getRunner(
       configStr,
       new AvroStreamJobFactory[IN, INA, OUT, OUTA, ADT](
         transformer,
         input
       ),
       checkResultsOpt,
-      args
+      args,
+      executeJob
     )
 
   def getIdentityAvroStreamJobRunner[
       OUT <: ADT with EmbeddedAvroRecord[OUTA]: TypeInformation,
       OUTA <: GenericRecord: TypeInformation,
       ADT <: FlinkEvent: TypeInformation](
-      configStr: String,
+      configStr: String = DEFAULT_CONFIG_STR,
       input: Seq[OUT] = Seq.empty,
       checkResultsOpt: Option[CheckResults[ADT]] = None,
-      args: Array[String] = Array("testJob"))(implicit
+      args: Array[String] = Array("testJob"),
+      executeJob: Boolean = true)(implicit
       fromKV: EmbeddedAvroRecordInfo[OUTA] => OUT): FlinkRunner[ADT] =
     getRunner(
       configStr,
       new IdentityAvroStreamJobFactory[OUT, OUTA, ADT](input),
       checkResultsOpt,
-      args
+      args,
+      executeJob
     )
 
   def getTableStreamJobRunner[
       IN <: ADT with EmbeddedRowType: TypeInformation: ru.TypeTag,
       OUT <: ADT with EmbeddedRowType: TypeInformation: ru.TypeTag,
       ADT <: FlinkEvent: TypeInformation](
-      configStr: String,
+      configStr: String = DEFAULT_CONFIG_STR,
       transformer: MapFunction[IN, OUT],
       input: Seq[IN] = Seq.empty,
       checkResultsOpt: Option[CheckResults[ADT]] = None,
-      args: Array[String] = Array("testJob"))(implicit
+      args: Array[String] = Array("testJob"),
+      executeJob: Boolean = true)(implicit
       fromRowData: RowData => IN): FlinkRunner[ADT] =
     getRunner(
       configStr,
       new TableStreamJobFactory[IN, OUT, ADT](transformer, input),
       checkResultsOpt,
-      args
+      args,
+      executeJob
     )
 
   def getIdentityTableStreamJobRunner[
       OUT <: ADT with EmbeddedRowType: TypeInformation: ru.TypeTag,
       ADT <: FlinkEvent: TypeInformation](
-      configStr: String,
+      configStr: String = DEFAULT_CONFIG_STR,
       input: Seq[OUT] = Seq.empty,
       checkResultsOpt: Option[CheckResults[ADT]] = None,
-      args: Array[String] = Array("testJob"))(implicit
+      args: Array[String] = Array("testJob"),
+      executeJob: Boolean = true)(implicit
       fromRowData: RowData => OUT): FlinkRunner[ADT] =
     getRunner(
       configStr,
       new IdentityTableStreamJobFactory[OUT, ADT](input),
       checkResultsOpt,
-      args
+      args,
+      executeJob
     )
-
-//  def getAvroTableStreamJobRunner[
-//      IN <: ADT with EmbeddedAvroRecord[INA]: TypeInformation,
-//      INA <: GenericRecord: TypeInformation,
-//      OUT <: ADT with EmbeddedAvroRecord[
-//        OUTA
-//      ] with EmbeddedRowType: TypeInformation,
-//      OUTA <: GenericRecord: TypeInformation,
-//      ADT <: FlinkEvent: TypeInformation](
-//      configStr: String,
-//      transformer: MapFunction[IN, OUT],
-//      input: Seq[IN] = Seq.empty,
-//      checkResultsOpt: Option[CheckResults[ADT]] = None,
-//      args: Array[String] = Array("testJob"))(implicit
-//      fromKV: EmbeddedAvroRecordInfo[INA] => IN): FlinkRunner[ADT] =
-//    getRunner(
-//      configStr,
-//      new AvroTableStreamJobFactory[IN, INA, OUT, OUTA, ADT](
-//        transformer,
-//        input
-//      ),
-//      checkResultsOpt,
-//      args
-//    )
-//
-//  def getIdentityAvroTableStreamJobRunner[
-//      OUT <: ADT with EmbeddedAvroRecord[
-//        OUTA
-//      ] with EmbeddedRowType: TypeInformation,
-//      OUTA <: GenericRecord: TypeInformation,
-//      ADT <: FlinkEvent: TypeInformation](
-//      configStr: String,
-//      input: Seq[OUT] = Seq.empty,
-//      checkResultsOpt: Option[CheckResults[ADT]] = None,
-//      args: Array[String] = Array("testJob"))(implicit
-//      fromKV: EmbeddedAvroRecordInfo[OUTA] => OUT): FlinkRunner[ADT] =
-//    getRunner(
-//      configStr,
-//      new IdentityAvroTableStreamJobFactory[OUT, OUTA, ADT](input),
-//      checkResultsOpt,
-//      args
-//    )
 }
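With DEFAULT_CONFIG_STR as the default config and executeJob exposed, a test can build a graph-only runner in one call; this sketch mirrors how StreamJobSpec below uses the helpers (SimpleA and MySimpleADT come from the test fixtures):

    val runner = getIdentityStreamJobRunner[SimpleA, MySimpleADT](
      executeJob = false
    )
    runner.process() // builds the job graph; nothing is executed
    runner.getStreamNodesInfo.count(_.isSource) // 1 (the empty source)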
diff --git a/src/test/scala/io/epiphanous/flinkrunner/JobFactory.scala b/src/test/scala/io/epiphanous/flinkrunner/JobFactory.scala
index 3dc6f03e..d2536dc7 100644
--- a/src/test/scala/io/epiphanous/flinkrunner/JobFactory.scala
+++ b/src/test/scala/io/epiphanous/flinkrunner/JobFactory.scala
@@ -99,7 +99,7 @@ class TableStreamJobFactory[
   def getJob(runner: FlinkRunner[ADT]): TableStreamJob[OUT, ADT] =
     new TableStreamJob[OUT, ADT](runner) {
       override def transform: DataStream[OUT] =
-        seqOrSingleRowSource(input).map(transformer)
+        seqOrSingleRowSource[IN](input).map(transformer)
     }
 }
 
@@ -113,39 +113,6 @@ class IdentityTableStreamJobFactory[
     input
   )
 
-//@SerialVersionUID(1L)
-//class AvroTableStreamJobFactory[
-//    IN <: ADT with EmbeddedAvroRecord[INA]: TypeInformation,
-//    INA <: GenericRecord: TypeInformation,
-//    OUT <: ADT with EmbeddedAvroRecord[OUTA]: TypeInformation,
-//    OUTA <: GenericRecord: TypeInformation,
-//    ADT <: FlinkEvent: TypeInformation](
-//    transformer: MapFunction[IN, OUT],
-//    input: Seq[IN] = Seq.empty)(implicit
-//    fromKV: EmbeddedAvroRecordInfo[INA] => IN)
-//    extends JobFactory[AvroTableStreamJob[OUT, OUTA, ADT], IN, OUT, ADT](
-//      transformer,
-//      input
-//    ) {
-//  override def getJob(
-//      runner: FlinkRunner[ADT]): AvroTableStreamJob[OUT, OUTA, ADT] =
-//    new AvroTableStreamJob[OUT, OUTA, ADT](runner) {
-//      override def transform: DataStream[OUT] =
-//        seqOrSingleAvroSource[IN, INA](input).map(transformer)
-//    }
-//}
-//
-//@SerialVersionUID(1L)
-//class IdentityAvroTableStreamJobFactory[
-//    OUT <: ADT with EmbeddedAvroRecord[OUTA]: TypeInformation,
-//    OUTA <: GenericRecord: TypeInformation,
-//    ADT <: FlinkEvent: TypeInformation](input: Seq[OUT] = Seq.empty)(
-//    implicit fromKV: EmbeddedAvroRecordInfo[OUTA] => OUT)
-//    extends AvroTableStreamJobFactory[OUT, OUTA, OUT, OUTA, ADT](
-//      new IdentityMap[OUT],
-//      input
-//    )
-
 @SerialVersionUID(1L)
 class IdentityMap[A] extends MapFunction[A, A] {
   override def map(value: A): A =
diff --git a/src/test/scala/io/epiphanous/flinkrunner/flink/StreamJobTest.scala b/src/test/scala/io/epiphanous/flinkrunner/flink/StreamJobTest.scala
deleted file mode 100644
index 7ae07f58..00000000
--- a/src/test/scala/io/epiphanous/flinkrunner/flink/StreamJobTest.scala
+++ /dev/null
@@ -1,91 +0,0 @@
-//package io.epiphanous.flinkrunner.flink
-//
-//import io.epiphanous.flinkrunner.model._
-//import io.epiphanous.flinkrunner.{FlinkRunner, PropSpec}
-//import org.apache.flink.api.common.JobExecutionResult
-//import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction
-//import org.apache.flink.streaming.api.scala._
-//import org.apache.flink.util.Collector
-//
-//class StreamJobTest extends PropSpec {
-//
-//  property("singleSource") {
-//    val in = genPop[BWrapper](10)
-//    val outLen = in.count(_.value.b2.nonEmpty)
-//    val config = new FlinkConfig(
-//      Array("singleSource")
-//    )
-//    new FlinkRunner[MyAvroADT](
-//      config,
-//      Map("in" -> in),
-//      out => {
-//        println("========================================")
-//        out.foreach(println)
-//        out.length shouldEqual outLen
-//        out.forall(_.isInstanceOf[AWrapper]) shouldBe true
-//      }
-//    ) {
-//      override def invoke(
-//          jobName: String): Either[List[_], JobExecutionResult] = {
-//        jobName match {
-//          case "singleSource" => new SingleSourceTestJob(this).run()
-//        }
-//      }
-//    }.process()
-//  }
-//}
-//
-//class SingleSourceTestJob(runner: FlinkRunner[MyAvroADT])
-//    extends StreamJob[AWrapper, MyAvroADT](runner) {
-//  override def transform: DataStream[AWrapper] =
-//    singleSource[BWrapper]("test").flatMap { bw =>
-//      val b = bw.$record
-//      b
-//        .map(d =>
-//          List(AWrapper(ARecord(b.b0, b.b1.getOrElse(1), d, b.b3)))
-//        )
-//        .getOrElse(List.empty)
-//    }
-//}
-//
-//class BroadcastConnectedTestJob(runner: FlinkRunner[MySimpleADT])
-//    extends StreamJob[SimpleC, MySimpleADT](runner) {
-//  override def transform: DataStream[SimpleC] =
-//    broadcastConnectedSource[SimpleA, SimpleB, String]("a", "b", _.$key)
-//      .process(
-//        new KeyedBroadcastProcessFunction[
-//          String,
-//          SimpleA,
-//          SimpleB,
-//          SimpleC] {
-//          override def processElement(
-//              value: SimpleA,
-//              ctx: KeyedBroadcastProcessFunction[
-//                String,
-//                SimpleA,
-//                SimpleB,
-//                SimpleC]#ReadOnlyContext,
-//              out: Collector[SimpleC]): Unit = ???
-//
-//          override def processBroadcastElement(
-//              value: SimpleB,
-//              ctx: KeyedBroadcastProcessFunction[
-//                String,
-//                SimpleA,
-//                SimpleB,
-//                SimpleC]#Context,
-//              out: Collector[SimpleC]): Unit = ???
-//        }
-//      )
-//}
-//
-//class FilterByControlTestJob(runner: FlinkRunner[MySimpleADT])
-//    extends StreamJob[SimpleB, MySimpleADT](runner) {
-//  override def transform: DataStream[SimpleB] =
-//    filterByControlSource[SimpleA, SimpleB, String](
-//      "a",
-//      "b",
-//      _.$key,
-//      _.$key
-//    )
-//}
diff --git a/src/test/scala/io/epiphanous/flinkrunner/model/NothingADT.scala b/src/test/scala/io/epiphanous/flinkrunner/model/NothingADT.scala
deleted file mode 100644
index ddb86401..00000000
--- a/src/test/scala/io/epiphanous/flinkrunner/model/NothingADT.scala
+++ /dev/null
@@ -1,3 +0,0 @@
-package io.epiphanous.flinkrunner.model
-
-sealed trait NothingADT extends FlinkEvent
diff --git a/src/test/scala/io/epiphanous/flinkrunner/model/StreamJobSpec.scala b/src/test/scala/io/epiphanous/flinkrunner/model/StreamJobSpec.scala
new file mode 100644
index 00000000..2cb75cf7
--- /dev/null
+++ b/src/test/scala/io/epiphanous/flinkrunner/model/StreamJobSpec.scala
@@ -0,0 +1,126 @@
+package io.epiphanous.flinkrunner.model
+
+import io.epiphanous.flinkrunner.{FlinkRunner, PropSpec}
+import org.apache.flink.api.scala.createTypeInformation
+import org.scalatest.Assertion
+
+class StreamJobSpec extends PropSpec {
+
+  def isSingleSourceOf(
+      runner: FlinkRunner[_],
+      outClass: String): Assertion = {
+    val sources = runner.getStreamNodesInfo.filter(_.isSource)
+    sources.size shouldBe 1
+    sources.flatMap(_.simpleOutClass) shouldEqual List(outClass)
+  }
+
+  def isSingleSinkOf(
+      runner: FlinkRunner[_],
+      inClass: String): Assertion = {
+    val sinks = runner.getStreamNodesInfo.filter(_.isSink)
+    sinks.size shouldBe 1
+    sinks.flatMap(_.simpleInClasses) shouldEqual List(inClass)
+  }
+
+  def testSingleSource(seq: Seq[SimpleA] = Seq.empty): Assertion = {
+    val runner = getIdentityStreamJobRunner[SimpleA, MySimpleADT](
+      input = seq,
+      executeJob = false
+    )
+    runner.process()
+    isSingleSourceOf(runner, "SimpleA")
+  }
+
+  def testSingleSink(seq: Seq[SimpleA] = Seq.empty): Assertion = {
+    val runner = getIdentityStreamJobRunner[SimpleA, MySimpleADT](
+      input = seq,
+      executeJob = false
+    )
+    runner.process()
+    isSingleSinkOf(runner, "SimpleA")
+  }
+
+  def testSingleAvroSource(seq: Seq[BWrapper] = Seq.empty): Assertion = {
+    val runner =
+      getIdentityAvroStreamJobRunner[BWrapper, BRecord, MyAvroADT](
+        input = seq,
+        executeJob = false
+      )
+    runner.process()
+    isSingleSourceOf(runner, "BWrapper")
+  }
+
+  def testSingleAvroSink(seq: Seq[BWrapper] = Seq.empty): Assertion = {
+    val runner =
+      getIdentityAvroStreamJobRunner[BWrapper, BRecord, MyAvroADT](
+        input = seq,
+        executeJob = false
+      )
+    runner.process()
+    isSingleSinkOf(runner, "BWrapper")
+  }
+
+  def testSingleRowSource(seq: Seq[SimpleA] = Seq.empty): Assertion = {
+    val runner = getIdentityTableStreamJobRunner[SimpleA, MySimpleADT](
+      input = seq,
+      executeJob = false
+    )
+    runner.process()
+    isSingleSourceOf(runner, "GenericRowData")
+  }
+
+  def testSingleRowSink(seq: Seq[SimpleA] = Seq.empty): Assertion = {
+    val runner = getIdentityTableStreamJobRunner[SimpleA, MySimpleADT](
+      input = seq,
+      executeJob = false
+    )
+    runner.process()
+    isSingleSinkOf(runner, "Row")
+  }
+
+  property("singleAvroSource property") {
+    testSingleAvroSource()
+  }
+
+  property("seqOrSingleAvroSource property") {
+    testSingleAvroSource(genPop[BWrapper]())
+  }
+
+  property("sink property") {
+    testSingleSink()
+    testSingleSink(genPop[SimpleA]())
+    testSingleAvroSink()
+    testSingleAvroSink(genPop[BWrapper]())
+    testSingleRowSink()
+    testSingleRowSink(genPop[SimpleA]())
+  }
+
+  property("singleSource property") {
+    testSingleSource()
+  }
+
+  property("singleRowSource property") {
+    testSingleRowSource()
+  }
+
+  property("filterByControlSource property") {}
+
+  property("seqOrSingleSource property") {
+    testSingleSource(genPop[SimpleA]())
+  }
+
+  property("seqOrSingleRowSource property") {
+    testSingleRowSource(genPop[SimpleA]())
+  }
+
+  property("run property") {}
+
+  property("windowedAggregation property") {}
+
+  property("broadcastConnectedSource property") {}
+
+  property("transform property") {}
+
+  property("maybeSink property") {}
+
+  property("connectedSource property") {}
+
+}