Commit 7693447

merged PR#64
nextdude committed Apr 7, 2023
2 parents 3398a49 + a29cd4c commit 7693447
Showing 17 changed files with 945 additions and 117 deletions.
build.sbt (62 changes: 33 additions & 29 deletions)
@@ -31,7 +31,7 @@ resolvers += "Confluent Repository" at "https://packages.confluent.io/maven/"
val V = new {
  val flink         = "1.16.1"
  val flinkMinor    = "1.16"
- val logback       = "1.4.5"
+ val logback       = "1.4.6"
  val scalaLogging  = "3.9.5"
  val scalaTest     = "3.2.15"
  val scalaTestPlus = "3.2.15.0"
@@ -55,46 +55,50 @@ val V = new {
  val uuidCreator = "5.2.0"
  val iceberg     = "1.2.0"
  val jna         = "5.12.1" // needed for testcontainers in some jvms
  val minio       = "8.5.2"
  val awsSdk2     = "2.20.26"
+ val dropWizard  = "4.2.17"
}

val flinkDeps =
  Seq(
    // scala
    "org.apache.flink" %% "flink-scala"                   % V.flink,
    "org.apache.flink" %% "flink-streaming-scala"         % V.flink,
    // rocksdb
    "org.apache.flink"  % "flink-statebackend-rocksdb"    % V.flink,
    // sql parser
    "org.apache.flink"  % "flink-sql-parser"              % V.flink,
    // queryable state
    "org.apache.flink"  % "flink-queryable-state-runtime" % V.flink % Provided,
    // complex event processing
    "org.apache.flink"  % "flink-cep"                     % V.flink % Provided,
    // connectors
    "org.apache.flink"  % "flink-connector-base"          % V.flink % Provided, // ds hybrid source
    "org.apache.flink"  % "flink-connector-files"         % V.flink % Provided, // ds text files
    "org.apache.flink"  % "flink-parquet"                 % V.flink % Provided, // parquet bulk sink
    "org.apache.flink"  % "flink-connector-kafka"         % V.flink % Provided,
    "org.apache.flink"  % "flink-connector-kinesis"       % V.flink % Provided,
+   "org.apache.flink"  % "flink-connector-aws-kinesis-streams"  % V.flink % Provided,
+   "org.apache.flink"  % "flink-connector-aws-kinesis-firehose" % V.flink % Provided,
    "org.apache.flink" %% "flink-connector-cassandra"     % V.flink % Provided,
    "org.apache.flink"  % "flink-connector-elasticsearch7" % V.flink % Provided,
    "org.apache.flink"  % "flink-connector-jdbc"          % V.flink % Provided,
    "org.apache.flink"  % "flink-connector-rabbitmq"      % V.flink % Provided,
    // avro support
    "org.apache.flink"  % "flink-avro"                    % V.flink % Provided, // ds and table avro format
    "org.apache.flink"  % "flink-avro-confluent-registry" % V.flink % Provided, // ds and table avro registry format
    // table api support
    "org.apache.flink" %% "flink-table-api-scala-bridge"  % V.flink, // table api scala
    "org.apache.flink"  % "flink-table-planner-loader"    % V.flink % Provided, // table api
    "org.apache.flink"  % "flink-table-runtime"           % V.flink % Provided, // table runtime
    "org.apache.flink"  % "flink-csv"                     % V.flink % Provided, // table api csv format
    "org.apache.flink"  % "flink-json"                    % V.flink % Provided, // table api json format
    "org.apache.flink"  % "flink-clients"                 % V.flink,
+   // dropwizard metrics support
+   "org.apache.flink"  % "flink-metrics-dropwizard"      % V.flink % Provided,
    // test support
    "org.apache.flink"  % "flink-test-utils"              % V.flink % Test,
    "org.apache.flink"  % "flink-runtime-web"             % V.flink % Test
  )

val loggingDeps = Seq(
@@ -136,7 +140,6 @@ val otherDeps = Seq(
"com.microsoft.sqlserver" % "mssql-jdbc" % V.jdbcMssql % Provided,
"com.typesafe" % "config" % V.typesafeConfig,
"io.confluent" % "kafka-avro-serializer" % V.confluentAvroSerde % Provided,
"io.minio" % "minio" % V.minio % Test,
"mysql" % "mysql-connector-java" % V.jdbcMysql % Provided,
"net.java.dev.jna" % "jna" % V.jna % Test,
"org.apache.hadoop" % "hadoop-client" % V.hadoop % Provided,
@@ -148,7 +151,8 @@ val otherDeps = Seq(
"org.scalatestplus" %% "scalacheck-1-17" % V.scalaTestPlus % Test,
"org.typelevel" %% "squants" % V.squants,
"software.amazon.awssdk" % "aws-sdk-java" % V.awsSdk2 % Test,
"software.amazon.awssdk" % "url-connection-client" % V.awsSdk2 % Test
"software.amazon.awssdk" % "url-connection-client" % V.awsSdk2 % Test,
"io.dropwizard.metrics" % "metrics-core" % V.dropWizard % Provided
) ++
Seq("org.apache.parquet" % "parquet-avro" % V.parquet % Provided).map(
m =>
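
Note that most connector artifacts above are marked % Provided: flinkrunner compiles against them but does not bundle them, so a downstream job that uses one of these connectors must declare it in its own build. A minimal sketch of a hypothetical downstream build.sbt, with artifact names and the 1.16.1 version taken from this file:

// add only the Provided connectors your job actually uses
libraryDependencies ++= Seq(
  "org.apache.flink" % "flink-connector-kafka"                % "1.16.1",
  "org.apache.flink" % "flink-connector-aws-kinesis-firehose" % "1.16.1"
)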
@@ -28,12 +28,14 @@ class BasicJdbcConnectionProvider(

  val url: String = jdbcOptions.getDbURL

- jdbcOptions.getUsername.ifPresent(user =>
+ jdbcOptions.getUsername.ifPresent { user =>
    props.setProperty("user", user)
- )
- jdbcOptions.getPassword.ifPresent(pwd =>
+   ()
+ }
+ jdbcOptions.getPassword.ifPresent { pwd =>
    props.setProperty("password", pwd)
- )
+   ()
+ }

  @transient
  var connection: Connection = _
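
The change above is about value discarding: Properties.setProperty returns the previous value, so a single-expression lambda passed to java.util.Optional.ifPresent has a non-Unit body, which can trip strict lint settings when it is adapted to java.util.function.Consumer. Ending the block with () makes it explicitly Unit-valued. A standalone sketch of the pattern, with hypothetical values:

import java.util.{Optional, Properties}

val props = new Properties()
val maybeUser: Optional[String] = Optional.of("alice") // hypothetical

// setProperty returns the previous mapping; the trailing () discards it
// explicitly, so the block converts cleanly to a Consumer[String]
maybeUser.ifPresent { user =>
  props.setProperty("user", user)
  ()
}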
src/main/scala/io/epiphanous/flinkrunner/model/D64.scala (2 changes: 1 addition & 1 deletion)
@@ -26,7 +26,7 @@ object D64 {
  val len  = data.length
  var hang = 0
  data.zipWithIndex.foreach { case (v, i) =>
-   val v2 = if (v < 0) v + 0x100 else v
+   val v2 = if (v < 0) v.toInt + 0x100 else v.toInt
    (i % 3: @switch) match {
      case 0 =>
        sb += chars(v2 >> 2)
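
The fix matters because Scala's Byte is signed: any byte with the high bit set reads back negative. Widening to Int before adding 0x100 recovers the unsigned value, and the explicit .toInt in the else branch keeps both branches at the same type. A quick sketch:

// a high-bit byte is negative; after widening, + 0x100 maps it into 128..255
val b: Byte = -1 // bit pattern 0xff
val v2      = if (b < 0) b.toInt + 0x100 else b.toInt
assert(v2 == 255)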
@@ -14,6 +14,8 @@ object FlinkConnectorName extends Enum[FlinkConnectorName] {

  case object Kinesis extends FlinkConnectorName

+ case object Firehose extends FlinkConnectorName
+
  case object Kafka extends FlinkConnectorName

  case object File extends FlinkConnectorName
@@ -35,7 +37,7 @@ object FlinkConnectorName extends Enum[FlinkConnectorName] {
  case object Print extends FlinkConnectorName

  val sources: immutable.Seq[FlinkConnectorName] =
-   values diff IndexedSeq(Cassandra, Elasticsearch, Print)
+   values diff IndexedSeq(Cassandra, Elasticsearch, Firehose, Print)
  val sinks: immutable.Seq[FlinkConnectorName] =
    values diff IndexedSeq(Hybrid, Generator)

@@ -64,10 +66,11 @@ object FlinkConnectorName extends Enum[FlinkConnectorName] {
    val connector = (connectorNameOpt match {
      case Some(connectorName) => withNameInsensitiveOption(connectorName)
      case None =>
-       val lcName = sourceOrSinkName.toLowerCase
+       val lcName           = sourceOrSinkName.toLowerCase.replaceAll("-", "_")
        val lcNameSuffixed   = s"${lcName}_$sourceOrSink"
+       val lcNameUnsuffixed = lcName.replace(s"_$sourceOrSink", "")
        values.find { c =>
-         Seq(lcName, lcNameSuffixed).exists(
+         Seq(lcName, lcNameSuffixed, lcNameUnsuffixed).exists(
            _.contains(c.entryName.toLowerCase)
          )
        }
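
A sketch of the candidate strings the updated lookup builds, using a hypothetical sink named "firehose-sink"; each candidate is tested with contains against every connector's lowercase entry name:

val sourceOrSink     = "sink"
val lcName           = "firehose-sink".toLowerCase.replaceAll("-", "_") // "firehose_sink"
val lcNameSuffixed   = s"${lcName}_$sourceOrSink"                       // "firehose_sink_sink"
val lcNameUnsuffixed = lcName.replace(s"_$sourceOrSink", "")            // "firehose"
// all three contain "firehose", so the name resolves to the new Firehose entry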
src/main/scala/io/epiphanous/flinkrunner/model/Id64.scala (4 changes: 2 additions & 2 deletions)
@@ -66,11 +66,11 @@ object Id64 {
  def ticksOf(id: String): Long  = uuidOf(id).timestamp()
  def microsOf(id: String): Long =
    Math
-     .floor((ticksOf(id) - GREGORIAN_OFFSET) / 10)
+     .floor((ticksOf(id) - GREGORIAN_OFFSET).toDouble / 10.0)
      .toLong
  def millisOf(id: String): Long =
    Math
-     .floor((ticksOf(id) - GREGORIAN_OFFSET) / 10000)
+     .floor((ticksOf(id) - GREGORIAN_OFFSET).toDouble / 10000.0)
      .toLong
  def instantOf(id: String): Instant = {
    val t = ticksOf(id) - GREGORIAN_OFFSET
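
For context: a version-1 UUID timestamp counts 100-nanosecond ticks from the Gregorian epoch (1582-10-15), so converting to Unix-epoch micros or millis divides the offset-shifted ticks by 10 or 10,000. The .toDouble makes the Math.floor meaningful: Long division already truncates, and truncation rounds toward zero instead of down when the shifted value is negative (ids before 1970). A worked sketch; the GREGORIAN_OFFSET value is the conventional constant, assumed here since the diff does not show it:

val GREGORIAN_OFFSET = 122192928000000000L   // 100-ns ticks from 1582-10-15 to 1970-01-01 (assumed)
val ticks            = GREGORIAN_OFFSET - 15L // hypothetical id 1.5 microseconds before the Unix epoch
Math.floor((ticks - GREGORIAN_OFFSET).toDouble / 10.0).toLong // -2, floored as intended
(ticks - GREGORIAN_OFFSET) / 10                               // -1, truncated toward zero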
src/main/scala/io/epiphanous/flinkrunner/model/KinesisProperties.scala (new file: 158 additions)
@@ -0,0 +1,158 @@
package io.epiphanous.flinkrunner.model

import com.amazonaws.regions.Regions
import io.epiphanous.flinkrunner.model.sink.SinkConfig
import io.epiphanous.flinkrunner.util.ConfigToProps.{
  getFromEither,
  RichConfigObject
}
import org.apache.flink.connector.aws.config.AWSConfigConstants

import java.util.Properties

case class KinesisProperties(
    stream: String,
    clientProperties: Properties,
    failOnError: Boolean,
    maxBatchSizeInNumber: Int,
    maxBatchSizeInBytes: Long,
    maxBufferedRequests: Int,
    maxBufferTime: Long,
    maxInFlightRequests: Int,
    maxRecordSizeInBytes: Option[Long])

object KinesisProperties {

  final val DEFAULT_REGION                   = Regions.US_EAST_1.getName
  final val DEFAULT_FAIL_ON_ERROR            = false
  final val DEFAULT_MAX_BATCH_SIZE_IN_NUMBER = 500
  final val DEFAULT_MAX_BATCH_SIZE_IN_BYTES  = 4 * 1024 * 1024
  final val DEFAULT_MAX_BUFFERED_REQUESTS    = 10000
  final val DEFAULT_MAX_BUFFER_TIME          = 5000
  final val DEFAULT_MAX_IN_FLIGHT_REQUESTS   = 50

  def fromSinkConfig[SC <: SinkConfig[_]](
      sinkConfig: SC): KinesisProperties = {
    val config = sinkConfig.config
    val pfx    = sinkConfig.pfx()

    val awsRegion: Option[String] = getFromEither(
      pfx,
      Seq("aws.region", "region", AWSConfigConstants.AWS_REGION),
      config.getStringOpt
    )

    val awsEndpoint: Option[String] = getFromEither(
      pfx,
      Seq("aws.endpoint", "endpoint", AWSConfigConstants.AWS_ENDPOINT),
      config.getStringOpt
    )

    val clientProperties: Properties =
      getFromEither(
        pfx,
        Seq("client"),
        config.getObjectOption
      ).asProperties

    clientProperties.putIfAbsent(
      AWSConfigConstants.AWS_REGION,
      DEFAULT_REGION
    )

    awsEndpoint.foreach(endpoint =>
      clientProperties.putIfAbsent(
        AWSConfigConstants.AWS_ENDPOINT,
        endpoint
      )
    )

    val stream: String = getFromEither(
      pfx,
      Seq(
        "stream",
        "stream.name",
        "delivery.stream",
        "delivery.stream.name"
      ),
      config.getStringOpt
    ).getOrElse(
      throw new RuntimeException(
        s"kinesis stream name required but missing in sink <${sinkConfig.name}> of job <${config.jobName}>"
      )
    )

    val failOnError: Boolean = getFromEither(
      pfx,
      Seq("failOnError", "fail.on.error"),
      config.getBooleanOpt
    ).getOrElse(DEFAULT_FAIL_ON_ERROR)

    val maxInFlightRequests: Int = getFromEither(
      pfx,
      Seq("maxInFlightRequests", "max.in.flight.requests"),
      config.getIntOpt
    ).getOrElse(DEFAULT_MAX_IN_FLIGHT_REQUESTS)

    val maxBufferedRequests: Int =
      getFromEither(
        pfx,
        Seq("maxBufferedRequests", "max.buffered.requests"),
        config.getIntOpt
      ).getOrElse(DEFAULT_MAX_BUFFERED_REQUESTS)

    val maxBatchSizeInNumber: Int =
      getFromEither(
        pfx,
        Seq(
          "maxBatchSizeInNumber",
          "max.batch.size.in.number",
          "max.batch.size.number"
        ),
        config.getIntOpt
      ).getOrElse(DEFAULT_MAX_BATCH_SIZE_IN_NUMBER)

    val maxBatchSizeInBytes: Long =
      getFromEither(
        pfx,
        Seq(
          "maxBatchSizeInBytes",
          "max.batch.size.in.bytes",
          "max.batch.size.bytes"
        ),
        config.getLongOpt
      ).getOrElse(DEFAULT_MAX_BATCH_SIZE_IN_BYTES)

    val maxBufferTime: Long = getFromEither(
      pfx,
      Seq("maxBufferTime", "max.buffer.time"),
      config.getDurationOpt
    )
      .map(_.toMillis)
      .getOrElse(DEFAULT_MAX_BUFFER_TIME)

    val maxRecordSizeInBytes: Option[Long] = getFromEither(
      pfx,
      Seq(
        "maxRecordSizeInBytes",
        "maxRecordSize",
        "max.record.size",
        "max.record.size.in.bytes"
      ),
      config.getLongOpt
    )

    KinesisProperties(
      stream,
      clientProperties,
      failOnError,
      maxBatchSizeInNumber,
      maxBatchSizeInBytes,
      maxBufferedRequests,
      maxBufferTime,
      maxInFlightRequests,
      maxRecordSizeInBytes
    )
  }

}
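
A sketch of sink configuration this new fromSinkConfig could read. The key names and their aliases come from the getFromEither calls above; the surrounding layout and all values are illustrative assumptions:

import com.typesafe.config.ConfigFactory

// unset keys fall back to the DEFAULT_* values above
// (us-east-1, 500-record / 4 MiB batches, 5000 ms buffer time, ...)
val conf = ConfigFactory.parseString(
  """
    |stream = "my-delivery-stream"          // aliases: stream.name, delivery.stream
    |aws.region = "us-west-2"
    |aws.endpoint = "http://localhost:4566" // e.g. a localstack endpoint
    |fail.on.error = false
    |max.batch.size.in.number = 500
    |max.batch.size.in.bytes = 4194304
    |max.buffered.requests = 10000
    |max.buffer.time = 5s
    |max.in.flight.requests = 50
    |client {
    |  aws.credentials.provider = "AUTO"    // copied into clientProperties
    |}
    |""".stripMargin
)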