Skip to content

Commit

Permalink
add isKeyed to KafkaSinkConfig
Browse files Browse the repository at this point in the history
  • Loading branch information
nextdude committed Sep 29, 2020
1 parent 3ed3aec commit 74a4564
Showing 1 changed file with 6 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@ object SinkConfig {
case Some(connector) =>
connector match {
case Kafka =>
KafkaSinkConfig(connector, name, config.getString(s"$p.topic"), config.getProperties(s"$p.config"))
KafkaSinkConfig(connector,
name,
config.getString(s"$p.topic"),
config.getBoolean(s"$p.isKeyed"),
config.getProperties(s"$p.config"))
case Kinesis =>
KinesisSinkConfig(connector, name, config.getString(s"$p.stream"), config.getProperties(s"$p.config"))
case File =>
Expand Down Expand Up @@ -57,6 +61,7 @@ final case class KafkaSinkConfig(
connector: FlinkConnectorName = Kafka,
name: String,
topic: String,
isKeyed: Boolean,
properties: Properties)
extends SinkConfig
final case class KinesisSinkConfig(
Expand Down

0 comments on commit 74a4564

Please sign in to comment.