Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clean schema registry subjects of named foreign key joins #153

Merged
merged 3 commits into from
Nov 29, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions src/main/java/com/bakdata/kafka/util/TopologyInformation.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
* Representation of the nodes of a Kafka Streams topology
*/
public class TopologyInformation {
private static final String SUBSCRIPTION_REGISTRATION_SUFFIX = "-subscription-registration-topic";
private static final String SUBSCRIPTION_RESPONSE_SUFFIX = "subscription-response-topic";
raminqaf marked this conversation as resolved.
Show resolved Hide resolved
private static final String CHANGELOG_SUFFIX = "-changelog";
private static final String REPARTITION_SUFFIX = "-repartition";
private static final String FILTER_SUFFIX = "-filter";
Expand Down Expand Up @@ -107,7 +109,7 @@ private static Stream<Processor> getAllProcessors(final Collection<Node> nodes)
}

private static Stream<String> createPseudoTopics(final String topic) {
if (topic.contains("FK-JOIN-SUBSCRIPTION-REGISTRATION")) {
if (isSubscriptionRegistrationTopic(topic)) {
return PSEUDO_TOPIC_SUFFIXES.stream().map(suffix -> String.format("%s%s", topic, suffix));
}
return Stream.empty();
Expand Down Expand Up @@ -175,6 +177,14 @@ public List<String> getIntermediateTopics(final Collection<String> allTopics) {
.collect(Collectors.toList());
}

private static boolean isSubscriptionResponseTopic(final String topic) {
return topic.endsWith(SUBSCRIPTION_RESPONSE_SUFFIX) || topic.contains("FK-JOIN-SUBSCRIPTION-RESPONSE");
}

private static boolean isSubscriptionRegistrationTopic(final String topic) {
return topic.endsWith(SUBSCRIPTION_REGISTRATION_SUFFIX) || topic.contains("FK-JOIN-SUBSCRIPTION-REGISTRATION");
}

private boolean isInternalTopic(final String topic) {
if (topic.startsWith("KSTREAM-") || topic.startsWith("KTABLE-")) {
return true;
Expand All @@ -187,7 +197,10 @@ private boolean isInternalTopic(final String topic) {
final List<String> repartitionTopics = this.getRepartitionTopics().collect(Collectors.toList());
return repartitionTopics.contains(topic);
}
return false;
if (isSubscriptionRegistrationTopic(topic)) {
return true;
}
return isSubscriptionResponseTopic(topic);
}

private boolean isExternalTopic(final String topic) {
Expand Down
26 changes: 24 additions & 2 deletions src/test/java/com/bakdata/kafka/util/TopologyInformationTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Repartitioned;
import org.apache.kafka.streams.kstream.TableJoined;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -136,7 +137,7 @@ void shouldReturnAllInternalTopics() {
}

@Test
void shouldReturnAllPseudoInternalTopics() {
void shouldReturnAllPseudoInternalTopicsForForeignKeyJoin() {
final StreamsBuilder streamsBuilder = new StreamsBuilder();
final KTable<String, Object> t1 = streamsBuilder.table("t1");
final KTable<Integer, Object> t2 = streamsBuilder.table("t2");
Expand All @@ -150,7 +151,28 @@ void shouldReturnAllPseudoInternalTopics() {
"id-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000006-topic",
"id-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000006-topic-fk",
"id-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000006-topic-pk",
"id-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000006-topic-vh"
"id-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000006-topic-vh",
"id-KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-0000000014-topic"
);
}

@Test
void shouldReturnAllPseudoInternalTopicsForNamedForeignKeyJoin() {
final StreamsBuilder streamsBuilder = new StreamsBuilder();
final KTable<String, Object> t1 = streamsBuilder.table("t1");
final KTable<Integer, Object> t2 = streamsBuilder.table("t2");
t1
.leftJoin(t2, ignored -> 1, (o1, o2) -> o1, TableJoined.as("join"))
.toStream()
.to("output");
final TopologyInformation topologyInformation = new TopologyInformation(streamsBuilder.build(), "id");
assertThat(topologyInformation.getInternalTopics())
.contains(
"id-join-subscription-registration-topic",
"id-join-subscription-registration-topic-fk",
"id-join-subscription-registration-topic-pk",
"id-join-subscription-registration-topic-vh",
"id-join-subscription-response-topic"
);
}

Expand Down