-
Notifications
You must be signed in to change notification settings - Fork 4
/
FlinkConnectorName.scala
91 lines (72 loc) · 2.92 KB
/
FlinkConnectorName.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
package io.epiphanous.flinkrunner.model
import enumeratum.EnumEntry.Snakecase
import enumeratum.{Enum, EnumEntry}
import scala.collection.immutable
/** Identifies the kind of connector behind a source or sink.
  *
  * Mixes in enumeratum's `Snakecase`, so each entry's `entryName` is the
  * snake_case form of its case object name (e.g. `Kafka` is named `kafka`).
  */
sealed trait FlinkConnectorName extends EnumEntry with Snakecase
/** Enumerates the known connector types and resolves which one applies to a
  * named source or sink.
  */
object FlinkConnectorName extends Enum[FlinkConnectorName] {

  /** All connector types, in declaration order. */
  val values: immutable.IndexedSeq[FlinkConnectorName] = findValues

  case object Hybrid        extends FlinkConnectorName
  case object Kinesis       extends FlinkConnectorName
  case object Kafka         extends FlinkConnectorName
  case object File          extends FlinkConnectorName
  case object Socket        extends FlinkConnectorName
  case object Cassandra     extends FlinkConnectorName
  case object Elasticsearch extends FlinkConnectorName
  case object Jdbc          extends FlinkConnectorName
  case object RabbitMQ      extends FlinkConnectorName
  case object Generator     extends FlinkConnectorName

  /** Connectors accepted for a source: everything except `Cassandra` and
    * `Elasticsearch`.
    */
  val sources: immutable.Seq[FlinkConnectorName] =
    values.filterNot(Set[FlinkConnectorName](Cassandra, Elasticsearch))

  /** Connectors accepted for a sink: everything except `Hybrid` and
    * `Generator`.
    */
  val sinks: immutable.Seq[FlinkConnectorName] =
    values.filterNot(Set[FlinkConnectorName](Hybrid, Generator))

  /** Resolves the connector of a source; shorthand for [[fromName]] with the
    * role fixed to `"source"`.
    */
  def fromSourceName(
      sourceName: String,
      jobName: String,
      connectorNameOpt: Option[String] = None,
      defaultOpt: Option[FlinkConnectorName] = None): FlinkConnectorName =
    fromName(
      "source",
      sourceName,
      jobName,
      connectorNameOpt,
      defaultOpt
    )

  /** Resolves the connector of a sink; shorthand for [[fromName]] with the
    * role fixed to `"sink"`.
    */
  def fromSinkName(
      sinkName: String,
      jobName: String,
      connectorNameOpt: Option[String] = None,
      defaultOpt: Option[FlinkConnectorName] = None): FlinkConnectorName =
    fromName(
      "sink",
      sinkName,
      jobName,
      connectorNameOpt,
      defaultOpt
    )

  /** Resolves the connector for a source or sink.
    *
    * Resolution order:
    *   1. if `connectorNameOpt` is given, look it up case-insensitively;
    *   1. otherwise pick the first entry (in declaration order) whose
    *      lower-cased entry name occurs inside the lower-cased
    *      `sourceOrSinkName` followed by `_` and `sourceOrSink`;
    *   1. if neither step yields a connector, use `defaultOpt`.
    *
    * The resolved connector must then be listed in [[sources]] when
    * `sourceOrSink` is `"source"`, or in [[sinks]] when it is `"sink"`.
    *
    * @param sourceOrSink
    *   the role, `"source"` or `"sink"`; any other value is rejected
    * @param sourceOrSinkName
    *   name of the source or sink
    * @param jobName
    *   job name, used only in error messages
    * @param connectorNameOpt
    *   explicitly configured connector name, if any
    * @param defaultOpt
    *   connector to fall back on when nothing else resolves
    * @return
    *   the resolved connector
    * @throws RuntimeException
    *   if no connector can be resolved, or the resolved connector is not
    *   allowed for the given role
    */
  def fromName(
      sourceOrSink: String,
      sourceOrSinkName: String,
      jobName: String,
      connectorNameOpt: Option[String] = None,
      defaultOpt: Option[FlinkConnectorName] = None)
      : FlinkConnectorName = {
    val description = s"$sourceOrSinkName $sourceOrSink in job $jobName"

    // Guess the connector from the name itself. The suffixed string starts
    // with the plain lower-cased name, so probing it alone also covers every
    // match against the plain name.
    def inferredFromName: Option[FlinkConnectorName] = {
      val haystack = s"${sourceOrSinkName.toLowerCase}_$sourceOrSink"
      values.find(entry => haystack.contains(entry.entryName.toLowerCase))
    }

    val resolved: FlinkConnectorName =
      connectorNameOpt
        .fold(inferredFromName)(withNameInsensitiveOption)
        .orElse(defaultOpt)
        .getOrElse(
          throw new RuntimeException(
            s"No valid connector type found for $description. Please set the connector type in the $sourceOrSink configuration."
          )
        )

    val allowedForRole =
      sourceOrSink match {
        case "source" => sources.contains(resolved)
        case "sink"   => sinks.contains(resolved)
        case _        => false
      }

    if (allowedForRole) resolved
    else
      throw new RuntimeException(
        s"${resolved.entryName} is an invalid connector for $description"
      )
  }
}