KStreamImpl

KStreamImpl is the one and only implementation of the KStream contract in Kafka Streams {{ book.kafka_version }}.

KStreamImpl is created every time a transformation is executed and when…​FIXME

Table 1. KStreamImpl’s Internal Properties (e.g. Registries, Counters and Flags)
Name | Description
defaultKeyValueMapper | KeyValueMapper that simply concatenates the key and the value of a record when executed (i.e. applied to a record stream)

doStreamTableJoin Internal Method

<V1, R> KStream<K, R> doStreamTableJoin(
  final KTable<K, V1> other,
  final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
  final boolean leftJoin)

doStreamTableJoin…​FIXME

Note
doStreamTableJoin is used when…​FIXME

doJoin Internal Method

<V1, R> KStream<K, R> doJoin(
  final KStream<K, V1> other,
  final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
  final JoinWindows windows,
  final Joined<K, V, V1> joined,
  final KStreamImplJoin join)

doJoin…​FIXME

Note
doJoin is used when…​FIXME

Transforming Values with Optional State — transformValues Method

<V1> KStream<K, V1> transformValues(
  final ValueTransformerSupplier<? super V, ? extends V1> valueTransformerSupplier,
  final String... stateStoreNames)
<VR> KStream<K, VR> transformValues(
  final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> valueTransformerSupplier,
  final String... stateStoreNames)

Note
transformValues is part of KStream Contract for a stateful record-by-record value transformation.

transformValues reports a NullPointerException when either ValueTransformerSupplier or ValueTransformerWithKeySupplier is null.
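
The fail-fast null check can be pictured with plain Java. This is a minimal sketch, assuming a hypothetical stand-in method that takes a java.util.function.Supplier instead of the Kafka Streams supplier types; the error message is also an assumption.

```java
import java.util.Objects;
import java.util.function.Supplier;

public class NullCheckSketch {
    // Hypothetical stand-in for the operator; it only models the argument check.
    static String transformValues(Supplier<?> valueTransformerSupplier, String... stateStoreNames) {
        // Fail fast with a NullPointerException before anything is registered.
        Objects.requireNonNull(valueTransformerSupplier, "valueTransformerSupplier can't be null");
        return "accepted with " + stateStoreNames.length + " state store name(s)";
    }

    public static void main(String[] args) {
        System.out.println(transformValues(() -> "a transformer", "my-store"));
        try {
            transformValues((Supplier<?>) null);
        } catch (NullPointerException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

The check runs before any other work, so a null supplier never leaves a half-registered processor behind.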

process Method

void process(
  final ProcessorSupplier<? super K, ? super V> processorSupplier,
  final String... stateStoreNames)
Note
process is part of KStream Contract for…​FIXME

process…​FIXME

Stateful Record Transformation — transform Operator

<K1, V1> KStream<K1, V1> transform(
  final TransformerSupplier<? super K, ? super V, KeyValue<K1, V1>> transformerSupplier,
  final String... stateStoreNames)
Note
transform is part of KStream Contract to…​FIXME.

transform requests InternalStreamsBuilder for a new processor name with KSTREAM-TRANSFORM prefix.

transform then creates a new KStreamTransform (for the input transformerSupplier) and requests InternalTopologyBuilder to register a new processor supplier under the name.

Note
transform uses InternalStreamsBuilder to access the current InternalTopologyBuilder.

transform requests InternalTopologyBuilder to connect the processor supplier with state stores if specified.

In the end, transform creates a new KStreamImpl for the processor name with the repartitionRequired flag turned on.

transform reports a NullPointerException when transformerSupplier is null.
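
The steps above can be sketched in plain Java without the Kafka Streams classes. MiniBuilder and MiniStream are hypothetical stand-ins that fold InternalStreamsBuilder and InternalTopologyBuilder into one class, and the zero-padded counter in the generated names (as well as the trailing dash in the prefix) is an assumption about the naming scheme.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;

public class TransformSketch {
    // Hypothetical, heavily simplified stand-in for the two internal builders.
    static class MiniBuilder {
        private int index = 0;
        final Map<String, String> processors = new LinkedHashMap<>();
        final Map<String, List<String>> connectedStores = new LinkedHashMap<>();

        // Assumed naming scheme: the prefix followed by a zero-padded counter.
        String newProcessorName(String prefix) {
            return prefix + String.format(Locale.ROOT, "%010d", index++);
        }

        void addProcessor(String name, String supplier, String parent) {
            processors.put(name, supplier + " (parent: " + parent + ")");
        }

        void connectProcessorAndStateStores(String name, String... stores) {
            connectedStores.put(name, Arrays.asList(stores));
        }
    }

    // Hypothetical stand-in for a stream; it only tracks a name and a flag.
    static class MiniStream {
        final MiniBuilder builder;
        final String name;
        final boolean repartitionRequired;

        MiniStream(MiniBuilder builder, String name, boolean repartitionRequired) {
            this.builder = builder;
            this.name = name;
            this.repartitionRequired = repartitionRequired;
        }

        MiniStream transform(Object transformerSupplier, String... stateStoreNames) {
            Objects.requireNonNull(transformerSupplier, "transformerSupplier can't be null");
            // 1. Ask the builder for a new processor name.
            String processorName = builder.newProcessorName("KSTREAM-TRANSFORM-");
            // 2. Register the processor supplier under that name, after this stream's node.
            builder.addProcessor(processorName, "KStreamTransform", name);
            // 3. Connect state stores only if some were given.
            if (stateStoreNames != null && stateStoreNames.length > 0) {
                builder.connectProcessorAndStateStores(processorName, stateStoreNames);
            }
            // 4. Return a new stream with the repartitionRequired flag turned on.
            return new MiniStream(builder, processorName, true);
        }
    }

    public static void main(String[] args) {
        MiniBuilder builder = new MiniBuilder();
        MiniStream source = new MiniStream(builder, builder.newProcessorName("KSTREAM-SOURCE-"), false);
        MiniStream transformed = source.transform(new Object(), "counts-store");
        System.out.println(transformed.name + " repartitionRequired=" + transformed.repartitionRequired);
        System.out.println(builder.processors);
        System.out.println(builder.connectedStores);
    }
}
```

The flag is turned on unconditionally in this sketch because a transformer may change the key, which is consistent with the description above.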

Registering Processor Node with KStreamMapValues Processor Supplier and KSTREAM-MAPVALUES Prefix — mapValues Operator

<V1> KStream<K, V1> mapValues(final ValueMapper<? super V, ? extends V1> mapper)
Note
mapValues is part of KStream Contract to…​FIXME.

mapValues creates a new KStreamImpl for a new processor (with the current InternalStreamsBuilder, source nodes and repartitionRequired flag).

Internally, mapValues requests InternalStreamsBuilder for a new processor name with KSTREAM-MAPVALUES prefix and registers a KStreamMapValues processor supplier under the name.

Note
mapValues uses InternalStreamsBuilder to access the current InternalTopologyBuilder.

through Operator

KStream<K, V> through(final String topic) // (1)
KStream<K, V> through(final String topic, final Produced<K, V> produced)
  1. Calls the other through with Produced of nulls

Note
through is part of KStream Contract to…​FIXME.

through…​FIXME

Registering Processor Node with KStreamPrint Processor Supplier and KSTREAM-PRINTER Prefix — print Operator

void print(final Printed<K, V> printed)
Note
print is part of KStream Contract to…​FIXME.

print creates a PrintedInternal for the input Printed.

print requests InternalStreamsBuilder for a new processor name with KSTREAM-PRINTER prefix and registers a KStreamPrint (with PrintForeachAction) processor supplier under the name.

Note
print uses InternalStreamsBuilder to access the current InternalTopologyBuilder.

Registering Sink Node (with KSTREAM-SINK Prefix) — to Operator

void to(final String topic) // (1)
void to(final String topic, final Produced<K, V> produced)
  1. Calls the other to with Produced of nulls

Note
to is part of KStream Contract to…​FIXME.

to merely passes the call on to the internal to with a new ProducedInternal for the input Produced.
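
The delegation between the three to variants can be sketched with plain Java overloads. Produced and ProducedInternal below are hypothetical stand-ins (the subclass relationship is an assumption made so that overload resolution picks the internal variant), and each method returns a trace string instead of touching a topology.

```java
public class ToOverloadsSketch {
    // Hypothetical stand-in; only carries three opaque settings.
    static class Produced {
        final Object keySerde;
        final Object valueSerde;
        final Object partitioner;

        Produced(Object keySerde, Object valueSerde, Object partitioner) {
            this.keySerde = keySerde;
            this.valueSerde = valueSerde;
            this.partitioner = partitioner;
        }
    }

    // Hypothetical stand-in that wraps a Produced for internal use.
    static class ProducedInternal extends Produced {
        ProducedInternal(Produced produced) {
            super(produced.keySerde, produced.valueSerde, produced.partitioner);
        }
    }

    // Convenience variant: delegates with a Produced of nulls.
    static String to(String topic) {
        return "to(topic) -> " + to(topic, new Produced(null, null, null));
    }

    // Public variant: wraps the input and passes the call on.
    static String to(String topic, Produced produced) {
        return "to(topic, Produced) -> " + to(topic, new ProducedInternal(produced));
    }

    // Internal variant: the place where the sink node would be registered.
    static String to(String topic, ProducedInternal produced) {
        return "to(topic, ProducedInternal)";
    }

    public static void main(String[] args) {
        System.out.println(to("output-topic"));
    }
}
```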

Registering Sink Node with KSTREAM-SINK Prefix — to Internal Method

void to(final String topic, final ProducedInternal<K, V> produced)

to requests the InternalStreamsBuilder for a new processor name with KSTREAM-SINK prefix.

Note
to uses the input ProducedInternal to access the key and value serializers, and the StreamPartitioner.

to requests the InternalStreamsBuilder for the InternalTopologyBuilder and requests it to register a new sink node under the new processor name.

Note
to uses InternalStreamsBuilder to access the current InternalTopologyBuilder.
Note
to uses WindowedStreamPartitioner when the input ProducedInternal defines no StreamPartitioner and the key serializer is a WindowedSerializer.
Note
to is used in to and through operators.
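
The partitioner choice described in the notes above can be sketched as a small decision function. The types here are hypothetical stand-ins, not the Kafka Streams classes, and partitioners are represented by their names only.

```java
public class SinkPartitionerSketch {
    // Hypothetical stand-ins for key serializers.
    interface Serializer {}
    static class PlainSerializer implements Serializer {}
    static class WindowedSerializer implements Serializer {}

    // Returns the name of the partitioner the sink node would be registered with,
    // or null when no partitioner is set at all.
    static String choosePartitioner(String configuredPartitioner, Serializer keySerializer) {
        // Fall back to a windowed partitioner only when none was configured
        // and the key serializer is a windowed one.
        if (configuredPartitioner == null && keySerializer instanceof WindowedSerializer) {
            return "WindowedStreamPartitioner";
        }
        return configuredPartitioner;
    }

    public static void main(String[] args) {
        System.out.println(choosePartitioner(null, new WindowedSerializer()));
        System.out.println(choosePartitioner("MyPartitioner", new WindowedSerializer()));
        System.out.println(choosePartitioner(null, new PlainSerializer()));
    }
}
```

An explicitly configured partitioner always wins in this sketch; the windowed fallback applies only when nothing was configured.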

repartitionForJoin Internal Method

KStreamImpl<K, V> repartitionForJoin(
  final Serde<K> keySerde,
  final Serde<V> valSerde)

repartitionForJoin…​FIXME

Note
repartitionForJoin is used when…​FIXME

Creating KStreamImpl Instance

KStreamImpl takes the following when created:

  • InternalStreamsBuilder that created the KStreamImpl

  • Name of the source processor node

  • Source nodes

  • Flag whether repartitioning is required or not

KStreamImpl initializes the internal registries and counters.

Transforming Values with State — transformValues Internal Method

private <VR> KStream<K, VR> transformValues(
  final InternalValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> internalValueTransformerWithKeySupplier,
  final String... stateStoreNames)

transformValues requests InternalStreamsBuilder for a new processor name with KSTREAM-TRANSFORMVALUES prefix.

transformValues then creates a new KStreamTransformValues (for the input internalValueTransformerWithKeySupplier) and requests InternalTopologyBuilder to register a new processor supplier under the name.

Note
transformValues uses InternalStreamsBuilder to access the current InternalTopologyBuilder.

transformValues requests InternalTopologyBuilder to connect the processor supplier with state stores if specified.

In the end, transformValues creates a new KStreamImpl for the processor name.

Note
transformValues is used exclusively when KStreamImpl is requested to transformValues.

createReparitionedSource Static Method

<K1, V1> String createReparitionedSource(
  final InternalStreamsBuilder builder,
  final Serde<K1> keySerde,
  final Serde<V1> valSerde,
  final String topicNamePrefix,
  final String name)

createReparitionedSource requests the input InternalStreamsBuilder for the InternalTopologyBuilder and does the following:

  • Requests the InternalTopologyBuilder to addInternalTopic with the repartition topic name, i.e. the input topicNamePrefix (if defined) or the input name, followed by the -repartition suffix

  • Requests the InternalStreamsBuilder for a new processor name with KSTREAM-FILTER- prefix and requests the InternalTopologyBuilder to addProcessor with the new processor name, a new KStreamFilter (that filters out null keys) and the input name as the predecessor

  • Requests the InternalStreamsBuilder for a new processor name with KSTREAM-SINK- prefix and requests the InternalTopologyBuilder to add a sink node with the new processor name, the repartition topic and the new KStreamFilter as the predecessor

  • Requests the InternalStreamsBuilder for a new processor name with KSTREAM-SOURCE- prefix (aka sourceName) and requests the InternalTopologyBuilder to add a source node with the new processor name, a FailOnInvalidTimestamp and the repartition topic

In the end, createReparitionedSource returns the source name.
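
The four steps can be traced with a small plain-Java model. RepartitionSourceSketch is a hypothetical stand-in that records the requests as strings instead of building a topology; the zero-padded counter in the generated names is an assumption about the naming scheme, and the method name in the sketch is spelled createRepartitionedSource on purpose to mark it as not being the real method.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

public class RepartitionSourceSketch {
    private int index = 0;
    // Requests in the order they would be made to the topology builder.
    final List<String> steps = new ArrayList<>();

    // Assumed naming scheme: the prefix followed by a zero-padded counter.
    String newProcessorName(String prefix) {
        return prefix + String.format(Locale.ROOT, "%010d", index++);
    }

    // The prefix wins when defined; otherwise the node name is used.
    static String repartitionTopicName(String topicNamePrefix, String name) {
        String baseName = topicNamePrefix != null ? topicNamePrefix : name;
        return baseName + "-repartition";
    }

    String createRepartitionedSource(String topicNamePrefix, String name) {
        String topic = repartitionTopicName(topicNamePrefix, name);
        steps.add("addInternalTopic " + topic);

        String filterName = newProcessorName("KSTREAM-FILTER-");
        steps.add("addProcessor " + filterName + " (drops null keys), parent " + name);

        String sinkName = newProcessorName("KSTREAM-SINK-");
        steps.add("addSink " + sinkName + " -> " + topic + ", parent " + filterName);

        String sourceName = newProcessorName("KSTREAM-SOURCE-");
        steps.add("addSource " + sourceName + " <- " + topic);

        return sourceName;
    }

    public static void main(String[] args) {
        RepartitionSourceSketch sketch = new RepartitionSourceSketch();
        String sourceName = sketch.createRepartitionedSource(null, "KSTREAM-MAP-0000000001");
        sketch.steps.forEach(System.out::println);
        System.out.println("returned: " + sourceName);
    }
}
```

Downstream processors would then be attached to the returned source name, so they read the records back from the repartition topic.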

// CAUTION: FIXME Example
Note
createReparitionedSource is used when…​FIXME

createWindowedStateStore Internal Static Method

<K, V> StoreBuilder<WindowStore<K, V>> createWindowedStateStore(
  final JoinWindows windows,
  final Serde<K> keySerde,
  final Serde<V> valueSerde,
  final String storeName)

createWindowedStateStore…​FIXME

Note
createWindowedStateStore is used when…​FIXME