Skip to content

Commit

Permalink
Reworked verification a bit and added toggle to pipelinebuilder.
Browse files Browse the repository at this point in the history
  • Loading branch information
wzorgdrager committed Feb 20, 2019
1 parent 667af31 commit 536f7c8
Show file tree
Hide file tree
Showing 10 changed files with 66 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ final class DirectedAcyclicGraph(val nodes: Vector[AnyRef] = Vector(),
* @throws InvalidPipelineException if the pipeline is invalid (invalid edges).
* @throws StageTypesIncompatibleException if types within the pipeline are invalid.
*/
def verifyGraph() = {
def verify() = {
// Get all input stages nodes.
val nodesInputStages = nodes.filter(x => x.isInstanceOf[InputStage[_]])

Expand Down Expand Up @@ -229,20 +229,19 @@ final class DirectedAcyclicGraph(val nodes: Vector[AnyRef] = Vector(),
// Get output type of this stage.
val stageOutputType = n
.asInstanceOf[Stage[_, _]]
.getOutType
.getName
.outType
.toString

// Get all the input types of this stage as a set.
val inputTypes = n.getClass.getGenericSuperclass
.asInstanceOf[ParameterizedType]
.getActualTypeArguments
.map(_.getTypeName)
.filter(_ != stageOutputType)
val inputTypes = n
.asInstanceOf[Stage[_, _]]
.inTypes
.map(_.toString)

// Get all the types of the incoming edges (which is the output type of the other stage).
val edgesOutputTypes =
getParents(n)
.map(_.asInstanceOf[Stage[_, _]].getOutType.getName)
.map(_.asInstanceOf[Stage[_, _]].outType.toString)

/**
* The amount of inputTypes should be equal to the amount of incoming edges.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ class PipelineBuilder extends Logging {
/** Key manager */
protected var keyManager: KeyManager = _

/** Pipeline verification toggle, defaults to true **/
protected var pipelineVerificationToggle: Boolean = true

/** The StreamTimeCharacteristic. Default: [[TimeCharacteristic.EventTime]]*/
protected var streamTimeCharacteristic: TimeCharacteristic =
TimeCharacteristic.EventTime
Expand Down Expand Up @@ -100,6 +103,17 @@ class PipelineBuilder extends Logging {
*/
def getPipelineType: PipelineType = pipelineType

/** Disables the pipeline verification.
* This is not recommended, it allows for nasty pipelines.
*
* @return The builder instance.
*/
def disablePipelineVerification(): PipelineBuilder = {
this.pipelineVerificationToggle = false

this
}

/** Set the type of the pipeline.
*
* @param pipelineType Type of the pipeline.
Expand Down Expand Up @@ -385,13 +399,21 @@ class PipelineBuilder extends Logging {
def build(): Pipeline = {
if (graph.isEmpty) throw EmptyPipelineException()

// Correctly map and verify graph.
// Correctly map and verify graph for every node.
graph.nodes
.foreach(
_.asInstanceOf[Stage[Serializable with AnyRef,
Serializable with AnyRef]]
.verifyGraph(graph))

// This will verify the graph in terms of types.
if (pipelineVerificationToggle) {
graph.verify()
} else {
logger.warn(
"Pipeline verification has been disabled manually. No type guarantee between stages can be given. Consider enabling it again.")
}

logger.info(
s"Created pipeline with ${graph.nodes.size} nodes and ${graph.edges.size} edges.")
logger.info(
Expand Down
12 changes: 6 additions & 6 deletions codefeedr-core/src/main/scala/org/codefeedr/pipeline/Stage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@ protected[codefeedr] abstract class Stage[
Out <: Serializable with AnyRef: ClassTag: TypeTag](
val stageId: Option[String] = None) {

/** Keep track of all incoming types. **/
var inTypes: List[Type] = typeOf[In] :: Nil

/** Keep track of the outgoing type. **/
val outType: Type = typeOf[Out]

/** The pipeline this stage belongs to. */
var pipeline: Pipeline = _

Expand All @@ -45,12 +51,6 @@ protected[codefeedr] abstract class Stage[
/** Get the id of this stage */
def id: String = stageId.getOrElse(getClass.getName)

/** Get the type of IN */
def getInType = classTag[In].runtimeClass.asInstanceOf[Class[In]]

/** Get the type of OUT */
def getOutType = classTag[Out].runtimeClass.asInstanceOf[Class[Out]]

/** Get the properties of this stage.
*
* @return The properties of this stage.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ Out <: Serializable with AnyRef: ClassTag: TypeTag](
stageId: Option[String] = None)
extends Stage[In, Out](stageId) {

this.inTypes = typeOf[In2] :: this.inTypes

/** Transforms from type In to type Out.
*
* @param source The input source with type In.
Expand Down Expand Up @@ -84,6 +86,8 @@ Out <: Serializable with AnyRef: ClassTag: TypeTag](
stageId: Option[String] = None)
extends Stage2[In, In2, Out](stageId) {

this.inTypes = typeOf[In3] :: this.inTypes

/** Transforms from type In and In2 to type Out.
*
* @param source The input source with type In.
Expand Down Expand Up @@ -142,6 +146,8 @@ Out <: Serializable with AnyRef: ClassTag: TypeTag](
stageId: Option[String] = None)
extends Stage3[In, In2, In3, Out](stageId) {

this.inTypes = typeOf[In3] :: this.inTypes

/** Transforms from type In, In2, In3 to type Out.
*
* @param source The input source with type In.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ class DirectedAcyclicGraphTest extends FunSuite {
.addEdge(stageOne, stageTwo)

assertThrows[StageTypesIncompatibleException] {
dag.verifyGraph()
dag.verify()
}
}

Expand All @@ -286,7 +286,7 @@ class DirectedAcyclicGraphTest extends FunSuite {
.addNode(stageTwo)
.addEdge(stageOne, stageTwo)

dag.verifyGraph()
dag.verify()
}

test("Verify that graph is invalid; inputstage has incoming edge.") {
Expand All @@ -298,7 +298,7 @@ class DirectedAcyclicGraphTest extends FunSuite {
.addEdge(stageOne, stageOne)

assertThrows[InvalidPipelineException] {
dag.verifyGraph()
dag.verify()
}
}

Expand All @@ -310,7 +310,7 @@ class DirectedAcyclicGraphTest extends FunSuite {
.addEdge(stageTwo, stageTwo)

assertThrows[InvalidPipelineException] {
dag.verifyGraph()
dag.verify()
}
}

Expand All @@ -325,7 +325,7 @@ class DirectedAcyclicGraphTest extends FunSuite {
.addEdge(stageOne, stageThree) // we only add 1 edge

assertThrows[StageTypesIncompatibleException] {
dag.verifyGraph()
dag.verify()
}
}

Expand All @@ -341,7 +341,7 @@ class DirectedAcyclicGraphTest extends FunSuite {
.addEdge(stageTwo, stageThree)
.addEdge(stageOne, stageThree)

dag.verifyGraph()
dag.verify()
}

test(
Expand All @@ -358,7 +358,7 @@ class DirectedAcyclicGraphTest extends FunSuite {
.addEdge(stageOne, stageThree)

assertThrows[StageTypesIncompatibleException] {
dag.verifyGraph()
dag.verify()
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ class PipelineBuilderTest extends FunSuite with BeforeAndAfter with Matchers {
test("Every pipeline object should appear in the pipeline (1)") {
val pipeline = builder
.append(new SimpleSourceStage())
.disablePipelineVerification()
.build()

assert(pipeline.graph.nodes.size == 1)
Expand All @@ -84,6 +85,7 @@ class PipelineBuilderTest extends FunSuite with BeforeAndAfter with Matchers {

val pipeline = builder
.append(stage)
.disablePipelineVerification()
.setStageProperty(stage.id, "key", "value")
.setStageProperty(stage.id, "anotherKey", "true")
.build()
Expand Down Expand Up @@ -263,6 +265,7 @@ class PipelineBuilderTest extends FunSuite with BeforeAndAfter with Matchers {
val c = new SimpleTransformStage()

val pipeline = builder
.disablePipelineVerification()
.addParents(c, a :+ b)
.build()

Expand All @@ -276,6 +279,7 @@ class PipelineBuilderTest extends FunSuite with BeforeAndAfter with Matchers {
val c = new SimpleTransformStage()

val pipeline = builder
.disablePipelineVerification()
.addParents(c, a :+ b)
.addParents(c, a)
.build()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ class PipelineTest

builder
.append(source)
.disablePipelineVerification()
.setPipelineType(PipelineType.DAG)
.edge(source, a)
.edge(source, b)
Expand All @@ -283,6 +284,7 @@ class PipelineTest
val sink = new SimpleSinkStage(1)

val pipeline = builder
.disablePipelineVerification()
.setBufferType(BufferType.Kafka)
.edge(source, sink)
.build()
Expand All @@ -301,6 +303,7 @@ class PipelineTest
val sink = new SimpleSinkStage(1)

val pipeline = builder
.disablePipelineVerification()
.setBufferType(BufferType.Kafka)
.edge(source, sink)
.build()
Expand All @@ -315,6 +318,7 @@ class PipelineTest
val sink = new SimpleSinkStage(1)

val pipeline = builder
.disablePipelineVerification()
.setBufferType(BufferType.Kafka)
.edge(source, sink)
.build()
Expand All @@ -339,6 +343,7 @@ class PipelineTest

test("Show list of pipeline item ids") {
val pipeline = builder
.disablePipelineVerification()
.append(new StringInput())
.append(new JsonPrinterOutput())
.build()
Expand All @@ -356,6 +361,7 @@ class PipelineTest

test("Show list of pipeline item ids in an exception") {
val pipeline = builder
.disablePipelineVerification()
.append(new StringInput())
.append(new JsonPrinterOutput())
.build()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ class StageNTest extends FunSuite with BeforeAndAfterAll with EmbeddedKafka {
val job = new MyObject4()

val builder = new PipelineBuilder()
.disablePipelineVerification()
.setBufferType(BufferType.Kafka)
.addParents(job, a :+ b :+ c :+ d :+ e)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ class StageTest extends FunSuite {

test("Should throw when getting unknown main source") {
val pipeline = new PipelineBuilder()
.disablePipelineVerification()
.append(new BadSourceObject())
.build()

Expand All @@ -73,6 +74,7 @@ class StageTest extends FunSuite {

test("Should throw when getting unknown sink") {
val pipeline = new PipelineBuilder()
.disablePipelineVerification()
.append(new BadSinkObject())
.build()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import org.apache.flink.streaming.api.functions.sink.{
}
import org.codefeedr.pipeline._
import org.codefeedr.stages.utilities.StringType
import org.codefeedr.stages.OutputStage
import org.codefeedr.stages.{InputStage, OutputStage}

//This will be thrown after the print sink received x elements.
final case class JobFinishedException()
Expand All @@ -41,9 +41,8 @@ final case class CodeHitException() extends RuntimeException

//a simple test source which generates some StringType messages
class SimpleSourceStage(stageId: Option[String] = None)
extends Stage[Nothing, StringType](stageId) {
override def transform(
source: DataStream[Nothing]): DataStream[StringType] = {
extends InputStage[StringType](stageId) {
override def main(): DataStream[StringType] = {
pipeline.environment.addSource {
new RichSourceFunction[StringType] {
override def run(
Expand Down

0 comments on commit 536f7c8

Please sign in to comment.