Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce scope of compression dictionary to single SST #4952

Closed
wants to merge 12 commits into from
1 change: 1 addition & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
* Add a placeholder in the manifest which indicates a record from the future that can be safely ignored.
* Add support for trace sampling.
* Enable properties block checksum verification for block-based tables.
* For all users of dictionary compression, we now generate a separate dictionary for compressing each bottom-level SST file. Previously we reused a single dictionary for a whole compaction to bottom level. The new approach achieves better compression ratios; however, it uses more memory and CPU for buffering/sampling data blocks and training dictionaries.

### Public API Change
* Disallow CompactionFilter::IgnoreSnapshots() = false, because it is not very useful and the behavior is confusing. The filter will filter everything if there is no snapshot declared by the time the compaction starts. However, users can define a snapshot after the compaction starts and before it finishes and this new snapshot won't be repeatable, because after the compaction finishes, some keys may be dropped.
Expand Down
14 changes: 7 additions & 7 deletions db/builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,17 +48,18 @@ TableBuilder* NewTableBuilder(
uint32_t column_family_id, const std::string& column_family_name,
WritableFileWriter* file, const CompressionType compression_type,
const CompressionOptions& compression_opts, int level,
const std::string* compression_dict, const bool skip_filters,
const uint64_t creation_time, const uint64_t oldest_key_time) {
const bool skip_filters, const uint64_t creation_time,
const uint64_t oldest_key_time, const bool is_bottommost_level,
const uint64_t target_file_size) {
assert((column_family_id ==
TablePropertiesCollectorFactory::Context::kUnknownColumnFamily) ==
column_family_name.empty());
return ioptions.table_factory->NewTableBuilder(
TableBuilderOptions(ioptions, moptions, internal_comparator,
int_tbl_prop_collector_factories, compression_type,
compression_opts, compression_dict, skip_filters,
column_family_name, level, creation_time,
oldest_key_time),
compression_opts, skip_filters, column_family_name,
level, creation_time, oldest_key_time,
is_bottommost_level, target_file_size),
column_family_id, file);
}

Expand Down Expand Up @@ -128,8 +129,7 @@ Status BuildTable(
ioptions, mutable_cf_options, internal_comparator,
int_tbl_prop_collector_factories, column_family_id,
column_family_name, file_writer.get(), compression, compression_opts,
level, nullptr /* compression_dict */, false /* skip_filters */,
creation_time, oldest_key_time);
level, false /* skip_filters */, creation_time, oldest_key_time);
}

MergeHelper merge(env, internal_comparator.user_comparator(),
Expand Down
6 changes: 2 additions & 4 deletions db/builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,6 @@ class InternalStats;
// @param column_family_name Name of the column family that is also identified
// by column_family_id, or empty string if unknown. It must outlive the
// TableBuilder returned by this function.
// @param compression_dict Data for presetting the compression library's
// dictionary, or nullptr.
TableBuilder* NewTableBuilder(
const ImmutableCFOptions& options, const MutableCFOptions& moptions,
const InternalKeyComparator& internal_comparator,
Expand All @@ -50,9 +48,9 @@ TableBuilder* NewTableBuilder(
uint32_t column_family_id, const std::string& column_family_name,
WritableFileWriter* file, const CompressionType compression_type,
const CompressionOptions& compression_opts, int level,
const std::string* compression_dict = nullptr,
const bool skip_filters = false, const uint64_t creation_time = 0,
const uint64_t oldest_key_time = 0);
const uint64_t oldest_key_time = 0, const bool is_bottommost_level = false,
const uint64_t target_file_size = 0);

// Build a Table file from the contents of *iter. The generated file
// will be named according to number specified in meta. On success, the rest of
Expand Down
113 changes: 4 additions & 109 deletions db/compaction_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,6 @@ struct CompactionJob::SubcompactionState {
uint64_t overlapped_bytes = 0;
// A flag determine whether the key has been seen in ShouldStopBefore()
bool seen_key = false;
std::string compression_dict;

SubcompactionState(Compaction* c, Slice* _start, Slice* _end,
uint64_t size = 0)
Expand All @@ -173,8 +172,7 @@ struct CompactionJob::SubcompactionState {
approx_size(size),
grandparent_index(0),
overlapped_bytes(0),
seen_key(false),
compression_dict() {
seen_key(false) {
assert(compaction != nullptr);
}

Expand All @@ -197,7 +195,6 @@ struct CompactionJob::SubcompactionState {
grandparent_index = std::move(o.grandparent_index);
overlapped_bytes = std::move(o.overlapped_bytes);
seen_key = std::move(o.seen_key);
compression_dict = std::move(o.compression_dict);
return *this;
}

Expand Down Expand Up @@ -865,42 +862,6 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
prev_cpu_read_nanos = IOSTATS(cpu_read_nanos);
}

const MutableCFOptions* mutable_cf_options =
sub_compact->compaction->mutable_cf_options();

// To build compression dictionary, we sample the first output file, assuming
// it'll reach the maximum length. We optionally pass these samples through
// zstd's dictionary trainer, or just use them directly. Then, the dictionary
// is used for compressing subsequent output files in the same subcompaction.
const bool kUseZstdTrainer =
sub_compact->compaction->output_compression_opts().zstd_max_train_bytes >
0;
const size_t kSampleBytes =
kUseZstdTrainer
? sub_compact->compaction->output_compression_opts()
.zstd_max_train_bytes
: sub_compact->compaction->output_compression_opts().max_dict_bytes;
const int kSampleLenShift = 6; // 2^6 = 64-byte samples
std::set<size_t> sample_begin_offsets;
if (bottommost_level_ && kSampleBytes > 0) {
const size_t kMaxSamples = kSampleBytes >> kSampleLenShift;
const size_t kOutFileLen =
static_cast<size_t>(MaxFileSizeForLevel(*mutable_cf_options,
compact_->compaction->output_level(),
cfd->ioptions()->compaction_style,
compact_->compaction->GetInputBaseLevel(),
cfd->ioptions()->level_compaction_dynamic_level_bytes));
if (kOutFileLen != port::kMaxSizet) {
const size_t kOutFileNumSamples = kOutFileLen >> kSampleLenShift;
Random64 generator{versions_->NewFileNumber()};
for (size_t i = 0; i < kMaxSamples; ++i) {
sample_begin_offsets.insert(
static_cast<size_t>(generator.Uniform(kOutFileNumSamples))
<< kSampleLenShift);
}
}
}

MergeHelper merge(
env_, cfd->user_comparator(), cfd->ioptions()->merge_operator,
compaction_filter, db_options_.info_log.get(),
Expand Down Expand Up @@ -938,12 +899,6 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
sub_compact->current_output_file_size);
}
const auto& c_iter_stats = c_iter->iter_stats();
auto sample_begin_offset_iter = sample_begin_offsets.cbegin();
// data_begin_offset and dict_sample_data are only valid while generating
// dictionary from the first output file.
size_t data_begin_offset = 0;
std::string dict_sample_data;
dict_sample_data.reserve(kSampleBytes);

while (status.ok() && !cfd->IsDropped() && c_iter->Valid()) {
// Invariant: c_iter.status() is guaranteed to be OK if c_iter->Valid()
Expand Down Expand Up @@ -979,55 +934,6 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
key, c_iter->ikey().sequence);
sub_compact->num_output_records++;

if (sub_compact->outputs.size() == 1) { // first output file
// Check if this key/value overlaps any sample intervals; if so, appends
// overlapping portions to the dictionary.
for (const auto& data_elmt : {key, value}) {
size_t data_end_offset = data_begin_offset + data_elmt.size();
while (sample_begin_offset_iter != sample_begin_offsets.cend() &&
*sample_begin_offset_iter < data_end_offset) {
size_t sample_end_offset =
*sample_begin_offset_iter + (1 << kSampleLenShift);
// Invariant: Because we advance sample iterator while processing the
// data_elmt containing the sample's last byte, the current sample
// cannot end before the current data_elmt.
assert(data_begin_offset < sample_end_offset);

size_t data_elmt_copy_offset, data_elmt_copy_len;
if (*sample_begin_offset_iter <= data_begin_offset) {
// The sample starts before data_elmt starts, so take bytes starting
// at the beginning of data_elmt.
data_elmt_copy_offset = 0;
} else {
// data_elmt starts before the sample starts, so take bytes starting
// at the below offset into data_elmt.
data_elmt_copy_offset =
*sample_begin_offset_iter - data_begin_offset;
}
if (sample_end_offset <= data_end_offset) {
// The sample ends before data_elmt ends, so take as many bytes as
// needed.
data_elmt_copy_len =
sample_end_offset - (data_begin_offset + data_elmt_copy_offset);
} else {
// data_elmt ends before the sample ends, so take all remaining
// bytes in data_elmt.
data_elmt_copy_len =
data_end_offset - (data_begin_offset + data_elmt_copy_offset);
}
dict_sample_data.append(&data_elmt.data()[data_elmt_copy_offset],
data_elmt_copy_len);
if (sample_end_offset > data_end_offset) {
// Didn't finish sample. Try to finish it with the next data_elmt.
break;
}
// Next sample may require bytes from same data_elmt.
sample_begin_offset_iter++;
}
data_begin_offset = data_end_offset;
}
}

// Close output file if it is big enough. Two possibilities determine it's
// time to close it: (1) the current key should be this file's last key, (2)
// the next key should not be in this file.
Expand Down Expand Up @@ -1069,18 +975,6 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
&range_del_out_stats, next_key);
RecordDroppedKeys(range_del_out_stats,
&sub_compact->compaction_job_stats);
if (sub_compact->outputs.size() == 1) {
// Use samples from first output file to create dictionary for
// compression of subsequent files.
if (kUseZstdTrainer) {
sub_compact->compression_dict = ZSTD_TrainDictionary(
dict_sample_data, kSampleLenShift,
sub_compact->compaction->output_compression_opts()
.max_dict_bytes);
} else {
sub_compact->compression_dict = std::move(dict_sample_data);
}
}
}
}

Expand Down Expand Up @@ -1606,8 +1500,9 @@ Status CompactionJob::OpenCompactionOutputFile(
cfd->GetID(), cfd->GetName(), sub_compact->outfile.get(),
sub_compact->compaction->output_compression(),
sub_compact->compaction->output_compression_opts(),
sub_compact->compaction->output_level(), &sub_compact->compression_dict,
skip_filters, output_file_creation_time));
sub_compact->compaction->output_level(), skip_filters,
output_file_creation_time, 0 /* oldest_key_time */, bottommost_level_,
sub_compact->compaction->max_output_file_size()));
LogFlush(db_options_.info_log);
return s;
}
Expand Down
2 changes: 1 addition & 1 deletion db/db_block_cache_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -633,7 +633,7 @@ TEST_F(DBBlockCacheTest, CompressedCache) {

TEST_F(DBBlockCacheTest, CacheCompressionDict) {
const int kNumFiles = 4;
const int kNumEntriesPerFile = 32;
const int kNumEntriesPerFile = 128;
const int kNumBytesPerEntry = 1024;

// Try all the available libraries that support dictionary compression
Expand Down
93 changes: 80 additions & 13 deletions db/db_test2.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1039,17 +1039,17 @@ TEST_F(DBTest2, WalFilterTestWithColumnFamilies) {
}

TEST_F(DBTest2, PresetCompressionDict) {
// Verifies that compression ratio improves when dictionary is enabled, and
// improves even further when the dictionary is trained by ZSTD.
const size_t kBlockSizeBytes = 4 << 10;
const size_t kL0FileBytes = 128 << 10;
const size_t kApproxPerBlockOverheadBytes = 50;
const int kNumL0Files = 5;
const int kZstdTrainFactor = 16;

Options options;
options.env = CurrentOptions().env; // Make sure to use any custom env that the test is configured with.
options.allow_concurrent_memtable_write = false;
options.arena_block_size = kBlockSizeBytes;
options.compaction_style = kCompactionStyleUniversal;
options.create_if_missing = true;
options.disable_auto_compactions = true;
options.level0_file_num_compaction_trigger = kNumL0Files;
Expand Down Expand Up @@ -1091,16 +1091,15 @@ TEST_F(DBTest2, PresetCompressionDict) {
options.compression_opts.zstd_max_train_bytes = 0;
break;
case 1:
options.compression_opts.max_dict_bytes = kBlockSizeBytes;
options.compression_opts.max_dict_bytes = 4 * kBlockSizeBytes;
options.compression_opts.zstd_max_train_bytes = 0;
break;
case 2:
if (compression_type != kZSTD) {
continue;
}
options.compression_opts.max_dict_bytes = kBlockSizeBytes;
options.compression_opts.zstd_max_train_bytes =
kZstdTrainFactor * kBlockSizeBytes;
options.compression_opts.max_dict_bytes = 4 * kBlockSizeBytes;
options.compression_opts.zstd_max_train_bytes = kL0FileBytes;
break;
default:
assert(false);
Expand All @@ -1110,20 +1109,24 @@ TEST_F(DBTest2, PresetCompressionDict) {
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
CreateAndReopenWithCF({"pikachu"}, options);
Random rnd(301);
std::string seq_data =
RandomString(&rnd, kBlockSizeBytes - kApproxPerBlockOverheadBytes);
std::string seq_datas[10];
for (int j = 0; j < 10; ++j) {
seq_datas[j] =
RandomString(&rnd, kBlockSizeBytes - kApproxPerBlockOverheadBytes);
}

ASSERT_EQ(0, NumTableFilesAtLevel(0, 1));
for (int j = 0; j < kNumL0Files; ++j) {
for (size_t k = 0; k < kL0FileBytes / kBlockSizeBytes + 1; ++k) {
ASSERT_OK(Put(1, Key(static_cast<int>(
j * (kL0FileBytes / kBlockSizeBytes) + k)),
seq_data));
auto key_num = j * (kL0FileBytes / kBlockSizeBytes) + k;
ASSERT_OK(Put(1, Key(static_cast<int>(key_num)),
seq_datas[(key_num / 10) % 10]));
}
dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
ASSERT_EQ(j + 1, NumTableFilesAtLevel(0, 1));
}
db_->CompactRange(CompactRangeOptions(), handles_[1], nullptr, nullptr);
dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1],
true /* disallow_trivial_move */);
ASSERT_EQ(0, NumTableFilesAtLevel(0, 1));
ASSERT_GT(NumTableFilesAtLevel(1, 1), 0);

Expand All @@ -1138,7 +1141,7 @@ TEST_F(DBTest2, PresetCompressionDict) {

for (size_t j = 0; j < kNumL0Files * (kL0FileBytes / kBlockSizeBytes);
j++) {
ASSERT_EQ(seq_data, Get(1, Key(static_cast<int>(j))));
ASSERT_EQ(seq_datas[(j / 10) % 10], Get(1, Key(static_cast<int>(j))));
}
if (i) {
ASSERT_GT(prev_out_bytes, out_bytes);
Expand All @@ -1149,6 +1152,70 @@ TEST_F(DBTest2, PresetCompressionDict) {
}
}

// Verifies that with per-SST dictionaries, each bottommost output file of a
// full compaction gets its own (distinct) compression dictionary, rather than
// one dictionary shared across the whole compaction.
TEST_F(DBTest2, PresetCompressionDictLocality) {
  if (!ZSTD_Supported()) {
    // Dictionary training below requires ZSTD; skip otherwise.
    return;
  }
  // Verifies that compression dictionary is generated from local data. The
  // verification simply checks all output SSTs have different compression
  // dictionaries. We do not verify effectiveness as that'd likely be flaky in
  // the future.
  const int kNumEntriesPerFile = 1 << 10; // 1K entries per file
  const int kNumBytesPerEntry = 1 << 10; // 1KB per entry
  const int kNumFiles = 4;
  Options options = CurrentOptions();
  options.compression = kZSTD;
  options.compression_opts.max_dict_bytes = 1 << 14; // 16KB
  options.compression_opts.zstd_max_train_bytes = 1 << 18; // 256KB
  options.statistics = rocksdb::CreateDBStatistics();
  // Size files so each flush's worth of data maps to roughly one output SST.
  options.target_file_size_base = kNumEntriesPerFile * kNumBytesPerEntry;
  BlockBasedTableOptions table_options;
  table_options.cache_index_and_filter_blocks = true;
  options.table_factory.reset(new BlockBasedTableFactory(table_options));
  Reopen(options);

  // Build kNumFiles L1 files of random values; random data keeps the files
  // from compressing into fewer output files during the compaction below.
  Random rnd(301);
  for (int i = 0; i < kNumFiles; ++i) {
    for (int j = 0; j < kNumEntriesPerFile; ++j) {
      ASSERT_OK(Put(Key(i * kNumEntriesPerFile + j),
                    RandomString(&rnd, kNumBytesPerEntry)));
    }
    ASSERT_OK(Flush());
    MoveFilesToLevel(1);
    ASSERT_EQ(NumTableFilesAtLevel(1), i + 1);
  }

  // Store all the dictionaries generated during a full compaction.
  // The sync point fires once per output SST with the raw dictionary contents.
  std::vector<std::string> compression_dicts;
  rocksdb::SyncPoint::GetInstance()->SetCallBack(
      "BlockBasedTableBuilder::WriteCompressionDictBlock:RawDict",
      [&](void* arg) {
        compression_dicts.emplace_back(static_cast<Slice*>(arg)->ToString());
      });
  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
  // Force recompaction of the bottommost level so dictionaries are rebuilt
  // even though the input files are already at the output level.
  CompactRangeOptions compact_range_opts;
  compact_range_opts.bottommost_level_compaction =
      BottommostLevelCompaction::kForce;
  ASSERT_OK(db_->CompactRange(compact_range_opts, nullptr, nullptr));

  // Dictionary compression should not be so good as to compress four totally
  // random files into one. If it does then there's probably something wrong
  // with the test.
  ASSERT_GT(NumTableFilesAtLevel(1), 1);

  // Furthermore, there should be one compression dictionary generated per file.
  // And they should all be different from each other.
  // NOTE(review): only adjacent pairs are compared below, not all pairs; that
  // is taken as sufficient since two independently trained dictionaries over
  // random data are very unlikely to collide non-adjacently.
  ASSERT_EQ(NumTableFilesAtLevel(1),
            static_cast<int>(compression_dicts.size()));
  for (size_t i = 1; i < compression_dicts.size(); ++i) {
    std::string& a = compression_dicts[i - 1];
    std::string& b = compression_dicts[i];
    size_t alen = a.size();
    size_t blen = b.size();
    // Dictionaries differ if their lengths differ or any byte differs.
    ASSERT_TRUE(alen != blen || memcmp(a.data(), b.data(), alen) != 0);
  }
}

class CompactionCompressionListener : public EventListener {
public:
explicit CompactionCompressionListener(Options* db_options)
Expand Down
Loading