-
Notifications
You must be signed in to change notification settings - Fork 4
/
FirehoseSinkConfig.scala
113 lines (105 loc) · 4.52 KB
/
FirehoseSinkConfig.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
package io.epiphanous.flinkrunner.model.sink
import com.typesafe.scalalogging.LazyLogging
import io.epiphanous.flinkrunner.model._
import io.epiphanous.flinkrunner.serde.{
EmbeddedAvroJsonSerializationSchema,
JsonSerializationSchema
}
import org.apache.avro.generic.GenericRecord
import org.apache.flink.api.common.serialization.SerializationSchema
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.connector.firehose.sink.KinesisFirehoseSink
import org.apache.flink.streaming.api.datastream.DataStreamSink
import org.apache.flink.streaming.api.scala.DataStream
/** AWS kinesis firehose sink config.
*
* Follow the instructions from the <a
* href="https://docs.aws.amazon.com/firehose/latest/dev/basic-create.html">Amazon
* Kinesis Data Firehose Developer Guide</a> to setup a Kinesis Data
* Firehose delivery stream.
*
* Configuration:
* - stream - required name of the kinesis firehose delivery stream
* - aws.region - optional aws region where kinesis is hosted (defaults
* to us-east-1)
* - aws.endpoint - optional aws kinesis endpoint (defaults to normal
* endpoint from configured kinesis region, but you can specify if you
* are using localstack)
* - client - optional kinesis client properties (aws.region or
* aws.endpoint can also be specified here)
 *   - max.batch.size.in.number: the maximum number of entries in a batch
 *     that may be sent to the Firehose delivery stream
* - max.in.flight.requests: the maximum number of in flight requests
* that may exist, if any more in flight requests need to be initiated
* once the maximum has been reached, then it will be blocked until
* some have completed
* - max.buffered.requests: the maximum number of elements held in the
* buffer, requests to add elements will be blocked while the number
* of elements in the buffer is at the maximum
 *   - max.batch.size.in.bytes: the maximum size of a batch of entries
 *     that may be sent to the Firehose delivery stream, measured in bytes
* - max.time.in.buffer: the maximum amount of time an entry is allowed
* to live in the buffer, if any element reaches this age, the entire
* buffer will be flushed immediately
* - max.record.size.in.bytes: the maximum size of a record the sink
* will accept into the buffer, a record of size larger than this will
* be rejected when passed to the sink
 *   - fail.on.error: when an exception is encountered while persisting to
 *     Kinesis Firehose, the job will fail immediately if fail.on.error is
 *     set
*
* @param name
* name of the sink
* @param config
* flinkrunner config
* @tparam ADT
* the flinkrunner algebraic data type
*/
case class FirehoseSinkConfig[ADT <: FlinkEvent: TypeInformation](
    name: String,
    config: FlinkConfig
) extends SinkConfig[ADT]
    with LazyLogging {

  override def connector: FlinkConnectorName =
    FlinkConnectorName.Firehose

  /** Kinesis stream/client properties resolved from this sink's config. */
  val props: KinesisProperties = KinesisProperties.fromSinkConfig(this)

  /** Attach a firehose sink to the stream, serializing events as JSON.
    * @param dataStream
    *   the stream of events to sink
    * @tparam E
    *   the event type
    * @return
    *   DataStreamSink[E]
    */
  override def getSink[E <: ADT: TypeInformation](
      dataStream: DataStream[E]): DataStreamSink[E] =
    _getSink(dataStream, getSerializationSchema[E])

  /** Attach a firehose sink to a stream of events with embedded avro
    * records, serializing the embedded record as JSON.
    * @param dataStream
    *   the stream of events to sink
    * @tparam E
    *   the event type, embedding an avro record of type A
    * @tparam A
    *   the embedded avro record type
    * @return
    *   DataStreamSink[E]
    */
  override def getAvroSink[
      E <: ADT with EmbeddedAvroRecord[A]: TypeInformation,
      A <: GenericRecord: TypeInformation](
      dataStream: DataStream[E]): DataStreamSink[E] =
    _getSink(
      dataStream,
      getAvroSerializationSchema[E, A]
    )

  /** Construct the KinesisFirehoseSink from the configured properties and
    * the provided serialization schema, and attach it to the stream.
    * @param dataStream
    *   the stream of events to sink
    * @param serializationSchema
    *   schema used to serialize each event into the firehose record
    * @tparam E
    *   the event type
    * @return
    *   DataStreamSink[E]
    */
  def _getSink[E <: ADT](
      dataStream: DataStream[E],
      serializationSchema: SerializationSchema[E]): DataStreamSink[E] = {
    val kfs = {
      val k = KinesisFirehoseSink
        .builder[E]()
        .setFirehoseClientProperties(props.clientProperties)
        .setSerializationSchema(serializationSchema)
        .setDeliveryStreamName(props.stream)
        .setFailOnError(props.failOnError)
        .setMaxInFlightRequests(props.maxInFlightRequests)
        .setMaxBufferedRequests(props.maxBufferedRequests)
        .setMaxBatchSize(props.maxBatchSizeInNumber)
        .setMaxBatchSizeInBytes(props.maxBatchSizeInBytes)
        .setMaxTimeInBufferMS(props.maxBufferTime)
      // BUGFIX: previously this called k.setMaxBatchSizeInBytes, silently
      // clobbering the batch-size limit set above instead of configuring
      // the per-record size limit
      props.maxRecordSizeInBytes
        .map(k.setMaxRecordSizeInBytes)
        .getOrElse(k)
    }.build()
    dataStream.sinkTo(kfs)
  }

  /** A JSON serialization schema for events of type E. */
  def getSerializationSchema[E <: ADT: TypeInformation]
      : SerializationSchema[E] =
    new JsonSerializationSchema[E, ADT](this)

  /** A JSON serialization schema for the avro record embedded in events
    * of type E. (Explicit return type added: public member.)
    */
  def getAvroSerializationSchema[
      E <: ADT with EmbeddedAvroRecord[A]: TypeInformation,
      A <: GenericRecord: TypeInformation]: SerializationSchema[E] =
    new EmbeddedAvroJsonSerializationSchema[E, A, ADT](this)
}