diff --git a/CMakeLists.txt b/CMakeLists.txt index a30fa81..0947f0b 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,6 +1,6 @@ cmake_minimum_required(VERSION 3.28) -project(sparrow-ipc CXX) +project(sparrow-ipc LANGUAGES CXX) set(CMAKE_CXX_STANDARD 20 CACHE STRING "C++ Standard") set(CMAKE_CXX_STANDARD_REQUIRED ON CACHE BOOL "C++ Standard Required") @@ -15,6 +15,9 @@ include(external_dependencies) set(SPARROW_IPC_COMPILE_DEFINITIONS "" CACHE STRING "List of public compile definitions of the sparrow-ipc target") +set(SPARROW_IPC_INCLUDE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/include) +set(SPARROW_IPC_SOURCE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/src) + # Linter options # ============= OPTION(ACTIVATE_LINTER "Create targets to run clang-format" OFF) @@ -26,6 +29,38 @@ if(ACTIVATE_LINTER) include(clang-tidy) endif() +# Versioning +# ========== +file(STRINGS "${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/config/sparrow_ipc_version.hpp" sparrow_ipc_version_defines + REGEX "constexpr int SPARROW_IPC_VERSION_(MAJOR|MINOR|PATCH)") + +foreach(ver ${sparrow_ipc_version_defines}) + if(ver MATCHES "constexpr int SPARROW_IPC_VERSION_(MAJOR|MINOR|PATCH) = ([0-9]+);$") + set(PROJECT_VERSION_${CMAKE_MATCH_1} "${CMAKE_MATCH_2}" CACHE INTERNAL "") + endif() +endforeach() + +set(CMAKE_PROJECT_VERSION + ${PROJECT_VERSION_MAJOR}.${PROJECT_VERSION_MINOR}.${PROJECT_VERSION_PATCH}) + +message(STATUS "Building sparrow_ipc v${CMAKE_PROJECT_VERSION}") + +# Binary version +# See the following URL for explanations about the binary versioning +# https://www.gnu.org/software/libtool/manual/html_node/Updating-version-info.html#Updating-version-info +file(STRINGS "${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/config/sparrow_ipc_version.hpp" sparrow_ipc_version_defines + REGEX "constexpr int SPARROW_IPC_BINARY_(CURRENT|REVISION|AGE)") + +foreach(ver ${sparrow_ipc_version_defines}) + if(ver MATCHES "constexpr int SPARROW_IPC_BINARY_(CURRENT|REVISION|AGE) = ([0-9]+);$") + set(SPARROW_IPC_BINARY_${CMAKE_MATCH_1} 
"${CMAKE_MATCH_2}" CACHE INTERNAL "") + endif() +endforeach() + +set(SPARROW_IPC_BINARY_VERSION + ${SPARROW_IPC_BINARY_CURRENT}.${SPARROW_IPC_BINARY_REVISION}.${SPARROW_IPC_BINARY_AGE}) + +message(STATUS "sparrow_ipc binary version: v${SPARROW_IPC_BINARY_VERSION}") # Build options # ============= @@ -51,16 +86,33 @@ set(SPARROW_IPC_INCLUDE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/include) set(SPARROW_IPC_SOURCE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/src) set(SPARROW_IPC_HEADERS - ${SPARROW_IPC_INCLUDE_DIR}/config/config.hpp - ${SPARROW_IPC_INCLUDE_DIR}/serialize.hpp - ${SPARROW_IPC_INCLUDE_DIR}/serialize_primitive_array.hpp - ${SPARROW_IPC_INCLUDE_DIR}/serialize_null_array.hpp - ${SPARROW_IPC_INCLUDE_DIR}/utils.hpp + ${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/arrow_interface/arrow_array_schema_common_release.hpp + ${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/arrow_interface/arrow_array.hpp + ${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/arrow_interface/arrow_array/private_data.hpp + ${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/arrow_interface/arrow_schema.hpp + ${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/arrow_interface/arrow_schema/private_data.hpp + ${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/config/config.hpp + ${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/deserialize_variable_size_binary_array.hpp + ${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/deserialize_fixedsizebinary_array.hpp + ${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/deserialize_primitive_array.hpp + ${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/deserialize_utils.hpp + ${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/deserialize.hpp + ${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/encapsulated_message.hpp + ${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/magic_values.hpp + ${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/metadata.hpp + ${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/utils.hpp ) set(SPARROW_IPC_SRC - ${SPARROW_IPC_SOURCE_DIR}/serialize.cpp - ${SPARROW_IPC_SOURCE_DIR}/serialize_null_array.cpp + ${SPARROW_IPC_SOURCE_DIR}/arrow_interface/arrow_array.cpp + 
${SPARROW_IPC_SOURCE_DIR}/arrow_interface/arrow_array/private_data.cpp + ${SPARROW_IPC_SOURCE_DIR}/arrow_interface/arrow_schema.cpp + ${SPARROW_IPC_SOURCE_DIR}/arrow_interface/arrow_schema/private_data.cpp + ${SPARROW_IPC_SOURCE_DIR}/deserialize_fixedsizebinary_array.cpp + ${SPARROW_IPC_SOURCE_DIR}/deserialize_utils.cpp + ${SPARROW_IPC_SOURCE_DIR}/deserialize.cpp + ${SPARROW_IPC_SOURCE_DIR}/encapsulated_message.cpp + ${SPARROW_IPC_SOURCE_DIR}/metadata.cpp ${SPARROW_IPC_SOURCE_DIR}/utils.cpp ) @@ -117,11 +169,14 @@ add_custom_command( add_custom_target(generate_flatbuffers_headers DEPENDS ${FLATBUFFERS_GENERATED_HEADERS} + COMMENT "Ensuring FlatBuffers headers are generated" ) # Interface target for generated headers add_library(flatbuffers_interface INTERFACE) -target_include_directories(flatbuffers_interface INTERFACE ${FLATBUFFERS_GENERATED_DIR}) +target_include_directories(flatbuffers_interface INTERFACE + $ + $) add_dependencies(flatbuffers_interface generate_flatbuffers_headers) add_library(sparrow-ipc ${SPARROW_IPC_LIBRARY_TYPE} ${SPARROW_IPC_SRC} ${SPARROW_IPC_HEADERS}) @@ -141,19 +196,20 @@ else() target_compile_definitions(sparrow-ipc PRIVATE SPARROW_IPC_EXPORTS) endif() -target_include_directories(sparrow-ipc +target_include_directories(sparrow-ipc PUBLIC - ${SPARROW_IPC_INCLUDE_DIR} + $ + $ PRIVATE - ${SPARROW_IPC_SOURCE_DIR} ) + $) target_link_libraries(sparrow-ipc PUBLIC sparrow::sparrow flatbuffers::flatbuffers - PRIVATE - flatbuffers_interface) + ) +# Ensure generated headers are available when building sparrow-ipc add_dependencies(sparrow-ipc generate_flatbuffers_headers) # Tests diff --git a/cmake/external_dependencies.cmake b/cmake/external_dependencies.cmake index ad93282..7eb7e6f 100644 --- a/cmake/external_dependencies.cmake +++ b/cmake/external_dependencies.cmake @@ -46,15 +46,24 @@ function(find_package_or_fetch) endfunction() set(SPARROW_BUILD_SHARED ${SPARROW_IPC_BUILD_SHARED}) +if(${SPARROW_IPC_BUILD_TESTS}) + set(CREATE_JSON_READER_TARGET 
ON) +endif() find_package_or_fetch( PACKAGE_NAME sparrow GIT_REPOSITORY https://github.com/man-group/sparrow.git - TAG 1.1.0 + TAG 1.1.1 ) +unset(CREATE_JSON_READER_TARGET) if(NOT TARGET sparrow::sparrow) add_library(sparrow::sparrow ALIAS sparrow) endif() +if(${SPARROW_IPC_BUILD_TESTS}) + if(NOT TARGET sparrow::json_reader) + add_library(sparrow::json_reader ALIAS json_reader) + endif() +endif() set(FLATBUFFERS_BUILD_TESTS OFF) set(FLATBUFFERS_BUILD_SHAREDLIB ${SPARROW_IPC_BUILD_SHARED}) @@ -76,4 +85,50 @@ if(SPARROW_IPC_BUILD_TESTS) GIT_REPOSITORY https://github.com/doctest/doctest.git TAG v2.4.12 ) + + message(STATUS "📦 Fetching arrow-testing") + cmake_policy(PUSH) + if(POLICY CMP0174) # CMP0174 only exists in CMake >= 3.31; setting an unknown policy is a hard error + cmake_policy(SET CMP0174 NEW) # Suppress warning about FetchContent_Declare GIT_REPOSITORY + endif() + # Fetch arrow-testing data (no CMake build needed) + FetchContent_Declare( + arrow-testing + GIT_REPOSITORY https://github.com/apache/arrow-testing.git + GIT_SHALLOW TRUE + # CONFIGURE_COMMAND "" + # BUILD_COMMAND "" + # INSTALL_COMMAND "" + ) + FetchContent_MakeAvailable(arrow-testing) + cmake_policy(POP) + + # Create interface library for easy access to test data + add_library(arrow-testing-data INTERFACE) + message(STATUS "Arrow testing data directory: ${arrow-testing_SOURCE_DIR}") + target_compile_definitions(arrow-testing-data INTERFACE + ARROW_TESTING_DATA_DIR="${arrow-testing_SOURCE_DIR}" + ) + message(STATUS "\t✅ Fetched arrow-testing") + + # Iterate over all the files in the arrow-testing-data source directory. When it's a gz, extract in place. 
+ file(GLOB_RECURSE arrow_testing_data_targz_files CONFIGURE_DEPENDS + "${arrow-testing_SOURCE_DIR}/data/arrow-ipc-stream/integration/1.0.0-littleendian/*.json.gz" + ) + foreach(file_path IN LISTS arrow_testing_data_targz_files) + cmake_path(GET file_path PARENT_PATH parent_dir) + cmake_path(GET file_path STEM filename) + set(destination_file_path "${parent_dir}/${filename}.json") + if(EXISTS "${destination_file_path}") + message(VERBOSE "File already extracted: ${destination_file_path}") + else() + message(STATUS "Extracting ${file_path}") + if(WIN32) + execute_process(COMMAND powershell -Command "$i=\"${file_path}\"; $o=\"${destination_file_path}\"; [IO.Compression.GZipStream]::new([IO.File]::OpenRead($i),[IO.Compression.CompressionMode]::Decompress).CopyTo([IO.File]::Create($o))") + else() + execute_process(COMMAND gunzip -kf "${file_path}") + endif() + endif() + endforeach() + endif() diff --git a/cmake/sparrow-ipcConfig.cmake.in b/cmake/sparrow-ipcConfig.cmake.in new file mode 100644 index 0000000..3bd4e21 --- /dev/null +++ b/cmake/sparrow-ipcConfig.cmake.in @@ -0,0 +1,27 @@ +# sparrow-ipc cmake module +# This module sets the following variables in your project:: +# +# sparrow-ipc_FOUND - true if sparrow-ipc found on the system +# sparrow-ipc_INCLUDE_DIRS - the directory containing sparrow-ipc headers +# sparrow-ipc_LIBRARY - empty + +@PACKAGE_INIT@ + +include(CMakeFindDependencyMacro) + +if("@USE_DATE_POLYFILL@") + find_dependency(date) +endif() + +if("@CREATE_JSON_READER_TARGET@") + find_dependency(nlohmann_json) +endif() + +find_dependency(sparrow) +find_dependency(FlatBuffers) + +if(NOT TARGET sparrow-ipc::sparrow-ipc) + include("${CMAKE_CURRENT_LIST_DIR}/@PROJECT_NAME@Targets.cmake") + get_target_property(@PROJECT_NAME@_INCLUDE_DIRS sparrow-ipc::sparrow-ipc INTERFACE_INCLUDE_DIRECTORIES) + get_target_property(@PROJECT_NAME@_LIBRARY sparrow-ipc::sparrow-ipc LOCATION) +endif() diff --git a/environment-dev.yml b/environment-dev.yml index 65e0f04..2e46112 
100644 --- a/environment-dev.yml +++ b/environment-dev.yml @@ -8,7 +8,7 @@ dependencies: - cxx-compiler # Libraries dependencies - flatbuffers - - sparrow >=1.1.0 + - sparrow-devel >=1.1.2 - doctest # Documentation dependencies - doxygen diff --git a/include/serialize.hpp b/include/serialize.hpp deleted file mode 100644 index 2dbf148..0000000 --- a/include/serialize.hpp +++ /dev/null @@ -1,25 +0,0 @@ -#pragma once - -#include -#include -#include -#include - -#include "sparrow.hpp" - -#include "Message_generated.h" -#include "Schema_generated.h" - -#include "config/config.hpp" - -namespace sparrow_ipc -{ - namespace details - { - SPARROW_IPC_API std::vector serialize_schema_message(const ArrowSchema& arrow_schema); - SPARROW_IPC_API void serialize_record_batch_message(const ArrowArray& arrow_arr, const std::vector& buffers_sizes, std::vector& final_buffer); - - SPARROW_IPC_API void deserialize_schema_message(const uint8_t* buf_ptr, size_t& current_offset, std::optional& name, std::optional>& metadata); - SPARROW_IPC_API const org::apache::arrow::flatbuf::RecordBatch* deserialize_record_batch_message(const uint8_t* buf_ptr, size_t& current_offset); - } -} diff --git a/include/serialize_null_array.hpp b/include/serialize_null_array.hpp deleted file mode 100644 index 269184a..0000000 --- a/include/serialize_null_array.hpp +++ /dev/null @@ -1,11 +0,0 @@ -#pragma once - -#include "config/config.hpp" -#include "serialize.hpp" - -namespace sparrow_ipc -{ - // TODO Use `arr` as const after fixing the issue upstream in sparrow::get_arrow_structures - SPARROW_IPC_API std::vector serialize_null_array(sparrow::null_array& arr); - SPARROW_IPC_API sparrow::null_array deserialize_null_array(const std::vector& buffer); -} diff --git a/include/serialize_primitive_array.hpp b/include/serialize_primitive_array.hpp deleted file mode 100644 index e3fa799..0000000 --- a/include/serialize_primitive_array.hpp +++ /dev/null @@ -1,91 +0,0 @@ -#pragma once - -#include - -#include 
"serialize.hpp" -#include "utils.hpp" - -namespace sparrow_ipc -{ - // TODO Use `arr` as const after fixing the issue upstream in sparrow::get_arrow_structures - template - std::vector serialize_primitive_array(sparrow::primitive_array& arr); - - template - sparrow::primitive_array deserialize_primitive_array(const std::vector& buffer); - - template - std::vector serialize_primitive_array(sparrow::primitive_array& arr) - { - // This function serializes a sparrow::primitive_array into a byte vector that is compliant - // with the Apache Arrow IPC Streaming Format. It constructs a stream containing two messages: - // 1. A Schema message: Describes the data's metadata (field name, type, nullability). - // 2. A RecordBatch message: Contains the actual array data (null count, length, and raw buffers). - // This two-part structure makes the data self-describing and readable by other Arrow-native tools. - // The implementation adheres to the specification by correctly handling: - // - Message order (Schema first, then RecordBatch). - // - The encapsulated message format (4-byte metadata length prefix). - // - 8-byte padding and alignment for the message body. - // - Correctly populating the Flatbuffer-defined metadata for both messages. 
- - // Get arrow structures - const auto [arrow_arr_ptr, arrow_schema_ptr] = sparrow::get_arrow_structures(arr); - const auto& arrow_arr = *arrow_arr_ptr; - const auto& arrow_schema = *arrow_schema_ptr; - - // I - Serialize the Schema message - auto final_buffer = details::serialize_schema_message(arrow_schema); - - // II - Serialize the RecordBatch message - // After the Schema, we send the RecordBatch containing the actual data - - // Calculate the size of the validity and data buffers - const int64_t validity_size = (arrow_arr.length + 7) / 8; - const int64_t data_size = arrow_arr.length * sizeof(T); - const std::vector buffers_sizes = {validity_size, data_size}; - details::serialize_record_batch_message(arrow_arr, buffers_sizes, final_buffer); - - // Return the final buffer containing the complete IPC stream - return final_buffer; - } - - template - sparrow::primitive_array deserialize_primitive_array(const std::vector& buffer) { - const uint8_t* buf_ptr = buffer.data(); - size_t current_offset = 0; - - // I - Deserialize the Schema message - std::optional name; - std::optional> metadata; - details::deserialize_schema_message(buf_ptr, current_offset, name, metadata); - - // II - Deserialize the RecordBatch message - const uint32_t batch_meta_len = *(reinterpret_cast(buf_ptr + current_offset)); - const auto* record_batch = details::deserialize_record_batch_message(buf_ptr, current_offset); - - current_offset += utils::align_to_8(batch_meta_len); - const uint8_t* body_ptr = buf_ptr + current_offset; - - // Extract metadata from the RecordBatch - const auto buffers_meta = record_batch->buffers(); - const auto nodes_meta = record_batch->nodes(); - const auto node_meta = nodes_meta->Get(0); - - // The body contains the validity bitmap and the data buffer concatenated - // We need to copy this data into memory owned by the new ArrowArray - const int64_t validity_len = buffers_meta->Get(0)->length(); - const int64_t data_len = buffers_meta->Get(1)->length(); - - 
uint8_t* validity_buffer_copy = new uint8_t[validity_len]; - memcpy(validity_buffer_copy, body_ptr + buffers_meta->Get(0)->offset(), validity_len); - - uint8_t* data_buffer_copy = new uint8_t[data_len]; - memcpy(data_buffer_copy, body_ptr + buffers_meta->Get(1)->offset(), data_len); - - - auto data = sparrow::u8_buffer(reinterpret_cast(data_buffer_copy), node_meta->length()); - auto bitmap = sparrow::validity_bitmap(validity_buffer_copy, node_meta->length()); - - return sparrow::primitive_array(std::move(data), node_meta->length(), std::move(bitmap), name, metadata); - } -} diff --git a/include/sparrow_ipc/arrow_interface/arrow_array.hpp b/include/sparrow_ipc/arrow_interface/arrow_array.hpp new file mode 100644 index 0000000..2f1f72d --- /dev/null +++ b/include/sparrow_ipc/arrow_interface/arrow_array.hpp @@ -0,0 +1,34 @@ + +#pragma once + +#include + +#include + +#include "sparrow_ipc/config/config.hpp" + +namespace sparrow_ipc +{ + [[nodiscard]] SPARROW_IPC_API ArrowArray make_non_owning_arrow_array( + int64_t length, + int64_t null_count, + int64_t offset, + std::vector&& buffers, + size_t children_count, + ArrowArray** children, + ArrowArray* dictionary + ); + + SPARROW_IPC_API void release_non_owning_arrow_array(ArrowArray* array); + + SPARROW_IPC_API void fill_non_owning_arrow_array( + ArrowArray& array, + int64_t length, + int64_t null_count, + int64_t offset, + std::vector&& buffers, + size_t children_count, + ArrowArray** children, + ArrowArray* dictionary + ); +} \ No newline at end of file diff --git a/include/sparrow_ipc/arrow_interface/arrow_array/private_data.hpp b/include/sparrow_ipc/arrow_interface/arrow_array/private_data.hpp new file mode 100644 index 0000000..90e633f --- /dev/null +++ b/include/sparrow_ipc/arrow_interface/arrow_array/private_data.hpp @@ -0,0 +1,25 @@ +#pragma once + +#include +#include + +#include "sparrow_ipc/config/config.hpp" + +namespace sparrow_ipc +{ + class non_owning_arrow_array_private_data + { + public: + + explicit 
constexpr non_owning_arrow_array_private_data(std::vector&& buffers_pointers) + : m_buffers_pointers(std::move(buffers_pointers)) + { + } + + [[nodiscard]] SPARROW_IPC_API const void** buffers_ptrs() noexcept; + + private: + + std::vector m_buffers_pointers; + }; +} diff --git a/include/sparrow_ipc/arrow_interface/arrow_array_schema_common_release.hpp b/include/sparrow_ipc/arrow_interface/arrow_array_schema_common_release.hpp new file mode 100644 index 0000000..8ef5f68 --- /dev/null +++ b/include/sparrow_ipc/arrow_interface/arrow_array_schema_common_release.hpp @@ -0,0 +1,64 @@ + +#pragma once + +#include + +#include "arrow_array/private_data.hpp" +#include "arrow_schema/private_data.hpp" + +namespace sparrow_ipc +{ + /** + * Release the children and dictionary of an `ArrowArray` or `ArrowSchema`. + * + * @tparam T `ArrowArray` or `ArrowSchema` + * @param t The `ArrowArray` or `ArrowSchema` to release. + */ + template + requires std::same_as || std::same_as + void release_common_non_owning_arrow(T& t) + { + using private_data_type = std::conditional_t< + std::same_as, + non_owning_arrow_array_private_data, + non_owning_arrow_schema_private_data>; + if (t.release == nullptr) + { + return; + } + SPARROW_ASSERT_TRUE(t.private_data != nullptr); + const auto private_data = static_cast(t.private_data); + delete private_data; + t.private_data = nullptr; + + if (t.dictionary) + { + if (t.dictionary->release) + { + t.dictionary->release(t.dictionary); + } + delete t.dictionary; + t.dictionary = nullptr; + } + + if (t.children) + { + for (int64_t i = 0; i < t.n_children; ++i) + { + T* child = t.children[i]; + if (child) + { + if (child->release) + { + child->release(child); + } + delete child; + t.children[i] = nullptr; + } + } + delete[] t.children; + t.children = nullptr; + } + t.release = nullptr; + } +} diff --git a/include/sparrow_ipc/arrow_interface/arrow_schema.hpp b/include/sparrow_ipc/arrow_interface/arrow_schema.hpp new file mode 100644 index 0000000..099aa86 --- 
/dev/null +++ b/include/sparrow_ipc/arrow_interface/arrow_schema.hpp @@ -0,0 +1,71 @@ +#pragma once + +#include +#include + +#include +#include +#include + +#include "sparrow_ipc/arrow_interface/arrow_schema/private_data.hpp" +#include "sparrow_ipc/config/config.hpp" + +namespace sparrow_ipc +{ + SPARROW_IPC_API void release_non_owning_arrow_schema(ArrowSchema* schema); + + template > + void fill_non_owning_arrow_schema( + ArrowSchema& schema, + std::string_view format, + const char* name, + std::optional metadata, + std::optional> flags, + size_t children_count, + ArrowSchema** children, + ArrowSchema* dictionary + ) + { + schema.flags = 0; + if (flags.has_value()) + { + for (const auto& flag : *flags) + { + schema.flags |= static_cast(flag); + } + } + schema.n_children = static_cast(children_count); + + std::optional metadata_str = metadata.has_value() + ? std::make_optional( + sparrow::get_metadata_from_key_values(*metadata) + ) + : std::nullopt; + + schema.private_data = new non_owning_arrow_schema_private_data(format, name, std::move(metadata_str)); + + const auto private_data = static_cast(schema.private_data); + schema.format = private_data->format_ptr(); + schema.name = private_data->name_ptr(); + schema.metadata = private_data->metadata_ptr(); + schema.children = children; + schema.dictionary = dictionary; + schema.release = release_non_owning_arrow_schema; + } + + template > + [[nodiscard]] ArrowSchema make_non_owning_arrow_schema( + std::string_view format, + const char* name, + std::optional metadata, + std::optional> flags, + size_t children_count, + ArrowSchema** children, + ArrowSchema* dictionary + ) + { + ArrowSchema schema{}; + fill_non_owning_arrow_schema(schema, format, name, metadata, flags, children_count, children, dictionary); + return schema; + } +} \ No newline at end of file diff --git a/include/sparrow_ipc/arrow_interface/arrow_schema/private_data.hpp b/include/sparrow_ipc/arrow_interface/arrow_schema/private_data.hpp new file mode 
100644 index 0000000..f7bc910 --- /dev/null +++ b/include/sparrow_ipc/arrow_interface/arrow_schema/private_data.hpp @@ -0,0 +1,32 @@ + +#pragma once + +#include +#include +#include + +#include "sparrow_ipc/config/config.hpp" + +namespace sparrow_ipc +{ + class non_owning_arrow_schema_private_data + { + public: + + SPARROW_IPC_API non_owning_arrow_schema_private_data( + std::string_view format, + const char* name, + std::optional metadata + ); + + [[nodiscard]] SPARROW_IPC_API const char* format_ptr() const noexcept; + [[nodiscard]] SPARROW_IPC_API const char* name_ptr() const noexcept; + [[nodiscard]] SPARROW_IPC_API const char* metadata_ptr() const noexcept; + + private: + + std::string m_format; + const char* m_name; + std::optional m_metadata; + }; +} diff --git a/include/config/config.hpp b/include/sparrow_ipc/config/config.hpp similarity index 100% rename from include/config/config.hpp rename to include/sparrow_ipc/config/config.hpp diff --git a/include/sparrow_ipc/config/sparrow_ipc_version.hpp b/include/sparrow_ipc/config/sparrow_ipc_version.hpp new file mode 100644 index 0000000..4f718d7 --- /dev/null +++ b/include/sparrow_ipc/config/sparrow_ipc_version.hpp @@ -0,0 +1,12 @@ +#pragma once + +namespace sparrow_ipc +{ + constexpr int SPARROW_IPC_VERSION_MAJOR = 0; + constexpr int SPARROW_IPC_VERSION_MINOR = 1; + constexpr int SPARROW_IPC_VERSION_PATCH = 0; + + constexpr int SPARROW_IPC_BINARY_CURRENT = 1; + constexpr int SPARROW_IPC_BINARY_REVISION = 0; + constexpr int SPARROW_IPC_BINARY_AGE = 0; +} diff --git a/include/sparrow_ipc/deserialize.hpp b/include/sparrow_ipc/deserialize.hpp new file mode 100644 index 0000000..074949f --- /dev/null +++ b/include/sparrow_ipc/deserialize.hpp @@ -0,0 +1,35 @@ +#pragma once + +#include +#include + +#include + +#include "Message_generated.h" +#include "sparrow_ipc/config/config.hpp" +#include "sparrow_ipc/encapsulated_message.hpp" + +namespace sparrow_ipc +{ + /** + * @brief Deserializes an Arrow IPC stream from binary 
data into a vector of record batches. + * + * This function processes an Arrow IPC stream format, extracting schema information + * and record batch data. It handles encapsulated messages sequentially, first expecting + * a Schema message followed by one or more RecordBatch messages. + * + * @param data A span of bytes containing the serialized Arrow IPC stream data + * + * @return std::vector A vector containing all deserialized record batches + * + * @throws std::runtime_error If: + * - A RecordBatch message is encountered before a Schema message + * - A RecordBatch message header is missing or invalid + * - Unsupported message types are encountered (Tensor, DictionaryBatch, SparseTensor) + * - An unknown message header type is encountered + * + * @note The function processes messages until an end-of-stream marker is detected + */ + [[nodiscard]] SPARROW_IPC_API std::vector + deserialize_stream(std::span data); +} \ No newline at end of file diff --git a/include/sparrow_ipc/deserialize_fixedsizebinary_array.hpp b/include/sparrow_ipc/deserialize_fixedsizebinary_array.hpp new file mode 100644 index 0000000..29f113c --- /dev/null +++ b/include/sparrow_ipc/deserialize_fixedsizebinary_array.hpp @@ -0,0 +1,21 @@ +#pragma once + +#include +#include + +#include "Message_generated.h" +#include "sparrow_ipc/arrow_interface/arrow_array.hpp" +#include "sparrow_ipc/arrow_interface/arrow_schema.hpp" +#include "sparrow_ipc/deserialize_utils.hpp" + +namespace sparrow_ipc +{ + [[nodiscard]] sparrow::fixed_width_binary_array deserialize_non_owning_fixedwidthbinary( + const org::apache::arrow::flatbuf::RecordBatch& record_batch, + std::span body, + std::string_view name, + const std::optional>& metadata, + size_t& buffer_index, + int32_t byte_width + ); +} \ No newline at end of file diff --git a/include/sparrow_ipc/deserialize_primitive_array.hpp b/include/sparrow_ipc/deserialize_primitive_array.hpp new file mode 100644 index 0000000..a1c5dad --- /dev/null +++ 
b/include/sparrow_ipc/deserialize_primitive_array.hpp @@ -0,0 +1,61 @@ +#pragma once + +#include +#include + +#include +#include + +#include "Message_generated.h" +#include "sparrow_ipc/arrow_interface/arrow_array.hpp" +#include "sparrow_ipc/arrow_interface/arrow_schema.hpp" +#include "sparrow_ipc/deserialize_utils.hpp" + +namespace sparrow_ipc +{ + template + [[nodiscard]] sparrow::primitive_array deserialize_non_owning_primitive_array( + const org::apache::arrow::flatbuf::RecordBatch& record_batch, + std::span body, + std::string_view name, + const std::optional>& metadata, + size_t& buffer_index + ) + { + const std::string_view format = data_type_to_format( + sparrow::detail::get_data_type_from_array>::get() + ); + ArrowSchema schema = make_non_owning_arrow_schema( + format, + name.data(), + metadata, + std::nullopt, + 0, + nullptr, + nullptr + ); + const auto [bitmap_ptr, null_count] = utils::get_bitmap_pointer_and_null_count( + record_batch, + body, + buffer_index++ + ); + const auto primitive_buffer_metadata = record_batch.buffers()->Get(buffer_index++); + if (body.size() < (primitive_buffer_metadata->offset() + primitive_buffer_metadata->length())) + { + throw std::runtime_error("Primitive buffer exceeds body size"); + } + auto primitives_ptr = const_cast(body.data() + primitive_buffer_metadata->offset()); + std::vector buffers = {bitmap_ptr, primitives_ptr}; + ArrowArray array = make_non_owning_arrow_array( + record_batch.length(), + null_count, + 0, + std::move(buffers), + 0, + nullptr, + nullptr + ); + sparrow::arrow_proxy ap{std::move(array), std::move(schema)}; + return sparrow::primitive_array{std::move(ap)}; + } +} \ No newline at end of file diff --git a/include/sparrow_ipc/deserialize_utils.hpp b/include/sparrow_ipc/deserialize_utils.hpp new file mode 100644 index 0000000..fc1ca05 --- /dev/null +++ b/include/sparrow_ipc/deserialize_utils.hpp @@ -0,0 +1,36 @@ +#pragma once + +#include +#include + +#include +#include + +#include "Message_generated.h" 
+#include "Schema_generated.h" + +namespace sparrow_ipc::utils +{ + /** + * @brief Extracts bitmap pointer and null count from a RecordBatch buffer. + * + * This function retrieves a bitmap buffer from the specified index in the RecordBatch's + * buffer list and calculates the number of null values represented by the bitmap. + * + * @param record_batch The Arrow RecordBatch containing buffer metadata + * @param body The raw buffer data as a byte span + * @param index The index of the bitmap buffer in the RecordBatch's buffer list + * + * @return A pair containing: + * - First: Pointer to the bitmap data (nullptr if buffer is empty) + * - Second: Count of null values in the bitmap (0 if buffer is empty) + * + * @note If the bitmap buffer has zero length, returns {nullptr, 0} + * @note The returned pointer is a non-const cast of the original const data + */ + [[nodiscard]] std::pair get_bitmap_pointer_and_null_count( + const org::apache::arrow::flatbuf::RecordBatch& record_batch, + std::span body, + size_t index + ); +} \ No newline at end of file diff --git a/include/sparrow_ipc/deserialize_variable_size_binary_array.hpp b/include/sparrow_ipc/deserialize_variable_size_binary_array.hpp new file mode 100644 index 0000000..f6a5729 --- /dev/null +++ b/include/sparrow_ipc/deserialize_variable_size_binary_array.hpp @@ -0,0 +1,65 @@ +#pragma once + +#include + +#include +#include + +#include "Message_generated.h" +#include "sparrow_ipc/arrow_interface/arrow_array.hpp" +#include "sparrow_ipc/arrow_interface/arrow_schema.hpp" +#include "sparrow_ipc/deserialize_utils.hpp" + +namespace sparrow_ipc +{ + template + [[nodiscard]] T deserialize_non_owning_variable_size_binary( + const org::apache::arrow::flatbuf::RecordBatch& record_batch, + std::span body, + std::string_view name, + const std::optional>& metadata, + size_t& buffer_index + ) + { + const std::string_view format = data_type_to_format(sparrow::detail::get_data_type_from_array::get()); + ArrowSchema schema = 
make_non_owning_arrow_schema( + format, + name.data(), + metadata, + std::nullopt, + 0, + nullptr, + nullptr + ); + const auto [bitmap_ptr, null_count] = utils::get_bitmap_pointer_and_null_count( + record_batch, + body, + buffer_index++ + ); + + const auto offset_metadata = record_batch.buffers()->Get(buffer_index++); + if ((offset_metadata->offset() + offset_metadata->length()) > body.size()) + { + throw std::runtime_error("Offset buffer exceeds body size"); + } + auto offset_ptr = const_cast(body.data() + offset_metadata->offset()); + const auto buffer_metadata = record_batch.buffers()->Get(buffer_index++); + if ((buffer_metadata->offset() + buffer_metadata->length()) > body.size()) + { + throw std::runtime_error("Data buffer exceeds body size"); + } + auto buffer_ptr = const_cast(body.data() + buffer_metadata->offset()); + std::vector buffers = {bitmap_ptr, offset_ptr, buffer_ptr}; + ArrowArray array = make_non_owning_arrow_array( + record_batch.length(), + null_count, + 0, + std::move(buffers), + 0, + nullptr, + nullptr + ); + sparrow::arrow_proxy ap{std::move(array), std::move(schema)}; + return T{std::move(ap)}; + } +} \ No newline at end of file diff --git a/include/sparrow_ipc/encapsulated_message.hpp b/include/sparrow_ipc/encapsulated_message.hpp new file mode 100644 index 0000000..7e95339 --- /dev/null +++ b/include/sparrow_ipc/encapsulated_message.hpp @@ -0,0 +1,46 @@ +#pragma once + +#include +#include + +#include "Message_generated.h" + +namespace sparrow_ipc +{ + class encapsulated_message + { + public: + + encapsulated_message(std::span data); + + [[nodiscard]] const org::apache::arrow::flatbuf::Message* flat_buffer_message() const; + + [[nodiscard]] size_t metadata_length() const; + + [[nodiscard]] std::variant< + const org::apache::arrow::flatbuf::Schema*, + const org::apache::arrow::flatbuf::RecordBatch*, + const org::apache::arrow::flatbuf::Tensor*, + const org::apache::arrow::flatbuf::DictionaryBatch*, + const 
org::apache::arrow::flatbuf::SparseTensor*> + metadata() const; + + [[nodiscard]] const ::flatbuffers::Vector<::flatbuffers::Offset>* + custom_metadata() const; + + [[nodiscard]] size_t body_length() const; + + [[nodiscard]] std::span body() const; + + [[nodiscard]] size_t total_length() const; + + [[nodiscard]] std::span as_span() const; + + private: + + std::span m_data; + }; + + [[nodiscard]] std::pair> + extract_encapsulated_message(std::span buf_ptr); +} \ No newline at end of file diff --git a/include/sparrow_ipc/magic_values.hpp b/include/sparrow_ipc/magic_values.hpp new file mode 100644 index 0000000..b08d505 --- /dev/null +++ b/include/sparrow_ipc/magic_values.hpp @@ -0,0 +1,33 @@ +#pragma once + +#include +#include +#include + +namespace sparrow_ipc +{ + + /** + * Continuation value defined in the Arrow IPC specification: + * https://arrow.apache.org/docs/format/Columnar.html#encapsulated-message-format + */ + constexpr std::array continuation = {0xFF, 0xFF, 0xFF, 0xFF}; + + /** + * End-of-stream marker defined in the Arrow IPC specification: + * https://arrow.apache.org/docs/format/Columnar.html#ipc-streaming-format + */ + constexpr std::array end_of_stream = {0xFF, 0xFF, 0xFF, 0xFF, 0x00, 0x00, 0x00, 0x00}; + + template + [[nodiscard]] bool is_continuation(const R& buf) + { + return std::ranges::equal(buf, continuation); + } + + template + [[nodiscard]] bool is_end_of_stream(const R& buf) + { + return std::ranges::equal(buf, end_of_stream); + } +} \ No newline at end of file diff --git a/include/sparrow_ipc/metadata.hpp b/include/sparrow_ipc/metadata.hpp new file mode 100644 index 0000000..83951ee --- /dev/null +++ b/include/sparrow_ipc/metadata.hpp @@ -0,0 +1,30 @@ +#pragma once + +#include + +#include + +#include + +#include "Schema_generated.h" + +namespace sparrow_ipc +{ + /** + * @brief Converts FlatBuffers metadata to Sparrow metadata format. 
+ * + * This function takes a FlatBuffers vector containing key-value pairs from Apache Arrow + * format and converts them into a vector of Sparrow metadata pairs. Each key-value pair + * from the FlatBuffers structure is extracted and stored as a sparrow::metadata_pair. + * + * @param metadata A FlatBuffers vector containing KeyValue pairs from Apache Arrow format + * @return std::vector A vector of Sparrow metadata pairs containing + * the converted key-value data + * + * @note The function reserves space in the output vector to match the input size for + * optimal memory allocation performance. + */ + std::vector to_sparrow_metadata( + const ::flatbuffers::Vector<::flatbuffers::Offset>& metadata + ); +} \ No newline at end of file diff --git a/include/sparrow_ipc/utils.hpp b/include/sparrow_ipc/utils.hpp new file mode 100644 index 0000000..65563a0 --- /dev/null +++ b/include/sparrow_ipc/utils.hpp @@ -0,0 +1,20 @@ +#pragma once + +#include +#include +#include +#include + +#include "Schema_generated.h" +#include "sparrow_ipc/config/config.hpp" + +namespace sparrow_ipc::utils +{ + // Aligns a value to the next multiple of 8, as required by the Arrow IPC format for message bodies + SPARROW_IPC_API int64_t align_to_8(const int64_t n); + + // Creates a Flatbuffers type from a format string + // This function maps a sparrow data type to the corresponding Flatbuffers type + SPARROW_IPC_API std::pair> + get_flatbuffer_type(flatbuffers::FlatBufferBuilder& builder, std::string_view format_str); +} diff --git a/include/utils.hpp b/include/utils.hpp deleted file mode 100644 index 60eae81..0000000 --- a/include/utils.hpp +++ /dev/null @@ -1,24 +0,0 @@ -#pragma once - -#include -#include -#include -#include - -#include "Schema_generated.h" - -#include "config/config.hpp" - -namespace sparrow_ipc -{ - namespace utils - { - // Aligns a value to the next multiple of 8, as required by the Arrow IPC format for message bodies - SPARROW_IPC_API int64_t align_to_8(const int64_t n); - 
- // Creates a Flatbuffers type from a format string - // This function maps a sparrow data type to the corresponding Flatbuffers type - SPARROW_IPC_API std::pair> - get_flatbuffer_type(flatbuffers::FlatBufferBuilder& builder, std::string_view format_str); - } -} diff --git a/src/arrow_interface/arrow_array.cpp b/src/arrow_interface/arrow_array.cpp new file mode 100644 index 0000000..ed0a0f2 --- /dev/null +++ b/src/arrow_interface/arrow_array.cpp @@ -0,0 +1,73 @@ +#include "sparrow_ipc/arrow_interface/arrow_array.hpp" + +#include + +#include +#include + +#include "sparrow_ipc/arrow_interface/arrow_array/private_data.hpp" +#include "sparrow_ipc/arrow_interface/arrow_array_schema_common_release.hpp" + +namespace sparrow_ipc +{ + void release_non_owning_arrow_array(ArrowArray* array) + { + SPARROW_ASSERT_FALSE(array == nullptr) + SPARROW_ASSERT_TRUE(array->release == std::addressof(release_non_owning_arrow_array)) + + release_common_non_owning_arrow(*array); + array->buffers = nullptr; // The buffers were deleted with the private data + } + + void fill_non_owning_arrow_array( + ArrowArray& array, + int64_t length, + int64_t null_count, + int64_t offset, + std::vector&& buffers, + size_t children_count, + ArrowArray** children, + ArrowArray* dictionary + ) + { + SPARROW_ASSERT_TRUE(length >= 0); + SPARROW_ASSERT_TRUE(null_count >= -1); + SPARROW_ASSERT_TRUE(offset >= 0); + + array.length = length; + array.null_count = null_count; + array.offset = offset; + array.n_buffers = static_cast(buffers.size()); + array.private_data = new non_owning_arrow_array_private_data(std::move(buffers)); + const auto private_data = static_cast(array.private_data); + array.buffers = private_data->buffers_ptrs(); + array.n_children = static_cast(children_count); + array.children = children; + array.dictionary = dictionary; + array.release = release_non_owning_arrow_array; + } + + ArrowArray make_non_owning_arrow_array( + int64_t length, + int64_t null_count, + int64_t offset, + 
std::vector&& buffers, + size_t children_count, + ArrowArray** children, + ArrowArray* dictionary + ) + { + ArrowArray array{}; + fill_non_owning_arrow_array( + array, + length, + null_count, + offset, + std::move(buffers), + children_count, + children, + dictionary + ); + return array; + } +} diff --git a/src/arrow_interface/arrow_array/private_data.cpp b/src/arrow_interface/arrow_array/private_data.cpp new file mode 100644 index 0000000..b133c8e --- /dev/null +++ b/src/arrow_interface/arrow_array/private_data.cpp @@ -0,0 +1,9 @@ +#include "sparrow_ipc/arrow_interface/arrow_array/private_data.hpp" + +namespace sparrow_ipc +{ + const void** non_owning_arrow_array_private_data::buffers_ptrs() noexcept + { + return const_cast(reinterpret_cast(m_buffers_pointers.data())); + } +} \ No newline at end of file diff --git a/src/arrow_interface/arrow_schema.cpp b/src/arrow_interface/arrow_schema.cpp new file mode 100644 index 0000000..6c3ed7d --- /dev/null +++ b/src/arrow_interface/arrow_schema.cpp @@ -0,0 +1,14 @@ +#include "sparrow_ipc/arrow_interface/arrow_schema.hpp" + +#include "sparrow_ipc/arrow_interface/arrow_array_schema_common_release.hpp" + +namespace sparrow_ipc +{ + void release_non_owning_arrow_schema(ArrowSchema* schema) + { + SPARROW_ASSERT_FALSE(schema == nullptr); + SPARROW_ASSERT_TRUE(schema->release == std::addressof(release_non_owning_arrow_schema)); + release_common_non_owning_arrow(*schema); + *schema = {}; + } +} \ No newline at end of file diff --git a/src/arrow_interface/arrow_schema/private_data.cpp b/src/arrow_interface/arrow_schema/private_data.cpp new file mode 100644 index 0000000..969742b --- /dev/null +++ b/src/arrow_interface/arrow_schema/private_data.cpp @@ -0,0 +1,30 @@ +#include "sparrow_ipc/arrow_interface/arrow_schema/private_data.hpp" + +namespace sparrow_ipc +{ + non_owning_arrow_schema_private_data::non_owning_arrow_schema_private_data( + std::string_view format, + const char* name, + std::optional metadata + ) + : m_format(format) 
+ , m_name(name) + , m_metadata(std::move(metadata)) + { + } + + const char* non_owning_arrow_schema_private_data::format_ptr() const noexcept + { + return m_format.data(); + } + + const char* non_owning_arrow_schema_private_data::name_ptr() const noexcept + { + return m_name; + } + + const char* non_owning_arrow_schema_private_data::metadata_ptr() const noexcept + { + return m_metadata.has_value() ? m_metadata->c_str() : nullptr; + } +} \ No newline at end of file diff --git a/src/deserialize.cpp b/src/deserialize.cpp new file mode 100644 index 0000000..0d13072 --- /dev/null +++ b/src/deserialize.cpp @@ -0,0 +1,273 @@ +#include "sparrow_ipc/deserialize.hpp" + +#include + +#include "sparrow_ipc/deserialize_fixedsizebinary_array.hpp" +#include "sparrow_ipc/deserialize_primitive_array.hpp" +#include "sparrow_ipc/deserialize_variable_size_binary_array.hpp" +#include "sparrow_ipc/magic_values.hpp" +#include "sparrow_ipc/metadata.hpp" + +namespace sparrow_ipc +{ + const org::apache::arrow::flatbuf::RecordBatch* + deserialize_record_batch_message(std::span data, size_t& current_offset) + { + current_offset += sizeof(uint32_t); + const auto batch_message = org::apache::arrow::flatbuf::GetMessage(data.data() + current_offset); + if (batch_message->header_type() != org::apache::arrow::flatbuf::MessageHeader::RecordBatch) + { + throw std::runtime_error("Expected RecordBatch message, but got a different type."); + } + return static_cast(batch_message->header()); + } + + /** + * @brief Deserializes arrays from an Apache Arrow RecordBatch using the provided schema. + * + * This function processes each field in the schema and deserializes the corresponding + * data from the RecordBatch into sparrow::array objects. It handles various Arrow data + * types including primitive types (bool, integers, floating point), binary data, and + * string data with their respective size variants. 
+ * + * @param record_batch The Apache Arrow FlatBuffer RecordBatch containing the serialized data + * @param schema The Apache Arrow FlatBuffer Schema defining the structure and types of the data + * @param encapsulated_message The message containing the binary data buffers + * + * @return std::vector A vector of deserialized arrays, one for each field in the schema + * + * @throws std::runtime_error If an unsupported data type, integer bit width, or floating point precision + * is encountered + * + * The function maintains a buffer index that is incremented as it processes each field + * to correctly map data buffers to their corresponding arrays. + */ + std::vector get_arrays_from_record_batch( + const org::apache::arrow::flatbuf::RecordBatch& record_batch, + const org::apache::arrow::flatbuf::Schema& schema, + const encapsulated_message& encapsulated_message, + const std::vector>>& field_metadata + ) + { + const size_t length = static_cast(record_batch.length()); + size_t buffer_index = 0; + + std::vector arrays; + arrays.reserve(schema.fields()->size()); + size_t field_idx = 0; + for (const auto field : *(schema.fields())) + { + const ::flatbuffers::Vector<::flatbuffers::Offset>* + fb_custom_metadata = field->custom_metadata(); + const std::optional>& metadata = field_metadata[field_idx++]; + const auto name = field->name()->string_view(); + const auto field_type = field->type_type(); + const auto deserialize_non_owning_primitive_array_lambda = [&]() + { + return deserialize_non_owning_primitive_array( + record_batch, + encapsulated_message.body(), + name, + metadata, + buffer_index + ); + }; + switch (field_type) + { + case org::apache::arrow::flatbuf::Type::Bool: + arrays.emplace_back( + deserialize_non_owning_primitive_array_lambda.template operator()() + ); + break; + case org::apache::arrow::flatbuf::Type::Int: + { + const auto int_type = field->type_as_Int(); + const auto bit_width = int_type->bitWidth(); + const bool is_signed = int_type->is_signed(); + 
+ if (is_signed) + { + switch (bit_width) + { + // clang-format off + case 8: arrays.emplace_back(deserialize_non_owning_primitive_array_lambda.template operator()()); break; + case 16: arrays.emplace_back(deserialize_non_owning_primitive_array_lambda.template operator()()); break; + case 32: arrays.emplace_back(deserialize_non_owning_primitive_array_lambda.template operator()()); break; + case 64: arrays.emplace_back(deserialize_non_owning_primitive_array_lambda.template operator()()); break; + default: throw std::runtime_error("Unsupported integer bit width."); + // clang-format on + } + } + else + { + switch (bit_width) + { + // clang-format off + case 8: arrays.emplace_back(deserialize_non_owning_primitive_array_lambda.template operator()()); break; + case 16: arrays.emplace_back(deserialize_non_owning_primitive_array_lambda.template operator()()); break; + case 32: arrays.emplace_back(deserialize_non_owning_primitive_array_lambda.template operator()()); break; + case 64: arrays.emplace_back(deserialize_non_owning_primitive_array_lambda.template operator()()); break; + default: throw std::runtime_error("Unsupported integer bit width."); + // clang-format on + } + } + } + break; + case org::apache::arrow::flatbuf::Type::FloatingPoint: + { + const auto float_type = field->type_as_FloatingPoint(); + switch (float_type->precision()) + { + // clang-format off + case org::apache::arrow::flatbuf::Precision::HALF: + arrays.emplace_back(deserialize_non_owning_primitive_array_lambda.template operator()()); + break; + case org::apache::arrow::flatbuf::Precision::SINGLE: + arrays.emplace_back(deserialize_non_owning_primitive_array_lambda.template operator()()); + break; + case org::apache::arrow::flatbuf::Precision::DOUBLE: + arrays.emplace_back(deserialize_non_owning_primitive_array_lambda.template operator()()); + break; + default: + throw std::runtime_error("Unsupported floating point precision."); + // clang-format on + } + break; + } + case 
org::apache::arrow::flatbuf::Type::FixedSizeBinary: + { + const auto fixed_size_binary_field = field->type_as_FixedSizeBinary(); + arrays.emplace_back(deserialize_non_owning_fixedwidthbinary( + record_batch, + encapsulated_message.body(), + name, + metadata, + buffer_index, + fixed_size_binary_field->byteWidth() + )); + break; + } + case org::apache::arrow::flatbuf::Type::Binary: + arrays.emplace_back( + deserialize_non_owning_variable_size_binary( + record_batch, + encapsulated_message.body(), + name, + metadata, + buffer_index + ) + ); + break; + case org::apache::arrow::flatbuf::Type::LargeBinary: + arrays.emplace_back( + deserialize_non_owning_variable_size_binary( + record_batch, + encapsulated_message.body(), + name, + metadata, + buffer_index + ) + ); + break; + case org::apache::arrow::flatbuf::Type::Utf8: + arrays.emplace_back( + deserialize_non_owning_variable_size_binary( + record_batch, + encapsulated_message.body(), + name, + metadata, + buffer_index + ) + ); + break; + case org::apache::arrow::flatbuf::Type::LargeUtf8: + arrays.emplace_back( + deserialize_non_owning_variable_size_binary( + record_batch, + encapsulated_message.body(), + name, + metadata, + buffer_index + ) + ); + break; + default: + throw std::runtime_error("Unsupported type."); + } + } + return arrays; + } + + std::vector deserialize_stream(std::span data) + { + const org::apache::arrow::flatbuf::Schema* schema = nullptr; + std::vector record_batches; + std::vector field_names; + std::vector fields_nullable; + std::vector field_types; + std::vector>> fields_metadata; + do + { + const auto [encapsulated_message, rest] = extract_encapsulated_message(data); + const org::apache::arrow::flatbuf::Message* message = encapsulated_message.flat_buffer_message(); + switch (message->header_type()) + { + case org::apache::arrow::flatbuf::MessageHeader::Schema: + { + schema = message->header_as_Schema(); + const size_t size = static_cast(schema->fields()->size()); + field_names.reserve(size); + 
fields_nullable.reserve(size); + fields_metadata.reserve(size); + + for (const auto field : *(schema->fields())) + { + field_names.emplace_back(field->name()->string_view()); + fields_nullable.push_back(field->nullable()); + const ::flatbuffers::Vector<::flatbuffers::Offset>* + fb_custom_metadata = field->custom_metadata(); + std::optional> + metadata = fb_custom_metadata == nullptr + ? std::nullopt + : std::make_optional(to_sparrow_metadata(*fb_custom_metadata)); + fields_metadata.push_back(std::move(metadata)); + } + } + break; + case org::apache::arrow::flatbuf::MessageHeader::RecordBatch: + { + if (schema == nullptr) + { + throw std::runtime_error("Schema message is missing."); + } + const auto record_batch = message->header_as_RecordBatch(); + if (record_batch == nullptr) + { + throw std::runtime_error("RecordBatch message is missing."); + } + std::vector arrays = get_arrays_from_record_batch( + *record_batch, + *schema, + encapsulated_message, + fields_metadata + ); + std::vector field_names_str(field_names.cbegin(), field_names.cend()); + record_batches.emplace_back(std::move(field_names_str), std::move(arrays)); + } + break; + case org::apache::arrow::flatbuf::MessageHeader::Tensor: + case org::apache::arrow::flatbuf::MessageHeader::DictionaryBatch: + case org::apache::arrow::flatbuf::MessageHeader::SparseTensor: + throw std::runtime_error("Not supported"); + default: + throw std::runtime_error("Unknown message header type."); + } + data = rest; // continue with the bytes that follow the message just consumed + if (data.empty() || (data.size() >= end_of_stream.size() && is_end_of_stream(data.subspan(0, end_of_stream.size())))) // stop on the end-of-stream marker or on exhausted input; never slice 8 bytes out of a shorter buffer (subspan would be undefined behaviour) + { + break; + } + } while (true); + return record_batches; + } +} \ No newline at end of file diff --git a/src/deserialize_fixedsizebinary_array.cpp b/src/deserialize_fixedsizebinary_array.cpp new file mode 100644 index 0000000..63ea213 --- /dev/null +++ b/src/deserialize_fixedsizebinary_array.cpp @@ -0,0 +1,48 @@ +#include "sparrow_ipc/deserialize_fixedsizebinary_array.hpp" + +namespace sparrow_ipc +{ + sparrow::fixed_width_binary_array
deserialize_non_owning_fixedwidthbinary( + const org::apache::arrow::flatbuf::RecordBatch& record_batch, + std::span body, + std::string_view name, + const std::optional>& metadata, + size_t& buffer_index, + int32_t byte_width + ) + { + const std::string format = "w:" + std::to_string(byte_width); + ArrowSchema schema = make_non_owning_arrow_schema( + format, + name.data(), + metadata, + std::nullopt, + 0, + nullptr, + nullptr + ); + const auto [bitmap_ptr, null_count] = utils::get_bitmap_pointer_and_null_count( + record_batch, + body, + buffer_index++ + ); + const auto buffer_metadata = record_batch.buffers()->Get(buffer_index++); + if ((body.size() < (buffer_metadata->offset() + buffer_metadata->length()))) + { + throw std::runtime_error("Data buffer exceeds body size"); + } + auto buffer_ptr = const_cast(body.data() + buffer_metadata->offset()); + std::vector buffers = {bitmap_ptr, buffer_ptr}; + ArrowArray array = make_non_owning_arrow_array( + record_batch.length(), + null_count, + 0, + std::move(buffers), + 0, + nullptr, + nullptr + ); + sparrow::arrow_proxy ap{std::move(array), std::move(schema)}; + return sparrow::fixed_width_binary_array{std::move(ap)}; + } +} diff --git a/src/deserialize_utils.cpp b/src/deserialize_utils.cpp new file mode 100644 index 0000000..d89be6c --- /dev/null +++ b/src/deserialize_utils.cpp @@ -0,0 +1,27 @@ +#include "sparrow_ipc/deserialize_utils.hpp" + +namespace sparrow_ipc::utils +{ + std::pair get_bitmap_pointer_and_null_count( + const org::apache::arrow::flatbuf::RecordBatch& record_batch, + std::span body, + size_t index + ) + { + const auto bitmap_metadata = record_batch.buffers()->Get(index); + if (bitmap_metadata->length() == 0) + { + return {nullptr, 0}; + } + if (body.size() < (bitmap_metadata->offset() + bitmap_metadata->length())) + { + throw std::runtime_error("Bitmap buffer exceeds body size"); + } + auto ptr = const_cast(body.data() + bitmap_metadata->offset()); + const sparrow::dynamic_bitset_view bitmap_view{ + 
ptr, + static_cast(record_batch.length()) + }; + return {ptr, bitmap_view.null_count()}; + } +} \ No newline at end of file diff --git a/src/encapsulated_message.cpp b/src/encapsulated_message.cpp new file mode 100644 index 0000000..128b7fc --- /dev/null +++ b/src/encapsulated_message.cpp @@ -0,0 +1,115 @@ +#include "sparrow_ipc/encapsulated_message.hpp" + +#include + +#include "sparrow_ipc/magic_values.hpp" +#include "sparrow_ipc/utils.hpp" + +namespace sparrow_ipc +{ + encapsulated_message::encapsulated_message(std::span data) + : m_data(data) + { + } + + const org::apache::arrow::flatbuf::Message* encapsulated_message::flat_buffer_message() const + { + const uint8_t* message_ptr = m_data.data() + (sizeof(uint32_t) * 2); // 4 bytes continuation + 4 + // bytes metadata size + return org::apache::arrow::flatbuf::GetMessage(message_ptr); + } + + size_t encapsulated_message::metadata_length() const + { + return *(reinterpret_cast(m_data.data() + sizeof(uint32_t))); + } + + [[nodiscard]] std::variant< + const org::apache::arrow::flatbuf::Schema*, + const org::apache::arrow::flatbuf::RecordBatch*, + const org::apache::arrow::flatbuf::Tensor*, + const org::apache::arrow::flatbuf::DictionaryBatch*, + const org::apache::arrow::flatbuf::SparseTensor*> + encapsulated_message::metadata() const + { + const auto schema_message = flat_buffer_message(); + switch (schema_message->header_type()) + { + case org::apache::arrow::flatbuf::MessageHeader::Schema: + { + return schema_message->header_as_Schema(); + } + case org::apache::arrow::flatbuf::MessageHeader::RecordBatch: + { + return schema_message->header_as_RecordBatch(); + } + case org::apache::arrow::flatbuf::MessageHeader::Tensor: + { + return schema_message->header_as_Tensor(); + } + case org::apache::arrow::flatbuf::MessageHeader::DictionaryBatch: + { + return schema_message->header_as_DictionaryBatch(); + } + case org::apache::arrow::flatbuf::MessageHeader::SparseTensor: + { + return 
schema_message->header_as_SparseTensor(); + } + default: + throw std::runtime_error("Unknown message header type."); + } + } + + const ::flatbuffers::Vector<::flatbuffers::Offset>* + encapsulated_message::custom_metadata() const + { + return flat_buffer_message()->custom_metadata(); + } + + size_t encapsulated_message::body_length() const + { + return static_cast(flat_buffer_message()->bodyLength()); + } + + std::span encapsulated_message::body() const + { + const size_t offset = sizeof(uint32_t) * 2 // 4 bytes continuation + 4 bytes metadata size + + metadata_length(); + const size_t padded_offset = utils::align_to_8(offset); // Round up to 8-byte boundary + if (m_data.size() < padded_offset + body_length()) + { + throw std::runtime_error("Data size is smaller than expected from metadata."); + } + return m_data.subspan(padded_offset, body_length()); + } + + size_t encapsulated_message::total_length() const + { + const size_t offset = sizeof(uint32_t) * 2 // 4 bytes continuation + 4 bytes metadata size + + metadata_length(); + const size_t padded_offset = utils::align_to_8(offset); // Round up to 8-byte boundary + return padded_offset + body_length(); + } + + std::span encapsulated_message::as_span() const + { + return m_data; + } + + std::pair> + extract_encapsulated_message(std::span data) + { + if (data.size() < 8) // need at least the 4-byte continuation marker plus the 4-byte metadata size + { + throw std::invalid_argument("Buffer is too small to contain a valid message."); + } + const std::span continuation_span = data.subspan(0, 4); + if (!is_continuation(continuation_span)) + { + throw std::runtime_error("Buffer does not start with continuation bytes, expected a valid message."); + } + encapsulated_message message(data); + std::span rest = data.subspan(message.total_length()); // NOTE(review): total_length() is not checked against data.size() here - confirm callers never pass a truncated message + return {std::move(message), std::move(rest)}; + } +} \ No newline at end of file diff --git a/src/metadata.cpp b/src/metadata.cpp new file mode 100644 index 0000000..3638f76 --- /dev/null +++ b/src/metadata.cpp @@ -0,0 +1,23 @@ +#include
"sparrow_ipc/metadata.hpp" + +#include + +namespace sparrow_ipc +{ + std::vector to_sparrow_metadata( + const ::flatbuffers::Vector<::flatbuffers::Offset>& metadata + ) + { + std::vector sparrow_metadata; + sparrow_metadata.reserve(metadata.size()); + std::ranges::transform( + metadata, + std::back_inserter(sparrow_metadata), + [](const auto& kv) + { + return sparrow::metadata_pair{kv->key()->str(), kv->value()->str()}; + } + ); + return sparrow_metadata; + } +} \ No newline at end of file diff --git a/src/serialize.cpp b/src/serialize.cpp deleted file mode 100644 index 0c76678..0000000 --- a/src/serialize.cpp +++ /dev/null @@ -1,208 +0,0 @@ -#include -#include - -#include "serialize.hpp" -#include "utils.hpp" - -namespace sparrow_ipc -{ - namespace details - { - std::vector serialize_schema_message(const ArrowSchema& arrow_schema) - { - // Create a new builder for the Schema message's metadata - flatbuffers::FlatBufferBuilder schema_builder; - - flatbuffers::Offset fb_name_offset = 0; - if (arrow_schema.name) - { - fb_name_offset = schema_builder.CreateString(arrow_schema.name); - } - - // Determine the Flatbuffer type information from the C schema's format string - const auto [type_enum, type_offset] = utils::get_flatbuffer_type(schema_builder, arrow_schema.format); - - // Handle metadata - flatbuffers::Offset>> - fb_metadata_offset = 0; - - if (arrow_schema.metadata) - { - const auto metadata_view = sparrow::key_value_view(arrow_schema.metadata); - std::vector> kv_offsets; - kv_offsets.reserve(metadata_view.size()); - for (const auto& [key, value] : metadata_view) - { - const auto key_offset = schema_builder.CreateString(std::string(key)); - const auto value_offset = schema_builder.CreateString(std::string(value)); - kv_offsets.push_back( - org::apache::arrow::flatbuf::CreateKeyValue(schema_builder, key_offset, value_offset)); - } - fb_metadata_offset = schema_builder.CreateVector(kv_offsets); - } - - // Build the Field object - const auto fb_field = 
org::apache::arrow::flatbuf::CreateField( - schema_builder, - fb_name_offset, - (arrow_schema.flags & static_cast(sparrow::ArrowFlag::NULLABLE)) != 0, - type_enum, - type_offset, - 0, // dictionary - 0, // children - fb_metadata_offset); - - // A Schema contains a vector of fields - const std::vector> fields_vec = {fb_field}; - const auto fb_fields = schema_builder.CreateVector(fields_vec); - - // Build the Schema object from the vector of fields - const auto schema_offset = org::apache::arrow::flatbuf::CreateSchema(schema_builder, org::apache::arrow::flatbuf::Endianness::Little, fb_fields); - - // Wrap the Schema in a top-level Message, which is the standard IPC envelope - const auto schema_message_offset = org::apache::arrow::flatbuf::CreateMessage( - schema_builder, - org::apache::arrow::flatbuf::MetadataVersion::V5, - org::apache::arrow::flatbuf::MessageHeader::Schema, - schema_offset.Union(), - 0 - ); - schema_builder.Finish(schema_message_offset); - - // Assemble the Schema message bytes - const uint32_t schema_len = schema_builder.GetSize(); // Get the size of the serialized metadata - // This will be the final buffer holding the complete IPC stream. 
- std::vector final_buffer; - final_buffer.resize(sizeof(uint32_t) + schema_len); // Resize the buffer to hold the message - // Copy the metadata into the buffer, after the 4-byte length prefix - memcpy(final_buffer.data() + sizeof(uint32_t), schema_builder.GetBufferPointer(), schema_len); - // Write the 4-byte metadata length at the beginning of the message - *(reinterpret_cast(final_buffer.data())) = schema_len; - return final_buffer; - } - - void serialize_record_batch_message(const ArrowArray& arrow_arr, const std::vector& buffers_sizes, std::vector& final_buffer) - { - // Create a new builder for the RecordBatch message's metadata - flatbuffers::FlatBufferBuilder batch_builder; - - std::vector buffers_vec; - int64_t current_offset = 0; - int64_t body_len = 0; // The total size of the message body - for (const auto& size : buffers_sizes) - { - buffers_vec.emplace_back(current_offset, size); - current_offset += size; - } - body_len = current_offset; - - // Create the FieldNode, which describes the layout of the array data - const org::apache::arrow::flatbuf::FieldNode field_node_struct(arrow_arr.length, arrow_arr.null_count); - // A RecordBatch contains a vector of nodes and a vector of buffers - const auto fb_nodes_vector = batch_builder.CreateVectorOfStructs(&field_node_struct, 1); - const auto fb_buffers_vector = batch_builder.CreateVectorOfStructs(buffers_vec); - - // Build the RecordBatch metadata object - const auto record_batch_offset = org::apache::arrow::flatbuf::CreateRecordBatch(batch_builder, arrow_arr.length, fb_nodes_vector, fb_buffers_vector); - - // Wrap the RecordBatch in a top-level Message - const auto batch_message_offset = org::apache::arrow::flatbuf::CreateMessage( - batch_builder, - org::apache::arrow::flatbuf::MetadataVersion::V5, - org::apache::arrow::flatbuf::MessageHeader::RecordBatch, - record_batch_offset.Union(), - body_len - ); - batch_builder.Finish(batch_message_offset); - - // Append the RecordBatch message to the final buffer - 
const uint32_t batch_meta_len = batch_builder.GetSize(); // Get the size of the batch metadata - const int64_t aligned_batch_meta_len = utils::align_to_8(batch_meta_len); // Calculate the padded length - - const size_t current_size = final_buffer.size(); // Get the current size (which is the end of the Schema message) - // Resize the buffer to append the new message - final_buffer.resize(current_size + sizeof(uint32_t) + aligned_batch_meta_len + body_len); - uint8_t* dst = final_buffer.data() + current_size; // Get a pointer to where the new message will start - - // Write the 4-byte metadata length for the RecordBatch message - *(reinterpret_cast(dst)) = batch_meta_len; - dst += sizeof(uint32_t); - // Copy the RecordBatch metadata into the buffer - memcpy(dst, batch_builder.GetBufferPointer(), batch_meta_len); - // Add padding to align the body to an 8-byte boundary - memset(dst + batch_meta_len, 0, aligned_batch_meta_len - batch_meta_len); - - dst += aligned_batch_meta_len; - // Copy the actual data buffers (the message body) into the buffer - for (size_t i = 0; i < buffers_sizes.size(); ++i) - { - // arrow_arr.buffers[0] is the validity bitmap - // arrow_arr.buffers[1] is the actual data buffer - const uint8_t* data_buffer = reinterpret_cast(arrow_arr.buffers[i]); - if (data_buffer) - { - memcpy(dst, data_buffer, buffers_sizes[i]); - } - else - { - // If validity_bitmap is null, it means there are no nulls - if (i == 0) - { - memset(dst, 0xFF, buffers_sizes[i]); - } - } - dst += buffers_sizes[i]; - } - } - - void deserialize_schema_message(const uint8_t* buf_ptr, size_t& current_offset, std::optional& name, std::optional>& metadata) - { - const uint32_t schema_meta_len = *(reinterpret_cast(buf_ptr + current_offset)); - current_offset += sizeof(uint32_t); - const auto schema_message = org::apache::arrow::flatbuf::GetMessage(buf_ptr + current_offset); - if (schema_message->header_type() != org::apache::arrow::flatbuf::MessageHeader::Schema) - { - throw 
std::runtime_error("Expected Schema message at the start of the buffer."); - } - const auto flatbuffer_schema = static_cast(schema_message->header()); - const auto fields = flatbuffer_schema->fields(); - if (fields->size() != 1) - { - throw std::runtime_error("Expected schema with exactly one field."); - } - - const auto field = fields->Get(0); - - // Get name - if (const auto fb_name = field->name()) - { - name = fb_name->str(); - } - - // Handle metadata - const auto fb_metadata = field->custom_metadata(); - if (fb_metadata && !fb_metadata->empty()) - { - metadata = std::vector(); - metadata->reserve(fb_metadata->size()); - for (const auto& kv : *fb_metadata) - { - metadata->emplace_back(kv->key()->str(), kv->value()->str()); - } - } - current_offset += schema_meta_len; - } - - const org::apache::arrow::flatbuf::RecordBatch* deserialize_record_batch_message(const uint8_t* buf_ptr, size_t& current_offset) - { - current_offset += sizeof(uint32_t); - const auto batch_message = org::apache::arrow::flatbuf::GetMessage(buf_ptr + current_offset); - if (batch_message->header_type() != org::apache::arrow::flatbuf::MessageHeader::RecordBatch) - { - throw std::runtime_error("Expected RecordBatch message, but got a different type."); - } - return static_cast(batch_message->header()); - } - - } // namespace details -} // namespace sparrow-ipc diff --git a/src/serialize_null_array.cpp b/src/serialize_null_array.cpp deleted file mode 100644 index 230a9db..0000000 --- a/src/serialize_null_array.cpp +++ /dev/null @@ -1,42 +0,0 @@ -#include "serialize_null_array.hpp" - -namespace sparrow_ipc -{ - // A null_array is represented by metadata only (Schema, RecordBatch) and has no data buffers, - // making its message body zero-length. 
- std::vector serialize_null_array(sparrow::null_array& arr) - { - const auto [arrow_arr_ptr, arrow_schema_ptr] = sparrow::get_arrow_structures(arr); - const auto& arrow_arr = *arrow_arr_ptr; - const auto& arrow_schema = *arrow_schema_ptr; - - // I - Serialize the Schema message - auto final_buffer = details::serialize_schema_message(arrow_schema); - - // II - Serialize the RecordBatch message - details::serialize_record_batch_message(arrow_arr, {}, final_buffer); - - // Return the final buffer containing the complete IPC stream - return final_buffer; - } - - // This reads the Schema and RecordBatch messages to extract the array's length, - // name, and metadata, then constructs a null_array. - sparrow::null_array deserialize_null_array(const std::vector& buffer) - { - const uint8_t* buf_ptr = buffer.data(); - size_t current_offset = 0; - - // I - Deserialize the Schema message - std::optional name; - std::optional> metadata; - details::deserialize_schema_message(buf_ptr, current_offset, name, metadata); - - // II - Deserialize the RecordBatch message - const auto* record_batch = details::deserialize_record_batch_message(buf_ptr, current_offset); - - // The body is empty, so we don't need to read any further. - // Construct the null_array from the deserialized metadata. 
- return sparrow::null_array(record_batch->length(), name, metadata); - } -} diff --git a/src/utils.cpp b/src/utils.cpp index a538700..3d7b5e7 100644 --- a/src/utils.cpp +++ b/src/utils.cpp @@ -1,11 +1,11 @@ +#include "sparrow_ipc/utils.hpp" + #include #include #include #include "sparrow.hpp" -#include "utils.hpp" - namespace sparrow_ipc { namespace @@ -24,7 +24,11 @@ namespace sparrow_ipc std::string_view substr_str(format_str.data() + sep_pos + 1, format_str.size() - sep_pos - 1); int32_t substr_size = 0; - const auto [ptr, ec] = std::from_chars(substr_str.data(), substr_str.data() + substr_str.size(), substr_size); + const auto [ptr, ec] = std::from_chars( + substr_str.data(), + substr_str.data() + substr_str.size(), + substr_size + ); if (ec != std::errc() || ptr != substr_str.data() + substr_str.size()) { @@ -35,23 +39,37 @@ namespace sparrow_ipc // Creates a Flatbuffers Decimal type from a format string // The format string is expected to be in the format "d:precision,scale" - std::pair> - get_flatbuffer_decimal_type(flatbuffers::FlatBufferBuilder& builder, std::string_view format_str, const int32_t bitWidth) + std::pair> get_flatbuffer_decimal_type( + flatbuffers::FlatBufferBuilder& builder, + std::string_view format_str, + const int32_t bitWidth + ) { // Decimal requires precision and scale. We need to parse the format_str. 
// Format: "d:precision,scale" - const auto scale = parse_format(format_str, ","); + const auto scale = parse_format(format_str, ","); if (!scale.has_value()) { - throw std::runtime_error("Failed to parse Decimal " + std::to_string(bitWidth) + " scale from format string: " + std::string(format_str)); + throw std::runtime_error( + "Failed to parse Decimal " + std::to_string(bitWidth) + + " scale from format string: " + std::string(format_str) + ); } const size_t comma_pos = format_str.find(','); - const auto precision = parse_format(format_str.substr(0, comma_pos), ":"); + const auto precision = parse_format(format_str.substr(0, comma_pos), ":"); if (!precision.has_value()) { - throw std::runtime_error("Failed to parse Decimal " + std::to_string(bitWidth) + " precision from format string: " + std::string(format_str)); + throw std::runtime_error( + "Failed to parse Decimal " + std::to_string(bitWidth) + + " precision from format string: " + std::string(format_str) + ); } - const auto decimal_type = org::apache::arrow::flatbuf::CreateDecimal(builder, precision.value(), scale.value(), bitWidth); + const auto decimal_type = org::apache::arrow::flatbuf::CreateDecimal( + builder, + precision.value(), + scale.value(), + bitWidth + ); return {org::apache::arrow::flatbuf::Type::Decimal, decimal_type.Union()}; } } @@ -122,19 +140,25 @@ namespace sparrow_ipc case sparrow::data_type::HALF_FLOAT: { const auto fp_type = org::apache::arrow::flatbuf::CreateFloatingPoint( - builder, org::apache::arrow::flatbuf::Precision::HALF); + builder, + org::apache::arrow::flatbuf::Precision::HALF + ); return {org::apache::arrow::flatbuf::Type::FloatingPoint, fp_type.Union()}; } case sparrow::data_type::FLOAT: { const auto fp_type = org::apache::arrow::flatbuf::CreateFloatingPoint( - builder, org::apache::arrow::flatbuf::Precision::SINGLE); + builder, + org::apache::arrow::flatbuf::Precision::SINGLE + ); return {org::apache::arrow::flatbuf::Type::FloatingPoint, fp_type.Union()}; } case 
sparrow::data_type::DOUBLE: { const auto fp_type = org::apache::arrow::flatbuf::CreateFloatingPoint( - builder, org::apache::arrow::flatbuf::Precision::DOUBLE); + builder, + org::apache::arrow::flatbuf::Precision::DOUBLE + ); return {org::apache::arrow::flatbuf::Type::FloatingPoint, fp_type.Union()}; } case sparrow::data_type::STRING: @@ -169,87 +193,142 @@ namespace sparrow_ipc } case sparrow::data_type::DATE_DAYS: { - const auto date_type = org::apache::arrow::flatbuf::CreateDate(builder, org::apache::arrow::flatbuf::DateUnit::DAY); + const auto date_type = org::apache::arrow::flatbuf::CreateDate( + builder, + org::apache::arrow::flatbuf::DateUnit::DAY + ); return {org::apache::arrow::flatbuf::Type::Date, date_type.Union()}; } case sparrow::data_type::DATE_MILLISECONDS: { - const auto date_type = org::apache::arrow::flatbuf::CreateDate(builder, org::apache::arrow::flatbuf::DateUnit::MILLISECOND); + const auto date_type = org::apache::arrow::flatbuf::CreateDate( + builder, + org::apache::arrow::flatbuf::DateUnit::MILLISECOND + ); return {org::apache::arrow::flatbuf::Type::Date, date_type.Union()}; } case sparrow::data_type::TIMESTAMP_SECONDS: { - const auto timestamp_type = org::apache::arrow::flatbuf::CreateTimestamp(builder, org::apache::arrow::flatbuf::TimeUnit::SECOND); + const auto timestamp_type = org::apache::arrow::flatbuf::CreateTimestamp( + builder, + org::apache::arrow::flatbuf::TimeUnit::SECOND + ); return {org::apache::arrow::flatbuf::Type::Timestamp, timestamp_type.Union()}; } case sparrow::data_type::TIMESTAMP_MILLISECONDS: { - const auto timestamp_type = org::apache::arrow::flatbuf::CreateTimestamp(builder, org::apache::arrow::flatbuf::TimeUnit::MILLISECOND); + const auto timestamp_type = org::apache::arrow::flatbuf::CreateTimestamp( + builder, + org::apache::arrow::flatbuf::TimeUnit::MILLISECOND + ); return {org::apache::arrow::flatbuf::Type::Timestamp, timestamp_type.Union()}; } case sparrow::data_type::TIMESTAMP_MICROSECONDS: { - const auto 
timestamp_type = org::apache::arrow::flatbuf::CreateTimestamp(builder, org::apache::arrow::flatbuf::TimeUnit::MICROSECOND); + const auto timestamp_type = org::apache::arrow::flatbuf::CreateTimestamp( + builder, + org::apache::arrow::flatbuf::TimeUnit::MICROSECOND + ); return {org::apache::arrow::flatbuf::Type::Timestamp, timestamp_type.Union()}; } case sparrow::data_type::TIMESTAMP_NANOSECONDS: { - const auto timestamp_type = org::apache::arrow::flatbuf::CreateTimestamp(builder, org::apache::arrow::flatbuf::TimeUnit::NANOSECOND); + const auto timestamp_type = org::apache::arrow::flatbuf::CreateTimestamp( + builder, + org::apache::arrow::flatbuf::TimeUnit::NANOSECOND + ); return {org::apache::arrow::flatbuf::Type::Timestamp, timestamp_type.Union()}; } case sparrow::data_type::DURATION_SECONDS: { - const auto duration_type = org::apache::arrow::flatbuf::CreateDuration(builder, org::apache::arrow::flatbuf::TimeUnit::SECOND); + const auto duration_type = org::apache::arrow::flatbuf::CreateDuration( + builder, + org::apache::arrow::flatbuf::TimeUnit::SECOND + ); return {org::apache::arrow::flatbuf::Type::Duration, duration_type.Union()}; } case sparrow::data_type::DURATION_MILLISECONDS: { - const auto duration_type = org::apache::arrow::flatbuf::CreateDuration(builder, org::apache::arrow::flatbuf::TimeUnit::MILLISECOND); + const auto duration_type = org::apache::arrow::flatbuf::CreateDuration( + builder, + org::apache::arrow::flatbuf::TimeUnit::MILLISECOND + ); return {org::apache::arrow::flatbuf::Type::Duration, duration_type.Union()}; } case sparrow::data_type::DURATION_MICROSECONDS: { - const auto duration_type = org::apache::arrow::flatbuf::CreateDuration(builder, org::apache::arrow::flatbuf::TimeUnit::MICROSECOND); + const auto duration_type = org::apache::arrow::flatbuf::CreateDuration( + builder, + org::apache::arrow::flatbuf::TimeUnit::MICROSECOND + ); return {org::apache::arrow::flatbuf::Type::Duration, duration_type.Union()}; } case 
sparrow::data_type::DURATION_NANOSECONDS: { - const auto duration_type = org::apache::arrow::flatbuf::CreateDuration(builder, org::apache::arrow::flatbuf::TimeUnit::NANOSECOND); + const auto duration_type = org::apache::arrow::flatbuf::CreateDuration( + builder, + org::apache::arrow::flatbuf::TimeUnit::NANOSECOND + ); return {org::apache::arrow::flatbuf::Type::Duration, duration_type.Union()}; } case sparrow::data_type::INTERVAL_MONTHS: { - const auto interval_type = org::apache::arrow::flatbuf::CreateInterval(builder, org::apache::arrow::flatbuf::IntervalUnit::YEAR_MONTH); + const auto interval_type = org::apache::arrow::flatbuf::CreateInterval( + builder, + org::apache::arrow::flatbuf::IntervalUnit::YEAR_MONTH + ); return {org::apache::arrow::flatbuf::Type::Interval, interval_type.Union()}; } case sparrow::data_type::INTERVAL_DAYS_TIME: { - const auto interval_type = org::apache::arrow::flatbuf::CreateInterval(builder, org::apache::arrow::flatbuf::IntervalUnit::DAY_TIME); + const auto interval_type = org::apache::arrow::flatbuf::CreateInterval( + builder, + org::apache::arrow::flatbuf::IntervalUnit::DAY_TIME + ); return {org::apache::arrow::flatbuf::Type::Interval, interval_type.Union()}; } case sparrow::data_type::INTERVAL_MONTHS_DAYS_NANOSECONDS: { - const auto interval_type = org::apache::arrow::flatbuf::CreateInterval(builder, org::apache::arrow::flatbuf::IntervalUnit::MONTH_DAY_NANO); + const auto interval_type = org::apache::arrow::flatbuf::CreateInterval( + builder, + org::apache::arrow::flatbuf::IntervalUnit::MONTH_DAY_NANO + ); return {org::apache::arrow::flatbuf::Type::Interval, interval_type.Union()}; } case sparrow::data_type::TIME_SECONDS: { - const auto time_type = org::apache::arrow::flatbuf::CreateTime(builder, org::apache::arrow::flatbuf::TimeUnit::SECOND, 32); + const auto time_type = org::apache::arrow::flatbuf::CreateTime( + builder, + org::apache::arrow::flatbuf::TimeUnit::SECOND, + 32 + ); return {org::apache::arrow::flatbuf::Type::Time, 
time_type.Union()}; } case sparrow::data_type::TIME_MILLISECONDS: { - const auto time_type = org::apache::arrow::flatbuf::CreateTime(builder, org::apache::arrow::flatbuf::TimeUnit::MILLISECOND, 32); + const auto time_type = org::apache::arrow::flatbuf::CreateTime( + builder, + org::apache::arrow::flatbuf::TimeUnit::MILLISECOND, + 32 + ); return {org::apache::arrow::flatbuf::Type::Time, time_type.Union()}; } case sparrow::data_type::TIME_MICROSECONDS: { - const auto time_type = org::apache::arrow::flatbuf::CreateTime(builder, org::apache::arrow::flatbuf::TimeUnit::MICROSECOND, 64); + const auto time_type = org::apache::arrow::flatbuf::CreateTime( + builder, + org::apache::arrow::flatbuf::TimeUnit::MICROSECOND, + 64 + ); return {org::apache::arrow::flatbuf::Type::Time, time_type.Union()}; } case sparrow::data_type::TIME_NANOSECONDS: { - const auto time_type = org::apache::arrow::flatbuf::CreateTime(builder, org::apache::arrow::flatbuf::TimeUnit::NANOSECOND, 64); + const auto time_type = org::apache::arrow::flatbuf::CreateTime( + builder, + org::apache::arrow::flatbuf::TimeUnit::NANOSECOND, + 64 + ); return {org::apache::arrow::flatbuf::Type::Time, time_type.Union()}; } case sparrow::data_type::LIST: @@ -276,13 +355,18 @@ namespace sparrow_ipc { // FixedSizeList requires listSize. We need to parse the format_str. 
// Format: "+w:size" - const auto list_size = parse_format(format_str, ":"); + const auto list_size = parse_format(format_str, ":"); if (!list_size.has_value()) { - throw std::runtime_error("Failed to parse FixedSizeList size from format string: " + std::string(format_str)); + throw std::runtime_error( + "Failed to parse FixedSizeList size from format string: " + std::string(format_str) + ); } - const auto fixed_size_list_type = org::apache::arrow::flatbuf::CreateFixedSizeList(builder, list_size.value()); + const auto fixed_size_list_type = org::apache::arrow::flatbuf::CreateFixedSizeList( + builder, + list_size.value() + ); return {org::apache::arrow::flatbuf::Type::FixedSizeList, fixed_size_list_type.Union()}; } case sparrow::data_type::STRUCT: @@ -292,17 +376,26 @@ namespace sparrow_ipc } case sparrow::data_type::MAP: { - const auto map_type = org::apache::arrow::flatbuf::CreateMap(builder, false); // not sorted keys + // not sorted keys + const auto map_type = org::apache::arrow::flatbuf::CreateMap(builder, false); return {org::apache::arrow::flatbuf::Type::Map, map_type.Union()}; } case sparrow::data_type::DENSE_UNION: { - const auto union_type = org::apache::arrow::flatbuf::CreateUnion(builder, org::apache::arrow::flatbuf::UnionMode::Dense, 0); + const auto union_type = org::apache::arrow::flatbuf::CreateUnion( + builder, + org::apache::arrow::flatbuf::UnionMode::Dense, + 0 + ); return {org::apache::arrow::flatbuf::Type::Union, union_type.Union()}; } case sparrow::data_type::SPARSE_UNION: { - const auto union_type = org::apache::arrow::flatbuf::CreateUnion(builder, org::apache::arrow::flatbuf::UnionMode::Sparse, 0); + const auto union_type = org::apache::arrow::flatbuf::CreateUnion( + builder, + org::apache::arrow::flatbuf::UnionMode::Sparse, + 0 + ); return {org::apache::arrow::flatbuf::Type::Union, union_type.Union()}; } case sparrow::data_type::RUN_ENCODED: @@ -330,13 +423,19 @@ namespace sparrow_ipc { // FixedSizeBinary requires byteWidth. 
We need to parse the format_str. // Format: "w:size" - const auto byte_width = parse_format(format_str, ":"); + const auto byte_width = parse_format(format_str, ":"); if (!byte_width.has_value()) { - throw std::runtime_error("Failed to parse FixedWidthBinary size from format string: " + std::string(format_str)); + throw std::runtime_error( + "Failed to parse FixedWidthBinary size from format string: " + + std::string(format_str) + ); } - const auto fixed_width_binary_type = org::apache::arrow::flatbuf::CreateFixedSizeBinary(builder, byte_width.value()); + const auto fixed_width_binary_type = org::apache::arrow::flatbuf::CreateFixedSizeBinary( + builder, + byte_width.value() + ); return {org::apache::arrow::flatbuf::Type::FixedSizeBinary, fixed_width_binary_type.Union()}; } default: diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index e184974..b46f509 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -2,14 +2,13 @@ cmake_minimum_required(VERSION 3.28) set(test_target "test_sparrow_ipc_lib") -set( - SPARROW_IPC_TESTS_SRC +set(SPARROW_IPC_TESTS_SRC include/sparrow_ipc_tests_helpers.hpp - # TODO move all the files below under src? 
main.cpp + test_arrow_array.cpp + test_arrow_schema.cpp + test_deserialization_with_files.cpp test_utils.cpp - test_primitive_array_serialization.cpp - test_null_array_serialization.cpp ) add_executable(${test_target} ${SPARROW_IPC_TESTS_SRC}) @@ -17,6 +16,8 @@ target_link_libraries(${test_target} PRIVATE sparrow-ipc doctest::doctest + sparrow::json_reader + arrow-testing-data ) if(WIN32) @@ -28,6 +29,9 @@ if(WIN32) COMMAND ${CMAKE_COMMAND} -E copy "$" "$" + COMMAND ${CMAKE_COMMAND} -E copy + "$" + "$" COMMENT "Copying sparrow and sparrow-ipc DLLs to executable directory" ) endif() diff --git a/tests/metadata_sample.hpp b/tests/metadata_sample.hpp new file mode 100644 index 0000000..5991e6b --- /dev/null +++ b/tests/metadata_sample.hpp @@ -0,0 +1,75 @@ +// Man Group Operations Limited +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#pragma once + +#include +#include +#include + +#include + +#include "sparrow/utils/metadata.hpp" + +namespace sparrow_ipc +{ + static const std::string metadata_buffer = []() + { + if constexpr (std::endian::native == std::endian::big) + { + return std::string{ + 0x00, 0x00, 0x00, 0x02, // Number of keys/values + 0x00, 0x00, 0x00, 0x04, // Length of key1 + 'k', 'e', 'y', '1', // Key 1 + 0x00, 0x00, 0x00, 0x04, // Length of value1 + 'v', 'a', 'l', '1', // Value 1 + 0x00, 0x00, 0x00, 0x04, // Length of key2 + 'k', 'e', 'y', '2', // Key 2 + 0x00, 0x00, 0x00, 0x04, // Length of value2 + 'v', 'a', 'l', '2' // Value 2 + }; + } + else if constexpr (std::endian::native == std::endian::little) + { + return std::string{ + 0x02, 0x00, 0x00, 0x00, // Number of keys/values + 0x04, 0x00, 0x00, 0x00, // Length of key1 + 'k', 'e', 'y', '1', // Key 1 + 0x04, 0x00, 0x00, 0x00, // Length of value1 + 'v', 'a', 'l', '1', // Value 1 + 0x04, 0x00, 0x00, 0x00, // Length of key2 + 'k', 'e', 'y', '2', // Key 2 + 0x04, 0x00, 0x00, 0x00, // Length of value2 + 'v', 'a', 'l', '2' // Value 2 + }; + } + }(); + + static const std::vector metadata_sample = {{"key1", "val1"}, {"key2", "val2"}}; + static const std::optional> metadata_sample_opt = metadata_sample; + + inline void + test_metadata(const std::vector& metadata_1, const sparrow::key_value_view& metadata_2) + { + REQUIRE_EQ(metadata_1.size(), metadata_2.size()); + + auto it = metadata_2.cbegin(); + for (const auto& [key, value] : metadata_1) + { + CHECK_EQ(key, (*it).first); + CHECK_EQ(value, (*it).second); + ++it; + } + } +} diff --git a/tests/test_arrow_array.cpp b/tests/test_arrow_array.cpp new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_arrow_schema.cpp b/tests/test_arrow_schema.cpp new file mode 100644 index 0000000..7ef09dc --- /dev/null +++ b/tests/test_arrow_schema.cpp @@ -0,0 +1,278 @@ + +#include +#include // Needed by doctest +#include +#include + +#include + +#include "sparrow/utils/metadata.hpp" 
+#include "sparrow/utils/repeat_container.hpp" + +#include "doctest/doctest.h" +#include "metadata_sample.hpp" +#include "sparrow_ipc/arrow_interface/arrow_schema.hpp" +#include "sparrow_ipc/deserialize_primitive_array.hpp" + +using namespace std::string_literals; + +void compare_arrow_schema(const ArrowSchema& schema, const ArrowSchema& schema_copy) +{ + CHECK_NE(&schema, &schema_copy); + CHECK_EQ(std::string_view(schema.format), std::string_view(schema_copy.format)); + CHECK_EQ(std::string_view(schema.name), std::string_view(schema_copy.name)); + CHECK_EQ(std::string_view(schema.metadata), std::string_view(schema_copy.metadata)); + CHECK_EQ(schema.flags, schema_copy.flags); + CHECK_EQ(schema.n_children, schema_copy.n_children); + if (schema.n_children > 0) + { + REQUIRE_NE(schema.children, nullptr); + REQUIRE_NE(schema_copy.children, nullptr); + for (int64_t i = 0; i < schema.n_children; ++i) + { + CHECK_NE(schema.children[i], nullptr); + compare_arrow_schema(*schema.children[i], *schema_copy.children[i]); + } + } + else + { + CHECK_EQ(schema.children, nullptr); + CHECK_EQ(schema_copy.children, nullptr); + } + + if (schema.dictionary != nullptr) + { + REQUIRE_NE(schema_copy.dictionary, nullptr); + compare_arrow_schema(*schema.dictionary, *schema_copy.dictionary); + } + else + { + CHECK_EQ(schema_copy.dictionary, nullptr); + } +} + +void check_empty(ArrowSchema& sch) +{ + CHECK_EQ(std::strcmp(sch.format, "n"), 0); + CHECK_EQ(std::strcmp(sch.name, ""), 0); + CHECK_EQ(std::strcmp(sch.metadata, ""), 0); + CHECK_EQ(sch.flags, 0); + CHECK_EQ(sch.n_children, 0); + CHECK_EQ(sch.children, nullptr); + CHECK_EQ(sch.dictionary, nullptr); +} + +TEST_SUITE("C Data Interface") +{ + TEST_CASE("ArrowSchema") + { + SUBCASE("make_non_owning_arrow_schema") + { + ArrowSchema** children = new ArrowSchema*[2]; + children[0] = new ArrowSchema(); + children[1] = new ArrowSchema(); + + const auto children_1_ptr = children[0]; + const auto children_2_ptr = children[1]; + + auto dictionnary 
= new ArrowSchema(); + dictionnary->name = "dictionary"; + const std::string format = "format"; + const std::string name = "name"; + auto schema = sparrow_ipc::make_non_owning_arrow_schema( + format.data(), + name.data(), + sparrow_ipc::metadata_sample_opt, + std::unordered_set{sparrow::ArrowFlag::DICTIONARY_ORDERED}, + 2, + children, + dictionnary + ); + + const auto schema_format = std::string_view(schema.format); + const bool format_eq = schema_format == format; + CHECK(format_eq); + const auto schema_name = std::string_view(schema.name); + const bool name_eq = schema_name == name; + CHECK(name_eq); + sparrow_ipc::test_metadata(sparrow_ipc::metadata_sample, schema.metadata); + CHECK_EQ(schema.flags, 1); + CHECK_EQ(schema.n_children, 2); + REQUIRE_NE(schema.children, nullptr); + CHECK_EQ(schema.children[0], children_1_ptr); + CHECK_EQ(schema.children[1], children_2_ptr); + CHECK_EQ(schema.dictionary, dictionnary); + const bool is_release_arrow_schema = schema.release + == &sparrow_ipc::release_non_owning_arrow_schema; + CHECK(is_release_arrow_schema); + CHECK_NE(schema.private_data, nullptr); + schema.release(&schema); + } + + SUBCASE("make_non_owning_arrow_schema no children, no dictionary, no name and metadata") + { + auto schema = sparrow_ipc::make_non_owning_arrow_schema( + "format", + nullptr, + std::optional>{}, + std::unordered_set{sparrow::ArrowFlag::DICTIONARY_ORDERED}, + 0, + nullptr, + nullptr + ); + + const auto schema_format = std::string_view(schema.format); + const bool format_eq = schema_format == "format"; + CHECK(format_eq); + CHECK_EQ(schema.name, nullptr); + CHECK_EQ(schema.metadata, nullptr); + CHECK_EQ(schema.flags, 1); + CHECK_EQ(schema.n_children, 0); + CHECK_EQ(schema.children, nullptr); + CHECK_EQ(schema.dictionary, nullptr); + const bool is_release_arrow_schema = schema.release + == &sparrow_ipc::release_non_owning_arrow_schema; + CHECK(is_release_arrow_schema); + CHECK_NE(schema.private_data, nullptr); + schema.release(&schema); + } + 
+ SUBCASE("ArrowSchema release") + { + ArrowSchema** children = new ArrowSchema*[2]; + children[0] = new ArrowSchema(); + children[1] = new ArrowSchema(); + + auto schema = sparrow_ipc::make_non_owning_arrow_schema( + "format", + "name", + sparrow_ipc::metadata_sample_opt, + std::unordered_set{sparrow::ArrowFlag::DICTIONARY_ORDERED}, + 2, + children, + new ArrowSchema() + ); + + schema.release(&schema); + + CHECK_EQ(schema.format, nullptr); + CHECK_EQ(schema.name, nullptr); + CHECK_EQ(schema.metadata, nullptr); + CHECK_EQ(schema.children, nullptr); + CHECK_EQ(schema.dictionary, nullptr); + const bool is_nullptr = schema.release == nullptr; + CHECK(is_nullptr); + CHECK_EQ(schema.private_data, nullptr); + } + + SUBCASE("ArrowSchema release no children, no dictionary, no name and metadata") + { + auto schema = sparrow_ipc::make_non_owning_arrow_schema( + "format", + nullptr, + std::optional>{}, + std::unordered_set{sparrow::ArrowFlag::DICTIONARY_ORDERED}, + 0, + nullptr, + nullptr + ); + + schema.release(&schema); + + CHECK_EQ(schema.format, nullptr); + CHECK_EQ(schema.name, nullptr); + CHECK_EQ(schema.metadata, nullptr); + CHECK_EQ(schema.children, nullptr); + CHECK_EQ(schema.dictionary, nullptr); + const bool is_nullptr = schema.release == nullptr; + CHECK(is_nullptr); + CHECK_EQ(schema.private_data, nullptr); + } + + SUBCASE("deep_copy_schema") + { + auto children = new ArrowSchema*[2]; + children[0] = new ArrowSchema(); + *children[0] = sparrow_ipc::make_non_owning_arrow_schema( + "format", + "child1", + sparrow_ipc::metadata_sample_opt, + std::unordered_set{sparrow::ArrowFlag::MAP_KEYS_SORTED}, + 0, + nullptr, + nullptr + ); + children[1] = new ArrowSchema(); + *children[1] = sparrow_ipc::make_non_owning_arrow_schema( + "format", + "child2", + sparrow_ipc::metadata_sample_opt, + std::unordered_set{sparrow::ArrowFlag::NULLABLE}, + 0, + nullptr, + nullptr + ); + + auto dictionary = new ArrowSchema(); + *dictionary = sparrow_ipc::make_non_owning_arrow_schema( + 
"format", + "dictionary", + sparrow_ipc::metadata_sample_opt, + std::unordered_set{sparrow::ArrowFlag::MAP_KEYS_SORTED}, + 0, + nullptr, + nullptr + ); + auto schema = sparrow_ipc::make_non_owning_arrow_schema( + "format", + "name", + sparrow_ipc::metadata_sample_opt, + std::unordered_set{sparrow::ArrowFlag::DICTIONARY_ORDERED}, + 2, + children, + dictionary + ); + + auto schema_copy = sparrow::copy_schema(schema); + + compare_arrow_schema(schema, schema_copy); + + schema_copy.release(&schema_copy); + schema.release(&schema); + } + + // SUBCASE("swap_schema") + // { + // auto schema0 = test::make_arrow_schema(true); + // auto schema0_bkup = sparrow::copy_schema(schema0); + + // auto schema1 = test::make_arrow_schema(false); + // auto schema1_bkup = sparrow::copy_schema(schema1); + + // sparrow::swap(schema0, schema1); + // compare_arrow_schema(schema0, schema1_bkup); + // compare_arrow_schema(schema1, schema0_bkup); + + // schema0.release(&schema0); + // schema1.release(&schema1); + // schema0_bkup.release(&schema0_bkup); + // schema1_bkup.release(&schema1_bkup); + // } + + // SUBCASE("move_schema") + // { + // auto src_schema = test::make_arrow_schema(true); + // auto control = sparrow::copy_schema(src_schema); + + // auto dst_schema = sparrow::move_schema(std::move(src_schema)); + // // check_empty(src_schema); + // compare_arrow_schema(dst_schema, control); + + // auto dst2_schema = sparrow::move_schema(dst_schema); + // // check_empty(dst_schema); + // compare_arrow_schema(dst2_schema, control); + // dst2_schema.release(&dst2_schema); + // control.release(&control); + // } + } +} diff --git a/tests/test_deserialization_with_files.cpp b/tests/test_deserialization_with_files.cpp new file mode 100644 index 0000000..7a0ef99 --- /dev/null +++ b/tests/test_deserialization_with_files.cpp @@ -0,0 +1,119 @@ +#include +#include +#include +#include +#include + +#include + +#include + +#include "sparrow/json_reader/json_parser.hpp" + +#include "doctest/doctest.h" +#include 
"sparrow.hpp" +#include "sparrow_ipc/deserialize.hpp" + + +const std::filesystem::path arrow_testing_data_dir = ARROW_TESTING_DATA_DIR; + +const std::filesystem::path tests_resources_files_path = arrow_testing_data_dir / "data" / "arrow-ipc-stream" + / "integration" / "1.0.0-littleendian"; + +const std::vector files_paths_to_test = { + tests_resources_files_path / "generated_primitive", + // tests_resources_files_path / "generated_primitive_large_offsets", + tests_resources_files_path / "generated_primitive_zerolength", + tests_resources_files_path / "generated_primitive_no_batches" +}; + +size_t get_number_of_batches(const std::filesystem::path& json_path) +{ + std::ifstream json_file(json_path); + if (!json_file.is_open()) + { + throw std::runtime_error("Could not open file: " + json_path.string()); + } + const nlohmann::json data = nlohmann::json::parse(json_file); + return data["batches"].size(); +} + +nlohmann::json load_json_file(const std::filesystem::path& json_path) +{ + std::ifstream json_file(json_path); + if (!json_file.is_open()) + { + throw std::runtime_error("Could not open file: " + json_path.string()); + } + return nlohmann::json::parse(json_file); +} + +TEST_SUITE("Integration tests") +{ + TEST_CASE("Compare stream deserialization with JSON deserialization") + { + for (const auto& file_path : files_paths_to_test) + { + std::filesystem::path json_path = file_path; + json_path.replace_extension(".json"); + const std::string test_name = "Testing " + file_path.filename().string(); + SUBCASE(test_name.c_str()) + { + // Load the JSON file + auto json_data = load_json_file(json_path); + CHECK(json_data != nullptr); + + const size_t num_batches = get_number_of_batches(json_path); + + std::vector record_batches_from_json; + + for (size_t batch_idx = 0; batch_idx < num_batches; ++batch_idx) + { + INFO("Processing batch " << batch_idx << " of " << num_batches); + record_batches_from_json.emplace_back( + 
sparrow::json_reader::build_record_batch_from_json(json_data, batch_idx) + ); + } + + // Load stream file + std::filesystem::path stream_file_path = file_path; + stream_file_path.replace_extension(".stream"); + std::ifstream stream_file(stream_file_path, std::ios::in | std::ios::binary); + REQUIRE(stream_file.is_open()); + const std::vector stream_data( + (std::istreambuf_iterator(stream_file)), + (std::istreambuf_iterator()) + ); + stream_file.close(); + + // Process the stream file + const auto record_batches_from_stream = sparrow_ipc::deserialize_stream( + std::span(stream_data) + ); + + // Compare record batches + REQUIRE_EQ(record_batches_from_stream.size(), record_batches_from_json.size()); + for (size_t i = 0; i < record_batches_from_stream.size(); ++i) + { + for (size_t y = 0; y < record_batches_from_stream[i].nb_columns(); y++) + { + const auto& column_stream = record_batches_from_stream[i].get_column(y); + const auto& column_json = record_batches_from_json[i].get_column(y); + REQUIRE_EQ(column_stream.size(), column_json.size()); + for (size_t z = 0; z < column_json.size(); z++) + { + const auto col_name = column_stream.name().value_or("NA"); + INFO( + "Comparing batch " << i << ", column " << y << " named :" << col_name + << " , row " << z + ); + const auto& column_stream_value = column_stream[z]; + const auto& column_json_value = column_json[z]; + CHECK_EQ(column_stream_value, column_json_value); + } + } + } + } + } + } +} diff --git a/tests/test_null_array_serialization.cpp b/tests/test_null_array_serialization.cpp deleted file mode 100644 index d3b06f0..0000000 --- a/tests/test_null_array_serialization.cpp +++ /dev/null @@ -1,52 +0,0 @@ -#include "doctest/doctest.h" -#include "sparrow.hpp" - -#include "serialize_null_array.hpp" -#include "sparrow_ipc_tests_helpers.hpp" - -namespace sparrow_ipc -{ - namespace sp = sparrow; - - - TEST_CASE("Serialize and deserialize null_array") - { - const std::size_t size = 10; - const std::string_view name = 
"my_null_array"; - - const std::vector metadata_vec = {{"key1", "value1"}, {"key2", "value2"}}; - const std::optional> metadata = metadata_vec; - - sp::null_array arr(size, name, metadata); - - const auto buffer = serialize_null_array(arr); - const auto deserialized_arr = deserialize_null_array(buffer); - - CHECK_EQ(deserialized_arr.size(), arr.size()); - REQUIRE(deserialized_arr.name().has_value()); - CHECK_EQ(deserialized_arr.name().value(), arr.name().value()); - - REQUIRE(deserialized_arr.metadata().has_value()); - compare_metadata(arr, deserialized_arr); - - // Check the deserialized object is a null_array - const auto& arrow_proxy = sp::detail::array_access::get_arrow_proxy(deserialized_arr); - CHECK_EQ(arrow_proxy.format(), "n"); - CHECK_EQ(arrow_proxy.n_children(), 0); - CHECK_EQ(arrow_proxy.flags(), std::unordered_set{sp::ArrowFlag::NULLABLE}); - CHECK_EQ(arrow_proxy.name(), name); - CHECK_EQ(arrow_proxy.dictionary(), nullptr); - CHECK_EQ(arrow_proxy.buffers().size(), 0); - } - - TEST_CASE("Serialize and deserialize null_array with no name and no metadata") - { - const std::size_t size = 100; - sp::null_array arr(size); - const auto buffer = serialize_null_array(arr); - const auto deserialized_arr = deserialize_null_array(buffer); - CHECK_EQ(deserialized_arr.size(), arr.size()); - CHECK_FALSE(deserialized_arr.name().has_value()); - CHECK_FALSE(deserialized_arr.metadata().has_value()); - } -} diff --git a/tests/test_primitive_array_serialization.cpp b/tests/test_primitive_array_serialization.cpp deleted file mode 100644 index b0086f0..0000000 --- a/tests/test_primitive_array_serialization.cpp +++ /dev/null @@ -1,130 +0,0 @@ -#include -#include -#include -#include - -#include "doctest/doctest.h" -#include "sparrow.hpp" - -#include "serialize_primitive_array.hpp" -#include "sparrow_ipc_tests_helpers.hpp" - -namespace sparrow_ipc -{ - namespace sp = sparrow; - - using testing_types = std::tuple< - int, - float, - double>; - - template - void 
compare_bitmap(const sp::primitive_array& pa1, const sp::primitive_array& pa2) - { - const auto pa1_bitmap = pa1.bitmap(); - const auto pa2_bitmap = pa2.bitmap(); - - CHECK_EQ(pa1_bitmap.size(), pa2_bitmap.size()); - auto pa1_it = pa1_bitmap.begin(); - auto pa2_it = pa2_bitmap.begin(); - for (size_t i = 0; i < pa1_bitmap.size(); ++i) - { - CHECK_EQ(*pa1_it, *pa2_it); - ++pa1_it; - ++pa2_it; - } - } - - template - void compare_primitive_arrays(const sp::primitive_array& ar, const sp::primitive_array& deserialized_ar) - { - CHECK_EQ(ar, deserialized_ar); - compare_bitmap(ar, deserialized_ar); - compare_metadata(ar, deserialized_ar); - } - - TEST_CASE_TEMPLATE_DEFINE("Serialize and Deserialize primitive_array", T, primitive_array_types) - { - auto create_primitive_array = []() -> sp::primitive_array { - if constexpr (std::is_same_v) - { - return {10, 20, 30, 40, 50}; - } - else if constexpr (std::is_same_v) - { - return {10.5f, 20.5f, 30.5f, 40.5f, 50.5f}; - } - else if constexpr (std::is_same_v) - { - return {10.1, 20.2, 30.3, 40.4, 50.5}; - } - else - { - FAIL("Unsupported type for templated test case"); - } - }; - - sp::primitive_array ar = create_primitive_array(); - - const std::vector serialized_data = serialize_primitive_array(ar); - - CHECK(serialized_data.size() > 0); - - sp::primitive_array deserialized_ar = deserialize_primitive_array(serialized_data); - - compare_primitive_arrays(ar, deserialized_ar); - } - - TEST_CASE_TEMPLATE_APPLY(primitive_array_types, testing_types); - - TEST_CASE("Serialize and Deserialize primitive_array - int with nulls") - { - // Data buffer - const sp::u8_buffer data_buffer = {100, 200, 300, 400, 500}; - - // Validity bitmap: 100 (valid), 200 (valid), 300 (null), 400 (valid), 500 (null) - sp::validity_bitmap validity(5, true); // All valid initially - validity.set(2, false); // Set index 2 to null - validity.set(4, false); // Set index 4 to null - - sp::primitive_array ar(std::move(data_buffer), std::move(validity)); - - const 
std::vector serialized_data = serialize_primitive_array(ar); - - CHECK(serialized_data.size() > 0); - - sp::primitive_array deserialized_ar = deserialize_primitive_array(serialized_data); - - compare_primitive_arrays(ar, deserialized_ar); - } - - TEST_CASE("Serialize and Deserialize primitive_array - with name and metadata") - { - // Data buffer - const sp::u8_buffer data_buffer = {1, 2, 3}; - - // Validity bitmap: All valid - const sp::validity_bitmap validity(3, true); - - // Custom metadata - const std::vector metadata = { - {"key1", "value1"}, - {"key2", "value2"} - }; - - sp::primitive_array ar( - std::move(data_buffer), - std::move(validity), - "my_named_array", // name - std::make_optional(std::vector{{"key1", "value1"}, {"key2", "value2"}}) - ); - - const std::vector serialized_data = serialize_primitive_array(ar); - - CHECK(serialized_data.size() > 0); - - sp::primitive_array deserialized_ar = deserialize_primitive_array(serialized_data); - - compare_primitive_arrays(ar, deserialized_ar); - } -} diff --git a/tests/test_utils.cpp b/tests/test_utils.cpp index f53ef0c..ab9f4a0 100644 --- a/tests/test_utils.cpp +++ b/tests/test_utils.cpp @@ -1,8 +1,8 @@ -#include "doctest/doctest.h" +#include +#include -#include "sparrow.hpp" - -#include "utils.hpp" +#include "sparrow_ipc/arrow_interface/arrow_array_schema_common_release.hpp" +#include "sparrow_ipc/utils.hpp" namespace sparrow_ipc { @@ -22,118 +22,328 @@ namespace sparrow_ipc flatbuffers::FlatBufferBuilder builder; SUBCASE("Null and Boolean types") { - CHECK_EQ(utils::get_flatbuffer_type(builder, sparrow::data_type_to_format(sparrow::data_type::NA)).first, org::apache::arrow::flatbuf::Type::Null); - CHECK_EQ(utils::get_flatbuffer_type(builder, sparrow::data_type_to_format(sparrow::data_type::BOOL)).first, org::apache::arrow::flatbuf::Type::Bool); + CHECK_EQ( + utils::get_flatbuffer_type(builder, sparrow::data_type_to_format(sparrow::data_type::NA)).first, + org::apache::arrow::flatbuf::Type::Null + ); + 
CHECK_EQ( + utils::get_flatbuffer_type(builder, sparrow::data_type_to_format(sparrow::data_type::BOOL)).first, + org::apache::arrow::flatbuf::Type::Bool + ); } SUBCASE("Integer types") { - CHECK_EQ(utils::get_flatbuffer_type(builder, sparrow::data_type_to_format(sparrow::data_type::INT8)).first, org::apache::arrow::flatbuf::Type::Int); // INT8 - CHECK_EQ(utils::get_flatbuffer_type(builder, sparrow::data_type_to_format(sparrow::data_type::UINT8)).first, org::apache::arrow::flatbuf::Type::Int); // UINT8 - CHECK_EQ(utils::get_flatbuffer_type(builder, sparrow::data_type_to_format(sparrow::data_type::INT16)).first, org::apache::arrow::flatbuf::Type::Int); // INT16 - CHECK_EQ(utils::get_flatbuffer_type(builder, sparrow::data_type_to_format(sparrow::data_type::UINT16)).first, org::apache::arrow::flatbuf::Type::Int); // UINT16 - CHECK_EQ(utils::get_flatbuffer_type(builder, sparrow::data_type_to_format(sparrow::data_type::INT32)).first, org::apache::arrow::flatbuf::Type::Int); // INT32 - CHECK_EQ(utils::get_flatbuffer_type(builder, sparrow::data_type_to_format(sparrow::data_type::UINT32)).first, org::apache::arrow::flatbuf::Type::Int); // UINT32 - CHECK_EQ(utils::get_flatbuffer_type(builder, sparrow::data_type_to_format(sparrow::data_type::INT64)).first, org::apache::arrow::flatbuf::Type::Int); // INT64 - CHECK_EQ(utils::get_flatbuffer_type(builder, sparrow::data_type_to_format(sparrow::data_type::UINT64)).first, org::apache::arrow::flatbuf::Type::Int); // UINT64 + CHECK_EQ( + utils::get_flatbuffer_type(builder, sparrow::data_type_to_format(sparrow::data_type::INT8)).first, + org::apache::arrow::flatbuf::Type::Int + ); // INT8 + CHECK_EQ( + utils::get_flatbuffer_type(builder, sparrow::data_type_to_format(sparrow::data_type::UINT8)).first, + org::apache::arrow::flatbuf::Type::Int + ); // UINT8 + CHECK_EQ( + utils::get_flatbuffer_type(builder, sparrow::data_type_to_format(sparrow::data_type::INT16)).first, + org::apache::arrow::flatbuf::Type::Int + ); // INT16 + CHECK_EQ( + 
utils::get_flatbuffer_type(builder, sparrow::data_type_to_format(sparrow::data_type::UINT16)).first, + org::apache::arrow::flatbuf::Type::Int + ); // UINT16 + CHECK_EQ( + utils::get_flatbuffer_type(builder, sparrow::data_type_to_format(sparrow::data_type::INT32)).first, + org::apache::arrow::flatbuf::Type::Int + ); // INT32 + CHECK_EQ( + utils::get_flatbuffer_type(builder, sparrow::data_type_to_format(sparrow::data_type::UINT32)).first, + org::apache::arrow::flatbuf::Type::Int + ); // UINT32 + CHECK_EQ( + utils::get_flatbuffer_type(builder, sparrow::data_type_to_format(sparrow::data_type::INT64)).first, + org::apache::arrow::flatbuf::Type::Int + ); // INT64 + CHECK_EQ( + utils::get_flatbuffer_type(builder, sparrow::data_type_to_format(sparrow::data_type::UINT64)).first, + org::apache::arrow::flatbuf::Type::Int + ); // UINT64 } SUBCASE("Floating Point types") { - CHECK_EQ(utils::get_flatbuffer_type(builder, sparrow::data_type_to_format(sparrow::data_type::HALF_FLOAT)).first, org::apache::arrow::flatbuf::Type::FloatingPoint); // HALF_FLOAT - CHECK_EQ(utils::get_flatbuffer_type(builder, sparrow::data_type_to_format(sparrow::data_type::FLOAT)).first, org::apache::arrow::flatbuf::Type::FloatingPoint); // FLOAT - CHECK_EQ(utils::get_flatbuffer_type(builder, sparrow::data_type_to_format(sparrow::data_type::DOUBLE)).first, org::apache::arrow::flatbuf::Type::FloatingPoint); // DOUBLE + CHECK_EQ( + utils::get_flatbuffer_type(builder, sparrow::data_type_to_format(sparrow::data_type::HALF_FLOAT)) + .first, + org::apache::arrow::flatbuf::Type::FloatingPoint + ); // HALF_FLOAT + CHECK_EQ( + utils::get_flatbuffer_type(builder, sparrow::data_type_to_format(sparrow::data_type::FLOAT)).first, + org::apache::arrow::flatbuf::Type::FloatingPoint + ); // FLOAT + CHECK_EQ( + utils::get_flatbuffer_type(builder, sparrow::data_type_to_format(sparrow::data_type::DOUBLE)).first, + org::apache::arrow::flatbuf::Type::FloatingPoint + ); // DOUBLE } SUBCASE("String and Binary types") { - 
CHECK_EQ(utils::get_flatbuffer_type(builder, sparrow::data_type_to_format(sparrow::data_type::STRING)).first, org::apache::arrow::flatbuf::Type::Utf8); // STRING - CHECK_EQ(utils::get_flatbuffer_type(builder, sparrow::data_type_to_format(sparrow::data_type::LARGE_STRING)).first, org::apache::arrow::flatbuf::Type::LargeUtf8); // LARGE_STRING - CHECK_EQ(utils::get_flatbuffer_type(builder, sparrow::data_type_to_format(sparrow::data_type::BINARY)).first, org::apache::arrow::flatbuf::Type::Binary); // BINARY - CHECK_EQ(utils::get_flatbuffer_type(builder, sparrow::data_type_to_format(sparrow::data_type::LARGE_BINARY)).first, org::apache::arrow::flatbuf::Type::LargeBinary); // LARGE_BINARY - CHECK_EQ(utils::get_flatbuffer_type(builder, "vu").first, org::apache::arrow::flatbuf::Type::Utf8View); // STRING_VIEW - CHECK_EQ(utils::get_flatbuffer_type(builder, "vz").first, org::apache::arrow::flatbuf::Type::BinaryView); // BINARY_VIEW + CHECK_EQ( + utils::get_flatbuffer_type(builder, sparrow::data_type_to_format(sparrow::data_type::STRING)).first, + org::apache::arrow::flatbuf::Type::Utf8 + ); // STRING + CHECK_EQ( + utils::get_flatbuffer_type(builder, sparrow::data_type_to_format(sparrow::data_type::LARGE_STRING)) + .first, + org::apache::arrow::flatbuf::Type::LargeUtf8 + ); // LARGE_STRING + CHECK_EQ( + utils::get_flatbuffer_type(builder, sparrow::data_type_to_format(sparrow::data_type::BINARY)).first, + org::apache::arrow::flatbuf::Type::Binary + ); // BINARY + CHECK_EQ( + utils::get_flatbuffer_type(builder, sparrow::data_type_to_format(sparrow::data_type::LARGE_BINARY)) + .first, + org::apache::arrow::flatbuf::Type::LargeBinary + ); // LARGE_BINARY + CHECK_EQ( + utils::get_flatbuffer_type(builder, "vu").first, + org::apache::arrow::flatbuf::Type::Utf8View + ); // STRING_VIEW + CHECK_EQ( + utils::get_flatbuffer_type(builder, "vz").first, + org::apache::arrow::flatbuf::Type::BinaryView + ); // BINARY_VIEW } SUBCASE("Date types") { - 
CHECK_EQ(utils::get_flatbuffer_type(builder, sparrow::data_type_to_format(sparrow::data_type::DATE_DAYS)).first, org::apache::arrow::flatbuf::Type::Date); // DATE_DAYS - CHECK_EQ(utils::get_flatbuffer_type(builder, sparrow::data_type_to_format(sparrow::data_type::DATE_MILLISECONDS)).first, org::apache::arrow::flatbuf::Type::Date); // DATE_MILLISECONDS + CHECK_EQ( + utils::get_flatbuffer_type(builder, sparrow::data_type_to_format(sparrow::data_type::DATE_DAYS)) + .first, + org::apache::arrow::flatbuf::Type::Date + ); // DATE_DAYS + CHECK_EQ( + utils::get_flatbuffer_type( + builder, + sparrow::data_type_to_format(sparrow::data_type::DATE_MILLISECONDS) + ) + .first, + org::apache::arrow::flatbuf::Type::Date + ); // DATE_MILLISECONDS } SUBCASE("Timestamp types") { - CHECK_EQ(utils::get_flatbuffer_type(builder, sparrow::data_type_to_format(sparrow::data_type::TIMESTAMP_SECONDS)).first, org::apache::arrow::flatbuf::Type::Timestamp); // TIMESTAMP_SECONDS - CHECK_EQ(utils::get_flatbuffer_type(builder, sparrow::data_type_to_format(sparrow::data_type::TIMESTAMP_MILLISECONDS)).first, org::apache::arrow::flatbuf::Type::Timestamp); // TIMESTAMP_MILLISECONDS - CHECK_EQ(utils::get_flatbuffer_type(builder, sparrow::data_type_to_format(sparrow::data_type::TIMESTAMP_MICROSECONDS)).first, org::apache::arrow::flatbuf::Type::Timestamp); // TIMESTAMP_MICROSECONDS - CHECK_EQ(utils::get_flatbuffer_type(builder, sparrow::data_type_to_format(sparrow::data_type::TIMESTAMP_NANOSECONDS)).first, org::apache::arrow::flatbuf::Type::Timestamp); // TIMESTAMP_NANOSECONDS + CHECK_EQ( + utils::get_flatbuffer_type( + builder, + sparrow::data_type_to_format(sparrow::data_type::TIMESTAMP_SECONDS) + ) + .first, + org::apache::arrow::flatbuf::Type::Timestamp + ); // TIMESTAMP_SECONDS + CHECK_EQ( + utils::get_flatbuffer_type( + builder, + sparrow::data_type_to_format(sparrow::data_type::TIMESTAMP_MILLISECONDS) + ) + .first, + org::apache::arrow::flatbuf::Type::Timestamp + ); // TIMESTAMP_MILLISECONDS + 
CHECK_EQ( + utils::get_flatbuffer_type( + builder, + sparrow::data_type_to_format(sparrow::data_type::TIMESTAMP_MICROSECONDS) + ) + .first, + org::apache::arrow::flatbuf::Type::Timestamp + ); // TIMESTAMP_MICROSECONDS + CHECK_EQ( + utils::get_flatbuffer_type( + builder, + sparrow::data_type_to_format(sparrow::data_type::TIMESTAMP_NANOSECONDS) + ) + .first, + org::apache::arrow::flatbuf::Type::Timestamp + ); // TIMESTAMP_NANOSECONDS } SUBCASE("Duration types") { - CHECK_EQ(utils::get_flatbuffer_type(builder, sparrow::data_type_to_format(sparrow::data_type::DURATION_SECONDS)).first, org::apache::arrow::flatbuf::Type::Duration); // DURATION_SECONDS - CHECK_EQ(utils::get_flatbuffer_type(builder, sparrow::data_type_to_format(sparrow::data_type::DURATION_MILLISECONDS)).first, org::apache::arrow::flatbuf::Type::Duration); // DURATION_MILLISECONDS - CHECK_EQ(utils::get_flatbuffer_type(builder, sparrow::data_type_to_format(sparrow::data_type::DURATION_MICROSECONDS)).first, org::apache::arrow::flatbuf::Type::Duration); // DURATION_MICROSECONDS - CHECK_EQ(utils::get_flatbuffer_type(builder, sparrow::data_type_to_format(sparrow::data_type::DURATION_NANOSECONDS)).first, org::apache::arrow::flatbuf::Type::Duration); // DURATION_NANOSECONDS + CHECK_EQ( + utils::get_flatbuffer_type( + builder, + sparrow::data_type_to_format(sparrow::data_type::DURATION_SECONDS) + ) + .first, + org::apache::arrow::flatbuf::Type::Duration + ); // DURATION_SECONDS + CHECK_EQ( + utils::get_flatbuffer_type( + builder, + sparrow::data_type_to_format(sparrow::data_type::DURATION_MILLISECONDS) + ) + .first, + org::apache::arrow::flatbuf::Type::Duration + ); // DURATION_MILLISECONDS + CHECK_EQ( + utils::get_flatbuffer_type( + builder, + sparrow::data_type_to_format(sparrow::data_type::DURATION_MICROSECONDS) + ) + .first, + org::apache::arrow::flatbuf::Type::Duration + ); // DURATION_MICROSECONDS + CHECK_EQ( + utils::get_flatbuffer_type( + builder, + 
sparrow::data_type_to_format(sparrow::data_type::DURATION_NANOSECONDS) + ) + .first, + org::apache::arrow::flatbuf::Type::Duration + ); // DURATION_NANOSECONDS } SUBCASE("Interval types") { - CHECK_EQ(utils::get_flatbuffer_type(builder, sparrow::data_type_to_format(sparrow::data_type::INTERVAL_MONTHS)).first, org::apache::arrow::flatbuf::Type::Interval); // INTERVAL_MONTHS - CHECK_EQ(utils::get_flatbuffer_type(builder, sparrow::data_type_to_format(sparrow::data_type::INTERVAL_DAYS_TIME)).first, org::apache::arrow::flatbuf::Type::Interval); // INTERVAL_DAYS_TIME - CHECK_EQ(utils::get_flatbuffer_type(builder, sparrow::data_type_to_format(sparrow::data_type::INTERVAL_MONTHS_DAYS_NANOSECONDS)).first, org::apache::arrow::flatbuf::Type::Interval); // INTERVAL_MONTHS_DAYS_NANOSECONDS + CHECK_EQ( + utils::get_flatbuffer_type(builder, sparrow::data_type_to_format(sparrow::data_type::INTERVAL_MONTHS)) + .first, + org::apache::arrow::flatbuf::Type::Interval + ); // INTERVAL_MONTHS + CHECK_EQ( + utils::get_flatbuffer_type( + builder, + sparrow::data_type_to_format(sparrow::data_type::INTERVAL_DAYS_TIME) + ) + .first, + org::apache::arrow::flatbuf::Type::Interval + ); // INTERVAL_DAYS_TIME + CHECK_EQ( + utils::get_flatbuffer_type( + builder, + sparrow::data_type_to_format(sparrow::data_type::INTERVAL_MONTHS_DAYS_NANOSECONDS) + ) + .first, + org::apache::arrow::flatbuf::Type::Interval + ); // INTERVAL_MONTHS_DAYS_NANOSECONDS } SUBCASE("Time types") { - CHECK_EQ(utils::get_flatbuffer_type(builder, sparrow::data_type_to_format(sparrow::data_type::TIME_SECONDS)).first, org::apache::arrow::flatbuf::Type::Time); // TIME_SECONDS - CHECK_EQ(utils::get_flatbuffer_type(builder, sparrow::data_type_to_format(sparrow::data_type::TIME_MILLISECONDS)).first, org::apache::arrow::flatbuf::Type::Time); // TIME_MILLISECONDS - CHECK_EQ(utils::get_flatbuffer_type(builder, sparrow::data_type_to_format(sparrow::data_type::TIME_MICROSECONDS)).first, org::apache::arrow::flatbuf::Type::Time); // 
TIME_MICROSECONDS - CHECK_EQ(utils::get_flatbuffer_type(builder, sparrow::data_type_to_format(sparrow::data_type::TIME_NANOSECONDS)).first, org::apache::arrow::flatbuf::Type::Time); // TIME_NANOSECONDS + CHECK_EQ( + utils::get_flatbuffer_type(builder, sparrow::data_type_to_format(sparrow::data_type::TIME_SECONDS)) + .first, + org::apache::arrow::flatbuf::Type::Time + ); // TIME_SECONDS + CHECK_EQ( + utils::get_flatbuffer_type( + builder, + sparrow::data_type_to_format(sparrow::data_type::TIME_MILLISECONDS) + ) + .first, + org::apache::arrow::flatbuf::Type::Time + ); // TIME_MILLISECONDS + CHECK_EQ( + utils::get_flatbuffer_type( + builder, + sparrow::data_type_to_format(sparrow::data_type::TIME_MICROSECONDS) + ) + .first, + org::apache::arrow::flatbuf::Type::Time + ); // TIME_MICROSECONDS + CHECK_EQ( + utils::get_flatbuffer_type( + builder, + sparrow::data_type_to_format(sparrow::data_type::TIME_NANOSECONDS) + ) + .first, + org::apache::arrow::flatbuf::Type::Time + ); // TIME_NANOSECONDS } SUBCASE("List types") { - CHECK_EQ(utils::get_flatbuffer_type(builder, sparrow::data_type_to_format(sparrow::data_type::LIST)).first, org::apache::arrow::flatbuf::Type::List); // LIST - CHECK_EQ(utils::get_flatbuffer_type(builder, sparrow::data_type_to_format(sparrow::data_type::LARGE_LIST)).first, org::apache::arrow::flatbuf::Type::LargeList); // LARGE_LIST - CHECK_EQ(utils::get_flatbuffer_type(builder, "+vl").first, org::apache::arrow::flatbuf::Type::ListView); // LIST_VIEW - CHECK_EQ(utils::get_flatbuffer_type(builder, "+vL").first, org::apache::arrow::flatbuf::Type::LargeListView); // LARGE_LIST_VIEW - CHECK_EQ(utils::get_flatbuffer_type(builder, "+w:16").first, org::apache::arrow::flatbuf::Type::FixedSizeList); // FIXED_SIZED_LIST - CHECK_THROWS(utils::get_flatbuffer_type(builder, "+w:")); // Invalid FixedSizeList format + CHECK_EQ( + utils::get_flatbuffer_type(builder, sparrow::data_type_to_format(sparrow::data_type::LIST)).first, + org::apache::arrow::flatbuf::Type::List + 
); // LIST + CHECK_EQ( + utils::get_flatbuffer_type(builder, sparrow::data_type_to_format(sparrow::data_type::LARGE_LIST)) + .first, + org::apache::arrow::flatbuf::Type::LargeList + ); // LARGE_LIST + CHECK_EQ( + utils::get_flatbuffer_type(builder, "+vl").first, + org::apache::arrow::flatbuf::Type::ListView + ); // LIST_VIEW + CHECK_EQ( + utils::get_flatbuffer_type(builder, "+vL").first, + org::apache::arrow::flatbuf::Type::LargeListView + ); // LARGE_LIST_VIEW + CHECK_EQ( + utils::get_flatbuffer_type(builder, "+w:16").first, + org::apache::arrow::flatbuf::Type::FixedSizeList + ); // FIXED_SIZED_LIST + CHECK_THROWS(utils::get_flatbuffer_type(builder, "+w:")); // Invalid FixedSizeList format } SUBCASE("Struct and Map types") { - CHECK_EQ(utils::get_flatbuffer_type(builder, sparrow::data_type_to_format(sparrow::data_type::STRUCT)).first, org::apache::arrow::flatbuf::Type::Struct_); // STRUCT - CHECK_EQ(utils::get_flatbuffer_type(builder, sparrow::data_type_to_format(sparrow::data_type::MAP)).first, org::apache::arrow::flatbuf::Type::Map); // MAP + CHECK_EQ( + utils::get_flatbuffer_type(builder, sparrow::data_type_to_format(sparrow::data_type::STRUCT)).first, + org::apache::arrow::flatbuf::Type::Struct_ + ); // STRUCT + CHECK_EQ( + utils::get_flatbuffer_type(builder, sparrow::data_type_to_format(sparrow::data_type::MAP)).first, + org::apache::arrow::flatbuf::Type::Map + ); // MAP } SUBCASE("Union types") { - CHECK_EQ(utils::get_flatbuffer_type(builder, "+ud:").first, org::apache::arrow::flatbuf::Type::Union); // DENSE_UNION - CHECK_EQ(utils::get_flatbuffer_type(builder, "+us:").first, org::apache::arrow::flatbuf::Type::Union); // SPARSE_UNION + CHECK_EQ( + utils::get_flatbuffer_type(builder, "+ud:").first, + org::apache::arrow::flatbuf::Type::Union + ); // DENSE_UNION + CHECK_EQ( + utils::get_flatbuffer_type(builder, "+us:").first, + org::apache::arrow::flatbuf::Type::Union + ); // SPARSE_UNION } SUBCASE("Run-End Encoded type") { - 
CHECK_EQ(utils::get_flatbuffer_type(builder, "+r").first, org::apache::arrow::flatbuf::Type::RunEndEncoded); // RUN_ENCODED + CHECK_EQ( + utils::get_flatbuffer_type(builder, "+r").first, + org::apache::arrow::flatbuf::Type::RunEndEncoded + ); // RUN_ENCODED } SUBCASE("Decimal types") { - CHECK_EQ(utils::get_flatbuffer_type(builder, "d:10,5").first, org::apache::arrow::flatbuf::Type::Decimal); // DECIMAL (general) - CHECK_THROWS(utils::get_flatbuffer_type(builder, "d:10")); // Invalid Decimal format + CHECK_EQ( + utils::get_flatbuffer_type(builder, "d:10,5").first, + org::apache::arrow::flatbuf::Type::Decimal + ); // DECIMAL (general) + CHECK_THROWS(utils::get_flatbuffer_type(builder, "d:10")); // Invalid Decimal format } SUBCASE("Fixed Width Binary type") { - CHECK_EQ(utils::get_flatbuffer_type(builder, "w:32").first, org::apache::arrow::flatbuf::Type::FixedSizeBinary); // FIXED_WIDTH_BINARY - CHECK_THROWS(utils::get_flatbuffer_type(builder, "w:")); // Invalid FixedSizeBinary format + CHECK_EQ( + utils::get_flatbuffer_type(builder, "w:32").first, + org::apache::arrow::flatbuf::Type::FixedSizeBinary + ); // FIXED_WIDTH_BINARY + CHECK_THROWS(utils::get_flatbuffer_type(builder, "w:")); // Invalid FixedSizeBinary format } SUBCASE("Unsupported type returns Null") { - CHECK_EQ(utils::get_flatbuffer_type(builder, "unsupported_format").first, org::apache::arrow::flatbuf::Type::Null); + CHECK_EQ( + utils::get_flatbuffer_type(builder, "unsupported_format").first, + org::apache::arrow::flatbuf::Type::Null + ); } } }