KAFKA-20492: Prepare DBAccessor for txn support#22142
KAFKA-20492: Prepare DBAccessor for txn support#22142nicktelford wants to merge 1 commit intoapache:trunkfrom
Conversation
Moves iterator construction (all, range, prefixScan) from SingleColumnFamilyAccessor into default methods on DBAccessor. SingleColumnFamilyAccessor now delegates to the accessor, allowing future DBAccessor implementations (e.g. a transactional wrapper) to override iteration behaviour. Adds no-op commitStagedWrites() and rollbackStagedWrites() defaults to DBAccessor, and calls commitStagedWrites() from AbstractColumnFamilyAccessor.commit(). These are no-ops for the existing DirectDBAccessor but provide the hook for a transactional accessor to flush buffered writes atomically with offset commits.
bbejeck
left a comment
There was a problem hiding this comment.
Thanks @nicktelford for the PR, I've made a pass
| } | ||
|
|
||
| default void commitStagedWrites() throws RocksDBException { | ||
| // no-op for non-transactional accessors |
There was a problem hiding this comment.
should this throw an UnsupportedOperationException by default?
There was a problem hiding this comment.
The reason this is a no-op is for the existing DirectDBAccessor. I could make these not have a default implementation and move the no-op to an @Override on DirectDBAccessor if you'd prefer?
| } | ||
|
|
||
| default void rollbackStagedWrites() { | ||
| // no-op for non-transactional accessors |
| void reset(); | ||
| void close(); | ||
|
|
||
| default ManagedKeyValueIterator<Bytes, byte[]> all(final ColumnFamilyHandle cf, final String storeName, final boolean forward) { |
There was a problem hiding this comment.
Just wondering if we could use some tests for this addition or are the existing ones enough coverage
There was a problem hiding this comment.
Checking on this surfaced a subtle bug: the DualColumnFamilyAccessor hasn't been updated to use DBAccessor to construct its iterators. This means that when the TransactionalDBAccessor is later introduced, stores that use dual column families (e.g. RocksDBTimestampedStore) would fail to read-your-own-writes on calls to all, prefixScan and range.
I'm working on a fix for this and will include unit tests that verify DBAccessor#{all,range,prefixScan} are being called as expected.
| } | ||
| } | ||
| } | ||
| accessor.commitStagedWrites(); |
There was a problem hiding this comment.
Maybe update the AbstractColumnFamilyAccessorTest to validate this call is made
| } | ||
|
|
||
| default ManagedKeyValueIterator<Bytes, byte[]> range(final ColumnFamilyHandle cf, final String storeName, | ||
| final Bytes from, final Bytes to, |
| } | ||
|
|
||
| default ManagedKeyValueIterator<Bytes, byte[]> prefixScan(final ColumnFamilyHandle cf, final String storeName, | ||
| final Bytes prefix, final Bytes to) { |
| } | ||
| } | ||
| accessor.commitStagedWrites(); | ||
| } |
There was a problem hiding this comment.
The AbstractColumnFamilyAccessor.commit(accessor, Position storePosition) doesn't call commit - I'm assumign this is intential, but I wanted to confirm maybe add a comment on the method why it doesn't?
|
Kicked off a system test will post results later |
Moves iterator construction (all, range, prefixScan) from
SingleColumnFamilyAccessor into default methods on DBAccessor.
SingleColumnFamilyAccessor now delegates to the accessor, allowing
future DBAccessor implementations (e.g. a transactional wrapper) to
override iteration behaviour.
Adds no-op commitStagedWrites() and rollbackStagedWrites() defaults to
DBAccessor, and calls commitStagedWrites() from
AbstractColumnFamilyAccessor.commit(). These are no-ops for the existing
DirectDBAccessor but provide the hook for a transactional accessor to
flush buffered writes atomically with offset commits.
Co-Authored-By: Claude Sonnet 4.6 noreply@anthropic.com
Reviewers: Bill Bejeck bbejeck@apache.org