diff --git a/README.md b/README.md
index 2ca5549..95efe13 100644
--- a/README.md
+++ b/README.md
@@ -16,7 +16,7 @@
-        maven
+        maven
@@ -40,10 +40,10 @@
 ## Maven Dependency
 
-`Flinkrunner` `v1.6.5` is now on maven central, built against Flink 1.7.2 with Scala 2.11 and JDK 8.
+`Flinkrunner` `v1.6.6` is now on maven central, built against Flink 1.7.2 with Scala 2.11 and JDK 8.
 
 ```sbtshell
-libraryDependencies += "io.epiphanous" %% "flinkrunner" % "1.6.5"
+libraryDependencies += "io.epiphanous" %% "flinkrunner" % "1.6.6"
 ```
 
 >The apache flink project doesn't include its AWS Kinesis connector on maven
@@ -55,10 +55,10 @@ In order to use Kinesis with `FlinkRunner`, please follow the instructions in th
 Due to licensing restrictions, if you want to use AWS Kinesis with `FlinkRunner` you have to build it (and the
 Flink kinesis connector) from source. To do so,
 
-* First, you'll need to build a local copy of Flink's kinesis connector. See 
+* First, you'll need to build a local copy of Flink's kinesis connector. See
  [these instructions](https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kinesis.html)
  for details on how to accomplish that.
-  
+
 > Note that building Flink with Kinesis can take over 30 minutes! It's a very big project.
 
 * Clone the `FlinkRunner` repo:
@@ -68,21 +68,21 @@ Flink kinesis connector) from source. To do so,
   ```
 
 * Checkout the tag of `FlinkRunner` you want to build. The most recent stable version is
-  `v1.6.5`, but you can ensure you have the most recent tags with `git fetch --tags` and
+  `v1.6.6`, but you can ensure you have the most recent tags with `git fetch --tags` and
   list tags with `git tag -l`, then
-  
+
   ```bash
-  git checkout tags/v1.6.5 -b my-build-v1.6.5
+  git checkout tags/v1.6.6 -b my-build-v1.6.6
   ```
-  
-  This will create a new local branch `my-build-v1.6.5` based on the `v1.6.5` tag release.
-  
-* Build `FlinkRunner` and install it locally, using the `--with.kinesis=true` option
+
+  This will create a new local branch `my-build-v1.6.6` based on the `v1.6.6` tag release.
+
+* Build `FlinkRunner` and install it locally, using the `-Dwith.kinesis=true` option
 
   ```bash
-  sbt --with.kinesis=true publishLocal
+  sbt -Dwith.kinesis=true publishLocal
   ```
-  
+
   This will install `FlinkRunner` with kinesis support in your local repo.
 
 * In your project's build file, add a resolver to your local repo and add the local
@@ -90,12 +90,12 @@ Flink kinesis connector) from source. To do so,
 
   ```sbtshell
   resolvers += "Local Maven Repository" at "file://" +
-    Path.userHome.absolutePath + "/.m2/repository"
+    Path.userHome.absolutePath + "/.m2/repository"
   ...
-  libraryDependencies += "io.epiphanous" %% "flinkrunner" % "1.6.5k"
+  libraryDependencies += "io.epiphanous" %% "flinkrunner" % "1.6.6k"
   // notice no v here ---^^    ^^---k for kinesis
   ```
-  
+
 
 ## What is FlinkRunner?
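The corrected build step above passes `-Dwith.kinesis=true` as a JVM system property rather than an sbt flag. As a hedged sketch only (the property name comes from the README, but the actual wiring in FlinkRunner's `build.sbt` may differ), a build can key the kinesis dependency and the `k` version suffix off that property:

```sbtshell
// Sketch: read the -Dwith.kinesis=true system property. Names other than
// "with.kinesis" (taken from the README) are illustrative, not FlinkRunner's actual build code.
val withKinesis = sys.props.getOrElse("with.kinesis", "false").toBoolean

// Publish as 1.6.6k when kinesis support is compiled in, matching the README's resolver note.
version := "1.6.6" + (if (withKinesis) "k" else "")

libraryDependencies ++= (
  if (withKinesis) Seq("org.apache.flink" %% "flink-connector-kinesis" % "1.7.2")
  else Seq.empty
)
```

This is also why the locally published artifact is requested as `"flinkrunner" % "1.6.6k"` (no `v` prefix, `k` for kinesis) in the resolver snippet above.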
diff --git a/src/main/scala/io/epiphanous/flinkrunner/FlinkRunnerFactory.scala b/src/main/scala/io/epiphanous/flinkrunner/FlinkRunnerFactory.scala
index f3db4ad..73f674e 100644
--- a/src/main/scala/io/epiphanous/flinkrunner/FlinkRunnerFactory.scala
+++ b/src/main/scala/io/epiphanous/flinkrunner/FlinkRunnerFactory.scala
@@ -1,8 +1,12 @@
 package io.epiphanous.flinkrunner
 
+import java.util.Properties
+
 import io.epiphanous.flinkrunner.flink.FlinkJob
 import io.epiphanous.flinkrunner.model.FlinkEvent
 import io.epiphanous.flinkrunner.operator.AddToJdbcBatchFunction
 import org.apache.flink.api.common.serialization.{DeserializationSchema, Encoder, SerializationSchema}
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner
+import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner
 import org.apache.flink.streaming.util.serialization.{KeyedDeserializationSchema, KeyedSerializationSchema}
 
 trait FlinkRunnerFactory[ADT <: FlinkEvent] {
@@ -20,4 +24,6 @@ trait FlinkRunnerFactory[ADT <: FlinkEvent] {
   def getEncoder: Encoder[ADT] = ???
 
   def getAddToJdbcBatchFunction: AddToJdbcBatchFunction[ADT] = ???
+
+  def getBucketAssigner(props: Properties): BucketAssigner[ADT, String] = ???
 }
diff --git a/src/main/scala/io/epiphanous/flinkrunner/model/FlinkConfig.scala b/src/main/scala/io/epiphanous/flinkrunner/model/FlinkConfig.scala
index 3693a4b..44c8920 100644
--- a/src/main/scala/io/epiphanous/flinkrunner/model/FlinkConfig.scala
+++ b/src/main/scala/io/epiphanous/flinkrunner/model/FlinkConfig.scala
@@ -147,6 +147,7 @@ class FlinkConfig(
   def getEncoder = factory.getEncoder
 
   def getAddToJdbcBatchFunction = factory.getAddToJdbcBatchFunction
+  def getBucketAssigner(p: Properties) = factory.getBucketAssigner(p)
 
   def getSourceConfig(name: String): SourceConfig = SourceConfig(name, this)
   def getSinkConfig(name: String): SinkConfig = SinkConfig(name, this)
diff --git a/src/main/scala/io/epiphanous/flinkrunner/util/StreamUtils.scala b/src/main/scala/io/epiphanous/flinkrunner/util/StreamUtils.scala
index 3761ecc..3739d2f 100644
--- a/src/main/scala/io/epiphanous/flinkrunner/util/StreamUtils.scala
+++ b/src/main/scala/io/epiphanous/flinkrunner/util/StreamUtils.scala
@@ -11,11 +11,11 @@ import org.apache.flink.api.common.serialization.{DeserializationSchema, Encoder
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.core.fs.Path
 import org.apache.flink.streaming.api.datastream.DataStreamSink
-import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
 import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.{
   BasePathBucketAssigner,
   DateTimeBucketAssigner
 }
+import org.apache.flink.streaming.api.functions.sink.filesystem.{BucketAssigner, StreamingFileSink}
 import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.{
   DefaultRollingPolicy,
   OnCheckpointRollingPolicy
@@ -344,16 +344,15 @@ object StreamUtils extends LazyLogging {
     sinkConfig: FileSinkConfig
   )(implicit config: FlinkConfig
   ): DataStreamSink[E] = {
-    val path = sinkConfig.path
     val p = sinkConfig.properties
     val bucketCheckInterval = p.getProperty("bucket.check.interval", s"${60000}").toLong
-    val bucketAssigner = p.getProperty("bucket.assigner", "datetime") match {
+    val bucketAssigner = p.getProperty("bucket.assigner.type", "datetime") match {
       case "none" => new BasePathBucketAssigner[E]()
       case "datetime" =>
-        val bucketFormat = p.getProperty("bucket.assigner.datetime.format", "yyyy/MM/dd/HH")
-        new DateTimeBucketAssigner[E](bucketFormat)
-      case assigner => throw new IllegalArgumentException(s"Unknown bucket assigner type: '$assigner'")
+        new DateTimeBucketAssigner[E](p.getProperty("bucket.assigner.datetime.format", "yyyy/MM/dd/HH"))
+      case "custom" => config.getBucketAssigner(p).asInstanceOf[BucketAssigner[E, String]]
+      case other => throw new IllegalArgumentException(s"Unknown bucket assigner type '$other'.")
     }
     val encoderFormat = p.getProperty("encoder.format", "row")
     val sink = encoderFormat match {
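The new `custom` branch in `getFileSink` delegates to the factory's `getBucketAssigner(props)`. Below is a minimal sketch of what a user-supplied factory could return, assuming Flink 1.7's `BucketAssigner` API; the event trait `MyEvent`, the factory trait name, and the `bucket.assigner.custom.prefix` property are hypothetical, not part of FlinkRunner:

```scala
import java.text.SimpleDateFormat
import java.util.{Date, Properties}

import io.epiphanous.flinkrunner.FlinkRunnerFactory
import io.epiphanous.flinkrunner.model.FlinkEvent
import org.apache.flink.core.io.SimpleVersionedSerializer
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer

// Hypothetical event ADT for illustration; concrete event case classes would extend it.
trait MyEvent extends FlinkEvent

trait MyJobFactory extends FlinkRunnerFactory[MyEvent] {

  // Selected by the file sink when its properties contain bucket.assigner.type = custom.
  override def getBucketAssigner(props: Properties): BucketAssigner[MyEvent, String] =
    new BucketAssigner[MyEvent, String] {
      // Hypothetical property: a static path prefix prepended to every bucket id.
      private val prefix = props.getProperty("bucket.assigner.custom.prefix", "events")

      override def getBucketId(element: MyEvent, context: BucketAssigner.Context): String = {
        // Bucket by event time when available, falling back to processing time.
        val ts  = Option(context.timestamp).map(_.toLong).getOrElse(context.currentProcessingTime)
        val day = new SimpleDateFormat("yyyy-MM-dd").format(new Date(ts))
        s"$prefix/$day"
      }

      override def getSerializer(): SimpleVersionedSerializer[String] =
        SimpleVersionedStringSerializer.INSTANCE
    }
}
```

Because `getFileSink` casts the result with `asInstanceOf[BucketAssigner[E, String]]`, the assigner's element type only needs to line up with the sink's event type.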
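For reference, the rewritten `getFileSink` reads all of its bucketing choices from the sink's properties. The sketch below restates the keys and defaults visible in the diff; the values shown are illustrative:

```scala
import java.util.Properties

// Keys consumed by getFileSink above; values here are examples, not requirements.
val props = new Properties()
props.setProperty("bucket.assigner.type", "custom")                    // none | datetime | custom (default: datetime)
props.setProperty("bucket.assigner.datetime.format", "yyyy/MM/dd/HH")  // used only when type = datetime
props.setProperty("bucket.check.interval", "60000")                    // bucket check interval in ms (default 60000)
props.setProperty("encoder.format", "row")                             // encoder format (default: row)

// When bucket.assigner.type = custom, getFileSink calls config.getBucketAssigner(props),
// so any extra keys set here (for example the hypothetical bucket.assigner.custom.prefix
// used in the sketch above) reach the factory unchanged.
```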