Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: patch KafkaStreamsInternalTopicsAccessor as KS internals changed #4621

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,21 @@
package io.confluent.ksql.test.tools;

import java.lang.reflect.Field;
import java.util.HashSet;
import java.util.Set;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;

/**
* Hack to get around the fact that the {@link TopologyTestDriver} class does not expose its set of
* internal topics, which is needed to determine if any unexpected topics have been created.
*
* <p>Note: We should find a better way of doing this - this approach is very brittle
*/
final class KafkaStreamsInternalTopicsAccessor {

private static final Field INTERNAL_TOPICS_FIELD = getInternalTopicsField();
private static final Field INTERNAL_TOPOLOGY_BUILDER_FIELD = getInternalTopologyBuilderField();
private static final Field INTERNAL_TOPIC_NAMES_FIELD = getInternalTopicNamesField();

private KafkaStreamsInternalTopicsAccessor() {
}
Expand All @@ -35,20 +40,32 @@ static Set<String> getInternalTopics(
final TopologyTestDriver topologyTestDriver
) {
try {
return (Set<String>) INTERNAL_TOPICS_FIELD.get(topologyTestDriver);
final Object internalTopologyBuilder = INTERNAL_TOPOLOGY_BUILDER_FIELD
.get(topologyTestDriver);
// Note - there is no memory barrier here so we could end up reading stale data if
// the internal topics are updated
return new HashSet<>((Set<String>) INTERNAL_TOPIC_NAMES_FIELD.get(internalTopologyBuilder));
} catch (final IllegalAccessException e) {
throw new AssertionError("Failed to get internal topic names", e);
}
}

private static Field getInternalTopicsField() {
private static Field getInternalTopologyBuilderField() {
return getField("internalTopologyBuilder", TopologyTestDriver.class);
}

private static Field getInternalTopicNamesField() {
return getField("internalTopicNames", InternalTopologyBuilder.class);
}

private static Field getField(final String fieldName, final Class<?> clazz) {
try {
final Field field = TopologyTestDriver.class.getDeclaredField("internalTopics");
final Field field = clazz.getDeclaredField(fieldName);
field.setAccessible(true);
return field;
} catch (final NoSuchFieldException e) {
throw new AssertionError(
"Kafka Streams's TopologyTestDriver class has changed its internals", e);
"Kafka Streams's has changed its internals", e);
}
}
}