diff --git a/.gitignore b/.gitignore
index 4defa17d..a803075c 100644
--- a/.gitignore
+++ b/.gitignore
@@ -6,3 +6,4 @@ project/target
*.log
*.log_*
log.file_IS_UNDEFINED*
+.tool-versions
diff --git a/README.md b/README.md
index a540376c..db25d58a 100644
--- a/README.md
+++ b/README.md
@@ -27,8 +27,7 @@
-
+
@@ -258,7 +257,7 @@ Next, write some jobs! This is the fun part.
```
class MyJob1(runner:FlinkRunner[MyEventADT]) extends StreamJob[MyEventA](runner) {
- override def transform:DataStream[MyEventA] =
+ override def transform:DataStream[MyEventA] =
singleSource[MyEventA]()
}
@@ -296,9 +295,9 @@ Next, wire up your runner to a main method.
```
object Main {
- def main(args:Array[String]) =
+ def main(args:Array[String]) =
new MyRunner(new FlinkConfig(args))
- }
+ }
```
Finally, assemble an uber jar with all your dependencies, deploy it to your flink cluster,
@@ -318,7 +317,7 @@ flink jobs. If your output event types require avro support, you should instead
```
class StreamJob[
- OUT <: ADT,
+ OUT <: ADT,
ADT <: FlinkEvent](runner:FlinkRunner[ADT])
```
@@ -426,8 +425,8 @@ graph defined by your `transform` method.
```
class AvroStreamJob[
- OUT <: ADT with EmbeddedAvroRecord[A],
- A<:GenericRecord,
+ OUT <: ADT with EmbeddedAvroRecord[A],
+ A <: GenericRecord,
ADT <: FlinkEvent](runner:FlinkRunner[ADT])
```
@@ -440,7 +439,7 @@ encoded sink (kafka or parquet-avro files).
trait EmbeddedAvroRecord[A <: GenericRecord {
def $recordKey: Option[String] = None
-
+
def $record: A
def $recordHeaders: Map[String, String] = Map.empty
@@ -532,36 +531,36 @@ trait SinkConfig[ADT <: FlinkEvent]
```
trait Aggregate {
-
+
def name:String
-
+
// the kind of measurement being aggregated
def dimension: String
-
+
// the preferred unit of measurements being aggregated
def unit: String
-
+
// the current aggregated value
def value: Double
-
+
// the current count of measurements included in this aggregate
def count: BigInt
-
+
// the timestamp of the most recent aggregated event
def aggregatedLastUpdated: Instant
-
+
// the timestamp when this aggregate was last updated
def lastUpdated: Instant
-
+
// other aggregations this aggregation depends on
def dependentAggregations: Map[String, Aggregate]
-
+
// configuration parameters
def params: Map[String, String]
-
+
// a method to update the aggregate with a new value
- def update(value: Double, unit: String,
- aggLU: Instant): Try[Aggregate]
+ def update(value: Double, unit: String,
+ aggLU: Instant): Try[Aggregate]
}
```
diff --git a/src/main/scala/io/epiphanous/flinkrunner/model/SchemaRegistryConfig.scala b/src/main/scala/io/epiphanous/flinkrunner/model/SchemaRegistryConfig.scala
index add9f6a7..273b2449 100644
--- a/src/main/scala/io/epiphanous/flinkrunner/model/SchemaRegistryConfig.scala
+++ b/src/main/scala/io/epiphanous/flinkrunner/model/SchemaRegistryConfig.scala
@@ -1,11 +1,15 @@
package io.epiphanous.flinkrunner.model
+import com.typesafe.config.ConfigObject
import io.confluent.kafka.schemaregistry.client.{
CachedSchemaRegistryClient,
SchemaRegistryClient
}
+import io.epiphanous.flinkrunner.util.ConfigToProps.RichConfigObject
+import io.epiphanous.flinkrunner.util.StreamUtils.RichProps
import java.util
+import scala.util.Try
case class SchemaRegistryConfig(
url: String = "http://schema-registry:8082",
@@ -24,3 +28,24 @@ case class SchemaRegistryConfig(
)
}
}
+object SchemaRegistryConfig {
+ def apply(configOpt: Option[ConfigObject]): SchemaRegistryConfig =
+ configOpt
+ .map { o =>
+ val c = o.toConfig
+ val url = c.getString("url")
+ val cacheCapacity =
+ Try(c.getInt("cache.capacity")).toOption.getOrElse(1000)
+ val headers =
+ Try(c.getObject("headers")).toOption.asProperties.asJavaMap
+ val props =
+ Try(c.getObject("props")).toOption.asProperties.asJavaMap
+ SchemaRegistryConfig(
+ url = url,
+ cacheCapacity = cacheCapacity,
+ headers = headers,
+ props = props
+ )
+ }
+ .getOrElse(SchemaRegistryConfig())
+}
diff --git a/src/main/scala/io/epiphanous/flinkrunner/model/sink/KafkaSinkConfig.scala b/src/main/scala/io/epiphanous/flinkrunner/model/sink/KafkaSinkConfig.scala
index 7d655ad6..0b82516e 100644
--- a/src/main/scala/io/epiphanous/flinkrunner/model/sink/KafkaSinkConfig.scala
+++ b/src/main/scala/io/epiphanous/flinkrunner/model/sink/KafkaSinkConfig.scala
@@ -60,7 +60,7 @@ case class KafkaSinkConfig[ADT <: FlinkEvent: TypeInformation](
DeliveryGuarantee.AT_LEAST_ONCE
case Some("none") =>
DeliveryGuarantee.NONE
- case _ => DeliveryGuarantee.EXACTLY_ONCE
+ case _ => DeliveryGuarantee.AT_LEAST_ONCE
}
/** ensure transaction.timeout.ms is set */
@@ -70,26 +70,10 @@ case class KafkaSinkConfig[ADT <: FlinkEvent: TypeInformation](
t.toLong
}
- val schemaRegistryConfig: SchemaRegistryConfig =
+ val schemaRegistryConfig: SchemaRegistryConfig = SchemaRegistryConfig(
config
.getObjectOption(pfx("schema.registry"))
- .map { o =>
- val c = o.toConfig
- val url = c.getString("url")
- val cacheCapacity =
- Try(c.getInt("cache.capacity")).toOption.getOrElse(1000)
- val headers =
- Try(c.getObject("headers")).toOption.asProperties.asJavaMap
- val props =
- Try(c.getObject("props")).toOption.asProperties.asJavaMap
- SchemaRegistryConfig(
- url,
- cacheCapacity,
- props,
- headers
- )
- }
- .getOrElse(SchemaRegistryConfig())
+ )
/** Return an confluent avro serialization schema */
def getAvroSerializationSchema[
diff --git a/src/main/scala/io/epiphanous/flinkrunner/model/source/KafkaSourceConfig.scala b/src/main/scala/io/epiphanous/flinkrunner/model/source/KafkaSourceConfig.scala
index f8f96291..0f6d87d7 100644
--- a/src/main/scala/io/epiphanous/flinkrunner/model/source/KafkaSourceConfig.scala
+++ b/src/main/scala/io/epiphanous/flinkrunner/model/source/KafkaSourceConfig.scala
@@ -126,25 +126,10 @@ case class KafkaSourceConfig[ADT <: FlinkEvent](
.getStringOpt(pfx("group.id"))
.getOrElse(s"${config.jobName}.$name")
- val schemaRegistryConfig: SchemaRegistryConfig =
- getFromEither(pfx(), Seq("schema.registry"), config.getObjectOption)
- .map { o =>
- val c = o.toConfig
- val url = c.getString("url")
- val cacheCapacity =
- Try(c.getInt("cache.capacity")).toOption.getOrElse(1000)
- val headers =
- Try(c.getObject("headers")).toOption.asProperties.asJavaMap
- val props =
- Try(c.getObject("props")).toOption.asProperties.asJavaMap
- SchemaRegistryConfig(
- url,
- cacheCapacity,
- props,
- headers
- )
- }
- .getOrElse(SchemaRegistryConfig())
+ val schemaRegistryConfig: SchemaRegistryConfig =
+ SchemaRegistryConfig(
+ getFromEither(pfx(), Seq("schema.registry"), config.getObjectOption)
+ )
/** Returns a confluent avro registry aware deserialization schema for
* kafka.