Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add an option to trigger flush when the number of range deletions reach a threshold #11358

Closed
wants to merge 16 commits into from
Closed
36 changes: 36 additions & 0 deletions db/db_range_del_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3475,6 +3475,42 @@ TEST_F(DBRangeDelTest, NonBottommostCompactionDropRangetombstone) {
db_->ReleaseSnapshot(snapshot);
}

// Exercises option `memtable_max_range_deletions`: checks the L0 file count
// after a series of DeleteRange calls, and again after the option is changed
// through SetOptions().
TEST_F(DBRangeDelTest, MemtableMaxRangeDeletions) {
  constexpr int kRangeDelLimit = 50;

  Options opts = CurrentOptions();
  opts.level_compaction_dynamic_file_size = false;
  opts.memtable_max_range_deletions = kRangeDelLimit;
  opts.level0_file_num_compaction_trigger = 5;
  DestroyAndReopen(opts);

  auto* default_cf = db_->DefaultColumnFamily();

  // The deleted ranges overlap on purpose, to see if the option checks the
  // number of range tombstone fragments instead.
  for (int idx = 0; idx < kRangeDelLimit; ++idx) {
    ASSERT_OK(Put(Key(idx), "val1"));
    ASSERT_OK(Put(Key(idx + 1), "val2"));
    ASSERT_OK(db_->DeleteRange(WriteOptions(), default_cf, Key(idx),
                               Key(idx + 2)));
    ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
    // No flush is expected inside this loop.
    ASSERT_EQ(0, NumTableFilesAtLevel(0));
  }

  // The flush is triggered by one additional write.
  ASSERT_OK(Put(Key(50), "val"));
  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
  ASSERT_EQ(1, NumTableFilesAtLevel(0));

  // The new limit should take effect for the next new memtable, so flush
  // manually to switch memtables.
  ASSERT_OK(db_->SetOptions({{"memtable_max_range_deletions", "1"}}));
  ASSERT_OK(Flush());
  ASSERT_EQ(2, NumTableFilesAtLevel(0));

  // A single range deletion alone does not produce a new L0 file.
  ASSERT_OK(db_->DeleteRange(WriteOptions(), default_cf, Key(50), Key(100)));
  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
  ASSERT_EQ(2, NumTableFilesAtLevel(0));

  // Again, one more write triggers the flush.
  ASSERT_OK(Put(Key(50), "new val"));
  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
  ASSERT_EQ(3, NumTableFilesAtLevel(0));
}
} // namespace ROCKSDB_NAMESPACE

int main(int argc, char** argv) {
Expand Down
8 changes: 7 additions & 1 deletion db/flush_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -861,6 +861,7 @@ Status FlushJob::WriteLevel0Table() {
uint64_t total_num_entries = 0, total_num_deletes = 0;
uint64_t total_data_size = 0;
size_t total_memory_usage = 0;
uint64_t total_num_range_deletes = 0;
// Used for testing:
uint64_t mems_size = mems_.size();
(void)mems_size; // avoids unused variable error when
Expand All @@ -883,15 +884,20 @@ Status FlushJob::WriteLevel0Table() {
total_num_deletes += m->num_deletes();
total_data_size += m->get_data_size();
total_memory_usage += m->ApproximateMemoryUsage();
total_num_range_deletes += m->num_range_deletes();
}

// TODO(cbi): when a memtable is flushed because its number of range deletions
// hit the memtable_max_range_deletions limit, flush_reason_ is still
// "Write Buffer Full"; flush_reason_ should be updated accordingly.
event_logger_->Log() << "job" << job_context_->job_id << "event"
<< "flush_started"
<< "num_memtables" << mems_.size() << "num_entries"
<< total_num_entries << "num_deletes"
<< total_num_deletes << "total_data_size"
<< total_data_size << "memory_usage"
<< total_memory_usage << "flush_reason"
<< total_memory_usage << "num_range_deletes"
<< total_num_range_deletes << "flush_reason"
<< GetFlushReasonString(flush_reason_);

{
Expand Down
19 changes: 18 additions & 1 deletion db/memtable.cc
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ MemTable::MemTable(const InternalKeyComparator& cmp,
data_size_(0),
num_entries_(0),
num_deletes_(0),
num_range_deletes_(0),
write_buffer_size_(mutable_cf_options.write_buffer_size),
flush_in_progress_(false),
flush_completed_(false),
Expand All @@ -114,7 +115,9 @@ MemTable::MemTable(const InternalKeyComparator& cmp,
ioptions.memtable_insert_with_hint_prefix_extractor.get()),
oldest_key_time_(std::numeric_limits<uint64_t>::max()),
atomic_flush_seqno_(kMaxSequenceNumber),
approximate_memory_usage_(0) {
approximate_memory_usage_(0),
memtable_max_range_deletions_(
mutable_cf_options.memtable_max_range_deletions) {
UpdateFlushState();
// something went wrong if we need to flush before inserting anything
assert(!ShouldScheduleFlush());
Expand Down Expand Up @@ -174,6 +177,14 @@ size_t MemTable::ApproximateMemoryUsage() {
}

bool MemTable::ShouldFlushNow() {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to log the reason for the flush? Maybe consider adding the reason to FlushReason.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, let me see if I can identify where to generate the flush reason.

// This is set if memtable_max_range_deletions is > 0,
// and that many range deletions are done
if (memtable_max_range_deletions_ > 0 &&
num_range_deletes_.load(std::memory_order_relaxed) >=
static_cast<uint64_t>(memtable_max_range_deletions_)) {
return true;
}

size_t write_buffer_size = write_buffer_size_.load(std::memory_order_relaxed);
// In a lot of times, we cannot allocate arena blocks that exactly matches the
// buffer size. Thus we have to decide if we should over-allocate or
Expand Down Expand Up @@ -756,6 +767,9 @@ Status MemTable::Add(SequenceNumber s, ValueType type,
type == kTypeDeletionWithTimestamp) {
num_deletes_.store(num_deletes_.load(std::memory_order_relaxed) + 1,
std::memory_order_relaxed);
} else if (type == kTypeRangeDeletion) {
cbi42 marked this conversation as resolved.
Show resolved Hide resolved
uint64_t val = num_range_deletes_.load(std::memory_order_relaxed) + 1;
num_range_deletes_.store(val, std::memory_order_relaxed);
}

if (bloom_filter_ && prefix_extractor_ &&
Expand Down Expand Up @@ -822,6 +836,7 @@ Status MemTable::Add(SequenceNumber s, ValueType type,
auto new_cache = std::make_shared<FragmentedRangeTombstoneListCache>();
size_t size = cached_range_tombstone_.Size();
if (allow_concurrent) {
post_process_info->num_range_deletes++;
range_del_mutex_.lock();
}
for (size_t i = 0; i < size; ++i) {
Expand All @@ -840,6 +855,7 @@ Status MemTable::Add(SequenceNumber s, ValueType type,
new_local_cache_ref, new_cache.get()),
std::memory_order_relaxed);
}

if (allow_concurrent) {
range_del_mutex_.unlock();
}
Expand Down Expand Up @@ -1268,6 +1284,7 @@ bool MemTable::Get(const LookupKey& key, std::string* value,
// Avoiding recording stats for speed.
return false;
}

PERF_TIMER_GUARD(get_from_memtable_time);

std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
Expand Down
17 changes: 17 additions & 0 deletions db/memtable.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ struct MemTablePostProcessInfo {
uint64_t data_size = 0;
uint64_t num_entries = 0;
uint64_t num_deletes = 0;
uint64_t num_range_deletes = 0;
};

using MultiGetRange = MultiGetContext::Range;
Expand Down Expand Up @@ -332,6 +333,10 @@ class MemTable {
num_deletes_.fetch_add(update_counters.num_deletes,
std::memory_order_relaxed);
}
if (update_counters.num_range_deletes > 0) {
num_range_deletes_.fetch_add(update_counters.num_range_deletes,
std::memory_order_relaxed);
}
UpdateFlushState();
}

Expand All @@ -349,6 +354,13 @@ class MemTable {
return num_deletes_.load(std::memory_order_relaxed);
}

// Returns the total number of range deletions in the mem table.
// REQUIRES: external synchronization to prevent simultaneous
// operations on the same MemTable (unless this Memtable is immutable).
uint64_t num_range_deletes() const {
  const uint64_t range_del_count =
      num_range_deletes_.load(std::memory_order_relaxed);
  return range_del_count;
}

uint64_t get_data_size() const {
return data_size_.load(std::memory_order_relaxed);
}
Expand Down Expand Up @@ -565,6 +577,7 @@ class MemTable {
std::atomic<uint64_t> data_size_;
std::atomic<uint64_t> num_entries_;
std::atomic<uint64_t> num_deletes_;
std::atomic<uint64_t> num_range_deletes_;

// Dynamically changeable memtable option
std::atomic<size_t> write_buffer_size_;
Expand Down Expand Up @@ -626,6 +639,10 @@ class MemTable {
// Gets refreshed inside `ApproximateMemoryUsage()` or `ShouldFlushNow`
std::atomic<uint64_t> approximate_memory_usage_;

// max range deletions in a memtable, before automatic flushing, 0 for
// unlimited.
uint32_t memtable_max_range_deletions_ = 0;

// Flush job info of the current memtable.
std::unique_ptr<FlushJobInfo> flush_job_info_;

Expand Down
2 changes: 2 additions & 0 deletions db_stress_tool/db_stress_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,8 @@ DECLARE_bool(allow_data_in_errors);

DECLARE_bool(enable_thread_tracking);

DECLARE_uint32(memtable_max_range_deletions);

// Tiered storage
DECLARE_bool(enable_tiered_storage); // set last_level_temperature
DECLARE_int64(preclude_last_level_data_seconds);
Expand Down
4 changes: 4 additions & 0 deletions db_stress_tool/db_stress_gflags.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1102,4 +1102,8 @@ DEFINE_uint64(stats_dump_period_sec,
DEFINE_bool(use_io_uring, false, "Enable the use of IO uring on Posix");
extern "C" bool RocksDbIOUringEnable() { return FLAGS_use_io_uring; }

// Flag for the memtable_max_range_deletions option; 0 leaves it disabled.
// Note the trailing space in the first literal: adjacent string literals are
// concatenated verbatim, so without it the help text reads "memtableafter".
DEFINE_uint32(memtable_max_range_deletions, 0,
              "If nonzero, RocksDB will try to flush the current memtable "
              "after the number of range deletions is >= this limit");

#endif // GFLAGS
2 changes: 2 additions & 0 deletions db_stress_tool/db_stress_test_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3278,6 +3278,8 @@ void InitializeOptionsFromFlags(
options.allow_data_in_errors = FLAGS_allow_data_in_errors;

options.enable_thread_tracking = FLAGS_enable_thread_tracking;

options.memtable_max_range_deletions = FLAGS_memtable_max_range_deletions;
}

void InitializeOptionsGeneral(
Expand Down
11 changes: 11 additions & 0 deletions include/rocksdb/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,17 @@ struct ColumnFamilyOptions : public AdvancedColumnFamilyOptions {
// Default: nullptr
std::shared_ptr<SstPartitionerFactory> sst_partitioner_factory = nullptr;

// RocksDB will try to flush the current memtable after the number of range
// deletions is >= this limit. For workloads with many range
// deletions, limiting the number of range deletions in memtable can help
// prevent performance degradation and/or OOM caused by too many range
// tombstones in a single memtable.
//
// Default: 0 (disabled)
//
// Dynamically changeable through SetOptions() API
uint32_t memtable_max_range_deletions = 0;

// Create ColumnFamilyOptions with default values for all fields
ColumnFamilyOptions();
// Create ColumnFamilyOptions from Options
Expand Down
47 changes: 47 additions & 0 deletions java/rocksjni/options.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3904,6 +3904,29 @@ jbyte Java_org_rocksdb_Options_prepopulateBlobCache(JNIEnv*, jobject,
opts->prepopulate_blob_cache);
}

/*
 * Class: org_rocksdb_Options
 * Method: setMemtableMaxRangeDeletions
 * Signature: (JI)V
 *
 * Stores the given Java int into memtable_max_range_deletions of the native
 * Options object addressed by jhandle.
 */
void Java_org_rocksdb_Options_setMemtableMaxRangeDeletions(
    JNIEnv*, jobject, jlong jhandle, jint jmemtable_max_range_deletions) {
  auto* opts = reinterpret_cast<ROCKSDB_NAMESPACE::Options*>(jhandle);
  // The native option is unsigned, so convert to uint32_t explicitly instead
  // of casting to int32_t and relying on a second, implicit conversion.
  // A negative Java value wraps to a large unsigned value.
  opts->memtable_max_range_deletions =
      static_cast<uint32_t>(jmemtable_max_range_deletions);
}

/*
 * Class: org_rocksdb_Options
 * Method: memtableMaxRangeDeletions
 * Signature: (J)I
 *
 * Reads memtable_max_range_deletions from the native Options object
 * addressed by jhandle and returns it as a Java int.
 */
jint Java_org_rocksdb_Options_memtableMaxRangeDeletions(JNIEnv*, jobject,
                                                        jlong jhandle) {
  auto* native_options =
      reinterpret_cast<ROCKSDB_NAMESPACE::Options*>(jhandle);
  const auto max_range_deletions = native_options->memtable_max_range_deletions;
  return static_cast<jint>(max_range_deletions);
}

//////////////////////////////////////////////////////////////////////////////
// ROCKSDB_NAMESPACE::ColumnFamilyOptions

Expand Down Expand Up @@ -5770,6 +5793,30 @@ jbyte Java_org_rocksdb_ColumnFamilyOptions_prepopulateBlobCache(JNIEnv*,
opts->prepopulate_blob_cache);
}

/*
 * Class: org_rocksdb_ColumnFamilyOptions
 * Method: setMemtableMaxRangeDeletions
 * Signature: (JI)V
 *
 * Stores the given Java int into memtable_max_range_deletions of the native
 * ColumnFamilyOptions object addressed by jhandle.
 */
void Java_org_rocksdb_ColumnFamilyOptions_setMemtableMaxRangeDeletions(
    JNIEnv*, jobject, jlong jhandle, jint jmemtable_max_range_deletions) {
  auto* opts =
      reinterpret_cast<ROCKSDB_NAMESPACE::ColumnFamilyOptions*>(jhandle);
  // Make the signed-to-unsigned conversion explicit; the native option is
  // unsigned. A negative Java value wraps to a large unsigned value.
  opts->memtable_max_range_deletions =
      static_cast<uint32_t>(jmemtable_max_range_deletions);
}

/*
 * Class: org_rocksdb_ColumnFamilyOptions
 * Method: memtableMaxRangeDeletions
 * Signature: (J)I
 *
 * Reads memtable_max_range_deletions from the native ColumnFamilyOptions
 * object addressed by jhandle and returns it as a Java int.
 */
jint Java_org_rocksdb_ColumnFamilyOptions_memtableMaxRangeDeletions(
    JNIEnv*, jobject, jlong jhandle) {
  auto* native_cf_options =
      reinterpret_cast<ROCKSDB_NAMESPACE::ColumnFamilyOptions*>(jhandle);
  const auto max_range_deletions =
      native_cf_options->memtable_max_range_deletions;
  return static_cast<jint>(max_range_deletions);
}

/////////////////////////////////////////////////////////////////////
// ROCKSDB_NAMESPACE::DBOptions

Expand Down
13 changes: 13 additions & 0 deletions java/src/main/java/org/rocksdb/ColumnFamilyOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -959,6 +959,17 @@ public SstPartitionerFactory sstPartitionerFactory() {
return sstPartitionerFactory_;
}

// Passes count to the native setter together with this object's native
// handle, then returns this instance.
@Override
public ColumnFamilyOptions setMemtableMaxRangeDeletions(final int count) {
setMemtableMaxRangeDeletions(nativeHandle_, count);
return this;
}

// Returns the value reported by the native getter for this object's native
// handle.
@Override
public int memtableMaxRangeDeletions() {
return memtableMaxRangeDeletions(nativeHandle_);
}

//
// BEGIN options for blobs (integrated BlobDB)
//
Expand Down Expand Up @@ -1498,6 +1509,8 @@ private native void setForceConsistencyChecks(final long handle,
private native void setSstPartitionerFactory(long nativeHandle_, long newFactoryHandle);
private static native void setCompactionThreadLimiter(
final long nativeHandle_, final long compactionThreadLimiterHandle);
private native void setMemtableMaxRangeDeletions(final long handle, final int count);
private native int memtableMaxRangeDeletions(final long handle);

private native void setEnableBlobFiles(final long nativeHandle_, final boolean enableBlobFiles);
private native boolean enableBlobFiles(final long nativeHandle_);
Expand Down
17 changes: 17 additions & 0 deletions java/src/main/java/org/rocksdb/ColumnFamilyOptionsInterface.java
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,23 @@ T setCompressionOptions(
@Experimental("Caution: this option is experimental")
SstPartitionerFactory sstPartitionerFactory();

/**
* Sets the maximum number of range deletions, after which the memtable is
* flushed. This applies to the mutable memtable.
*
* @param count a positive integer, or 0 (default) to disable the feature.
* @return the reference of the current options.
*/
T setMemtableMaxRangeDeletions(final int count);

/**
* Gets the current setting of the maximum number of range deletions allowed.
* 0 (default) indicates that the feature is disabled.
*
* @return current value of memtable_max_range_deletions
*/
int memtableMaxRangeDeletions();

/**
* Compaction concurrent thread limiter for the column family.
* If non-nullptr, use given concurrent thread limiter to control
Expand Down
13 changes: 13 additions & 0 deletions java/src/main/java/org/rocksdb/Options.java
Original file line number Diff line number Diff line change
Expand Up @@ -1984,6 +1984,17 @@ public SstPartitionerFactory sstPartitionerFactory() {
return sstPartitionerFactory_;
}

// Passes count to the native setter together with this object's native
// handle, then returns this instance.
@Override
public Options setMemtableMaxRangeDeletions(final int count) {
setMemtableMaxRangeDeletions(nativeHandle_, count);
return this;
}

// Returns the value reported by the native getter for this object's native
// handle.
@Override
public int memtableMaxRangeDeletions() {
return memtableMaxRangeDeletions(nativeHandle_);
}

@Override
public Options setCompactionThreadLimiter(final ConcurrentTaskLimiter compactionThreadLimiter) {
setCompactionThreadLimiter(nativeHandle_, compactionThreadLimiter.nativeHandle_);
Expand Down Expand Up @@ -2502,6 +2513,8 @@ private native void setAtomicFlush(final long handle,
final boolean atomicFlush);
private native boolean atomicFlush(final long handle);
private native void setSstPartitionerFactory(long nativeHandle_, long newFactoryHandle);
private native void setMemtableMaxRangeDeletions(final long handle, final int count);
private native int memtableMaxRangeDeletions(final long handle);
private static native void setCompactionThreadLimiter(
final long nativeHandle_, final long newLimiterHandle);
private static native void setAvoidUnnecessaryBlockingIO(
Expand Down
10 changes: 10 additions & 0 deletions java/src/test/java/org/rocksdb/ColumnFamilyOptionsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -709,4 +709,14 @@ public void cfPaths() throws IOException {
assertThat(options.cfPaths()).isEqualTo(paths);
}
}

// Round-trips memtableMaxRangeDeletions through ColumnFamilyOptions.
@Test
public void memtableMaxRangeDeletions() {
  try (final ColumnFamilyOptions cfOptions = new ColumnFamilyOptions()) {
    // A freshly created options object is expected to report 0.
    assertThat(cfOptions.memtableMaxRangeDeletions()).isEqualTo(0);

    final int expected = 32;
    // The setter's return value must equal the options object itself.
    assertThat(cfOptions.setMemtableMaxRangeDeletions(expected)).isEqualTo(cfOptions);
    // The value just written must be read back unchanged.
    assertThat(cfOptions.memtableMaxRangeDeletions()).isEqualTo(expected);
  }
}
}
Loading
Loading