Merge 3398a49 into 284a4b1
nextdude-mdsol committed Apr 6, 2023
2 parents 284a4b1 + 3398a49 commit aabb605
Showing 63 changed files with 2,689 additions and 1,151 deletions.
85 changes: 49 additions & 36 deletions build.sbt
@@ -29,30 +29,34 @@ resolvers += "Local Maven Repository" at "file://" + Path.userHome.absolutePath
resolvers += "Confluent Repository" at "https://packages.confluent.io/maven/"

val V = new {
val flink = "1.16.0"
val logback = "1.2.11"
val flink = "1.16.1"
val flinkMinor = "1.16"
val logback = "1.4.5"
val scalaLogging = "3.9.5"
val scalaTest = "3.2.14"
val scalaTestPlus = "3.2.14.0"
val scalaTest = "3.2.15"
val scalaTestPlus = "3.2.15.0"
val scalaCheck = "1.17.0"
val testContainersScala = "0.40.11"
val jackson = "2.14.1"
val testContainersScala = "0.40.12"
val jackson = "2.14.2"
val circe = "0.14.2"
val http4s = "0.23.12"
val enumeratum = "1.7.0"
val enumeratum = "1.7.2"
val typesafeConfig = "1.4.2"
val guava = "31.1-jre"
val squants = "1.8.3"
val confluentAvroSerde = "7.1.1"
val parquet = "1.12.3"
val awsSdk = "1.12.351"
val jdbcMysql = "8.0.30"
val jdbcPg = "42.5.1"
val awsSdk = "1.12.429"
val jdbcMysql = "8.0.32"
val jdbcPg = "42.5.4"
val jdbcMssql = "11.2.0.jre11"
val hadoop = "3.3.2"
val cassandraDriver = "3.11.3"
val uuidCreator = "5.2.0"
val iceberg = "1.2.0"
val jna = "5.12.1" // needed for testcontainers in some jvms
val minio = "8.5.2"
val awsSdk2 = "2.20.26"
}
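A note on the new flinkMinor value: it exists so that artifacts versioned per Flink minor release can be derived from the same version block. For illustration only, the Iceberg runtime added to otherDeps below resolves roughly as:

// illustration: V.flinkMinor is interpolated into the Iceberg artifact name
"org.apache.iceberg" % s"iceberg-flink-runtime-${V.flinkMinor}" % V.iceberg
// => org.apache.iceberg : iceberg-flink-runtime-1.16 : 1.2.0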

val flinkDeps =
@@ -62,6 +66,8 @@ val flinkDeps =
"org.apache.flink" %% "flink-streaming-scala" % V.flink,
// rocksdb
"org.apache.flink" % "flink-statebackend-rocksdb" % V.flink,
// sql parser
"org.apache.flink" % "flink-sql-parser" % V.flink,
// queryable state
"org.apache.flink" % "flink-queryable-state-runtime" % V.flink % Provided,
// complex event processing
@@ -110,32 +116,39 @@ val circeDeps = Seq(
).map(d => "io.circe" %% s"circe-$d" % V.circe)

val otherDeps = Seq(
"com.github.f4b6a3" % "uuid-creator" % V.uuidCreator,
"org.apache.hadoop" % "hadoop-client" % V.hadoop % Provided,
"io.confluent" % "kafka-avro-serializer" % V.confluentAvroSerde % Provided,
"com.amazonaws" % "aws-java-sdk-core" % V.awsSdk % Provided,
"com.beachape" %% "enumeratum" % V.enumeratum,
"com.typesafe" % "config" % V.typesafeConfig,
"com.google.guava" % "guava" % V.guava,
"org.typelevel" %% "squants" % V.squants,
"org.scalactic" %% "scalactic" % V.scalaTest,
"org.scalatest" %% "scalatest" % V.scalaTest % Test,
"org.scalatestplus" %% "scalacheck-1-17" % V.scalaTestPlus % Test,
"org.scalacheck" %% "scalacheck" % V.scalaCheck,
"com.fasterxml.jackson.module" %% "jackson-module-scala" % V.jackson,
"com.github.pjfanning" %% "jackson-scala-reflect-extensions" % "2.14.0",
"com.fasterxml.jackson.dataformat" % "jackson-dataformat-csv" % V.jackson,
"com.fasterxml.jackson.datatype" % "jackson-datatype-jsr310" % V.jackson,
"com.dimafeng" %% "testcontainers-scala-scalatest" % V.testContainersScala % Test,
"com.dimafeng" %% "testcontainers-scala-mysql" % V.testContainersScala % Test,
"mysql" % "mysql-connector-java" % V.jdbcMysql % Provided,
"com.dimafeng" %% "testcontainers-scala-postgresql" % V.testContainersScala % Test,
"org.postgresql" % "postgresql" % V.jdbcPg % Provided,
"com.dimafeng" %% "testcontainers-scala-mssqlserver" % V.testContainersScala % Test,
"net.java.dev.jna" % "jna" % V.jna % Test,
"com.microsoft.sqlserver" % "mssql-jdbc" % V.jdbcMssql % Provided,
"com.dimafeng" %% "testcontainers-scala-cassandra" % V.testContainersScala % Test,
"com.datastax.cassandra" % "cassandra-driver-extras" % V.cassandraDriver % Provided
"com.amazonaws" % "aws-java-sdk-core" % V.awsSdk % Provided,
"com.amazonaws" % "aws-java-sdk-s3" % V.awsSdk % Test,
"com.beachape" %% "enumeratum" % V.enumeratum,
"com.datastax.cassandra" % "cassandra-driver-extras" % V.cassandraDriver % Provided,
"com.dimafeng" %% "testcontainers-scala-cassandra" % V.testContainersScala % Test,
"com.dimafeng" %% "testcontainers-scala-localstack-v2" % V.testContainersScala % Test,
"com.dimafeng" %% "testcontainers-scala-mssqlserver" % V.testContainersScala % Test,
"com.dimafeng" %% "testcontainers-scala-mysql" % V.testContainersScala % Test,
"com.dimafeng" %% "testcontainers-scala-postgresql" % V.testContainersScala % Test,
"com.dimafeng" %% "testcontainers-scala-scalatest" % V.testContainersScala % Test,
"com.fasterxml.jackson.dataformat" % "jackson-dataformat-csv" % V.jackson,
"com.fasterxml.jackson.datatype" % "jackson-datatype-jsr310" % V.jackson,
"com.fasterxml.jackson.module" %% "jackson-module-scala" % V.jackson,
"com.github.f4b6a3" % "uuid-creator" % V.uuidCreator,
"com.github.pjfanning" %% "jackson-scala-reflect-extensions" % "2.14.0",
"com.google.guava" % "guava" % V.guava,
"com.lihaoyi" %% "requests" % "0.8.0" % Test,
"com.microsoft.sqlserver" % "mssql-jdbc" % V.jdbcMssql % Provided,
"com.typesafe" % "config" % V.typesafeConfig,
"io.confluent" % "kafka-avro-serializer" % V.confluentAvroSerde % Provided,
"io.minio" % "minio" % V.minio % Test,
"mysql" % "mysql-connector-java" % V.jdbcMysql % Provided,
"net.java.dev.jna" % "jna" % V.jna % Test,
"org.apache.hadoop" % "hadoop-client" % V.hadoop % Provided,
"org.apache.iceberg" % s"iceberg-flink-runtime-${V.flinkMinor}" % V.iceberg,
"org.postgresql" % "postgresql" % V.jdbcPg % Provided,
"org.scalacheck" %% "scalacheck" % V.scalaCheck,
"org.scalactic" %% "scalactic" % V.scalaTest,
"org.scalatest" %% "scalatest" % V.scalaTest % Test,
"org.scalatestplus" %% "scalacheck-1-17" % V.scalaTestPlus % Test,
"org.typelevel" %% "squants" % V.squants,
"software.amazon.awssdk" % "aws-sdk-java" % V.awsSdk2 % Test,
"software.amazon.awssdk" % "url-connection-client" % V.awsSdk2 % Test
) ++
Seq("org.apache.parquet" % "parquet-avro" % V.parquet % Provided).map(
m =>
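Most of the connector and client libraries above remain in Provided scope, so a project that depends on flinkrunner must supply the ones it actually uses at runtime. A minimal downstream build.sbt sketch; the flinkrunner coordinates and the choice of extra dependencies are assumptions for illustration, not taken from this diff:

// hypothetical consumer build.sbt (coordinates and versions assumed)
libraryDependencies ++= Seq(
  "io.epiphanous"    %% "flinkrunner"           % "<version>",
  "org.apache.flink" %% "flink-streaming-scala" % "1.16.1" % Provided, // supplied by the Flink runtime
  "org.postgresql"    % "postgresql"            % "42.5.4"             // only if the Postgres JDBC sink is used
)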
158 changes: 58 additions & 100 deletions src/main/scala/io/epiphanous/flinkrunner/FlinkRunner.scala
@@ -10,11 +10,12 @@ import org.apache.avro.generic.GenericRecord
import org.apache.flink.api.common.JobExecutionResult
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.formats.avro.utils.AvroKryoSerializerUtils.AvroSchemaSerializer
import org.apache.flink.streaming.api.datastream.DataStreamSink
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.data.RowData

import scala.collection.JavaConverters._
import scala.reflect.runtime.{universe => ru}

/** FlinkRunner base class. All users of Flinkrunner will create their own
* subclass. The only required parameter is a FlinkConfig object. Two
@@ -171,6 +172,13 @@ abstract class FlinkRunner[ADT <: FlinkEvent: TypeInformation](
sourceName: String = getDefaultSourceName): SourceConfig[ADT] =
SourceConfig[ADT](sourceName, config, generatorFactoryOpt)

def _mockSource[E <: ADT: TypeInformation](
sourceConfig: SourceConfig[ADT],
mockEvents: Seq[E]): DataStream[E] = {
val lbl = s"mock:${sourceConfig.label}"
env.fromCollection(mockEvents).name(lbl).uid(lbl)
}

/** Helper method to convert a source config into a json-encoded source
* data stream.
*
@@ -182,25 +190,13 @@ abstract class FlinkRunner[ADT <: FlinkEvent: TypeInformation](
* DataStream[E]
*/
def configToSource[E <: ADT: TypeInformation](
sourceConfig: SourceConfig[ADT]): DataStream[E] = {
sourceConfig: SourceConfig[ADT]): DataStream[E] =
checkResultsOpt
.map(c => c.getInputEvents[E](sourceConfig.name))
.getOrElse(List.empty[E]) match {
case mockEvents if mockEvents.nonEmpty =>
val lbl = s"mock:${sourceConfig.label}"
env.fromCollection(mockEvents).name(lbl).uid(lbl)
case _ =>
sourceConfig match {
case s: FileSourceConfig[ADT] => s.getSourceStream[E](env)
case s: KafkaSourceConfig[ADT] => s.getSourceStream[E](env)
case s: KinesisSourceConfig[ADT] => s.getSourceStream[E](env)
case s: RabbitMQSourceConfig[ADT] => s.getSourceStream[E](env)
case s: SocketSourceConfig[ADT] => s.getSourceStream[E](env)
case s: HybridSourceConfig[ADT] => s.getSourceStream[E](env)
case s: GeneratorSourceConfig[ADT] => s.getSourceStream[E](env)
}
}
}
.filter(_.nonEmpty)
.fold(sourceConfig.getSourceStream[E](env))(
_mockSource(sourceConfig, _)
)
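The Option pipeline above simply prefers configured mock input over the real source; written out long-hand (for illustration only, not part of the diff) it is roughly:

// illustrative expansion of the filter/fold above
val mockEvents: Seq[E] = checkResultsOpt
  .map(_.getInputEvents[E](sourceConfig.name))
  .getOrElse(Seq.empty[E])
if (mockEvents.nonEmpty) _mockSource(sourceConfig, mockEvents)
else sourceConfig.getSourceStream[E](env)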

/** Helper method to convert a source config into an avro-encoded source
* data stream. At the moment this is only supported for kafka sources
@@ -224,73 +220,36 @@ abstract class FlinkRunner[ADT <: FlinkEvent: TypeInformation](
E <: ADT with EmbeddedAvroRecord[A]: TypeInformation,
A <: GenericRecord: TypeInformation](
sourceConfig: SourceConfig[ADT])(implicit
fromKV: EmbeddedAvroRecordInfo[A] => E): DataStream[E] = {

fromKV: EmbeddedAvroRecordInfo[A] => E): DataStream[E] =
checkResultsOpt
.map(c => c.getInputEvents[E](sourceConfig.name))
.getOrElse(Seq.empty[E]) match {
case mockEvents if mockEvents.nonEmpty =>
val lbl = s"mock:${sourceConfig.label}"
env.fromCollection(mockEvents).name(lbl).uid(lbl)
case _ =>
sourceConfig match {
case s: FileSourceConfig[ADT] =>
s.getAvroSourceStream[E, A](env)
case s: KafkaSourceConfig[ADT] =>
s.getAvroSourceStream[E, A](env)
case s: KinesisSourceConfig[ADT] =>
s.getAvroSourceStream[E, A](env)
case s: RabbitMQSourceConfig[ADT] =>
s.getAvroSourceStream[E, A](env)
case s: SocketSourceConfig[ADT] =>
s.getAvroSourceStream[E, A](env)
case s: HybridSourceConfig[ADT] =>
s.getAvroSourceStream[E, A](env)
case s: GeneratorSourceConfig[ADT] =>
s.getAvroSourceStream[E, A](env)
}
}
}
.filter(_.nonEmpty)
.fold(sourceConfig.getAvroSourceStream[E, A](env))(
_mockSource(sourceConfig, _)
)

// ********************** SINKS **********************

/** Create a json-encoded stream sink from configuration.
/** Helper method to convert a source configuration into a DataStream[E]
*
* @param stream
* the data stream to send to sink
* @param sinkName
* the sink to send it to
* @param sourceConfig
* the source config
* @param fromRowData
* an implicit method to convert a RowData into an event of type E
* @tparam E
* stream element type
* the event data type
* @return
* DataStream[E]
*/
def toSink[E <: ADT: TypeInformation](
stream: DataStream[E],
sinkName: String
): DataStreamSink[E] =
configToSink[E](stream, getSinkConfig(sinkName))
def configToRowSource[E <: ADT with EmbeddedRowType: TypeInformation](
sourceConfig: SourceConfig[ADT])(implicit
fromRowData: RowData => E): DataStream[E] = {
checkResultsOpt
.map(c => c.getInputEvents[E](sourceConfig.name))
.filter(_.nonEmpty)
.fold(sourceConfig.getRowSourceStream[E](env))(
_mockSource(sourceConfig, _)
)
}

/** Create an avro-encoded stream sink from configuration.
*
* @param stream
* the data stream to send to the sink
* @param sinkName
* an optional sink name (defaults to first sink)
* @tparam E
* the event type
* @tparam A
* the avro record type
* @return
* the
*/
def toAvroSink[
E <: ADT with EmbeddedAvroRecord[A]: TypeInformation,
A <: GenericRecord: TypeInformation](
stream: DataStream[E],
sinkName: String
): DataStreamSink[E] =
configToAvroSink[E, A](stream, getSinkConfig(sinkName))
// ********************** SINKS **********************

def getSinkConfig(
sinkName: String = getDefaultSinkName): SinkConfig[ADT] =
@@ -304,33 +263,32 @@ abstract class FlinkRunner[ADT <: FlinkEvent: TypeInformation](
*/
def writeToSink: Boolean = checkResultsOpt.forall(_.writeToSink)

def configToSink[E <: ADT: TypeInformation](
def addSink[E <: ADT: TypeInformation](
stream: DataStream[E],
sinkConfig: SinkConfig[ADT]): DataStreamSink[E] =
sinkConfig match {
case s: CassandraSinkConfig[ADT] => s.getSink[E](stream)
case s: ElasticsearchSinkConfig[ADT] => s.getSink[E](stream)
case s: FileSinkConfig[ADT] => s.getSink[E](stream)
case s: JdbcSinkConfig[ADT] => s.getSink[E](stream)
case s: KafkaSinkConfig[ADT] => s.getSink[E](stream)
case s: KinesisSinkConfig[ADT] => s.getSink[E](stream)
case s: RabbitMQSinkConfig[ADT] => s.getSink[E](stream)
case s: SocketSinkConfig[ADT] => s.getSink[E](stream)
sinkName: String): Unit =
getSinkConfig(sinkName) match {
case s: CassandraSinkConfig[ADT] => s.addSink[E](stream)
case s: ElasticsearchSinkConfig[ADT] => s.addSink[E](stream)
case s: FileSinkConfig[ADT] => s.addSink[E](stream)
case s: JdbcSinkConfig[ADT] => s.addSink[E](stream)
case s: KafkaSinkConfig[ADT] => s.addSink[E](stream)
case s: KinesisSinkConfig[ADT] => s.addSink[E](stream)
case s: RabbitMQSinkConfig[ADT] => s.addSink[E](stream)
case s: SocketSinkConfig[ADT] => s.addSink[E](stream)
case s: IcebergSinkConfig[ADT] => s.addSink[E](stream)
}
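Since the sink helpers are now keyed by sink name and return Unit, a job routes its output the same way the avro variant does further below. A sketch; the event type, stream, and sink name are invented for illustration, while getSinkNames comes from the existing runner API:

// sketch: send a finished stream to every configured sink, or to one by name
runner.getSinkNames.foreach(name => runner.addSink[MyEvent](results, name))
runner.addSink[MyEvent](results, "kafka-sink")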

def configToAvroSink[
def addAvroSink[
E <: ADT with EmbeddedAvroRecord[A]: TypeInformation,
A <: GenericRecord: TypeInformation](
stream: DataStream[E],
sinkConfig: SinkConfig[ADT]): DataStreamSink[E] =
sinkConfig match {
case s: CassandraSinkConfig[ADT] => s.getAvroSink[E, A](stream)
case s: ElasticsearchSinkConfig[ADT] => s.getAvroSink[E, A](stream)
case s: FileSinkConfig[ADT] => s.getAvroSink[E, A](stream)
case s: JdbcSinkConfig[ADT] => s.getAvroSink[E, A](stream)
case s: KafkaSinkConfig[ADT] => s.getAvroSink[E, A](stream)
case s: KinesisSinkConfig[ADT] => s.getAvroSink[E, A](stream)
case s: RabbitMQSinkConfig[ADT] => s.getAvroSink[E, A](stream)
case s: SocketSinkConfig[ADT] => s.getAvroSink[E, A](stream)
}
sinkName: String): Unit =
getSinkConfig(sinkName).addAvroSink[E, A](stream)

def addRowSink[
E <: ADT with EmbeddedRowType: TypeInformation: ru.TypeTag](
stream: DataStream[E],
sinkName: String = getDefaultSinkName): Unit =
getSinkConfig(sinkName).addRowSink[E](stream)

}
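Taken together, the row-oriented additions in this file (EmbeddedRowType, configToRowSource, addRowSink, and the IcebergSinkConfig case) allow events to be read and written as table rows. A rough sketch; the event type, config names, and the getSourceConfig method name (assumed by analogy with getSinkConfig) are illustrative, and the source side additionally needs an implicit RowData => SensorReading in scope:

// hypothetical row-typed event: case class SensorReading(id: String, value: Double, ts: Long)
//   extends MyADT with EmbeddedRowType
val readings: DataStream[SensorReading] =
  runner.configToRowSource[SensorReading](runner.getSourceConfig("sensor-readings"))
runner.addRowSink[SensorReading](readings, "iceberg-sink")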
@@ -25,6 +25,6 @@ abstract class AvroStreamJob[

override def sink(out: DataStream[OUT]): Unit =
runner.getSinkNames.foreach(name =>
runner.toAvroSink[OUT, A](out, name)
runner.addAvroSink[OUT, A](out, name)
)
}