From 24686accdea180beabd098e84fcfe884bf8e6c86 Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Sat, 7 Jan 2017 10:45:34 -0800 Subject: [PATCH] MINOR: some public JavaDoc cleanup --- .../kafka/streams/kstream/Aggregator.java | 4 +- .../kafka/streams/kstream/ForeachAction.java | 2 +- .../kafka/streams/kstream/Initializer.java | 2 + .../kafka/streams/kstream/KGroupedStream.java | 44 ++++++++++--------- .../kafka/streams/kstream/KeyValueMapper.java | 4 +- .../apache/kafka/streams/kstream/Merger.java | 4 +- .../kafka/streams/kstream/Predicate.java | 2 +- .../apache/kafka/streams/kstream/Reducer.java | 4 +- .../kafka/streams/kstream/Transformer.java | 6 +-- .../kafka/streams/kstream/ValueJoiner.java | 2 +- .../kafka/streams/kstream/ValueMapper.java | 2 +- .../streams/kstream/ValueTransformer.java | 6 +-- 12 files changed, 46 insertions(+), 36 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java index 9afce57f57889..e433ea7334558 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java @@ -35,6 +35,8 @@ * @see KGroupedStream#aggregate(Initializer, Aggregator, org.apache.kafka.streams.processor.StateStoreSupplier) * @see KGroupedStream#aggregate(Initializer, Aggregator, Windows, org.apache.kafka.common.serialization.Serde, String) * @see KGroupedStream#aggregate(Initializer, Aggregator, Windows, org.apache.kafka.streams.processor.StateStoreSupplier) + * @see KGroupedStream#aggregate(Initializer, Aggregator, Merger, SessionWindows, org.apache.kafka.common.serialization.Serde, String) + * @see KGroupedStream#aggregate(Initializer, Aggregator, Merger, SessionWindows, org.apache.kafka.common.serialization.Serde, org.apache.kafka.streams.processor.StateStoreSupplier) * @see Reducer */ @InterfaceStability.Unstable @@ -48,5 +50,5 @@ public interface Aggregator { * @param aggregate the current aggregate value * @return the new aggregate value */ - VA apply(K key, V value, VA aggregate); + VA apply(final K key, final V value, final VA aggregate); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ForeachAction.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ForeachAction.java index 59f6fabde75d4..e68bf8dbe75e5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/ForeachAction.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ForeachAction.java @@ -40,7 +40,7 @@ public interface ForeachAction { * @param key the key of the record * @param value the value of the record */ - void apply(K key, V value); + void apply(final K key, final V value); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Initializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Initializer.java index bee598aeb9a3c..96a6995605f8f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/Initializer.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Initializer.java @@ -29,6 +29,8 @@ * @see KGroupedStream#aggregate(Initializer, Aggregator, org.apache.kafka.streams.processor.StateStoreSupplier) * @see KGroupedStream#aggregate(Initializer, Aggregator, Windows, org.apache.kafka.common.serialization.Serde, String) * @see KGroupedStream#aggregate(Initializer, Aggregator, Windows, org.apache.kafka.streams.processor.StateStoreSupplier) + * @see KGroupedStream#aggregate(Initializer, Aggregator, Merger, SessionWindows, org.apache.kafka.common.serialization.Serde, String) + * @see KGroupedStream#aggregate(Initializer, Aggregator, Merger, SessionWindows, org.apache.kafka.common.serialization.Serde, org.apache.kafka.streams.processor.StateStoreSupplier) */ @InterfaceStability.Unstable public interface Initializer { diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java index fc2881ac9d2b9..c3e2f1e6677f1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java @@ -263,7 +263,7 @@ KTable, Long> count(final SessionWindows sessionWindows, * aggregate and the record's value. * If there is no current aggregate the {@link Reducer} is not applied an the new aggregate will be the record's * value as-is. - * Thus, {@link #reduce(Reducer, String)} can be used to compute aggregate functions like sum, min, or max. + * Thus, {@code reduce(Reducer, String)} can be used to compute aggregate functions like sum, min, or max. *

* To query the local {@link KeyValueStore} it must be obtained via * {@link org.apache.kafka.streams.KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}: @@ -303,7 +303,8 @@ KTable reduce(final Reducer reducer, * aggregate and the record's value. * If there is no current aggregate the {@link Reducer} is not applied an the new aggregate will be the record's * value as-is. - * Thus, {@link #reduce(Reducer, String)} can be used to compute aggregate functions like sum, min, or max. + * Thus, {@code reduce(Reducer, StateStoreSupplier)} can be used to compute aggregate functions like sum, min, or + * max. *

* To query the local {@link KeyValueStore} it must be obtained via * {@link org.apache.kafka.streams.KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}. @@ -349,7 +350,7 @@ KTable reduce(final Reducer reducer, * aggregate and the record's value. * If there is no current aggregate the {@link Reducer} is not applied an the new aggregate will be the record's * value as-is. - * Thus, {@link #reduce(Reducer, String)} can be used to compute aggregate functions like sum, min, or max. + * Thus, {@code reduce(Reducer, Windows, String)} can be used to compute aggregate functions like sum, min, or max. *

* To query the local windowed {@link KeyValueStore} it must be obtained via * {@link org.apache.kafka.streams.KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}: @@ -397,7 +398,8 @@ KTable, V> reduce(final Reducer reducer, * aggregate and the record's value. * If there is no current aggregate the {@link Reducer} is not applied an the new aggregate will be the record's * value as-is. - * Thus, {@link #reduce(Reducer, String)} can be used to compute aggregate functions like sum, min, or max. + * Thus, {@code reduce(Reducer, Windows, StateStoreSupplier)} can be used to compute aggregate functions like sum, + * min, or max. *

* To query the local windowed {@link KeyValueStore} it must be obtained via * {@link org.apache.kafka.streams.KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}. @@ -445,7 +447,8 @@ KTable, V> reduce(final Reducer reducer, * aggregate and the record's value. * If there is no current aggregate the {@link Reducer} is not applied an the new aggregate will be the record's * value as-is. - * Thus, {@link #reduce(Reducer, String)} can be used to compute aggregate functions like sum, min, or max. + * Thus, {@code reduce(Reducer, SessionWindows, String)} can be used to compute aggregate functions like sum, min, + * or max. *

* To query the local {@link SessionStore} it must be obtained via * {@link org.apache.kafka.streams.KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}: @@ -495,7 +498,8 @@ KTable, V> reduce(final Reducer reducer, * aggregate and the record's value. * If there is no current aggregate the {@link Reducer} is not applied an the new aggregate will be the record's * value as-is. - * Thus, {@link #reduce(Reducer, String)} can be used to compute aggregate functions like sum, min, or max. + * Thus, {@code reduce(Reducer, SessionWindows, StateStoreSupplier)} can be used to compute aggregate functions like + * sum, min, or max. *

* To query the local {@link SessionStore} it must be obtained via * {@link org.apache.kafka.streams.KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}: @@ -547,13 +551,13 @@ KTable, V> reduce(final Reducer reducer, * The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current * aggregate (or for the very first record using the intermediate aggregation result provided via the * {@link Initializer}) and the record's value. - * Thus, {@link #aggregate(Initializer, Aggregator, Serde, String)} can be used to compute aggregate functions like - * count (c.f. {@link #count(String)}) TODO add more examples. + * Thus, {@code aggregate(Initializer, Aggregator, Serde, String)} can be used to compute aggregate functions like + * count (c.f. {@link #count(String)}) *

* To query the local {@link KeyValueStore} it must be obtained via * {@link org.apache.kafka.streams.KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}: *

{@code
-     * KafkaStreams streams = ... // some aggregation on value type double TODO update example
+     * KafkaStreams streams = ... // some aggregation on value type double
      * ReadOnlyKeyValueStore localStore = streams.store(storeName, QueryableStoreTypes.keyValueStore());
      * String key = "some-key";
      * Long aggForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
@@ -595,14 +599,14 @@  KTable aggregate(final Initializer initializer,
      * The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current
      * aggregate (or for the very first record using the intermediate aggregation result provided via the
      * {@link Initializer}) and the record's value.
-     * Thus, {@link #aggregate(Initializer, Aggregator, StateStoreSupplier)} can be used to compute aggregate functions
-     * like count (c.f. {@link #count(String)}) TODO add more examples.
+     * Thus, {@code aggregate(Initializer, Aggregator, StateStoreSupplier)} can be used to compute aggregate functions
+     * like count (c.f. {@link #count(String)})
      * 

* To query the local {@link KeyValueStore} it must be obtained via * {@link org.apache.kafka.streams.KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}. * Use {@link StateStoreSupplier#name()} to get the store name: *

{@code
-     * KafkaStreams streams = ... // some aggregation on value type double TODO update example
+     * KafkaStreams streams = ... // some aggregation on value type double
      * ReadOnlyKeyValueStore localStore = streams.store(storeName, QueryableStoreTypes.keyValueStore());
      * String key = "some-key";
      * Long aggForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
@@ -646,13 +650,13 @@  KTable aggregate(final Initializer initializer,
      * The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current
      * aggregate (or for the very first record using the intermediate aggregation result provided via the
      * {@link Initializer}) and the record's value.
-     * Thus, {@link #aggregate(Initializer, Aggregator, Windows, Serde, String)} can be used to compute aggregate
-     * functions like count (c.f. {@link #count(String)}) TODO add more examples.
+     * Thus, {@code aggregate(Initializer, Aggregator, Windows, Serde, String)} can be used to compute aggregate
+     * functions like count (c.f. {@link #count(String)})
      * 

* To query the local windowed {@link KeyValueStore} it must be obtained via * {@link org.apache.kafka.streams.KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}: *

{@code
-     * KafkaStreams streams = ... // some windowed aggregation on value type double TODO update example
+     * KafkaStreams streams = ... // some windowed aggregation on value type double
      * ReadOnlyWindowStore localWindowStore = streams.store(storeName, QueryableStoreTypes.windowStore());
      * String key = "some-key";
      * long fromTime = ...;
@@ -703,7 +707,7 @@  KTable, VR> aggregate(final Initializer i
      * The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current
      * aggregate (or for the very first record using the intermediate aggregation result provided via the
      * {@link Initializer}) and the record's value.
-     * Thus, {@link #aggregate(Initializer, Aggregator, Windows, Serde, String)} can be used to compute aggregate
+     * Thus, {@code aggregate(Initializer, Aggregator, Windows, StateStoreSupplier)} can be used to compute aggregate
      * functions like count (c.f. {@link #count(String)}) TODO add more examples.
      * 

* To query the local windowed {@link KeyValueStore} it must be obtained via @@ -750,8 +754,8 @@ KTable, VR> aggregate(final Initializer i * The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current * aggregate (or for the very first record using the intermediate aggregation result provided via the * {@link Initializer}) and the record's value. - * Thus, {@link #aggregate(Initializer, Aggregator, Merger, SessionWindows, Serde, String)} can be used to compute aggregate - * functions like count (c.f. {@link #count(String)}) + * Thus, {@code aggregate(Initializer, Aggregator, Merger, SessionWindows, Serde, String)} can be used to compute + * aggregate functions like count (c.f. {@link #count(String)}) *

* To query the local {@link SessionStore} it must be obtained via * {@link org.apache.kafka.streams.KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}. @@ -800,8 +804,8 @@ KTable, T> aggregate(final Initializer initializer, * The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current * aggregate (or for the very first record using the intermediate aggregation result provided via the * {@link Initializer}) and the record's value. - * Thus, {@link #aggregate(Initializer, Aggregator, Merger, SessionWindows, Serde, String)} can be used to compute aggregate - * functions like count (c.f. {@link #count(String)}) + * Thus, {@code #aggregate(Initializer, Aggregator, Merger, SessionWindows, Serde, StateStoreSupplier)} can be used + * to compute aggregate functions like count (c.f. {@link #count(String)}) *

* To query the local {@link SessionStore} it must be obtained via * {@link org.apache.kafka.streams.KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}. diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java index 6fcf61c051524..d6d1defb9fdd6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java @@ -41,7 +41,7 @@ * @see KStream#groupBy(KeyValueMapper) * @see KStream#groupBy(KeyValueMapper, org.apache.kafka.common.serialization.Serde, org.apache.kafka.common.serialization.Serde) * @see KTable#groupBy(KeyValueMapper) - * @see KTable#groupBy(KeyValueMapper, org.apache.kafka.common.serialization.Serdes, org.apache.kafka.common.serialization.Serde) + * @see KTable#groupBy(KeyValueMapper, org.apache.kafka.common.serialization.Serde, org.apache.kafka.common.serialization.Serde) * @see KTable#toStream(KeyValueMapper) */ @InterfaceStability.Unstable @@ -54,5 +54,5 @@ public interface KeyValueMapper { * @param value the value of the record * @return the new value */ - VR apply(K key, V value); + VR apply(final K key, final V value); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Merger.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Merger.java index fcdb20fbab676..5a70f21366559 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/Merger.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Merger.java @@ -23,7 +23,7 @@ * The interface for merging aggregate values for {@link SessionWindows} with the given key. * * @param key type - * @param aggregate value type + * @param aggregate value type */ @InterfaceStability.Unstable public interface Merger { @@ -36,5 +36,5 @@ public interface Merger { * @param aggTwo the second aggregate * @return the new aggregate value */ - V apply(K aggKey, V aggOne, V aggTwo); + V apply(final K aggKey, final V aggOne, final V aggTwo); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Predicate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Predicate.java index b46e60f56c29c..24563ba9c2cd0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/Predicate.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Predicate.java @@ -43,5 +43,5 @@ public interface Predicate { * @return {@code true} if the {@link org.apache.kafka.streams.KeyValue key-value pair} satisfies the * predicate—{@code false} otherwise */ - boolean test(K key, V value); + boolean test(final K key, final V value); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Reducer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Reducer.java index 5791d9c3c25f4..d19c1eb1b68b1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/Reducer.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Reducer.java @@ -33,6 +33,8 @@ * @see KGroupedStream#reduce(Reducer, org.apache.kafka.streams.processor.StateStoreSupplier) * @see KGroupedStream#reduce(Reducer, Windows, String) * @see KGroupedStream#reduce(Reducer, org.apache.kafka.streams.processor.StateStoreSupplier) + * @see KGroupedStream#reduce(Reducer, SessionWindows, String) + * @see KGroupedStream#reduce(Reducer, SessionWindows, org.apache.kafka.streams.processor.StateStoreSupplier) * @see Aggregator */ @InterfaceStability.Unstable @@ -45,5 +47,5 @@ public interface Reducer { * @param value2 the second value for the aggregation * @return the aggregated value */ - V apply(V value1, V value2); + V apply(final V value1, final V value2); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java index e9363e8b53f4c..95d822ad56e16 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java @@ -37,7 +37,7 @@ public interface Transformer { * * @param context the context; may not be null */ - void init(ProcessorContext context); + void init(final ProcessorContext context); /** * Transform the record with the given key and value. @@ -46,7 +46,7 @@ public interface Transformer { * @param value the value for the record * @return new value; if null no key-value pair will be forwarded to down stream */ - R transform(K key, V value); + R transform(final K key, final V value); /** * Perform any periodic operations and possibly generate a key, if this processor {@link ProcessorContext#schedule(long) schedules itself} with the context @@ -55,7 +55,7 @@ public interface Transformer { * @param timestamp the stream time when this method is being called * @return new value; if null it will not be forwarded to down stream */ - R punctuate(long timestamp); + R punctuate(final long timestamp); /** * Close this processor and clean up any resources. diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueJoiner.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueJoiner.java index e51889bc334e9..ab91bb4f7e327 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueJoiner.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueJoiner.java @@ -51,5 +51,5 @@ public interface ValueJoiner { * @param value2 the second value for joining * @return the joined value */ - VR apply(V1 value1, V2 value2); + VR apply(final V1 value1, final V2 value2); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java index 7d1a096a05853..5099ac71e2ac3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java @@ -42,5 +42,5 @@ public interface ValueMapper { * @param value the value to be mapped * @return the new value */ - VR apply(V value); + VR apply(final V value); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java index f92d9a155d0ef..063c35233999c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java @@ -36,7 +36,7 @@ public interface ValueTransformer { * * @param context the context; may not be null */ - void init(ProcessorContext context); + void init(final ProcessorContext context); /** * Transform the record with the given key and value. @@ -44,7 +44,7 @@ public interface ValueTransformer { * @param value the value for the record * @return new value */ - R transform(V value); + R transform(final V value); /** * Perform any periodic operations and possibly return a new value, if this processor {@link ProcessorContext#schedule(long) schedule itself} with the context @@ -53,7 +53,7 @@ public interface ValueTransformer { * @param timestamp the stream time when this method is being called * @return new value; if null it will not be forwarded to down stream */ - R punctuate(long timestamp); + R punctuate(final long timestamp); /** * Close this processor and clean up any resources.