1 change: 1 addition & 0 deletions CMakeLists.txt
@@ -595,6 +595,7 @@ set(SOURCES
db/art/filter_cache_heap.cc
db/art/filter_cache_item.cc
db/art/filter_cache.cc
db/art/filter_cache_client.cc
db/art/greedy_algo.cc
db/art/heat_group.cc
db/art/lock.cc
1 change: 1 addition & 0 deletions TARGETS
@@ -452,6 +452,7 @@ cpp_library(
"db/art/filter_cache_heap.cc",
"db/art/filter_cache_item.cc",
"db/art/filter_cache.cc",
"db/art/filter_cache_client.cc",
"db/art/greedy_algo.cc",
"db/art/heat_group.cc",
"db/art/lock.cc",
2 changes: 1 addition & 1 deletion db/art/filter_cache.cc
@@ -482,7 +482,7 @@ bool FilterCacheManager::adjust_cache_and_heap() {
if (can_adjust) {
std::unordered_map<uint32_t, uint16_t> segment_units_num_recorder;
std::set<uint32_t> empty_level_0_segment_ids; // no level 0 segments in heaps, so don't worry
std::set<uint32_t> empty_failed_segment_ids;
std::set<uint32_t> empty_failed_segment_ids; // forces updating the segments' filter unit groups, so don't worry about cache space
segment_units_num_recorder.insert(std::make_pair(result.enable_segment_id, result.enable_segment_next_units_num));
segment_units_num_recorder.insert(std::make_pair(result.disable_segment_id, result.disable_segment_next_units_num));
filter_cache_.update_for_segments(segment_units_num_recorder, true, empty_level_0_segment_ids, empty_failed_segment_ids);
48 changes: 27 additions & 21 deletions db/art/filter_cache_client.cc
@@ -4,14 +4,13 @@ namespace ROCKSDB_NAMESPACE {

task_thread_pool::task_thread_pool FilterCacheClient::pool_{FILTER_CACHE_THREADS_NUM};
FilterCacheManager FilterCacheClient::filter_cache_manager_;
bool FilterCacheClient::filter_cache_ready_;
bool FilterCacheClient::heat_buckets_ready_;

void FilterCacheClient::do_prepare_heat_buckets(const std::string& key, std::unordered_map<uint32_t, std::vector<std::string>>*& segment_info_recorder) {
void FilterCacheClient::do_prepare_heat_buckets(const std::string& key, std::unordered_map<uint32_t, std::vector<std::string>>* const segment_info_recorder) {
filter_cache_manager_.make_heat_buckets_ready(key, *segment_info_recorder);
}

bool FilterCacheClient::prepare_heat_buckets(const std::string& key, std::unordered_map<uint32_t, std::vector<std::string>>*& segment_info_recorder) {
bool FilterCacheClient::prepare_heat_buckets(const std::string& key, std::unordered_map<uint32_t, std::vector<std::string>>* const segment_info_recorder) {
heat_buckets_ready_ = filter_cache_manager_.heat_buckets_ready();
if (!heat_buckets_ready_) {
// if heat_buckets_ready_ false
@@ -21,10 +20,10 @@ bool FilterCacheClient::prepare_heat_buckets(const std::string& key, std::unorde
return heat_buckets_ready_;
}

void FilterCacheClient::do_retrain_or_keep_model(std::vector<uint16_t>*& features_nums,
std::map<uint32_t, uint16_t>*& level_recorder,
std::map<uint32_t, std::vector<RangeRatePair>>*& segment_ranges_recorder,
std::map<uint32_t, uint32_t>*& unit_size_recorder) {
void FilterCacheClient::do_retrain_or_keep_model(std::vector<uint16_t>* const features_nums,
std::map<uint32_t, uint16_t>* const level_recorder,
std::map<uint32_t, std::vector<RangeRatePair>>* const segment_ranges_recorder,
std::map<uint32_t, uint32_t>* const unit_size_recorder) {
// this func runs as a background monitor, so how does it receive the latest arguments? through the input pointers!
while (!filter_cache_manager_.ready_work()); // wait for manager ready
assert(filter_cache_manager_.heat_buckets_ready()); // must guarantee that heat buckets ready before we make filter cache manager ready
@@ -43,10 +42,10 @@ void FilterCacheClient::do_retrain_or_keep_model(std::vector<uint16_t>*& feature
// this loop never ends
}

void FilterCacheClient::retrain_or_keep_model(std::vector<uint16_t>*& features_nums,
std::map<uint32_t, uint16_t>*& level_recorder,
std::map<uint32_t, std::vector<RangeRatePair>>*& segment_ranges_recorder,
std::map<uint32_t, uint32_t>*& unit_size_recorder) {
void FilterCacheClient::retrain_or_keep_model(std::vector<uint16_t>* const features_nums,
std::map<uint32_t, uint16_t>* const level_recorder,
std::map<uint32_t, std::vector<RangeRatePair>>* const segment_ranges_recorder,
std::map<uint32_t, uint32_t>* const unit_size_recorder) {
pool_.submit_detach(do_retrain_or_keep_model, features_nums, level_recorder, segment_ranges_recorder, unit_size_recorder);
// if the first model training has not finished, the python lgb_model server still returns the default units num
// then retrain the model when every long period ends; if the model still works well, keep this model instead
@@ -72,26 +71,33 @@ void FilterCacheClient::get_updating_work(const std::string& key) {
}

void FilterCacheClient::do_make_adjustment() {
filter_cache_manager_.adjust_cache_and_heap();
while (true) {
// never stop making heap adjustment
filter_cache_manager_.adjust_cache_and_heap();
}
}

void FilterCacheClient::make_adjustment() {
pool_.submit_detach(do_make_adjustment);
}

void FilterCacheClient::do_batch_insert_segments(std::vector<uint32_t>*& merged_segment_ids, std::vector<uint32_t>*& new_segment_ids,
std::map<uint32_t, std::unordered_map<uint32_t, double>>*& inherit_infos_recorder,
std::map<uint32_t, uint16_t>*& level_recorder, const uint32_t& level_0_base_count,
std::map<uint32_t, std::vector<RangeRatePair>>*& segment_ranges_recorder) {
void FilterCacheClient::do_batch_insert_segments(std::vector<uint32_t>* const merged_segment_ids, std::vector<uint32_t>* const new_segment_ids,
std::map<uint32_t, std::unordered_map<uint32_t, double>>* const inherit_infos_recorder,
std::map<uint32_t, uint16_t>* const level_recorder, const uint32_t& level_0_base_count,
std::map<uint32_t, std::vector<RangeRatePair>>* const segment_ranges_recorder) {
filter_cache_manager_.insert_segments(*merged_segment_ids, *new_segment_ids, *inherit_infos_recorder,
*level_recorder, level_0_base_count, *segment_ranges_recorder);
}

void FilterCacheClient::batch_insert_segments(std::vector<uint32_t>*& merged_segment_ids, std::vector<uint32_t>*& new_segment_ids,
std::map<uint32_t, std::unordered_map<uint32_t, double>>*& inherit_infos_recorder,
std::map<uint32_t, uint16_t>*& level_recorder, const uint32_t& level_0_base_count,
std::map<uint32_t, std::vector<RangeRatePair>>*& segment_ranges_recorder) {
pool_.submit_detach(do_batch_insert_segments, merged_segment_ids, new_segment_ids, inherit_infos_recorder, level_recorder, level_0_base_count, segment_ranges_recorder);
void FilterCacheClient::batch_insert_segments(std::vector<uint32_t>* const merged_segment_ids, std::vector<uint32_t>* const new_segment_ids,
std::map<uint32_t, std::unordered_map<uint32_t, double>>* const inherit_infos_recorder,
std::map<uint32_t, uint16_t>* const level_recorder, const uint32_t& level_0_base_count,
std::map<uint32_t, std::vector<RangeRatePair>>* const segment_ranges_recorder) {
if (level_0_base_count == 0) {
pool_.submit_detach(do_batch_insert_segments, merged_segment_ids, new_segment_ids, inherit_infos_recorder, level_recorder, INIT_LEVEL_0_COUNT, segment_ranges_recorder);
} else {
pool_.submit_detach(do_batch_insert_segments, merged_segment_ids, new_segment_ids, inherit_infos_recorder, level_recorder, level_0_base_count, segment_ranges_recorder);
}
}

}
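The signature change in this file (from `T*&` reference-to-pointer parameters to `T* const` pointers passed by value) matters because `pool_.submit_detach` copies its arguments into the queued task. Below is a minimal sketch of that pattern, assuming the single-header task_thread_pool library used above; the header name and the `wait_for_tasks()` call are illustrative, included only to make the sketch deterministic.

```cpp
#include <cstdint>
#include <iostream>
#include <map>
#include "task_thread_pool.hpp"  // assumed header name for the bundled pool

// Worker takes the recorder by pointer *value*: the pool stores a copy of the
// pointer, and the task reads whatever the caller has written into the map by
// the time it runs (the map itself must outlive the detached task).
static void do_report(std::map<uint32_t, uint16_t>* const level_recorder) {
    std::cout << "segments known: " << level_recorder->size() << "\n";
}

int main() {
    task_thread_pool::task_thread_pool pool{4};

    std::map<uint32_t, uint16_t> level_recorder;  // owned by the caller
    level_recorder[1] = 0;                        // e.g. segment 1 on level 0

    pool.submit_detach(do_report, &level_recorder);  // pointer is copied into the task
    pool.wait_for_tasks();                           // keep the sketch deterministic
    return 0;
}
```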
38 changes: 18 additions & 20 deletions db/art/filter_cache_client.h
@@ -17,17 +17,16 @@ class FilterCacheClient {
// we need heat_buckets_ready_ to become true before filter_cache_ready_
// In the YCSB benchmark, we first load data (insert key-value pairs) and only then issue get operations
// so we can guarantee that heat_buckets_ready_ becomes true before filter_cache_ready_
static bool filter_cache_ready_; // the same as FilterCacheManager.is_ready_
static bool heat_buckets_ready_; // the same as FilterCacheManager.heat_buckets_.is_ready()

// background thread part of prepare_heat_buckets
static void do_prepare_heat_buckets(const std::string& key, std::unordered_map<uint32_t, std::vector<std::string>>*& segment_info_recorder);
static void do_prepare_heat_buckets(const std::string& key, std::unordered_map<uint32_t, std::vector<std::string>>* const segment_info_recorder);

// background thread part of retrain_or_keep_model
static void do_retrain_or_keep_model(std::vector<uint16_t>*& features_nums,
std::map<uint32_t, uint16_t>*& level_recorder,
std::map<uint32_t, std::vector<RangeRatePair>>*& segment_ranges_recorder,
std::map<uint32_t, uint32_t>*& unit_size_recorder);
static void do_retrain_or_keep_model(std::vector<uint16_t>* const features_nums,
std::map<uint32_t, uint16_t>* const level_recorder,
std::map<uint32_t, std::vector<RangeRatePair>>* const segment_ranges_recorder,
std::map<uint32_t, uint32_t>* const unit_size_recorder);

// background thread part of check_key
static void do_hit_count_recorder(const uint32_t& segment_id);
@@ -39,13 +38,12 @@ class FilterCacheClient {
static void do_make_adjustment();

// background thread part of batch_insert_segments
static void do_batch_insert_segments(std::vector<uint32_t>*& merged_segment_ids, std::vector<uint32_t>*& new_segment_ids,
std::map<uint32_t, std::unordered_map<uint32_t, double>>*& inherit_infos_recorder,
std::map<uint32_t, uint16_t>*& level_recorder, const uint32_t& level_0_base_count,
std::map<uint32_t, std::vector<RangeRatePair>>*& segment_ranges_recorder);
static void do_batch_insert_segments(std::vector<uint32_t>* const merged_segment_ids, std::vector<uint32_t>* const new_segment_ids,
std::map<uint32_t, std::unordered_map<uint32_t, double>>* const inherit_infos_recorder,
std::map<uint32_t, uint16_t>* const level_recorder, const uint32_t& level_0_base_count,
std::map<uint32_t, std::vector<RangeRatePair>>* const segment_ranges_recorder);
public:
FilterCacheClient() {
filter_cache_ready_ = false;
heat_buckets_ready_ = false;
}

@@ -56,7 +54,7 @@ class FilterCacheClient {
// (we can modify the macro SAMPLES_MAXCNT to fit the YCSB load period; simply put, SAMPLES_MAXCNT should be at least 50%-75% of the loaded data num ???)
// set SAMPLES_MAXCNT < the YCSB load kv num, to make sure that heat_buckets can become ready within the YCSB load period
// if segment_info_recorder is empty, try the default key ranges num and division
bool prepare_heat_buckets(const std::string& key, std::unordered_map<uint32_t, std::vector<std::string>>*& segment_info_recorder);
bool prepare_heat_buckets(const std::string& key, std::unordered_map<uint32_t, std::vector<std::string>>* const segment_info_recorder);

// corresponding to FilterCacheManager work: monitor the manager's ready_work(), call the manager's make_clf_model_ready and train the first model
// lastly call update_cache_and_heap
@@ -65,10 +63,10 @@ class FilterCacheClient {
// please ensure that the 3 recorders keep the same segment set, or an error will occur in the train func
// you can use a mutex in compaction and flushing to guarantee this
// then, when every long period ends, try to retrain a new model or keep the last model
void retrain_or_keep_model(std::vector<uint16_t>*& features_nums,
std::map<uint32_t, uint16_t>*& level_recorder,
std::map<uint32_t, std::vector<RangeRatePair>>*& segment_ranges_recorder,
std::map<uint32_t, uint32_t>*& unit_size_recorder);
void retrain_or_keep_model(std::vector<uint16_t>* const features_nums,
std::map<uint32_t, uint16_t>* const level_recorder,
std::map<uint32_t, std::vector<RangeRatePair>>* const segment_ranges_recorder,
std::map<uint32_t, uint32_t>* const unit_size_recorder);

// corresponding to FilterCacheManager work: check_key and hit_count_recorder
// return FilterCacheManager.check_key() and leave hit_count_recorder to background
@@ -81,10 +79,10 @@ class FilterCacheClient {
void make_adjustment();

// batch insert segments into filter cache manager
void batch_insert_segments(std::vector<uint32_t>*& merged_segment_ids, std::vector<uint32_t>*& new_segment_ids,
std::map<uint32_t, std::unordered_map<uint32_t, double>>*& inherit_infos_recorder,
std::map<uint32_t, uint16_t>*& level_recorder, const uint32_t& level_0_base_count,
std::map<uint32_t, std::vector<RangeRatePair>>*& segment_ranges_recorder);
void batch_insert_segments(std::vector<uint32_t>* const merged_segment_ids, std::vector<uint32_t>* const new_segment_ids,
std::map<uint32_t, std::unordered_map<uint32_t, double>>* const inherit_infos_recorder,
std::map<uint32_t, uint16_t>* const level_recorder, const uint32_t& level_0_base_count,
std::map<uint32_t, std::vector<RangeRatePair>>* const segment_ranges_recorder);
};

}
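To make the intended call flow concrete, here is a hypothetical driver sketch pieced together from the comments above; the call sites, recorder contents, and the use of `static` locals for lifetime are assumptions for illustration, not code from this PR (in DBImpl the recorders would live as long-lived members so the detached background tasks can keep reading them).

```cpp
#include <map>
#include <string>
#include <unordered_map>
#include <vector>
#include "db/art/filter_cache_client.h"  // same path as included from db_impl.h below

using namespace ROCKSDB_NAMESPACE;

// Hypothetical driver: shows the order of calls suggested by the header comments.
void drive_filter_cache(FilterCacheClient& filter_cache, const std::string& user_key) {
  // load phase: keep sampling keys until the heat buckets become ready
  static std::unordered_map<uint32_t, std::vector<std::string>> segment_info_recorder;
  filter_cache.prepare_heat_buckets(user_key, &segment_info_recorder);

  // after a flush/compaction: retire merged segments, register new ones
  static std::vector<uint32_t> merged_ids, new_ids;
  static std::map<uint32_t, std::unordered_map<uint32_t, double>> inherit_infos;
  static std::map<uint32_t, uint16_t> level_recorder;
  static std::map<uint32_t, std::vector<RangeRatePair>> segment_ranges_recorder;
  filter_cache.batch_insert_segments(&merged_ids, &new_ids, &inherit_infos,
                                     &level_recorder, /*level_0_base_count=*/0,
                                     &segment_ranges_recorder);

  // once, at startup: launch the background trainer and the heap-adjustment worker
  static std::vector<uint16_t> features_nums;
  static std::map<uint32_t, uint32_t> unit_size_recorder;
  filter_cache.retrain_or_keep_model(&features_nums, &level_recorder,
                                     &segment_ranges_recorder, &unit_size_recorder);
  filter_cache.make_adjustment();
}
```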
2 changes: 2 additions & 0 deletions db/art/macros.h
@@ -203,6 +203,8 @@ namespace ROCKSDB_NAMESPACE {
// filter cache map threshold
#define FULL_RATE 0.98
#define READY_RATE 0.70
// default init L0 count
#define INIT_LEVEL_0_COUNT 0

// filter cache client background threads num
#define FILTER_CACHE_THREADS_NUM 10
13 changes: 13 additions & 0 deletions db/db_impl/db_impl.cc
@@ -1878,6 +1878,7 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key,

if (!done) {
PERF_TIMER_GUARD(get_from_output_files_time);
#ifndef ART_PLUS
sv->current->Get(
read_options, lkey, get_impl_options.value, timestamp, &s,
&merge_context, &max_covering_tombstone_seq,
@@ -1886,6 +1887,17 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key,
get_impl_options.get_value ? get_impl_options.callback : nullptr,
get_impl_options.get_value ? get_impl_options.is_blob_index : nullptr,
get_impl_options.get_value);
#else
sv->current->Get(
filter_cache_,
read_options, lkey, get_impl_options.value, timestamp, &s,
&merge_context, &max_covering_tombstone_seq,
get_impl_options.get_value ? get_impl_options.value_found : nullptr,
nullptr, nullptr,
get_impl_options.get_value ? get_impl_options.callback : nullptr,
get_impl_options.get_value ? get_impl_options.is_blob_index : nullptr,
get_impl_options.get_value);
#endif
RecordTick(stats_, MEMTABLE_MISS);
get_in_ssd.fetch_add(1);
} else {
@@ -1927,6 +1939,7 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key,
return s;
}

// TODO: the WaLSM+ benchmark doesn't use the MultiGet interface
std::vector<Status> DBImpl::MultiGet(
const ReadOptions& read_options,
const std::vector<ColumnFamilyHandle*>& column_family,
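The ART_PLUS branch above threads `filter_cache_` down into `Version::Get` so the table-read path can consult the cached filter units instead of loading full filters. A hypothetical sketch of what that lookup could look like; `check_key(segment_id, key)` and its signature are inferred from the header comments ("return FilterCacheManager.check_key() and leave hit_count_recorder to background") and may differ from the real interface.

```cpp
#include <cstdint>
#include <string>
#include "db/art/filter_cache_client.h"
#include "rocksdb/slice.h"

namespace ROCKSDB_NAMESPACE {

// Hypothetical helper for the ART_PLUS read path: probe only the filter units
// currently cached for this segment; the access statistics are recorded by a
// detached background task, so the foreground read pays no bookkeeping cost.
inline bool SegmentMayContain(FilterCacheClient& filter_cache,
                              uint32_t segment_id, const Slice& user_key) {
  return filter_cache.check_key(segment_id, user_key.ToString());
}

}  // namespace ROCKSDB_NAMESPACE
```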
6 changes: 6 additions & 0 deletions db/db_impl/db_impl.h
@@ -26,7 +26,10 @@
#include "db/art/vlog_manager.h"
#include "db/art/heat_buckets.h"
#include "db/art/clf_model.h"
#include "db/art/filter_cache_item.h"
#include "db/art/filter_cache_heap.h"
#include "db/art/filter_cache.h"
#include "db/art/filter_cache_client.h"
#include "db/art/greedy_algo.h"
#include "db/column_family.h"
#include "db/compaction/compaction_job.h"
@@ -1903,6 +1906,9 @@ class DBImpl : public DB {
HeatGroupManager* group_manager_;

#ifdef ART_PLUS
// TODO: add necessary filter cache info structures
FilterCacheClient filter_cache_; // already contains the FilterCacheManager

/*
HeatBuckets heat_buckets_;

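One design point worth keeping in mind when reading the new `filter_cache_` member: `pool_`, `filter_cache_manager_`, and `heat_buckets_ready_` are static members of FilterCacheClient (see filter_cache_client.cc above), so the member is a thin handle onto one process-wide manager and thread pool. A small sketch of that sharing model follows; the second client is purely illustrative.

```cpp
#include "db/art/filter_cache_client.h"
using namespace ROCKSDB_NAMESPACE;

int main() {
  FilterCacheClient a;  // DBImpl's filter_cache_ member is constructed like this
  FilterCacheClient b;  // a second handle sees exactly the same manager and pool

  // Both a and b submit work to the same static task_thread_pool and
  // FilterCacheManager; constructing b does not create a second cache.
  // Note: each constructor resets the static heat_buckets_ready_ flag to false,
  // but prepare_heat_buckets() refreshes it from the manager on the next call.
  return 0;
}
```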