From 601f53000597c670cdedfac718f6a8c890d6afac Mon Sep 17 00:00:00 2001 From: Robert Lyons Date: Sun, 11 Dec 2022 13:45:51 -0500 Subject: [PATCH] added test for firehose config --- build.sbt | 55 ++--- .../model/FlinkConnectorName.scala | 11 +- .../flinkrunner/model/KinesisProperties.scala | 79 +++++-- ...kConfig.scala => FirehoseSinkConfig.scala} | 4 +- .../flinkrunner/model/sink/SinkConfig.scala | 4 +- .../model/source/FileSourceConfig.scala | 16 +- .../model/source/RabbitMQSourceConfig.scala | 10 +- .../model/source/SocketSourceConfig.scala | 6 +- .../model/source/SourceConfig.scala | 28 ++- .../flinkrunner/util/AvroUtils.scala | 6 +- .../flinkrunner/util/InstantUtils.scala | 2 +- .../flinkrunner/model/D64Spec.scala | 1 - .../model/sink/FirehoseSinkConfigSpec.scala | 201 ++++++++++++++++++ 13 files changed, 344 insertions(+), 79 deletions(-) rename src/main/scala/io/epiphanous/flinkrunner/model/sink/{KinesisFirehoseSinkConfig.scala => FirehoseSinkConfig.scala} (97%) create mode 100644 src/test/scala/io/epiphanous/flinkrunner/model/sink/FirehoseSinkConfigSpec.scala diff --git a/build.sbt b/build.sbt index e65afcca..b31f195c 100644 --- a/build.sbt +++ b/build.sbt @@ -115,33 +115,34 @@ val circeDeps = Seq( ).map(d => "io.circe" %% s"circe-$d" % V.circe) val otherDeps = Seq( - "io.dropwizard.metrics" % "metrics-core" % V.dropWizard % Provided, - "com.github.f4b6a3" % "uuid-creator" % V.uuidCreator, - "org.apache.hadoop" % "hadoop-client" % V.hadoop % Provided, - "io.confluent" % "kafka-avro-serializer" % V.confluentAvroSerde % Provided, - "com.amazonaws" % "aws-java-sdk-core" % V.awsSdk % Provided, - "com.beachape" %% "enumeratum" % V.enumeratum, - "com.typesafe" % "config" % V.typesafeConfig, - "com.google.guava" % "guava" % V.guava, - "org.typelevel" %% "squants" % V.squants, - "org.scalactic" %% "scalactic" % V.scalaTest, - "org.scalatest" %% "scalatest" % V.scalaTest % Test, - "org.scalatestplus" %% "scalacheck-1-17" % V.scalaTestPlus % Test, - "org.scalacheck" %% "scalacheck" % V.scalaCheck, - "com.fasterxml.jackson.module" %% "jackson-module-scala" % V.jackson, - "com.github.pjfanning" %% "jackson-scala-reflect-extensions" % "2.14.0", - "com.fasterxml.jackson.dataformat" % "jackson-dataformat-csv" % V.jackson, - "com.fasterxml.jackson.datatype" % "jackson-datatype-jsr310" % V.jackson, - "com.dimafeng" %% "testcontainers-scala-scalatest" % V.testContainersScala % Test, - "com.dimafeng" %% "testcontainers-scala-mysql" % V.testContainersScala % Test, - "mysql" % "mysql-connector-java" % V.jdbcMysql % Provided, - "com.dimafeng" %% "testcontainers-scala-postgresql" % V.testContainersScala % Test, - "org.postgresql" % "postgresql" % V.jdbcPg % Provided, - "com.dimafeng" %% "testcontainers-scala-mssqlserver" % V.testContainersScala % Test, - "net.java.dev.jna" % "jna" % V.jna % Test, - "com.microsoft.sqlserver" % "mssql-jdbc" % V.jdbcMssql % Provided, - "com.dimafeng" %% "testcontainers-scala-cassandra" % V.testContainersScala % Test, - "com.datastax.cassandra" % "cassandra-driver-extras" % V.cassandraDriver % Provided + "io.dropwizard.metrics" % "metrics-core" % V.dropWizard % Provided, + "com.github.f4b6a3" % "uuid-creator" % V.uuidCreator, + "org.apache.hadoop" % "hadoop-client" % V.hadoop % Provided, + "io.confluent" % "kafka-avro-serializer" % V.confluentAvroSerde % Provided, + "com.amazonaws" % "aws-java-sdk-core" % V.awsSdk % Provided, + "com.beachape" %% "enumeratum" % V.enumeratum, + "com.typesafe" % "config" % V.typesafeConfig, + "com.google.guava" % "guava" % V.guava, + 
"org.typelevel" %% "squants" % V.squants, + "org.scalactic" %% "scalactic" % V.scalaTest, + "org.scalatest" %% "scalatest" % V.scalaTest % Test, + "org.scalatestplus" %% "scalacheck-1-17" % V.scalaTestPlus % Test, + "org.scalacheck" %% "scalacheck" % V.scalaCheck, + "com.fasterxml.jackson.module" %% "jackson-module-scala" % V.jackson, + "com.github.pjfanning" %% "jackson-scala-reflect-extensions" % "2.14.0", + "com.fasterxml.jackson.dataformat" % "jackson-dataformat-csv" % V.jackson, + "com.fasterxml.jackson.datatype" % "jackson-datatype-jsr310" % V.jackson, + "com.dimafeng" %% "testcontainers-scala-localstack-v2" % V.testContainersScala % Test, + "com.dimafeng" %% "testcontainers-scala-scalatest" % V.testContainersScala % Test, + "com.dimafeng" %% "testcontainers-scala-mysql" % V.testContainersScala % Test, + "mysql" % "mysql-connector-java" % V.jdbcMysql % Provided, + "com.dimafeng" %% "testcontainers-scala-postgresql" % V.testContainersScala % Test, + "org.postgresql" % "postgresql" % V.jdbcPg % Provided, + "com.dimafeng" %% "testcontainers-scala-mssqlserver" % V.testContainersScala % Test, + "net.java.dev.jna" % "jna" % V.jna % Test, + "com.microsoft.sqlserver" % "mssql-jdbc" % V.jdbcMssql % Provided, + "com.dimafeng" %% "testcontainers-scala-cassandra" % V.testContainersScala % Test, + "com.datastax.cassandra" % "cassandra-driver-extras" % V.cassandraDriver % Provided ) ++ Seq("org.apache.parquet" % "parquet-avro" % V.parquet % Provided).map( m => diff --git a/src/main/scala/io/epiphanous/flinkrunner/model/FlinkConnectorName.scala b/src/main/scala/io/epiphanous/flinkrunner/model/FlinkConnectorName.scala index 9273f067..d85ab43b 100644 --- a/src/main/scala/io/epiphanous/flinkrunner/model/FlinkConnectorName.scala +++ b/src/main/scala/io/epiphanous/flinkrunner/model/FlinkConnectorName.scala @@ -14,7 +14,7 @@ object FlinkConnectorName extends Enum[FlinkConnectorName] { case object Kinesis extends FlinkConnectorName - case object KinesisFirehoseSink extends FlinkConnectorName + case object Firehose extends FlinkConnectorName case object Kafka extends FlinkConnectorName @@ -33,7 +33,7 @@ object FlinkConnectorName extends Enum[FlinkConnectorName] { case object Generator extends FlinkConnectorName val sources: immutable.Seq[FlinkConnectorName] = - values diff IndexedSeq(Cassandra, Elasticsearch, KinesisFirehoseSink) + values diff IndexedSeq(Cassandra, Elasticsearch, Firehose) val sinks: immutable.Seq[FlinkConnectorName] = values diff IndexedSeq(Hybrid, Generator) @@ -62,10 +62,11 @@ object FlinkConnectorName extends Enum[FlinkConnectorName] { val connector = (connectorNameOpt match { case Some(connectorName) => withNameInsensitiveOption(connectorName) case None => - val lcName = sourceOrSinkName.toLowerCase - val lcNameSuffixed = s"${lcName}_$sourceOrSink" + val lcName = sourceOrSinkName.toLowerCase.replaceAll("-", "_") + val lcNameSuffixed = s"${lcName}_$sourceOrSink" + val lcNameUnsuffixed = lcName.replace(s"_$sourceOrSink", "") values.find { c => - Seq(lcName, lcNameSuffixed).exists( + Seq(lcName, lcNameSuffixed, lcNameUnsuffixed).exists( _.contains(c.entryName.toLowerCase) ) } diff --git a/src/main/scala/io/epiphanous/flinkrunner/model/KinesisProperties.scala b/src/main/scala/io/epiphanous/flinkrunner/model/KinesisProperties.scala index 508caa1a..21d63a72 100644 --- a/src/main/scala/io/epiphanous/flinkrunner/model/KinesisProperties.scala +++ b/src/main/scala/io/epiphanous/flinkrunner/model/KinesisProperties.scala @@ -1,5 +1,6 @@ package io.epiphanous.flinkrunner.model +import 
com.amazonaws.regions.Regions import io.epiphanous.flinkrunner.model.sink.SinkConfig import io.epiphanous.flinkrunner.util.ConfigToProps.{ getFromEither, @@ -21,6 +22,15 @@ case class KinesisProperties( maxRecordSizeInBytes: Option[Long]) object KinesisProperties { + + final val DEFAULT_REGION = Regions.US_EAST_1.getName + final val DEFAULT_FAIL_ON_ERROR = false + final val DEFAULT_MAX_BATCH_SIZE_IN_NUMBER = 500 + final val DEFAULT_MAX_BATCH_SIZE_IN_BYTES = 4 * 1024 * 1024 + final val DEFAULT_MAX_BUFFERED_REQUESTS = 10000 + final val DEFAULT_MAX_BUFFER_TIME = 5000 + final val DEFAULT_MAX_IN_FLIGHT_REQUESTS = 50 + def fromSinkConfig[SC <: SinkConfig[_]]( sinkConfig: SC): KinesisProperties = { val config = sinkConfig.config @@ -47,7 +57,7 @@ object KinesisProperties { clientProperties.putIfAbsent( AWSConfigConstants.AWS_REGION, - awsRegion.getOrElse("us-east-1") + DEFAULT_REGION ) awsEndpoint.foreach(endpoint => @@ -68,32 +78,69 @@ object KinesisProperties { config.getStringOpt ).getOrElse( throw new RuntimeException( - s"kinesis stream name required but missing in sink ${sinkConfig.name} of job ${config.jobName}" + s"kinesis stream name required but missing in sink <${sinkConfig.name}> of job <${config.jobName}>" ) ) - val failOnError: Boolean = - config.getBooleanOpt("fail.on.error").getOrElse(false) + val failOnError: Boolean = getFromEither( + pfx, + Seq("failOnError", "fail.on.error"), + config.getBooleanOpt + ).getOrElse(DEFAULT_FAIL_ON_ERROR) - val maxInFlightRequests: Int = - config.getIntOpt("max.in.flight.requests").getOrElse(50) + val maxInFlightRequests: Int = getFromEither( + pfx, + Seq("maxInFlightRequests", "max.in.flight.requests"), + config.getIntOpt + ).getOrElse(DEFAULT_MAX_IN_FLIGHT_REQUESTS) val maxBufferedRequests: Int = - config.getIntOpt("max.buffer.requests").getOrElse(10000) + getFromEither( + pfx, + Seq("maxBufferedRequests", "max.buffered.requests"), + config.getIntOpt + ).getOrElse(DEFAULT_MAX_BUFFERED_REQUESTS) val maxBatchSizeInNumber: Int = - config.getIntOpt("max.batch.size.number").getOrElse(500) + getFromEither( + pfx, + Seq( + "maxBatchSizeInNumber", + "max.batch.size.in.number", + "max.batch.size.number" + ), + config.getIntOpt + ).getOrElse(DEFAULT_MAX_BATCH_SIZE_IN_NUMBER) val maxBatchSizeInBytes: Long = - config.getLongOpt("max.batch.size.bytes").getOrElse(4 * 1024 * 1024) - - val maxBufferTime: Long = config - .getDurationOpt("max.buffer.time") + getFromEither( + pfx, + Seq( + "maxBatchSizeInBytes", + "max.batch.size.in.bytes", + "max.batch.size.bytes" + ), + config.getLongOpt + ).getOrElse(DEFAULT_MAX_BATCH_SIZE_IN_BYTES) + + val maxBufferTime: Long = getFromEither( + pfx, + Seq("maxBufferTime", "max.buffer.time"), + config.getDurationOpt + ) .map(_.toMillis) - .getOrElse(5000) + .getOrElse(DEFAULT_MAX_BUFFER_TIME) - val maxRecordSize: Option[Long] = - config.getLongOpt("max.record.size") + val maxRecordSizeInBytes: Option[Long] = getFromEither( + pfx, + Seq( + "maxRecordSizeInBytes", + "maxRecordSize", + "max.record.size", + "max.record.size.in.bytes" + ), + config.getLongOpt + ) KinesisProperties( stream, @@ -104,7 +151,7 @@ object KinesisProperties { maxBufferedRequests, maxBufferTime, maxInFlightRequests, - maxRecordSize + maxRecordSizeInBytes ) } diff --git a/src/main/scala/io/epiphanous/flinkrunner/model/sink/KinesisFirehoseSinkConfig.scala b/src/main/scala/io/epiphanous/flinkrunner/model/sink/FirehoseSinkConfig.scala similarity index 97% rename from src/main/scala/io/epiphanous/flinkrunner/model/sink/KinesisFirehoseSinkConfig.scala rename to 
src/main/scala/io/epiphanous/flinkrunner/model/sink/FirehoseSinkConfig.scala index 1188e05b..10c84ebc 100644 --- a/src/main/scala/io/epiphanous/flinkrunner/model/sink/KinesisFirehoseSinkConfig.scala +++ b/src/main/scala/io/epiphanous/flinkrunner/model/sink/FirehoseSinkConfig.scala @@ -57,13 +57,13 @@ import org.apache.flink.streaming.api.scala.DataStream * @tparam ADT * the flinkrunner algebraic data type */ -case class KinesisFirehoseSinkConfig[ADT <: FlinkEvent: TypeInformation]( +case class FirehoseSinkConfig[ADT <: FlinkEvent: TypeInformation]( name: String, config: FlinkConfig ) extends SinkConfig[ADT] with LazyLogging { override def connector: FlinkConnectorName = - FlinkConnectorName.KinesisFirehoseSink + FlinkConnectorName.Firehose val props: KinesisProperties = KinesisProperties.fromSinkConfig(this) diff --git a/src/main/scala/io/epiphanous/flinkrunner/model/sink/SinkConfig.scala b/src/main/scala/io/epiphanous/flinkrunner/model/sink/SinkConfig.scala index eb51fa5b..d421e6b5 100644 --- a/src/main/scala/io/epiphanous/flinkrunner/model/sink/SinkConfig.scala +++ b/src/main/scala/io/epiphanous/flinkrunner/model/sink/SinkConfig.scala @@ -22,6 +22,7 @@ import java.util.Properties * - [[FlinkConnectorName.Cassandra]] * - [[FlinkConnectorName.Elasticsearch]] * - [[FlinkConnectorName.File]] + * - [[FlinkConnectorName.Firehose]] * - [[FlinkConnectorName.Jdbc]] * - [[FlinkConnectorName.Kafka]] * - [[FlinkConnectorName.Kinesis]] @@ -72,6 +73,7 @@ object SinkConfig { ) match { case Kafka => KafkaSinkConfig(name, config) case Kinesis => KinesisSinkConfig(name, config) + case Firehose => FirehoseSinkConfig(name, config) case File => FileSinkConfig(name, config) case Socket => SocketSinkConfig(name, config) case Jdbc => JdbcSinkConfig(name, config) @@ -82,7 +84,7 @@ object SinkConfig { case RabbitMQ => RabbitMQSinkConfig(name, config) case connector => throw new RuntimeException( - s"Don't know how to configure ${connector.entryName} sink connector $name (job ${config.jobName}" + s"Don't know how to configure ${connector.entryName} sink connector <$name> (in job <${config.jobName}>)" ) } } diff --git a/src/main/scala/io/epiphanous/flinkrunner/model/source/FileSourceConfig.scala b/src/main/scala/io/epiphanous/flinkrunner/model/source/FileSourceConfig.scala index 4839b35d..5e9d5a83 100644 --- a/src/main/scala/io/epiphanous/flinkrunner/model/source/FileSourceConfig.scala +++ b/src/main/scala/io/epiphanous/flinkrunner/model/source/FileSourceConfig.scala @@ -206,20 +206,20 @@ case class FileSourceConfig[ADT <: FlinkEvent]( WatermarkStrategy.noWatermarks(), rawName ) - .uid(rawName) + .uid(s"raw:$uid") + .setParallelism(parallelism) } def nameAndWatermark[E <: ADT: TypeInformation]( stream: DataStream[E], label: String): DataStream[E] = { stream - .name(label) - .uid(label) .assignTimestampsAndWatermarks( getWatermarkStrategy[E] ) .name(s"wm:$label") - .uid(s"wm:$label") + .uid(s"wm:$uid") + .setParallelism(parallelism) } def flatMapTextStream[E <: ADT: TypeInformation]( @@ -229,10 +229,7 @@ case class FileSourceConfig[ADT <: FlinkEvent]( textStream .flatMap[E](new FlatMapFunction[String, E] { override def flatMap(line: String, out: Collector[E]): Unit = - decoder.decode(line).foreach { e => - println(s"decoded event from $line") - out.collect(e) - } + decoder.decode(line).foreach(out.collect) }), label ) @@ -292,6 +289,8 @@ case class FileSourceConfig[ADT <: FlinkEvent]( getWatermarkStrategy, label ) + .uid(uid) + .setParallelism(parallelism) case StreamFormatName.Avro => val avroInputFormat = new 
AvroInputFormat( origin, @@ -311,6 +310,7 @@ case class FileSourceConfig[ADT <: FlinkEvent]( ) .uid(s"avro:$label") .name(s"avro:$label") + .setParallelism(parallelism) .map(g => toEmbeddedAvroInstance[E, A, ADT]( g, diff --git a/src/main/scala/io/epiphanous/flinkrunner/model/source/RabbitMQSourceConfig.scala b/src/main/scala/io/epiphanous/flinkrunner/model/source/RabbitMQSourceConfig.scala index f4088e03..44625743 100644 --- a/src/main/scala/io/epiphanous/flinkrunner/model/source/RabbitMQSourceConfig.scala +++ b/src/main/scala/io/epiphanous/flinkrunner/model/source/RabbitMQSourceConfig.scala @@ -10,10 +10,6 @@ import io.epiphanous.flinkrunner.serde.JsonRMQDeserializationSchema import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.connector.source.{Source, SourceSplit} import org.apache.flink.streaming.api.functions.source.SourceFunction -import org.apache.flink.streaming.api.scala.{ - DataStream, - StreamExecutionEnvironment -} import org.apache.flink.streaming.connectors.rabbitmq.{ RMQDeserializationSchema, RMQSource @@ -34,6 +30,8 @@ case class RabbitMQSourceConfig[ADT <: FlinkEvent]( override val connector: FlinkConnectorName = FlinkConnectorName.RabbitMQ + override lazy val parallelism: Int = 1 // ensure exactly once + val uri: String = config.getString(pfx("uri")) val useCorrelationId: Boolean = config.getBoolean(pfx("use.correlation.id")) @@ -64,8 +62,4 @@ case class RabbitMQSourceConfig[ADT <: FlinkEvent]( ) ) - override def getSourceStream[E <: ADT: TypeInformation]( - env: StreamExecutionEnvironment): DataStream[E] = - super.getSourceStream(env).setParallelism(1) // to ensure exactly once - } diff --git a/src/main/scala/io/epiphanous/flinkrunner/model/source/SocketSourceConfig.scala b/src/main/scala/io/epiphanous/flinkrunner/model/source/SocketSourceConfig.scala index 436bd617..59144555 100644 --- a/src/main/scala/io/epiphanous/flinkrunner/model/source/SocketSourceConfig.scala +++ b/src/main/scala/io/epiphanous/flinkrunner/model/source/SocketSourceConfig.scala @@ -59,9 +59,9 @@ case class SocketSourceConfig[ADT <: FlinkEvent]( env .socketTextStream(host, port) .name(s"raw:$label") - .uid(s"raw:$label") + .uid(s"raw:$uid") .flatMap(line => decoder.decode(line)) - .uid(label) - .name(label) + .uid(uid) + .name(uid) } } diff --git a/src/main/scala/io/epiphanous/flinkrunner/model/source/SourceConfig.scala b/src/main/scala/io/epiphanous/flinkrunner/model/source/SourceConfig.scala index cd0574a8..22cb7829 100644 --- a/src/main/scala/io/epiphanous/flinkrunner/model/source/SourceConfig.scala +++ b/src/main/scala/io/epiphanous/flinkrunner/model/source/SourceConfig.scala @@ -1,5 +1,6 @@ package io.epiphanous.flinkrunner.model.source +import com.google.common.hash.Hashing import com.typesafe.scalalogging.LazyLogging import io.epiphanous.flinkrunner.model.FlinkConnectorName._ import io.epiphanous.flinkrunner.model._ @@ -12,6 +13,7 @@ import org.apache.flink.api.connector.source.{Source, SourceSplit} import org.apache.flink.streaming.api.functions.source.SourceFunction import org.apache.flink.streaming.api.scala._ +import java.nio.charset.StandardCharsets import java.time.Duration import java.util import java.util.Properties @@ -56,6 +58,23 @@ trait SourceConfig[ADT <: FlinkEvent] extends LazyLogging { def connector: FlinkConnectorName + lazy val label: String = + s"${config.jobName.toLowerCase}/${connector.entryName.toLowerCase}/$name" + + lazy val stdUid: String = Hashing + .sha256() + .hashString( + label, + StandardCharsets.UTF_8 + ) + .toString + + lazy 
val uid: String = config.getStringOpt(pfx("uid")).getOrElse(stdUid) + + lazy val parallelism: Int = config + .getIntOpt(pfx("parallelism")) + .getOrElse(config.globalParallelism) + def pfx(path: String = ""): String = Seq( Some("sources"), Some(name), @@ -67,8 +86,6 @@ trait SourceConfig[ADT <: FlinkEvent] extends LazyLogging { lazy val propertiesMap: util.HashMap[String, String] = properties.asJavaMap - lazy val label: String = s"${connector.entryName.toLowerCase}/$name" - val watermarkStrategy: String = Try(config.getString(pfx("watermark.strategy"))) .map(config.getWatermarkStrategy) @@ -151,7 +168,8 @@ trait SourceConfig[ADT <: FlinkEvent] extends LazyLogging { .name(label), s => env.fromSource(s, getWatermarkStrategy, label) ) - .uid(label) + .uid(uid) + .setParallelism(parallelism) } /** Flinkrunner calls this method to create a source stream from @@ -225,12 +243,14 @@ trait SourceConfig[ADT <: FlinkEvent] extends LazyLogging { .name(label), s => env.fromSource(s, getWatermarkStrategy, label) ) - .uid(label) + .uid(uid) + .setParallelism(parallelism) /** Flinkrunner calls this method to create an avro source stream. This * method uses the default implementation in * getAvroSourceStreamDefault(). Subclasses can provide their own * implementations. + * * @param env * a flink stream execution environment * @param fromKV diff --git a/src/main/scala/io/epiphanous/flinkrunner/util/AvroUtils.scala b/src/main/scala/io/epiphanous/flinkrunner/util/AvroUtils.scala index dc39cb61..ddb55112 100644 --- a/src/main/scala/io/epiphanous/flinkrunner/util/AvroUtils.scala +++ b/src/main/scala/io/epiphanous/flinkrunner/util/AvroUtils.scala @@ -144,12 +144,12 @@ object AvroUtils extends LazyLogging { case array: java.util.List[_] => val convertedArray = array.asScala.map { case rec: GenericRecord => convertEmbeddedRecord(rec) - case v => v + case v => v } a.put(f, convertedArray.asJava) - case rec: GenericRecord => + case rec: GenericRecord => a.put(f, convertEmbeddedRecord(rec)) - case v => a.put(f, v) + case v => a.put(f, v) } a } diff --git a/src/main/scala/io/epiphanous/flinkrunner/util/InstantUtils.scala b/src/main/scala/io/epiphanous/flinkrunner/util/InstantUtils.scala index 080dcae6..3b08cd04 100644 --- a/src/main/scala/io/epiphanous/flinkrunner/util/InstantUtils.scala +++ b/src/main/scala/io/epiphanous/flinkrunner/util/InstantUtils.scala @@ -1,7 +1,7 @@ package io.epiphanous.flinkrunner.util -import java.time.{Instant, ZoneOffset} import java.time.format.DateTimeFormatter +import java.time.{Instant, ZoneOffset} object InstantUtils { diff --git a/src/test/scala/io/epiphanous/flinkrunner/model/D64Spec.scala b/src/test/scala/io/epiphanous/flinkrunner/model/D64Spec.scala index a2be0ac4..b2dabc0f 100644 --- a/src/test/scala/io/epiphanous/flinkrunner/model/D64Spec.scala +++ b/src/test/scala/io/epiphanous/flinkrunner/model/D64Spec.scala @@ -3,7 +3,6 @@ package io.epiphanous.flinkrunner.model import io.epiphanous.flinkrunner.PropSpec import java.nio.ByteBuffer -import java.nio.charset.StandardCharsets import scala.collection.mutable import scala.util.Random diff --git a/src/test/scala/io/epiphanous/flinkrunner/model/sink/FirehoseSinkConfigSpec.scala b/src/test/scala/io/epiphanous/flinkrunner/model/sink/FirehoseSinkConfigSpec.scala new file mode 100644 index 00000000..1773eabd --- /dev/null +++ b/src/test/scala/io/epiphanous/flinkrunner/model/sink/FirehoseSinkConfigSpec.scala @@ -0,0 +1,201 @@ +package io.epiphanous.flinkrunner.model.sink + +import com.amazonaws.regions.Regions +import 
io.epiphanous.flinkrunner.model.{KinesisProperties, MySimpleADT}
+import io.epiphanous.flinkrunner.{FlinkRunner, PropSpec}
+import org.apache.flink.api.scala.createTypeInformation
+
+class FirehoseSinkConfigSpec extends PropSpec {
+
+  def testConfig(config: String): FlinkRunner[MySimpleADT] =
+    getRunner[MySimpleADT](
+      Array("firehose"),
+      Some(
+        s"""
+           |jobs {
+           |  firehose {
+           |    sinks {
+           |      firehose-sink {
+           |        $config
+           |      }
+           |    }
+           |  }
+           |}
+           |""".stripMargin
+      )
+    )
+
+  property("unconfigured firehose") {
+    val runner = testConfig("")
+    the[RuntimeException] thrownBy runner
+      .getSinkConfig() should have message "kinesis stream name required but missing in sink <firehose-sink> of job <firehose>"
+  }
+
+  property("minimal configuration") {
+    val runner = testConfig("stream = stream")
+    noException should be thrownBy runner.getSinkConfig()
+  }
+
+  property("stream.name") {
+    val runner = testConfig("stream.name = stream")
+    noException should be thrownBy runner.getSinkConfig()
+  }
+
+  property("delivery.stream") {
+    val runner = testConfig("delivery.stream = stream")
+    noException should be thrownBy runner.getSinkConfig()
+  }
+
+  property("delivery.stream.name") {
+    val runner = testConfig("delivery.stream.name = stream")
+    noException should be thrownBy runner.getSinkConfig()
+  }
+
+  def getProps(config: String = ""): KinesisProperties =
+    testConfig("stream = stream\n" + config)
+      .getSinkConfig()
+      .asInstanceOf[FirehoseSinkConfig[MySimpleADT]]
+      .props
+
+  def testProp[T](
+      propList: Seq[String],
+      prop: KinesisProperties => T,
+      value: T): Unit =
+    (propList ++ propList.map(p => s"config.$p")).foreach(c =>
+      prop(getProps(s"$c = $value")) shouldEqual value
+    )
+
+  property("stream name") {
+    getProps().stream shouldEqual "stream"
+  }
+
+  property("default aws.region") {
+    getProps().clientProperties.getProperty(
+      "aws.region"
+    ) shouldEqual KinesisProperties.DEFAULT_REGION
+  }
+
+  property("aws.region") {
+    testProp(
+      Seq(
+        "region",
+        "aws.region"
+      ),
+      _.clientProperties.getProperty("aws.region"),
+      Regions
+        .values()
+        .filterNot(_.getName != KinesisProperties.DEFAULT_REGION)
+        .head
+        .getName
+    )
+  }
+
+  property("default aws.endpoint") {
+    getProps().clientProperties
+      .getProperty("aws.endpoint", "none") shouldEqual "none"
+  }
+
+  property("aws.endpoint") {
+    testProp(
+      Seq(
+        "endpoint",
+        "aws.endpoint"
+      ),
+      _.clientProperties.getProperty("aws.endpoint"),
+      "other"
+    )
+  }
+
+  property("default failOnError") {
+    getProps().failOnError shouldBe KinesisProperties.DEFAULT_FAIL_ON_ERROR
+  }
+
+  property("failOnError") {
+    testProp(
+      Seq(
+        "fail.on.error",
+        "failOnError"
+      ),
+      _.failOnError,
+      !KinesisProperties.DEFAULT_FAIL_ON_ERROR
+    )
+  }
+
+  property("default maxInFlightRequests") {
+    getProps().maxInFlightRequests shouldEqual KinesisProperties.DEFAULT_MAX_IN_FLIGHT_REQUESTS
+  }
+
+  property("maxInFlightRequests") {
+    testProp(
+      Seq(
+        "maxInFlightRequests",
+        "max.in.flight.requests"
+      ),
+      _.maxInFlightRequests,
+      2 * KinesisProperties.DEFAULT_MAX_IN_FLIGHT_REQUESTS
+    )
+  }
+
+  property("default maxBufferedRequests") {
+    getProps().maxBufferedRequests shouldEqual KinesisProperties.DEFAULT_MAX_BUFFERED_REQUESTS
+  }
+
+  property("maxBufferedRequests") {
+    testProp(
+      Seq(
+        "maxBufferedRequests",
+        "max.buffered.requests"
+      ),
+      _.maxBufferedRequests,
+      2 * KinesisProperties.DEFAULT_MAX_BUFFERED_REQUESTS
+    )
+  }
+
+  property("default maxBatchSizeInNumber") {
+    getProps().maxBatchSizeInNumber shouldEqual KinesisProperties.DEFAULT_MAX_BATCH_SIZE_IN_NUMBER
+  }
+
+  property("maxBatchSizeInNumber") {
+    testProp(
+      Seq(
+        "maxBatchSizeInNumber",
+        "max.batch.size.in.number",
+        "max.batch.size.number"
+      ),
+      _.maxBatchSizeInNumber,
+      2 * KinesisProperties.DEFAULT_MAX_BATCH_SIZE_IN_NUMBER
+    )
+  }
+
+  property("default maxBatchSizeInBytes") {
+    getProps().maxBatchSizeInBytes shouldEqual KinesisProperties.DEFAULT_MAX_BATCH_SIZE_IN_BYTES
+  }
+
+  property("maxBatchSizeInBytes") {
+    testProp(
+      Seq(
+        "maxBatchSizeInBytes",
+        "max.batch.size.in.bytes",
+        "max.batch.size.bytes"
+      ),
+      _.maxBatchSizeInBytes,
+      2 * KinesisProperties.DEFAULT_MAX_BATCH_SIZE_IN_BYTES
+    )
+  }
+
+  property("default maxBufferTime") {
+    getProps().maxBufferTime shouldEqual KinesisProperties.DEFAULT_MAX_BUFFER_TIME
+  }
+
+  property("maxBufferTime") {
+    testProp(
+      Seq(
+        "maxBufferTime",
+        "max.buffer.time"
+      ),
+      _.maxBufferTime,
+      2 * KinesisProperties.DEFAULT_MAX_BUFFER_TIME
+    )
+  }
+
+}
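
Note (not part of the patch): a minimal sketch of how a job configuration for the renamed Firehose sink might be exercised, modeled directly on FirehoseSinkConfigSpec above. The class name FirehoseSinkConfigUsageSketch and the concrete HOCON values (my-delivery-stream, us-west-2, 1048576) are illustrative assumptions only; the helpers it relies on (PropSpec, getRunner, MySimpleADT, getSinkConfig) are the ones the spec itself uses.

// Illustrative sketch only: shows the alternative key spellings that
// KinesisProperties.fromSinkConfig accepts after this change.
import io.epiphanous.flinkrunner.model.MySimpleADT
import io.epiphanous.flinkrunner.model.sink.FirehoseSinkConfig
import io.epiphanous.flinkrunner.{FlinkRunner, PropSpec}
import org.apache.flink.api.scala.createTypeInformation

class FirehoseSinkConfigUsageSketch extends PropSpec {

  property("hyphenated sink name resolves to the Firehose connector") {
    // Hypothetical job config: "stream.name" could equally be written as
    // "stream", "delivery.stream" or "delivery.stream.name"; camelCase and
    // dotted forms are both accepted for the remaining keys.
    val runner: FlinkRunner[MySimpleADT] = getRunner[MySimpleADT](
      Array("firehose"),
      Some(
        """
          |jobs {
          |  firehose {
          |    sinks {
          |      firehose-sink {
          |        stream.name             = my-delivery-stream
          |        aws.region              = us-west-2
          |        failOnError             = true
          |        max.batch.size.in.bytes = 1048576
          |      }
          |    }
          |  }
          |}
          |""".stripMargin
      )
    )
    // FlinkConnectorName.fromSourceOrSink now normalizes "firehose-sink"
    // to the Firehose connector, so the cast below should succeed.
    val sink = runner
      .getSinkConfig()
      .asInstanceOf[FirehoseSinkConfig[MySimpleADT]]
    sink.props.stream shouldEqual "my-delivery-stream"
    sink.props.failOnError shouldBe true
  }
}

The testProp helper in the spec above already covers the dual camelCase/dotted spellings (and their "config."-prefixed variants), so this sketch only adds a readable end-to-end example of a plausible sink block.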