Skip to content

Commit

Permalink
Digest ZSTD compression dictionary once per SST file (facebook#4251)
Browse files Browse the repository at this point in the history
Summary:
In RocksDB, for a given SST file, all data blocks are compressed with the same dictionary. When we compress a block using the dictionary's raw bytes, the compression library first has to digest the dictionary to get it into a usable form. This digestion work is redundant and ideally should be done once per file.

ZSTD offers APIs for the caller to create and reuse a digested dictionary object (`ZSTD_CDict`). In this PR, we call `ZSTD_createCDict` once per file to digest the raw bytes. Then we use `ZSTD_compress_usingCDict` to compress each data block using the pre-digested dictionary. Once the file is finished, `ZSTD_freeCDict` releases the resources held by the digested dictionary.

There are a couple other changes included in this PR:

- Changed the parameter object for (un)compression functions from `CompressionContext`/`UncompressionContext` to `CompressionInfo`/`UncompressionInfo`. This avoids the previous pattern, where `CompressionContext`/`UncompressionContext` had to be mutated before calling an (un)compression function, depending on whether a dictionary should be used. I felt that mutation was error-prone, so I eliminated it.
- Added support for digested uncompression dictionaries (`ZSTD_DDict`) as well. However, this PR does not support reusing them across uncompression calls for the same file. That work is deferred to a later PR, in which we will store the `ZSTD_DDict` objects in block cache.
Pull Request resolved: facebook#4251

Differential Revision: D9257078

Pulled By: ajkr

fbshipit-source-id: 21b8cb6bbdd48e459f1c62343780ab66c0a64438
  • Loading branch information
ajkr authored and facebook-github-bot committed Aug 24, 2018
1 parent ee234e8 commit 6c40806
Show file tree
Hide file tree
Showing 11 changed files with 388 additions and 177 deletions.
77 changes: 40 additions & 37 deletions table/block_based_table_builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -103,27 +103,27 @@ bool GoodCompressionRatio(size_t compressed_size, size_t raw_size) {
} // namespace

// format_version is the block format as defined in include/rocksdb/table.h
Slice CompressBlock(const Slice& raw, const CompressionContext& compression_ctx,
Slice CompressBlock(const Slice& raw, const CompressionInfo& compression_info,
CompressionType* type, uint32_t format_version,
std::string* compressed_output) {
*type = compression_ctx.type();
if (compression_ctx.type() == kNoCompression) {
*type = compression_info.type();
if (compression_info.type() == kNoCompression) {
return raw;
}

// Will return compressed block contents if (1) the compression method is
// supported in this platform and (2) the compression rate is "good enough".
switch (compression_ctx.type()) {
switch (compression_info.type()) {
case kSnappyCompression:
if (Snappy_Compress(compression_ctx, raw.data(), raw.size(),
if (Snappy_Compress(compression_info, raw.data(), raw.size(),
compressed_output) &&
GoodCompressionRatio(compressed_output->size(), raw.size())) {
return *compressed_output;
}
break; // fall back to no compression.
case kZlibCompression:
if (Zlib_Compress(
compression_ctx,
compression_info,
GetCompressFormatForVersion(kZlibCompression, format_version),
raw.data(), raw.size(), compressed_output) &&
GoodCompressionRatio(compressed_output->size(), raw.size())) {
Expand All @@ -132,7 +132,7 @@ Slice CompressBlock(const Slice& raw, const CompressionContext& compression_ctx,
break; // fall back to no compression.
case kBZip2Compression:
if (BZip2_Compress(
compression_ctx,
compression_info,
GetCompressFormatForVersion(kBZip2Compression, format_version),
raw.data(), raw.size(), compressed_output) &&
GoodCompressionRatio(compressed_output->size(), raw.size())) {
Expand All @@ -141,7 +141,7 @@ Slice CompressBlock(const Slice& raw, const CompressionContext& compression_ctx,
break; // fall back to no compression.
case kLZ4Compression:
if (LZ4_Compress(
compression_ctx,
compression_info,
GetCompressFormatForVersion(kLZ4Compression, format_version),
raw.data(), raw.size(), compressed_output) &&
GoodCompressionRatio(compressed_output->size(), raw.size())) {
Expand All @@ -150,7 +150,7 @@ Slice CompressBlock(const Slice& raw, const CompressionContext& compression_ctx,
break; // fall back to no compression.
case kLZ4HCCompression:
if (LZ4HC_Compress(
compression_ctx,
compression_info,
GetCompressFormatForVersion(kLZ4HCCompression, format_version),
raw.data(), raw.size(), compressed_output) &&
GoodCompressionRatio(compressed_output->size(), raw.size())) {
Expand All @@ -166,7 +166,7 @@ Slice CompressBlock(const Slice& raw, const CompressionContext& compression_ctx,
break;
case kZSTD:
case kZSTDNotFinalCompression:
if (ZSTD_Compress(compression_ctx, raw.data(), raw.size(),
if (ZSTD_Compress(compression_info, raw.data(), raw.size(),
compressed_output) &&
GoodCompressionRatio(compressed_output->size(), raw.size())) {
return *compressed_output;
Expand Down Expand Up @@ -260,8 +260,9 @@ struct BlockBasedTableBuilder::Rep {
PartitionedIndexBuilder* p_index_builder_ = nullptr;

std::string last_key;
// Compression dictionary or nullptr
const std::string* compression_dict;
CompressionType compression_type;
CompressionOptions compression_opts;
CompressionDict compression_dict;
CompressionContext compression_ctx;
std::unique_ptr<UncompressionContext> verify_ctx;
TableProperties props;
Expand Down Expand Up @@ -312,8 +313,9 @@ struct BlockBasedTableBuilder::Rep {
table_options.data_block_hash_table_util_ratio),
range_del_block(1 /* block_restart_interval */),
internal_prefix_transform(_moptions.prefix_extractor.get()),
compression_dict(_compression_dict),
compression_ctx(_compression_type, _compression_opts),
compression_type(_compression_type),
compression_opts(_compression_opts),
compression_ctx(_compression_type),
use_delta_encoding_for_index_values(table_opt.format_version >= 4 &&
!table_opt.block_align),
compressed_cache_key_prefix_size(0),
Expand All @@ -324,6 +326,15 @@ struct BlockBasedTableBuilder::Rep {
column_family_name(_column_family_name),
creation_time(_creation_time),
oldest_key_time(_oldest_key_time) {
if (_compression_dict != nullptr) {
compression_dict.Init(*_compression_dict,
CompressionDict::Mode::kCompression,
_compression_type, _compression_opts.level);
} else {
compression_dict.Init(Slice() /* dict */,
CompressionDict::Mode::kEmpty,
_compression_type, _compression_opts.level);
}
if (table_options.index_type ==
BlockBasedTableOptions::kTwoLevelIndexSearch) {
p_index_builder_ = PartitionedIndexBuilder::CreateIndexBuilder(
Expand Down Expand Up @@ -354,7 +365,7 @@ struct BlockBasedTableBuilder::Rep {
_moptions.prefix_extractor != nullptr));
if (table_options.verify_compression) {
verify_ctx.reset(new UncompressionContext(UncompressionContext::NoCache(),
compression_ctx.type()));
compression_type));
}
}

Expand Down Expand Up @@ -498,32 +509,20 @@ void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents,
assert(ok());
Rep* r = rep_;

auto type = r->compression_ctx.type();
auto type = r->compression_type;
Slice block_contents;
bool abort_compression = false;

StopWatchNano timer(r->ioptions.env,
ShouldReportDetailedTime(r->ioptions.env, r->ioptions.statistics));

if (raw_block_contents.size() < kCompressionSizeLimit) {
Slice compression_dict;
if (is_data_block && r->compression_dict && r->compression_dict->size()) {
r->compression_ctx.dict() = *r->compression_dict;
if (r->table_options.verify_compression) {
assert(r->verify_ctx != nullptr);
r->verify_ctx->dict() = *r->compression_dict;
}
} else {
// Clear dictionary
r->compression_ctx.dict() = Slice();
if (r->table_options.verify_compression) {
assert(r->verify_ctx != nullptr);
r->verify_ctx->dict() = Slice();
}
}

CompressionInfo compression_info(
r->compression_opts, r->compression_ctx,
is_data_block ? r->compression_dict : CompressionDict::GetEmptyDict(),
r->compression_type);
block_contents =
CompressBlock(raw_block_contents, r->compression_ctx, &type,
CompressBlock(raw_block_contents, compression_info, &type,
r->table_options.format_version, &r->compressed_output);

// Some of the compression algorithms are known to be unreliable. If
Expand All @@ -532,8 +531,12 @@ void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents,
if (type != kNoCompression && r->table_options.verify_compression) {
// Retrieve the uncompressed contents into a new buffer
BlockContents contents;
UncompressionInfo uncompression_info(
*r->verify_ctx,
is_data_block ? r->compression_dict : CompressionDict::GetEmptyDict(),
r->compression_type);
Status stat = UncompressBlockContentsForCompressionType(
*r->verify_ctx, block_contents.data(), block_contents.size(),
uncompression_info, block_contents.data(), block_contents.size(),
&contents, r->table_options.format_version, r->ioptions);

if (stat.ok()) {
Expand Down Expand Up @@ -780,7 +783,7 @@ void BlockBasedTableBuilder::WritePropertiesBlock(
? rep_->ioptions.merge_operator->Name()
: "nullptr";
rep_->props.compression_name =
CompressionTypeToString(rep_->compression_ctx.type());
CompressionTypeToString(rep_->compression_type);
rep_->props.prefix_extractor_name =
rep_->moptions.prefix_extractor != nullptr
? rep_->moptions.prefix_extractor->Name()
Expand Down Expand Up @@ -829,10 +832,10 @@ void BlockBasedTableBuilder::WritePropertiesBlock(

void BlockBasedTableBuilder::WriteCompressionDictBlock(
MetaIndexBuilder* meta_index_builder) {
if (rep_->compression_dict && rep_->compression_dict->size()) {
if (rep_->compression_dict.GetRawDict().size()) {
BlockHandle compression_dict_block_handle;
if (ok()) {
WriteRawBlock(*rep_->compression_dict, kNoCompression,
WriteRawBlock(rep_->compression_dict.GetRawDict(), kNoCompression,
&compression_dict_block_handle);
}
if (ok()) {
Expand Down
2 changes: 1 addition & 1 deletion table/block_based_table_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ class BlockBasedTableBuilder : public TableBuilder {
const uint64_t kCompressionSizeLimit = std::numeric_limits<int>::max();
};

Slice CompressBlock(const Slice& raw, const CompressionContext& compression_ctx,
Slice CompressBlock(const Slice& raw, const CompressionInfo& info,
CompressionType* type, uint32_t format_version,
std::string* compressed_output);

Expand Down
21 changes: 13 additions & 8 deletions table/block_based_table_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1226,9 +1226,12 @@ Status BlockBasedTable::GetDataBlockFromCache(

// Retrieve the uncompressed contents into a new buffer
BlockContents contents;
UncompressionContext uncompresssion_ctx(compressed_block->compression_type(),
compression_dict);
s = UncompressBlockContents(uncompresssion_ctx, compressed_block->data(),
UncompressionContext context(compressed_block->compression_type());
CompressionDict dict;
dict.Init(compression_dict, CompressionDict::Mode::kUncompression,
compressed_block->compression_type());
UncompressionInfo info(context, dict, compressed_block->compression_type());
s = UncompressBlockContents(info, compressed_block->data(),
compressed_block->size(), &contents,
format_version, ioptions);

Expand Down Expand Up @@ -1301,11 +1304,13 @@ Status BlockBasedTable::PutDataBlockToCache(
BlockContents contents;
Statistics* statistics = ioptions.statistics;
if (raw_block->compression_type() != kNoCompression) {
UncompressionContext uncompression_ctx(raw_block->compression_type(),
compression_dict);
s = UncompressBlockContents(uncompression_ctx, raw_block->data(),
raw_block->size(), &contents, format_version,
ioptions);
UncompressionContext context(raw_block->compression_type());
CompressionDict dict;
dict.Init(compression_dict, CompressionDict::Mode::kUncompression,
raw_block->compression_type());
UncompressionInfo info(context, dict, raw_block->compression_type());
s = UncompressBlockContents(info, raw_block->data(), raw_block->size(),
&contents, format_version, ioptions);
}
if (!s.ok()) {
delete raw_block;
Expand Down
11 changes: 7 additions & 4 deletions table/block_fetcher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -227,10 +227,13 @@ Status BlockFetcher::ReadBlockContents() {

if (do_uncompress_ && compression_type != kNoCompression) {
// compressed page, uncompress, update cache
UncompressionContext uncompression_ctx(compression_type, compression_dict_);
status_ =
UncompressBlockContents(uncompression_ctx, slice_.data(), block_size_,
contents_, footer_.version(), ioptions_);
UncompressionContext context(compression_type);
CompressionDict dict;
dict.Init(compression_dict_, CompressionDict::Mode::kUncompression,
compression_type);
UncompressionInfo info(context, dict, compression_type);
status_ = UncompressBlockContents(info, slice_.data(), block_size_,
contents_, footer_.version(), ioptions_);
} else {
GetBlockContents();
}
Expand Down
21 changes: 11 additions & 10 deletions table/format.cc
Original file line number Diff line number Diff line change
Expand Up @@ -284,18 +284,18 @@ Status ReadFooterFromFile(RandomAccessFileReader* file,
}

Status UncompressBlockContentsForCompressionType(
const UncompressionContext& uncompression_ctx, const char* data, size_t n,
const UncompressionInfo& uncompression_info, const char* data, size_t n,
BlockContents* contents, uint32_t format_version,
const ImmutableCFOptions& ioptions) {
std::unique_ptr<char[]> ubuf;

assert(uncompression_ctx.type() != kNoCompression &&
assert(uncompression_info.type() != kNoCompression &&
"Invalid compression type");

StopWatchNano timer(ioptions.env,
ShouldReportDetailedTime(ioptions.env, ioptions.statistics));
int decompress_size = 0;
switch (uncompression_ctx.type()) {
switch (uncompression_info.type()) {
case kSnappyCompression: {
size_t ulength = 0;
static char snappy_corrupt_msg[] =
Expand All @@ -312,7 +312,7 @@ Status UncompressBlockContentsForCompressionType(
}
case kZlibCompression:
ubuf.reset(Zlib_Uncompress(
uncompression_ctx, data, n, &decompress_size,
uncompression_info, data, n, &decompress_size,
GetCompressFormatForVersion(kZlibCompression, format_version)));
if (!ubuf) {
static char zlib_corrupt_msg[] =
Expand All @@ -336,7 +336,7 @@ Status UncompressBlockContentsForCompressionType(
break;
case kLZ4Compression:
ubuf.reset(LZ4_Uncompress(
uncompression_ctx, data, n, &decompress_size,
uncompression_info, data, n, &decompress_size,
GetCompressFormatForVersion(kLZ4Compression, format_version)));
if (!ubuf) {
static char lz4_corrupt_msg[] =
Expand All @@ -348,7 +348,7 @@ Status UncompressBlockContentsForCompressionType(
break;
case kLZ4HCCompression:
ubuf.reset(LZ4_Uncompress(
uncompression_ctx, data, n, &decompress_size,
uncompression_info, data, n, &decompress_size,
GetCompressFormatForVersion(kLZ4HCCompression, format_version)));
if (!ubuf) {
static char lz4hc_corrupt_msg[] =
Expand All @@ -370,7 +370,8 @@ Status UncompressBlockContentsForCompressionType(
break;
case kZSTD:
case kZSTDNotFinalCompression:
ubuf.reset(ZSTD_Uncompress(uncompression_ctx, data, n, &decompress_size));
ubuf.reset(
ZSTD_Uncompress(uncompression_info, data, n, &decompress_size));
if (!ubuf) {
static char zstd_corrupt_msg[] =
"ZSTD not supported or corrupted ZSTD compressed block contents";
Expand Down Expand Up @@ -400,14 +401,14 @@ Status UncompressBlockContentsForCompressionType(
// buffer is returned via 'result' and it is upto the caller to
// free this buffer.
// format_version is the block format as defined in include/rocksdb/table.h
Status UncompressBlockContents(const UncompressionContext& uncompression_ctx,
Status UncompressBlockContents(const UncompressionInfo& uncompression_info,
const char* data, size_t n,
BlockContents* contents, uint32_t format_version,
const ImmutableCFOptions& ioptions) {
assert(data[n] != kNoCompression);
assert(data[n] == uncompression_ctx.type());
assert(data[n] == uncompression_info.type());
return UncompressBlockContentsForCompressionType(
uncompression_ctx, data, n, contents, format_version, ioptions);
uncompression_info, data, n, contents, format_version, ioptions);
}

} // namespace rocksdb
11 changes: 6 additions & 5 deletions table/format.h
Original file line number Diff line number Diff line change
Expand Up @@ -250,16 +250,17 @@ extern Status ReadBlockContents(
// free this buffer.
// For description of compress_format_version and possible values, see
// util/compression.h
extern Status UncompressBlockContents(
const UncompressionContext& uncompression_ctx, const char* data, size_t n,
BlockContents* contents, uint32_t compress_format_version,
const ImmutableCFOptions& ioptions);
extern Status UncompressBlockContents(const UncompressionInfo& info,
const char* data, size_t n,
BlockContents* contents,
uint32_t compress_format_version,
const ImmutableCFOptions& ioptions);

// This is an extension to UncompressBlockContents that accepts
// a specific compression type. This is used by un-wrapped blocks
// with no compression header.
extern Status UncompressBlockContentsForCompressionType(
const UncompressionContext& uncompression_ctx, const char* data, size_t n,
const UncompressionInfo& info, const char* data, size_t n,
BlockContents* contents, uint32_t compress_format_version,
const ImmutableCFOptions& ioptions);

Expand Down
Loading

0 comments on commit 6c40806

Please sign in to comment.