Commit a79d7f1

Merge pull request #173 from codefeedr/refactor/properties

Rework properties

wzorgdrager committed Feb 19, 2019
2 parents 2a69b2a + fad4d11

Showing 57 changed files with 127 additions and 132 deletions.
@@ -32,11 +32,13 @@ import scala.reflect.{ClassTag, classTag}
   *
   * @param pipeline The pipeline for which we use this Buffer.
   * @param properties The properties of this buffer.
+  * @param relatedStageName The name of the related stage this buffer is linked to. We need this, for instance, to set the topic id.
   * @tparam T Type of this buffer.
   */
 abstract class Buffer[T <: Serializable with AnyRef: ClassTag: TypeTag](
     pipeline: Pipeline,
-    properties: org.codefeedr.Properties) {
+    properties: org.codefeedr.Properties,
+    relatedStageName: String) {
 
   //Get type of the class at run time.
   val inputClassType: Class[T] = classTag[T].runtimeClass.asInstanceOf[Class[T]]
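Note: every buffer subclass now threads the related stage's name up to this constructor, so a buffer can derive identifiers such as a Kafka topic from the stage it serves. A self-contained miniature of the pattern, in which the Mini* names are illustrative rather than codefeedr API:

    // Miniature of the new constructor threading; not codefeedr's actual classes.
    abstract class MiniBuffer[T](val relatedStageName: String) {
      // Derive a Kafka-safe topic id from the stage name ('$' is not a legal
      // character in Kafka topic names).
      def topicId: String = relatedStageName.replace("$", "-")
    }

    final class MiniKafkaBuffer[T](topic: String) extends MiniBuffer[T](topic)

    // new MiniKafkaBuffer[String]("org.example.MyStage$1").topicId
    //   returns "org.example.MyStage-1"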
@@ -57,7 +57,7 @@ class BufferFactory[In <: Serializable with AnyRef,
     "Buffer factory requires a sink object to determine buffer location")
 
   /** Get the id to read from or write to. */
-  val subject = relatedStage.getSinkSubject
+  val subject = relatedStage.getId
 
   // Create the correct buffer.
   pipeline.bufferType match {
@@ -66,7 +66,6 @@
       val kafkaGroupId = if (groupId != null) groupId else stage.id
       new KafkaBuffer[T](pipeline,
                          pipeline.bufferProperties,
-                         stage.attributes,
                          cleanedSubject,
                          kafkaGroupId)
     }
@@ -80,16 +79,15 @@
         .get
         .runtimeClass
         .getConstructors()(0)
-        .newInstance(pipeline, pipeline.bufferProperties, ct, tt)
+        .newInstance(pipeline, pipeline.bufferProperties, subject, ct, tt)
         .asInstanceOf[Buffer[T]]
     }
     case _ => {
-      //Switch to Kafka
+      //Switch to Kafka.
       val cleanedSubject = subject.replace("$", "-")
       val kafkaGroupId = if (groupId != null) groupId else stage.id
       new KafkaBuffer[T](pipeline,
                          pipeline.bufferProperties,
-                         stage.attributes,
                          cleanedSubject,
                          kafkaGroupId)
     }
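Note: the subject, and hence the Kafka topic, is now simply the related stage's id. Since stage ids default to the class name, they may contain '$' (nested or anonymous classes), which Kafka forbids in topic names; that is what the cleaning step guards against. A hypothetical id illustrates:

    val subject = "org.codefeedr.MyStage$$anon$1"   // hypothetical stage id
    val cleanedSubject = subject.replace("$", "-")  // "org.codefeedr.MyStage--anon-1"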
@@ -36,7 +36,6 @@ import org.codefeedr.buffer.serialization.schema_exposure.{
   ZookeeperSchemaExposer
 }
 import org.codefeedr.pipeline.Pipeline
-import org.codefeedr.stages.StageAttributes
 
 import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
@@ -60,18 +59,16 @@ object KafkaBuffer {
   *
   * @param pipeline The pipeline for which we use this Buffer.
   * @param properties The properties of this Buffer.
-  * @param stageAttributes The attributes of this stage.
   * @param topic The topic to write to, which is basically the subject.
   * @param groupId The group id, to specify the consumer group in Kafka.
   * @tparam T Type of the data in this Buffer.
   */
 class KafkaBuffer[T <: Serializable with AnyRef: ClassTag: TypeTag](
     pipeline: Pipeline,
     properties: org.codefeedr.Properties,
-    stageAttributes: StageAttributes,
     topic: String,
     groupId: String)
-    extends Buffer[T](pipeline, properties)
+    extends Buffer[T](pipeline, properties, topic)
     with Logging {
 
   /** Default settings for the Kafka buffer. */
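Note: with StageAttributes gone, constructing a KafkaBuffer directly mirrors the BufferFactory call sites above, and the topic is forwarded to Buffer as the related stage name. A sketch, assuming a built Pipeline named pipeline is in scope and reusing StringType from the utilities diff further down:

    val buffer = new KafkaBuffer[StringType](
      pipeline,
      pipeline.bufferProperties,
      "my-stage-id",  // topic; doubles as the Buffer's relatedStageName
      "my-group-id")  // Kafka consumer group id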
@@ -23,7 +23,9 @@ import scala.reflect._
 import scala.reflect.runtime.universe._
 
 /** Keeps track of all types of SerDes and creates instances. */
-object Serializer {
+object Serializer extends Enumeration {
+
+  type SerializerType = String
 
   /** JSON serde support.
    * See: http://json4s.org/
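Note: despite extends Enumeration, SerializerType aliases String rather than Enumeration#Value, so any string still type-checks; the alias mainly documents intent in signatures such as setSerializer below. For instance:

    import org.codefeedr.buffer.serialization.Serializer
    import org.codefeedr.buffer.serialization.Serializer.SerializerType

    val serde: SerializerType = Serializer.JSON // the JSON constant used elsewhere in this diff
    val loose: SerializerType = "anything"      // also compiles: the alias is unchecked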
@@ -25,8 +25,10 @@ import org.apache.flink.streaming.api.scala.{
 }
 import org.apache.logging.log4j.scala.Logging
 import org.codefeedr.Properties
-import org.codefeedr.buffer.BufferType
+import org.codefeedr.buffer.{Buffer, BufferType}
 import org.codefeedr.buffer.BufferType.BufferType
+import org.codefeedr.buffer.serialization.Serializer
+import org.codefeedr.buffer.serialization.Serializer.SerializerType
 import org.codefeedr.keymanager.KeyManager
 import org.codefeedr.pipeline.PipelineType.PipelineType
 import org.codefeedr.stages.{InputStage, OutputStage}
@@ -182,6 +184,17 @@
     this
   }
 
+  /** Sets the serializer type for the buffer.
+    *
+    * @param serializer The serializer type (which is basically a string).
+    * @return This builder instance.
+    */
+  def setSerializer(serializer: SerializerType) = {
+    this.setBufferProperty(Buffer.SERIALIZER, serializer)
+
+    this
+  }
+
   /** Append a [[Stage]] in a sequential pipeline.
    *
    * @param stage The new stage to add.
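Note: setSerializer is a thin wrapper over setBufferProperty keyed by Buffer.SERIALIZER. A hedged usage sketch; setBufferType, append and build are assumed from the existing builder API, and only setSerializer is new in this diff:

    import org.codefeedr.buffer.BufferType
    import org.codefeedr.buffer.serialization.Serializer
    import org.codefeedr.stages.utilities.StringInput

    val pipeline = new PipelineBuilder()
      .setBufferType(BufferType.Kafka)        // assumed existing builder method
      .setSerializer(Serializer.JSON)         // new: stored as the Buffer.SERIALIZER property
      .append(new StringInput("hello world")) // StringInput is shown later in this diff
      .build()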
@@ -22,7 +22,6 @@ import org.apache.flink.streaming.api.functions.sink.SinkFunction
 import org.apache.flink.streaming.api.scala.DataStream
 import org.codefeedr.Properties
 import org.codefeedr.buffer.BufferFactory
-import org.codefeedr.stages.StageAttributes
 
 import scala.reflect.{ClassTag, classTag}
 import scala.reflect.runtime.universe._
@@ -34,7 +33,7 @@ import scala.reflect.runtime.universe._
   */
 abstract class Stage[In <: Serializable with AnyRef: ClassTag: TypeTag,
 Out <: Serializable with AnyRef: ClassTag: TypeTag](
-    val attributes: StageAttributes = StageAttributes()) {
+    val stageId: Option[String] = None) {
 
   /** The pipeline this stage belongs to. */
   var pipeline: Pipeline = _
@@ -43,7 +42,7 @@ Out <: Serializable with AnyRef: ClassTag: TypeTag](
   def environment = pipeline.environment
 
   /** Get the id of this stage */
-  def id: String = attributes.id.getOrElse(getClass.getName)
+  def id: String = stageId.getOrElse(getClass.getName)
 
   /** Get the type of IN */
   def getInType = classTag[In].runtimeClass.asInstanceOf[Class[In]]
@@ -152,7 +151,7 @@
   *
   * @return Sink subject which is basically the stage id.
   */
-  def getSinkSubject: String = this.id
+  def getId: String = this.id
 
   /** Returns the buffer source of this stage.
    *
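Note: a stage's id now resolves straight from the constructor parameter, with the class name as fallback: stageId.getOrElse(getClass.getName). The Option mechanics in isolation:

    def resolveId(stageId: Option[String], className: String): String =
      stageId.getOrElse(className)

    resolveId(Some("my-stage"), "org.example.WordCount") // "my-stage"
    resolveId(None, "org.example.WordCount")             // "org.example.WordCount"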
@@ -18,7 +18,6 @@
 package org.codefeedr.pipeline
 
 import org.apache.flink.streaming.api.scala.DataStream
-import org.codefeedr.stages.StageAttributes
 
 import scala.reflect.ClassTag
 import scala.reflect.runtime.universe._
@@ -32,8 +31,8 @@ import scala.reflect.runtime.universe._
 abstract class Stage2[In <: Serializable with AnyRef: ClassTag: TypeTag,
 In2 <: Serializable with AnyRef: ClassTag: TypeTag,
 Out <: Serializable with AnyRef: ClassTag: TypeTag](
-    attributes: StageAttributes = StageAttributes())
-    extends Stage[In, Out](attributes) {
+    stageId: Option[String] = None)
+    extends Stage[In, Out](stageId) {
 
   /** Transforms from type In to type Out.
    *
@@ -82,8 +81,8 @@ abstract class Stage3[In <: Serializable with AnyRef: ClassTag: TypeTag,
 In2 <: Serializable with AnyRef: ClassTag: TypeTag,
 In3 <: Serializable with AnyRef: ClassTag: TypeTag,
 Out <: Serializable with AnyRef: ClassTag: TypeTag](
-    attributes: StageAttributes = StageAttributes())
-    extends Stage2[In, In2, Out](attributes) {
+    stageId: Option[String] = None)
+    extends Stage2[In, In2, Out](stageId) {
 
   /** Transforms from type In and In2 to type Out.
    *
@@ -140,8 +139,8 @@ In2 <: Serializable with AnyRef: ClassTag: TypeTag,
 In3 <: Serializable with AnyRef: ClassTag: TypeTag,
 In4 <: Serializable with AnyRef: ClassTag: TypeTag,
 Out <: Serializable with AnyRef: ClassTag: TypeTag](
-    attributes: StageAttributes = StageAttributes())
-    extends Stage3[In, In2, In3, Out](attributes) {
+    stageId: Option[String] = None)
+    extends Stage3[In, In2, In3, Out](stageId) {
 
   /** Transforms from type In, In2, In3 to type Out.
    *
@@ -30,8 +30,8 @@ import scala.reflect.runtime.universe._
   * @tparam Out The output type of the job.
   */
 abstract class InputStage[Out <: Serializable with AnyRef: ClassTag: TypeTag](
-    attributes: StageAttributes = StageAttributes())
-    extends Stage[NoType, Out](attributes) {
+    stageId: Option[String] = None)
+    extends Stage[NoType, Out](stageId) {
 
   /** Transforms the stage from its input type to its output type.
    * This requires using the Flink DataStream API.
@@ -50,4 +50,5 @@ abstract class InputStage[Out <: Serializable with AnyRef: ClassTag: TypeTag](
   * @return A newly created DataStream.
   */
   def main(): DataStream[Out]
+
 }
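Note: a concrete input stage under the new constructor then looks roughly as follows; main()'s signature comes from this diff, while the class body, the element values and the use of the stage's Flink environment are illustrative:

    import org.apache.flink.streaming.api.scala._
    import org.codefeedr.stages.InputStage
    import org.codefeedr.stages.utilities.StringType

    // Illustrative stage; `environment` is the pipeline's Flink environment
    // exposed by Stage (see the Stage diff above).
    class NumbersInput(stageId: Option[String] = None)
        extends InputStage[StringType](stageId) {

      override def main(): DataStream[StringType] =
        environment.fromCollection(Seq(StringType("1"), StringType("2")))
    }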
@@ -30,8 +30,8 @@ import scala.reflect.runtime.universe._
   * @tparam In the input type of the job.
   */
 abstract class OutputStage[In <: Serializable with AnyRef: ClassTag: TypeTag](
-    attributes: StageAttributes = StageAttributes())
-    extends Stage[In, NoType](attributes) {
+    stageId: Option[String] = None)
+    extends Stage[In, NoType](stageId) {
 
   /** Transforms the stage from its input type to its output type.
    * This requires using the Flink DataStream API.
@@ -62,8 +62,8 @@ abstract class OutputStage[In <: Serializable with AnyRef: ClassTag: TypeTag](
   */
 abstract class OutputStage2[In <: Serializable with AnyRef: ClassTag: TypeTag,
 In2 <: Serializable with AnyRef: ClassTag: TypeTag](
-    attributes: StageAttributes = StageAttributes())
-    extends Stage2[In, In2, NoType](attributes) {
+    stageId: Option[String] = None)
+    extends Stage2[In, In2, NoType](stageId) {
 
   /** Transforms the stage from its input type to its output type.
    * This requires using the Flink DataStream API.
@@ -99,8 +99,8 @@ In2 <: Serializable with AnyRef: ClassTag: TypeTag](
 abstract class OutputStage3[In <: Serializable with AnyRef: ClassTag: TypeTag,
 In2 <: Serializable with AnyRef: ClassTag: TypeTag,
 In3 <: Serializable with AnyRef: ClassTag: TypeTag](
-    attributes: StageAttributes = StageAttributes())
-    extends Stage3[In, In2, In3, NoType](attributes) {
+    stageId: Option[String] = None)
+    extends Stage3[In, In2, In3, NoType](stageId) {
 
   /** Transforms the stage from its input type to its output type.
    * This requires using the Flink DataStream API.
@@ -143,8 +143,8 @@ abstract class OutputStage4[In <: Serializable with AnyRef: ClassTag: TypeTag,
 In2 <: Serializable with AnyRef: ClassTag: TypeTag,
 In3 <: Serializable with AnyRef: ClassTag: TypeTag,
 In4 <: Serializable with AnyRef: ClassTag: TypeTag](
-    attributes: StageAttributes = StageAttributes())
-    extends Stage4[In, In2, In3, In4, NoType](attributes) {
+    stageId: Option[String] = None)
+    extends Stage4[In, In2, In3, In4, NoType](stageId) {
 
   /** Transforms the stage from its input type to its output type.
    * This requires using the Flink DataStream API.

This file was deleted.

@@ -29,8 +29,8 @@ import scala.reflect.runtime.universe._
   */
 abstract class TransformStage[In <: Serializable with AnyRef: ClassTag: TypeTag,
 Out <: Serializable with AnyRef: ClassTag: TypeTag](
-    attributes: StageAttributes = StageAttributes())
-    extends Stage[In, Out](attributes)
+    stageId: Option[String] = None)
+    extends Stage[In, Out](stageId)
 
 /** This class represents a TransformStage within a pipeline with two inputs.
   *
@@ -42,8 +42,8 @@ abstract class TransformStage2[
     In <: Serializable with AnyRef: ClassTag: TypeTag,
     In2 <: Serializable with AnyRef: ClassTag: TypeTag,
     Out <: Serializable with AnyRef: ClassTag: TypeTag](
-    attributes: StageAttributes = StageAttributes())
-    extends Stage2[In, In2, Out](attributes)
+    stageId: Option[String] = None)
+    extends Stage2[In, In2, Out](stageId)
 
 /** This class represents a TransformStage within a pipeline with three inputs.
   *
@@ -57,8 +57,8 @@ abstract class TransformStage3[
     In2 <: Serializable with AnyRef: ClassTag: TypeTag,
     In3 <: Serializable with AnyRef: ClassTag: TypeTag,
     Out <: Serializable with AnyRef: ClassTag: TypeTag](
-    attributes: StageAttributes = StageAttributes())
-    extends Stage3[In, In2, In3, Out](attributes)
+    stageId: Option[String] = None)
+    extends Stage3[In, In2, In3, Out](stageId)
 
 /** This class represents a TransformStage within a pipeline with four inputs.
   *
@@ -74,5 +74,5 @@ abstract class TransformStage4[
     In3 <: Serializable with AnyRef: ClassTag: TypeTag,
     In4 <: Serializable with AnyRef: ClassTag: TypeTag,
     Out <: Serializable with AnyRef: ClassTag: TypeTag](
-    attributes: StageAttributes = StageAttributes())
-    extends Stage4[In, In2, In3, In4, Out](attributes)
+    stageId: Option[String] = None)
+    extends Stage4[In, In2, In3, In4, Out](stageId)
@@ -24,7 +24,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.streaming.api.scala.DataStream
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
 import org.codefeedr.buffer.serialization.Serializer
-import org.codefeedr.stages.{InputStage, StageAttributes}
+import org.codefeedr.stages.InputStage
 
 import scala.reflect.runtime.universe._
 import scala.reflect.{ClassTag, classTag}
@@ -34,15 +34,14 @@ import scala.reflect.{ClassTag, classTag}
   * @param topic The topic to read from.
   * @param properties Kafka properties, see https://kafka.apache.org/documentation/#consumerconfigs
   * @param serializer The serializer to use for deserialization of the data, see [[Serializer]].
-  * @param stageAttributes Attributes of this stage.
   * @tparam T Type of data in Kafka.
   */
 class KafkaInput[T <: Serializable with AnyRef: ClassTag: TypeTag](
     topic: String,
     properties: Properties,
     serializer: String = Serializer.JSON,
-    stageAttributes: StageAttributes = StageAttributes())
-    extends InputStage[T] {
+    stageId: Option[String] = None)
+    extends InputStage[T](stageId) {
   //Get type of the class at run time
   val inputClassType: Class[T] = classTag[T].runtimeClass.asInstanceOf[Class[T]]
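Note: putting the KafkaInput changes together, construction now takes an optional stage id in place of StageAttributes. A hedged sketch with illustrative broker address and topic:

    import java.util.Properties
    import org.codefeedr.buffer.serialization.Serializer
    import org.codefeedr.stages.utilities.StringType

    val kafkaProps = new Properties()
    kafkaProps.put("bootstrap.servers", "localhost:9092") // illustrative broker

    val input = new KafkaInput[StringType](
      topic = "events",
      properties = kafkaProps,
      serializer = Serializer.JSON,
      stageId = Some("kafka-events-in")) // replaces the old StageAttributes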
@@ -23,7 +23,7 @@ import java.util.Properties
 import org.apache.flink.streaming.api.scala.DataStream
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011
 import org.codefeedr.buffer.serialization.Serializer
-import org.codefeedr.stages.{OutputStage, StageAttributes}
+import org.codefeedr.stages.OutputStage
 
 import scala.reflect.ClassTag
 import scala.reflect.runtime.universe._
@@ -33,15 +33,14 @@ import scala.reflect.runtime.universe._
   * @param topic The topic to send to.
   * @param properties Kafka properties, see https://kafka.apache.org/documentation/#consumerconfigs
   * @param serializer The serializer to use for serialization of the data, see [[Serializer]].
-  * @param stageAttributes Attributes of this stage.
   * @tparam T Type of data in Kafka.
   */
 class KafkaOutput[T <: Serializable with AnyRef: ClassTag: TypeTag](
     topic: String,
     properties: Properties,
     serializer: String = Serializer.JSON,
-    stageAttributes: StageAttributes = StageAttributes())
-    extends OutputStage[T] {
+    stageId: Option[String] = None)
+    extends OutputStage[T](stageId) {
 
   //get correct serde, will fallback to JSON
   private val serde = Serializer.getSerde[T](serializer)
@@ -19,7 +19,7 @@
 package org.codefeedr.stages.utilities
 
 import org.apache.flink.streaming.api.scala.{DataStream, _}
-import org.codefeedr.stages.{InputStage, StageAttributes}
+import org.codefeedr.stages.InputStage
 
 /** Simple String wrapper case class. */
 case class StringType(value: String)
@@ -28,9 +28,8 @@ case class StringType(value: String)
   *
   * @param str The string to split.
   */
-class StringInput(str: String = "",
-                  stageAttributes: StageAttributes = StageAttributes())
-    extends InputStage[StringType](stageAttributes) {
+class StringInput(str: String = "", stageId: Option[String] = None)
+    extends InputStage[StringType](stageId) {
 
   /** Splits a String into elements of [[StringType]].
    *
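Note: explicit ids matter when one stage class appears more than once in a pipeline, because both instances would otherwise fall back to the same class name and thus the same buffer subject. With the new signature that is a single argument:

    val left  = new StringInput("foo bar", stageId = Some("input-left"))
    val right = new StringInput("baz qux", stageId = Some("input-right"))

    left.id                      // "input-left"
    right.id                     // "input-right"
    new StringInput("no id").id  // falls back to the class name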