Commit

broadcast state pattern (#11)
* implement broadcast state pattern

* implement getBroadcastStateDescriptor

* fix class bounds

* fix process function instantiation

* update runner factory

* fixed failure to propagate watermarks and cleaned up warnings

* clean up for release

* clean up comment references
nextdude authored Sep 20, 2019
1 parent e5c27a6 commit 5b9809a
Showing 31 changed files with 585 additions and 1,552 deletions.
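The headline change is support for Flink's broadcast state pattern. For context, the sketch below shows the underlying Flink 1.8 API that this commit wraps — plain `DataStream.broadcast` plus a `BroadcastProcessFunction`, with no FlinkRunner classes. The stream contents, rule format, and object name are invented for illustration.

```scala
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object BroadcastSketch {
  // descriptor for the broadcast (control) state: rule key -> rule payload
  val rulesDescriptor = new MapStateDescriptor[String, String](
    "rules", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO)

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val data: DataStream[String]    = env.fromElements("a", "b", "c")
    val control: DataStream[String] = env.fromElements("allow:a")

    // broadcast the control stream and connect it to the data stream
    data
      .connect(control.broadcast(rulesDescriptor))
      .process(new BroadcastProcessFunction[String, String, String] {
        override def processElement(
            value: String,
            ctx: BroadcastProcessFunction[String, String, String]#ReadOnlyContext,
            out: Collector[String]): Unit = {
          // data side gets a read-only view of the broadcast state
          val rules = ctx.getBroadcastState(rulesDescriptor)
          if (rules.contains("allow:" + value)) out.collect(value)
        }
        override def processBroadcastElement(
            rule: String,
            ctx: BroadcastProcessFunction[String, String, String]#Context,
            out: Collector[String]): Unit =
          // broadcast side may write; every parallel instance sees every rule
          ctx.getBroadcastState(rulesDescriptor).put(rule, rule)
      })
      .print()

    env.execute("broadcast-sketch")
  }
}
```

FlinkRunner's contribution (per the commit messages) is wiring this pattern — including the state descriptor via `getBroadcastStateDescriptor` — into its config-driven job classes.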
133 changes: 101 additions & 32 deletions README.md
@@ -40,10 +40,10 @@

## Maven Dependency

-`Flinkrunner` `v1.6.8` is now on maven central, built against Flink 1.8.0 with Scala 2.11 and JDK 8.
+`Flinkrunner` `v2.0.0` is now on maven central, built against Flink 1.8.2 with Scala 2.11 and JDK 8.

```sbtshell
-libraryDependencies += "io.epiphanous" %% "flinkrunner" % "1.6.8"
+libraryDependencies += "io.epiphanous" %% "flinkrunner" % "2.0.0"
```

>The apache flink project doesn't include its AWS Kinesis connector on maven
@@ -68,14 +68,14 @@ Flink kinesis connector) from source. To do so,
```

* Checkout the tag of `FlinkRunner` you want to build. The most recent stable version is
-`v1.6.8`, but you can ensure you have the most recent tags with `git fetch --tags` and
+`v2.0.0`, but you can ensure you have the most recent tags with `git fetch --tags` and
list tags with `git tag -l`, then

```bash
-git checkout tags/v1.6.8 -b my-build-v1.6.8
+git checkout tags/v2.0.0 -b my-build-v2.0.0
```

-This will create a new local branch `my-build-v1.6.8` based on the `v1.6.8` tag release.
+This will create a new local branch `my-build-v2.0.0` based on the `v2.0.0` tag release.

* Build `FlinkRunner` and install it locally, using the `-Dwith.kinesis=true` option

@@ -92,7 +92,7 @@ Flink kinesis connector) from source. To do so,
resolvers += "Local Maven Repository" at "file://" +
Path.userHome.absolutePath + "/.m2/repository"
...
-libraryDependencies += "io.epiphanous" %% "flinkrunner" % "1.6.8k"
+libraryDependencies += "io.epiphanous" %% "flinkrunner" % "2.0.0k"
// notice no v here ---^^ ^^---k for kinesis
```
@@ -227,42 +227,111 @@ you don't have to write any code at all to setup sources and sinks, as these are
grokked by `FlinkRunner` from the config.
* Write your jobs! This is the fun part. You job will inherit from one of `FlinkRunner`'s
-  job classes. The top level `FlinkRunner` job class is called `FlinkJob` and has the
+  job classes. The top level `FlinkRunner` job class is called `BaseFlinkJob` and has the
  following interface:
-  ```scala
-  abstract class FlinkJob[IN <: FlinkEvent: TypeInformation, OUT <: FlinkEvent: TypeInformation] extends LazyLogging {
-    // the entry point called by flink runner; invokes the flow
-    // and if mock.edges is true, returns an iterator of results
-    def run():Either[Iterator[OUT],Unit]
-    // this is its actual code...sets up the execution plan
-    def flow():DataStream[OUT] =
-      source |> transform |# maybeSink
-    // reads from first configured sink and assigns timestamps
-    // and watermarks if needed
-    def source():DataStream[IN]
-    // called by source(), uses configuration to do its thing
-    def maybeAssignTimestampsAndWatermarks(in: DataStream[IN]):Unit
-    // writes to first configured sink
-    def sink(out:DataStream[OUT]):Unit
-    // only adds the sink to the execution plan of mock.edges is fals
-    def maybeSink():Unit
-    // no implementation provided...
-    // this is what your job implements
-    def transform(in:DataStream[IN]):DataStream[OUT]
-  }
-  ```
-  While you're free to override any of these methods in your job,
-  usually you just need to provide a `transform()` method that
-  converts your `DataStream[IN]` to a `DataStream[OUT]`.
+  ```scala
+  /**
+    * An abstract flink job to transform on an input stream into an output stream.
+    *
+    * @tparam DS The type of the input stream
+    * @tparam OUT The type of output stream elements
+    */
+  abstract class BaseFlinkJob[DS, OUT <: FlinkEvent: TypeInformation] extends LazyLogging {
+    /**
+      * A pipeline for transforming a single stream. Passes the output of [[source()]]
+      * through [[transform()]] and the result of that into [[maybeSink()]], which may pass it
+      * into [[sink()]] if we're not testing. Ultimately, returns the output data stream to
+      * facilitate testing.
+      *
+      * @param config implicit flink job config
+      * @return data output stream
+      */
+    def flow()(implicit config: FlinkConfig, env: SEE): DataStream[OUT] =
+      source |> transform |# maybeSink
+
+    def run()(implicit config: FlinkConfig, env: SEE): Either[Iterator[OUT], Unit] = {
+      logger.info(s"\nSTARTING FLINK JOB: ${config.jobName} ${config.jobArgs.mkString(" ")}\n")
+      val stream = flow
+      if (config.showPlan) logger.info(s"PLAN:\n${env.getExecutionPlan}\n")
+      if (config.mockEdges)
+        Left(DataStreamUtils.collect(stream.javaStream).asScala)
+      else
+        Right(env.execute(config.jobName))
+    }
+
+    /**
+      * Returns source data stream to pass into [[transform()]]. This must be overridden by subclasses.
+      * @return input data stream
+      */
+    def source()(implicit config: FlinkConfig, env: SEE): DS
+
+    /**
+      * Primary method to transform the source data stream into the output data stream. The output of
+      * this method is passed into [[sink()]]. This method must be overridden by subclasses.
+      *
+      * @param in input data stream created by [[source()]]
+      * @param config implicit flink job config
+      * @return output data stream
+      */
+    def transform(in: DS)(implicit config: FlinkConfig, env: SEE): DataStream[OUT]
+
+    /**
+      * Writes the transformed data stream to configured output sinks.
+      *
+      * @param out a transformed stream from [[transform()]]
+      * @param config implicit flink job config
+      */
+    def sink(out: DataStream[OUT])(implicit config: FlinkConfig, env: SEE): Unit =
+      config.getSinkNames.foreach(name => out.toSink(name))
+
+    /**
+      * The output stream will only be passed to [[sink()]] if [[FlinkConfig.mockEdges]] evaluates
+      * to false (ie, you're not testing).
+      *
+      * @param out the output data stream to pass into [[sink()]]
+      * @param config implicit flink job config
+      */
+    def maybeSink(out: DataStream[OUT])(implicit config: FlinkConfig, env: SEE): Unit =
+      if (!config.mockEdges) sink(out)
+  }
+  ```
+
+  More commonly you'll extend from `FlinkJob[IN,OUT]` where `IN` and `OUT` are classes in your ADT.
+
+  ```scala
+  /**
+    * An abstract flink job to transform on a stream of events from an algebraic data type (ADT).
+    *
+    * @tparam IN The type of input stream elements
+    * @tparam OUT The type of output stream elements
+    */
+  abstract class FlinkJob[IN <: FlinkEvent: TypeInformation, OUT <: FlinkEvent: TypeInformation]
+    extends BaseFlinkJob[DataStream[IN], OUT] {
+
+    def getEventSourceName(implicit config: FlinkConfig) = config.getSourceNames.headOption.getOrElse("events")
+
+    /**
+      * Returns source data stream to pass into [[transform()]]. This can be overridden by subclasses.
+      * @return input data stream
+      */
+    def source()(implicit config: FlinkConfig, env: SEE): DataStream[IN] =
+      fromSource[IN](getEventSourceName) |# maybeAssignTimestampsAndWatermarks
+  }
+  ```
+
+  Where as `FlinkJob` requires your input stream to be a `DataStream[IN]`, `BaseFlinkJob` let's you set the type of
+  input stream, allowing you to use other, more complicated stream types in Flink, such as `ConnectedStreams[IN1,IN2]` or `BroadcastStream[B,D]`.
+  While you're free to override any of these methods in your job, usually you just need extend `FlinkJob` and provide a `transform()`
+  method that converts your `DataStream[IN]` to a `DataStream[OUT]`.
> TODO: Finish README.
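To make the README's interface concrete: with the new `FlinkJob[IN, OUT]` base class above, most jobs only override `transform()`. The sketch below is illustrative, not from the repository — the event types, field names, and the assumption that `FlinkEvent` can be satisfied by a simple case class are all invented for this example.

```scala
// Hypothetical ADT members; real FlinkEvent subclasses may need to
// implement members (e.g. id/key/timestamp) not shown here.
case class RawEvent(id: String, value: Double) extends FlinkEvent
case class EnrichedEvent(id: String, value: Double, doubled: Double) extends FlinkEvent

class MyEnrichJob extends FlinkJob[RawEvent, EnrichedEvent] {
  // the one method most jobs implement: map the input stream to the output stream;
  // source() and sink() are wired up by FlinkRunner from configuration
  override def transform(in: DataStream[RawEvent])(
      implicit config: FlinkConfig, env: SEE): DataStream[EnrichedEvent] =
    in.map(e => EnrichedEvent(e.id, e.value, e.value * 2))
}
```

Because `source()` and `maybeSink()` have default implementations driven by config, this job reads from the first configured source and writes to the configured sinks without further code.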
27 changes: 18 additions & 9 deletions build.sbt
@@ -23,26 +23,22 @@ Test / fork := true
resolvers += "Local Maven Repository" at "file://" + Path.userHome.absolutePath + "/.m2/repository"

val V = new {
-  val flink = "1.8.0"
+  val flink = "1.8.2"
val logback = "1.2.3"
val log4jOverSlf4j = "1.7.26"
val scalaLogging = "3.9.2"
val scalaTest = "3.0.8"
+  val scalaCheck = "1.14.0"
val circe = "0.11.1"
val http4s = "0.20.10"
val enumeratum = "1.5.13"
val typesafeConfig = "1.3.4"
// val guava = "27.0.1-jre"
val guava = "24.1-jre"
val squants = "1.3.0"
-  val antlr4 = "4.7.1"
}


-enablePlugins(Antlr4Plugin)
-antlr4Version in Antlr4 := V.antlr4
-antlr4PackageName in Antlr4 := Some("io.epiphanous.antlr4")

val withK = Seq("true","1","yes","y").exists(
_.equalsIgnoreCase(System.getProperty("with.kinesis", "false"))
)
@@ -51,8 +47,9 @@ val maybeKinesis = if (withK) Seq("connector-kinesis") else Seq.empty[String]

// post-process version to add k suffix if we're building with kinesis
val versionSuffix = if (withK) "k" else ""
-version in ThisBuild ~= (v => v.replaceFirst("^(v?\\d(\\.\\d){2})(?=[^k])",s"$$1$versionSuffix") + versionSuffix)
-dynver in ThisBuild ~= (v => v.replaceFirst("^(v?\\d(\\.\\d){2})(?=[^k])",s"$$1$versionSuffix") + versionSuffix)
+def suffixedVersion(v:String) = v + versionSuffix
+version in ThisBuild ~= suffixedVersion
+dynver in ThisBuild ~= suffixedVersion

val flinkDeps = (
(Seq("scala", "streaming-scala", "cep-scala") ++ maybeKinesis).map(a =>
@@ -81,11 +78,23 @@ val otherDeps = Seq("com.beachape" %% "enumeratum" % V.enumeratum,
"com.google.guava" % "guava" % V.guava,
"org.typelevel" %% "squants" % V.squants,
"org.scalactic" %% "scalactic" % V.scalaTest % Test,
-  "org.scalatest" %% "scalatest" % V.scalaTest % Test)
+  "org.scalatest" %% "scalatest" % V.scalaTest % Test,
+  "org.scalacheck" %% "scalacheck" % V.scalaCheck % Test
+)

lazy val flink_runner =
(project in file(".")).settings(libraryDependencies ++= flinkDeps ++ loggingDeps ++ http4sDeps ++ otherDeps)

+scalacOptions ++= Seq(
+  "-encoding","utf8",
+  "-deprecation",
+  "-Xfuture",
+  "-Ywarn-dead-code",
+  "-Ywarn-numeric-widen",
+  "-Ywarn-unused",
+  "-Ywarn-value-discard"
+)

// stays inside the sbt console when we press "ctrl-c" while a Flink programme executes with "run" or "runMain"
Compile / run / fork := true
Global / cancelable := true
7 changes: 3 additions & 4 deletions project/plugins.sbt
@@ -1,4 +1,3 @@
-addSbtPlugin("com.simplytyped" % "sbt-antlr4" % "0.8.2")
addSbtPlugin("com.geirsson" % "sbt-ci-release" % "1.2.6")
addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.6.0")
addSbtPlugin("org.scoverage" % "sbt-coveralls" % "1.2.7")