support kinesis firehose, unify kinesis config, add better metric support #64

Closed
wants to merge 10 commits
107 changes: 57 additions & 50 deletions build.sbt
@@ -53,42 +53,47 @@ val V = new {
val cassandraDriver = "3.11.3"
val uuidCreator = "5.2.0"
val jna = "5.12.1" // needed for testcontainers in some jvms
val dropWizard = "4.2.13"
}

val flinkDeps =
Seq(
// scala
"org.apache.flink" %% "flink-scala" % V.flink,
"org.apache.flink" %% "flink-streaming-scala" % V.flink,
"org.apache.flink" %% "flink-scala" % V.flink,
"org.apache.flink" %% "flink-streaming-scala" % V.flink,
// rocksdb
"org.apache.flink" % "flink-statebackend-rocksdb" % V.flink,
"org.apache.flink" % "flink-statebackend-rocksdb" % V.flink,
// queryable state
"org.apache.flink" % "flink-queryable-state-runtime" % V.flink % Provided,
"org.apache.flink" % "flink-queryable-state-runtime" % V.flink % Provided,
// complex event processing
"org.apache.flink" % "flink-cep" % V.flink % Provided,
"org.apache.flink" % "flink-cep" % V.flink % Provided,
// connectors
"org.apache.flink" % "flink-connector-base" % V.flink % Provided, // ds hybrid source
"org.apache.flink" % "flink-connector-files" % V.flink % Provided, // ds text files
"org.apache.flink" % "flink-parquet" % V.flink % Provided, // parquet bulk sink
"org.apache.flink" % "flink-connector-kafka" % V.flink % Provided,
"org.apache.flink" % "flink-connector-kinesis" % V.flink % Provided,
"org.apache.flink" %% "flink-connector-cassandra" % V.flink % Provided,
"org.apache.flink" % "flink-connector-elasticsearch7" % V.flink % Provided,
"org.apache.flink" % "flink-connector-jdbc" % V.flink % Provided,
"org.apache.flink" % "flink-connector-rabbitmq" % V.flink % Provided,
"org.apache.flink" % "flink-connector-base" % V.flink % Provided, // ds hybrid source
"org.apache.flink" % "flink-connector-files" % V.flink % Provided, // ds text files
"org.apache.flink" % "flink-parquet" % V.flink % Provided, // parquet bulk sink
"org.apache.flink" % "flink-connector-kafka" % V.flink % Provided,
"org.apache.flink" % "flink-connector-kinesis" % V.flink % Provided,
"org.apache.flink" % "flink-connector-aws-kinesis-streams" % V.flink % Provided,
"org.apache.flink" % "flink-connector-aws-kinesis-firehose" % V.flink % Provided,
"org.apache.flink" %% "flink-connector-cassandra" % V.flink % Provided,
"org.apache.flink" % "flink-connector-elasticsearch7" % V.flink % Provided,
"org.apache.flink" % "flink-connector-jdbc" % V.flink % Provided,
"org.apache.flink" % "flink-connector-rabbitmq" % V.flink % Provided,
// avro support
"org.apache.flink" % "flink-avro" % V.flink % Provided, // ds and table avro format
"org.apache.flink" % "flink-avro-confluent-registry" % V.flink % Provided, // ds and table avro registry format
"org.apache.flink" % "flink-avro" % V.flink % Provided, // ds and table avro format
"org.apache.flink" % "flink-avro-confluent-registry" % V.flink % Provided, // ds and table avro registry format
// table api support
"org.apache.flink" %% "flink-table-api-scala-bridge" % V.flink, // table api scala
"org.apache.flink" % "flink-table-planner-loader" % V.flink % Provided, // table api
"org.apache.flink" % "flink-table-runtime" % V.flink % Provided, // table runtime
"org.apache.flink" % "flink-csv" % V.flink % Provided, // table api csv format
"org.apache.flink" % "flink-json" % V.flink % Provided, // table api json format
"org.apache.flink" % "flink-clients" % V.flink,
"org.apache.flink" %% "flink-table-api-scala-bridge" % V.flink, // table api scala
"org.apache.flink" % "flink-table-planner-loader" % V.flink % Provided, // table api
"org.apache.flink" % "flink-table-runtime" % V.flink % Provided, // table runtime
"org.apache.flink" % "flink-csv" % V.flink % Provided, // table api csv format
"org.apache.flink" % "flink-json" % V.flink % Provided, // table api json format
"org.apache.flink" % "flink-clients" % V.flink,
// dropwizard metrics support
"org.apache.flink" % "flink-metrics-dropwizard" % V.flink % Provided,
// test support
"org.apache.flink" % "flink-test-utils" % V.flink % Test,
"org.apache.flink" % "flink-runtime-web" % V.flink % Test
"org.apache.flink" % "flink-test-utils" % V.flink % Test,
"org.apache.flink" % "flink-runtime-web" % V.flink % Test
)

val loggingDeps = Seq(
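The Kinesis, Firehose, and Dropwizard metrics connectors added above are all scoped Provided, so they are compile-time only for flinkrunner itself; a downstream job that actually writes to Kinesis or Firehose has to declare them in its own build. A rough sketch of a consuming project's build.sbt fragment follows; the flinkVersion and flinkrunner version strings are placeholders, not values taken from this PR.

// hypothetical consumer build.sbt fragment; version strings are placeholders
val flinkVersion = "1.16.0" // assumed: match the Flink version on your cluster

libraryDependencies ++= Seq(
  "io.epiphanous"    %% "flinkrunner"                           % "4.x.x", // placeholder
  "org.apache.flink"  % "flink-connector-aws-kinesis-streams"   % flinkVersion,
  "org.apache.flink"  % "flink-connector-aws-kinesis-firehose"  % flinkVersion,
  "org.apache.flink"  % "flink-metrics-dropwizard"              % flinkVersion
)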
@@ -110,32 +115,34 @@ val circeDeps = Seq(
).map(d => "io.circe" %% s"circe-$d" % V.circe)

val otherDeps = Seq(
"com.github.f4b6a3" % "uuid-creator" % V.uuidCreator,
"org.apache.hadoop" % "hadoop-client" % V.hadoop % Provided,
"io.confluent" % "kafka-avro-serializer" % V.confluentAvroSerde % Provided,
"com.amazonaws" % "aws-java-sdk-core" % V.awsSdk % Provided,
"com.beachape" %% "enumeratum" % V.enumeratum,
"com.typesafe" % "config" % V.typesafeConfig,
"com.google.guava" % "guava" % V.guava,
"org.typelevel" %% "squants" % V.squants,
"org.scalactic" %% "scalactic" % V.scalaTest,
"org.scalatest" %% "scalatest" % V.scalaTest % Test,
"org.scalatestplus" %% "scalacheck-1-17" % V.scalaTestPlus % Test,
"org.scalacheck" %% "scalacheck" % V.scalaCheck,
"com.fasterxml.jackson.module" %% "jackson-module-scala" % V.jackson,
"com.github.pjfanning" %% "jackson-scala-reflect-extensions" % "2.14.0",
"com.fasterxml.jackson.dataformat" % "jackson-dataformat-csv" % V.jackson,
"com.fasterxml.jackson.datatype" % "jackson-datatype-jsr310" % V.jackson,
"com.dimafeng" %% "testcontainers-scala-scalatest" % V.testContainersScala % Test,
"com.dimafeng" %% "testcontainers-scala-mysql" % V.testContainersScala % Test,
"mysql" % "mysql-connector-java" % V.jdbcMysql % Provided,
"com.dimafeng" %% "testcontainers-scala-postgresql" % V.testContainersScala % Test,
"org.postgresql" % "postgresql" % V.jdbcPg % Provided,
"com.dimafeng" %% "testcontainers-scala-mssqlserver" % V.testContainersScala % Test,
"net.java.dev.jna" % "jna" % V.jna % Test,
"com.microsoft.sqlserver" % "mssql-jdbc" % V.jdbcMssql % Provided,
"com.dimafeng" %% "testcontainers-scala-cassandra" % V.testContainersScala % Test,
"com.datastax.cassandra" % "cassandra-driver-extras" % V.cassandraDriver % Provided
"io.dropwizard.metrics" % "metrics-core" % V.dropWizard % Provided,
"com.github.f4b6a3" % "uuid-creator" % V.uuidCreator,
"org.apache.hadoop" % "hadoop-client" % V.hadoop % Provided,
"io.confluent" % "kafka-avro-serializer" % V.confluentAvroSerde % Provided,
"com.amazonaws" % "aws-java-sdk-core" % V.awsSdk % Provided,
"com.beachape" %% "enumeratum" % V.enumeratum,
"com.typesafe" % "config" % V.typesafeConfig,
"com.google.guava" % "guava" % V.guava,
"org.typelevel" %% "squants" % V.squants,
"org.scalactic" %% "scalactic" % V.scalaTest,
"org.scalatest" %% "scalatest" % V.scalaTest % Test,
"org.scalatestplus" %% "scalacheck-1-17" % V.scalaTestPlus % Test,
"org.scalacheck" %% "scalacheck" % V.scalaCheck,
"com.fasterxml.jackson.module" %% "jackson-module-scala" % V.jackson,
"com.github.pjfanning" %% "jackson-scala-reflect-extensions" % "2.14.0",
"com.fasterxml.jackson.dataformat" % "jackson-dataformat-csv" % V.jackson,
"com.fasterxml.jackson.datatype" % "jackson-datatype-jsr310" % V.jackson,
"com.dimafeng" %% "testcontainers-scala-localstack-v2" % V.testContainersScala % Test,
"com.dimafeng" %% "testcontainers-scala-scalatest" % V.testContainersScala % Test,
"com.dimafeng" %% "testcontainers-scala-mysql" % V.testContainersScala % Test,
"mysql" % "mysql-connector-java" % V.jdbcMysql % Provided,
"com.dimafeng" %% "testcontainers-scala-postgresql" % V.testContainersScala % Test,
"org.postgresql" % "postgresql" % V.jdbcPg % Provided,
"com.dimafeng" %% "testcontainers-scala-mssqlserver" % V.testContainersScala % Test,
"net.java.dev.jna" % "jna" % V.jna % Test,
"com.microsoft.sqlserver" % "mssql-jdbc" % V.jdbcMssql % Provided,
"com.dimafeng" %% "testcontainers-scala-cassandra" % V.testContainersScala % Test,
"com.datastax.cassandra" % "cassandra-driver-extras" % V.cassandraDriver % Provided
) ++
Seq("org.apache.parquet" % "parquet-avro" % V.parquet % Provided).map(
m =>
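The flink-metrics-dropwizard and metrics-core additions back the "better metric support" in the PR title. Below is a small sketch of how a job function might register a Dropwizard-backed histogram through Flink's metric group; the class name LatencyTrackingMap and the metric name are illustrative, not part of this PR.

import com.codahale.metrics.{SlidingWindowReservoir, Histogram => DwHistogram}
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper
import org.apache.flink.metrics.Histogram

class LatencyTrackingMap extends RichMapFunction[String, String] {
  @transient private var sizeHistogram: Histogram = _

  override def open(parameters: Configuration): Unit = {
    // wrap a Dropwizard histogram so Flink's reporters can export it
    sizeHistogram = getRuntimeContext.getMetricGroup.histogram(
      "recordSize",
      new DropwizardHistogramWrapper(new DwHistogram(new SlidingWindowReservoir(500)))
    )
  }

  override def map(value: String): String = {
    sizeHistogram.update(value.length.toLong) // stand-in measurement
    value
  }
}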
@@ -28,12 +28,14 @@ class BasicJdbcConnectionProvider(

val url: String = jdbcOptions.getDbURL

jdbcOptions.getUsername.ifPresent(user =>
jdbcOptions.getUsername.ifPresent { user =>
props.setProperty("user", user)
)
jdbcOptions.getPassword.ifPresent(pwd =>
()
}
jdbcOptions.getPassword.ifPresent { pwd =>
props.setProperty("password", pwd)
)
()
}

@transient
var connection: Connection = _
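jdbcOptions.getUsername and getPassword return java.util.Optional values, and Optional.ifPresent takes a java.util.function.Consumer whose accept method returns void. Ending the Scala block with () gives the lambda an explicit Unit result, so the SAM conversion is clean and the non-Unit return value of Properties.setProperty is discarded on purpose rather than silently. A standalone sketch of the pattern, not flinkrunner code:

import java.util.{Optional, Properties}

object IfPresentUnitSketch extends App {
  val props                       = new Properties()
  val maybeUser: Optional[String] = Optional.of("flink")

  // Consumer.accept returns void; the trailing () makes the lambda's result
  // type Unit and deliberately drops setProperty's return value
  maybeUser.ifPresent { user =>
    props.setProperty("user", user)
    ()
  }

  println(props) // {user=flink}
}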
2 changes: 1 addition & 1 deletion src/main/scala/io/epiphanous/flinkrunner/model/D64.scala
@@ -26,7 +26,7 @@ object D64 {
val len = data.length
var hang = 0
data.zipWithIndex.foreach { case (v, i) =>
val v2 = if (v < 0) v + 0x100 else v
val v2 = if (v < 0) v.toInt + 0x100 else v.toInt
(i % 3: @switch) match {
case 0 =>
sb += chars(v2 >> 2)
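The .toInt change gives both branches of the conditional the same Int type before the bit shifting below; JVM bytes are signed, so a raw byte such as 0xff reads back as -1 and has to be widened and shifted into the 0-255 range first. A small self-contained illustration (the values and object name are ours, not from the PR):

object UnsignedByteSketch extends App {
  val b: Byte = 0xff.toByte
  println(b) // -1, because JVM bytes are signed

  // widen to Int, then move negative values into the unsigned range
  val unsigned = if (b < 0) b.toInt + 0x100 else b.toInt
  println(unsigned) // 255

  // the same result via the common bit-mask idiom
  println(b & 0xff) // 255
}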
@@ -14,6 +14,8 @@ object FlinkConnectorName extends Enum[FlinkConnectorName] {

case object Kinesis extends FlinkConnectorName

case object Firehose extends FlinkConnectorName

case object Kafka extends FlinkConnectorName

case object File extends FlinkConnectorName
@@ -31,7 +33,7 @@ object FlinkConnectorName extends Enum[FlinkConnectorName] {
case object Generator extends FlinkConnectorName

val sources: immutable.Seq[FlinkConnectorName] =
values diff IndexedSeq(Cassandra, Elasticsearch)
values diff IndexedSeq(Cassandra, Elasticsearch, Firehose)
val sinks: immutable.Seq[FlinkConnectorName] =
values diff IndexedSeq(Hybrid, Generator)

@@ -60,10 +62,11 @@ object FlinkConnectorName extends Enum[FlinkConnectorName] {
val connector = (connectorNameOpt match {
case Some(connectorName) => withNameInsensitiveOption(connectorName)
case None =>
val lcName = sourceOrSinkName.toLowerCase
val lcNameSuffixed = s"${lcName}_$sourceOrSink"
val lcName = sourceOrSinkName.toLowerCase.replaceAll("-", "_")
val lcNameSuffixed = s"${lcName}_$sourceOrSink"
val lcNameUnsuffixed = lcName.replace(s"_$sourceOrSink", "")
values.find { c =>
Seq(lcName, lcNameSuffixed).exists(
Seq(lcName, lcNameSuffixed, lcNameUnsuffixed).exists(
_.contains(c.entryName.toLowerCase)
)
}
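With the hyphen normalization and the new unsuffixed candidate, a sink named something like "firehose-sink" can resolve to the Firehose connector even without an explicit connector setting. A rough, self-contained re-creation of the candidate-name logic; the entryNames list stands in for the real enum values:

object ConnectorNameResolutionSketch extends App {
  // illustrative stand-ins for FlinkConnectorName entry names
  val entryNames = Seq("kinesis", "firehose", "kafka", "file", "socket")

  def candidates(sourceOrSinkName: String, sourceOrSink: String): Seq[String] = {
    val lcName           = sourceOrSinkName.toLowerCase.replaceAll("-", "_") // firehose_sink
    val lcNameSuffixed   = s"${lcName}_$sourceOrSink"                        // firehose_sink_sink
    val lcNameUnsuffixed = lcName.replace(s"_$sourceOrSink", "")             // firehose
    Seq(lcName, lcNameSuffixed, lcNameUnsuffixed)
  }

  // the real code checks whether any candidate contains an entry name
  val resolved = entryNames.find { name =>
    candidates("firehose-sink", "sink").exists(_.contains(name))
  }
  println(resolved) // Some(firehose)
}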
4 changes: 2 additions & 2 deletions src/main/scala/io/epiphanous/flinkrunner/model/Id64.scala
@@ -66,11 +66,11 @@ object Id64 {
def ticksOf(id: String): Long = uuidOf(id).timestamp()
def microsOf(id: String): Long =
Math
.floor((ticksOf(id) - GREGORIAN_OFFSET) / 10)
.floor((ticksOf(id) - GREGORIAN_OFFSET).toDouble / 10.0)
.toLong
def millisOf(id: String): Long =
Math
.floor((ticksOf(id) - GREGORIAN_OFFSET) / 10000)
.floor((ticksOf(id) - GREGORIAN_OFFSET).toDouble / 10000.0)
.toLong
def instantOf(id: String): Instant = {
val t = ticksOf(id) - GREGORIAN_OFFSET
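For context on the Id64 change: version 1 UUID timestamps count 100-nanosecond ticks since the Gregorian epoch, so dividing the offset-adjusted ticks by 10 yields microseconds and by 10,000 yields milliseconds. Converting to Double before Math.floor makes the floor meaningful (Long division would already have truncated) and matches the floating-point intent of the expression. A tiny worked example with a made-up tick count:

object TickConversionSketch extends App {
  // pretend this is ticksOf(id) - GREGORIAN_OFFSET for some UUID
  val ticks = 987654321L

  val micros = Math.floor(ticks.toDouble / 10.0).toLong    // 98765432
  val millis = Math.floor(ticks.toDouble / 10000.0).toLong // 98765

  println((micros, millis))
}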
158 changes: 158 additions & 0 deletions src/main/scala/io/epiphanous/flinkrunner/model/KinesisProperties.scala
@@ -0,0 +1,158 @@
package io.epiphanous.flinkrunner.model

import com.amazonaws.regions.Regions
import io.epiphanous.flinkrunner.model.sink.SinkConfig
import io.epiphanous.flinkrunner.util.ConfigToProps.{
getFromEither,
RichConfigObject
}
import org.apache.flink.connector.aws.config.AWSConfigConstants

import java.util.Properties

case class KinesisProperties(
stream: String,
clientProperties: Properties,
failOnError: Boolean,
maxBatchSizeInNumber: Int,
maxBatchSizeInBytes: Long,
maxBufferedRequests: Int,
maxBufferTime: Long,
maxInFlightRequests: Int,
maxRecordSizeInBytes: Option[Long])

object KinesisProperties {

final val DEFAULT_REGION = Regions.US_EAST_1.getName
final val DEFAULT_FAIL_ON_ERROR = false
final val DEFAULT_MAX_BATCH_SIZE_IN_NUMBER = 500
final val DEFAULT_MAX_BATCH_SIZE_IN_BYTES = 4 * 1024 * 1024
final val DEFAULT_MAX_BUFFERED_REQUESTS = 10000
final val DEFAULT_MAX_BUFFER_TIME = 5000
final val DEFAULT_MAX_IN_FLIGHT_REQUESTS = 50

def fromSinkConfig[SC <: SinkConfig[_]](
sinkConfig: SC): KinesisProperties = {
val config = sinkConfig.config
val pfx = sinkConfig.pfx()

val awsRegion: Option[String] = getFromEither(
pfx,
Seq("aws.region", "region", AWSConfigConstants.AWS_REGION),
config.getStringOpt
)

val awsEndpoint: Option[String] = getFromEither(
pfx,
Seq("aws.endpoint", "endpoint", AWSConfigConstants.AWS_ENDPOINT),
config.getStringOpt
)

val clientProperties: Properties =
getFromEither(
pfx,
Seq("client"),
config.getObjectOption
).asProperties

clientProperties.putIfAbsent(
AWSConfigConstants.AWS_REGION,
DEFAULT_REGION
)

awsEndpoint.foreach(endpoint =>
clientProperties.putIfAbsent(
AWSConfigConstants.AWS_ENDPOINT,
endpoint
)
)

val stream: String = getFromEither(
pfx,
Seq(
"stream",
"stream.name",
"delivery.stream",
"delivery.stream.name"
),
config.getStringOpt
).getOrElse(
throw new RuntimeException(
s"kinesis stream name required but missing in sink <${sinkConfig.name}> of job <${config.jobName}>"
)
)

val failOnError: Boolean = getFromEither(
pfx,
Seq("failOnError", "fail.on.error"),
config.getBooleanOpt
).getOrElse(DEFAULT_FAIL_ON_ERROR)

val maxInFlightRequests: Int = getFromEither(
pfx,
Seq("maxInFlightRequests", "max.in.flight.requests"),
config.getIntOpt
).getOrElse(DEFAULT_MAX_IN_FLIGHT_REQUESTS)

val maxBufferedRequests: Int =
getFromEither(
pfx,
Seq("maxBufferedRequests", "max.buffered.requests"),
config.getIntOpt
).getOrElse(DEFAULT_MAX_BUFFERED_REQUESTS)

val maxBatchSizeInNumber: Int =
getFromEither(
pfx,
Seq(
"maxBatchSizeInNumber",
"max.batch.size.in.number",
"max.batch.size.number"
),
config.getIntOpt
).getOrElse(DEFAULT_MAX_BATCH_SIZE_IN_NUMBER)

val maxBatchSizeInBytes: Long =
getFromEither(
pfx,
Seq(
"maxBatchSizeInBytes",
"max.batch.size.in.bytes",
"max.batch.size.bytes"
),
config.getLongOpt
).getOrElse(DEFAULT_MAX_BATCH_SIZE_IN_BYTES)

val maxBufferTime: Long = getFromEither(
pfx,
Seq("maxBufferTime", "max.buffer.time"),
config.getDurationOpt
)
.map(_.toMillis)
.getOrElse(DEFAULT_MAX_BUFFER_TIME)

val maxRecordSizeInBytes: Option[Long] = getFromEither(
pfx,
Seq(
"maxRecordSizeInBytes",
"maxRecordSize",
"max.record.size",
"max.record.size.in.bytes"
),
config.getLongOpt
)

KinesisProperties(
stream,
clientProperties,
failOnError,
maxBatchSizeInNumber,
maxBatchSizeInBytes,
maxBufferedRequests,
maxBufferTime,
maxInFlightRequests,
maxRecordSizeInBytes
)
}

}
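A sketch of how a sink implementation might hand a KinesisProperties value to the Flink Kinesis streams sink builder, assuming the async sink builder API that ships with flink-connector-aws-kinesis-streams in Flink 1.15+; the object name, the string element type, and the hash-based partition key generator are illustrative choices, not code from this PR. The Firehose sink from flink-connector-aws-kinesis-firehose exposes an analogous builder keyed by the delivery stream name.

import io.epiphanous.flinkrunner.model.KinesisProperties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.kinesis.sink.{KinesisStreamsSink, PartitionKeyGenerator}

object KinesisSinkSketch {
  def sinkFor(props: KinesisProperties): KinesisStreamsSink[String] = {
    val builder = KinesisStreamsSink
      .builder[String]()
      .setStreamName(props.stream)
      .setKinesisClientProperties(props.clientProperties)
      .setSerializationSchema(new SimpleStringSchema())
      .setPartitionKeyGenerator(new PartitionKeyGenerator[String] {
        def apply(element: String): String = element.hashCode.toString
      })
      .setFailOnError(props.failOnError)
      .setMaxBatchSize(props.maxBatchSizeInNumber)
      .setMaxBatchSizeInBytes(props.maxBatchSizeInBytes)
      .setMaxBufferedRequests(props.maxBufferedRequests)
      .setMaxTimeInBufferMS(props.maxBufferTime)
      .setMaxInFlightRequests(props.maxInFlightRequests)

    // the record size cap is optional in KinesisProperties, so only set it when present
    props.maxRecordSizeInBytes.foreach(n => builder.setMaxRecordSizeInBytes(n))

    builder.build()
  }
}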