streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -105,7 +105,8 @@ public interface KStream<K, V> {
* (both key and value type can be altered arbitrarily).
* The provided {@link KeyValueMapper} is applied to each input record and computes a new output record.
* Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K':V'>}.
* This is a stateless record-by-record operation.
* This is a stateless record-by-record operation (cf. {@link #transform(TransformerSupplier, String...)} for
* stateful record transformation).
* <p>
* The example below normalizes the String key to upper-case letters and counts the number of tokens of the value string.
* <pre>{@code
@@ -117,7 +118,7 @@ public interface KStream<K, V> {
* });
* }</pre>
* <p>
* The provided {@link KeyValueMapper} must return a {@link KeyValue} type and the return value must not be {@code null}.
* The provided {@link KeyValueMapper} must return a {@link KeyValue} type and must not return {@code null}.
Contributor:
Hmm good point. Maybe we should consider ignoring nulls in maps if users find it not very friendly. Anyways, just a reminder for future changes.

Member Author:
I would not change anything with regard to null: if users want to change "cardinality" they should use "flatMap". If we want to do some reasoning about performance, it's good to know that map does not change cardinality: one in, one out.
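
For illustration, a minimal sketch of this one-in/one-out contrast; the topic name, types, and builder are hypothetical, and imports from java.util and org.apache.kafka.streams are assumed:

KStream<String, String> sentences = builder.stream("sentences"); // hypothetical source topic
// map: exactly one output record per input record ("one in, one out")
KStream<String, Integer> lengths =
    sentences.map((key, value) -> KeyValue.pair(key, value.length()));
// flatMap: zero or more output records per input record (cardinality may change)
KStream<String, String> words =
    sentences.flatMap((key, value) -> {
        final List<KeyValue<String, String>> out = new ArrayList<>();
        for (final String word : value.split("\\W+")) {
            out.add(KeyValue.pair(word, word));
        }
        return out;
    });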

* <p>
* Mapping records might result in an internal data redistribution if a key based operator (like an aggregation or
* join) is applied to the result {@link KStream}. (cf. {@link #mapValues(ValueMapper)})
@@ -130,14 +131,17 @@ public interface KStream<K, V> {
* @see #flatMap(KeyValueMapper)
* @see #mapValues(ValueMapper)
* @see #flatMapValues(ValueMapper)
* @see #transform(TransformerSupplier, String...)
* @see #transformValues(ValueTransformerSupplier, String...)
*/
<KR, VR> KStream<KR, VR> map(KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KR, ? extends VR>> mapper);

/**
* Transform the value of each input record into a new value (with possible new type) of the output record.
* The provided {@link ValueMapper} is applied to each input record value and computes a new value for it.
* Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K:V'>}.
* This is a stateless record-by-record operation (cf. {@link #transformValues(ValueTransformerSupplier, String...)}).
* This is a stateless record-by-record operation (cf.
* {@link #transformValues(ValueTransformerSupplier, String...)} for stateful value transformation).
* <p>
* The example below counts the number of tokens of the value string.
* <pre>{@code
@@ -160,6 +164,7 @@ public interface KStream<K, V> {
* @see #map(KeyValueMapper)
* @see #flatMap(KeyValueMapper)
* @see #flatMapValues(ValueMapper)
* @see #transform(TransformerSupplier, String...)
* @see #transformValues(ValueTransformerSupplier, String...)
*/
<VR> KStream<K, VR> mapValues(ValueMapper<? super V, ? extends VR> mapper);
@@ -169,7 +174,8 @@ public interface KStream<K, V> {
* can be altered arbitrarily).
* The provided {@link KeyValueMapper} is applied to each input record and computes zero or more output records.
* Thus, an input record {@code <K,V>} can be transformed into output records {@code <K':V'>, <K'':V''>, ...}.
* This is a stateless record-by-record operation (cf. {@link #transform(TransformerSupplier, String...)}).
* This is a stateless record-by-record operation (cf. {@link #transform(TransformerSupplier, String...)} for
* stateful record transformation).
* <p>
* The example below splits input records {@code <null:String>} containing sentences as values into their words
* and emits a record {@code <word:1>} for each word.
@@ -204,6 +210,7 @@ public interface KStream<K, V> {
* @see #mapValues(ValueMapper)
* @see #flatMapValues(ValueMapper)
* @see #transform(TransformerSupplier, String...)
* @see #transformValues(ValueTransformerSupplier, String...)
*/
<KR, VR> KStream<KR, VR> flatMap(final KeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends KR, ? extends VR>>> mapper);

@@ -214,7 +221,8 @@
* stream (value type can be altered arbitrarily).
* The provided {@link ValueMapper} is applied to each input record and computes zero or more output values.
* Thus, an input record {@code <K,V>} can be transformed into output records {@code <K:V'>, <K:V''>, ...}.
* This is a stateless record-by-record operation.
* This is a stateless record-by-record operation (cf. {@link #transformValues(ValueTransformerSupplier, String...)}
* for stateful value transformation).
* <p>
* The example below splits input records {@code <null:String>} containing sentences as values into their words.
* <pre>{@code
@@ -240,6 +248,8 @@ public interface KStream<K, V> {
* @see #map(KeyValueMapper)
* @see #flatMap(KeyValueMapper)
* @see #mapValues(ValueMapper)
* @see #transform(TransformerSupplier, String...)
* @see #transformValues(ValueTransformerSupplier, String...)
*/
<VR> KStream<K, VR> flatMapValues(final ValueMapper<? super V, ? extends Iterable<? extends VR>> processor);
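
As a concrete companion to the word-split description above, a hedged sketch (source topic name and the builder are assumptions; the key passes through untouched):

KStream<byte[], String> sentences = builder.stream("sentences"); // hypothetical topic of <null:String> records
KStream<byte[], String> words =
    sentences.flatMapValues(value -> Arrays.asList(value.split("\\W+"))); // zero or more values per input value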

streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java
@@ -28,13 +28,14 @@
* <li>map from an input record to a new key (with arbitrary key type as specified by {@code VR})</li>
* </ul>
* This is a stateless record-by-record operation, i.e., {@link #apply(Object, Object)} is invoked individually for each
* record of a stream.
* record of a stream (cf. {@link Transformer} for stateful record transformation).
* {@link KeyValueMapper} is a generalization of {@link ValueMapper}.
*
* @param <K> key type
* @param <V> value type
* @param <VR> mapped value type
* @see ValueMapper
* @see Transformer
* @see KStream#map(KeyValueMapper)
* @see KStream#flatMap(KeyValueMapper)
* @see KStream#selectKey(KeyValueMapper)
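
A small usage sketch (the stream, the PageView type, and its region field are hypothetical) showing that the mapper holds no state between invocations:

// re-key a stream by a field of the value; invoked once per record, no shared state
final KeyValueMapper<String, PageView, String> byRegion =
    (key, pageView) -> pageView.region;
KStream<String, PageView> viewsByRegion = views.selectKey(byRegion);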
streams/src/main/java/org/apache/kafka/streams/kstream/Merger.java
@@ -29,7 +29,7 @@
public interface Merger<K, V> {

/**
* Compute a new aggregate from the key and two aggregates
* Compute a new aggregate from the key and two aggregates.
*
* @param aggKey the key of the record
* @param aggOne the first aggregate
streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java
@@ -17,48 +17,96 @@

package org.apache.kafka.streams.kstream;

import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TimestampExtractor;

/**
* A stateful {@link Transformer} interface to transform a key-value pair into a new value.
* The {@link Transformer} interface for stateful mapping of an input record to zero, one, or multiple new output
* records (both key and value type can be altered arbitrarily).
* This is a stateful record-by-record operation, i.e., {@link #transform(Object, Object)} is invoked individually for
* each record of a stream and can access and modify a state that is available beyond a single call of
* {@link #transform(Object, Object)} (cf. {@link KeyValueMapper} for stateless record transformation).
* Additionally, the interface can be called at regular intervals based on the processing progress
* (cf. {@link #punctuate(long)}).
* <p>
* Use {@link TransformerSupplier} to provide new instances of {@link Transformer} to Kafka Streams' runtime.
* <p>
* If only a record's value should be modified {@link ValueTransformer} can be used.
*
* @param <K> key type
* @param <V> value type
* @param <R> return type
* @param <K> key type
* @param <V> value type
* @param <R> {@link KeyValue} return type (both key and value type can be set
* arbitrarily)
* @see TransformerSupplier
* @see KStream#transform(TransformerSupplier, String...)
* @see ValueTransformer
* @see KStream#map(KeyValueMapper)
* @see KStream#flatMap(KeyValueMapper)
*/
@InterfaceStability.Unstable
public interface Transformer<K, V, R> {

/**
* Initialize this transformer with the given context. The framework ensures this is called once per processor when the topology
* that contains it is initialized.
* Initialize this transformer.
* This is called once per instance when the topology gets initialized.
* <p>
* If this transformer is to be {@link #punctuate(long) called periodically} by the framework, then this method should
* {@link ProcessorContext#schedule(long) schedule itself} with the provided context.
* The provided {@link ProcessorContext context} can be used to access topology and record meta data, to
* {@link ProcessorContext#schedule(long) schedule itself} for periodic calls (cf. {@link #punctuate(long)}), and
* to access attached {@link StateStore}s.
* <p>
* Note that {@link ProcessorContext} is updated in the background with the current record's meta data.
* Thus, it only contains valid record meta data when accessed within {@link #transform(Object, Object)}.
*
* @param context the context; may not be null
* @param context the context
*/
void init(final ProcessorContext context);

/**
* Transform the record with the given key and value.
* Additionally, any {@link StateStore state} that is {@link KStream#transform(TransformerSupplier, String...)
* attached} to this operator can be accessed and modified
* arbitrarily (cf. {@link ProcessorContext#getStateStore(String)}).
* <p>
* If more than one output record should be forwarded downstream {@link ProcessorContext#forward(Object, Object)},
* {@link ProcessorContext#forward(Object, Object, int)}, and
* {@link ProcessorContext#forward(Object, Object, String)} can be used.
* If no record should be forwarded downstream, {@code transform} can return {@code null}.
*
* @param key the key for the record
* @param value the value for the record
* @return new value; if null no key-value pair will be forwarded to down stream
* @return new {@link KeyValue} pair&mdash;if {@code null} no key-value pair will
* be forwarded downstream
*/
R transform(final K key, final V value);

/**
* Perform any periodic operations and possibly generate a key, if this processor {@link ProcessorContext#schedule(long) schedules itself} with the context
* during {@link #init(ProcessorContext) initialization}.
* Perform any periodic operations and possibly generate new {@link KeyValue} pairs if this processor
* {@link ProcessorContext#schedule(long) schedules itself} with the context during
* {@link #init(ProcessorContext) initialization}.
* <p>
* To generate new {@link KeyValue} pairs {@link ProcessorContext#forward(Object, Object)},
* {@link ProcessorContext#forward(Object, Object, int)}, and
* {@link ProcessorContext#forward(Object, Object, String)} can be used.
* <p>
* Note that {@code punctuate} is called based on <i>stream time</i> (i.e., time progresses with regard to
* timestamps returned by the used {@link TimestampExtractor})
* and not based on wall-clock time.
*
* @param timestamp the stream time when this method is being called
* @return new value; if null it will not be forwarded to down stream
* @param timestamp the stream time when {@code punctuate} is being called
* @return must return {@code null}&mdash;otherwise, a {@link StreamsException exception} will be thrown
Contributor:
I don't think this change is correct. KStreamTransformProcessor does:
if (pair != null) context().forward(pair.key, pair.value);
so the previous @return comment was correct

https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java#L64-L67

Member Author:
Thanks for reporting this @mihbor (I guess this was a mix-up with ValueTransformer). Can you open a PR to fix it?

Member Author:
Thanks for the PR!

*/
R punctuate(final long timestamp);

/**
* Close this processor and clean up any resources.
* <p>
* To generate new {@link KeyValue} pairs {@link ProcessorContext#forward(Object, Object)},
* {@link ProcessorContext#forward(Object, Object, int)}, and
* {@link ProcessorContext#forward(Object, Object, String)} can be used.
*/
void close();
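
To make the contract above concrete, a hedged sketch of a stateful Transformer; the store name, types, and the 1000 ms schedule are assumptions, and imports from org.apache.kafka.streams.state are assumed. Per the comment thread above, a non-null pair returned from transform or punctuate would also be forwarded by the processor, so returning null simply emits nothing extra:

public class CountingTransformer implements Transformer<String, String, KeyValue<String, Long>> {
    private ProcessorContext context;
    private KeyValueStore<String, Long> store;

    @Override
    @SuppressWarnings("unchecked")
    public void init(final ProcessorContext context) {
        this.context = context;
        // "counts" must match a store attached via KStream#transform(supplier, "counts")
        store = (KeyValueStore<String, Long>) context.getStateStore("counts");
        context.schedule(1000L); // request periodic punctuate calls (stream-time based)
    }

    @Override
    public KeyValue<String, Long> transform(final String key, final String value) {
        final Long old = store.get(key);
        store.put(key, old == null ? 1L : old + 1L);
        return null; // null means nothing is forwarded for this record
    }

    @Override
    public KeyValue<String, Long> punctuate(final long timestamp) {
        try (final KeyValueIterator<String, Long> it = store.all()) {
            while (it.hasNext()) {
                final KeyValue<String, Long> entry = it.next();
                context.forward(entry.key, entry.value); // emit one record per key via the context
            }
        }
        return null; // a non-null pair would also be forwarded (see the thread above)
    }

    @Override
    public void close() { }
}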

streams/src/main/java/org/apache/kafka/streams/kstream/TransformerSupplier.java
@@ -17,15 +17,28 @@

package org.apache.kafka.streams.kstream;

import org.apache.kafka.common.annotation.InterfaceStability;

/**
* A {@link TransformerSupplier} interface which can create one or more {@link Transformer} instances.
*
* @param <K> key type
* @param <V> value type
* @param <R> {@link org.apache.kafka.streams.KeyValue KeyValue} return type (both key and value type can be set
* arbitrarily)
* @see Transformer
* @see KStream#transform(TransformerSupplier, String...)
* @see ValueTransformer
* @see ValueTransformerSupplier
* @see KStream#transformValues(ValueTransformerSupplier, String...)
*/
@InterfaceStability.Unstable
public interface TransformerSupplier<K, V, R> {

/**
* Return a new {@link Transformer} instance.
*
* @return a new {@link Transformer} instance
* @return a new {@link Transformer} instance
*/
Transformer<K, V, R> get();
}
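
A brief wiring sketch, assuming the hypothetical CountingTransformer from the Transformer section above and a store named "counts" registered on the builder. get() must return a fresh instance on every call, since each stream task needs its own:

final TransformerSupplier<String, String, KeyValue<String, Long>> supplier =
    new TransformerSupplier<String, String, KeyValue<String, Long>>() {
        @Override
        public Transformer<String, String, KeyValue<String, Long>> get() {
            return new CountingTransformer(); // new instance per call; never share one instance
        }
    };
final KStream<String, Long> counts = input.transform(supplier, "counts"); // "counts" names the attached state store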
streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java
@@ -21,14 +21,15 @@
/**
* The {@link ValueMapper} interface for mapping a value to a new value of arbitrary type.
* This is a stateless record-by-record operation, i.e., {@link #apply(Object)} is invoked individually for each record
* of a stream.
* Thus, if {@link ValueMapper} is applied to a {@link org.apache.kafka.streams.KeyValue key-value pair} record the
* record's key is preserved.
* of a stream (cf. {@link ValueTransformer} for stateful value transformation).
* If {@link ValueMapper} is applied to a {@link org.apache.kafka.streams.KeyValue key-value pair} record the record's
* key is preserved.
* If a record's key and value should be modified {@link KeyValueMapper} can be used.
*
* @param <V> value type
* @param <VR> mapped value type
* @see KeyValueMapper
* @see ValueTransformer
* @see KStream#mapValues(ValueMapper)
* @see KStream#flatMapValues(ValueMapper)
* @see KTable#mapValues(ValueMapper)
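
For illustration, a one-line hedged sketch (textLines is an assumed KStream<String, String>); the key passes through untouched, so no repartitioning is triggered:

// only the value (and its type) changes; the record's key is preserved
final KStream<String, Integer> tokenCounts =
    textLines.mapValues(value -> value.split("\\W+").length);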
streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
@@ -17,46 +17,96 @@

package org.apache.kafka.streams.kstream;

import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TimestampExtractor;

/**
* A stateful {@link ValueTransformer} interface to transform a value into a new value.
* The {@link ValueTransformer} interface for stateful mapping of a value to a new value (with possible new type).
* This is a stateful record-by-record operation, i.e., {@link #transform(Object)} is invoked individually for each
* record of a stream and can access and modify a state that is available beyond a single call of
* {@link #transform(Object)} (cf. {@link ValueMapper} for stateless value transformation).
* Additionally, the interface can be called at regular intervals based on the processing progress
* (cf. {@link #punctuate(long)}).
* If {@link ValueTransformer} is applied to a {@link KeyValue} pair record the record's key is preserved.
* <p>
* Use {@link ValueTransformerSupplier} to provide new instances of {@link ValueTransformer} to Kafka Streams' runtime.
* <p>
* If a record's key and value should be modified {@link Transformer} can be used.
*
* @param <V> value type
* @param <R> return type
* @param <V> value type
* @param <VR> transformed value type
* @see ValueTransformerSupplier
* @see KStream#transformValues(ValueTransformerSupplier, String...)
* @see Transformer
*/
public interface ValueTransformer<V, R> {
@InterfaceStability.Unstable
public interface ValueTransformer<V, VR> {

/**
* Initialize this transformer with the given context. The framework ensures this is called once per processor when the topology
* that contains it is initialized.
* Initialize this transformer.
* This is called once per instance when the topology gets initialized.
* <p>
* If this transformer is to be {@link #punctuate(long) called periodically} by the framework, then this method should
* {@link ProcessorContext#schedule(long) schedule itself} with the provided context.
* The provided {@link ProcessorContext context} can be used to access topology and record meta data, to
* {@link ProcessorContext#schedule(long) schedule itself} for periodic calls (cf. {@link #punctuate(long)}), and
* to access attached {@link StateStore}s.
* <p>
* Note that {@link ProcessorContext} is updated in the background with the current record's meta data.
* Thus, it only contains valid record meta data when accessed within {@link #transform(Object)}.
* <p>
* Note that using {@link ProcessorContext#forward(Object, Object)},
* {@link ProcessorContext#forward(Object, Object, int)}, or
* {@link ProcessorContext#forward(Object, Object, String)} is not allowed within any method of
* {@code ValueTransformer} and will result in an {@link StreamsException exception}.
*
* @param context the context; may not be null
* @param context the context
*/
void init(final ProcessorContext context);

/**
* Transform the record with the given key and value.
* Transform the given value to a new value.
* Additionally, any {@link StateStore} that is {@link KStream#transformValues(ValueTransformerSupplier, String...)
* attached} to this operator can be accessed and modified arbitrarily (cf.
* {@link ProcessorContext#getStateStore(String)}).
* <p>
* Note that using {@link ProcessorContext#forward(Object, Object)},
* {@link ProcessorContext#forward(Object, Object, int)}, and
* {@link ProcessorContext#forward(Object, Object, String)} is not allowed within {@code transform} and
* will result in an {@link StreamsException exception}.
*
* @param value the value for the record
* @return new value
* @param value the value to be transformed
* @return the new value
*/
R transform(final V value);
VR transform(final V value);

/**
* Perform any periodic operations and possibly return a new value, if this processor {@link ProcessorContext#schedule(long) schedule itself} with the context
* during {@link #init(ProcessorContext) initialization}.
* Perform any periodic operations if this processor {@link ProcessorContext#schedule(long) schedules itself} with
* the context during {@link #init(ProcessorContext) initialization}.
* <p>
* It is not possible to return any new output records within {@code punctuate}.
* Using {@link ProcessorContext#forward(Object, Object)}, {@link ProcessorContext#forward(Object, Object, int)},
* or {@link ProcessorContext#forward(Object, Object, String)} will result in an
* {@link StreamsException exception}.
* Furthermore, {@code punctuate} must return {@code null}.
* <p>
* Note that {@code punctuate} is called based on <i>stream time</i> (i.e., time progresses with regard to
* timestamps returned by the used {@link TimestampExtractor})
* and not based on wall-clock time.
*
* @param timestamp the stream time when this method is being called
* @return new value; if null it will not be forwarded to down stream
* @param timestamp the stream time when {@code punctuate} is being called
* @return must return {@code null}&mdash;otherwise, an {@link StreamsException exception} will be thrown
*/
R punctuate(final long timestamp);
VR punctuate(final long timestamp);

/**
* Close this processor and clean up any resources.
* <p>
* It is not possible to return any new output records within {@code close()}.
* Using {@link ProcessorContext#forward(Object, Object)}, {@link ProcessorContext#forward(Object, Object, int)},
* or {@link ProcessorContext#forward(Object, Object, String)} will result in an {@link StreamsException exception}.
*/
void close();
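
A hedged sketch of this contract (store name and types are assumptions, imports from org.apache.kafka.streams.state are assumed): the new value is always returned from transform, never forwarded, and punctuate must return null:

public class RunningSumTransformer implements ValueTransformer<Long, Long> {
    private KeyValueStore<String, Long> store;

    @Override
    @SuppressWarnings("unchecked")
    public void init(final ProcessorContext context) {
        // "sum-store" must match the store name given to KStream#transformValues(supplier, "sum-store")
        store = (KeyValueStore<String, Long>) context.getStateStore("sum-store");
        // never call context.forward(...) in a ValueTransformer; it results in a StreamsException
    }

    @Override
    public Long transform(final Long value) {
        final Long old = store.get("sum");
        final Long sum = (old == null ? 0L : old) + value;
        store.put("sum", sum);
        return sum; // the new value is returned; the record's key is preserved by the framework
    }

    @Override
    public Long punctuate(final long timestamp) {
        return null; // must be null; anything else raises a StreamsException
    }

    @Override
    public void close() { }
}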
