Skip to content

KAFKA-20224: Refactor Streams iterator adapters to reduce duplication#21830

Open
nileshkumar3 wants to merge 3 commits intoapache:trunkfrom
nileshkumar3:KAFKA-20224-iterator-adapter-refactor
Open

KAFKA-20224: Refactor Streams iterator adapters to reduce duplication#21830
nileshkumar3 wants to merge 3 commits intoapache:trunkfrom
nileshkumar3:KAFKA-20224-iterator-adapter-refactor

Conversation

@nileshkumar3
Copy link
Contributor

@nileshkumar3 nileshkumar3 commented Mar 20, 2026

We had three almost-identical iterators that only wrapped the inner
iterator and ran a different HeadersBytesStore conversion on the value
bytes. I pulled that into a small MappingKeyValueIteratorAdapter that
takes a Function<byte[], byte[]> and left the three existing adapter
classes as thin subclasses so names and call sites stay the same.
Behavior should be unchanged; the usual ToHeaders iterator/store/window
tests still pass.

@github-actions github-actions bot added triage PRs from the community streams labels Mar 20, 2026
@nileshkumar3 nileshkumar3 changed the title KAFKA-20224: Deduplicate ToHeaders iterator adapters with MappingKeyValueIteratorAdapter KAFKA-20224: Refactor Streams iterator adapters to reduce duplication Mar 20, 2026
@mjsax mjsax added kip Requires or implements a KIP ci-approved and removed triage PRs from the community labels Mar 20, 2026

class PlainToHeadersIteratorAdapter<K> implements KeyValueIterator<K, byte[]> {
private final KeyValueIterator<K, byte[]> innerIterator;
class PlainToHeadersIteratorAdapter<K> extends MappingKeyValueIteratorAdapter<K> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we even need to keep these sub-classes? Or could the calling code just do newMappingKeyValueIteratorAdapter(...) instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. These subclasses no longer add any behavior beyond selecting the mapping function, so they could be removed and the call could instantiate MappingKeyValueIteratorAdapter directly. I kept them earlier here mainly to limit churn, but I cleaned this.

…ueIteratorAdapter factories

Remove PlainToHeadersIteratorAdapter, TimestampedToHeadersIteratorAdapter, and
SessionToHeadersIteratorAdapter. Call sites use plainToHeaders, timestampedToHeaders,
and sessionToHeaders on MappingKeyValueIteratorAdapter. Window store iterator
adapters extend MappingKeyValueIteratorAdapter directly. Update tests.

Made-with: Cursor
Copy link
Contributor

@frankvicky frankvicky left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR
only some nits.

}

/**
* Ensures backward compatibility between {@link org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: replace full qualified with import


/**
* Ensures backward compatibility between {@link org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders}
* and plain {@link org.apache.kafka.streams.state.KeyValueStore}: values are wrapped with empty headers
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: replace full qualified with import


/**
* Ensures backward compatibility between {@link org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders}
* and {@link org.apache.kafka.streams.state.TimestampedKeyValueStore}.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: replace full qualified with import

Comment on lines +66 to +67
* Ensures backward compatibility between {@link org.apache.kafka.streams.state.SessionStoreWithHeaders}
* and {@link org.apache.kafka.streams.state.SessionStore}.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: replace full qualified with import

@nileshkumar3
Copy link
Contributor Author

Thanks for the PR only some nits.

Thanks for reviewing, replaced with import.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

ci-approved kip Requires or implements a KIP streams

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants