cleanup the build
nextdude committed Jan 23, 2022
1 parent d53510d commit 8c4c242
Showing 19 changed files with 216 additions and 943 deletions.
32 changes: 18 additions & 14 deletions build.sbt
@@ -33,7 +33,7 @@ val V = new {
 val scalaTest = "3.2.10"
 val scalaCheck = "3.2.9.0"
 val circe = "0.14.1"
-val http4s = "0.21.29"
+val http4s = "0.23.7"
 val enumeratum = "1.7.0"
 val typesafeConfig = "1.4.1"
 val guava = "31.0.1-jre" //"29.0-jre"
@@ -44,20 +44,24 @@ val V = new {

 val flinkDeps =
   Seq(
-    "org.apache.flink" %% "flink-scala" % V.flink % Provided,
-    "org.apache.flink" %% "flink-streaming-scala" % V.flink % Provided,
+    "org.apache.flink" %% "flink-scala" % V.flink % Provided, // scala
+    "org.apache.flink" %% "flink-streaming-scala" % V.flink % Provided, // ds api scala
+    "org.apache.flink" %% "flink-table-api-scala-bridge" % V.flink % Provided, // table api scala
+    "org.apache.flink" %% "flink-statebackend-rocksdb" % V.flink % Provided,
     "org.apache.flink" %% "flink-cep-scala" % V.flink % Provided,
-    "org.apache.flink" %% "flink-table-planner" % V.flink % Provided,
-    "org.apache.flink" %% "flink-connector-kafka" % V.flink,
-    "org.apache.flink" %% "flink-connector-kinesis" % V.flink,
-    "org.apache.flink" %% "flink-connector-cassandra" % V.flink,
-    "org.apache.flink" %% "flink-connector-elasticsearch7" % V.flink,
-    "org.apache.flink" %% "flink-connector-jdbc" % V.flink,
-    "org.apache.flink" %% "flink-connector-rabbitmq" % V.flink,
-    "org.apache.flink" % "flink-connector-files" % V.flink,
-    "org.apache.flink" %% "flink-table-api-scala-bridge" % V.flink,
-    "org.apache.flink" %% "flink-statebackend-rocksdb" % V.flink,
-    "org.apache.flink" % "flink-avro-confluent-registry" % V.flink,
+    "org.apache.flink" %% "flink-table-planner" % V.flink % Provided, // table api
+    "org.apache.flink" % "flink-connector-base" % V.flink % Provided, // ds hybrid source
+    "org.apache.flink" % "flink-connector-files" % V.flink % Provided, // ds text files
+    "org.apache.flink" %% "flink-connector-kafka" % V.flink % Provided,
+    "org.apache.flink" %% "flink-connector-kinesis" % V.flink % Provided,
+    "org.apache.flink" %% "flink-connector-cassandra" % V.flink % Provided,
+    "org.apache.flink" %% "flink-connector-elasticsearch7" % V.flink % Provided,
+    "org.apache.flink" %% "flink-connector-jdbc" % V.flink % Provided,
+    "org.apache.flink" %% "flink-connector-rabbitmq" % V.flink % Provided,
+    "org.apache.flink" % "flink-csv" % V.flink % Provided, // table api csv format
+    "org.apache.flink" % "flink-json" % V.flink % Provided, // table api json format
+    "org.apache.flink" % "flink-avro" % V.flink % Provided, // ds and table avro format
+    "org.apache.flink" % "flink-avro-confluent-registry" % V.flink % Provided, // ds and table avro registry format
     "org.apache.flink" %% "flink-test-utils" % V.flink % Test
   )
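
Because every connector is now marked Provided, these artifacts no longer ship transitively with flinkrunner; a downstream application must declare the connectors it actually uses. A minimal sketch of a consumer's build.sbt, where "1.14.2" is an assumed Flink version (match it to the V.flink this release is built against):

val flinkVersion = "1.14.2" // assumption: use the version this release targets

libraryDependencies ++= Seq(
  // the flink cluster classpath provides the core scala api at runtime
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % Provided,
  // connectors are no longer bundled, so ship the ones your jobs use
  "org.apache.flink" %% "flink-connector-kafka" % flinkVersion,
  "org.apache.flink" % "flink-avro-confluent-registry" % flinkVersion
)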

2 changes: 1 addition & 1 deletion project/build.properties
@@ -1 +1 @@
-sbt.version=1.5.5
+sbt.version=1.5.8
8 changes: 0 additions & 8 deletions src/main/scala/io/epiphanous/flinkrunner/FlinkRunner.scala
@@ -1,7 +1,6 @@
 package io.epiphanous.flinkrunner
 
 import com.typesafe.scalalogging.LazyLogging
-import io.epiphanous.flinkrunner.avro.AvroCoder
 import io.epiphanous.flinkrunner.model._
 import io.epiphanous.flinkrunner.operator.AddToJdbcBatchFunction
 import io.epiphanous.flinkrunner.util.BoundedLatenessWatermarkStrategy
@@ -216,13 +215,6 @@ class FlinkRunner[ADT <: FlinkEvent](
 name: String): Option[RMQSinkPublishOptions[E]] =
 factory.getRabbitPublishOptions[E](name, config)
 
-@deprecated(
-  "Use the ConfluentAvroRegistryKafkaRecordSerialization and ...Deserialization classes instead",
-  "4.0.0"
-)
-def getAvroCoder(name: String): AvroCoder[_] =
-  factory.getAvroCoder(name, config)
-
 val RESOURCE_PATTERN: Regex = "resource://(.*)".r

/**
165 changes: 161 additions & 4 deletions src/main/scala/io/epiphanous/flinkrunner/FlinkRunnerFactory.scala
@@ -1,6 +1,5 @@
 package io.epiphanous.flinkrunner
 
-import io.epiphanous.flinkrunner.avro.AvroCoder
 import io.epiphanous.flinkrunner.flink.BaseFlinkJob
 import io.epiphanous.flinkrunner.model._
 import io.epiphanous.flinkrunner.operator.AddToJdbcBatchFunction
@@ -25,64 +24,222 @@ import org.apache.flink.streaming.connectors.rabbitmq.{
RMQSinkPublishOptions
}

/**
* A trait for creating a factory of jobs and related serialization and
* deserialization schemas for those jobs. To successfully use this
* library, implement the required methods of this trait.
* @tparam ADT
* an algebraic data type for all input and output job types
*/
trait FlinkRunnerFactory[ADT <: FlinkEvent] {

/**
* Provide an instance of a named flink job
* @param name
* name of the job
* @param runner
* a [[FlinkRunner]] [ADT]
* @tparam DS
* a flink data stream input type
* @tparam OUT
* an ADT type that is the output type of the job
* @return
* Subclass of [[BaseFlinkJob]] [DS, OUT, ADT]
*/
def getJobInstance[DS, OUT <: ADT](
name: String,
runner: FlinkRunner[ADT]): BaseFlinkJob[DS, OUT, ADT]

/**
* Provide a deserialization schema for a kafka or kinesis topic
* @param name
* name of the source
* @param config
* a [[FlinkConfig]]
* @tparam E
* an ADT type
* @return
* [[DeserializationSchema]] [E]
*/
def getDeserializationSchema[E <: ADT](
name: String,
config: FlinkConfig): DeserializationSchema[E] = ???
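
For illustration, a standalone sketch (separate from this file) of the kind of schema an implementation might return, built directly on Flink's DeserializationSchema; LogLine is a hypothetical type standing in for a member of your ADT:

import java.nio.charset.StandardCharsets
import org.apache.flink.api.common.serialization.DeserializationSchema
import org.apache.flink.api.common.typeinfo.TypeInformation

case class LogLine(text: String) // hypothetical event type

class LogLineDeserializationSchema extends DeserializationSchema[LogLine] {
  // decode the raw bytes of one record into an event
  override def deserialize(message: Array[Byte]): LogLine =
    LogLine(new String(message, StandardCharsets.UTF_8))
  // unbounded sources never signal end of stream
  override def isEndOfStream(nextElement: LogLine): Boolean = false
  // tell flink how to treat the produced type
  override def getProducedType: TypeInformation[LogLine] =
    TypeInformation.of(classOf[LogLine])
}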

/**
* Provide a deserialization schema for a kafka source
* @param name
* name of the source
* @param config
* a [[FlinkConfig]]
* @tparam E
* an ADT type
* @return
* [[KafkaDeserializationSchema]] [E]
*/
def getKafkaDeserializationSchema[E <: ADT](
name: String,
config: FlinkConfig): KafkaDeserializationSchema[E] =
???

/**
* Provide a record serialization schema for a kafka sink
* @param name
* name of the sink
* @param config
* a [[FlinkConfig]]
* @tparam E
* an ADT type
* @return
* [[KafkaRecordSerializationSchema]] [E]
*/
def getKafkaRecordSerializationSchema[E <: ADT](
name: String,
config: FlinkConfig): KafkaRecordSerializationSchema[E] = ???
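
As an aside, a hedged sketch of a record serialization schema built on Flink 1.14's KafkaRecordSerializationSchema interface; the topic and the LogLine type are hypothetical:

import java.lang.{Long => JLong}
import java.nio.charset.StandardCharsets
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema
import org.apache.kafka.clients.producer.ProducerRecord

case class LogLine(text: String) // hypothetical event type

class LogLineKafkaSchema(topic: String)
    extends KafkaRecordSerializationSchema[LogLine] {
  // build the producer record that flink's KafkaSink will write
  override def serialize(
      element: LogLine,
      context: KafkaRecordSerializationSchema.KafkaSinkContext,
      timestamp: JLong): ProducerRecord[Array[Byte], Array[Byte]] =
    new ProducerRecord[Array[Byte], Array[Byte]](
      topic,
      element.text.getBytes(StandardCharsets.UTF_8))
}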

/**
* Provide a record deserialization schema for a kafka source
* @param name
* the name of the source
* @param config
* a [[FlinkConfig]]
* @tparam E
* an ADT type
* @return
* [[KafkaRecordDeserializationSchema]] [E]
*/
def getKafkaRecordDeserializationSchema[E <: ADT](
name: String,
config: FlinkConfig): KafkaRecordDeserializationSchema[E] = ???
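
Similarly, a hypothetical sketch against Flink 1.14's KafkaRecordDeserializationSchema, which exposes the whole consumer record (key, value, headers) rather than just the value bytes:

import java.nio.charset.StandardCharsets
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema
import org.apache.flink.util.Collector
import org.apache.kafka.clients.consumer.ConsumerRecord

case class KeyedLine(key: String, text: String) // hypothetical event type

class KeyedLineKafkaSchema
    extends KafkaRecordDeserializationSchema[KeyedLine] {
  // emit zero or more events per consumer record via the collector
  override def deserialize(
      record: ConsumerRecord[Array[Byte], Array[Byte]],
      out: Collector[KeyedLine]): Unit = {
    val key = Option(record.key)
      .map(new String(_, StandardCharsets.UTF_8))
      .getOrElse("")
    out.collect(KeyedLine(key, new String(record.value, StandardCharsets.UTF_8)))
  }
  override def getProducedType: TypeInformation[KeyedLine] =
    TypeInformation.of(classOf[KeyedLine])
}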

/**
* Provide a deserialization schema for a kinesis source
* @param name
* name of the kinesis source
* @param config
* a [[FlinkConfig]]
* @tparam E
* an ADT type
* @return
* [[KinesisDeserializationSchema]] [E]
*/
def getKinesisDeserializationSchema[E <: ADT](
name: String,
config: FlinkConfig): KinesisDeserializationSchema[E] = ???

/**
* Provide a serialization schema for writing to a kafka or kinesis sink
* @param name
* name of the sink
* @param config
* a [[FlinkConfig]]
* @tparam E
* an ADT type
* @return
* [[SerializationSchema]] [E]
*/
def getSerializationSchema[E <: ADT](
name: String,
config: FlinkConfig): SerializationSchema[E] = ???
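
A sketch of the simplest case, serializing just the value bytes (LogLine again hypothetical, standing in for an ADT member):

import java.nio.charset.StandardCharsets
import org.apache.flink.api.common.serialization.SerializationSchema

case class LogLine(text: String) // hypothetical event type

class LogLineSerializationSchema extends SerializationSchema[LogLine] {
  // bytes written as the record value at the sink
  override def serialize(element: LogLine): Array[Byte] =
    element.text.getBytes(StandardCharsets.UTF_8)
}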

/**
* Provide a kafka serialization schema for writing to a kafka sink
* @param name
* the name of the kafka sink
* @param config
* a [[FlinkConfig]]
* @tparam E
* an ADT type
* @return
* [[KafkaSerializationSchema]] [E]
*/
def getKafkaSerializationSchema[E <: ADT](
name: String,
config: FlinkConfig): KafkaSerializationSchema[E] = ???

/**
* Provide a kinesis serialization schema for writing to a kinesis sink
* @param name
* the name of the kinesis sink
* @param config
* a [[FlinkConfig]]
* @tparam E
* an ADT type
* @return
* [[KinesisSerializationSchema]] [E]
*/
def getKinesisSerializationSchema[E <: ADT](
name: String,
config: FlinkConfig): KinesisSerializationSchema[E] = ???

/**
* Provide an encoder to write to a streaming file sink
* @param name
* the name of the streaming file sink
* @param config
* a [[FlinkConfig]]
* @tparam E
* an ADT type
* @return
* [[Encoder]] [E]
*/
def getEncoder[E <: ADT](name: String, config: FlinkConfig): Encoder[E] =
???
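
For example, a hedged sketch of a line-oriented Encoder for a streaming file sink; Metric is a hypothetical event type:

import java.io.OutputStream
import java.nio.charset.StandardCharsets
import org.apache.flink.api.common.serialization.Encoder

case class Metric(name: String, value: Double) // hypothetical event type

class MetricCsvEncoder extends Encoder[Metric] {
  // append one csv line per event to the open part file
  override def encode(element: Metric, stream: OutputStream): Unit =
    stream.write(
      s"${element.name},${element.value}\n".getBytes(StandardCharsets.UTF_8))
}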

/**
* Provide a function to add an event to a jdbc sink
* @param name
* name of the jdbc sink
* @param config
* a [[FlinkConfig]]
* @tparam E
* an ADT type
* @return
* [[AddToJdbcBatchFunction]] [E]
*/
def getAddToJdbcBatchFunction[E <: ADT](
name: String,
config: FlinkConfig): AddToJdbcBatchFunction[E] = ???

/**
* Provide a flink bucket assigner for writing to a streaming file sink
* @param name
* the name of the streaming file sink
* @param config
* a [[FlinkConfig]]
* @tparam E
* an ADT type
* @return
* [[BucketAssigner]] [E, String]
*/
def getBucketAssigner[E <: ADT](
name: String,
config: FlinkConfig): BucketAssigner[E, String] =
???
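
A sketch of a bucket assigner that partitions part files by a field of the event; Metric is hypothetical, and the serializer reuses Flink's built-in versioned string serializer:

import org.apache.flink.core.io.SimpleVersionedSerializer
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer

case class Metric(name: String, value: Double) // hypothetical event type

class MetricBucketAssigner extends BucketAssigner[Metric, String] {
  // route each event to a sub-directory named for its metric
  override def getBucketId(
      element: Metric,
      context: BucketAssigner.Context): String = s"metric=${element.name}"
  override def getSerializer: SimpleVersionedSerializer[String] =
    SimpleVersionedStringSerializer.INSTANCE
}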

-def getAvroCoder(name: String, config: FlinkConfig): AvroCoder[_] =
-  ???

/**
* Provide a deserialization schema for reading from a rabbit mq source
* @param name
* the source name
* @param config
* a [[FlinkConfig]]
* @tparam E
* an ADT type
* @return
* [[RMQDeserializationSchema]] [E]
*/
def getRMQDeserializationSchema[E <: ADT](
name: String,
config: FlinkConfig): RMQDeserializationSchema[E] = ???

/**
* Provide an optional rabbit mq sink publish options object
* @param name
* Name of the rabbit sink
* @param config
* [[FlinkConfig]]
* @tparam E
* an ADT type
* @return
* [[Option]] `[` [[RMQSinkPublishOptions]] [E] `]`
*/
def getRabbitPublishOptions[E <: ADT](
name: String,
config: FlinkConfig): Option[RMQSinkPublishOptions[E]] = None
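
Tying the trait together, a minimal wiring sketch. Everything named here is an assumption: MyADT stands in for your FlinkEvent ADT root, and WordCountJob for a job class extending BaseFlinkJob with the constructor shown. Only getJobInstance is abstract; the schema methods default to ??? and need overriding only when a job's sources or sinks actually use them:

class MyJobFactory extends FlinkRunnerFactory[MyADT] {
  override def getJobInstance[DS, OUT <: MyADT](
      name: String,
      runner: FlinkRunner[MyADT]): BaseFlinkJob[DS, OUT, MyADT] =
    (name match {
      case "WordCountJob" => new WordCountJob(runner) // hypothetical job class
      case _ => throw new RuntimeException(s"unknown job name: $name")
    }).asInstanceOf[BaseFlinkJob[DS, OUT, MyADT]]
}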
64 changes: 0 additions & 64 deletions src/main/scala/io/epiphanous/flinkrunner/avro/AvroCoder.scala

This file was deleted.
