refactor(object_store): PayloadView end-to-end; cross-store flushTo; atomic target swap #105
Merged
A consolidated refactor of the ObjectStore / DataEngine surface that
makes PayloadView the universal vocabulary for ownership of bytes
across the SDK, adds zero-copy cross-instance transfer of entries
between stores, and exposes an atomic target-swap primitive on the
plugin data host used to redirect writes between two stores at runtime.
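
To make the "universal vocabulary" idea concrete, here is a minimal sketch of how a PayloadView could pair a byte span with a type-erased anchor. Only the names PayloadView, Span, BufferAnchor, and makePayloadView come from this PR; the struct layouts, the fields ptr and len, and the helper viewOfString are assumptions for illustration, and the real SDK definitions may differ.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-ins for the SDK types; layouts are assumed.
struct Span {
  const uint8_t* ptr = nullptr;
  std::size_t len = 0;
  const uint8_t* data() const { return ptr; }
  std::size_t size() const { return len; }
  bool empty() const { return len == 0; }
};

// Type-erased owner: any shared_ptr<T> converts to shared_ptr<const void>.
using BufferAnchor = std::shared_ptr<const void>;

struct PayloadView {
  Span bytes;           // non-owning view of the payload
  BufferAnchor anchor;  // keeps whatever owns `bytes` alive
};

// Wrap a vector: one shared_ptr is both the backing store and the anchor.
PayloadView makePayloadView(std::vector<uint8_t> v) {
  auto owned = std::make_shared<const std::vector<uint8_t>>(std::move(v));
  return PayloadView{Span{owned->data(), owned->size()}, owned};
}

// Hypothetical producer anchoring on a non-vector owner (here a std::string).
PayloadView viewOfString(std::shared_ptr<const std::string> s) {
  Span bytes{reinterpret_cast<const uint8_t*>(s->data()), s->size()};
  return PayloadView{bytes, std::move(s)};
}
```

Because the anchor is a shared_ptr<const void>, a consumer never needs to know whether the bytes live in a vector, a string, or some foreign buffer; it only holds the anchor for as long as it reads the span.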
ObjectStore + ObjectEntry
- ObjectEntry::payload is std::any holding either
std::shared_ptr<const std::vector<uint8_t>> (eager owned bytes,
counted against the retention budget) or
std::function<sdk::PayloadView()> (lazy resolver returning Span +
BufferAnchor; not counted, bytes owned upstream).
resolveEntry dispatches via std::any_cast; each branch handles its
own concrete type. No static_pointer_cast on the lazy anchor — its
type erasure (BufferAnchor = shared_ptr<const void>) is preserved
end-to-end so producers can anchor on any shared_ptr<T>.
- ResolvedObjectEntry consolidates on PayloadView (Span + anchor).
Removes the previous shared_ptr<const vector<uint8_t>> field,
which locked consumers to a concrete anchor type and required a
hidden static_pointer_cast in resolveEntry that would fail silently
for non-vector anchors.
- ObjectStore::flushTo(ObjectStore& dst): two-phase, atomic, zero-
copy bulk transfer of entries between two instances. Topics matched
by descriptor; monotonicity enforced strictly per series; failure
leaves both sides untouched. Each ObjectEntry is moved by value
via std::move; the std::any inside transfers its buffer intact.
- DataEngine::flushTo(DataEngine& dst): symmetric primitive for the
columnar scalar store. Each TopicStorage's sealed-chunk deque is
moved through to the destination; no chunk constructor is invoked.
- sdk::makePayloadView(std::vector<uint8_t>) replaces
sdk::makeOwnedPayloadView. Wraps a vector into a shared_ptr that
serves as both the bytes backing and the BufferAnchor.
- ObjectBytesBox removed from plugin_data_host. The C-ABI toolbox
handle becomes a heap-allocated PayloadView directly.
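
The list above can be sketched in miniature. The following is a single-threaded illustration, not the code in this PR: MiniStore, the timestamp field, the string topic key, and the bool return of flushTo are all hypothetical, and "strict monotonicity" is assumed to mean that the first incoming timestamp must be greater than the last one already in the destination series. It shows the std::any_cast dispatch in resolveEntry and a two-phase flushTo that validates before it moves anything.

```cpp
#include <any>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-ins for the SDK types; only the names come from the PR.
struct Span {
  const uint8_t* ptr = nullptr;
  std::size_t len = 0;
  const uint8_t* data() const { return ptr; }
  std::size_t size() const { return len; }
  bool empty() const { return len == 0; }
};
using BufferAnchor = std::shared_ptr<const void>;
struct PayloadView {
  Span bytes;
  BufferAnchor anchor;
};

using EagerBytes = std::shared_ptr<const std::vector<uint8_t>>;
using LazyResolver = std::function<PayloadView()>;

struct ObjectEntry {
  int64_t timestamp = 0;  // assumed ordering key for the monotonicity check
  std::any payload;       // holds EagerBytes or LazyResolver
};

// Dispatch on the stored alternative; the lazy anchor is returned as-is, never cast.
PayloadView resolveEntry(const ObjectEntry& e) {
  if (const auto* eager = std::any_cast<EagerBytes>(&e.payload)) {
    if (!*eager) return PayloadView{};
    return PayloadView{Span{(*eager)->data(), (*eager)->size()}, *eager};
  }
  if (const auto* lazy = std::any_cast<LazyResolver>(&e.payload)) {
    return (*lazy)();
  }
  return PayloadView{};  // empty any or unknown type: empty view
}

class MiniStore {
 public:
  void push(const std::string& topic, ObjectEntry e) {
    series_[topic].push_back(std::move(e));
  }
  std::size_t size(const std::string& topic) const {
    auto it = series_.find(topic);
    return it == series_.end() ? 0 : it->second.size();
  }
  const ObjectEntry& at(const std::string& topic, std::size_t i) const {
    return series_.at(topic).at(i);
  }

  // Phase 1 validates every series without mutating anything.
  // Phase 2 moves the entries; no payload is copied or resolved.
  bool flushTo(MiniStore& dst) {
    for (const auto& [topic, entries] : series_) {
      auto it = dst.series_.find(topic);
      if (entries.empty() || it == dst.series_.end() || it->second.empty()) continue;
      if (entries.front().timestamp <= it->second.back().timestamp) {
        return false;  // would break strict monotonicity; both sides untouched
      }
    }
    for (auto& [topic, entries] : series_) {
      auto& out = dst.series_[topic];
      for (auto& e : entries) out.push_back(std::move(e));
    }
    series_.clear();
    return true;
  }

 private:
  std::map<std::string, std::vector<ObjectEntry>> series_;
};
```

The sketch omits locking, and its second phase can still throw on allocation, so it is weaker than the atomicity described above. A production version would presumably take both stores' locks and reserve capacity during validation.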
plugin_data_host atomic target swap
- setTarget on the engine and parser write hosts atomically swaps which
engine/store receives subsequent writes. Safe under concurrent
ingest: in-flight writes complete on the previous target.
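
One way to get the behavior described above is for each write to take a snapshot of the current target under a short lock and then write through that snapshot. The sketch below assumes this design; Store, WriteHost, and the setTarget signature (including its return value) are hypothetical, and the actual hosts in plugin_data_host may use a different mechanism.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical store: an append-only log guarded by its own mutex.
class Store {
 public:
  void write(int v) {
    std::lock_guard<std::mutex> lock(mu_);
    values_.push_back(v);
  }
  std::size_t size() const {
    std::lock_guard<std::mutex> lock(mu_);
    return values_.size();
  }

 private:
  mutable std::mutex mu_;
  std::vector<int> values_;
};

// Hypothetical write host with a swappable target.
class WriteHost {
 public:
  explicit WriteHost(std::shared_ptr<Store> initial) : target_(std::move(initial)) {}

  // Redirect all subsequent writes; returns the previous target.
  std::shared_ptr<Store> setTarget(std::shared_ptr<Store> next) {
    std::lock_guard<std::mutex> lock(mu_);
    std::swap(target_, next);
    return next;
  }

  void write(int v) {
    std::shared_ptr<Store> snapshot;
    {
      std::lock_guard<std::mutex> lock(mu_);
      snapshot = target_;  // pin the target that is current right now
    }
    // The lock is released here, so a concurrent setTarget cannot block on
    // this write, and this write still completes on the target it pinned.
    snapshot->write(v);
  }

 private:
  std::mutex mu_;
  std::shared_ptr<Store> target_;
};
```

Holding a shared_ptr snapshot also keeps the previous store alive until the last in-flight write on it has finished, even if the caller drops its own reference right after the swap.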
Version bump
- 0.4.0 -> 0.5.0. Source-incompatible: ResolvedObjectEntry::data
renamed to payload (PayloadView instead of shared_ptr<vector>).
Consumers migrate from entry->data->{data,size,empty}() to
entry->payload.bytes.{data,size,empty}(); from entry->data to
entry->payload.anchor for ownership retention.
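
As an illustration of the migration, the sketch below shows consumer code written against the 0.5.0 shape, with the 0.4.0 equivalent in comments. The type definitions are simplified stand-ins, and copyBytes and retain are hypothetical consumer helpers, not SDK functions.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <vector>

// Simplified stand-ins for the SDK types; layouts are assumed.
struct Span {
  const uint8_t* ptr = nullptr;
  std::size_t len = 0;
  const uint8_t* data() const { return ptr; }
  std::size_t size() const { return len; }
  bool empty() const { return len == 0; }
};
using BufferAnchor = std::shared_ptr<const void>;
struct PayloadView {
  Span bytes;
  BufferAnchor anchor;
};

struct ResolvedObjectEntry {
  PayloadView payload;  // 0.4.0 had: std::shared_ptr<const std::vector<uint8_t>> data;
};

// 0.4.0: if (entry->data->empty()) ...; entry->data->data(); entry->data->size();
std::vector<uint8_t> copyBytes(const ResolvedObjectEntry* entry) {
  if (entry->payload.bytes.empty()) return {};
  const uint8_t* first = entry->payload.bytes.data();
  return std::vector<uint8_t>(first, first + entry->payload.bytes.size());
}

// 0.4.0: auto keep = entry->data;  (a shared_ptr to the concrete vector)
BufferAnchor retain(const ResolvedObjectEntry* entry) {
  return entry->payload.anchor;  // type-erased, but extends lifetime the same way
}
```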
0d0a763 to fafa624
Summary
A consolidated refactor of the ObjectStore / DataEngine surface that:
- Makes sdk::PayloadView (Span<const uint8_t> + BufferAnchor) the universal vocabulary for ownership of bytes across the SDK.
- Removes the static_pointer_cast<const vector<uint8_t>> trap in resolveEntry: any producer can now anchor on any shared_ptr<T> (arrow buffers, mmap pools, foreign ctx), and the type erasure travels intact to consumers.
- Adds zero-copy cross-instance transfer (flushTo) for both ObjectStore and DataEngine. Pointer-move of internal deques; entries cross the boundary without copy or closure invocation.
- Exposes an atomic target-swap primitive on the plugin_data_host write hosts (setTarget) so a host can redirect writes between two stores at runtime, safely under concurrent ingest.
Contents
ObjectEntry / ResolvedObjectEntry
ObjectEntry::payload is std::any, holding either:
- std::shared_ptr<const std::vector<uint8_t>>: eager owned, counted against the retention budget.
- std::function<sdk::PayloadView()>: lazy resolver returning Span + BufferAnchor; not counted (bytes are owned upstream).
resolveEntry dispatches via std::any_cast on each alternative explicitly. Each branch handles its own concrete type, with no static_pointer_cast on the lazy anchor. The BufferAnchor (= shared_ptr<const void>) stays opaque end-to-end.
ResolvedObjectEntry::payload is sdk::PayloadView. The previous data field (shared_ptr<const vector<uint8_t>>) is removed: it locked consumers to a concrete anchor type and required the cast that would fail silently for non-vector anchors.
Cross-store flush
ObjectStore::flushTo(ObjectStore& dst) and DataEngine::flushTo(DataEngine& dst): two-phase, atomic, zero-copy bulk transfer of entries / chunks between two instances. Topics are matched by descriptor (dataset_id + name); monotonicity is enforced strictly per series/topic; failure leaves both sides untouched. Each entry/chunk is moved by value: the std::any (or deque<TopicChunk>) transfers its buffer intact, with lazy closures preserved.
Atomic target swap
DatastoreSourceWriteHost::setTarget, DatastoreParserWriteHost::setTarget, and the object-write equivalents atomically swap which engine/store receives subsequent writes. Safe under concurrent ingest: in-flight writes complete on the previous target, future writes go to the new one.
Helper rename
sdk::makeOwnedPayloadView(vector) → sdk::makePayloadView(vector).
ObjectBytesBox removed
plugin_data_host.cpp previously wrapped the resolved bytes in an ObjectBytesBox for the C-ABI toolbox handle. With PayloadView on ResolvedObjectEntry, the wrapper became redundant; the handle now points at a heap-allocated PayloadView directly.
Version bump
0.4.0 → 0.5.0. Source-incompatible: ResolvedObjectEntry::data is renamed to payload and changes shape (PayloadView instead of shared_ptr<vector>). sdk::makeOwnedPayloadView is renamed to sdk::makePayloadView.
Consumer migration is mechanical:
- entry->data->{data,size,empty}() → entry->payload.bytes.{data,size,empty}()
- entry->data == nullptr → entry->payload.anchor == nullptr
- entry->data (shared_ptr) → entry->payload.anchor (BufferAnchor = shared_ptr<const void>).
Tests
- pj_datastore/tests/object_store_test.cpp: 45 tests including new flushTo coverage (basic transfer, monotonicity rejection, lazy closure preservation across flush, shared_ptr identity, retention budget after flush).
- pj_datastore/tests/engine_integration_test.cpp: 21 tests including new DataEngine flushTo coverage.
- pj_datastore/tests/plugin_data_host_object_test.cpp: existing tests updated for the PayloadView surface.
All green on Linux/gcc under ./build.sh --debug && ./test.sh.