-
Notifications
You must be signed in to change notification settings - Fork 4
/
SinkConfig.scala
91 lines (79 loc) · 2.92 KB
/
SinkConfig.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
package io.epiphanous.flinkrunner.model.sink
import com.typesafe.scalalogging.LazyLogging
import io.epiphanous.flinkrunner.model.FlinkConnectorName._
import io.epiphanous.flinkrunner.model._
import io.epiphanous.flinkrunner.util.StreamUtils._
import org.apache.avro.generic.GenericRecord
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.datastream.DataStreamSink
import org.apache.flink.streaming.api.scala.DataStream
import java.util
import java.util.Properties
/** A flinkrunner sink configuration trait. All sink configs share a
  * small set of common configuration options.
  *
  * Common Configuration Options:
  *
  *   - `name`: the sink name
  *   - `connector`: one of
  *     - [[FlinkConnectorName.Cassandra]]
  *     - [[FlinkConnectorName.Elasticsearch]]
  *     - [[FlinkConnectorName.File]]
  *     - [[FlinkConnectorName.Firehose]]
  *     - [[FlinkConnectorName.Jdbc]]
  *     - [[FlinkConnectorName.Kafka]]
  *     - [[FlinkConnectorName.Kinesis]]
  *     - [[FlinkConnectorName.RabbitMQ]]
  *     - [[FlinkConnectorName.Socket]]
  *
  * @tparam ADT
  *   the flinkrunner algebraic data type
  */
trait SinkConfig[ADT <: FlinkEvent] extends LazyLogging {
  /** the sink name */
  def name: String

  /** the job-level flink configuration */
  def config: FlinkConfig

  /** the connector backing this sink */
  def connector: FlinkConnectorName

  /** Build a dotted config-key prefix rooted at `sinks.<name>`,
    * optionally extended with `path` when it is non-empty.
    * @param path
    *   optional trailing path segment (default: empty)
    * @return
    *   the dotted prefix, e.g. `sinks.my-sink.config`
    */
  def pfx(path: String = ""): String = {
    val base = Seq("sinks", name)
    val segments = if (path.isEmpty) base else base :+ path
    segments.mkString(".")
  }

  /** connector properties loaded from `sinks.<name>.config` */
  val properties: Properties = config.getProperties(pfx("config"))

  /** the connector properties as a java hash map (lazily converted) */
  lazy val propertiesMap: util.HashMap[String, String] =
    properties.asJavaMap

  /** human-readable label, e.g. `kafka/my-sink` */
  lazy val label: String = {
    val kind = connector.entryName.toLowerCase
    s"$kind/$name"
  }

  /** Attach this sink to a data stream of events. */
  def getSink[E <: ADT: TypeInformation](
      dataStream: DataStream[E]): DataStreamSink[E]

  /** Attach this sink to a data stream of avro-embedding events. */
  def getAvroSink[
      E <: ADT with EmbeddedAvroRecord[A]: TypeInformation,
      A <: GenericRecord: TypeInformation](
      dataStream: DataStream[E]): DataStreamSink[E]
}
/** Factory for [[SinkConfig]] instances.
  *
  * Resolves the connector declared (or inferred) for a named sink and
  * delegates to the matching concrete sink config.
  */
object SinkConfig {

  /** Construct the concrete sink config for the named sink.
    *
    * @param name
    *   the sink name (looked up under `sinks.<name>` in config)
    * @param config
    *   the flink job configuration
    * @tparam ADT
    *   the flinkrunner algebraic data type
    * @return
    *   a connector-specific [[SinkConfig]]
    * @throws RuntimeException
    *   if the resolved connector has no supported sink implementation
    */
  def apply[ADT <: FlinkEvent: TypeInformation](
      name: String,
      config: FlinkConfig): SinkConfig[ADT] = {
    // resolve the connector from explicit config, falling back to
    // name-based inference inside fromSinkName
    val resolved = FlinkConnectorName.fromSinkName(
      name,
      config.jobName,
      config.getStringOpt(s"sinks.$name.connector")
    )
    resolved match {
      case Kafka         => KafkaSinkConfig(name, config)
      case Kinesis       => KinesisSinkConfig(name, config)
      case Firehose      => FirehoseSinkConfig(name, config)
      case File          => FileSinkConfig(name, config)
      case Socket        => SocketSinkConfig(name, config)
      case Jdbc          => JdbcSinkConfig(name, config)
      case Cassandra     => CassandraSinkConfig(name, config)
      case Elasticsearch => ElasticsearchSinkConfig(name, config)
      case RabbitMQ      => RabbitMQSinkConfig(name, config)
      case connector     =>
        throw new RuntimeException(
          s"Don't know how to configure ${connector.entryName} sink connector <$name> (in job <${config.jobName}>)"
        )
    }
  }
}