Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement the unified WAL iterator #2040

Merged
merged 14 commits into from
Jan 26, 2024
129 changes: 125 additions & 4 deletions src/storage/iterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
#include "db_util.h"

namespace engine {
DBIterator::DBIterator(Storage* storage, rocksdb::ReadOptions read_options, int slot)
DBIterator::DBIterator(Storage *storage, rocksdb::ReadOptions read_options, int slot)
: storage_(storage), read_options_(std::move(read_options)), slot_(slot) {
metadata_cf_handle_ = storage_->GetCFHandle(kMetadataColumnFamilyName);
metadata_iter_ = util::UniqueIterator(storage_->NewIterator(read_options_, metadata_cf_handle_));
Expand Down Expand Up @@ -80,7 +80,7 @@ void DBIterator::Reset() {
if (metadata_iter_) metadata_iter_.reset();
}

void DBIterator::Seek(const std::string& target) {
void DBIterator::Seek(const std::string &target) {
if (!metadata_iter_) return;

// Iterate with the slot id but storage didn't enable slot id encoding
Expand Down Expand Up @@ -112,7 +112,7 @@ std::unique_ptr<SubKeyIterator> DBIterator::GetSubKeyIterator() const {
return std::make_unique<SubKeyIterator>(storage_, read_options_, type, std::move(prefix));
}

SubKeyIterator::SubKeyIterator(Storage* storage, rocksdb::ReadOptions read_options, RedisType type, std::string prefix)
SubKeyIterator::SubKeyIterator(Storage *storage, rocksdb::ReadOptions read_options, RedisType type, std::string prefix)
: storage_(storage), read_options_(std::move(read_options)), type_(type), prefix_(std::move(prefix)) {
if (type_ == kRedisStream) {
cf_handle_ = storage_->GetCFHandle(kStreamColumnFamilyName);
Expand Down Expand Up @@ -145,7 +145,7 @@ Slice SubKeyIterator::UserKey() const {
return internal_key.GetSubKey();
}

rocksdb::ColumnFamilyHandle* SubKeyIterator::ColumnFamilyHandle() const { return Valid() ? this->cf_handle_ : nullptr; }
// Returns the column family handle this iterator reads from, or nullptr when
// the iterator is not positioned on a valid entry.
rocksdb::ColumnFamilyHandle *SubKeyIterator::ColumnFamilyHandle() const {
  if (!Valid()) {
    return nullptr;
  }
  return this->cf_handle_;
}

// Returns the value of the current entry, or an empty Slice when the iterator
// is not positioned on a valid entry.
Slice SubKeyIterator::Value() const {
  if (!Valid()) {
    return Slice();
  }
  return iter_->value();
}

Expand All @@ -164,4 +164,125 @@ void SubKeyIterator::Reset() {
if (iter_) iter_.reset();
}

// Handler callback for a Put record: stores it as a kTypePut WALItem.
// When a slot filter is active (slot_ != -1), records whose key belongs to a
// different slot are skipped. Always reports OK.
rocksdb::Status WALBatchExtractor::PutCF(uint32_t column_family_id, const Slice &key, const Slice &value) {
  const bool accepted = slot_ == -1 || slot_ == ExtractSlotId(key);
  if (accepted) {
    items_.emplace_back(WALItem::Type::kTypePut, column_family_id, key.ToString(), value.ToString());
  }
  return rocksdb::Status::OK();
}

// Handler callback for a Delete record: stores it as a kTypeDelete WALItem
// with an empty value. When a slot filter is active (slot_ != -1), records
// whose key belongs to a different slot are skipped. Always reports OK.
rocksdb::Status WALBatchExtractor::DeleteCF(uint32_t column_family_id, const rocksdb::Slice &key) {
  const bool accepted = slot_ == -1 || slot_ == ExtractSlotId(key);
  if (accepted) {
    items_.emplace_back(WALItem::Type::kTypeDelete, column_family_id, key.ToString(), std::string{});
  }
  return rocksdb::Status::OK();
}

// Handler callback for a DeleteRange record: stores it as a kTypeDeleteRange
// WALItem whose `key` holds the range begin and whose `value` holds the range
// end. Always reports OK.
//
// Note: no slot filtering is applied here, so range deletions are recorded
// even when a slot filter is active.
rocksdb::Status WALBatchExtractor::DeleteRangeCF(uint32_t column_family_id, const rocksdb::Slice &begin_key,
                                                 const rocksdb::Slice &end_key) {
  items_.emplace_back(WALItem::Type::kTypeDeleteRange, column_family_id, begin_key.ToString(), end_key.ToString());
  return rocksdb::Status::OK();
}

// Handler callback for a log-data blob: stores it as a kTypeLogData WALItem
// with the blob in `key`, an empty `value`, and column family id 0.
// The slot filter is not consulted for log data.
void WALBatchExtractor::LogData(const rocksdb::Slice &blob) {
  items_.emplace_back(WALItem::Type::kTypeLogData, 0, blob.ToString(), std::string{});
}

// Discards every item collected so far.
void WALBatchExtractor::Clear() { items_.clear(); }

// Returns an iterator positioned at the first collected item. The iterator
// keeps a raw pointer to this extractor's item vector, so it must not be used
// after the extractor is destroyed.
WALBatchExtractor::Iter WALBatchExtractor::GetIter() { return Iter(&items_); }

// True while the cursor refers to an existing item of the underlying vector.
bool WALBatchExtractor::Iter::Valid() {
  if (items_ == nullptr) {
    return false;
  }
  return cur_ < items_->size();
}

// Advances the cursor by one item; no bounds check is performed here, callers
// are expected to consult Valid().
void WALBatchExtractor::Iter::Next() {
  ++cur_;
}

// Returns a copy of the item under the cursor, or a default-constructed
// WALItem (type kTypeInvalid) when the cursor is out of range.
WALItem WALBatchExtractor::Iter::Value() {
  return Valid() ? (*items_)[cur_] : WALItem{};
}

// Returns the iterator to its initial, invalid state: drops the WAL iterator
// and the in-batch iterator, clears the extracted items, and zeroes the
// expected sequence number.
void WALIterator::Reset() {
  // unique_ptr::reset() is a no-op on an empty pointer, so no null checks are needed.
  iter_.reset();
  batch_iter_.reset();
  extractor_.Clear();
  next_batch_seq_ = 0;
}

// True when either the current batch still has items to hand out or the
// underlying WAL iterator is positioned on a batch.
bool WALIterator::Valid() const {
  if (batch_iter_ && batch_iter_->Valid()) {
    return true;
  }
  return iter_ && iter_->Valid();
}

// Loads the batch the WAL iterator currently points at into the extractor and
// positions batch_iter_ at its first extracted item. On any failure the whole
// iterator is Reset() and therefore becomes invalid.
void WALIterator::nextBatch() {
// No WAL iterator, or it has run past the last batch: nothing more to read.
if (!iter_ || !iter_->Valid()) {
Reset();
return;
}

auto batch = iter_->GetBatch();
// The batch must start exactly at the expected sequence number and must carry a
// write batch; otherwise iteration is abandoned.
if (batch.sequence != next_batch_seq_ || !batch.writeBatchPtr) {
Reset();
return;
}

// Drop the items of the previous batch before collecting the new ones.
extractor_.Clear();

// Run the batch through the extractor's handler callbacks, which fill its item list.
auto s = batch.writeBatchPtr->Iterate(&extractor_);
if (!s.ok()) {
Reset();
return;
}

// Advance the expected sequence number by the batch's Count().
next_batch_seq_ += batch.writeBatchPtr->Count();
batch_iter_ = std::make_unique<WALBatchExtractor::Iter>(extractor_.GetIter());
}

// Positions the iterator at the batch starting at sequence number `seq`.
// The iterator ends up invalid (Reset) when a slot filter was requested but the
// storage does not encode slot ids, when the WAL iterator cannot be obtained,
// or when loading the first batch fails.
void WALIterator::Seek(rocksdb::SequenceNumber seq) {
  const bool slot_filter_requested = slot_ != -1;
  if (slot_filter_requested && !storage_->IsSlotIdEncoded()) {
    Reset();
    return;
  }

  auto status = storage_->GetWALIter(seq, &iter_);
  if (status.IsOK()) {
    next_batch_seq_ = seq;
    nextBatch();
    return;
  }

  Reset();
}

// Returns a copy of the current item of the current batch, or a
// default-constructed WALItem (type kTypeInvalid) when there is none.
WALItem WALIterator::Item() {
  const bool has_item = batch_iter_ && batch_iter_->Valid();
  return has_item ? batch_iter_->Value() : WALItem{};
}

// Returns the sequence number the next batch is expected to start at: the Seek
// target advanced by the Count() of every batch loaded so far, or 0 after Reset().
rocksdb::SequenceNumber WALIterator::NextSequenceNumber() const { return next_batch_seq_; }

// Advances to the next item. Items of the current batch are consumed first;
// once the batch is exhausted the WAL iterator moves on and the following
// batch is loaded. Calling Next() on an invalid iterator resets it.
void WALIterator::Next() {
  if (!Valid()) {
    Reset();
    return;
  }

  const bool inside_batch = batch_iter_ && batch_iter_->Valid();
  if (inside_batch) {
    batch_iter_->Next();
    if (batch_iter_->Valid()) {
      return;  // still inside the current batch
    }
  }

  // Current batch exhausted (or it had no items): move to the next batch.
  iter_->Next();
  nextBatch();
}

} // namespace engine
82 changes: 82 additions & 0 deletions src/storage/iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,86 @@ class DBIterator {
std::unique_ptr<SubKeyIterator> subkey_iter_;
};

// A single record extracted from a WAL write batch.
struct WALItem {
  // Kind of record; kTypeInvalid marks a default-constructed (empty) item.
  enum class Type : uint8_t {
    kTypeInvalid = 0,
    kTypeLogData = 1,
    kTypePut = 2,
    kTypeDelete = 3,
    kTypeDeleteRange = 4,
  };

  WALItem() = default;
  WALItem(Type t, uint32_t cf_id, std::string k, std::string v)
      : type(t), column_family_id(cf_id), key(std::move(k)), value(std::move(v)) {}

  Type type = Type::kTypeInvalid;
  uint32_t column_family_id = 0;
  // Record key (for kTypeDeleteRange: the range begin; for kTypeLogData: the blob).
  std::string key;
  // Record value (for kTypeDeleteRange: the range end; empty for delete and log data).
  std::string value;
};

// WriteBatch handler that collects the records of a write batch into a list of
// WALItem, optionally keeping only the Put/Delete records of a single slot.
class WALBatchExtractor : public rocksdb::WriteBatch::Handler {
public:
// `slot` == -1 (the default) disables slot filtering. If a slot is set, the
// storage must have slot id encoding enabled.
explicit WALBatchExtractor(int slot = -1) : slot_(slot) {}

// Collects a Put record (subject to the slot filter).
rocksdb::Status PutCF(uint32_t column_family_id, const Slice &key, const Slice &value) override;

// Collects a Delete record (subject to the slot filter).
rocksdb::Status DeleteCF(uint32_t column_family_id, const rocksdb::Slice &key) override;

// Collects a DeleteRange record; the slot filter is not applied to ranges.
rocksdb::Status DeleteRangeCF(uint32_t column_family_id, const rocksdb::Slice &begin_key,
const rocksdb::Slice &end_key) override;

// Collects a log-data blob; the slot filter is not applied to log data.
void LogData(const rocksdb::Slice &blob) override;

// Removes all collected items.
void Clear();

// Forward cursor over the collected items. Only WALBatchExtractor can create
// one (private constructor + friend); it refers to the extractor's item vector
// through a raw pointer and does not own it.
class Iter {
friend class WALBatchExtractor;

public:
// True while the cursor refers to an existing item.
bool Valid();
// Moves the cursor one item forward.
void Next();
// Returns a copy of the current item, or a default WALItem if not Valid().
WALItem Value();

private:
explicit Iter(std::vector<WALItem> *items) : items_(items), cur_(0) {}
std::vector<WALItem> *items_;
size_t cur_;
};

// Returns a cursor positioned at the first collected item.
WALBatchExtractor::Iter GetIter();

private:
std::vector<WALItem> items_;
// Slot to keep, or -1 for no filtering.
int slot_;
};

// Iterates over the records stored in the WAL, batch by batch, starting from a
// given sequence number, optionally restricted to the records of one slot.
class WALIterator {
public:
// `slot` == -1 (the default) means no slot filtering. `storage` is kept as a
// raw pointer and is not owned by the iterator.
explicit WALIterator(engine::Storage *storage, int slot = -1)
: storage_(storage), slot_(slot), extractor_(slot), next_batch_seq_(0){};
~WALIterator() = default;

// True while there is a current item or a current batch.
bool Valid() const;
// Positions the iterator at the batch starting at `seq`; invalid on failure.
void Seek(rocksdb::SequenceNumber seq);
// Moves to the next item, loading the following batch when needed.
void Next();
// Returns a copy of the current item, or a default WALItem if there is none.
WALItem Item();

// Sequence number the next batch is expected to start at (0 after Reset()).
rocksdb::SequenceNumber NextSequenceNumber() const;
// Releases the iterators, clears extracted items and invalidates the iterator.
void Reset();

private:
// Extracts the batch the WAL iterator currently points at.
void nextBatch();

engine::Storage *storage_;
int slot_;

std::unique_ptr<rocksdb::TransactionLogIterator> iter_;
WALBatchExtractor extractor_;
// Cursor into extractor_'s items for the current batch.
std::unique_ptr<WALBatchExtractor::Iter> batch_iter_;
rocksdb::SequenceNumber next_batch_seq_;
};

} // namespace engine
11 changes: 11 additions & 0 deletions src/storage/redis_metadata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,17 @@ bool InternalKey::operator==(const InternalKey &that) const {
return version_ == that.version_;
}

// Extracts the slot id from a namespace-prefixed key. The key is read as
// [namespace length (8-bit)][namespace bytes][slot id (16-bit)]..., so it must
// have been written with slot id encoding enabled.
//
// A key too short to hold the announced namespace yields HASH_SLOTS_SIZE
// instead of reading past the end of the slice. If the slot id itself cannot
// be read, slot_id presumably keeps its HASH_SLOTS_SIZE default (assumes
// GetFixed16 leaves the output untouched on failure -- TODO confirm).
uint16_t ExtractSlotId(Slice ns_key) {
  uint8_t namespace_size = 0;
  GetFixed8(&ns_key, &namespace_size);

  // Slice::remove_prefix requires n <= size(); guard against truncated or
  // foreign keys whose length byte exceeds the remaining data.
  if (ns_key.size() < namespace_size) {
    return HASH_SLOTS_SIZE;
  }
  ns_key.remove_prefix(namespace_size);

  uint16_t slot_id = HASH_SLOTS_SIZE;
  GetFixed16(&ns_key, &slot_id);
  return slot_id;
}

template <typename T>
std::tuple<T, T> ExtractNamespaceKey(Slice ns_key, bool slot_id_encoded) {
uint8_t namespace_size = 0;
Expand Down
1 change: 1 addition & 0 deletions src/storage/redis_metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ struct KeyNumStats {
uint64_t avg_ttl = 0;
};

// Returns the slot id stored in a namespace-prefixed key; only meaningful for
// keys written with slot id encoding enabled.
[[nodiscard]] uint16_t ExtractSlotId(Slice ns_key);
template <typename T = Slice>
[[nodiscard]] std::tuple<T, T> ExtractNamespaceKey(Slice ns_key, bool slot_id_encoded);
[[nodiscard]] std::string ComposeNamespaceKey(const Slice &ns, const Slice &key, bool slot_id_encoded);
Expand Down