begin refactor of confluent avro support
nextdude committed Nov 30, 2021
1 parent 6d12b3a commit ca1edc8
Showing 12 changed files with 423 additions and 93 deletions.
77 changes: 41 additions & 36 deletions build.sbt
@@ -24,64 +24,69 @@ inThisBuild(
Test / parallelExecution := false
Test / fork := true
resolvers += "Local Maven Repository" at "file://" + Path.userHome.absolutePath + "/.m2/repository"
resolvers += "Confluent Repository" at "https://packages.confluent.io/maven/"

val V = new {
val flink = "1.13.2"
val logback = "1.2.6"
val flink = "1.13.3"
val logback = "1.2.7"
val scalaLogging = "3.9.4"
val scalaTest = "3.2.10"
val scalaCheck = "1.15.4"
val circe = "0.14.1"
val http4s = "0.21.29"
val enumeratum = "1.7.0"
val typesafeConfig = "1.4.1"
val guava = "29.0-jre"
val guava = "31.0.1-jre" //"29.0-jre"
val squants = "1.8.3"
val avro = "1.10.2"
val avro = "1.11.0"
val avro4s = "4.0.11"
+   val schemaRegistry = "7.0.0"
}

- val flinkDeps =
-   Seq("scala", "streaming-scala", "cep-scala").map(a =>
-     "org.apache.flink" %% s"flink-$a" % V.flink % Provided
-   ) ++
-     Seq(
-       "connector-kafka",
-       "connector-kinesis",
-       "connector-cassandra",
-       "connector-elasticsearch7",
-       "statebackend-rocksdb"
-     ).map(a => "org.apache.flink" %% s"flink-$a" % V.flink) ++
-     Seq("org.apache.flink" %% "flink-test-utils" % V.flink % Test)
+ val flinkDeps =
+   Seq(
+     "org.apache.flink" %% s"flink-scala" % V.flink % Provided,
+     "org.apache.flink" %% s"flink-streaming-scala" % V.flink % Provided,
+     "org.apache.flink" %% s"flink-cep-scala" % V.flink % Provided,
+     "org.apache.flink" %% s"flink-connector-kafka" % V.flink,
+     "org.apache.flink" %% s"flink-connector-kinesis" % V.flink,
+     "org.apache.flink" %% s"flink-connector-cassandra" % V.flink,
+     "org.apache.flink" %% s"flink-connector-elasticsearch7" % V.flink,
+     "org.apache.flink" %% s"flink-statebackend-rocksdb" % V.flink,
+     "org.apache.flink" % s"flink-avro-confluent-registry" % V.flink,
+     "org.apache.flink" %% s"flink-test-utils" % V.flink % Test
+   )

val loggingDeps = Seq(
"ch.qos.logback" % "logback-classic" % V.logback % Provided,
"com.typesafe.scala-logging" %% "scala-logging" % V.scalaLogging
)

- val http4sDeps =
-   Seq("http4s-dsl", "http4s-client", "http4s-blaze-client", "http4s-circe")
-     .map("org.http4s" %% _ % V.http4s)
+ val http4sDeps = Seq(
+   "dsl",
+   "client",
+   "blaze-client",
+   "circe"
+ ).map(d => "org.http4s" %% s"http4s-$d" % V.http4s)

val circeDeps = Seq(
"circe-core",
"circe-generic",
"circe-generic-extras",
"circe-parser"
).map(
"io.circe" %% _ % V.circe
)
"core",
"generic",
"generic-extras",
"parser"
).map(d => "io.circe" %% s"circe-$d" % V.circe)

- val otherDeps = Seq(
-   "com.beachape" %% "enumeratum" % V.enumeratum,
-   "org.apache.avro" % "avro" % V.avro,
-   "com.typesafe" % "config" % V.typesafeConfig,
-   "com.google.guava" % "guava" % V.guava,
-   "org.typelevel" %% "squants" % V.squants,
-   "com.sksamuel.avro4s" %% "avro4s-core" % V.avro4s,
-   "org.scalactic" %% "scalactic" % V.scalaTest % Test,
-   "org.scalatest" %% "scalatest" % V.scalaTest % Test,
-   "org.scalacheck" %% "scalacheck" % V.scalaCheck % Test
+ val otherDeps = Seq(
+   "io.confluent" % "kafka-schema-registry-client" % V.schemaRegistry,
+   "com.beachape" %% "enumeratum" % V.enumeratum,
+   "org.apache.avro" % "avro" % V.avro,
+   "com.typesafe" % "config" % V.typesafeConfig,
+   "com.google.guava" % "guava" % V.guava,
+   "org.typelevel" %% "squants" % V.squants,
+   "com.sksamuel.avro4s" %% "avro4s-core" % V.avro4s,
+   "org.scalactic" %% "scalactic" % V.scalaTest % Test,
+   "org.scalatest" %% "scalatest" % V.scalaTest % Test,
+   "org.scalacheck" %% "scalacheck" % V.scalaCheck % Test
)

/**
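Note (not part of this commit): the new flink-avro-confluent-registry and kafka-schema-registry-client dependencies are what enable registry-aware Avro (de)serialization. A minimal sketch of how that Flink artifact is typically wired into a Kafka source follows; the schema, topic, registry URL, and broker address are assumptions.

import java.util.Properties
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

// Reader schema for records on the topic (hypothetical).
val readerSchema: Schema = new Schema.Parser().parse(
  """{"type":"record","name":"Example","fields":[{"name":"id","type":"string"}]}"""
)

// Deserializer that looks up writer schemas in the Confluent schema registry.
val deserializer = ConfluentRegistryAvroDeserializationSchema.forGeneric(
  readerSchema,
  "http://localhost:8081" // hypothetical registry URL
)

val props = new Properties()
props.setProperty("bootstrap.servers", "localhost:9092") // hypothetical brokers

val consumer =
  new FlinkKafkaConsumer[GenericRecord]("example-topic", deserializer, props)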
60 changes: 16 additions & 44 deletions src/main/scala/io/epiphanous/flinkrunner/FlinkRunner.scala
@@ -72,42 +72,28 @@ class FlinkRunner[ADT <: FlinkEvent](
val env: SEE = config.configureStreamExecutionEnvironment

/**
- * An intermediate method to process main args, with optional callback to
- * capture output of flink job.
- *
- * @param callback
- *   a function from an iterator to unit
- */
- def process(
-     callback: PartialFunction[List[_], Unit] = { case _ =>
-       ()
-     }
- ): Unit =
-   if (config.jobName == "help") showHelp()
-   else process1(callback)
-
- /**
- * Actually invoke the job based on the job name and arguments passed in.
- * If the job run returns an iterator of results, pass those results to
- * the callback. Otherwise, just return. The callback is for testing the
+ * Invoke a job based on the job name and arguments passed in. If the job
+ * run returns an iterator of results, pass those results to the
+ * callback. Otherwise, just return. The callback is for testing the
* stream of results from a flink job. It will only be invoked if
* --mock.edges option is on.
*
* @param callback
* a function from a stream to unit that receives results from running
* flink job
*/
- def process1(
+ def process(
callback: PartialFunction[List[_], Unit] = { case _ =>
()
}
): Unit = {
-   if (
+   if (config.jobName == "help") showHelp()
+   else if (
config.jobArgs.headOption
.exists(s => List("help", "--help", "-help", "-h").contains(s))
) showJobHelp()
else {
-     factory.getJobInstance(config.jobName, config).run() match {
+     factory.getJobInstance(config.jobName, this).run() match {
case Left(results) => callback(results)
case Right(_) => ()
}
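Note (not part of this commit): with the help dispatch folded into process, a test can drive a job and inspect its output through the callback. A hedged sketch, assuming `runner` is an already-constructed FlinkRunner and the job was launched with --mock.edges so results are collected:

// `runner` is assumed to be an already-constructed FlinkRunner[ADT]
runner.process { case results: List[_] =>
  // results holds the collected output of the flink job
  results.foreach(println)
}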
@@ -185,7 +171,7 @@ class FlinkRunner[ADT <: FlinkEvent](
* @return
* BoundedLatenessGenerator[E]
*/
- def boundedOutofOrdernessWatermarks[E <: ADT: TypeInformation]()
+ def boundedOutOfOrderWatermarks[E <: ADT: TypeInformation]()
: WatermarkStrategy[E] =
WatermarkStrategy
.forBoundedOutOfOrderness(config.maxLateness)
@@ -205,8 +191,6 @@ class FlinkRunner[ADT <: FlinkEvent](
* Assign timestamps/watermarks if we're using event time
* @param in
* the input stream to watermark
- * @param env
- *   implicit stream execution environment
* @tparam E
* event type
* @return
@@ -217,10 +201,10 @@ class FlinkRunner[ADT <: FlinkEvent](
srcConfig: SourceConfig
): DataStream[E] =
in.assignTimestampsAndWatermarks(srcConfig.watermarkStrategy match {
case "bounded out of orderness" =>
boundedOutofOrdernessWatermarks()
case "ascending timestamps" => ascendingTimestampsWatermarks()
case _ => boundedLatenessWatermarks(in.name)
case "bounded out of order" =>
boundedOutOfOrderWatermarks()
case "ascending timestamps" => ascendingTimestampsWatermarks()
case _ => boundedLatenessWatermarks(in.name)
}).name(s"wm:${in.name}")
.uid(s"wm:${in.name}")

@@ -406,7 +390,7 @@ class FlinkRunner[ADT <: FlinkEvent](
file.getAbsolutePath
}

- val runner = this
+ val runner: FlinkRunner[ADT] = this

implicit class EventStreamOps[E <: ADT: TypeInformation](
stream: DataStream[E]) {
@@ -422,7 +406,7 @@ class FlinkRunner[ADT <: FlinkEvent](
.uid(s"cast types $name")
}

def toSink(sinkName: String = "") =
def toSink(sinkName: String = ""): Object =
runner.toSink[E](stream, sinkName)

}
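Note (not part of this commit): a hypothetical call site for the EventStreamOps syntax; the stream value, event type, and sink name are assumptions:

// assumes `events: DataStream[MyEvent]` was produced earlier in the job and
// "kafka-sink" names a sink in the job's configuration
import runner._ // bring the EventStreamOps implicit class into scope
events.toSink("kafka-sink")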
@@ -434,8 +418,6 @@ class FlinkRunner[ADT <: FlinkEvent](
* the data stream to send to sink
* @param sinkName
* a sink name to obtain configuration
- * @param config
- *   implicit flink job args
* @tparam E
* stream element type
* @return
@@ -444,7 +426,7 @@ class FlinkRunner[ADT <: FlinkEvent](
def toSink[E <: ADT: TypeInformation](
stream: DataStream[E],
sinkName: String = ""
- ) = {
+ ): Object = {
val name = if (sinkName.isEmpty) config.getSinkNames.head else sinkName
config.getSinkConfig(name) match {
case s: KafkaSinkConfig => toKafka[E](stream, s)
@@ -468,8 +450,6 @@ class FlinkRunner[ADT <: FlinkEvent](
* the data stream
* @param sinkConfig
* a sink configuration
- * @param config
- *   implicit job args
* @tparam E
* stream element type
* @return
@@ -500,8 +480,6 @@ class FlinkRunner[ADT <: FlinkEvent](
* the data stream
* @param sinkConfig
* a sink configuration
- * @param config
- *   implicit job args
* @tparam E
* stream element type
* @return
@@ -535,8 +513,6 @@ class FlinkRunner[ADT <: FlinkEvent](
* the data stream
* @param sinkConfig
* a sink configuration
- * @param config
- *   implicit job args
* @tparam E
* stream element type
* @return
@@ -565,8 +541,6 @@ class FlinkRunner[ADT <: FlinkEvent](
* the data stream
* @param sinkConfig
* a sink configuration
- * @param config
- *   implicit job args
* @tparam E
* stream element type
* @return
@@ -661,8 +635,6 @@ class FlinkRunner[ADT <: FlinkEvent](
* the data stream
* @param sinkConfig
* a sink configuration
- * @param config
- *   implicit job args
* @tparam E
* stream element type
* @return
@@ -697,7 +669,7 @@ class FlinkRunner[ADT <: FlinkEvent](
*/
def toCassandraSink[E <: ADT: TypeInformation](
stream: DataStream[E],
-     sinkConfig: CassandraSinkConfig) =
+     sinkConfig: CassandraSinkConfig): CassandraSink[E] =
CassandraSink
.addSink(stream)
.setHost(sinkConfig.host)
@@ -27,9 +27,9 @@ trait FlinkRunnerFactory[ADT <: FlinkEvent] {
optConfig: Option[String] = None) =
new FlinkConfig[ADT](args, this, sources, optConfig)

- def getJobInstance(
+ def getJobInstance[DS, OUT <: ADT](
name: String,
-     config: FlinkConfig[ADT]): BaseFlinkJob[_, _, ADT]
+     runner: FlinkRunner[ADT]): BaseFlinkJob[DS, OUT, ADT]

def getDeserializationSchema[E <: ADT](
name: String,
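Note (not part of this commit): under the new signature a concrete factory receives the runner itself rather than only its config. A rough sketch of what an implementation might look like; the ADT, job name, and job class are hypothetical, and the factory's other members are elided:

object MyJobFactory extends FlinkRunnerFactory[MyADT] {
  override def getJobInstance[DS, OUT <: MyADT](
      name: String,
      runner: FlinkRunner[MyADT]): BaseFlinkJob[DS, OUT, MyADT] =
    name match {
      case "my-job" =>
        new MyJob(runner).asInstanceOf[BaseFlinkJob[DS, OUT, MyADT]]
      case other =>
        throw new RuntimeException(s"unknown job name: $other")
    }
  // ... other FlinkRunnerFactory members elided ...
}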
@@ -209,14 +209,17 @@ class ConfluentSchemaRegistryClient[ADT <: FlinkEvent: TypeInformation](
*/
protected def getSubjectName[E](
event: E,
-     optContext: Option[ConfluentSchemaRegistryContext] = None): String =
-   (event.getClass.getCanonicalName.split("\\.")
-     :+ (if (optContext.getOrElse(ConfluentSchemaRegistryContext()).isKey)
-           "key"
-         else "value"))
-     .map(snakify)
-     .map(name => clean(name, replacement = "_"))
-     .mkString("_")
+     optContext: Option[ConfluentSchemaRegistryContext] = None)
+     : String = {
+   val keyOrValue =
+     if (optContext.getOrElse(ConfluentSchemaRegistryContext()).isKey)
+       "key"
+     else "value"
+   val subjectName = config.getString(
+     s"schema.registry.${event.getClass.getCanonicalName}"
+   )
+   s"$subjectName-$keyOrValue"
+ }
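Note (not part of this commit): the subject is now looked up in config under the event's canonical class name and suffixed with -key or -value. A hypothetical lookup, assuming the client's config resolves paths the way Typesafe Config does and the class and subject names are made up:

import com.typesafe.config.ConfigFactory

// Hypothetical mapping from an event class to its registry subject.
val conf = ConfigFactory.parseString(
  "schema.registry.io.example.events.OrderCreated = order-events"
)
val subject = conf.getString("schema.registry.io.example.events.OrderCreated")
// subject == "order-events"; getSubjectName would then return "order-events-value"
// for a value schema and "order-events-key" for a key schema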

/**
* Retrieve a schema based on its id or subject, and optionally, some
@@ -28,8 +28,7 @@ case class RegisteredAvroSchema(
*/
def decode[E: Decoder](bytes: Array[Byte]): Try[E] = {
Try(
-       AvroInputStream
-         .binary[E]
+       AvroInputStream.binary
.from(bytes)
.build(schema)
.iterator
Expand All @@ -55,7 +54,7 @@ case class RegisteredAvroSchema(
magic: Array[Byte] = Array.emptyByteArray): Try[Array[Byte]] =
Try {
val baos = new ByteArrayOutputStream()
-     val os = AvroOutputStream.binary[E].to(baos).build()
+     val os = AvroOutputStream.binary.to(baos).build()
os.write(event)
os.flush()
os.close()
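Note (not part of this commit): a hedged round-trip using avro4s binary streams in the style of the pre-existing binary[E] builders; the record type and values are assumptions:

import java.io.ByteArrayOutputStream
import com.sksamuel.avro4s.{AvroInputStream, AvroOutputStream, AvroSchema}

// Hypothetical record; avro4s derives its schema, encoder, and decoder.
case class Click(userId: String, ts: Long)

val schema = AvroSchema[Click]

// encode to avro binary
val baos = new ByteArrayOutputStream()
val out  = AvroOutputStream.binary[Click].to(baos).build()
out.write(Click("u1", 123L))
out.close()

// decode back, supplying the writer schema
val in      = AvroInputStream.binary[Click].from(baos.toByteArray).build(schema)
val decoded = in.iterator.next()
in.close()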
