Skip to content

Latest commit

 

History

History
193 lines (157 loc) · 5.46 KB

kafka-streams-KStream.adoc

File metadata and controls

193 lines (157 loc) · 5.46 KB

KStream

KStream is an abstraction of a record stream of key-value pairs (where each record is an independent entity or event in the real world).

KStream can be created directly from one or many Kafka topics (using the StreamsBuilder.stream operator) or as a result of transformations on an existing KStream.

import org.apache.kafka.streams.StreamsBuilder
val builder = new StreamsBuilder

import org.apache.kafka.streams.kstream.KStream
// Use a type annotation to describe the stream, i.e. KStream[String, String]
// Otherwise, Scala's type inference gives us a stream of "nothing", i.e. KStream[Nothing, Nothing]
val input: KStream[String, String] = builder.stream("input")

KStream comes with a rich set of operators (aka KStream API) that allow for building topologies to consume, process and produce key-value records.

Note
KStreamImpl is the one and only known implementation of the KStream Contract in Kafka Streams {{ book.kafka_version }}.
Table 1. KStream Operators
Operator Description

flatMap

KStream<KR, VR> flatMap(
  KeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends KR, ? extends VR>>> mapper);

groupBy

KGroupedStream<KR, V> groupBy(
  final KeyValueMapper<? super K, ? super V, KR> selector);
KGroupedStream<KR, V> groupBy(
  final KeyValueMapper<? super K, ? super V, KR> selector,
  final Serialized<KR, V> serialized);

groupByKey

KGroupedStream<K, V> groupByKey();
KGroupedStream<K, V> groupByKey(final Serialized<K, V> serialized);

join

KStream<K, RV> join(
  final GlobalKTable<GK, GV> globalKTable,
  final KeyValueMapper<? super K, ? super V, ? extends GK> keyValueMapper,
  final ValueJoiner<? super V, ? super GV, ? extends RV> joiner);
KStream<K, VR> join(
  final KStream<K, VO> otherStream,
  final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
  final JoinWindows windows);
KStream<K, VR> join(
  final KStream<K, VO> otherStream,
  final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
  final JoinWindows windows,
  final Joined<K, V, VO> joined);
KStream<K, VR> join(
  final KTable<K, VT> table,
  final ValueJoiner<? super V, ? super VT, ? extends VR> joiner);
KStream<K, VR> join(
  final KTable<K, VT> table,
  final ValueJoiner<? super V, ? super VT, ? extends VR> joiner,
  final Joined<K, V, VT> joined);

leftJoin

KStream<K, RV> leftJoin(
  final GlobalKTable<GK, GV> globalKTable,
  final KeyValueMapper<? super K, ? super V, ? extends GK> keyValueMapper,
  final ValueJoiner<? super V, ? super GV, ? extends RV> valueJoiner);
KStream<K, VR> leftJoin(
  final KStream<K, VO> otherStream,
  final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
  final JoinWindows windows);
KStream<K, VR> leftJoin(
  final KStream<K, VO> otherStream,
  final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
  final JoinWindows windows,
  final Joined<K, V, VO> joined);
KStream<K, VR> leftJoin(
  final KTable<K, VT> table,
  final ValueJoiner<? super V, ? super VT, ? extends VR> joiner);
KStream<K, VR> leftJoin(
  final KTable<K, VT> table,
  final ValueJoiner<? super V, ? super VT, ? extends VR> joiner,
  final Joined<K, V, VT> joined);

transformValues

Stateful record-by-record value transformation

KStream<K, VR> transformValues(
  final ValueTransformerSupplier<? super V, ? extends VR> valueTransformerSupplier,
  final String... stateStoreNames)
KStream<K, VR> transformValues(
  final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> valueTransformerSupplier,
  final String... stateStoreNames)

transformValues uses a ValueTransformerSupplier (or a ValueTransformerWithKeySupplier) to create a ValueTransformer (or a ValueTransformerWithKey) that is used for a stateful transformation of record values in a stream.

map

KStream<KR, VR> map(KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KR, ? extends VR>> mapper);

mapValues

KStream<K, VR> mapValues(ValueMapper<? super V, ? extends VR> mapper);
KStream<K, VR> mapValues(final ValueMapperWithKey<? super K, ? super V, ? extends VR> mapper);

print

void print(final Printed<K, V> printed);

selectKey

KStream<KR, V> selectKey(KeyValueMapper<? super K, ? super V, ? extends KR> mapper);

through

KStream<K, V> through(final String topic);
KStream<K, V> through(final String topic, final Produced<K, V> produced);

transform

Stateful record transformation

KStream<K1, V1> transform(
  final TransformerSupplier<? super K, ? super V, KeyValue<K1, V1>> transformerSupplier,
  final String... stateStoreNames)

to

void to(final String topic);
void to(final String topic, final Produced<K, V> produced);
Note
The specified topic should be manually created before the Kafka Streams application is started.