fix confluent avro deserialization
nextdude committed Apr 23, 2023
1 parent 9646809 commit 1d3f939
Showing 9 changed files with 286 additions and 299 deletions.
150 changes: 88 additions & 62 deletions README.md
@@ -40,7 +40,7 @@

## Dependencies

`FlinkRunner 4`
is [available on maven central](https://mvnrepository.com/artifact/io.epiphanous/flinkrunner_2.12)
, built against Flink 1.16 with Scala 2.12 and JDK 11. You can add it to your sbt project:

@@ -49,22 +49,25 @@ libraryDependencies += "io.epiphanous" %% "flinkrunner" % <flinkrunner-version>
```

replacing `<flinkrunner-version>` with the currently released version
of [`FlinkRunner` on maven](https://mvnrepository.com/artifact/io.epiphanous/flinkrunner_2.12)
.

### Connectors

FlinkRunner is built to support many common data sources and sinks, including:

| Connector | Dependency | Source | Sink |
|:-----------------------|:--------------------------------------------|:------:|:----:|
| file system | flink-connector-files | yes | yes |
| kafka | flink-connector-kafka | yes | yes |
| kinesis streams source | flink-connector-kinesis | yes | no |
| kinesis streams sink | flink-connector-aws-kinesis-streams | no | yes |
| kinesis firehose | flink-connector-aws-kinesis-firehose | no | yes |
| iceberg | iceberg-flink-runtime-<flink.minor.version> | yes | yes |
| cassandra | flink-connector-cassandra | no | yes |
| elasticsearch | flink-connector-elasticsearch7 | no | yes |
| jdbc | flink-connector-jdbc | no | yes |
| rabbit mq | flink-connector-rabbitmq | yes | yes |

You can add a dependency for a connector by dropping the library into flink's `lib`
directory during deployment of your jobs. You should make sure to match the library's
@@ -111,8 +114,8 @@ replacing `<flink-version>` with the version of flink used in `FlinkRunner`.
### Avro Support

FlinkRunner supports reading and writing avro messages with kafka and file systems. For
kafka sources and sinks, FlinkRunner uses binary encoding with Confluent schema registry.
For file sources and sinks, you can select either standard or parquet avro encoding.

Add the following dependencies if you need Avro and Confluent schema registry support:
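
A minimal sketch of what those dependencies typically look like (the exact artifacts and
versions are elided in this hunk, so treat these coordinates as assumptions and check the
FlinkRunner build for the authoritative list):

```sbtshell
// assumed artifacts for avro and confluent schema registry support
libraryDependencies ++= Seq(
  "org.apache.flink" % "flink-avro"                    % <flink-version>,
  "org.apache.flink" % "flink-avro-confluent-registry" % <flink-version>
)
```
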
@@ -135,7 +138,7 @@ registry support.

#### Schema Registry Support

FlinkRunner provides automatic support for serializing and deserializing in and out of
kafka using
Confluent's [Avro schema registry](https://docs.confluent.io/platform/current/schema-registry/index.html)
. To make use of this support, you need to do three things:
@@ -181,14 +184,17 @@ build:
Use the `format = parquet` directive in your file sink configuration (more details
below).

The [iceberg](https://iceberg.apache.org) source and sink also support parquet output by
default.
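
For example, a file sink that writes parquet-encoded avro files could be configured like
this (the sink name and bucket here are made up for illustration):

```
sinks {
  parquet-file-sink {
    path = "s3://my-parquet-bucket/events"
    format = parquet
  }
}
```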

### Logging

`FlinkRunner` uses [scala-logging](https://github.com/lightbend/scala-logging) for logging
on top of slf4j. In your implementation, you must provide a logging backend compatible
with slf4j, such as [logback](https://logback.qos.ch/):

```sbtshell
libraryDependencies += "ch.qos.logback" % "logback-classic" % <logback-version>
```
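
With a backend on the classpath, any of your classes can mix in scala-logging's
`LazyLogging` trait to log through slf4j (a small illustrative sketch, not FlinkRunner
API):

```scala
import com.typesafe.scalalogging.LazyLogging

// LazyLogging provides a lazily initialized slf4j-backed `logger` member;
// the configured backend (e.g. logback) decides where the messages go.
class MyPipelineHelper extends LazyLogging {
  def start(): Unit = logger.info("starting my flink job")
}
```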

### Complex Event Processing
@@ -201,13 +207,16 @@ If you want to use the complex event processing library, add this dependency:

## What is FlinkRunner?

`FlinkRunner` helps you think about your DataStream API flink jobs at a high level, so you
can focus on the event pipeline, not the plumbing. It is not as high-level as Flink SQL,
but when you need to write multiple related DataStream API jobs, FlinkRunner helps you
avoid repetitive boilerplate code, simplifies configuration, and simplifies the use of
many common flink streaming patterns.

You have a set of related flink jobs that deal in a related set of data event
types. `FlinkRunner` helps you build one application to run those related jobs and
coordinate the types. It also simplifies setting up common sources and sinks, so you can
control them purely with configuration, not code. `FlinkRunner` supports a variety of
sources and sinks out of the box, including `kafka`, `kinesis`, `jdbc`
, `elasticsearch 7+` (sink only), `cassandra` (sink only),
`filesystems` (including `s3` using `parquet` as well as `delimited` and `json` text
@@ -220,7 +229,7 @@ makes it easy to test your transformation logic with property-based testing.
First, you need to define an [*algebraic data
type*](http://tpolecat.github.io/presentations/algebraic_types.html#1) (ADT) containing
the types that will be used in your flink jobs. Your top level type should inherit
the `FlinkEvent` trait. This requires your types to implement the following members:

* `$id: String` - a unique id for the event
* `$timestamp: Long` - the event time (millis since unix epoch)
@@ -230,18 +239,25 @@ Additionally, a `FlinkEvent` has three additional members that you can optionally set
to enable/configure various features.

* `$bucketId: String` - for bucketing files written to a file sink
* `$dedupId: String` - for use with FlinkRunner's deduplication filter
* `$active: Boolean` - for use with FlinkRunner's filterByControl source stream algorithm

```scala
sealed trait MyEventADT extends FlinkEvent

final case class MyEventA(a:Int, $id:String, $key:String, $timestamp:Long)
extends MyEventADT

final case class MyEventB(b:Int, $id:String, $key:String, $timestamp:Long)
extends MyEventADT
```

Next, you should create your own runner subclass of the abstract `FlinkRunner` base class.

```scala
import io.epiphanous.flinkrunner.model.FlinkConfig
import io.epiphanous.flinkrunner.FlinkRunner

class MyRunner(config:FlinkConfig) extends FlinkRunner[MyEventADT](config) {

  override def invoke(jobName:String):Unit = jobName match {
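    // Hypothetical continuation -- the rest of this block is elided in the diff hunk
    // below. Each job name is dispatched to a job class and run; the job class names
    // and the run() call are assumptions based on the examples later in this README.
    case "MyJob1" => new MyJob1(this).run()
    case "MyJob2" => new MyJob2(this).run()
  }
}
```
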
@@ -268,6 +284,17 @@ While this sounds useless at first blush, and your `transform` method will usually do
more interesting things, identity transforms like this can be useful to copy data from one
storage system to another.
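
For reference, the identity job discussed above would look something like this (a
hypothetical sketch, consistent with the `MyJob2` example below; the actual `MyJob1`
definition is elided in the diff hunk above):

```scala
class MyJob1(runner:FlinkRunner[MyEventADT]) extends StreamJob[MyEventA](runner) {

  // read MyEventA events from the configured source and emit them unchanged
  override def transform:DataStream[MyEventA] = singleSource[MyEventA]()
}
```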

You could do something a little more exciting, say transform a stream of `MyEventA`
events into a stream of `MyEventB` events:

```scala
class MyJob2(runner:FlinkRunner[MyEventADT]) extends StreamJob[MyEventB](runner) {

override def transform:DataStream[MyEventB] =
    singleSource[MyEventA]().map { a:MyEventA =>
      MyEventB(b = a.a * 2, $id = a.$id, $key = a.$key, $timestamp = a.$timestamp)
    }
}
```

Next, you need to configure your job. The following configuration defines a file source
that reads csv files from one s3 bucket and a file sink that writes json files to a
different s3 bucket.
@@ -277,20 +304,22 @@ jobs {
MyJob1 {
sources {
csv-file-source {
path = "s3://my-events-csv-files"
format = csv
}
}
sinks {
json-file-sink {
path = "s3://my-events-json-files"
format = json
}
}
}
}
```

Note that this configuration would work for either `MyJob1` or `MyJob2`.

Next, wire up your runner to a main method.

```
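// Hypothetical sketch -- the actual main method is elided in this diff hunk.
// The FlinkConfig(args) constructor and the process() call are assumptions,
// not confirmed by the visible text.
object Main {
  def main(args: Array[String]): Unit =
    new MyRunner(new FlinkConfig(args)).process()
}
```
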
@@ -309,7 +338,7 @@ flink run myRunner.jar MyJob1

## Flink Jobs

`FlinkRunner` provides a `StreamJob` base class from which you can build and run your
flink jobs. If your output event types require avro support, you should instead use the
`AvroStreamJob` base class to build and run your flink job.

@@ -322,10 +351,10 @@ class StreamJob[
```

A `StreamJob` must specify the output stream event type in its definition. That output
stream event type must be a subclass of your `FlinkRunner` algebraic data type (ADT). Your
job class will be passed an instance of your `FlinkRunner` type, from which you can access

* `runner.config`: `FlinkRunner` configuration (instance of `FlinkConfig`).
* `runner.env`: Flink's stream execution environment (instance of
`StreamExecutionEnvironment`), for interfacing with the DataStream API.
* `runner.tableEnv`: Flink's stream table environment (instance of
@@ -343,17 +372,17 @@ create source streams from your configuration:
type within your runner's `ADT`. The configured `name` parameter defaults to the first
configured source for convenience.

* `connectedSource[IN1 <: ADT, IN2 <: ADT, KEY](source1:DataStream[IN1], source2:DataStream[IN2], key1:IN1=>KEY, key2:IN2=>KEY): ConnectedStreams[IN1,IN2]`:
connects the two provided source streams producing a single stream of
type `ConnectedStreams[IN1,IN2]`. A connected stream combines the two streams. An event
on the connected stream is either of type `IN1` or `IN2`. The key functions are used for
any `keyBy` operations done on the connected stream. It is important to realize a
connected stream is NOT a join between the two streams, and the keys are not used to
perform a join. The use case for a connected stream is where the data on one stream
dynamically impacts how you want to process the other stream.

* `filterByControlSource[CONTROL <: ADT, DATA <: ADT, KEY](control:DataStream[CONTROL], data:DataStream[DATA], key1:CONTROL=>KEY, key2:DATA=>KEY): DataStream[DATA]`:
is a special instance of a connected source, where the first source is a control stream
and the second source is a data stream. The control events indicate when the data event
stream should be considered active, meaning any data events seen should be emitted. When
the control type's `$active`
@@ -365,9 +394,9 @@ create source streams from your configuration:

* `broadcastConnectedSource[
IN <: ADT: TypeInformation, BC <: ADT: TypeInformation, KEY: TypeInformation](
keyedSource: DataStream[IN], broadcastSource: DataStream[BC], keyedSourceGetKeyFunc:
IN => KEY): BroadcastConnectedStream[IN, BC]`: another special connected source that
implements Flink's [Broadcast State Pattern](https://nightlies.apache.
org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/broadcast_state/).
This `StreamJob` method keys and connects a regular data stream, with a so-called
broadcast data stream. A broadcast stream sends copies of all of its elements to all
@@ -382,16 +411,15 @@ create source streams from your configuration:
sources are Confluent schema-registry aware avro encoded Kafka streams, you should use the
avro aware versions of these factory methods. For instance, the `singleAvroSource()`
method can be used to produce such an input datastream. The signatures of these methods are
more complicated and rely on you to use FlinkRunner's `EmbeddedAvroRecord` and
`EmbeddedAvroRecordFactory` traits when implementing your event types.

```
singleAvroSource[
IN <: ADT with EmbeddedAvroRecord[INA],
INA <: GenericRecord](
name: String)(implicit
fromKV: EmbeddedAvroRecordInfo[INA] => IN): DataStream[IN]
```
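
A hedged usage sketch of this factory method (the event type `MyAvroEventA` and its
embedded avro record type `ARecord` are hypothetical, and assume the
`EmbeddedAvroRecord`/`EmbeddedAvroRecordFactory` wiring described above):

```scala
// inside an avro-aware job whose output type is MyAvroEventA: read
// confluent-avro-encoded MyAvroEventA events from the configured "kafka-source"
override def transform: DataStream[MyAvroEventA] =
  singleAvroSource[MyAvroEventA, ARecord]("kafka-source")
```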

Besides source factory methods, `StreamJob` also provides a method to easily perform
@@ -471,19 +499,11 @@ case class EmbeddedAvroRecordInfo[A <: GenericRecord](
headers: Map[String, String] = Map.empty)
```

## FlinkRunner Configuration

`FlinkRunner` uses [lightbend config](https://lightbend.github.io/config/) for its
configuration, integrating environment variables and command line arguments to provide
easy, environment specific, 12-factor style configuration. `FlinkRunner` makes this
configuration accessible through a `FlinkConfig` object that you construct and pass to
your runner.
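
For example, lightbend config's optional substitution syntax lets an environment variable
override a configured value at deploy time (the keys shown here are illustrative, not a
FlinkRunner-specific contract):

```
jobs {
  MyJob1 {
    sources {
      csv-file-source {
        path = "s3://my-events-csv-files"
        # if CSV_SOURCE_PATH is set in the environment, it overrides the path above
        path = ${?CSV_SOURCE_PATH}
      }
    }
  }
}
```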

@@ -499,6 +519,8 @@ trait SourceConfig[ADT <: FlinkEvent]

#### Hybrid Source

#### Iceberg Source

#### Kinesis Source

#### RabbitMQ Source
@@ -517,6 +539,10 @@ trait SinkConfig[ADT <: FlinkEvent]

#### File Sink

#### Firehose Sink

#### Iceberg Sink

#### Jdbc Sink

#### Kafka Sink
@@ -1,6 +1,7 @@
package io.epiphanous.flinkrunner.model

import com.typesafe.config.ConfigObject
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig
import io.epiphanous.flinkrunner.util.ConfigToProps.RichConfigObject
import io.epiphanous.flinkrunner.util.StreamUtils.RichProps

@@ -15,8 +16,10 @@ case class SchemaRegistryConfig(
props: util.HashMap[String, String] = new util.HashMap()) {
val isSerializing: Boolean = !isDeserializing
props.put("schema.registry.url", url)
props.putIfAbsent(
KafkaAvroDeserializerConfig.AVRO_USE_LOGICAL_TYPE_CONVERTERS_CONFIG,
"true"
)
}
object SchemaRegistryConfig {
def apply(
