Merge pull request #230 from codefeedr/improve_rmq
Make username and password of RMQ configurable.
wzorgdrager committed Feb 8, 2020
2 parents 4965795 + 301497a commit 0426936
Showing 18 changed files with 150 additions and 339 deletions.
2 changes: 1 addition & 1 deletion build.sbt
@@ -28,7 +28,7 @@ ThisBuild / description := "CodeFeedr provides an infrastructure on top of Apach
ThisBuild / licenses := List("Apache 2" -> new URL("http://www.apache.org/licenses/LICENSE-2.0.txt"))
ThisBuild / homepage := Some(url("https://github.com/codefeedr/codefeedr"))

ThisBuild / version := "0.1.4"
ThisBuild / version := "0.1.9"
ThisBuild / organization := "org.codefeedr"
ThisBuild / scalaVersion := scala212

3 changes: 3 additions & 0 deletions codefeedr-core/src/main/scala/org/codefeedr/Properties.scala
@@ -26,6 +26,9 @@ object Properties {
// Conversion implicits
implicit def stringToBoolean(str: String): Boolean = str.toBoolean
implicit def booleanToString(bool: Boolean): String = bool.toString
+
+
+
}

/** Object containing configuration properties. */
@@ -63,12 +63,12 @@ abstract class Buffer[T <: Serializable with AnyRef: ClassTag: TypeTag](
*
* @return The configured serializer.
*/
-def getSerializer: AbstractSerde[T] = {
+def getSerializer(topic: String = ""): AbstractSerde[T] = {
val serializer =
properties
.getOrElse[String](Buffer.SERIALIZER, Serializer.JSON) //default is JSON

-Serializer.getSerde[T](serializer)
+Serializer.getSerde[T](serializer, topic)
}

/** Returns the properties of this buffer. */
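
The topic parameter threads the buffer's Kafka topic down to the serde, which needs it to address ProducerRecords (see the serde changes below). A minimal sketch of the new resolution path; MyEvent and the topic name "my-topic" are illustrative assumptions:

import org.codefeedr.buffer.serialization.{AbstractSerde, Serializer}

// Hypothetical payload type; Scala case classes are Serializable.
case class MyEvent(id: Int, message: String)

// What getSerializer(topic) now does internally: resolve the configured
// serde (JSON by default) and bind it to the buffer's topic.
val serde: AbstractSerde[MyEvent] =
  Serializer.getSerde[MyEvent](Serializer.JSON, "my-topic")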
@@ -18,22 +18,17 @@
*/
package org.codefeedr.buffer

-import java.util.Properties
+import java.util.{Optional, Properties}

import org.apache.avro.reflect.ReflectData
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.scala.DataStream
-import org.apache.flink.streaming.connectors.kafka.{
-FlinkKafkaConsumer,
-FlinkKafkaProducer
-}
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.Semantic
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner
+import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaProducer}
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewTopic}
import org.apache.logging.log4j.scala.Logging
-import org.codefeedr.buffer.serialization.schema_exposure.{
-RedisSchemaExposer,
-SchemaExposer,
-ZookeeperSchemaExposer
-}
+import org.codefeedr.buffer.serialization.schema_exposure.{RedisSchemaExposer, SchemaExposer, ZookeeperSchemaExposer}
import org.codefeedr.pipeline.Pipeline

import scala.collection.JavaConverters._
@@ -67,6 +62,9 @@ object KafkaBuffer {
val LATEST = "LATEST"
val EARLIEST = "EARLIEST"

+//SEMANTICS
+val SEMANTIC = "SEMANTIC"
+
}

/** The implementation for the Kafka buffer. This buffer is the default.
@@ -85,6 +83,8 @@ class KafkaBuffer[T <: Serializable with AnyRef: ClassTag: TypeTag](
extends Buffer[T](pipeline, properties, topic)
with Logging {

+implicit def stringToSemantic(str: String): Semantic = Semantic.valueOf(str)
+
/** Default settings for the Kafka buffer. */
private object KafkaBufferDefaults {
//KAFKA RELATED
@@ -108,14 +108,17 @@
//OFFSETS
val START_POSITION = KafkaBuffer.GROUP_OFFSETS
val START_TIMESTAMP = 0x0
+
+//SEMANTIC
+val SEMANTIC: Semantic = Semantic.AT_LEAST_ONCE
}

/** Get a Kafka Consumer as source for a stage.
*
* @return The DataStream retrieved from the buffer.
*/
override def getSource: DataStream[T] = {
-val serde = getSerializer
+val serde = getSerializer()

// Make sure the topic already exists, otherwise create it.
checkAndCreateSubject(topic,
@@ -165,9 +168,12 @@
properties
.getOrElse[String](KafkaBuffer.BROKER, KafkaBufferDefaults.BROKER))

+// Check the configured semantic (stored as a string buffer property).
+val semantic = properties.getOrElse[Semantic](KafkaBuffer.SEMANTIC, KafkaBufferDefaults.SEMANTIC)(stringToSemantic)

// Create Kafka producer.
val producer =
-new FlinkKafkaProducer[T](topic, getSerializer, getKafkaProperties)
+new FlinkKafkaProducer[T](topic, getSerializer(topic), getKafkaProperties, semantic)
producer.setWriteTimestampToKafka(true)

producer
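
Because buffer properties are stored as strings, the stringToSemantic implicit parses the stored value back into a Semantic, falling back to AT_LEAST_ONCE. A hedged sketch of opting into exactly-once delivery through the raw buffer property; the setSemantic convenience method added to PipelineBuilder further down does the same thing:

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.Semantic
import org.codefeedr.buffer.KafkaBuffer
import org.codefeedr.pipeline.PipelineBuilder

// Sketch: the semantic is persisted as the string "EXACTLY_ONCE" and
// recovered via Semantic.valueOf when the Kafka producer is built.
val builder = new PipelineBuilder()
builder.setBufferProperty(KafkaBuffer.SEMANTIC, Semantic.EXACTLY_ONCE.toString)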
@@ -18,24 +18,22 @@
*/
package org.codefeedr.buffer.serialization

-import org.apache.flink.api.common.serialization.{
-AbstractDeserializationSchema,
-SerializationSchema
-}
+import org.apache.flink.api.common.serialization.{AbstractDeserializationSchema, SerializationSchema}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.TypeExtractor
+import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema

import scala.reflect.{ClassTag, classTag}

/** Abstract class for a SerDe.
*
* @tparam T Type of the SerDe.
*/
-abstract class AbstractSerde[T <: Serializable: ClassTag]()
+abstract class AbstractSerde[T <: Serializable: ClassTag](topic: String = "")
extends AbstractDeserializationSchema[T](
TypeExtractor.createTypeInfo(
classTag[T].runtimeClass.asInstanceOf[Class[T]]))
-with SerializationSchema[T] {
+with SerializationSchema[T] with KafkaSerializationSchema[T] {

// Get type of class
val inputClassType: Class[T] = classTag[T].runtimeClass.asInstanceOf[Class[T]]
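
Mixing in KafkaSerializationSchema means every concrete serde now provides a Kafka-aware serialize overload next to the plain byte-level pair. A minimal sketch of a custom serde under the new signatures; Ping and PingSerde are hypothetical:

import java.lang
import java.nio.charset.StandardCharsets

import org.apache.kafka.clients.producer.ProducerRecord
import org.codefeedr.buffer.serialization.AbstractSerde

// Hypothetical payload type; case classes are Serializable by default.
case class Ping(n: Int)

class PingSerde(topic: String = "") extends AbstractSerde[Ping](topic) {

  // Byte-level encoding (SerializationSchema).
  override def serialize(element: Ping): Array[Byte] =
    element.n.toString.getBytes(StandardCharsets.UTF_8)

  // Byte-level decoding (AbstractDeserializationSchema).
  override def deserialize(message: Array[Byte]): Ping =
    Ping(new String(message, StandardCharsets.UTF_8).toInt)

  // Kafka-aware encoding (KafkaSerializationSchema): wrap the bytes in a
  // ProducerRecord addressed to this serde's topic.
  override def serialize(element: Ping, timestamp: lang.Long): ProducerRecord[Array[Byte], Array[Byte]] =
    new ProducerRecord(topic, serialize(element))
}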
@@ -18,7 +18,10 @@
*/
package org.codefeedr.buffer.serialization

+import java.lang
+
import com.mongodb.BasicDBObject
+import org.apache.kafka.clients.producer.ProducerRecord
import org.bson._
import org.json4s.NoTypeHints
import org.json4s.ext.JavaTimeSerializers
@@ -32,8 +35,8 @@ import scala.reflect.runtime.universe._
*
* @tparam T Type of the SerDe.
*/
-class BsonSerde[T <: Serializable with AnyRef: TypeTag: ClassTag]
-extends AbstractSerde[T] {
+class BsonSerde[T <: Serializable with AnyRef: TypeTag: ClassTag](topic: String = "")
+extends AbstractSerde[T](topic) {

// Implicitly and lazily define the serialization to JSON.
implicit lazy val formats = Serialization.formats(NoTypeHints) ++ JavaTimeSerializers.all
@@ -60,12 +63,22 @@ class BsonSerde[T <: Serializable with AnyRef: TypeTag: ClassTag]
val json = new RawBsonDocument(message).toJson
Serialization.read[T](json)
}
+
+/** Serialize as Kafka Producer Record.
+* @return ProducerRecord.
+*/
+override def serialize(element: T, timestamp: lang.Long) = {
+// The one-argument serialize already produces the BSON bytes.
+new ProducerRecord(topic, serialize(element))
+}
}

/** Companion object to simply instantiation of a BSONSerde. */
object BsonSerde {

/** Creates new BSON Serde. */
-def apply[T <: Serializable with AnyRef: TypeTag: ClassTag]: BsonSerde[T] =
-new BsonSerde[T]()
+def apply[T <: Serializable with AnyRef: TypeTag: ClassTag](topic: String = ""): BsonSerde[T] =
+new BsonSerde[T](topic)
}
@@ -18,8 +18,10 @@
*/
package org.codefeedr.buffer.serialization

+import java.lang
import java.nio.charset.StandardCharsets

+import org.apache.kafka.clients.producer.ProducerRecord
import org.json4s.NoTypeHints
import org.json4s.ext.JavaTimeSerializers
import org.json4s.jackson.Serialization
@@ -31,8 +33,8 @@ import scala.reflect.runtime.universe._
*
* @tparam T Type of the SerDe.
*/
-class JSONSerde[T <: Serializable with AnyRef: TypeTag: ClassTag]
-extends AbstractSerde[T] {
+class JSONSerde[T <: Serializable with AnyRef: TypeTag: ClassTag](topic: String = "")
+extends AbstractSerde[T](topic) {

// Implicitly and lazily define the serialization to JSON.
implicit lazy val formats = Serialization.formats(NoTypeHints) ++ JavaTimeSerializers.all
@@ -55,12 +57,19 @@ class JSONSerde[T <: Serializable with AnyRef: TypeTag: ClassTag]
override def deserialize(message: Array[Byte]): T = {
Serialization.read[T](new String(message, StandardCharsets.UTF_8))
}
+
+/** Serialize as Kafka Producer Record.
+* @return ProducerRecord.
+*/
+override def serialize(element: T, timestamp: lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = {
+new ProducerRecord(topic, serialize(element))
+}
}

/** Companion object to simply instantiation of a JSONSerde. */
object JSONSerde {

/** Creates new JSON Serde. */
-def apply[T <: Serializable with AnyRef: ClassTag: TypeTag]: JSONSerde[T] =
-new JSONSerde[T]()
+def apply[T <: Serializable with AnyRef: ClassTag: TypeTag](topic: String = ""): JSONSerde[T] =
+new JSONSerde[T](topic)
}
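
With the topic in the constructor, one serde instance can both round-trip bytes and build ready-to-send ProducerRecords. An illustrative sketch; the Commit case class and the topic name "commits" are made up for the example:

import org.codefeedr.buffer.serialization.JSONSerde

// Hypothetical event type, serialized via json4s under the hood.
case class Commit(sha: String, author: String)

val serde = JSONSerde[Commit]("commits") // serde bound to topic "commits"

val bytes = serde.serialize(Commit("0426936", "wzorgdrager"))
val back = serde.deserialize(bytes) // == Commit("0426936", "wzorgdrager")

// Kafka-aware overload: the same bytes, wrapped in a ProducerRecord that
// is already addressed to "commits". The timestamp is unused here.
val record = serde.serialize(Commit("0426936", "wzorgdrager"), null)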
@@ -18,7 +18,10 @@

package org.codefeedr.buffer.serialization

+import java.lang
+
import com.twitter.chill.{Input, KryoBase, Output, ScalaKryoInstantiator}
+import org.apache.kafka.clients.producer.ProducerRecord

import scala.reflect.ClassTag
import scala.reflect.runtime.universe._
@@ -27,7 +30,7 @@ import scala.reflect.runtime.universe._
*
* @tparam T Type of the SerDe.
*/
-class KryoSerde[T <: Serializable: TypeTag: ClassTag] extends AbstractSerde[T] {
+class KryoSerde[T <: Serializable: TypeTag: ClassTag](topic: String = "") extends AbstractSerde[T](topic) {

// Lazily retrieve kryo instance.
private lazy val kryo: KryoBase = getKryo
@@ -62,13 +65,24 @@ class KryoSerde[T <: Serializable: TypeTag: ClassTag] extends AbstractSerde[T] {
inst.setRegistrationRequired(false)
inst.newKryo()
}
+
+/** Serialize as Kafka Producer Record.
+* @return ProducerRecord.
+*/
+override def serialize(element: T, timestamp: lang.Long) = {
+// The one-argument serialize already runs Kryo and returns the bytes.
+new ProducerRecord(topic, serialize(element))
+}
}

/** Companion object to simply instantiation of a KryoSerde. */
object KryoSerde {
val BUFFER_SIZE = 4096

/** Creates new Kryo Serde. */
-def apply[T <: Serializable: ClassTag: TypeTag]: KryoSerde[T] =
-new KryoSerde[T]()
+def apply[T <: Serializable: ClassTag: TypeTag](topic: String = ""): KryoSerde[T] =
+new KryoSerde[T](topic)
}
@@ -55,11 +55,11 @@ object Serializer extends Enumeration {
* @tparam T the type which has to be serialized/deserialized.
* @return the serde instance.
*/
-def getSerde[T <: Serializable with AnyRef: ClassTag: TypeTag](name: String) =
+def getSerde[T <: Serializable with AnyRef: ClassTag: TypeTag](name: String, topic: String = "") =
name match {
case "JSON" => JSONSerde[T]
case "BSON" => BsonSerde[T]
case "KRYO" => KryoSerde[T]
case "JSON" => JSONSerde[T](topic)
case "BSON" => BsonSerde[T](topic)
case "KRYO" => KryoSerde[T](topic)
case _ if registry.exists(_._1 == name) => {
val tt = typeTag[T]
val ct = classTag[T]
@@ -69,10 +69,10 @@ object Serializer extends Enumeration {
.get
.runtimeClass
.getConstructors()(0) // Get constructor of runtime class.
-.newInstance(tt, ct) // Provide class and type tags.
+.newInstance(topic, tt, ct) // Provide the topic plus class and type tags.
.asInstanceOf[AbstractSerde[T]] // Create instance of serde.
}
-case _ => JSONSerde[T] //default is JSON
+case _ => JSONSerde[T](topic) //default is JSON
}

/** Registers a new SerDe. This SerDe needs to be subclassed from [[AbstractSerde]].
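
Threading the topic through getSerde keeps the registry path consistent: a registered custom serde is now instantiated reflectively with the topic as its first constructor argument, so custom serdes must accept one. A short sketch of the built-in resolution; Metric and the topic name are hypothetical:

import org.codefeedr.buffer.serialization.Serializer

// Hypothetical payload type.
case class Metric(name: String, value: Double)

// Built-in names resolve directly, now bound to a topic.
val json = Serializer.getSerde[Metric](Serializer.JSON, "metrics")
val kryo = Serializer.getSerde[Metric]("KRYO", "metrics")

// Unrecognized names still fall back to JSON, topic included.
val fallback = Serializer.getSerde[Metric]("UNKNOWN", "metrics")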
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.runtime.state.StateBackend
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.api.scala._
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.Semantic
import org.codefeedr.Properties
import org.codefeedr.buffer.BufferType.BufferType
import org.codefeedr.buffer.FakeStage
@@ -24,13 +24,11 @@ import org.apache.flink.runtime.executiongraph.restart.RestartStrategy
import org.apache.flink.runtime.state.StateBackend
import org.apache.flink.runtime.state.memory.MemoryStateBackend
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
-import org.apache.flink.streaming.api.scala.{
-DataStream,
-StreamExecutionEnvironment
-}
+import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.Semantic
import org.apache.logging.log4j.scala.Logging
import org.codefeedr.Properties
-import org.codefeedr.buffer.{Buffer, BufferType}
+import org.codefeedr.buffer.{Buffer, BufferType, KafkaBuffer}
import org.codefeedr.buffer.BufferType.BufferType
import org.codefeedr.buffer.serialization.Serializer
import org.codefeedr.buffer.serialization.Serializer.SerializerType
@@ -264,6 +262,17 @@ class PipelineBuilder extends Logging {
this
}

+/** Sets the semantic. Default: AT_LEAST_ONCE.
+*
+* @param semantic the Kafka buffer semantics.
+* @return this builder instance.
+*/
+def setSemantic(semantic: Semantic): PipelineBuilder = {
+this.setBufferProperty(KafkaBuffer.SEMANTIC, semantic.toString)
+
+this
+}

/** Enable checkpointing for this pipeline.
*
* @param interval The interval to checkpoint on.
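
A sketch of the new builder hook, assuming the usual fluent pipeline setup; stage wiring is omitted:

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.Semantic
import org.codefeedr.pipeline.PipelineBuilder

// Upgrade the Kafka buffer from the AT_LEAST_ONCE default to
// EXACTLY_ONCE; internally this just sets the SEMANTIC buffer property.
val builder = new PipelineBuilder()
  .setSemantic(Semantic.EXACTLY_ONCE)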
@@ -32,8 +32,8 @@ class BsonSerdeTest extends FunSuite with BeforeAndAfter {
private var serde2: BsonSerde[ComplexCaseClass] = _

before {
-serde = BsonSerde[SimpleCaseClassBson]
-serde2 = BsonSerde[ComplexCaseClass]
+serde = BsonSerde[SimpleCaseClassBson]()
+serde2 = BsonSerde[ComplexCaseClass]()
}

test("Deserializes complex serialized values") {
@@ -31,8 +31,8 @@ class KryoSerdeTest extends FunSuite with BeforeAndAfter {
private var serde2: KryoSerde[KryoComplexCaseClass] = _

before {
-serde = KryoSerde[KryoSimpleCaseClass]
-serde2 = KryoSerde[KryoComplexCaseClass]
+serde = KryoSerde[KryoSimpleCaseClass]()
+serde2 = KryoSerde[KryoComplexCaseClass]()
}

test("Deserializes complex serialized values") {
