Skip to content
Permalink
Browse files

Support for SingleDelete()

Summary:
This patch fixes #7460559. It introduces SingleDelete as a new database
operation. This operation can be used to delete keys that were never
overwritten (no put following another put of the same key). If an overwritten
key is single deleted, the behavior is undefined. Single deletion of a
non-existent key has no effect, but multiple consecutive single deletions are
not allowed (see limitations).

In contrast to the conventional Delete() operation, the deletion entry is
removed along with the value when the two are lined up in a compaction. Note:
The semantics are similar to @igor's prototype, which allowed this behavior
at the granularity of a column family (
https://reviews.facebook.net/D42093 ). This new patch, however, is more
aggressive when it comes to removing tombstones: It removes the SingleDelete
together with the value whenever there is no snapshot between them while the
older patch only did this when the sequence number of the deletion was older
than the earliest snapshot.

Most of the complex additions are in the Compaction Iterator, all other changes
should be relatively straightforward. The patch also includes basic support for
single deletions in db_stress and db_bench.

Limitations:
- Not compatible with cuckoo hash tables
- Single deletions cannot be used in combination with merges and normal
  deletions on the same key (other keys are not affected by this)
- Consecutive single deletions are currently not allowed (an older version of
  this patch supported them, so this could be resurrected if needed)

Test Plan: make all check

Reviewers: yhchiang, sdong, rven, anthony, yoshinorim, igor

Reviewed By: igor

Subscribers: maykov, dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D43179
  • Loading branch information...
4tXJ7f committed Sep 17, 2015
1 parent f35560d commit 014fd55adca7b217d08f579f78303eef39b834f2
Showing with 1,359 additions and 310 deletions.
  1. +7 −0 HISTORY.md
  2. +4 −0 Makefile
  3. +14 −15 db/builder.cc
  4. +91 −26 db/compaction_iterator.cc
  5. +18 −4 db/compaction_iterator.h
  6. +70 −0 db/compaction_iterator_test.cc
  7. +166 −4 db/compaction_job_test.cc
  8. +74 −1 db/db_bench.cc
  9. +13 −0 db/db_impl.cc
  10. +4 −2 db/db_impl.h
  11. +6 −0 db/db_impl_readonly.h
  12. +16 −14 db/db_iter.cc
  13. +62 −4 db/db_iter_test.cc
  14. +163 −7 db/db_test.cc
  15. +2 −2 db/dbformat.cc
  16. +41 −12 db/dbformat.h
  17. +1 −1 db/internal_stats.cc
  18. +3 −2 db/memtable.cc
  19. +7 −2 db/merge_helper.cc
  20. +3 −1 db/merge_helper.h
  21. +9 −14 db/merge_helper_test.cc
  22. +7 −1 db/table_properties_collector.cc
  23. +50 −32 db/table_properties_collector_test.cc
  24. +105 −32 db/write_batch.cc
  25. +13 −0 db/write_batch_base.cc
  26. +6 −0 db/write_batch_internal.h
  27. +127 −55 db/write_batch_test.cc
  28. +11 −0 include/rocksdb/db.h
  29. +1 −1 include/rocksdb/perf_context.h
  30. +1 −0 include/rocksdb/table_properties.h
  31. +7 −0 include/rocksdb/utilities/stackable_db.h
  32. +12 −1 include/rocksdb/utilities/write_batch_with_index.h
  33. +44 −19 include/rocksdb/write_batch.h
  34. +11 −0 include/rocksdb/write_batch_base.h
  35. +3 −0 table/get_context.cc
  36. +126 −43 tools/db_stress.cc
  37. +11 −0 util/db_test_util.cc
  38. +4 −0 util/db_test_util.h
  39. +9 −0 util/testutil.cc
  40. +4 −0 util/testutil.h
  41. +26 −13 utilities/write_batch_with_index/write_batch_with_index.cc
  42. +6 −1 utilities/write_batch_with_index/write_batch_with_index_internal.cc
  43. +1 −1 utilities/write_batch_with_index/write_batch_with_index_internal.h
@@ -1,5 +1,12 @@
# Rocksdb Change Log

## Unreleased
### New Features
* Added single delete operation as a more efficient way to delete keys that have not been overwritten.

### Public API Changes
* Added SingleDelete() to the DB interface.

## 4.0.0 (9/9/2015)
### New Features
* Added support for transactions. See include/rocksdb/utilities/transaction.h for more info.
@@ -295,6 +295,7 @@ TESTS = \
flush_job_test \
wal_manager_test \
listener_test \
compaction_iterator_test \
compaction_job_test \
thread_list_test \
sst_dump_test \
@@ -773,6 +774,9 @@ write_batch_with_index_test: utilities/write_batch_with_index/write_batch_with_i
flush_job_test: db/flush_job_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)

compaction_iterator_test: db/compaction_iterator_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)

compaction_job_test: db/compaction_job_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)

@@ -110,13 +110,15 @@ Status BuildTable(
}

// Finish and check for builder errors
bool empty = builder->NumEntries() == 0;
s = c_iter.status();
if (s.ok()) {
s = builder->Finish();
} else {
if (!s.ok() || empty) {
builder->Abandon();
} else {
s = builder->Finish();
}
if (s.ok()) {

if (s.ok() && !empty) {
meta->fd.file_size = builder->FileSize();
meta->marked_for_compaction = builder->NeedCompact();
assert(meta->fd.GetFileSize() > 0);
@@ -127,28 +129,27 @@ Status BuildTable(
delete builder;

// Finish and check for file errors
if (s.ok() && !ioptions.disable_data_sync) {
if (s.ok() && !empty && !ioptions.disable_data_sync) {
StopWatch sw(env, ioptions.statistics, TABLE_SYNC_MICROS);
file_writer->Sync(ioptions.use_fsync);
}
if (s.ok()) {
if (s.ok() && !empty) {
s = file_writer->Close();
}

if (s.ok()) {
if (s.ok() && !empty) {
// Verify that the table is usable
Iterator* it = table_cache->NewIterator(
std::unique_ptr<Iterator> it(table_cache->NewIterator(
ReadOptions(), env_options, internal_comparator, meta->fd, nullptr,
(internal_stats == nullptr) ? nullptr
: internal_stats->GetFileReadHist(0),
false);
false));
s = it->status();
if (s.ok() && paranoid_file_checks) {
for (it->SeekToFirst(); it->Valid(); it->Next()) {}
for (it->SeekToFirst(); it->Valid(); it->Next()) {
}
s = it->status();
}

delete it;
}
}

@@ -157,9 +158,7 @@ Status BuildTable(
s = iter->status();
}

if (s.ok() && meta->fd.GetFileSize() > 0) {
// Keep it
} else {
if (!s.ok() || meta->fd.GetFileSize() == 0) {
env->DeleteFile(fname);
}
return s;
@@ -71,6 +71,10 @@ void CompactionIterator::Next() {
// MergeUntil stops when it encounters a corrupt key and does not
// include them in the result, so we expect the keys here to be valid.
assert(valid_key);
// Keep current_key_ in sync.
current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
key_ = current_key_.GetKey();
ikey_.user_key = current_key_.GetUserKey();
valid_ = true;
} else {
// MergeHelper moves the iterator to the first record after the merged
@@ -79,18 +83,22 @@ void CompactionIterator::Next() {
NextFromInput();
}
} else {
// Only advance the input iterator if there is no merge output.
input_->Next();
// Only advance the input iterator if there is no merge output and the
// iterator is not already at the next record.
if (!at_next_) {
input_->Next();
}
NextFromInput();
}

PrepareOutput();
}

void CompactionIterator::NextFromInput() {
at_next_ = false;
valid_ = false;

while (input_->Valid()) {
while (!valid_ && input_->Valid()) {
key_ = input_->key();
value_ = input_->value();
iter_stats_.num_input_records++;
@@ -100,10 +108,11 @@ void CompactionIterator::NextFromInput() {
// and let the caller decide what to do with it.
// TODO(noetzli): We should have a more elegant solution for this.
if (expect_valid_internal_key_) {
assert(!"corrupted internal key is not expected");
assert(!"Corrupted internal key not expected.");
status_ = Status::Corruption("Corrupted internal key not expected.");
break;
}
current_user_key_.Clear();
key_ = current_key_.SetKey(key_);
has_current_user_key_ = false;
current_user_key_sequence_ = kMaxSequenceNumber;
current_user_key_snapshot_ = 0;
@@ -113,16 +122,20 @@ void CompactionIterator::NextFromInput() {
}

// Update input statistics
if (ikey_.type == kTypeDeletion) {
if (ikey_.type == kTypeDeletion || ikey_.type == kTypeSingleDeletion) {
iter_stats_.num_input_deletion_records++;
}
iter_stats_.total_input_raw_key_bytes += key_.size();
iter_stats_.total_input_raw_value_bytes += value_.size();

// Check whether the user key changed. After this if statement current_key_
// is a copy of the current input key (maybe converted to a delete by the
// compaction filter). ikey_.user_key is pointing to the copy.
if (!has_current_user_key_ ||
cmp_->Compare(ikey_.user_key, current_user_key_.GetKey()) != 0) {
!cmp_->Equal(ikey_.user_key, current_user_key_)) {
// First occurrence of this user key
current_user_key_.SetKey(ikey_.user_key);
key_ = current_key_.SetKey(key_, &ikey_);
current_user_key_ = ikey_.user_key;
has_current_user_key_ = true;
current_user_key_sequence_ = kMaxSequenceNumber;
current_user_key_snapshot_ = 0;
@@ -145,20 +158,22 @@ void CompactionIterator::NextFromInput() {
env_ != nullptr ? timer.ElapsedNanos() : 0;
}
if (to_delete) {
// make a copy of the original key and convert it to a delete
delete_key_.SetInternalKey(ExtractUserKey(key_), ikey_.sequence,
kTypeDeletion);
// anchor the key again
key_ = delete_key_.GetKey();
// needed because ikey_ is backed by key
ParseInternalKey(key_, &ikey_);
// convert the current key to a delete
ikey_.type = kTypeDeletion;
current_key_.UpdateInternalKey(ikey_.sequence, kTypeDeletion);
// no value associated with delete
value_.clear();
iter_stats_.num_record_drop_user++;
} else if (value_changed) {
value_ = compaction_filter_value_;
}
}
} else {
// Update the current key to reflect the new sequence number/type without
// copying the user key.
current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
key_ = current_key_.GetKey();
ikey_.user_key = current_key_.GetUserKey();
}

// If there are no snapshots, then this kv affect visibility at tip.
@@ -173,14 +188,66 @@ void CompactionIterator::NextFromInput() {
visible_at_tip_ ? visible_at_tip_ : findEarliestVisibleSnapshot(
ikey_.sequence, &prev_snapshot);

if (last_snapshot == current_user_key_snapshot_) {
if (ikey_.type == kTypeSingleDeletion) {
ParsedInternalKey next_ikey;
input_->Next();

// Check whether the current key is valid, not corrupt and the same
// as the single delete.
if (input_->Valid() && ParseInternalKey(input_->key(), &next_ikey) &&
cmp_->Equal(ikey_.user_key, next_ikey.user_key)) {
// Mixing single deletes and merges is not supported. Consecutive
// single deletes are not valid.
if (next_ikey.type != kTypeValue) {
assert(false);
status_ =
Status::InvalidArgument("Put expected after single delete.");
break;
}

// Check whether the current key belongs to the same snapshot as the
// single delete.
if (prev_snapshot == 0 || next_ikey.sequence > prev_snapshot) {
// Found the matching value, we can drop the single delete and the
// value.
++iter_stats_.num_record_drop_hidden;
++iter_stats_.num_record_drop_obsolete;
input_->Next();
} else {
// We hit the next snapshot without hitting a put, so the iterator
// returns the single delete.
valid_ = true;
}
} else {
// We are at the end of the input, could not parse the next key, or hit
// the next key. The iterator returns the single delete if the key
// possibly exists beyond the current output level. We set
// has_current_user_key to false so that if the iterator is at the next
// key, we do not compare it again against the previous key at the next
// iteration. If the next key is corrupt, we return before the
// comparison, so the value of has_current_user_key does not matter.
has_current_user_key_ = false;
if (compaction_ != nullptr &&
compaction_->KeyNotExistsBeyondOutputLevel(ikey_.user_key,
&level_ptrs_)) {
++iter_stats_.num_record_drop_obsolete;
} else {
valid_ = true;
}
}

if (valid_) {
at_next_ = true;
}
} else if (last_snapshot == current_user_key_snapshot_) {
// If the earliest snapshot in which this key is visible
// is the same as the visibility of a previous instance of the
// same key, then this kv is not visible in any snapshot.
// Hidden by a newer entry for the same user key
// TODO: why not > ?
assert(last_sequence >= current_user_key_sequence_);
++iter_stats_.num_record_drop_hidden; // (A)
input_->Next();
} else if (compaction_ != nullptr && ikey_.type == kTypeDeletion &&
ikey_.sequence <= earliest_snapshot_ &&
compaction_->KeyNotExistsBeyondOutputLevel(ikey_.user_key,
@@ -197,6 +264,7 @@ void CompactionIterator::NextFromInput() {
// few iterations of this loop (by rule (A) above).
// Therefore this deletion marker is obsolete and can be dropped.
++iter_stats_.num_record_drop_obsolete;
input_->Next();
} else if (ikey_.type == kTypeMerge) {
if (!merge_helper_->HasOperator()) {
LogToBuffer(log_buffer_, "Options::merge_operator is null.");
@@ -222,14 +290,14 @@ void CompactionIterator::NextFromInput() {
// MergeUntil stops when it encounters a corrupt key and does not
// include them in the result, so we expect the keys here to be valid.
assert(valid_key);
// Keep current_key_ in sync.
current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
key_ = current_key_.GetKey();
ikey_.user_key = current_key_.GetUserKey();
valid_ = true;
break;
} else {
valid_ = true;
break;
}

input_->Next();
}
}

@@ -240,12 +308,9 @@ void CompactionIterator::PrepareOutput() {
// then we can squash the seqno to zero.
if (bottommost_level_ && valid_ && ikey_.sequence < earliest_snapshot_ &&
ikey_.type != kTypeMerge) {
assert(ikey_.type != kTypeDeletion);
// make a copy because updating in place would cause problems
// with the priority queue that is managing the input key iterator
updated_key_.assign(key_.data(), key_.size());
UpdateInternalKey(&updated_key_, (uint64_t)0, ikey_.type);
key_ = Slice(updated_key_);
assert(ikey_.type != kTypeDeletion && ikey_.type != kTypeSingleDeletion);
ikey_.sequence = 0;
current_key_.UpdateInternalKey(0, ikey_.type);
}
}

@@ -64,7 +64,7 @@ class CompactionIterator {
const Status& status() const { return status_; }
const ParsedInternalKey& ikey() const { return ikey_; }
bool Valid() const { return valid_; }
Slice user_key() const { return current_user_key_.GetKey(); }
const Slice& user_key() const { return current_user_key_; }
const CompactionIteratorStats& iter_stats() const { return iter_stats_; }

private:
@@ -102,18 +102,32 @@ class CompactionIterator {
SequenceNumber latest_snapshot_;

// State
//
// Points to a copy of the current compaction iterator output (current_key_)
// if valid_.
Slice key_;
// Points to the value in the underlying iterator that corresponds to the
// current output.
Slice value_;
// The status is OK unless compaction iterator encounters a merge operand
// while not having a merge operator defined.
Status status_;
// Stores the user key, sequence number and type of the current compaction
// iterator output (or current key in the underlying iterator during
// NextFromInput()).
ParsedInternalKey ikey_;
// Stores whether ikey_.user_key is valid. If set to false, the user key is
// not compared against the current key in the underlying iterator.
bool has_current_user_key_ = false;
IterKey current_user_key_;
bool at_next_ = false;  // True if the input iterator is already positioned
                        // at the next key (comment appears truncated in this
                        // capture — verify against the original diff)
// Holds a copy of the current compaction iterator output (or current key in
// the underlying iterator during NextFromInput()).
IterKey current_key_;
Slice current_user_key_;
SequenceNumber current_user_key_sequence_;
SequenceNumber current_user_key_snapshot_;
MergeOutputIterator merge_out_iter_;
std::string updated_key_;
std::string compaction_filter_value_;
IterKey delete_key_;
// "level_ptrs" holds indices that remember which file of an associated
// level we were last checking during the last call to compaction->
// KeyNotExistsBeyondOutputLevel(). This allows future calls to the function

0 comments on commit 014fd55

Please sign in to comment.
You can’t perform that action at this time.