Skip to content

Commit

Permalink
Improve performance the mix of range-delete and read workloads.
Browse files Browse the repository at this point in the history
The idea is to force flush after few range-delete operations.
Experimentation shows 32 or 64 giving good performance.

max_tombstone_count is available as a ColumnFamilyOption.
The default is 0, which disables. it.
  • Loading branch information
vrdhn committed Apr 7, 2023
1 parent f631138 commit 2287f58
Show file tree
Hide file tree
Showing 10 changed files with 148 additions and 3 deletions.
21 changes: 20 additions & 1 deletion db/memtable.cc
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ MemTable::MemTable(const InternalKeyComparator& cmp,
data_size_(0),
num_entries_(0),
num_deletes_(0),
num_range_deletes_(0),
write_buffer_size_(mutable_cf_options.write_buffer_size),
flush_in_progress_(false),
flush_completed_(false),
Expand All @@ -114,7 +115,8 @@ MemTable::MemTable(const InternalKeyComparator& cmp,
ioptions.memtable_insert_with_hint_prefix_extractor.get()),
oldest_key_time_(std::numeric_limits<uint64_t>::max()),
atomic_flush_seqno_(kMaxSequenceNumber),
approximate_memory_usage_(0) {
approximate_memory_usage_(0),
max_tombstones_count_(mutable_cf_options.max_tombstones_count_) {
UpdateFlushState();
// something went wrong if we need to flush before inserting anything
assert(!ShouldScheduleFlush());
Expand Down Expand Up @@ -170,6 +172,11 @@ size_t MemTable::ApproximateMemoryUsage() {
}

bool MemTable::ShouldFlushNow() {
// Read path will set this if too many tombstones have been generated.
if (max_tombstones_reached_) {
return true;
}

size_t write_buffer_size = write_buffer_size_.load(std::memory_order_relaxed);
// In a lot of times, we cannot allocate arena blocks that exactly matches the
// buffer size. Thus we have to decide if we should over-allocate or
Expand Down Expand Up @@ -783,6 +790,10 @@ Status MemTable::Add(SequenceNumber s, ValueType type,
type == kTypeDeletionWithTimestamp) {
num_deletes_.store(num_deletes_.load(std::memory_order_relaxed) + 1,
std::memory_order_relaxed);
} else if (type == kTypeRangeDeletion) {
num_range_deletes_.store(
num_range_deletes_.load(std::memory_order_relaxed) + 1,
std::memory_order_relaxed);
}

if (bloom_filter_ && prefix_extractor_ &&
Expand Down Expand Up @@ -1292,6 +1303,14 @@ bool MemTable::Get(const LookupKey& key, std::string* value,
// Avoiding recording stats for speed.
return false;
}

// Arrange for a flush if too many tombstones have been created.
if (max_tombstones_count_ > 0 && max_tombstones_reached_ == false &&
num_range_deletes_.load(std::memory_order_relaxed) >
max_tombstones_count_) {
max_tombstones_reached_ = true;
}

PERF_TIMER_GUARD(get_from_memtable_time);

std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
Expand Down
6 changes: 6 additions & 0 deletions db/memtable.h
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,7 @@ class MemTable {
std::atomic<uint64_t> data_size_;
std::atomic<uint64_t> num_entries_;
std::atomic<uint64_t> num_deletes_;
std::atomic<uint64_t> num_range_deletes_;

// Dynamically changeable memtable option
std::atomic<size_t> write_buffer_size_;
Expand Down Expand Up @@ -614,6 +615,11 @@ class MemTable {
// Gets refreshed inside `ApproximateMemoryUsage()` or `ShouldFlushNow`
std::atomic<uint64_t> approximate_memory_usage_;

// max tombstones before flushing, 0 for unlimited.
uint32_t max_tombstones_count_ = 0;
// Read will turn this flag on, if max_tombstones_count_ is reached.
// ShouldFlushNow() will check this.
bool max_tombstones_reached_ = false;
// Flush job info of the current memtable.
std::unique_ptr<FlushJobInfo> flush_job_info_;

Expand Down
5 changes: 5 additions & 0 deletions include/rocksdb/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,11 @@ struct ColumnFamilyOptions : public AdvancedColumnFamilyOptions {
// Default: nullptr
std::shared_ptr<SstPartitionerFactory> sst_partitioner_factory = nullptr;

// Automatic flush after tombstone count in memtable hits this limit.
// helps with delete-range + read workloads
// 0 to disable it completely
uint32_t max_tombstones_count = 0;

// Create ColumnFamilyOptions with default values for all fields
ColumnFamilyOptions();
// Create ColumnFamilyOptions from Options
Expand Down
46 changes: 46 additions & 0 deletions java/rocksjni/options.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3904,6 +3904,28 @@ jbyte Java_org_rocksdb_Options_prepopulateBlobCache(JNIEnv*, jobject,
opts->prepopulate_blob_cache);
}

/*
* Class: org_rocksdb_Options
* Method: setMaxTombstonesCount
* Signature: (JI)V
*/
void Java_org_rocksdb_Options_setMaxTombstonesCount(
JNIEnv*, jobject, jlong jhandle, jint jmax_tombstones_count) {
auto* opts = reinterpret_cast<ROCKSDB_NAMESPACE::Options*>(jhandle);
opts->max_tombstones_count = jmax_tombstones_count;
}

/*
* Class: org_rocksdb_Options
* Method: maxTombstonesCount
* Signature: (J)I
*/
jint Java_org_rocksdb_Options_maxTombstonesCount(JNIEnv*, jobject,
jlong jhandle) {
auto* opts = reinterpret_cast<ROCKSDB_NAMESPACE::Options*>(jhandle);
return static_cast<jint>(opts->max_tombstones_count);
}

//////////////////////////////////////////////////////////////////////////////
// ROCKSDB_NAMESPACE::ColumnFamilyOptions

Expand Down Expand Up @@ -5770,6 +5792,30 @@ jbyte Java_org_rocksdb_ColumnFamilyOptions_prepopulateBlobCache(JNIEnv*,
opts->prepopulate_blob_cache);
}

/*
* Class: org_rocksdb_ColumnFamilyOptions
* Method: setMaxTombstonesCount
* Signature: (JI)V
*/
void Java_org_rocksdb_ColumnFamilyOptions_setMaxTombstonesCount(
JNIEnv*, jobject, jlong jhandle, jint jmax_tombstones_count) {
auto* opts =
reinterpret_cast<ROCKSDB_NAMESPACE::ColumnFamilyOptions*>(jhandle);
opts->max_tombstones_count = jmax_tombstones_count;
}

/*
* Class: org_rocksdb_ColumnFamilyOptions
* Method: maxTombstonesCount
* Signature: (J)I
*/
jint Java_org_rocksdb_ColumnFamilyOptions_maxTombstonesCount(JNIEnv*, jobject,
jlong jhandle) {
auto* opts =
reinterpret_cast<ROCKSDB_NAMESPACE::ColumnFamilyOptions*>(jhandle);
return static_cast<jint>(opts->max_tombstones_count);
}

/////////////////////////////////////////////////////////////////////
// ROCKSDB_NAMESPACE::DBOptions

Expand Down
13 changes: 13 additions & 0 deletions java/src/main/java/org/rocksdb/ColumnFamilyOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -959,6 +959,17 @@ public SstPartitionerFactory sstPartitionerFactory() {
return sstPartitionerFactory_;
}

@Override
public ColumnFamilyOptions setMaxTombstonesCount(final int count) {
setMaxTombstonesCount(nativeHandle_, count);
return this;
}

@Override
public int maxTombstonesCount() {
return maxTombstonesCount(nativeHandle_);
}

//
// BEGIN options for blobs (integrated BlobDB)
//
Expand Down Expand Up @@ -1498,6 +1509,8 @@ private native void setForceConsistencyChecks(final long handle,
private native void setSstPartitionerFactory(long nativeHandle_, long newFactoryHandle);
private static native void setCompactionThreadLimiter(
final long nativeHandle_, final long compactionThreadLimiterHandle);
private native void setMaxTombstonesCount(final long handle, final int count);
private native int maxTombstonesCount(final long handle);

private native void setEnableBlobFiles(final long nativeHandle_, final boolean enableBlobFiles);
private native boolean enableBlobFiles(final long nativeHandle_);
Expand Down
20 changes: 20 additions & 0 deletions java/src/main/java/org/rocksdb/ColumnFamilyOptionsInterface.java
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,26 @@ T setCompressionOptions(
@Experimental("Caution: this option is experimental")
SstPartitionerFactory sstPartitionerFactory();

/**
* Sets the maximum tombstones ( range delete calls ) allowed
* in a delete-range + read work load.
* The value is only checked when a Get is called.
* This applies to the mutable memtable.
*
* @param a positive integer, 0 (default) to disable the feature.
* @return the reference of the current options.
*/
T setMaxTombstonesCount(final int count);

/**
* Gets the current setting of max tombstones allowed in a
* delete-range + read work load.
* 0(default) indicates that feature is disabled.
*
* @return current value of max_tombstones_count
*/
int maxTombstonesCount();

/**
* Compaction concurrent thread limiter for the column family.
* If non-nullptr, use given concurrent thread limiter to control
Expand Down
13 changes: 13 additions & 0 deletions java/src/main/java/org/rocksdb/Options.java
Original file line number Diff line number Diff line change
Expand Up @@ -1987,6 +1987,17 @@ public SstPartitionerFactory sstPartitionerFactory() {
return sstPartitionerFactory_;
}

@Override
public Options setMaxTombstonesCount(final int count) {
setMaxTombstonesCount(nativeHandle_, count);
return this;
}

@Override
public int maxTombstonesCount() {
return maxTombstonesCount(nativeHandle_);
}

@Override
public Options setCompactionThreadLimiter(final ConcurrentTaskLimiter compactionThreadLimiter) {
setCompactionThreadLimiter(nativeHandle_, compactionThreadLimiter.nativeHandle_);
Expand Down Expand Up @@ -2506,6 +2517,8 @@ private native void setAtomicFlush(final long handle,
final boolean atomicFlush);
private native boolean atomicFlush(final long handle);
private native void setSstPartitionerFactory(long nativeHandle_, long newFactoryHandle);
private native void setMaxTombstonesCount(final long handle, final int count);
private native int maxTombstonesCount(final long handle);
private static native void setCompactionThreadLimiter(
final long nativeHandle_, final long newLimiterHandle);
private static native void setAvoidUnnecessaryBlockingIO(
Expand Down
10 changes: 10 additions & 0 deletions java/src/test/java/org/rocksdb/ColumnFamilyOptionsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -711,4 +711,14 @@ public void cfPaths() throws IOException {
assertThat(options.cfPaths()).isEqualTo(paths);
}
}

@Test
public void maxTombstonesCount() {
try (final ColumnFamilyOptions options = new ColumnFamilyOptions()) {
assertThat(options.maxTombstonesCount()).isEqualTo(0);
final int val = 32;
assertThat(options.setMaxTombstonesCount(val)).isEqualTo(options);
assertThat(options.maxTombstonesCount()).isEqualTo(val);
}
}
}
10 changes: 10 additions & 0 deletions java/src/test/java/org/rocksdb/OptionsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -1456,6 +1456,16 @@ public void skipCheckingSstFileSizesOnDbOpen() {
}
}

@Test
public void maxTombstonesCount() {
try (final Options options = new Options()) {
assertThat(options.maxTombstonesCount()).isEqualTo(0);
final int val = 32;
assertThat(options.setMaxTombstonesCount(val)).isEqualTo(options);
assertThat(options.maxTombstonesCount()).isEqualTo(val);
}
}

@Test
public void eventListeners() {
final AtomicBoolean wasCalled1 = new AtomicBoolean();
Expand Down
7 changes: 5 additions & 2 deletions options/cf_options.h
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,8 @@ struct MutableCFOptions {
options.memtable_protection_bytes_per_key),
sample_for_compression(
options.sample_for_compression), // TODO: is 0 fine here?
compression_per_level(options.compression_per_level) {
compression_per_level(options.compression_per_level),
max_tombstones_count_(options.max_tombstones_count) {
RefreshDerivedOptions(options.num_levels, options.compaction_style);
}

Expand Down Expand Up @@ -220,7 +221,8 @@ struct MutableCFOptions {
bottommost_compression(kDisableCompressionOption),
last_level_temperature(Temperature::kUnknown),
memtable_protection_bytes_per_key(0),
sample_for_compression(0) {}
sample_for_compression(0),
max_tombstones_count_(0) {}

explicit MutableCFOptions(const Options& options);

Expand Down Expand Up @@ -313,6 +315,7 @@ struct MutableCFOptions {

uint64_t sample_for_compression;
std::vector<CompressionType> compression_per_level;
uint32_t max_tombstones_count_;

// Derived options
// Per-level target file size.
Expand Down

0 comments on commit 2287f58

Please sign in to comment.