diff --git a/build.sbt b/build.sbt index 267749b7..5e387d93 100644 --- a/build.sbt +++ b/build.sbt @@ -36,7 +36,7 @@ val V = new { val scalaTestPlus = "3.2.14.0" val scalaCheck = "1.17.0" val testContainersScala = "0.40.11" - val jackson = "2.13.4" + val jackson = "2.14.0" val circe = "0.14.2" val http4s = "0.23.12" val enumeratum = "1.7.0" @@ -50,6 +50,7 @@ val V = new { val jdbcPg = "42.5.0" val jdbcMssql = "11.2.0.jre11" val hadoop = "3.3.2" + val cassandraDriver = "3.11.3" val jna = "5.12.1" // needed for testcontainers in some jvms } @@ -130,7 +131,9 @@ val otherDeps = Seq( "org.postgresql" % "postgresql" % V.jdbcPg % Provided, "com.dimafeng" %% "testcontainers-scala-mssqlserver" % V.testContainersScala % Test, "net.java.dev.jna" % "jna" % V.jna % Test, - "com.microsoft.sqlserver" % "mssql-jdbc" % V.jdbcMssql % Provided + "com.microsoft.sqlserver" % "mssql-jdbc" % V.jdbcMssql % Provided, + "com.dimafeng" %% "testcontainers-scala-cassandra" % V.testContainersScala % Test, + "com.datastax.cassandra" % "cassandra-driver-extras" % V.cassandraDriver % Provided ) ++ Seq("org.apache.parquet" % "parquet-avro" % V.parquet % Provided).map( m => diff --git a/src/main/scala/io/epiphanous/flinkrunner/FlinkRunner.scala b/src/main/scala/io/epiphanous/flinkrunner/FlinkRunner.scala index 7c181df3..b9e33702 100644 --- a/src/main/scala/io/epiphanous/flinkrunner/FlinkRunner.scala +++ b/src/main/scala/io/epiphanous/flinkrunner/FlinkRunner.scala @@ -5,9 +5,11 @@ import com.typesafe.scalalogging.LazyLogging import io.epiphanous.flinkrunner.model._ import io.epiphanous.flinkrunner.model.sink._ import io.epiphanous.flinkrunner.model.source._ +import org.apache.avro.Schema import org.apache.avro.generic.GenericRecord import org.apache.flink.api.common.JobExecutionResult import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.formats.avro.utils.AvroKryoSerializerUtils.AvroSchemaSerializer import org.apache.flink.streaming.api.datastream.DataStreamSink import org.apache.flink.streaming.api.scala._ import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment @@ -42,6 +44,11 @@ abstract class FlinkRunner[ADT <: FlinkEvent: TypeInformation]( val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env) + env.getConfig.addDefaultKryoSerializer( + classOf[Schema], + classOf[AvroSchemaSerializer] + ) + /** Gets (and returns as string) the execution plan for the job from the * StreamExecutionEnvironment. * @return @@ -261,7 +268,7 @@ abstract class FlinkRunner[ADT <: FlinkEvent: TypeInformation]( def toSink[E <: ADT: TypeInformation]( stream: DataStream[E], sinkName: String - ): Object = + ): DataStreamSink[E] = configToSink[E](stream, getSinkConfig(sinkName)) /** Create an avro-encoded stream sink from configuration. 
@@ -282,7 +289,7 @@ abstract class FlinkRunner[ADT <: FlinkEvent: TypeInformation]( A <: GenericRecord: TypeInformation]( stream: DataStream[E], sinkName: String - ): Object = + ): DataStreamSink[E] = configToAvroSink[E, A](stream, getSinkConfig(sinkName)) def getSinkConfig( @@ -299,7 +306,7 @@ abstract class FlinkRunner[ADT <: FlinkEvent: TypeInformation]( def configToSink[E <: ADT: TypeInformation]( stream: DataStream[E], - sinkConfig: SinkConfig[ADT]): Object = + sinkConfig: SinkConfig[ADT]): DataStreamSink[E] = sinkConfig match { case s: CassandraSinkConfig[ADT] => s.getSink[E](stream) case s: ElasticsearchSinkConfig[ADT] => s.getSink[E](stream) @@ -317,11 +324,13 @@ abstract class FlinkRunner[ADT <: FlinkEvent: TypeInformation]( stream: DataStream[E], sinkConfig: SinkConfig[ADT]): DataStreamSink[E] = sinkConfig match { - case s: KafkaSinkConfig[ADT] => s.getAvroSink[E, A](stream) - case s: FileSinkConfig[ADT] => s.getAvroSink[E, A](stream) - case s => - throw new RuntimeException( - s"Avro serialization not supported for ${s.connector} sinks" - ) + case s: CassandraSinkConfig[ADT] => s.getAvroSink[E, A](stream) + case s: ElasticsearchSinkConfig[ADT] => s.getAvroSink[E, A](stream) + case s: FileSinkConfig[ADT] => s.getAvroSink[E, A](stream) + case s: JdbcSinkConfig[ADT] => s.getAvroSink[E, A](stream) + case s: KafkaSinkConfig[ADT] => s.getAvroSink[E, A](stream) + case s: KinesisSinkConfig[ADT] => s.getAvroSink[E, A](stream) + case s: RabbitMQSinkConfig[ADT] => s.getAvroSink[E, A](stream) + case s: SocketSinkConfig[ADT] => s.getAvroSink[E, A](stream) } } diff --git a/src/main/scala/io/epiphanous/flinkrunner/model/FlinkConnectorName.scala b/src/main/scala/io/epiphanous/flinkrunner/model/FlinkConnectorName.scala index 9a249595..855d9539 100644 --- a/src/main/scala/io/epiphanous/flinkrunner/model/FlinkConnectorName.scala +++ b/src/main/scala/io/epiphanous/flinkrunner/model/FlinkConnectorName.scala @@ -20,9 +20,9 @@ object FlinkConnectorName extends Enum[FlinkConnectorName] { case object Socket extends FlinkConnectorName - case object CassandraSink extends FlinkConnectorName + case object Cassandra extends FlinkConnectorName - case object ElasticsearchSink extends FlinkConnectorName + case object Elasticsearch extends FlinkConnectorName case object Jdbc extends FlinkConnectorName @@ -31,7 +31,7 @@ object FlinkConnectorName extends Enum[FlinkConnectorName] { case object Generator extends FlinkConnectorName val sources: immutable.Seq[FlinkConnectorName] = - values diff IndexedSeq(CassandraSink, ElasticsearchSink) + values diff IndexedSeq(Cassandra, Elasticsearch) val sinks: immutable.Seq[FlinkConnectorName] = values diff IndexedSeq(Hybrid, Generator) diff --git a/src/main/scala/io/epiphanous/flinkrunner/model/sink/CassandraSinkConfig.scala b/src/main/scala/io/epiphanous/flinkrunner/model/sink/CassandraSinkConfig.scala index 2be4494c..b93f9943 100644 --- a/src/main/scala/io/epiphanous/flinkrunner/model/sink/CassandraSinkConfig.scala +++ b/src/main/scala/io/epiphanous/flinkrunner/model/sink/CassandraSinkConfig.scala @@ -1,12 +1,19 @@ package io.epiphanous.flinkrunner.model.sink +import com.datastax.driver.core.{Cluster, CodecRegistry} +import com.datastax.driver.extras.codecs.jdk8.InstantCodec import io.epiphanous.flinkrunner.model.{ + EmbeddedAvroRecord, FlinkConfig, FlinkConnectorName, FlinkEvent } +import io.epiphanous.flinkrunner.util.AvroUtils.RichGenericRecord +import org.apache.avro.generic.GenericRecord +import org.apache.flink.api.common.typeinfo.TypeInformation +import 
org.apache.flink.streaming.api.datastream.DataStreamSink import org.apache.flink.streaming.api.scala.DataStream -import org.apache.flink.streaming.connectors.cassandra.CassandraSink +import org.apache.flink.streaming.connectors.cassandra._ /** A cassandra sink config. * @@ -28,17 +35,52 @@ case class CassandraSinkConfig[ADT <: FlinkEvent]( ) extends SinkConfig[ADT] { override val connector: FlinkConnectorName = - FlinkConnectorName.CassandraSink + FlinkConnectorName.Cassandra - val host: String = config.getString(pfx("host")) + val host: String = + config.getStringOpt(pfx("host")).getOrElse("localhost") + val port: Int = config.getIntOpt(pfx("port")).getOrElse(9042) val query: String = config.getString(pfx("query")) - def getSink[E <: ADT](stream: DataStream[E]): CassandraSink[E] = - CassandraSink - .addSink(stream) - .setHost(host) - .setQuery(query) - .build() + /** Don't convert to single abstract method...flink will complain + */ + val clusterBuilder: ClusterBuilder = new ClusterBuilder { + override def buildCluster(builder: Cluster.Builder): Cluster = + builder + .addContactPoint(host) + .withPort(port) + .withoutJMXReporting() + .withCodecRegistry( + new CodecRegistry().register(InstantCodec.instance) + ) + .build() + } + + def getSink[E <: ADT: TypeInformation]( + stream: DataStream[E]): DataStreamSink[E] = { + stream + .addSink(new CassandraScalaProductSink[E](query, clusterBuilder)) .uid(label) .name(label) + } + + override def getAvroSink[ + E <: ADT with EmbeddedAvroRecord[A]: TypeInformation, + A <: GenericRecord: TypeInformation]( + stream: DataStream[E]): DataStreamSink[E] = + stream + .addSink( + new AbstractCassandraTupleSink[E]( + query, + clusterBuilder, + CassandraSinkBaseConfig.newBuilder().build(), + new NoOpCassandraFailureHandler() + ) { + override def extract(record: E): Array[AnyRef] = + record.$record.getDataAsSeq.toArray + } + ) + .uid(label) + .name(label) + } diff --git a/src/main/scala/io/epiphanous/flinkrunner/model/sink/ElasticsearchSinkConfig.scala b/src/main/scala/io/epiphanous/flinkrunner/model/sink/ElasticsearchSinkConfig.scala index ddc2015d..a7d927f4 100644 --- a/src/main/scala/io/epiphanous/flinkrunner/model/sink/ElasticsearchSinkConfig.scala +++ b/src/main/scala/io/epiphanous/flinkrunner/model/sink/ElasticsearchSinkConfig.scala @@ -2,10 +2,14 @@ package io.epiphanous.flinkrunner.model.sink import com.typesafe.scalalogging.LazyLogging import io.epiphanous.flinkrunner.model.{ + EmbeddedAvroRecord, FlinkConfig, FlinkConnectorName, FlinkEvent } +import io.epiphanous.flinkrunner.util.AvroUtils.RichGenericRecord +import org.apache.avro.generic.GenericRecord +import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.connector.sink2.SinkWriter import org.apache.flink.connector.elasticsearch.sink import org.apache.flink.connector.elasticsearch.sink.{ @@ -19,7 +23,7 @@ import org.apache.http.HttpHost import org.elasticsearch.client.Requests import java.net.URL -import scala.collection.JavaConverters.mapAsJavaMap +import scala.collection.JavaConverters._ /** Elasticsearch sink config * @@ -48,7 +52,7 @@ case class ElasticsearchSinkConfig[ADT <: FlinkEvent]( with LazyLogging { override val connector: FlinkConnectorName = - FlinkConnectorName.ElasticsearchSink + FlinkConnectorName.Elasticsearch val index: String = config.getString(pfx("index")) val transports: List[HttpHost] = @@ -75,11 +79,13 @@ case class ElasticsearchSinkConfig[ADT <: FlinkEvent]( val bulkFlushIntervalMs: Option[Long] = 
Option(properties.getProperty("bulk.flush.interval.ms")).map(_.toLong) - def getSink[E <: ADT](dataStream: DataStream[E]): DataStreamSink[E] = { + def _getSink[E <: ADT: TypeInformation]( + dataStream: DataStream[E], + emitter: ElasticsearchEmitter[E]): DataStreamSink[E] = { val esb = new Elasticsearch7SinkBuilder[E] .setHosts(transports: _*) - .setEmitter[E](getEmitter[E]) + .setEmitter[E](emitter) .setBulkFlushBackoffStrategy( bulkFlushBackoffType, bulkFlushBackoffRetries, @@ -91,13 +97,37 @@ case class ElasticsearchSinkConfig[ADT <: FlinkEvent]( dataStream.sinkTo(esb.build()).uid(label).name(label) } - def getEmitter[E <: ADT]: ElasticsearchEmitter[E] = + override def getSink[E <: ADT: TypeInformation]( + dataStream: DataStream[E]): DataStreamSink[E] = + _getSink(dataStream, getEmitter[E]) + + override def getAvroSink[ + E <: ADT with EmbeddedAvroRecord[A]: TypeInformation, + A <: GenericRecord: TypeInformation]( + dataStream: DataStream[E]): DataStreamSink[E] = + _getSink(dataStream, getAvroEmitter[E, A]) + + def _getEmitter[E <: ADT]( + getData: E => AnyRef): ElasticsearchEmitter[E] = (element: E, _: SinkWriter.Context, indexer: sink.RequestIndexer) => indexer.add( Requests.indexRequest .index(index) .source( - mapAsJavaMap(Map("data" -> element.asInstanceOf[AnyRef])) + Map("data" -> getData(element)).asJava ) ) + + def getEmitter[E <: ADT]: ElasticsearchEmitter[E] = _getEmitter { e => + val values = e.productIterator + e.getClass.getDeclaredFields + .map(_.getName -> values.next()) + .toMap + .asJava + } + def getAvroEmitter[ + E <: ADT with EmbeddedAvroRecord[A], + A <: GenericRecord]: ElasticsearchEmitter[E] = _getEmitter( + _.$record.getDataAsMap.asJava + ) } diff --git a/src/main/scala/io/epiphanous/flinkrunner/model/sink/FileSinkConfig.scala b/src/main/scala/io/epiphanous/flinkrunner/model/sink/FileSinkConfig.scala index baf1b835..459b1154 100644 --- a/src/main/scala/io/epiphanous/flinkrunner/model/sink/FileSinkConfig.scala +++ b/src/main/scala/io/epiphanous/flinkrunner/model/sink/FileSinkConfig.scala @@ -110,7 +110,7 @@ case class FileSinkConfig[ADT <: FlinkEvent]( * @return * DataStreamSink[E] */ - def getSink[E <: ADT: TypeInformation]( + override def getSink[E <: ADT: TypeInformation]( dataStream: DataStream[E]): DataStreamSink[E] = dataStream.sinkTo( FileSink @@ -133,7 +133,7 @@ case class FileSinkConfig[ADT <: FlinkEvent]( * @return * DataStream[E] */ - def getAvroSink[ + override def getAvroSink[ E <: ADT with EmbeddedAvroRecord[A]: TypeInformation, A <: GenericRecord: TypeInformation]( dataStream: DataStream[E]): DataStreamSink[E] = { diff --git a/src/main/scala/io/epiphanous/flinkrunner/model/sink/JdbcSinkConfig.scala b/src/main/scala/io/epiphanous/flinkrunner/model/sink/JdbcSinkConfig.scala index 12cbce61..f47b9069 100644 --- a/src/main/scala/io/epiphanous/flinkrunner/model/sink/JdbcSinkConfig.scala +++ b/src/main/scala/io/epiphanous/flinkrunner/model/sink/JdbcSinkConfig.scala @@ -13,6 +13,7 @@ import io.epiphanous.flinkrunner.model.sink.JdbcSinkConfig.{ } import io.epiphanous.flinkrunner.operator.CreateTableJdbcSinkFunction import io.epiphanous.flinkrunner.util.SqlBuilder +import org.apache.avro.generic.GenericRecord import org.apache.flink.api.common.functions.RuntimeContext import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.connector.jdbc.internal.JdbcOutputFormat @@ -27,7 +28,7 @@ import org.apache.flink.connector.jdbc.{ import org.apache.flink.streaming.api.datastream.DataStreamSink import 
org.apache.flink.streaming.api.scala.DataStream -import java.sql.{Connection, DriverManager, Timestamp} +import java.sql.{Connection, DriverManager, PreparedStatement, Timestamp} import java.time.Instant import java.util.function.{Function => JavaFunction} import scala.collection.JavaConverters._ @@ -571,35 +572,66 @@ case class JdbcSinkConfig[ADT <: FlinkEvent]( * @return * JdbcStatementBuilder[E] */ - def getStatementBuilder[E <: ADT]: JdbcStatementBuilder[E] = { - case (statement, element) => - val data = element.getClass.getDeclaredFields - .map(_.getName) - .zip(element.productIterator.toIndexedSeq) - .toMap - .filterKeys(f => columns.exists(_.name.equalsIgnoreCase(f))) - columns.zipWithIndex.map(x => (x._1, x._2 + 1)).foreach { - case (column, i) => - data.get(column.name) match { - case Some(v) => - val value = v match { - case ts: Instant => Timestamp.from(ts) - case Some(ts: Instant) => Timestamp.from(ts) - case Some(x) => x - case None => null - case _ => v - } - statement.setObject(i, value, column.dataType.jdbcType) - case None => - throw new RuntimeException( - s"value for field ${column.name} is not in $element" - ) - } + def getStatementBuilder[E <: ADT]: JdbcStatementBuilder[E] = + new JdbcStatementBuilder[E] { + override def accept(statement: PreparedStatement, element: E): Unit = + _fillInStatement( + fieldValuesOf(element), + statement, + element + ) + } + + def fieldValuesOf[T <: Product](product: T): Map[String, Any] = { + product.getClass.getDeclaredFields + .map(_.getName) + .zip(product.productIterator.toIndexedSeq) + .toMap + } + + def getAvroStatementBuilder[ + E <: ADT with EmbeddedAvroRecord[A]: TypeInformation, + A <: GenericRecord: TypeInformation]: JdbcStatementBuilder[E] = + new JdbcStatementBuilder[E] { + override def accept( + statement: PreparedStatement, + element: E): Unit = { + println(s"XXX: $element") + _fillInStatement[E]( + fieldValuesOf(element.$record.asInstanceOf[Product]), + statement, + element + ) } + } + + def _fillInStatement[E <: ADT]( + data: Map[String, Any], + statement: PreparedStatement, + element: E): Unit = { + columns.zipWithIndex.map(x => (x._1, x._2 + 1)).foreach { + case (column, i) => + data.get(column.name) match { + case Some(v) => + val value = v match { + case ts: Instant => Timestamp.from(ts) + case Some(ts: Instant) => Timestamp.from(ts) + case Some(x) => x + case None => null + case _ => v + } + statement.setObject(i, value, column.dataType.jdbcType) + case None => + throw new RuntimeException( + s"value for field ${column.name} is not in $element" + ) + } + } } - def getSink[E <: ADT: TypeInformation]( - dataStream: DataStream[E]): DataStreamSink[E] = { + def _getSink[E <: ADT: TypeInformation]( + dataStream: DataStream[E], + statementBuilder: JdbcStatementBuilder[E]): DataStreamSink[E] = { val jdbcOutputFormat = new JdbcOutputFormat[E, E, JdbcBatchStatementExecutor[E]]( new SimpleJdbcConnectionProvider( @@ -616,7 +648,7 @@ case class JdbcSinkConfig[ADT <: FlinkEvent]( t: RuntimeContext): JdbcBatchStatementExecutor[E] = { JdbcBatchStatementExecutor.simple( queryDml, - getStatementBuilder[E], + statementBuilder, JavaFunction.identity[E] ) } @@ -630,6 +662,16 @@ case class JdbcSinkConfig[ADT <: FlinkEvent]( .uid(label) .name(label) } + + override def getSink[E <: ADT: TypeInformation]( + dataStream: DataStream[E]): DataStreamSink[E] = + _getSink(dataStream, getStatementBuilder[E]) + + override def getAvroSink[ + E <: ADT with EmbeddedAvroRecord[A]: TypeInformation, + A <: GenericRecord: TypeInformation]( + dataStream: 
DataStream[E]): DataStreamSink[E] = + _getSink(dataStream, getAvroStatementBuilder[E, A]) } object JdbcSinkConfig { diff --git a/src/main/scala/io/epiphanous/flinkrunner/model/sink/KafkaSinkConfig.scala b/src/main/scala/io/epiphanous/flinkrunner/model/sink/KafkaSinkConfig.scala index aabacba6..54791413 100644 --- a/src/main/scala/io/epiphanous/flinkrunner/model/sink/KafkaSinkConfig.scala +++ b/src/main/scala/io/epiphanous/flinkrunner/model/sink/KafkaSinkConfig.scala @@ -135,13 +135,13 @@ case class KafkaSinkConfig[ADT <: FlinkEvent: TypeInformation]( : KafkaRecordSerializationSchema[E] = new JsonKafkaRecordSerializationSchema[E, ADT](this) - def getAvroSink[ + override def getAvroSink[ E <: ADT with EmbeddedAvroRecord[A]: TypeInformation, A <: GenericRecord: TypeInformation]( dataStream: DataStream[E]): DataStreamSink[E] = dataStream.sinkTo(_getSink[E](getAvroSerializationSchema[E, A])) - def getSink[E <: ADT: TypeInformation]( + override def getSink[E <: ADT: TypeInformation]( dataStream: DataStream[E]): DataStreamSink[E] = dataStream.sinkTo(_getSink[E](getSerializationSchema[E])) diff --git a/src/main/scala/io/epiphanous/flinkrunner/model/sink/KinesisSinkConfig.scala b/src/main/scala/io/epiphanous/flinkrunner/model/sink/KinesisSinkConfig.scala index abf04c9d..e17b8396 100644 --- a/src/main/scala/io/epiphanous/flinkrunner/model/sink/KinesisSinkConfig.scala +++ b/src/main/scala/io/epiphanous/flinkrunner/model/sink/KinesisSinkConfig.scala @@ -2,11 +2,16 @@ package io.epiphanous.flinkrunner.model.sink import com.typesafe.scalalogging.LazyLogging import io.epiphanous.flinkrunner.model.{ + EmbeddedAvroRecord, FlinkConfig, FlinkConnectorName, FlinkEvent } -import io.epiphanous.flinkrunner.serde.JsonSerializationSchema +import io.epiphanous.flinkrunner.serde.{ + EmbeddedAvroJsonSerializationSchema, + JsonSerializationSchema +} +import org.apache.avro.generic.GenericRecord import org.apache.flink.api.common.serialization.SerializationSchema import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.connector.kinesis.sink.KinesisStreamsSink @@ -44,15 +49,16 @@ case class KinesisSinkConfig[ADT <: FlinkEvent: TypeInformation]( val stream: String = config.getString(pfx("stream")) - def getSink[E <: ADT: TypeInformation]( - dataStream: DataStream[E]): DataStreamSink[E] = { + def _getSink[E <: ADT: TypeInformation]( + dataStream: DataStream[E], + serializationSchema: SerializationSchema[E]): DataStreamSink[E] = { dataStream .sinkTo( KinesisStreamsSink .builder[E] .setStreamName(stream) .setFailOnError(true) - .setSerializationSchema(getSerializationSchema[E]) + .setSerializationSchema(serializationSchema) .setKinesisClientProperties(properties) .build() ) @@ -60,7 +66,23 @@ case class KinesisSinkConfig[ADT <: FlinkEvent: TypeInformation]( .name(label) } + override def getSink[E <: ADT: TypeInformation]( + dataStream: DataStream[E]): DataStreamSink[E] = + _getSink[E](dataStream, getSerializationSchema[E]) + + override def getAvroSink[ + E <: ADT with EmbeddedAvroRecord[A]: TypeInformation, + A <: GenericRecord: TypeInformation]( + dataStream: DataStream[E]): DataStreamSink[E] = + _getSink[E](dataStream, getAvroSerializationSchema[E, A]) + def getSerializationSchema[E <: ADT: TypeInformation] : SerializationSchema[E] = new JsonSerializationSchema[E, ADT](this) + + def getAvroSerializationSchema[ + E <: ADT with EmbeddedAvroRecord[A]: TypeInformation, + A <: GenericRecord: TypeInformation] = + new EmbeddedAvroJsonSerializationSchema[E, A, ADT](this) + } diff --git 
a/src/main/scala/io/epiphanous/flinkrunner/model/sink/RabbitMQSinkConfig.scala b/src/main/scala/io/epiphanous/flinkrunner/model/sink/RabbitMQSinkConfig.scala index a012f84e..cff020f0 100644 --- a/src/main/scala/io/epiphanous/flinkrunner/model/sink/RabbitMQSinkConfig.scala +++ b/src/main/scala/io/epiphanous/flinkrunner/model/sink/RabbitMQSinkConfig.scala @@ -1,13 +1,12 @@ package io.epiphanous.flinkrunner.model.sink import com.typesafe.scalalogging.LazyLogging -import io.epiphanous.flinkrunner.model.{ - FlinkConfig, - FlinkConnectorName, - FlinkEvent, - RabbitMQConnectionInfo +import io.epiphanous.flinkrunner.model._ +import io.epiphanous.flinkrunner.serde.{ + EmbeddedAvroJsonSerializationSchema, + JsonSerializationSchema } -import io.epiphanous.flinkrunner.serde.JsonSerializationSchema +import org.apache.avro.generic.GenericRecord import org.apache.flink.api.common.serialization.SerializationSchema import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.streaming.api.datastream.DataStreamSink @@ -35,13 +34,18 @@ case class RabbitMQSinkConfig[ADT <: FlinkEvent: TypeInformation]( def getSerializationSchema[E <: ADT: TypeInformation] : SerializationSchema[E] = new JsonSerializationSchema[E, ADT](this) + def getAvroSerializationSchema[ + E <: ADT with EmbeddedAvroRecord[A]: TypeInformation, + A <: GenericRecord: TypeInformation] = + new EmbeddedAvroJsonSerializationSchema[E, A, ADT](this) + def getPublishOptions[E <: ADT: TypeInformation] : Option[RMQSinkPublishOptions[E]] = None - def getSink[E <: ADT: TypeInformation]( - dataStream: DataStream[E]): DataStreamSink[E] = { - val connConfig = connectionInfo.rmqConfig - val serializationSchema = getSerializationSchema[E] + def _getSink[E <: ADT: TypeInformation]( + dataStream: DataStream[E], + serializationSchema: SerializationSchema[E]): DataStreamSink[E] = { + val connConfig = connectionInfo.rmqConfig val sink = getPublishOptions[E] match { case Some(p) => new RMQSink(connConfig, serializationSchema, p) @@ -56,4 +60,15 @@ case class RabbitMQSinkConfig[ADT <: FlinkEvent: TypeInformation]( } dataStream.addSink(sink).uid(label).name(label) } + + override def getSink[E <: ADT: TypeInformation]( + dataStream: DataStream[E]): DataStreamSink[E] = + _getSink[E](dataStream, getSerializationSchema[E]) + + override def getAvroSink[ + E <: ADT with EmbeddedAvroRecord[A]: TypeInformation, + A <: GenericRecord: TypeInformation]( + dataStream: DataStream[E]): DataStreamSink[E] = + _getSink[E](dataStream, getAvroSerializationSchema[E, A]) + } diff --git a/src/main/scala/io/epiphanous/flinkrunner/model/sink/SinkConfig.scala b/src/main/scala/io/epiphanous/flinkrunner/model/sink/SinkConfig.scala index c781fa6e..eb51fa5b 100644 --- a/src/main/scala/io/epiphanous/flinkrunner/model/sink/SinkConfig.scala +++ b/src/main/scala/io/epiphanous/flinkrunner/model/sink/SinkConfig.scala @@ -4,7 +4,10 @@ import com.typesafe.scalalogging.LazyLogging import io.epiphanous.flinkrunner.model.FlinkConnectorName._ import io.epiphanous.flinkrunner.model._ import io.epiphanous.flinkrunner.util.StreamUtils._ +import org.apache.avro.generic.GenericRecord import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.streaming.api.datastream.DataStreamSink +import org.apache.flink.streaming.api.scala.DataStream import java.util import java.util.Properties @@ -16,14 +19,15 @@ import java.util.Properties * * - `name`: the sink name * - `connector`: one of - * - [[FlinkConnectorName.CassandraSink]] - * - 
[[FlinkConnectorName.ElasticsearchSink]] + * - [[FlinkConnectorName.Cassandra]] + * - [[FlinkConnectorName.Elasticsearch]] * - [[FlinkConnectorName.File]] * - [[FlinkConnectorName.Jdbc]] * - [[FlinkConnectorName.Kafka]] * - [[FlinkConnectorName.Kinesis]] * - [[FlinkConnectorName.RabbitMQ]] * - [[FlinkConnectorName.Socket]] + * * @tparam ADT * the flinkrunner algebraic data type */ @@ -45,6 +49,14 @@ trait SinkConfig[ADT <: FlinkEvent] extends LazyLogging { lazy val label: String = s"${connector.entryName.toLowerCase}/$name" + def getSink[E <: ADT: TypeInformation]( + dataStream: DataStream[E]): DataStreamSink[E] + + def getAvroSink[ + E <: ADT with EmbeddedAvroRecord[A]: TypeInformation, + A <: GenericRecord: TypeInformation]( + dataStream: DataStream[E]): DataStreamSink[E] + } object SinkConfig { @@ -58,17 +70,17 @@ object SinkConfig { config.jobName, config.getStringOpt(s"sinks.$name.connector") ) match { - case Kafka => KafkaSinkConfig(name, config) - case Kinesis => KinesisSinkConfig(name, config) - case File => FileSinkConfig(name, config) - case Socket => SocketSinkConfig(name, config) - case Jdbc => JdbcSinkConfig(name, config) - case CassandraSink => + case Kafka => KafkaSinkConfig(name, config) + case Kinesis => KinesisSinkConfig(name, config) + case File => FileSinkConfig(name, config) + case Socket => SocketSinkConfig(name, config) + case Jdbc => JdbcSinkConfig(name, config) + case Cassandra => CassandraSinkConfig(name, config) - case ElasticsearchSink => + case Elasticsearch => ElasticsearchSinkConfig(name, config) - case RabbitMQ => RabbitMQSinkConfig(name, config) - case connector => + case RabbitMQ => RabbitMQSinkConfig(name, config) + case connector => throw new RuntimeException( s"Don't know how to configure ${connector.entryName} sink connector $name (job ${config.jobName}" ) diff --git a/src/main/scala/io/epiphanous/flinkrunner/model/sink/SocketSinkConfig.scala b/src/main/scala/io/epiphanous/flinkrunner/model/sink/SocketSinkConfig.scala index 7a8d4ea6..14688aea 100644 --- a/src/main/scala/io/epiphanous/flinkrunner/model/sink/SocketSinkConfig.scala +++ b/src/main/scala/io/epiphanous/flinkrunner/model/sink/SocketSinkConfig.scala @@ -1,13 +1,9 @@ package io.epiphanous.flinkrunner.model.sink import com.typesafe.scalalogging.LazyLogging -import io.epiphanous.flinkrunner.model.{ - FlinkConfig, - FlinkConnectorName, - FlinkEvent, - StreamFormatName -} +import io.epiphanous.flinkrunner.model._ import io.epiphanous.flinkrunner.serde._ +import org.apache.avro.generic.GenericRecord import org.apache.flink.api.common.serialization.SerializationSchema import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.streaming.api.datastream.DataStreamSink @@ -62,14 +58,31 @@ case class SocketSinkConfig[ADT <: FlinkEvent]( } } - def getSink[E <: ADT: TypeInformation]( - dataStream: DataStream[E]): DataStreamSink[E] = dataStream.addSink( - new SocketClientSink[E]( - host, - port, - getSerializationSchema[E], - maxRetries, - autoFlush + override def getSink[E <: ADT: TypeInformation]( + dataStream: DataStream[E]): DataStreamSink[E] = + _getSink[E](dataStream, getSerializationSchema[E]) + + def getAvroSerializationSchema[ + E <: ADT with EmbeddedAvroRecord[A]: TypeInformation, + A <: GenericRecord: TypeInformation]: SerializationSchema[E] = + new EmbeddedAvroJsonSerializationSchema[E, A, ADT](this) + + override def getAvroSink[ + E <: ADT with EmbeddedAvroRecord[A]: TypeInformation, + A <: GenericRecord: TypeInformation]( + dataStream: DataStream[E]): DataStreamSink[E] 
= + _getSink[E](dataStream, getAvroSerializationSchema[E, A]) + + def _getSink[E <: ADT: TypeInformation]( + dataStream: DataStream[E], + serializationSchema: SerializationSchema[E]): DataStreamSink[E] = + dataStream.addSink( + new SocketClientSink[E]( + host, + port, + serializationSchema, + maxRetries, + autoFlush + ) ) - ) } diff --git a/src/main/scala/io/epiphanous/flinkrunner/model/source/FileSourceConfig.scala b/src/main/scala/io/epiphanous/flinkrunner/model/source/FileSourceConfig.scala index 555c9aa9..4839b35d 100644 --- a/src/main/scala/io/epiphanous/flinkrunner/model/source/FileSourceConfig.scala +++ b/src/main/scala/io/epiphanous/flinkrunner/model/source/FileSourceConfig.scala @@ -2,6 +2,7 @@ package io.epiphanous.flinkrunner.model.source import io.epiphanous.flinkrunner.model._ import io.epiphanous.flinkrunner.serde._ +import io.epiphanous.flinkrunner.util.AvroUtils.toEmbeddedAvroInstance import io.epiphanous.flinkrunner.util.ConfigToProps.getFromEither import io.epiphanous.flinkrunner.util.FileUtils.getResourceOrFile import org.apache.avro.Schema @@ -294,13 +295,13 @@ case class FileSourceConfig[ADT <: FlinkEvent]( case StreamFormatName.Avro => val avroInputFormat = new AvroInputFormat( origin, - implicitly[TypeInformation[A]].getTypeClass + classOf[GenericRecord] ) avroInputFormat.setNestedFileEnumeration(true) if (wantsFiltering) avroInputFormat.setFilesFilter(fileFilter) nameAndWatermark( env - .readFile[A]( + .readFile[GenericRecord]( avroInputFormat, path, if (monitorDuration > 0) @@ -310,7 +311,13 @@ case class FileSourceConfig[ADT <: FlinkEvent]( ) .uid(s"avro:$label") .name(s"avro:$label") - .map((a: A) => fromKV(EmbeddedAvroRecordInfo(a, config))), + .map(g => + toEmbeddedAvroInstance[E, A, ADT]( + g, + implicitly[TypeInformation[A]].getTypeClass, + config + ) + ), label ) case _ => diff --git a/src/main/scala/io/epiphanous/flinkrunner/serde/EmbeddedAvroJsonSerializationSchema.scala b/src/main/scala/io/epiphanous/flinkrunner/serde/EmbeddedAvroJsonSerializationSchema.scala index ea1dc054..bce2bb68 100644 --- a/src/main/scala/io/epiphanous/flinkrunner/serde/EmbeddedAvroJsonSerializationSchema.scala +++ b/src/main/scala/io/epiphanous/flinkrunner/serde/EmbeddedAvroJsonSerializationSchema.scala @@ -23,7 +23,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation class EmbeddedAvroJsonSerializationSchema[ E <: ADT with EmbeddedAvroRecord[A]: TypeInformation, A <: GenericRecord: TypeInformation, - ADT <: FlinkEvent: TypeInformation](sinkConfig: SinkConfig[ADT]) + ADT <: FlinkEvent](sinkConfig: SinkConfig[ADT]) extends JsonSerializationSchema[E, ADT](sinkConfig) { val avroJsonEncoder = new EmbeddedAvroJsonFileEncoder[E, A, ADT] diff --git a/src/main/scala/io/epiphanous/flinkrunner/serde/JsonSerializationSchema.scala b/src/main/scala/io/epiphanous/flinkrunner/serde/JsonSerializationSchema.scala index 1065abd4..5b0c1c1d 100644 --- a/src/main/scala/io/epiphanous/flinkrunner/serde/JsonSerializationSchema.scala +++ b/src/main/scala/io/epiphanous/flinkrunner/serde/JsonSerializationSchema.scala @@ -19,7 +19,7 @@ import java.nio.charset.StandardCharsets */ class JsonSerializationSchema[ E <: ADT: TypeInformation, - ADT <: FlinkEvent: TypeInformation](sinkConfig: SinkConfig[ADT]) + ADT <: FlinkEvent](sinkConfig: SinkConfig[ADT]) extends SerializationSchema[E] with LazyLogging { diff --git a/src/main/scala/io/epiphanous/flinkrunner/util/AvroUtils.scala b/src/main/scala/io/epiphanous/flinkrunner/util/AvroUtils.scala index f9271574..0b2ae13e 100644 --- 
a/src/main/scala/io/epiphanous/flinkrunner/util/AvroUtils.scala +++ b/src/main/scala/io/epiphanous/flinkrunner/util/AvroUtils.scala @@ -105,7 +105,17 @@ object AvroUtils extends LazyLogging { ) ) - implicit class GenericToSpecific(genericRecord: GenericRecord) { + implicit class RichGenericRecord(genericRecord: GenericRecord) { + def getDataAsSeq[A <: GenericRecord]: Seq[AnyRef] = + genericRecord.getSchema.getFields.asScala.map(f => + genericRecord.get(f.pos()) + ) + + def getDataAsMap[A <: GenericRecord]: Map[String, AnyRef] = + genericRecord.getSchema.getFields.asScala + .map(f => (f.name(), genericRecord.get(f.name()))) + .toMap + def toSpecific[A <: GenericRecord](instance: A): A = { genericRecord.getSchema.getFields.asScala .foldLeft(instance) { (a, field) => diff --git a/src/test/resources/SampleA.csv b/src/test/resources/SampleA.csv index fa8e810a..fc61df59 100644 --- a/src/test/resources/SampleA.csv +++ b/src/test/resources/SampleA.csv @@ -1,5 +1,5 @@ gx21ge6,B-NGMZ-5351,200,2022-08-26T20:37:33.031Z -zVqYPPA,B-GCWD-8429,,2022-08-26T20:37:37.854Z +zVqYPPA,B-GCWD-8429,301,2022-08-26T20:37:37.854Z nF1kVdP,B-CEWP-2441,299,2022-08-26T20:37:10.113Z b0Q1VjB,B-TFWR-9685,226,2022-08-26T20:37:17.299Z bGlSUta,B-GSZQ-8036,245,2022-08-26T20:37:13.649Z diff --git a/src/test/scala/io/epiphanous/flinkrunner/model/ElasticsearchSinkConfigSpec.scala b/src/test/scala/io/epiphanous/flinkrunner/model/ElasticsearchConfigSpec.scala similarity index 92% rename from src/test/scala/io/epiphanous/flinkrunner/model/ElasticsearchSinkConfigSpec.scala rename to src/test/scala/io/epiphanous/flinkrunner/model/ElasticsearchConfigSpec.scala index 0c95d23f..13fe6749 100644 --- a/src/test/scala/io/epiphanous/flinkrunner/model/ElasticsearchSinkConfigSpec.scala +++ b/src/test/scala/io/epiphanous/flinkrunner/model/ElasticsearchConfigSpec.scala @@ -6,7 +6,7 @@ import org.apache.http.HttpHost import java.net.URL import scala.collection.JavaConverters._ -class ElasticsearchSinkConfigSpec extends PropSpec { +class ElasticsearchConfigSpec extends PropSpec { def getHosts(transports: List[String]) = transports.map { s => val url = new URL(if (s.startsWith("http")) s else s"http://$s") val hostname = url.getHost diff --git a/src/test/scala/io/epiphanous/flinkrunner/model/MySimpleADT.scala b/src/test/scala/io/epiphanous/flinkrunner/model/MySimpleADT.scala index dc430432..229afb2e 100644 --- a/src/test/scala/io/epiphanous/flinkrunner/model/MySimpleADT.scala +++ b/src/test/scala/io/epiphanous/flinkrunner/model/MySimpleADT.scala @@ -1,5 +1,7 @@ package io.epiphanous.flinkrunner.model +import com.fasterxml.jackson.databind.annotation.JsonDeserialize + import java.time.Instant sealed trait MySimpleADT extends FlinkEvent @@ -36,7 +38,9 @@ case class SimpleB( id: String, b0: String, b1: Double, - b2: Option[Int], + @JsonDeserialize(contentAs = classOf[java.lang.Integer]) b2: Option[ + Int + ], ts: Instant) extends MySimpleADT { override def $id: String = id diff --git a/src/test/scala/io/epiphanous/flinkrunner/model/TestGeneratorFactory.scala b/src/test/scala/io/epiphanous/flinkrunner/model/TestGeneratorFactory.scala new file mode 100644 index 00000000..0c817132 --- /dev/null +++ b/src/test/scala/io/epiphanous/flinkrunner/model/TestGeneratorFactory.scala @@ -0,0 +1,25 @@ +package io.epiphanous.flinkrunner.model + +import org.apache.flink.api.common.functions.RuntimeContext +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.runtime.state.FunctionInitializationContext +import 
org.apache.flink.streaming.api.functions.source.datagen.DataGenerator + +import java.time.Instant + +class TestGeneratorFactory extends GeneratorFactory[MySimpleADT] { + override def getDataGenerator[E <: MySimpleADT: TypeInformation]( + generatorConfig: GeneratorConfig): DataGenerator[E] = { + new DataGenerator[E] { + override def open( + name: String, + context: FunctionInitializationContext, + runtimeContext: RuntimeContext): Unit = {} + + override def hasNext: Boolean = true + + override def next(): E = + SimpleA("id", "a", 1, Instant.now()).asInstanceOf[E] + } + } +} diff --git a/src/test/scala/io/epiphanous/flinkrunner/model/sink/CassandraSinkSpec.scala b/src/test/scala/io/epiphanous/flinkrunner/model/sink/CassandraSinkSpec.scala new file mode 100644 index 00000000..9387261e --- /dev/null +++ b/src/test/scala/io/epiphanous/flinkrunner/model/sink/CassandraSinkSpec.scala @@ -0,0 +1,91 @@ +package io.epiphanous.flinkrunner.model.sink + +import com.datastax.driver.core.{Cluster, CodecRegistry} +import com.datastax.driver.extras.codecs.jdk8.InstantCodec +import com.dimafeng.testcontainers.CassandraContainer +import io.epiphanous.flinkrunner.model.SimpleA +import org.apache.flink.api.scala.createTypeInformation + +import java.time.Instant +import scala.collection.JavaConverters._ +import scala.collection.mutable.ArrayBuffer + +class CassandraSinkSpec extends SinkSpec { + val cassandra = new CassandraContainer() + + property("cassandra sink works") { + cassandra.start() + val c = cassandra.container + val session = Cluster + .builder() + .addContactPoint(c.getHost) + .withPort(c.getMappedPort(9042)) + .withoutJMXReporting() + .withCodecRegistry( + new CodecRegistry().register(InstantCodec.instance) + ) + .build() + .newSession() + + session.execute(""" + |create keyspace if not exists simple_adt + | with replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}; + |""".stripMargin) + + session.execute(""" + |create table if not exists simple_adt.simple_a ( + | id text, + | a0 text, + | a1 int, + | ts timestamp, + | primary key(id) + |); + |""".stripMargin) + + testJob[SimpleA]( + s""" + |cassandra-test { + | host = ${c.getHost} + | port = ${c.getMappedPort(9042)} + | query = "insert into simple_adt.simple_a (id, a0, a1, ts) values (?, ?, ?, ?);" + |} + |""".stripMargin, + "resource://SampleA.csv" + ) + + // contents of SampleA.csv sorted + val expected = + """ + |b0Q1VjB,B-TFWR-9685,226,2022-08-26T20:37:17.299Z + |bGlSUta,B-GSZQ-8036,245,2022-08-26T20:37:13.649Z + |dluBK7m,B-RYOT-2386,200,2022-08-26T20:37:39.150Z + |edopOkb,B-VSFJ-9246,299,2022-08-26T20:37:23.287Z + |gx21ge6,B-NGMZ-5351,200,2022-08-26T20:37:33.031Z + |i4t00SY,B-RLTY-8415,223,2022-08-26T20:37:41.671Z + |nF1kVdP,B-CEWP-2441,299,2022-08-26T20:37:10.113Z + |w0x0NBB,B-VHHF-7895,217,2022-08-26T20:36:56.524Z + |zVqYPPA,B-GCWD-8429,301,2022-08-26T20:37:37.854Z + |""".stripMargin.trim + + val rows = ArrayBuffer.empty[String] + for ( + row <- + session + .execute("select * from simple_adt.simple_a") + .asScala + ) { + rows += + List( + row.get(0, classOf[String]), + row.get(1, classOf[String]), + row.get(2, classOf[Int]).toString, + row.get(3, classOf[Instant]).toString + ).mkString(",") + } + + rows.sorted.mkString("\n") shouldEqual expected + + session.close() + cassandra.stop() + } +} diff --git a/src/test/scala/io/epiphanous/flinkrunner/model/sink/JdbcSinkJobTest.scala b/src/test/scala/io/epiphanous/flinkrunner/model/sink/JdbcSinkJobTest.scala index 234dcfd0..29fa3225 100644 --- 
a/src/test/scala/io/epiphanous/flinkrunner/model/sink/JdbcSinkJobTest.scala +++ b/src/test/scala/io/epiphanous/flinkrunner/model/sink/JdbcSinkJobTest.scala @@ -1,134 +1,77 @@ package io.epiphanous.flinkrunner.model.sink -import io.epiphanous.flinkrunner.flink.StreamJob -import io.epiphanous.flinkrunner.model.{MySimpleADT, SimpleB} -import io.epiphanous.flinkrunner.{FlinkRunner, FlinkRunnerSpec} -import org.apache.flink.api.common.typeinfo.TypeInformation +import com.dimafeng.testcontainers.PostgreSQLContainer +import io.epiphanous.flinkrunner.model.{ + BRecord, + BWrapper, + SimpleB, + StreamFormatName +} import org.apache.flink.api.scala.createTypeInformation -import org.apache.flink.streaming.api.scala.DataStream -import java.sql.DriverManager +import java.time.Instant +import java.util.Properties -class JdbcSinkJobTest extends FlinkRunnerSpec { +class JdbcSinkJobTest extends SinkSpec { -// val pgContainer: PostgreSQLContainer = PostgreSQLContainer() - val pgContainer = new Object() { - val databaseName = "test" - val schema = "public" - val jdbcUrl = "jdbc:postgresql://localhost:5432/test" - val username = "test" - val password = "test" - } + val pgContainer: PostgreSQLContainer = PostgreSQLContainer() - // ignore since it's manual - ignore("write job results to sink") { -// pgContainer.start() - val configStr = + property("jdbc sink works") { + pgContainer.start() + testJob[SimpleB]( s""" - |sinks { - | jdbc-test { - | connection = { - | database = "${pgContainer.databaseName}" - | schema = "${pgContainer.schema}" - | url = "${pgContainer.jdbcUrl}" - | username = "${pgContainer.username}" - | password = "${pgContainer.password}" - | } - | table { - | name = "sample_b" - | columns = [ - | { - | name = id - | type = VARCHAR - | precision = 36 - | primary.key = 1 - | } - | { - | name = b0 - | type = VARCHAR - | precision = 255 - | nullable = false - | } - | { - | name = b1 - | type = DOUBLE - | nullable = false - | } - | { - | name = b2 - | type = INTEGER - | } - | { - | name = ts - | type = TIMESTAMP - | nullable = false - | } - | ] - | } - | } - |} - |sources { - | test-file { - | path = "resource://SampleB.csv" - | format = csv - | } - |} - |jobs { - | testJob { - | show.plan = true - | } - |} - |execution.runtime-mode = batch - |""".stripMargin -// val checkResults: CheckResults[MySimpleADT] = -// new CheckResults[MySimpleADT] { -// override val name = "check postgresql table" -// override val writeToSink = false -// override def getInputEvents[IN <: MySimpleADT: TypeInformation]( -// sourceConfig: SourceConfig[MySimpleADT]): List[IN] = -// genPop[SimpleB]().asInstanceOf[List[IN]] -// -// override def checkOutputEvents[ -// OUT <: MySimpleADT: TypeInformation]( -// sinkConfig: SinkConfig[MySimpleADT], -// out: List[OUT]): Unit = { -// logger.debug(out.mkString("\n")) -// sinkConfig match { -// case sc: JdbcSinkConfig[MySimpleADT] => -// sc.getConnection -// .fold( -// t => -// throw new RuntimeException( -// "failed to connect to test database", -// t -// ), -// conn => { -// val rs = conn -// .createStatement() -// .executeQuery(s"select * from ${sc.table}") -// while (rs.next()) { -// val row = rs.getRow -// logger.debug( -// s"$row - ${Range(1, 6).map(i => rs.getString(i)).mkString("|")}" -// ) -// } -// } -// ) -// case _ => logger.debug("Oops") -// } -// } -// } - - val factory = (runner: FlinkRunner[MySimpleADT]) => - new IdentityJob[SimpleB](runner) - testStreamJob(configStr, factory) - val conn = DriverManager.getConnection( - pgContainer.jdbcUrl, - pgContainer.username, - 
pgContainer.password + | jdbc-test { + | connection = { + | database = "${pgContainer.databaseName}" + | schema = public + | url = "${pgContainer.jdbcUrl}" + | username = "${pgContainer.username}" + | password = "${pgContainer.password}" + | } + | table { + | name = "sample_b" + | columns = [ + | { + | name = id + | type = VARCHAR + | precision = 36 + | primary.key = 1 + | } + | { + | name = b0 + | type = VARCHAR + | precision = 255 + | nullable = false + | } + | { + | name = b1 + | type = DOUBLE + | nullable = false + | } + | { + | name = b2 + | type = INTEGER + | } + | { + | name = ts + | type = TIMESTAMP + | nullable = false + | } + | ] + | } + | } + |""".stripMargin, + "resource://SampleB.csv", + otherJobConfig = "show.plan = true" ) - val stmt = conn.createStatement() - val rs = stmt.executeQuery("select * from sample_b") + + val props = new Properties() + props.put("user", pgContainer.username) + props.put("password", pgContainer.password) + val conn = + pgContainer.jdbcDriverInstance.connect(pgContainer.jdbcUrl, props) + val stmt = conn.createStatement() + val rs = stmt.executeQuery("select * from sample_b") while (rs.next()) { println( rs.getRow + "|" + rs.getString("id").trim() + "|" + rs.getString( @@ -141,18 +84,79 @@ class JdbcSinkJobTest extends FlinkRunnerSpec { } stmt.close() conn.close() - // pgContainer.stop() + pgContainer.stop() } -} - -class IdentityJob[E <: MySimpleADT: TypeInformation]( - runner: FlinkRunner[MySimpleADT]) - extends StreamJob[E, MySimpleADT](runner) { - override def transform: DataStream[E] = { - singleSource[E]().map { e: E => - println(e.toString) - e + property("jdbc avro sink works") { + pgContainer.start() + val pop = genPop[BWrapper](10) + pop.foreach(println) + val database = pgContainer.databaseName + val url = pgContainer.jdbcUrl + val user = pgContainer.username + val pw = pgContainer.password + getTempFile(StreamFormatName.Avro).map { path => + val avroFile = path.toString + writeFile(avroFile, StreamFormatName.Avro, pop) + testAvroJob[BWrapper, BRecord]( + s""" + | jdbc-test { + | connection = { + | database = "$database" + | schema = public + | url = "$url" + | username = "$user" + | password = "$pw" + | } + | table { + | name = "b_record" + | columns = [ + | { + | name = b0 + | type = VARCHAR + | precision = 36 + | primary.key = 1 + | } + | { + | name = b1 + | type = INTEGER + | nullable = true + | } + | { + | name = b2 + | type = DOUBLE + | nullable = true + | } + | { + | name = b3 + | type = TIMESTAMP + | nullable = false + | } + | ] + | } + | } + |""".stripMargin, + sourceFile = avroFile, + sourceFormat = "avro" + ) + val props = new Properties() + props.put("user", user) + props.put("password", pw) + val conn = + pgContainer.jdbcDriverInstance.connect(url, props) + val stmt = conn.createStatement() + val rs = stmt.executeQuery("select * from b_record") + while (rs.next()) { + val row = rs.getRow + val b0 = rs.getString("b0").trim() + val b1 = Option(rs.getInt("b1")) + val b2 = Option(rs.getDouble("b2")) + val b3 = Instant.ofEpochMilli(rs.getTimestamp("b3").getTime) + println(row -> BRecord(b0, b1, b2, b3)) + } + stmt.close() + conn.close() } + pgContainer.stop() } } diff --git a/src/test/scala/io/epiphanous/flinkrunner/model/sink/SimpleAvroIdentityJob.scala b/src/test/scala/io/epiphanous/flinkrunner/model/sink/SimpleAvroIdentityJob.scala new file mode 100644 index 00000000..43e40bab --- /dev/null +++ b/src/test/scala/io/epiphanous/flinkrunner/model/sink/SimpleAvroIdentityJob.scala @@ -0,0 +1,23 @@ +package 
io.epiphanous.flinkrunner.model.sink + +import io.epiphanous.flinkrunner.FlinkRunner +import io.epiphanous.flinkrunner.flink.AvroStreamJob +import io.epiphanous.flinkrunner.model.{EmbeddedAvroRecord, EmbeddedAvroRecordInfo, MyAvroADT} +import org.apache.avro.generic.GenericRecord +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.scala.createTypeInformation +import org.apache.flink.streaming.api.scala.DataStream + +class SimpleAvroIdentityJob[ + E <: MyAvroADT with EmbeddedAvroRecord[A]: TypeInformation, + A <: GenericRecord: TypeInformation](runner: FlinkRunner[MyAvroADT])( + implicit fromKV: EmbeddedAvroRecordInfo[A] => E) + extends AvroStreamJob[E, A, MyAvroADT](runner) { + + override def transform: DataStream[E] = { + singleAvroSource[E, A]().map { e: E => + println(e.$record.toString) + e + } + } +} diff --git a/src/test/scala/io/epiphanous/flinkrunner/model/sink/SimpleIdentityJob.scala b/src/test/scala/io/epiphanous/flinkrunner/model/sink/SimpleIdentityJob.scala new file mode 100644 index 00000000..95a56502 --- /dev/null +++ b/src/test/scala/io/epiphanous/flinkrunner/model/sink/SimpleIdentityJob.scala @@ -0,0 +1,20 @@ +package io.epiphanous.flinkrunner.model.sink + +import io.epiphanous.flinkrunner.FlinkRunner +import io.epiphanous.flinkrunner.flink.StreamJob +import io.epiphanous.flinkrunner.model.MySimpleADT +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.scala.createTypeInformation +import org.apache.flink.streaming.api.scala.DataStream + +class SimpleIdentityJob[E <: MySimpleADT: TypeInformation]( + runner: FlinkRunner[MySimpleADT]) + extends StreamJob[E, MySimpleADT](runner) { + + override def transform: DataStream[E] = { + singleSource[E]().map { e: E => + println(e.toString) + e + } + } +} diff --git a/src/test/scala/io/epiphanous/flinkrunner/model/sink/SinkSpec.scala b/src/test/scala/io/epiphanous/flinkrunner/model/sink/SinkSpec.scala new file mode 100644 index 00000000..89f7ec33 --- /dev/null +++ b/src/test/scala/io/epiphanous/flinkrunner/model/sink/SinkSpec.scala @@ -0,0 +1,86 @@ +package io.epiphanous.flinkrunner.model.sink + +import io.epiphanous.flinkrunner.model._ +import io.epiphanous.flinkrunner.{FlinkRunner, FlinkRunnerSpec} +import org.apache.avro.generic.GenericRecord +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.scala.createTypeInformation + +class SinkSpec extends FlinkRunnerSpec with AvroFileTestUtils { + + def getFactory[E <: MySimpleADT: TypeInformation] + : FlinkRunner[MySimpleADT] => SimpleIdentityJob[E] = + (runner: FlinkRunner[MySimpleADT]) => new SimpleIdentityJob[E](runner) + + def getAvroFactory[ + E <: MyAvroADT with EmbeddedAvroRecord[A]: TypeInformation, + A <: GenericRecord: TypeInformation](implicit + fromKV: EmbeddedAvroRecordInfo[A] => E) + : FlinkRunner[MyAvroADT] => SimpleAvroIdentityJob[ + E, + A + ] = (runner: FlinkRunner[MyAvroADT]) => + new SimpleAvroIdentityJob[E, A](runner) + + def getJobConfig( + sinkConfigStr: String, + sourceFile: String, + sourceFormat: String = "csv", + batchMode: Boolean = true, + otherJobConfig: String = ""): String = + s""" + |${if (batchMode) "runtime.execution-mode=batch" else ""} + |jobs { + | testJob { + | $otherJobConfig + | sources { + | file-source { + | path = "$sourceFile" + | format = $sourceFormat + | } + | } + | sinks { + | $sinkConfigStr + | } + | } + |} + |""".stripMargin + + def testJob[E <: MySimpleADT: TypeInformation]( + sinkConfigStr: String, + sourceFile: String, + 
sourceFormat: String = "csv", + batchMode: Boolean = true, + otherJobConfig: String = "runtime.execution-mode = batch"): Unit = + testStreamJob( + getJobConfig( + sinkConfigStr, + sourceFile, + sourceFormat, + batchMode, + otherJobConfig + ), + getFactory[E] + ) + + def testAvroJob[ + E <: MyAvroADT with EmbeddedAvroRecord[A]: TypeInformation, + A <: GenericRecord: TypeInformation]( + sinkConfigStr: String, + sourceFile: String, + sourceFormat: String = "csv", + batchMode: Boolean = true, + otherJobConfig: String = "")(implicit + fromKV: EmbeddedAvroRecordInfo[A] => E): Unit = + testAvroStreamJob( + getJobConfig( + sinkConfigStr, + sourceFile, + sourceFormat, + batchMode, + otherJobConfig + ), + getAvroFactory[E, A] + ) + +} diff --git a/src/test/scala/io/epiphanous/flinkrunner/model/source/GeneratorSourceConfigTest.scala b/src/test/scala/io/epiphanous/flinkrunner/model/source/GeneratorSourceConfigTest.scala index aaa9203d..c81b3b0b 100644 --- a/src/test/scala/io/epiphanous/flinkrunner/model/source/GeneratorSourceConfigTest.scala +++ b/src/test/scala/io/epiphanous/flinkrunner/model/source/GeneratorSourceConfigTest.scala @@ -2,37 +2,14 @@ package io.epiphanous.flinkrunner.model.source import io.epiphanous.flinkrunner.PropSpec import io.epiphanous.flinkrunner.model._ -import org.apache.flink.api.common.functions.RuntimeContext -import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.scala.createTypeInformation -import org.apache.flink.runtime.state.FunctionInitializationContext -import org.apache.flink.streaming.api.functions.source.datagen.{ - DataGenerator, - DataGeneratorSource -} +import org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource import java.time.temporal.ChronoUnit import java.time.{Duration, Instant} class GeneratorSourceConfigTest extends PropSpec { - class TestGeneratorFactory extends GeneratorFactory[MySimpleADT] { - override def getDataGenerator[E <: MySimpleADT: TypeInformation]( - generatorConfig: GeneratorConfig): DataGenerator[E] = { - new DataGenerator[E] { - override def open( - name: String, - context: FunctionInitializationContext, - runtimeContext: RuntimeContext): Unit = {} - - override def hasNext: Boolean = true - - override def next(): E = - SimpleA("id", "a", 1, Instant.now()).asInstanceOf[E] - } - } - } - def getSourceConfig( configStr: String): GeneratorSourceConfig[MySimpleADT] = GeneratorSourceConfig( diff --git a/src/test/scala/io/epiphanous/flinkrunner/util/AvroUtilsTest.scala b/src/test/scala/io/epiphanous/flinkrunner/util/AvroUtilsTest.scala index 209778f6..4701ac91 100644 --- a/src/test/scala/io/epiphanous/flinkrunner/util/AvroUtilsTest.scala +++ b/src/test/scala/io/epiphanous/flinkrunner/util/AvroUtilsTest.scala @@ -2,7 +2,7 @@ package io.epiphanous.flinkrunner.util import io.epiphanous.flinkrunner.PropSpec import io.epiphanous.flinkrunner.model._ -import io.epiphanous.flinkrunner.util.AvroUtils.GenericToSpecific +import io.epiphanous.flinkrunner.util.AvroUtils.RichGenericRecord import org.apache.avro.SchemaBuilder import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder}