added rabbitmq source/sink and modernized kafka src/sink, jdbc sink, elastic sink
nextdude committed Dec 10, 2021
1 parent 988a283 commit 000e3d6
Showing 16 changed files with 494 additions and 123 deletions.
16 changes: 8 additions & 8 deletions README.md
@@ -41,7 +41,7 @@
## Maven Dependency

`Flinkrunner 4` is [available on maven central](https://mvnrepository.com/artifact/io.epiphanous/flinkrunner_2.12),
-built against Flink 1.13 with Scala 2.12 and JDK 11.
+built against Flink 1.14 with Scala 2.12 and JDK 11.

```sbtshell
libraryDependencies += "io.epiphanous" %% "flinkrunner" % <flinkrunner-version>
@@ -201,10 +201,10 @@ abstract class BaseFlinkJob[DS, OUT <: FlinkEvent: TypeInformation] extends Lazy
* @param config implicit flink job config
* @return data output stream
*/
-def flow()(implicit config: FlinkConfig, env: SEE): DataStream[OUT] =
+def flow()(implicit config: FlinkConfig, env: StreamExecutionEnvironment): DataStream[OUT] =
source |> transform |# maybeSink

-def run()(implicit config: FlinkConfig, env: SEE): Either[Iterator[OUT], Unit] = {
+def run()(implicit config: FlinkConfig, env: StreamExecutionEnvironment): Either[Iterator[OUT], Unit] = {

logger.info(s"\nSTARTING FLINK JOB: ${config.jobName} ${config.jobArgs.mkString(" ")}\n")

@@ -222,7 +222,7 @@ abstract class BaseFlinkJob[DS, OUT <: FlinkEvent: TypeInformation] extends Lazy
* Returns source data stream to pass into [[transform()]]. This must be overridden by subclasses.
* @return input data stream
*/
-def source()(implicit config: FlinkConfig, env: SEE): DS
+def source()(implicit config: FlinkConfig, env: StreamExecutionEnvironment): DS

/**
* Primary method to transform the source data stream into the output data stream. The output of
@@ -232,15 +232,15 @@ abstract class BaseFlinkJob[DS, OUT <: FlinkEvent: TypeInformation] extends Lazy
* @param config implicit flink job config
* @return output data stream
*/
-def transform(in: DS)(implicit config: FlinkConfig, env: SEE): DataStream[OUT]
+def transform(in: DS)(implicit config: FlinkConfig, env: StreamExecutionEnvironment): DataStream[OUT]

/**
* Writes the transformed data stream to configured output sinks.
*
* @param out a transformed stream from [[transform()]]
* @param config implicit flink job config
*/
-def sink(out: DataStream[OUT])(implicit config: FlinkConfig, env: SEE): Unit =
+def sink(out: DataStream[OUT])(implicit config: FlinkConfig, env: StreamExecutionEnvironment): Unit =
config.getSinkNames.foreach(name => out.toSink(name))

/**
@@ -250,7 +250,7 @@ abstract class BaseFlinkJob[DS, OUT <: FlinkEvent: TypeInformation] extends Lazy
* @param out the output data stream to pass into [[sink()]]
* @param config implicit flink job config
*/
-def maybeSink(out: DataStream[OUT])(implicit config: FlinkConfig, env: SEE): Unit =
+def maybeSink(out: DataStream[OUT])(implicit config: FlinkConfig, env: StreamExecutionEnvironment): Unit =
if (!config.mockEdges) sink(out)

}
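A note for readers of the `flow()` change above: its body composes stages as `source |> transform |# maybeSink`. A plausible sketch of those combinators, for illustration only (FlinkRunner's actual definitions may differ):

```scala
// Illustrative only: plausible definitions for the |> and |# combinators
// used in flow(); not FlinkRunner's actual code.
object PipeOps {
  implicit class Piped[A](private val a: A) extends AnyVal {
    // |> feeds the value on the left into the function on the right
    def |>[B](f: A => B): B = f(a)
    // |# applies a side effect (e.g. sinking a stream) and returns the value
    def |#(f: A => Unit): A = { f(a); a }
  }
}
```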
@@ -274,7 +274,7 @@ abstract class FlinkJob[IN <: FlinkEvent: TypeInformation, OUT <: FlinkEvent: Ty
* Returns source data stream to pass into [[transform()]]. This can be overridden by subclasses.
* @return input data stream
*/
-def source()(implicit config: FlinkConfig, env: SEE): DataStream[IN] =
+def source()(implicit config: FlinkConfig, env: StreamExecutionEnvironment): DataStream[IN] =
fromSource[IN](getEventSourceName) |# maybeAssignTimestampsAndWatermarks

}
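With these signature changes, a concrete job takes an implicit `StreamExecutionEnvironment` directly instead of the old `SEE` alias. A minimal sketch of what a subclass might look like; the event type, its `$`-prefixed members (assumed to be FlinkEvent's interface), and the doubling logic are all hypothetical:

```scala
import org.apache.flink.streaming.api.scala._

// Hypothetical event type; $id, $key, and $timestamp are assumed
// members of FlinkRunner's FlinkEvent trait and may differ.
case class MyEvent(id: String, value: Double, ts: Long) extends FlinkEvent {
  def $id: String      = id
  def $key: String     = id
  def $timestamp: Long = ts
}

class MyJob extends FlinkJob[MyEvent, MyEvent] {
  // Only transform() must be supplied; source() and sink() are wired
  // up from config by the base classes.
  override def transform(in: DataStream[MyEvent])(implicit
      config: FlinkConfig,
      env: StreamExecutionEnvironment): DataStream[MyEvent] =
    in.map(e => e.copy(value = e.value * 2))
}
```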
25 changes: 15 additions & 10 deletions build.sbt
@@ -45,16 +45,21 @@ val V = new {

val flinkDeps =
Seq(
"org.apache.flink" %% s"flink-scala" % V.flink % Provided,
"org.apache.flink" %% s"flink-streaming-scala" % V.flink % Provided,
"org.apache.flink" %% s"flink-cep-scala" % V.flink % Provided,
"org.apache.flink" %% s"flink-connector-kafka" % V.flink,
"org.apache.flink" %% s"flink-connector-kinesis" % V.flink,
"org.apache.flink" %% s"flink-connector-cassandra" % V.flink,
"org.apache.flink" %% s"flink-connector-elasticsearch7" % V.flink,
"org.apache.flink" %% s"flink-statebackend-rocksdb" % V.flink,
"org.apache.flink" % s"flink-avro-confluent-registry" % V.flink,
"org.apache.flink" %% s"flink-test-utils" % V.flink % Test
"org.apache.flink" %% "flink-scala" % V.flink % Provided,
"org.apache.flink" %% "flink-streaming-scala" % V.flink % Provided,
"org.apache.flink" %% "flink-cep-scala" % V.flink % Provided,
"org.apache.flink" %% "flink-table-planner" % V.flink % Provided,
"org.apache.flink" %% "flink-connector-kafka" % V.flink,
"org.apache.flink" %% "flink-connector-kinesis" % V.flink,
"org.apache.flink" %% "flink-connector-cassandra" % V.flink,
"org.apache.flink" %% "flink-connector-elasticsearch7" % V.flink,
"org.apache.flink" %% "flink-connector-jdbc" % V.flink,
"org.apache.flink" %% "flink-connector-rabbitmq" % V.flink,
"org.apache.flink" % "flink-connector-files" % V.flink,
"org.apache.flink" %% "flink-table-api-scala-bridge" % V.flink,
"org.apache.flink" %% "flink-statebackend-rocksdb" % V.flink,
"org.apache.flink" % "flink-avro-confluent-registry" % V.flink,
"org.apache.flink" %% "flink-test-utils" % V.flink % Test
)
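For context on the newly added `flink-connector-rabbitmq` dependency: the underlying Flink 1.14 connector is wired up roughly as below. FlinkRunner hides this behind its source/sink config, so this is only a sketch of the raw connector API; the host, credentials, and queue names are placeholders:

```scala
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.rabbitmq.{RMQSink, RMQSource}
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig

object RabbitMqSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Placeholder connection settings, not FlinkRunner defaults.
    val connConfig = new RMQConnectionConfig.Builder()
      .setHost("localhost")
      .setPort(5672)
      .setUserName("guest")
      .setPassword("guest")
      .setVirtualHost("/")
      .build()

    // Consume strings from one queue...
    val stream: DataStream[String] =
      env.addSource(
        new RMQSource[String](connConfig, "in-queue", new SimpleStringSchema()))

    // ...and publish them to another.
    stream.addSink(
      new RMQSink[String](connConfig, "out-queue", new SimpleStringSchema()))

    env.execute("rabbitmq connector sketch")
  }
}
```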

val loggingDeps = Seq(
