Commit
Merge e3568d1 into 5561c8a
nextdude committed Oct 1, 2022
2 parents 5561c8a + e3568d1 commit b33dad4
Showing 27 changed files with 310 additions and 180 deletions.
5 changes: 2 additions & 3 deletions build.sbt
@@ -45,11 +45,11 @@ val V = new {
val squants = "1.8.3"
val confluentAvroSerde = "7.1.1"
val parquet = "1.12.3"
val awsSdk = "1.12.296"
val awsSdk = "1.12.307"
val jdbcMysql = "8.0.30"
val jdbcPg = "42.5.0"
val jdbcMssql = "11.2.0.jre11"
val hadoop = "2.8.5"
val hadoop = "3.3.2"
}

val flinkDeps =
@@ -119,7 +119,6 @@ val otherDeps = Seq(
"org.scalatestplus" %% "scalacheck-1-16" % V.scalaTestPlus % Test,
"org.scalacheck" %% "scalacheck" % V.scalaCheck,
"com.fasterxml.jackson.module" %% "jackson-module-scala" % V.jackson,
// not sure this works with nested types on csv as well as jsonmapper...will wait to find out
"com.github.pjfanning" %% "jackson-scala-reflect-extensions" % V.jackson,
"com.fasterxml.jackson.dataformat" % "jackson-dataformat-csv" % V.jackson,
"com.fasterxml.jackson.datatype" % "jackson-datatype-jsr310" % V.jackson,
52 changes: 37 additions & 15 deletions src/main/scala/io/epiphanous/flinkrunner/FlinkRunner.scala
@@ -8,12 +8,23 @@ import io.epiphanous.flinkrunner.model.source._
import org.apache.avro.generic.GenericRecord
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.datastream.DataStreamSink
import org.apache.flink.streaming.api.functions.source.datagen.DataGenerator
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

import scala.collection.JavaConverters._

/** Flink Job Invoker
/** FlinkRunner base class. All users of Flinkrunner will create their own
* subclass. The only required parameter is a [[FlinkConfig]] object. Two
* additional optional arguments exist for simplifying testing:
  *   - [[CheckResults]] - a class to provide inputs and check outputs to
  *     test your job's transformation functions
* @param config
* a flink runner configuration
* @param checkResultsOpt
* an optional CheckResults class for testing
* @tparam ADT
* an algebraic data type for events processed by this flinkrunner
*/
abstract class FlinkRunner[ADT <: FlinkEvent: TypeInformation](
val config: FlinkConfig,
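
For orientation (not part of this commit): a typical runner is a thin subclass that passes its FlinkConfig up and dispatches configured job names to job instances. A minimal sketch, assuming the job-dispatch method is named `invoke`, that the optional constructor arguments default to None, and eliding the ADT's FlinkEvent members:

```scala
import io.epiphanous.flinkrunner.FlinkRunner
import io.epiphanous.flinkrunner.model.{FlinkConfig, FlinkEvent}
import org.apache.flink.api.common.typeinfo.TypeInformation

// hypothetical event ADT for this sketch; FlinkEvent members elided
sealed trait MyEvent extends FlinkEvent

class MyRunner(config: FlinkConfig)(implicit ti: TypeInformation[MyEvent])
    extends FlinkRunner[MyEvent](config) {
  // map configured job names to job implementations
  // (method name and job classes are assumptions for this sketch)
  override def invoke(jobName: String): Unit = jobName match {
    case "enrich-orders" => ??? // new EnrichOrdersJob(this).run()
    case other           =>
      throw new RuntimeException(s"unknown job $other")
  }
}
```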
@@ -132,7 +143,7 @@ abstract class FlinkRunner[ADT <: FlinkEvent: TypeInformation](
*/
def getSourceConfig(
sourceName: String = getDefaultSourceName): SourceConfig[ADT] =
SourceConfig[ADT](sourceName, config)
SourceConfig[ADT](sourceName, this)

/** Helper method to convert a source config into a json-encoded source
* data stream.
@@ -154,16 +165,24 @@
env.fromCollection(mockEvents).name(lbl).uid(lbl)
case _ =>
sourceConfig match {
case s: FileSourceConfig[ADT] => s.getSourceStream[E](env)
case s: KafkaSourceConfig[ADT] => s.getSourceStream[E](env)
case s: KinesisSourceConfig[ADT] => s.getSourceStream[E](env)
case s: RabbitMQSourceConfig[ADT] => s.getSourceStream[E](env)
case s: SocketSourceConfig[ADT] => s.getSourceStream[E](env)
case s: HybridSourceConfig[ADT] => s.getSourceStream[E](env)
case s: FileSourceConfig[ADT] => s.getSourceStream[E](env)
case s: KafkaSourceConfig[ADT] => s.getSourceStream[E](env)
case s: KinesisSourceConfig[ADT] => s.getSourceStream[E](env)
case s: RabbitMQSourceConfig[ADT] => s.getSourceStream[E](env)
case s: SocketSourceConfig[ADT] => s.getSourceStream[E](env)
case s: HybridSourceConfig[ADT] => s.getSourceStream[E](env)
case s: GeneratorSourceConfig[ADT] => s.getSourceStream[E](env)
}
}
}
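
As a usage note (not from the diff): a job looks sources up by name through the runner, and the stream helpers then dispatch on the concrete connector type as in the match above. A small sketch inside a FlinkRunner[MyEvent] subclass, with a hypothetical source name:

```scala
// resolve the default source and a named source from the job config
val defaultSource = getSourceConfig()         // uses getDefaultSourceName
val orders        = getSourceConfig("orders") // hypothetical source name

// the returned SourceConfig[MyEvent] is one of the connector-specific
// subclasses matched above (file, kafka, kinesis, rabbitmq, socket,
// hybrid, or the new generator source)
```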

def getDataGenerator[E <: ADT: TypeInformation]: DataGenerator[E] = ???

def getAvroDataGenerator[
E <: ADT with EmbeddedAvroRecord[A]: TypeInformation,
A <: GenericRecord: TypeInformation](implicit
fromKV: EmbeddedAvroRecordInfo[A] => E): DataGenerator[E] = ???

/** Helper method to convert a source config into an avro-encoded source
* data stream. At the moment this is only supported for kafka sources
* (and trivially for collection sources for testing).
@@ -195,16 +214,19 @@ abstract class FlinkRunner[ADT <: FlinkEvent: TypeInformation](
env.fromCollection(mockEvents).name(lbl).uid(lbl)
case _ =>
sourceConfig match {
case s: FileSourceConfig[ADT] => s.getAvroSourceStream[E, A](env)
case s: KafkaSourceConfig[ADT] =>
case s: FileSourceConfig[ADT] =>
s.getAvroSourceStream[E, A](env)
case s: KafkaSourceConfig[ADT] =>
s.getAvroSourceStream[E, A](env)
case s: KinesisSourceConfig[ADT] =>
s.getAvroSourceStream[E, A](env)
case s: KinesisSourceConfig[ADT] =>
case s: RabbitMQSourceConfig[ADT] =>
s.getAvroSourceStream[E, A](env)
case s: RabbitMQSourceConfig[ADT] =>
case s: SocketSourceConfig[ADT] =>
s.getAvroSourceStream[E, A](env)
case s: SocketSourceConfig[ADT] =>
case s: HybridSourceConfig[ADT] =>
s.getAvroSourceStream[E, A](env)
case s: HybridSourceConfig[ADT] =>
case s: GeneratorSourceConfig[ADT] =>
s.getAvroSourceStream[E, A](env)
}
}
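
For context (not from this commit): the avro helpers constrain the event type E to mix in EmbeddedAvroRecord over an Avro record type A, and need an implicit EmbeddedAvroRecordInfo[A] => E to rebuild events from consumed records, exactly as in the signatures above. A rough sketch of that type shape, assuming a hypothetical MyEvent ADT and that the flinkrunner model classes are imported:

```scala
import org.apache.avro.generic.GenericRecord
import org.apache.flink.api.common.typeinfo.TypeInformation

// captures the constraints the avro source helpers require:
// E wraps the Avro record A and belongs to the job's ADT
def requiresAvroShape[
    E <: MyEvent with EmbeddedAvroRecord[A]: TypeInformation,
    A <: GenericRecord: TypeInformation](implicit
    fromKV: EmbeddedAvroRecordInfo[A] => E): Unit = ()
```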
@@ -250,7 +272,7 @@ abstract class FlinkRunner[ADT <: FlinkEvent: TypeInformation](

def getSinkConfig(
sinkName: String = getDefaultSinkName): SinkConfig[ADT] =
SinkConfig[ADT](sinkName, config)
SinkConfig[ADT](sinkName, this)

/** Usually, we should write to the sink, unless we have a non-empty
* CheckResults configuration that determines otherwise.
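
Correspondingly (not from the diff): sinks are resolved by name the same way, and a non-empty CheckResults passed to the runner's constructor lets a test collect and verify job output instead of writing to the configured sinks. A short sketch inside a FlinkRunner[MyEvent] subclass, with a hypothetical sink name:

```scala
val defaultSink = getSinkConfig()         // uses getDefaultSinkName
val alerts      = getSinkConfig("alerts") // hypothetical sink name
```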
@@ -56,17 +56,20 @@ trait BasePropGenerators {
def genOneWith[T](arb: Arbitrary[T]): T =
genOne[T](arb)

def genStreamWith[T](arb: Arbitrary[T]): Stream[T] = genStream(arb)

def genOne[T](implicit arb: Arbitrary[T]): T = genPop[T](1).head

def genStream[T](implicit arb: Arbitrary[T]): Stream[T] =
Stream
.from(0)
.flatMap(_ => arb.arbitrary.sample)

def genPop[T](
mean: Int = 10,
sd: Double = 0
)(implicit arb: Arbitrary[T]): List[T] =
Stream
.from(0)
.map(_ => arb.arbitrary.sample)
.filter(_.nonEmpty)
genStream[T]
.take(((Random.nextGaussian() - 0.5) * sd + mean).round.toInt)
.flatten
.toList
}
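
A usage sketch (not part of the commit), assuming a test class mixes in BasePropGenerators and defines a ScalaCheck Arbitrary for a hypothetical type:

```scala
import org.scalacheck.{Arbitrary, Gen}

// hypothetical test type and Arbitrary instance
case class Temperature(sensorId: String, celsius: Double)

implicit val arbTemperature: Arbitrary[Temperature] = Arbitrary(
  for {
    id <- Gen.identifier
    c  <- Gen.chooseNum(-40d, 50d)
  } yield Temperature(id, c)
)

val one: Temperature              = genOne[Temperature]
val population: List[Temperature] = genPop[Temperature](mean = 20, sd = 3.0)
val endless: Stream[Temperature]  = genStream[Temperature]
```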
@@ -1,6 +1,11 @@
package io.epiphanous.flinkrunner.model

import com.typesafe.config.{Config, ConfigFactory, ConfigObject, ConfigOriginFactory}
import com.typesafe.config.{
Config,
ConfigFactory,
ConfigObject,
ConfigOriginFactory
}
import com.typesafe.scalalogging.LazyLogging
import io.epiphanous.flinkrunner.util.ConfigToProps.RichConfigObject
import io.epiphanous.flinkrunner.util.FileUtils.getResourceOrFile
@@ -28,10 +28,12 @@ object FlinkConnectorName extends Enum[FlinkConnectorName] {

case object RabbitMQ extends FlinkConnectorName

case object Generator extends FlinkConnectorName

val sources: immutable.Seq[FlinkConnectorName] =
values diff IndexedSeq(CassandraSink, ElasticsearchSink)
val sinks: immutable.Seq[FlinkConnectorName] =
values diff IndexedSeq(Hybrid)
values diff IndexedSeq(Hybrid, Generator)

def fromSourceName(
sourceName: String,
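For clarity (not part of the diff): the new Generator connector is a valid source but not a valid sink, which the two derived lists above reflect:

```scala
import io.epiphanous.flinkrunner.model.FlinkConnectorName
import io.epiphanous.flinkrunner.model.FlinkConnectorName._

val generatorIsSource = FlinkConnectorName.sources.contains(Generator) // true
val generatorIsSink   = FlinkConnectorName.sinks.contains(Generator)   // false
val hybridIsSink      = FlinkConnectorName.sinks.contains(Hybrid)      // false
```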
@@ -1,10 +1,7 @@
package io.epiphanous.flinkrunner.model.sink

import io.epiphanous.flinkrunner.model.{
FlinkConfig,
FlinkConnectorName,
FlinkEvent
}
import io.epiphanous.flinkrunner.FlinkRunner
import io.epiphanous.flinkrunner.model.{FlinkConnectorName, FlinkEvent}
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.streaming.connectors.cassandra.CassandraSink

@@ -17,14 +14,14 @@ import org.apache.flink.streaming.connectors.cassandra.CassandraSink
*
* @param name
* name of the sink
* @param config
* flink runner configuration
* @param runner
* flink runner instance
* @tparam ADT
* the flinkrunner algebraic data type
*/
case class CassandraSinkConfig[ADT <: FlinkEvent](
name: String,
config: FlinkConfig
runner: FlinkRunner[ADT]
) extends SinkConfig[ADT] {

override val connector: FlinkConnectorName =
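The constructor change here, a FlinkConfig parameter replaced by the FlinkRunner instance, repeats in every sink config below; the Cassandra sink above is representative. For illustration only (not part of the diff), the underlying configuration remains reachable through the runner inside a sink config:

```scala
// inside a SinkConfig[ADT] implementation (other members elided):
// the runner exposes its FlinkConfig as a public val
val conf: FlinkConfig = runner.config
```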
@@ -1,11 +1,8 @@
package io.epiphanous.flinkrunner.model.sink

import com.typesafe.scalalogging.LazyLogging
import io.epiphanous.flinkrunner.model.{
FlinkConfig,
FlinkConnectorName,
FlinkEvent
}
import io.epiphanous.flinkrunner.FlinkRunner
import io.epiphanous.flinkrunner.model.{FlinkConnectorName, FlinkEvent}
import org.apache.flink.api.connector.sink2.SinkWriter
import org.apache.flink.connector.elasticsearch.sink
import org.apache.flink.connector.elasticsearch.sink.{
@@ -36,14 +33,14 @@ import scala.collection.JavaConverters.mapAsJavaMap
*
* @param name
* name of the sink
* @param config
* flinkrunner configuration
* @param runner
* flinkrunner instance
* @tparam ADT
* the flinkrunner algebraic data type
*/
case class ElasticsearchSinkConfig[ADT <: FlinkEvent](
name: String,
config: FlinkConfig
runner: FlinkRunner[ADT]
) extends SinkConfig[ADT]
with LazyLogging {

@@ -1,6 +1,7 @@
package io.epiphanous.flinkrunner.model.sink

import com.typesafe.scalalogging.LazyLogging
import io.epiphanous.flinkrunner.FlinkRunner
import io.epiphanous.flinkrunner.model._
import io.epiphanous.flinkrunner.serde._
import io.epiphanous.flinkrunner.util.AvroUtils.instanceOf
@@ -74,14 +75,14 @@ import scala.collection.JavaConverters._
*
* @param name
* name of the sink
* @param config
* flinkrunner config
* @param runner
* flinkrunner instance
* @tparam ADT
* the flinkrunner algebraic data type
*/
case class FileSinkConfig[ADT <: FlinkEvent](
name: String,
config: FlinkConfig
runner: FlinkRunner[ADT]
) extends SinkConfig[ADT]
with LazyLogging {

@@ -1,6 +1,7 @@
package io.epiphanous.flinkrunner.model.sink

import com.typesafe.scalalogging.LazyLogging
import io.epiphanous.flinkrunner.FlinkRunner
import io.epiphanous.flinkrunner.model.SupportedDatabase.Snowflake
import io.epiphanous.flinkrunner.model._
import io.epiphanous.flinkrunner.model.sink.JdbcSinkConfig.DEFAULT_CONNECTION_TIMEOUT
@@ -85,12 +86,14 @@ import scala.util.{Failure, Success, Try}
*
* @param name
* name of the sink
* @param config
* the flink runner configuration in which this sink is defined
* @param runner
* a flink runner instance
* @tparam ADT
* flink runner algebraic data type
*/
case class JdbcSinkConfig[ADT <: FlinkEvent](
name: String,
config: FlinkConfig)
runner: FlinkRunner[ADT])
extends SinkConfig[ADT]
with LazyLogging {

@@ -1,6 +1,7 @@
package io.epiphanous.flinkrunner.model.sink

import com.typesafe.scalalogging.LazyLogging
import io.epiphanous.flinkrunner.FlinkRunner
import io.epiphanous.flinkrunner.model._
import io.epiphanous.flinkrunner.serde.{
ConfluentAvroRegistryKafkaRecordSerializationSchema,
@@ -28,14 +29,14 @@ import scala.util.Try
*
* @param name
* name of the sink
* @param config
* flinkrunner config
* @param runner
* flinkrunner instance
* @tparam ADT
* the flinkrunner algebraic data type
*/
case class KafkaSinkConfig[ADT <: FlinkEvent: TypeInformation](
name: String,
config: FlinkConfig
runner: FlinkRunner[ADT]
) extends SinkConfig[ADT]
with LazyLogging {

@@ -1,11 +1,8 @@
package io.epiphanous.flinkrunner.model.sink

import com.typesafe.scalalogging.LazyLogging
import io.epiphanous.flinkrunner.model.{
FlinkConfig,
FlinkConnectorName,
FlinkEvent
}
import io.epiphanous.flinkrunner.FlinkRunner
import io.epiphanous.flinkrunner.model.{FlinkConfig, FlinkConnectorName, FlinkEvent}
import io.epiphanous.flinkrunner.serde.JsonSerializationSchema
import org.apache.flink.api.common.serialization.SerializationSchema
import org.apache.flink.api.common.typeinfo.TypeInformation
@@ -36,7 +33,7 @@ import org.apache.flink.streaming.api.scala.DataStream
*/
case class KinesisSinkConfig[ADT <: FlinkEvent: TypeInformation](
name: String,
config: FlinkConfig
runner: FlinkRunner[ADT]
) extends SinkConfig[ADT]
with LazyLogging {

@@ -1,8 +1,8 @@
package io.epiphanous.flinkrunner.model.sink

import com.typesafe.scalalogging.LazyLogging
import io.epiphanous.flinkrunner.FlinkRunner
import io.epiphanous.flinkrunner.model.{
FlinkConfig,
FlinkConnectorName,
FlinkEvent,
RabbitMQConnectionInfo
@@ -19,7 +19,7 @@ import org.apache.flink.streaming.connectors.rabbitmq.{

case class RabbitMQSinkConfig[ADT <: FlinkEvent: TypeInformation](
name: String,
config: FlinkConfig)
runner: FlinkRunner[ADT])
extends SinkConfig[ADT]
with LazyLogging {
