Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add blob support to DBIter #7731

Closed
wants to merge 25 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
bf6cd03
Store Version pointer in DBIter
ltamasi Nov 19, 2020
64b38ac
Make it possible to use different input and output Slices with Versio…
ltamasi Nov 23, 2020
0245e5f
Add a blob_value_ member to DBIter, expose it from value() when using…
ltamasi Nov 24, 2020
3752447
Read blob value in FindNextUserEntryInternal/FindValueForCurrentKey(U…
ltamasi Nov 24, 2020
5863278
Return 'BlobDB does not support merge operator' error status when hit…
ltamasi Nov 24, 2020
43abd51
Add some TODOs
ltamasi Nov 24, 2020
fc4f95c
Use the correct user key in FindValueForCurrentKey(UsingSeek)
ltamasi Nov 25, 2020
7aaf11f
Fix up method comments
ltamasi Nov 30, 2020
65ffda9
Clean up some parameter names
ltamasi Nov 30, 2020
52fedc7
Factor out the blob reading logic into a helper method SetBlobValueIf…
ltamasi Nov 30, 2020
6b6742c
Add support for iter_start_{seqnum,ts}
ltamasi Nov 30, 2020
e8acd9f
Move the setting of is_blob_ into SetBlobValueIfNeeded
ltamasi Nov 30, 2020
14e1bcb
Remove a couple of TODOs
ltamasi Nov 30, 2020
1d78128
Pass in allow_blob==true in SstFileReader so that blob indexes are ex…
ltamasi Nov 30, 2020
5f60493
Pass read_tier and verify_checksums to GetBlob
ltamasi Dec 1, 2020
e8a31d1
Rename allow_blob to expose_blob_index
ltamasi Dec 1, 2020
23051d5
More of the same
ltamasi Dec 1, 2020
12cc9c6
Fix up DBBlobIndexTest.Iterate
ltamasi Dec 1, 2020
05daf70
Add assertion to SetBlobValueIfNeeded
ltamasi Dec 1, 2020
63a4e2c
Add a unit test
ltamasi Dec 1, 2020
6c021e7
Small cleanup
ltamasi Dec 1, 2020
e851ea4
Fix formatting
ltamasi Dec 1, 2020
3cd9dc2
Fix handling of iter_start_{seqnum,ts} (ouch)
ltamasi Dec 4, 2020
89a99d3
Revert to original if..else if..else style control flow in DBIter::va…
ltamasi Dec 4, 2020
619b341
Name NewDBIterator's parameters (like sequence/max_sequential_skip_in…
ltamasi Dec 5, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 18 additions & 19 deletions db/arena_wrapped_db_iter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,17 @@ Status ArenaWrappedDBIter::GetProperty(std::string prop_name,
return db_iter_->GetProperty(prop_name, prop);
}

void ArenaWrappedDBIter::Init(Env* env, const ReadOptions& read_options,
const ImmutableCFOptions& cf_options,
const MutableCFOptions& mutable_cf_options,
const SequenceNumber& sequence,
uint64_t max_sequential_skip_in_iteration,
uint64_t version_number,
ReadCallback* read_callback, DBImpl* db_impl,
ColumnFamilyData* cfd, bool allow_blob,
bool allow_refresh) {
void ArenaWrappedDBIter::Init(
Env* env, const ReadOptions& read_options,
const ImmutableCFOptions& cf_options,
const MutableCFOptions& mutable_cf_options, const Version* version,
const SequenceNumber& sequence, uint64_t max_sequential_skip_in_iteration,
uint64_t version_number, ReadCallback* read_callback, DBImpl* db_impl,
ColumnFamilyData* cfd, bool allow_blob, bool allow_refresh) {
auto mem = arena_.AllocateAligned(sizeof(DBIter));
db_iter_ = new (mem) DBIter(env, read_options, cf_options, mutable_cf_options,
cf_options.user_comparator, nullptr, sequence,
true, max_sequential_skip_in_iteration,
cf_options.user_comparator, nullptr, version,
sequence, true, max_sequential_skip_in_iteration,
read_callback, db_impl, cfd, allow_blob);
sv_number_ = version_number;
read_options_ = read_options;
Expand Down Expand Up @@ -72,7 +70,8 @@ Status ArenaWrappedDBIter::Refresh() {
read_callback_->Refresh(latest_seq);
}
Init(env, read_options_, *(cfd_->ioptions()), sv->mutable_cf_options,
latest_seq, sv->mutable_cf_options.max_sequential_skip_in_iterations,
sv->current, latest_seq,
sv->mutable_cf_options.max_sequential_skip_in_iterations,
cur_sv_number, read_callback_, db_impl_, cfd_, allow_blob_,
allow_refresh_);

Expand All @@ -90,14 +89,14 @@ Status ArenaWrappedDBIter::Refresh() {
ArenaWrappedDBIter* NewArenaWrappedDbIterator(
Env* env, const ReadOptions& read_options,
const ImmutableCFOptions& cf_options,
const MutableCFOptions& mutable_cf_options, const SequenceNumber& sequence,
uint64_t max_sequential_skip_in_iterations, uint64_t version_number,
ReadCallback* read_callback, DBImpl* db_impl, ColumnFamilyData* cfd,
bool allow_blob, bool allow_refresh) {
const MutableCFOptions& mutable_cf_options, const Version* version,
const SequenceNumber& sequence, uint64_t max_sequential_skip_in_iterations,
uint64_t version_number, ReadCallback* read_callback, DBImpl* db_impl,
ColumnFamilyData* cfd, bool allow_blob, bool allow_refresh) {
ArenaWrappedDBIter* iter = new ArenaWrappedDBIter();
iter->Init(env, read_options, cf_options, mutable_cf_options, sequence,
max_sequential_skip_in_iterations, version_number, read_callback,
db_impl, cfd, allow_blob, allow_refresh);
iter->Init(env, read_options, cf_options, mutable_cf_options, version,
sequence, max_sequential_skip_in_iterations, version_number,
read_callback, db_impl, cfd, allow_blob, allow_refresh);
if (db_impl != nullptr && cfd != nullptr && allow_refresh) {
iter->StoreRefreshInfo(db_impl, cfd, read_callback, allow_blob);
}
Expand Down
13 changes: 7 additions & 6 deletions db/arena_wrapped_db_iter.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
namespace ROCKSDB_NAMESPACE {

class Arena;
class Version;

// A wrapper iterator which wraps DB Iterator and the arena, with which the DB
// iterator is supposed to be allocated. This class is used as an entry point of
Expand Down Expand Up @@ -72,7 +73,7 @@ class ArenaWrappedDBIter : public Iterator {

void Init(Env* env, const ReadOptions& read_options,
const ImmutableCFOptions& cf_options,
const MutableCFOptions& mutable_cf_options,
const MutableCFOptions& mutable_cf_options, const Version* version,
const SequenceNumber& sequence,
uint64_t max_sequential_skip_in_iterations, uint64_t version_number,
ReadCallback* read_callback, DBImpl* db_impl, ColumnFamilyData* cfd,
Expand Down Expand Up @@ -106,9 +107,9 @@ class ArenaWrappedDBIter : public Iterator {
extern ArenaWrappedDBIter* NewArenaWrappedDbIterator(
Env* env, const ReadOptions& read_options,
const ImmutableCFOptions& cf_options,
const MutableCFOptions& mutable_cf_options, const SequenceNumber& sequence,
uint64_t max_sequential_skip_in_iterations, uint64_t version_number,
ReadCallback* read_callback, DBImpl* db_impl = nullptr,
ColumnFamilyData* cfd = nullptr, bool allow_blob = false,
bool allow_refresh = true);
const MutableCFOptions& mutable_cf_options, const Version* version,
const SequenceNumber& sequence, uint64_t max_sequential_skip_in_iterations,
uint64_t version_number, ReadCallback* read_callback,
DBImpl* db_impl = nullptr, ColumnFamilyData* cfd = nullptr,
bool allow_blob = false, bool allow_refresh = true);
} // namespace ROCKSDB_NAMESPACE
8 changes: 4 additions & 4 deletions db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2804,7 +2804,7 @@ Iterator* DBImpl::NewIterator(const ReadOptions& read_options,
/* allow_unprepared_value */ true);
result = NewDBIterator(
env_, read_options, *cfd->ioptions(), sv->mutable_cf_options,
cfd->user_comparator(), iter, kMaxSequenceNumber,
cfd->user_comparator(), iter, sv->current, kMaxSequenceNumber,
sv->mutable_cf_options.max_sequential_skip_in_iterations, read_callback,
this, cfd);
#endif
Expand Down Expand Up @@ -2890,8 +2890,8 @@ ArenaWrappedDBIter* DBImpl::NewIteratorImpl(const ReadOptions& read_options,
// likely that any iterator pointer is close to the iterator it points to so
// that they are likely to be in the same cache line and/or page.
ArenaWrappedDBIter* db_iter = NewArenaWrappedDbIterator(
env_, read_options, *cfd->ioptions(), sv->mutable_cf_options, snapshot,
sv->mutable_cf_options.max_sequential_skip_in_iterations,
env_, read_options, *cfd->ioptions(), sv->mutable_cf_options, sv->current,
snapshot, sv->mutable_cf_options.max_sequential_skip_in_iterations,
sv->version_number, read_callback, this, cfd, allow_blob,
read_options.snapshot != nullptr ? false : allow_refresh);

Expand Down Expand Up @@ -2930,7 +2930,7 @@ Status DBImpl::NewIterators(
/* allow_unprepared_value */ true);
iterators->push_back(NewDBIterator(
env_, read_options, *cfd->ioptions(), sv->mutable_cf_options,
cfd->user_comparator(), iter, kMaxSequenceNumber,
cfd->user_comparator(), iter, sv->current, kMaxSequenceNumber,
sv->mutable_cf_options.max_sequential_skip_in_iterations,
read_callback, this, cfd));
}
Expand Down
5 changes: 3 additions & 2 deletions db/db_impl/db_impl_readonly.cc
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ Iterator* DBImplReadOnly::NewIterator(const ReadOptions& read_options,
ReadCallback* read_callback = nullptr; // No read callback provided.
auto db_iter = NewArenaWrappedDbIterator(
env_, read_options, *cfd->ioptions(), super_version->mutable_cf_options,
read_seq,
super_version->current, read_seq,
super_version->mutable_cf_options.max_sequential_skip_in_iterations,
super_version->version_number, read_callback);
auto internal_iter = NewInternalIterator(
Expand Down Expand Up @@ -115,7 +115,8 @@ Status DBImplReadOnly::NewIterators(
auto* cfd = static_cast_with_check<ColumnFamilyHandleImpl>(cfh)->cfd();
auto* sv = cfd->GetSuperVersion()->Ref();
auto* db_iter = NewArenaWrappedDbIterator(
env_, read_options, *cfd->ioptions(), sv->mutable_cf_options, read_seq,
env_, read_options, *cfd->ioptions(), sv->mutable_cf_options,
sv->current, read_seq,
sv->mutable_cf_options.max_sequential_skip_in_iterations,
sv->version_number, read_callback);
auto* internal_iter = NewInternalIterator(
Expand Down
2 changes: 1 addition & 1 deletion db/db_impl/db_impl_secondary.cc
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ ArenaWrappedDBIter* DBImplSecondary::NewIteratorImpl(
SuperVersion* super_version = cfd->GetReferencedSuperVersion(this);
auto db_iter = NewArenaWrappedDbIterator(
env_, read_options, *cfd->ioptions(), super_version->mutable_cf_options,
snapshot,
super_version->current, snapshot,
super_version->mutable_cf_options.max_sequential_skip_in_iterations,
super_version->version_number, read_callback);
auto internal_iter = NewInternalIterator(
Expand Down
17 changes: 10 additions & 7 deletions db/db_iter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,9 @@ namespace ROCKSDB_NAMESPACE {
DBIter::DBIter(Env* _env, const ReadOptions& read_options,
const ImmutableCFOptions& cf_options,
const MutableCFOptions& mutable_cf_options,
const Comparator* cmp, InternalIterator* iter, SequenceNumber s,
bool arena_mode, uint64_t max_sequential_skip_in_iterations,
const Comparator* cmp, InternalIterator* iter,
const Version* version, SequenceNumber s, bool arena_mode,
uint64_t max_sequential_skip_in_iterations,
ReadCallback* read_callback, DBImpl* db_impl,
ColumnFamilyData* cfd, bool allow_blob)
: prefix_extractor_(mutable_cf_options.prefix_extractor.get()),
Expand All @@ -46,6 +47,7 @@ DBIter::DBIter(Env* _env, const ReadOptions& read_options,
user_comparator_(cmp),
merge_operator_(cf_options.merge_operator),
iter_(iter),
version_(version),
read_callback_(read_callback),
sequence_(s),
statistics_(cf_options.statistics),
Expand Down Expand Up @@ -1458,15 +1460,16 @@ Iterator* NewDBIterator(Env* env, const ReadOptions& read_options,
const ImmutableCFOptions& cf_options,
const MutableCFOptions& mutable_cf_options,
const Comparator* user_key_comparator,
InternalIterator* internal_iter,
InternalIterator* internal_iter, const Version* version,
const SequenceNumber& sequence,
uint64_t max_sequential_skip_in_iterations,
ReadCallback* read_callback, DBImpl* db_impl,
ColumnFamilyData* cfd, bool allow_blob) {
DBIter* db_iter = new DBIter(
env, read_options, cf_options, mutable_cf_options, user_key_comparator,
internal_iter, sequence, false, max_sequential_skip_in_iterations,
read_callback, db_impl, cfd, allow_blob);
DBIter* db_iter =
new DBIter(env, read_options, cf_options, mutable_cf_options,
user_key_comparator, internal_iter, version, sequence, false,
max_sequential_skip_in_iterations, read_callback, db_impl, cfd,
allow_blob);
return db_iter;
}

Expand Down
16 changes: 11 additions & 5 deletions db/db_iter.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@

namespace ROCKSDB_NAMESPACE {

class Version;

// This file declares the factory functions of DBIter, in its original form
// or a wrapped form with class ArenaWrappedDBIter, which is defined here.
// Class DBIter, which is declared and implemented inside db_iter.cc, is
Expand Down Expand Up @@ -114,8 +116,8 @@ class DBIter final : public Iterator {
DBIter(Env* _env, const ReadOptions& read_options,
const ImmutableCFOptions& cf_options,
const MutableCFOptions& mutable_cf_options, const Comparator* cmp,
InternalIterator* iter, SequenceNumber s, bool arena_mode,
uint64_t max_sequential_skip_in_iterations,
InternalIterator* iter, const Version* version, SequenceNumber s,
bool arena_mode, uint64_t max_sequential_skip_in_iterations,
ReadCallback* read_callback, DBImpl* db_impl, ColumnFamilyData* cfd,
bool allow_blob);

Expand Down Expand Up @@ -293,6 +295,7 @@ class DBIter final : public Iterator {
UserComparatorWrapper user_comparator_;
const MergeOperator* const merge_operator_;
IteratorWrapper iter_;
const Version* version_;
ReadCallback* read_callback_;
// Max visible sequence number. It is normally the snapshot seq unless we have
// uncommitted data in db as in WriteUnCommitted.
Expand Down Expand Up @@ -335,6 +338,8 @@ class DBIter final : public Iterator {
// Expect the inner iterator to maintain a total order.
// prefix_extractor_ must be non-NULL if the value is false.
const bool expect_total_order_inner_iter_;
// Whether the iterator is allowed to expose blob references. Set to true when
// the stacked BlobDB implementation is used, false otherwise.
bool allow_blob_;
bool is_blob_;
bool arena_mode_;
Expand Down Expand Up @@ -367,8 +372,9 @@ extern Iterator* NewDBIterator(
const ImmutableCFOptions& cf_options,
const MutableCFOptions& mutable_cf_options,
const Comparator* user_key_comparator, InternalIterator* internal_iter,
const SequenceNumber& sequence, uint64_t max_sequential_skip_in_iterations,
ReadCallback* read_callback, DBImpl* db_impl = nullptr,
ColumnFamilyData* cfd = nullptr, bool allow_blob = false);
const Version* version, const SequenceNumber& sequence,
uint64_t max_sequential_skip_in_iterations, ReadCallback* read_callback,
DBImpl* db_impl = nullptr, ColumnFamilyData* cfd = nullptr,
bool allow_blob = false);

} // namespace ROCKSDB_NAMESPACE
2 changes: 1 addition & 1 deletion db/db_iter_stress_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,7 @@ TEST_F(DBIteratorStressTest, StressTest) {
db_iter.reset(NewDBIterator(
env_, ropt, ImmutableCFOptions(options),
MutableCFOptions(options), BytewiseComparator(),
internal_iter, sequence,
internal_iter, nullptr, sequence,
options.max_sequential_skip_in_iterations,
nullptr /*read_callback*/));
}
Expand Down
Loading