Skip to content

Commit

Permalink
New BlockBasedTable version -- better compressed block format
Browse files Browse the repository at this point in the history
Summary:
This diff adds BlockBasedTable format_version = 2. New format version brings better compressed block format for these compressions:
1) Zlib -- encode decompressed size in compressed block header
2) BZip2 -- encode decompressed size in compressed block header
3) LZ4 and LZ4HC -- instead of doing memcpy of size_t, encode the size as varint32. memcpy is very bad because the DB is not portable across big/little endian machines, or even across platforms where size_t might be 8 or 4 bytes.

It does not affect format for snappy.

If you write a new database with format_version = 2, it will not be readable by RocksDB versions before 3.10. DB::Open() will return corruption in that case.

Test Plan:
Added a new test in db_test.
I will also run db_bench and verify VSIZE when block_cache == 1GB

Reviewers: yhchiang, rven, MarkCallaghan, dhruba, sdong

Reviewed By: sdong

Subscribers: dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D31461
  • Loading branch information
igorcanadi committed Jan 15, 2015
1 parent 2355931 commit 9ab5adf
Show file tree
Hide file tree
Showing 13 changed files with 434 additions and 160 deletions.
2 changes: 2 additions & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
Lower numbered levels will be placed earlier in the db_paths and higher
numbered levels will be placed later in the db_paths vector.
* Potentially big performance improvements if you're using RocksDB with lots of column families (100-1000)
* Added BlockBasedTableOptions.format_version option, which allows users to specify which version of block based table they want. As a general guideline, newer versions have more features, but might not be readable by older versions of RocksDB.
* Added new block based table format (version 2), which you can enable by setting BlockBasedTableOptions.format_version = 2. This format changes how we encode size information in compressed blocks and should help with memory allocations if you're using Zlib or BZip2 compressions.

### Public API changes
* Deprecated skip_log_error_on_recovery option
Expand Down
43 changes: 22 additions & 21 deletions db/db_bench.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1219,23 +1219,23 @@ class Benchmark {
name = "Snappy";
break;
case kZlibCompression:
result = Zlib_Compress(Options().compression_opts, text, strlen(text),
&compressed);
result = Zlib_Compress(Options().compression_opts, 2, text,
strlen(text), &compressed);
name = "Zlib";
break;
case kBZip2Compression:
result = BZip2_Compress(Options().compression_opts, text,
result = BZip2_Compress(Options().compression_opts, 2, text,
strlen(text), &compressed);
name = "BZip2";
break;
case kLZ4Compression:
result = LZ4_Compress(Options().compression_opts, text, strlen(text),
&compressed);
result = LZ4_Compress(Options().compression_opts, 2, text,
strlen(text), &compressed);
name = "LZ4";
break;
case kLZ4HCCompression:
result = LZ4HC_Compress(Options().compression_opts, text,
strlen(text), &compressed);
result = LZ4HC_Compress(Options().compression_opts, 2, text,
strlen(text), &compressed);
name = "LZ4HC";
break;
case kNoCompression:
Expand Down Expand Up @@ -1779,19 +1779,19 @@ class Benchmark {
input.size(), &compressed);
break;
case rocksdb::kZlibCompression:
ok = Zlib_Compress(Options().compression_opts, input.data(),
ok = Zlib_Compress(Options().compression_opts, 2, input.data(),
input.size(), &compressed);
break;
case rocksdb::kBZip2Compression:
ok = BZip2_Compress(Options().compression_opts, input.data(),
ok = BZip2_Compress(Options().compression_opts, 2, input.data(),
input.size(), &compressed);
break;
case rocksdb::kLZ4Compression:
ok = LZ4_Compress(Options().compression_opts, input.data(),
ok = LZ4_Compress(Options().compression_opts, 2, input.data(),
input.size(), &compressed);
break;
case rocksdb::kLZ4HCCompression:
ok = LZ4HC_Compress(Options().compression_opts, input.data(),
ok = LZ4HC_Compress(Options().compression_opts, 2, input.data(),
input.size(), &compressed);
break;
default:
Expand Down Expand Up @@ -1825,19 +1825,19 @@ class Benchmark {
input.size(), &compressed);
break;
case rocksdb::kZlibCompression:
ok = Zlib_Compress(Options().compression_opts, input.data(), input.size(),
&compressed);
ok = Zlib_Compress(Options().compression_opts, 2, input.data(),
input.size(), &compressed);
break;
case rocksdb::kBZip2Compression:
ok = BZip2_Compress(Options().compression_opts, input.data(),
ok = BZip2_Compress(Options().compression_opts, 2, input.data(),
input.size(), &compressed);
break;
case rocksdb::kLZ4Compression:
ok = LZ4_Compress(Options().compression_opts, input.data(), input.size(),
&compressed);
ok = LZ4_Compress(Options().compression_opts, 2, input.data(),
input.size(), &compressed);
break;
case rocksdb::kLZ4HCCompression:
ok = LZ4HC_Compress(Options().compression_opts, input.data(),
ok = LZ4HC_Compress(Options().compression_opts, 2, input.data(),
input.size(), &compressed);
break;
default:
Expand All @@ -1857,22 +1857,22 @@ class Benchmark {
break;
case rocksdb::kZlibCompression:
uncompressed = Zlib_Uncompress(compressed.data(), compressed.size(),
&decompress_size);
&decompress_size, 2);
ok = uncompressed != nullptr;
break;
case rocksdb::kBZip2Compression:
uncompressed = BZip2_Uncompress(compressed.data(), compressed.size(),
&decompress_size);
&decompress_size, 2);
ok = uncompressed != nullptr;
break;
case rocksdb::kLZ4Compression:
uncompressed = LZ4_Uncompress(compressed.data(), compressed.size(),
&decompress_size);
&decompress_size, 2);
ok = uncompressed != nullptr;
break;
case rocksdb::kLZ4HCCompression:
uncompressed = LZ4_Uncompress(compressed.data(), compressed.size(),
&decompress_size);
&decompress_size, 2);
ok = uncompressed != nullptr;
break;
default:
Expand Down Expand Up @@ -2031,6 +2031,7 @@ class Benchmark {
block_based_options.block_size = FLAGS_block_size;
block_based_options.block_restart_interval = FLAGS_block_restart_interval;
block_based_options.filter_policy = filter_policy_;
block_based_options.format_version = 2;
options.table_factory.reset(
NewBlockBasedTableFactory(block_based_options));
}
Expand Down
50 changes: 46 additions & 4 deletions db/db_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -65,25 +65,25 @@ static bool SnappyCompressionSupported(const CompressionOptions& options) {
// Returns true iff Zlib compression is available in this build, probed by
// compressing a small sample with compress-format version 2.
static bool ZlibCompressionSupported(const CompressionOptions& options) {
  std::string out;
  Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
  // Format version 2 encodes the decompressed size in the block header.
  return Zlib_Compress(options, 2, in.data(), in.size(), &out);
}

// Returns true iff BZip2 compression is available in this build, probed by
// compressing a small sample with compress-format version 2.
static bool BZip2CompressionSupported(const CompressionOptions& options) {
  std::string out;
  Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
  // Format version 2 encodes the decompressed size in the block header.
  return BZip2_Compress(options, 2, in.data(), in.size(), &out);
}

// Returns true iff LZ4 compression is available in this build, probed by
// compressing a small sample with compress-format version 2.
static bool LZ4CompressionSupported(const CompressionOptions& options) {
  std::string out;
  Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
  // Format version 2 stores the decompressed size as a varint32 instead of
  // a raw memcpy of size_t, making the on-disk format portable.
  return LZ4_Compress(options, 2, in.data(), in.size(), &out);
}

// Returns true iff LZ4HC compression is available in this build, probed by
// compressing a small sample with compress-format version 2.
static bool LZ4HCCompressionSupported(const CompressionOptions& options) {
  std::string out;
  Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
  // Format version 2 stores the decompressed size as a varint32 instead of
  // a raw memcpy of size_t, making the on-disk format portable.
  return LZ4HC_Compress(options, 2, in.data(), in.size(), &out);
}

static std::string RandomString(Random* rnd, int len) {
Expand Down Expand Up @@ -10170,6 +10170,48 @@ TEST(DBTest, DontDeleteMovedFile) {
Reopen(options);
}

TEST(DBTest, EncodeDecompressedBlockSizeTest) {
  // Verify that compressed blocks written with one block-based table
  // format_version remain readable after reopening the DB with the other
  // version, for every compression whose block encoding changed in v2.
  CompressionType compressions[] = {kZlibCompression, kBZip2Compression,
                                    kLZ4Compression, kLZ4HCCompression};
  for (auto compression : compressions) {
    // write_version == 1: generate with table_version 1, read with 2.
    // write_version == 2: generate with table_version 2, read with 1.
    for (int write_version = 1; write_version <= 2; ++write_version) {
      BlockBasedTableOptions table_options;
      table_options.format_version = write_version;
      table_options.filter_policy.reset(NewBloomFilterPolicy(10));
      Options options = CurrentOptions();
      options.table_factory.reset(NewBlockBasedTableFactory(table_options));
      options.create_if_missing = true;
      options.compression = compression;
      DestroyAndReopen(options);

      const int kNumKeysWritten = 100000;

      Random rnd(301);
      for (int i = 0; i < kNumKeysWritten; ++i) {
        // Compressible payload: random prefix followed by a run of 'a's.
        ASSERT_OK(Put(Key(i), RandomString(&rnd, 128) + std::string(128, 'a')));
      }

      // Flip to the other format version before reopening for reads.
      table_options.format_version = (write_version == 1) ? 2 : 1;
      options.table_factory.reset(NewBlockBasedTableFactory(table_options));
      Reopen(options);
      for (int i = 0; i < kNumKeysWritten; ++i) {
        std::string value = Get(Key(i));
        ASSERT_EQ(value.substr(128), std::string(128, 'a'));
      }
    }
  }
}

} // namespace rocksdb

int main(int argc, char** argv) {
Expand Down
2 changes: 1 addition & 1 deletion include/rocksdb/version.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
#pragma once

#define ROCKSDB_MAJOR 3
#define ROCKSDB_MINOR 9
#define ROCKSDB_MINOR 10
#define ROCKSDB_PATCH 0

// Do not use these. We made the mistake of declaring macros starting with
Expand Down
30 changes: 20 additions & 10 deletions table/block_based_table_builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -302,9 +302,11 @@ bool GoodCompressionRatio(size_t compressed_size, size_t raw_size) {
return compressed_size < raw_size - (raw_size / 8u);
}

// format_version is the block format as defined in include/rocksdb/table.h
Slice CompressBlock(const Slice& raw,
const CompressionOptions& compression_options,
CompressionType* type, std::string* compressed_output) {
CompressionType* type, uint32_t format_version,
std::string* compressed_output) {
if (*type == kNoCompression) {
return raw;
}
Expand All @@ -320,29 +322,37 @@ Slice CompressBlock(const Slice& raw,
}
break; // fall back to no compression.
case kZlibCompression:
if (Zlib_Compress(compression_options, raw.data(), raw.size(),
compressed_output) &&
if (Zlib_Compress(
compression_options,
GetCompressFormatForVersion(kZlibCompression, format_version),
raw.data(), raw.size(), compressed_output) &&
GoodCompressionRatio(compressed_output->size(), raw.size())) {
return *compressed_output;
}
break; // fall back to no compression.
case kBZip2Compression:
if (BZip2_Compress(compression_options, raw.data(), raw.size(),
compressed_output) &&
if (BZip2_Compress(
compression_options,
GetCompressFormatForVersion(kBZip2Compression, format_version),
raw.data(), raw.size(), compressed_output) &&
GoodCompressionRatio(compressed_output->size(), raw.size())) {
return *compressed_output;
}
break; // fall back to no compression.
case kLZ4Compression:
if (LZ4_Compress(compression_options, raw.data(), raw.size(),
compressed_output) &&
if (LZ4_Compress(
compression_options,
GetCompressFormatForVersion(kLZ4Compression, format_version),
raw.data(), raw.size(), compressed_output) &&
GoodCompressionRatio(compressed_output->size(), raw.size())) {
return *compressed_output;
}
break; // fall back to no compression.
case kLZ4HCCompression:
if (LZ4HC_Compress(compression_options, raw.data(), raw.size(),
compressed_output) &&
if (LZ4HC_Compress(
compression_options,
GetCompressFormatForVersion(kLZ4HCCompression, format_version),
raw.data(), raw.size(), compressed_output) &&
GoodCompressionRatio(compressed_output->size(), raw.size())) {
return *compressed_output;
}
Expand Down Expand Up @@ -579,7 +589,7 @@ void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents,
if (raw_block_contents.size() < kCompressionSizeLimit) {
block_contents =
CompressBlock(raw_block_contents, r->compression_opts, &type,
&r->compressed_output);
r->table_options.format_version, &r->compressed_output);
} else {
RecordTick(r->ioptions.statistics, NUMBER_BLOCK_NOT_COMPRESSED);
type = kNoCompression;
Expand Down
8 changes: 5 additions & 3 deletions table/block_based_table_factory.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@
#include <string>
#include <stdint.h>

#include "port/port.h"
#include "rocksdb/flush_block_policy.h"
#include "rocksdb/cache.h"
#include "table/block_based_table_builder.h"
#include "table/block_based_table_reader.h"
#include "port/port.h"
#include "table/format.h"

namespace rocksdb {

Expand Down Expand Up @@ -76,9 +77,10 @@ Status BlockBasedTableFactory::SanitizeOptions(
return Status::InvalidArgument("Enable cache_index_and_filter_blocks, "
", but block cache is disabled");
}
if (table_options_.format_version > 1) {
if (!BlockBasedTableSupportedVersion(table_options_.format_version)) {
return Status::InvalidArgument(
"We currently only support versions 0 and 1");
"Unsupported BlockBasedTable format_version. Please check "
"include/rocksdb/table.h for more info");
}
return Status::OK();
}
Expand Down
24 changes: 14 additions & 10 deletions table/block_based_table_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -442,9 +442,9 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions,
if (!s.ok()) {
return s;
}
if (footer.version() > 1) {
if (!BlockBasedTableSupportedVersion(footer.version())) {
return Status::Corruption(
"Unknown Footer version. Maybe this file was created with too new "
"Unknown Footer version. Maybe this file was created with newer "
"version of RocksDB?");
}

Expand Down Expand Up @@ -605,7 +605,7 @@ Status BlockBasedTable::GetDataBlockFromCache(
const Slice& block_cache_key, const Slice& compressed_block_cache_key,
Cache* block_cache, Cache* block_cache_compressed, Statistics* statistics,
const ReadOptions& read_options,
BlockBasedTable::CachableEntry<Block>* block) {
BlockBasedTable::CachableEntry<Block>* block, uint32_t format_version) {
Status s;
Block* compressed_block = nullptr;
Cache::Handle* block_cache_compressed_handle = nullptr;
Expand Down Expand Up @@ -648,7 +648,8 @@ Status BlockBasedTable::GetDataBlockFromCache(
// Retrieve the uncompressed contents into a new buffer
BlockContents contents;
s = UncompressBlockContents(compressed_block->data(),
compressed_block->size(), &contents);
compressed_block->size(), &contents,
format_version);

// Insert uncompressed block into block cache
if (s.ok()) {
Expand All @@ -673,16 +674,16 @@ Status BlockBasedTable::PutDataBlockToCache(
const Slice& block_cache_key, const Slice& compressed_block_cache_key,
Cache* block_cache, Cache* block_cache_compressed,
const ReadOptions& read_options, Statistics* statistics,
CachableEntry<Block>* block, Block* raw_block) {
CachableEntry<Block>* block, Block* raw_block, uint32_t format_version) {
assert(raw_block->compression_type() == kNoCompression ||
block_cache_compressed != nullptr);

Status s;
// Retrieve the uncompressed contents into a new buffer
BlockContents contents;
if (raw_block->compression_type() != kNoCompression) {
s = UncompressBlockContents(raw_block->data(), raw_block->size(),
&contents);
s = UncompressBlockContents(raw_block->data(), raw_block->size(), &contents,
format_version);
}
if (!s.ok()) {
delete raw_block;
Expand Down Expand Up @@ -929,7 +930,8 @@ Iterator* BlockBasedTable::NewDataBlockIterator(Rep* rep,
}

s = GetDataBlockFromCache(key, ckey, block_cache, block_cache_compressed,
statistics, ro, &block);
statistics, ro, &block,
rep->table_options.format_version);

if (block.value == nullptr && !no_io && ro.fill_cache) {
Block* raw_block = nullptr;
Expand All @@ -942,7 +944,8 @@ Iterator* BlockBasedTable::NewDataBlockIterator(Rep* rep,

if (s.ok()) {
s = PutDataBlockToCache(key, ckey, block_cache, block_cache_compressed,
ro, statistics, &block, raw_block);
ro, statistics, &block, raw_block,
rep->table_options.format_version);
}
}
}
Expand Down Expand Up @@ -1194,7 +1197,8 @@ bool BlockBasedTable::TEST_KeyInCache(const ReadOptions& options,
Slice ckey;

s = GetDataBlockFromCache(cache_key, ckey, block_cache, nullptr, nullptr,
options, &block);
options, &block,
rep_->table_options.format_version);
assert(s.ok());
bool in_cache = block.value != nullptr;
if (in_cache) {
Expand Down

0 comments on commit 9ab5adf

Please sign in to comment.