custom bucket assigner feature #10

Merged · 1 commit · Jul 14, 2019
34 changes: 17 additions & 17 deletions README.md
@@ -16,7 +16,7 @@
</a>
<!-- maven central -->
<a href="https://mvnrepository.com/artifact/io.epiphanous/flinkrunner">
-<img src="https://img.shields.io/maven-central/v/io.epiphanous/flinkrunner.svg" alt="maven" />
+<img src="https://img.shields.io/maven-central/v/io.epiphanous/flinkrunner_2.11.svg" alt="maven" />
</a>
<!-- last commit -->
<a href="https://github.com/epiphanous/flinkrunner/commits" title="Last Commit">
@@ -40,10 +40,10 @@

## Maven Dependency

-`Flinkrunner` `v1.6.5` is now on maven central, built against Flink 1.7.2 with Scala 2.11 and JDK 8.
+`Flinkrunner` `v1.6.6` is now on maven central, built against Flink 1.7.2 with Scala 2.11 and JDK 8.

```sbtshell
-libraryDependencies += "io.epiphanous" %% "flinkrunner" % "1.6.5"
+libraryDependencies += "io.epiphanous" %% "flinkrunner" % "1.6.6"
```

>The apache flink project doesn't include its AWS Kinesis connector on maven
@@ -55,10 +55,10 @@ In order to use Kinesis with `FlinkRunner`, please follow the instructions in th
Due to licensing restrictions, if you want to use AWS Kinesis with `FlinkRunner` you have to build it (and the
Flink kinesis connector) from source. To do so,

-* First, you'll need to build a local copy of Flink's kinesis connector. See
+* First, you'll need to build a local copy of Flink's kinesis connector. See
[these instructions](https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kinesis.html)
for details on how to accomplish that.

> Note that building Flink with Kinesis can take over 30 minutes! It's a very big project.

* Clone the `FlinkRunner` repo:
@@ -68,34 +68,34 @@ Flink kinesis connector) from source. To do so,
```

* Checkout the tag of `FlinkRunner` you want to build. The most recent stable version is
-`v1.6.5`, but you can ensure you have the most recent tags with `git fetch --tags` and
+`v1.6.6`, but you can ensure you have the most recent tags with `git fetch --tags` and
list tags with `git tag -l`, then

```bash
-git checkout tags/v1.6.5 -b my-build-v1.6.5
+git checkout tags/v1.6.6 -b my-build-v1.6.6
```
-This will create a new local branch `my-build-v1.6.5` based on the `v1.6.5` tag release.
-* Build `FlinkRunner` and install it locally, using the `--with.kinesis=true` option

+This will create a new local branch `my-build-v1.6.6` based on the `v1.6.6` tag release.
+
+* Build `FlinkRunner` and install it locally, using the `-Dwith.kinesis=true` option

```bash
-sbt --with.kinesis=true publishLocal
+sbt -Dwith.kinesis=true publishLocal
```

This will install `FlinkRunner` with kinesis support in your local repo.

* In your project's build file, add a resolver to your local repo and add the local
`FlinkRunner` dependency:

```sbtshell
resolvers += "Local Maven Repository" at "file://" +
-Path.userHome.absolutePath + "/.m2/repository"
+  Path.userHome.absolutePath + "/.m2/repository"
...
-libraryDependencies += "io.epiphanous" %% "flinkrunner" % "1.6.5k"
+libraryDependencies += "io.epiphanous" %% "flinkrunner" % "1.6.6k"
// notice no v here ---^^ ^^---k for kinesis
```


## What is FlinkRunner?

src/main/scala/io/epiphanous/flinkrunner/FlinkRunnerFactory.scala
@@ -1,8 +1,12 @@
package io.epiphanous.flinkrunner
+import java.util.Properties
+
import io.epiphanous.flinkrunner.flink.FlinkJob
import io.epiphanous.flinkrunner.model.FlinkEvent
import io.epiphanous.flinkrunner.operator.AddToJdbcBatchFunction
import org.apache.flink.api.common.serialization.{DeserializationSchema, Encoder, SerializationSchema}
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner
+import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner
import org.apache.flink.streaming.util.serialization.{KeyedDeserializationSchema, KeyedSerializationSchema}

trait FlinkRunnerFactory[ADT <: FlinkEvent] {
@@ -20,4 +24,6 @@ trait FlinkRunnerFactory[ADT <: FlinkEvent] {
def getEncoder: Encoder[ADT] = ???

def getAddToJdbcBatchFunction: AddToJdbcBatchFunction[ADT] = ???
+
+def getBucketAssigner(props: Properties): BucketAssigner[ADT, String] = ???
}
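
To supply a custom assigner, a concrete factory overrides the new `getBucketAssigner` hook. A minimal sketch follows — the `MyEvent` ADT, `MyJobFactory`, `KeyPrefixBucketAssigner`, and the `bucket.assigner.prefix` property are hypothetical illustrations, not part of this PR (`KeyPrefixBucketAssigner` is sketched after the StreamUtils diff below):

```scala
import java.util.Properties

import io.epiphanous.flinkrunner.FlinkRunnerFactory
import io.epiphanous.flinkrunner.model.FlinkEvent
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner

// Hypothetical event ADT for illustration.
trait MyEvent extends FlinkEvent

class MyJobFactory extends FlinkRunnerFactory[MyEvent] {
  // Build the custom assigner from the file sink's properties.
  // "bucket.assigner.prefix" is an illustrative property name.
  override def getBucketAssigner(props: Properties): BucketAssigner[MyEvent, String] =
    new KeyPrefixBucketAssigner[MyEvent](props.getProperty("bucket.assigner.prefix", "events"))
}
```

`FlinkConfig` forwards the sink's properties to this method whenever a file sink declares `bucket.assigner.type = custom`, as the next two diffs show.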
FlinkConfig.scala

@@ -147,6 +147,7 @@ class FlinkConfig(
def getEncoder = factory.getEncoder
def getAddToJdbcBatchFunction =
factory.getAddToJdbcBatchFunction
+def getBucketAssigner(p: Properties) = factory.getBucketAssigner(p)

def getSourceConfig(name: String): SourceConfig = SourceConfig(name, this)
def getSinkConfig(name: String): SinkConfig = SinkConfig(name, this)
11 changes: 5 additions & 6 deletions src/main/scala/io/epiphanous/flinkrunner/util/StreamUtils.scala
@@ -11,11 +11,11 @@ import org.apache.flink.api.common.serialization.{DeserializationSchema, Encoder
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.datastream.DataStreamSink
-import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.{
BasePathBucketAssigner,
DateTimeBucketAssigner
}
+import org.apache.flink.streaming.api.functions.sink.filesystem.{BucketAssigner, StreamingFileSink}
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.{
DefaultRollingPolicy,
OnCheckpointRollingPolicy
@@ -344,16 +344,15 @@ object StreamUtils extends LazyLogging {
sinkConfig: FileSinkConfig
)(implicit config: FlinkConfig
): DataStreamSink[E] = {

val path = sinkConfig.path
val p = sinkConfig.properties
val bucketCheckInterval = p.getProperty("bucket.check.interval", s"${60000}").toLong
-val bucketAssigner = p.getProperty("bucket.assigner", "datetime") match {
+val bucketAssigner = p.getProperty("bucket.assigner.type", "datetime") match {
case "none" => new BasePathBucketAssigner[E]()
case "datetime" =>
-val bucketFormat = p.getProperty("bucket.assigner.datetime.format", "yyyy/MM/dd/HH")
-new DateTimeBucketAssigner[E](bucketFormat)
-case assigner => throw new IllegalArgumentException(s"Unknown bucket assigner type: '$assigner'")
+new DateTimeBucketAssigner[E](p.getProperty("bucket.assigner.datetime.format", "YYYY/MM/DD/HH"))
+case "custom" => config.getBucketAssigner(p).asInstanceOf[BucketAssigner[E, String]]
+case other => throw new IllegalArgumentException(s"Unknown bucket assigner type '$other'.")
}
val encoderFormat = p.getProperty("encoder.format", "row")
val sink = encoderFormat match {
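
With `bucket.assigner.type` set to `custom` in a file sink's properties, the sink delegates bucket selection to whatever `BucketAssigner` the job's factory returns. Below is a minimal sketch of such an assigner, reusing the hypothetical `MyEvent` from the factory sketch above; the class name, the `prefix` parameter, and the assumption that events expose a `$key` member are illustrative, not part of this PR:

```scala
import org.apache.flink.core.io.SimpleVersionedSerializer
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer

// Hypothetical assigner that writes each event under <prefix>/key=<key>,
// assuming the event type exposes a `$key` member.
class KeyPrefixBucketAssigner[E <: MyEvent](prefix: String) extends BucketAssigner[E, String] {

  // The bucket id is a path fragment appended to the sink's base path.
  override def getBucketId(element: E, context: BucketAssigner.Context): String =
    s"$prefix/key=${element.$key}"

  // Bucket ids are plain strings, so Flink's stock string serializer suffices.
  override def getSerializer: SimpleVersionedSerializer[String] =
    SimpleVersionedStringSerializer.INSTANCE
}
```

Keeping the assigner behind the factory hook means the sink code above stays generic: the `"custom"` branch only needs the `asInstanceOf` cast because the factory returns an assigner typed on the job's ADT rather than on the sink's element type `E`.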