Commit 888dfd5

add cassandra and elasticsearch sinks
nextdude committed Sep 10, 2020
1 parent 9c09efb commit 888dfd5
Showing 7 changed files with 119 additions and 35 deletions.
@@ -5,11 +5,10 @@ import java.nio.ByteBuffer
import java.time.Instant
import java.time.temporal.ChronoUnit

import com.sksamuel.avro4s.{AvroSchema, Decoder, Encoder, RecordFormat, SchemaFor}
import org.apache.avro.LogicalTypes.LogicalTypeFactory
import org.apache.avro.{LogicalTypes, Schema}
import com.sksamuel.avro4s._
import org.apache.avro.file.{DataFileReader, DataFileWriter, SeekableByteArrayInput}
import org.apache.avro.generic.{GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.{LogicalTypes, Schema}

import scala.util.Try

@@ -9,7 +9,8 @@ object FlinkConnectorName extends Enum[FlinkConnectorName] {
case object Kafka extends FlinkConnectorName
case object File extends FlinkConnectorName
case object Socket extends FlinkConnectorName
case object Cassandra extends FlinkConnectorName
case object CassandraSink extends FlinkConnectorName
case object ElasticsearchSink extends FlinkConnectorName
case object Jdbc extends FlinkConnectorName
case object Collection extends FlinkConnectorName
}
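
For orientation, a small usage sketch of the two new members (assuming this object is backed by enumeratum, which the Enum[FlinkConnectorName] parent and case objects suggest, and that it lives in io.epiphanous.flinkrunner.model alongside the sink configs; the lookup strings are illustrative only):

import io.epiphanous.flinkrunner.model.FlinkConnectorName

// case-insensitive lookup of the new connector names
val cassandra = FlinkConnectorName.withNameInsensitive("cassandrasink")       // CassandraSink
val elastic   = FlinkConnectorName.withNameInsensitive("elasticsearchsink")   // ElasticsearchSink
val unknown   = FlinkConnectorName.withNameInsensitiveOption("rabbitmq")      // None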
29 changes: 29 additions & 0 deletions src/main/scala/io/epiphanous/flinkrunner/model/SinkConfig.scala
@@ -31,6 +31,20 @@ object SinkConfig {
config.getProperties(s"$p.config"))
case Jdbc =>
JdbcSinkConfig(connector, name, config.getString(s"$p.query"), config.getProperties(s"$p.config"))
case CassandraSink =>
CassandraSinkConfig(connector,
name,
config.getString(s"$p.host"),
config.getString(s"$p.query"),
config.getProperties(s"$p.config"))
case ElasticsearchSink =>
ElasticsearchSinkConfig(connector,
name,
config.getStringList(s"$p.transports"),
config.getString(s"$p.index"),
config.getString(s"$p.type"),
config.getProperties(s"$p.config"))

case other => throw new RuntimeException(s"$other $name connector not a valid sink (job ${config.jobName})")

}
@@ -80,3 +94,18 @@ final case class JdbcSinkConfig(
query: String,
properties: Properties)
extends SinkConfig
final case class CassandraSinkConfig(
connector: FlinkConnectorName = CassandraSink,
name: String,
host: String,
query: String,
properties: Properties)
extends SinkConfig
final case class ElasticsearchSinkConfig(
connector: FlinkConnectorName = ElasticsearchSink,
name: String,
transports: List[String],
index: String,
`type`: String,
properties: Properties)
extends SinkConfig
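
A minimal, hypothetical construction of the two new sink configs (the name, host, query, transport, and index values below are illustrative and not part of this commit; connector falls back to the defaults declared above):

import java.util.Properties
import io.epiphanous.flinkrunner.model._

val cassandraConfig = CassandraSinkConfig(
  name       = "sensor-cassandra",        // illustrative sink name
  host       = "cassandra.example.com",   // illustrative contact point
  query      = "INSERT INTO iot.readings (id, ts, value) VALUES (?, ?, ?);",
  properties = new Properties()
)

val elasticsearchConfig = ElasticsearchSinkConfig(
  name       = "sensor-es",
  transports = List("es.example.com:9200"),  // host[:port]; toElasticsearchSink defaults the port to 9200
  index      = "sensor-readings",
  `type`     = "_doc",
  properties = new Properties()
)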
@@ -18,8 +18,6 @@ object SourceConfig {
connector match {
case Kafka =>
KafkaSourceConfig(connector, name, config.getString(s"$p.topic"), config.getProperties(s"$p.config"))
case KeyedKafka =>
KeyedKafkaSourceConfig(connector, name, config.getString(s"$p.topic"), config.getProperties(s"$p.config"))
case Kinesis =>
KinesisSourceConfig(connector, name, config.getString(s"$p.stream"), config.getProperties(s"$p.config"))
case File =>
@@ -45,12 +43,6 @@ final case class KafkaSourceConfig(
topic: String,
properties: Properties)
extends SourceConfig
final case class KeyedKafkaSourceConfig(
connector: FlinkConnectorName = KeyedKafka,
name: String,
topic: String,
properties: Properties)
extends SourceConfig
final case class KinesisSourceConfig(
connector: FlinkConnectorName = Kinesis,
name: String,
@@ -5,8 +5,7 @@ import com.typesafe.scalalogging.LazyLogging
import io.epiphanous.flinkrunner.model.aggregate.Aggregate
import squants.energy.{Energy, Power}
import squants.information.{DataRate, Information}
import squants.market.Money
import squants.mass.{AreaDensity, ChemicalAmount, Density, Mass, MomentOfInertia}
import squants.mass._
import squants.motion._
import squants.photo._
import squants.radio._
3 changes: 2 additions & 1 deletion src/main/scala/io/epiphanous/flinkrunner/util/JdbcSink.scala
@@ -12,6 +12,7 @@ import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.functions.sink.SinkFunction.Context
import org.apache.flink.streaming.api.scala._

import scala.collection.JavaConverters._
@@ -59,7 +60,7 @@ class JdbcSink[E <: FlinkEvent: TypeInformation](batchFunction: AddToJdbcBatchFun
super.open(parameters)
}

override def invoke(value: E): Unit = {
override def invoke(value: E, context: Context[_]): Unit = {
pendingRows += value
if (pendingRows.size >= bufferSize) {
pendingRows.foreach(row => batchFunction.addToBatch(row, statement))
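
The signature change above moves JdbcSink onto Flink's two-argument SinkFunction.invoke. A minimal standalone illustration of that override shape, mirroring the Context[_] parameter used here (the class itself is hypothetical and not part of the library):

import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.functions.sink.SinkFunction.Context

class PrintlnSink[E] extends RichSinkFunction[E] {
  override def invoke(value: E, context: Context[_]): Unit = {
    // context exposes processing time, the current watermark and the element timestamp
    println(s"timestamp=${Option(context.timestamp())} value=$value")
  }
}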
103 changes: 83 additions & 20 deletions src/main/scala/io/epiphanous/flinkrunner/util/StreamUtils.scala
@@ -1,18 +1,19 @@
package io.epiphanous.flinkrunner.util

import java.io.{File, FileNotFoundException}
import java.net.URL
import java.nio.charset.StandardCharsets

import com.typesafe.scalalogging.LazyLogging
import io.epiphanous.flinkrunner.SEE
import io.epiphanous.flinkrunner.model._
import io.epiphanous.flinkrunner.operator.AddToJdbcBatchFunction
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.api.common.serialization.{DeserializationSchema, Encoder, SerializationSchema}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.datastream.DataStreamSink
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.{
BasePathBucketAssigner,
DateTimeBucketAssigner
@@ -23,6 +24,9 @@ import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.
}
import org.apache.flink.streaming.api.functions.sink.filesystem.{BucketAssigner, StreamingFileSink}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.cassandra.CassandraSink
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.Semantic
import org.apache.flink.streaming.connectors.kafka.{
FlinkKafkaConsumer,
@@ -31,6 +35,10 @@ import org.apache.flink.streaming.connectors.kafka.{
KafkaSerializationSchema
}
import org.apache.flink.streaming.connectors.kinesis.{FlinkKinesisConsumer, FlinkKinesisProducer}
import org.apache.http.HttpHost
import org.elasticsearch.client.Requests

import scala.collection.JavaConverters._

object StreamUtils extends LazyLogging {

@@ -130,7 +138,7 @@ object StreamUtils extends LazyLogging {
val uid = src.label
(src match {
case src: KafkaSourceConfig => fromKafka(src)
case src: KinesisSourceConfig => KinesisStreamUtils.fromKinesis(src)
case src: KinesisSourceConfig => fromKinesis(src)
case src: FileSourceConfig => fromFile(src)
case src: SocketSourceConfig => fromSocket(src)
case src: CollectionSourceConfig => fromCollection(src)
@@ -295,18 +303,19 @@
stream: DataStream[E],
sinkName: String = ""
)(implicit config: FlinkConfig
): DataStreamSink[E] = {
) = {
val name = if (sinkName.isEmpty) config.getSinkNames.head else sinkName
val src = config.getSinkConfig(name)
val label = src.label
(src match {
case s: KafkaSinkConfig => toKafka[E](stream, s)
case s: KinesisSinkConfig => KinesisStreamUtils.toKinesis[E](stream, s)
case s: FileSinkConfig => toFile[E](stream, s)
case s: SocketSinkConfig => toSocket[E](stream, s)
case s: JdbcSinkConfig => toJdbc[E](stream, s)
case s => throw new IllegalArgumentException(s"unsupported source connector: ${s.connector}")
}).name(label).uid(label)
case s: KafkaSinkConfig => toKafka[E](stream, s)
case s: KinesisSinkConfig => toKinesis[E](stream, s)
case s: FileSinkConfig => toFile[E](stream, s)
case s: SocketSinkConfig => toSocket[E](stream, s)
case s: JdbcSinkConfig => toJdbc[E](stream, s)
case s: CassandraSinkConfig => toCassandraSink[E](stream, s)
case s: ElasticsearchSinkConfig => toElasticsearchSink[E](stream, s)
case s => throw new IllegalArgumentException(s"unsupported sink connector: ${s.connector}")
})
}

/**
@@ -321,14 +330,16 @@
stream: DataStream[E],
sinkConfig: KafkaSinkConfig
)(implicit config: FlinkConfig
): DataStreamSink[E] =
) =
stream
.addSink(
new FlinkKafkaProducer[E](sinkConfig.topic,
config.getKafkaSerializationSchema.asInstanceOf[KafkaSerializationSchema[E]],
sinkConfig.properties,
Semantic.AT_LEAST_ONCE)
)
.uid(sinkConfig.label)
.name(sinkConfig.label)

/**
* Send stream to a kinesis sink.
@@ -342,7 +353,7 @@
stream: DataStream[E],
sinkConfig: KinesisSinkConfig
)(implicit config: FlinkConfig
): DataStreamSink[E] =
) =
stream
.addSink({
val sink =
Expand All @@ -353,6 +364,7 @@ object StreamUtils extends LazyLogging {
sink.setDefaultPartition("0")
sink
})
.uid(sinkConfig.label)
.name(sinkConfig.label)

/**
@@ -367,11 +379,13 @@
stream: DataStream[E],
sinkConfig: JdbcSinkConfig
)(implicit config: FlinkConfig
): DataStreamSink[E] =
) =
stream
.addSink(
new JdbcSink(config.getAddToJdbcBatchFunction.asInstanceOf[AddToJdbcBatchFunction[E]], sinkConfig.properties)
)
.uid(sinkConfig.label)
.name(sinkConfig.label)

/**
* Send stream to a rolling file sink.
@@ -385,7 +399,7 @@
stream: DataStream[E],
sinkConfig: FileSinkConfig
)(implicit config: FlinkConfig
): DataStreamSink[E] = {
) = {
val path = sinkConfig.path
val p = sinkConfig.properties
val bucketCheckInterval = p.getProperty("bucket.check.interval", s"${60000}").toLong
@@ -423,7 +437,7 @@

case _ => throw new IllegalArgumentException(s"Unknown file sink encoder format: '$encoderFormat'")
}
stream.addSink(sink)
stream.addSink(sink).uid(sinkConfig.label).name(sinkConfig.label)
}

/**
@@ -438,9 +452,58 @@
stream: DataStream[E],
sinkConfig: SocketSinkConfig
)(implicit config: FlinkConfig
): DataStreamSink[E] =
stream.writeToSocket(sinkConfig.host,
sinkConfig.port,
config.getSerializationSchema.asInstanceOf[SerializationSchema[E]])
) =
stream
.writeToSocket(sinkConfig.host,
sinkConfig.port,
config.getSerializationSchema.asInstanceOf[SerializationSchema[E]])
.uid(sinkConfig.label)
.name(sinkConfig.label)

/**
* Send stream to a cassandra sink.
* @param stream the data stream
* @param sinkConfig a sink configuration
* @tparam E stream element type
* @return DataStreamSink[E]
*/
def toCassandraSink[E <: FlinkEvent: TypeInformation](stream: DataStream[E], sinkConfig: CassandraSinkConfig) =
CassandraSink
.addSink(stream)
.setHost(sinkConfig.host)
.setQuery(sinkConfig.query)
.build()
.uid(sinkConfig.label)
.name(sinkConfig.label)

/**
* Send stream to an elasticsearch sink.
* @param stream the data stream
* @param sinkConfig a sink configuration
* @tparam E stream element type
* @return DataStreamSink[E]
*/
def toElasticsearchSink[E <: FlinkEvent: TypeInformation](
stream: DataStream[E],
sinkConfig: ElasticsearchSinkConfig
) = {
val hosts = sinkConfig.transports
.map(s => {
val url = new URL(s"https://${s}")
val hostname = url.getHost
val port = if (url.getPort < 0) 9200 else url.getPort
new HttpHost(hostname, port, "https")
})
.asJava
val esSink = new ElasticsearchSink.Builder[E](hosts, new ElasticsearchSinkFunction[E] {
override def process(element: E, ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
val values = element.productIterator
val data = element.getClass.getDeclaredFields.map(_.getName -> values.next).toMap.asJava
val req = Requests.indexRequest(sinkConfig.index).`type`(sinkConfig.`type`).source(data)
indexer.add(req)
}
}).build()
stream.addSink(esSink).uid(sinkConfig.label).name(sinkConfig.label)
}

}
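
As an aside on the new elasticsearch sink: toElasticsearchSink parses each transports entry as a host[:port] string and falls back to port 9200 when none is given. A small, self-contained sketch of that parsing (the host names and ports below are made up):

import java.net.URL
import org.apache.http.HttpHost

object TransportParsingExample extends App {
  val transports = List("es1.example.com:9243", "es2.example.com")
  val httpHosts = transports.map { s =>
    val url  = new URL(s"https://$s")                      // add a scheme so java.net.URL can split host and port
    val port = if (url.getPort < 0) 9200 else url.getPort  // URL.getPort returns -1 when no port is present
    new HttpHost(url.getHost, port, "https")
  }
  httpHosts.foreach(println)  // prints https://es1.example.com:9243 and https://es2.example.com:9200
}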
