ElasticsearchSinkConfig.scala
package io.epiphanous.flinkrunner.model.sink

import com.typesafe.scalalogging.LazyLogging
import io.epiphanous.flinkrunner.model.{
  EmbeddedAvroRecord,
  FlinkConfig,
  FlinkConnectorName,
  FlinkEvent
}
import io.epiphanous.flinkrunner.util.AvroUtils.RichGenericRecord
import org.apache.avro.generic.GenericRecord
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.connector.sink2.SinkWriter
import org.apache.flink.connector.elasticsearch.sink
import org.apache.flink.connector.elasticsearch.sink.{
  Elasticsearch7SinkBuilder,
  ElasticsearchEmitter,
  FlushBackoffType
}
import org.apache.flink.streaming.api.datastream.DataStreamSink
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.http.HttpHost
import org.elasticsearch.client.Requests

import java.net.URL
import scala.collection.JavaConverters._
/** Elasticsearch sink config.
  *
  * Configuration:
  *   - `index`: name of the elasticsearch index to insert records into
  *   - `transports`: list of elasticsearch endpoints (a bare host,
  *     `host:port`, or full URL; port defaults to 9200 and protocol to
  *     http)
  *   - `bulk.flush.backoff`: backoff settings for failed bulk requests
  *     - `type`: `NONE`, `CONSTANT` or `EXPONENTIAL` (default `NONE`)
  *     - `retries`: maximum number of retries (default 5)
  *     - `delay`: delay between retries in millis (default 1000)
  *   - `bulk.flush.max.actions`: optional number of buffered actions
  *     that triggers a flush
  *   - `bulk.flush.max.size.mb`: optional buffered size in megabytes
  *     that triggers a flush
  *   - `bulk.flush.interval.ms`: optional flush interval in millis
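  *
  * For example, a sink block might look like the following (names and
  * values are hypothetical; the exact key layout depends on your
  * flinkrunner configuration conventions):
  * {{{
  * my-elasticsearch-sink {
  *   connector = "elasticsearch"
  *   index = "events"
  *   transports = ["es1.example.com", "https://es2.example.com:9243"]
  *   bulk.flush.backoff.type = "EXPONENTIAL"
  *   bulk.flush.backoff.retries = 3
  *   bulk.flush.backoff.delay = 500
  *   bulk.flush.max.actions = 1000
  * }
  * }}}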
  *
  * @param name
  *   name of the sink
  * @param config
  *   flinkrunner configuration
  * @tparam ADT
  *   the flinkrunner algebraic data type
  */
case class ElasticsearchSinkConfig[ADT <: FlinkEvent](
    name: String,
    config: FlinkConfig
) extends SinkConfig[ADT]
    with LazyLogging {

  override val connector: FlinkConnectorName =
    FlinkConnectorName.Elasticsearch

  val index: String = config.getString(pfx("index"))
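
  // For illustration (hypothetical endpoints): a transports list like
  // ["es1.example.com", "https://es2.example.com:9243"] is parsed into
  // HttpHost("es1.example.com", 9200, "http") and
  // HttpHost("es2.example.com", 9243, "https").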
  val transports: List[HttpHost] =
    config.getStringList(pfx("transports")).map { s =>
      // default to http on port 9200 when the endpoint omits them
      val url      = new URL(if (s.startsWith("http")) s else s"http://$s")
      val hostname = url.getHost
      val port     = if (url.getPort < 0) 9200 else url.getPort
      new HttpHost(hostname, port, url.getProtocol)
    }

  val bulkFlushBackoffType: FlushBackoffType = FlushBackoffType
    .valueOf(properties.getProperty("bulk.flush.backoff.type", "NONE"))

  val bulkFlushBackoffRetries: Int =
    properties.getProperty("bulk.flush.backoff.retries", "5").toInt

  val bulkFlushBackoffDelay: Long =
    properties.getProperty("bulk.flush.backoff.delay", "1000").toLong

  val bulkFlushMaxActions: Option[Int] =
    Option(properties.getProperty("bulk.flush.max.actions")).map(_.toInt)

  val bulkFlushMaxSizeMb: Option[Int] =
    Option(properties.getProperty("bulk.flush.max.size.mb")).map(_.toInt)

  val bulkFlushIntervalMs: Option[Long] =
    Option(properties.getProperty("bulk.flush.interval.ms")).map(_.toLong)

  /** Build and attach an elasticsearch sink to the given data stream,
    * using the provided emitter to convert stream elements into index
    * requests.
    */
  def _getSink[E <: ADT: TypeInformation](
      dataStream: DataStream[E],
      emitter: ElasticsearchEmitter[E]): DataStreamSink[E] = {
    val esb =
      new Elasticsearch7SinkBuilder[E]
        .setHosts(transports: _*)
        .setEmitter[E](emitter)
        .setBulkFlushBackoffStrategy(
          bulkFlushBackoffType,
          bulkFlushBackoffRetries,
          bulkFlushBackoffDelay
        )
    // apply optional flush thresholds only when configured
    bulkFlushMaxActions.foreach(esb.setBulkFlushMaxActions)
    bulkFlushMaxSizeMb.foreach(esb.setBulkFlushMaxSizeMb)
    bulkFlushIntervalMs.foreach(esb.setBulkFlushInterval)
    dataStream.sinkTo(esb.build()).uid(label).name(label)
  }

  override def getSink[E <: ADT: TypeInformation](
      dataStream: DataStream[E]): DataStreamSink[E] =
    _getSink(dataStream, getEmitter[E])

  override def getAvroSink[
      E <: ADT with EmbeddedAvroRecord[A]: TypeInformation,
      A <: GenericRecord: TypeInformation](
      dataStream: DataStream[E]): DataStreamSink[E] =
    _getSink(dataStream, getAvroEmitter[E, A])

  /** Create an emitter that indexes each element's data (as produced
    * by `getData`) under a single `data` key. Relies on scala's SAM
    * conversion to implement `ElasticsearchEmitter`.
    */
  def _getEmitter[E <: ADT](
      getData: E => AnyRef): ElasticsearchEmitter[E] =
    (element: E, _: SinkWriter.Context, indexer: sink.RequestIndexer) =>
      indexer.add(
        Requests.indexRequest
          .index(index)
          .source(
            Map("data" -> getData(element)).asJava
          )
      )

  /** Default (non-avro) emitter: uses reflection to pair each declared
    * field name of the event class with the corresponding product
    * value, and indexes the resulting map. Assumes the declared fields
    * align one-to-one with the product iterator.
    */
  def getEmitter[E <: ADT]: ElasticsearchEmitter[E] = _getEmitter { e =>
    val values = e.productIterator
    e.getClass.getDeclaredFields
      .map(_.getName -> values.next())
      .toMap
      .asJava
  }
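
  // For example (hypothetical event type): an event
  //   case class Order(id: String, amount: Double) extends ...
  // with id = "o-1" and amount = 9.99 would be indexed roughly as
  //   {"data": {"id": "o-1", "amount": 9.99}}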

  /** Avro emitter: indexes the embedded avro record's data as a map. */
  def getAvroEmitter[
      E <: ADT with EmbeddedAvroRecord[A],
      A <: GenericRecord]: ElasticsearchEmitter[E] = _getEmitter(
    _.$record.getDataAsMap.asJava
  )
}
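
// A minimal usage sketch (hypothetical ADT, event and config names; the
// exact wiring depends on your flinkrunner setup):
//
//   val sinkConfig =
//     ElasticsearchSinkConfig[MyADT]("my-elasticsearch-sink", flinkConfig)
//   sinkConfig.getSink(eventStream) // attach sink to a DataStream[MyEvent]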