Skip to content

Commit

Permalink
feat(cluster): impl slot data delete (#1224)
Browse files Browse the repository at this point in the history
* feat(cluster): impl slot data delete

Signed-off-by: adi_holden <adi@dragonflydb.io>
  • Loading branch information
adiholden committed May 17, 2023
1 parent 5a76c91 commit 5c46c3d
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 3 deletions.
51 changes: 51 additions & 0 deletions src/server/db_slice.cc
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,44 @@ bool DbSlice::Del(DbIndex db_ind, PrimeIterator it) {
return true;
}

void DbSlice::FlushSlotsFb(const SlotSet& slot_ids) {
// Slot deletion can take time as it traverses all the database, hence it runs in fiber.
// We want to flush all the data of a slot that was added till the time the call to FlushSlotsFb
// was made. Therefore we delete slots entries with version < next_version
uint64_t next_version = NextVersion();

std::string tmp;
auto del_entry_cb = [&](PrimeTable::iterator it) {
std::string_view key = it->first.GetSlice(&tmp);
SlotId sid = ClusterConfig::KeySlot(key);
if (slot_ids.contains(sid) && it.GetVersion() < next_version) {
PerformDeletion(it, shard_owner(), db_arr_[0].get());
}
return true;
};

PrimeTable* pt = &db_arr_[0]->prime;
PrimeTable::Cursor cursor;
uint64_t i = 0;
do {
PrimeTable::Cursor next = pt->Traverse(cursor, del_entry_cb);
++i;
cursor = next;
if (i % 100 == 0) {
ThisFiber::Yield();
}

} while (cursor);
mi_heap_collect(ServerState::tlocal()->data_heap(), true);
}

void DbSlice::FlushSlots(SlotSet slot_ids) {
InvalidateSlotWatches(slot_ids);
util::MakeFiber([this, slot_ids = std::move(slot_ids)]() mutable {
FlushSlotsFb(slot_ids);
}).Detach();
}

void DbSlice::FlushDb(DbIndex db_ind) {
// TODO: to add preeemptiveness by yielding inside clear.

Expand Down Expand Up @@ -548,6 +586,7 @@ void DbSlice::FlushDb(DbIndex db_ind) {
for (auto& db : all_dbs) {
db.reset();
}
mi_heap_collect(ServerState::tlocal()->data_heap(), true);
}).Detach();
}

Expand Down Expand Up @@ -1068,4 +1107,16 @@ void DbSlice::InvalidateDbWatches(DbIndex db_indx) {
}
}

void DbSlice::InvalidateSlotWatches(const SlotSet& slot_ids) {
for (const auto& [key, conn_list] : db_arr_[0]->watched_keys) {
SlotId sid = ClusterConfig::KeySlot(key);
if (!slot_ids.contains(sid)) {
continue;
}
for (auto conn_ptr : conn_list) {
conn_ptr->watched_dirty.store(true, memory_order_relaxed);
}
}
}

} // namespace dfly
15 changes: 12 additions & 3 deletions src/server/db_slice.h
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,10 @@ class DbSlice {
*/
void FlushDb(DbIndex db_ind);

using SlotSet = absl::flat_hash_set<SlotId>;
// Flushes the data of given slot ids.
void FlushSlots(SlotSet slot_ids);

EngineShard* shard_owner() const {
return owner_;
}
Expand Down Expand Up @@ -310,14 +314,19 @@ class DbSlice {
// Unregisted all watched key entries for connection.
void UnregisterConnectionWatches(ConnectionState::ExecInfo* exec_info);

// Invalidate all watched keys in database. Used on FLUSH.
void InvalidateDbWatches(DbIndex db_indx);

private:
std::pair<PrimeIterator, bool> AddOrUpdateInternal(const Context& cntx, std::string_view key,
PrimeValue obj, uint64_t expire_at_ms,
bool force_update) noexcept(false);

void FlushSlotsFb(const SlotSet& slot_ids);

// Invalidate all watched keys in database. Used on FLUSH.
void InvalidateDbWatches(DbIndex db_indx);

// Invalidate all watched keys for given slots. Used on FlushSlots.
void InvalidateSlotWatches(const SlotSet& slot_ids);

void CreateDb(DbIndex index);
size_t EvictObjects(size_t memory_to_free, PrimeIterator it, DbTable* table);

Expand Down

0 comments on commit 5c46c3d

Please sign in to comment.