KAFKA-5671 Followup: Remove reflections in unit test classes #3603

Closed
36 changes: 24 additions & 12 deletions docs/streams/developer-guide.html
Expand Up @@ -215,13 +215,12 @@ <h4><a id="streams_processor_describe" href="#streams_processor_describe">Descri
Thus, <code>TopologyDescritpion</code> allows to retrieve the DAG structure of the specified topology.
<br />
Note that global stores are listed explicitly as they are accessible by all nodes without the need to explicitly connect them.
Furthermore, nodes are grouped by <code>Subtopology</code>.
Subtopologies are groups of nodes that are directly connected to each other (i.e., either by a direct connection&mdash;but not a topic&mdash;or by sharing a store).
For execution, each <code>Subtopology</code> is executed by <a href="/{{version}}/documentation/streams/architecture#streams_architecture_tasks">one or multiple tasks</a>.
Thus, each <code>Subtopology</code> describes an independent unit of works that can be executed by different threads in parallel.
Furthermore, nodes are grouped by <code>Sub-topologies</code>, where each sub-topology is a group of processor nodes that are directly connected to each other (i.e., either by a direct connection&mdash;but not a topic&mdash;or by sharing a store).
During execution, each <code>Sub-topology</code> will be processed by <a href="/{{version}}/documentation/streams/architecture#streams_architecture_tasks">one or multiple tasks</a>.
Thus, each <code>Sub-topology</code> describes an independent unit of works that can be executed by different threads in parallel.
<br />
Describing a <code>Topology</code> is helpful to reason about tasks and thus maximum parallelism.
It is also helpful to get insight into a <code>Topology</code> if it is not specified manually but via Kafka Streams DSL that is described in the next section.
Describing a <code>Topology</code> before starting your streams application with the specified topology is helpful to reason about tasks and thus maximum parallelism (we will talk about how to execute your written application later in this section).
It is also helpful to get insight into a <code>Topology</code> if it is not specified directly as described above but via Kafka Streams DSL (we will describe the DSL in the next section.
</p>

In the next section we present another way to build the processor topology: the Kafka Streams DSL.
Expand Down Expand Up @@ -425,6 +424,22 @@ <h4><a id="streams_dsl_sink" href="#streams_dsl_sink">Write streams back to Kafk
</pre>
<br>

<h4><a id="streams_dsl_build" href="#streams_dsl_build">Generate the processor topology</a></h4>

<p>
Within the Streams DSL, while users are specifying the operations to create / transform various streams as described above, a <code>Topology</code> is constructed implicitly within the <code>StreamsBuilder</code>.
Users can generate the constructed topology at any given point in time by calling <code>build</code>:
</p>

<pre class="brush: java;">
Topology topology = builder.build();
</pre>

<p>
Users can investigate the generated <code>Topology</code> via its <code>describe</code> API, and continue building or modifying the topology until they are satisfied with it.
The topology then can be used to execute the application (we will talk about this later in this section).
</p>

<h3><a id="streams_interactive_queries" href="#streams_interactive_queries">Interactive Queries</a></h3>
<p>
Interactive queries let you get more from streaming than just the processing of data. This feature allows you to treat the stream processing layer as a lightweight embedded database and, more concretely, <i>to directly query the latest state</i> of your stream processing application, without needing to materialize that state to external databases or external storage first.
Expand Down Expand Up @@ -737,7 +752,7 @@ <h4><a id="streams_developer-guide_interactive-queries_custom-stores" href="#str
<code>StateStoreProvider#stores(String storeName, QueryableStoreType&lt;T&gt; queryableStoreType)</code> returns a <code>List</code> of state stores with the given <code>storeName</code> and of the type as defined by <code>queryableStoreType</code>.
</p>
<p>
An example implemention of the wrapper follows (Java 8+):
An example implementation of the wrapper follows (Java 8+):
</p>

<pre class="brush: java;">
Expand Down Expand Up @@ -1064,8 +1079,6 @@ <h4><a id="streams_topic_config" href="#streams_topic_config">Internal Topic Con
groupedStream.count(supplier)
</pre>



<h4><a id="streams_execute" href="#streams_execute">Executing Your Kafka Streams Application</a></h4>
<p>
You can call Kafka Streams from anywhere in your application code.
Expand All @@ -1074,9 +1087,8 @@ <h4><a id="streams_execute" href="#streams_execute">Executing Your Kafka Streams

<p>
First, you must create an instance of <code>KafkaStreams</code>.
The first argument of the <code>KafkaStreams</code> constructor takes a <code>Topology</code>
that is a logical topology description (you can create a <code>Topology</code> either directly or use
<code>StreamsBuilder</code> to create one).
The first argument of the <code>KafkaStreams</code> constructor takes an instance of <code>Topology</code>.
This topology can be either created directly following the <code>Processor</code> API or implicitly via the <code>StreamsBuilder</code> in the higher-level Streams DSL.
The second argument is an instance of <code>StreamsConfig</code> mentioned above.
</p>

Expand Down
Expand Up @@ -49,8 +49,9 @@ public class StreamsBuilder {

/** The actual topology that is constructed by this StreamsBuilder. */
private final Topology topology = new Topology();

/** The topology's internal builder. */
private final InternalTopologyBuilder internalTopologyBuilder = topology.internalTopologyBuilder;
final InternalTopologyBuilder internalTopologyBuilder = topology.internalTopologyBuilder;

private final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(internalTopologyBuilder);

Expand Down
Expand Up @@ -52,7 +52,7 @@
* @param <V> Type of values
* @see KTable
* @see KGroupedStream
* @see org.apache.kafka.streams.StreamsBuilder#stream(String...)
* @see StreamsBuilder#stream(String...)
*/
@InterfaceStability.Evolving
public interface KStream<K, V> {
Expand Down
Expand Up @@ -31,7 +31,6 @@
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreType;
import org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreSupplier;
Expand All @@ -51,17 +50,17 @@
* @deprecated Use {@link org.apache.kafka.streams.StreamsBuilder StreamsBuilder} instead
*/
@Deprecated
public class KStreamBuilder extends TopologyBuilder {
public class KStreamBuilder extends org.apache.kafka.streams.processor.TopologyBuilder {
Contributor: Why this change?

Contributor Author: To avoid importing org.apache.kafka.streams.processor.TopologyBuilder, which would introduce a WARNING during compilation. Instead, we refer to the class by its fully qualified name within this class wherever we have to.
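
A minimal sketch of the fully-qualified-name approach described in the reply above. It uses the JDK's long-deprecated `java.io.StringBufferInputStream` as a stand-in for `TopologyBuilder`; the class `QualifiedNameSketch` and the method `firstByte` are hypothetical names chosen for illustration. As far as I know, `javac` versions before Java 9 report a deprecation warning on the `import` line itself, and `@SuppressWarnings` cannot be attached to an import, so spelling out the full name keeps every use inside the annotated scope.

```java
// Note: there is deliberately no `import java.io.StringBufferInputStream;` here.
// The deprecated class is only ever named by its fully qualified name.

@SuppressWarnings("deprecation")
class QualifiedNameSketch {

    // Reads the first byte of the given text through the deprecated stream class.
    static int firstByte(final String text) {
        final java.io.StringBufferInputStream in = new java.io.StringBufferInputStream(text);
        return in.read();
    }

    public static void main(final String[] args) {
        System.out.println(firstByte("A"));
    }
}
```

The trade-off is verbosity: each reference carries the package name, which is why the diff only does this for classes that are already deprecated.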


private final AtomicInteger index = new AtomicInteger(0);

private final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(super.internalTopologyBuilder);

private Topology.AutoOffsetReset translateAutoOffsetReset(final TopologyBuilder.AutoOffsetReset resetPolicy) {
private Topology.AutoOffsetReset translateAutoOffsetReset(final org.apache.kafka.streams.processor.TopologyBuilder.AutoOffsetReset resetPolicy) {
if (resetPolicy == null) {
return null;
}
return resetPolicy == TopologyBuilder.AutoOffsetReset.EARLIEST ? Topology.AutoOffsetReset.EARLIEST : Topology.AutoOffsetReset.LATEST;
return resetPolicy == org.apache.kafka.streams.processor.TopologyBuilder.AutoOffsetReset.EARLIEST ? Topology.AutoOffsetReset.EARLIEST : Topology.AutoOffsetReset.LATEST;
}

/**
Expand Down
Expand Up @@ -52,6 +52,13 @@

public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V> {

// TODO: change to package-private after removing KStreamBuilder
public static final String SOURCE_NAME = "KSTREAM-SOURCE-";

static final String SINK_NAME = "KSTREAM-SINK-";

static final String REPARTITION_TOPIC_SUFFIX = "-repartition";

private static final String BRANCH_NAME = "KSTREAM-BRANCH-";

private static final String BRANCHCHILD_NAME = "KSTREAM-BRANCHCHILD-";
Expand Down Expand Up @@ -88,10 +95,6 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V

private static final String KEY_SELECT_NAME = "KSTREAM-KEY-SELECT-";

static final String SINK_NAME = "KSTREAM-SINK-";

public static final String SOURCE_NAME = "KSTREAM-SOURCE-";

private static final String TRANSFORM_NAME = "KSTREAM-TRANSFORM-";

private static final String TRANSFORMVALUES_NAME = "KSTREAM-TRANSFORMVALUES-";
Expand All @@ -100,8 +103,6 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V

private static final String FOREACH_NAME = "KSTREAM-FOREACH-";

static final String REPARTITION_TOPIC_SUFFIX = "-repartition";

private final KeyValueMapper<K, V, String> defaultKeyValueMapper;

private final boolean repartitionRequired;
Expand Down Expand Up @@ -439,7 +440,7 @@ public void to(final Serde<K> keySerde,
@Override
public void to(final Serde<K> keySerde,
final Serde<V> valSerde,
StreamPartitioner<? super K, ? super V> partitioner,
final StreamPartitioner<? super K, ? super V> partitioner,
final String topic) {
Objects.requireNonNull(topic, "topic can't be null");
final String name = builder.newName(SINK_NAME);
Expand All @@ -449,10 +450,11 @@ public void to(final Serde<K> keySerde,

if (partitioner == null && keySerializer != null && keySerializer instanceof WindowedSerializer) {
final WindowedSerializer<Object> windowedSerializer = (WindowedSerializer<Object>) keySerializer;
partitioner = (StreamPartitioner<K, V>) new WindowedStreamPartitioner<Object, V>(topic, windowedSerializer);
final StreamPartitioner<K, V> windowedPartitioner = (StreamPartitioner<K, V>) new WindowedStreamPartitioner<Object, V>(topic, windowedSerializer);
builder.internalTopologyBuilder.addSink(name, topic, keySerializer, valSerializer, windowedPartitioner, this.name);
} else {
builder.internalTopologyBuilder.addSink(name, topic, keySerializer, valSerializer, partitioner, this.name);
}

builder.internalTopologyBuilder.addSink(name, topic, keySerializer, valSerializer, partitioner, this.name);
}

@Override
Expand Down
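
The `to(...)` hunk above makes the `partitioner` parameter `final` and, instead of reassigning it, introduces a separate local for the windowed case. A minimal sketch of that shape follows, under stated assumptions: `FinalParameterSketch`, `describeSink`, and `register` are hypothetical names, and a `ToIntFunction<String>` stands in for `StreamPartitioner`.

```java
import java.util.function.ToIntFunction;

class FinalParameterSketch {

    // The parameter stays final; the fallback lives in its own final local
    // instead of overwriting the caller-supplied argument.
    static String describeSink(final String topic,
                               final ToIntFunction<String> partitioner,
                               final boolean windowedKey) {
        if (partitioner == null && windowedKey) {
            final ToIntFunction<String> windowedPartitioner =
                key -> Math.floorMod(key.hashCode(), 4);
            return register(topic, windowedPartitioner);
        } else {
            return register(topic, partitioner);
        }
    }

    // Hypothetical stand-in for registering a sink node.
    static String register(final String topic, final ToIntFunction<String> partitioner) {
        return topic + (partitioner == null ? ":default" : ":custom");
    }

    public static void main(final String[] args) {
        System.out.println(describeSink("output", null, true));
        System.out.println(describeSink("output", null, false));
    }
}
```

The cost is a duplicated call in each branch; the benefit is that every variable in the method is assigned exactly once.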
Expand Up @@ -50,6 +50,11 @@
*/
public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, V> {

// TODO: these two fields can be package-private after KStreamBuilder is removed
public static final String SOURCE_NAME = "KTABLE-SOURCE-";

public static final String STATE_STORE_NAME = "STATE-STORE-";

private static final String FILTER_NAME = "KTABLE-FILTER-";

private static final String FOREACH_NAME = "KTABLE-FOREACH-";
Expand All @@ -66,12 +71,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,

private static final String SELECT_NAME = "KTABLE-SELECT-";

public static final String SOURCE_NAME = "KTABLE-SOURCE-";

private static final String TOSTREAM_NAME = "KTABLE-TOSTREAM-";

public static final String STATE_STORE_NAME = "STATE-STORE-";

private final ProcessorSupplier<?, ?> processorSupplier;

private final KeyValueMapper<K, V, String> defaultKeyValueMapper;
Expand Down
Expand Up @@ -909,7 +909,7 @@ private InternalTopicConfig createInternalTopicConfig(final StateStoreSupplier<?

public synchronized Pattern earliestResetTopicsPattern() {
final List<String> topics = maybeDecorateInternalSourceTopics(earliestResetTopics);
final Pattern earliestPattern = buildPatternForOffsetResetTopics(topics, earliestResetPatterns);
final Pattern earliestPattern = buildPatternForOffsetResetTopics(topics, earliestResetPatterns);

ensureNoRegexOverlap(earliestPattern, latestResetPatterns, latestResetTopics);

Expand All @@ -925,6 +925,8 @@ public synchronized Pattern latestResetTopicsPattern() {
return latestPattern;
}

// TODO: we should check regex overlap at topology construction time and then throw TopologyException
// instead of at runtime. See KAFKA-5660
private void ensureNoRegexOverlap(final Pattern builtPattern,
final Set<Pattern> otherPatterns,
final Set<String> otherTopics) {
Expand Down
Expand Up @@ -18,7 +18,6 @@

import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.errors.TopologyBuilderException;
import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
Expand Down Expand Up @@ -55,13 +54,13 @@ public RecordCollector recordCollector() {
}

/**
* @throws TopologyBuilderException if an attempt is made to access this state store from an unknown node
* @throws org.apache.kafka.streams.errors.TopologyBuilderException if an attempt is made to access this state store from an unknown node
*/
@SuppressWarnings("deprecation")
@Override
public StateStore getStateStore(final String name) {
if (currentNode() == null) {
throw new TopologyBuilderException("Accessing from an unknown node");
throw new org.apache.kafka.streams.errors.TopologyBuilderException("Accessing from an unknown node");
Contributor: Again, why?

Contributor Author: Ditto above.

}

final StateStore global = stateManager.getGlobalStore(name);
Expand All @@ -70,7 +69,7 @@ public StateStore getStateStore(final String name) {
}

if (!currentNode().stateStores.contains(name)) {
throw new TopologyBuilderException("Processor " + currentNode().name() + " has no access to StateStore " + name);
throw new org.apache.kafka.streams.errors.TopologyBuilderException("Processor " + currentNode().name() + " has no access to StateStore " + name);
}

return stateManager.getStore(name);
Expand Down
Expand Up @@ -21,11 +21,15 @@
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.internals.KStreamImpl;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockProcessorSupplier;
import org.junit.Rule;
import org.junit.Test;

import java.util.Collection;
import java.util.Set;

import static org.junit.Assert.assertEquals;

public class StreamsBuilderTest {
Expand Down Expand Up @@ -113,4 +117,13 @@ public void shouldThrowExceptionWhenTopicNamesAreNull() throws Exception {
builder.stream(Serdes.String(), Serdes.String(), null, null);
}

// TODO: these two static functions are added because some non-TopologyBuilder unit tests need to access the internal topology builder,
// which is usually a bad sign of design patterns between TopologyBuilder and StreamThread. We need to consider getting rid of them later
public static InternalTopologyBuilder internalTopologyBuilder(final StreamsBuilder builder) {
return builder.internalTopologyBuilder;
}

public static Collection<Set<String>> getCopartitionedGroups(final StreamsBuilder builder) {
return builder.internalTopologyBuilder.copartitionGroups();
}
}
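
The static helpers above rely on `internalTopologyBuilder` being package-private, so a test class in the same package can read it directly. A minimal sketch of that pattern next to a reflection-based equivalent, for contrast only: `SketchBuilder`, `SketchBuilderAccess`, and the field `internalState` are hypothetical names, and the reflective variant is an assumption about what such test code typically looks like, not a copy of the code this PR removes.

```java
import java.lang.reflect.Field;

// Hypothetical stand-in for a builder whose internal state tests need to inspect.
class SketchBuilder {
    // Package-private (no modifier) rather than private, so same-package tests can read it.
    final StringBuilder internalState = new StringBuilder("topology");
}

// Hypothetical test-side helper living in the same package as SketchBuilder.
class SketchBuilderAccess {

    // Direct access: checked by the compiler and updated by refactoring tools on rename.
    static StringBuilder internalState(final SketchBuilder builder) {
        return builder.internalState;
    }

    // Reflection-based equivalent: the field name is a string that only fails at runtime.
    static StringBuilder internalStateViaReflection(final SketchBuilder builder) {
        try {
            final Field field = SketchBuilder.class.getDeclaredField("internalState");
            field.setAccessible(true);
            return (StringBuilder) field.get(builder);
        } catch (final ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Both helpers return the same object; the direct version trades slightly wider field visibility for compile-time checking.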
Expand Up @@ -248,10 +248,14 @@ public void shouldThrowExceptionOverlappingPattern() throws Exception {
builder.stream(KStreamBuilder.AutoOffsetReset.EARLIEST, Pattern.compile("topic-[A-D]_1"));
builder.stream(KStreamBuilder.AutoOffsetReset.LATEST, Pattern.compile("topic-[A-D]_1"));

// TODO: we should check regex overlap at topology construction time and then throw TopologyException
// instead of at runtime. See KAFKA-5660
try {
builder.earliestResetTopicsPattern();
fail("Should have thrown TopologyException");
} catch (final TopologyException expected) { }
} catch (final TopologyException expected) {
// do nothing
}
}

@Test
Expand All @@ -263,7 +267,9 @@ public void shouldThrowExceptionOverlappingTopic() throws Exception {
try {
builder.stream(Topology.AutoOffsetReset.LATEST, TOPIC_A_1, TOPIC_Z_1);
fail("Should have thrown TopologyException");
} catch (final org.apache.kafka.streams.errors.TopologyException expected) { }
} catch (final TopologyException expected) {
// do nothing
}
}

@Test
Expand Down
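
The tests in this file use the try / `fail` / catch idiom to assert that a call throws. A minimal sketch of that idiom without JUnit, under stated assumptions: `ExpectedExceptionSketch`, `validate`, and `throwsOnOverlap` are hypothetical names, `IllegalStateException` stands in for `TopologyException`, and a thrown `AssertionError` stands in for JUnit's `fail(...)`.

```java
class ExpectedExceptionSketch {

    // Hypothetical validation that rejects identical earliest/latest patterns.
    static void validate(final String earliestPattern, final String latestPattern) {
        if (earliestPattern.equals(latestPattern)) {
            throw new IllegalStateException("overlapping patterns: " + earliestPattern);
        }
    }

    // Returns true only if validate(...) throws the expected exception.
    static boolean throwsOnOverlap(final String earliestPattern, final String latestPattern) {
        try {
            validate(earliestPattern, latestPattern);
            // Reached only when no exception was thrown; AssertionError is an Error,
            // so the catch clause below does not swallow it.
            throw new AssertionError("Should have thrown IllegalStateException");
        } catch (final IllegalStateException expected) {
            // do nothing
            return true;
        }
    }

    public static void main(final String[] args) {
        System.out.println(throwsOnOverlap("topic-[A-D]_1", "topic-[A-D]_1"));
    }
}
```

Putting a comment in the otherwise empty catch block, as the diff does, makes it clear the exception is swallowed on purpose.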
Expand Up @@ -28,11 +28,11 @@
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsBuilderTest;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilderTest;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TopologyBuilder;
Expand Down Expand Up @@ -156,7 +156,7 @@ public void testRegexMatchesTopicsAWhenCreated() throws Exception {
final StreamThread originalThread = streamThreads[0];

final TestStreamThread testStreamThread = new TestStreamThread(
InternalStreamsBuilderTest.internalTopologyBuilder(builder),
StreamsBuilderTest.internalTopologyBuilder(builder),
streamsConfig,
new DefaultKafkaClientSupplier(),
originalThread.applicationId,
Expand Down Expand Up @@ -216,7 +216,7 @@ public void testRegexMatchesTopicsAWhenDeleted() throws Exception {
final StreamThread originalThread = streamThreads[0];

final TestStreamThread testStreamThread = new TestStreamThread(
InternalStreamsBuilderTest.internalTopologyBuilder(builder),
StreamsBuilderTest.internalTopologyBuilder(builder),
streamsConfig,
new DefaultKafkaClientSupplier(),
originalThread.applicationId,
Expand Down Expand Up @@ -363,7 +363,7 @@ public void testMultipleConsumersCanReadFromPartitionedTopic() throws Exception
final StreamThread originalLeaderThread = leaderStreamThreads[0];

final TestStreamThread leaderTestStreamThread = new TestStreamThread(
InternalStreamsBuilderTest.internalTopologyBuilder(builderLeader),
StreamsBuilderTest.internalTopologyBuilder(builderLeader),
streamsConfig,
new DefaultKafkaClientSupplier(),
originalLeaderThread.applicationId,
Expand All @@ -389,7 +389,7 @@ public boolean conditionMet() {
final StreamThread originalFollowerThread = followerStreamThreads[0];

final TestStreamThread followerTestStreamThread = new TestStreamThread(
InternalStreamsBuilderTest.internalTopologyBuilder(builderFollower),
StreamsBuilderTest.internalTopologyBuilder(builderFollower),
streamsConfig,
new DefaultKafkaClientSupplier(),
originalFollowerThread.applicationId,
Expand Down
Expand Up @@ -70,7 +70,7 @@ public void apply(final String key, final String value) {
}

@Test
public void shouldLeftJoinWithStream() throws Exception {
public void shouldLeftJoinWithStream() {
stream.leftJoin(global, keyValueMapper, MockValueJoiner.TOSTRING_JOINER)
.foreach(action);

Expand All @@ -84,7 +84,7 @@ public void shouldLeftJoinWithStream() throws Exception {
}

@Test
public void shouldInnerJoinWithStream() throws Exception {
public void shouldInnerJoinWithStream() {
stream.join(global, keyValueMapper, MockValueJoiner.TOSTRING_JOINER)
.foreach(action);

Expand Down