From aca531c9ebca09cdf13d86ca17d37ab64e46ebae Mon Sep 17 00:00:00 2001
From: meiravgri <109056284+meiravgri@users.noreply.github.com>
Date: Tue, 16 Jul 2024 07:49:23 +0300
Subject: [PATCH] [MOD-7297] Replace Variable Length Array on stack with heap
 allocation (#505)

* vaseline for BM
* fix to baseline
* replace stack with allocation
* use unique ptr
* revert format batch iterator
* fix lifetime
* fix
* fix
* fix
* rearrange
* use ref to allocator instead of pointer
* disable flow temp

(cherry picked from commit ab96a8d1a1b1bb3a2a5584ee425f46eafad8a950)
---
 .github/workflows/flow-temp.yml               |  2 +-
 src/VecSim/algorithms/hnsw/hnsw.h             |  6 +++---
 src/VecSim/algorithms/hnsw/hnsw_serializer.h  | 15 ++++++++-------
 src/VecSim/algorithms/hnsw/hnsw_tiered.h      |  7 ++++---
 src/VecSim/memory/vecsim_malloc.cpp           | 11 +++++++++++
 src/VecSim/memory/vecsim_malloc.h             | 14 +++++++++++++-
 src/VecSim/spaces/normalize/normalize_naive.h |  5 +++--
 src/VecSim/vec_sim_index.h                    | 20 ++++++++++++--------
 src/VecSim/vec_sim_tiered_index.h             | 22 ++++++++++++++--------
 9 files changed, 69 insertions(+), 33 deletions(-)

diff --git a/.github/workflows/flow-temp.yml b/.github/workflows/flow-temp.yml
index ec5347520..e6bcc9d15 100644
--- a/.github/workflows/flow-temp.yml
+++ b/.github/workflows/flow-temp.yml
@@ -15,7 +15,7 @@ jobs:
     uses: ./.github/workflows/task-unit-test.yml
     with:
       container: ubuntu:jammy
-      run-valgrind: false
+      run-valgrind: true
   focal:
     uses: ./.github/workflows/task-unit-test.yml
     with:
diff --git a/src/VecSim/algorithms/hnsw/hnsw.h b/src/VecSim/algorithms/hnsw/hnsw.h
index a5822bf00..3498fa534 100644
--- a/src/VecSim/algorithms/hnsw/hnsw.h
+++ b/src/VecSim/algorithms/hnsw/hnsw.h
@@ -1820,9 +1820,9 @@ AddVectorCtx HNSWIndex<DataType, DistType>::storeNewElement(labelType label,
     // Create the new element's graph metadata.
     // We must assign manually enough memory on the stack and not just declare an `ElementGraphData`
     // variable, since it has a flexible array member.
-    char tmpData[this->elementGraphDataSize];
-    memset(tmpData, 0, this->elementGraphDataSize);
-    ElementGraphData *cur_egd = (ElementGraphData *)tmpData;
+    auto tmpData = this->allocator->allocate_unique(this->elementGraphDataSize);
+    memset(tmpData.get(), 0, this->elementGraphDataSize);
+    ElementGraphData *cur_egd = (ElementGraphData *)(tmpData.get());
     // Allocate memory (inside `ElementGraphData` constructor) for the links in higher levels and
     // initialize this memory to zeros. The reason for doing it here is that we might mark this
     // vector as deleted BEFORE we finish its indexing. In that case, we will collect the incoming
diff --git a/src/VecSim/algorithms/hnsw/hnsw_serializer.h b/src/VecSim/algorithms/hnsw/hnsw_serializer.h
index b76551cbc..804e1bdcd 100644
--- a/src/VecSim/algorithms/hnsw/hnsw_serializer.h
+++ b/src/VecSim/algorithms/hnsw/hnsw_serializer.h
@@ -160,15 +160,15 @@ void HNSWIndex<DataType, DistType>::restoreGraph(std::ifstream &input) {
         unsigned int block_len = 0;
         readBinaryPOD(input, block_len);
         for (size_t j = 0; j < block_len; j++) {
-            char cur_vec[this->dataSize];
-            input.read(cur_vec, this->dataSize);
-            this->vectorBlocks.back().addElement(cur_vec);
+            auto cur_vec = this->getAllocator()->allocate_unique(this->dataSize);
+            input.read(static_cast<char *>(cur_vec.get()), this->dataSize);
+            this->vectorBlocks.back().addElement(cur_vec.get());
         }
     }

     // Get graph data blocks
     ElementGraphData *cur_egt;
-    char tmpData[this->elementGraphDataSize];
+    auto tmpData = this->getAllocator()->allocate_unique(this->elementGraphDataSize);
     size_t toplevel = 0;
     for (size_t i = 0; i < num_blocks; i++) {
         this->graphDataBlocks.emplace_back(this->blockSize, this->elementGraphDataSize,
@@ -177,19 +177,20 @@ void HNSWIndex<DataType, DistType>::restoreGraph(std::ifstream &input) {
         readBinaryPOD(input, block_len);
         for (size_t j = 0; j < block_len; j++) {
             // Reset tmpData
-            memset(tmpData, 0, this->elementGraphDataSize);
+            memset(tmpData.get(), 0, this->elementGraphDataSize);
             // Read the current element top level
             readBinaryPOD(input, toplevel);
             // Allocate space and structs for the current element
             try {
-                new (tmpData) ElementGraphData(toplevel, this->levelDataSize, this->allocator);
+                new (tmpData.get())
+                    ElementGraphData(toplevel, this->levelDataSize, this->allocator);
             } catch (std::runtime_error &e) {
                 this->log(VecSimCommonStrings::LOG_WARNING_STRING,
                           "Error - allocating memory for new element failed due to low memory");
                 throw e;
             }
             // Add the current element to the current block, and update cur_egt to point to it.
-            this->graphDataBlocks.back().addElement(tmpData);
+            this->graphDataBlocks.back().addElement(tmpData.get());
             cur_egt = (ElementGraphData *)this->graphDataBlocks.back().getElement(j);

             // Restore the current element's graph data
diff --git a/src/VecSim/algorithms/hnsw/hnsw_tiered.h b/src/VecSim/algorithms/hnsw/hnsw_tiered.h
index 352c99123..ffb567abf 100644
--- a/src/VecSim/algorithms/hnsw/hnsw_tiered.h
+++ b/src/VecSim/algorithms/hnsw/hnsw_tiered.h
@@ -504,11 +504,12 @@ void TieredHNSWIndex<DataType, DistType>::executeInsertJob(HNSWInsertJob *job) {
     HNSWIndex<DataType, DistType> *hnsw_index = this->getHNSWIndex();
     // Copy the vector blob from the flat buffer, so we can release the flat lock while we are
     // indexing the vector into HNSW index.
-    DataType blob_copy[this->frontendIndex->getDim()];
-    memcpy(blob_copy, this->frontendIndex->getDataByInternalId(job->id),
+    auto blob_copy = this->getAllocator()->allocate_unique(this->frontendIndex->getDataSize());
+
+    memcpy(blob_copy.get(), this->frontendIndex->getDataByInternalId(job->id),
            this->frontendIndex->getDim() * sizeof(DataType));

-    this->insertVectorToHNSW(hnsw_index, job->label, blob_copy);
+    this->insertVectorToHNSW(hnsw_index, job->label, blob_copy.get());

     // Remove the vector and the insert job from the flat buffer.
     this->flatIndexGuard.lock();
diff --git a/src/VecSim/memory/vecsim_malloc.cpp b/src/VecSim/memory/vecsim_malloc.cpp
index 9296234d6..fdf5fc3f9 100644
--- a/src/VecSim/memory/vecsim_malloc.cpp
+++ b/src/VecSim/memory/vecsim_malloc.cpp
@@ -99,6 +99,17 @@ void *VecSimAllocator::callocate(size_t size) {
     return nullptr;
 }

+std::unique_ptr<void, VecSimAllocator::Deleter>
+VecSimAllocator::allocate_aligned_unique(size_t size, size_t alignment) {
+    void *ptr = this->allocate_aligned(size, alignment);
+    return {ptr, Deleter(*this)};
+}
+
+std::unique_ptr<void, VecSimAllocator::Deleter> VecSimAllocator::allocate_unique(size_t size) {
+    void *ptr = this->allocate(size);
+    return {ptr, Deleter(*this)};
+}
+
 void *VecSimAllocator::operator new(size_t size) { return vecsim_malloc(size); }

 void *VecSimAllocator::operator new[](size_t size) { return vecsim_malloc(size); }
diff --git a/src/VecSim/memory/vecsim_malloc.h b/src/VecSim/memory/vecsim_malloc.h
index e25cf6e6b..5923fb300 100644
--- a/src/VecSim/memory/vecsim_malloc.h
+++ b/src/VecSim/memory/vecsim_malloc.h
@@ -25,6 +25,8 @@ struct VecSimAllocator {
     static size_t allocation_header_size;
     static VecSimMemoryFunctions memFunctions;

+    // Forward declaration of the deleter for the unique_ptr.
+    struct Deleter;
     VecSimAllocator() : allocated(std::atomic_uint64_t(sizeof(VecSimAllocator))) {}

 public:
@@ -36,6 +38,10 @@ struct VecSimAllocator {
     void *reallocate(void *p, size_t size);
     void free_allocation(void *p);

+    // Allocations for scope-life-time memory.
+    std::unique_ptr<void, Deleter> allocate_aligned_unique(size_t size, size_t alignment);
+    std::unique_ptr<void, Deleter> allocate_unique(size_t size);
+
     void *operator new(size_t size);
     void *operator new[](size_t size);
     void operator delete(void *p, size_t size);
@@ -55,8 +61,14 @@ struct VecSimAllocator {
     static size_t getAllocationOverheadSize() { return allocation_header_size; }

 private:
-    // Retrive the original requested allocation size. Required for remalloc.
+    // Retrieve the original requested allocation size. Required for remalloc.
     inline size_t getPointerAllocationSize(void *p) { return *(((size_t *)p) - 1); }
+
+    struct Deleter {
+        VecSimAllocator &allocator;
+        explicit constexpr Deleter(VecSimAllocator &allocator) : allocator(allocator) {}
+        void operator()(void *ptr) const { allocator.free_allocation(ptr); }
+    };
 };

 /**
diff --git a/src/VecSim/spaces/normalize/normalize_naive.h b/src/VecSim/spaces/normalize/normalize_naive.h
index 239db9cf3..119c19dcf 100644
--- a/src/VecSim/spaces/normalize/normalize_naive.h
+++ b/src/VecSim/spaces/normalize/normalize_naive.h
@@ -9,6 +9,7 @@
 #include "VecSim/types/bfloat16.h"
 #include "VecSim/types/float16.h"
 #include <cmath>
+#include <vector>

 using bfloat16 = vecsim_types::bfloat16;
 using float16 = vecsim_types::float16;
@@ -35,7 +36,7 @@
 template <bool is_little>
 static inline void bfloat16_normalizeVector(void *vec, const size_t dim) {
     bfloat16 *input_vector = (bfloat16 *)vec;

-    float f32_tmp[dim];
+    std::vector<float> f32_tmp(dim);

     float sum = 0;
@@ -55,7 +56,7 @@

 static inline void float16_normalizeVector(void *vec, const size_t dim) {
     float16 *input_vector = (float16 *)vec;

-    float f32_tmp[dim];
+    std::vector<float> f32_tmp(dim);

     float sum = 0;
diff --git a/src/VecSim/vec_sim_index.h b/src/VecSim/vec_sim_index.h
index 2de0ae48d..59bf113cb 100644
--- a/src/VecSim/vec_sim_index.h
+++ b/src/VecSim/vec_sim_index.h
@@ -206,16 +206,18 @@ struct VecSimIndexAbstract : public VecSimIndexInterface {

 protected:
     virtual int addVectorWrapper(const void *blob, labelType label, void *auxiliaryCtx) override {
-        char PORTABLE_ALIGN aligned_mem[this->dataSize];
-        const void *processed_blob = processBlob(blob, aligned_mem);
+        auto aligned_mem =
+            this->getAllocator()->allocate_aligned_unique(this->dataSize, this->alignment);
+        const void *processed_blob = processBlob(blob, aligned_mem.get());
         return this->addVector(processed_blob, label, auxiliaryCtx);
     }

     virtual VecSimQueryReply *topKQueryWrapper(const void *queryBlob, size_t k,
                                                VecSimQueryParams *queryParams) const override {
-        char PORTABLE_ALIGN aligned_mem[this->dataSize];
-        const void *processed_blob = processBlob(queryBlob, aligned_mem);
+        auto aligned_mem =
+            this->getAllocator()->allocate_aligned_unique(this->dataSize, this->alignment);
+        const void *processed_blob = processBlob(queryBlob, aligned_mem.get());
         return this->topKQuery(processed_blob, k, queryParams);
     }

@@ -223,16 +225,18 @@ struct VecSimIndexAbstract : public VecSimIndexInterface {
     virtual VecSimQueryReply *rangeQueryWrapper(const void *queryBlob, double radius,
                                                 VecSimQueryParams *queryParams,
                                                 VecSimQueryReply_Order order) const override {
-        char PORTABLE_ALIGN aligned_mem[this->dataSize];
-        const void *processed_blob = processBlob(queryBlob, aligned_mem);
+        auto aligned_mem =
+            this->getAllocator()->allocate_aligned_unique(this->dataSize, this->alignment);
+        const void *processed_blob = processBlob(queryBlob, aligned_mem.get());
         return this->rangeQuery(processed_blob, radius, queryParams, order);
     }

     virtual VecSimBatchIterator *
     newBatchIteratorWrapper(const void *queryBlob, VecSimQueryParams *queryParams) const override {
-        char PORTABLE_ALIGN aligned_mem[this->dataSize];
-        const void *processed_blob = processBlob(queryBlob, aligned_mem);
+        auto aligned_mem =
+            this->getAllocator()->allocate_aligned_unique(this->dataSize, this->alignment);
+        const void *processed_blob = processBlob(queryBlob, aligned_mem.get());
         return this->newBatchIterator(processed_blob, queryParams);
     }
diff --git a/src/VecSim/vec_sim_tiered_index.h b/src/VecSim/vec_sim_tiered_index.h
index 12832191d..66e6230b3 100644
--- a/src/VecSim/vec_sim_tiered_index.h
+++ b/src/VecSim/vec_sim_tiered_index.h
@@ -109,31 +109,37 @@ class VecSimTieredIndex : public VecSimIndexInterface {

 private:
     virtual int addVectorWrapper(const void *blob, labelType label, void *auxiliaryCtx) override {
-        char PORTABLE_ALIGN aligned_mem[this->backendIndex->getDataSize()];
-        const void *processed_blob = this->backendIndex->processBlob(blob, aligned_mem);
+        auto aligned_mem = this->getAllocator()->allocate_aligned_unique(
+            this->backendIndex->getDataSize(), this->backendIndex->getAlignment());
+        const void *processed_blob = this->backendIndex->processBlob(blob, aligned_mem.get());
+
         return this->addVector(processed_blob, label, auxiliaryCtx);
     }

     virtual VecSimQueryReply *topKQueryWrapper(const void *queryBlob, size_t k,
                                                VecSimQueryParams *queryParams) const override {
-        char PORTABLE_ALIGN aligned_mem[this->backendIndex->getDataSize()];
-        const void *processed_blob = this->backendIndex->processBlob(queryBlob, aligned_mem);
+        auto aligned_mem = this->getAllocator()->allocate_aligned_unique(
+            this->backendIndex->getDataSize(), this->backendIndex->getAlignment());
+        const void *processed_blob = this->backendIndex->processBlob(queryBlob, aligned_mem.get());
+
         return this->topKQuery(processed_blob, k, queryParams);
     }

     virtual VecSimQueryReply *rangeQueryWrapper(const void *queryBlob, double radius,
                                                 VecSimQueryParams *queryParams,
                                                 VecSimQueryReply_Order order) const override {
-        char PORTABLE_ALIGN aligned_mem[this->backendIndex->getDataSize()];
-        const void *processed_blob = this->backendIndex->processBlob(queryBlob, aligned_mem);
+        auto aligned_mem = this->getAllocator()->allocate_aligned_unique(
+            this->backendIndex->getDataSize(), this->backendIndex->getAlignment());
+        const void *processed_blob = this->backendIndex->processBlob(queryBlob, aligned_mem.get());
         return this->rangeQuery(processed_blob, radius, queryParams, order);
     }

     virtual VecSimBatchIterator *
     newBatchIteratorWrapper(const void *queryBlob, VecSimQueryParams *queryParams) const override {
-        char PORTABLE_ALIGN aligned_mem[this->backendIndex->getDataSize()];
-        const void *processed_blob = this->backendIndex->processBlob(queryBlob, aligned_mem);
+        auto aligned_mem = this->getAllocator()->allocate_aligned_unique(
+            this->backendIndex->getDataSize(), this->backendIndex->getAlignment());
+        const void *processed_blob = this->backendIndex->processBlob(queryBlob, aligned_mem.get());
         return this->newBatchIterator(processed_blob, queryParams);
     }
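
The core pattern this patch introduces is an allocator-backed std::unique_ptr: VecSimAllocator hands out a raw buffer, and a small Deleter that holds a reference to the allocator returns it through free_allocation() when the pointer goes out of scope. Below is a minimal standalone sketch of that shape; MiniAllocator and main are illustrative stand-ins, not VecSim code, and the real allocator does more (memory accounting via its `allocated` counter, pluggable VecSimMemoryFunctions).

// Minimal standalone sketch of the allocate_unique()/Deleter pattern above.
// MiniAllocator is an illustrative stand-in for VecSimAllocator, not the real class.
#include <cstddef>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <memory>

struct MiniAllocator {
    // The deleter keeps a reference to its allocator so the buffer is returned
    // to the same allocator that produced it.
    struct Deleter {
        MiniAllocator &allocator;
        explicit Deleter(MiniAllocator &allocator) : allocator(allocator) {}
        void operator()(void *ptr) const { allocator.free_allocation(ptr); }
    };

    void *allocate(size_t size) { return std::malloc(size); }
    void free_allocation(void *p) { std::free(p); }

    // A scope-lifetime buffer: freed automatically when the unique_ptr is
    // destroyed, including on early return or exception.
    std::unique_ptr<void, Deleter> allocate_unique(size_t size) {
        return {allocate(size), Deleter(*this)};
    }
};

int main() {
    MiniAllocator alloc;
    size_t elementGraphDataSize = 64; // runtime-sized, like the VLAs this patch removes
    auto tmpData = alloc.allocate_unique(elementGraphDataSize);
    std::memset(tmpData.get(), 0, elementGraphDataSize);
    std::cout << "buffer at " << tmpData.get() << "\n";
} // tmpData released here via Deleter::operator()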
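
Two details of the Deleter are worth noting. It stores VecSimAllocator by reference rather than through a pointer (the "use ref to allocator instead of pointer" item in the commit message), which keeps the deleter cheap to copy but assumes the allocator outlives every buffer it hands out; and it releases through free_allocation(), presumably so the allocator's internal accounting of allocated bytes stays consistent. Compared with the variable-length arrays being removed, the sizes involved (dataSize, elementGraphDataSize, the frontend blob size) are runtime values, so a VLA is both non-standard C++ and an unbounded stack allocation; the heap-backed unique_ptr trades a small allocation cost for well-defined behavior.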
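
The wrappers in vec_sim_index.h and vec_sim_tiered_index.h previously used a PORTABLE_ALIGN stack array, so their replacement has to preserve alignment as well, hence allocate_aligned_unique(size, alignment). A standalone sketch of that variant follows; std::aligned_alloc and the free-function deleter are stand-ins for the allocator's own allocate_aligned()/free_allocation(), not what the library calls internally.

// Standalone sketch of the aligned variant used by the add/query wrappers.
// std::aligned_alloc (C++17) and std::free are stand-ins for VecSimAllocator's
// allocate_aligned()/free_allocation().
#include <cstddef>
#include <cstdint>
#include <cstdlib>
#include <iostream>
#include <memory>

using AlignedBuffer = std::unique_ptr<void, void (*)(void *)>;

AlignedBuffer allocate_aligned_unique(size_t size, size_t alignment) {
    // std::aligned_alloc requires the size to be a multiple of the alignment.
    size_t padded = ((size + alignment - 1) / alignment) * alignment;
    return {std::aligned_alloc(alignment, padded), &std::free};
}

int main() {
    size_t dataSize = 128 * sizeof(float); // e.g. one 128-dimensional query blob
    size_t alignment = 32;                 // e.g. what a SIMD load path would want
    auto aligned_mem = allocate_aligned_unique(dataSize, alignment);
    bool ok = reinterpret_cast<std::uintptr_t>(aligned_mem.get()) % alignment == 0;
    std::cout << "aligned: " << ok << "\n";
} // buffer freed automatically when aligned_mem goes out of scope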
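
The two normalize helpers make the same trade in a simpler way: the float f32_tmp[dim] scratch array becomes a std::vector<float>. A simplified sketch is below; the plain-float signature and example values are illustrative, since the real helpers in normalize_naive.h convert from bfloat16/float16 into the scratch buffer first.

// Simplified sketch of the normalize change: std::vector replaces the VLA scratch buffer.
#include <cmath>
#include <cstddef>
#include <iostream>
#include <vector>

static inline void normalizeVector(float *vec, const size_t dim) {
    std::vector<float> f32_tmp(dim); // was: float f32_tmp[dim]; (a VLA, not standard C++)

    float sum = 0;
    for (size_t i = 0; i < dim; i++) {
        f32_tmp[i] = vec[i]; // the real code stores the value converted to float here
        sum += f32_tmp[i] * f32_tmp[i];
    }
    float norm = std::sqrt(sum);
    for (size_t i = 0; i < dim; i++) {
        vec[i] = f32_tmp[i] / norm;
    }
}

int main() {
    std::vector<float> v = {3.0f, 4.0f};
    normalizeVector(v.data(), v.size());
    std::cout << v[0] << " " << v[1] << "\n"; // 0.6 0.8
}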