
KAFKA-20259: Verify IQv1 and IQv2 do not break with headers-aware state stores #21906

Open
Shekharrajak wants to merge 8 commits into apache:trunk from Shekharrajak:KAFKA-20259-iq-headers-verification

Conversation

@Shekharrajak

KAFKA-20259: Verify IQv1 and IQv2 do not break with headers-aware state stores

Add verification that IQv1 and IQv2 query interfaces work correctly
against the headers-aware state stores introduced by KIP-1271/KIP-1285.

@github-actions github-actions bot added triage PRs from the community streams tests Test fixes (including flaky tests) labels Mar 31, 2026
@Shekharrajak Shekharrajak force-pushed the KAFKA-20259-iq-headers-verification branch from 5a719d0 to d685766 Compare March 31, 2026 09:29
@github-actions

github-actions bot commented Apr 8, 2026

A label of 'needs-attention' was automatically added to this PR in order to raise the
attention of the committers. Once this issue has been triaged, the triage label
should be removed to prevent this automation from happening again.

Contributor

@aliehsaeedii aliehsaeedii left a comment


Thanks @Shekharrajak for the PR.
Please run ./gradlew spotlessApply and then ./gradlew checkstyleMain checkstyleTest spotlessCheck

Contributor

@aliehsaeedii aliehsaeedii left a comment


Thanks @Shekharrajak for the PR! I made a pass.
Did you run QueryableStateIntegrationTest.java to check whether the IQv2 queries really work? We do not support IQv2 for headers-aware stores yet.

About IQv1 testing: could this PR also verify that headers are actually preserved through the query path? Currently it only verifies that the queries work, and does not check whether headers are accessible via any query API.
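
To make the distinction between "the query works" and "headers are preserved" concrete, here is a minimal, self-contained sketch. Every type in it (`Header`, `StoredValue`, `QueryPath`) is a hypothetical stand-in invented for illustration, not a Kafka Streams class and not code from this PR. It models a read path that silently drops headers and shows that a value-only assertion still passes against it, while a comparison of value, timestamp, and headers does not.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins for illustration only; none of these are Kafka Streams classes.
final class HeadersQuerySketch {

    // Real record header values are byte arrays; strings keep this sketch short.
    record Header(String key, String value) {}

    // Models a stored entry that carries a value, a timestamp, and headers.
    record StoredValue(String value, long timestamp, List<Header> headers) {}

    // A toy query path. With dropHeaders = true it mimics a read path that
    // returns the value and timestamp but loses the headers.
    static final class QueryPath {
        private final Map<String, StoredValue> data = new HashMap<>();
        private final boolean dropHeaders;

        QueryPath(boolean dropHeaders) {
            this.dropHeaders = dropHeaders;
        }

        void put(String key, StoredValue value) {
            data.put(key, value);
        }

        // Returns null for a missing key.
        StoredValue get(String key) {
            StoredValue v = data.get(key);
            if (v == null || !dropHeaders) {
                return v;
            }
            return new StoredValue(v.value(), v.timestamp(), List.of());
        }
    }

    // Weak check: only the value is compared, so lost headers go unnoticed.
    static boolean valueMatches(QueryPath path, String key, StoredValue expected) {
        StoredValue actual = path.get(key);
        return actual != null && actual.value().equals(expected.value());
    }

    // Strong check: value, timestamp, and headers (in order) must all match.
    static boolean fullyMatches(QueryPath path, String key, StoredValue expected) {
        return expected.equals(path.get(key));
    }

    public static void main(String[] args) {
        StoredValue stored = new StoredValue("v1", 42L, List.of(new Header("source", "test")));
        QueryPath lossy = new QueryPath(true);
        lossy.put("k1", stored);
        System.out.println("weak check on lossy path:   " + valueMatches(lossy, "k1", stored));
        System.out.println("strong check on lossy path: " + fullyMatches(lossy, "k1", stored));
    }
}
```

In an actual integration test the same idea would apply to whatever object the IQv1 read-only store returns: assert on the headers explicitly rather than on the value alone.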


// IQv1 verification for headers-aware stores: ensure ReadOnly store interfaces
// are accessible and return correct results when dsl.store.format=HEADERS is used.
if (storeToTest.isHeadersAware()) {
Contributor


I assume headers-aware stores are tested in the preceding if clauses as well.

// are accessible and return correct results when dsl.store.format=HEADERS is used.
if (storeToTest.isHeadersAware()) {
if (storeToTest.keyValue()) {
shouldHandleIQv1KeyValueQuery();
Contributor


This test is intended exclusively for IQv2, so it's better not to mix in any IQv1-related items.

Author


Ack. updating.

@github-actions github-actions bot added the small Small PRs label Apr 14, 2026
@Shekharrajak
Author

Removed the three headers-aware store variants (TIME_ROCKS_KV_HEADERS, TIME_ROCKS_WINDOW_HEADERS, ROCKS_SESSION_HEADERS) from IQv2StoreIntegrationTest, since the test is failing with them. Their IQv1 coverage remains in QueryableStateIntegrationTest.

@github-actions github-actions bot removed needs-attention triage PRs from the community labels Apr 15, 2026
Contributor

@aliehsaeedii aliehsaeedii left a comment


Thanks. The build is failing due to this test: QueryableStateIntegrationTest > shouldBeAbleToQueryHeadersAwareStoreWithIQv1(TestInfo)

}
}


Contributor


Could you please revert all changes to this file? Thanks.

Contributor

@aliehsaeedii aliehsaeedii left a comment


Thanks, @Shekharrajak, for updating the PR. Please run the tests locally before pushing, so we can be sure they are actually passing.

Your IQv1 test looks good, but we need more comprehensive verification. Here is the list I suggest we cover:

  1. TimestampedKeyValueStoreWithHeaders Tests

  Basic Operations

  - shouldQueryTimestampedKeyValueStoreWithHeaders_singleKey() - verify get() returns value, timestamp, AND headers
  - shouldQueryTimestampedKeyValueStoreWithHeaders_nullKey() - verify null handling
  - shouldQueryTimestampedKeyValueStoreWithHeaders_nonExistentKey() - verify missing key returns null
  - shouldQueryTimestampedKeyValueStoreWithHeaders_emptyHeaders() - verify records with no headers
  - shouldQueryTimestampedKeyValueStoreWithHeaders_multipleHeaders() - verify multiple headers per record

  Range/Iteration Operations

  - shouldQueryTimestampedKeyValueStoreWithHeaders_range() - verify range(from, to) preserves headers
  - shouldQueryTimestampedKeyValueStoreWithHeaders_all() - verify all() iterator preserves headers
  - shouldQueryTimestampedKeyValueStoreWithHeaders_prefixScan() - verify prefixScan() preserves headers
  - shouldQueryTimestampedKeyValueStoreWithHeaders_approximateNumEntries() - verify count works

  Multi-Instance/Distribution

  - shouldQueryTimestampedKeyValueStoreWithHeaders_fromActiveHost() - query active instance directly
  - shouldQueryTimestampedKeyValueStoreWithHeaders_fromStandbyHost() - query via standby (remote query)
  - shouldQueryTimestampedKeyValueStoreWithHeaders_allKeysInDistributedSetup() - like verifyAllKVKeys but for headers

  2. TimestampedWindowStoreWithHeaders Tests

  Basic Window Operations

  - shouldQueryTimestampedWindowStoreWithHeaders_singleWindow() - fetch(key, timeFrom, timeTo) with headers
  - shouldQueryTimestampedWindowStoreWithHeaders_multipleWindows() - verify multiple time windows
  - shouldQueryTimestampedWindowStoreWithHeaders_emptyHeaders() - window records without headers
  - shouldQueryTimestampedWindowStoreWithHeaders_overlappingWindows() - verify windowing with headers

  Range/Iteration Operations

  - shouldQueryTimestampedWindowStoreWithHeaders_fetchAll() - fetchAll(timeFrom, timeTo) preserves headers
  - shouldQueryTimestampedWindowStoreWithHeaders_all() - all() iterator preserves headers
  - shouldQueryTimestampedWindowStoreWithHeaders_backwardFetch() - backward iteration with headers

  Multi-Instance

  - shouldQueryTimestampedWindowStoreWithHeaders_fromActiveHost()
  - shouldQueryTimestampedWindowStoreWithHeaders_fromStandbyHost()
  - shouldQueryTimestampedWindowStoreWithHeaders_allKeysInDistributedSetup() - like verifyAllWindowedKeys but for headers

  3. SessionStoreWithHeaders Tests

  Basic Session Operations

  - shouldQuerySessionStoreWithHeaders_singleSession() - fetch(key) returns AggregationWithHeaders
  - shouldQuerySessionStoreWithHeaders_multipleSessions() - verify multiple sessions per key
  - shouldQuerySessionStoreWithHeaders_emptyHeaders() - sessions without headers
  - shouldQuerySessionStoreWithHeaders_mergedSessions() - verify headers in merged sessions

  Range/Iteration Operations

  - shouldQuerySessionStoreWithHeaders_findSessions() - findSessions(earliestSessionEndTime, latestSessionStartTime)
  - shouldQuerySessionStoreWithHeaders_backwardFindSessions() - backward iteration
  - shouldQuerySessionStoreWithHeaders_fetchAll() - fetch all sessions

  Multi-Instance

  - shouldQuerySessionStoreWithHeaders_fromActiveHost()
  - shouldQuerySessionStoreWithHeaders_fromStandbyHost()

  Headers Validation

  - shouldPreserveHeadersOrder() - verify headers maintain insertion order
  - shouldPreserveHeadersValues() - verify exact header key/value pairs
  - shouldHandleDuplicateHeaderKeys() - Kafka allows duplicate header keys

  Store Configuration

  - shouldQueryHeadersStoreCreatedWithDSL() - via dsl.store.format=HEADERS config
  - shouldQueryHeadersStoreCreatedWithMaterialized() - via Materialized.as(Stores.persistentTimestampedKeyValueStoreWithHeaders())

  Failure/Recovery

  - shouldQueryAfterRebalance() - verify queries work after rebalancing
  - shouldQueryDuringRestoration() - verify behavior during state store restoration

Of course, this is not an exact or exhaustive list—you can cover multiple items in a single test. Please also look at the other tests in this class to get an idea of the expected style and coverage.
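
As a rough illustration of the "Headers Validation" items in the list above (insertion order, exact key/value pairs, duplicate keys), the following is a minimal sketch of a comparison helper such tests could share. `HeaderValidationSketch`, its `Header` record, and both helper methods are hypothetical names made up for this example; they are not Kafka classes and not part of this PR. The sketch only assumes that a header is a string key plus a byte-array value.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for illustration only; not part of Kafka or of this PR.
final class HeaderValidationSketch {

    // Assumed shape of a header: a string key and a byte-array value.
    record Header(String key, byte[] value) {}

    static Header header(String key, String value) {
        return new Header(key, value.getBytes(StandardCharsets.UTF_8));
    }

    // Order-sensitive comparison that keeps duplicates: two lists match only if
    // they have the same length and equal key/value pairs at every position.
    static boolean sameHeaders(List<Header> expected, List<Header> actual) {
        if (expected.size() != actual.size()) {
            return false;
        }
        for (int i = 0; i < expected.size(); i++) {
            Header e = expected.get(i);
            Header a = actual.get(i);
            // byte[] needs Arrays.equals; the record's generated equals() compares array references.
            if (!e.key().equals(a.key()) || !Arrays.equals(e.value(), a.value())) {
                return false;
            }
        }
        return true;
    }

    // Collects every value stored under a key, in insertion order.
    static List<String> valuesFor(List<Header> headers, String key) {
        List<String> values = new ArrayList<>();
        for (Header h : headers) {
            if (h.key().equals(key)) {
                values.add(new String(h.value(), StandardCharsets.UTF_8));
            }
        }
        return values;
    }

    public static void main(String[] args) {
        List<Header> written = List.of(header("trace", "a"), header("source", "x"), header("trace", "b"));
        List<Header> reordered = List.of(header("source", "x"), header("trace", "a"), header("trace", "b"));
        System.out.println("same list matches:      " + sameHeaders(written, written));
        System.out.println("reordered list matches: " + sameHeaders(written, reordered));
        System.out.println("values for 'trace':     " + valuesFor(written, "trace"));
    }
}
```

Comparing position by position, rather than through a map keyed by header name, is what lets a single check cover both ordering and duplicate keys.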

We’re also a bit pressed for time: we want to confirm that IQv1 is really working, and if there is a bug, we’d like to fix it before the 4.3 release goes out. All related bugs should be captured in this ticket.

Finally, please also rebase your branch to pick up the latest changes. Thanks in advance.

@Shekharrajak Shekharrajak force-pushed the KAFKA-20259-iq-headers-verification branch from 0d75f20 to 2a84dad Compare April 16, 2026 16:43
@github-actions github-actions bot removed the small Small PRs label Apr 16, 2026
@Shekharrajak
Author

./gradlew :streams:integration-tests:test --tests "org.apache.kafka.streams.integration.QueryableStateIntegrationTest"

The tests look fine.


Labels

ci-approved streams tests Test fixes (including flaky tests)


3 participants