Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/flow-temp.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ jobs:
uses: ./.github/workflows/task-unit-test.yml
with:
container: ubuntu:jammy
run-valgrind: false
run-valgrind: true
focal:
uses: ./.github/workflows/task-unit-test.yml
with:
Expand Down
6 changes: 3 additions & 3 deletions src/VecSim/algorithms/hnsw/hnsw.h
Original file line number Diff line number Diff line change
Expand Up @@ -1820,9 +1820,9 @@ AddVectorCtx HNSWIndex<DataType, DistType>::storeNewElement(labelType label,
// Create the new element's graph metadata.
// We must assign manually enough memory on the stack and not just declare an `ElementGraphData`
// variable, since it has a flexible array member.
char tmpData[this->elementGraphDataSize];
memset(tmpData, 0, this->elementGraphDataSize);
ElementGraphData *cur_egd = (ElementGraphData *)tmpData;
auto tmpData = this->allocator->allocate_unique(this->elementGraphDataSize);
memset(tmpData.get(), 0, this->elementGraphDataSize);
ElementGraphData *cur_egd = (ElementGraphData *)(tmpData.get());
// Allocate memory (inside `ElementGraphData` constructor) for the links in higher levels and
// initialize this memory to zeros. The reason for doing it here is that we might mark this
// vector as deleted BEFORE we finish its indexing. In that case, we will collect the incoming
Expand Down
15 changes: 8 additions & 7 deletions src/VecSim/algorithms/hnsw/hnsw_serializer.h
Original file line number Diff line number Diff line change
Expand Up @@ -160,15 +160,15 @@ void HNSWIndex<DataType, DistType>::restoreGraph(std::ifstream &input) {
unsigned int block_len = 0;
readBinaryPOD(input, block_len);
for (size_t j = 0; j < block_len; j++) {
char cur_vec[this->dataSize];
input.read(cur_vec, this->dataSize);
this->vectorBlocks.back().addElement(cur_vec);
auto cur_vec = this->getAllocator()->allocate_unique(this->dataSize);
input.read(static_cast<char *>(cur_vec.get()), this->dataSize);
this->vectorBlocks.back().addElement(cur_vec.get());
}
}

// Get graph data blocks
ElementGraphData *cur_egt;
char tmpData[this->elementGraphDataSize];
auto tmpData = this->getAllocator()->allocate_unique(this->elementGraphDataSize);
size_t toplevel = 0;
for (size_t i = 0; i < num_blocks; i++) {
this->graphDataBlocks.emplace_back(this->blockSize, this->elementGraphDataSize,
Expand All @@ -177,19 +177,20 @@ void HNSWIndex<DataType, DistType>::restoreGraph(std::ifstream &input) {
readBinaryPOD(input, block_len);
for (size_t j = 0; j < block_len; j++) {
// Reset tmpData
memset(tmpData, 0, this->elementGraphDataSize);
memset(tmpData.get(), 0, this->elementGraphDataSize);
// Read the current element top level
readBinaryPOD(input, toplevel);
// Allocate space and structs for the current element
try {
new (tmpData) ElementGraphData(toplevel, this->levelDataSize, this->allocator);
new (tmpData.get())
ElementGraphData(toplevel, this->levelDataSize, this->allocator);
} catch (std::runtime_error &e) {
this->log(VecSimCommonStrings::LOG_WARNING_STRING,
"Error - allocating memory for new element failed due to low memory");
throw e;
}
// Add the current element to the current block, and update cur_egt to point to it.
this->graphDataBlocks.back().addElement(tmpData);
this->graphDataBlocks.back().addElement(tmpData.get());
cur_egt = (ElementGraphData *)this->graphDataBlocks.back().getElement(j);

// Restore the current element's graph data
Expand Down
7 changes: 4 additions & 3 deletions src/VecSim/algorithms/hnsw/hnsw_tiered.h
Original file line number Diff line number Diff line change
Expand Up @@ -504,11 +504,12 @@ void TieredHNSWIndex<DataType, DistType>::executeInsertJob(HNSWInsertJob *job) {
HNSWIndex<DataType, DistType> *hnsw_index = this->getHNSWIndex();
// Copy the vector blob from the flat buffer, so we can release the flat lock while we are
// indexing the vector into HNSW index.
DataType blob_copy[this->frontendIndex->getDim()];
memcpy(blob_copy, this->frontendIndex->getDataByInternalId(job->id),
auto blob_copy = this->getAllocator()->allocate_unique(this->frontendIndex->getDataSize());

memcpy(blob_copy.get(), this->frontendIndex->getDataByInternalId(job->id),
this->frontendIndex->getDim() * sizeof(DataType));

this->insertVectorToHNSW<true>(hnsw_index, job->label, blob_copy);
this->insertVectorToHNSW<true>(hnsw_index, job->label, blob_copy.get());

// Remove the vector and the insert job from the flat buffer.
this->flatIndexGuard.lock();
Expand Down
11 changes: 11 additions & 0 deletions src/VecSim/memory/vecsim_malloc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,17 @@ void *VecSimAllocator::callocate(size_t size) {
return nullptr;
}

// Allocate `size` bytes aligned to `alignment`, wrapped in a unique_ptr whose
// deleter hands the memory back to this allocator when it goes out of scope.
// If the underlying allocation fails, the returned unique_ptr holds nullptr.
std::unique_ptr<void, VecSimAllocator::Deleter>
VecSimAllocator::allocate_aligned_unique(size_t size, size_t alignment) {
    return std::unique_ptr<void, Deleter>(this->allocate_aligned(size, alignment),
                                          Deleter(*this));
}

// Allocate `size` bytes (default alignment), wrapped in a unique_ptr whose
// deleter hands the memory back to this allocator when it goes out of scope.
// If the underlying allocation fails, the returned unique_ptr holds nullptr.
std::unique_ptr<void, VecSimAllocator::Deleter> VecSimAllocator::allocate_unique(size_t size) {
    return std::unique_ptr<void, Deleter>(this->allocate(size), Deleter(*this));
}

void *VecSimAllocator::operator new(size_t size) { return vecsim_malloc(size); }

void *VecSimAllocator::operator new[](size_t size) { return vecsim_malloc(size); }
Expand Down
14 changes: 13 additions & 1 deletion src/VecSim/memory/vecsim_malloc.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ struct VecSimAllocator {
static size_t allocation_header_size;
static VecSimMemoryFunctions memFunctions;

// Forward declaration of the deleter for the unique_ptr.
struct Deleter;
VecSimAllocator() : allocated(std::atomic_uint64_t(sizeof(VecSimAllocator))) {}

public:
Expand All @@ -36,6 +38,10 @@ struct VecSimAllocator {
void *reallocate(void *p, size_t size);
void free_allocation(void *p);

// Allocations for scoped-lifetime memory: freed automatically when the returned unique_ptr goes out of scope.
std::unique_ptr<void, Deleter> allocate_aligned_unique(size_t size, size_t alignment);
std::unique_ptr<void, Deleter> allocate_unique(size_t size);

void *operator new(size_t size);
void *operator new[](size_t size);
void operator delete(void *p, size_t size);
Expand All @@ -55,8 +61,14 @@ struct VecSimAllocator {
static size_t getAllocationOverheadSize() { return allocation_header_size; }

private:
// Retrive the original requested allocation size. Required for remalloc.
// Retrieve the original requested allocation size. Required for reallocate().
inline size_t getPointerAllocationSize(void *p) { return *(((size_t *)p) - 1); }

// Deleter used by allocate_unique()/allocate_aligned_unique(): returns the
// memory to the owning allocator so its `allocated` accounting stays correct.
// Holds a pointer rather than a reference so the deleter is copy/move-assignable;
// std::unique_ptr's move-assignment requires an assignable deleter, and a
// reference member would make `ptr = allocator->allocate_unique(n)` fail to compile.
// NOTE(review): the allocator must outlive any unique_ptr carrying this deleter.
struct Deleter {
    VecSimAllocator *allocator;
    explicit constexpr Deleter(VecSimAllocator &allocator) : allocator(&allocator) {}
    void operator()(void *ptr) const { allocator->free_allocation(ptr); }
};
};

/**
Expand Down
5 changes: 3 additions & 2 deletions src/VecSim/spaces/normalize/normalize_naive.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include "VecSim/types/bfloat16.h"
#include "VecSim/types/float16.h"
#include <cmath>
#include <vector>

using bfloat16 = vecsim_types::bfloat16;
using float16 = vecsim_types::float16;
Expand All @@ -35,7 +36,7 @@ template <bool is_little>
static inline void bfloat16_normalizeVector(void *vec, const size_t dim) {
bfloat16 *input_vector = (bfloat16 *)vec;

float f32_tmp[dim];
std::vector<float> f32_tmp(dim);

float sum = 0;

Expand All @@ -55,7 +56,7 @@ static inline void bfloat16_normalizeVector(void *vec, const size_t dim) {
static inline void float16_normalizeVector(void *vec, const size_t dim) {
float16 *input_vector = (float16 *)vec;

float f32_tmp[dim];
std::vector<float> f32_tmp(dim);

float sum = 0;

Expand Down
20 changes: 12 additions & 8 deletions src/VecSim/vec_sim_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -206,33 +206,37 @@ struct VecSimIndexAbstract : public VecSimIndexInterface {

protected:
virtual int addVectorWrapper(const void *blob, labelType label, void *auxiliaryCtx) override {
char PORTABLE_ALIGN aligned_mem[this->dataSize];
const void *processed_blob = processBlob(blob, aligned_mem);
auto aligned_mem =
this->getAllocator()->allocate_aligned_unique(this->dataSize, this->alignment);
const void *processed_blob = processBlob(blob, aligned_mem.get());

return this->addVector(processed_blob, label, auxiliaryCtx);
}

virtual VecSimQueryReply *topKQueryWrapper(const void *queryBlob, size_t k,
VecSimQueryParams *queryParams) const override {
char PORTABLE_ALIGN aligned_mem[this->dataSize];
const void *processed_blob = processBlob(queryBlob, aligned_mem);
auto aligned_mem =
this->getAllocator()->allocate_aligned_unique(this->dataSize, this->alignment);
const void *processed_blob = processBlob(queryBlob, aligned_mem.get());

return this->topKQuery(processed_blob, k, queryParams);
}

virtual VecSimQueryReply *rangeQueryWrapper(const void *queryBlob, double radius,
VecSimQueryParams *queryParams,
VecSimQueryReply_Order order) const override {
char PORTABLE_ALIGN aligned_mem[this->dataSize];
const void *processed_blob = processBlob(queryBlob, aligned_mem);
auto aligned_mem =
this->getAllocator()->allocate_aligned_unique(this->dataSize, this->alignment);
const void *processed_blob = processBlob(queryBlob, aligned_mem.get());

return this->rangeQuery(processed_blob, radius, queryParams, order);
}

virtual VecSimBatchIterator *
newBatchIteratorWrapper(const void *queryBlob, VecSimQueryParams *queryParams) const override {
char PORTABLE_ALIGN aligned_mem[this->dataSize];
const void *processed_blob = processBlob(queryBlob, aligned_mem);
auto aligned_mem =
this->getAllocator()->allocate_aligned_unique(this->dataSize, this->alignment);
const void *processed_blob = processBlob(queryBlob, aligned_mem.get());

return this->newBatchIterator(processed_blob, queryParams);
}
Expand Down
22 changes: 14 additions & 8 deletions src/VecSim/vec_sim_tiered_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,31 +109,37 @@ class VecSimTieredIndex : public VecSimIndexInterface {

private:
virtual int addVectorWrapper(const void *blob, labelType label, void *auxiliaryCtx) override {
char PORTABLE_ALIGN aligned_mem[this->backendIndex->getDataSize()];
const void *processed_blob = this->backendIndex->processBlob(blob, aligned_mem);
auto aligned_mem = this->getAllocator()->allocate_aligned_unique(
this->backendIndex->getDataSize(), this->backendIndex->getAlignment());
const void *processed_blob = this->backendIndex->processBlob(blob, aligned_mem.get());

return this->addVector(processed_blob, label, auxiliaryCtx);
}

virtual VecSimQueryReply *topKQueryWrapper(const void *queryBlob, size_t k,
VecSimQueryParams *queryParams) const override {
char PORTABLE_ALIGN aligned_mem[this->backendIndex->getDataSize()];
const void *processed_blob = this->backendIndex->processBlob(queryBlob, aligned_mem);
auto aligned_mem = this->getAllocator()->allocate_aligned_unique(
this->backendIndex->getDataSize(), this->backendIndex->getAlignment());
const void *processed_blob = this->backendIndex->processBlob(queryBlob, aligned_mem.get());

return this->topKQuery(processed_blob, k, queryParams);
}

virtual VecSimQueryReply *rangeQueryWrapper(const void *queryBlob, double radius,
VecSimQueryParams *queryParams,
VecSimQueryReply_Order order) const override {
char PORTABLE_ALIGN aligned_mem[this->backendIndex->getDataSize()];
const void *processed_blob = this->backendIndex->processBlob(queryBlob, aligned_mem);
auto aligned_mem = this->getAllocator()->allocate_aligned_unique(
this->backendIndex->getDataSize(), this->backendIndex->getAlignment());
const void *processed_blob = this->backendIndex->processBlob(queryBlob, aligned_mem.get());

return this->rangeQuery(processed_blob, radius, queryParams, order);
}

virtual VecSimBatchIterator *
newBatchIteratorWrapper(const void *queryBlob, VecSimQueryParams *queryParams) const override {
char PORTABLE_ALIGN aligned_mem[this->backendIndex->getDataSize()];
const void *processed_blob = this->backendIndex->processBlob(queryBlob, aligned_mem);
auto aligned_mem = this->getAllocator()->allocate_aligned_unique(
this->backendIndex->getDataSize(), this->backendIndex->getAlignment());
const void *processed_blob = this->backendIndex->processBlob(queryBlob, aligned_mem.get());

return this->newBatchIterator(processed_blob, queryParams);
}
Expand Down