
Commit

added test for firehose config
nextdude committed Dec 11, 2022
1 parent f7aa264 commit 601f530
Showing 13 changed files with 344 additions and 79 deletions.
build.sbt: 55 changes (28 additions, 27 deletions)
@@ -115,33 +115,34 @@ val circeDeps = Seq(
).map(d => "io.circe" %% s"circe-$d" % V.circe)

val otherDeps = Seq(
"io.dropwizard.metrics" % "metrics-core" % V.dropWizard % Provided,
"com.github.f4b6a3" % "uuid-creator" % V.uuidCreator,
"org.apache.hadoop" % "hadoop-client" % V.hadoop % Provided,
"io.confluent" % "kafka-avro-serializer" % V.confluentAvroSerde % Provided,
"com.amazonaws" % "aws-java-sdk-core" % V.awsSdk % Provided,
"com.beachape" %% "enumeratum" % V.enumeratum,
"com.typesafe" % "config" % V.typesafeConfig,
"com.google.guava" % "guava" % V.guava,
"org.typelevel" %% "squants" % V.squants,
"org.scalactic" %% "scalactic" % V.scalaTest,
"org.scalatest" %% "scalatest" % V.scalaTest % Test,
"org.scalatestplus" %% "scalacheck-1-17" % V.scalaTestPlus % Test,
"org.scalacheck" %% "scalacheck" % V.scalaCheck,
"com.fasterxml.jackson.module" %% "jackson-module-scala" % V.jackson,
"com.github.pjfanning" %% "jackson-scala-reflect-extensions" % "2.14.0",
"com.fasterxml.jackson.dataformat" % "jackson-dataformat-csv" % V.jackson,
"com.fasterxml.jackson.datatype" % "jackson-datatype-jsr310" % V.jackson,
"com.dimafeng" %% "testcontainers-scala-scalatest" % V.testContainersScala % Test,
"com.dimafeng" %% "testcontainers-scala-mysql" % V.testContainersScala % Test,
"mysql" % "mysql-connector-java" % V.jdbcMysql % Provided,
"com.dimafeng" %% "testcontainers-scala-postgresql" % V.testContainersScala % Test,
"org.postgresql" % "postgresql" % V.jdbcPg % Provided,
"com.dimafeng" %% "testcontainers-scala-mssqlserver" % V.testContainersScala % Test,
"net.java.dev.jna" % "jna" % V.jna % Test,
"com.microsoft.sqlserver" % "mssql-jdbc" % V.jdbcMssql % Provided,
"com.dimafeng" %% "testcontainers-scala-cassandra" % V.testContainersScala % Test,
"com.datastax.cassandra" % "cassandra-driver-extras" % V.cassandraDriver % Provided
"io.dropwizard.metrics" % "metrics-core" % V.dropWizard % Provided,
"com.github.f4b6a3" % "uuid-creator" % V.uuidCreator,
"org.apache.hadoop" % "hadoop-client" % V.hadoop % Provided,
"io.confluent" % "kafka-avro-serializer" % V.confluentAvroSerde % Provided,
"com.amazonaws" % "aws-java-sdk-core" % V.awsSdk % Provided,
"com.beachape" %% "enumeratum" % V.enumeratum,
"com.typesafe" % "config" % V.typesafeConfig,
"com.google.guava" % "guava" % V.guava,
"org.typelevel" %% "squants" % V.squants,
"org.scalactic" %% "scalactic" % V.scalaTest,
"org.scalatest" %% "scalatest" % V.scalaTest % Test,
"org.scalatestplus" %% "scalacheck-1-17" % V.scalaTestPlus % Test,
"org.scalacheck" %% "scalacheck" % V.scalaCheck,
"com.fasterxml.jackson.module" %% "jackson-module-scala" % V.jackson,
"com.github.pjfanning" %% "jackson-scala-reflect-extensions" % "2.14.0",
"com.fasterxml.jackson.dataformat" % "jackson-dataformat-csv" % V.jackson,
"com.fasterxml.jackson.datatype" % "jackson-datatype-jsr310" % V.jackson,
"com.dimafeng" %% "testcontainers-scala-localstack-v2" % V.testContainersScala % Test,
"com.dimafeng" %% "testcontainers-scala-scalatest" % V.testContainersScala % Test,
"com.dimafeng" %% "testcontainers-scala-mysql" % V.testContainersScala % Test,
"mysql" % "mysql-connector-java" % V.jdbcMysql % Provided,
"com.dimafeng" %% "testcontainers-scala-postgresql" % V.testContainersScala % Test,
"org.postgresql" % "postgresql" % V.jdbcPg % Provided,
"com.dimafeng" %% "testcontainers-scala-mssqlserver" % V.testContainersScala % Test,
"net.java.dev.jna" % "jna" % V.jna % Test,
"com.microsoft.sqlserver" % "mssql-jdbc" % V.jdbcMssql % Provided,
"com.dimafeng" %% "testcontainers-scala-cassandra" % V.testContainersScala % Test,
"com.datastax.cassandra" % "cassandra-driver-extras" % V.cassandraDriver % Provided
) ++
Seq("org.apache.parquet" % "parquet-avro" % V.parquet % Provided).map(
m =>
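
The new testcontainers-scala-localstack-v2 test dependency suggests the firehose config test exercises the sink against a LocalStack container rather than real AWS. A minimal sketch of that setup, assuming the dimafeng wrapper accepts the upstream Service enum and exposes the underlying Java container (the value names are illustrative, not taken from this commit):

import com.dimafeng.testcontainers.LocalStackV2Container
import org.testcontainers.containers.localstack.LocalStackContainer.Service

// start a LocalStack container emulating the Firehose API
val localstack = LocalStackV2Container(services = Seq(Service.FIREHOSE))
localstack.start()

// endpoint, region and credentials to wire into the firehose sink config under test
val endpoint = localstack.container.getEndpointOverride(Service.FIREHOSE) // java.net.URI
val region   = localstack.container.getRegion
val key      = localstack.container.getAccessKey
val secret   = localstack.container.getSecretKey

localstack.stop()
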
@@ -14,7 +14,7 @@ object FlinkConnectorName extends Enum[FlinkConnectorName] {

case object Kinesis extends FlinkConnectorName

case object KinesisFirehoseSink extends FlinkConnectorName
case object Firehose extends FlinkConnectorName

case object Kafka extends FlinkConnectorName

@@ -33,7 +33,7 @@ object FlinkConnectorName extends Enum[FlinkConnectorName] {
case object Generator extends FlinkConnectorName

val sources: immutable.Seq[FlinkConnectorName] =
values diff IndexedSeq(Cassandra, Elasticsearch, KinesisFirehoseSink)
values diff IndexedSeq(Cassandra, Elasticsearch, Firehose)
val sinks: immutable.Seq[FlinkConnectorName] =
values diff IndexedSeq(Hybrid, Generator)

@@ -62,10 +62,11 @@ object FlinkConnectorName extends Enum[FlinkConnectorName] {
val connector = (connectorNameOpt match {
case Some(connectorName) => withNameInsensitiveOption(connectorName)
case None =>
val lcName = sourceOrSinkName.toLowerCase
val lcNameSuffixed = s"${lcName}_$sourceOrSink"
val lcName = sourceOrSinkName.toLowerCase.replaceAll("-", "_")
val lcNameSuffixed = s"${lcName}_$sourceOrSink"
val lcNameUnsuffixed = lcName.replace(s"_$sourceOrSink", "")
values.find { c =>
Seq(lcName, lcNameSuffixed).exists(
Seq(lcName, lcNameSuffixed, lcNameUnsuffixed).exists(
_.contains(c.entryName.toLowerCase)
)
}
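
For illustration only (not code from this commit), the effect of the hyphen normalization and the new unsuffixed candidate on connector lookup, sketched as a standalone function over a hypothetical subset of entry names:

def resolveConnector(sourceOrSinkName: String, sourceOrSink: String): Option[String] = {
  val entryNames       = Seq("kinesis", "firehose", "kafka", "file", "jdbc") // illustrative subset
  val lcName           = sourceOrSinkName.toLowerCase.replaceAll("-", "_")
  val lcNameSuffixed   = s"${lcName}_$sourceOrSink"
  val lcNameUnsuffixed = lcName.replace(s"_$sourceOrSink", "")
  // a connector matches if any candidate spelling contains its entry name
  entryNames.find(entry =>
    Seq(lcName, lcNameSuffixed, lcNameUnsuffixed).exists(_.contains(entry))
  )
}

resolveConnector("firehose-sink", "sink") // Some("firehose")
resolveConnector("kinesis", "source")     // Some("kinesis")
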
@@ -1,5 +1,6 @@
package io.epiphanous.flinkrunner.model

import com.amazonaws.regions.Regions
import io.epiphanous.flinkrunner.model.sink.SinkConfig
import io.epiphanous.flinkrunner.util.ConfigToProps.{
getFromEither,
@@ -21,6 +22,15 @@ case class KinesisProperties(
maxRecordSizeInBytes: Option[Long])

object KinesisProperties {

final val DEFAULT_REGION = Regions.US_EAST_1.getName
final val DEFAULT_FAIL_ON_ERROR = false
final val DEFAULT_MAX_BATCH_SIZE_IN_NUMBER = 500
final val DEFAULT_MAX_BATCH_SIZE_IN_BYTES = 4 * 1024 * 1024
final val DEFAULT_MAX_BUFFERED_REQUESTS = 10000
final val DEFAULT_MAX_BUFFER_TIME = 5000
final val DEFAULT_MAX_IN_FLIGHT_REQUESTS = 50

def fromSinkConfig[SC <: SinkConfig[_]](
sinkConfig: SC): KinesisProperties = {
val config = sinkConfig.config
@@ -47,7 +57,7 @@ object KinesisProperties {

clientProperties.putIfAbsent(
AWSConfigConstants.AWS_REGION,
awsRegion.getOrElse("us-east-1")
DEFAULT_REGION
)

awsEndpoint.foreach(endpoint =>
@@ -68,32 +78,69 @@ object KinesisProperties {
config.getStringOpt
).getOrElse(
throw new RuntimeException(
s"kinesis stream name required but missing in sink ${sinkConfig.name} of job ${config.jobName}"
s"kinesis stream name required but missing in sink <${sinkConfig.name}> of job <${config.jobName}>"
)
)

val failOnError: Boolean =
config.getBooleanOpt("fail.on.error").getOrElse(false)
val failOnError: Boolean = getFromEither(
pfx,
Seq("failOnError", "fail.on.error"),
config.getBooleanOpt
).getOrElse(DEFAULT_FAIL_ON_ERROR)

val maxInFlightRequests: Int =
config.getIntOpt("max.in.flight.requests").getOrElse(50)
val maxInFlightRequests: Int = getFromEither(
pfx,
Seq("maxInFlightRequests", "max.in.flight.requests"),
config.getIntOpt
).getOrElse(DEFAULT_MAX_IN_FLIGHT_REQUESTS)

val maxBufferedRequests: Int =
config.getIntOpt("max.buffer.requests").getOrElse(10000)
getFromEither(
pfx,
Seq("maxBufferedRequests", "max.buffered.requests"),
config.getIntOpt
).getOrElse(DEFAULT_MAX_BUFFERED_REQUESTS)

val maxBatchSizeInNumber: Int =
config.getIntOpt("max.batch.size.number").getOrElse(500)
getFromEither(
pfx,
Seq(
"maxBatchSizeInNumber",
"max.batch.size.in.number",
"max.batch.size.number"
),
config.getIntOpt
).getOrElse(DEFAULT_MAX_BATCH_SIZE_IN_NUMBER)

val maxBatchSizeInBytes: Long =
config.getLongOpt("max.batch.size.bytes").getOrElse(4 * 1024 * 1024)

val maxBufferTime: Long = config
.getDurationOpt("max.buffer.time")
getFromEither(
pfx,
Seq(
"maxBatchSizeInBytes",
"max.batch.size.in.bytes",
"max.batch.size.bytes"
),
config.getLongOpt
).getOrElse(DEFAULT_MAX_BATCH_SIZE_IN_BYTES)

val maxBufferTime: Long = getFromEither(
pfx,
Seq("maxBufferTime", "max.buffer.time"),
config.getDurationOpt
)
.map(_.toMillis)
.getOrElse(5000)
.getOrElse(DEFAULT_MAX_BUFFER_TIME)

val maxRecordSize: Option[Long] =
config.getLongOpt("max.record.size")
val maxRecordSizeInBytes: Option[Long] = getFromEither(
pfx,
Seq(
"maxRecordSizeInBytes",
"maxRecordSize",
"max.record.size",
"max.record.size.in.bytes"
),
config.getLongOpt
)

KinesisProperties(
stream,
Expand All @@ -104,7 +151,7 @@ object KinesisProperties {
maxBufferedRequests,
maxBufferTime,
maxInFlightRequests,
maxRecordSize
maxRecordSizeInBytes
)
}

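
As a usage sketch (the stanza layout and the stream/connector keys are assumptions, not taken from this commit), each Kinesis/Firehose property can now be spelled in camelCase or dotted form via getFromEither, and anything omitted falls back to the DEFAULT_* constants:

import com.typesafe.config.ConfigFactory

val conf = ConfigFactory.parseString(
  """
    |sinks {
    |  firehose-sink {
    |    connector = firehose
    |    stream    = my-delivery-stream
    |    # either spelling works for each setting
    |    failOnError              = true   # or fail.on.error
    |    max.batch.size.in.number = 250    # or maxBatchSizeInNumber / max.batch.size.number
    |    maxBufferTime            = 2s     # or max.buffer.time (a duration, stored as millis)
    |    # maxInFlightRequests, maxBufferedRequests, maxRecordSizeInBytes and the aws
    |    # region/endpoint all default when omitted (region falls back to us-east-1)
    |  }
    |}
    |""".stripMargin
)
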
@@ -57,13 +57,13 @@ import org.apache.flink.streaming.api.scala.DataStream
* @tparam ADT
* the flinkrunner algebraic data type
*/
case class KinesisFirehoseSinkConfig[ADT <: FlinkEvent: TypeInformation](
case class FirehoseSinkConfig[ADT <: FlinkEvent: TypeInformation](
name: String,
config: FlinkConfig
) extends SinkConfig[ADT]
with LazyLogging {
override def connector: FlinkConnectorName =
FlinkConnectorName.KinesisFirehoseSink
FlinkConnectorName.Firehose

val props: KinesisProperties = KinesisProperties.fromSinkConfig(this)

@@ -22,6 +22,7 @@ import java.util.Properties
* - [[FlinkConnectorName.Cassandra]]
* - [[FlinkConnectorName.Elasticsearch]]
* - [[FlinkConnectorName.File]]
* - [[FlinkConnectorName.Firehose]]
* - [[FlinkConnectorName.Jdbc]]
* - [[FlinkConnectorName.Kafka]]
* - [[FlinkConnectorName.Kinesis]]
@@ -72,6 +73,7 @@ object SinkConfig {
) match {
case Kafka => KafkaSinkConfig(name, config)
case Kinesis => KinesisSinkConfig(name, config)
case Firehose => FirehoseSinkConfig(name, config)
case File => FileSinkConfig(name, config)
case Socket => SocketSinkConfig(name, config)
case Jdbc => JdbcSinkConfig(name, config)
@@ -82,7 +84,7 @@
case RabbitMQ => RabbitMQSinkConfig(name, config)
case connector =>
throw new RuntimeException(
s"Don't know how to configure ${connector.entryName} sink connector $name (job ${config.jobName}"
s"Don't know how to configure ${connector.entryName} sink connector <$name> (in job <${config.jobName}>)"
)
}
}
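
A hypothetical call site (assuming the factory's apply takes the sink name and the job's FlinkConfig, and that MyADT is some FlinkEvent algebraic data type defined elsewhere):

// resolves the connector from the sink name/config and returns the matching config class
val sink = SinkConfig[MyADT]("firehose-sink", flinkConfig) // a FirehoseSinkConfig[MyADT]
// an unrecognized connector now fails with the clearer
// "Don't know how to configure ... sink connector <name> (in job <jobName>)" message
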
@@ -206,20 +206,20 @@ case class FileSourceConfig[ADT <: FlinkEvent](
WatermarkStrategy.noWatermarks(),
rawName
)
.uid(rawName)
.uid(s"raw:$uid")
.setParallelism(parallelism)
}

def nameAndWatermark[E <: ADT: TypeInformation](
stream: DataStream[E],
label: String): DataStream[E] = {
stream
.name(label)
.uid(label)
.assignTimestampsAndWatermarks(
getWatermarkStrategy[E]
)
.name(s"wm:$label")
.uid(s"wm:$label")
.uid(s"wm:$uid")
.setParallelism(parallelism)
}

def flatMapTextStream[E <: ADT: TypeInformation](
@@ -229,10 +229,7 @@
textStream
.flatMap[E](new FlatMapFunction[String, E] {
override def flatMap(line: String, out: Collector[E]): Unit =
decoder.decode(line).foreach { e =>
println(s"decoded event from $line")
out.collect(e)
}
decoder.decode(line).foreach(out.collect)
}),
label
)
@@ -292,6 +289,8 @@ case class FileSourceConfig[ADT <: FlinkEvent](
getWatermarkStrategy,
label
)
.uid(uid)
.setParallelism(parallelism)
case StreamFormatName.Avro =>
val avroInputFormat = new AvroInputFormat(
origin,
@@ -311,6 +310,7 @@
)
.uid(s"avro:$label")
.name(s"avro:$label")
.setParallelism(parallelism)
.map(g =>
toEmbeddedAvroInstance[E, A, ADT](
g,
@@ -10,10 +10,6 @@ import io.epiphanous.flinkrunner.serde.JsonRMQDeserializationSchema
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.connector.source.{Source, SourceSplit}
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala.{
DataStream,
StreamExecutionEnvironment
}
import org.apache.flink.streaming.connectors.rabbitmq.{
RMQDeserializationSchema,
RMQSource
@@ -34,6 +30,8 @@ case class RabbitMQSourceConfig[ADT <: FlinkEvent](

override val connector: FlinkConnectorName = FlinkConnectorName.RabbitMQ

override lazy val parallelism: Int = 1 // ensure exactly once

val uri: String = config.getString(pfx("uri"))
val useCorrelationId: Boolean =
config.getBoolean(pfx("use.correlation.id"))
@@ -64,8 +62,4 @@ case class RabbitMQSourceConfig[ADT <: FlinkEvent](
)
)

override def getSourceStream[E <: ADT: TypeInformation](
env: StreamExecutionEnvironment): DataStream[E] =
super.getSourceStream(env).setParallelism(1) // to ensure exactly once

}
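
For context, Flink's RMQSource provides exactly-once guarantees only when checkpointing is enabled, correlation ids are used (the use.correlation.id flag above), and the source runs non-parallel, hence the fixed parallelism of 1.
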
@@ -59,9 +59,9 @@ case class SocketSourceConfig[ADT <: FlinkEvent](
env
.socketTextStream(host, port)
.name(s"raw:$label")
.uid(s"raw:$label")
.uid(s"raw:$uid")
.flatMap(line => decoder.decode(line))
.uid(label)
.name(label)
.uid(uid)
.name(uid)
}
}
