Commit
Merge 932a2bc into cb0a1a6
wzorgdrager committed Apr 30, 2019
2 parents cb0a1a6 + 932a2bc commit a78fee7
Showing 11 changed files with 177 additions and 23 deletions.
16 changes: 16 additions & 0 deletions codefeedr-core/src/main/scala/org/codefeedr/Properties.scala
@@ -87,6 +87,22 @@ class Properties(private val contents: Map[String, String] = Map()) {
def set[T](key: String, value: T)(implicit convert: T => String): Properties =
new Properties(contents + (key -> value))

/** Merges two Properties objects.
*
* @param properties the second Properties object; its values override keys already present in *this*.
* @return a new, merged Properties object.
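* @example `new Properties().set("a", "1").merge(new Properties().set("a", "2"))` yields a Properties in which `"a"` maps to `"2"`.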
*/
def merge(properties: Properties): Properties = {
val keys = properties.keys()
var newProps = this

for (key <- keys) {
newProps = newProps.set(key, properties.get(key).get)
}

newProps
}

/**
* Get the set of keys in this Properties.
*
@@ -70,6 +70,9 @@ abstract class Buffer[T <: Serializable with AnyRef: ClassTag: TypeTag](

Serializer.getSerde[T](serializer)
}

/** Returns the properties of this buffer. */
def getProperties = properties
}

/**
@@ -65,10 +65,11 @@ class BufferFactory[+In <: Serializable with AnyRef,
pipeline.bufferType match {
case BufferType.Kafka => {
val cleanedSubject = subject.replace("$", "-")
new KafkaBuffer[T](pipeline,
pipeline.bufferProperties,
cleanedSubject,
groupIdFinal)
new KafkaBuffer[T](
pipeline,
pipeline.bufferProperties.merge(stage.getContext.stageProperties),
cleanedSubject,
groupIdFinal)
}
case x if BufferFactory.registry.exists(_._1 == x) => {
val tt = typeTag[T]
@@ -80,16 +81,22 @@
.get
.runtimeClass
.getConstructors()(0)
.newInstance(pipeline, pipeline.bufferProperties, subject, ct, tt)
.newInstance(
pipeline,
pipeline.bufferProperties.merge(stage.getContext.stageProperties),
subject,
ct,
tt)
.asInstanceOf[Buffer[T]]
}
case _ => {
// Default case: fall back to the Kafka buffer.
val cleanedSubject = subject.replace("$", "-")
new KafkaBuffer[T](pipeline,
pipeline.bufferProperties,
cleanedSubject,
groupIdFinal)
new KafkaBuffer[T](
pipeline,
pipeline.bufferProperties.merge(stage.getContext.stageProperties),
cleanedSubject,
groupIdFinal)
}
}
}
@@ -59,6 +59,14 @@ object KafkaBuffer {
val AMOUNT_OF_REPLICAS = "AMOUNT_OF_REPLICAS"
val COMPRESSION_TYPE = "compression.type"

//OFFSETS
val START_POSITION = "START_POSITION"
val START_TIMESTAMP = "START_TIMESTAMP"
val GROUP_OFFSETS = "GROUP_OFFSETS"
val TIMESTAMP = "TIMESTAMP"
val LATEST = "LATEST"
val EARLIEST = "EARLIEST"

}

/** The implementation for the Kafka buffer. This buffer is the default.
@@ -96,6 +104,10 @@ class KafkaBuffer[T <: Serializable with AnyRef: ClassTag: TypeTag](
val AMOUNT_OF_PARTITIONS = 1
val AMOUNT_OF_REPLICAS = 1
val COMPRESSION_TYPE = "none"

//OFFSETS
val START_POSITION = KafkaBuffer.GROUP_OFFSETS
val START_TIMESTAMP = 0x0
}

/** Get a Kafka Consumer as source for a stage.
@@ -111,9 +123,27 @@
.get[String](KafkaBuffer.BROKER)
.getOrElse(KafkaBufferDefaults.BROKER))

val kafkaConsumer =
new FlinkKafkaConsumer[T](topic, serde, getKafkaProperties)
val startPosition = properties.getOrElse[String](
KafkaBuffer.START_POSITION,
KafkaBufferDefaults.START_POSITION)

// Configure the consumer's start position; defaults to group offsets.
startPosition match {
case KafkaBuffer.EARLIEST => kafkaConsumer.setStartFromEarliest()
case KafkaBuffer.LATEST => kafkaConsumer.setStartFromLatest()
case KafkaBuffer.TIMESTAMP =>
kafkaConsumer.setStartFromTimestamp(
properties.getOrElse[Long](
KafkaBuffer.START_TIMESTAMP,
KafkaBufferDefaults.START_TIMESTAMP)(_.toLong))
case KafkaBuffer.GROUP_OFFSETS => kafkaConsumer.setStartFromGroupOffsets()
case _ => kafkaConsumer.setStartFromGroupOffsets()
}

// Add a source.
pipeline.environment.addSource(
new FlinkKafkaConsumer[T](topic, serde, getKafkaProperties))
pipeline.environment.addSource(kafkaConsumer)
}

/** Get a Kafka Producer as a sink for the buffer.
26 changes: 14 additions & 12 deletions codefeedr-core/src/main/scala/org/codefeedr/pipeline/Pipeline.scala
@@ -59,6 +59,19 @@ case class Pipeline(var name: String,
graph: DirectedAcyclicGraph,
objectProperties: Map[String, Properties]) {

/** Prepares the pipeline by running setUp on every stage with this instance. */
def prepare() = {
val nodes = getNodes

// Run all setups.
for (obj <- nodes) {
obj.setUp(this)
}
}

/** Run preparation immediately, when the pipeline is constructed. */
prepare()

/** The mutable StreamExecutionEnvironment. */
var _environment: StreamExecutionEnvironment = null

@@ -215,11 +228,6 @@

val nodes = getNodes

// Run all setups.
for (nodes <- nodes) {
nodes.setUp(this)
}

// Connect each object by getting a starting buffer, if any, and sending it to the next.
var buffer: DataStream[Serializable with AnyRef] = null
for (obj <- nodes) {
@@ -236,11 +244,6 @@
def startLocal(): Unit = {
val nodes = getNodes

// Run all setups.
for (obj <- nodes) {
obj.setUp(this)
}

// For each PO, make buffers and run.
for (obj <- nodes) {
runStage(obj)
@@ -273,8 +276,7 @@
val obj = optObj.get
.asInstanceOf[Stage[Serializable with AnyRef, Serializable with AnyRef]]

// Setup and run object.
obj.setUp(this)
// Run object.
runStage(obj, groupId)

// Run stage in one environment.
27 changes: 27 additions & 0 deletions codefeedr-core/src/test/scala/org/codefeedr/PropertiesTest.scala
@@ -125,4 +125,31 @@ class PropertiesTest extends FunSuite {
assert(props.has("foo"))
assert(!props.has("bar"))
}

test("Two property maps should be able to merge") {
var props = new Properties()
var props2 = new Properties()

props = props.set("ha", "na")
props2 = props2.set("bla", "na")

val finalProps = props.merge(props2)

assert(finalProps.has("ha"))
assert(finalProps.has("bla"))
}

test("Two property maps should be able to merge and overridden") {
var props = new Properties()
var props2 = new Properties()

props = props.set("ha", "na")
props2 = props2.set("ha", "ba")

val finalProps = props.merge(props2)

assert(finalProps.has("ha"))
assert(!finalProps.has("bla"))
assert(finalProps.get("ha").get.equals("ba"))
}
}
@@ -64,6 +64,50 @@ class BufferFactoryTest extends FunSuite with BeforeAndAfter {
assert(buffer.isInstanceOf[KafkaBuffer[StringType]])
}

test("Buffer properties and stage properties should be merged.") {
val pipeline = new PipelineBuilder()
.setBufferType(BufferType.Kafka)
.append(nodeA)
.setBufferProperty("a", "val")
.setStageProperty(nodeA, "b", "val")
.setStageProperty(nodeB, "c", "val")
.append(nodeB)
.build()

val factory = new BufferFactory(pipeline, nodeA, nodeB)

// Created for the nodeB sink, so it should use nodeB's subject.
val nodeSubject = nodeB.getContext.stageId
val buffer = factory.create[StringType]()

assert(buffer.getProperties.has("a"))
assert(buffer.getProperties.has("b"))
assert(!buffer.getProperties.has("c"))
}

test("Stage properties should override buffer properties.") {
val pipeline = new PipelineBuilder()
.setBufferType(BufferType.Kafka)
.append(nodeA)
.setBufferProperty("a", "val")
.setStageProperty(nodeA, "b", "val")
.setStageProperty(nodeB, "c", "val")
.setBufferProperty("b", "no")
.append(nodeB)
.build()

val factory = new BufferFactory(pipeline, nodeA, nodeB)

// Created for the nodeB sink, so it should use nodeB's subject.
val nodeSubject = nodeB.getContext.stageId
val buffer = factory.create[StringType]()

assert(buffer.getProperties.has("a"))
assert(buffer.getProperties.has("b"))
assert(!buffer.getProperties.has("c"))
assert(buffer.getProperties.get("b").get.equals("val"))
}

test("Register your own buffer.") {
BufferFactory.register[DummyBuffer[_]]("my_buffer!")

@@ -76,6 +76,7 @@ class KafkaBufferTest

// Set up a simple KafkaBuffer.
val pipeline = new PipelineBuilder()
.setBufferProperty(KafkaBuffer.START_POSITION, KafkaBuffer.LATEST)
.append(new SimpleSourceStage())
.setBufferProperty(KafkaBuffer.SCHEMA_EXPOSURE_HOST,
s"redis://localhost:$redisPort")
@@ -118,6 +119,7 @@
val numberInput = new NumberInput()

val pipeline = new PipelineBuilder()
.setBufferProperty(KafkaBuffer.START_POSITION, KafkaBuffer.EARLIEST)
.append(numberInput) // pushes 1 to 50
.append(numberOutput) // reads and crashes
.build()
@@ -217,6 +217,8 @@ class PipelineTest
.setBufferProperty(KafkaBuffer.SCHEMA_EXPOSURE, "true")
.setBufferProperty(KafkaBuffer.SCHEMA_EXPOSURE_SERVICE, "zookeeper")
.setBufferProperty(KafkaBuffer.SCHEMA_EXPOSURE_HOST, "localhost:2181")
.setBufferProperty(KafkaBuffer.START_POSITION, KafkaBuffer.TIMESTAMP)
.setBufferProperty(KafkaBuffer.START_TIMESTAMP, "0")
.build()

assertThrows[JobExecutionException] {
19 changes: 19 additions & 0 deletions docs/pages/mydoc/mydoc_buffer.md
@@ -36,10 +36,29 @@ pipelineBuilder
.setBufferProperty(KafkaBuffer.BROKER, "localhost:9092")
```

To set buffer properties at the _stage level_, use stage properties: `.setStageProperty(stageInstance, key, value)`. A stage property overrides the global buffer property with the same key.

**Note:** in the case of the Kafka buffer, all these properties are propagated to the Kafka producer/consumer; see the [Kafka documentation](https://kafka.apache.org/documentation/#configuration) for specifics.
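
A minimal sketch of the override behaviour (the stages `stageA` and `stageB` are hypothetical and assumed to be defined elsewhere):

```scala
val pipeline = new PipelineBuilder()
  .setBufferType(BufferType.Kafka)
  .append(stageA)
  .append(stageB)
  // Global buffer property: applies to the buffers of all stages.
  .setBufferProperty("compression.type", "none")
  // Stage property: overrides the global value, but only for stageA's buffer.
  .setStageProperty(stageA, "compression.type", "gzip")
  .build()
```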

### Kafka start position
To set the start position for **all** stages in a pipeline, configure it as a buffer property:

- Group offsets (default): `setBufferProperty(KafkaBuffer.START_POSITION, KafkaBuffer.GROUP_OFFSETS)`
- Latest: `setBufferProperty(KafkaBuffer.START_POSITION, KafkaBuffer.LATEST)`
- Earliest: `setBufferProperty(KafkaBuffer.START_POSITION, KafkaBuffer.EARLIEST)`
- Timestamp: `setBufferProperty(KafkaBuffer.START_POSITION, KafkaBuffer.TIMESTAMP)` and `setBufferProperty(KafkaBuffer.START_TIMESTAMP, "START_TIMESTAMP_HERE")`

To set the start position at the **stage** level, use stage properties:
- Group offsets (default): `setStageProperty(stageInstance, KafkaBuffer.START_POSITION, KafkaBuffer.GROUP_OFFSETS)`
- Latest: `setStageProperty(stageInstance, KafkaBuffer.START_POSITION, KafkaBuffer.LATEST)`
- Earliest: `setStageProperty(stageInstance, KafkaBuffer.START_POSITION, KafkaBuffer.EARLIEST)`
- Timestamp: `setStageProperty(stageInstance, KafkaBuffer.START_POSITION, KafkaBuffer.TIMESTAMP)` and `setStageProperty(stageInstance, KafkaBuffer.START_TIMESTAMP, "START_TIMESTAMP_HERE")`

This sets the start position of the Kafka consumer of `stageInstance`. For example, if set to `LATEST`, `stageInstance` starts consuming from the latest entry of the stage it is linked to.
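
A sketch combining both levels (the stages and the timestamp value are hypothetical; per the buffer implementation, `START_TIMESTAMP` is parsed as a `Long` and passed to Flink's `setStartFromTimestamp`):

```scala
val pipeline = new PipelineBuilder()
  .setBufferType(BufferType.Kafka)
  .append(stageA)
  .append(stageB)
  // Pipeline-wide default: every Kafka consumer starts from the earliest offset.
  .setBufferProperty(KafkaBuffer.START_POSITION, KafkaBuffer.EARLIEST)
  // stageB alone starts from a timestamp instead, overriding the default.
  .setStageProperty(stageB, KafkaBuffer.START_POSITION, KafkaBuffer.TIMESTAMP)
  .setStageProperty(stageB, KafkaBuffer.START_TIMESTAMP, "1556668800000")
  .build()
```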

### Default properties

#### Kafka
2 changes: 2 additions & 0 deletions docs/pages/mydoc/mydoc_pipeline.md
@@ -204,6 +204,8 @@ getContext.getStageProperties("the_key")
This will return `Some("the_value")`; if the key is unknown, `None` is returned.

**Note**: Stage properties are also added to the _buffer properties_ of the buffer belonging to that stage, so to override buffer properties at the _stage level_, use stage properties.
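
A minimal sketch of reading an optional stage property inside a stage, with a fallback (the `threshold` key and its default are hypothetical):

```scala
// Inside a stage's logic (sketch): getStageProperties returns an
// Option[String], so fall back to a default when the key is absent.
val threshold: Int = getContext
  .getStageProperties("threshold")
  .map(_.toInt)
  .getOrElse(10)
```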

### Start the pipeline
If the Pipeline is properly build using the PipelineBuilder, it can be
started in three modes: