-
Notifications
You must be signed in to change notification settings - Fork 3
Rework deserializing #15
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
85ef02d
to
f82d723
Compare
|
||
private: | ||
|
||
const uint8_t* m_buf_ptr; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you don't store the buffer length, how can you make sure you're not accessing memory out of bounds?
const uint8_t* m_buf_ptr; | ||
}; | ||
|
||
[[nodiscard]] EncapsulatedMessage create_encapsulated_message(const uint8_t* buf_ptr); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps you want an API like:
// Return the encapsulated message and the rest of the span
std::pair<EncapsulatedMessage, std::span<const uint8_t>> extract_encapsulated_message(std::span<const uint8_t>);
As a side note, I think in the future, and for relatively big PRs with multiple files like this one, having multiple commits where corresponding messages describe what they are doing would make the review easier. I don't know if the additional code is making the project not buildable anymore, but maybe it's best if we remove all the versioning and install parts to focus on the core changes in this PR. |
Are we planning eventually to use functions from |
9347b29
to
620ea81
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR implements a comprehensive rework of the serialization system to improve code organization, add new functionality, and enhance testing infrastructure. The changes restructure the codebase with proper namespacing, introduce new deserialization capabilities, and add extensive integration testing with Arrow data files.
- Reorganized headers and source files with proper
sparrow_ipc
namespace structure - Added new deserialization functionality for streams and various array types
- Introduced comprehensive integration testing with Arrow testing data files
Reviewed Changes
Copilot reviewed 41 out of 44 changed files in this pull request and generated 5 comments.
Show a summary per file
File | Description |
---|---|
tests/test_utils.cpp | Reformatted test assertions for better readability |
tests/test_primitive_array_with_files.cpp | New integration tests comparing stream vs JSON deserialization |
tests/test_primitive_array_serialization.cpp | Updated includes and minor formatting improvements |
tests/test_null_array_serialization.cpp | Updated includes to use new header structure |
tests/test_arrow_schema.cpp | New comprehensive tests for Arrow schema functionality |
tests/metadata_sample.hpp | New helper for metadata testing with endianness support |
tests/CMakeLists.txt | Added new test files and dependencies |
src/utils.cpp | Updated includes and improved code formatting |
src/serialize_null_array.cpp | Updated to use new deserialization functions |
src/serialize.cpp | Moved deserialization functions and improved formatting |
src/metadata.cpp | New utility for metadata conversion |
src/encapsulated_message.cpp | New class for handling encapsulated Arrow messages |
src/deserialize_utils.cpp | New utilities for deserialization operations |
src/deserialize_fixedsizebinary_array.cpp | New deserialization for fixed-size binary arrays |
src/deserialize.cpp | New comprehensive deserialization implementation |
Multiple header files | Reorganized with proper namespace structure and new functionality |
Comments suppressed due to low confidence (3)
src/utils.cpp:1
- [nitpick] These multi-line trailing comments are hard to read and maintain. Consider moving them above the variable declarations or making them single-line comments.
#include "sparrow_ipc/utils.hpp"
src/utils.cpp:1
- [nitpick] These multi-line trailing comments are hard to read and maintain. Consider moving them above the variable declarations or making them single-line comments.
#include "sparrow_ipc/utils.hpp"
src/utils.cpp:1
- [nitpick] These multi-line trailing comments are hard to read and maintain. Consider moving them above the variable declarations or making them single-line comments.
#include "sparrow_ipc/utils.hpp"
Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.
src/utils.cpp
Outdated
const auto map_type = org::apache::arrow::flatbuf::CreateMap(builder, false); // not | ||
// sorted | ||
// keys |
Copilot
AI
Sep 5, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nitpick] The multi-line comment is unnecessarily fragmented. Consider using a single-line comment: // not sorted keys
or moving the comment above the line.
const auto map_type = org::apache::arrow::flatbuf::CreateMap(builder, false); // not | |
// sorted | |
// keys | |
const auto map_type = org::apache::arrow::flatbuf::CreateMap(builder, false); // not sorted keys |
Copilot uses AI. Check for mistakes.
@@ -0,0 +1,44 @@ | |||
#include <cstdint> |
Copilot
AI
Sep 5, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing header guard. Add #pragma once
at the beginning of the file to prevent multiple inclusions.
Copilot uses AI. Check for mistakes.
const auto offset_metadata = record_batch.buffers()->Get(buffer_index++); | ||
auto offset_ptr = const_cast<uint8_t*>(body.data() + offset_metadata->offset()); | ||
const auto buffer_metadata = record_batch.buffers()->Get(buffer_index++); | ||
auto buffer_ptr = const_cast<uint8_t*>(body.data() + buffer_metadata->offset()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here as well, you might check that the advertised buffer lengths for offsets and data are consistent with the batch length and fall within bounds of the body.
Besides being good security practice, having such checks will make the debugging experience much nicer in general.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
details::deserialize_schema_message(buf_ptr, current_offset, name, metadata); | ||
deserialize_schema_message(std::span<const uint8_t>(buffer), current_offset, name, metadata); | ||
|
||
// II - Deserialize the RecordBatch message |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm a bit surprised by the logic here. Typically, an IPC stream has a single Schema message at start, followed by an arbitrary number of RecordBatch messages (optionally other messages as well). Here you seem to be assuming that the IPC stream will only ever contain a single RecordBatch, which is weird.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In other words, I would not expect a single stateless function to handle both Schema and RecordBatch at once.
I'm also surprised that this function knows the desired type primitive_array<T>
before deserializing the Schema.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I only reworked the deserialization. The serialization will be in another PR. Here it's the old implementation which is kept for tests. But it will be removed soon.
); | ||
|
||
current_offset += utils::align_to_8(batch_meta_len); | ||
const uint8_t* body_ptr = buf_ptr + current_offset; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unrelated to this PR, but I'm surprised to see raw new
invocations below.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's the old implementation which be removed soon
src/deserialize.cpp
Outdated
) | ||
{ | ||
const uint32_t schema_meta_len = *(reinterpret_cast<const uint32_t*>(data.data() + current_offset)); | ||
current_offset += sizeof(uint32_t); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Similarly, you might want some bounds checking here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
src/deserialize.cpp
Outdated
schema_message->header() | ||
); | ||
const auto fields = flatbuffer_schema->fields(); | ||
if (fields->size() != 1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suppose you plan to remove this limitation later? Otherwise you'll fail on most real-world data.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed
size_t buffer_index = 0; | ||
|
||
std::vector<sparrow::array> arrays; | ||
arrays.reserve(schema.fields()->size()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Interestingly you are not limited to 1 field here.
src/deserialize.cpp
Outdated
const std::optional<std::vector<sparrow::metadata_pair>> | ||
metadata = fb_custom_metadata == nullptr | ||
? std::nullopt | ||
: std::make_optional(to_sparrow_metadata(*fb_custom_metadata)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This deserializes Schema metadata again for each RecordBatch, why?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For the moment I avoid recreating the metadatas, but we still create several time ArrowSchema. I created a PR to share the same schema in several record batch: #20
const EncapsulatedMessage& encapsulated_message | ||
) | ||
{ | ||
const size_t length = static_cast<size_t>(record_batch.length()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Given that you don't support buffer compression, you should IMHO check the compression
field in the RecordBatch and error out if present. Better than returning garbage data to the user :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
#18 I created an issue for that
{ | ||
case org::apache::arrow::flatbuf::MessageHeader::Schema: | ||
{ | ||
schema = message->header_as_Schema(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do you have a function deserialize_schema_message
if it's not being used here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I removed the deserialize_schema_message 👍
So what's the plan here? This rework seems to be intending to drop all specific serializations/deserializations ( |
src/serialize.cpp
Outdated
|
||
const size_t current_size = final_buffer.size(); // Get the current size (which is the end of the Schema message) | ||
const size_t current_size = final_buffer.size(); // Get the current size (which is the end of the | ||
// Schema message) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The comment assumes you're only serializing a single RecordBatch in the IPC stream?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will be reworked in another PR 👍, I just reworked the deserialization for the moment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed
src/serialize.cpp
Outdated
const ArrowArray& arrow_arr, | ||
const std::vector<int64_t>& buffers_sizes, | ||
std::vector<uint8_t>& final_buffer | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In a more performance-minded implementation, you would probably want to serialize directly to a generic writable handle (which can be an in-memory buffer writer but also a file handle), to avoid making an intermediate copy of all buffers before emitting them. Perhaps std::ostream
is a suitable abstraction or perhaps not, I don't know :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will be reworked in another PR 👍, I just reworked the deserialization for the moment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed
src/serialize.cpp
Outdated
// start | ||
|
||
// Write the 4-byte metadata length for the RecordBatch message | ||
*(reinterpret_cast<uint32_t*>(dst)) = batch_meta_len; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
About the code below: the null bitmap is optional in the IPC stream as well, so the memset
thing is really sub-optimal :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will be reworked in another PR 👍
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed
@Hind-M Ok let's delete the old implementation |
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #15 +/- ##
=======================================
Coverage ? 70.91%
=======================================
Files ? 18
Lines ? 832
Branches ? 0
=======================================
Hits ? 590
Misses ? 242
Partials ? 0
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
No description provided.