Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WritePrepared Txn: Compaction/Flush #2926

Closed
wants to merge 16 commits into from
5 changes: 3 additions & 2 deletions CMakeLists.txt
Expand Up @@ -543,10 +543,11 @@ set(SOURCES
utilities/table_properties_collectors/compact_on_deletion_collector.cc
utilities/transactions/optimistic_transaction_db_impl.cc
utilities/transactions/optimistic_transaction.cc
utilities/transactions/transaction_base.cc
utilities/transactions/pessimistic_transaction.cc
utilities/transactions/pessimistic_transaction_db.cc
utilities/transactions/snapshot_checker.cc
utilities/transactions/transaction_base.cc
utilities/transactions/transaction_db_mutex_impl.cc
utilities/transactions/pessimistic_transaction.cc
utilities/transactions/transaction_lock_mgr.cc
utilities/transactions/transaction_util.cc
utilities/transactions/write_prepared_txn.cc
Expand Down
5 changes: 3 additions & 2 deletions TARGETS
Expand Up @@ -248,10 +248,11 @@ cpp_library(
"utilities/table_properties_collectors/compact_on_deletion_collector.cc",
"utilities/transactions/optimistic_transaction_db_impl.cc",
"utilities/transactions/optimistic_transaction.cc",
"utilities/transactions/transaction_base.cc",
"utilities/transactions/pessimistic_transaction.cc",
"utilities/transactions/pessimistic_transaction_db.cc",
"utilities/transactions/snapshot_checker.cc",
"utilities/transactions/transaction_base.cc",
"utilities/transactions/transaction_db_mutex_impl.cc",
"utilities/transactions/pessimistic_transaction.cc",
"utilities/transactions/transaction_lock_mgr.cc",
"utilities/transactions/transaction_util.cc",
"utilities/transactions/write_prepared_txn.cc",
Expand Down
4 changes: 2 additions & 2 deletions db/builder.cc
Expand Up @@ -70,7 +70,7 @@ Status BuildTable(
uint32_t column_family_id, const std::string& column_family_name,
std::vector<SequenceNumber> snapshots,
SequenceNumber earliest_write_conflict_snapshot,
const CompressionType compression,
SnapshotChecker* snapshot_checker, const CompressionType compression,
const CompressionOptions& compression_opts, bool paranoid_file_checks,
InternalStats* internal_stats, TableFileCreationReason reason,
EventLogger* event_logger, int job_id, const Env::IOPriority io_priority,
Expand Down Expand Up @@ -135,7 +135,7 @@ Status BuildTable(

CompactionIterator c_iter(
iter, internal_comparator.user_comparator(), &merge, kMaxSequenceNumber,
&snapshots, earliest_write_conflict_snapshot, env,
&snapshots, earliest_write_conflict_snapshot, snapshot_checker, env,
true /* internal key corruption is not ok */, range_del_agg.get());
c_iter.SeekToFirst();
for (; c_iter.Valid(); c_iter.Next()) {
Expand Down
3 changes: 2 additions & 1 deletion db/builder.h
Expand Up @@ -29,6 +29,7 @@ struct FileMetaData;
class Env;
struct EnvOptions;
class Iterator;
class SnapshotChecker;
class TableCache;
class VersionEdit;
class TableBuilder;
Expand Down Expand Up @@ -71,7 +72,7 @@ extern Status BuildTable(
uint32_t column_family_id, const std::string& column_family_name,
std::vector<SequenceNumber> snapshots,
SequenceNumber earliest_write_conflict_snapshot,
const CompressionType compression,
SnapshotChecker* snapshot_checker, const CompressionType compression,
const CompressionOptions& compression_opts, bool paranoid_file_checks,
InternalStats* internal_stats, TableFileCreationReason reason,
EventLogger* event_logger = nullptr, int job_id = 0,
Expand Down
145 changes: 93 additions & 52 deletions db/compaction_iterator.cc
Expand Up @@ -4,6 +4,9 @@
// (found in the LICENSE.Apache file in the root directory).

#include "db/compaction_iterator.h"

#include "db/snapshot_checker.h"
#include "port/likely.h"
#include "rocksdb/listener.h"
#include "table/internal_iterator.h"

Expand Down Expand Up @@ -37,23 +40,25 @@ CompactionEventListener::CompactionListenerValueType fromInternalValueType(
CompactionIterator::CompactionIterator(
InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
SequenceNumber last_sequence, std::vector<SequenceNumber>* snapshots,
SequenceNumber earliest_write_conflict_snapshot, Env* env,
SequenceNumber earliest_write_conflict_snapshot,
const SnapshotChecker* snapshot_checker, Env* env,
bool expect_valid_internal_key, RangeDelAggregator* range_del_agg,
const Compaction* compaction, const CompactionFilter* compaction_filter,
CompactionEventListener* compaction_listener,
const std::atomic<bool>* shutting_down)
: CompactionIterator(
input, cmp, merge_helper, last_sequence, snapshots,
earliest_write_conflict_snapshot, env, expect_valid_internal_key,
range_del_agg,
earliest_write_conflict_snapshot, snapshot_checker, env,
expect_valid_internal_key, range_del_agg,
std::unique_ptr<CompactionProxy>(
compaction ? new CompactionProxy(compaction) : nullptr),
compaction_filter, compaction_listener, shutting_down) {}

CompactionIterator::CompactionIterator(
InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
SequenceNumber last_sequence, std::vector<SequenceNumber>* snapshots,
SequenceNumber earliest_write_conflict_snapshot, Env* env,
SequenceNumber earliest_write_conflict_snapshot,
const SnapshotChecker* snapshot_checker, Env* env,
bool expect_valid_internal_key, RangeDelAggregator* range_del_agg,
std::unique_ptr<CompactionProxy> compaction,
const CompactionFilter* compaction_filter,
Expand All @@ -64,6 +69,7 @@ CompactionIterator::CompactionIterator(
merge_helper_(merge_helper),
snapshots_(snapshots),
earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot),
snapshot_checker_(snapshot_checker),
env_(env),
expect_valid_internal_key_(expect_valid_internal_key),
range_del_agg_(range_del_agg),
Expand Down Expand Up @@ -166,6 +172,55 @@ void CompactionIterator::Next() {
PrepareOutput();
}

void CompactionIterator::InvokeFilterIfNeeded(bool* need_skip,
Slice* skip_until) {
if (compaction_filter_ != nullptr && ikey_.type == kTypeValue &&
(visible_at_tip_ || ikey_.sequence > latest_snapshot_ ||
ignore_snapshots_)) {
// If the user has specified a compaction filter and the sequence
// number is greater than any external snapshot, then invoke the
// filter. If the return value of the compaction filter is true,
// replace the entry with a deletion marker.
CompactionFilter::Decision filter;
compaction_filter_value_.clear();
compaction_filter_skip_until_.Clear();
{
StopWatchNano timer(env_, true);
filter = compaction_filter_->FilterV2(
compaction_->level(), ikey_.user_key,
CompactionFilter::ValueType::kValue, value_,
&compaction_filter_value_, compaction_filter_skip_until_.rep());
iter_stats_.total_filter_time +=
env_ != nullptr ? timer.ElapsedNanos() : 0;
}

if (filter == CompactionFilter::Decision::kRemoveAndSkipUntil &&
cmp_->Compare(*compaction_filter_skip_until_.rep(), ikey_.user_key) <=
0) {
// Can't skip to a key smaller than the current one.
// Keep the key as per FilterV2 documentation.
filter = CompactionFilter::Decision::kKeep;
}

if (filter == CompactionFilter::Decision::kRemove) {
// convert the current key to a delete; key_ is pointing into
// current_key_ at this point, so updating current_key_ updates key()
ikey_.type = kTypeDeletion;
current_key_.UpdateInternalKey(ikey_.sequence, kTypeDeletion);
// no value associated with delete
value_.clear();
iter_stats_.num_record_drop_user++;
} else if (filter == CompactionFilter::Decision::kChangeValue) {
value_ = compaction_filter_value_;
} else if (filter == CompactionFilter::Decision::kRemoveAndSkipUntil) {
*need_skip = true;
compaction_filter_skip_until_.ConvertFromUserKey(kMaxSequenceNumber,
kValueTypeForSeek);
*skip_until = compaction_filter_skip_until_.Encode();
}
}
}

void CompactionIterator::NextFromInput() {
at_next_ = false;
valid_ = false;
Expand Down Expand Up @@ -220,60 +275,22 @@ void CompactionIterator::NextFromInput() {
has_outputted_key_ = false;
current_user_key_sequence_ = kMaxSequenceNumber;
current_user_key_snapshot_ = 0;
current_key_committed_ =
(snapshot_checker_ == nullptr ||
snapshot_checker_->IsInSnapshot(ikey_.sequence, kMaxSequenceNumber));

#ifndef ROCKSDB_LITE
if (compaction_listener_) {
compaction_listener_->OnCompaction(compaction_->level(), ikey_.user_key,
fromInternalValueType(ikey_.type),
value_, ikey_.sequence, true);
}
#endif // ROCKSDB_LITE

// apply the compaction filter to the first occurrence of the user key
if (compaction_filter_ != nullptr && ikey_.type == kTypeValue &&
(visible_at_tip_ || ikey_.sequence > latest_snapshot_ ||
ignore_snapshots_)) {
// If the user has specified a compaction filter and the sequence
// number is greater than any external snapshot, then invoke the
// filter. If the return value of the compaction filter is true,
// replace the entry with a deletion marker.
CompactionFilter::Decision filter;
compaction_filter_value_.clear();
compaction_filter_skip_until_.Clear();
{
StopWatchNano timer(env_, true);
filter = compaction_filter_->FilterV2(
compaction_->level(), ikey_.user_key,
CompactionFilter::ValueType::kValue, value_,
&compaction_filter_value_, compaction_filter_skip_until_.rep());
iter_stats_.total_filter_time +=
env_ != nullptr ? timer.ElapsedNanos() : 0;
}

if (filter == CompactionFilter::Decision::kRemoveAndSkipUntil &&
cmp_->Compare(*compaction_filter_skip_until_.rep(),
ikey_.user_key) <= 0) {
// Can't skip to a key smaller than the current one.
// Keep the key as per FilterV2 documentation.
filter = CompactionFilter::Decision::kKeep;
}
#endif // !ROCKSDB_LITE

if (filter == CompactionFilter::Decision::kRemove) {
// convert the current key to a delete; key_ is pointing into
// current_key_ at this point, so updating current_key_ updates key()
ikey_.type = kTypeDeletion;
current_key_.UpdateInternalKey(ikey_.sequence, kTypeDeletion);
// no value associated with delete
value_.clear();
iter_stats_.num_record_drop_user++;
} else if (filter == CompactionFilter::Decision::kChangeValue) {
value_ = compaction_filter_value_;
} else if (filter == CompactionFilter::Decision::kRemoveAndSkipUntil) {
need_skip = true;
compaction_filter_skip_until_.ConvertFromUserKey(kMaxSequenceNumber,
kValueTypeForSeek);
skip_until = compaction_filter_skip_until_.Encode();
}
// Apply the compaction filter to the first committed version of the user
// key.
if (current_key_committed_) {
InvokeFilterIfNeeded(&need_skip, &skip_until);
}
} else {
#ifndef ROCKSDB_LITE
Expand All @@ -292,6 +309,26 @@ void CompactionIterator::NextFromInput() {
current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
key_ = current_key_.GetInternalKey();
ikey_.user_key = current_key_.GetUserKey();

// Note that newer version of a key is ordered before older versions. If a
// newer version of a key is committed, so as the older version. No need
// to query snapshot_checker_ in that case.
if (UNLIKELY(!current_key_committed_)) {
assert(snapshot_checker_ != nullptr);
current_key_committed_ =
snapshot_checker_->IsInSnapshot(ikey_.sequence, kMaxSequenceNumber);
// Apply the compaction filter to the first committed version of the
// user key.
if (current_key_committed_) {
InvokeFilterIfNeeded(&need_skip, &skip_until);
}
}
}

if (UNLIKELY(!current_key_committed_)) {
assert(snapshot_checker_ != nullptr);
valid_ = true;
break;
}

// If there are no snapshots, then this kv affect visibility at tip.
Expand Down Expand Up @@ -557,6 +594,9 @@ void CompactionIterator::PrepareOutput() {
// only care about sequence number larger than any active snapshots.
if ((compaction_ != nullptr && !compaction_->allow_ingest_behind()) &&
bottommost_level_ && valid_ && ikey_.sequence <= earliest_snapshot_ &&
(snapshot_checker_ == nullptr ||
LIKELY(snapshot_checker_->IsInSnapshot(ikey_.sequence,
earliest_snapshot_))) &&
ikey_.type != kTypeMerge &&
!cmp_->Equal(compaction_->GetLargestUserKey(), ikey_.user_key)) {
assert(ikey_.type != kTypeDeletion && ikey_.type != kTypeSingleDeletion);
Expand All @@ -568,10 +608,11 @@ void CompactionIterator::PrepareOutput() {
inline SequenceNumber CompactionIterator::findEarliestVisibleSnapshot(
SequenceNumber in, SequenceNumber* prev_snapshot) {
assert(snapshots_->size());
SequenceNumber prev __attribute__((__unused__)) = kMaxSequenceNumber;
SequenceNumber prev = kMaxSequenceNumber;
for (const auto cur : *snapshots_) {
assert(prev == kMaxSequenceNumber || prev <= cur);
if (cur >= in) {
if (cur >= in && (snapshot_checker_ == nullptr ||
snapshot_checker_->IsInSnapshot(in, cur))) {
*prev_snapshot = prev == kMaxSequenceNumber ? 0 : prev;
return cur;
}
Expand Down
17 changes: 14 additions & 3 deletions db/compaction_iterator.h
Expand Up @@ -14,6 +14,7 @@
#include "db/merge_helper.h"
#include "db/pinned_iterators_manager.h"
#include "db/range_del_aggregator.h"
#include "db/snapshot_checker.h"
#include "options/cf_options.h"
#include "rocksdb/compaction_filter.h"

Expand Down Expand Up @@ -59,7 +60,8 @@ class CompactionIterator {
CompactionIterator(InternalIterator* input, const Comparator* cmp,
MergeHelper* merge_helper, SequenceNumber last_sequence,
std::vector<SequenceNumber>* snapshots,
SequenceNumber earliest_write_conflict_snapshot, Env* env,
SequenceNumber earliest_write_conflict_snapshot,
const SnapshotChecker* snapshot_checker, Env* env,
bool expect_valid_internal_key,
RangeDelAggregator* range_del_agg,
const Compaction* compaction = nullptr,
Expand All @@ -71,7 +73,8 @@ class CompactionIterator {
CompactionIterator(InternalIterator* input, const Comparator* cmp,
MergeHelper* merge_helper, SequenceNumber last_sequence,
std::vector<SequenceNumber>* snapshots,
SequenceNumber earliest_write_conflict_snapshot, Env* env,
SequenceNumber earliest_write_conflict_snapshot,
const SnapshotChecker* snapshot_checker, Env* env,
bool expect_valid_internal_key,
RangeDelAggregator* range_del_agg,
std::unique_ptr<CompactionProxy> compaction,
Expand Down Expand Up @@ -111,6 +114,9 @@ class CompactionIterator {
// compression.
void PrepareOutput();

// Invoke compaction filter if needed.
void InvokeFilterIfNeeded(bool* need_skip, Slice* skip_until);

// Given a sequence number, return the sequence number of the
// earliest snapshot that this sequence number is visible in.
// The snapshots themselves are arranged in ascending order of
Expand All @@ -125,14 +131,15 @@ class CompactionIterator {
MergeHelper* merge_helper_;
const std::vector<SequenceNumber>* snapshots_;
const SequenceNumber earliest_write_conflict_snapshot_;
const SnapshotChecker* const snapshot_checker_;
Env* env_;
bool expect_valid_internal_key_;
RangeDelAggregator* range_del_agg_;
std::unique_ptr<CompactionProxy> compaction_;
const CompactionFilter* compaction_filter_;
#ifndef ROCKSDB_LITE
CompactionEventListener* compaction_listener_;
#endif // ROCKSDB_LITE
#endif // !ROCKSDB_LITE
const std::atomic<bool>* shutting_down_;
bool bottommost_level_;
bool valid_ = false;
Expand Down Expand Up @@ -189,6 +196,10 @@ class CompactionIterator {
std::vector<size_t> level_ptrs_;
CompactionIterationStats iter_stats_;

// Used to avoid purging uncommitted values. The application can specify
// uncommitted values by providing a SnapshotChecker object.
bool current_key_committed_;

bool IsShuttingDown() {
// This is a best-effort facility, so memory_order_relaxed is sufficient.
return shutting_down_ && shutting_down_->load(std::memory_order_relaxed);
Expand Down
7 changes: 5 additions & 2 deletions db/compaction_iterator_test.cc
Expand Up @@ -181,6 +181,8 @@ class CompactionIteratorTest : public testing::Test {
compaction_proxy_ = new FakeCompaction();
compaction.reset(compaction_proxy_);
}
// TODO(yiwu) add a mock snapshot checker and add test for it.
SnapshotChecker* snapshot_checker = nullptr;

merge_helper_.reset(new MergeHelper(Env::Default(), cmp_, merge_op, filter,
nullptr, false, 0, 0, nullptr,
Expand All @@ -189,8 +191,9 @@ class CompactionIteratorTest : public testing::Test {
iter_->SeekToFirst();
c_iter_.reset(new CompactionIterator(
iter_.get(), cmp_, merge_helper_.get(), last_sequence, &snapshots_,
kMaxSequenceNumber, Env::Default(), false, range_del_agg_.get(),
std::move(compaction), filter, nullptr, &shutting_down_));
kMaxSequenceNumber, snapshot_checker, Env::Default(), false,
range_del_agg_.get(), std::move(compaction), filter, nullptr,
&shutting_down_));
}

void AddSnapshot(SequenceNumber snapshot) { snapshots_.push_back(snapshot); }
Expand Down