Bug/fix schema reg config (#43)
* fix order of schema registry config args

* readme and ignore updates

* refactor SchemaRegistryConfig construction

Co-authored-by: Robert Lyons <nextdude@gmail.com>
nextdude-mdsol and nextdude committed Oct 6, 2022
1 parent 0139751 commit 465faa6
Showing 5 changed files with 53 additions and 59 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -6,3 +6,4 @@ project/target
*.log
*.log_*
log.file_IS_UNDEFINED*
+ .tool-versions
41 changes: 20 additions & 21 deletions README.md
@@ -27,8 +27,7 @@
<img src="https://img.shields.io/travis/com/epiphanous/flinkrunner.svg" alt="build" />
</a>
<!-- coverage -->
- <a href='https://coveralls.io/github/epiphanous/flinkrunner?branch=main'><img
-   src='https://coveralls.io/repos/github/epiphanous/flinkrunner/badge.svg?branch=master' alt='Coverage Status' /></a>
+ <a href='https://coveralls.io/github/epiphanous/flinkrunner?branch=main'><img src='https://coveralls.io/repos/github/epiphanous/flinkrunner/badge.svg?branch=main' alt='Coverage Status' /></a>
</div>

<div align="center">
@@ -258,7 +257,7 @@ Next, write some jobs! This is the fun part.
```
class MyJob1(runner:FlinkRunner[MyEventADT]) extends StreamJob[MyEventA](runner) {
  override def transform:DataStream[MyEventA] =
    singleSource[MyEventA]()
}
```
@@ -296,9 +295,9 @@ Next, wire up your runner to a main method.

```
object Main {
  def main(args:Array[String]) =
    new MyRunner(new FlinkConfig(args))
}
```

Finally, assemble an uber jar with all your dependencies, deploy it to your flink cluster,
@@ -318,7 +317,7 @@ flink jobs. If your output event types require avro support, you should instead

```
class StreamJob[
  OUT <: ADT,
  ADT <: FlinkEvent](runner:FlinkRunner[ADT])
```
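
Here `OUT` is the concrete event type your job emits (`MyEventA` in the example above), and `ADT` is the trait covering your whole event algebra (`MyEventADT`), typically encoded as a sealed trait.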

@@ -426,8 +425,8 @@ graph defined by your `transform` method.

```
class AvroStreamJob[
  OUT <: ADT with EmbeddedAvroRecord[A],
  A <: GenericRecord,
  ADT <: FlinkEvent](runner:FlinkRunner[ADT])
```

@@ -440,7 +439,7 @@ encoded sink (kafka or parquet-avro files).
trait EmbeddedAvroRecord[A <: GenericRecord] {
  def $recordKey: Option[String] = None
  def $record: A
  def $recordHeaders: Map[String, String] = Map.empty
}
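
For example, an ADT member carrying an Avro record might look like the following sketch. This is hypothetical: `MyAvroRecord` stands in for an Avro-generated class with an assumed `getId` accessor, and any other members `FlinkEvent` requires are omitted for brevity.

```
case class MyEventB(record: MyAvroRecord)
    extends MyEventADT
    with EmbeddedAvroRecord[MyAvroRecord] {
  override def $record: MyAvroRecord = record
  // key outgoing kafka messages by an assumed id field on the record
  override def $recordKey: Option[String] = Some(record.getId.toString)
}
```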
@@ -532,36 +531,36 @@ trait SinkConfig[ADT <: FlinkEvent]

```
trait Aggregate {
  def name: String
  // the kind of measurement being aggregated
  def dimension: String
  // the preferred unit of measurements being aggregated
  def unit: String
  // the current aggregated value
  def value: Double
  // the current count of measurements included in this aggregate
  def count: BigInt
  // the timestamp of the most recent aggregated event
  def aggregatedLastUpdated: Instant
  // the timestamp when this aggregate was last updated
  def lastUpdated: Instant
  // other aggregations this aggregation depends on
  def dependentAggregations: Map[String, Aggregate]
  // configuration parameters
  def params: Map[String, String]
  // a method to update the aggregate with a new value
  def update(value: Double, unit: String, aggLU: Instant): Try[Aggregate]
}
```
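
As a rough illustration of the contract (not FlinkRunner's own implementation, which presumably also handles unit conversion), a running mean could implement the trait like this, assuming the `Aggregate` trait above is in scope:

```
import java.time.Instant
import scala.util.{Success, Try}

// sketch only: ignores unit conversion and dependent aggregations
case class Mean(
    name: String = "Mean",
    dimension: String = "Length",
    unit: String = "m",
    value: Double = 0d,
    count: BigInt = BigInt(0),
    aggregatedLastUpdated: Instant = Instant.EPOCH,
    lastUpdated: Instant = Instant.now(),
    dependentAggregations: Map[String, Aggregate] = Map.empty,
    params: Map[String, String] = Map.empty)
    extends Aggregate {

  override def update(
      value: Double,
      unit: String,
      aggLU: Instant): Try[Aggregate] = Success(
    copy(
      // incrementally fold the new measurement into the running mean
      value = (this.value * count.toDouble + value) / (count.toDouble + 1),
      count = count + 1,
      aggregatedLastUpdated = aggLU,
      lastUpdated = Instant.now()
    )
  )
}
```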

@@ -1,11 +1,15 @@
package io.epiphanous.flinkrunner.model

import com.typesafe.config.ConfigObject
import io.confluent.kafka.schemaregistry.client.{
CachedSchemaRegistryClient,
SchemaRegistryClient
}
import io.epiphanous.flinkrunner.util.ConfigToProps.RichConfigObject
import io.epiphanous.flinkrunner.util.StreamUtils.RichProps

import java.util
import scala.util.Try

case class SchemaRegistryConfig(
url: String = "http://schema-registry:8082",
@@ -24,3 +28,24 @@ case class SchemaRegistryConfig(
)
}
}
object SchemaRegistryConfig {
  def apply(configOpt: Option[ConfigObject]): SchemaRegistryConfig =
    configOpt
      .map { o =>
        val c = o.toConfig
        val url = c.getString("url")
        val cacheCapacity =
          Try(c.getInt("cache.capacity")).toOption.getOrElse(1000)
        val headers =
          Try(c.getObject("headers")).toOption.asProperties.asJavaMap
        val props =
          Try(c.getObject("props")).toOption.asProperties.asJavaMap
        SchemaRegistryConfig(
          url = url,
          cacheCapacity = cacheCapacity,
          headers = headers,
          props = props
        )
      }
      .getOrElse(SchemaRegistryConfig())
}
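
With the parsing consolidated in this companion `apply`, a `schema.registry` block like the following maps directly onto the case class. The HOCON fragment is hypothetical (the url and values are made up), but the key names match those read by `apply` above; within the block only `url` is required, and the other keys fall back to defaults:

```
import com.typesafe.config.ConfigFactory

// hypothetical config fragment for illustration
val parsed = ConfigFactory.parseString(
  """
    |schema.registry {
    |  url = "http://localhost:8081"
    |  cache.capacity = 500
    |  props { "auto.register.schemas" = false }
    |}
    |""".stripMargin
)

val srConfig = SchemaRegistryConfig(
  Some(parsed.getObject("schema.registry"))
)
```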
@@ -60,7 +60,7 @@ case class KafkaSinkConfig[ADT <: FlinkEvent: TypeInformation](
DeliveryGuarantee.AT_LEAST_ONCE
case Some("none") =>
DeliveryGuarantee.NONE
- case _ => DeliveryGuarantee.EXACTLY_ONCE
+ case _ => DeliveryGuarantee.AT_LEAST_ONCE
}

/** ensure transaction.timeout.ms is set */
@@ -70,26 +70,10 @@ case class KafkaSinkConfig[ADT <: FlinkEvent: TypeInformation](
t.toLong
}

- val schemaRegistryConfig: SchemaRegistryConfig =
+ val schemaRegistryConfig: SchemaRegistryConfig = SchemaRegistryConfig(
    config
      .getObjectOption(pfx("schema.registry"))
-     .map { o =>
-       val c = o.toConfig
-       val url = c.getString("url")
-       val cacheCapacity =
-         Try(c.getInt("cache.capacity")).toOption.getOrElse(1000)
-       val headers =
-         Try(c.getObject("headers")).toOption.asProperties.asJavaMap
-       val props =
-         Try(c.getObject("props")).toOption.asProperties.asJavaMap
-       SchemaRegistryConfig(
-         url,
-         cacheCapacity,
-         props,
-         headers
-       )
-     }
-     .getOrElse(SchemaRegistryConfig())
+ )
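
  // This refactor is the heart of the fix: the old inline construction
  // passed `props` and `headers` positionally and, per the commit title,
  // in the wrong order. Both are java.util maps, so the compiler could
  // not catch the swap. The companion apply assigns each field by name,
  // eliminating that class of error.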

/** Return a confluent avro serialization schema */
def getAvroSerializationSchema[
@@ -126,25 +126,10 @@ case class KafkaSourceConfig[ADT <: FlinkEvent](
.getStringOpt(pfx("group.id"))
.getOrElse(s"${config.jobName}.$name")

- val schemaRegistryConfig: SchemaRegistryConfig =
-   getFromEither(pfx(), Seq("schema.registry"), config.getObjectOption)
-     .map { o =>
-       val c = o.toConfig
-       val url = c.getString("url")
-       val cacheCapacity =
-         Try(c.getInt("cache.capacity")).toOption.getOrElse(1000)
-       val headers =
-         Try(c.getObject("headers")).toOption.asProperties.asJavaMap
-       val props =
-         Try(c.getObject("props")).toOption.asProperties.asJavaMap
-       SchemaRegistryConfig(
-         url,
-         cacheCapacity,
-         props,
-         headers
-       )
-     }
-     .getOrElse(SchemaRegistryConfig())
+ val schemaRegistryConfig: SchemaRegistryConfig = SchemaRegistryConfig(
+   config
+     .getObjectOption(pfx("schema.registry"))
+ )
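
  // Same consolidation as in KafkaSinkConfig. Note the lookup also changes:
  // the old code resolved the key via getFromEither(pfx(),
  // Seq("schema.registry"), config.getObjectOption), while the new code
  // reads config.getObjectOption(pfx("schema.registry")) directly.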

/** Returns a confluent avro registry aware deserialization schema for
* kafka.
