Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add blob support to DBIter #7731

Closed
wants to merge 25 commits into from
Closed
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
bf6cd03
Store Version pointer in DBIter
ltamasi Nov 19, 2020
64b38ac
Make it possible to use different input and output Slices with Versio…
ltamasi Nov 23, 2020
0245e5f
Add a blob_value_ member to DBIter, expose it from value() when using…
ltamasi Nov 24, 2020
3752447
Read blob value in FindNextUserEntryInternal/FindValueForCurrentKey(U…
ltamasi Nov 24, 2020
5863278
Return 'BlobDB does not support merge operator' error status when hit…
ltamasi Nov 24, 2020
43abd51
Add some TODOs
ltamasi Nov 24, 2020
fc4f95c
Use the correct user key in FindValueForCurrentKey(UsingSeek)
ltamasi Nov 25, 2020
7aaf11f
Fix up method comments
ltamasi Nov 30, 2020
65ffda9
Clean up some parameter names
ltamasi Nov 30, 2020
52fedc7
Factor out the blob reading logic into a helper method SetBlobValueIf…
ltamasi Nov 30, 2020
6b6742c
Add support for iter_start_{seqnum,ts}
ltamasi Nov 30, 2020
e8acd9f
Move the setting of is_blob_ into SetBlobValueIfNeeded
ltamasi Nov 30, 2020
14e1bcb
Remove a couple of TODOs
ltamasi Nov 30, 2020
1d78128
Pass in allow_blob==true in SstFileReader so that blob indexes are ex…
ltamasi Nov 30, 2020
5f60493
Pass read_tier and verify_checksums to GetBlob
ltamasi Dec 1, 2020
e8a31d1
Rename allow_blob to expose_blob_index
ltamasi Dec 1, 2020
23051d5
More of the same
ltamasi Dec 1, 2020
12cc9c6
Fix up DBBlobIndexTest.Iterate
ltamasi Dec 1, 2020
05daf70
Add assertion to SetBlobValueIfNeeded
ltamasi Dec 1, 2020
63a4e2c
Add a unit test
ltamasi Dec 1, 2020
6c021e7
Small cleanup
ltamasi Dec 1, 2020
e851ea4
Fix formatting
ltamasi Dec 1, 2020
3cd9dc2
Fix handling of iter_start_{seqnum,ts} (ouch)
ltamasi Dec 4, 2020
89a99d3
Revert to original if..else if..else style control flow in DBIter::va…
ltamasi Dec 4, 2020
619b341
Name NewDBIterator's parameters (like sequence/max_sequential_skip_in…
ltamasi Dec 5, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 21 additions & 22 deletions db/arena_wrapped_db_iter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,20 +30,18 @@ Status ArenaWrappedDBIter::GetProperty(std::string prop_name,
return db_iter_->GetProperty(prop_name, prop);
}

void ArenaWrappedDBIter::Init(Env* env, const ReadOptions& read_options,
const ImmutableCFOptions& cf_options,
const MutableCFOptions& mutable_cf_options,
const SequenceNumber& sequence,
uint64_t max_sequential_skip_in_iteration,
uint64_t version_number,
ReadCallback* read_callback, DBImpl* db_impl,
ColumnFamilyData* cfd, bool allow_blob,
bool allow_refresh) {
void ArenaWrappedDBIter::Init(
Env* env, const ReadOptions& read_options,
const ImmutableCFOptions& cf_options,
const MutableCFOptions& mutable_cf_options, const Version* version,
const SequenceNumber& sequence, uint64_t max_sequential_skip_in_iteration,
uint64_t version_number, ReadCallback* read_callback, DBImpl* db_impl,
ColumnFamilyData* cfd, bool expose_blob_index, bool allow_refresh) {
auto mem = arena_.AllocateAligned(sizeof(DBIter));
db_iter_ = new (mem) DBIter(env, read_options, cf_options, mutable_cf_options,
cf_options.user_comparator, nullptr, sequence,
true, max_sequential_skip_in_iteration,
read_callback, db_impl, cfd, allow_blob);
cf_options.user_comparator, /* iter */ nullptr, version,
sequence, true, max_sequential_skip_in_iteration,
read_callback, db_impl, cfd, expose_blob_index);
sv_number_ = version_number;
read_options_ = read_options;
allow_refresh_ = allow_refresh;
Expand Down Expand Up @@ -72,8 +70,9 @@ Status ArenaWrappedDBIter::Refresh() {
read_callback_->Refresh(latest_seq);
}
Init(env, read_options_, *(cfd_->ioptions()), sv->mutable_cf_options,
latest_seq, sv->mutable_cf_options.max_sequential_skip_in_iterations,
cur_sv_number, read_callback_, db_impl_, cfd_, allow_blob_,
sv->current, latest_seq,
sv->mutable_cf_options.max_sequential_skip_in_iterations,
cur_sv_number, read_callback_, db_impl_, cfd_, expose_blob_index_,
allow_refresh_);

InternalIterator* internal_iter = db_impl_->NewInternalIterator(
Expand All @@ -90,16 +89,16 @@ Status ArenaWrappedDBIter::Refresh() {
ArenaWrappedDBIter* NewArenaWrappedDbIterator(
Env* env, const ReadOptions& read_options,
const ImmutableCFOptions& cf_options,
const MutableCFOptions& mutable_cf_options, const SequenceNumber& sequence,
uint64_t max_sequential_skip_in_iterations, uint64_t version_number,
ReadCallback* read_callback, DBImpl* db_impl, ColumnFamilyData* cfd,
bool allow_blob, bool allow_refresh) {
const MutableCFOptions& mutable_cf_options, const Version* version,
const SequenceNumber& sequence, uint64_t max_sequential_skip_in_iterations,
uint64_t version_number, ReadCallback* read_callback, DBImpl* db_impl,
ColumnFamilyData* cfd, bool expose_blob_index, bool allow_refresh) {
ArenaWrappedDBIter* iter = new ArenaWrappedDBIter();
iter->Init(env, read_options, cf_options, mutable_cf_options, sequence,
max_sequential_skip_in_iterations, version_number, read_callback,
db_impl, cfd, allow_blob, allow_refresh);
iter->Init(env, read_options, cf_options, mutable_cf_options, version,
sequence, max_sequential_skip_in_iterations, version_number,
read_callback, db_impl, cfd, expose_blob_index, allow_refresh);
if (db_impl != nullptr && cfd != nullptr && allow_refresh) {
iter->StoreRefreshInfo(db_impl, cfd, read_callback, allow_blob);
iter->StoreRefreshInfo(db_impl, cfd, read_callback, expose_blob_index);
}

return iter;
Expand Down
21 changes: 11 additions & 10 deletions db/arena_wrapped_db_iter.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
namespace ROCKSDB_NAMESPACE {

class Arena;
class Version;

// A wrapper iterator which wraps DB Iterator and the arena, with which the DB
// iterator is supposed to be allocated. This class is used as an entry point of
Expand Down Expand Up @@ -72,20 +73,20 @@ class ArenaWrappedDBIter : public Iterator {

void Init(Env* env, const ReadOptions& read_options,
const ImmutableCFOptions& cf_options,
const MutableCFOptions& mutable_cf_options,
const MutableCFOptions& mutable_cf_options, const Version* version,
const SequenceNumber& sequence,
uint64_t max_sequential_skip_in_iterations, uint64_t version_number,
ReadCallback* read_callback, DBImpl* db_impl, ColumnFamilyData* cfd,
bool allow_blob, bool allow_refresh);
bool expose_blob_index, bool allow_refresh);

// Store some parameters so we can refresh the iterator at a later point
// with these same params
void StoreRefreshInfo(DBImpl* db_impl, ColumnFamilyData* cfd,
ReadCallback* read_callback, bool allow_blob) {
ReadCallback* read_callback, bool expose_blob_index) {
db_impl_ = db_impl;
cfd_ = cfd;
read_callback_ = read_callback;
allow_blob_ = allow_blob;
expose_blob_index_ = expose_blob_index;
}

private:
Expand All @@ -96,7 +97,7 @@ class ArenaWrappedDBIter : public Iterator {
DBImpl* db_impl_ = nullptr;
ReadOptions read_options_;
ReadCallback* read_callback_;
bool allow_blob_ = false;
bool expose_blob_index_ = false;
bool allow_refresh_ = true;
};

Expand All @@ -106,9 +107,9 @@ class ArenaWrappedDBIter : public Iterator {
extern ArenaWrappedDBIter* NewArenaWrappedDbIterator(
Env* env, const ReadOptions& read_options,
const ImmutableCFOptions& cf_options,
const MutableCFOptions& mutable_cf_options, const SequenceNumber& sequence,
uint64_t max_sequential_skip_in_iterations, uint64_t version_number,
ReadCallback* read_callback, DBImpl* db_impl = nullptr,
ColumnFamilyData* cfd = nullptr, bool allow_blob = false,
bool allow_refresh = true);
const MutableCFOptions& mutable_cf_options, const Version* version,
const SequenceNumber& sequence, uint64_t max_sequential_skip_in_iterations,
uint64_t version_number, ReadCallback* read_callback,
DBImpl* db_impl = nullptr, ColumnFamilyData* cfd = nullptr,
bool expose_blob_index = false, bool allow_refresh = true);
} // namespace ROCKSDB_NAMESPACE
13 changes: 8 additions & 5 deletions db/blob/db_blob_index_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ class DBBlobIndexTest : public DBTestBase {
ArenaWrappedDBIter* GetBlobIterator() {
return dbfull()->NewIteratorImpl(
ReadOptions(), cfd(), dbfull()->GetLatestSequenceNumber(),
nullptr /*read_callback*/, true /*allow_blob*/);
nullptr /*read_callback*/, true /*expose_blob_index*/);
}

Options GetTestOptions() {
Expand Down Expand Up @@ -238,8 +238,11 @@ TEST_F(DBBlobIndexTest, Updated) {
}
}

// Iterator should get blob value if allow_blob flag is set,
// otherwise return Status::NotSupported status.
// Note: the following test case pertains to the StackableDB-based BlobDB
// implementation. When a blob iterator is used, it should set the
// expose_blob_index flag for the underlying DBIter, and retrieve/return the
// corresponding blob value. If a regular DBIter is created (i.e.
// expose_blob_index is not set), it should return Status::Corruption.
TEST_F(DBBlobIndexTest, Iterate) {
const std::vector<std::vector<ValueType>> data = {
/*00*/ {kTypeValue},
Expand Down Expand Up @@ -384,8 +387,8 @@ TEST_F(DBBlobIndexTest, Iterate) {
MoveDataTo(tier);

// Normal iterator
verify(1, Status::kNotSupported, "", "", create_normal_iterator);
verify(3, Status::kNotSupported, "", "", create_normal_iterator);
verify(1, Status::kCorruption, "", "", create_normal_iterator);
verify(3, Status::kCorruption, "", "", create_normal_iterator);
verify(5, Status::kOk, get_value(5, 0), get_value(5, 0),
create_normal_iterator);
verify(7, Status::kOk, get_value(8, 0), get_value(6, 0),
Expand Down
12 changes: 6 additions & 6 deletions db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2804,7 +2804,7 @@ Iterator* DBImpl::NewIterator(const ReadOptions& read_options,
/* allow_unprepared_value */ true);
result = NewDBIterator(
env_, read_options, *cfd->ioptions(), sv->mutable_cf_options,
cfd->user_comparator(), iter, kMaxSequenceNumber,
cfd->user_comparator(), iter, sv->current, kMaxSequenceNumber,
sv->mutable_cf_options.max_sequential_skip_in_iterations, read_callback,
this, cfd);
#endif
Expand All @@ -2825,7 +2825,7 @@ ArenaWrappedDBIter* DBImpl::NewIteratorImpl(const ReadOptions& read_options,
ColumnFamilyData* cfd,
SequenceNumber snapshot,
ReadCallback* read_callback,
bool allow_blob,
bool expose_blob_index,
bool allow_refresh) {
SuperVersion* sv = cfd->GetReferencedSuperVersion(this);

Expand Down Expand Up @@ -2890,9 +2890,9 @@ ArenaWrappedDBIter* DBImpl::NewIteratorImpl(const ReadOptions& read_options,
// likely that any iterator pointer is close to the iterator it points to so
// that they are likely to be in the same cache line and/or page.
ArenaWrappedDBIter* db_iter = NewArenaWrappedDbIterator(
env_, read_options, *cfd->ioptions(), sv->mutable_cf_options, snapshot,
sv->mutable_cf_options.max_sequential_skip_in_iterations,
sv->version_number, read_callback, this, cfd, allow_blob,
env_, read_options, *cfd->ioptions(), sv->mutable_cf_options, sv->current,
snapshot, sv->mutable_cf_options.max_sequential_skip_in_iterations,
sv->version_number, read_callback, this, cfd, expose_blob_index,
read_options.snapshot != nullptr ? false : allow_refresh);

InternalIterator* internal_iter = NewInternalIterator(
Expand Down Expand Up @@ -2930,7 +2930,7 @@ Status DBImpl::NewIterators(
/* allow_unprepared_value */ true);
iterators->push_back(NewDBIterator(
env_, read_options, *cfd->ioptions(), sv->mutable_cf_options,
cfd->user_comparator(), iter, kMaxSequenceNumber,
cfd->user_comparator(), iter, sv->current, kMaxSequenceNumber,
sv->mutable_cf_options.max_sequential_skip_in_iterations,
read_callback, this, cfd));
}
Expand Down
2 changes: 1 addition & 1 deletion db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -522,7 +522,7 @@ class DBImpl : public DB {
ColumnFamilyData* cfd,
SequenceNumber snapshot,
ReadCallback* read_callback,
bool allow_blob = false,
bool expose_blob_index = false,
bool allow_refresh = true);

virtual SequenceNumber GetLastPublishedSequence() const {
Expand Down
5 changes: 3 additions & 2 deletions db/db_impl/db_impl_readonly.cc
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ Iterator* DBImplReadOnly::NewIterator(const ReadOptions& read_options,
ReadCallback* read_callback = nullptr; // No read callback provided.
auto db_iter = NewArenaWrappedDbIterator(
env_, read_options, *cfd->ioptions(), super_version->mutable_cf_options,
read_seq,
super_version->current, read_seq,
super_version->mutable_cf_options.max_sequential_skip_in_iterations,
super_version->version_number, read_callback);
auto internal_iter = NewInternalIterator(
Expand Down Expand Up @@ -115,7 +115,8 @@ Status DBImplReadOnly::NewIterators(
auto* cfd = static_cast_with_check<ColumnFamilyHandleImpl>(cfh)->cfd();
auto* sv = cfd->GetSuperVersion()->Ref();
auto* db_iter = NewArenaWrappedDbIterator(
env_, read_options, *cfd->ioptions(), sv->mutable_cf_options, read_seq,
env_, read_options, *cfd->ioptions(), sv->mutable_cf_options,
sv->current, read_seq,
sv->mutable_cf_options.max_sequential_skip_in_iterations,
sv->version_number, read_callback);
auto* internal_iter = NewInternalIterator(
Expand Down
2 changes: 1 addition & 1 deletion db/db_impl/db_impl_secondary.cc
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ ArenaWrappedDBIter* DBImplSecondary::NewIteratorImpl(
SuperVersion* super_version = cfd->GetReferencedSuperVersion(this);
auto db_iter = NewArenaWrappedDbIterator(
env_, read_options, *cfd->ioptions(), super_version->mutable_cf_options,
snapshot,
super_version->current, snapshot,
super_version->mutable_cf_options.max_sequential_skip_in_iterations,
super_version->version_number, read_callback);
auto internal_iter = NewInternalIterator(
Expand Down
Loading