Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce NewMultiCfIterator API #12153

Closed
wants to merge 23 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
1a5301c
MultiCfIterator Implementation - SeekToFirst() and Next() only
jaykorean Dec 19, 2023
ef13ec9
Take key comparator from users explicitly (with default set as Bytewi…
jaykorean Dec 19, 2023
571cc82
Add NewEmptyMultiColumnFamilyIterator and NewErrorMultiColumnFamilyIt…
jaykorean Dec 19, 2023
e326537
Remove util/heap.h dependency
jaykorean Dec 19, 2023
306712f
attempt to fix linter
jaykorean Dec 21, 2023
59f14f7
Rebase and merge
jaykorean Feb 12, 2024
cc072e5
Revise comment to include wide columns handling
jaykorean Feb 12, 2024
0cdd17b
Remove unnecessary externs
jaykorean Feb 12, 2024
452cc5c
Improve Next() algorithm
jaykorean Feb 12, 2024
6f2b8ec
Add check status
jaykorean Feb 12, 2024
52af201
Use unique_ptr instead of raw pointer in the API
jaykorean Feb 15, 2024
978043d
Use the comparator from the first cf and make sure it matches compara…
jaykorean Feb 15, 2024
6b8d06d
multi_cf_iterator->columns() implementation with the same invariant a…
jaykorean Feb 15, 2024
868239b
fix mingw build + other minor fixes
jaykorean Feb 16, 2024
341af76
linter fix
jaykorean Feb 16, 2024
c074aaf
Rebase and remove unnecessary include
jaykorean Feb 20, 2024
5a852d9
Fix linter and rebase
jaykorean Feb 23, 2024
f160b4e
Address feedback
jaykorean Feb 28, 2024
de2f338
Make MultiCFIter not valid for only one CF
jaykorean Feb 28, 2024
d221bf3
Use the generic iterator interface instead of multi_cf_iterator for u…
jaykorean Feb 28, 2024
8a22393
rename MultiCfIteratorImpl to MultiCfIterator
jaykorean Mar 4, 2024
c8b5805
remove unused include
jaykorean Mar 4, 2024
3a42a58
remove attribute_groups() api for now
jaykorean Mar 4, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ else()
endif()
if(MINGW)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-format")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wa,-mbig-obj")
add_definitions(-D_POSIX_C_SOURCE=1)
endif()
if(NOT CMAKE_BUILD_TYPE STREQUAL "Debug")
Expand Down Expand Up @@ -691,6 +692,7 @@ set(SOURCES
db/memtable_list.cc
db/merge_helper.cc
db/merge_operator.cc
db/multi_cf_iterator.cc
db/output_validator.cc
db/periodic_task_scheduler.cc
db/range_del_aggregator.cc
Expand Down Expand Up @@ -1343,6 +1345,7 @@ if(WITH_TESTS)
db/memtable_list_test.cc
db/merge_helper_test.cc
db/merge_test.cc
db/multi_cf_iterator_test.cc
db/options_file_test.cc
db/perf_context_test.cc
db/periodic_task_scheduler_test.cc
Expand Down
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -1640,6 +1640,9 @@ wal_edit_test: $(OBJ_DIR)/db/wal_edit_test.o $(TEST_LIBRARY) $(LIBRARY)
dbformat_test: $(OBJ_DIR)/db/dbformat_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)

multi_cf_iterator_test: $(OBJ_DIR)/db/multi_cf_iterator_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)

env_basic_test: $(OBJ_DIR)/env/env_basic_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)

Expand Down
7 changes: 7 additions & 0 deletions TARGETS
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[
"db/memtable_list.cc",
"db/merge_helper.cc",
"db/merge_operator.cc",
"db/multi_cf_iterator.cc",
"db/output_validator.cc",
"db/periodic_task_scheduler.cc",
"db/range_del_aggregator.cc",
Expand Down Expand Up @@ -5226,6 +5227,12 @@ cpp_unittest_wrapper(name="mock_env_test",
extra_compiler_flags=[])


cpp_unittest_wrapper(name="multi_cf_iterator_test",
srcs=["db/multi_cf_iterator_test.cc"],
deps=[":rocksdb_test_lib"],
extra_compiler_flags=[])


cpp_unittest_wrapper(name="object_registry_test",
srcs=["utilities/object_registry_test.cc"],
deps=[":rocksdb_test_lib"],
Expand Down
27 changes: 26 additions & 1 deletion db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
#include "db/memtable.h"
#include "db/memtable_list.h"
#include "db/merge_context.h"
#include "db/merge_helper.h"
#include "db/multi_cf_iterator.h"
#include "db/periodic_task_scheduler.h"
#include "db/range_tombstone_fragmenter.h"
#include "db/table_cache.h"
Expand Down Expand Up @@ -3735,6 +3735,31 @@ ArenaWrappedDBIter* DBImpl::NewIteratorImpl(
return db_iter;
}

std::unique_ptr<Iterator> DBImpl::NewMultiCfIterator(
const ReadOptions& _read_options,
const std::vector<ColumnFamilyHandle*>& column_families) {
if (column_families.size() == 0) {
return std::unique_ptr<Iterator>(NewErrorIterator(
Status::InvalidArgument("No Column Family was provided")));
}
const Comparator* first_comparator = column_families[0]->GetComparator();
for (size_t i = 1; i < column_families.size(); ++i) {
const Comparator* cf_comparator = column_families[i]->GetComparator();
if (first_comparator != cf_comparator &&
first_comparator->GetId().compare(cf_comparator->GetId()) != 0) {
return std::unique_ptr<Iterator>(NewErrorIterator(Status::InvalidArgument(
"Different comparators are being used across CFs")));
}
}
std::vector<Iterator*> child_iterators;
Status s = NewIterators(_read_options, column_families, &child_iterators);
if (s.ok()) {
return std::make_unique<MultiCfIterator>(first_comparator, column_families,
std::move(child_iterators));
}
return std::unique_ptr<Iterator>(NewErrorIterator(s));
}

Status DBImpl::NewIterators(
const ReadOptions& _read_options,
const std::vector<ColumnFamilyHandle*>& column_families,
Expand Down
7 changes: 7 additions & 0 deletions db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,13 @@ class DBImpl : public DB {

const Snapshot* GetSnapshot() override;
void ReleaseSnapshot(const Snapshot* snapshot) override;

// UNDER CONSTRUCTION - DO NOT USE
// Return a cross-column-family iterator from a consistent database state.
//
// The result is never reported through a separate Status. Instead, an error
// iterator is returned when `column_families` is empty, when the column
// families do not all use the same comparator, or when the underlying
// per-column-family iterators cannot be created; check status() on the
// returned iterator.
//
// When the same key exists in several column families, the entry from the
// column family listed first in `column_families` is the one exposed.
std::unique_ptr<Iterator> NewMultiCfIterator(
const ReadOptions& options,
const std::vector<ColumnFamilyHandle*>& column_families) override;

// Create a timestamped snapshot. This snapshot can be shared by multiple
// readers. If any of them uses it for write conflict checking, then
// is_write_conflict_boundary is true. For simplicity, set it to true by
Expand Down
8 changes: 8 additions & 0 deletions db/db_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3199,6 +3199,14 @@ class ModelDB : public DB {
std::vector<Iterator*>* /*iterators*/) override {
return Status::NotSupported("Not supported yet");
}

// UNDER CONSTRUCTION - DO NOT USE
// Stub for the model database: both arguments are ignored and an empty
// (null) iterator pointer is handed back.
std::unique_ptr<Iterator> NewMultiCfIterator(
    const ReadOptions& /*options*/,
    const std::vector<ColumnFamilyHandle*>& /*column_families*/) override {
  return std::unique_ptr<Iterator>();
}

const Snapshot* GetSnapshot() override {
ModelSnapshot* snapshot = new ModelSnapshot;
snapshot->map_ = map_;
Expand Down
62 changes: 62 additions & 0 deletions db/multi_cf_iterator.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright (c) Meta Platforms, Inc. and affiliates.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#include "db/multi_cf_iterator.h"

#include <cassert>

namespace ROCKSDB_NAMESPACE {

void MultiCfIterator::SeekToFirst() {
Reset();
int i = 0;
for (auto& cfh_iter_pair : cfh_iter_pairs_) {
auto& cfh = cfh_iter_pair.first;
auto& iter = cfh_iter_pair.second;
iter->SeekToFirst();
if (iter->Valid()) {
assert(iter->status().ok());
min_heap_.push(MultiCfIteratorInfo{iter.get(), cfh, i});
} else {
considerStatus(iter->status());
}
++i;
}
}

// Advances the iterator to the next distinct key across all column families.
//
// Precondition: Valid(). The child iterator currently at the top of the heap
// supplies the current key; every other child positioned on that same key is
// stepped forward first so the key is not surfaced again, and only then is
// the top child itself advanced. The status of any child that stops being
// valid along the way is handed to considerStatus().
void MultiCfIterator::Next() {
assert(Valid());
// 1. Keep the top iterator (by popping it from the heap)
// 2. Make sure all others have iterated past the top iterator key slice
// 3. Advance the top iterator, and add it back to the heap if valid
// `top` is copied out before the pop; its child iterator is not advanced
// until after the loop below, so top.iterator->key() keeps referring to the
// current key while the other children are compared against it.
auto top = min_heap_.top();
min_heap_.pop();
if (!min_heap_.empty()) {
auto* current = min_heap_.top().iterator;
// Step every remaining child that sits on the same key as `top`.
while (current->Valid() &&
comparator_->Compare(top.iterator->key(), current->key()) == 0) {
assert(current->status().ok());
current->Next();
if (current->Valid()) {
// The top child now points at a different key.
// NOTE(review): replacing the top entry with itself presumably makes the
// heap restore its ordering - confirm against util/heap.h.
min_heap_.replace_top(min_heap_.top());
} else {
// The child is no longer valid: record its status and drop it.
considerStatus(current->status());
min_heap_.pop();
}
if (!min_heap_.empty()) {
current = min_heap_.top().iterator;
}
// When the pop above emptied the heap, `current` still refers to the child
// that was just removed; that child is not Valid(), which ends the loop.
}
}
// Finally move the child that supplied the current key and re-insert it if
// it still has entries.
top.iterator->Next();
if (top.iterator->Valid()) {
assert(top.iterator->status().ok());
min_heap_.push(top);
} else {
considerStatus(top.iterator->status());
}
}

} // namespace ROCKSDB_NAMESPACE
116 changes: 116 additions & 0 deletions db/multi_cf_iterator.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
// Copyright (c) Meta Platforms, Inc. and affiliates.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#pragma once

#include <cassert>
#include <memory>
#include <utility>
#include <vector>

#include "rocksdb/comparator.h"
#include "rocksdb/iterator.h"
#include "rocksdb/options.h"
#include "util/heap.h"

namespace ROCKSDB_NAMESPACE {

// UNDER CONSTRUCTION - DO NOT USE
// A cross-column-family iterator from a consistent database state.
// When the same key exists in more than one column family, the iterator
// selects the value from the first column family containing the key, in the
// order provided in the `column_families` parameter.
//
// Only forward iteration (SeekToFirst() / Next()) is implemented so far.
// Seek(), SeekForPrev(), SeekToLast() and Prev() invalidate the iterator and
// report Status::NotSupported through status().
class MultiCfIterator : public Iterator {
 public:
  // `comparator` orders keys across all column families and must outlive
  // this object. `column_families` and `child_iterators` must be non-empty,
  // of equal length, and index-aligned (child_iterators[i] iterates over
  // column_families[i]).
  //
  // Takes ownership of every pointer in `child_iterators`; they are deleted
  // when this object is destroyed.
  MultiCfIterator(const Comparator* comparator,
                  const std::vector<ColumnFamilyHandle*>& column_families,
                  const std::vector<Iterator*>& child_iterators)
      : comparator_(comparator),
        min_heap_(MultiCfMinHeapItemComparator(comparator)) {
    assert(column_families.size() > 0 &&
           column_families.size() == child_iterators.size());
    cfh_iter_pairs_.reserve(column_families.size());
    for (size_t i = 0; i < column_families.size(); ++i) {
      cfh_iter_pairs_.emplace_back(
          column_families[i], std::unique_ptr<Iterator>(child_iterators[i]));
    }
  }
  // The aggregated status may legitimately never be inspected by the caller.
  ~MultiCfIterator() override { status_.PermitUncheckedError(); }

  // No copy allowed
  MultiCfIterator(const MultiCfIterator&) = delete;
  MultiCfIterator& operator=(const MultiCfIterator&) = delete;

  // True while at least one child iterator is positioned on an entry and no
  // error has been recorded.
  bool Valid() const override { return !min_heap_.empty() && status_.ok(); }

  // First non-OK status recorded since the last (re)positioning, or OK.
  Status status() const override { return status_; }

  void SeekToFirst() override;
  void Next() override;

  // TODO - Implement these. Until then they fail loudly instead of silently
  // leaving the iterator on a stale position.
  void Seek(const Slice& /*target*/) override { failNotSupported("Seek"); }
  void SeekForPrev(const Slice& /*target*/) override {
    failNotSupported("SeekForPrev");
  }
  void SeekToLast() override { failNotSupported("SeekToLast"); }
  void Prev() override {
    assert(false);
    // Release builds: do not leave the iterator valid on the same entry.
    failNotSupported("Prev");
  }

  // Key of the current entry. Requires Valid().
  Slice key() const override {
    assert(Valid());
    return min_heap_.top().iterator->key();
  }
  // Value of the current entry, taken from the child iterator at the top of
  // the heap. Requires Valid().
  Slice value() const override {
    assert(Valid());
    return min_heap_.top().iterator->value();
  }
  // Wide columns of the current entry, taken from the child iterator at the
  // top of the heap. Requires Valid().
  const WideColumns& columns() const override {
    assert(Valid());
    return min_heap_.top().iterator->columns();
  }

 private:
  // One heap entry: a positioned child iterator, its column family, and the
  // position of that column family in the caller-provided list.
  struct MultiCfIteratorInfo {
    Iterator* iterator;
    ColumnFamilyHandle* cfh;
    int order;
  };

  // Orders heap entries so that the smallest key is on top; among entries
  // with equal keys, the one with the smallest `order` is on top.
  class MultiCfMinHeapItemComparator {
   public:
    explicit MultiCfMinHeapItemComparator(const Comparator* comparator)
        : comparator_(comparator) {}

    bool operator()(const MultiCfIteratorInfo& a,
                    const MultiCfIteratorInfo& b) const {
      assert(a.iterator);
      assert(b.iterator);
      assert(a.iterator->Valid());
      assert(b.iterator->Valid());
      int c = comparator_->Compare(a.iterator->key(), b.iterator->key());
      assert(c != 0 || a.order != b.order);
      return c == 0 ? a.order > b.order : c > 0;
    }

   private:
    const Comparator* comparator_;
  };

  using MultiCfMinHeap =
      BinaryHeap<MultiCfIteratorInfo, MultiCfMinHeapItemComparator>;

  // Remembers `s` if it is the first error seen; later errors are dropped.
  void considerStatus(Status s) {
    if (!s.ok() && status_.ok()) {
      status_ = std::move(s);
    }
  }

  // Forgets the current position and any recorded error.
  void Reset() {
    min_heap_.clear();
    status_ = Status::OK();
  }

  // Invalidates the iterator and records that `operation` is unavailable.
  void failNotSupported(const char* operation) {
    min_heap_.clear();
    status_ = Status::NotSupported(
        "MultiCfIterator does not support this operation yet", operation);
  }

  // Column family handles paired with the child iterators this object owns.
  std::vector<std::pair<ColumnFamilyHandle*, std::unique_ptr<Iterator>>>
      cfh_iter_pairs_;
  Status status_;

  // Declared before min_heap_ so that it is initialized first.
  const Comparator* comparator_;
  MultiCfMinHeap min_heap_;
  // TODO: MaxHeap for Reverse Iteration
  // TODO: Lower and Upper bounds
};

} // namespace ROCKSDB_NAMESPACE