Commit
Merge pull request #10 from epiphanous/add-custom-bucketer
custom bucket assigner feature
nextdude committed Jul 14, 2019
2 parents cfa026b + 1bd7298 commit 8a57804
Showing 4 changed files with 29 additions and 23 deletions.
34 changes: 17 additions & 17 deletions README.md
@@ -16,7 +16,7 @@
</a>
<!-- maven central -->
<a href="https://mvnrepository.com/artifact/io.epiphanous/flinkrunner">
-    <img src="https://img.shields.io/maven-central/v/io.epiphanous/flinkrunner.svg" alt="maven" />
+    <img src="https://img.shields.io/maven-central/v/io.epiphanous/flinkrunner_2.11.svg" alt="maven" />
</a>
<!-- last commit -->
<a href="https://github.com/epiphanous/flinkrunner/commits" title="Last Commit">
@@ -40,10 +40,10 @@

## Maven Dependency

-`Flinkrunner` `v1.6.5` is now on maven central, built against Flink 1.7.2 with Scala 2.11 and JDK 8.
+`Flinkrunner` `v1.6.6` is now on maven central, built against Flink 1.7.2 with Scala 2.11 and JDK 8.

```sbtshell
-libraryDependencies += "io.epiphanous" %% "flinkrunner" % "1.6.5"
+libraryDependencies += "io.epiphanous" %% "flinkrunner" % "1.6.6"
```

>The apache flink project doesn't include its AWS Kinesis connector on maven
@@ -55,10 +55,10 @@ In order to use Kinesis with `FlinkRunner`, please follow the instructions in th
Due to licensing restrictions, if you want to use AWS Kinesis with `FlinkRunner` you have to build it (and the
Flink kinesis connector) from source. To do so,

* First, you'll need to build a local copy of Flink's kinesis connector. See
[these instructions](https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kinesis.html)
for details on how to accomplish that.

> Note that building Flink with Kinesis can take over 30 minutes! It's a very big project.
* Clone the `FlinkRunner` repo:
@@ -68,34 +68,34 @@ Flink kinesis connector) from source. To do so,
```bash
git clone https://github.com/epiphanous/flinkrunner.git
```

* Check out the tag of `FlinkRunner` you want to build. The most recent stable version is
-`v1.6.5`, but you can ensure you have the most recent tags with `git fetch --tags` and
+`v1.6.6`, but you can ensure you have the most recent tags with `git fetch --tags` and
list tags with `git tag -l`, then

```bash
-git checkout tags/v1.6.5 -b my-build-v1.6.5
+git checkout tags/v1.6.6 -b my-build-v1.6.6
```
-This will create a new local branch `my-build-v1.6.5` based on the `v1.6.5` tag release.
-* Build `FlinkRunner` and install it locally, using the `--with.kinesis=true` option
+This will create a new local branch `my-build-v1.6.6` based on the `v1.6.6` tag release.
+
+* Build `FlinkRunner` and install it locally, using the `-Dwith.kinesis=true` option

```bash
-sbt --with.kinesis=true publishLocal
+sbt -Dwith.kinesis=true publishLocal
```

This will install `FlinkRunner` with kinesis support in your local repo.

* In your project's build file, add a resolver to your local repo and add the local
`FlinkRunner` dependency:

```sbtshell
resolvers += "Local Maven Repository" at "file://" +
  Path.userHome.absolutePath + "/.m2/repository"
...
-libraryDependencies += "io.epiphanous" %% "flinkrunner" % "1.6.5k"
+libraryDependencies += "io.epiphanous" %% "flinkrunner" % "1.6.6k"
// notice no v here ---^^ ^^---k for kinesis
```


## What is FlinkRunner?

6 changes: 6 additions & 0 deletions src/main/scala/io/epiphanous/flinkrunner/FlinkRunnerFactory.scala
@@ -1,8 +1,12 @@
package io.epiphanous.flinkrunner
+import java.util.Properties
+
import io.epiphanous.flinkrunner.flink.FlinkJob
import io.epiphanous.flinkrunner.model.FlinkEvent
import io.epiphanous.flinkrunner.operator.AddToJdbcBatchFunction
import org.apache.flink.api.common.serialization.{DeserializationSchema, Encoder, SerializationSchema}
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner
+import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner
import org.apache.flink.streaming.util.serialization.{KeyedDeserializationSchema, KeyedSerializationSchema}

trait FlinkRunnerFactory[ADT <: FlinkEvent] {
@@ -20,4 +24,6 @@ trait FlinkRunnerFactory[ADT <: FlinkEvent] {
  def getEncoder: Encoder[ADT] = ???

  def getAddToJdbcBatchFunction: AddToJdbcBatchFunction[ADT] = ???
+
+  def getBucketAssigner(props: Properties): BucketAssigner[ADT, String] = ???
}
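
The new `getBucketAssigner(props)` hook is what the `custom` case in `StreamUtils` (see the diff below) calls through to, letting a concrete factory supply its own `BucketAssigner`. Here is a minimal sketch of an override against Flink 1.7's `BucketAssigner` API; the `EventTypeBucketAssigner` class, the `bucket.assigner.custom.prefix` key, and `MyEvent` are hypothetical, not part of this diff:

```scala
import java.util.Properties

import io.epiphanous.flinkrunner.model.FlinkEvent
import org.apache.flink.core.io.SimpleVersionedSerializer
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer

// Hypothetical custom assigner: writes each event under <prefix>/<concrete event type>.
class EventTypeBucketAssigner[E <: FlinkEvent](prefix: String) extends BucketAssigner[E, String] {

  override def getBucketId(element: E, context: BucketAssigner.Context): String =
    s"$prefix/${element.getClass.getSimpleName}"

  // Bucket ids here are plain strings, so Flink's stock string serializer works.
  override def getSerializer: SimpleVersionedSerializer[String] =
    SimpleVersionedStringSerializer.INSTANCE
}

// A concrete factory would then override the hook, e.g. for an event ADT MyEvent:
//
//   override def getBucketAssigner(props: Properties): BucketAssigner[MyEvent, String] =
//     new EventTypeBucketAssigner[MyEvent](props.getProperty("bucket.assigner.custom.prefix", "events"))
```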
@@ -147,6 +147,7 @@ class FlinkConfig(
  def getEncoder = factory.getEncoder
  def getAddToJdbcBatchFunction =
    factory.getAddToJdbcBatchFunction
+  def getBucketAssigner(p: Properties) = factory.getBucketAssigner(p)

  def getSourceConfig(name: String): SourceConfig = SourceConfig(name, this)
  def getSinkConfig(name: String): SinkConfig = SinkConfig(name, this)
11 changes: 5 additions & 6 deletions src/main/scala/io/epiphanous/flinkrunner/util/StreamUtils.scala
@@ -11,11 +11,11 @@ import org.apache.flink.api.common.serialization.{DeserializationSchema, Encoder
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.datastream.DataStreamSink
-import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.{
BasePathBucketAssigner,
DateTimeBucketAssigner
}
+import org.apache.flink.streaming.api.functions.sink.filesystem.{BucketAssigner, StreamingFileSink}
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.{
DefaultRollingPolicy,
OnCheckpointRollingPolicy
@@ -344,16 +344,15 @@ object StreamUtils extends LazyLogging {
    sinkConfig: FileSinkConfig
  )(implicit config: FlinkConfig
  ): DataStreamSink[E] = {
-
    val path = sinkConfig.path
    val p = sinkConfig.properties
    val bucketCheckInterval = p.getProperty("bucket.check.interval", s"${60000}").toLong
-   val bucketAssigner = p.getProperty("bucket.assigner", "datetime") match {
+   val bucketAssigner = p.getProperty("bucket.assigner.type", "datetime") match {
      case "none" => new BasePathBucketAssigner[E]()
      case "datetime" =>
-       val bucketFormat = p.getProperty("bucket.assigner.datetime.format", "yyyy/MM/dd/HH")
-       new DateTimeBucketAssigner[E](bucketFormat)
-     case assigner => throw new IllegalArgumentException(s"Unknown bucket assigner type: '$assigner'")
+       new DateTimeBucketAssigner[E](p.getProperty("bucket.assigner.datetime.format", "yyyy/MM/dd/HH"))
+     case "custom" => config.getBucketAssigner(p).asInstanceOf[BucketAssigner[E, String]]
+     case other => throw new IllegalArgumentException(s"Unknown bucket assigner type '$other'.")
    }
    val encoderFormat = p.getProperty("encoder.format", "row")
    val sink = encoderFormat match {
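With this change, a file sink picks its bucket assigner from the sink's `bucket.assigner.type` property: `none` (everything in the base path), `datetime` (the previous behavior, with a configurable format), or `custom` (delegated through `FlinkConfig.getBucketAssigner` to the job's `FlinkRunnerFactory`). A sketch of the properties the new match statement consults, built by hand here purely for illustration (in FlinkRunner they normally arrive via the sink's configuration):

```scala
import java.util.Properties

// Hand-built sink properties; the keys are the ones read in the match above.
val p = new Properties()
p.setProperty("bucket.assigner.type", "custom")                   // "none" | "datetime" | "custom"
p.setProperty("bucket.assigner.datetime.format", "yyyy/MM/dd/HH") // only used when type = "datetime"
p.setProperty("bucket.check.interval", "60000")                   // bucket check interval, in ms per the default
```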
