better avro file support, config processing, docs
nextdude committed Sep 18, 2022
1 parent 4955b18 commit 5cfcac9
Showing 51 changed files with 1,181 additions and 621 deletions.
39 changes: 21 additions & 18 deletions README.md
@@ -7,7 +7,7 @@
<div align="center">
<br />
<!-- license -->
-<a href="https://github.com/epiphanous/flinkrunner/blob/master/LICENSE" title="License">
+<a href="https://github.com/epiphanous/flinkrunner/blob/main/LICENSE" title="License">
<img src="https://img.shields.io/badge/license-MIT-brightgreen.svg" alt="license"/>
</a>
<!-- release -->
@@ -27,7 +27,8 @@
<img src="https://img.shields.io/travis/com/epiphanous/flinkrunner.svg" alt="build" />
</a>
<!-- coverage -->
-<a href='https://coveralls.io/github/epiphanous/flinkrunner?branch=master'><img src='https://coveralls.io/repos/github/epiphanous/flinkrunner/badge.svg?branch=master' alt='Coverage Status' /></a>
+<a href='https://coveralls.io/github/epiphanous/flinkrunner?branch=main'><img
+src='https://coveralls.io/repos/github/epiphanous/flinkrunner/badge.svg?branch=master' alt='Coverage Status' /></a>
</div>

<div align="center">
@@ -66,12 +67,12 @@ Flinkrunner is built to support many common data sources and sinks, including:
| jdbc | flink-connector-jdbc | no | yes |
| rabbit mq | flink-connector-rabbitmq | yes | yes |

-You can add a dependency for a connector you want to use by dropping the library into
-flink's `lib` directory during deployment of your jobs. You should make sure to match the
-library's version with the compiled flink and scala versions of `FlinkRunner`.
+You can add a dependency for a connector by dropping the library into flink's `lib`
+directory during deployment of your jobs. You should make sure to match the library's
+version with the compiled flink and scala versions of `FlinkRunner`.
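
For example (an illustrative sketch only, not part of this commit; the jar name and
version depend on your flink and scala builds), deploying the kafka connector could look
like:

```bash
# hypothetical deployment step: copy the connector jar into flink's lib directory
cp flink-connector-kafka-1.15.2.jar $FLINK_DIR/lib/
```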

-To run tests locally in your IDE, you can add a connector library to your dependencies
-like this:
+To run tests locally in your IDE, add a connector library to your dependencies in provided
+scope, like this:

```
"org.apache.flink" % "flink-connector-kafka" % <flink-version> % Provided
@@ -82,8 +83,7 @@ replacing `<flink-version>` with the version of flink used in `FlinkRunner`.
### S3 Support

S3 configuration is important for most flink usage scenarios. Flink has two different
-implementations to support S3:
-`flink-s3-fs-presto` and `flink-s3-fs-hadoop`.
+implementations to support S3: `flink-s3-fs-presto` and `flink-s3-fs-hadoop`.

* `flink-s3-fs-presto` is registered under the schemes `s3://` and `s3p://` and is
preferred for checkpointing to s3.
@@ -95,24 +95,27 @@ the `plugins` directory:

```bash
cd $FLINK_DIR
-mkdir -p ./plugins/s3
-cp ./opt/flink-s3-fs-presto-<flink-version>.jar .plugins/s3-fs-preso
+mkdir -p ./plugins/s3-fs-presto
+cp ./opt/flink-s3-fs-presto-<flink-version>.jar ./plugins/s3-fs-presto
+mkdir -p ./plugins/s3-fs-hadoop
+cp ./opt/flink-s3-fs-hadoop-<flink-version>.jar ./plugins/s3-fs-hadoop
```

replacing `<flink-version>` with the version of flink used in `FlinkRunner`.

> *NOTE*: Do not copy them into flink's `lib` directory, as this will not work! They need
-> to be in their own, > individual subdirectories of flink's deployed `plugins` directory.
+> to be in their own, individual subdirectories of flink's deployed `plugins` directory.
-> *NOTE*: You will also need
-> to [configure access](https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/filesystems/s3/#configure-access-credentials)
+> *NOTE*: You will also need to
+> [configure access](https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/filesystems/s3/#configure-access-credentials)
> from your job to AWS S3. That is outside the scope of this readme.
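
As a minimal sketch (not part of this commit), credentials and an s3-backed checkpoint
path can be supplied through `flink-conf.yaml`; the option keys come from the Flink S3
documentation linked above, and the bucket name is a placeholder:

```
# flink-conf.yaml (illustrative values)
s3.access-key: <your-access-key>
s3.secret-key: <your-secret-key>
state.checkpoints.dir: s3p://my-bucket/checkpoints
```
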
### Avro Support

Flinkrunner supports reading and writing avro messages with kafka and file systems. For
kafka sources and sinks, Flinkrunner uses binary encoding with Confluent schema registry.
For file sources and sinks, you can select either standard or parquet avro encoding.

Add the following dependencies if you need Avro and Confluent schema registry support:

```
@@ -428,8 +431,8 @@ class AvroStreamJob[
ADT <: FlinkEvent](runner:FlinkRunner[ADT])
```

An `AvroStreamJob` is a specialized `StreamJob` class to support outputting to an avro
encoded sink (kafka or parquet-avro files).

#### EmbeddedAvroRecord

1 change: 0 additions & 1 deletion src/main/scala/io/epiphanous/flinkrunner/FlinkRunner.scala
@@ -154,7 +154,6 @@ abstract class FlinkRunner[ADT <: FlinkEvent: TypeInformation](
env.fromCollection(mockEvents).name(lbl).uid(lbl)
case _ =>
sourceConfig match {
-case s: MockSourceConfig[ADT] => s.getSourceStream[E](env)
case s: FileSourceConfig[ADT] => s.getSourceStream[E](env)
case s: KafkaSourceConfig[ADT] => s.getSourceStream[E](env)
case s: KinesisSourceConfig[ADT] => s.getSourceStream[E](env)
@@ -5,13 +5,12 @@ import com.google.common.hash.{Funnel, HashFunction}

import scala.collection.immutable

-/**
-* Implements hyperloglog cardinality estimate based on paper by P.
-* Flajolet, È. Fusy, O. Gandouet, F. Meiunier. HyperLogLog: the analysis
-* of a near-optimal cardinality estimation algorithm. Proceedings of
-* Discrete Mathematics and Theoretical Computer Science. Pages 127-146.
-* 2007.
-*/
+/** Implements hyperloglog cardinality estimate based on paper by P.
+* Flajolet, É. Fusy, O. Gandouet, F. Meunier. HyperLogLog: the analysis
+* of a near-optimal cardinality estimation algorithm. Proceedings of
+* Discrete Mathematics and Theoretical Computer Science. Pages 127-146.
+* 2007.
+*/
case class HyperLogLog[T](funnel: Funnel[T], b: Int) {

require(b >= 4 && b <= 16, "b must be an integer in [4,16]")
@@ -48,15 +47,14 @@ case class HyperLogLog[T](funnel: Funnel[T], b: Int) {
/** True if data has been added to the registers */
def nonEmpty: Boolean = cardinality > 0

-/**
-* Incorporates an item into the registers, updates the cardinality
-* estimate and returns it.
-*
-* @param item
-* the item to add
-* @return
-* Long
-*/
+/** Incorporates an item into the registers, updates the cardinality
+* estimate and returns it.
+*
+* @param item
+* the item to add
+* @return
+* Long
+*/
def add(item: T): Long = {
val x = hash(item)
val j = 1 + (x & (m - 1))
@@ -65,12 +63,11 @@
estimateCardinality
}

-/**
-* Compute the current distinct cardinality estimate.
-*
-* @return
-* Long
-*/
+/** Compute the current distinct cardinality estimate.
+*
+* @return
+* Long
+*/
private def estimateCardinality: Long = {
val E = am2 / M.map(i => 1 / math.pow(2d, i.toDouble)).sum
// small range correction
@@ -88,13 +85,12 @@
cardinality
}

-/**
-* Merge another HyperLogLog[T] instance into this instance. Note the
-* other instance must have the same b parameter as this instance.
-*
-* @param another
-* the other HyperLogLog[T] instance
-*/
+/** Merge another HyperLogLog[T] instance into this instance. Note the
+* other instance must have the same b parameter as this instance.
+*
+* @param another
+* the other HyperLogLog[T] instance
+*/
def merge(another: HyperLogLog[T]): HyperLogLog[T] = {
if (another.nonEmpty) {
require(another.m == m, s"Can only merge HLL with same b=$b")
@@ -106,28 +102,26 @@
this
}

-/**
-* Computes positive integer hash of item
-*
-* @param item
-* item to hash
-* @return
-* Int
-*/
+/** Computes positive integer hash of item
+*
+* @param item
+* item to hash
+* @return
+* Int
+*/
private def hash(item: T): Int = {
val h = hasher.hashObject(item, funnel).asInt()
if (h < 0) ~h else h
}

-/**
-* Computes most significant set bit of an integer, where returned bit in
-* [0,32].
-*
-* @param i
-* the non-negative Int to examine
-* @return
-* Int
-*/
+/** Computes most significant set bit of an integer, where the returned bit
+* is in [0,32].
+*
+* @param i
+* the non-negative Int to examine
+* @return
+* Int
+*/
private def rho(i: Int): Int = {
require(i >= 0, "i must be non-negative integer")
(32 - HyperLogLog.MASKS.lastIndexWhere(_ <= i)) % 33
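
For context on the `HyperLogLog` changes above, here is a hedged usage sketch based only
on the API visible in this diff (`HyperLogLog(funnel, b)`, `add`, `merge`); the Guava
`Funnels.stringFunnel` helper and the wrapping object are illustrative assumptions, not
part of this commit:

```scala
import java.nio.charset.StandardCharsets
import com.google.common.hash.{Funnel, Funnels}
// assumes HyperLogLog (shown in the diff above) is imported from its flinkrunner package

object HyperLogLogExample extends App {
  // Guava funnel that feeds raw strings into the hash function
  val funnel: Funnel[CharSequence] =
    Funnels.stringFunnel(StandardCharsets.UTF_8)

  // b must be in [4,16] per the require in the case class; b = 12 gives 2^12 registers
  val hll = HyperLogLog[CharSequence](funnel, b = 12)

  // add() incorporates an item and returns the updated cardinality estimate
  var estimate = 0L
  (1 to 10000).foreach(i => estimate = hll.add(s"user-$i"))
  println(s"estimated distinct users: $estimate")

  // merge() requires the same b parameter on both sketches and returns this instance
  val other = HyperLogLog[CharSequence](funnel, b = 12)
  (5000 to 15000).foreach(i => other.add(s"user-$i"))
  val merged = hll.merge(other)

  // read the estimate after the merge by adding an already-seen item
  val mergedEstimate = merged.add("user-1")
  println(s"estimated distinct users after merge: $mergedEstimate")
}
```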
