Skip to content

Commit

Permalink
Merge pull request #50636 from ifed01/wip-ifed-bound-rm-range-keys-qui
Browse files Browse the repository at this point in the history
quincy: kv/RocksDBStore: cumulative backport for rm_range_keys and around

Reviewed-by: Adam Kupczyk <akupczyk@redhat.com>
  • Loading branch information
yuriw committed Oct 9, 2023
2 parents cdc9ca1 + 08252a2 commit c0cafa4
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 19 deletions.
62 changes: 47 additions & 15 deletions src/kv/RocksDBStore.cc
Expand Up @@ -1707,7 +1707,7 @@ void RocksDBStore::RocksDBTransactionImpl::rmkeys_by_prefix(const string &prefix
{
auto p_iter = db->cf_handles.find(prefix);
if (p_iter == db->cf_handles.end()) {
uint64_t cnt = db->delete_range_threshold;
uint64_t cnt = db->get_delete_range_threshold();
bat.SetSavePoint();
auto it = db->get_iterator(prefix);
for (it->seek_to_first(); it->valid() && (--cnt) != 0; it->next()) {
Expand All @@ -1726,10 +1726,10 @@ void RocksDBStore::RocksDBTransactionImpl::rmkeys_by_prefix(const string &prefix
} else {
ceph_assert(p_iter->second.handles.size() >= 1);
for (auto cf : p_iter->second.handles) {
uint64_t cnt = db->delete_range_threshold;
uint64_t cnt = db->get_delete_range_threshold();
bat.SetSavePoint();
auto it = db->new_shard_iterator(cf);
for (it->SeekToFirst(); it->Valid() && (--cnt) != 0; it->Next()) {
for (it->seek_to_first(); it->valid() && (--cnt) != 0; it->next()) {
bat.Delete(cf, it->key());
}
if (cnt == 0) {
Expand All @@ -1747,18 +1747,24 @@ void RocksDBStore::RocksDBTransactionImpl::rm_range_keys(const string &prefix,
const string &start,
const string &end)
{
ldout(db->cct, 10) << __func__ << " enter start=" << start
<< " end=" << end << dendl;
ldout(db->cct, 10) << __func__
<< " enter prefix=" << prefix
<< " start=" << pretty_binary_string(start)
<< " end=" << pretty_binary_string(end) << dendl;
auto p_iter = db->cf_handles.find(prefix);
uint64_t cnt = db->get_delete_range_threshold();
if (p_iter == db->cf_handles.end()) {
uint64_t cnt = db->delete_range_threshold;
uint64_t cnt0 = cnt;
bat.SetSavePoint();
auto it = db->get_iterator(prefix);
for (it->lower_bound(start);
it->valid() && db->comparator->Compare(it->key(), end) < 0 && (--cnt) != 0;
it->next()) {
bat.Delete(db->default_cf, combine_strings(prefix, it->key()));
}
ldout(db->cct, 15) << __func__
<< " count = " << cnt0 - cnt
<< dendl;
if (cnt == 0) {
ldout(db->cct, 10) << __func__ << " p_iter == end(), resorting to DeleteRange"
<< dendl;
Expand All @@ -1769,18 +1775,31 @@ void RocksDBStore::RocksDBTransactionImpl::rm_range_keys(const string &prefix,
} else {
bat.PopSavePoint();
}
} else if (cnt == 0) {
ceph_assert(p_iter->second.handles.size() >= 1);
for (auto cf : p_iter->second.handles) {
ldout(db->cct, 10) << __func__ << " p_iter != end(), resorting to DeleteRange"
<< dendl;
bat.DeleteRange(cf, rocksdb::Slice(start), rocksdb::Slice(end));
}
} else {
auto bounds = KeyValueDB::IteratorBounds();
bounds.lower_bound = start;
bounds.upper_bound = end;
ceph_assert(p_iter->second.handles.size() >= 1);
for (auto cf : p_iter->second.handles) {
uint64_t cnt = db->delete_range_threshold;
cnt = db->get_delete_range_threshold();
uint64_t cnt0 = cnt;
bat.SetSavePoint();
rocksdb::Iterator* it = db->new_shard_iterator(cf);
ceph_assert(it != nullptr);
for (it->Seek(start);
it->Valid() && db->comparator->Compare(it->key(), end) < 0 && (--cnt) != 0;
it->Next()) {
auto it = db->new_shard_iterator(cf, prefix, bounds);
for (it->lower_bound(start);
it->valid() && (--cnt) != 0;
it->next()) {
bat.Delete(cf, it->key());
}
ldout(db->cct, 10) << __func__
<< " count = " << cnt0 - cnt
<< dendl;
if (cnt == 0) {
ldout(db->cct, 10) << __func__ << " p_iter != end(), resorting to DeleteRange"
<< dendl;
Expand All @@ -1789,7 +1808,6 @@ void RocksDBStore::RocksDBTransactionImpl::rm_range_keys(const string &prefix,
} else {
bat.PopSavePoint();
}
delete it;
}
}
ldout(db->cct, 10) << __func__ << " end" << dendl;
Expand Down Expand Up @@ -3021,9 +3039,23 @@ KeyValueDB::Iterator RocksDBStore::get_iterator(const std::string& prefix, Itera
}
}

rocksdb::Iterator* RocksDBStore::new_shard_iterator(rocksdb::ColumnFamilyHandle* cf)
/// Create an iterator over a single column family (one shard).
///
/// The iterator is a RocksDBWholeSpaceIteratorImpl held by a shared pointer,
/// so callers do not delete it themselves.
///
/// @param cf  column family handle the iterator is constructed with
/// @return shared pointer to the newly constructed iterator
RocksDBStore::WholeSpaceIterator RocksDBStore::new_shard_iterator(rocksdb::ColumnFamilyHandle* cf)
{
return std::make_shared<RocksDBWholeSpaceIteratorImpl>(
this,
cf,
0); // NOTE(review): presumably "no iterator options" - confirm against the RocksDBWholeSpaceIteratorImpl constructor
}

KeyValueDB::Iterator RocksDBStore::new_shard_iterator(rocksdb::ColumnFamilyHandle* cf,
const std::string& prefix,
IteratorBounds bounds)
{
return db->NewIterator(rocksdb::ReadOptions(), cf);
return std::make_shared<CFIteratorImpl>(
this,
prefix,
cf,
std::move(bounds));
}

RocksDBStore::WholeSpaceIterator RocksDBStore::get_wholespace_iterator(IteratorOpts opts)
Expand Down
12 changes: 8 additions & 4 deletions src/kv/RocksDBStore.h
Expand Up @@ -200,7 +200,10 @@ class RocksDBStore : public KeyValueDB {
/// compact the underlying rocksdb store
bool compact_on_mount;
bool disableWAL;
const uint64_t delete_range_threshold;
/// Return the current value of the `rocksdb_delete_range_threshold` option.
///
/// The value is looked up in the configuration on every call instead of being
/// stored in a member. The rm_range_keys caller visible in this change uses it
/// as a countdown of per-key deletes before switching to DeleteRange.
uint64_t get_delete_range_threshold() const {
return cct->_conf.get_val<uint64_t>("rocksdb_delete_range_threshold");
}

void compact() override;

void compact_async() override {
Expand Down Expand Up @@ -245,8 +248,7 @@ class RocksDBStore : public KeyValueDB {
compact_queue_stop(false),
compact_thread(this),
compact_on_mount(false),
disableWAL(false),
delete_range_threshold(cct->_conf.get_val<uint64_t>("rocksdb_delete_range_threshold"))
disableWAL(false)
{}

~RocksDBStore() override;
Expand Down Expand Up @@ -388,7 +390,9 @@ class RocksDBStore : public KeyValueDB {
Iterator get_iterator(const std::string& prefix, IteratorOpts opts = 0, IteratorBounds = IteratorBounds()) override;
private:
/// this iterator spans single cf
rocksdb::Iterator* new_shard_iterator(rocksdb::ColumnFamilyHandle* cf);
WholeSpaceIterator new_shard_iterator(rocksdb::ColumnFamilyHandle* cf);
Iterator new_shard_iterator(rocksdb::ColumnFamilyHandle* cf,
const std::string& prefix, IteratorBounds bound);
public:
/// Utility
static std::string combine_strings(const std::string &prefix, const std::string &value) {
Expand Down

0 comments on commit c0cafa4

Please sign in to comment.