Skip to content

Use custom streams implementation to simplify error handling, writing to topics and serde configuration#265

Merged
philipp94831 merged 80 commits intomasterfrom
feature/improved-kstream
Mar 11, 2025
Merged

Use custom streams implementation to simplify error handling, writing to topics and serde configuration#265
philipp94831 merged 80 commits intomasterfrom
feature/improved-kstream

Conversation

@philipp94831
Copy link
Copy Markdown
Member

@philipp94831 philipp94831 commented Jan 6, 2025

New KStreamX interface adds new methods to simplify topology creation. It inherits from KStream and therefore no changes to existing code are required.

Writing to topics

Before

final KStream<String, String> input = builder.streamInput();
input.to(builder.getTopics().getOutputTopic());

After

final KStreamX<String, String> input = builder.streamInput();
input.toOutputTopic();

Configuring serdes

Before

final Serde<SpecificRecord> serde = builder.createConfigurator().configureForValues(new SpecificAvroSerde<>());
final KStream<String, SpecificRecord> input = builder.streamInput(Consumed.with(null, serde));
input.to(builder.getTopics().getOutputTopic(), Produced.valueSerde(serde));

After

final Serde<SpecificRecord> serde = new SpecificAvroSerde<>();
final KStreamX<String, SpecificRecord> input = builder.streamInput(ConsumedX.valueSerde(serde));
input.toOutputTopic(ProducedX.valueSerde(serde));

Error handling

Before

final KStream<String, String> input = builder.streamInput();
final KStream<String, ProcessedValue<String, Integer>> processedInput =
        input.mapValues(ErrorCapturingValueMapper.captureErrors(value -> Integer.parseInt(value)));
final KStream<String, Integer> intStream = processedInput.flatMapValues(ProcessedValue::getValues);
intStream.to(builder.getTopics().getOutputTopic());
final KStream<String, String> deadLetters = processedInput.flatMapValues(ProcessedValue::getErrors)
        .processValues(ErrorHeaderProcessor.withErrorHeaders("Error parsing integers"));
deadLetters.to(builder.getTopics().getErrorTopic());

After

final KStreamX<String, String> input = builder.streamInput();
final KErrorStreamX<String, String, String, Integer> processedInput =
        input.mapValuesCapturingErrors(value -> Integer.parseInt(value));
final KStreamX<String, Integer> intStream = processedInput.values();
intStream.toOutputTopic();
final KStreamX<String, String> deadLetters = processedInput.errors()
        .processValues(ErrorHeaderProcessor.withErrorHeaders("Error parsing integers"));
deadLetters.toErrorTopic();

@philipp94831 philipp94831 self-assigned this Jan 6, 2025
@philipp94831 philipp94831 changed the title Simplify writing to output topics Use custom streams implementation to simplify error handling, writing to topics and serde configuration Jan 27, 2025
@philipp94831 philipp94831 marked this pull request as ready for review February 20, 2025 07:46
Comment thread README.md Outdated
Co-authored-by: Raphael <22345578+raphala@users.noreply.github.com>
jkbe
jkbe previously approved these changes Mar 4, 2025
Copy link
Copy Markdown
Member

@jkbe jkbe left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like the new abstractions! In hindsight, I think doing a better job at hiding TopologyBuilder was actually missing from the library the whole time.

raphala
raphala previously approved these changes Mar 5, 2025
torbsto
torbsto previously approved these changes Mar 10, 2025
Comment thread streams-bootstrap-core/src/main/java/com/bakdata/kafka/KErrorStream.java Outdated
@philipp94831 philipp94831 dismissed stale reviews from torbsto, raphala, and jkbe via 5b81b3b March 10, 2025 18:28
@sonarqubecloud
Copy link
Copy Markdown

sonarqubecloud Bot commented Mar 11, 2025

@philipp94831 philipp94831 merged commit 8d6f876 into master Mar 11, 2025
@philipp94831 philipp94831 deleted the feature/improved-kstream branch March 11, 2025 13:41
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants