diff --git a/eloq_data_store_service/build_eloq_store.cmake b/eloq_data_store_service/build_eloq_store.cmake index 90ec9bc..883f5bf 100644 --- a/eloq_data_store_service/build_eloq_store.cmake +++ b/eloq_data_store_service/build_eloq_store.cmake @@ -5,7 +5,7 @@ set(CMAKE_CXX_STANDARD_REQUIRED ON) set(CMAKE_CXX_EXTENSIONS ON) set(CMAKE_EXPORT_COMPILE_COMMANDS ON) -SET(ELOQ_STORE_SOURCE_DIR ${ELOQSTORE_PARENT_DIR}/eloq_store) +SET(ELOQ_STORE_SOURCE_DIR ${ELOQSTORE_PARENT_DIR}/eloqstore) option(ELOQ_MODULE_ENABLED "Enable EloqModule" OFF) message("ELOQ_MODULE_ENABLED: " ${ELOQ_MODULE_ENABLED}) @@ -55,8 +55,8 @@ endif() set(INI_SOURCES ${ELOQ_STORE_SOURCE_DIR}/inih/ini.c ${ELOQ_STORE_SOURCE_DIR}/inih/cpp/INIReader.cpp) -option(WITH_ASAN "build with ASAN" OFF) if(WITH_ASAN) + message("build eloqstore with ASAN: ${WITH_ASAN}") set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fsanitize=address -fno-omit-frame-pointer") endif() diff --git a/eloq_data_store_service/eloq_store_data_store.cpp b/eloq_data_store_service/eloq_store_data_store.cpp index b17524e..863b337 100644 --- a/eloq_data_store_service/eloq_store_data_store.cpp +++ b/eloq_data_store_service/eloq_store_data_store.cpp @@ -25,15 +25,15 @@ namespace EloqDS { -thread_local ObjectPool> +thread_local ObjectPool> eloq_store_read_op_pool_; -thread_local ObjectPool> +thread_local ObjectPool> eloq_store_batch_write_op_pool_; -thread_local ObjectPool> +thread_local ObjectPool> eloq_store_scan_req_op_pool_; -thread_local ObjectPool> +thread_local ObjectPool> eloq_store_truncate_op_pool_; -thread_local ObjectPool> +thread_local ObjectPool> eloq_store_floor_op_pool_; thread_local ObjectPool eloq_store_scan_del_op_pool_; @@ -65,27 +65,27 @@ inline void BuildValue(const WriteRecordsRequest &write_req, EloqStoreDataStore::EloqStoreDataStore(uint32_t shard_id, DataStoreService *data_store_service, - const ::kvstore::KvOptions &configs) + const ::eloqstore::KvOptions &configs) : DataStore(shard_id, data_store_service), eloq_store_service_(configs) { } void EloqStoreDataStore::Read(ReadRequest *read_req) { - ::kvstore::TableIdent eloq_store_table_id; + ::eloqstore::TableIdent eloq_store_table_id; eloq_store_table_id.tbl_name_ = read_req->GetTableName(); eloq_store_table_id.partition_id_ = read_req->GetPartitionId(); std::string_view key = read_req->GetKey(); // Read from eloqstore async - EloqStoreOperationData<::kvstore::ReadRequest> *read_op = + EloqStoreOperationData<::eloqstore::ReadRequest> *read_op = eloq_store_read_op_pool_.NextObject(); read_op->Reset(read_req); PoolableGuard op_guard(read_op); - ::kvstore::ReadRequest &kv_read_req = read_op->EloqStoreRequest(); + ::eloqstore::ReadRequest &kv_read_req = read_op->EloqStoreRequest(); kv_read_req.SetArgs(eloq_store_table_id, key); uint64_t user_data = reinterpret_cast(read_op); @@ -100,31 +100,31 @@ void EloqStoreDataStore::Read(ReadRequest *read_req) op_guard.Release(); } -void EloqStoreDataStore::OnRead(::kvstore::KvRequest *req) +void EloqStoreDataStore::OnRead(::eloqstore::KvRequest *req) { - EloqStoreOperationData<::kvstore::ReadRequest> *read_op = - static_cast *>( + EloqStoreOperationData<::eloqstore::ReadRequest> *read_op = + static_cast *>( reinterpret_cast(req->UserData())); assert(req == &read_op->EloqStoreRequest()); - ::kvstore::ReadRequest *read_req = - static_cast<::kvstore::ReadRequest *>(req); + ::eloqstore::ReadRequest *read_req = + static_cast<::eloqstore::ReadRequest *>(req); PoolableGuard op_guard(read_op); ReadRequest *ds_read_req = static_cast(read_op->DataStoreRequest()); - if (read_req->Error() != ::kvstore::KvError::NoError) + if (read_req->Error() != ::eloqstore::KvError::NoError) { - LOG_IF(ERROR, read_req->Error() != ::kvstore::KvError::NotFound) + LOG_IF(ERROR, read_req->Error() != ::eloqstore::KvError::NotFound) << "Read from EloqStore failed with error code: " << static_cast(read_req->Error()) << ", error message: " << read_req->ErrMessage() << ". Table: " << req->TableId(); remote::DataStoreError error_code = - read_req->Error() == ::kvstore::KvError::NotFound + read_req->Error() == ::eloqstore::KvError::NotFound ? remote::DataStoreError::KEY_NOT_FOUND : remote::DataStoreError::READ_FAILED; ds_read_req->SetFinish(error_code); @@ -138,20 +138,20 @@ void EloqStoreDataStore::OnRead(::kvstore::KvRequest *req) void EloqStoreDataStore::BatchWriteRecords(WriteRecordsRequest *write_req) { - ::kvstore::TableIdent eloq_store_table_id; + ::eloqstore::TableIdent eloq_store_table_id; eloq_store_table_id.tbl_name_ = write_req->GetTableName(); eloq_store_table_id.partition_id_ = write_req->GetPartitionId(); // Write to eloqstore async - EloqStoreOperationData<::kvstore::BatchWriteRequest> *write_op = + EloqStoreOperationData<::eloqstore::BatchWriteRequest> *write_op = eloq_store_batch_write_op_pool_.NextObject(); write_op->Reset(write_req); PoolableGuard op_guard(write_op); - ::kvstore::BatchWriteRequest &kv_write_req = write_op->EloqStoreRequest(); + ::eloqstore::BatchWriteRequest &kv_write_req = write_op->EloqStoreRequest(); - std::vector<::kvstore::WriteDataEntry> entries; + std::vector<::eloqstore::WriteDataEntry> entries; size_t rec_cnt = write_req->RecordsCount(); entries.reserve(rec_cnt); const uint16_t parts_per_key = write_req->PartsCountPerKey(); @@ -159,22 +159,22 @@ void EloqStoreDataStore::BatchWriteRecords(WriteRecordsRequest *write_req) size_t first_idx = 0; for (size_t i = 0; i < rec_cnt; ++i) { - ::kvstore::WriteDataEntry entry; + ::eloqstore::WriteDataEntry entry; first_idx = i * parts_per_key; BuildKey(*write_req, first_idx, parts_per_key, entry.key_); first_idx = i * parts_per_record; BuildValue(*write_req, first_idx, parts_per_record, entry.val_); entry.timestamp_ = write_req->GetRecordTs(i); entry.op_ = (write_req->KeyOpType(i) == WriteOpType::PUT - ? ::kvstore::WriteOp::Upsert - : ::kvstore::WriteOp::Delete); + ? ::eloqstore::WriteOp::Upsert + : ::eloqstore::WriteOp::Delete); entries.emplace_back(std::move(entry)); } if (!std::is_sorted(entries.begin(), entries.end(), - [](const ::kvstore::WriteDataEntry &lhs, - const ::kvstore::WriteDataEntry &rhs) + [](const ::eloqstore::WriteDataEntry &lhs, + const ::eloqstore::WriteDataEntry &rhs) { return lhs.key_ < rhs.key_; })) { DLOG(INFO) << "Sort this batch records in non-descending order before " @@ -183,8 +183,8 @@ void EloqStoreDataStore::BatchWriteRecords(WriteRecordsRequest *write_req) // Sort the batch keys std::sort(entries.begin(), entries.end(), - [](const ::kvstore::WriteDataEntry &lhs, - const ::kvstore::WriteDataEntry &rhs) + [](const ::eloqstore::WriteDataEntry &lhs, + const ::eloqstore::WriteDataEntry &rhs) { return lhs.key_ < rhs.key_; }); } @@ -205,15 +205,15 @@ void EloqStoreDataStore::BatchWriteRecords(WriteRecordsRequest *write_req) op_guard.Release(); } -void EloqStoreDataStore::OnBatchWrite(::kvstore::KvRequest *req) +void EloqStoreDataStore::OnBatchWrite(::eloqstore::KvRequest *req) { - EloqStoreOperationData<::kvstore::BatchWriteRequest> *write_op = - static_cast *>( + EloqStoreOperationData<::eloqstore::BatchWriteRequest> *write_op = + static_cast *>( reinterpret_cast(req->UserData())); assert(req == &write_op->EloqStoreRequest()); - ::kvstore::BatchWriteRequest *write_req = - static_cast<::kvstore::BatchWriteRequest *>(req); + ::eloqstore::BatchWriteRequest *write_req = + static_cast<::eloqstore::BatchWriteRequest *>(req); PoolableGuard op_guard(write_op); @@ -221,7 +221,7 @@ void EloqStoreDataStore::OnBatchWrite(::kvstore::KvRequest *req) static_cast(write_op->DataStoreRequest()); remote::CommonResult result; - if (write_req->Error() != ::kvstore::KvError::NoError) + if (write_req->Error() != ::eloqstore::KvError::NoError) { LOG(ERROR) << "Write to EloqStore failed with error code: " << static_cast(write_req->Error()) @@ -258,20 +258,20 @@ void EloqStoreDataStore::DeleteRange(DeleteRangeRequest *delete_range_req) // Truncate from start key. - ::kvstore::TableIdent eloq_store_table_id; + ::eloqstore::TableIdent eloq_store_table_id; eloq_store_table_id.tbl_name_ = delete_range_req->GetTableName(), eloq_store_table_id.partition_id_ = delete_range_req->GetPartitionId(); const std::string_view start_key = delete_range_req->GetStartKey(); // Delete records from eloqstore async - EloqStoreOperationData<::kvstore::TruncateRequest> *truncate_op = + EloqStoreOperationData<::eloqstore::TruncateRequest> *truncate_op = eloq_store_truncate_op_pool_.NextObject(); truncate_op->Reset(delete_range_req); PoolableGuard op_guard(delete_range_req); - ::kvstore::TruncateRequest &kv_truncate_req = + ::eloqstore::TruncateRequest &kv_truncate_req = truncate_op->EloqStoreRequest(); kv_truncate_req.SetArgs(eloq_store_table_id, start_key); @@ -292,10 +292,10 @@ void EloqStoreDataStore::DeleteRange(DeleteRangeRequest *delete_range_req) op_guard.Release(); } -void EloqStoreDataStore::OnDeleteRange(::kvstore::KvRequest *req) +void EloqStoreDataStore::OnDeleteRange(::eloqstore::KvRequest *req) { - EloqStoreOperationData<::kvstore::TruncateRequest> *truncate_op = - static_cast *>( + EloqStoreOperationData<::eloqstore::TruncateRequest> *truncate_op = + static_cast *>( reinterpret_cast(req->UserData())); assert(req == &truncate_op->EloqStoreRequest()); @@ -306,8 +306,8 @@ void EloqStoreDataStore::OnDeleteRange(::kvstore::KvRequest *req) static_cast(truncate_op->DataStoreRequest()); remote::CommonResult result; - if (req->Error() != ::kvstore::KvError::NoError && - req->Error() != ::kvstore::KvError::NotFound) + if (req->Error() != ::eloqstore::KvError::NoError && + req->Error() != ::eloqstore::KvError::NotFound) { LOG(ERROR) << "Delete keys from EloqStore failed with error code: " << static_cast(req->Error()) @@ -352,7 +352,7 @@ void EloqStoreDataStore::ScanNext(ScanRequest *scan_req) return; } - ::kvstore::TableIdent eloq_store_table_id; + ::eloqstore::TableIdent eloq_store_table_id; eloq_store_table_id.tbl_name_ = scan_req->GetTableName(); eloq_store_table_id.partition_id_ = scan_req->GetPartitionId(); @@ -362,13 +362,13 @@ void EloqStoreDataStore::ScanNext(ScanRequest *scan_req) // const bool inclusive_end = scan_req->InclusiveEnd(); // Scan from eloqstore async - EloqStoreOperationData<::kvstore::ScanRequest> *scan_op = + EloqStoreOperationData<::eloqstore::ScanRequest> *scan_op = eloq_store_scan_req_op_pool_.NextObject(); scan_op->Reset(scan_req); PoolableGuard op_guard(scan_op); - ::kvstore::ScanRequest &kv_scan_req = scan_op->EloqStoreRequest(); + ::eloqstore::ScanRequest &kv_scan_req = scan_op->EloqStoreRequest(); kv_scan_req.SetArgs( eloq_store_table_id, start_key, end_key, inclusive_start); kv_scan_req.SetPagination(batch_size, 0); @@ -385,31 +385,31 @@ void EloqStoreDataStore::ScanNext(ScanRequest *scan_req) op_guard.Release(); } -void EloqStoreDataStore::OnScanNext(::kvstore::KvRequest *req) +void EloqStoreDataStore::OnScanNext(::eloqstore::KvRequest *req) { - EloqStoreOperationData<::kvstore::ScanRequest> *scan_op = - static_cast *>( + EloqStoreOperationData<::eloqstore::ScanRequest> *scan_op = + static_cast *>( reinterpret_cast(req->UserData())); assert(req == &scan_op->EloqStoreRequest()); - ::kvstore::ScanRequest *scan_req = - static_cast<::kvstore::ScanRequest *>(req); + ::eloqstore::ScanRequest *scan_req = + static_cast<::eloqstore::ScanRequest *>(req); PoolableGuard op_guard(scan_op); ScanRequest *ds_scan_req = static_cast(scan_op->DataStoreRequest()); - if (scan_req->Error() != ::kvstore::KvError::NoError) + if (scan_req->Error() != ::eloqstore::KvError::NoError) { - LOG_IF(ERROR, scan_req->Error() != ::kvstore::KvError::NotFound) + LOG_IF(ERROR, scan_req->Error() != ::eloqstore::KvError::NotFound) << "Scan from EloqStore failed with error code: " << static_cast(scan_req->Error()) << ", error message: " << scan_req->ErrMessage() << ". Table: " << req->TableId(); remote::DataStoreError error_code = - scan_req->Error() == ::kvstore::KvError::NotFound + scan_req->Error() == ::eloqstore::KvError::NotFound ? remote::DataStoreError::NO_ERROR : remote::DataStoreError::READ_FAILED; ds_scan_req->SetFinish(error_code); @@ -472,7 +472,7 @@ void EloqStoreDataStore::SwitchToReadWrite() void EloqStoreDataStore::ScanDelete(DeleteRangeRequest *delete_range_req) { - ::kvstore::TableIdent eloq_store_table_id; + ::eloqstore::TableIdent eloq_store_table_id; eloq_store_table_id.tbl_name_ = delete_range_req->GetTableName(); eloq_store_table_id.partition_id_ = delete_range_req->GetPartitionId(); @@ -486,7 +486,7 @@ void EloqStoreDataStore::ScanDelete(DeleteRangeRequest *delete_range_req) PoolableGuard op_guard(scan_del_op); - ::kvstore::ScanRequest &kv_scan_req = scan_del_op->EloqStoreScanRequest(); + ::eloqstore::ScanRequest &kv_scan_req = scan_del_op->EloqStoreScanRequest(); kv_scan_req.SetArgs(eloq_store_table_id, start_key, end_key, true); kv_scan_req.SetPagination(1024, 0); @@ -505,7 +505,7 @@ void EloqStoreDataStore::ScanDelete(DeleteRangeRequest *delete_range_req) op_guard.Release(); } -void EloqStoreDataStore::OnScanDelete(::kvstore::KvRequest *req) +void EloqStoreDataStore::OnScanDelete(::eloqstore::KvRequest *req) { ScanDeleteOperationData *scan_del_op = static_cast( @@ -522,18 +522,18 @@ void EloqStoreDataStore::OnScanDelete(::kvstore::KvRequest *req) if (scan_del_op->OperationStage() == ScanDeleteOperationData::Stage::SCAN) { // scan stage - ::kvstore::ScanRequest *scan_req = - static_cast<::kvstore::ScanRequest *>(req); + ::eloqstore::ScanRequest *scan_req = + static_cast<::eloqstore::ScanRequest *>(req); - if (scan_req->Error() != ::kvstore::KvError::NoError) + if (scan_req->Error() != ::eloqstore::KvError::NoError) { - LOG_IF(ERROR, scan_req->Error() != ::kvstore::KvError::NotFound) + LOG_IF(ERROR, scan_req->Error() != ::eloqstore::KvError::NotFound) << "Scan from EloqStore failed with error code: " << static_cast(scan_req->Error()) << ", error message: " << scan_req->ErrMessage() << ". Table: " << req->TableId(); - if (scan_req->Error() == ::kvstore::KvError::NotFound) + if (scan_req->Error() == ::eloqstore::KvError::NotFound) { assert(!scan_req->HasRemaining() && scan_del_op->entries_.size() == 0); @@ -558,19 +558,19 @@ void EloqStoreDataStore::OnScanDelete(::kvstore::KvRequest *req) // delete this batch keys scan_del_op->UpdateOperationStage( ScanDeleteOperationData::Stage::DELETE); - ::kvstore::BatchWriteRequest &kv_write_req = + ::eloqstore::BatchWriteRequest &kv_write_req = scan_del_op->EloqStoreWriteRequest(); uint64_t delete_ts = scan_del_op->OpTs(); - std::vector<::kvstore::WriteDataEntry> delete_entries; + std::vector<::eloqstore::WriteDataEntry> delete_entries; delete_entries.reserve(scan_key_cnt); for (auto &entry : scan_req->Entries()) { delete_entries.emplace_back(std::move(entry.key_), std::move(entry.value_), delete_ts, - ::kvstore::WriteOp::Delete); + ::eloqstore::WriteOp::Delete); } kv_write_req.SetArgs(scan_req->TableId(), @@ -606,10 +606,10 @@ void EloqStoreDataStore::OnScanDelete(::kvstore::KvRequest *req) assert(scan_del_op->OperationStage() == ScanDeleteOperationData::Stage::DELETE); - ::kvstore::BatchWriteRequest *write_req = - static_cast<::kvstore::BatchWriteRequest *>(req); + ::eloqstore::BatchWriteRequest *write_req = + static_cast<::eloqstore::BatchWriteRequest *>(req); - if (write_req->Error() != ::kvstore::KvError::NoError) + if (write_req->Error() != ::eloqstore::KvError::NoError) { LOG(ERROR) << "Delete batch keys from EloqStore failed with error code: " @@ -631,7 +631,7 @@ void EloqStoreDataStore::OnScanDelete(::kvstore::KvRequest *req) scan_del_op->UpdateOperationStage( ScanDeleteOperationData::Stage::SCAN); - ::kvstore::ScanRequest &kv_scan_req = + ::eloqstore::ScanRequest &kv_scan_req = scan_del_op->EloqStoreScanRequest(); kv_scan_req.SetArgs(write_req->TableId(), last_scan_end_key, @@ -669,20 +669,20 @@ void EloqStoreDataStore::Floor(ScanRequest *scan_req) { assert(scan_req->BatchSize() == 1); - ::kvstore::TableIdent eloq_store_table_id; + ::eloqstore::TableIdent eloq_store_table_id; eloq_store_table_id.tbl_name_ = scan_req->GetTableName(); eloq_store_table_id.partition_id_ = scan_req->GetPartitionId(); const std::string_view start_key = scan_req->GetStartKey(); // Floor from eloqstore async - EloqStoreOperationData<::kvstore::FloorRequest> *floor_op = + EloqStoreOperationData<::eloqstore::FloorRequest> *floor_op = eloq_store_floor_op_pool_.NextObject(); floor_op->Reset(scan_req); PoolableGuard op_guard(floor_op); - ::kvstore::FloorRequest &kv_floor_req = floor_op->EloqStoreRequest(); + ::eloqstore::FloorRequest &kv_floor_req = floor_op->EloqStoreRequest(); kv_floor_req.SetArgs(eloq_store_table_id, start_key); uint64_t user_data = reinterpret_cast(floor_op); @@ -697,22 +697,22 @@ void EloqStoreDataStore::Floor(ScanRequest *scan_req) op_guard.Release(); } -void EloqStoreDataStore::OnFloor(::kvstore::KvRequest *req) +void EloqStoreDataStore::OnFloor(::eloqstore::KvRequest *req) { - EloqStoreOperationData<::kvstore::FloorRequest> *floor_op = - static_cast *>( + EloqStoreOperationData<::eloqstore::FloorRequest> *floor_op = + static_cast *>( reinterpret_cast(req->UserData())); assert(req == &floor_op->EloqStoreRequest()); - ::kvstore::FloorRequest *floor_req = - static_cast<::kvstore::FloorRequest *>(req); + ::eloqstore::FloorRequest *floor_req = + static_cast<::eloqstore::FloorRequest *>(req); PoolableGuard op_guard(floor_op); ScanRequest *ds_scan_req = static_cast(floor_op->DataStoreRequest()); - if (floor_req->Error() != ::kvstore::KvError::NoError) + if (floor_req->Error() != ::eloqstore::KvError::NoError) { LOG(ERROR) << "Floor from EloqStore failed with error code: " << static_cast(floor_req->Error()) diff --git a/eloq_data_store_service/eloq_store_data_store.h b/eloq_data_store_service/eloq_store_data_store.h index 91f7494..4a9c180 100644 --- a/eloq_data_store_service/eloq_store_data_store.h +++ b/eloq_data_store_service/eloq_store_data_store.h @@ -82,7 +82,7 @@ struct ScanDeleteOperationData : public Poolable ScanDeleteOperationData(const ScanDeleteOperationData &rhs) = delete; ScanDeleteOperationData(ScanDeleteOperationData &&rhs) = delete; - void Reset(Poolable *ds_req_ptr, ::kvstore::EloqStore *store) + void Reset(Poolable *ds_req_ptr, ::eloqstore::EloqStore *store) { data_store_request_ptr_ = ds_req_ptr; op_ts_ = @@ -101,12 +101,12 @@ struct ScanDeleteOperationData : public Poolable entries_.clear(); } - ::kvstore::ScanRequest &EloqStoreScanRequest() + ::eloqstore::ScanRequest &EloqStoreScanRequest() { return kv_scan_req_; } - ::kvstore::BatchWriteRequest &EloqStoreWriteRequest() + ::eloqstore::BatchWriteRequest &EloqStoreWriteRequest() { return kv_write_req_; } @@ -141,20 +141,20 @@ struct ScanDeleteOperationData : public Poolable return op_stage_; } - ::kvstore::EloqStore *EloqStoreService() const + ::eloqstore::EloqStore *EloqStoreService() const { return eloq_store_; } private: - ::kvstore::ScanRequest kv_scan_req_; - ::kvstore::BatchWriteRequest kv_write_req_; + ::eloqstore::ScanRequest kv_scan_req_; + ::eloqstore::BatchWriteRequest kv_write_req_; Poolable *data_store_request_ptr_{nullptr}; uint64_t op_ts_{0}; - std::vector<::kvstore::WriteDataEntry> entries_; + std::vector<::eloqstore::WriteDataEntry> entries_; std::string last_scan_end_key_{""}; Stage op_stage_{Stage::SCAN}; - ::kvstore::EloqStore *eloq_store_{nullptr}; + ::eloqstore::EloqStore *eloq_store_{nullptr}; friend class EloqStoreDataStore; }; @@ -171,7 +171,7 @@ class EloqStoreDataStore : public DataStore public: EloqStoreDataStore(uint32_t shard_id, DataStoreService *data_store_service, - const kvstore::KvOptions &configs); + const eloqstore::KvOptions &configs); ~EloqStoreDataStore() = default; @@ -182,13 +182,13 @@ class EloqStoreDataStore : public DataStore bool StartDB(std::string cookie = "", std::string prev_cookie = "") override { - ::kvstore::KvError res = eloq_store_service_.Start(); - if (res != ::kvstore::KvError::NoError) + ::eloqstore::KvError res = eloq_store_service_.Start(); + if (res != ::eloqstore::KvError::NoError) { LOG(ERROR) << "EloqStore start failed with error code: " << static_cast(res); } - return res == ::kvstore::KvError::NoError; + return res == ::eloqstore::KvError::NoError; } void Shutdown() override @@ -262,16 +262,16 @@ class EloqStoreDataStore : public DataStore void SwitchToReadWrite() override; private: - static void OnRead(::kvstore::KvRequest *req); - static void OnBatchWrite(::kvstore::KvRequest *req); - static void OnDeleteRange(::kvstore::KvRequest *req); - static void OnScanNext(::kvstore::KvRequest *req); - static void OnScanDelete(::kvstore::KvRequest *req); - static void OnFloor(::kvstore::KvRequest *req); + static void OnRead(::eloqstore::KvRequest *req); + static void OnBatchWrite(::eloqstore::KvRequest *req); + static void OnDeleteRange(::eloqstore::KvRequest *req); + static void OnScanNext(::eloqstore::KvRequest *req); + static void OnScanDelete(::eloqstore::KvRequest *req); + static void OnFloor(::eloqstore::KvRequest *req); void ScanDelete(DeleteRangeRequest *delete_range_req); void Floor(ScanRequest *scan_req); - ::kvstore::EloqStore eloq_store_service_; + ::eloqstore::EloqStore eloq_store_service_; }; } // namespace EloqDS diff --git a/eloq_data_store_service/eloq_store_data_store_factory.h b/eloq_data_store_service/eloq_store_data_store_factory.h index 6da5a07..a8925c8 100644 --- a/eloq_data_store_service/eloq_store_data_store_factory.h +++ b/eloq_data_store_service/eloq_store_data_store_factory.h @@ -43,7 +43,7 @@ class EloqStoreDataStoreFactory : public DataStoreFactory DataStoreService *data_store_service, bool start_db = true) override { - ::kvstore::KvOptions store_config; + ::eloqstore::KvOptions store_config; store_config.num_threads = eloq_store_configs_.worker_count_; store_config.store_path.emplace_back() .append(eloq_store_configs_.storage_path_) diff --git a/eloq_data_store_service/main.cpp b/eloq_data_store_service/main.cpp index 8a0534c..22fd834 100644 --- a/eloq_data_store_service/main.cpp +++ b/eloq_data_store_service/main.cpp @@ -377,7 +377,7 @@ int main(int argc, char *argv[]) data_store_service_.get()); #elif defined(DATA_STORE_TYPE_ELOQDSS_ELOQSTORE) - ::kvstore::KvOptions store_config; + ::eloqstore::KvOptions store_config; store_config.num_threads = eloq_store_config.worker_count_; store_config.store_path.emplace_back() .append(eloq_store_config.storage_path_)