diff --git a/.github/workflows/linux.yml b/.github/workflows/linux.yml index 86b445f..8a5fc6c 100644 --- a/.github/workflows/linux.yml +++ b/.github/workflows/linux.yml @@ -36,7 +36,8 @@ jobs: -DCMAKE_INSTALL_PREFIX=$CONDA_PREFIX \ -DCMAKE_PREFIX_PATH=$CONDA_PREFIX \ -DSPARROW_IPC_BUILD_SHARED=${{ matrix.build_shared }} \ - -DSPARROW_IPC_BUILD_TESTS=ON + -DSPARROW_IPC_BUILD_TESTS=ON \ + -DSPARROW_IPC_BUILD_EXAMPLES=ON - name: Build sparrow-ipc working-directory: build @@ -50,6 +51,14 @@ jobs: working-directory: build run: cmake --build . --target run_tests_with_junit_report + - name: Build example + working-directory: build + run: cmake --build . --target write_and_read_streams + + - name: Run example + working-directory: build + run: cmake --build . --target run_example + - name: Install working-directory: build run: cmake --install . @@ -71,6 +80,7 @@ jobs: -DCMAKE_BUILD_TYPE=${{ matrix.build_type }} \ -DSPARROW_IPC_BUILD_SHARED=${{ matrix.build_shared }} \ -DSPARROW_IPC_BUILD_TESTS=ON \ + -DSPARROW_IPC_BUILD_EXAMPLES=ON \ -DFETCH_DEPENDENCIES_WITH_CMAKE=MISSING - name: Build sparrow-ipc @@ -85,6 +95,14 @@ jobs: working-directory: build run: cmake --build . --target run_tests_with_junit_report + - name: Build example + working-directory: build + run: cmake --build . --target write_and_read_streams + + - name: Run example + working-directory: build + run: cmake --build . --target run_example + - name: Install working-directory: build run: sudo cmake --install . diff --git a/.github/workflows/osx.yml b/.github/workflows/osx.yml index 42c90c9..15dfab0 100644 --- a/.github/workflows/osx.yml +++ b/.github/workflows/osx.yml @@ -41,7 +41,8 @@ jobs: -DCMAKE_INSTALL_PREFIX=$CONDA_PREFIX \ -DCMAKE_PREFIX_PATH=$CONDA_PREFIX \ -DSPARROW_IPC_BUILD_SHARED=${{ matrix.build_shared }} \ - -DSPARROW_IPC_BUILD_TESTS=ON + -DSPARROW_IPC_BUILD_TESTS=ON \ + -DSPARROW_IPC_BUILD_EXAMPLES=ON - name: Build sparrow-ipc working-directory: build @@ -55,6 +56,14 @@ jobs: working-directory: build run: cmake --build . --target run_tests_with_junit_report + - name: Build example + working-directory: build + run: cmake --build . --target write_and_read_streams + + - name: Run example + working-directory: build + run: cmake --build . --target run_example + - name: Install working-directory: build run: cmake --install . @@ -81,6 +90,7 @@ jobs: -DCMAKE_BUILD_TYPE=${{ matrix.build_type }} \ -DSPARROW_IPC_BUILD_SHARED=${{ matrix.build_shared }} \ -DSPARROW_IPC_BUILD_TESTS=ON \ + -DSPARROW_IPC_BUILD_EXAMPLES=ON \ -DFETCH_DEPENDENCIES_WITH_CMAKE=MISSING - name: Build sparrow-ipc @@ -95,6 +105,14 @@ jobs: working-directory: build run: cmake --build . --target run_tests_with_junit_report + - name: Build example + working-directory: build + run: cmake --build . --target write_and_read_streams + + - name: Run example + working-directory: build + run: cmake --build . --target run_example + - name: Install working-directory: build run: sudo cmake --install . diff --git a/.github/workflows/windows.yml b/.github/workflows/windows.yml index c215557..50b0f58 100644 --- a/.github/workflows/windows.yml +++ b/.github/workflows/windows.yml @@ -40,7 +40,8 @@ jobs: -DCMAKE_INSTALL_PREFIX=$CONDA_PREFIX \ -DCMAKE_PREFIX_PATH=$GLOB_PREFIX_PATH \ -DSPARROW_IPC_BUILD_SHARED=${{ matrix.build_shared }} \ - -DSPARROW_IPC_BUILD_TESTS=ON + -DSPARROW_IPC_BUILD_TESTS=ON \ + -DSPARROW_IPC_BUILD_EXAMPLES=ON - name: Build sparrow-ipc working-directory: build @@ -55,6 +56,14 @@ jobs: run: | cmake --build . --config ${{ matrix.build_type }} --target run_tests_with_junit_report + - name: Build example + working-directory: build + run: cmake --build . --config ${{ matrix.build_type }} --target write_and_read_streams + + - name: Run example + working-directory: build + run: cmake --build . --config ${{ matrix.build_type }} --target run_example + - name: Install working-directory: build run: cmake --install . --config ${{ matrix.build_type }} @@ -80,6 +89,7 @@ jobs: cmake -S ./ -B ./build \ -DSPARROW_IPC_BUILD_SHARED=${{ matrix.build_shared }} \ -DSPARROW_IPC_BUILD_TESTS=ON \ + -DSPARROW_IPC_BUILD_EXAMPLES=ON \ -DFETCH_DEPENDENCIES_WITH_CMAKE=MISSING \ $TEST_COVERAGE_ACTIVATION @@ -95,6 +105,14 @@ jobs: working-directory: build run: cmake --build . --config ${{ matrix.build_type }} --target run_tests_with_junit_report + - name: Build example + working-directory: build + run: cmake --build . --config ${{ matrix.build_type }} --target write_and_read_streams + + - name: Run example + working-directory: build + run: cmake --build . --config ${{ matrix.build_type }} --target run_example + - name: Install working-directory: build run: cmake --install . --config ${{ matrix.build_type }} diff --git a/CMakeLists.txt b/CMakeLists.txt index 86f5ae6..02d2f1e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -82,6 +82,9 @@ MESSAGE(STATUS "🔧 Build tests: ${SPARROW_IPC_BUILD_TESTS}") OPTION(SPARROW_IPC_BUILD_DOCS "Build sparrow-ipc documentation" OFF) MESSAGE(STATUS "🔧 Build docs: ${SPARROW_IPC_BUILD_DOCS}") +OPTION(SPARROW_IPC_BUILD_EXAMPLES "Build sparrow-ipc examples" OFF) +MESSAGE(STATUS "🔧 Build examples: ${SPARROW_IPC_BUILD_EXAMPLES}") + # Code coverage # ============= OPTION(SPARROW_IPC_ENABLE_COVERAGE "Enable sparrow-ipc test coverage" OFF) @@ -270,6 +273,13 @@ if(SPARROW_IPC_BUILD_DOCS) add_subdirectory(docs) endif() +# Examples +# ======== +if(SPARROW_IPC_BUILD_EXAMPLES) + message(STATUS "🔨 Create examples targets") + add_subdirectory(examples) +endif() + # Installation # ============ include(GNUInstallDirs) diff --git a/cmake/external_dependencies.cmake b/cmake/external_dependencies.cmake index 0276425..1d46d8f 100644 --- a/cmake/external_dependencies.cmake +++ b/cmake/external_dependencies.cmake @@ -52,7 +52,7 @@ endif() find_package_or_fetch( PACKAGE_NAME sparrow GIT_REPOSITORY https://github.com/man-group/sparrow.git - TAG 1.1.2 + TAG 1.2.0 ) unset(CREATE_JSON_READER_TARGET) @@ -111,7 +111,7 @@ if(SPARROW_IPC_BUILD_TESTS) # Iterate over all the files in the arrow-testing-data source directiory. When it's a gz, extract in place. file(GLOB_RECURSE arrow_testing_data_targz_files CONFIGURE_DEPENDS - "${arrow-testing_SOURCE_DIR}/data/arrow-ipc-stream/integration/1.0.0-littleendian/*.json.gz" + "${arrow-testing_SOURCE_DIR}/data/arrow-ipc-stream/integration/cpp-21.0.0/*.json.gz" ) foreach(file_path IN LISTS arrow_testing_data_targz_files) cmake_path(GET file_path PARENT_PATH parent_dir) diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt new file mode 100644 index 0000000..b7e136a --- /dev/null +++ b/examples/CMakeLists.txt @@ -0,0 +1,55 @@ +cmake_minimum_required(VERSION 3.28) + +# Create executable for the write_and_read_streams example +add_executable(write_and_read_streams write_and_read_streams.cpp) + +# Link against sparrow-ipc and its dependencies +target_link_libraries(write_and_read_streams + PRIVATE + sparrow-ipc + sparrow::sparrow + arrow-testing-data +) + +# Set C++ standard to match the main project +set_target_properties(write_and_read_streams + PROPERTIES + CXX_STANDARD 20 + CXX_STANDARD_REQUIRED ON + CXX_EXTENSIONS OFF +) + +# Include directories for headers +target_include_directories(write_and_read_streams + PRIVATE + ${CMAKE_SOURCE_DIR}/include + ${CMAKE_BINARY_DIR}/generated +) + +# Ensure generated flatbuffer headers are available +add_dependencies(write_and_read_streams generate_flatbuffers_headers) + +# Optional: Copy to build directory for easy execution +if(WIN32) + # On Windows, copy required DLLs + add_custom_command( + TARGET write_and_read_streams POST_BUILD + COMMAND ${CMAKE_COMMAND} -E copy_if_different + "$" + "$" + COMMAND ${CMAKE_COMMAND} -E copy_if_different + "$" + "$" + COMMENT "Copying sparrow and sparrow-ipc DLLs to example executable directory" + ) +endif() + +# Create a custom target to easily run the example +add_custom_target(run_example + COMMAND write_and_read_streams + DEPENDS write_and_read_streams + COMMENT "Running write_and_read_streams example" + USES_TERMINAL +) + +set_target_properties(run_example PROPERTIES FOLDER "Examples") diff --git a/examples/write_and_read_streams.cpp b/examples/write_and_read_streams.cpp new file mode 100644 index 0000000..104d496 --- /dev/null +++ b/examples/write_and_read_streams.cpp @@ -0,0 +1,414 @@ +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#include + +const std::filesystem::path arrow_testing_data_dir = ARROW_TESTING_DATA_DIR; +const std::filesystem::path tests_resources_files_path = arrow_testing_data_dir / "data" / "arrow-ipc-stream" + / "integration" / "cpp-21.0.0"; + + +namespace sp = sparrow; + +// Random number generator +std::random_device rd; +std::mt19937 gen(rd()); + +namespace utils +{ + /** + * Helper function to create a record batch with the same schema but random values + * All batches have: int32 column, float column, bool column, and string column + */ + sp::record_batch create_random_record_batch(size_t num_rows) + { + // Helper lambda to generate a vector with random values + auto generate_vector = [num_rows](auto generator) + { + using T = decltype(generator()); + std::vector values(num_rows); + std::generate(values.begin(), values.end(), generator); + return values; + }; + + // Create integer column with random values + std::uniform_int_distribution int_dist(0, 1000); + auto int_array = sp::primitive_array(generate_vector( + [&]() + { + return int_dist(gen); + } + )); + + // Create float column with random values + std::uniform_real_distribution float_dist(-100.0f, 100.0f); + auto float_array = sp::primitive_array(generate_vector( + [&]() + { + return float_dist(gen); + } + )); + + // Create boolean column with random values + std::uniform_int_distribution bool_dist(0, 1); + auto bool_array = sp::primitive_array(generate_vector( + [&]() + { + return static_cast(bool_dist(gen)); + } + )); + + // Create string column with random values + const std::vector sample_strings = + {"alpha", "beta", "gamma", "delta", "epsilon", "zeta", "eta", "theta", "iota", "kappa"}; + std::uniform_int_distribution str_dist(0, sample_strings.size() - 1); + size_t counter = 0; + auto string_array = sp::string_array(generate_vector( + [&]() + { + return sample_strings[str_dist(gen)] + "_" + std::to_string(counter++); + } + )); + + // Create record batch with named columns (same schema for all batches) + return sp::record_batch( + {{"id", sp::array(std::move(int_array))}, + {"value", sp::array(std::move(float_array))}, + {"flag", sp::array(std::move(bool_array))}, + {"name", sp::array(std::move(string_array))}} + ); + } + + /** + * Verify that two sets of record batches are identical + * Returns true if all batches match, false otherwise + */ + bool verify_batches_match( + const std::vector& original_batches, + const std::vector& deserialized_batches + ) + { + if (original_batches.size() != deserialized_batches.size()) + { + std::cerr << "ERROR: Batch count mismatch! Original: " << original_batches.size() + << ", Deserialized: " << deserialized_batches.size() << "\n"; + return false; + } + + bool all_match = true; + for (size_t batch_idx = 0; batch_idx < original_batches.size(); ++batch_idx) + { + const auto& original = original_batches[batch_idx]; + const auto& deserialized = deserialized_batches[batch_idx]; + + // Check basic structure + if (original.nb_columns() != deserialized.nb_columns() + || original.nb_rows() != deserialized.nb_rows()) + { + std::cerr << "ERROR: Batch " << batch_idx << " structure mismatch!\n"; + all_match = false; + continue; + } + + // Check column names + if (!std::ranges::equal(original.names(), deserialized.names())) + { + std::cerr << "WARNING: Batch " << batch_idx << " column names mismatch!\n"; + } + + // Check column data + for (size_t col_idx = 0; col_idx < original.nb_columns(); ++col_idx) + { + const auto& orig_col = original.get_column(col_idx); + const auto& deser_col = deserialized.get_column(col_idx); + + if (orig_col.data_type() != deser_col.data_type()) + { + std::cerr << "ERROR: Batch " << batch_idx << ", column " << col_idx << " type mismatch!\n"; + all_match = false; + continue; + } + + // Check values + for (size_t row_idx = 0; row_idx < orig_col.size(); ++row_idx) + { + if (orig_col[row_idx] != deser_col[row_idx]) + { + std::cerr << "ERROR: Batch " << batch_idx << ", column " << col_idx << ", row " + << row_idx << " value mismatch!\n"; + std::cerr << " Original: " << orig_col[row_idx] + << ", Deserialized: " << deser_col[row_idx] << "\n"; + all_match = false; + } + } + } + } + + return all_match; + } +} + +/** + * Create multiple record batches with the same schema but random values + */ +std::vector create_record_batches(size_t num_batches, size_t rows_per_batch) +{ + std::cout << "1. Creating " << num_batches << " record batches with random values...\n"; + std::cout << " Each batch has the same schema: (id: int32, value: float, flag: bool, name: string)\n"; + + std::vector batches; + batches.reserve(num_batches); + + for (size_t i = 0; i < num_batches; ++i) + { + batches.push_back(utils::create_random_record_batch(rows_per_batch)); + } + + std::cout << " Created " << batches.size() << " record batches\n"; + for (size_t i = 0; i < batches.size(); ++i) + { + std::cout << std::format("{}\n\n", batches[i]); + } + + return batches; +} + +/** + * Serialize record batches to a stream + */ +std::vector serialize_batches_to_stream(const std::vector& batches) +{ + std::cout << "\n2. Serializing record batches to stream...\n"; + + std::vector stream_data; + sparrow_ipc::memory_output_stream stream(stream_data); + sparrow_ipc::serializer serializer(stream); + + // Serialize all batches using the streaming operator + serializer << batches << sparrow_ipc::end_stream; + + std::cout << " Serialized stream size: " << stream_data.size() << " bytes\n"; + + return stream_data; +} + +/** + * Deserialize stream back to record batches + */ +std::vector deserialize_stream_to_batches(const std::vector& stream_data) +{ + std::cout << "\n3. Deserializing stream back to record batches...\n"; + + auto batches = sparrow_ipc::deserialize_stream(stream_data); + + std::cout << " Deserialized " << batches.size() << " record batches\n"; + + return batches; +} + +/** + * Demonstrate individual vs batch serialization + */ +void demonstrate_serialization_methods( + const std::vector& batches, + const std::vector& batch_stream_data +) +{ + std::cout << "\n6. Demonstrating individual vs batch serialization...\n"; + + // Serialize individual batches one by one + std::vector individual_stream_data; + sparrow_ipc::memory_output_stream individual_stream(individual_stream_data); + sparrow_ipc::serializer individual_serializer(individual_stream); + + for (const auto& batch : batches) + { + individual_serializer << batch; + } + individual_serializer << sparrow_ipc::end_stream; + + std::cout << " Individual serialization size: " << individual_stream_data.size() << " bytes\n"; + std::cout << " Batch serialization size: " << batch_stream_data.size() << " bytes\n"; + + // Both should produce the same result + auto individual_deserialized = sparrow_ipc::deserialize_stream(individual_stream_data); + + if (individual_deserialized.size() == batches.size()) + { + std::cout << " ✓ Individual and batch serialization produce equivalent results\n"; + } + else + { + std::cerr << " ✗ Individual and batch serialization mismatch!\n"; + } +} + +/** + * Verify schema consistency across all batches + */ +bool verify_schema_consistency(const std::vector& batches) +{ + std::cout << "\n7. Verifying schema consistency across all batches...\n"; + + if (batches.empty()) + { + std::cout << " No batches to verify\n"; + return true; + } + + bool schema_consistent = true; + for (size_t i = 1; i < batches.size(); ++i) + { + if (batches[0].nb_columns() != batches[i].nb_columns()) + { + std::cerr << " ERROR: Batch " << i << " has different number of columns!\n"; + schema_consistent = false; + } + + for (size_t col_idx = 0; col_idx < batches[0].nb_columns() && col_idx < batches[i].nb_columns(); + ++col_idx) + { + const auto& col0 = batches[0].get_column(col_idx); + const auto& col_i = batches[i].get_column(col_idx); + + if (col0.data_type() != col_i.data_type()) + { + std::cerr << " ERROR: Batch " << i << ", column " << col_idx << " has different type!\n"; + schema_consistent = false; + } + + if (col0.name() != col_i.name()) + { + std::cerr << " ERROR: Batch " << i << ", column " << col_idx << " has different name!\n"; + schema_consistent = false; + } + } + } + + if (schema_consistent) + { + std::cout << " ✓ All batches have consistent schema!\n"; + } + else + { + std::cerr << " ✗ Schema inconsistency detected!\n"; + } + + return schema_consistent; +} + +/** + * Read and display a primitive stream file from test resources + */ +void read_and_display_test_file() +{ + std::cout << "\n8. Reading a primitive stream file from test resources...\n"; + + const std::filesystem::path primitive_stream_file = tests_resources_files_path + / "generated_primitive.stream"; + + if (std::filesystem::exists(primitive_stream_file)) + { + std::cout << " Reading file: " << primitive_stream_file << "\n"; + + // Read the stream file + std::ifstream stream_file(primitive_stream_file, std::ios::in | std::ios::binary); + if (!stream_file.is_open()) + { + std::cerr << " ERROR: Could not open stream file!\n"; + } + else + { + const std::vector file_stream_data( + (std::istreambuf_iterator(stream_file)), + (std::istreambuf_iterator()) + ); + stream_file.close(); + + std::cout << " File size: " << file_stream_data.size() << " bytes\n"; + + // Deserialize the stream + auto file_batches = sparrow_ipc::deserialize_stream(file_stream_data); + + std::cout << " Deserialized " << file_batches.size() << " record batch(es) from file\n"; + + // Display the first batch + if (!file_batches.empty()) + { + std::cout << " First batch from file:\n"; + std::cout << std::format("{}\n", file_batches[0]); + } + } + } + else + { + std::cout << " Note: Test resource file not found at " << primitive_stream_file << "\n"; + std::cout << " This is expected if test data is not available.\n"; + } +} + +int main() +{ + std::cout << "=== Sparrow IPC Stream Write and Read Example ===\n"; + std::cout << "Note: All record batches in a stream must have the same schema.\n\n"; + + try + { + // Configuration + constexpr size_t num_batches = 5; + constexpr size_t rows_per_batch = 10; + + // Step 1: Create several record batches with the SAME schema but random values + auto original_batches = create_record_batches(num_batches, rows_per_batch); + + // Step 2: Serialize the record batches to a stream + auto stream_data = serialize_batches_to_stream(original_batches); + + // Step 3: Deserialize the stream back to record batches + auto deserialized_batches = deserialize_stream_to_batches(stream_data); + + // Step 4: Verify that original and deserialized data match + std::cout << "\n4. Verifying data integrity...\n"; + + if (utils::verify_batches_match(original_batches, deserialized_batches)) + { + std::cout << " ✓ All data matches perfectly!\n"; + } + else + { + std::cerr << " ✗ Data verification failed!\n"; + return 1; + } + + // Step 5: Display sample data from the first batch + std::cout << "\n5. Sample data from the first batch:\n"; + std::cout << std::format("{}\n", original_batches[0]); + + // Step 6: Demonstrate individual serialization vs batch serialization + demonstrate_serialization_methods(original_batches, stream_data); + + // Step 7: Verify schema consistency + verify_schema_consistency(deserialized_batches); + + // Step 8: Read and display a primitive stream file from test resources + read_and_display_test_file(); + + std::cout << "\n=== Example completed successfully! ===\n"; + } + catch (const std::exception& e) + { + std::cerr << "Error: " << e.what() << "\n"; + return EXIT_FAILURE; + } + + return EXIT_SUCCESS; +} diff --git a/include/sparrow_ipc/flatbuffer_utils.hpp b/include/sparrow_ipc/flatbuffer_utils.hpp index 4ec4ef7..87c322d 100644 --- a/include/sparrow_ipc/flatbuffer_utils.hpp +++ b/include/sparrow_ipc/flatbuffer_utils.hpp @@ -49,15 +49,45 @@ namespace sparrow_ipc * * @param builder Reference to the FlatBufferBuilder used for creating FlatBuffer objects * @param arrow_schema The ArrowSchema structure containing the field definition to convert + * @param name_override Optional field name to use instead of the name from arrow_schema. + * If provided, this name will be used regardless of arrow_schema.name. + * If not provided, falls back to arrow_schema.name (or empty if null) * * @return A FlatBuffer offset to the created Field object that can be used in further * FlatBuffer construction operations * * @note Dictionary encoding is not currently supported (TODO item) * @note The function checks the NULLABLE flag from the ArrowSchema flags to determine nullability + * @note The name_override parameter is useful when serializing record batches where column + * names are stored separately from the array schemas */ [[nodiscard]] ::flatbuffers::Offset - create_field(flatbuffers::FlatBufferBuilder& builder, const ArrowSchema& arrow_schema); + create_field(flatbuffers::FlatBufferBuilder& builder, const ArrowSchema& arrow_schema, std::optional name_override = std::nullopt); + + /** + * @brief Creates a FlatBuffers vector of Field objects from a record batch. + * + * This function extracts column information from a record batch and converts each column + * into a FlatBuffers Field object. It uses both the column's Arrow schema and the record + * batch's column names to create properly named fields. The resulting fields are collected + * into a FlatBuffers vector. + * + * @param builder Reference to the FlatBuffers builder used for creating the vector + * @param record_batch The record batch containing columns and their associated names + * + * @return FlatBuffers offset to a vector of Field objects, or 0 if the record batch has no columns + * + * @note The function reserves space in the children vector based on the column count + * for performance optimization + * @note Each field is created using the column name from record_batch.names() rather than + * from the Arrow schema, ensuring consistency with the record batch structure + * @note This function properly handles the case where Arrow schemas may not have names + * by using the record batch's explicit column names via the name_override parameter + */ + [[nodiscard]] ::flatbuffers::Offset< + ::flatbuffers::Vector<::flatbuffers::Offset>> + create_children(flatbuffers::FlatBufferBuilder& builder, const sparrow::record_batch& record_batch); + /** * @brief Creates a FlatBuffers vector of Field objects from an ArrowSchema's children. @@ -78,26 +108,6 @@ namespace sparrow_ipc */ [[nodiscard]] ::flatbuffers::Offset< ::flatbuffers::Vector<::flatbuffers::Offset>> - create_children(flatbuffers::FlatBufferBuilder& builder, sparrow::record_batch::column_range columns); - - /** - * @brief Creates a FlatBuffers vector of Field objects from a range of columns. - * - * This function iterates through the provided column range, extracts the Arrow schema - * from each column's proxy, and creates corresponding FlatBuffers Field objects. - * The resulting fields are collected into a vector and converted to a FlatBuffers - * vector offset. - * - * @param builder Reference to the FlatBuffers builder used for creating the vector - * @param columns Range of columns to process, each containing an Arrow schema proxy - * - * @return FlatBuffers offset to a vector of Field objects, or 0 if the input range is empty - * - * @note The function reserves space in the children vector based on the column count - * for performance optimization - */ - [[nodiscard]] ::flatbuffers::Offset< - ::flatbuffers::Vector<::flatbuffers::Offset>> create_children(flatbuffers::FlatBufferBuilder& builder, const ArrowSchema& arrow_schema); /** diff --git a/src/deserialize.cpp b/src/deserialize.cpp index 0d13072..5779ca9 100644 --- a/src/deserialize.cpp +++ b/src/deserialize.cpp @@ -60,7 +60,7 @@ namespace sparrow_ipc const ::flatbuffers::Vector<::flatbuffers::Offset>* fb_custom_metadata = field->custom_metadata(); const std::optional>& metadata = field_metadata[field_idx++]; - const auto name = field->name()->string_view(); + const std::string name = field->name() == nullptr ? "" : field->name()->str(); const auto field_type = field->type_type(); const auto deserialize_non_owning_primitive_array_lambda = [&]() { @@ -202,7 +202,7 @@ namespace sparrow_ipc { const org::apache::arrow::flatbuf::Schema* schema = nullptr; std::vector record_batches; - std::vector field_names; + std::vector field_names; std::vector fields_nullable; std::vector field_types; std::vector>> fields_metadata; @@ -222,7 +222,13 @@ namespace sparrow_ipc for (const auto field : *(schema->fields())) { - field_names.emplace_back(field->name()->string_view()); + if(field != nullptr && field->name() != nullptr) + { + field_names.emplace_back(field->name()->str()); + } + else { + field_names.emplace_back("_unnamed_"); + } fields_nullable.push_back(field->nullable()); const ::flatbuffers::Vector<::flatbuffers::Offset>* fb_custom_metadata = field->custom_metadata(); @@ -251,8 +257,9 @@ namespace sparrow_ipc encapsulated_message, fields_metadata ); - std::vector field_names_str(field_names.cbegin(), field_names.cend()); - record_batches.emplace_back(std::move(field_names_str), std::move(arrays)); + auto names_copy = field_names; // TODO: Remove when issue with the to_vector of record_batch is fixed + sparrow::record_batch sp_record_batch(std::move(names_copy), std::move(arrays)); + record_batches.emplace_back(std::move(sp_record_batch)); } break; case org::apache::arrow::flatbuf::MessageHeader::Tensor: @@ -267,7 +274,7 @@ namespace sparrow_ipc { break; } - } while (true); + } while (!data.empty()); return record_batches; } } \ No newline at end of file diff --git a/src/flatbuffer_utils.cpp b/src/flatbuffer_utils.cpp index 91d8306..9f510b7 100644 --- a/src/flatbuffer_utils.cpp +++ b/src/flatbuffer_utils.cpp @@ -1,4 +1,5 @@ #include "sparrow_ipc/flatbuffer_utils.hpp" +#include #include "sparrow_ipc/serialize_utils.hpp" #include "sparrow_ipc/utils.hpp" @@ -425,11 +426,11 @@ namespace sparrow_ipc } ::flatbuffers::Offset - create_field(flatbuffers::FlatBufferBuilder& builder, const ArrowSchema& arrow_schema) + create_field(flatbuffers::FlatBufferBuilder& builder, const ArrowSchema& arrow_schema, std::optional name_override) { - flatbuffers::Offset fb_name_offset = (arrow_schema.name == nullptr) - ? 0 - : builder.CreateString(arrow_schema.name); + flatbuffers::Offset fb_name_offset = name_override.has_value() + ? builder.CreateString(name_override.value()) + : (arrow_schema.name == nullptr ? 0 : builder.CreateString(arrow_schema.name)); const auto [type_enum, type_offset] = get_flatbuffer_type(builder, arrow_schema.format); auto fb_metadata_offset = create_metadata(builder, arrow_schema); const auto children = create_children(builder, arrow_schema); @@ -465,14 +466,20 @@ namespace sparrow_ipc } ::flatbuffers::Offset<::flatbuffers::Vector<::flatbuffers::Offset>> - create_children(flatbuffers::FlatBufferBuilder& builder, sparrow::record_batch::column_range columns) + create_children(flatbuffers::FlatBufferBuilder& builder, const sparrow::record_batch& record_batch) { std::vector> children_vec; + const auto& columns = record_batch.columns(); children_vec.reserve(columns.size()); - for (const auto& column : columns) + const auto names = record_batch.names(); + for (size_t i = 0; i < columns.size(); ++i) { - const auto& arrow_schema = sparrow::detail::array_access::get_arrow_proxy(column).schema(); - flatbuffers::Offset field = create_field(builder, arrow_schema); + const auto& arrow_schema = sparrow::detail::array_access::get_arrow_proxy(columns[i]).schema(); + flatbuffers::Offset field = create_field( + builder, + arrow_schema, + names[i] + ); children_vec.emplace_back(field); } return children_vec.empty() ? 0 : builder.CreateVector(children_vec); @@ -481,7 +488,7 @@ namespace sparrow_ipc flatbuffers::FlatBufferBuilder get_schema_message_builder(const sparrow::record_batch& record_batch) { flatbuffers::FlatBufferBuilder schema_builder; - const auto fields_vec = create_children(schema_builder, record_batch.columns()); + const auto fields_vec = create_children(schema_builder, record_batch); const auto schema_offset = org::apache::arrow::flatbuf::CreateSchema( schema_builder, org::apache::arrow::flatbuf::Endianness::Little, // TODO: make configurable diff --git a/src/serializer.cpp b/src/serializer.cpp index 60b4c85..b12459c 100644 --- a/src/serializer.cpp +++ b/src/serializer.cpp @@ -1,6 +1,6 @@ #include "sparrow_ipc/serializer.hpp" -#include + #include #include "sparrow_ipc/magic_values.hpp" diff --git a/tests/test_de_serialization_with_files.cpp b/tests/test_de_serialization_with_files.cpp index 6c799d1..3bcb79c 100644 --- a/tests/test_de_serialization_with_files.cpp +++ b/tests/test_de_serialization_with_files.cpp @@ -20,11 +20,11 @@ const std::filesystem::path arrow_testing_data_dir = ARROW_TESTING_DATA_DIR; const std::filesystem::path tests_resources_files_path = arrow_testing_data_dir / "data" / "arrow-ipc-stream" - / "integration" / "1.0.0-littleendian"; + / "integration" / "cpp-21.0.0"; const std::vector files_paths_to_test = { tests_resources_files_path / "generated_primitive", - tests_resources_files_path / "generated_primitive_large_offsets", + // tests_resources_files_path / "generated_primitive_large_offsets", tests_resources_files_path / "generated_primitive_zerolength", // tests_resources_files_path / "generated_primitive_no_batches" }; @@ -63,12 +63,12 @@ void compare_record_batches( const auto& column_1 = record_batches_1[i].get_column(y); const auto& column_2 = record_batches_2[i].get_column(y); REQUIRE_EQ(column_1.size(), column_2.size()); + CHECK_EQ(record_batches_1[i].names()[y], record_batches_2[i].names()[y]); for (size_t z = 0; z < column_1.size(); z++) { const auto col_name = column_1.name().value_or("NA"); INFO("Comparing batch " << i << ", column " << y << " named :" << col_name << " , row " << z); REQUIRE_EQ(column_1.data_type(), column_2.data_type()); - CHECK_EQ(column_1.name(), column_2.name()); const auto& column_1_value = column_1[z]; const auto& column_2_value = column_2[z]; CHECK_EQ(column_1_value, column_2_value); diff --git a/tests/test_flatbuffer_utils.cpp b/tests/test_flatbuffer_utils.cpp index fd48410..b508140 100644 --- a/tests/test_flatbuffer_utils.cpp +++ b/tests/test_flatbuffer_utils.cpp @@ -103,7 +103,7 @@ namespace sparrow_ipc SUBCASE("With valid record batch") { auto record_batch = create_test_record_batch(); - auto children_offset = create_children(builder, record_batch.columns()); + auto children_offset = create_children(builder, record_batch); CHECK_NE(children_offset.o, 0); } @@ -111,7 +111,7 @@ namespace sparrow_ipc { auto empty_batch = sp::record_batch({}); - auto children_offset = create_children(builder, empty_batch.columns()); + auto children_offset = create_children(builder, empty_batch); CHECK_EQ(children_offset.o, 0); } } @@ -459,7 +459,7 @@ namespace sparrow_ipc get_flatbuffer_type(builder, "+w:16").first, org::apache::arrow::flatbuf::Type::FixedSizeList ); // FIXED_SIZED_LIST - CHECK_THROWS(get_flatbuffer_type(builder, "+w:")); // Invalid FixedSizeList format + CHECK_THROWS(static_cast(get_flatbuffer_type(builder, "+w:"))); // Invalid FixedSizeList format } SUBCASE("Struct and Map types") @@ -500,7 +500,7 @@ namespace sparrow_ipc get_flatbuffer_type(builder, "d:10,5").first, org::apache::arrow::flatbuf::Type::Decimal ); // DECIMAL (general) - CHECK_THROWS(get_flatbuffer_type(builder, "d:10")); // Invalid Decimal format + CHECK_THROWS(static_cast(get_flatbuffer_type(builder, "d:10"))); // Invalid Decimal format } SUBCASE("Fixed Width Binary type")