diff --git a/CMakeLists.txt b/CMakeLists.txt index 9210fefd5..3d1050205 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -595,6 +595,7 @@ set(SOURCES db/art/filter_cache_heap.cc db/art/filter_cache_item.cc db/art/filter_cache.cc + db/art/filter_cache_client.cc db/art/greedy_algo.cc db/art/heat_group.cc db/art/lock.cc diff --git a/TARGETS b/TARGETS index 27a138ce0..f18fc69c9 100644 --- a/TARGETS +++ b/TARGETS @@ -452,6 +452,7 @@ cpp_library( "db/art/filter_cache_heap.cc", "db/art/filter_cache_item.cc", "db/art/filter_cache.cc", + "db/art/filter_cache_client.cc", "db/art/greedy_algo.cc", "db/art/heat_group.cc", "db/art/lock.cc", diff --git a/db/art/filter_cache.cc b/db/art/filter_cache.cc index b5744e9fb..85dff016a 100644 --- a/db/art/filter_cache.cc +++ b/db/art/filter_cache.cc @@ -482,7 +482,7 @@ bool FilterCacheManager::adjust_cache_and_heap() { if (can_adjust) { std::unordered_map segment_units_num_recorder; std::set empty_level_0_segment_ids; // no level 0 segment in heaps, dont worry - std::set empty_failed_segment_ids; + std::set empty_failed_segment_ids; // force to update segments' filter units group, so dont worry for cache space segment_units_num_recorder.insert(std::make_pair(result.enable_segment_id, result.enable_segment_next_units_num)); segment_units_num_recorder.insert(std::make_pair(result.disable_segment_id, result.disable_segment_next_units_num)); filter_cache_.update_for_segments(segment_units_num_recorder, true, empty_level_0_segment_ids, empty_failed_segment_ids); diff --git a/db/art/filter_cache_client.cc b/db/art/filter_cache_client.cc index 2938e400f..ac77d0b0b 100644 --- a/db/art/filter_cache_client.cc +++ b/db/art/filter_cache_client.cc @@ -4,14 +4,13 @@ namespace ROCKSDB_NAMESPACE { task_thread_pool::task_thread_pool FilterCacheClient::pool_{FILTER_CACHE_THREADS_NUM}; FilterCacheManager FilterCacheClient::filter_cache_manager_; -bool FilterCacheClient::filter_cache_ready_; bool FilterCacheClient::heat_buckets_ready_; -void FilterCacheClient::do_prepare_heat_buckets(const std::string& key, std::unordered_map>*& segment_info_recorder) { +void FilterCacheClient::do_prepare_heat_buckets(const std::string& key, std::unordered_map>* const segment_info_recorder) { filter_cache_manager_.make_heat_buckets_ready(key, *segment_info_recorder); } -bool FilterCacheClient::prepare_heat_buckets(const std::string& key, std::unordered_map>*& segment_info_recorder) { +bool FilterCacheClient::prepare_heat_buckets(const std::string& key, std::unordered_map>* const segment_info_recorder) { heat_buckets_ready_ = filter_cache_manager_.heat_buckets_ready(); if (!heat_buckets_ready_) { // if heat_buckets_ready_ false @@ -21,10 +20,10 @@ bool FilterCacheClient::prepare_heat_buckets(const std::string& key, std::unorde return heat_buckets_ready_; } -void FilterCacheClient::do_retrain_or_keep_model(std::vector*& features_nums, - std::map*& level_recorder, - std::map>*& segment_ranges_recorder, - std::map*& unit_size_recorder) { +void FilterCacheClient::do_retrain_or_keep_model(std::vector* const features_nums, + std::map* const level_recorder, + std::map>* const segment_ranges_recorder, + std::map* const unit_size_recorder) { // if this func background monitor signal, how can it receive latest argument? input pointer! 
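The `*&` → `* const` change above matters because `task_thread_pool::submit_detach` has to store its arguments for the detached task (typically by decay-copy), and a non-const lvalue reference to a pointer cannot bind to that stored copy; passing the pointer by value still lets the long-running worker observe the caller's latest data through the shared pointee, which is what the "input pointer!" comment is getting at. A minimal sketch of the idea, using `std::thread` as a stand-in for the pool and hypothetical names (`do_monitor`, `level_recorder`); synchronization between writer and reader is omitted:

```cpp
#include <atomic>
#include <chrono>
#include <cstdint>
#include <map>
#include <thread>

// The worker receives the pointer by value; the pointee stays owned by the
// caller, so later updates to the map are visible to this loop. A `*&`
// parameter could not bind to the copy a thread pool stores for the task.
static void do_monitor(const std::map<uint32_t, uint32_t>* const level_recorder,
                       std::atomic<bool>* stop) {
  while (!stop->load()) {
    // ... read *level_recorder here (guarded by the caller in real code) ...
    std::this_thread::sleep_for(std::chrono::milliseconds(10));
  }
}

int main() {
  std::map<uint32_t, uint32_t> level_recorder;  // kept up to date by the caller
  std::atomic<bool> stop{false};
  // Stand-in for pool_.submit_detach(do_monitor, &level_recorder, ...):
  std::thread worker(do_monitor, &level_recorder, &stop);
  level_recorder[1] = 6;  // the worker can observe this through the shared pointee
  stop = true;
  worker.join();
}
```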
while (!filter_cache_manager_.ready_work()); // wait for manager ready assert(filter_cache_manager_.heat_buckets_ready()); // must guarantee that heat buckets ready before we make filter cache manager ready @@ -43,10 +42,10 @@ void FilterCacheClient::do_retrain_or_keep_model(std::vector*& feature // this loop never end } -void FilterCacheClient::retrain_or_keep_model(std::vector*& features_nums, - std::map*& level_recorder, - std::map>*& segment_ranges_recorder, - std::map*& unit_size_recorder) { +void FilterCacheClient::retrain_or_keep_model(std::vector* const features_nums, + std::map* const level_recorder, + std::map>* const segment_ranges_recorder, + std::map* const unit_size_recorder) { pool_.submit_detach(do_retrain_or_keep_model, features_nums, level_recorder, segment_ranges_recorder, unit_size_recorder); // if first model training not end, python lgb_model server still return default units num // then retrain model when every long period end. if model still work well, keep this model instead @@ -72,26 +71,33 @@ void FilterCacheClient::get_updating_work(const std::string& key) { } void FilterCacheClient::do_make_adjustment() { - filter_cache_manager_.adjust_cache_and_heap(); + while (true) { + // never stop making heap adjustment + filter_cache_manager_.adjust_cache_and_heap(); + } } void FilterCacheClient::make_adjustment() { pool_.submit_detach(do_make_adjustment); } -void FilterCacheClient::do_batch_insert_segments(std::vector*& merged_segment_ids, std::vector*& new_segment_ids, - std::map>*& inherit_infos_recorder, - std::map*& level_recorder, const uint32_t& level_0_base_count, - std::map>*& segment_ranges_recorder) { +void FilterCacheClient::do_batch_insert_segments(std::vector* const merged_segment_ids, std::vector* const new_segment_ids, + std::map>* const inherit_infos_recorder, + std::map* const level_recorder, const uint32_t& level_0_base_count, + std::map>* const segment_ranges_recorder) { filter_cache_manager_.insert_segments(*merged_segment_ids, *new_segment_ids, *inherit_infos_recorder, *level_recorder, level_0_base_count, *segment_ranges_recorder); } -void FilterCacheClient::batch_insert_segments(std::vector*& merged_segment_ids, std::vector*& new_segment_ids, - std::map>*& inherit_infos_recorder, - std::map*& level_recorder, const uint32_t& level_0_base_count, - std::map>*& segment_ranges_recorder) { - pool_.submit_detach(do_batch_insert_segments, merged_segment_ids, new_segment_ids, inherit_infos_recorder, level_recorder, level_0_base_count, segment_ranges_recorder); +void FilterCacheClient::batch_insert_segments(std::vector* const merged_segment_ids, std::vector* const new_segment_ids, + std::map>* const inherit_infos_recorder, + std::map* const level_recorder, const uint32_t& level_0_base_count, + std::map>* const segment_ranges_recorder) { + if (level_0_base_count == 0) { + pool_.submit_detach(do_batch_insert_segments, merged_segment_ids, new_segment_ids, inherit_infos_recorder, level_recorder, INIT_LEVEL_0_COUNT, segment_ranges_recorder); + } else { + pool_.submit_detach(do_batch_insert_segments, merged_segment_ids, new_segment_ids, inherit_infos_recorder, level_recorder, level_0_base_count, segment_ranges_recorder); + } } } \ No newline at end of file diff --git a/db/art/filter_cache_client.h b/db/art/filter_cache_client.h index 01dddf04e..52b05663c 100644 --- a/db/art/filter_cache_client.h +++ b/db/art/filter_cache_client.h @@ -17,17 +17,16 @@ class FilterCacheClient { // we need heat_buckets_ready_ to become true before filter_cache_ready_ // In YCSB 
benchmark, we first load data (insert key-value pairs) then may try get operation // so we can guarantee that heat_buckets_ready_ become true before filter_cache_ready_ - static bool filter_cache_ready_; // the same as FilterCacheManager.is_ready_ static bool heat_buckets_ready_; // the same as FilterCacheManager.heat_buckets_.is_ready() // background thread part of prepare_heat_buckets - static void do_prepare_heat_buckets(const std::string& key, std::unordered_map>*& segment_info_recorder); + static void do_prepare_heat_buckets(const std::string& key, std::unordered_map>* const segment_info_recorder); // background thread part of retrain_or_keep_model - static void do_retrain_or_keep_model(std::vector*& features_nums, - std::map*& level_recorder, - std::map>*& segment_ranges_recorder, - std::map*& unit_size_recorder); + static void do_retrain_or_keep_model(std::vector* const features_nums, + std::map* const level_recorder, + std::map>* const segment_ranges_recorder, + std::map* const unit_size_recorder); // background thread part of check_key static void do_hit_count_recorder(const uint32_t& segment_id); @@ -39,13 +38,12 @@ class FilterCacheClient { static void do_make_adjustment(); // background thread part of batch_insert_segments - static void do_batch_insert_segments(std::vector*& merged_segment_ids, std::vector*& new_segment_ids, - std::map>*& inherit_infos_recorder, - std::map*& level_recorder, const uint32_t& level_0_base_count, - std::map>*& segment_ranges_recorder); + static void do_batch_insert_segments(std::vector* const merged_segment_ids, std::vector* const new_segment_ids, + std::map>* const inherit_infos_recorder, + std::map* const level_recorder, const uint32_t& level_0_base_count, + std::map>* const segment_ranges_recorder); public: FilterCacheClient() { - filter_cache_ready_ = false; heat_buckets_ready_ = false; } @@ -56,7 +54,7 @@ class FilterCacheClient { // (we can modify micro SAMPLES_MAXCNT to fit in the YCSB load period, simply, SAMPLES_MAXCNT should be at least 50%-75% of load data num ???) 
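`pool_`, `filter_cache_manager_` and `heat_buckets_ready_` are static data members, so filter_cache_client.cc must supply their single out-of-class definitions; that is why dropping `filter_cache_ready_` has to touch both the header and the source file, and it also means every `FilterCacheClient` instance shares one pool, one manager and one set of flags. A compilable sketch of the same layout with illustrative names:

```cpp
#include <cassert>

// client.h (sketch)
class Client {
 public:
  Client() { ready_ = false; }                 // mirrors FilterCacheClient()
  static bool ready() { return ready_; }
 private:
  static bool ready_;                          // declaration only
};

// client.cc (sketch): the single out-of-class definition the linker needs,
// analogous to `bool FilterCacheClient::heat_buckets_ready_;` at the top of
// filter_cache_client.cc
bool Client::ready_ = false;

int main() {
  Client a, b;               // every instance shares the same static state,
  (void)a; (void)b;          // just as all FilterCacheClient objects share one
  assert(!Client::ready());  // pool_ and one FilterCacheManager
}
```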
// set SAMPLES_MAXCNT < YCSB load kv nums, to make sure that we can make heat_buckets ready in YCSB load period // if segment_info_recorder is empty, try default key ranges num and divide - bool prepare_heat_buckets(const std::string& key, std::unordered_map>*& segment_info_recorder); + bool prepare_heat_buckets(const std::string& key, std::unordered_map>* const segment_info_recorder); // correspinding to FilterCacheManager work: monitor manager ready_work(), call manager make_clf_model_ready and train first model // lastly call update_cache_and_heap @@ -65,10 +63,10 @@ class FilterCacheClient { // please ensure that 3 recorders need to keep the same segments set, or error will occur in train func // you can use mutex in compaction and flushing to guarantee this // then when every long period end, try to retrain a new model or keep last model - void retrain_or_keep_model(std::vector*& features_nums, - std::map*& level_recorder, - std::map>*& segment_ranges_recorder, - std::map*& unit_size_recorder); + void retrain_or_keep_model(std::vector* const features_nums, + std::map* const level_recorder, + std::map>* const segment_ranges_recorder, + std::map* const unit_size_recorder); // correespinding to FilterCacheManager work: check_key and hit_count_recorder // return FilterCacheManager.check_key() and leave hit_count_recorder to background @@ -81,10 +79,10 @@ class FilterCacheClient { void make_adjustment(); // batch insert segments into filter cache manager - void batch_insert_segments(std::vector*& merged_segment_ids, std::vector*& new_segment_ids, - std::map>*& inherit_infos_recorder, - std::map*& level_recorder, const uint32_t& level_0_base_count, - std::map>*& segment_ranges_recorder); + void batch_insert_segments(std::vector* const merged_segment_ids, std::vector* const new_segment_ids, + std::map>* const inherit_infos_recorder, + std::map* const level_recorder, const uint32_t& level_0_base_count, + std::map>* const segment_ranges_recorder); }; } diff --git a/db/art/macros.h b/db/art/macros.h index c8d53f5f5..631e27d99 100644 --- a/db/art/macros.h +++ b/db/art/macros.h @@ -203,6 +203,8 @@ namespace ROCKSDB_NAMESPACE { // filter cache map threshold #define FULL_RATE 0.98 #define READY_RATE 0.70 +// default init L0 counts +#define INIT_LEVEL_0_COUNT 0 // filter cache client background threads num #define FILTER_CACHE_THREADS_NUM 10 diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 0935c645b..94f42c6bc 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -1878,6 +1878,7 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key, if (!done) { PERF_TIMER_GUARD(get_from_output_files_time); +#ifndef ART_PLUS sv->current->Get( read_options, lkey, get_impl_options.value, timestamp, &s, &merge_context, &max_covering_tombstone_seq, @@ -1886,6 +1887,17 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key, get_impl_options.get_value ? get_impl_options.callback : nullptr, get_impl_options.get_value ? get_impl_options.is_blob_index : nullptr, get_impl_options.get_value); +#else + sv->current->Get( + filter_cache_, + read_options, lkey, get_impl_options.value, timestamp, &s, + &merge_context, &max_covering_tombstone_seq, + get_impl_options.get_value ? get_impl_options.value_found : nullptr, + nullptr, nullptr, + get_impl_options.get_value ? get_impl_options.callback : nullptr, + get_impl_options.get_value ? 
get_impl_options.is_blob_index : nullptr, + get_impl_options.get_value); +#endif RecordTick(stats_, MEMTABLE_MISS); get_in_ssd.fetch_add(1); } else { @@ -1927,6 +1939,7 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key, return s; } +// TODO: WaLSM+ Benchmark dont use MultiGet interface std::vector DBImpl::MultiGet( const ReadOptions& read_options, const std::vector& column_family, diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 805eb2dce..751ab7df2 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -26,7 +26,10 @@ #include "db/art/vlog_manager.h" #include "db/art/heat_buckets.h" #include "db/art/clf_model.h" +#include "db/art/filter_cache_item.h" #include "db/art/filter_cache_heap.h" +#include "db/art/filter_cache.h" +#include "db/art/filter_cache_client.h" #include "db/art/greedy_algo.h" #include "db/column_family.h" #include "db/compaction/compaction_job.h" @@ -1903,6 +1906,9 @@ class DBImpl : public DB { HeatGroupManager* group_manager_; #ifdef ART_PLUS + // TODO: add necessary filter cache info structures + FilterCacheClient filter_cache_; // already contain FilterCacheManager + /* HeatBuckets heat_buckets_; diff --git a/db/table_cache.cc b/db/table_cache.cc index e685bb5f0..663ce8a94 100644 --- a/db/table_cache.cc +++ b/db/table_cache.cc @@ -470,6 +470,97 @@ Status TableCache::Get(const ReadOptions& options, return s; } +#ifdef ART_PLUS +Status TableCache::Get(FilterCacheClient& filter_cache, + const ReadOptions& options, + const InternalKeyComparator& internal_comparator, + const FileMetaData& file_meta, const Slice& k, + GetContext* get_context, + const SliceTransform* prefix_extractor, + HistogramImpl* file_read_hist, bool skip_filters, + int level, size_t max_file_size_for_l0_meta_pin) { + auto& fd = file_meta.fd; + std::string* row_cache_entry = nullptr; + bool done = false; +#ifndef ROCKSDB_LITE + IterKey row_cache_key; + std::string row_cache_entry_buffer; + + // Check row cache if enabled. Since row cache does not currently store + // sequence numbers, we cannot use it if we need to fetch the sequence. + if (ioptions_.row_cache && !get_context->NeedToReadSequence()) { + auto user_key = ExtractUserKey(k); + CreateRowCacheKeyPrefix(options, fd, k, get_context, row_cache_key); + done = GetFromRowCache(user_key, row_cache_key, row_cache_key.Size(), + get_context); + if (!done) { + row_cache_entry = &row_cache_entry_buffer; + } + } +#endif // ROCKSDB_LITE + Status s; + TableReader* t = fd.table_reader; + Cache::Handle* handle = nullptr; + if (!done) { + assert(s.ok()); + if (t == nullptr) { + s = FindTable(options, file_options_, internal_comparator, fd, &handle, + prefix_extractor, + options.read_tier == kBlockCacheTier /* no_io */, + true /* record_read_stats */, file_read_hist, skip_filters, + level, true /* prefetch_index_and_filter_in_cache */, + max_file_size_for_l0_meta_pin); + if (s.ok()) { + t = GetTableReaderFromHandle(handle); + } + } + SequenceNumber* max_covering_tombstone_seq = + get_context->max_covering_tombstone_seq(); + if (s.ok() && max_covering_tombstone_seq != nullptr && + !options.ignore_range_deletions) { + std::unique_ptr range_del_iter( + t->NewRangeTombstoneIterator(options)); + if (range_del_iter != nullptr) { + *max_covering_tombstone_seq = std::max( + *max_covering_tombstone_seq, + range_del_iter->MaxCoveringTombstoneSeqnum(ExtractUserKey(k))); + } + } + if (s.ok()) { + get_context->SetReplayLog(row_cache_entry); // nullptr if no cache. 
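The patch repeats one pattern at each layer of the read path (`DBImpl::GetImpl` → `Version::Get` → `TableCache::Get` → `TableReader::Get` → `FullFilterKeyMayMatch` → `KeyMayMatch`): an `#ifdef ART_PLUS` overload that mirrors the existing signature, adds a `FilterCacheClient&` parameter, and forwards it one level down while leaving the original overload and its call sites untouched. A toy sketch of that shape; the class and function names here are illustrative, not the RocksDB ones:

```cpp
#include <cstdint>
#include <string>

class FilterCacheClient {  // stand-in; the real one lives in db/art/filter_cache_client.h
 public:
  bool check_key(uint32_t /*segment_id*/, const std::string& key) {
    return !key.empty();  // placeholder for the real filter-unit lookup
  }
};

struct Reader {
  bool KeyMayMatch(const std::string& key) { return !key.empty(); }
#ifdef ART_PLUS
  bool KeyMayMatch(FilterCacheClient& fc, const std::string& key) {
    return fc.check_key(/*segment_id=*/0, key);  // filter-cache-aware variant
  }
#endif
};

struct Table {
  Reader reader;
  bool Get(const std::string& key) { return reader.KeyMayMatch(key); }
#ifdef ART_PLUS
  // Same body as Get(key); only the extra argument is forwarded downward.
  bool Get(FilterCacheClient& fc, const std::string& key) {
    return reader.KeyMayMatch(fc, key);
  }
#endif
};

int main() {
  Table table;
#ifdef ART_PLUS
  FilterCacheClient fc;
  return table.Get(fc, "user_key") ? 0 : 1;
#else
  return table.Get("user_key") ? 0 : 1;
#endif
}
```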
+ // only add filter_cache argument + s = t->Get(filter_cache, options, k, get_context, prefix_extractor, skip_filters); + get_context->SetReplayLog(nullptr); + } else if (options.read_tier == kBlockCacheTier && s.IsIncomplete()) { + // Couldn't find Table in cache but treat as kFound if no_io set + get_context->MarkKeyMayExist(); + s = Status::OK(); + done = true; + } + } + +#ifndef ROCKSDB_LITE + // Put the replay log in row cache only if something was found. + if (!done && s.ok() && row_cache_entry && !row_cache_entry->empty()) { + size_t charge = + row_cache_key.Size() + row_cache_entry->size() + sizeof(std::string); + void* row_ptr = new std::string(std::move(*row_cache_entry)); + // If row cache is full, it's OK to continue. + ioptions_.row_cache + ->Insert(row_cache_key.GetUserKey(), row_ptr, charge, + &DeleteEntry) + .PermitUncheckedError(); + } +#endif // ROCKSDB_LITE + + if (handle != nullptr) { + ReleaseHandle(handle); + } + return s; +} + +#endif + Status TableCache::InitFileTableReader(const ReadOptions& options, const InternalKeyComparator& internal_comparator, FileMetaData& file_meta) { @@ -491,6 +582,7 @@ Status TableCache::InitFileTableReader(const ReadOptions& options, } // Batched version of TableCache::MultiGet. +// TODO: WaLSM+ Benchmark dont use MultiGet interface Status TableCache::MultiGet(const ReadOptions& options, const InternalKeyComparator& internal_comparator, const FileMetaData& file_meta, diff --git a/db/table_cache.h b/db/table_cache.h index d0933483f..6a1ed0e90 100644 --- a/db/table_cache.h +++ b/db/table_cache.h @@ -14,6 +14,7 @@ #include #include +#include "db/art/filter_cache_client.h" #include "db/dbformat.h" #include "db/range_del_aggregator.h" #include "options/cf_options.h" @@ -96,6 +97,17 @@ class TableCache { HistogramImpl* file_read_hist = nullptr, bool skip_filters = false, int level = -1, size_t max_file_size_for_l0_meta_pin = 0); +#ifdef ART_PLUS + Status Get(FilterCacheClient& filter_cache, + const ReadOptions& options, + const InternalKeyComparator& internal_comparator, + const FileMetaData& file_meta, const Slice& k, + GetContext* get_context, + const SliceTransform* prefix_extractor = nullptr, + HistogramImpl* file_read_hist = nullptr, bool skip_filters = false, + int level = -1, size_t max_file_size_for_l0_meta_pin = 0); +#endif + Status InitFileTableReader(const ReadOptions& options, const InternalKeyComparator& internal_comparator, FileMetaData& file_meta); @@ -117,6 +129,7 @@ class TableCache { // in the embedded GetContext // @param skip_filters Disables loading/accessing the filter block // @param level The level this table is at, -1 for "not set / don't know" + // TODO: WaLSM+ Benchmark dont use MultiGet interface Status MultiGet(const ReadOptions& options, const InternalKeyComparator& internal_comparator, const FileMetaData& file_meta, diff --git a/db/version_set.cc b/db/version_set.cc index 19c8abb63..799e1ca2d 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -1832,6 +1832,183 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k, } } +#ifdef ART_PLUS +void Version::Get(FilterCacheClient& filter_cache, + const ReadOptions& read_options, const LookupKey& k, + PinnableSlice* value, std::string* timestamp, Status* status, + MergeContext* merge_context, + SequenceNumber* max_covering_tombstone_seq, bool* value_found, + bool* key_exists, SequenceNumber* seq, ReadCallback* callback, + bool* is_blob, bool do_merge) { + Slice ikey = k.internal_key(); + Slice user_key = k.user_key(); + + assert(status->ok() || 
status->IsMergeInProgress()); + + if (key_exists != nullptr) { + // will falsify below if not found + *key_exists = true; + } + + PinnedIteratorsManager pinned_iters_mgr; + uint64_t tracing_get_id = BlockCacheTraceHelper::kReservedGetId; + if (vset_ && vset_->block_cache_tracer_ && + vset_->block_cache_tracer_->is_tracing_enabled()) { + tracing_get_id = vset_->block_cache_tracer_->NextGetId(); + } + // determine hit partition + auto* hit_partition = storage_info_.GetHitPartition(user_key); + GetContext get_context( + user_comparator(), merge_operator_, info_log_, db_statistics_, nullptr, + status->ok() ? GetContext::kNotFound : GetContext::kMerge, user_key, + do_merge ? value : nullptr, do_merge ? timestamp : nullptr, value_found, + merge_context, do_merge, max_covering_tombstone_seq, this->env_, seq, + merge_operator_ ? &pinned_iters_mgr : nullptr, callback, is_blob, + tracing_get_id); + + // Pin blocks that we read to hold merge operands + if (merge_operator_) { + pinned_iters_mgr.StartPinning(); + } + + std::vector hit_files[storage_info_.num_levels_]; + hit_files[0] = storage_info_.files_[0]; + for (int i = 1; i < storage_info_.num_levels_; i++) { + hit_files[i] = hit_partition->files_[i]; + } + FilePicker fp(hit_files, user_key, ikey, &storage_info_.level_files_brief_, + static_cast(storage_info_.num_levels_), + &storage_info_.file_indexer_, user_comparator(), + internal_comparator()); + FileMetaData* f = fp.GetNextFile(); + + int prev_level = 0; + while (f != nullptr) { + if (fp.GetCurrentLevel() != prev_level) { + prev_level = fp.GetCurrentLevel(); + hit_partition->queries[prev_level]++; + } + if (*max_covering_tombstone_seq > 0) { + // The remaining files we look at will only contain covered keys, so we + // stop here. + break; + } + if (get_context.sample()) { + sample_file_read_inc(f); + } + + // set counter + if (hit_partition->is_compaction_work[fp.GetCurrentLevel()]) { + get_context.SetSearchCount( + &hit_partition->search_counter[fp.GetCurrentLevel()]); + } else { + get_context.SetSearchCount(nullptr); + } + + bool timer_enabled = + GetPerfLevel() >= PerfLevel::kEnableTimeExceptForMutex && + get_perf_context()->per_level_perf_context_enabled; + StopWatchNano timer(env_, timer_enabled /* auto_start */); + // we only add filter_cache argument in this new Get method + *status = table_cache_->Get( + filter_cache, + read_options, *internal_comparator(), *f, ikey, &get_context, + mutable_cf_options_.prefix_extractor.get(), + cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()), + IsFilterSkipped(static_cast(fp.GetHitFileLevel()), + fp.IsHitFileLastInLevel()), + fp.GetHitFileLevel(), max_file_size_for_l0_meta_pin_); + // TODO: examine the behavior for corrupted key + if (timer_enabled) { + PERF_COUNTER_BY_LEVEL_ADD(get_from_table_nanos, timer.ElapsedNanos(), + fp.GetHitFileLevel()); + } + if (!status->ok()) { + return; + } + + // report the counters before returning + if (get_context.State() != GetContext::kNotFound && + get_context.State() != GetContext::kMerge && + db_statistics_ != nullptr) { + get_context.ReportCounters(); + } + if (db_statistics_ != nullptr) { + if (fp.GetCurrentLevel() == 0) { + RecordTick(db_statistics_, GET_MISS_L0); + } else if (fp.GetCurrentLevel() == 1) { + RecordTick(db_statistics_, GET_MISS_L1); + } else if (fp.GetCurrentLevel() >= 2) { + RecordTick(db_statistics_, GET_MISS_L2_AND_UP); + } + } + switch (get_context.State()) { + case GetContext::kNotFound: + // Keep searching in other files + break; + case GetContext::kMerge: + // TODO: update 
per-level perfcontext user_key_return_count for kMerge + break; + case GetContext::kFound: + if (fp.GetHitFileLevel() == 0) { + RecordTick(db_statistics_, GET_HIT_L0); + } else if (fp.GetHitFileLevel() == 1) { + RecordTick(db_statistics_, GET_HIT_L1); + } else if (fp.GetHitFileLevel() >= 2) { + RecordTick(db_statistics_, GET_HIT_L2_AND_UP); + } + PERF_COUNTER_BY_LEVEL_ADD(user_key_return_count, 1, + fp.GetHitFileLevel()); + return; + case GetContext::kDeleted: + // Use empty error message for speed + *status = Status::NotFound(); + return; + case GetContext::kCorrupt: + *status = Status::Corruption("corrupted key for ", user_key); + return; + case GetContext::kBlobIndex: + ROCKS_LOG_ERROR(info_log_, "Encounter unexpected blob index."); + *status = Status::NotSupported( + "Encounter unexpected blob index. Please open DB with " + "ROCKSDB_NAMESPACE::blob_db::BlobDB instead."); + return; + } + f = fp.GetNextFile(); + } + if (db_statistics_ != nullptr) { + get_context.ReportCounters(); + } + if (GetContext::kMerge == get_context.State()) { + if (!do_merge) { + *status = Status::OK(); + return; + } + if (!merge_operator_) { + *status = Status::InvalidArgument( + "merge_operator is not properly initialized."); + return; + } + // merge_operands are in saver and we hit the beginning of the key history + // do a final merge of nullptr and operands; + std::string* str_value = value != nullptr ? value->GetSelf() : nullptr; + *status = MergeHelper::TimedFullMerge( + merge_operator_, user_key, nullptr, merge_context->GetOperands(), + str_value, info_log_, db_statistics_, env_, + nullptr /* result_operand */, true); + if (LIKELY(value != nullptr)) { + value->PinSelf(); + } + } else { + if (key_exists != nullptr) { + *key_exists = false; + } + *status = Status::NotFound(); // Use an empty error message for speed + } +} +#endif + +// TODO: WaLSM+ Benchmark dont use MultiGet interface void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range, ReadCallback* callback, bool* is_blob) { PinnedIteratorsManager pinned_iters_mgr; diff --git a/db/version_set.h b/db/version_set.h index 05abcc26c..e18bb5e6a 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -31,6 +31,7 @@ #include #include +#include "db/art/filter_cache_client.h" #include "db/blob/blob_file_meta.h" #include "db/column_family.h" #include "db/compaction/compaction.h" @@ -892,6 +893,16 @@ class Version { SequenceNumber* seq = nullptr, ReadCallback* callback = nullptr, bool* is_blob = nullptr, bool do_merge = true); +#ifdef ART_PLUS + void Get(FilterCacheClient& filter_cache, + const ReadOptions&, const LookupKey& key, PinnableSlice* value, + std::string* timestamp, Status* status, MergeContext* merge_context, + SequenceNumber* max_covering_tombstone_seq, + bool* value_found = nullptr, bool* key_exists = nullptr, + SequenceNumber* seq = nullptr, ReadCallback* callback = nullptr, + bool* is_blob = nullptr, bool do_merge = true); +#endif + // TODO: WaLSM+ Benchmark dont use MultiGet interface void MultiGet(const ReadOptions&, MultiGetRange* range, ReadCallback* callback = nullptr, bool* is_blob = nullptr); diff --git a/src.mk b/src.mk index 67e5c0410..f39f6809c 100644 --- a/src.mk +++ b/src.mk @@ -35,6 +35,7 @@ LIB_SOURCES = \ db/art/filter_cache_heap.cc \ db/art/filter_cache_item.cc \ db/art/filter_cache.cc \ + db/art/filter_cache_client.cc \ db/art/greedy_algo.cc \ db/art/heat_group.cc \ db/art/lock.cc \ diff --git a/table/block_based/block_based_table_reader.cc b/table/block_based/block_based_table_reader.cc index 
32925fe3e..2257d10ea 100644 --- a/table/block_based/block_based_table_reader.cc +++ b/table/block_based/block_based_table_reader.cc @@ -2194,6 +2194,48 @@ bool BlockBasedTable::FullFilterKeyMayMatch( return may_match; } +#ifdef ART_PLUS +bool BlockBasedTable::FullFilterKeyMayMatch( + FilterCacheClient& filter_cache, + const ReadOptions& read_options, FilterBlockReader* filter, + const Slice& internal_key, const bool no_io, + const SliceTransform* prefix_extractor, GetContext* get_context, + BlockCacheLookupContext* lookup_context) const { + if (filter == nullptr || filter->IsBlockBased()) { + return true; + } + Slice user_key = ExtractUserKey(internal_key); + const Slice* const const_ikey_ptr = &internal_key; + bool may_match = true; + if (rep_->whole_key_filtering) { + size_t ts_sz = + rep_->internal_comparator.user_comparator()->timestamp_size(); + Slice user_key_without_ts = StripTimestampFromUserKey(user_key, ts_sz); + // add filter_cache argument here + may_match = + filter->KeyMayMatch(filter_cache, + user_key_without_ts, prefix_extractor, kNotValid, + no_io, const_ikey_ptr, get_context, lookup_context); + } else if (!read_options.total_order_seek && prefix_extractor && + rep_->table_properties->prefix_extractor_name.compare( + prefix_extractor->Name()) == 0 && + prefix_extractor->InDomain(user_key) && + !filter->PrefixMayMatch(prefix_extractor->Transform(user_key), + prefix_extractor, kNotValid, no_io, + const_ikey_ptr, get_context, + lookup_context)) { + // WaLSM+ dont use prefix filter, so this branch will not reach + may_match = false; + } + if (may_match) { + RecordTick(rep_->ioptions.statistics, BLOOM_FILTER_FULL_POSITIVE); + PERF_COUNTER_BY_LEVEL_ADD(bloom_filter_full_positive, 1, rep_->level); + } + return may_match; +} +#endif + +// TODO: not used in WaLSM+ Benchmark, meybe used in MultiGet interface ? void BlockBasedTable::FullFilterKeysMayMatch( const ReadOptions& read_options, FilterBlockReader* filter, MultiGetRange* range, const bool no_io, @@ -2298,7 +2340,195 @@ Status BlockBasedTable::Get(const ReadOptions& read_options, const Slice& key, prefix_extractor, v.handle.offset(), no_io, /*const_ikey_ptr=*/nullptr, get_context, &lookup_context); + + if (not_exist_in_filter) { + // Not found + // TODO: think about interaction with Merge. If a user key cannot + // cross one data block, we should be fine. + RecordTick(rep_->ioptions.statistics, BLOOM_FILTER_USEFUL); + PERF_COUNTER_BY_LEVEL_ADD(bloom_filter_useful, 1, rep_->level); + break; + } + + if (!v.first_internal_key.empty() && !skip_filters && + UserComparatorWrapper(rep_->internal_comparator.user_comparator()) + .Compare(ExtractUserKey(key), + ExtractUserKey(v.first_internal_key)) < 0) { + // The requested key falls between highest key in previous block and + // lowest key in current block. 
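Before the filter probe above, the lookup key is reduced to a user key and, when user-defined timestamps are in use, the trailing timestamp is stripped, because whole-key filters are built over timestamp-free user keys. A small self-contained sketch under the usual RocksDB layout assumption that an internal key is the user key followed by an 8-byte sequence/type trailer:

```cpp
#include <cassert>
#include <cstddef>
#include <iostream>
#include <string>

static const size_t kNumInternalBytes = 8;  // 7-byte sequence number + 1-byte type

// Drop the 8-byte internal trailer to get the user key.
std::string ExtractUserKey(const std::string& internal_key) {
  assert(internal_key.size() >= kNumInternalBytes);
  return internal_key.substr(0, internal_key.size() - kNumInternalBytes);
}

// Drop a fixed-size trailing timestamp (ts_sz == 0 when timestamps are unused).
std::string StripTimestamp(const std::string& user_key, size_t ts_sz) {
  assert(user_key.size() >= ts_sz);
  return user_key.substr(0, user_key.size() - ts_sz);
}

int main() {
  std::string internal_key = std::string("key1") + std::string(kNumInternalBytes, '\0');
  std::string probe = StripTimestamp(ExtractUserKey(internal_key), /*ts_sz=*/0);
  std::cout << probe << "\n";  // "key1" is what KeyMayMatch(filter_cache, ...) sees
}
```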
+ break; + } + + BlockCacheLookupContext lookup_data_block_context{ + TableReaderCaller::kUserGet, tracing_get_id, + /*get_from_user_specified_snapshot=*/read_options.snapshot != + nullptr}; + bool does_referenced_key_exist = false; + DataBlockIter biter; + uint64_t referenced_data_size = 0; + NewDataBlockIterator( + read_options, v.handle, &biter, BlockType::kData, get_context, + &lookup_data_block_context, + /*s=*/Status(), /*prefetch_buffer*/ nullptr); + + if (no_io && biter.status().IsIncomplete()) { + // couldn't get block from block_cache + // Update Saver.state to Found because we are only looking for + // whether we can guarantee the key is not there when "no_io" is set + get_context->MarkKeyMayExist(); + break; + } + if (!biter.status().ok()) { + s = biter.status(); + break; + } + + bool may_exist = biter.SeekForGet(key); + // If user-specified timestamp is supported, we cannot end the search + // just because hash index lookup indicates the key+ts does not exist. + if (!may_exist && ts_sz == 0) { + // HashSeek cannot find the key this block and the the iter is not + // the end of the block, i.e. cannot be in the following blocks + // either. In this case, the seek_key cannot be found, so we break + // from the top level for-loop. + done = true; + } else { + // Call the *saver function on each entry/block until it returns false + for (; biter.Valid(); biter.Next()) { + ParsedInternalKey parsed_key; + if (ParseInternalKey(biter.key(), &parsed_key) != Status::OK()) { + s = Status::Corruption(Slice()); + } + + if (!get_context->SaveValue( + parsed_key, biter.value(), &matched, + biter.IsValuePinned() ? &biter : nullptr)) { + if (get_context->State() == GetContext::GetState::kFound) { + does_referenced_key_exist = true; + referenced_data_size = biter.key().size() + biter.value().size(); + } + done = true; + break; + } + } + s = biter.status(); + } + // Write the block cache access record. + if (block_cache_tracer_ && block_cache_tracer_->is_tracing_enabled()) { + // Avoid making copy of block_key, cf_name, and referenced_key when + // constructing the access record. + Slice referenced_key; + if (does_referenced_key_exist) { + referenced_key = biter.key(); + } else { + referenced_key = key; + } + BlockCacheTraceRecord access_record( + rep_->ioptions.env->NowMicros(), + /*block_key=*/"", lookup_data_block_context.block_type, + lookup_data_block_context.block_size, rep_->cf_id_for_tracing(), + /*cf_name=*/"", rep_->level_for_tracing(), + rep_->sst_number_for_tracing(), lookup_data_block_context.caller, + lookup_data_block_context.is_cache_hit, + lookup_data_block_context.no_insert, + lookup_data_block_context.get_id, + lookup_data_block_context.get_from_user_specified_snapshot, + /*referenced_key=*/"", referenced_data_size, + lookup_data_block_context.num_keys_in_block, + does_referenced_key_exist); + // TODO: Should handle status here? 
+ block_cache_tracer_ + ->WriteBlockAccess(access_record, + lookup_data_block_context.block_key, + rep_->cf_name_for_tracing(), referenced_key) + .PermitUncheckedError(); + } + + if (done) { + // Avoid the extra Next which is expensive in two-level indexes + break; + } + } + if (matched && filter != nullptr && !filter->IsBlockBased()) { + RecordTick(rep_->ioptions.statistics, BLOOM_FILTER_FULL_TRUE_POSITIVE); + PERF_COUNTER_BY_LEVEL_ADD(bloom_filter_full_true_positive, 1, + rep_->level); + } + if (s.ok() && !iiter->status().IsNotFound()) { + s = iiter->status(); + } + } + + return s; +} + +#ifdef ART_PLUS +Status BlockBasedTable::Get(FilterCacheClient& filter_cache, + const ReadOptions& read_options, const Slice& key, + GetContext* get_context, + const SliceTransform* prefix_extractor, + bool skip_filters) { + assert(key.size() >= kNumInternalBytes); // key must be internal key + assert(get_context != nullptr); + Status s; + const bool no_io = read_options.read_tier == kBlockCacheTier; + + FilterBlockReader* const filter = + !skip_filters ? rep_->filter.get() : nullptr; + + // First check the full filter + // If full filter not useful, Then go into each block + uint64_t tracing_get_id = get_context->get_tracing_get_id(); + BlockCacheLookupContext lookup_context{ + TableReaderCaller::kUserGet, tracing_get_id, + /*get_from_user_specified_snapshot=*/read_options.snapshot != nullptr}; + if (block_cache_tracer_ && block_cache_tracer_->is_tracing_enabled()) { + // Trace the key since it contains both user key and sequence number. + lookup_context.referenced_key = key.ToString(); + lookup_context.get_from_user_specified_snapshot = + read_options.snapshot != nullptr; + } + TEST_SYNC_POINT("BlockBasedTable::Get:BeforeFilterMatch"); + // add filter_cache argument here + const bool may_match = + FullFilterKeyMayMatch(filter_cache, + read_options, filter, key, no_io, prefix_extractor, + get_context, &lookup_context); + TEST_SYNC_POINT("BlockBasedTable::Get:AfterFilterMatch"); + if (!may_match) { + RecordTick(rep_->ioptions.statistics, BLOOM_FILTER_USEFUL); + PERF_COUNTER_BY_LEVEL_ADD(bloom_filter_useful, 1, rep_->level); + } else { + IndexBlockIter iiter_on_stack; + // if prefix_extractor found in block differs from options, disable + // BlockPrefixIndex. Only do this check when index_type is kHashSearch. + bool need_upper_bound_check = false; + if (rep_->index_type == BlockBasedTableOptions::kHashSearch) { + need_upper_bound_check = PrefixExtractorChanged( + rep_->table_properties.get(), prefix_extractor); + } + auto iiter = + NewIndexIterator(read_options, need_upper_bound_check, &iiter_on_stack, + get_context, &lookup_context); + std::unique_ptr> iiter_unique_ptr; + if (iiter != &iiter_on_stack) { + iiter_unique_ptr.reset(iiter); + } + size_t ts_sz = + rep_->internal_comparator.user_comparator()->timestamp_size(); + bool matched = false; // if such user key matched a key in SST + bool done = false; + for (iiter->Seek(key); iiter->Valid() && !done; iiter->Next()) { + IndexValue v = iiter->value(); + // WaLSM+ dont use BlockBasedFilter, so not_exist_in_filter always false + bool not_exist_in_filter = + filter != nullptr && filter->IsBlockBased() == true && + !filter->KeyMayMatch(ExtractUserKeyAndStripTimestamp(key, ts_sz), + prefix_extractor, v.handle.offset(), no_io, + /*const_ikey_ptr=*/nullptr, get_context, + &lookup_context); + if (not_exist_in_filter) { // Not found // TODO: think about interaction with Merge. 
If a user key cannot @@ -2419,7 +2649,9 @@ Status BlockBasedTable::Get(const ReadOptions& read_options, const Slice& key, return s; } +#endif +// TODO: WaLSM+ Benchmark dont use MultiGet interface using MultiGetRange = MultiGetContext::Range; void BlockBasedTable::MultiGet(const ReadOptions& read_options, const MultiGetRange* mget_range, diff --git a/table/block_based/block_based_table_reader.h b/table/block_based/block_based_table_reader.h index 4d7d3f1cb..ce7c4ed8a 100644 --- a/table/block_based/block_based_table_reader.h +++ b/table/block_based/block_based_table_reader.h @@ -10,6 +10,7 @@ #pragma once #include +#include "db/art/filter_cache_client.h" #include "db/range_tombstone_fragmenter.h" #include "file/filename.h" #include "rocksdb/comparator.h" @@ -137,8 +138,16 @@ class BlockBasedTable : public TableReader { GetContext* get_context, const SliceTransform* prefix_extractor, bool skip_filters = false) override; +#ifdef ART_PLUS + Status Get(FilterCacheClient& filter_cache, + const ReadOptions& readOptions, const Slice& key, + GetContext* get_context, const SliceTransform* prefix_extractor, + bool skip_filters = false) override; +#endif + // WaLSM+ Note: call FullFilterKeyMayMatch() method in this file // PERF count False Positive in the end + // TODO: WaLSM+ Benchmark dont use MultiGet interface void MultiGet(const ReadOptions& readOptions, const MultiGetContext::Range* mget_range, const SliceTransform* prefix_extractor, @@ -409,6 +418,16 @@ class BlockBasedTable : public TableReader { const SliceTransform* prefix_extractor, GetContext* get_context, BlockCacheLookupContext* lookup_context) const; +#ifdef ART_PLUS + bool FullFilterKeyMayMatch(FilterCacheClient& filter_cache, + const ReadOptions& read_options, + FilterBlockReader* filter, const Slice& user_key, + const bool no_io, + const SliceTransform* prefix_extractor, + GetContext* get_context, + BlockCacheLookupContext* lookup_context) const; +#endif + // TODO: not used in WaLSM+ Benchmark, meybe used in MultiGet interface ? // WaLSM+ Note: filter->PrefixesMayMatch() or filter->KeyMayMatch() void FullFilterKeysMayMatch(const ReadOptions& read_options, FilterBlockReader* filter, MultiGetRange* range, diff --git a/table/block_based/partitioned_filter_block.cc b/table/block_based/partitioned_filter_block.cc index 48bad15b2..e077603f4 100644 --- a/table/block_based/partitioned_filter_block.cc +++ b/table/block_based/partitioned_filter_block.cc @@ -256,6 +256,26 @@ bool PartitionedFilterBlockReader::KeyMayMatch( &FullFilterBlockReader::KeyMayMatch); } +#ifdef ART_PLUS +bool PartitionedFilterBlockReader::KeyMayMatch( + FilterCacheClient& filter_cache, + const Slice& key, const SliceTransform* prefix_extractor, + uint64_t block_offset, const bool no_io, const Slice* const const_ikey_ptr, + GetContext* get_context, BlockCacheLookupContext* lookup_context) { + assert(const_ikey_ptr != nullptr); + assert(block_offset == kNotValid); + if (!whole_key_filtering()) { + return true; + } + + return MayMatch(filter_cache, + key, prefix_extractor, block_offset, no_io, const_ikey_ptr, + get_context, lookup_context, + &FullFilterBlockReader::KeyMayMatch); +} +#endif + +// TODO: not used in WaLSM+ Benchmark, meybe used in MultiGet interface ? 
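The new `KeyMayMatch(filter_cache, ...)` overload reuses the shared `MayMatch` body by passing `&FullFilterBlockReader::KeyMayMatch` as the `FilterFunction` argument, a pointer to member function. A minimal sketch of that dispatch pattern with illustrative class names:

```cpp
#include <iostream>
#include <string>

class FullReader {
 public:
  bool KeyMayMatch(const std::string& key) { return !key.empty(); }
  bool PrefixMayMatch(const std::string& prefix) { return prefix.size() > 1; }
};

class PartitionedReader {
 public:
  using FilterFunction = bool (FullReader::*)(const std::string&);

  // One shared body; the caller picks which per-partition check to run.
  bool MayMatch(const std::string& slice, FilterFunction filter_function) const {
    FullReader partition;                        // stands in for one filter partition
    return (partition.*filter_function)(slice);  // invoke the selected check
  }

  bool KeyMayMatch(const std::string& key) const {
    return MayMatch(key, &FullReader::KeyMayMatch);
  }
  bool PrefixMayMatch(const std::string& prefix) const {
    return MayMatch(prefix, &FullReader::PrefixMayMatch);
  }
};

int main() {
  PartitionedReader reader;
  std::cout << reader.KeyMayMatch("user_key") << "\n";  // prints 1
}
```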
void PartitionedFilterBlockReader::KeysMayMatch( MultiGetRange* range, const SliceTransform* prefix_extractor, uint64_t block_offset, const bool no_io, @@ -269,6 +289,7 @@ void PartitionedFilterBlockReader::KeysMayMatch( &FullFilterBlockReader::KeysMayMatch); } +// not use prefix filter in WaLSM+ Benchmark bool PartitionedFilterBlockReader::PrefixMayMatch( const Slice& prefix, const SliceTransform* prefix_extractor, uint64_t block_offset, const bool no_io, const Slice* const const_ikey_ptr, @@ -417,6 +438,70 @@ bool PartitionedFilterBlockReader::MayMatch( lookup_context); } +#ifdef ART_PLUS +bool PartitionedFilterBlockReader::MayMatch( + FilterCacheClient& filter_cache, + const Slice& slice, const SliceTransform* prefix_extractor, + uint64_t block_offset, bool no_io, const Slice* const_ikey_ptr, + GetContext* get_context, BlockCacheLookupContext* lookup_context, + FilterFunction filter_function) const { + /* + simple example of filter cache object: + uint32_t segment_id = 100; + std::string key = "k"; + bool result = filter_cache.check_key(segment_id, k); + */ + // TODO: leave filter unit data or filter unit reader into filter_cache, so block cache only need to cache filter index? + CachableEntry filter_block; + Status s = + GetOrReadFilterBlock(no_io, get_context, lookup_context, &filter_block); + if (UNLIKELY(!s.ok())) { + IGNORE_STATUS_IF_ERROR(s); + return true; + } + + if (UNLIKELY(filter_block.GetValue()->size() == 0)) { + return true; + } + + #ifdef ART_PLUS + // find key "0 original_internal key". filter_index=segment_id=0. (WaLSM+) + // segment_id itself is useless in comparison, + // but must be appended otherwise the extracted user key will be incorrect. + std::unique_ptr modified_key_buf; + Slice modified_key = + generate_modified_internal_key(modified_key_buf, *const_ikey_ptr, 0, 0); + auto filter_handle = GetFilterPartitionHandle(filter_block, modified_key); + #else + auto filter_handle = GetFilterPartitionHandle(filter_block, *const_ikey_ptr); + #endif + if (UNLIKELY(filter_handle.size() == 0)) { // key is out of range + return false; + } + + // TODO: get some filter blocks from the filter cache and check (WaLSM+) + CachableEntry filter_partition_block; + s = GetFilterPartitionBlock(nullptr /* prefetch_buffer */, filter_handle, + no_io, get_context, lookup_context, + &filter_partition_block); + if (UNLIKELY(!s.ok())) { + IGNORE_STATUS_IF_ERROR(s); + return true; + } + + FullFilterBlockReader filter_partition(table(), + std::move(filter_partition_block)); + // initialize the reader with hash_id (WaLSM+) + // FullFilterBlockReader filter_partition(table(), + // std::move(filter_partition_block), + // 1); + return (filter_partition.*filter_function)( + slice, prefix_extractor, block_offset, no_io, const_ikey_ptr, get_context, + lookup_context); +} +#endif + +// TODO: used when calling MultiGet, but we dont use MultiGet in WaLSM+ Benchmark // TODO: retrieve filter block from filter cache (WaLSM+) void PartitionedFilterBlockReader::MayMatch( MultiGetRange* range, const SliceTransform* prefix_extractor, diff --git a/table/block_based/partitioned_filter_block.h b/table/block_based/partitioned_filter_block.h index c681454b3..0d970a7a6 100644 --- a/table/block_based/partitioned_filter_block.h +++ b/table/block_based/partitioned_filter_block.h @@ -12,6 +12,7 @@ #include #include #include "db/dbformat.h" +#include "db/art/filter_cache_client.h" #include "index_builder.h" #include "rocksdb/options.h" #include "rocksdb/slice.h" @@ -100,11 +101,20 @@ class PartitionedFilterBlockReader : 
public FilterBlockReaderCommon { uint64_t block_offset, const bool no_io, const Slice* const const_ikey_ptr, GetContext* get_context, BlockCacheLookupContext* lookup_context) override; +#ifdef ART_PLUS + bool KeyMayMatch(FilterCacheClient& filter_cache, + const Slice& key, const SliceTransform* prefix_extractor, + uint64_t block_offset, const bool no_io, + const Slice* const const_ikey_ptr, GetContext* get_context, + BlockCacheLookupContext* lookup_context); +#endif + // TODO: not used in the WaLSM+ benchmark; maybe used by the MultiGet interface? void KeysMayMatch(MultiGetRange* range, const SliceTransform* prefix_extractor, uint64_t block_offset, const bool no_io, BlockCacheLookupContext* lookup_context) override; + // prefix filters are not used in the WaLSM+ experiments bool PrefixMayMatch(const Slice& prefix, const SliceTransform* prefix_extractor, uint64_t block_offset, const bool no_io, const Slice* const const_ikey_ptr, @@ -137,6 +147,15 @@ class PartitionedFilterBlockReader : public FilterBlockReaderCommon { GetContext* get_context, BlockCacheLookupContext* lookup_context, FilterFunction filter_function) const; +#ifdef ART_PLUS + bool MayMatch(FilterCacheClient& filter_cache, + const Slice& slice, const SliceTransform* prefix_extractor, + uint64_t block_offset, bool no_io, const Slice* const_ikey_ptr, + GetContext* get_context, + BlockCacheLookupContext* lookup_context, + FilterFunction filter_function) const; +#endif + // TODO: used when calling MultiGet, but the WaLSM+ benchmark doesn't use MultiGet using FilterManyFunction = void (FullFilterBlockReader::*)( MultiGetRange* range, const SliceTransform* prefix_extractor, uint64_t block_offset, const bool no_io,
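As the TODO comments note, this patch only plumbs `FilterCacheClient&` down to `PartitionedFilterBlockReader::MayMatch`; the per-partition check still goes through the block cache. A hedged sketch of where the client could eventually be consulted, modeled on the `check_key(segment_id, key)` call shown in the MayMatch comment; the stub class and helper below are hypothetical:

```cpp
#include <cstdint>
#include <string>

// Stand-in mirroring the call shown in the MayMatch comment:
//   bool result = filter_cache.check_key(segment_id, key);
// The real class lives in db/art/filter_cache_client.h.
class FilterCacheClientStub {
 public:
  bool check_key(uint32_t /*segment_id*/, const std::string& /*key*/) {
    return true;  // the real client consults the enabled filter units
  }
};

// Hypothetical helper showing where the cached-filter probe would sit once
// MayMatch starts using the plumbed-in client instead of only forwarding it.
bool MayMatchWithFilterCache(FilterCacheClientStub& filter_cache,
                             uint32_t segment_id,
                             const std::string& user_key) {
  if (!filter_cache.check_key(segment_id, user_key)) {
    return false;  // filter units say "definitely absent": skip the data block
  }
  return true;     // otherwise fall through to the existing partition probe
}

int main() {
  FilterCacheClientStub fc;
  return MayMatchWithFilterCache(fc, /*segment_id=*/100, "k") ? 0 : 1;
}
```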