Merge 4965795 into 0aad95c
wzorgdrager committed Jan 11, 2020
2 parents 0aad95c + 4965795 commit acf136f
Showing 33 changed files with 1,079 additions and 48 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
@@ -2,7 +2,7 @@ language: scala
scala:
- 2.12.8
sudo: required
-dist: trusty
+dist: xenial
jdk: openjdk11

before_cache:
24 changes: 18 additions & 6 deletions build.sbt
@@ -28,7 +28,7 @@ ThisBuild / description := "CodeFeedr provides an infrastructure on top of Apach
ThisBuild / licenses := List("Apache 2" -> new URL("http://www.apache.org/licenses/LICENSE-2.0.txt"))
ThisBuild / homepage := Some(url("https://github.com/codefeedr/codefeedr"))

ThisBuild / version := "0.1.2"
ThisBuild / version := "0.1.4"
ThisBuild / organization := "org.codefeedr"
ThisBuild / scalaVersion := scala212

@@ -47,7 +47,8 @@ lazy val root = (project in file("."))
pluginElasticSearch,
pluginGitHub,
pluginRabbitMQ,
-pluginGHTorrent)
+pluginGHTorrent,
+pluginPypi)

lazy val core = (project in file("codefeedr-core"))
.settings(
@@ -152,9 +153,18 @@ lazy val pluginGHTorrent = (project in file("codefeedr-plugins/codefeedr-ghtorre
)
).dependsOn(core)

lazy val pluginPypi = (project in file("codefeedr-plugins/codefeedr-pypi"))
.settings(
name := pluginPrefix + "pypi",
settings,
assemblySettings,
libraryDependencies ++= commonDependencies ++ Seq(
)
).dependsOn(core)

lazy val dependencies =
new {
val flinkVersion = "1.7.0"
val flinkVersion = "1.9.1"
val json4sVersion = "3.6.4"
val log4jVersion = "2.11.0"
val log4jScalaVersion = "11.0"
@@ -172,7 +182,7 @@ lazy val dependencies =
val flinkRabbitMQ = "org.apache.flink" %% "flink-connector-rabbitmq" % flinkVersion

val redis = "net.debasishg" %% "redisclient" % "3.6"
-val kafkaClient = "org.apache.kafka" % "kafka-clients" % "1.0.0"
+val kafkaClient = "org.apache.kafka" % "kafka-clients" % "2.4.0"
val zookeeper = "org.apache.zookeeper" % "zookeeper" % "3.4.9"

val json4s = "org.json4s" %% "json4s-scalap" % json4sVersion
@@ -190,7 +200,8 @@ lazy val dependencies =
val scalamock = "org.scalamock" %% "scalamock" % "4.1.0" % Test
val mockito = "org.mockito" % "mockito-all" % "1.10.19" % Test
val embeddedRedis = "com.github.sebruck" %% "scalatest-embedded-redis" % "0.3.0" % Test
val embeddedKafka = "net.manub" %% "scalatest-embedded-kafka" % "2.0.0" % Test

val embeddedKafka = "io.github.embeddedkafka" %% "embedded-kafka" % "2.4.0" % Test
val embeddedMongo = "com.github.simplyscala" %% "scalatest-embedmongo" % "0.2.4" % Test
//val embeddedRabbitMQ = "io.arivera.oss" %% "embedded-rabbitmq" % "1.3.0" % Test

@@ -222,7 +233,8 @@ lazy val commonSettings = Seq(
"confluent" at "http://packages.confluent.io/maven/",
"Apache Development Snapshot Repository" at "https://repository.apache.org/content/repositories/snapshots/",
"Artima Maven Repository" at "http://repo.artima.com/releases",
-Resolver.mavenLocal
+Resolver.mavenLocal,
+Resolver.jcenterRepo
),
publishMavenStyle in ThisBuild := true,
publishTo in ThisBuild := Some(
16 changes: 16 additions & 0 deletions codefeedr-core/src/main/scala/org/codefeedr/Properties.scala
@@ -87,6 +87,22 @@ class Properties(private val contents: Map[String, String] = Map()) {
def set[T](key: String, value: T)(implicit convert: T => String): Properties =
new Properties(contents + (key -> value))

/** Merges two Properties objects.
*
* @param properties the second Properties object; its values override keys already present in *this*.
* @return a new Properties instance containing both key sets.
*/
def merge(properties: Properties): Properties = {
val keys = properties.keys()
var newProps = this

for (key <- keys) {
newProps = newProps.set(key, properties.get(key).get)
}

newProps
}

/**
* Get the set of keys in this Properties.
*
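The `merge` added above is right-biased: keys from the argument win over keys already in *this*. A minimal usage sketch, using only the `Properties` API visible in this diff; the keys and values are arbitrary examples:

```scala
import org.codefeedr.Properties

// Right-biased merge: the argument's values override existing keys.
val defaults  = new Properties().set("broker", "localhost:9092").set("group", "a")
val overrides = new Properties().set("group", "b")

val merged = defaults.merge(overrides)
merged.get("broker") // Some("localhost:9092"): kept from `defaults`
merged.get("group")  // Some("b"): overridden by `overrides`
```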
@@ -70,6 +70,9 @@ abstract class Buffer[T <: Serializable with AnyRef: ClassTag: TypeTag](

Serializer.getSerde[T](serializer)
}

/** Returns the properties of this buffer. */
def getProperties = properties
}

/**
@@ -65,10 +65,11 @@ class BufferFactory[+In <: Serializable with AnyRef,
pipeline.bufferType match {
case BufferType.Kafka => {
val cleanedSubject = subject.replace("$", "-")
-new KafkaBuffer[T](pipeline,
-  pipeline.bufferProperties,
-  cleanedSubject,
-  groupIdFinal)
+new KafkaBuffer[T](
+  pipeline,
+  pipeline.bufferProperties.merge(stage.getContext.stageProperties),
+  cleanedSubject,
+  groupIdFinal)
}
case x if BufferFactory.registry.exists(_._1 == x) => {
val tt = typeTag[T]
@@ -80,16 +81,22 @@
.get
.runtimeClass
.getConstructors()(0)
-.newInstance(pipeline, pipeline.bufferProperties, subject, ct, tt)
+.newInstance(
+  pipeline,
+  pipeline.bufferProperties.merge(stage.getContext.stageProperties),
+  subject,
+  ct,
+  tt)
.asInstanceOf[Buffer[T]]
}
case _ => {
//Switch to Kafka.
val cleanedSubject = subject.replace("$", "-")
-new KafkaBuffer[T](pipeline,
-  pipeline.bufferProperties,
-  cleanedSubject,
-  groupIdFinal)
+new KafkaBuffer[T](
+  pipeline,
+  pipeline.bufferProperties.merge(stage.getContext.stageProperties),
+  cleanedSubject,
+  groupIdFinal)
}
}
}
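The effect of the `merge` calls above is that a stage's own properties now override the pipeline-wide buffer properties when that stage's buffer is built. A small illustration of the semantics, using only `Properties` and the `KafkaBuffer` keys from this changeset; the broker address and the two plain `Properties` objects stand in for the pipeline-wide and per-stage properties and are example data only:

```scala
import org.codefeedr.Properties
import org.codefeedr.buffer.KafkaBuffer

// Pipeline-wide buffer properties: every Kafka buffer starts from the earliest offset.
val pipelineWide = new Properties()
  .set(KafkaBuffer.BROKER, "localhost:9092")
  .set(KafkaBuffer.START_POSITION, KafkaBuffer.EARLIEST)

// Properties of one particular stage: this stage should start from the latest offset.
val perStage = new Properties()
  .set(KafkaBuffer.START_POSITION, KafkaBuffer.LATEST)

// BufferFactory now passes bufferProperties.merge(stageProperties) to the buffer,
// so for this stage the effective start position is LATEST; the broker is unchanged.
val effective = pipelineWide.merge(perStage)
```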
11 changes: 11 additions & 0 deletions codefeedr-core/src/main/scala/org/codefeedr/buffer/FakeStage.scala
@@ -0,0 +1,11 @@
package org.codefeedr.buffer

import org.codefeedr.stages.InputStage

import scala.reflect.ClassTag
import scala.reflect.runtime.universe._

/** Fake stage to link up already running stages. */
abstract class FakeStage[T <: Serializable with AnyRef: ClassTag: TypeTag](
stageId: String)
extends InputStage[T](Some(stageId)) {}
@@ -59,6 +59,14 @@ object KafkaBuffer {
val AMOUNT_OF_REPLICAS = "AMOUNT_OF_REPLICAS"
val COMPRESSION_TYPE = "compression.type"

//OFFSETS
val START_POSITION = "START_POSITION"
val START_TIMESTAMP = "START_TIMESTAMP"
val GROUP_OFFSETS = "GROUP_OFFSETS"
val TIMESTAMP = "TIMESTAMP"
val LATEST = "LATEST"
val EARLIEST = "EARLIEST"

}

/** The implementation for the Kafka buffer. This buffer is the default.
@@ -96,6 +104,10 @@ class KafkaBuffer[T <: Serializable with AnyRef: ClassTag: TypeTag](
val AMOUNT_OF_PARTITIONS = 1
val AMOUNT_OF_REPLICAS = 1
val COMPRESSION_TYPE = "none"

//OFFSETS
val START_POSITION = KafkaBuffer.GROUP_OFFSETS
val START_TIMESTAMP = 0x0
}

/** Get a Kafka Consumer as source for a stage.
@@ -111,9 +123,27 @@
.get[String](KafkaBuffer.BROKER)
.getOrElse(KafkaBufferDefaults.BROKER))

val kafkaConsumer =
new FlinkKafkaConsumer[T](topic, serde, getKafkaProperties)
val startPosition = properties.getOrElse[String](
KafkaBuffer.START_POSITION,
KafkaBufferDefaults.START_POSITION)

/** Configure the consumer's start position; unknown values fall back to group offsets. */
startPosition match {
case KafkaBuffer.EARLIEST => kafkaConsumer.setStartFromEarliest()
case KafkaBuffer.LATEST => kafkaConsumer.setStartFromLatest()
case KafkaBuffer.TIMESTAMP =>
kafkaConsumer.setStartFromTimestamp(
properties.getOrElse[Long](
KafkaBuffer.START_TIMESTAMP,
KafkaBufferDefaults.START_TIMESTAMP)(_.toLong))
case KafkaBuffer.GROUP_OFFSETS => kafkaConsumer.setStartFromGroupOffsets()
case _ => kafkaConsumer.setStartFromGroupOffsets()
}

// Add a source.
-pipeline.environment.addSource(
-  new FlinkKafkaConsumer[T](topic, serde, getKafkaProperties))
+pipeline.environment.addSource(kafkaConsumer)
}

/** Get a Kafka Producer as sink to the buffer.
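With the new `START_POSITION` handling, the point at which the Flink Kafka consumer starts reading can be chosen through buffer properties instead of always falling back to the committed group offsets. A minimal sketch of the options, assuming the properties reach the buffer as in `getSource` above; the timestamp is an arbitrary example value, stored as a string because `getOrElse[Long]` converts it with `_.toLong`:

```scala
import org.codefeedr.Properties
import org.codefeedr.buffer.KafkaBuffer

// Start from a fixed point in time instead of the default GROUP_OFFSETS.
val fromTimestamp = new Properties()
  .set(KafkaBuffer.START_POSITION, KafkaBuffer.TIMESTAMP)
  .set(KafkaBuffer.START_TIMESTAMP, "1578700800000") // epoch millis, example value

// Or replay the whole topic, or read only new records:
val fromEarliest = new Properties().set(KafkaBuffer.START_POSITION, KafkaBuffer.EARLIEST)
val fromLatest   = new Properties().set(KafkaBuffer.START_POSITION, KafkaBuffer.LATEST)
```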
@@ -0,0 +1,16 @@
package org.codefeedr.buffer

import org.apache.flink.streaming.api.scala.DataStream
import org.codefeedr.pipeline.Context

import scala.reflect.ClassTag
import scala.reflect.runtime.universe._

/** KafkaTopic stage that only carries the topic name, so a linked stage can read from this topic. */
class KafkaTopic[T <: Serializable with AnyRef: ClassTag: TypeTag](
topic: String)
extends FakeStage[T](topic) {

/** This will never be run. */
override def main(context: Context): DataStream[T] = ???
}
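Combined with `FakeStage`, this makes it possible to hook a pipeline onto a topic that some other, already-running pipeline produces: the fake stage contributes only its stage id (the topic name) and is skipped at run time. A hedged sketch; the `Commit` payload type, the `MyCommitStage` class and the builder call are illustrative assumptions, not part of this diff:

```scala
import org.codefeedr.buffer.KafkaTopic

// Illustrative only: `Commit` is a made-up payload type.
case class Commit(sha: String, message: String) extends Serializable

// The stage id doubles as the Kafka topic name ("commits" here).
val existingTopic = new KafkaTopic[Commit]("commits")

// Hypothetical wiring: link the fake stage to a real downstream stage so that
// the downstream stage reads from the "commits" topic. KafkaTopic.main is
// never called, because the Pipeline skips FakeStage instances when it starts stages.
// builder.edge(existingTopic, new MyCommitStage()).build().startLocal()
```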
41 changes: 27 additions & 14 deletions codefeedr-core/src/main/scala/org/codefeedr/pipeline/Pipeline.scala
@@ -26,6 +26,7 @@ import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.api.scala._
import org.codefeedr.Properties
import org.codefeedr.buffer.BufferType.BufferType
import org.codefeedr.buffer.FakeStage
import org.codefeedr.keymanager.KeyManager
import org.codefeedr.pipeline.RuntimeType.RuntimeType

@@ -59,6 +60,20 @@ case class Pipeline(var name: String,
graph: DirectedAcyclicGraph,
objectProperties: Map[String, Properties]) {

/** Prepares the pipeline by running setUp(this) on every stage. */
def prepare() = {
val nodes = getNodes

// Run all setups.
for (obj <- nodes) {
obj.setUp(this)
}

}

/** Run the preparation immediately, when the Pipeline is constructed. */
prepare()

/** The mutable StreamExecutionEnvironment. */
var _environment: StreamExecutionEnvironment = null

@@ -175,6 +190,7 @@
def showList(asException: Boolean): Unit = {
if (asException) {
val contents = getNodes
.filter(!_.isInstanceOf[FakeStage[_]])
.map { item =>
'"' + item.getContext.stageId + '"'
}
@@ -183,7 +199,9 @@

throw PipelineListException(json)
} else {
-getNodes.foreach(item => println(item.getContext.stageId))
+getNodes
+  .filter(!_.isInstanceOf[FakeStage[_]])
+  .foreach(item => println(item.getContext.stageId))
}
}

@@ -215,15 +233,11 @@

val nodes = getNodes

-// Run all setups.
-for (nodes <- nodes) {
-  nodes.setUp(this)
-}

// Connect each object by getting a starting buffer, if any, and sending it to the next.
var buffer: DataStream[Serializable with AnyRef] = null
for (obj <- nodes) {
-buffer = obj.transform(buffer)
+if (!obj.isInstanceOf[FakeStage[_]])
+  buffer = obj.transform(buffer)
}

// Execute in one environment.
@@ -236,11 +250,6 @@
def startLocal(): Unit = {
val nodes = getNodes

-// Run all setups.
-for (obj <- nodes) {
-  obj.setUp(this)
-}

// For each PO, make buffers and run.
for (obj <- nodes) {
runStage(obj)
@@ -273,8 +282,7 @@
val obj = optObj.get
.asInstanceOf[Stage[Serializable with AnyRef, Serializable with AnyRef]]

-// Setup and run object.
-obj.setUp(this)
+// Run object.
runStage(obj, groupId)

// Run stage in one environment.
@@ -291,6 +299,11 @@
stage: Stage[Serializable with AnyRef, Serializable with AnyRef],
groupId: String = null): Unit = {

/** We won't run a fake stage. */
if (stage.isInstanceOf[FakeStage[_]]) {
return
}

// Find the source lazily.
lazy val source =
if (stage.hasMainSource) stage.getMainSource(groupId) else null
27 changes: 27 additions & 0 deletions codefeedr-core/src/test/scala/org/codefeedr/PropertiesTest.scala
@@ -125,4 +125,31 @@ class PropertiesTest extends FunSuite {
assert(props.has("foo"))
assert(!props.has("bar"))
}

test("Two property maps should be able to merge") {
var props = new Properties()
var props2 = new Properties()

props = props.set("ha", "na")
props2 = props2.set("bla", "na")

val finalProps = props.merge(props2)

assert(finalProps.has("ha"))
assert(finalProps.has("bla"))
}

test("Two property maps should be able to merge and overridden") {
var props = new Properties()
var props2 = new Properties()

props = props.set("ha", "na")
props2 = props2.set("ha", "ba")

val finalProps = props.merge(props2)

assert(finalProps.has("ha"))
assert(!finalProps.has("bla"))
assert(finalProps.get("ha").get.equals("ba"))
}
}
