Commit 8fb811a: Merge b815558 into 5561c8a

nextdude committed Oct 3, 2022 (2 parents: 5561c8a + b815558)

Showing 38 changed files with 714 additions and 128 deletions.
5 changes: 2 additions & 3 deletions build.sbt
@@ -45,11 +45,11 @@ val V = new {
val squants = "1.8.3"
val confluentAvroSerde = "7.1.1"
val parquet = "1.12.3"
val awsSdk = "1.12.296"
val awsSdk = "1.12.307"
val jdbcMysql = "8.0.30"
val jdbcPg = "42.5.0"
val jdbcMssql = "11.2.0.jre11"
val hadoop = "2.8.5"
val hadoop = "3.3.2"
}

val flinkDeps =
@@ -119,7 +119,6 @@ val otherDeps = Seq(
"org.scalatestplus" %% "scalacheck-1-16" % V.scalaTestPlus % Test,
"org.scalacheck" %% "scalacheck" % V.scalaCheck,
"com.fasterxml.jackson.module" %% "jackson-module-scala" % V.jackson,
// not sure this works with nested types on csv as well as jsonmapper...will wait to find out
"com.github.pjfanning" %% "jackson-scala-reflect-extensions" % V.jackson,
"com.fasterxml.jackson.dataformat" % "jackson-dataformat-csv" % V.jackson,
"com.fasterxml.jackson.datatype" % "jackson-datatype-jsr310" % V.jackson,
10 changes: 5 additions & 5 deletions project/plugins.sbt
@@ -1,5 +1,5 @@
addSbtPlugin("com.github.sbt" % "sbt-ci-release" % "1.5.10")
addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.9.2")
addSbtPlugin("org.scoverage" % "sbt-coveralls" % "1.3.1")
addSbtPlugin("com.julianpeeters" % "sbt-avrohugger" % "2.0.0")
addSbtPlugin("com.eed3si9n" % "sbt-buildinfo" % "0.11.0")
addSbtPlugin("com.github.sbt" % "sbt-ci-release" % "1.5.10")
addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.9.2")
addSbtPlugin("org.scoverage" % "sbt-coveralls" % "1.3.1")
addSbtPlugin("com.julianpeeters" % "sbt-avrohugger" % "2.0.0")
addSbtPlugin("com.eed3si9n" % "sbt-buildinfo" % "0.11.0")
76 changes: 57 additions & 19 deletions src/main/scala/io/epiphanous/flinkrunner/FlinkRunner.scala
@@ -6,27 +6,55 @@ import io.epiphanous.flinkrunner.model._
import io.epiphanous.flinkrunner.model.sink._
import io.epiphanous.flinkrunner.model.source._
import org.apache.avro.generic.GenericRecord
import org.apache.flink.api.common.JobExecutionResult
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.datastream.DataStreamSink
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

import scala.collection.JavaConverters._

/** Flink Job Invoker
/** FlinkRunner base class. All users of Flinkrunner will create their own
* subclass. The only required parameter is a FlinkConfig object. Two
* additional optional arguments exist for simplifying testing:
* - CheckResults - a class to provide inputs and check outputs to test
* your job's transformation functions
* - GeneratorFactory - a factory class to create DataGenerator
* instances to build random event streams for testing
* @param config
* a flink runner configuration
* @param checkResultsOpt
* an optional CheckResults class for testing
* @param generatorFactoryOpt
* an optional GeneratorFactory instance to create data generators if
* you plan to use the GeneratorSource
* @tparam ADT
* an algebraic data type for events processed by this flinkrunner
*/
abstract class FlinkRunner[ADT <: FlinkEvent: TypeInformation](
val config: FlinkConfig,
val checkResultsOpt: Option[CheckResults[ADT]] = None)
val checkResultsOpt: Option[CheckResults[ADT]] = None,
val generatorFactoryOpt: Option[GeneratorFactory[ADT]] = None)
extends LazyLogging {

/** the configured StreamExecutionEnvironment */
val env: StreamExecutionEnvironment =
config.configureStreamExecutionEnvironment
config.getStreamExecutionEnvironment

/** the configured StreamTableEnvironment (for table jobs) */
val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)

/** Gets (and returns as string) the execution plan for the job from the
* StreamExecutionEnvironment.
* @return
* String
*/
def getExecutionPlan: String = env.getExecutionPlan

/** Executes the job graph.
* @return
* JobExecutionResult
*/
def execute: JobExecutionResult = env.execute(config.jobName)

config.showConfig match {
case ShowConfigOption.None => ()
case ShowConfigOption.Concise =>
@@ -35,6 +63,7 @@ abstract class FlinkRunner[ADT <: FlinkEvent: TypeInformation](
}

/** Invoke a job by name. Must be provided by an implementing class.
*
* @param jobName
* the job name
*/
@@ -125,14 +154,15 @@ abstract class FlinkRunner[ADT <: FlinkEvent: TypeInformation](
/** Helper method to resolve the source configuration. Implementers can
* override this method to customize source configuration behavior, in
* particular, the deserialization schemas used by flink runner.
*
* @param sourceName
* source name
* @return
* SourceConfig
*/
def getSourceConfig(
sourceName: String = getDefaultSourceName): SourceConfig[ADT] =
SourceConfig[ADT](sourceName, config)
SourceConfig[ADT](sourceName, config, generatorFactoryOpt)

/** Helper method to convert a source config into a json-encoded source
* data stream.
@@ -154,12 +184,13 @@
env.fromCollection(mockEvents).name(lbl).uid(lbl)
case _ =>
sourceConfig match {
case s: FileSourceConfig[ADT] => s.getSourceStream[E](env)
case s: KafkaSourceConfig[ADT] => s.getSourceStream[E](env)
case s: KinesisSourceConfig[ADT] => s.getSourceStream[E](env)
case s: RabbitMQSourceConfig[ADT] => s.getSourceStream[E](env)
case s: SocketSourceConfig[ADT] => s.getSourceStream[E](env)
case s: HybridSourceConfig[ADT] => s.getSourceStream[E](env)
case s: FileSourceConfig[ADT] => s.getSourceStream[E](env)
case s: KafkaSourceConfig[ADT] => s.getSourceStream[E](env)
case s: KinesisSourceConfig[ADT] => s.getSourceStream[E](env)
case s: RabbitMQSourceConfig[ADT] => s.getSourceStream[E](env)
case s: SocketSourceConfig[ADT] => s.getSourceStream[E](env)
case s: HybridSourceConfig[ADT] => s.getSourceStream[E](env)
case s: GeneratorSourceConfig[ADT] => s.getSourceStream[E](env)
}
}
}
@@ -186,7 +217,8 @@ abstract class FlinkRunner[ADT <: FlinkEvent: TypeInformation](
E <: ADT with EmbeddedAvroRecord[A]: TypeInformation,
A <: GenericRecord: TypeInformation](
sourceConfig: SourceConfig[ADT])(implicit
fromKV: EmbeddedAvroRecordInfo[A] => E): DataStream[E] =
fromKV: EmbeddedAvroRecordInfo[A] => E): DataStream[E] = {

checkResultsOpt
.map(c => c.getInputEvents[E](sourceConfig.name))
.getOrElse(Seq.empty[E]) match {
Expand All @@ -195,19 +227,23 @@ abstract class FlinkRunner[ADT <: FlinkEvent: TypeInformation](
env.fromCollection(mockEvents).name(lbl).uid(lbl)
case _ =>
sourceConfig match {
case s: FileSourceConfig[ADT] => s.getAvroSourceStream[E, A](env)
case s: KafkaSourceConfig[ADT] =>
case s: FileSourceConfig[ADT] =>
s.getAvroSourceStream[E, A](env)
case s: KafkaSourceConfig[ADT] =>
s.getAvroSourceStream[E, A](env)
case s: KinesisSourceConfig[ADT] =>
s.getAvroSourceStream[E, A](env)
case s: KinesisSourceConfig[ADT] =>
case s: RabbitMQSourceConfig[ADT] =>
s.getAvroSourceStream[E, A](env)
case s: RabbitMQSourceConfig[ADT] =>
case s: SocketSourceConfig[ADT] =>
s.getAvroSourceStream[E, A](env)
case s: SocketSourceConfig[ADT] =>
case s: HybridSourceConfig[ADT] =>
s.getAvroSourceStream[E, A](env)
case s: HybridSourceConfig[ADT] =>
case s: GeneratorSourceConfig[ADT] =>
s.getAvroSourceStream[E, A](env)
}
}
}

// ********************** SINKS **********************

@@ -229,6 +265,7 @@ abstract class FlinkRunner[ADT <: FlinkEvent: TypeInformation](
configToSink[E](stream, getSinkConfig(sinkName))

/** Create an avro-encoded stream sink from configuration.
*
* @param stream
* the data stream to send to the sink
* @param sinkName
@@ -254,6 +291,7 @@

/** Usually, we should write to the sink, unless we have a non-empty
* CheckResults configuration that determines otherwise.
*
* @return
* true to write to the sink, false otherwise
*/
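For orientation, here is a minimal sketch of an implementing runner wired up with the new optional generator factory. MyEventADT and MyJob are hypothetical names, and the invoke signature is inferred from the doc comment rather than taken from this diff:

// Sketch only: MyEventADT is a hypothetical sealed trait extending FlinkEvent, and
// MyJob is a hypothetical StreamJob implementation, both assumed to exist elsewhere.
import io.epiphanous.flinkrunner.FlinkRunner
import io.epiphanous.flinkrunner.model._
import io.epiphanous.flinkrunner.model.source._
import org.apache.flink.streaming.api.scala._ // supplies implicit TypeInformation

class MyRunner(
    config: FlinkConfig,
    checkResultsOpt: Option[CheckResults[MyEventADT]] = None,
    generatorFactoryOpt: Option[GeneratorFactory[MyEventADT]] = None)
    extends FlinkRunner[MyEventADT](
      config,
      checkResultsOpt,
      generatorFactoryOpt // the new optional constructor argument
    ) {

  // Signature assumed from the doc comment ("Invoke a job by name. Must be
  // provided by an implementing class.").
  override def invoke(jobName: String): Unit = jobName match {
    case "my-job" => new MyJob(this).run()
    case other    => throw new RuntimeException(s"unknown job $other")
  }
}
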
30 changes: 18 additions & 12 deletions src/main/scala/io/epiphanous/flinkrunner/flink/StreamJob.scala
@@ -2,16 +2,24 @@ package io.epiphanous.flinkrunner.flink

import com.typesafe.scalalogging.LazyLogging
import io.epiphanous.flinkrunner.FlinkRunner
import io.epiphanous.flinkrunner.model.aggregate.{Aggregate, AggregateAccumulator, WindowedAggregationInitializer}
import io.epiphanous.flinkrunner.model.{EmbeddedAvroRecord, EmbeddedAvroRecordInfo, FlinkConfig, FlinkEvent}
import io.epiphanous.flinkrunner.model.aggregate.{
Aggregate,
AggregateAccumulator,
WindowedAggregationInitializer
}
import io.epiphanous.flinkrunner.model.{
EmbeddedAvroRecord,
EmbeddedAvroRecordInfo,
FlinkConfig,
FlinkEvent
}
import io.epiphanous.flinkrunner.util.StreamUtils.Pipe
import org.apache.avro.generic.GenericRecord
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.Window
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.util.Collector
import squants.Quantity

@@ -29,9 +37,7 @@ abstract class StreamJob[
ADT <: FlinkEvent: TypeInformation](runner: FlinkRunner[ADT])
extends LazyLogging {

val config: FlinkConfig = runner.config
val env: StreamExecutionEnvironment = runner.env
val tableEnv: StreamTableEnvironment = runner.tableEnv
val config: FlinkConfig = runner.config

def transform: DataStream[OUT]

@@ -267,8 +273,8 @@
in1GetKeyFunc: IN1 => KEY,
in2GetKeyFunc: IN2 => KEY)(implicit
fromKV1: EmbeddedAvroRecordInfo[IN1A] => IN1,
fromKV2: EmbeddedAvroRecordInfo[IN2A] => IN2
): ConnectedStreams[IN1, IN2] = {
fromKV2: EmbeddedAvroRecordInfo[IN2A] => IN2)
: ConnectedStreams[IN1, IN2] = {
val source1 = singleAvroSource[IN1, IN1A](source1Name)
val source2 = singleAvroSource[IN2, IN2A](source2Name)
source1.connect(source2).keyBy[KEY](in1GetKeyFunc, in2GetKeyFunc)
@@ -324,8 +330,8 @@
controlGetKeyFunc: CONTROL => KEY,
dataGetKeyFunc: DATA => KEY)(implicit
fromKVControl: EmbeddedAvroRecordInfo[CONTROLA] => CONTROL,
fromKVData: EmbeddedAvroRecordInfo[DATAA] => DATA
): DataStream[DATA] = {
fromKVData: EmbeddedAvroRecordInfo[DATAA] => DATA)
: DataStream[DATA] = {
val controlLockoutDuration =
config.getDuration("control.lockout.duration").toMillis
implicit val eitherTypeInfo: TypeInformation[Either[CONTROL, DATA]] =
@@ -517,7 +523,7 @@ abstract class StreamJob[
val stream = transform |# maybeSink

if (config.showPlan)
logger.info(s"\nPLAN:\n${env.getExecutionPlan}\n")
logger.info(s"\nPLAN:\n${runner.getExecutionPlan}\n")

runner.checkResultsOpt match {

@@ -533,7 +539,7 @@ abstract class StreamJob[
)

case _ =>
val result = env.execute(config.jobName)
val result = runner.execute
logger.info(result.toString)
}
}
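A matching sketch of a concrete StreamJob under the revised API: the job keeps config, while the execution environment, execution plan, and execution itself now go through the runner. MyRunner, MyEventADT, and MyRecord are the same hypothetical names as in the sketch above:

// Sketch only: MyRecord is a hypothetical case class in the MyEventADT hierarchy;
// its fields (id, value) are invented for illustration.
import io.epiphanous.flinkrunner.flink.StreamJob
import org.apache.flink.streaming.api.scala._

class MyJob(runner: MyRunner) extends StreamJob[MyRecord, MyEventADT](runner) {

  // env and tableEnv are no longer members of StreamJob; a job that needs the raw
  // environments reaches them as runner.env and runner.tableEnv.
  override def transform: DataStream[MyRecord] =
    runner.env
      .fromCollection(Seq(MyRecord("a", 1.0), MyRecord("b", 2.0)))
      .filter(_.value > 0)
}
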
@@ -56,17 +56,20 @@ trait BasePropGenerators {
def genOneWith[T](arb: Arbitrary[T]): T =
genOne[T](arb)

def genStreamWith[T](arb: Arbitrary[T]): Stream[T] = genStream(arb)

def genOne[T](implicit arb: Arbitrary[T]): T = genPop[T](1).head

def genStream[T](implicit arb: Arbitrary[T]): Stream[T] =
Stream
.from(0)
.flatMap(_ => arb.arbitrary.sample)

def genPop[T](
mean: Int = 10,
sd: Double = 0
)(implicit arb: Arbitrary[T]): List[T] =
Stream
.from(0)
.map(_ => arb.arbitrary.sample)
.filter(_.nonEmpty)
genStream[T]
.take(((Random.nextGaussian() - 0.5) * sd + mean).round.toInt)
.flatten
.toList
}
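
A small usage sketch of the generator helpers above. Person, its Arbitrary instance, and the import path for BasePropGenerators are assumptions for illustration:

// Sketch only: the package of BasePropGenerators is assumed; adjust to the repository.
import io.epiphanous.flinkrunner.model.BasePropGenerators
import org.scalacheck.{Arbitrary, Gen}

case class Person(name: String, age: Int)

object GeneratorExample extends BasePropGenerators {
  implicit val arbPerson: Arbitrary[Person] = Arbitrary(
    for {
      name <- Gen.alphaStr
      age  <- Gen.choose(0, 99)
    } yield Person(name, age)
  )

  val one: Person          = genOne[Person]                    // a single random instance
  val some: Stream[Person] = genStream[Person].take(5)         // lazy, effectively endless stream
  val pop: List[Person]    = genPop[Person](mean = 20, sd = 5) // size drawn around the mean
}
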
15 changes: 8 additions & 7 deletions src/main/scala/io/epiphanous/flinkrunner/model/FlinkConfig.scala
@@ -1,6 +1,11 @@
package io.epiphanous.flinkrunner.model

import com.typesafe.config.{Config, ConfigFactory, ConfigObject, ConfigOriginFactory}
import com.typesafe.config.{
Config,
ConfigFactory,
ConfigObject,
ConfigOriginFactory
}
import com.typesafe.scalalogging.LazyLogging
import io.epiphanous.flinkrunner.util.ConfigToProps.RichConfigObject
import io.epiphanous.flinkrunner.util.FileUtils.getResourceOrFile
@@ -184,13 +189,9 @@ class FlinkConfig(args: Array[String], optConfig: Option[String] = None)
lazy val isStage: Boolean = environment.startsWith("stag")
lazy val isProd: Boolean = environment.startsWith("prod")

private[flinkrunner] def configureStreamExecutionEnvironment
private[flinkrunner] def getStreamExecutionEnvironment
: StreamExecutionEnvironment = {
val env =
if (isDev)
StreamExecutionEnvironment.createLocalEnvironment(1)
else
StreamExecutionEnvironment.getExecutionEnvironment
val env = StreamExecutionEnvironment.getExecutionEnvironment

// maybe disable generic types (prevents kyro serialization fallback)
if (disableGenericTypes)
@@ -28,10 +28,12 @@ object FlinkConnectorName extends Enum[FlinkConnectorName] {

case object RabbitMQ extends FlinkConnectorName

case object Generator extends FlinkConnectorName

val sources: immutable.Seq[FlinkConnectorName] =
values diff IndexedSeq(CassandraSink, ElasticsearchSink)
val sinks: immutable.Seq[FlinkConnectorName] =
values diff IndexedSeq(Hybrid)
values diff IndexedSeq(Hybrid, Generator)

def fromSourceName(
sourceName: String,
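
The sources and sinks lists above encode that the new Generator connector can back a source but not a sink. A one-line illustration, assuming FlinkConnectorName lives in the model package:

// Sketch only: the import path is assumed.
import io.epiphanous.flinkrunner.model.FlinkConnectorName

object ConnectorCheck {
  val generatorIsASource: Boolean =
    FlinkConnectorName.sources.contains(FlinkConnectorName.Generator) // true
  val generatorIsASink: Boolean =
    FlinkConnectorName.sinks.contains(FlinkConnectorName.Generator)   // false
}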
