
Commit

implement serdes and upgrade flink to 1.14
nextdude committed Dec 2, 2021
1 parent 0d2c088 commit 4890d74
Showing 12 changed files with 327 additions and 241 deletions.
23 changes: 12 additions & 11 deletions build.sbt
@@ -27,7 +27,7 @@ resolvers += "Local Maven Repository" at "file://" + Path.userHome.absolutePath
resolvers += "Confluent Repository" at "https://packages.confluent.io/maven/"

val V = new {
val flink = "1.13.3"
val flink = "1.14.0"
val logback = "1.2.7"
val scalaLogging = "3.9.4"
val scalaTest = "3.2.10"
@@ -77,16 +77,17 @@ val circeDeps = Seq(
).map(d => "io.circe" %% s"circe-$d" % V.circe)

val otherDeps = Seq(
"io.confluent" % "kafka-schema-registry-client" % V.schemaRegistry,
"com.beachape" %% "enumeratum" % V.enumeratum,
"org.apache.avro" % "avro" % V.avro,
"com.typesafe" % "config" % V.typesafeConfig,
"com.google.guava" % "guava" % V.guava,
"org.typelevel" %% "squants" % V.squants,
"com.sksamuel.avro4s" %% "avro4s-core" % V.avro4s,
"org.scalactic" %% "scalactic" % V.scalaTest % Test,
"org.scalatest" %% "scalatest" % V.scalaTest % Test,
"org.scalacheck" %% "scalacheck" % V.scalaCheck % Test
// "io.confluent" % "kafka-schema-registry-client" % V.schemaRegistry,
"io.confluent" % "kafka-streams-avro-serde" % "7.0.0",
"com.beachape" %% "enumeratum" % V.enumeratum,
// "org.apache.avro" % "avro" % V.avro,
"com.typesafe" % "config" % V.typesafeConfig,
"com.google.guava" % "guava" % V.guava,
"org.typelevel" %% "squants" % V.squants,
"com.sksamuel.avro4s" %% "avro4s-core" % V.avro4s,
"org.scalactic" %% "scalactic" % V.scalaTest % Test,
"org.scalatest" %% "scalatest" % V.scalaTest % Test,
"org.scalacheck" %% "scalacheck" % V.scalaCheck % Test
)
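A note not taken from this diff: the new KafkaSource and KafkaSink builders used in FlinkRunner.scala below ship in Flink's Kafka connector artifact, so the Flink modules declared elsewhere in this build presumably move to V.flink as well. A minimal sbt sketch of how such declarations commonly look; the exact module list this project uses is an assumption, not shown in the truncated hunk:

// Sketch only (assumed module list, not part of this commit): Flink artifacts pinned
// to the shared V.flink version; KafkaSource/KafkaSink live in flink-connector-kafka.
val flinkDeps = Seq(
  "flink-scala",
  "flink-streaming-scala",
  "flink-connector-kafka",
  "flink-connector-kinesis"
).map(m => "org.apache.flink" %% m % V.flink)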

/**
111 changes: 54 additions & 57 deletions src/main/scala/io/epiphanous/flinkrunner/FlinkRunner.scala
@@ -15,6 +15,8 @@ import org.apache.flink.api.common.serialization.{
SerializationSchema
}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.connector.kafka.sink.KafkaSink
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.datastream.DataStreamSink
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.{
@@ -33,17 +35,7 @@ import org.apache.flink.streaming.api.scala.{DataStream, _}
import org.apache.flink.streaming.connectors.cassandra.CassandraSink
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.Semantic
import org.apache.flink.streaming.connectors.kafka.{
FlinkKafkaConsumer,
FlinkKafkaProducer,
KafkaDeserializationSchema,
KafkaSerializationSchema
}
import org.apache.flink.streaming.connectors.kinesis.serialization.{
KinesisDeserializationSchema,
KinesisSerializationSchema
}
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisSerializationSchema
import org.apache.flink.streaming.connectors.kinesis.{
FlinkKinesisConsumer,
FlinkKinesisProducer
@@ -247,18 +239,27 @@ class FlinkRunner[ADT <: FlinkEvent](
*/
def fromKafka[E <: ADT: TypeInformation](
srcConfig: KafkaSourceConfig
): DataStream[E] = {
val consumer =
new FlinkKafkaConsumer[E](
srcConfig.topic,
config
.getKafkaDeserializationSchema[E](srcConfig.name)
.asInstanceOf[KafkaDeserializationSchema[E]],
srcConfig.properties
)
): DataStream[E] =
env
.addSource(consumer)
}
.fromSource(
KafkaSource
.builder[E]()
.setProperties(srcConfig.properties)
.setDeserializer(
config
.getKafkaRecordDeserializationSchema[E](
srcConfig.name
)
)
.build(),
srcConfig.watermarkStrategy match {
case "bounded out of order" =>
boundedOutOfOrderWatermarks[E]()
case "ascending timestamps" => ascendingTimestampsWatermarks[E]()
case _ => boundedLatenessWatermarks[E](srcConfig.name)
},
srcConfig.label
)
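The KafkaSource path above needs a KafkaRecordDeserializationSchema (returned by the new getKafkaRecordDeserializationSchema hook) in place of the old KafkaDeserializationSchema. A minimal sketch of one way such a schema can be produced, wrapping a plain value DeserializationSchema with the connector's valueOnly adapter; the helper name is illustrative and not part of this commit:

// Illustrative helper (not from this commit): adapt a value-only DeserializationSchema
// to the record-level schema KafkaSource expects; record keys and headers are ignored.
import org.apache.flink.api.common.serialization.DeserializationSchema
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema

def valueOnlyRecordDeserializer[E](
    valueSchema: DeserializationSchema[E]
): KafkaRecordDeserializationSchema[E] =
  KafkaRecordDeserializationSchema.valueOnly(valueSchema)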

/**
* Configure stream from kinesis.
@@ -277,13 +278,11 @@
new FlinkKinesisConsumer[E](
srcConfig.stream,
config
.getKinesisDeserializationSchema(srcConfig.name)
.asInstanceOf[KinesisDeserializationSchema[E]],
.getKinesisDeserializationSchema[E](srcConfig.name),
srcConfig.properties
)
env
.addSource(consumer)
.name(srcConfig.label)
}

/**
@@ -304,8 +303,7 @@
case other => other
}
val ds = config
.getDeserializationSchema(srcConfig.name)
.asInstanceOf[DeserializationSchema[E]]
.getDeserializationSchema[E](srcConfig.name)
env
.readTextFile(path)
.name(s"raw:${srcConfig.label}")
@@ -427,15 +425,22 @@
stream: DataStream[E],
sinkName: String = ""
): Object = {
val name = if (sinkName.isEmpty) config.getSinkNames.head else sinkName
config.getSinkConfig(name) match {
case s: KafkaSinkConfig => toKafka[E](stream, s)
case s: KinesisSinkConfig => toKinesis[E](stream, s)
case s: FileSinkConfig => toFile[E](stream, s)
case s: SocketSinkConfig => toSocket[E](stream, s)
case s: JdbcSinkConfig => toJdbc[E](stream, s)
case s: CassandraSinkConfig => toCassandraSink[E](stream, s)
case s: ElasticsearchSinkConfig => toElasticsearchSink[E](stream, s)
val name = if (sinkName.isEmpty) config.getSinkNames.head else sinkName
val sinkConfig = config.getSinkConfig(name)
val label = sinkConfig.label
sinkConfig match {
case s: KafkaSinkConfig =>
toKafka[E](stream, s).uid(label).name(label)
case s: KinesisSinkConfig =>
toKinesis[E](stream, s).uid(label).name(label)
case s: FileSinkConfig => toFile[E](stream, s).uid(label).name(label)
case s: SocketSinkConfig =>
toSocket[E](stream, s).uid(label).name(label)
case s: JdbcSinkConfig => toJdbc[E](stream, s).uid(label).name(label)
case s: CassandraSinkConfig =>
toCassandraSink[E](stream, s).uid(label).name(label)
case s: ElasticsearchSinkConfig =>
toElasticsearchSink[E](stream, s).uid(label).name(label)
case s =>
throw new IllegalArgumentException(
s"unsupported source connector: ${s.connector}"
@@ -460,18 +465,18 @@
sinkConfig: KafkaSinkConfig
): DataStreamSink[E] =
stream
.addSink(
new FlinkKafkaProducer[E](
sinkConfig.topic,
config
.getKafkaSerializationSchema(sinkConfig.name)
.asInstanceOf[KafkaSerializationSchema[E]],
sinkConfig.properties,
Semantic.AT_LEAST_ONCE
)
.sinkTo(
KafkaSink
.builder()
.setKafkaProducerConfig(sinkConfig.properties)
.setRecordSerializer(
config
.getKafkaRecordSerializationSchema[E](
sinkConfig.name
)
)
.build()
)
.uid(sinkConfig.label)
.name(sinkConfig.label)
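With KafkaSink, the topic and the old Semantic argument no longer go to a producer constructor: the topic is configured on the record serializer, and the delivery guarantee (AT_LEAST_ONCE in the old code) would need to be set explicitly on the KafkaSink builder if that behavior is still wanted. A minimal sketch of building a KafkaRecordSerializationSchema from a plain value SerializationSchema; the helper name is illustrative and not part of this commit:

// Illustrative helper (not from this commit): build a record-level serializer that
// writes each element's serialized value to a fixed topic.
import org.apache.flink.api.common.serialization.SerializationSchema
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema

def topicRecordSerializer[E](
    topic: String,
    valueSchema: SerializationSchema[E]
): KafkaRecordSerializationSchema[E] =
  KafkaRecordSerializationSchema
    .builder[E]()
    .setTopic(topic)
    .setValueSerializationSchema(valueSchema)
    .build()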

/**
* Send stream to a kinesis sink.
@@ -503,8 +508,6 @@
sink.setDefaultPartition("0")
sink
}
.uid(sinkConfig.label)
.name(sinkConfig.label)

/**
* Send stream to a socket sink.
@@ -531,8 +534,6 @@
.asInstanceOf[AddToJdbcBatchFunction[E]]
)
)
.uid(sinkConfig.label)
.name(sinkConfig.label)

/**
* Send stream to a rolling file sink.
@@ -625,7 +626,7 @@
s"Unknown file sink encoder format: '$encoderFormat'"
)
}
stream.addSink(sink).uid(sinkConfig.label).name(sinkConfig.label)
stream.addSink(sink)
}

/**
@@ -652,8 +653,6 @@
.getSerializationSchema(sinkConfig.name)
.asInstanceOf[SerializationSchema[E]]
)
.uid(sinkConfig.label)
.name(sinkConfig.label)

/**
* Send stream to a cassandra sink.
@@ -675,8 +674,6 @@
.setHost(sinkConfig.host)
.setQuery(sinkConfig.query)
.build()
.uid(sinkConfig.label)
.name(sinkConfig.label)

/**
* Send stream to an elasticsearch sink.
@@ -723,7 +720,7 @@
indexer.add(req)
}
).build()
stream.addSink(esSink).uid(sinkConfig.label).name(sinkConfig.label)
stream.addSink(esSink)
}

}
10 changes: 10 additions & 0 deletions src/main/scala/io/epiphanous/flinkrunner/FlinkRunnerFactory.scala
@@ -9,6 +9,8 @@ import org.apache.flink.api.common.serialization.{
Encoder,
SerializationSchema
}
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner
import org.apache.flink.streaming.connectors.kafka.{
KafkaDeserializationSchema,
@@ -40,6 +42,14 @@ trait FlinkRunnerFactory[ADT <: FlinkEvent] {
config: FlinkConfig[ADT]): KafkaDeserializationSchema[E] =
???

def getKafkaRecordSerializationSchema[E <: ADT](
name: String,
config: FlinkConfig[ADT]): KafkaRecordSerializationSchema[E] = ???

def getKafkaRecordDeserializationSchema[E <: ADT](
name: String,
config: FlinkConfig[ADT]): KafkaRecordDeserializationSchema[E] = ???

def getKinesisDeserializationSchema[E <: ADT](
name: String,
config: FlinkConfig[ADT]): KinesisDeserializationSchema[E] = ???
@@ -41,7 +41,7 @@ case class StableBloomFilter[T](
)

/** number of cells per unit storage */
val storedCells: Int = Math.floor(STORAGE_BITS / d).toInt
val storedCells: Int = STORAGE_BITS / d

/** number of bits used per unit storage */
val storedBits: Int = storedCells * d
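Since storedCells still types as Int, the division here is plain integer division, so the old Math.floor call added nothing; a worked example with illustrative values:

// Illustrative values only (not from this class): 64-bit storage units, 3-bit cells.
val exampleCells = 64 / 3           // 21 cells per storage unit
val exampleBits  = exampleCells * 3 // 63 of the 64 bits actually used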
@@ -8,15 +8,15 @@ import io.epiphanous.flinkrunner.model.{FlinkConfig, FlinkEvent}
import io.epiphanous.flinkrunner.util.StringUtils
import org.apache.avro.Schema.Parser
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.runtime.concurrent.Executors.directExecutionContext
import org.apache.flink.util.concurrent.Executors
import org.http4s.EntityDecoder
import org.http4s.circe.jsonOf
import org.http4s.client.Client
import org.http4s.client.blaze.BlazeClientBuilder

import java.nio.ByteBuffer
import java.util.concurrent.TimeUnit
import scala.concurrent.ExecutionContext
import scala.concurrent.{ExecutionContext, ExecutionContextExecutor}
import scala.util.{Failure, Success, Try}

class ConfluentSchemaRegistryClient[ADT <: FlinkEvent: TypeInformation](
@@ -37,7 +37,8 @@ class ConfluentSchemaRegistryClient[ADT <: FlinkEvent: TypeInformation](
jsonOf[IO, ConfluentSchemaRegistryResponse]

@transient
lazy implicit val ec: ExecutionContext = directExecutionContext()
lazy implicit val ec: ExecutionContextExecutor =
ExecutionContext.fromExecutor(Executors.directExecutor())

@transient
lazy implicit val cs: ContextShift[IO] = IO.contextShift(ec)
@@ -5,6 +5,7 @@ import com.typesafe.scalalogging.LazyLogging
import io.epiphanous.flinkrunner.model.ConfigToProps.RichConfigObject
import io.epiphanous.flinkrunner.{FlinkRunnerFactory, SEE}
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

@@ -151,6 +152,10 @@ class FlinkConfig[ADT <: FlinkEvent](
def getKafkaDeserializationSchema[E <: ADT](name: String) =
factory.getKafkaDeserializationSchema[E](name, this)

def getKafkaRecordDeserializationSchema[E <: ADT](
name: String): KafkaRecordDeserializationSchema[E] =
factory.getKafkaRecordDeserializationSchema[E](name, this)

def getKinesisDeserializationSchema[E <: ADT](name: String) =
factory.getKinesisDeserializationSchema[E](name, this)

@@ -160,6 +165,9 @@
def getKafkaSerializationSchema[E <: ADT](name: String) =
factory.getKafkaSerializationSchema[E](name, this)

def getKafkaRecordSerializationSchema[E <: ADT](name: String) =
factory.getKafkaRecordSerializationSchema[E](name, this)

def getKinesisSerializationSchema[E <: ADT](name: String) =
factory.getKinesisSerializationSchema[E](name, this)

@@ -5,7 +5,7 @@ import com.google.common.cache.{CacheBuilder, CacheLoader}
import com.typesafe.scalalogging.LazyLogging
import io.circe.Decoder
import io.epiphanous.flinkrunner.model.{FlinkConfig, FlinkEvent}
import org.apache.flink.runtime.concurrent.Executors.directExecutionContext
import org.apache.flink.util.concurrent.Executors
import org.apache.flink.streaming.api.scala.async.{
AsyncFunction,
ResultFuture
@@ -75,7 +75,8 @@ abstract class EnrichmentAsyncFunction[
lazy implicit val entityDecoder: EntityDecoder[IO, CV] = jsonOf[IO, CV]

@transient
lazy implicit val ec: ExecutionContext = directExecutionContext()
lazy implicit val ec: ExecutionContext =
ExecutionContext.fromExecutor(Executors.directExecutor())

@transient
lazy implicit val cs: ContextShift[IO] = IO.contextShift(ec)
(remaining changed files in this commit were not loaded in this view)
