Merge b60eb0d into 0426936
AbeleMM committed Aug 12, 2020
2 parents 0426936 + b60eb0d commit 9971dd6
Showing 82 changed files with 8,402 additions and 191 deletions.
90 changes: 85 additions & 5 deletions build.sbt
@@ -39,6 +39,7 @@ parallelExecution in Test := false

val projectPrefix = "codefeedr-"
val pluginPrefix = projectPrefix + "plugin-"
val utilPrefix = projectPrefix + "util-"

lazy val root = (project in file("."))
.settings(settings ++ noPublishSettings)
@@ -48,7 +49,14 @@ lazy val root = (project in file("."))
pluginGitHub,
pluginRabbitMQ,
pluginGHTorrent,
pluginPypi)
pluginPypi,
pluginCargo,
pluginClearlyDefined,
pluginJSON,
pluginMaven,
pluginnpm,
utilSchemaExposure
)

lazy val core = (project in file("codefeedr-core"))
.settings(
@@ -88,9 +96,13 @@ lazy val core = (project in file("codefeedr-core"))
dependencies.embeddedRedis,

// Embedded kafka for integration tests
dependencies.embeddedKafka
dependencies.embeddedKafka,

// ZK schema registration
dependencies.avro4s
)
)
.dependsOn(utilSchemaExposure)

lazy val pluginMongodb = (project in file("codefeedr-plugins/codefeedr-mongodb"))
.settings(
@@ -162,6 +174,70 @@ lazy val pluginPypi = (project in file("codefeedr-plugins/codefeedr-pypi"))
)
).dependsOn(core)

lazy val pluginCargo = (project in file("codefeedr-plugins/codefeedr-cargo"))
.settings(
name := pluginPrefix + "cargo",
settings,
assemblySettings,
libraryDependencies ++= commonDependencies ++ Seq(
dependencies.flinkTablePlanner,
dependencies.spray
)
).dependsOn(core)
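
The Cargo plugin pulls in spray-json, presumably for decoding crates.io API responses. A minimal sketch of spray-json usage under that assumption; the case class and field names below are illustrative, not part of this commit:

import spray.json._
import DefaultJsonProtocol._

// Hypothetical crates.io payload shape, for illustration only.
case class CrateSummary(name: String, max_version: String)
implicit val crateFormat: RootJsonFormat[CrateSummary] = jsonFormat2(CrateSummary)

val crate = """{"name": "serde", "max_version": "1.0.99"}""".parseJson.convertTo[CrateSummary]
val roundTripped: String = crate.toJson.compactPrint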

lazy val pluginClearlyDefined = (project in file("codefeedr-plugins/codefeedr-clearlydefined"))
.settings(
name := pluginPrefix + "clearlydefined",
settings,
assemblySettings,
libraryDependencies ++= commonDependencies ++ Seq(
dependencies.flinkTablePlanner
)
).dependsOn(core, pluginMaven)

lazy val pluginJSON = (project in file("codefeedr-plugins/codefeedr-json"))
.settings(
name := pluginPrefix + "json",
settings,
assemblySettings,
libraryDependencies ++= commonDependencies ++ Seq(
)
).dependsOn(core)

lazy val pluginMaven = (project in file("codefeedr-plugins/codefeedr-maven"))
.settings(
name := pluginPrefix + "maven",
settings,
assemblySettings,
libraryDependencies ++= commonDependencies ++ Seq(
dependencies.flinkTablePlanner
)
).dependsOn(core)

lazy val pluginnpm = (project in file("codefeedr-plugins/codefeedr-npm"))
.settings(
name := pluginPrefix + "npm",
settings,
assemblySettings,
libraryDependencies ++= commonDependencies ++ Seq(
dependencies.flinkTablePlanner
)
).dependsOn(core)
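
The cargo, clearlydefined, maven and npm plugins all add flink-table-planner, which enables Flink's Table API alongside the DataStream API. A minimal sketch of what that unlocks, assuming the Scala Table API bridge is available on the classpath (transitively via the planner, or added explicitly); the field and value names are illustrative:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)

// Register a stream of (artifact, version) pairs as a table and filter it.
val releases = env.fromElements(("junit", "4.12"), ("guava", "29.0-jre"))
val table = tableEnv.fromDataStream(releases, 'artifact, 'version)
val junitOnly = table.filter('artifact === "junit")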

lazy val utilSchemaExposure = (project in file("codefeedr-util/schema-exposure"))
.settings(
name := utilPrefix + "schema-exposure",
settings,
assemblySettings,
libraryDependencies ++= commonDependencies ++ Seq(
dependencies.avro,
dependencies.zookeeper,
dependencies.redis,
dependencies.embeddedKafka,
dependencies.embeddedRedis
)
)

lazy val dependencies =
new {
val flinkVersion = "1.9.1"
@@ -206,6 +282,11 @@ lazy val dependencies =
//val embeddedRabbitMQ = "io.arivera.oss" %% "embedded-rabbitmq" % "1.3.0" % Test

val avro = "org.apache.avro" % "avro" % "1.8.2"
val avro4s = "com.sksamuel.avro4s" %% "avro4s-core" % "3.1.1"

val flinkTablePlanner = "org.apache.flink" %% "flink-table-planner" % flinkVersion

val spray = "io.spray" %% "spray-json" % "1.3.4"
}

lazy val commonDependencies = Seq(
@@ -230,9 +311,9 @@ lazy val commonSettings = Seq(
test in assembly := {},
scalacOptions ++= compilerOptions,
resolvers ++= Seq(
"confluent" at "http://packages.confluent.io/maven/",
"confluent" at "https://packages.confluent.io/maven/",
"Apache Development Snapshot Repository" at "https://repository.apache.org/content/repositories/snapshots/",
"Artima Maven Repository" at "http://repo.artima.com/releases",
"Artima Maven Repository" at "https://repo.artima.com/releases",
Resolver.mavenLocal,
Resolver.jcenterRepo
),
@@ -294,4 +375,3 @@ Global / cancelable := true

// exclude Scala library from assembly
assembly / assemblyOption := (assembly / assemblyOption).value.copy(includeScala = false)
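
The avro4s dependency added to core (annotated "ZK schema registration" above) derives Avro schemas from case classes at compile time, which is presumably what the schema-exposure utilities register. A minimal sketch of that derivation; the case class is hypothetical:

import com.sksamuel.avro4s.AvroSchema
import org.apache.avro.Schema

// Illustrative record type; real stages expose their own case classes.
case class PypiRelease(title: String, version: String)

// Compile-time derivation of the Avro schema for the type; KafkaBuffer can
// then register it with a schema exposer (e.g. the ZooKeeper-backed one).
val schema: Schema = AvroSchema[PypiRelease]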

3 changes: 0 additions & 3 deletions codefeedr-core/src/main/scala/org/codefeedr/Properties.scala
@@ -26,9 +26,6 @@ object Properties {
// Conversion implicits
implicit def stringToBoolean(str: String): Boolean = str.toBoolean
implicit def booleanToString(bool: Boolean): String = bool.toString



}
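
The remaining implicit conversions let property values move between String and Boolean without explicit calls. A small usage sketch, assuming the object is imported as KafkaBuffer does below:

import org.codefeedr.Properties._
import scala.language.implicitConversions

val enabled: Boolean = "true" // via stringToBoolean
val asString: String = false  // via booleanToString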

/** Object containing configuration properties. */
@@ -18,23 +18,31 @@
*/
package org.codefeedr.buffer

import java.util.{Optional, Properties}
import java.util.Properties

import org.apache.avro.reflect.ReflectData
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.Semantic
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaProducer}
import org.apache.flink.streaming.connectors.kafka.{
FlinkKafkaConsumer,
FlinkKafkaProducer
}
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewTopic}
import org.apache.logging.log4j.scala.Logging
import org.codefeedr.buffer.serialization.schema_exposure.{RedisSchemaExposer, SchemaExposer, ZookeeperSchemaExposer}
import org.codefeedr.pipeline.Pipeline

import scala.collection.JavaConverters._
import scala.reflect.ClassTag
import scala.reflect.runtime.universe._
import org.codefeedr.Properties._
import org.codefeedr.util.schema_exposure.{
RedisSchemaExposer,
SchemaExposer,
ZookeeperSchemaExposer
}

import scala.language.implicitConversions

/** Holds Kafka property names. */
object KafkaBuffer {
@@ -110,7 +118,7 @@ class KafkaBuffer[T <: Serializable with AnyRef: ClassTag: TypeTag](
val START_TIMESTAMP = 0x0

//SEMANTIC
val SEMANTIC : Semantic = Semantic.AT_LEAST_ONCE
val SEMANTIC: Semantic = Semantic.AT_LEAST_ONCE
}

/** Get a Kafka Consumer as source for a stage.
@@ -169,11 +177,16 @@ class KafkaBuffer[T <: Serializable with AnyRef: ClassTag: TypeTag](
.getOrElse[String](KafkaBuffer.BROKER, KafkaBufferDefaults.BROKER))

// Check preferred partitioning
val semantic = properties.getOrElse[Semantic](KafkaBuffer.SEMANTIC, KafkaBufferDefaults.SEMANTIC)(stringToSemantic)
val semantic = properties.getOrElse[Semantic](
KafkaBuffer.SEMANTIC,
KafkaBufferDefaults.SEMANTIC)(stringToSemantic)

// Create Kafka producer.
val producer =
new FlinkKafkaProducer[T](topic, getSerializer(topic), getKafkaProperties, semantic)
new FlinkKafkaProducer[T](topic,
getSerializer(topic),
getKafkaProperties,
semantic)
producer.setWriteTimestampToKafka(true)

producer
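
The new SEMANTIC buffer property is read as a string and turned into Flink's producer semantic through the stringToSemantic implicit imported from org.codefeedr.Properties. A minimal sketch of the enum lookup this presumably boils down to; the value shown is illustrative:

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.Semantic

// Semantic is a plain Java enum with AT_LEAST_ONCE (the default above),
// EXACTLY_ONCE and NONE, so the usual valueOf lookup applies.
val semantic: Semantic = Semantic.valueOf("EXACTLY_ONCE")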
@@ -18,7 +18,10 @@
*/
package org.codefeedr.buffer.serialization

import org.apache.flink.api.common.serialization.{AbstractDeserializationSchema, SerializationSchema}
import org.apache.flink.api.common.serialization.{
AbstractDeserializationSchema,
SerializationSchema
}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.TypeExtractor
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema
@@ -33,7 +36,8 @@ abstract class AbstractSerde[T <: Serializable: ClassTag](topic: String = "")
extends AbstractDeserializationSchema[T](
TypeExtractor.createTypeInfo(
classTag[T].runtimeClass.asInstanceOf[Class[T]]))
with SerializationSchema[T] with KafkaSerializationSchema[T] {
with SerializationSchema[T]
with KafkaSerializationSchema[T] {

// Get type of class
val inputClassType: Class[T] = classTag[T].runtimeClass.asInstanceOf[Class[T]]
@@ -35,7 +35,8 @@ import scala.reflect.runtime.universe._
*
* @tparam T Type of the SerDe.
*/
class BsonSerde[T <: Serializable with AnyRef: TypeTag: ClassTag](topic: String = "")
class BsonSerde[T <: Serializable with AnyRef: TypeTag: ClassTag](
topic: String = "")
extends AbstractSerde[T](topic) {

// Implicitly and lazily define the serialization to JSON.
@@ -79,6 +80,7 @@ class BsonSerde[T <: Serializable with AnyRef: TypeTag: ClassTag](topic: String
object BsonSerde {

/** Creates new BSON Serde. */
def apply[T <: Serializable with AnyRef: TypeTag: ClassTag](topic: String = ""): BsonSerde[T] =
def apply[T <: Serializable with AnyRef: TypeTag: ClassTag](
topic: String = ""): BsonSerde[T] =
new BsonSerde[T](topic)
}
@@ -33,7 +33,8 @@ import scala.reflect.runtime.universe._
*
* @tparam T Type of the SerDe.
*/
class JSONSerde[T <: Serializable with AnyRef: TypeTag: ClassTag](topic: String = "")
class JSONSerde[T <: Serializable with AnyRef: TypeTag: ClassTag](
topic: String = "")
extends AbstractSerde[T](topic) {

// Implicitly and lazily define the serialization to JSON.
@@ -61,7 +62,9 @@ class JSONSerde[T <: Serializable with AnyRef: TypeTag: ClassTag](topic: String
/** Serialize as Kafka Producer Record.
* @return ProducerRecord.
*/
override def serialize(element: T, timestamp: lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = {
override def serialize(
element: T,
timestamp: lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = {
new ProducerRecord(topic, serialize(element))
}
}
@@ -70,6 +73,7 @@ class JSONSerde[T <: Serializable with AnyRef: TypeTag: ClassTag](topic: String
object JSONSerde {

/** Creates new JSON Serde. */
def apply[T <: Serializable with AnyRef: ClassTag: TypeTag](topic: String = ""): JSONSerde[T] =
def apply[T <: Serializable with AnyRef: ClassTag: TypeTag](
topic: String = ""): JSONSerde[T] =
new JSONSerde[T](topic)
}
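
A small usage sketch of the companion apply shown above; the case class and topic are hypothetical, not part of this change:

case class Release(name: String, version: String)

val serde = JSONSerde[Release]("releases")
val bytes: Array[Byte] = serde.serialize(Release("codefeedr", "0.1.2"))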
@@ -30,7 +30,8 @@ import scala.reflect.runtime.universe._
*
* @tparam T Type of the SerDe.
*/
class KryoSerde[T <: Serializable: TypeTag: ClassTag](topic: String = "") extends AbstractSerde[T](topic) {
class KryoSerde[T <: Serializable: TypeTag: ClassTag](topic: String = "")
extends AbstractSerde[T](topic) {

// Lazily retrieve kryo instance.
private lazy val kryo: KryoBase = getKryo
@@ -70,7 +71,7 @@ class KryoSerde[T <: Serializable: TypeTag: ClassTag](topic: String = "") extend
* @return ProducerRecord.
*/
override def serialize(element: T, timestamp: lang.Long) = {
val buffer : Array[Byte] = new Array[Byte](KryoSerde.BUFFER_SIZE)
val buffer: Array[Byte] = new Array[Byte](KryoSerde.BUFFER_SIZE)
val output = new Output(buffer)
kryo.writeObject(output, element)

@@ -83,6 +84,7 @@ object KryoSerde {
val BUFFER_SIZE = 4096

/** Creates new Kryo Serde. */
def apply[T <: Serializable: ClassTag: TypeTag](topic: String = ""): KryoSerde[T] =
def apply[T <: Serializable: ClassTag: TypeTag](
topic: String = ""): KryoSerde[T] =
new KryoSerde[T](topic)
}
@@ -55,7 +55,9 @@ object Serializer extends Enumeration {
* @tparam T the type which has to be serialized/deserialized.
* @return the serde instance.
*/
def getSerde[T <: Serializable with AnyRef: ClassTag: TypeTag](name: String, topic: String = "") =
def getSerde[T <: Serializable with AnyRef: ClassTag: TypeTag](name: String,
topic: String =
"") =
name match {
case "JSON" => JSONSerde[T](topic)
case "BSON" => BsonSerde[T](topic)
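
For completeness, selecting a serde by name through the factory above, again with a hypothetical case class:

case class Release(name: String, version: String)

val jsonSerde = Serializer.getSerde[Release]("JSON", "releases")
val bsonSerde = Serializer.getSerde[Release]("BSON", "releases")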
@@ -24,7 +24,10 @@ import org.apache.flink.runtime.executiongraph.restart.RestartStrategy
import org.apache.flink.runtime.state.StateBackend
import org.apache.flink.runtime.state.memory.MemoryStateBackend
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.scala.{
DataStream,
StreamExecutionEnvironment
}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.Semantic
import org.apache.logging.log4j.scala.Logging
import org.codefeedr.Properties
