Skip to content

Commit

Permalink
Merge branch 'main' into vrdhn-upstream-range-del-read
Browse files Browse the repository at this point in the history
  • Loading branch information
cbi42 committed Jul 5, 2023
2 parents 45bd3c7 + c53d604 commit 4c6d625
Show file tree
Hide file tree
Showing 91 changed files with 1,638 additions and 859 deletions.
32 changes: 32 additions & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,38 @@
# Rocksdb Change Log
> NOTE: Entries for next release do not go here. Follow instructions in `unreleased_history/README.txt`
## 8.4.0 (06/26/2023)
### New Features
* Add FSReadRequest::fs_scratch which is a data buffer allocated and provided by underlying FileSystem to RocksDB during reads, when FS wants to provide its own buffer with data instead of using RocksDB provided FSReadRequest::scratch. This can help in cpu optimization by avoiding copy from file system's buffer to RocksDB buffer. More details on how to use/enable it in file_system.h. Right now its supported only for MultiReads(async + sync) with non direct io.
* Start logging non-zero user-defined timestamp sizes in WAL to signal user key format in subsequent records and use it during recovery. This change will break recovery from WAL files written by early versions that contain user-defined timestamps. The workaround is to ensure there are no WAL files to recover (i.e. by flushing before close) before upgrade.
* Added new property "rocksdb.obsolete-sst-files-size-property" that reports the size of SST files that have become obsolete but have not yet been deleted or scheduled for deletion
* Start to record the value of the flag `AdvancedColumnFamilyOptions.persist_user_defined_timestamps` in the Manifest and table properties for a SST file when it is created. And use the recorded flag when creating a table reader for the SST file. This flag is only explicitly record if it's false.
* Add a new option OptimisticTransactionDBOptions::shared_lock_buckets that enables sharing mutexes for validating transactions between DB instances, for better balancing memory efficiency and validation contention across DB instances. Different column families and DBs also now use different hash seeds in this validation, so that the same set of key names will not contend across DBs or column families.
* Add a new ticker `rocksdb.files.marked.trash.deleted` to track the number of trash files deleted by background thread from the trash queue.
* Add an API NewTieredVolatileCache() in include/rocksdb/cache.h to allocate an instance of a block cache with a primary block cache tier and a compressed secondary cache tier. A cache of this type distributes memory reservations against the block cache, such as WriteBufferManager, table reader memory etc., proportionally across both the primary and compressed secondary cache.
* Add `WaitForCompact()` to wait for all flush and compactions jobs to finish. Jobs to wait include the unscheduled (queued, but not scheduled yet).
* Add `WriteBatch::Release()` that releases the batch's serialized data to the caller.

### Public API Changes
* Add parameter `deletion_ratio` to C API `rocksdb_options_add_compact_on_deletion_collector_factory`.
* change the FileSystem::use_async_io() API to SupportedOps API in order to extend it to various operations supported by underlying FileSystem. Right now it contains FSSupportedOps::kAsyncIO and FSSupportedOps::kFSBuffer. More details about FSSupportedOps in filesystem.h
* Add new tickers: `rocksdb.error.handler.bg.error.count`, `rocksdb.error.handler.bg.io.error.count`, `rocksdb.error.handler.bg.retryable.io.error.count` to replace the misspelled ones: `rocksdb.error.handler.bg.errro.count`, `rocksdb.error.handler.bg.io.errro.count`, `rocksdb.error.handler.bg.retryable.io.errro.count` ('error' instead of 'errro'). Users should switch to use the new tickers before 9.0 release as the misspelled old tickers will be completely removed then.
* Overload the API CreateColumnFamilyWithImport() to support creating ColumnFamily by importing multiple ColumnFamilies It requires that CFs should not overlap in user key range.

### Behavior Changes
* Change the default value for option `level_compaction_dynamic_level_bytes` to true. This affects users who use leveled compaction and do not set this option explicitly. These users may see additional background compactions following DB open. These compactions help to shape the LSM according to `level_compaction_dynamic_level_bytes` such that the size of each level Ln is approximately size of Ln-1 * `max_bytes_for_level_multiplier`. Turning on this option has other benefits too: see more detail in wiki: https://github.com/facebook/rocksdb/wiki/Leveled-Compaction#option-level_compaction_dynamic_level_bytes-and-levels-target-size and in option comment in advanced_options.h (#11525).
* For Leveled Compaction users, `CompactRange()` will now always try to compact to the last non-empty level. (#11468)
For Leveled Compaction users, `CompactRange()` with `bottommost_level_compaction = BottommostLevelCompaction::kIfHaveCompactionFilter` will behave similar to `kForceOptimized` in that it will skip files created during this manual compaction when compacting files in the bottommost level. (#11468)
* RocksDB will try to drop range tombstones during non-bottommost compaction when it is safe to do so. (#11459)
* When a DB is openend with `allow_ingest_behind=true` (currently only Universal compaction is supported), files in the last level, i.e. the ingested files, will not be included in any compaction. (#11489)
* Statistics `rocksdb.sst.read.micros` scope is expanded to all SST reads except for file ingestion and column family import (some compaction reads were previously excluded).

### Bug Fixes
* Reduced cases of illegally using Env::Default() during static destruction by never destroying the internal PosixEnv itself (except for builds checking for memory leaks). (#11538)
* Fix extra prefetching during seek in async_io when BlockBasedTableOptions.num_file_reads_for_auto_readahead is 1 leading to extra reads than required.
* Fix a bug where compactions that are qualified to be run as 2 subcompactions were only run as one subcompaction.
* Fix a use-after-move bug in block.cc.

## 8.3.0 (05/19/2023)
### New Features
* Introduced a new option `block_protection_bytes_per_key`, which can be used to enable per key-value integrity protection for in-memory blocks in block cache (#11287).
Expand Down
2 changes: 1 addition & 1 deletion cache/secondary_cache_adapter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ void CacheWithSecondaryAdapter::WaitAll(AsyncLookupHandle* async_handles,
for (AsyncLookupHandle* cur : my_pending) {
my_secondary_handles.push_back(cur->pending_handle);
}
secondary_cache_->WaitAll(my_secondary_handles);
secondary_cache_->WaitAll(std::move(my_secondary_handles));
}

// Process results
Expand Down
4 changes: 3 additions & 1 deletion db/arena_wrapped_db_iter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@ void ArenaWrappedDBIter::Init(
read_options_ = read_options;
allow_refresh_ = allow_refresh;
memtable_range_tombstone_iter_ = nullptr;
if (!env->GetFileSystem()->use_async_io()) {

if (!CheckFSFeatureSupport(env->GetFileSystem().get(),
FSSupportedOps::kAsyncIO)) {
read_options_.async_io = false;
}
}
Expand Down
4 changes: 2 additions & 2 deletions db/blob/blob_file_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -439,11 +439,11 @@ void BlobFileReader::MultiGetBlob(
assert(req->offset >= adjustment);
adjustments.push_back(adjustment);

FSReadRequest read_req = {};
FSReadRequest read_req;
read_req.offset = req->offset - adjustment;
read_req.len = req->len + adjustment;
read_reqs.emplace_back(read_req);
total_len += read_req.len;
read_reqs.emplace_back(std::move(read_req));
}

RecordTick(statistics_, BLOB_DB_BLOB_FILE_BYTES_READ, total_len);
Expand Down
35 changes: 29 additions & 6 deletions db/builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

#include "db/blob/blob_file_builder.h"
#include "db/compaction/compaction_iterator.h"
#include "db/dbformat.h"
#include "db/event_helpers.h"
#include "db/internal_stats.h"
#include "db/merge_helper.h"
Expand Down Expand Up @@ -205,21 +206,38 @@ Status BuildTable(
/*compaction=*/nullptr, compaction_filter.get(),
/*shutting_down=*/nullptr, db_options.info_log, full_history_ts_low);

const size_t ts_sz = ucmp->timestamp_size();
const bool strip_timestamp =
ts_sz > 0 && !ioptions.persist_user_defined_timestamps;

std::string key_after_flush_buf;
c_iter.SeekToFirst();
for (; c_iter.Valid(); c_iter.Next()) {
const Slice& key = c_iter.key();
const Slice& value = c_iter.value();
const ParsedInternalKey& ikey = c_iter.ikey();
// Generate a rolling 64-bit hash of the key and values
// Note :
// Here "key" integrates 'sequence_number'+'kType'+'user key'.
s = output_validator.Add(key, value);
Slice key_after_flush = key;
// If user defined timestamps will be stripped from user key after flush,
// the in memory version of the key act logically the same as one with a
// minimum timestamp. We update the timestamp here so file boundary and
// output validator, block builder all see the effect of the stripping.
if (strip_timestamp) {
key_after_flush_buf.clear();
ReplaceInternalKeyWithMinTimestamp(&key_after_flush_buf, key, ts_sz);
key_after_flush = key_after_flush_buf;
}

// Generate a rolling 64-bit hash of the key and values
// Note :
// Here "key" integrates 'sequence_number'+'kType'+'user key'.
s = output_validator.Add(key_after_flush, value);
if (!s.ok()) {
break;
}
builder->Add(key, value);
builder->Add(key_after_flush, value);

s = meta->UpdateBoundaries(key, value, ikey.sequence, ikey.type);
s = meta->UpdateBoundaries(key_after_flush, value, ikey.sequence,
ikey.type);
if (!s.ok()) {
break;
}
Expand All @@ -244,6 +262,7 @@ Status BuildTable(
range_del_it->Next()) {
auto tombstone = range_del_it->Tombstone();
auto kv = tombstone.Serialize();
// TODO(yuzhangyu): handle range deletion for UDT in memtables only.
builder->Add(kv.first.Encode(), kv.second);
InternalKey tombstone_end = tombstone.SerializeEndKey();
meta->UpdateBoundariesForRange(kv.first, tombstone_end, tombstone.seq_,
Expand Down Expand Up @@ -293,6 +312,8 @@ Status BuildTable(
meta->fd.file_size = file_size;
meta->tail_size = builder->GetTailSize();
meta->marked_for_compaction = builder->NeedCompact();
meta->user_defined_timestamps_persisted =
ioptions.persist_user_defined_timestamps;
assert(meta->fd.GetFileSize() > 0);
tp = builder
->GetTableProperties(); // refresh now that builder is finished
Expand Down Expand Up @@ -352,6 +373,8 @@ Status BuildTable(
s = *io_status;
}

// TODO(yuzhangyu): handle the key copy in the blob when ts should be
// stripped.
if (blob_file_builder) {
if (s.ok()) {
s = blob_file_builder->Finish();
Expand Down
45 changes: 40 additions & 5 deletions db/c.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1057,6 +1057,36 @@ rocksdb_column_family_handle_t* rocksdb_create_column_family(
return handle;
}

rocksdb_column_family_handle_t** rocksdb_create_column_families(
rocksdb_t* db, const rocksdb_options_t* column_family_options,
int num_column_families, const char* const* column_family_names,
size_t* lencfs, char** errptr) {
std::vector<ColumnFamilyHandle*> handles;
std::vector<std::string> names;
for (int i = 0; i != num_column_families; ++i) {
names.push_back(std::string(column_family_names[i]));
}
SaveError(errptr, db->rep->CreateColumnFamilies(
ColumnFamilyOptions(column_family_options->rep), names,
&handles));

*lencfs = handles.size();
rocksdb_column_family_handle_t** c_handles =
static_cast<rocksdb_column_family_handle_t**>(
malloc(sizeof(rocksdb_column_family_handle_t*) * handles.size()));
for (size_t i = 0; i != handles.size(); ++i) {
c_handles[i] = new rocksdb_column_family_handle_t;
c_handles[i]->rep = handles[i];
}

return c_handles;
}

void rocksdb_create_column_families_destroy(
rocksdb_column_family_handle_t** list) {
free(list);
}

rocksdb_column_family_handle_t* rocksdb_create_column_family_with_ttl(
rocksdb_t* db, const rocksdb_options_t* column_family_options,
const char* column_family_name, int ttl, char** errptr) {
Expand Down Expand Up @@ -3740,16 +3770,21 @@ void rocksdb_options_set_hash_link_list_rep(rocksdb_options_t* opt,
ROCKSDB_NAMESPACE::NewHashLinkListRepFactory(bucket_count));
}

void rocksdb_options_set_plain_table_factory(rocksdb_options_t* opt,
uint32_t user_key_len,
int bloom_bits_per_key,
double hash_table_ratio,
size_t index_sparseness) {
void rocksdb_options_set_plain_table_factory(
rocksdb_options_t* opt, uint32_t user_key_len, int bloom_bits_per_key,
double hash_table_ratio, size_t index_sparseness, size_t huge_page_tlb_size,
char encoding_type, unsigned char full_scan_mode,
unsigned char store_index_in_file) {
ROCKSDB_NAMESPACE::PlainTableOptions options;
options.user_key_len = user_key_len;
options.bloom_bits_per_key = bloom_bits_per_key;
options.hash_table_ratio = hash_table_ratio;
options.index_sparseness = index_sparseness;
options.huge_page_tlb_size = huge_page_tlb_size;
options.encoding_type =
static_cast<ROCKSDB_NAMESPACE::EncodingType>(encoding_type);
options.full_scan_mode = full_scan_mode;
options.store_index_in_file = store_index_in_file;

ROCKSDB_NAMESPACE::TableFactory* factory =
ROCKSDB_NAMESPACE::NewPlainTableFactory(options);
Expand Down
18 changes: 15 additions & 3 deletions db/c_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -1672,7 +1672,8 @@ int main(int argc, char** argv) {
rocksdb_options_set_prefix_extractor(
options, rocksdb_slicetransform_create_fixed_prefix(3));
rocksdb_options_set_hash_skip_list_rep(options, 5000, 4, 4);
rocksdb_options_set_plain_table_factory(options, 4, 10, 0.75, 16);
rocksdb_options_set_plain_table_factory(options, 4, 10, 0.75, 16, 0, 0, 0,
0);
rocksdb_options_set_allow_concurrent_memtable_write(options, 0);

db = rocksdb_open(options, dbname, &err);
Expand Down Expand Up @@ -3374,8 +3375,19 @@ int main(int argc, char** argv) {
rocksdb_put(db, woptions, "key", 3, "value", 5, &err);
CheckNoError(err);
rocksdb_column_family_handle_t *cfh1, *cfh2;
cfh1 = rocksdb_create_column_family(db, db_options, "txn_db_cf1", &err);
cfh2 = rocksdb_create_column_family(db, db_options, "txn_db_cf2", &err);
char** list_const_cf_names = (char**)malloc(2 * sizeof(char*));
list_const_cf_names[0] = "txn_db_cf1";
list_const_cf_names[1] = "txn_db_cf2";
size_t cflen;
rocksdb_column_family_handle_t** list_cfh = rocksdb_create_column_families(
db, db_options, 2, (const char* const*)list_const_cf_names, &cflen,
&err);
free(list_const_cf_names);
CheckNoError(err);
assert(cflen == 2);
cfh1 = list_cfh[0];
cfh2 = list_cfh[1];
rocksdb_create_column_families_destroy(list_cfh);
txn = rocksdb_optimistictransaction_begin(otxn_db, woptions, otxn_options,
NULL);
rocksdb_transaction_put_cf(txn, cfh1, "key_cf1", 7, "val_cf1", 7, &err);
Expand Down
3 changes: 2 additions & 1 deletion db/compaction/compaction_job_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,8 @@ class CompactionJobTestBase : public testing::Test {
kUnknownFileCreationTime,
versions_->GetColumnFamilySet()->GetDefault()->NewEpochNumber(),
kUnknownFileChecksum, kUnknownFileChecksumFuncName, kNullUniqueId64x2,
0, 0);
/*compensated_range_deletion_size=*/0, /*tail_size=*/0,
/*user_defined_timestamps_persisted=*/true);

mutex_.Lock();
EXPECT_OK(versions_->LogAndApply(
Expand Down
2 changes: 2 additions & 0 deletions db/compaction/compaction_outputs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ Status CompactionOutputs::Finish(const Status& intput_status,
meta->fd.file_size = current_bytes;
meta->tail_size = builder_->GetTailSize();
meta->marked_for_compaction = builder_->NeedCompact();
meta->user_defined_timestamps_persisted = static_cast<bool>(
builder_->GetTableProperties().user_defined_timestamps_persisted);
}
current_output().finished = true;
stats_.bytes_written += current_bytes;
Expand Down
3 changes: 2 additions & 1 deletion db/compaction/compaction_picker_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,8 @@ class CompactionPickerTestBase : public testing::Test {
smallest_seq, largest_seq, marked_for_compact, temperature,
kInvalidBlobFileNumber, kUnknownOldestAncesterTime,
kUnknownFileCreationTime, epoch_number, kUnknownFileChecksum,
kUnknownFileChecksumFuncName, kNullUniqueId64x2, 0, 0);
kUnknownFileChecksumFuncName, kNullUniqueId64x2, 0, 0,
true /* user_defined_timestamps_persisted */);
f->compensated_file_size =
(compensated_file_size != 0) ? compensated_file_size : file_size;
f->oldest_ancester_time = oldest_ancestor_time;
Expand Down
6 changes: 4 additions & 2 deletions db/db_impl/db_impl_compaction_flush.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1777,7 +1777,8 @@ Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) {
f->marked_for_compaction, f->temperature, f->oldest_blob_file_number,
f->oldest_ancester_time, f->file_creation_time, f->epoch_number,
f->file_checksum, f->file_checksum_func_name, f->unique_id,
f->compensated_range_deletion_size, f->tail_size);
f->compensated_range_deletion_size, f->tail_size,
f->user_defined_timestamps_persisted);
}
ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
"[%s] Apply version edit:\n%s", cfd->GetName().c_str(),
Expand Down Expand Up @@ -3510,7 +3511,8 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
f->oldest_blob_file_number, f->oldest_ancester_time,
f->file_creation_time, f->epoch_number, f->file_checksum,
f->file_checksum_func_name, f->unique_id,
f->compensated_range_deletion_size, f->tail_size);
f->compensated_range_deletion_size, f->tail_size,
f->user_defined_timestamps_persisted);

ROCKS_LOG_BUFFER(
log_buffer,
Expand Down
3 changes: 2 additions & 1 deletion db/db_impl/db_impl_experimental.cc
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,8 @@ Status DBImpl::PromoteL0(ColumnFamilyHandle* column_family, int target_level) {
f->oldest_blob_file_number, f->oldest_ancester_time,
f->file_creation_time, f->epoch_number, f->file_checksum,
f->file_checksum_func_name, f->unique_id,
f->compensated_range_deletion_size, f->tail_size);
f->compensated_range_deletion_size, f->tail_size,
f->user_defined_timestamps_persisted);
}

status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(),
Expand Down
19 changes: 10 additions & 9 deletions db/db_impl/db_impl_open.cc
Original file line number Diff line number Diff line change
Expand Up @@ -618,7 +618,7 @@ Status DBImpl::Recover(
f->file_creation_time, f->epoch_number,
f->file_checksum, f->file_checksum_func_name,
f->unique_id, f->compensated_range_deletion_size,
f->tail_size);
f->tail_size, f->user_defined_timestamps_persisted);
ROCKS_LOG_WARN(immutable_db_options_.info_log,
"[%s] Moving #%" PRIu64
" from from_level-%d to from_level-%d %" PRIu64
Expand Down Expand Up @@ -1689,14 +1689,15 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
constexpr int level = 0;

if (s.ok() && has_output) {
edit->AddFile(
level, meta.fd.GetNumber(), meta.fd.GetPathId(), meta.fd.GetFileSize(),
meta.smallest, meta.largest, meta.fd.smallest_seqno,
meta.fd.largest_seqno, meta.marked_for_compaction, meta.temperature,
meta.oldest_blob_file_number, meta.oldest_ancester_time,
meta.file_creation_time, meta.epoch_number, meta.file_checksum,
meta.file_checksum_func_name, meta.unique_id,
meta.compensated_range_deletion_size, meta.tail_size);
edit->AddFile(level, meta.fd.GetNumber(), meta.fd.GetPathId(),
meta.fd.GetFileSize(), meta.smallest, meta.largest,
meta.fd.smallest_seqno, meta.fd.largest_seqno,
meta.marked_for_compaction, meta.temperature,
meta.oldest_blob_file_number, meta.oldest_ancester_time,
meta.file_creation_time, meta.epoch_number,
meta.file_checksum, meta.file_checksum_func_name,
meta.unique_id, meta.compensated_range_deletion_size,
meta.tail_size, meta.user_defined_timestamps_persisted);

for (const auto& blob : blob_file_additions) {
edit->AddBlobFile(blob);
Expand Down
Loading

0 comments on commit 4c6d625

Please sign in to comment.