diff --git a/src/main/scala/io/epiphanous/flinkrunner/FlinkRunner.scala b/src/main/scala/io/epiphanous/flinkrunner/FlinkRunner.scala
index 3729ad42..b9e33702 100644
--- a/src/main/scala/io/epiphanous/flinkrunner/FlinkRunner.scala
+++ b/src/main/scala/io/epiphanous/flinkrunner/FlinkRunner.scala
@@ -5,9 +5,11 @@ import com.typesafe.scalalogging.LazyLogging
 import io.epiphanous.flinkrunner.model._
 import io.epiphanous.flinkrunner.model.sink._
 import io.epiphanous.flinkrunner.model.source._
+import org.apache.avro.Schema
 import org.apache.avro.generic.GenericRecord
 import org.apache.flink.api.common.JobExecutionResult
 import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.formats.avro.utils.AvroKryoSerializerUtils.AvroSchemaSerializer
 import org.apache.flink.streaming.api.datastream.DataStreamSink
 import org.apache.flink.streaming.api.scala._
 import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
@@ -42,6 +44,11 @@ abstract class FlinkRunner[ADT <: FlinkEvent: TypeInformation](
   val tableEnv: StreamTableEnvironment =
     StreamTableEnvironment.create(env)
 
+  env.getConfig.addDefaultKryoSerializer(
+    classOf[Schema],
+    classOf[AvroSchemaSerializer]
+  )
+
   /** Gets (and returns as string) the execution plan for the job from the
     * StreamExecutionEnvironment.
     * @return
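The FlinkRunner change above registers Flink's AvroSchemaSerializer as the default Kryo serializer for org.apache.avro.Schema, so schema objects that ride along with generic records no longer go through Kryo's reflective fallback. A minimal standalone sketch of the same registration (the object name and main method are illustrative only, not part of flinkrunner):

    import org.apache.avro.Schema
    import org.apache.flink.formats.avro.utils.AvroKryoSerializerUtils.AvroSchemaSerializer
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

    object AvroSchemaKryoSetup {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        // register a dedicated Kryo serializer for Avro Schema values so they
        // can be serialized without Kryo's generic reflective strategy
        env.getConfig.addDefaultKryoSerializer(
          classOf[Schema],
          classOf[AvroSchemaSerializer]
        )
      }
    }
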
diff --git a/src/main/scala/io/epiphanous/flinkrunner/model/sink/CassandraSinkConfig.scala b/src/main/scala/io/epiphanous/flinkrunner/model/sink/CassandraSinkConfig.scala
index f4141514..b93f9943 100644
--- a/src/main/scala/io/epiphanous/flinkrunner/model/sink/CassandraSinkConfig.scala
+++ b/src/main/scala/io/epiphanous/flinkrunner/model/sink/CassandraSinkConfig.scala
@@ -1,6 +1,7 @@
 package io.epiphanous.flinkrunner.model.sink
 
 import com.datastax.driver.core.{Cluster, CodecRegistry}
+import com.datastax.driver.extras.codecs.jdk8.InstantCodec
 import io.epiphanous.flinkrunner.model.{
   EmbeddedAvroRecord,
   FlinkConfig,
@@ -13,7 +14,6 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.streaming.api.datastream.DataStreamSink
 import org.apache.flink.streaming.api.scala.DataStream
 import org.apache.flink.streaming.connectors.cassandra._
-import com.datastax.driver.extras.codecs.jdk8.InstantCodec
 
 /** A cassandra sink config.
  *
diff --git a/src/main/scala/io/epiphanous/flinkrunner/model/sink/JdbcSinkConfig.scala b/src/main/scala/io/epiphanous/flinkrunner/model/sink/JdbcSinkConfig.scala
index 89b38c3d..f47b9069 100644
--- a/src/main/scala/io/epiphanous/flinkrunner/model/sink/JdbcSinkConfig.scala
+++ b/src/main/scala/io/epiphanous/flinkrunner/model/sink/JdbcSinkConfig.scala
@@ -572,31 +572,38 @@ case class JdbcSinkConfig[ADT <: FlinkEvent](
    * @return
    *   JdbcStatementBuilder[E]
    */
-  def getStatementBuilder[E <: ADT]: JdbcStatementBuilder[E] = {
-    case (statement, element) =>
-      _fillInStatement(
-        element.getClass.getDeclaredFields
-          .map(_.getName)
-          .zip(element.productIterator.toIndexedSeq)
-          .toMap,
-        statement,
-        element
-      )
+  def getStatementBuilder[E <: ADT]: JdbcStatementBuilder[E] =
+    new JdbcStatementBuilder[E] {
+      override def accept(statement: PreparedStatement, element: E): Unit =
+        _fillInStatement(
+          fieldValuesOf(element),
+          statement,
+          element
+        )
+    }
+
+  def fieldValuesOf[T <: Product](product: T): Map[String, Any] = {
+    product.getClass.getDeclaredFields
+      .map(_.getName)
+      .zip(product.productIterator.toIndexedSeq)
+      .toMap
   }
 
   def getAvroStatementBuilder[
       E <: ADT with EmbeddedAvroRecord[A]: TypeInformation,
-      A <: GenericRecord: TypeInformation]: JdbcStatementBuilder[E] = {
-    case (statement, element) =>
-      _fillInStatement[E](
-        element.$record.getSchema.getFields.asScala
-          .map(_.name)
-          .map(f => (f, element.$record.get(f)))
-          .toMap,
-        statement,
-        element
-      )
-  }
+      A <: GenericRecord: TypeInformation]: JdbcStatementBuilder[E] =
+    new JdbcStatementBuilder[E] {
+      override def accept(
+          statement: PreparedStatement,
+          element: E): Unit = {
+        println(s"XXX: $element")
+        _fillInStatement[E](
+          fieldValuesOf(element.$record.asInstanceOf[Product]),
+          statement,
+          element
+        )
+      }
+    }
 
   def _fillInStatement[E <: ADT](
       data: Map[String, Any],
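In JdbcSinkConfig, getStatementBuilder now delegates to fieldValuesOf, which pairs a case class's declared field names with its productIterator values (relying on declaration order). A small sketch of what that helper produces; SampleRow is a hypothetical case class used only for illustration:

    // hypothetical record, not part of flinkrunner
    case class SampleRow(id: String, count: Int)

    def fieldValuesOf[T <: Product](product: T): Map[String, Any] =
      product.getClass.getDeclaredFields
        .map(_.getName)                            // field names in declaration order
        .zip(product.productIterator.toIndexedSeq) // paired with field values
        .toMap

    // fieldValuesOf(SampleRow("a-1", 3)) == Map("id" -> "a-1", "count" -> 3)

Note the avro variant feeds the embedded $record through the same helper, which assumes the concrete record type is a Product (for example an avrohugger-generated case class); a plain GenericData.Record would fail that cast at runtime.
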
diff --git a/src/main/scala/io/epiphanous/flinkrunner/model/sink/SinkConfig.scala b/src/main/scala/io/epiphanous/flinkrunner/model/sink/SinkConfig.scala
index 16b47013..eb51fa5b 100644
--- a/src/main/scala/io/epiphanous/flinkrunner/model/sink/SinkConfig.scala
+++ b/src/main/scala/io/epiphanous/flinkrunner/model/sink/SinkConfig.scala
@@ -70,17 +70,17 @@ object SinkConfig {
       config.jobName,
       config.getStringOpt(s"sinks.$name.connector")
     ) match {
-      case Kafka => KafkaSinkConfig(name, config)
-      case Kinesis => KinesisSinkConfig(name, config)
-      case File => FileSinkConfig(name, config)
-      case Socket => SocketSinkConfig(name, config)
-      case Jdbc => JdbcSinkConfig(name, config)
+      case Kafka         => KafkaSinkConfig(name, config)
+      case Kinesis       => KinesisSinkConfig(name, config)
+      case File          => FileSinkConfig(name, config)
+      case Socket        => SocketSinkConfig(name, config)
+      case Jdbc          => JdbcSinkConfig(name, config)
       case Cassandra     => CassandraSinkConfig(name, config)
       case Elasticsearch => ElasticsearchSinkConfig(name, config)
-      case RabbitMQ => RabbitMQSinkConfig(name, config)
-      case connector =>
+      case RabbitMQ      => RabbitMQSinkConfig(name, config)
+      case connector     =>
        throw new RuntimeException(
          s"Don't know how to configure ${connector.entryName} sink connector $name (job ${config.jobName}"
        )
diff --git a/src/main/scala/io/epiphanous/flinkrunner/model/source/FileSourceConfig.scala b/src/main/scala/io/epiphanous/flinkrunner/model/source/FileSourceConfig.scala
index 555c9aa9..4839b35d 100644
--- a/src/main/scala/io/epiphanous/flinkrunner/model/source/FileSourceConfig.scala
+++ b/src/main/scala/io/epiphanous/flinkrunner/model/source/FileSourceConfig.scala
@@ -2,6 +2,7 @@ package io.epiphanous.flinkrunner.model.source
 
 import io.epiphanous.flinkrunner.model._
 import io.epiphanous.flinkrunner.serde._
+import io.epiphanous.flinkrunner.util.AvroUtils.toEmbeddedAvroInstance
 import io.epiphanous.flinkrunner.util.ConfigToProps.getFromEither
 import io.epiphanous.flinkrunner.util.FileUtils.getResourceOrFile
 import org.apache.avro.Schema
@@ -294,13 +295,13 @@ case class FileSourceConfig[ADT <: FlinkEvent](
       case StreamFormatName.Avro =>
         val avroInputFormat = new AvroInputFormat(
           origin,
-          implicitly[TypeInformation[A]].getTypeClass
+          classOf[GenericRecord]
         )
         avroInputFormat.setNestedFileEnumeration(true)
         if (wantsFiltering) avroInputFormat.setFilesFilter(fileFilter)
         nameAndWatermark(
           env
-            .readFile[A](
+            .readFile[GenericRecord](
               avroInputFormat,
               path,
               if (monitorDuration > 0)
@@ -310,7 +311,13 @@ case class FileSourceConfig[ADT <: FlinkEvent](
             )
             .uid(s"avro:$label")
             .name(s"avro:$label")
-            .map((a: A) => fromKV(EmbeddedAvroRecordInfo(a, config))),
+            .map(g =>
+              toEmbeddedAvroInstance[E, A, ADT](
+                g,
+                implicitly[TypeInformation[A]].getTypeClass,
+                config
+              )
+            ),
           label
         )
       case _ =>
diff --git a/src/test/scala/io/epiphanous/flinkrunner/model/MySimpleADT.scala b/src/test/scala/io/epiphanous/flinkrunner/model/MySimpleADT.scala
index dc430432..229afb2e 100644
--- a/src/test/scala/io/epiphanous/flinkrunner/model/MySimpleADT.scala
+++ b/src/test/scala/io/epiphanous/flinkrunner/model/MySimpleADT.scala
@@ -1,5 +1,7 @@
 package io.epiphanous.flinkrunner.model
 
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize
+
 import java.time.Instant
 
 sealed trait MySimpleADT extends FlinkEvent
@@ -36,7 +38,9 @@ case class SimpleB(
     id: String,
     b0: String,
     b1: Double,
-    b2: Option[Int],
+    @JsonDeserialize(contentAs = classOf[java.lang.Integer]) b2: Option[
+      Int
+    ],
     ts: Instant)
     extends MySimpleADT {
   override def $id: String = id
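The @JsonDeserialize hint on SimpleB.b2 works around a well-known jackson-module-scala limitation: after erasure the module only sees Option[_], so a primitive-valued Option[Int] needs a contentAs hint to pin the element type; without it the wrapped value can come back as a generic boxed object rather than an Int. The same pattern on a hypothetical record:

    import com.fasterxml.jackson.databind.annotation.JsonDeserialize

    // illustrative only, not part of flinkrunner
    case class Reading(
        id: String,
        // contentAs tells Jackson the erased Option element is an Integer
        @JsonDeserialize(contentAs = classOf[java.lang.Integer]) count: Option[Int],
        comment: Option[String]) // reference-typed options need no hint
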
diff --git a/src/test/scala/io/epiphanous/flinkrunner/model/sink/CassandraJobTest.scala b/src/test/scala/io/epiphanous/flinkrunner/model/sink/CassandraJobTest.scala
deleted file mode 100644
index 845295b8..00000000
--- a/src/test/scala/io/epiphanous/flinkrunner/model/sink/CassandraJobTest.scala
+++ /dev/null
@@ -1,89 +0,0 @@
-package io.epiphanous.flinkrunner.model.sink
-
-import com.datastax.driver.core
-import com.dimafeng.testcontainers.CassandraContainer
-import io.epiphanous.flinkrunner.model.{CheckResults, MySimpleADT, SimpleA}
-import io.epiphanous.flinkrunner.{FlinkRunner, FlinkRunnerSpec}
-import org.apache.flink.api.scala.createTypeInformation
-
-import java.time.Instant
-import scala.collection.JavaConverters._
-
-class CassandraJobTest extends FlinkRunnerSpec {
-  val cassandra = new CassandraContainer()
-
-  ignore("sink works") {
-    cassandra.start()
-    val c       = cassandra.container
-    val session = c.getCluster.newSession()
-
-    session.execute("""
-        |create keyspace if not exists simple_adt
-        |  with replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};
-        |""".stripMargin)
-
-    session.execute("""
-        |create table if not exists simple_adt.simple_a (
-        |  id text,
-        |  a0 text,
-        |  a1 int,
-        |  ts timestamp,
-        |  primary key(id)
-        |);
-        |""".stripMargin)
-
-    val factory = (runner: FlinkRunner[MySimpleADT]) =>
-      new SimpleIdentityJob[SimpleA](runner)
-
-//    val checkResults: CheckResults[MySimpleADT] =
-//      new CheckResults[MySimpleADT] {
-//        override val name: String = "cassandra-test"
-//
-//        override def getInputEvents[IN <: MySimpleADT](
-//            sourceName: String): List[IN] =
-//          genPop[SimpleA](collectLimit).asInstanceOf[List[IN]]
-//
-////        override val writeToSink: Boolean = true
-//
-//        override val collectLimit: Int = 10
-//
-//        override def checkOutputEvents[OUT <: MySimpleADT](
-//            out: List[OUT]): Unit =
-//          out.foreach(println)
-//      }
-    testStreamJob(
-      s"""
-         |execution.runtime-mode = batch
-         |jobs {
-         |  testJob {
-         |    sources {
-         |      file-source {
-         |        path = "resource://SampleA.csv"
-         |        format = csv
-         |      }
-         |    }
-         |    sinks {
-         |      cassandra-test {
-         |        host = ${c.getHost}
-         |        port = ${c.getMappedPort(9042)}
-         |        query = "insert into simple_adt.simple_a (id, a0, a1, ts) values (?, ?, ?, ?);"
-         |      }
-         |    }
-         |  }
-         |}
-         |""".stripMargin,
-      factory
-    )
-    for (
-      row <-
-        session.execute("select * from simple_adt.simple_a").asScala
-    ) {
-      println(row.get(0, classOf[String]))
-      println(row.get(1, classOf[String]))
-      println(row.get(2, classOf[Int]))
-      println(row.get(3, classOf[Instant]))
-    }
-    session.close()
-    cassandra.stop()
-  }
-}
diff --git a/src/test/scala/io/epiphanous/flinkrunner/model/sink/CassandraSinkSpec.scala b/src/test/scala/io/epiphanous/flinkrunner/model/sink/CassandraSinkSpec.scala
new file mode 100644
index 00000000..b4796eed
--- /dev/null
+++ b/src/test/scala/io/epiphanous/flinkrunner/model/sink/CassandraSinkSpec.scala
@@ -0,0 +1,57 @@
+package io.epiphanous.flinkrunner.model.sink
+
+import com.dimafeng.testcontainers.CassandraContainer
+import io.epiphanous.flinkrunner.model.SimpleA
+import org.apache.flink.api.scala.createTypeInformation
+
+import java.time.Instant
+import scala.collection.JavaConverters._
+
+class CassandraSinkSpec extends SinkSpec {
+  val cassandra = new CassandraContainer()
+
+  // todo: fix this test
+  ignore("cassandra sink works") {
+    cassandra.start()
+    val c       = cassandra.container
+    val session = c.getCluster.newSession()
+
+    session.execute("""
+        |create keyspace if not exists simple_adt
+        |  with replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};
+        |""".stripMargin)
+
+    session.execute("""
+        |create table if not exists simple_adt.simple_a (
+        |  id text,
+        |  a0 text,
+        |  a1 int,
+        |  ts timestamp,
+        |  primary key(id)
+        |);
+        |""".stripMargin)
+
+    testJob[SimpleA](
+      s"""
+         |cassandra-test {
+         |  host = ${c.getHost}
+         |  port = ${c.getMappedPort(9042)}
+         |  query = "insert into simple_adt.simple_a (id, a0, a1, ts) values (?, ?, ?, ?);"
+         |}
+         |""".stripMargin,
+      "resource://SampleA.csv"
+    )
+
+//    for (
+//      row <-
+//        session.execute("select * from simple_adt.simple_a").asScala
+//    ) {
+//      println(row.get(0, classOf[String]))
+//      println(row.get(1, classOf[String]))
+//      println(row.get(2, classOf[Int]))
+//      println(row.get(3, classOf[Instant]))
+//    }
+    session.close()
+    cassandra.stop()
+  }
+}
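The CassandraSinkConfig change earlier in this diff reorders the driver's JDK8 InstantCodec import; that codec is what lets a java.time.Instant field such as SimpleA.ts bind to the timestamp column created in this spec. A minimal sketch of registering the codec on a standalone Cluster, assuming the sink wires it up roughly this way:

    import com.datastax.driver.core.{Cluster, CodecRegistry}
    import com.datastax.driver.extras.codecs.jdk8.InstantCodec

    // register InstantCodec so java.time.Instant maps to Cassandra `timestamp`
    def buildCluster(host: String, port: Int): Cluster = {
      val registry = new CodecRegistry().register(InstantCodec.instance)
      Cluster
        .builder()
        .addContactPoint(host)
        .withPort(port)
        .withCodecRegistry(registry)
        .build()
    }
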
diff --git a/src/test/scala/io/epiphanous/flinkrunner/model/sink/JdbcSinkJobTest.scala b/src/test/scala/io/epiphanous/flinkrunner/model/sink/JdbcSinkJobTest.scala
index 856e9198..29fa3225 100644
--- a/src/test/scala/io/epiphanous/flinkrunner/model/sink/JdbcSinkJobTest.scala
+++ b/src/test/scala/io/epiphanous/flinkrunner/model/sink/JdbcSinkJobTest.scala
@@ -1,134 +1,77 @@
 package io.epiphanous.flinkrunner.model.sink
 
-import io.epiphanous.flinkrunner.flink.StreamJob
-import io.epiphanous.flinkrunner.model.{MySimpleADT, SimpleB}
-import io.epiphanous.flinkrunner.{FlinkRunner, FlinkRunnerSpec}
-import org.apache.flink.api.common.typeinfo.TypeInformation
+import com.dimafeng.testcontainers.PostgreSQLContainer
+import io.epiphanous.flinkrunner.model.{
+  BRecord,
+  BWrapper,
+  SimpleB,
+  StreamFormatName
+}
 import org.apache.flink.api.scala.createTypeInformation
-import org.apache.flink.streaming.api.scala.DataStream
 
-import java.sql.DriverManager
+import java.time.Instant
+import java.util.Properties
 
-class JdbcSinkJobTest extends FlinkRunnerSpec {
+class JdbcSinkJobTest extends SinkSpec {
 
-//  val pgContainer: PostgreSQLContainer = PostgreSQLContainer()
-  val pgContainer = new Object() {
-    val databaseName = "test"
-    val schema       = "public"
-    val jdbcUrl      = "jdbc:postgresql://localhost:5432/test"
-    val username     = "test"
-    val password     = "test"
-  }
+  val pgContainer: PostgreSQLContainer = PostgreSQLContainer()
 
-  // ignore since it's manual
-  ignore("write job results to sink") {
-//    pgContainer.start()
-    val configStr =
+  property("jdbc sink works") {
+    pgContainer.start()
+    testJob[SimpleB](
      s"""
-        |sinks {
-        |  jdbc-test {
-        |    connection = {
-        |      database = "${pgContainer.databaseName}"
-        |      schema = "${pgContainer.schema}"
-        |      url = "${pgContainer.jdbcUrl}"
-        |      username = "${pgContainer.username}"
-        |      password = "${pgContainer.password}"
-        |    }
-        |    table {
-        |      name = "sample_b"
-        |      columns = [
-        |        {
-        |          name = id
-        |          type = VARCHAR
-        |          precision = 36
-        |          primary.key = 1
-        |        }
-        |        {
-        |          name = b0
-        |          type = VARCHAR
-        |          precision = 255
-        |          nullable = false
-        |        }
-        |        {
-        |          name = b1
-        |          type = DOUBLE
-        |          nullable = false
-        |        }
-        |        {
-        |          name = b2
-        |          type = INTEGER
-        |        }
-        |        {
-        |          name = ts
-        |          type = TIMESTAMP
-        |          nullable = false
-        |        }
-        |      ]
-        |    }
-        |  }
-        |}
-        |sources {
-        |  test-file {
-        |    path = "resource://SampleB.csv"
-        |    format = csv
-        |  }
-        |}
-        |jobs {
-        |  testJob {
-        |    show.plan = true
-        |  }
-        |}
-        |execution.runtime-mode = batch
-        |""".stripMargin
-//    val checkResults: CheckResults[MySimpleADT] =
-//      new CheckResults[MySimpleADT] {
-//        override val name = "check postgresql table"
-//        override val writeToSink = false
-//        override def getInputEvents[IN <: MySimpleADT: TypeInformation](
-//            sourceConfig: SourceConfig[MySimpleADT]): List[IN] =
-//          genPop[SimpleB]().asInstanceOf[List[IN]]
-//
-//        override def checkOutputEvents[
-//            OUT <: MySimpleADT: TypeInformation](
-//            sinkConfig: SinkConfig[MySimpleADT],
-//            out: List[OUT]): Unit = {
-//          logger.debug(out.mkString("\n"))
-//          sinkConfig match {
-//            case sc: JdbcSinkConfig[MySimpleADT] =>
-//              sc.getConnection
-//                .fold(
-//                  t =>
-//                    throw new RuntimeException(
-//                      "failed to connect to test database",
-//                      t
-//                    ),
-//                  conn => {
-//                    val rs = conn
-//                      .createStatement()
-//                      .executeQuery(s"select * from ${sc.table}")
-//                    while (rs.next()) {
-//                      val row = rs.getRow
-//                      logger.debug(
-//                        s"$row - ${Range(1, 6).map(i => rs.getString(i)).mkString("|")}"
-//                      )
-//                    }
-//                  }
-//                )
-//            case _ => logger.debug("Oops")
-//          }
-//        }
-//      }
-
-    val factory = (runner: FlinkRunner[MySimpleADT]) =>
-      new SimpleIdentityJob[SimpleB](runner)
-    testStreamJob(configStr, factory)
-    val conn = DriverManager.getConnection(
-      pgContainer.jdbcUrl,
-      pgContainer.username,
-      pgContainer.password
+        | jdbc-test {
+        |   connection = {
+        |     database = "${pgContainer.databaseName}"
+        |     schema = public
+        |     url = "${pgContainer.jdbcUrl}"
+        |     username = "${pgContainer.username}"
+        |     password = "${pgContainer.password}"
+        |   }
+        |   table {
+        |     name = "sample_b"
+        |     columns = [
+        |       {
+        |         name = id
+        |         type = VARCHAR
+        |         precision = 36
+        |         primary.key = 1
+        |       }
+        |       {
+        |         name = b0
+        |         type = VARCHAR
+        |         precision = 255
+        |         nullable = false
+        |       }
+        |       {
+        |         name = b1
+        |         type = DOUBLE
+        |         nullable = false
+        |       }
+        |       {
+        |         name = b2
+        |         type = INTEGER
+        |       }
+        |       {
+        |         name = ts
+        |         type = TIMESTAMP
+        |         nullable = false
+        |       }
+        |     ]
+        |   }
+        | }
+        |""".stripMargin,
+      "resource://SampleB.csv",
+      otherJobConfig = "show.plan = true"
    )
-    val stmt = conn.createStatement()
-    val rs   = stmt.executeQuery("select * from sample_b")
+
+    val props = new Properties()
+    props.put("user", pgContainer.username)
+    props.put("password", pgContainer.password)
+    val conn  =
+      pgContainer.jdbcDriverInstance.connect(pgContainer.jdbcUrl, props)
+    val stmt  = conn.createStatement()
+    val rs    = stmt.executeQuery("select * from sample_b")
    while (rs.next()) {
      println(
        rs.getRow + "|" + rs.getString("id").trim() + "|" + rs.getString(
@@ -141,6 +84,79 @@ class JdbcSinkJobTest extends FlinkRunnerSpec {
     }
     stmt.close()
     conn.close()
-    // pgContainer.stop()
+    pgContainer.stop()
+  }
+
+  property("jdbc avro sink works") {
+    pgContainer.start()
+    val pop = genPop[BWrapper](10)
+    pop.foreach(println)
+    val database = pgContainer.databaseName
+    val url      = pgContainer.jdbcUrl
+    val user     = pgContainer.username
+    val pw       = pgContainer.password
+    getTempFile(StreamFormatName.Avro).map { path =>
+      val avroFile = path.toString
+      writeFile(avroFile, StreamFormatName.Avro, pop)
+      testAvroJob[BWrapper, BRecord](
+        s"""
+           | jdbc-test {
+           |   connection = {
+           |     database = "$database"
+           |     schema = public
+           |     url = "$url"
+           |     username = "$user"
+           |     password = "$pw"
+           |   }
+           |   table {
+           |     name = "b_record"
+           |     columns = [
+           |       {
+           |         name = b0
+           |         type = VARCHAR
+           |         precision = 36
+           |         primary.key = 1
+           |       }
+           |       {
+           |         name = b1
+           |         type = INTEGER
+           |         nullable = true
+           |       }
+           |       {
+           |         name = b2
+           |         type = DOUBLE
+           |         nullable = true
+           |       }
+           |       {
+           |         name = b3
+           |         type = TIMESTAMP
+           |         nullable = false
+           |       }
+           |     ]
+           |   }
+           | }
+           |""".stripMargin,
+        sourceFile = avroFile,
+        sourceFormat = "avro"
+      )
+      val props = new Properties()
+      props.put("user", user)
+      props.put("password", pw)
+      val conn =
+        pgContainer.jdbcDriverInstance.connect(url, props)
+      val stmt = conn.createStatement()
+      val rs   = stmt.executeQuery("select * from b_record")
+      while (rs.next()) {
+        val row = rs.getRow
+        val b0  = rs.getString("b0").trim()
+        val b1  = Option(rs.getInt("b1"))
+        val b2  = Option(rs.getDouble("b2"))
+        val b3  = Instant.ofEpochMilli(rs.getTimestamp("b3").getTime)
+        println(row -> BRecord(b0, b1, b2, b3))
+      }
+      stmt.close()
+      conn.close()
+    }
+    pgContainer.stop()
   }
 }
diff --git a/src/test/scala/io/epiphanous/flinkrunner/model/sink/SimpleAvroIdentityJob.scala b/src/test/scala/io/epiphanous/flinkrunner/model/sink/SimpleAvroIdentityJob.scala
new file mode 100644
index 00000000..43e40bab
--- /dev/null
+++ b/src/test/scala/io/epiphanous/flinkrunner/model/sink/SimpleAvroIdentityJob.scala
@@ -0,0 +1,23 @@
+package io.epiphanous.flinkrunner.model.sink
+
+import io.epiphanous.flinkrunner.FlinkRunner
+import io.epiphanous.flinkrunner.flink.AvroStreamJob
+import io.epiphanous.flinkrunner.model.{EmbeddedAvroRecord, EmbeddedAvroRecordInfo, MyAvroADT}
+import org.apache.avro.generic.GenericRecord
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala.createTypeInformation
+import org.apache.flink.streaming.api.scala.DataStream
+
+class SimpleAvroIdentityJob[
+    E <: MyAvroADT with EmbeddedAvroRecord[A]: TypeInformation,
+    A <: GenericRecord: TypeInformation](runner: FlinkRunner[MyAvroADT])(
+    implicit fromKV: EmbeddedAvroRecordInfo[A] => E)
+    extends AvroStreamJob[E, A, MyAvroADT](runner) {
+
+  override def transform: DataStream[E] = {
+    singleAvroSource[E, A]().map { e: E =>
+      println(e.$record.toString)
+      e
+    }
+  }
+}
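One caveat in the read-back loop of the new avro test: Option(rs.getInt("b1")) and Option(rs.getDouble("b2")) can never be None, because JDBC getters return 0 (or 0.0) for SQL NULL rather than null. When NULL actually needs to be distinguished from zero, the usual pattern is to consult wasNull immediately after the read, as in this sketch:

    import java.sql.ResultSet

    // read a nullable INTEGER column as Option[Int]; getInt returns 0 on NULL,
    // so wasNull must be checked right after the column is read
    def getIntOpt(rs: ResultSet, column: String): Option[Int] = {
      val value = rs.getInt(column)
      if (rs.wasNull()) None else Some(value)
    }
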
diff --git a/src/test/scala/io/epiphanous/flinkrunner/model/sink/SinkSpec.scala b/src/test/scala/io/epiphanous/flinkrunner/model/sink/SinkSpec.scala
new file mode 100644
index 00000000..89f7ec33
--- /dev/null
+++ b/src/test/scala/io/epiphanous/flinkrunner/model/sink/SinkSpec.scala
@@ -0,0 +1,86 @@
+package io.epiphanous.flinkrunner.model.sink
+
+import io.epiphanous.flinkrunner.model._
+import io.epiphanous.flinkrunner.{FlinkRunner, FlinkRunnerSpec}
+import org.apache.avro.generic.GenericRecord
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala.createTypeInformation
+
+class SinkSpec extends FlinkRunnerSpec with AvroFileTestUtils {
+
+  def getFactory[E <: MySimpleADT: TypeInformation]
+      : FlinkRunner[MySimpleADT] => SimpleIdentityJob[E] =
+    (runner: FlinkRunner[MySimpleADT]) => new SimpleIdentityJob[E](runner)
+
+  def getAvroFactory[
+      E <: MyAvroADT with EmbeddedAvroRecord[A]: TypeInformation,
+      A <: GenericRecord: TypeInformation](implicit
+      fromKV: EmbeddedAvroRecordInfo[A] => E)
+      : FlinkRunner[MyAvroADT] => SimpleAvroIdentityJob[
+        E,
+        A
+      ] = (runner: FlinkRunner[MyAvroADT]) =>
+    new SimpleAvroIdentityJob[E, A](runner)
+
+  def getJobConfig(
+      sinkConfigStr: String,
+      sourceFile: String,
+      sourceFormat: String = "csv",
+      batchMode: Boolean = true,
+      otherJobConfig: String = ""): String =
+    s"""
+       |${if (batchMode) "runtime.execution-mode=batch" else ""}
+       |jobs {
+       |  testJob {
+       |    $otherJobConfig
+       |    sources {
+       |      file-source {
+       |        path = "$sourceFile"
+       |        format = $sourceFormat
+       |      }
+       |    }
+       |    sinks {
+       |      $sinkConfigStr
+       |    }
+       |  }
+       |}
+       |""".stripMargin
+
+  def testJob[E <: MySimpleADT: TypeInformation](
+      sinkConfigStr: String,
+      sourceFile: String,
+      sourceFormat: String = "csv",
+      batchMode: Boolean = true,
+      otherJobConfig: String = "runtime.execution-mode = batch"): Unit =
+    testStreamJob(
+      getJobConfig(
+        sinkConfigStr,
+        sourceFile,
+        sourceFormat,
+        batchMode,
+        otherJobConfig
+      ),
+      getFactory[E]
+    )
+
+  def testAvroJob[
+      E <: MyAvroADT with EmbeddedAvroRecord[A]: TypeInformation,
+      A <: GenericRecord: TypeInformation](
+      sinkConfigStr: String,
+      sourceFile: String,
+      sourceFormat: String = "csv",
+      batchMode: Boolean = true,
+      otherJobConfig: String = "")(implicit
+      fromKV: EmbeddedAvroRecordInfo[A] => E): Unit =
+    testAvroStreamJob(
+      getJobConfig(
+        sinkConfigStr,
+        sourceFile,
+        sourceFormat,
+        batchMode,
+        otherJobConfig
+      ),
+      getAvroFactory[E, A]
+    )
+
+}
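SinkSpec reduces a connector test to three ingredients: the sink's HOCON block, a source file, and optional extra job config, with getJobConfig assembling the full jobs.testJob configuration around them. A hypothetical spec reusing it for another connector might look like the sketch below; the file-test sink name and its path/format keys are placeholders, not taken from this diff:

    package io.epiphanous.flinkrunner.model.sink

    import io.epiphanous.flinkrunner.model.SimpleA
    import org.apache.flink.api.scala.createTypeInformation

    // hypothetical spec: the sink block contents are placeholders
    class FileSinkSpec extends SinkSpec {
      property("file sink works") {
        testJob[SimpleA](
          """
            |file-test {
            |  path = "/tmp/simple-a-out"
            |  format = csv
            |}
            |""".stripMargin,
          "resource://SampleA.csv"
        )
      }
    }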