
Commit e5e7905: Merge fba98c9 into 9734da0

rbalaban-mdsol committed Nov 14, 2022
2 parents 9734da0 + fba98c9

Showing 27 changed files with 702 additions and 267 deletions.
7 changes: 5 additions & 2 deletions build.sbt
@@ -36,7 +36,7 @@ val V = new {
val scalaTestPlus = "3.2.14.0"
val scalaCheck = "1.17.0"
val testContainersScala = "0.40.11"
val jackson = "2.13.4"
val jackson = "2.14.0"
val circe = "0.14.2"
val http4s = "0.23.12"
val enumeratum = "1.7.0"
@@ -50,6 +50,7 @@ val V = new {
val jdbcPg = "42.5.0"
val jdbcMssql = "11.2.0.jre11"
val hadoop = "3.3.2"
val cassandraDriver = "3.11.3"
val jna = "5.12.1" // needed for testcontainers in some jvms
}

@@ -130,7 +131,9 @@ val otherDeps = Seq(
"org.postgresql" % "postgresql" % V.jdbcPg % Provided,
"com.dimafeng" %% "testcontainers-scala-mssqlserver" % V.testContainersScala % Test,
"net.java.dev.jna" % "jna" % V.jna % Test,
"com.microsoft.sqlserver" % "mssql-jdbc" % V.jdbcMssql % Provided
"com.microsoft.sqlserver" % "mssql-jdbc" % V.jdbcMssql % Provided,
"com.dimafeng" %% "testcontainers-scala-cassandra" % V.testContainersScala % Test,
"com.datastax.cassandra" % "cassandra-driver-extras" % V.cassandraDriver % Provided
) ++
Seq("org.apache.parquet" % "parquet-avro" % V.parquet % Provided).map(
m =>
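The Cassandra driver extras artifact is added as Provided (and the Cassandra testcontainers module as Test), so neither is pulled in transitively by consumers. A minimal sketch of what a downstream build.sbt might add in order to run jobs that use the new Cassandra sink, reusing the coordinates and version bumped above (whether a consumer actually needs it on the runtime classpath depends on their deployment):

// Hypothetical consumer build.sbt fragment: flinkrunner declares this dependency as
// Provided, so a job that uses the Cassandra sink supplies the driver extras itself.
libraryDependencies += "com.datastax.cassandra" % "cassandra-driver-extras" % "3.11.3"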
27 changes: 18 additions & 9 deletions src/main/scala/io/epiphanous/flinkrunner/FlinkRunner.scala
@@ -5,9 +5,11 @@ import com.typesafe.scalalogging.LazyLogging
import io.epiphanous.flinkrunner.model._
import io.epiphanous.flinkrunner.model.sink._
import io.epiphanous.flinkrunner.model.source._
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.flink.api.common.JobExecutionResult
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.formats.avro.utils.AvroKryoSerializerUtils.AvroSchemaSerializer
import org.apache.flink.streaming.api.datastream.DataStreamSink
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
@@ -42,6 +44,11 @@ abstract class FlinkRunner[ADT <: FlinkEvent: TypeInformation](

val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)

env.getConfig.addDefaultKryoSerializer(
classOf[Schema],
classOf[AvroSchemaSerializer]
)

/** Gets (and returns as string) the execution plan for the job from the
* StreamExecutionEnvironment.
* @return
@@ -261,7 +268,7 @@ abstract class FlinkRunner[ADT <: FlinkEvent: TypeInformation](
def toSink[E <: ADT: TypeInformation](
stream: DataStream[E],
sinkName: String
): Object =
): DataStreamSink[E] =
configToSink[E](stream, getSinkConfig(sinkName))

/** Create an avro-encoded stream sink from configuration.
@@ -282,7 +289,7 @@ abstract class FlinkRunner[ADT <: FlinkEvent: TypeInformation](
A <: GenericRecord: TypeInformation](
stream: DataStream[E],
sinkName: String
): Object =
): DataStreamSink[E] =
configToAvroSink[E, A](stream, getSinkConfig(sinkName))

def getSinkConfig(
@@ -299,7 +306,7 @@ abstract class FlinkRunner[ADT <: FlinkEvent: TypeInformation](

def configToSink[E <: ADT: TypeInformation](
stream: DataStream[E],
sinkConfig: SinkConfig[ADT]): Object =
sinkConfig: SinkConfig[ADT]): DataStreamSink[E] =
sinkConfig match {
case s: CassandraSinkConfig[ADT] => s.getSink[E](stream)
case s: ElasticsearchSinkConfig[ADT] => s.getSink[E](stream)
@@ -317,11 +324,13 @@ abstract class FlinkRunner[ADT <: FlinkEvent: TypeInformation](
stream: DataStream[E],
sinkConfig: SinkConfig[ADT]): DataStreamSink[E] =
sinkConfig match {
case s: KafkaSinkConfig[ADT] => s.getAvroSink[E, A](stream)
case s: FileSinkConfig[ADT] => s.getAvroSink[E, A](stream)
case s =>
throw new RuntimeException(
s"Avro serialization not supported for ${s.connector} sinks"
)
case s: CassandraSinkConfig[ADT] => s.getAvroSink[E, A](stream)
case s: ElasticsearchSinkConfig[ADT] => s.getAvroSink[E, A](stream)
case s: FileSinkConfig[ADT] => s.getAvroSink[E, A](stream)
case s: JdbcSinkConfig[ADT] => s.getAvroSink[E, A](stream)
case s: KafkaSinkConfig[ADT] => s.getAvroSink[E, A](stream)
case s: KinesisSinkConfig[ADT] => s.getAvroSink[E, A](stream)
case s: RabbitMQSinkConfig[ADT] => s.getAvroSink[E, A](stream)
case s: SocketSinkConfig[ADT] => s.getAvroSink[E, A](stream)
}
}
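With this change, toSink and toAvroSink return DataStreamSink[E] instead of Object, and configToAvroSink dispatches avro-embedding streams to every connector type rather than only Kafka and File. A rough usage sketch; the runner instance, event/record types, and sink names below are illustrative assumptions, not part of this commit:

// Illustrative only: MyEvent, MyAvroEvent, MyRecord and the sink names are assumptions.
// Both calls now return a typed DataStreamSink[E] resolved from the named sink config.
val sink: DataStreamSink[MyEvent] =
  runner.toSink[MyEvent](events, "jdbc-sink")
val avroSink: DataStreamSink[MyAvroEvent] =
  runner.toAvroSink[MyAvroEvent, MyRecord](avroEvents, "cassandra-sink")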
src/main/scala/io/epiphanous/flinkrunner/model/FlinkConnectorName.scala

@@ -20,9 +20,9 @@ object FlinkConnectorName extends Enum[FlinkConnectorName] {

case object Socket extends FlinkConnectorName

case object CassandraSink extends FlinkConnectorName
case object Cassandra extends FlinkConnectorName

case object ElasticsearchSink extends FlinkConnectorName
case object Elasticsearch extends FlinkConnectorName

case object Jdbc extends FlinkConnectorName

@@ -31,7 +31,7 @@ object FlinkConnectorName extends Enum[FlinkConnectorName] {
case object Generator extends FlinkConnectorName

val sources: immutable.Seq[FlinkConnectorName] =
values diff IndexedSeq(CassandraSink, ElasticsearchSink)
values diff IndexedSeq(Cassandra, Elasticsearch)
val sinks: immutable.Seq[FlinkConnectorName] =
values diff IndexedSeq(Hybrid, Generator)

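After the rename, Cassandra and Elasticsearch stay sink-only: they are excluded from sources by the hunk above but, not being Hybrid or Generator, remain in sinks. A quick check that follows from the enum values shown here:

// Follows from the `values diff ...` definitions above.
assert(!FlinkConnectorName.sources.contains(FlinkConnectorName.Cassandra))
assert(FlinkConnectorName.sinks.contains(FlinkConnectorName.Elasticsearch))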
src/main/scala/io/epiphanous/flinkrunner/model/sink/CassandraSinkConfig.scala

@@ -1,12 +1,19 @@
package io.epiphanous.flinkrunner.model.sink

import com.datastax.driver.core.{Cluster, CodecRegistry}
import com.datastax.driver.extras.codecs.jdk8.InstantCodec
import io.epiphanous.flinkrunner.model.{
EmbeddedAvroRecord,
FlinkConfig,
FlinkConnectorName,
FlinkEvent
}
import io.epiphanous.flinkrunner.util.AvroUtils.RichGenericRecord
import org.apache.avro.generic.GenericRecord
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.datastream.DataStreamSink
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.streaming.connectors.cassandra.CassandraSink
import org.apache.flink.streaming.connectors.cassandra._

/** A cassandra sink config.
*
@@ -28,17 +35,52 @@ case class CassandraSinkConfig[ADT <: FlinkEvent](
) extends SinkConfig[ADT] {

override val connector: FlinkConnectorName =
FlinkConnectorName.CassandraSink
FlinkConnectorName.Cassandra

val host: String = config.getString(pfx("host"))
val host: String =
config.getStringOpt(pfx("host")).getOrElse("localhost")
val port: Int = config.getIntOpt(pfx("port")).getOrElse(9042)
val query: String = config.getString(pfx("query"))

def getSink[E <: ADT](stream: DataStream[E]): CassandraSink[E] =
CassandraSink
.addSink(stream)
.setHost(host)
.setQuery(query)
.build()
/** Don't convert to single abstract method...flink will complain
*/
val clusterBuilder: ClusterBuilder = new ClusterBuilder {
override def buildCluster(builder: Cluster.Builder): Cluster =
builder
.addContactPoint(host)
.withPort(port)
.withoutJMXReporting()
.withCodecRegistry(
new CodecRegistry().register(InstantCodec.instance)
)
.build()
}

def getSink[E <: ADT: TypeInformation](
stream: DataStream[E]): DataStreamSink[E] = {
stream
.addSink(new CassandraScalaProductSink[E](query, clusterBuilder))
.uid(label)
.name(label)
}

override def getAvroSink[
E <: ADT with EmbeddedAvroRecord[A]: TypeInformation,
A <: GenericRecord: TypeInformation](
stream: DataStream[E]): DataStreamSink[E] =
stream
.addSink(
new AbstractCassandraTupleSink[E](
query,
clusterBuilder,
CassandraSinkBaseConfig.newBuilder().build(),
new NoOpCassandraFailureHandler()
) {
override def extract(record: E): Array[AnyRef] =
record.$record.getDataAsSeq.toArray
}
)
.uid(label)
.name(label)

}
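The plain sink now routes through CassandraScalaProductSink, which binds the event's case-class fields positionally to the bind markers of the configured query, while the avro sink extracts the embedded record's values via getDataAsSeq. A minimal sketch of how an event type and query would line up; the event type, keyspace, and table names are hypothetical:

// Hypothetical event: field order must match the VALUES (?, ?, ?) placeholders.
case class SensorReading(id: String, ts: java.time.Instant, value: Double)
// matching query configured under the sink's `query` key:
//   INSERT INTO telemetry.readings (id, ts, value) VALUES (?, ?, ?);
// InstantCodec is registered on the cluster builder above so the Instant field binds cleanly.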
src/main/scala/io/epiphanous/flinkrunner/model/sink/ElasticsearchSinkConfig.scala

@@ -2,10 +2,14 @@ package io.epiphanous.flinkrunner.model.sink

import com.typesafe.scalalogging.LazyLogging
import io.epiphanous.flinkrunner.model.{
EmbeddedAvroRecord,
FlinkConfig,
FlinkConnectorName,
FlinkEvent
}
import io.epiphanous.flinkrunner.util.AvroUtils.RichGenericRecord
import org.apache.avro.generic.GenericRecord
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.connector.sink2.SinkWriter
import org.apache.flink.connector.elasticsearch.sink
import org.apache.flink.connector.elasticsearch.sink.{
Expand All @@ -19,7 +23,7 @@ import org.apache.http.HttpHost
import org.elasticsearch.client.Requests

import java.net.URL
import scala.collection.JavaConverters.mapAsJavaMap
import scala.collection.JavaConverters._

/** Elasticsearch sink config
*
@@ -48,7 +52,7 @@ case class ElasticsearchSinkConfig[ADT <: FlinkEvent](
with LazyLogging {

override val connector: FlinkConnectorName =
FlinkConnectorName.ElasticsearchSink
FlinkConnectorName.Elasticsearch

val index: String = config.getString(pfx("index"))
val transports: List[HttpHost] =
@@ -75,11 +79,13 @@ case class ElasticsearchSinkConfig[ADT <: FlinkEvent](
val bulkFlushIntervalMs: Option[Long] =
Option(properties.getProperty("bulk.flush.interval.ms")).map(_.toLong)

def getSink[E <: ADT](dataStream: DataStream[E]): DataStreamSink[E] = {
def _getSink[E <: ADT: TypeInformation](
dataStream: DataStream[E],
emitter: ElasticsearchEmitter[E]): DataStreamSink[E] = {
val esb =
new Elasticsearch7SinkBuilder[E]
.setHosts(transports: _*)
.setEmitter[E](getEmitter[E])
.setEmitter[E](emitter)
.setBulkFlushBackoffStrategy(
bulkFlushBackoffType,
bulkFlushBackoffRetries,
@@ -91,13 +97,37 @@ case class ElasticsearchSinkConfig[ADT <: FlinkEvent](
dataStream.sinkTo(esb.build()).uid(label).name(label)
}

def getEmitter[E <: ADT]: ElasticsearchEmitter[E] =
override def getSink[E <: ADT: TypeInformation](
dataStream: DataStream[E]): DataStreamSink[E] =
_getSink(dataStream, getEmitter[E])

override def getAvroSink[
E <: ADT with EmbeddedAvroRecord[A]: TypeInformation,
A <: GenericRecord: TypeInformation](
dataStream: DataStream[E]): DataStreamSink[E] =
_getSink(dataStream, getAvroEmitter[E, A])

def _getEmitter[E <: ADT](
getData: E => AnyRef): ElasticsearchEmitter[E] =
(element: E, _: SinkWriter.Context, indexer: sink.RequestIndexer) =>
indexer.add(
Requests.indexRequest
.index(index)
.source(
mapAsJavaMap(Map("data" -> element.asInstanceOf[AnyRef]))
Map("data" -> getData(element)).asJava
)
)

def getEmitter[E <: ADT]: ElasticsearchEmitter[E] = _getEmitter { e =>
val values = e.productIterator
e.getClass.getDeclaredFields
.map(_.getName -> values.next())
.toMap
.asJava
}
def getAvroEmitter[
E <: ADT with EmbeddedAvroRecord[A],
A <: GenericRecord]: ElasticsearchEmitter[E] = _getEmitter(
_.$record.getDataAsMap.asJava
)
}
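The non-avro emitter now builds the indexed document from the event's declared field names and values, rather than wrapping the whole element under a single "data" key, and the avro emitter indexes the embedded record via getDataAsMap. Illustrative only, with a made-up event type:

// Hypothetical event type: roughly, Click("u1", "/home", 3) would now be indexed as a
// document with keys userId, page and count instead of one opaque "data" field.
case class Click(userId: String, page: String, count: Int)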
src/main/scala/io/epiphanous/flinkrunner/model/sink/FileSinkConfig.scala

@@ -110,7 +110,7 @@ case class FileSinkConfig[ADT <: FlinkEvent](
* @return
* DataStreamSink[E]
*/
def getSink[E <: ADT: TypeInformation](
override def getSink[E <: ADT: TypeInformation](
dataStream: DataStream[E]): DataStreamSink[E] =
dataStream.sinkTo(
FileSink
@@ -133,7 +133,7 @@ case class FileSinkConfig[ADT <: FlinkEvent](
* @return
* DataStream[E]
*/
def getAvroSink[
override def getAvroSink[
E <: ADT with EmbeddedAvroRecord[A]: TypeInformation,
A <: GenericRecord: TypeInformation](
dataStream: DataStream[E]): DataStreamSink[E] = {
