Skip to content

Commit

Permalink
clean up tests
Browse files Browse the repository at this point in the history
  • Loading branch information
nextdude committed Nov 14, 2022
1 parent b1e597b commit 784afb6
Show file tree
Hide file tree
Showing 11 changed files with 363 additions and 245 deletions.
7 changes: 7 additions & 0 deletions src/main/scala/io/epiphanous/flinkrunner/FlinkRunner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ import com.typesafe.scalalogging.LazyLogging
import io.epiphanous.flinkrunner.model._
import io.epiphanous.flinkrunner.model.sink._
import io.epiphanous.flinkrunner.model.source._
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.flink.api.common.JobExecutionResult
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.formats.avro.utils.AvroKryoSerializerUtils.AvroSchemaSerializer
import org.apache.flink.streaming.api.datastream.DataStreamSink
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
Expand Down Expand Up @@ -42,6 +44,11 @@ abstract class FlinkRunner[ADT <: FlinkEvent: TypeInformation](

val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)

env.getConfig.addDefaultKryoSerializer(
classOf[Schema],
classOf[AvroSchemaSerializer]
)

/** Gets (and returns as string) the execution plan for the job from the
* StreamExecutionEnvironment.
* @return
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.epiphanous.flinkrunner.model.sink

import com.datastax.driver.core.{Cluster, CodecRegistry}
import com.datastax.driver.extras.codecs.jdk8.InstantCodec
import io.epiphanous.flinkrunner.model.{
EmbeddedAvroRecord,
FlinkConfig,
Expand All @@ -13,7 +14,6 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.datastream.DataStreamSink
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.streaming.connectors.cassandra._
import com.datastax.driver.extras.codecs.jdk8.InstantCodec

/** A cassandra sink config.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -572,31 +572,38 @@ case class JdbcSinkConfig[ADT <: FlinkEvent](
* @return
* JdbcStatementBuilder[E]
*/
def getStatementBuilder[E <: ADT]: JdbcStatementBuilder[E] = {
case (statement, element) =>
_fillInStatement(
element.getClass.getDeclaredFields
.map(_.getName)
.zip(element.productIterator.toIndexedSeq)
.toMap,
statement,
element
)
def getStatementBuilder[E <: ADT]: JdbcStatementBuilder[E] =
new JdbcStatementBuilder[E] {
override def accept(statement: PreparedStatement, element: E): Unit =
_fillInStatement(
fieldValuesOf(element),
statement,
element
)
}

def fieldValuesOf[T <: Product](product: T): Map[String, Any] = {
product.getClass.getDeclaredFields
.map(_.getName)
.zip(product.productIterator.toIndexedSeq)
.toMap
}

def getAvroStatementBuilder[
E <: ADT with EmbeddedAvroRecord[A]: TypeInformation,
A <: GenericRecord: TypeInformation]: JdbcStatementBuilder[E] = {
case (statement, element) =>
_fillInStatement[E](
element.$record.getSchema.getFields.asScala
.map(_.name)
.map(f => (f, element.$record.get(f)))
.toMap,
statement,
element
)
}
A <: GenericRecord: TypeInformation]: JdbcStatementBuilder[E] =
new JdbcStatementBuilder[E] {
override def accept(
statement: PreparedStatement,
element: E): Unit = {
println(s"XXX: $element")
_fillInStatement[E](
fieldValuesOf(element.$record.asInstanceOf[Product]),
statement,
element
)
}
}

def _fillInStatement[E <: ADT](
data: Map[String, Any],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,17 +70,17 @@ object SinkConfig {
config.jobName,
config.getStringOpt(s"sinks.$name.connector")
) match {
case Kafka => KafkaSinkConfig(name, config)
case Kinesis => KinesisSinkConfig(name, config)
case File => FileSinkConfig(name, config)
case Socket => SocketSinkConfig(name, config)
case Jdbc => JdbcSinkConfig(name, config)
case Kafka => KafkaSinkConfig(name, config)
case Kinesis => KinesisSinkConfig(name, config)
case File => FileSinkConfig(name, config)
case Socket => SocketSinkConfig(name, config)
case Jdbc => JdbcSinkConfig(name, config)
case Cassandra =>
CassandraSinkConfig(name, config)
case Elasticsearch =>
ElasticsearchSinkConfig(name, config)
case RabbitMQ => RabbitMQSinkConfig(name, config)
case connector =>
case RabbitMQ => RabbitMQSinkConfig(name, config)
case connector =>
throw new RuntimeException(
s"Don't know how to configure ${connector.entryName} sink connector $name (job ${config.jobName}"
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package io.epiphanous.flinkrunner.model.source

import io.epiphanous.flinkrunner.model._
import io.epiphanous.flinkrunner.serde._
import io.epiphanous.flinkrunner.util.AvroUtils.toEmbeddedAvroInstance
import io.epiphanous.flinkrunner.util.ConfigToProps.getFromEither
import io.epiphanous.flinkrunner.util.FileUtils.getResourceOrFile
import org.apache.avro.Schema
Expand Down Expand Up @@ -294,13 +295,13 @@ case class FileSourceConfig[ADT <: FlinkEvent](
case StreamFormatName.Avro =>
val avroInputFormat = new AvroInputFormat(
origin,
implicitly[TypeInformation[A]].getTypeClass
classOf[GenericRecord]
)
avroInputFormat.setNestedFileEnumeration(true)
if (wantsFiltering) avroInputFormat.setFilesFilter(fileFilter)
nameAndWatermark(
env
.readFile[A](
.readFile[GenericRecord](
avroInputFormat,
path,
if (monitorDuration > 0)
Expand All @@ -310,7 +311,13 @@ case class FileSourceConfig[ADT <: FlinkEvent](
)
.uid(s"avro:$label")
.name(s"avro:$label")
.map((a: A) => fromKV(EmbeddedAvroRecordInfo(a, config))),
.map(g =>
toEmbeddedAvroInstance[E, A, ADT](
g,
implicitly[TypeInformation[A]].getTypeClass,
config
)
),
label
)
case _ =>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.epiphanous.flinkrunner.model

import com.fasterxml.jackson.databind.annotation.JsonDeserialize

import java.time.Instant

sealed trait MySimpleADT extends FlinkEvent
Expand Down Expand Up @@ -36,7 +38,9 @@ case class SimpleB(
id: String,
b0: String,
b1: Double,
b2: Option[Int],
@JsonDeserialize(contentAs = classOf[java.lang.Integer]) b2: Option[
Int
],
ts: Instant)
extends MySimpleADT {
override def $id: String = id
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package io.epiphanous.flinkrunner.model.sink

import com.dimafeng.testcontainers.CassandraContainer
import io.epiphanous.flinkrunner.model.SimpleA
import org.apache.flink.api.scala.createTypeInformation

import java.time.Instant
import scala.collection.JavaConverters._

class CassandraSinkSpec extends SinkSpec {
val cassandra = new CassandraContainer()

// todo: fix this test
ignore("cassandra sink works") {
cassandra.start()
val c = cassandra.container
val session = c.getCluster.newSession()

session.execute("""
|create keyspace if not exists simple_adt
| with replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};
|""".stripMargin)

session.execute("""
|create table if not exists simple_adt.simple_a (
| id text,
| a0 text,
| a1 int,
| ts timestamp,
| primary key(id)
|);
|""".stripMargin)

testJob[SimpleA](
s"""
|cassandra-test {
| host = ${c.getHost}
| port = ${c.getMappedPort(9042)}
| query = "insert into simple_adt.simple_a (id, a0, a1, ts) values (?, ?, ?, ?);"
|}
|""".stripMargin,
"resource://SampleA.csv"
)

// for (
// row <-
// session.execute("select * from simple_adt.simple_a").asScala
// ) {
// println(row.get(0, classOf[String]))
// println(row.get(1, classOf[String]))
// println(row.get(2, classOf[Int]))
// println(row.get(3, classOf[Instant]))
// }
session.close()
cassandra.stop()
}
}

0 comments on commit 784afb6

Please sign in to comment.