From 465faa6f5693b69b2358ddb37646dffe45fdaf15 Mon Sep 17 00:00:00 2001
From: Robert Lyons
Date: Thu, 6 Oct 2022 10:25:59 -0400
Subject: [PATCH] Bug/fix schema reg config (#43)

* fix order of schema registry config args

* readme and ignore updates

* refactor SchemaRegistryConfig construction

Co-authored-by: Robert Lyons
---
 .gitignore                               |  1 +
 README.md                                | 41 +++++++++----------
 .../model/SchemaRegistryConfig.scala     | 25 +++++++++++
 .../model/sink/KafkaSinkConfig.scala     | 22 ++--------
 .../model/source/KafkaSourceConfig.scala | 23 ++---------
 5 files changed, 53 insertions(+), 59 deletions(-)

diff --git a/.gitignore b/.gitignore
index 4defa17..a803075 100644
--- a/.gitignore
+++ b/.gitignore
@@ -6,3 +6,4 @@ project/target
 *.log
 *.log_*
 log.file_IS_UNDEFINED*
+.tool-versions
diff --git a/README.md b/README.md
index a540376..db25d58 100644
--- a/README.md
+++ b/README.md
@@ -27,8 +27,7 @@
 build
-Coverage Status
+Coverage Status
@@ -258,7 +257,7 @@
 Next, write some jobs! This is the fun part.
 
 ```
 class MyJob1(runner:FlinkRunner[MyEventADT]) extends StreamJob[MyEventA](runner) {
-  override def transform:DataStream[MyEventA] = 
+  override def transform:DataStream[MyEventA] =
     singleSource[MyEventA]()
 }
@@ -296,9 +295,9 @@ Next, wire up your runner to a main method.
 
 ```
 object Main {
-  def main(args:Array[String]) = 
+  def main(args:Array[String]) =
     new MyRunner(new FlinkConfig(args))
- } 
+ }
 ```
 
 Finally, assemble an uber jar with all your dependencies, deploy it to your flink cluster,
@@ -318,7 +317,7 @@
 flink jobs. If your output event types require avro support, you should instead
 
 ```
 class StreamJob[
-  OUT <: ADT, 
+  OUT <: ADT,
   ADT <: FlinkEvent](runner:FlinkRunner[ADT])
 ```
@@ -426,8 +425,8 @@
 graph defined by your `transform` method.
 
 ```
 class AvroStreamJob[
-  OUT <: ADT with EmbeddedAvroRecord[A], 
-  A<:GenericRecord, 
+  OUT <: ADT with EmbeddedAvroRecord[A],
+  A<:GenericRecord,
   ADT <: FlinkEvent](runner:FlinkRunner[ADT])
 ```
@@ -440,7 +439,7 @@
 encoded sink (kafka or parquet-avro files).
 trait EmbeddedAvroRecord[A <: GenericRecord] {
   def $recordKey: Option[String] = None
- 
+
   def $record: A
 
   def $recordHeaders: Map[String, String] = Map.empty
@@ -532,36 +531,36 @@ trait SinkConfig[ADT <: FlinkEvent]
 
 ```
 trait Aggregate {
- 
+
   def name:String
- 
+
   // the kind of measurement being aggregated
   def dimension: String
- 
+
   // the preferred unit of measurements being aggregated
   def unit: String
- 
+
   // the current aggregated value
   def value: Double
- 
+
   // the current count of measurements included in this aggregate
   def count: BigInt
- 
+
   // the timestamp of the most recent aggregated event
   def aggregatedLastUpdated: Instant
- 
+
   // the timestamp when this aggregate was last updated
   def lastUpdated: Instant
- 
+
   // other aggregations this aggregation depends on
  def dependentAggregations: Map[String, Aggregate]
- 
+
   // configuration parameters
   def params: Map[String, String]
- 
+
   // a method to update the aggregate with a new value
-  def update(value: Double, unit: String, 
-             aggLU: Instant): Try[Aggregate] 
+  def update(value: Double, unit: String,
+             aggLU: Instant): Try[Aggregate]
 }
 ```
 
diff --git a/src/main/scala/io/epiphanous/flinkrunner/model/SchemaRegistryConfig.scala b/src/main/scala/io/epiphanous/flinkrunner/model/SchemaRegistryConfig.scala
index add9f6a..273b244 100644
--- a/src/main/scala/io/epiphanous/flinkrunner/model/SchemaRegistryConfig.scala
+++ b/src/main/scala/io/epiphanous/flinkrunner/model/SchemaRegistryConfig.scala
@@ -1,11 +1,15 @@
 package io.epiphanous.flinkrunner.model
 
+import com.typesafe.config.ConfigObject
 import io.confluent.kafka.schemaregistry.client.{
   CachedSchemaRegistryClient,
   SchemaRegistryClient
 }
+import io.epiphanous.flinkrunner.util.ConfigToProps.RichConfigObject
+import io.epiphanous.flinkrunner.util.StreamUtils.RichProps
 
 import java.util
+import scala.util.Try
 
 case class SchemaRegistryConfig(
     url: String = "http://schema-registry:8082",
@@ -24,3 +28,24 @@
     )
   }
 }
+object SchemaRegistryConfig {
+  def apply(configOpt: Option[ConfigObject]): SchemaRegistryConfig =
+    configOpt
+      .map { o =>
+        val c = o.toConfig
+        val url = c.getString("url")
+        val cacheCapacity =
+          Try(c.getInt("cache.capacity")).toOption.getOrElse(1000)
+        val headers =
+          Try(c.getObject("headers")).toOption.asProperties.asJavaMap
+        val props =
+          Try(c.getObject("props")).toOption.asProperties.asJavaMap
+        SchemaRegistryConfig(
+          url = url,
+          cacheCapacity = cacheCapacity,
+          headers = headers,
+          props = props
+        )
+      }
+      .getOrElse(SchemaRegistryConfig())
+}
diff --git a/src/main/scala/io/epiphanous/flinkrunner/model/sink/KafkaSinkConfig.scala b/src/main/scala/io/epiphanous/flinkrunner/model/sink/KafkaSinkConfig.scala
index 7d655ad..0b82516 100644
--- a/src/main/scala/io/epiphanous/flinkrunner/model/sink/KafkaSinkConfig.scala
+++ b/src/main/scala/io/epiphanous/flinkrunner/model/sink/KafkaSinkConfig.scala
@@ -60,7 +60,7 @@ case class KafkaSinkConfig[ADT <: FlinkEvent: TypeInformation](
       DeliveryGuarantee.AT_LEAST_ONCE
     case Some("none") =>
       DeliveryGuarantee.NONE
-    case _ => DeliveryGuarantee.EXACTLY_ONCE
+    case _ => DeliveryGuarantee.AT_LEAST_ONCE
   }
 
   /** ensure transaction.timeout.ms is set */
@@ -70,26 +70,10 @@
     t.toLong
   }
 
-  val schemaRegistryConfig: SchemaRegistryConfig =
+  val schemaRegistryConfig: SchemaRegistryConfig = SchemaRegistryConfig(
     config
       .getObjectOption(pfx("schema.registry"))
-      .map { o =>
-        val c = o.toConfig
-        val url = c.getString("url")
-        val cacheCapacity =
-          Try(c.getInt("cache.capacity")).toOption.getOrElse(1000)
-        val headers =
-          Try(c.getObject("headers")).toOption.asProperties.asJavaMap
-        val props =
-          Try(c.getObject("props")).toOption.asProperties.asJavaMap
-        SchemaRegistryConfig(
-          url,
-          cacheCapacity,
-          props,
-          headers
-        )
-      }
-      .getOrElse(SchemaRegistryConfig())
+  )
 
   /** Return a confluent avro serialization schema */
   def getAvroSerializationSchema[
diff --git a/src/main/scala/io/epiphanous/flinkrunner/model/source/KafkaSourceConfig.scala b/src/main/scala/io/epiphanous/flinkrunner/model/source/KafkaSourceConfig.scala
index f8f9629..0f6d87d 100644
--- a/src/main/scala/io/epiphanous/flinkrunner/model/source/KafkaSourceConfig.scala
+++ b/src/main/scala/io/epiphanous/flinkrunner/model/source/KafkaSourceConfig.scala
@@ -126,25 +126,10 @@ case class KafkaSourceConfig[ADT <: FlinkEvent](
     .getStringOpt(pfx("group.id"))
     .getOrElse(s"${config.jobName}.$name")
 
-  val schemaRegistryConfig: SchemaRegistryConfig =
-    getFromEither(pfx(), Seq("schema.registry"), config.getObjectOption)
-      .map { o =>
-        val c = o.toConfig
-        val url = c.getString("url")
-        val cacheCapacity =
-          Try(c.getInt("cache.capacity")).toOption.getOrElse(1000)
-        val headers =
-          Try(c.getObject("headers")).toOption.asProperties.asJavaMap
-        val props =
-          Try(c.getObject("props")).toOption.asProperties.asJavaMap
-        SchemaRegistryConfig(
-          url,
-          cacheCapacity,
-          props,
-          headers
-        )
-      }
-      .getOrElse(SchemaRegistryConfig())
+  val schemaRegistryConfig: SchemaRegistryConfig = SchemaRegistryConfig(
+    config
+      .getObjectOption(pfx("schema.registry"))
+  )
 
   /** Returns a confluent avro registry aware deserialization schema for
     * kafka.
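
Reviewer's note on the fix itself, since it is easy to miss in the diff: the removed call sites built the config positionally as `SchemaRegistryConfig(url, cacheCapacity, props, headers)`, while the named arguments in the new companion `apply` show that the declared field order puts `headers` before `props`. Each map silently landed in the other's field, and the compiler could not object because both are plain string maps. A minimal sketch of the failure mode, assuming that field order (Scala `Map`s stand in for the `java.util` maps the real code builds with `asProperties.asJavaMap`):

```
// Sketch only: parameter order assumed from the named args in the new apply.
case class SchemaRegistryConfig(
    url: String = "http://schema-registry:8082",
    cacheCapacity: Int = 1000,
    headers: Map[String, String] = Map.empty,
    props: Map[String, String] = Map.empty
)

object ArgOrderDemo extends App {
  val headers = Map("Authorization" -> "Bearer <token>")
  val props   = Map("basic.auth.credentials.source" -> "USER_INFO")

  // the removed call sites: props and headers passed positionally, reversed
  val buggy = SchemaRegistryConfig("http://sr:8082", 1000, props, headers)
  println(buggy.headers == props) // true -- silently swapped

  // the new companion apply binds by name, which cannot be misordered
  val fixed = SchemaRegistryConfig(
    url = "http://sr:8082",
    cacheCapacity = 1000,
    headers = headers,
    props = props
  )
  println(fixed.headers == headers) // true
}
```

Centralizing construction in one companion `apply` also removes the parsing logic duplicated across `KafkaSinkConfig` and `KafkaSourceConfig`, which is how the same positional mistake got written twice.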
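For readers tracing the new `SchemaRegistryConfig.apply(Option[ConfigObject])`: judging from the keys it reads, it expects a block shaped like the one below, with every key except `url` optional thanks to the `Try(...).toOption` wrappers. This is a sketch under assumptions: the `sinks.my-sink` prefix and all values are hypothetical, and `conf.getObject(...)` plus `Try(...).toOption` stand in for what `config.getObjectOption(pfx("schema.registry"))` produces in the real code.

```
import com.typesafe.config.ConfigFactory
import io.epiphanous.flinkrunner.model.SchemaRegistryConfig
import scala.util.Try

object SchemaRegistryConfigDemo extends App {
  // hypothetical HOCON; only `url` is required by the new apply
  val conf = ConfigFactory.parseString(
    """
      |sinks.my-sink.schema.registry {
      |  url = "http://schema-registry:8082"
      |  cache.capacity = 500
      |  headers { X-Tenant = "analytics" }
      |  props { basic.auth.credentials.source = "USER_INFO" }
      |}
      |""".stripMargin
  )

  // roughly what config.getObjectOption(pfx("schema.registry")) resolves to
  val configOpt =
    Try(conf.getObject("sinks.my-sink.schema.registry")).toOption

  // an absent block falls back to SchemaRegistryConfig() defaults
  val srConfig = SchemaRegistryConfig(configOpt)
  println(srConfig)
}
```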
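Separately, note the behavioral change in `KafkaSinkConfig` that the commit message does not call out: an unrecognized `delivery.guarantee` setting now falls back to `DeliveryGuarantee.AT_LEAST_ONCE` instead of `EXACTLY_ONCE`. A sketch of how the full match plausibly reads after the patch; only the `Some("none")` and fallback cases are visible in the hunk, so the first two cases here are assumptions:

```
import org.apache.flink.connector.base.DeliveryGuarantee

object DeliveryGuaranteeDemo {
  // the Some("none") and fallback lines come from the hunk;
  // the exactly-once / at-least-once cases are assumed for completeness
  def guaranteeFor(setting: Option[String]): DeliveryGuarantee =
    setting match {
      case Some("exactly-once")  => DeliveryGuarantee.EXACTLY_ONCE
      case Some("at-least-once") => DeliveryGuarantee.AT_LEAST_ONCE
      case Some("none")          => DeliveryGuarantee.NONE
      case _                     => DeliveryGuarantee.AT_LEAST_ONCE
    }
}
```

This is the safer default: `EXACTLY_ONCE` only works when Kafka transactions are properly configured (presumably why the adjacent context ensures `transaction.timeout.ms` is set), so a typo in the setting previously opted a job into the most demanding guarantee silently.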