Skip to content

Commit

Permalink
Merge 70db93c into cb0a1a6
Browse files Browse the repository at this point in the history
  • Loading branch information
wzorgdrager committed Apr 29, 2019
2 parents cb0a1a6 + 70db93c commit b8bd962
Showing 1 changed file with 32 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,14 @@ object KafkaBuffer {
val AMOUNT_OF_REPLICAS = "AMOUNT_OF_REPLICAS"
val COMPRESSION_TYPE = "compression.type"

//OFFSETS
val START_POSITION = "START_POSITION"
val START_TIMESTAMP = "START_TIMESTAMP"
val GROUP_OFFSETS = "GROUP_OFFSETS"
val TIMESTAMP = "TIMESTAMP"
val LATEST = "LATEST"
val EARLIEST = "EARLIEST"

}

/** The implementation for the Kafka buffer. This buffer is the default.
Expand Down Expand Up @@ -96,6 +104,10 @@ class KafkaBuffer[T <: Serializable with AnyRef: ClassTag: TypeTag](
val AMOUNT_OF_PARTITIONS = 1
val AMOUNT_OF_REPLICAS = 1
val COMPRESSION_TYPE = "none"

//OFFSETS
val START_POSITION = KafkaBuffer.GROUP_OFFSETS
val START_TIMESTAMP = 0x0
}

/** Get a Kafka Consumer as source for a stage.
Expand All @@ -111,9 +123,27 @@ class KafkaBuffer[T <: Serializable with AnyRef: ClassTag: TypeTag](
.get[String](KafkaBuffer.BROKER)
.getOrElse(KafkaBufferDefaults.BROKER))

val kafkaConsumer =
new FlinkKafkaConsumer[T](topic, serde, getKafkaProperties)
val startPosition = properties.getOrElse[String](
KafkaBuffer.START_POSITION,
KafkaBufferDefaults.START_POSITION)

/** Configure the starting point FromGroupOffsets. */
startPosition match {
case KafkaBuffer.EARLIEST => kafkaConsumer.setStartFromEarliest()
case KafkaBuffer.LATEST => kafkaConsumer.setStartFromLatest()
case KafkaBuffer.TIMESTAMP =>
kafkaConsumer.setStartFromTimestamp(
properties.getOrElse[Long](
KafkaBuffer.START_POSITION,
KafkaBufferDefaults.START_TIMESTAMP)(_.toLong))
case KafkaBuffer.GROUP_OFFSETS => kafkaConsumer.setStartFromGroupOffsets()
case _ => kafkaConsumer.setStartFromGroupOffsets()
}

// Add a source.
pipeline.environment.addSource(
new FlinkKafkaConsumer[T](topic, serde, getKafkaProperties))
pipeline.environment.addSource(kafkaConsumer)
}

/** Get a Kafka Producer as sink to the buffer.
Expand Down

0 comments on commit b8bd962

Please sign in to comment.