Merge d77e60e into 7198972
wzorgdrager committed Feb 11, 2019
2 parents 7198972 + d77e60e commit b735aa3
Showing 16 changed files with 227 additions and 84 deletions.
18 changes: 0 additions & 18 deletions .travis.yml
@@ -6,12 +6,6 @@ sudo: required
dist: trusty
jdk: oraclejdk8

services:
- docker
- redis
- mongodb
- rabbitmq

before_cache:
# Cleanup the cached directories to avoid unnecessary cache updates
- find $HOME/.ivy2/cache -name "ivydata-*.properties" -print -delete
@@ -22,18 +16,6 @@ cache:
- $HOME/.ivy2/cache
- $HOME/.sbt

before_install:
- mkdir containers
- git clone https://github.com/wurstmeister/kafka-docker.git containers/kafka
- "sed -i 's/KAFKA_ADVERTISED_HOST_NAME: 192.168.99.100/KAFKA_ADVERTISED_HOST_NAME: localhost/' containers/kafka/docker-compose-single-broker.yml"
- "sed -i 's/KAFKA_CREATE_TOPICS: \"test:1:1\"//' containers/kafka/docker-compose-single-broker.yml"
- "echo 'EXPOSE 9092' >> containers/kafka/Dockerfile"
- docker-compose -f "containers/kafka/docker-compose-single-broker.yml" up -d
- docker ps -a

before_script:
- sleep 10

script:
- sbt 'set parallelExecution in ThisBuild := false' clean coverage test coverageReport && sbt coverageAggregate

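With the Docker-based services gone, each suite now boots its own dependencies in-process, as the per-file changes below show. A minimal sketch of the pattern, assuming scalatest-embedded-kafka's withRunningKafka helper (class and topic names here are illustrative, not part of this commit):

import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import org.scalatest.FunSuite

class EmbeddedKafkaSmokeTest extends FunSuite with EmbeddedKafka {

  // Pin the ports that the suites in this commit assume.
  implicit val config: EmbeddedKafkaConfig =
    EmbeddedKafkaConfig(kafkaPort = 9092, zooKeeperPort = 2181)

  test("an in-process broker serves a full round trip") {
    // withRunningKafka starts Kafka and ZooKeeper, runs the block, then tears both down
    withRunningKafka {
      publishStringMessageToKafka("smoke-topic", "hello")
      assert(consumeFirstStringMessageFrom("smoke-topic") == "hello")
    }
  }
}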
14 changes: 12 additions & 2 deletions build.sbt
@@ -87,7 +87,13 @@ lazy val core = (project in file("codefeedr-core"))
dependencies.kryoChill,

// Avro schema exposure
dependencies.avro
dependencies.avro,

// Embedded redis to test key management
dependencies.embeddedRedis,

// Embedded kafka for integration tests
dependencies.embeddedKafka
)
)

@@ -110,7 +116,8 @@ lazy val pluginMongodb = (project in file("codefeedr-plugins/codefeedr-mongodb")
settings,
assemblySettings,
libraryDependencies ++= commonDependencies ++ Seq(
dependencies.mongo
dependencies.mongo,
dependencies.embeddedMongo
)
)
.dependsOn(
@@ -229,6 +236,9 @@ lazy val dependencies =
val scalatest = "org.scalatest" %% "scalatest" % "3.0.5" % Test
val scalamock = "org.scalamock" %% "scalamock" % "4.1.0" % Test
val mockito = "org.mockito" % "mockito-all" % "1.10.19" % Test
val embeddedRedis = "com.github.sebruck" %% "scalatest-embedded-redis" % "0.3.0" % Test
val embeddedKafka = "net.manub" %% "scalatest-embedded-kafka" % "2.0.0" % Test
val embeddedMongo = "com.github.simplyscala" %% "scalatest-embedmongo" % "0.2.4" % Test

val avro = "org.apache.avro" % "avro" % "1.8.2"
val twitter = "com.danielasfregola" %% "twitter4s" % "5.5"
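The new test dependencies mirror the services removed from .travis.yml: embedded Redis and Kafka for codefeedr-core, embedded Mongo for the MongoDB plugin. The plugin's own test changes are not among the hunks shown here; a sketch of how scalatest-embedmongo is typically wired in, assuming its MongoEmbedDatabase trait (the test class is illustrative):

import com.github.simplyscala.MongoEmbedDatabase
import org.scalatest.FunSuite

class MongoPluginSmokeTest extends FunSuite with MongoEmbedDatabase {

  test("a stage can reach an in-process MongoDB") {
    // withEmbedMongoFixture boots mongod on the given port and stops it after the block
    withEmbedMongoFixture(12345) { mongodProps =>
      // connect to mongodb://localhost:12345 here and exercise the plugin
      assert(mongodProps != null)
    }
  }
}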
codefeedr-core/src/test/scala/org/codefeedr/buffer/KafkaBufferTest.scala
@@ -21,6 +21,8 @@ package org.codefeedr.buffer
import java.util
import java.util.{Date, Properties, UUID}

import com.github.sebruck.EmbeddedRedis
import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import org.apache.flink.api.scala._
import org.apache.flink.runtime.client.JobExecutionException
import org.apache.flink.streaming.api.functions.sink.SinkFunction
@@ -32,14 +34,33 @@ import org.codefeedr.pipeline.PipelineBuilder
import org.codefeedr.stages.utilities.StringType
import org.codefeedr.stages.{InputStage, OutputStage, StageAttributes}
import org.codefeedr.testUtils.{JobFinishedException, SimpleSourcePipelineObject}
import org.scalatest.{BeforeAndAfter, FunSuite}
import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite}
import redis.embedded.RedisServer

import scala.collection.JavaConversions._

class KafkaBufferTest extends FunSuite with BeforeAndAfter {
class KafkaBufferTest extends FunSuite with BeforeAndAfter with BeforeAndAfterAll with EmbeddedKafka with EmbeddedRedis {

var client : AdminClient = _
var kafkaBuffer : KafkaBuffer[StringType] = _
var redis: RedisServer = null
var redisPort: Int = 0



override def beforeAll(): Unit = {
implicit val config = EmbeddedKafkaConfig(zooKeeperPort = 2181, kafkaPort = 9092)
EmbeddedKafka.start()

redis = startRedis()
redisPort = redis.ports().get(0)
}

override def afterAll(): Unit = {
EmbeddedKafka.stop()
stopRedis(redis)
}


before {
//set all the correct properties
@@ -50,7 +71,10 @@ class KafkaBufferTest extends FunSuite with BeforeAndAfter {
client = AdminClient.create(props)

//setup simple kafkabuffer
val pipeline = new PipelineBuilder().append(new SimpleSourcePipelineObject()).build()
val pipeline = new PipelineBuilder()
.append(new SimpleSourcePipelineObject())
.setBufferProperty(KafkaBuffer.SCHEMA_EXPOSURE_HOST, s"redis://localhost:$redisPort")
.build()
kafkaBuffer = new KafkaBuffer[StringType](pipeline, pipeline.bufferProperties, StageAttributes(),"test-subject", null)

}
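Note the asymmetry in this suite: Redis starts on an OS-assigned free port (redis.ports().get(0)) that is threaded through SCHEMA_EXPOSURE_HOST, while Kafka stays pinned to 9092/2181, which the buffer's defaults expect. When no fixed address needs to be shared, scalatest-embedded-redis also offers a scoped helper; a sketch, assuming its withRedis API:

import com.github.sebruck.EmbeddedRedis
import org.scalatest.FunSuite

class ScopedRedisExample extends FunSuite with EmbeddedRedis {

  test("withRedis hands the block a free port") {
    // withRedis starts a throwaway redis-server and stops it when the block returns
    withRedis() { port =>
      assert(port > 0) // point the code under test at s"redis://localhost:$port"
    }
  }
}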
codefeedr-core/src/test/scala/org/codefeedr/buffer/serialization/schema_exposure/RedisSchemaExposerTest.scala
@@ -18,6 +18,24 @@
*/
package org.codefeedr.buffer.serialization.schema_exposure

class RedisSchemaExposerTest extends SchemaExposerTest {
override def getSchemaExposer(): SchemaExposer = new RedisSchemaExposer("redis://localhost:6379")
import com.github.sebruck.EmbeddedRedis
import org.scalatest.BeforeAndAfterAll
import redis.embedded.RedisServer

class RedisSchemaExposerTest extends SchemaExposerTest with BeforeAndAfterAll with EmbeddedRedis {
var redis: RedisServer = null
var redisPort: Int = 0

// Before all tests, setup an embedded redis
override def beforeAll() = {
redis = startRedis()
redisPort = redis.ports().get(0)
}

// After all tests, stop embedded redis
override def afterAll() = {
stopRedis(redis)
}

override def getSchemaExposer(): SchemaExposer = new RedisSchemaExposer(s"redis://localhost:$redisPort")
}
codefeedr-core/src/test/scala/org/codefeedr/buffer/serialization/schema_exposure/ZookeeperSchemaExposerTest.scala
@@ -18,6 +18,21 @@
*/
package org.codefeedr.buffer.serialization.schema_exposure

class ZookeeperSchemaExposerTest extends SchemaExposerTest {
import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import org.scalatest.BeforeAndAfterAll

class ZookeeperSchemaExposerTest extends SchemaExposerTest
with EmbeddedKafka
with BeforeAndAfterAll {

override def beforeAll(): Unit = {
implicit val config = EmbeddedKafkaConfig(zooKeeperPort = 2181)
EmbeddedKafka.start()
}

override def afterAll(): Unit = {
EmbeddedKafka.stop()
}

override def getSchemaExposer(): SchemaExposer = new ZookeeperSchemaExposer("localhost:2181")
}
codefeedr-core/src/test/scala/org/codefeedr/keymanager/KeyManagerTest.scala
@@ -19,7 +19,14 @@ package org.codefeedr.keymanager

import org.scalatest.FunSuite

class KeyManagerTest(keyManager: KeyManager) extends FunSuite {
abstract class KeyManagerTest extends FunSuite {

var keyManager : KeyManager = null

// Inject a keymanager into this test suite
def injectKeyManager(keyManager : KeyManager): Unit = {
this.keyManager = keyManager
}

test("An unknown target returns no keys") {
val key = keyManager.request("otherTarget", 1)
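The constructor parameter is replaced by an injectKeyManager hook so a subclass can construct its KeyManager after an embedded backend is up (the Redis suite only learns its port in beforeAll). A hypothetical subclass for some new backend, where MyKeyManager is illustrative:

class MyKeyManagerTest extends KeyManagerTest {
  // Runs the shared contract tests in KeyManagerTest against the new backend.
  injectKeyManager(new MyKeyManager())
}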
codefeedr-core/src/test/scala/org/codefeedr/keymanager/StaticKeyManagerTest.scala
@@ -19,7 +19,9 @@ package org.codefeedr.keymanager

import org.scalatest.FunSuite

class StaticKeyManagerTest extends KeyManagerTest(new StaticKeyManager()) {
class StaticKeyManagerTest extends KeyManagerTest {

injectKeyManager(new StaticKeyManager())

test("An empty key manager returns no keys") {
val km = new StaticKeyManager()
codefeedr-core/src/test/scala/org/codefeedr/keymanager/redis/RedisKeyManagerTest.scala
@@ -20,26 +20,44 @@ package org.codefeedr.keymanager.redis
import java.net.URI
import java.util.Date

import com.github.sebruck.EmbeddedRedis
import com.redis.RedisClient
import org.codefeedr.keymanager.KeyManagerTest
import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite, PrivateMethodTester}
import redis.embedded.RedisServer

class RedisKeyManagerTest extends KeyManagerTest(new RedisKeyManager("redis://localhost:6379", "cf_test"))
class RedisKeyManagerTest extends KeyManagerTest()
with BeforeAndAfter
with PrivateMethodTester {
with PrivateMethodTester
with EmbeddedRedis
with BeforeAndAfterAll {

var km: RedisKeyManager = _
var redis: RedisServer = null
var redisPort: Int = 0

// Before all tests, setup an embedded redis
override def beforeAll() = {
redis = startRedis()
redisPort = redis.ports().get(0)
}

// After all tests, stop embedded redis
override def afterAll() = {
stopRedis(redis)
}

before {
km = new RedisKeyManager("redis://localhost:6379", "cf_test")
km = new RedisKeyManager(s"redis://localhost:$redisPort", "cf_test")
this.injectKeyManager(km)
}

after {
km.deleteAll()
km.disconnect()
}

test("A set key should be retrievable" ) {
test("A set key should be retrievable") {
km.set("testTarget", "testKey", 10, 10000)

val key = km.request("testTarget", 1)
@@ -54,7 +72,7 @@ class RedisKeyManagerTest extends KeyManagerTest(new RedisKeyManager("redis://lo
assert(key.isEmpty)
}

test("The key with the best fitting number of calls should be used" ) {
test("The key with the best fitting number of calls should be used") {
km.set("testTarget", "testKey", 10, 10000)
km.set("testTarget", "testKey2", 4, 10000)

@@ -102,7 +120,7 @@ class RedisKeyManagerTest extends KeyManagerTest(new RedisKeyManager("redis://lo
}

test("RedisKeyManager has a valid default root") {
val km = new RedisKeyManager("redis://localhost:6379")
val km = new RedisKeyManager(s"redis://localhost:$redisPort")
val key = km.request("randomTarget", 1)

assert(key.isEmpty)
Expand All @@ -120,7 +138,7 @@ class RedisKeyManagerTest extends KeyManagerTest(new RedisKeyManager("redis://lo
assert(key.get.remainingCalls == (10 - 1))

// As per documentation
val uri = new URI("redis://localhost:6379")
val uri = new URI(s"redis://localhost:$redisPort")
val rc = new RedisClient(uri)
val refreshTime = rc.zscore("cf_test:testTarget:refreshTime", "testKey")
rc.disconnect
14 changes: 12 additions & 2 deletions codefeedr-core/src/test/scala/org/codefeedr/pipeline/JobTest.scala
@@ -18,14 +18,24 @@

package org.codefeedr.pipeline

import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import org.apache.flink.streaming.api.scala.DataStream
import org.codefeedr.buffer.BufferType
import org.codefeedr.stages.utilities.{StringInput, StringType}
import org.codefeedr.stages.{OutputStage, OutputStage2, OutputStage3, OutputStage4}
import org.codefeedr.testUtils.CodeHitException
import org.scalatest.FunSuite
import org.scalatest.{BeforeAndAfterAll, FunSuite}

class JobTest extends FunSuite {
class JobTest extends FunSuite with BeforeAndAfterAll with EmbeddedKafka {

override def beforeAll(): Unit = {
implicit val config = EmbeddedKafkaConfig(zooKeeperPort = 2181, kafkaPort = 9092)
EmbeddedKafka.start()
}

override def afterAll(): Unit = {
EmbeddedKafka.stop()
}

class MyJob1 extends OutputStage[StringType] {
override def main(source: DataStream[StringType]): Unit = {
codefeedr-core/src/test/scala/org/codefeedr/pipeline/PipelineObjectNTest.scala
@@ -18,13 +18,24 @@

package org.codefeedr.pipeline

import net.manub.embeddedkafka.{EmbeddedK, EmbeddedKafka, EmbeddedKafkaConfig}
import org.apache.flink.streaming.api.scala.DataStream
import org.codefeedr.buffer.BufferType
import org.codefeedr.stages.utilities.{StringInput, StringType}
import org.codefeedr.testUtils.CodeHitException
import org.scalatest.FunSuite
import org.scalatest.{BeforeAndAfterAll, FunSuite}

class PipelineObjectNTest extends FunSuite with BeforeAndAfterAll with EmbeddedKafka {

override def beforeAll(): Unit = {
implicit val config = EmbeddedKafkaConfig(zooKeeperPort = 2181, kafkaPort = 9092)
EmbeddedKafka.start()
}

override def afterAll(): Unit = {
EmbeddedKafka.stop()
}

class PipelineObjectNTest extends FunSuite {

class MyObject2 extends PipelineObject2[StringType, StringType, NoType] {
override def transform(source: DataStream[StringType], secondSource: DataStream[StringType]): DataStream[NoType] = {
codefeedr-core/src/test/scala/org/codefeedr/pipeline/PipelineTest.scala
@@ -17,22 +17,41 @@
*/
package org.codefeedr.pipeline

import com.github.sebruck.EmbeddedRedis
import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import org.apache.flink.streaming.api.scala.DataStream
import org.codefeedr.buffer.{Buffer, BufferType, KafkaBuffer}
import org.scalatest.{BeforeAndAfter, FunSuite}
import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite}
import org.apache.flink.api.scala._
import org.apache.flink.runtime.client.JobExecutionException
import org.codefeedr.buffer.serialization.Serializer
import org.codefeedr.buffer.serialization.schema_exposure.{RedisSchemaExposer, ZookeeperSchemaExposer}
import org.codefeedr.stages.utilities.{JsonPrinterOutput, StringInput, StringType}
import org.codefeedr.testUtils._
import redis.embedded.RedisServer

import scala.collection.JavaConverters._

class PipelineTest extends FunSuite with BeforeAndAfter {
class PipelineTest extends FunSuite with BeforeAndAfter with EmbeddedKafka with EmbeddedRedis with BeforeAndAfterAll {

var builder: PipelineBuilder = _

var redis: RedisServer = null
var redisPort: Int = 0

override def beforeAll() = {
implicit val config = EmbeddedKafkaConfig(zooKeeperPort = 2181, kafkaPort = 9092)
EmbeddedKafka.start()(config)

redis = startRedis()
redisPort = redis.ports().get(0)
}

override def afterAll(): Unit = {
EmbeddedKafka.stop()
stopRedis(redis)
}

before {
builder = new PipelineBuilder()
CollectSink.result.clear()
@@ -97,13 +116,14 @@ class PipelineTest extends FunSuite with BeforeAndAfter {
val pipeline = simpleDAGPipeline(2)
.setBufferType(BufferType.Kafka)
.setBufferProperty(KafkaBuffer.SCHEMA_EXPOSURE, "true")
.setBufferProperty(KafkaBuffer.SCHEMA_EXPOSURE_HOST, s"redis://localhost:$redisPort")
.build()

assertThrows[JobExecutionException] {
pipeline.start(Array("-runtime", "local"))
}

val exposer = new RedisSchemaExposer("redis://localhost:6379")
val exposer = new RedisSchemaExposer(s"redis://localhost:$redisPort")

val schema1 = exposer.get("org.codefeedr.testUtils.SimpleSourcePipelineObject")
val schema2 = exposer.get("org.codefeedr.testUtils.SimpleTransformPipelineObject")
@@ -117,6 +137,7 @@
.setBufferType(BufferType.Kafka)
.setBufferProperty(KafkaBuffer.SCHEMA_EXPOSURE, "true")
.setBufferProperty(KafkaBuffer.SCHEMA_EXPOSURE_DESERIALIZATION, "true")
.setBufferProperty(KafkaBuffer.SCHEMA_EXPOSURE_HOST, s"redis://localhost:$redisPort")
.build()

assertThrows[JobExecutionException] {