add simple stream node inspection for testing
nextdude committed Apr 11, 2023
1 parent 3db983d commit 9646809
Showing 11 changed files with 305 additions and 222 deletions.
32 changes: 19 additions & 13 deletions src/main/scala/io/epiphanous/flinkrunner/FlinkRunner.scala
@@ -10,6 +10,7 @@ import org.apache.avro.generic.GenericRecord
import org.apache.flink.api.common.JobExecutionResult
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.formats.avro.utils.AvroKryoSerializerUtils.AvroSchemaSerializer
import org.apache.flink.streaming.api.graph.StreamGraph
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.data.RowData
@@ -37,7 +38,8 @@ import scala.reflect.runtime.{universe => ru}
abstract class FlinkRunner[ADT <: FlinkEvent: TypeInformation](
val config: FlinkConfig,
val checkResultsOpt: Option[CheckResults[ADT]] = None,
val generatorFactoryOpt: Option[GeneratorFactory[ADT]] = None)
val generatorFactoryOpt: Option[GeneratorFactory[ADT]] = None,
val executeJob: Boolean = true)
extends LazyLogging {

val env: StreamExecutionEnvironment =
@@ -57,6 +59,20 @@ abstract class FlinkRunner[ADT <: FlinkEvent: TypeInformation](
*/
def getExecutionPlan: String = env.getExecutionPlan

/** Get the stream graph for the configured job. This is primarily useful
* for testing the stream jobs constructed in flinkrunner. It will throw
* an exception if you call it before running a job against this runner.
* If you are only interested in the stream graph and don't need the job
* to be executed, you can set executeJob = false when constructing the
* FlinkRunner instance.
* @return
* StreamGraph
*/
def getStreamGraph: StreamGraph = env.getStreamGraph(false)

/** Summary info for each node in the job's stream graph, for testing. */
def getStreamNodesInfo: Seq[StreamNodeInfo] =
StreamNodeInfo.from(getStreamGraph)

/** Executes the job graph.
* @return
* JobExecutionResult
@@ -232,7 +248,7 @@ abstract class FlinkRunner[ADT <: FlinkEvent: TypeInformation](
*
* @param sourceConfig
* the source config
* @param fromRow
* @param fromRowData
* an implicit method to convert a Row into an event of type E
* @tparam E
* the event data type
@@ -266,17 +282,7 @@ abstract class FlinkRunner[ADT <: FlinkEvent: TypeInformation](
def addSink[E <: ADT: TypeInformation](
stream: DataStream[E],
sinkName: String): Unit =
getSinkConfig(sinkName) match {
case s: CassandraSinkConfig[ADT] => s.addSink[E](stream)
case s: ElasticsearchSinkConfig[ADT] => s.addSink[E](stream)
case s: FileSinkConfig[ADT] => s.addSink[E](stream)
case s: JdbcSinkConfig[ADT] => s.addSink[E](stream)
case s: KafkaSinkConfig[ADT] => s.addSink[E](stream)
case s: KinesisSinkConfig[ADT] => s.addSink[E](stream)
case s: RabbitMQSinkConfig[ADT] => s.addSink[E](stream)
case s: SocketSinkConfig[ADT] => s.addSink[E](stream)
case s: IcebergSinkConfig[ADT] => s.addSink[E](stream)
}
getSinkConfig(sinkName).addSink[E](stream)

def addAvroSink[
E <: ADT with EmbeddedAvroRecord[A]: TypeInformation,
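The executeJob flag and the graph accessors added above make it possible to build a job's graph in a test without ever executing it. A minimal sketch of that flow, assuming hypothetical MyRunner and MyJob subclasses of FlinkRunner and StreamJob and a test-provided config (none of these names are part of this commit):

// MyRunner, MyJob and config are assumed test fixtures, not part of this commit
val runner = new MyRunner(config, executeJob = false)
new MyJob(runner).run() // builds the graph; run() skips execution and collection
runner.getStreamNodesInfo.foreach { n =>
  println(s"${n.nodeKind}: ${n.name} (parallelism=${n.parallelism})")
}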
45 changes: 27 additions & 18 deletions src/main/scala/io/epiphanous/flinkrunner/flink/StreamJob.scala
@@ -15,7 +15,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.Window
import org.apache.flink.table.data.RowData
import org.apache.flink.table.data.{GenericRowData, RowData}
import org.apache.flink.util.Collector
import squants.Quantity

@@ -58,9 +58,12 @@ abstract class StreamJob[
IN <: ADT with EmbeddedRowType: TypeInformation](
seq: Seq[IN] = Seq.empty,
name: Option[String] = None)(implicit
fromRowData: RowData => IN): DataStream[IN] = if (seq.nonEmpty)
runner.env.fromCollection[IN](seq)
else singleRowSource[IN](name.getOrElse(runner.getDefaultSourceName))
fromRowData: RowData => IN): DataStream[IN] = if (seq.nonEmpty) {
val rd = seq.zipWithIndex.map { case (_, i) =>
GenericRowData.of(Integer.valueOf(i))
}
runner.env.fromCollection(rd).map(g => seq(g.getInt(0)))
} else singleRowSource[IN](name.getOrElse(runner.getDefaultSourceName))

/** Configure a single input source stream.
* @param name
@@ -330,6 +333,8 @@ abstract class StreamJob[
def maybeSink(out: DataStream[OUT]): Unit =
if (runner.writeToSink) sink(out)

def buildJobGraph: DataStream[OUT] = transform |# maybeSink

/** Runs the job, meaning it constructs the flow and executes it.
*/
def run(): Unit = {
@@ -339,27 +344,31 @@
)

// build the job graph
val stream = transform |# maybeSink
val stream = buildJobGraph

if (config.showPlan)
logger.info(s"\nPLAN:\n${runner.getExecutionPlan}\n")

runner.checkResultsOpt match {
if (runner.executeJob) {
runner.checkResultsOpt match {

case Some(checkResults) =>
logger.info(
s"routing job ${config.jobName} results back through CheckResults<${checkResults.name}>"
)
checkResults.checkOutputEvents[OUT](
stream.executeAndCollect(
config.jobName,
checkResults.collectLimit
case Some(checkResults) =>
logger.info(
s"routing job ${config.jobName} results back through CheckResults<${checkResults.name}>.checkOutputEvents"
)
checkResults.checkOutputEvents[OUT](
stream.executeAndCollect(
config.jobName,
checkResults.collectLimit
)
)
)

case _ =>
val result = runner.execute
logger.info(result.toString)
case _ =>
val result = runner.execute
logger.info(result.toString)
}
} else {
logger.info("NOT EXECUTING JOB GRAPH")
}
}
}
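Because graph construction is now factored into buildJobGraph, a test can also build the flow directly and skip run() entirely. A hedged sketch, reusing the assumed MyJob fixture from above:

// nothing is executed here; the transformations are only registered on the runner's env
val out = new MyJob(runner).buildJobGraph
val sourceNames = runner.getStreamNodesInfo.filter(_.isSource).map(_.name)
assert(sourceNames.nonEmpty)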
@@ -10,6 +10,7 @@ sealed trait FlinkConnectorName extends EnumEntry with Snakecase
object FlinkConnectorName extends Enum[FlinkConnectorName] {
val values: immutable.IndexedSeq[FlinkConnectorName] = findValues

case object Empty extends FlinkConnectorName
case object Hybrid extends FlinkConnectorName

case object Kinesis extends FlinkConnectorName
@@ -0,0 +1,44 @@
package io.epiphanous.flinkrunner.model

import org.apache.flink.streaming.api.graph.StreamGraph

import scala.collection.JavaConverters._

case class StreamNodeInfo(
id: Int,
name: String,
parallelism: Int,
inClasses: List[String],
outClass: Option[String]) {
val isSource: Boolean = inClasses.isEmpty
val isSink: Boolean = outClass.isEmpty
val isTransform: Boolean = !isSource && !isSink
val nodeKind: String =
if (isSource) "source" else if (isTransform) "transform" else "sink"
val simpleInClasses: List[String] =
inClasses.map(_.split("\\.").last)
val simpleOutClass: Option[String] = outClass.map(_.split("\\.").last)
}

object StreamNodeInfo {
def from(sg: StreamGraph): Seq[StreamNodeInfo] = {
sg.getStreamNodes.asScala.map { sn =>
val id = sn.getId
val name = sn.getOperatorName
val parallelism = sn.getParallelism
val inClasses = sn.getTypeSerializersIn.toList.map(
_.createInstance().getClass.getCanonicalName
)
val outClass =
Option(sn.getTypeSerializerOut)
.map(_.createInstance().getClass.getCanonicalName)
StreamNodeInfo(
id = id,
name = name,
parallelism = parallelism,
inClasses = inClasses,
outClass = outClass
)
}.toSeq
}
}
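To make the classification above concrete: a node with no input serializer classes is a source, a node with no output class is a sink, and anything else is a transform. A small illustrative instance (the values are invented, not taken from a real graph):

val node = StreamNodeInfo(
  id = 1,
  name = "Source: my-source",
  parallelism = 2,
  inClasses = Nil,
  outClass = Some("io.epiphanous.flinkrunner.model.MyEvent")
)
// node.isSource == true, node.nodeKind == "source"
// node.simpleOutClass == Some("MyEvent")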
@@ -0,0 +1,42 @@
package io.epiphanous.flinkrunner.model.source

import io.epiphanous.flinkrunner.model._
import org.apache.avro.generic.GenericRecord
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.{
DataStream,
StreamExecutionEnvironment
}
import org.apache.flink.table.data.{GenericRowData, RowData}

case class EmptySourceConfig[ADT <: FlinkEvent](
name: String,
config: FlinkConfig)
extends SourceConfig[ADT] {
override def connector: FlinkConnectorName = FlinkConnectorName.Empty

def _emptySource[E: TypeInformation](
env: StreamExecutionEnvironment): DataStream[E] =
env.fromCollection(Seq.empty[E])

override def getSourceStream[E <: ADT: TypeInformation](
env: StreamExecutionEnvironment): DataStream[E] =
_emptySource[E](env)

override def getAvroSourceStream[
E <: ADT with EmbeddedAvroRecord[A]: TypeInformation,
A <: GenericRecord: TypeInformation](
env: StreamExecutionEnvironment)(implicit
fromKV: EmbeddedAvroRecordInfo[A] => E): DataStream[E] =
_emptySource[E](env)

override def getRowSourceStream[
E <: ADT with EmbeddedRowType: TypeInformation](
env: StreamExecutionEnvironment)(implicit
fromRowData: RowData => E): DataStream[E] =
_emptySource[GenericRowData](env).map(fromRowData)

}
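The empty source is what a graph-only test can fall back on when no real source is wanted; it is selected by the connector match in SourceConfig below. A minimal hedged sketch of direct use, where MyEvent and config stand in for an ADT member and a FlinkConfig supplied by a test harness:

// MyEvent and config are assumed names, not part of this commit;
// an implicit TypeInformation[MyEvent] is assumed to be in scope
val env = StreamExecutionEnvironment.getExecutionEnvironment
val emptySource = EmptySourceConfig[MyEvent]("empty-source", config)
val events: DataStream[MyEvent] = emptySource.getSourceStream[MyEvent](env) // emits nothing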
@@ -265,6 +265,7 @@ object SourceConfig {
config.jobName,
config.getStringOpt(s"sources.$name.connector")
) match {
case Empty => EmptySourceConfig[ADT](name, config)
case File => FileSourceConfig[ADT](name, config)
case Hybrid => HybridSourceConfig[ADT](name, config)
case Kafka => KafkaSourceConfig[ADT](name, config)