KafkaSinkConfig.scala
package io.epiphanous.flinkrunner.model.sink

import com.typesafe.scalalogging.LazyLogging
import io.epiphanous.flinkrunner.model._
import io.epiphanous.flinkrunner.serde.{
ConfluentAvroRegistryKafkaRecordSerializationSchema,
JsonKafkaRecordSerializationSchema
}
import io.epiphanous.flinkrunner.util.ConfigToProps
import io.epiphanous.flinkrunner.util.ConfigToProps._
import io.epiphanous.flinkrunner.util.StreamUtils.RichProps
import org.apache.avro.generic.GenericRecord
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.connector.base.DeliveryGuarantee
import org.apache.flink.connector.kafka.sink.{
KafkaRecordSerializationSchema,
KafkaSink
}
import org.apache.flink.streaming.api.datastream.DataStreamSink
import org.apache.flink.streaming.api.scala.DataStream
import java.util.Properties
import scala.util.Try

/** Kafka sink config.
 *
 * Configuration is read from this sink's config block. Keys used below
 * include `bootstrap.servers` (required), `topic`, `is.keyed`,
 * `delivery.guarantee`, `transaction.timeout.ms` and `schema.registry`.
 *
 * @param name
 *   name of the sink
 * @param config
 *   flinkrunner config
 * @tparam ADT
 *   the flinkrunner algebraic data type
 */
case class KafkaSinkConfig[ADT <: FlinkEvent: TypeInformation](
name: String,
config: FlinkConfig
) extends SinkConfig[ADT]
with LazyLogging {
override val connector: FlinkConnectorName = FlinkConnectorName.Kafka
override val properties: Properties = ConfigToProps.normalizeProps(
config,
pfx(),
List("bootstrap.servers")
)
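  // "bootstrap.servers" must be present in the sink's properties: it is
  // read unconditionally below and handed to the KafkaSink builder.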
val bootstrapServers: String =
properties.getProperty("bootstrap.servers")
val topic: String = config.getString(pfx("topic"))
val isKeyed: Boolean =
config.getBooleanOpt(pfx("is.keyed")).getOrElse(true)
  def deliveryGuarantee: DeliveryGuarantee = config
    .getStringOpt(pfx("delivery.guarantee"))
    .map(s => s.toLowerCase.replaceAll("[^a-z]+", "-")) match {
    case Some("exactly-once") =>
      DeliveryGuarantee.EXACTLY_ONCE
    case Some("none")         =>
      DeliveryGuarantee.NONE
    case _                    => DeliveryGuarantee.AT_LEAST_ONCE
  }
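  // The normalization above lowercases the configured value and collapses
  // runs of non-letter characters into single hyphens, so variants like
  // "EXACTLY_ONCE", "exactly once" or "Exactly-Once" all resolve to
  // "exactly-once". Unrecognized values fall back to at-least-once.
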
/** ensure transaction.timeout.ms is set */
val transactionTimeoutMs: Long = {
val t = properties.getProperty("transaction.timeout.ms", "60000")
properties.setProperty("transaction.timeout.ms", t)
t.toLong
}
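
  // For EXACTLY_ONCE delivery the sink uses transactional Kafka producers,
  // and the producer's transaction.timeout.ms must not exceed the broker's
  // transaction.max.timeout.ms (15 minutes by default); the 60s default
  // above stays well under that cap.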
val schemaRegistryConfig: SchemaRegistryConfig = SchemaRegistryConfig(
config
.getObjectOption(pfx("schema.registry"))
)

  /** Return a Confluent Avro registry serialization schema */
def getAvroSerializationSchema[
E <: ADT with EmbeddedAvroRecord[A],
A <: GenericRecord]: KafkaRecordSerializationSchema[E] = {
new ConfluentAvroRegistryKafkaRecordSerializationSchema[E, A, ADT](
this
)
}

  /** Return, by default, a JSON serialization schema */
def getSerializationSchema[E <: ADT: TypeInformation]
: KafkaRecordSerializationSchema[E] =
new JsonKafkaRecordSerializationSchema[E, ADT](this)
def getAvroSink[
E <: ADT with EmbeddedAvroRecord[A]: TypeInformation,
A <: GenericRecord: TypeInformation](
dataStream: DataStream[E]): DataStreamSink[E] =
dataStream.sinkTo(_getSink[E](getAvroSerializationSchema[E, A]))
def getSink[E <: ADT: TypeInformation](
dataStream: DataStream[E]): DataStreamSink[E] =
dataStream.sinkTo(_getSink[E](getSerializationSchema[E]))
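
  /** Build the underlying KafkaSink from the shared settings and the given
   * record serializer (used by both getSink and getAvroSink)
   */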
def _getSink[E <: ADT: TypeInformation](
serializer: KafkaRecordSerializationSchema[E]): KafkaSink[E] =
KafkaSink
.builder()
.setBootstrapServers(bootstrapServers)
.setDeliverGuarantee(deliveryGuarantee)
.setTransactionalIdPrefix(name)
.setKafkaProducerConfig(properties)
.setRecordSerializer(serializer)
.build()
}
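
/* Usage sketch: a hypothetical wiring of this sink. `MyADT`, `MyEvent`,
 * `flinkConfig` and `events` are assumptions for illustration, as is the
 * exact HOCON layout flinkrunner resolves the sink's keys from:
 *
 *   sinks {
 *     kafka-sink {
 *       topic = "events"
 *       is.keyed = true
 *       delivery.guarantee = "exactly-once"
 *       bootstrap.servers = "localhost:9092"
 *     }
 *   }
 *
 *   val sinkConfig = KafkaSinkConfig[MyADT]("kafka-sink", flinkConfig)
 *   val sunk: DataStreamSink[MyEvent] = sinkConfig.getSink(events)
 */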