From 74a45640631d0dfa18247c47b1cf74281a87a74a Mon Sep 17 00:00:00 2001 From: Robert Lyons Date: Mon, 28 Sep 2020 22:14:32 -0400 Subject: [PATCH] add isKeyed to KafkaSinkConfig --- .../scala/io/epiphanous/flinkrunner/model/SinkConfig.scala | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/main/scala/io/epiphanous/flinkrunner/model/SinkConfig.scala b/src/main/scala/io/epiphanous/flinkrunner/model/SinkConfig.scala index b702424..f168007 100644 --- a/src/main/scala/io/epiphanous/flinkrunner/model/SinkConfig.scala +++ b/src/main/scala/io/epiphanous/flinkrunner/model/SinkConfig.scala @@ -18,7 +18,11 @@ object SinkConfig { case Some(connector) => connector match { case Kafka => - KafkaSinkConfig(connector, name, config.getString(s"$p.topic"), config.getProperties(s"$p.config")) + KafkaSinkConfig(connector, + name, + config.getString(s"$p.topic"), + config.getBoolean(s"$p.isKeyed"), + config.getProperties(s"$p.config")) case Kinesis => KinesisSinkConfig(connector, name, config.getString(s"$p.stream"), config.getProperties(s"$p.config")) case File => @@ -57,6 +61,7 @@ final case class KafkaSinkConfig( connector: FlinkConnectorName = Kafka, name: String, topic: String, + isKeyed: Boolean, properties: Properties) extends SinkConfig final case class KinesisSinkConfig(