-
Notifications
You must be signed in to change notification settings - Fork 4
/
CassandraSinkConfig.scala
86 lines (78 loc) · 2.47 KB
/
CassandraSinkConfig.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
package io.epiphanous.flinkrunner.model.sink
import com.datastax.driver.core.{Cluster, CodecRegistry}
import com.datastax.driver.extras.codecs.jdk8.InstantCodec
import io.epiphanous.flinkrunner.model.{
EmbeddedAvroRecord,
FlinkConfig,
FlinkConnectorName,
FlinkEvent
}
import io.epiphanous.flinkrunner.util.AvroUtils.RichGenericRecord
import org.apache.avro.generic.GenericRecord
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.datastream.DataStreamSink
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.streaming.connectors.cassandra._
/** A Cassandra sink config.
  *
  * Builds a DataStax cluster connection and writes stream elements into
  * Cassandra via an insert query.
  *
  * Configuration:
  *
  *   - `host`: the cassandra contact point (defaults to `localhost`)
  *   - `port`: the cassandra native-protocol port (defaults to `9042`)
  *   - `query`: an insert query (required; no default is provided)
  *
  * @param name
  *   name of the sink
  * @param config
  *   flink runner configuration
  * @tparam ADT
  *   the flinkrunner algebraic data type
  */
case class CassandraSinkConfig[ADT <: FlinkEvent](
    name: String,
    config: FlinkConfig
) extends SinkConfig[ADT] {

  override val connector: FlinkConnectorName =
    FlinkConnectorName.Cassandra

  // Contact point; falls back to localhost when not configured.
  val host: String =
    config.getStringOpt(pfx("host")).getOrElse("localhost")

  // Native-protocol port; 9042 is Cassandra's standard default.
  val port: Int = config.getIntOpt(pfx("port")).getOrElse(9042)

  // Insert statement used by both sinks below. Read with getString (not
  // getStringOpt), so presumably a missing `query` key fails fast at
  // construction time — TODO confirm getString's missing-key behavior.
  val query: String = config.getString(pfx("query"))

  /** Don't convert to single abstract method...flink will complain
    *
    * NOTE(review): the explicit anonymous class is deliberate — Flink must
    * serialize this builder and ships it to task managers; a SAM/lambda
    * conversion changes the class shape Flink sees. Keep it as-is.
    *
    * The built cluster registers InstantCodec so java.time.Instant columns
    * can be bound directly, and disables the driver's JMX reporting.
    */
  val clusterBuilder: ClusterBuilder = new ClusterBuilder {
    override def buildCluster(builder: Cluster.Builder): Cluster =
      builder
        .addContactPoint(host)
        .withPort(port)
        .withoutJMXReporting()
        .withCodecRegistry(
          new CodecRegistry().register(InstantCodec.instance)
        )
        .build()
  }

  /** Add a Cassandra sink for a stream of (non-avro) events.
    *
    * Uses Flink's CassandraScalaProductSink, which binds the fields of the
    * scala product `E` positionally to the configured insert `query`.
    *
    * NOTE(review): unlike getAvroSink this is not marked `override` —
    * verify against SinkConfig whether it is meant to override a base
    * declaration.
    *
    * @param stream
    *   the stream of events to sink
    * @tparam E
    *   the event type (member of the ADT)
    * @return
    *   the configured DataStreamSink
    */
  def getSink[E <: ADT: TypeInformation](
      stream: DataStream[E]): DataStreamSink[E] = {
    stream
      .addSink(new CassandraScalaProductSink[E](query, clusterBuilder))
      .uid(label)
      .name(label)
  }

  /** Add a Cassandra sink for a stream of events with embedded avro
    * records.
    *
    * Uses an anonymous AbstractCassandraTupleSink whose `extract` pulls
    * the embedded avro record's field values (via getDataAsSeq) into an
    * array for positional binding to the insert `query`. Failure handling
    * is NoOp and the sink base config is all defaults.
    *
    * @param stream
    *   the stream of avro-embedding events to sink
    * @tparam E
    *   the event type (member of the ADT, embedding avro record A)
    * @tparam A
    *   the embedded avro record type
    * @return
    *   the configured DataStreamSink
    */
  override def getAvroSink[
      E <: ADT with EmbeddedAvroRecord[A]: TypeInformation,
      A <: GenericRecord: TypeInformation](
      stream: DataStream[E]): DataStreamSink[E] =
    stream
      .addSink(
        new AbstractCassandraTupleSink[E](
          query,
          clusterBuilder,
          CassandraSinkBaseConfig.newBuilder().build(),
          new NoOpCassandraFailureHandler()
        ) {
          // Field values of the embedded avro record, in declaration
          // order, bound positionally to the insert query's placeholders.
          override def extract(record: E): Array[AnyRef] =
            record.$record.getDataAsSeq.toArray
        }
      )
      .uid(label)
      .name(label)
}