Commit

Merge cfd3862 into c3c0d08
nextdude committed Sep 8, 2022
2 parents c3c0d08 + cfd3862 commit a5897a0
Showing 11 changed files with 473 additions and 90 deletions.
110 changes: 108 additions & 2 deletions README.md
@@ -32,7 +32,7 @@

<div align="center">
<sub>Built by
<a href="https://twitter.com/epiphanous">nextdude@epiphanous.io</a> and
<a href="https://twitter.com/epiphanous">nextdude</a> and
<a href="https://github.com/epiphanous/flinkrunner/graphs/contributors">
contributors
</a></sub>
@@ -307,6 +307,10 @@ flink run myRunner.jar MyJob1

## Flink Jobs

`Flinkrunner` provides a `StreamJob` base class from which you can build and run your
flink jobs. If your output event types require avro support, you should instead use the
`AvroStreamJob` base class to build and run your flink job.

### StreamJob

```
class StreamJob[
  OUT <: ADT: TypeInformation,
  ADT <: FlinkEvent](runner:FlinkRunner[ADT])
```

A `StreamJob` must specify the output stream event type in its definition. That output
event type must be a subclass of your `Flinkrunner` algebraic data type (ADT). Your job
class will be passed an instance of your `Flinkrunner` type, from which you can access:

* `runner.config`: `Flinkrunner` configuration (instance of `FlinkConfig`).
* `runner.env`: Flink's stream execution environment (instance of
`StreamExecutionEnvironment`), for interfacing with the DataStream API.
* `runner.tableEnv`: Flink's stream table environment (instance of
`StreamTableEnvironment`), for interfacing with the Table API.
* `runner.mockEdges`: A boolean indicating if your runner instance will mock interaction
with sources and sinks and redirect transformed output to your
specified `CheckResults.checkOutputEvents` method.

Your `StreamJob` must provide a `transform` method that defines and transforms your
sources into an output event stream. `StreamJob` provides several factory methods to
create source streams from your configuration:

* `singleSource[IN <: ADT](name:String)`: produces a single input source stream,
configured under the provided name, of type `DataStream[IN]`, where `IN` is an event
type within your runner's `ADT`. The configured `name` parameter defaults to the first
configured source for convenience.

* `connectedSource[IN1 <: ADT, IN2 <: ADT, KEY](name1:String, name2:String, key1:IN1=>KEY, key2:IN2=>KEY): ConnectedStreams[IN1,IN2]`:
connects the two streams, configured under the provided names, producing a single stream
of type `ConnectedStreams[IN1,IN2]`. A connected stream is effectively a union of the two
stream types: an event on the connected stream is either of type `IN1` or `IN2`. The key
functions are used for any `keyBy` operations done on the connected stream. It is
important to realize that a connected stream is NOT a join between the two streams, and
the keys are not used to perform a join. The use case for a connected stream is where the
data on one stream dynamically impacts how you want to process the other stream.

* `filterByControlSource[CONTROL <: ADT, DATA <: ADT, KEY](controlName:String, dataName:String, key1:CONTROL=>KEY, key2:DATA=>KEY): DataStream[DATA]`:
is a special instance of connected sources, where the first source is a control stream
and the second is a data stream. The control events determine when the data stream should
be considered active, meaning any data events seen should be emitted. While the control
type's `$active` method returns `true`, the data stream is considered active and data
events seen on the connected stream are emitted to the output; while it returns `false`,
the data stream is considered inactive and data events seen on the connected stream are
not emitted.

* `broadcastConnectedSource[IN <: ADT: TypeInformation, BC <: ADT: TypeInformation, KEY: TypeInformation](keyedSourceName: String, broadcastSourceName: String, keyedSourceGetKeyFunc: IN => KEY): BroadcastConnectedStream[IN, BC]`:
another special connected source that implements Flink's
[Broadcast State Pattern](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/broadcast_state/).
This `StreamJob` method keys and connects a regular data stream with a so-called
broadcast data stream. A broadcast stream sends copies of all of its elements to all
downstream tasks, so each keyed task receives a connected stream of its keyed data
elements and every broadcast element. A common use case broadcasts a stream of rule
changes that impact how the data stream should be processed. The resulting
`BroadcastConnectedStream[IN,BC]` should be processed with a special type of
`CoProcessFunction` called a `KeyedBroadcastProcessFunction[KEY, IN, BC, OUT]`, which
produces your transformed output data stream of type `DataStream[OUT]`; a sketch of such
a function follows this list.
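
The following is a minimal sketch (not from Flinkrunner itself) of a
`KeyedBroadcastProcessFunction` you could use to process such a broadcast connected
stream. The `SensorReading`, `RuleChange` and `SensorAlert` types are hypothetical and
simplified here to plain case classes; in practice they would be members of your ADT.

```
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction
import org.apache.flink.util.Collector

// Hypothetical, simplified event types for illustration only.
case class SensorReading(sensorId: String, value: Double)
case class RuleChange(sensorId: String, threshold: Double)
case class SensorAlert(sensorId: String, value: Double)

class RuleApplier
    extends KeyedBroadcastProcessFunction[String, SensorReading, RuleChange, SensorAlert] {

  // Descriptor naming the broadcast (rules) map state shared by all downstream tasks.
  val rulesDescriptor = new MapStateDescriptor[String, RuleChange](
    "rules",
    TypeInformation.of(classOf[String]),
    TypeInformation.of(classOf[RuleChange])
  )

  // Keyed data elements: look up the current rule for this sensor and emit an alert
  // if the reading exceeds its threshold.
  override def processElement(
      reading: SensorReading,
      ctx: KeyedBroadcastProcessFunction[String, SensorReading, RuleChange, SensorAlert]#ReadOnlyContext,
      out: Collector[SensorAlert]): Unit = {
    val rule = ctx.getBroadcastState(rulesDescriptor).get(reading.sensorId)
    if (rule != null && reading.value > rule.threshold)
      out.collect(SensorAlert(reading.sensorId, reading.value))
  }

  // Broadcast elements: update the shared rules state seen by every keyed task.
  override def processBroadcastElement(
      rule: RuleChange,
      ctx: KeyedBroadcastProcessFunction[String, SensorReading, RuleChange, SensorAlert]#Context,
      out: Collector[SensorAlert]): Unit =
    ctx.getBroadcastState(rulesDescriptor).put(rule.sensorId, rule)
}
```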

`StreamJob` also provides avro versions of all these source factory methods. If your
sources are Confluent schema-registry aware avro encoded Kafka streams, you should use the
avro aware versions of these factory methods. For instance, the `singleAvroSource()`
method can be used to produce such an input data stream. The signatures of these methods are
more complicated and rely on you to use Flinkrunner's `EmbeddedAvroRecord` and
`EmbeddedAvroRecordFactory` traits when implementing your event types.

```
singleAvroSource[
IN <: ADT with EmbeddedAvroRecord[INA],
INA <: GenericRecord](
name: String)(implicit
fromKV: EmbeddedAvroRecordInfo[INA] => IN,
avroParquetRecordFormat: StreamFormat[INA]): DataStream[IN]
```
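
For example, assuming a hypothetical event type `MyAvroEvent` in your ADT that embeds an
avro `MyRecord` (with the implicit `fromKV` supplied via `EmbeddedAvroRecordFactory`), and
a source configured under the name `kafka-source`, a call might look like:

```
val events: DataStream[MyAvroEvent] =
  singleAvroSource[MyAvroEvent, MyRecord]("kafka-source")
```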

Besides source factory methods, `StreamJob` also provides a method to easily perform
[windowed aggregations](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-agg/).

```
def windowedAggregation[
E <: ADT: TypeInformation,
KEY: TypeInformation,
WINDOW <: Window: TypeInformation,
AGG <: Aggregate: TypeInformation,
QUANTITY <: Quantity[QUANTITY]: TypeInformation,
PWF_OUT <: ADT: TypeInformation](
source: KeyedStream[E, KEY],
initializer: WindowedAggregationInitializer[
E,
KEY,
WINDOW,
AGG,
QUANTITY,
PWF_OUT,
ADT
]): DataStream[PWF_OUT]
```

Finally, `StreamJob` also provides a `run()` method that builds and executes the flink job
graph defined by your `transform` method.
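
Putting these pieces together, here is a minimal sketch of a concrete job (not taken from
Flinkrunner's docs). It assumes hypothetical ADT members `Order` (input) and
`OrderSummary` (output) whose definitions are omitted, a source configured under the name
`orders`, and that `transform` takes no arguments and returns the output stream:

```
import io.epiphanous.flinkrunner.FlinkRunner
import org.apache.flink.streaming.api.scala._

// Hypothetical job: reads Order events from the configured "orders" source and
// maps them to OrderSummary output events. (Import StreamJob and your ADT types
// from wherever they live in your project.)
class OrderSummaryJob(runner: FlinkRunner[MyADT])
    extends StreamJob[OrderSummary, MyADT](runner) {

  override def transform: DataStream[OrderSummary] =
    singleSource[Order]("orders")
      .map(order => OrderSummary(order.id, order.amount))
}
```

With a job like this in place, `run()` builds and executes the job graph, so invoking the
job through your runner (for example, `flink run myRunner.jar orderSummaryJob`, using
whatever job name your main method registers) is all that remains.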

### AvroStreamJob

```
class AvroStreamJob[
  OUT <: ADT with EmbeddedAvroRecord[A],
  A <: GenericRecord,
  ADT <: FlinkEvent](runner:FlinkRunner[ADT])
```

An `AvroStreamJob` is a specialized `StreamJob` class that supports writing to an avro
encoded sink (Kafka or parquet-avro files).

#### EmbeddedAvroRecord

@@ -366,7 +473,6 @@ case class EmbeddedAvroRecordInfo[A <: GenericRecord](

An avro parquet writer factory for types that wrap an avro record.


#### EmbeddedAvroParquetRecordFormat

A StreamFormat to read parquet avro files containing a type that embeds an avro record.
@@ -0,0 +1,137 @@
package io.epiphanous.flinkrunner.serde

import com.fasterxml.jackson.core.JsonGenerator
import com.fasterxml.jackson.databind.ser.std.StdSerializer
import com.fasterxml.jackson.databind.{JsonSerializer, SerializerProvider}
import com.typesafe.scalalogging.LazyLogging
import org.apache.avro.Schema
import org.apache.avro.Schema.Type._
import org.apache.avro.generic.GenericRecord
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala.createTypeInformation

import scala.annotation.tailrec
import scala.collection.JavaConverters._

/** A simple custom jackson serializer to handle serializing avro records
* (Generic or Specific)
*/
class AvroJsonSerializer
extends StdSerializer[GenericRecord](classOf[GenericRecord])
with LazyLogging {
override def serialize(
record: GenericRecord,
gen: JsonGenerator,
provider: SerializerProvider): Unit = {
gen.writeStartObject()
record.getSchema.getFields.asScala.foreach { f =>
_serializeAvroValue(
f.name(),
record.get(f.name()),
f.schema(),
gen,
provider
)
}
gen.writeEndObject()
}

@tailrec
private def _serializeAvroValue[T: TypeInformation](
name: String,
value: T,
schema: Schema,
gen: JsonGenerator,
provider: SerializerProvider): Unit = {
(schema.getType, value) match {
case (NULL, _) => gen.writeNullField(name)
case (_, null | None) => gen.writeNullField(name)
case (_, Some(v)) =>
_serializeAvroValue(name, v, schema, gen, provider)
case (RECORD, record: GenericRecord) =>
gen.writeFieldName(name)
serialize(record, gen, provider)
case (ENUM, ord: Int) =>
gen.writeStringField(name, schema.getEnumSymbols.get(ord))
case (ARRAY, seq: Seq[_]) =>
gen.writeArrayFieldStart(name)
seq.foreach { e =>
_serializeElement(e, schema.getElementType, gen, provider)
}
gen.writeEndArray()
case (MAP, map: Map[String, _] @unchecked) =>
gen.writeObjectFieldStart(name)
map.foreach { case (k, e) =>
gen.writeFieldName(k)
_serializeElement(e, schema.getValueType, gen, provider)
}
gen.writeEndObject()
case (UNION, _) =>
// todo: not a very sophisticated way to process unions, but it covers common case of [null, type]
val nonNullTypes =
schema.getTypes.asScala.filterNot(s => s.getType == NULL)
if (nonNullTypes.size > 1) {
throw new RuntimeException(
s"field $name of type union has more than one non-null types: $nonNullTypes"
)
}
_serializeAvroValue(
name,
value,
nonNullTypes.head,
gen,
provider
)
case (FIXED | BYTES, bytes: Array[Byte]) =>
gen.writeBinaryField(name, bytes)
case (STRING, string: String) =>
gen.writeStringField(name, string)
case (INT, int: Int) =>
gen.writeNumberField(name, int)
case (LONG, long: Long) => gen.writeNumberField(name, long)
case (FLOAT, float: Float) => gen.writeNumberField(name, float)
case (DOUBLE, double: Double) => gen.writeNumberField(name, double)
case (BOOLEAN, boolean: Boolean) =>
gen.writeBooleanField(name, boolean)
case _ =>
gen.writeFieldName(name)
provider
.findValueSerializer(
implicitly[TypeInformation[T]].getTypeClass
)
.asInstanceOf[JsonSerializer[T]]
.serialize(value, gen, provider)
}
}

private def _serializeElement(
value: Any,
schema: Schema,
gen: JsonGenerator,
provider: SerializerProvider): Unit = {
(schema.getType, value) match {
case (_, null | None) => gen.writeNull()
case (_, Some(v)) => _serializeElement(v, schema, gen, provider)
case (RECORD, record: GenericRecord) =>
serialize(record, gen, provider)
case (ENUM, ord: Int) =>
gen.writeString(schema.getEnumSymbols.get(ord))
case (ARRAY, seq: Seq[_]) =>
seq.foreach { e =>
_serializeElement(e, schema.getElementType, gen, provider)
}
case (MAP, _) => gen.writeObject(value)
case (UNION, _) => // todo
case (STRING, string: String) => gen.writeString(string)
case (INT, int: Int) => gen.writeNumber(int)
case (LONG, long: Long) => gen.writeNumber(long)
case (DOUBLE, double: Double) => gen.writeNumber(double)
case (BOOLEAN, boolean: Boolean) => gen.writeBoolean(boolean)
case _ =>
logger.error(
s"no serializer for array element type ${schema.getType.name()}"
)
// todo
}
}
}
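
As a usage sketch (not part of this commit), the serializer can be registered with a
Jackson `ObjectMapper` through a `SimpleModule`, after which any avro `GenericRecord`
value is rendered by it:

```
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.module.SimpleModule
import org.apache.avro.generic.GenericRecord

object AvroJsonSupport {
  // Route every GenericRecord (including SpecificRecord) value through AvroJsonSerializer.
  val mapper: ObjectMapper = new ObjectMapper()
    .registerModule(
      new SimpleModule()
        .addSerializer(classOf[GenericRecord], new AvroJsonSerializer)
    )
}

// AvroJsonSupport.mapper.writeValueAsString(record) then emits the record as json text.
```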
@@ -2,23 +2,19 @@ package io.epiphanous.flinkrunner.serde

import com.typesafe.scalalogging.LazyLogging
import io.epiphanous.flinkrunner.model.{EmbeddedAvroRecord, FlinkEvent}
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumWriter, GenericRecord}
import org.apache.avro.io.{EncoderFactory, JsonEncoder}
import org.apache.avro.specific.SpecificRecord
import org.apache.commons.io.output.ByteArrayOutputStream
import org.apache.avro.generic.GenericRecord
import org.apache.flink.api.common.serialization.Encoder
import org.apache.flink.api.common.typeinfo.TypeInformation

import java.io.OutputStream
import java.nio.charset.StandardCharsets
import scala.util.Try

/** A thin wrapper to emit an embedded avro record from events into an
* output stream in a json (lines) format.
/** A JSON lines encoder for events with embedded avro records.
*
* @param pretty
* true if you want to indent the json code (default = false)
* true if you want to indent the json output (default = false)
* @param sortKeys
* true if you want to sort keys in the json output (default = false)
* @tparam E
* the ADT event type that embeds an avro record of type A
* @tparam A
@@ -29,62 +25,19 @@ import scala.util.Try
class EmbeddedAvroJsonFileEncoder[
E <: ADT with EmbeddedAvroRecord[A]: TypeInformation,
A <: GenericRecord: TypeInformation,
ADT <: FlinkEvent](schemaOpt: Option[Schema] = None)
ADT <: FlinkEvent](pretty: Boolean = false, sortKeys: Boolean = false)
extends Encoder[E]
with LazyLogging {

@transient
lazy val avroClass: Class[A] =
implicitly[TypeInformation[A]].getTypeClass

@transient
lazy val encoderWriterPairOpt
: Option[(JsonEncoder, GenericDatumWriter[GenericRecord])] = {
if (
schemaOpt.nonEmpty || classOf[SpecificRecord].isAssignableFrom(
avroClass
)
) {
val schema = schemaOpt.getOrElse(
avroClass.getConstructor().newInstance().getSchema
)
Some(
EncoderFactory
.get()
.jsonEncoder(schema, new ByteArrayOutputStream()),
new GenericDatumWriter[GenericRecord](schema)
lazy val rowEncoder = new JsonRowEncoder[A](pretty, sortKeys)

override def encode(element: E, stream: OutputStream): Unit =
rowEncoder
.encode(element.$record)
.fold(
t => logger.error(s"failed to json encode $element", t),
s => stream.write(s.getBytes(StandardCharsets.UTF_8))
)
} else None
}

lazy val lineEndBytes: Array[Byte] =
System.lineSeparator().getBytes(StandardCharsets.UTF_8)

override def encode(element: E, stream: OutputStream): Unit = {
val record = element.$record

val (encoder, writer) =
encoderWriterPairOpt match {
case None =>
val schema = record.getSchema
(
EncoderFactory.get().jsonEncoder(schema, stream),
new GenericDatumWriter[GenericRecord](schema)
)
case Some(
(enc: JsonEncoder, wr: GenericDatumWriter[GenericRecord])
) =>
(enc.configure(stream), wr)
}

Try {
writer.write(record, encoder)
encoder.flush()
stream.write(lineEndBytes)
}.fold(
error =>
logger.error(s"Failed to encode avro record $record", error),
_ => ()
)
}
}
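
As a usage sketch (not from this commit), an encoder like this would typically be handed
to a row-format `FileSink`. The `MyEvent`, `MyRecord` and `MyADT` types below are
hypothetical, and implicit `TypeInformation` instances for them are assumed to be in
scope:

```
import org.apache.flink.connector.file.sink.FileSink
import org.apache.flink.core.fs.Path

// Hypothetical: MyEvent <: MyADT with EmbeddedAvroRecord[MyRecord], MyADT <: FlinkEvent.
val jsonFileSink: FileSink[MyEvent] =
  FileSink
    .forRowFormat(
      new Path("file:///tmp/events"), // example output path
      new EmbeddedAvroJsonFileEncoder[MyEvent, MyRecord, MyADT]()
    )
    .build()

// dataStream.sinkTo(jsonFileSink)  // attach to a DataStream[MyEvent]
```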