
Commit

Merge 2b432a5 into 0fb76ca
wzorgdrager committed Feb 13, 2019
2 parents 0fb76ca + 2b432a5 commit 9961e91
Showing 127 changed files with 2,506 additions and 1,462 deletions.
1 change: 1 addition & 0 deletions .travis.yml
@@ -20,6 +20,7 @@ script:
   - sbt 'set parallelExecution in ThisBuild := false' clean coverage test coverageReport && sbt coverageAggregate

 after_success:
+  - sbt scalafmtCheck
   - sbt coveralls

 notifications:
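
The added `sbt scalafmtCheck` step makes CI fail whenever sources are not scalafmt-formatted, which is what the wholesale reformatting of the Scala files below prepares for. As a minimal sketch of where that task comes from — the plugin coordinates and version here are assumptions, not shown anywhere in this commit:

```scala
// project/plugins.sbt -- hypothetical; this commit only shows the CI step
// that invokes the task, not the plugin definition.
addSbtPlugin("com.geirsson" % "sbt-scalafmt" % "1.5.1")
```

With the plugin enabled, `sbt scalafmtCheck` only verifies formatting, while `sbt scalafmt` would rewrite the sources in place.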
31 changes: 17 additions & 14 deletions README.md
@@ -5,20 +5,22 @@
 CodeFeedr
 --------------------------

-[![Build Status](https://travis-ci.org/codefeedr/codefeedr.svg?branch=develop)](https://travis-ci.org/codefeedr/codefeedr)
-[![BCH compliance](https://bettercodehub.com/edge/badge/codefeedr/codefeedr?branch=develop)](https://bettercodehub.com/)
-[![Coverage Status](https://coveralls.io/repos/github/codefeedr/codefeedr/badge.svg?branch=develop)](https://coveralls.io/github/codefeedr/codefeedr?branch=develop)
-[![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](https://opensource.org/licenses/Apache-2.0)
+[![Build
+Status](https://travis-ci.org/codefeedr/codefeedr.svg?branch=develop)](https://travis-ci.org/codefeedr/codefeedr) [![BCH compliance](https://bettercodehub.com/edge/badge/codefeedr/codefeedr?branch=develop)](https://bettercodehub.com/)
+[![Coverage
+Status](https://coveralls.io/repos/github/codefeedr/codefeedr/badge.svg?branch=develop)](https://coveralls.io/github/codefeedr/codefeedr?branch=develop) [![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](https://opensource.org/licenses/Apache-2.0)
 [![University](https://img.shields.io/badge/university-tudelft-blue.svg)](http://swerl.tudelft.nl/bin/view/Main/WebHome)

-A framework for easily building Flink streaming programs. Documentation can be found [here](https://codefeedr.readthedocs.io).
+A framework for easily building Flink streaming programs. Documentation
+can be found [here](https://codefeedr.readthedocs.io).
 ## Contributors
 Main contributors:
-- [Wouter Zorgdrager](https://www.linkedin.com/in/wouter-zorgdrager-a4746512a/)
+- [Wouter
+Zorgdrager](https://www.linkedin.com/in/wouter-zorgdrager-a4746512a/)

 Former contributors:
 - [Jos Kuijpers](https://nl.linkedin.com/in/jos-kuijpers-4b714032)
-- [Joris Quist](https://www.linkedin.com/in/joris-quist-a44245170)
+- [Joris Quist](https://www.linkedin.com/in/joris-quist-a44245170)


 ## Configuring the build environment
@@ -39,16 +41,17 @@ Run `sbt`. Then at the SBT console:

 ### From IntelliJ

-Install the latest IntelliJ. Go to the IntelliJ preferences and install the
-Scala plugin. Then
+Install the latest IntelliJ. Go to the IntelliJ preferences and install
+the Scala plugin. Then

-1. File -> New -> Project with existing sources from within IntelliJ or "Import project" from the
-IntelliJ splash screen
+1. File -> New -> Project with existing sources from within IntelliJ or
+"Import project" from the IntelliJ splash screen
 2. Select the top level checkout directory for CodeFeedr.
 3. On the dialog that appears, select "SBT"
 4. Click Next and then Finish
 5. From the "SBT Project Data To Import", select all modules

-In order to run your application from within IntelliJ, you have to select the classpath of the
-'mainRunner' module in the run/debug configurations. Simply open 'Run -> Edit configurations...'
-and then select 'mainRunner' from the "Use classpath of module" dropbox.
+In order to run your application from within IntelliJ, you have to
+select the classpath of the 'mainRunner' module in the run/debug
+configurations. Simply open 'Run -> Edit configurations...' and then
+select 'mainRunner' from the "Use classpath of module" dropdown.
12 changes: 7 additions & 5 deletions codefeedr-core/src/main/scala/org/codefeedr/Properties.scala
@@ -29,7 +29,7 @@ object Properties {
 /**
   * Object containing configuration properties.
   */
-class Properties(private val contents: Map[String,String] = Map()) {
+class Properties(private val contents: Map[String, String] = Map()) {

   /**
     * Get a value, converted to the target type by an implicit conversion.
@@ -57,7 +57,8 @@ class Properties(private val contents: Map[String,String] = Map()) {
     * @tparam T Type of the value
     * @return Value
     */
-  def getOrElse[T](key: String, default: T)(implicit convert: String => T): T = {
+  def getOrElse[T](key: String, default: T)(
+      implicit convert: String => T): T = {
     val option = contents.get(key)
     if (option.isEmpty) {
       return default
@@ -96,7 +97,7 @@ class Properties(private val contents: Map[String,String] = Map()) {

   override def equals(that: Any): Boolean = that match {
     case that: Properties => that.contents == contents
-    case _ => false
+    case _                => false
   }

   /**
@@ -109,8 +110,9 @@ class Properties(private val contents: Map[String,String] = Map()) {
   def toJavaProperties: java.util.Properties = {
     val props = new java.util.Properties()

-    contents.foreach { case (key, value) =>
-      props.setProperty(key, value)
+    contents.foreach {
+      case (key, value) =>
+        props.setProperty(key, value)
     }

     props
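
A minimal usage sketch of the `getOrElse` signature above, assuming the implicit `String => T` conversions from the `Properties` companion object (imported as `org.codefeedr.Properties._` elsewhere in this diff) include a `String => Int`; the keys and values are hypothetical:

```scala
import org.codefeedr.Properties
import org.codefeedr.Properties._ // implicit String => T conversions

val props = new Properties(Map("MAX_RETRIES" -> "5"))

// "5" is converted by the implicit String => Int in scope.
val retries: Int = props.getOrElse[Int]("MAX_RETRIES", 3) // 5

// Key absent: the default is returned as-is, no conversion applied.
val timeoutMs: Int = props.getOrElse[Int]("TIMEOUT_MS", 1000) // 1000
```

Note how the second parameter list carries only the implicit conversion, so call sites stay as simple as a plain `Map.getOrElse`.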
10 changes: 6 additions & 4 deletions codefeedr-core/src/main/scala/org/codefeedr/buffer/Buffer.scala
@@ -36,15 +36,16 @@ import scala.reflect.{ClassTag, classTag}
   * @param properties Buffer properties
   * @tparam T Element type of the buffer
   */
-abstract class Buffer[T <: Serializable with AnyRef : ClassTag : TypeTag](pipeline: Pipeline, properties: org.codefeedr.Properties) {
+abstract class Buffer[T <: Serializable with AnyRef: ClassTag: TypeTag](
+    pipeline: Pipeline,
+    properties: org.codefeedr.Properties) {

   //Get type of the class at run time
   val inputClassType: Class[T] = classTag[T].runtimeClass.asInstanceOf[Class[T]]

   //get TypeInformation of generic (case) class
   implicit val typeInfo = TypeInformation.of(inputClassType)

-
   /**
     * Get the source for this buffer. The buffer reads from this
     *
@@ -65,7 +66,8 @@ abstract class Buffer[T <: Serializable with AnyRef : ClassTag : TypeTag](pipeli
     * @return Serializer
     */
   def getSerializer: AbstractSerde[T] = {
-    val serializer = properties.getOrElse[String](Buffer.SERIALIZER, Serializer.JSON)
+    val serializer =
+      properties.getOrElse[String](Buffer.SERIALIZER, Serializer.JSON)

     Serializer.getSerde[T](serializer)
   }
@@ -76,4 +78,4 @@ abstract class Buffer[T <: Serializable with AnyRef : ClassTag : TypeTag](pipeli
   */
 object Buffer {
   val SERIALIZER = "SERIALIZER"
-}
\ No newline at end of file
+}
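
As a sketch of how the `SERIALIZER` key is consumed: `getSerializer` looks it up in the buffer's `Properties` and falls back to JSON. Assuming those properties arrive via `pipeline.bufferProperties` (as the `BufferFactory` below suggests), selecting the default serde explicitly would look like:

```scala
// Hypothetical: which Serializer constants exist besides JSON is not
// visible in this diff.
val bufferProps = new org.codefeedr.Properties(
  Map(Buffer.SERIALIZER -> Serializer.JSON))
// getSerializer then resolves to Serializer.getSerde[T](Serializer.JSON).
```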
codefeedr-core/src/main/scala/org/codefeedr/buffer/BufferFactory.scala
@@ -18,7 +18,7 @@
  */
 package org.codefeedr.buffer

-import org.codefeedr.pipeline.{Pipeline, PipelineObject}
+import org.codefeedr.pipeline.{Pipeline, Stage}

 import scala.reflect.ClassTag
 import scala.reflect.runtime.universe._
@@ -34,8 +34,13 @@ import scala.reflect.runtime.universe._
   * @param pipeline Pipeline
   * @param sinkObject Object that writes to the buffer
   */
-class BufferFactory[U <: Serializable with AnyRef,V <: Serializable with AnyRef, X <: Serializable with AnyRef, Y <: Serializable with AnyRef]
-  (pipeline: Pipeline, stage: PipelineObject[X, Y], sinkObject: PipelineObject[U, V], groupId: String = null) {
+class BufferFactory[U <: Serializable with AnyRef,
+                    V <: Serializable with AnyRef,
+                    X <: Serializable with AnyRef,
+                    Y <: Serializable with AnyRef](pipeline: Pipeline,
+                                                   stage: Stage[X, Y],
+                                                   sinkObject: Stage[U, V],
+                                                   groupId: String = null) {

   /**
     * Create a new buffer
@@ -45,20 +50,29 @@ class BufferFactory[U <: Serializable with AnyRef,V <: Serializable with AnyRef,
     * @throws IllegalArgumentException When sinkObject is null
     * @throws IllegalStateException When buffer could not be instantiated due to bad configuration
     */
-  def create[T <: Serializable with AnyRef : ClassTag : TypeTag](): Buffer[T] = {
-    require(sinkObject != null, "Buffer factory requires a sink object to determine buffer location")
+  def create[T <: Serializable with AnyRef: ClassTag: TypeTag](): Buffer[T] = {
+    require(
+      sinkObject != null,
+      "Buffer factory requires a sink object to determine buffer location")

     val subject = sinkObject.getSinkSubject

     pipeline.bufferType match {
       case BufferType.Kafka => {
         val cleanedSubject = subject.replace("$", "-")
-        val kafkaGroupId = if(groupId != null) groupId else stage.id
-        new KafkaBuffer[T](pipeline, pipeline.bufferProperties, stage.attributes, cleanedSubject, kafkaGroupId)
+        val kafkaGroupId = if (groupId != null) groupId else stage.id
+        new KafkaBuffer[T](pipeline,
+                           pipeline.bufferProperties,
+                           stage.attributes,
+                           cleanedSubject,
+                           kafkaGroupId)
       }
       case BufferType.RabbitMQ => {
-        new RabbitMQBuffer[T](pipeline, pipeline.bufferProperties, stage.attributes, subject)
+        new RabbitMQBuffer[T](pipeline,
+                              pipeline.bufferProperties,
+                              stage.attributes,
+                              subject)
       }
     }
   }
-}
\ No newline at end of file
+}
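
A hedged usage sketch of the factory above; the pipeline, the two stages, and the event type are hypothetical stand-ins, since `Stage` construction is not part of this diff:

```scala
// Hypothetical event type; Buffer requires Serializable with AnyRef
// plus ClassTag/TypeTag evidence at the create call.
case class Commit(sha: String, author: String)

// stageA: Stage[X, Y] and sinkStage: Stage[Commit, V] built elsewhere.
val factory = new BufferFactory(myPipeline, stageA, sinkStage)

// Dispatches on myPipeline.bufferType: a KafkaBuffer (with "$" replaced
// in the subject and the stage id as default group id) or a RabbitMQBuffer.
val buffer: Buffer[Commit] = factory.create[Commit]()
```

Defaulting `groupId` to `null` and falling back to `stage.id` keeps existing call sites working, at the cost of a nullable public parameter.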
codefeedr-core/src/main/scala/org/codefeedr/buffer/BufferType.scala
@@ -24,4 +24,4 @@ package org.codefeedr.buffer
 object BufferType extends Enumeration {
   type BufferType = Value
   val Kafka, RabbitMQ = Value
-}
\ No newline at end of file
+}
codefeedr-core/src/main/scala/org/codefeedr/buffer/SchemaNotFoundException.scala
@@ -19,5 +19,6 @@
 package org.codefeedr.buffer

 final case class SchemaNotFoundException(private val message: String = "",
-                                         private val cause: Throwable = None.orNull)
-    extends Exception(message, cause)
+                                         private val cause: Throwable =
+                                           None.orNull)
+    extends Exception(message, cause)
codefeedr-core/src/main/scala/org/codefeedr/buffer/KafkaBuffer.scala
@@ -23,11 +23,18 @@ import java.util.Properties
 import org.apache.avro.reflect.ReflectData
 import org.apache.flink.streaming.api.functions.sink.SinkFunction
 import org.apache.flink.streaming.api.scala.DataStream
-import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer011, FlinkKafkaProducer011}
+import org.apache.flink.streaming.connectors.kafka.{
+  FlinkKafkaConsumer011,
+  FlinkKafkaProducer011
+}
 import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewTopic}
 import org.apache.logging.log4j.scala.Logging
 import org.codefeedr.Properties._
-import org.codefeedr.buffer.serialization.schema_exposure.{RedisSchemaExposer, SchemaExposer, ZookeeperSchemaExposer}
+import org.codefeedr.buffer.serialization.schema_exposure.{
+  RedisSchemaExposer,
+  SchemaExposer,
+  ZookeeperSchemaExposer
+}
 import org.codefeedr.pipeline.Pipeline
 import org.codefeedr.stages.StageAttributes

@@ -36,6 +43,7 @@ import scala.reflect.ClassTag
 import scala.reflect.runtime.universe._

 object KafkaBuffer {
+
   /**
     * PROPERTIES
     */
@@ -49,8 +57,14 @@ object KafkaBuffer {
   val SCHEMA_EXPOSURE_DESERIALIZATION = "SCHEMA_EXPOSURE_SERIALIZATION"
 }

-class KafkaBuffer[T <: Serializable with AnyRef : ClassTag : TypeTag](pipeline: Pipeline, properties: org.codefeedr.Properties, stageAttributes: StageAttributes, topic: String, groupId: String)
-  extends Buffer[T](pipeline, properties) with Logging {
+class KafkaBuffer[T <: Serializable with AnyRef: ClassTag: TypeTag](
+    pipeline: Pipeline,
+    properties: org.codefeedr.Properties,
+    stageAttributes: StageAttributes,
+    topic: String,
+    groupId: String)
+    extends Buffer[T](pipeline, properties)
+    with Logging {

   private object KafkaBufferDefaults {
     //KAFKA RELATED
@@ -76,11 +90,13 @@ class KafkaBuffer[T <: Serializable with AnyRef : ClassTag : TypeTag](pipeline:
     val serde = getSerializer

     //make sure the topic already exists
-    checkAndCreateSubject(topic, properties.get[String](KafkaBuffer.BROKER).
-      getOrElse(KafkaBufferDefaults.BROKER))
+    checkAndCreateSubject(topic,
+                          properties
+                            .get[String](KafkaBuffer.BROKER)
+                            .getOrElse(KafkaBufferDefaults.BROKER))

-    pipeline.environment.
-      addSource(new FlinkKafkaConsumer011[T](topic, serde, getKafkaProperties))
+    pipeline.environment.addSource(
+      new FlinkKafkaConsumer011[T](topic, serde, getKafkaProperties))
   }

   /**
@@ -90,13 +106,15 @@ class KafkaBuffer[T <: Serializable with AnyRef : ClassTag : TypeTag](pipeline:
     */
   override def getSink: SinkFunction[T] = {
     //check if a schema should be exposed
-    if (properties.get[Boolean](KafkaBuffer.SCHEMA_EXPOSURE)
-      .getOrElse(KafkaBufferDefaults.SCHEMA_EXPOSURE)) {
+    if (properties
+          .get[Boolean](KafkaBuffer.SCHEMA_EXPOSURE)
+          .getOrElse(KafkaBufferDefaults.SCHEMA_EXPOSURE)) {

       exposeSchema()
     }

-    val producer = new FlinkKafkaProducer011[T](topic, getSerializer, getKafkaProperties)
+    val producer =
+      new FlinkKafkaProducer011[T](topic, getSerializer, getKafkaProperties)
     producer.setWriteTimestampToKafka(true)

     producer
@@ -112,7 +130,8 @@ class KafkaBuffer[T <: Serializable with AnyRef : ClassTag : TypeTag](pipeline:
     kafkaProp.put("bootstrap.servers", KafkaBufferDefaults.BROKER)
     kafkaProp.put("zookeeper.connect", KafkaBufferDefaults.ZOOKEEPER)
     kafkaProp.put("auto.offset.reset", KafkaBufferDefaults.AUTO_OFFSET_RESET)
-    kafkaProp.put("auto.commit.interval.ms", KafkaBufferDefaults.AUTO_COMMIT_INTERVAL_MS)
+    kafkaProp.put("auto.commit.interval.ms",
+                  KafkaBufferDefaults.AUTO_COMMIT_INTERVAL_MS)
     kafkaProp.put("enable.auto.commit", KafkaBufferDefaults.ENABLE_AUTO_COMMIT)
     kafkaProp.put("group.id", groupId)

@@ -163,8 +182,8 @@ class KafkaBuffer[T <: Serializable with AnyRef : ClassTag : TypeTag](pipeline:
     * Creates a new topic
     */
   private def createTopic(client: AdminClient, topic: NewTopic): Unit = {
-    client.
-      createTopics(List(topic).asJavaCollection)
+    client
+      .createTopics(List(topic).asJavaCollection)
       .all()
       .get() //this blocks the method until the topic is created
   }
@@ -186,21 +205,20 @@ class KafkaBuffer[T <: Serializable with AnyRef : ClassTag : TypeTag](pipeline:
     * @return a schema exposer.
     */
   def getExposer: SchemaExposer = {
-    val exposeName = properties.
-      get[String](KafkaBuffer.SCHEMA_EXPOSURE_SERVICE).
-      getOrElse(KafkaBufferDefaults.SCHEMA_EXPOSURE_SERVICE)
+    val exposeName = properties
+      .get[String](KafkaBuffer.SCHEMA_EXPOSURE_SERVICE)
+      .getOrElse(KafkaBufferDefaults.SCHEMA_EXPOSURE_SERVICE)

-    val exposeHost = properties.
-      get[String](KafkaBuffer.SCHEMA_EXPOSURE_HOST).
-      getOrElse(KafkaBufferDefaults.SCHEMA_EXPOSURE_HOST)
+    val exposeHost = properties
+      .get[String](KafkaBuffer.SCHEMA_EXPOSURE_HOST)
+      .getOrElse(KafkaBufferDefaults.SCHEMA_EXPOSURE_HOST)

     //get exposer
     val exposer = exposeName match {
       case "zookeeper" => new ZookeeperSchemaExposer(exposeHost)
-      case _ => new RedisSchemaExposer(exposeHost) //default is redis
+      case _           => new RedisSchemaExposer(exposeHost) //default is redis
     }

     exposer
   }
 }
-
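
A sketch of the properties a pipeline could set so that `getSink` exposes the Avro schema before producing. The keys are the `KafkaBuffer` constants from this diff; the host values and the string-to-boolean conversion via the `org.codefeedr.Properties._` implicits are assumptions:

```scala
val bufferProps = new org.codefeedr.Properties(
  Map(
    KafkaBuffer.BROKER -> "localhost:9092",
    KafkaBuffer.SCHEMA_EXPOSURE -> "true",
    KafkaBuffer.SCHEMA_EXPOSURE_SERVICE -> "zookeeper", // or "redis", the fallback
    KafkaBuffer.SCHEMA_EXPOSURE_HOST -> "localhost:2181"
  ))
```

With `SCHEMA_EXPOSURE` unset, the fallback in `KafkaBufferDefaults` (collapsed in this diff) decides whether `exposeSchema()` runs before the `FlinkKafkaProducer011` is built.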
