Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

API to tune DB parameters #130

Merged
merged 2 commits into from
Feb 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 87 additions & 0 deletions engine/config/config.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
#pragma once

#include <algorithm>
#include <atomic> // Include for std::atomic
#include <cstddef>
#include <exception>
#include <memory>
#include <set>
#include <string>
#include <vector>
#include <stdexcept> // Include for std::invalid_argument

#include "utils/json.hpp"

namespace vectordb {

/// Runtime-tunable engine parameters.
///
/// Every field is a std::atomic<int>, so a single read or write of one field
/// is free of data races. There is no synchronization *across* fields: a
/// reader may observe one field already updated and another not yet.
struct Config {
  // Passed to each VecSearchExecutor when executor pools are (re)built.
  std::atomic<int> IntraQueryThreads{4};
  std::atomic<int> MasterQueueSize{500};
  std::atomic<int> LocalQueueSize{500};
  std::atomic<int> GlobalSyncInterval{15};
  // Tables with fewer records than this skip the ANN graph rebuild.
  std::atomic<int> MinimalGraphSize{100};
  // Number of executors created in the pool of each vector field.
  std::atomic<int> NumExecutorPerField{16};
  // Thread count handed to OpenMP when rebuilding a table.
  std::atomic<int> RebuildThreads{1};

  // Accepted ranges (inclusive) for the tunable values.
  static constexpr int kMinThreadCount = 1;
  static constexpr int kMaxThreadCount = 128;
  static constexpr int kMinQueueSize = 500;
  static constexpr int kMaxQueueSize = 10000000;

  /// Validates that `value` lies in [lo, hi].
  /// @throws std::invalid_argument with the message
  ///         "Invalid value for <name>, valid range: [<lo>, <hi>]" otherwise.
  static void checkRange(int value, int lo, int hi, const char* name) {
    if (value < lo || value > hi) {
      throw std::invalid_argument(std::string("Invalid value for ") + name +
                                  ", valid range: [" + std::to_string(lo) +
                                  ", " + std::to_string(hi) + "]");
    }
  }

  /// Sets IntraQueryThreads.
  /// @throws std::invalid_argument if value is outside [1, 128].
  void setIntraQueryThreads(int value) {
    checkRange(value, kMinThreadCount, kMaxThreadCount, "IntraQueryThreads");
    IntraQueryThreads.store(value, std::memory_order_relaxed);
  }

  /// Sets both MasterQueueSize and LocalQueueSize to `value`.
  /// The two stores are independent: a concurrent reader can briefly see the
  /// new MasterQueueSize together with the old LocalQueueSize.
  /// @throws std::invalid_argument if value is outside [500, 10000000].
  void setSearchQueueSize(int value) {
    checkRange(value, kMinQueueSize, kMaxQueueSize, "SearchQueueSize");
    MasterQueueSize.store(value, std::memory_order_relaxed);
    LocalQueueSize.store(value, std::memory_order_relaxed);
  }

  /// Sets NumExecutorPerField.
  /// @throws std::invalid_argument if value is outside [1, 128].
  void setNumExecutorPerField(int value) {
    checkRange(value, kMinThreadCount, kMaxThreadCount, "NumExecutorPerField");
    NumExecutorPerField.store(value, std::memory_order_relaxed);
  }

  /// Sets RebuildThreads.
  /// @throws std::invalid_argument if value is outside [1, 128].
  void setRebuildThreads(int value) {
    checkRange(value, kMinThreadCount, kMaxThreadCount, "RebuildThreads");
    RebuildThreads.store(value, std::memory_order_relaxed);
  }

  /// Applies the recognised keys of `json` to this config.
  ///
  /// Recognised keys: "IntraQueryThreads", "ConcurrentWorkersPerIndex"
  /// (stored in NumExecutorPerField), "RebuildThreads" and "SearchQueueSize".
  ///
  /// All present keys are validated before any field is written, so a
  /// rejected request leaves the configuration untouched instead of
  /// half-applied.
  ///
  /// @param json               Parsed request payload.
  /// @param needSwapExecutors  Set to true when a value consumed at executor
  ///                           construction time was changed; false otherwise
  ///                           (including when this function throws).
  /// @throws std::invalid_argument if any present value is out of range.
  void updateConfig(const vectordb::Json& json, bool& needSwapExecutors) {
    needSwapExecutors = false;

    const bool hasIntraQueryThreads = json.HasMember("IntraQueryThreads");
    const bool hasNumExecutorPerField = json.HasMember("ConcurrentWorkersPerIndex");
    const bool hasRebuildThreads = json.HasMember("RebuildThreads");
    const bool hasSearchQueueSize = json.HasMember("SearchQueueSize");

    int intraQueryThreads = 0;
    int numExecutorPerField = 0;
    int rebuildThreads = 0;
    int searchQueueSize = 0;

    // Phase 1: read and validate. Nothing is stored yet, so throwing here
    // cannot leave a partially updated configuration behind.
    if (hasIntraQueryThreads) {
      intraQueryThreads = json.GetInt("IntraQueryThreads");
      checkRange(intraQueryThreads, kMinThreadCount, kMaxThreadCount, "IntraQueryThreads");
    }
    if (hasNumExecutorPerField) {
      numExecutorPerField = json.GetInt("ConcurrentWorkersPerIndex");
      checkRange(numExecutorPerField, kMinThreadCount, kMaxThreadCount, "NumExecutorPerField");
    }
    if (hasRebuildThreads) {
      rebuildThreads = json.GetInt("RebuildThreads");
      checkRange(rebuildThreads, kMinThreadCount, kMaxThreadCount, "RebuildThreads");
    }
    if (hasSearchQueueSize) {
      searchQueueSize = json.GetInt("SearchQueueSize");
      checkRange(searchQueueSize, kMinQueueSize, kMaxQueueSize, "SearchQueueSize");
    }

    // Phase 2: apply. None of these stores can throw.
    if (hasIntraQueryThreads) {
      IntraQueryThreads.store(intraQueryThreads, std::memory_order_relaxed);
    }
    if (hasNumExecutorPerField) {
      NumExecutorPerField.store(numExecutorPerField, std::memory_order_relaxed);
    }
    if (hasRebuildThreads) {
      RebuildThreads.store(rebuildThreads, std::memory_order_relaxed);
    }
    if (hasSearchQueueSize) {
      MasterQueueSize.store(searchQueueSize, std::memory_order_relaxed);
      LocalQueueSize.store(searchQueueSize, std::memory_order_relaxed);
    }

    // RebuildThreads is read at rebuild time, so it needs no executor swap.
    needSwapExecutors =
        hasIntraQueryThreads || hasNumExecutorPerField || hasSearchQueueSize;
  }
};

// Global config instance.
// Declared `inline` so that this header-level definition yields a single
// object shared by every translation unit that includes the header.
inline Config globalConfig;

} // namespace vectordb
14 changes: 14 additions & 0 deletions engine/db/db_mvp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,5 +91,19 @@ Status DBMVP::Rebuild() {
return Status::OK();
}

// Asks every non-null table of this database to swap in fresh executors.
// A failing table is reported on stdout and the remaining tables are still
// processed; the function itself always returns Status::OK().
Status DBMVP::SwapExecutors() {
  for (int64_t table_idx = 0; table_idx < tables_.size(); ++table_idx) {
    std::shared_ptr<TableMVP> current = tables_[table_idx];
    if (current == nullptr) {
      continue;
    }

    auto swap_result = current->SwapExecutors();
    if (swap_result.ok()) {
      continue;
    }
    std::cout << "Swap executors for table " << current->table_schema_.name_ << " failed." << std::endl;
  }
  return Status::OK();
}

} // namespace engine
} // namespace vectordb
1 change: 1 addition & 0 deletions engine/db/db_mvp.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class DBMVP {
std::vector<std::string> GetTables();
std::shared_ptr<TableMVP> GetTable(const std::string& table_name);
Status Rebuild();
Status SwapExecutors();

void SetWALEnabled(bool enabled) {
for (auto table : tables_) {
Expand Down
15 changes: 15 additions & 0 deletions engine/db/db_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,21 @@ Status DBServer::Rebuild() {
return Status::OK();
}

// Asks every non-null database held by this server to swap its executors.
// A failing database is reported on stdout and the remaining ones are still
// processed; the function itself always returns Status::OK().
Status DBServer::SwapExecutors() {
  for (int64_t db_idx = 0; db_idx < dbs_.size(); ++db_idx) {
    std::shared_ptr<DBMVP> current = dbs_[db_idx];
    if (current == nullptr) {
      continue;
    }

    auto swap_result = current->SwapExecutors();
    if (swap_result.ok()) {
      continue;
    }
    std::cout << "Swap executors for db of " << current->db_catalog_path_ << " failed."
              << std::endl;
  }
  return Status::OK();
}

Status DBServer::ListTables(const std::string& db_name, std::vector<std::string>& table_names) {
auto db = GetDB(db_name);
if (db == nullptr) {
Expand Down
2 changes: 2 additions & 0 deletions engine/db/db_server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ class DBServer {

Status Rebuild();

Status SwapExecutors();

void InjectEmbeddingService(std::string& embedding_service_url) {
embedding_service_ = std::make_shared<vectordb::engine::EmbeddingService>(embedding_service_url);
meta_->InjectEmbeddingService(embedding_service_);
Expand Down
92 changes: 80 additions & 12 deletions engine/db/table_mvp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,12 @@

#include "db/catalog/meta_types.hpp"

#include "config/config.hpp"

namespace vectordb {

extern Config globalConfig;

namespace engine {

TableMVP::TableMVP(meta::TableSchema &table_schema,
Expand Down Expand Up @@ -65,7 +70,7 @@ TableMVP::TableMVP(meta::TableSchema &table_schema,
auto distFunc = GetDistFunc(fType, mType);

auto pool = std::make_shared<execution::ExecutorPool>();
for (int executorIdx = 0; executorIdx < NumExecutorPerField;
for (int executorIdx = 0; executorIdx < globalConfig.NumExecutorPerField;
executorIdx++) {
pool->release(std::make_shared<execution::VecSearchExecutor>(
table_schema_.fields_[i].vector_dimension_,
Expand All @@ -75,10 +80,10 @@ TableMVP::TableMVP(meta::TableSchema &table_schema,
columnData,
distFunc,
&table_schema_.fields_[i].vector_dimension_,
IntraQueryThreads,
MasterQueueSize,
LocalQueueSize,
GlobalSyncInterval));
globalConfig.IntraQueryThreads,
globalConfig.MasterQueueSize,
globalConfig.LocalQueueSize,
globalConfig.GlobalSyncInterval));
}
executor_pool_.push_back(pool);
}
Expand All @@ -87,7 +92,8 @@ TableMVP::TableMVP(meta::TableSchema &table_schema,

Status TableMVP::Rebuild(const std::string &db_catalog_path) {
// Limit how many threads rebuild takes.
omp_set_num_threads(RebuildThreads);
omp_set_num_threads(globalConfig.RebuildThreads);
std::cout << "Rebuild table segment with threads: " << globalConfig.RebuildThreads << std::endl;

// Get the current record number.
int64_t record_number = table_segment_->record_number_;
Expand All @@ -112,7 +118,7 @@ Status TableMVP::Rebuild(const std::string &db_catalog_path) {
fType == meta::FieldType::SPARSE_VECTOR_FLOAT ||
fType == meta::FieldType::SPARSE_VECTOR_DOUBLE) {
if (ann_graph_segment_[index]->record_number_ == record_number ||
record_number < MinimalGraphSize) {
record_number < globalConfig.MinimalGraphSize) {
// No need to rebuild the ann graph.
std::cout << "Skip rebuild ANN graph for attribute: "
<< table_schema_.fields_[i].name_ << std::endl;
Expand Down Expand Up @@ -171,7 +177,7 @@ Status TableMVP::Rebuild(const std::string &db_catalog_path) {
auto pool = std::make_shared<execution::ExecutorPool>();
auto distFunc = GetDistFunc(fType, mType);

for (int executorIdx = 0; executorIdx < NumExecutorPerField;
for (int executorIdx = 0; executorIdx < globalConfig.NumExecutorPerField;
executorIdx++) {
pool->release(std::make_shared<execution::VecSearchExecutor>(
table_schema_.fields_[i].vector_dimension_,
Expand All @@ -182,10 +188,10 @@ Status TableMVP::Rebuild(const std::string &db_catalog_path) {
columnData,
distFunc,
&table_schema_.fields_[i].vector_dimension_,
IntraQueryThreads,
MasterQueueSize,
LocalQueueSize,
GlobalSyncInterval));
globalConfig.IntraQueryThreads,
globalConfig.MasterQueueSize,
globalConfig.LocalQueueSize,
globalConfig.GlobalSyncInterval));
}
std::unique_lock<std::mutex> lock(executor_pool_mutex_);
executor_pool_.set(index, pool);
Expand All @@ -199,6 +205,68 @@ Status TableMVP::Rebuild(const std::string &db_catalog_path) {
return Status::OK();
}

// Replaces the executor pool of every vector field with a freshly built pool
// that reflects the current values in globalConfig. The existing ANN graph
// segments are reused as they are; only the executors are recreated.
//
// Always returns Status::OK().
Status TableMVP::SwapExecutors() {
  // Snapshot the tunables once so that every executor created by this call is
  // built from the same settings, even if the config is updated concurrently.
  const int executors_per_field = globalConfig.NumExecutorPerField.load();
  const int intra_query_threads = globalConfig.IntraQueryThreads.load();
  const int master_queue_size = globalConfig.MasterQueueSize.load();
  const int local_queue_size = globalConfig.LocalQueueSize.load();
  const int global_sync_interval = globalConfig.GlobalSyncInterval.load();

  // Position among vector fields only; indexes ann_graph_segment_ and
  // executor_pool_, which hold one entry per vector field.
  int64_t index = 0;
  for (std::size_t i = 0; i < table_schema_.fields_.size(); ++i) {
    auto &field = table_schema_.fields_[i];
    const auto fType = field.field_type_;
    const auto mType = field.metric_type_;

    const bool is_dense = fType == meta::FieldType::VECTOR_FLOAT ||
                          fType == meta::FieldType::VECTOR_DOUBLE;
    const bool is_sparse = fType == meta::FieldType::SPARSE_VECTOR_FLOAT ||
                           fType == meta::FieldType::SPARSE_VECTOR_DOUBLE;
    if (!is_dense && !is_sparse) {
      // Non-vector fields have no executors.
      continue;
    }

    VectorColumnData columnData;
    if (is_dense) {
      columnData = table_segment_
                       ->vector_tables_[table_segment_->field_name_mem_offset_map_
                                            [field.name_]];
    } else {
      // sparse vector
      columnData = &table_segment_
                        ->var_len_attr_table_[table_segment_->field_name_mem_offset_map_
                                                  [field.name_]];
    }

    std::cout << "Swap executors for attribute: "
              << field.name_ << std::endl;

    // Build the replacement pool completely before publishing it.
    auto pool = std::make_shared<execution::ExecutorPool>();
    auto distFunc = GetDistFunc(fType, mType);

    for (int executorIdx = 0; executorIdx < executors_per_field; executorIdx++) {
      pool->release(std::make_shared<execution::VecSearchExecutor>(
          field.vector_dimension_,
          ann_graph_segment_[index]->navigation_point_,
          ann_graph_segment_[index],
          ann_graph_segment_[index]->offset_table_,
          ann_graph_segment_[index]->neighbor_list_,
          columnData,
          distFunc,
          &field.vector_dimension_,
          intra_query_threads,
          master_queue_size,
          local_queue_size,
          global_sync_interval));
    }

    {
      // Hold the mutex only for the pointer swap itself.
      std::unique_lock<std::mutex> lock(executor_pool_mutex_);
      executor_pool_.set(index, pool);
    }

    ++index;
  }

  std::cout << "Swap executors done." << std::endl;
  return Status::OK();
}

Status TableMVP::Insert(vectordb::Json &record, std::unordered_map<std::string, std::string> &headers, bool upsert) {
int64_t wal_id =
wal_->WriteEntry(upsert ? LogEntryType::UPSERT : LogEntryType::INSERT, record.DumpToString());
Expand Down
12 changes: 3 additions & 9 deletions engine/db/table_mvp.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,6 @@
namespace vectordb {
namespace engine {

// Compile-time tuning constants at vectordb::engine namespace scope.
// The first four are passed to each VecSearchExecutor that is constructed.
constexpr const int IntraQueryThreads = 4;
constexpr const int MasterQueueSize = 500;
constexpr const int LocalQueueSize = 500;
constexpr const int GlobalSyncInterval = 15;
// Rebuild skips the ANN graph when the record count is below this value.
constexpr const int MinimalGraphSize = 100;
// Upper bound of the loop that creates executors for one vector field.
constexpr const int NumExecutorPerField = 16;

// Thread count handed to omp_set_num_threads() in Rebuild.
constexpr const int RebuildThreads = 4;

class TableMVP {
public:
explicit TableMVP(
Expand All @@ -47,6 +38,9 @@ class TableMVP {
// Rebuild the table and ann graph, and save to disk.
Status Rebuild(const std::string &db_catalog_path);

// Swap executors during config change.
Status SwapExecutors();

Status Insert(vectordb::Json &records, std::unordered_map<std::string, std::string> &headers, bool upsert = false);

Status InsertPrepare(vectordb::Json &pks, vectordb::Json &result);
Expand Down
34 changes: 34 additions & 0 deletions engine/server/web_server/web_controller.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,14 @@
#include "utils/json.hpp"
#include "utils/status.hpp"
#include "utils/constants.hpp"
#include "config/config.hpp"

#define WEB_LOG_PREFIX "[Web] "

namespace vectordb {

extern Config globalConfig;

namespace server {
namespace web {

Expand Down Expand Up @@ -895,6 +899,36 @@ class WebController : public oatpp::web::server::api::ApiController {
return createDtoResponse(Status::CODE_200, dto);
}

ADD_CORS(UpdateConfig)

/**
 * POST api/config
 *
 * Parses the request body as JSON and applies it to globalConfig. When the
 * update touches a value that executors capture at construction time, the
 * executors of all databases are swapped afterwards.
 *
 * Responses:
 *   200 - config updated.
 *   400 - body is not valid JSON, or a value is out of its allowed range
 *         (reported by updateConfig as std::invalid_argument).
 *   500 - any other exception, or the executor swap reported a failure.
 */
ENDPOINT("POST", "api/config", UpdateConfig, BODY_STRING(String, body)) {
  vectordb::Json parsedBody;
  auto dto = StatusDto::createShared();
  auto valid = parsedBody.LoadFromString(body);
  if (!valid) {
    dto->statusCode = Status::CODE_400.code;
    dto->message = "Invalid payload.";
    return createDtoResponse(Status::CODE_400, dto);
  }

  try {
    bool needSwapExecutors = false;
    globalConfig.updateConfig(parsedBody, needSwapExecutors);
    if (needSwapExecutors) {
      // Swap executors if necessary, and do not drop the returned status.
      auto swapStatus = db_server->SwapExecutors();
      if (!swapStatus.ok()) {
        dto->statusCode = Status::CODE_500.code;
        dto->message = std::string("Config updated, but swapping executors failed.");
        return createDtoResponse(Status::CODE_500, dto);
      }
    }
  } catch (const std::invalid_argument& ex) {
    // A rejected value is a client error, not a server fault.
    dto->statusCode = Status::CODE_400.code;
    dto->message = std::string(ex.what());
    return createDtoResponse(Status::CODE_400, dto);
  } catch (const std::exception& ex) {
    dto->statusCode = Status::CODE_500.code;
    dto->message = std::string(ex.what());
    return createDtoResponse(Status::CODE_500, dto);
  }

  dto->statusCode = Status::CODE_200.code;
  dto->message = std::string("Config updated successfully.");
  return createDtoResponse(Status::CODE_200, dto);
}

/**
* Finish ENDPOINTs generation ('ApiController' codegen)
*/
Expand Down