Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Builders for partition filter #1952

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions include/rocksdb/table.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,15 @@ struct BlockBasedTableOptions {
// i.e., the number of data blocks covered by each index partition
uint64_t index_per_partition = 1024;

// Note: currently this option requires kTwoLevelIndexSearch to be set as
// well.
// TODO(myabandeh): remove the note above once the limitation is lifted.
// TODO(myabandeh): this feature is in an experimental phase and must not be
// used in production; either remove the feature or remove this comment once
// it is ready to be used in production.
// Use partitioned full filters for each SST file.
bool partition_filters = false;

// Use delta encoding to compress keys in blocks.
// ReadOptions::pin_data requires this option to be disabled.
//
Expand Down
5 changes: 4 additions & 1 deletion table/block_based_filter_block.cc
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,10 @@ inline void BlockBasedFilterBlockBuilder::AddPrefix(const Slice& key) {
}
}

Slice BlockBasedFilterBlockBuilder::Finish() {
Slice BlockBasedFilterBlockBuilder::Finish(const BlockHandle& tmp,
Status* status) {
// In this impl we ignore BlockHandle
*status = Status::OK();
if (!start_.empty()) {
GenerateFilter();
}
Expand Down
3 changes: 2 additions & 1 deletion table/block_based_filter_block.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ class BlockBasedFilterBlockBuilder : public FilterBlockBuilder {
virtual bool IsBlockBased() override { return true; }
virtual void StartBlock(uint64_t block_offset) override;
virtual void Add(const Slice& key) override;
virtual Slice Finish() override;
virtual Slice Finish(const BlockHandle& tmp, Status* status) override;
using FilterBlockBuilder::Finish;

private:
void AddKey(const Slice& key);
Expand Down
99 changes: 65 additions & 34 deletions table/block_based_table_builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,7 @@
#include "table/filter_block.h"
#include "table/format.h"
#include "table/full_filter_block.h"
#include "table/index_builder.h"
#include "table/meta_blocks.h"
#include "table/partitioned_filter_block.h"
#include "table/table_builder.h"

#include "util/string_util.h"
Expand All @@ -50,6 +48,9 @@
#include "util/stop_watch.h"
#include "util/xxhash.h"

#include "table/index_builder.h"
#include "table/partitioned_filter_block.h"

namespace rocksdb {

extern const std::string kHashIndexPrefixesBlock;
Expand All @@ -60,19 +61,28 @@ typedef BlockBasedTableOptions::IndexType IndexType;
// Without anonymous namespace here, we fail the warning -Wmissing-prototypes
namespace {

// Create a index builder based on its type.
FilterBlockBuilder* CreateFilterBlockBuilder(const ImmutableCFOptions& opt,
const BlockBasedTableOptions& table_opt) {
// Create a filter block builder based on its type.
FilterBlockBuilder* CreateFilterBlockBuilder(
const ImmutableCFOptions& opt, const BlockBasedTableOptions& table_opt,
PartitionedIndexBuilder* const p_index_builder) {
if (table_opt.filter_policy == nullptr) return nullptr;

FilterBitsBuilder* filter_bits_builder =
table_opt.filter_policy->GetFilterBitsBuilder();
if (filter_bits_builder == nullptr) {
return new BlockBasedFilterBlockBuilder(opt.prefix_extractor, table_opt);
} else {
return new FullFilterBlockBuilder(opt.prefix_extractor,
table_opt.whole_key_filtering,
filter_bits_builder);
if (table_opt.partition_filters) {
assert(p_index_builder != nullptr);
return new PartitionedFilterBlockBuilder(
opt.prefix_extractor, table_opt.whole_key_filtering,
filter_bits_builder, table_opt.index_block_restart_interval,
p_index_builder);
} else {
return new FullFilterBlockBuilder(opt.prefix_extractor,
table_opt.whole_key_filtering,
filter_bits_builder);
}
}
}

Expand Down Expand Up @@ -246,7 +256,7 @@ struct BlockBasedTableBuilder::Rep {
TableProperties props;

bool closed = false; // Either Finish() or Abandon() has been called.
std::unique_ptr<FilterBlockBuilder> filter_block;
std::unique_ptr<FilterBlockBuilder> filter_builder;
char compressed_cache_key_prefix[BlockBasedTable::kMaxCacheKeyPrefixSize];
size_t compressed_cache_key_prefix_size;

Expand Down Expand Up @@ -277,21 +287,32 @@ struct BlockBasedTableBuilder::Rep {
table_options.use_delta_encoding),
range_del_block(1), // TODO(andrewkr): restart_interval unnecessary
internal_prefix_transform(_ioptions.prefix_extractor),
index_builder(IndexBuilder::CreateIndexBuilder(
table_options.index_type, &internal_comparator,
&this->internal_prefix_transform,
table_options.index_block_restart_interval,
table_options.index_per_partition)),
compression_type(_compression_type),
compression_opts(_compression_opts),
compression_dict(_compression_dict),
filter_block(skip_filters ? nullptr : CreateFilterBlockBuilder(
_ioptions, table_options)),
flush_block_policy(
table_options.flush_block_policy_factory->NewFlushBlockPolicy(
table_options, data_block)),
column_family_id(_column_family_id),
column_family_name(_column_family_name) {
PartitionedIndexBuilder* p_index_builder = nullptr;
if (table_options.index_type ==
BlockBasedTableOptions::kTwoLevelIndexSearch) {
p_index_builder = PartitionedIndexBuilder::CreateIndexBuilder(
&internal_comparator, table_options);
index_builder.reset(p_index_builder);
} else {
index_builder.reset(IndexBuilder::CreateIndexBuilder(
table_options.index_type, &internal_comparator,
&this->internal_prefix_transform, table_options));
}
if (skip_filters) {
filter_builder = nullptr;
} else {
filter_builder.reset(
CreateFilterBlockBuilder(_ioptions, table_options, p_index_builder));
}

for (auto& collector_factories : *int_tbl_prop_collector_factories) {
table_properties_collectors.emplace_back(
collector_factories->CreateIntTblPropCollector(column_family_id));
Expand Down Expand Up @@ -330,8 +351,8 @@ BlockBasedTableBuilder::BlockBasedTableBuilder(
compression_type, compression_opts, compression_dict,
skip_filters, column_family_name);

if (rep_->filter_block != nullptr) {
rep_->filter_block->StartBlock(0);
if (rep_->filter_builder != nullptr) {
rep_->filter_builder->StartBlock(0);
}
if (table_options.block_cache_compressed.get() != nullptr) {
BlockBasedTable::GenerateCachePrefix(
Expand Down Expand Up @@ -374,8 +395,10 @@ void BlockBasedTableBuilder::Add(const Slice& key, const Slice& value) {
}
}

if (r->filter_block != nullptr) {
r->filter_block->Add(ExtractUserKey(key));
// Note: PartitionedFilterBlockBuilder requires a key to be added to the
// filter builder only after it has been added to the index builder.
if (r->filter_builder != nullptr) {
r->filter_builder->Add(ExtractUserKey(key));
}

r->last_key.assign(key.data(), key.size());
Expand Down Expand Up @@ -409,8 +432,8 @@ void BlockBasedTableBuilder::Flush() {
if (!ok()) return;
if (r->data_block.empty()) return;
WriteBlock(&r->data_block, &r->pending_handle, true /* is_data_block */);
if (r->filter_block != nullptr) {
r->filter_block->StartBlock(r->offset);
if (r->filter_builder != nullptr) {
r->filter_builder->StartBlock(r->offset);
}
r->props.data_size = r->offset;
++r->props.num_data_blocks;
Expand Down Expand Up @@ -600,15 +623,6 @@ Status BlockBasedTableBuilder::Finish() {
assert(!r->closed);
r->closed = true;

BlockHandle filter_block_handle, metaindex_block_handle, index_block_handle,
compression_dict_block_handle, range_del_block_handle;
// Write filter block
if (ok() && r->filter_block != nullptr) {
auto filter_contents = r->filter_block->Finish();
r->props.filter_size = filter_contents.size();
WriteRawBlock(filter_contents, kNoCompression, &filter_block_handle);
}

// To make sure properties block is able to keep the accurate size of index
// block, we will finish writing all index entries here and flush them
// to storage after metaindex block is written.
Expand All @@ -617,6 +631,19 @@ Status BlockBasedTableBuilder::Finish() {
&r->last_key, nullptr /* no next data block */, r->pending_handle);
}

BlockHandle filter_block_handle, metaindex_block_handle, index_block_handle,
compression_dict_block_handle, range_del_block_handle;
// Write filter block
if (ok() && r->filter_builder != nullptr) {
Status s = Status::Incomplete();
while (s.IsIncomplete()) {
Slice filter_content = r->filter_builder->Finish(filter_block_handle, &s);
assert(s.ok() || s.IsIncomplete());
r->props.filter_size += filter_content.size();
WriteRawBlock(filter_content, kNoCompression, &filter_block_handle);
}
}

IndexBuilder::IndexBlocks index_blocks;
auto index_builder_status = r->index_builder->Finish(&index_blocks);
if (index_builder_status.IsIncomplete()) {
Expand All @@ -643,14 +670,16 @@ Status BlockBasedTableBuilder::Finish() {
}

if (ok()) {
if (r->filter_block != nullptr) {
if (r->filter_builder != nullptr) {
// Add mapping from "<filter_block_prefix>.Name" to location
// of filter data.
std::string key;
if (r->filter_block->IsBlockBased()) {
if (r->filter_builder->IsBlockBased()) {
key = BlockBasedTable::kFilterBlockPrefix;
} else {
key = BlockBasedTable::kFullFilterBlockPrefix;
key = r->table_options.partition_filters
? BlockBasedTable::kPartitionedFilterBlockPrefix
: BlockBasedTable::kFullFilterBlockPrefix;
}
key.append(r->table_options.filter_policy->Name());
meta_index_builder.Add(key, filter_block_handle);
Expand Down Expand Up @@ -810,4 +839,6 @@ TableProperties BlockBasedTableBuilder::GetTableProperties() const {

// Prefixes of the meta-index keys that map to the location of the filter
// data. The table builder appends the filter policy's Name() to one of these
// prefixes and stores the resulting key with the filter block handle.
// Used when the filter builder reports IsBlockBased().
const std::string BlockBasedTable::kFilterBlockPrefix = "filter.";
// Used for non-block-based filters when partition_filters is not set.
const std::string BlockBasedTable::kFullFilterBlockPrefix = "fullfilter.";
// Used for non-block-based filters when partition_filters is set.
const std::string BlockBasedTable::kPartitionedFilterBlockPrefix =
"partitionedfilter.";
} // namespace rocksdb
1 change: 1 addition & 0 deletions table/block_based_table_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ class BlockBasedTable : public TableReader {
public:
static const std::string kFilterBlockPrefix;
static const std::string kFullFilterBlockPrefix;
static const std::string kPartitionedFilterBlockPrefix;
// The longest prefix of the cache key used to identify blocks.
// For Posix files the unique ID is three varints.
static const size_t kMaxCacheKeyPrefixSize = kMaxVarint64Length * 3 + 1;
Expand Down
9 changes: 8 additions & 1 deletion table/filter_block.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,14 @@ class FilterBlockBuilder {
virtual bool IsBlockBased() = 0; // If is blockbased filter
virtual void StartBlock(uint64_t block_offset) = 0; // Start new block filter
virtual void Add(const Slice& key) = 0; // Add a key to current filter
virtual Slice Finish() = 0; // Generate Filter
// Generate the filter in a single call. This convenience overload forwards to
// the virtual two-argument Finish() with a default-constructed BlockHandle and
// asserts that the status it reports is OK.
Slice Finish() {
  Status finish_status;
  const BlockHandle unused_handle;
  Slice filter_contents = Finish(unused_handle, &finish_status);
  assert(finish_status.ok());
  return filter_contents;
}
virtual Slice Finish(const BlockHandle& tmp, Status* status) = 0;

private:
// No copying allowed
Expand Down
9 changes: 5 additions & 4 deletions table/full_filter_block.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,12 @@ inline void FullFilterBlockBuilder::AddKey(const Slice& key) {
// Add prefix to filter if needed
inline void FullFilterBlockBuilder::AddPrefix(const Slice& key) {
Slice prefix = prefix_extractor_->Transform(key);
filter_bits_builder_->AddKey(prefix);
num_added_++;
AddKey(prefix);
}

Slice FullFilterBlockBuilder::Finish() {
Slice FullFilterBlockBuilder::Finish(const BlockHandle& tmp, Status* status) {
// In this impl we ignore BlockHandle
*status = Status::OK();
if (num_added_ != 0) {
num_added_ = 0;
return filter_bits_builder_->Finish(&filter_data_);
Expand Down Expand Up @@ -73,7 +74,7 @@ FullFilterBlockReader::FullFilterBlockReader(
}

bool FullFilterBlockReader::KeyMayMatch(const Slice& key,
uint64_t block_offset) {
uint64_t block_offset) {
assert(block_offset == kNotValid);
if (!whole_key_filtering_) {
return true;
Expand Down
15 changes: 8 additions & 7 deletions table/full_filter_block.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,12 @@ class FullFilterBlockBuilder : public FilterBlockBuilder {
virtual bool IsBlockBased() override { return false; }
virtual void StartBlock(uint64_t block_offset) override {}
virtual void Add(const Slice& key) override;
virtual Slice Finish() override;
virtual Slice Finish(const BlockHandle& tmp, Status* status) override;
using FilterBlockBuilder::Finish;

protected:
virtual void AddKey(const Slice& key);
std::unique_ptr<FilterBitsBuilder> filter_bits_builder_;

private:
// important: all of these might point to invalid addresses
Expand All @@ -55,10 +60,8 @@ class FullFilterBlockBuilder : public FilterBlockBuilder {
bool whole_key_filtering_;

uint32_t num_added_;
std::unique_ptr<FilterBitsBuilder> filter_bits_builder_;
std::unique_ptr<const char[]> filter_data_;

void AddKey(const Slice& key);
void AddPrefix(const Slice& key);

// No copying allowed
Expand Down Expand Up @@ -96,16 +99,14 @@ class FullFilterBlockReader : public FilterBlockReader {

private:
const SliceTransform* prefix_extractor_;

std::unique_ptr<FilterBitsReader> filter_bits_reader_;
Slice contents_;
std::unique_ptr<FilterBitsReader> filter_bits_reader_;
BlockContents block_contents_;
std::unique_ptr<const char[]> filter_data_;

bool MayMatch(const Slice& entry);

// No copying allowed
FullFilterBlockReader(const FullFilterBlockReader&);
bool MayMatch(const Slice& entry);
void operator=(const FullFilterBlockReader&);
};

Expand Down
Loading