Dedicated Mark/Uncompressed cache for skip indices #27961

Merged
merged 2 commits into from Oct 15, 2021
11 changes: 11 additions & 0 deletions programs/local/LocalServer.cpp
@@ -544,6 +544,17 @@ void LocalServer::processConfig()
if (mark_cache_size)
global_context->setMarkCache(mark_cache_size);

/// Size of cache for uncompressed blocks of MergeTree indices. Zero means disabled.
size_t index_uncompressed_cache_size = config().getUInt64("index_uncompressed_cache_size", 0);
if (index_uncompressed_cache_size)
global_context->setIndexUncompressedCache(index_uncompressed_cache_size);

/// Size of cache for index marks (index of MergeTree skip indices). It is necessary.
/// Specify default value for index_mark_cache_size explicitly!
size_t index_mark_cache_size = config().getUInt64("index_mark_cache_size", 0);
Member

This major performance degradation went unnoticed for two years.

Collaborator Author

IIRC, before this PR there was no mark/uncompressed cache for secondary indices at all, so I don't think it's a degradation.

if (index_mark_cache_size)
global_context->setIndexMarkCache(index_mark_cache_size);

/// A cache for mmapped files.
size_t mmap_cache_size = config().getUInt64("mmap_cache_size", 1000); /// The choice of default is arbitrary.
if (mmap_cache_size)
11 changes: 11 additions & 0 deletions programs/server/Server.cpp
@@ -940,6 +940,17 @@ if (ThreadFuzzer::instance().isEffective())
}
global_context->setMarkCache(mark_cache_size);

/// Size of cache for uncompressed blocks of MergeTree indices. Zero means disabled.
size_t index_uncompressed_cache_size = config().getUInt64("index_uncompressed_cache_size", 0);
if (index_uncompressed_cache_size)
global_context->setIndexUncompressedCache(index_uncompressed_cache_size);

/// Size of cache for index marks (index of MergeTree skip indices). It is necessary.
/// Specify default value for index_mark_cache_size explicitly!
size_t index_mark_cache_size = config().getUInt64("index_mark_cache_size", 0);
if (index_mark_cache_size)
global_context->setIndexMarkCache(index_mark_cache_size);

/// A cache for mmapped files.
size_t mmap_cache_size = config().getUInt64("mmap_cache_size", 1000); /// The choice of default is arbitrary.
if (mmap_cache_size)
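
Both `LocalServer.cpp` and `Server.cpp` read the new sizes with a default of 0 and create the cache only when the value is non-zero. The following is a minimal sketch of that "zero means disabled" pattern, not the ClickHouse implementation: `ByteCache`, `Config`, the free function `getUInt64`, and `makeCacheIfEnabled` are hypothetical stand-ins introduced for illustration.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <memory>
#include <string>

// Hypothetical stand-in for a size-bounded cache; not the ClickHouse class.
struct ByteCache
{
    explicit ByteCache(std::uint64_t max_bytes_) : max_bytes(max_bytes_) {}
    std::uint64_t max_bytes;
};

// Hypothetical stand-in for the server configuration.
using Config = std::map<std::string, std::uint64_t>;

// Return the configured value, or the fallback when the key is absent.
std::uint64_t getUInt64(const Config & config, const std::string & key, std::uint64_t fallback)
{
    auto it = config.find(key);
    return it == config.end() ? fallback : it->second;
}

// Create the cache only when the configured size is non-zero.
std::shared_ptr<ByteCache> makeCacheIfEnabled(const Config & config, const std::string & key)
{
    const std::uint64_t size = getUInt64(config, key, 0);
    if (size == 0)
        return nullptr; // Zero (or a missing key) leaves the cache disabled.
    return std::make_shared<ByteCache>(size);
}
```

With this shape, a caller that later asks for the cache has to handle a null pointer, which is what the `if (auto cache = ...)` checks elsewhere in this diff do.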
16 changes: 16 additions & 0 deletions src/Interpreters/AsynchronousMetrics.cpp
@@ -546,6 +546,22 @@ void AsynchronousMetrics::update(std::chrono::system_clock::time_point update_ti
}
}

{
if (auto index_mark_cache = getContext()->getIndexMarkCache())
{
new_values["IndexMarkCacheBytes"] = index_mark_cache->weight();
new_values["IndexMarkCacheFiles"] = index_mark_cache->count();
}
}

{
if (auto index_uncompressed_cache = getContext()->getIndexUncompressedCache())
{
new_values["IndexUncompressedCacheBytes"] = index_uncompressed_cache->weight();
new_values["IndexUncompressedCacheCells"] = index_uncompressed_cache->count();
Collaborator

So there are separate metrics for bytes/cells/files, but the same metric for cache hits/misses.
I think it is worth adding new hit/miss metrics for these separate caches.
@amosbird what do you think?

Collaborator

@azat I plan to test this pull request and add them. I also plan to enable them by default.

Collaborator Author

@azat Yes, it'll be useful. @kitaisreal Great!

}
}

{
if (auto mmap_cache = getContext()->getMMappedFileCache())
{
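
The review thread on this hunk suggests per-cache hit/miss metrics in addition to the bytes and entry counts. A rough sketch of what reporting them side by side could look like follows. It assumes a hypothetical `CacheStats` struct and a hypothetical `reportCache` helper; the `...Hits` and `...Misses` metric names are illustrative only and are not added by this PR.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <memory>
#include <string>

// Hypothetical per-cache counters; the real caches expose weight() and count().
struct CacheStats
{
    std::size_t weight_bytes = 0;
    std::size_t entries = 0;
    std::size_t hits = 0;
    std::size_t misses = 0;
};

using Metrics = std::map<std::string, std::size_t>;

// Publish metrics for one cache under its own prefix; a disabled (null) cache is skipped.
// `unit` is the entry noun used in the metric name, e.g. "Files" or "Cells".
void reportCache(
    Metrics & out,
    const std::string & prefix,
    const std::string & unit,
    const std::shared_ptr<CacheStats> & cache)
{
    if (!cache)
        return;
    out[prefix + "Bytes"] = cache->weight_bytes;
    out[prefix + unit] = cache->entries;
    out[prefix + "Hits"] = cache->hits;     // Hypothetical metric name.
    out[prefix + "Misses"] = cache->misses; // Hypothetical metric name.
}
```

Keeping the prefix per cache means the index caches and the data caches never share a hit/miss counter, which is the concern raised in the comment.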
58 changes: 58 additions & 0 deletions src/Interpreters/Context.cpp
@@ -211,6 +211,8 @@ struct ContextSharedPart
std::unique_ptr<AccessControlManager> access_control_manager;
mutable UncompressedCachePtr uncompressed_cache; /// The cache of decompressed blocks.
mutable MarkCachePtr mark_cache; /// Cache of marks in compressed files.
mutable UncompressedCachePtr index_uncompressed_cache; /// The cache of decompressed blocks for MergeTree indices.
mutable MarkCachePtr index_mark_cache; /// Cache of marks in compressed files of MergeTree indices.
mutable MMappedFileCachePtr mmap_cache; /// Cache of mmapped files to avoid frequent open/map/unmap/close and to reuse from several threads.
ProcessList process_list; /// Executing queries at the moment.
MergeList merge_list; /// The list of executable merge (for (Replicated)?MergeTree)
@@ -1573,6 +1575,56 @@ void Context::dropMarkCache() const
}


void Context::setIndexUncompressedCache(size_t max_size_in_bytes)
{
auto lock = getLock();

if (shared->index_uncompressed_cache)
throw Exception("Index uncompressed cache has been already created.", ErrorCodes::LOGICAL_ERROR);

shared->index_uncompressed_cache = std::make_shared<UncompressedCache>(max_size_in_bytes);
}


UncompressedCachePtr Context::getIndexUncompressedCache() const
{
auto lock = getLock();
return shared->index_uncompressed_cache;
}


void Context::dropIndexUncompressedCache() const
{
auto lock = getLock();
if (shared->index_uncompressed_cache)
shared->index_uncompressed_cache->reset();
}


void Context::setIndexMarkCache(size_t cache_size_in_bytes)
{
auto lock = getLock();

if (shared->index_mark_cache)
throw Exception("Index mark cache has been already created.", ErrorCodes::LOGICAL_ERROR);

shared->index_mark_cache = std::make_shared<MarkCache>(cache_size_in_bytes);
}

MarkCachePtr Context::getIndexMarkCache() const
{
auto lock = getLock();
return shared->index_mark_cache;
}

void Context::dropIndexMarkCache() const
{
auto lock = getLock();
if (shared->index_mark_cache)
shared->index_mark_cache->reset();
}


void Context::setMMappedFileCache(size_t cache_size_in_num_entries)
{
auto lock = getLock();
@@ -1607,6 +1659,12 @@ void Context::dropCaches() const
if (shared->mark_cache)
shared->mark_cache->reset();

if (shared->index_uncompressed_cache)
shared->index_uncompressed_cache->reset();

if (shared->index_mark_cache)
shared->index_mark_cache->reset();

if (shared->mmap_cache)
shared->mmap_cache->reset();
}
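
The set/get/drop trio added to `Context.cpp` follows one pattern: creation is allowed once, reads return a shared pointer that may be null, and dropping resets the contents without destroying the cache object. A self-contained sketch of that pattern is shown here, assuming a hypothetical `SimpleCache` and `SharedState`, and using `std::logic_error` in place of the `LOGICAL_ERROR` exception from the diff.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <mutex>
#include <stdexcept>

// Hypothetical cache with just enough state to observe a reset.
struct SimpleCache
{
    explicit SimpleCache(std::size_t max_bytes_) : max_bytes(max_bytes_) {}
    void reset() { used_bytes = 0; }
    std::size_t max_bytes;
    std::size_t used_bytes = 0;
};

// Hypothetical owner of the cache, guarded by a single mutex.
class SharedState
{
public:
    // May be called only once; a second call is a programming error.
    void setIndexMarkCache(std::size_t max_bytes)
    {
        std::lock_guard<std::mutex> lock(mutex);
        if (index_mark_cache)
            throw std::logic_error("Index mark cache has already been created.");
        index_mark_cache = std::make_shared<SimpleCache>(max_bytes);
    }

    // Returns nullptr when the cache was never created (disabled).
    std::shared_ptr<SimpleCache> getIndexMarkCache() const
    {
        std::lock_guard<std::mutex> lock(mutex);
        return index_mark_cache;
    }

    // Clears the contents but keeps the cache object, so existing holders stay valid.
    void dropIndexMarkCache() const
    {
        std::lock_guard<std::mutex> lock(mutex);
        if (index_mark_cache)
            index_mark_cache->reset();
    }

private:
    mutable std::mutex mutex;
    std::shared_ptr<SimpleCache> index_mark_cache;
};
```

Because the getter hands out a `std::shared_ptr`, a query that fetched the cache before a drop keeps a valid object; it simply sees an emptied cache afterwards.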
10 changes: 10 additions & 0 deletions src/Interpreters/Context.h
@@ -696,6 +696,16 @@ class Context: public std::enable_shared_from_this<Context>
std::shared_ptr<MarkCache> getMarkCache() const;
void dropMarkCache() const;

/// Create a cache of index uncompressed blocks of specified size. This can be done only once.
void setIndexUncompressedCache(size_t max_size_in_bytes);
std::shared_ptr<UncompressedCache> getIndexUncompressedCache() const;
void dropIndexUncompressedCache() const;

/// Create a cache of index marks of specified size. This can be done only once.
void setIndexMarkCache(size_t cache_size_in_bytes);
std::shared_ptr<MarkCache> getIndexMarkCache() const;
void dropIndexMarkCache() const;

/// Create a cache of mapped files to avoid frequent open/map/unmap/close and to reuse from several threads.
void setMMappedFileCache(size_t cache_size_in_num_entries);
std::shared_ptr<MMappedFileCache> getMMappedFileCache() const;
10 changes: 10 additions & 0 deletions src/Interpreters/InterpreterSystemQuery.cpp
@@ -280,6 +280,14 @@ BlockIO InterpreterSystemQuery::execute()
getContext()->checkAccess(AccessType::SYSTEM_DROP_UNCOMPRESSED_CACHE);
system_context->dropUncompressedCache();
break;
case Type::DROP_INDEX_MARK_CACHE:
getContext()->checkAccess(AccessType::SYSTEM_DROP_MARK_CACHE);
system_context->dropIndexMarkCache();
break;
case Type::DROP_INDEX_UNCOMPRESSED_CACHE:
getContext()->checkAccess(AccessType::SYSTEM_DROP_UNCOMPRESSED_CACHE);
system_context->dropIndexUncompressedCache();
break;
case Type::DROP_MMAP_CACHE:
getContext()->checkAccess(AccessType::SYSTEM_DROP_MMAP_CACHE);
system_context->dropMMappedFileCache();
@@ -746,6 +754,8 @@ AccessRightsElements InterpreterSystemQuery::getRequiredAccessForDDLOnCluster()
case Type::DROP_COMPILED_EXPRESSION_CACHE: [[fallthrough]];
#endif
case Type::DROP_UNCOMPRESSED_CACHE:
case Type::DROP_INDEX_MARK_CACHE:
case Type::DROP_INDEX_UNCOMPRESSED_CACHE:
{
required_access.emplace_back(AccessType::SYSTEM_DROP_CACHE);
break;
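
Note that in `execute()` the two new query types do not get access types of their own: `DROP_INDEX_MARK_CACHE` checks `SYSTEM_DROP_MARK_CACHE` and `DROP_INDEX_UNCOMPRESSED_CACHE` checks `SYSTEM_DROP_UNCOMPRESSED_CACHE`. The mapping can be sketched as a small function; the enums below are reduced, hypothetical copies that contain only the values visible in this hunk, and `requiredAccess` is an illustrative helper rather than a function from the PR.

```cpp
#include <cassert>
#include <stdexcept>

// Reduced, hypothetical copies of the enums touched by the diff.
enum class QueryType
{
    DROP_MARK_CACHE,
    DROP_UNCOMPRESSED_CACHE,
    DROP_INDEX_MARK_CACHE,
    DROP_INDEX_UNCOMPRESSED_CACHE,
};

enum class AccessType
{
    SYSTEM_DROP_MARK_CACHE,
    SYSTEM_DROP_UNCOMPRESSED_CACHE,
};

// Map each query type to the access right checked before the cache is dropped.
AccessType requiredAccess(QueryType type)
{
    switch (type)
    {
        case QueryType::DROP_MARK_CACHE:
        case QueryType::DROP_INDEX_MARK_CACHE:
            return AccessType::SYSTEM_DROP_MARK_CACHE;
        case QueryType::DROP_UNCOMPRESSED_CACHE:
        case QueryType::DROP_INDEX_UNCOMPRESSED_CACHE:
            return AccessType::SYSTEM_DROP_UNCOMPRESSED_CACHE;
    }
    throw std::logic_error("Unknown query type");
}
```

One consequence of reusing the existing rights is that a user already allowed to drop the mark cache can also drop the index mark cache without any new grant.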
2 changes: 2 additions & 0 deletions src/Parsers/ASTSystemQuery.h
@@ -24,6 +24,8 @@ class ASTSystemQuery : public IAST, public ASTQueryWithOnCluster
DROP_DNS_CACHE,
DROP_MARK_CACHE,
DROP_UNCOMPRESSED_CACHE,
DROP_INDEX_MARK_CACHE,
DROP_INDEX_UNCOMPRESSED_CACHE,
DROP_MMAP_CACHE,
#if USE_EMBEDDED_COMPILER
DROP_COMPILED_EXPRESSION_CACHE,
9 changes: 9 additions & 0 deletions src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp
@@ -849,6 +849,9 @@ RangesInDataParts MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipInd
if (settings.read_overflow_mode_leaf == OverflowMode::THROW && settings.max_rows_to_read_leaf)
leaf_limits = SizeLimits(settings.max_rows_to_read_leaf, 0, settings.read_overflow_mode_leaf);

auto mark_cache = context->getIndexMarkCache();
auto uncompressed_cache = context->getIndexUncompressedCache();

auto process_part = [&](size_t part_index)
{
auto & part = parts[part_index];
@@ -885,6 +888,8 @@ RangesInDataParts MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipInd
reader_settings,
total_granules,
granules_dropped,
mark_cache.get(),
uncompressed_cache.get(),
log);

index_and_condition.total_granules.fetch_add(total_granules, std::memory_order_relaxed);
@@ -1424,6 +1429,8 @@ MarkRanges MergeTreeDataSelectExecutor::filterMarksUsingIndex(
const MergeTreeReaderSettings & reader_settings,
size_t & total_granules,
size_t & granules_dropped,
MarkCache * mark_cache,
UncompressedCache * uncompressed_cache,
Poco::Logger * log)
{
const std::string & path_prefix = part->getFullRelativePath() + index_helper->getFileName();
@@ -1449,6 +1456,8 @@ MarkRanges MergeTreeDataSelectExecutor::filterMarksUsingIndex(
index_helper, part,
index_marks_count,
ranges,
mark_cache,
uncompressed_cache,
reader_settings);

MarkRanges res;
2 changes: 2 additions & 0 deletions src/Storages/MergeTree/MergeTreeDataSelectExecutor.h
@@ -90,6 +90,8 @@ class MergeTreeDataSelectExecutor
const MergeTreeReaderSettings & reader_settings,
size_t & total_granules,
size_t & granules_dropped,
MarkCache * mark_cache,
UncompressedCache * uncompressed_cache,
Poco::Logger * log);

struct PartFilterCounters
21 changes: 18 additions & 3 deletions src/Storages/MergeTree/MergeTreeIndexReader.cpp
@@ -11,13 +11,15 @@ std::unique_ptr<MergeTreeReaderStream> makeIndexReader(
MergeTreeData::DataPartPtr part,
size_t marks_count,
const MarkRanges & all_mark_ranges,
MarkCache * mark_cache,
UncompressedCache * uncompressed_cache,
MergeTreeReaderSettings settings)
{
return std::make_unique<MergeTreeReaderStream>(
part->volume->getDisk(),
part->getFullRelativePath() + index->getFileName(), extension, marks_count,
all_mark_ranges,
- std::move(settings), nullptr, nullptr,
+ std::move(settings), mark_cache, uncompressed_cache,
part->getFileSizeOrZero(index->getFileName() + extension),
&part->index_granularity_info,
ReadBufferFromFileBase::ProfileCallback{}, CLOCK_MONOTONIC_COARSE);
@@ -29,14 +31,27 @@ namespace DB
{

MergeTreeIndexReader::MergeTreeIndexReader(
- MergeTreeIndexPtr index_, MergeTreeData::DataPartPtr part_, size_t marks_count_, const MarkRanges & all_mark_ranges_,
+ MergeTreeIndexPtr index_,
+ MergeTreeData::DataPartPtr part_,
+ size_t marks_count_,
+ const MarkRanges & all_mark_ranges_,
+ MarkCache * mark_cache,
+ UncompressedCache * uncompressed_cache,
MergeTreeReaderSettings settings)
: index(index_)
{
const std::string & path_prefix = part_->getFullRelativePath() + index->getFileName();
auto index_format = index->getDeserializedFormat(part_->volume->getDisk(), path_prefix);

- stream = makeIndexReader(index_format.extension, index_, part_, marks_count_, all_mark_ranges_, std::move(settings));
+ stream = makeIndexReader(
+ index_format.extension,
+ index_,
+ part_,
+ marks_count_,
+ all_mark_ranges_,
+ mark_cache,
+ uncompressed_cache,
+ std::move(settings));
version = index_format.version;

stream->seekToStart();
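
The core of the change in this file is that the index reader stream now receives the two cache pointers instead of `nullptr, nullptr`, so the reader can consult a cache when one is configured and fall back to reading from disk otherwise. Below is a simplified sketch of that nullable-pointer convention. `MarkCacheSketch`, `LoadResult`, `loadFromDisk`, and `loadMarks` are hypothetical names, and the "disk read" is faked by returning the path length; none of this reflects the actual `MergeTreeReaderStream` internals.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <unordered_map>

// Hypothetical cache keyed by file path.
struct MarkCacheSketch
{
    std::unordered_map<std::string, std::size_t> entries;
};

struct LoadResult
{
    std::size_t value;
    bool from_cache;
};

// Hypothetical stand-in for an expensive read; returns the path length as fake data.
std::size_t loadFromDisk(const std::string & path)
{
    return path.size();
}

// A null cache pointer means "caching disabled": always read from disk.
LoadResult loadMarks(const std::string & path, MarkCacheSketch * cache)
{
    if (!cache)
        return {loadFromDisk(path), false};

    auto it = cache->entries.find(path);
    if (it != cache->entries.end())
        return {it->second, true};

    const std::size_t value = loadFromDisk(path);
    cache->entries.emplace(path, value);
    return {value, false};
}
```

In the diff, the raw pointers come from `mark_cache.get()` and `uncompressed_cache.get()` on shared pointers held by the caller for the duration of the filtering, so the reader itself does not take ownership.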
2 changes: 2 additions & 0 deletions src/Storages/MergeTree/MergeTreeIndexReader.h
@@ -16,6 +16,8 @@ class MergeTreeIndexReader
MergeTreeData::DataPartPtr part_,
size_t marks_count_,
const MarkRanges & all_mark_ranges_,
MarkCache * mark_cache,
UncompressedCache * uncompressed_cache,
MergeTreeReaderSettings settings);
~MergeTreeIndexReader();
