Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support dynamic reloading of filesystem cache size #57897

Merged
merged 8 commits into from Dec 19, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
74 changes: 55 additions & 19 deletions src/Interpreters/Cache/FileCache.cpp
Expand Up @@ -707,7 +707,7 @@ KeyMetadata::iterator FileCache::addFileSegment(
stash_records.emplace(
stash_key, stash->queue->add(locked_key.getKeyMetadata(), offset, 0, *lock));

if (stash->queue->getElementsCount(*lock) > stash->queue->getElementsLimit())
if (stash->queue->getElementsCount(*lock) > stash->queue->getElementsLimit(*lock))
stash->queue->pop(*lock);

result_state = FileSegment::State::DETACHED;
Expand Down Expand Up @@ -748,7 +748,7 @@ bool FileCache::tryReserve(FileSegment & file_segment, const size_t size, FileCa
LOG_TEST(
log, "Trying to reserve space ({} bytes) for {}:{}, current usage {}/{}",
size, file_segment.key(), file_segment.offset(),
main_priority->getSize(cache_lock), main_priority->getSizeLimit());
main_priority->getSize(cache_lock), main_priority->getSizeLimit(cache_lock));

/// In case of per query cache limit (by default disabled), we add/remove entries from both
/// (main_priority and query_priority) priority queues, but iterate entries in order of query_priority,
Expand All @@ -760,7 +760,7 @@ bool FileCache::tryReserve(FileSegment & file_segment, const size_t size, FileCa
{
query_priority = &query_context->getPriority();

const bool query_limit_exceeded = query_priority->getSize(cache_lock) + size > query_priority->getSizeLimit();
const bool query_limit_exceeded = query_priority->getSize(cache_lock) + size > query_priority->getSizeLimit(cache_lock);
if (query_limit_exceeded && !query_context->recacheOnFileCacheQueryLimitExceeded())
{
LOG_TEST(log, "Query limit exceeded, space reservation failed, "
Expand All @@ -771,7 +771,7 @@ bool FileCache::tryReserve(FileSegment & file_segment, const size_t size, FileCa

LOG_TEST(
log, "Using query limit, current usage: {}/{} (while reserving for {}:{})",
query_priority->getSize(cache_lock), query_priority->getSizeLimit(),
query_priority->getSize(cache_lock), query_priority->getSizeLimit(cache_lock),
file_segment.key(), file_segment.offset());
}

Expand Down Expand Up @@ -1066,9 +1066,11 @@ void FileCache::loadMetadataForKeys(const fs::path & keys_dir)

bool limits_satisfied;
IFileCachePriority::IteratorPtr cache_it;
size_t size_limit = 0;

{
auto lock = lockCache();
size_limit = main_priority->getSizeLimit(lock);

limits_satisfied = main_priority->canFit(size, lock);
if (limits_satisfied)
Expand Down Expand Up @@ -1118,7 +1120,7 @@ void FileCache::loadMetadataForKeys(const fs::path & keys_dir)
log,
"Cache capacity changed (max size: {}), "
"cached file `{}` does not fit in cache anymore (size: {})",
main_priority->getSizeLimit(), offset_it->path().string(), size);
size_limit, offset_it->path().string(), size);

fs::remove(offset_it->path());
}
Expand Down Expand Up @@ -1222,7 +1224,8 @@ void FileCache::applySettingsIfPossible(const FileCacheSettings & new_settings,

std::lock_guard lock(apply_settings_mutex);

if (metadata.setBackgroundDownloadQueueSizeLimit(new_settings.background_download_queue_size_limit))
if (new_settings.background_download_queue_size_limit != actual_settings.background_download_queue_size_limit
&& metadata.setBackgroundDownloadQueueSizeLimit(new_settings.background_download_queue_size_limit))
{
LOG_INFO(log, "Changed background_download_queue_size from {} to {}",
actual_settings.background_download_queue_size_limit,
Expand All @@ -1231,24 +1234,57 @@ void FileCache::applySettingsIfPossible(const FileCacheSettings & new_settings,
actual_settings.background_download_queue_size_limit = new_settings.background_download_queue_size_limit;
}

bool updated;
try
{
updated = metadata.setBackgroundDownloadThreads(new_settings.background_download_threads);
}
catch (...)
if (new_settings.background_download_threads != actual_settings.background_download_threads)
{
actual_settings.background_download_threads = metadata.getBackgroundDownloadThreads();
throw;
bool updated = false;
try
{
updated = metadata.setBackgroundDownloadThreads(new_settings.background_download_threads);
}
catch (...)
{
actual_settings.background_download_threads = metadata.getBackgroundDownloadThreads();
throw;
}

if (updated)
{
LOG_INFO(log, "Changed background_download_threads from {} to {}",
actual_settings.background_download_threads,
new_settings.background_download_threads);

actual_settings.background_download_threads = new_settings.background_download_threads;
}
}

if (updated)

if (new_settings.max_size != actual_settings.max_size
|| new_settings.max_elements != actual_settings.max_elements)
{
LOG_INFO(log, "Changed background_download_threads from {} to {}",
actual_settings.background_download_threads,
new_settings.background_download_threads);
auto cache_lock = lockCache();

actual_settings.background_download_threads = new_settings.background_download_threads;
bool updated = false;
try
{
updated = main_priority->modifySizeLimits(
new_settings.max_size, new_settings.max_elements, new_settings.slru_size_ratio, cache_lock);
}
catch (...)
{
actual_settings.max_size = main_priority->getSizeLimit(cache_lock);
actual_settings.max_elements = main_priority->getElementsLimit(cache_lock);
throw;
}

if (updated)
{
LOG_INFO(log, "Changed max_size from {} to {}, max_elements from {} to {}",
actual_settings.max_size, new_settings.max_size,
actual_settings.max_elements, new_settings.max_elements);

actual_settings.max_size = main_priority->getSizeLimit(cache_lock);
actual_settings.max_elements = main_priority->getElementsLimit(cache_lock);
}
}
}

Expand Down
30 changes: 19 additions & 11 deletions src/Interpreters/Cache/FileCacheFactory.cpp
Expand Up @@ -25,6 +25,12 @@ FileCacheSettings FileCacheFactory::FileCacheData::getSettings() const
return settings;
}

void FileCacheFactory::FileCacheData::setSettings(const FileCacheSettings & new_settings)
{
std::lock_guard lock(settings_mutex);
settings = new_settings;
}

FileCacheFactory & FileCacheFactory::instance()
{
static FileCacheFactory ret;
Expand Down Expand Up @@ -100,21 +106,23 @@ void FileCacheFactory::updateSettingsFromConfig(const Poco::Util::AbstractConfig
FileCacheSettings new_settings;
new_settings.loadFromConfig(config, cache_info->config_path);

FileCacheSettings old_settings;
{
std::lock_guard lock(cache_info->settings_mutex);
if (new_settings == cache_info->settings)
continue;
FileCacheSettings old_settings = cache_info->getSettings();
if (old_settings == new_settings)
continue;

old_settings = cache_info->settings;
try
{
cache_info->cache->applySettingsIfPossible(new_settings, old_settings);
}

cache_info->cache->applySettingsIfPossible(new_settings, old_settings);

catch (...)
{
std::lock_guard lock(cache_info->settings_mutex);
cache_info->settings = old_settings;
/// Settings changes could be partially applied in case of exception,
/// make sure cache_info->settings show correct state of applied settings.
cache_info->setSettings(old_settings);
throw;
}

cache_info->setSettings(old_settings);
}
}

Expand Down
2 changes: 2 additions & 0 deletions src/Interpreters/Cache/FileCacheFactory.h
Expand Up @@ -24,6 +24,8 @@ class FileCacheFactory final : private boost::noncopyable

FileCacheSettings getSettings() const;

void setSettings(const FileCacheSettings & new_settings);

const FileCachePtr cache;
const std::string config_path;

Expand Down
10 changes: 6 additions & 4 deletions src/Interpreters/Cache/IFileCachePriority.h
Expand Up @@ -55,9 +55,9 @@ class IFileCachePriority : private boost::noncopyable

virtual ~IFileCachePriority() = default;

size_t getElementsLimit() const { return max_elements; }
size_t getElementsLimit(const CacheGuard::Lock &) const { return max_elements; }

size_t getSizeLimit() const { return max_size; }
size_t getSizeLimit(const CacheGuard::Lock &) const { return max_size; }

virtual size_t getSize(const CacheGuard::Lock &) const = 0;

Expand Down Expand Up @@ -86,9 +86,11 @@ class IFileCachePriority : private boost::noncopyable
FinalizeEvictionFunc & finalize_eviction_func,
const CacheGuard::Lock &) = 0;

virtual bool modifySizeLimits(size_t max_size_, size_t max_elements_, double size_ratio_, const CacheGuard::Lock &) = 0;

protected:
const size_t max_size = 0;
const size_t max_elements = 0;
size_t max_size = 0;
size_t max_elements = 0;
};

}
52 changes: 50 additions & 2 deletions src/Interpreters/Cache/LRUFileCachePriority.cpp
Expand Up @@ -16,6 +16,9 @@ namespace ProfileEvents
{
extern const Event FilesystemCacheEvictionSkippedFileSegments;
extern const Event FilesystemCacheEvictionTries;
extern const Event FilesystemCacheEvictMicroseconds;
extern const Event FilesystemCacheEvictedBytes;
extern const Event FilesystemCacheEvictedFileSegments;
}

namespace DB
Expand All @@ -36,7 +39,7 @@ IFileCachePriority::IteratorPtr LRUFileCachePriority::add( /// NOLINT
return std::make_shared<LRUIterator>(add(Entry(key_metadata->key, offset, size, key_metadata), lock));
}

LRUFileCachePriority::LRUIterator LRUFileCachePriority::add(Entry && entry, const CacheGuard::Lock &)
LRUFileCachePriority::LRUIterator LRUFileCachePriority::add(Entry && entry, const CacheGuard::Lock & lock)
{
if (entry.size == 0)
{
Expand All @@ -59,7 +62,7 @@ LRUFileCachePriority::LRUIterator LRUFileCachePriority::add(Entry && entry, cons
}
#endif

const auto & size_limit = getSizeLimit();
const auto & size_limit = getSizeLimit(lock);
if (size_limit && current_size + entry.size > size_limit)
{
throw Exception(
Expand Down Expand Up @@ -288,6 +291,51 @@ std::vector<FileSegmentInfo> LRUFileCachePriority::dump(const CacheGuard::Lock &
return res;
}

bool LRUFileCachePriority::modifySizeLimits(
size_t max_size_, size_t max_elements_, double /* size_ratio_ */, const CacheGuard::Lock & lock)
{
if (max_size == max_size_ && max_elements == max_elements_)
return false; /// Nothing to change.

auto check_limits_satisfied = [&]()
{
return (max_size_ == 0 || current_size <= max_size_)
&& (max_elements_ == 0 || current_elements_num <= max_elements_);
};

if (check_limits_satisfied())
{
max_size = max_size_;
max_elements = max_elements_;
return true;
}

auto iterate_func = [&](LockedKey & locked_key, const FileSegmentMetadataPtr & segment_metadata)
{
chassert(segment_metadata->file_segment->assertCorrectness());

if (!segment_metadata->releasable())
return IterationResult::CONTINUE;

auto segment = segment_metadata->file_segment;
locked_key.removeFileSegment(segment->offset(), segment->lock());

ProfileEvents::increment(ProfileEvents::FilesystemCacheEvictedFileSegments);
ProfileEvents::increment(ProfileEvents::FilesystemCacheEvictedBytes, segment->getDownloadedSize());
return IterationResult::REMOVE_AND_CONTINUE;
};

auto timer = DB::CurrentThread::getProfileEvents().timer(ProfileEvents::FilesystemCacheEvictMicroseconds);
iterate(
[&](LockedKey & locked_key, const FileSegmentMetadataPtr & segment_metadata)
{ return check_limits_satisfied() ? IterationResult::BREAK : iterate_func(locked_key, segment_metadata); },
lock);

max_size = max_size_;
max_elements = max_elements_;
return true;
}

void LRUFileCachePriority::LRUIterator::remove(const CacheGuard::Lock & lock)
{
assertValid();
Expand Down
2 changes: 2 additions & 0 deletions src/Interpreters/Cache/LRUFileCachePriority.h
Expand Up @@ -48,6 +48,8 @@ class LRUFileCachePriority final : public IFileCachePriority

void pop(const CacheGuard::Lock & lock) { remove(queue.begin(), lock); }

bool modifySizeLimits(size_t max_size_, size_t max_elements_, double size_ratio_, const CacheGuard::Lock &) override;

private:
void updateElementsCount(int64_t num);
void updateSize(int64_t size);
Expand Down
7 changes: 3 additions & 4 deletions src/Interpreters/Cache/Metadata.cpp
Expand Up @@ -681,7 +681,7 @@ void CacheMetadata::startup()
download_threads.emplace_back(std::make_shared<DownloadThread>());
download_threads.back()->thread = std::make_unique<ThreadFromGlobalPool>([this, thread = download_threads.back()] { downloadThreadFunc(thread->stop_flag); });
}
cleanup_thread = std::make_unique<ThreadFromGlobalPool>(std::function{ [this]{ cleanupThreadFunc(); }});
cleanup_thread = std::make_unique<ThreadFromGlobalPool>([this]{ cleanupThreadFunc(); });
}

void CacheMetadata::shutdown()
Expand All @@ -708,10 +708,10 @@ bool CacheMetadata::setBackgroundDownloadThreads(size_t threads_num)
if (threads_num == download_threads_num)
return false;

SCOPE_EXIT({ download_threads_num = download_threads.size(); });

if (threads_num > download_threads_num)
{
SCOPE_EXIT({ download_threads_num = download_threads.size(); });

size_t add_threads = threads_num - download_threads_num;
for (size_t i = 0; i < add_threads; ++i)
{
Expand Down Expand Up @@ -739,7 +739,6 @@ bool CacheMetadata::setBackgroundDownloadThreads(size_t threads_num)
}

download_queue->cv.notify_all();
SCOPE_EXIT({ download_threads_num = download_threads.size(); });

for (size_t i = 0; i < remove_threads; ++i)
{
Expand Down
22 changes: 19 additions & 3 deletions src/Interpreters/Cache/SLRUFileCachePriority.cpp
Expand Up @@ -21,14 +21,15 @@ namespace
SLRUFileCachePriority::SLRUFileCachePriority(
size_t max_size_,
size_t max_elements_,
double size_ratio)
double size_ratio_)
: IFileCachePriority(max_size_, max_elements_)
, size_ratio(size_ratio_)
, protected_queue(LRUFileCachePriority(getRatio(max_size_, size_ratio), getRatio(max_elements_, size_ratio)))
, probationary_queue(LRUFileCachePriority(getRatio(max_size_, 1 - size_ratio), getRatio(max_elements_, 1 - size_ratio)))
{
LOG_DEBUG(
log, "Using probationary queue size: {}, protected queue size: {}",
probationary_queue.getSizeLimit(), protected_queue.getSizeLimit());
probationary_queue.max_size, protected_queue.max_elements);
}

size_t SLRUFileCachePriority::getSize(const CacheGuard::Lock & lock) const
Expand Down Expand Up @@ -151,7 +152,7 @@ void SLRUFileCachePriority::increasePriority(SLRUIterator & iterator, const Cach
/// Entry is in probationary queue.
/// We need to move it to protected queue.
const size_t size = iterator.getEntry().size;
if (size > protected_queue.getSizeLimit())
if (size > protected_queue.getSizeLimit(lock))
{
/// Entry size is bigger than the whole protected queue limit.
/// This is only possible if protected_queue_size_limit is less than max_file_segment_size,
Expand Down Expand Up @@ -235,6 +236,21 @@ void SLRUFileCachePriority::shuffle(const CacheGuard::Lock & lock)
probationary_queue.shuffle(lock);
}

bool SLRUFileCachePriority::modifySizeLimits(
size_t max_size_, size_t max_elements_, double size_ratio_, const CacheGuard::Lock & lock)
{
if (max_size == max_size_ && max_elements == max_elements_ && size_ratio == size_ratio_)
return false; /// Nothing to change.

protected_queue.modifySizeLimits(getRatio(max_size_, size_ratio_), getRatio(max_elements_, size_ratio_), 0, lock);
probationary_queue.modifySizeLimits(getRatio(max_size_, 1 - size_ratio_), getRatio(max_elements_, 1 - size_ratio_), 0, lock);

max_size = max_size_;
max_elements = max_elements_;
size_ratio = size_ratio_;
return true;
}

SLRUFileCachePriority::SLRUIterator::SLRUIterator(
SLRUFileCachePriority * cache_priority_,
LRUFileCachePriority::LRUIterator && lru_iterator_,
Expand Down