Skip to content

Commit

Permalink
Merge pull request #57897 from ClickHouse/allow-to-dynamically-change…
Browse files Browse the repository at this point in the history
…-fs-cache-size

Support dynamic reloading of filesystem cache size
  • Loading branch information
kssenii committed Dec 19, 2023
2 parents 952175c + 06a2e86 commit d4e71f9
Show file tree
Hide file tree
Showing 13 changed files with 294 additions and 44 deletions.
74 changes: 55 additions & 19 deletions src/Interpreters/Cache/FileCache.cpp
Expand Up @@ -707,7 +707,7 @@ KeyMetadata::iterator FileCache::addFileSegment(
stash_records.emplace(
stash_key, stash->queue->add(locked_key.getKeyMetadata(), offset, 0, *lock));

if (stash->queue->getElementsCount(*lock) > stash->queue->getElementsLimit())
if (stash->queue->getElementsCount(*lock) > stash->queue->getElementsLimit(*lock))
stash->queue->pop(*lock);

result_state = FileSegment::State::DETACHED;
Expand Down Expand Up @@ -748,7 +748,7 @@ bool FileCache::tryReserve(FileSegment & file_segment, const size_t size, FileCa
LOG_TEST(
log, "Trying to reserve space ({} bytes) for {}:{}, current usage {}/{}",
size, file_segment.key(), file_segment.offset(),
main_priority->getSize(cache_lock), main_priority->getSizeLimit());
main_priority->getSize(cache_lock), main_priority->getSizeLimit(cache_lock));

/// In case of per query cache limit (by default disabled), we add/remove entries from both
/// (main_priority and query_priority) priority queues, but iterate entries in order of query_priority,
Expand All @@ -760,7 +760,7 @@ bool FileCache::tryReserve(FileSegment & file_segment, const size_t size, FileCa
{
query_priority = &query_context->getPriority();

const bool query_limit_exceeded = query_priority->getSize(cache_lock) + size > query_priority->getSizeLimit();
const bool query_limit_exceeded = query_priority->getSize(cache_lock) + size > query_priority->getSizeLimit(cache_lock);
if (query_limit_exceeded && !query_context->recacheOnFileCacheQueryLimitExceeded())
{
LOG_TEST(log, "Query limit exceeded, space reservation failed, "
Expand All @@ -771,7 +771,7 @@ bool FileCache::tryReserve(FileSegment & file_segment, const size_t size, FileCa

LOG_TEST(
log, "Using query limit, current usage: {}/{} (while reserving for {}:{})",
query_priority->getSize(cache_lock), query_priority->getSizeLimit(),
query_priority->getSize(cache_lock), query_priority->getSizeLimit(cache_lock),
file_segment.key(), file_segment.offset());
}

Expand Down Expand Up @@ -1066,9 +1066,11 @@ void FileCache::loadMetadataForKeys(const fs::path & keys_dir)

bool limits_satisfied;
IFileCachePriority::IteratorPtr cache_it;
size_t size_limit = 0;

{
auto lock = lockCache();
size_limit = main_priority->getSizeLimit(lock);

limits_satisfied = main_priority->canFit(size, lock);
if (limits_satisfied)
Expand Down Expand Up @@ -1118,7 +1120,7 @@ void FileCache::loadMetadataForKeys(const fs::path & keys_dir)
log,
"Cache capacity changed (max size: {}), "
"cached file `{}` does not fit in cache anymore (size: {})",
main_priority->getSizeLimit(), offset_it->path().string(), size);
size_limit, offset_it->path().string(), size);

fs::remove(offset_it->path());
}
Expand Down Expand Up @@ -1222,7 +1224,8 @@ void FileCache::applySettingsIfPossible(const FileCacheSettings & new_settings,

std::lock_guard lock(apply_settings_mutex);

if (metadata.setBackgroundDownloadQueueSizeLimit(new_settings.background_download_queue_size_limit))
if (new_settings.background_download_queue_size_limit != actual_settings.background_download_queue_size_limit
&& metadata.setBackgroundDownloadQueueSizeLimit(new_settings.background_download_queue_size_limit))
{
LOG_INFO(log, "Changed background_download_queue_size from {} to {}",
actual_settings.background_download_queue_size_limit,
Expand All @@ -1231,24 +1234,57 @@ void FileCache::applySettingsIfPossible(const FileCacheSettings & new_settings,
actual_settings.background_download_queue_size_limit = new_settings.background_download_queue_size_limit;
}

bool updated;
try
{
updated = metadata.setBackgroundDownloadThreads(new_settings.background_download_threads);
}
catch (...)
if (new_settings.background_download_threads != actual_settings.background_download_threads)
{
actual_settings.background_download_threads = metadata.getBackgroundDownloadThreads();
throw;
bool updated = false;
try
{
updated = metadata.setBackgroundDownloadThreads(new_settings.background_download_threads);
}
catch (...)
{
actual_settings.background_download_threads = metadata.getBackgroundDownloadThreads();
throw;
}

if (updated)
{
LOG_INFO(log, "Changed background_download_threads from {} to {}",
actual_settings.background_download_threads,
new_settings.background_download_threads);

actual_settings.background_download_threads = new_settings.background_download_threads;
}
}

if (updated)

if (new_settings.max_size != actual_settings.max_size
|| new_settings.max_elements != actual_settings.max_elements)
{
LOG_INFO(log, "Changed background_download_threads from {} to {}",
actual_settings.background_download_threads,
new_settings.background_download_threads);
auto cache_lock = lockCache();

actual_settings.background_download_threads = new_settings.background_download_threads;
bool updated = false;
try
{
updated = main_priority->modifySizeLimits(
new_settings.max_size, new_settings.max_elements, new_settings.slru_size_ratio, cache_lock);
}
catch (...)
{
actual_settings.max_size = main_priority->getSizeLimit(cache_lock);
actual_settings.max_elements = main_priority->getElementsLimit(cache_lock);
throw;
}

if (updated)
{
LOG_INFO(log, "Changed max_size from {} to {}, max_elements from {} to {}",
actual_settings.max_size, new_settings.max_size,
actual_settings.max_elements, new_settings.max_elements);

actual_settings.max_size = main_priority->getSizeLimit(cache_lock);
actual_settings.max_elements = main_priority->getElementsLimit(cache_lock);
}
}
}

Expand Down
30 changes: 19 additions & 11 deletions src/Interpreters/Cache/FileCacheFactory.cpp
Expand Up @@ -25,6 +25,12 @@ FileCacheSettings FileCacheFactory::FileCacheData::getSettings() const
return settings;
}

void FileCacheFactory::FileCacheData::setSettings(const FileCacheSettings & new_settings)
{
std::lock_guard lock(settings_mutex);
settings = new_settings;
}

FileCacheFactory & FileCacheFactory::instance()
{
static FileCacheFactory ret;
Expand Down Expand Up @@ -100,21 +106,23 @@ void FileCacheFactory::updateSettingsFromConfig(const Poco::Util::AbstractConfig
FileCacheSettings new_settings;
new_settings.loadFromConfig(config, cache_info->config_path);

FileCacheSettings old_settings;
{
std::lock_guard lock(cache_info->settings_mutex);
if (new_settings == cache_info->settings)
continue;
FileCacheSettings old_settings = cache_info->getSettings();
if (old_settings == new_settings)
continue;

old_settings = cache_info->settings;
try
{
cache_info->cache->applySettingsIfPossible(new_settings, old_settings);
}

cache_info->cache->applySettingsIfPossible(new_settings, old_settings);

catch (...)
{
std::lock_guard lock(cache_info->settings_mutex);
cache_info->settings = old_settings;
/// Settings changes could be partially applied in case of exception,
/// make sure cache_info->settings show correct state of applied settings.
cache_info->setSettings(old_settings);
throw;
}

cache_info->setSettings(old_settings);
}
}

Expand Down
2 changes: 2 additions & 0 deletions src/Interpreters/Cache/FileCacheFactory.h
Expand Up @@ -24,6 +24,8 @@ class FileCacheFactory final : private boost::noncopyable

FileCacheSettings getSettings() const;

void setSettings(const FileCacheSettings & new_settings);

const FileCachePtr cache;
const std::string config_path;

Expand Down
10 changes: 6 additions & 4 deletions src/Interpreters/Cache/IFileCachePriority.h
Expand Up @@ -55,9 +55,9 @@ class IFileCachePriority : private boost::noncopyable

virtual ~IFileCachePriority() = default;

size_t getElementsLimit() const { return max_elements; }
size_t getElementsLimit(const CacheGuard::Lock &) const { return max_elements; }

size_t getSizeLimit() const { return max_size; }
size_t getSizeLimit(const CacheGuard::Lock &) const { return max_size; }

virtual size_t getSize(const CacheGuard::Lock &) const = 0;

Expand Down Expand Up @@ -86,9 +86,11 @@ class IFileCachePriority : private boost::noncopyable
FinalizeEvictionFunc & finalize_eviction_func,
const CacheGuard::Lock &) = 0;

virtual bool modifySizeLimits(size_t max_size_, size_t max_elements_, double size_ratio_, const CacheGuard::Lock &) = 0;

protected:
const size_t max_size = 0;
const size_t max_elements = 0;
size_t max_size = 0;
size_t max_elements = 0;
};

}
52 changes: 50 additions & 2 deletions src/Interpreters/Cache/LRUFileCachePriority.cpp
Expand Up @@ -16,6 +16,9 @@ namespace ProfileEvents
{
extern const Event FilesystemCacheEvictionSkippedFileSegments;
extern const Event FilesystemCacheEvictionTries;
extern const Event FilesystemCacheEvictMicroseconds;
extern const Event FilesystemCacheEvictedBytes;
extern const Event FilesystemCacheEvictedFileSegments;
}

namespace DB
Expand All @@ -36,7 +39,7 @@ IFileCachePriority::IteratorPtr LRUFileCachePriority::add( /// NOLINT
return std::make_shared<LRUIterator>(add(Entry(key_metadata->key, offset, size, key_metadata), lock));
}

LRUFileCachePriority::LRUIterator LRUFileCachePriority::add(Entry && entry, const CacheGuard::Lock &)
LRUFileCachePriority::LRUIterator LRUFileCachePriority::add(Entry && entry, const CacheGuard::Lock & lock)
{
if (entry.size == 0)
{
Expand All @@ -59,7 +62,7 @@ LRUFileCachePriority::LRUIterator LRUFileCachePriority::add(Entry && entry, cons
}
#endif

const auto & size_limit = getSizeLimit();
const auto & size_limit = getSizeLimit(lock);
if (size_limit && current_size + entry.size > size_limit)
{
throw Exception(
Expand Down Expand Up @@ -288,6 +291,51 @@ std::vector<FileSegmentInfo> LRUFileCachePriority::dump(const CacheGuard::Lock &
return res;
}

bool LRUFileCachePriority::modifySizeLimits(
size_t max_size_, size_t max_elements_, double /* size_ratio_ */, const CacheGuard::Lock & lock)
{
if (max_size == max_size_ && max_elements == max_elements_)
return false; /// Nothing to change.

auto check_limits_satisfied = [&]()
{
return (max_size_ == 0 || current_size <= max_size_)
&& (max_elements_ == 0 || current_elements_num <= max_elements_);
};

if (check_limits_satisfied())
{
max_size = max_size_;
max_elements = max_elements_;
return true;
}

auto iterate_func = [&](LockedKey & locked_key, const FileSegmentMetadataPtr & segment_metadata)
{
chassert(segment_metadata->file_segment->assertCorrectness());

if (!segment_metadata->releasable())
return IterationResult::CONTINUE;

auto segment = segment_metadata->file_segment;
locked_key.removeFileSegment(segment->offset(), segment->lock());

ProfileEvents::increment(ProfileEvents::FilesystemCacheEvictedFileSegments);
ProfileEvents::increment(ProfileEvents::FilesystemCacheEvictedBytes, segment->getDownloadedSize());
return IterationResult::REMOVE_AND_CONTINUE;
};

auto timer = DB::CurrentThread::getProfileEvents().timer(ProfileEvents::FilesystemCacheEvictMicroseconds);
iterate(
[&](LockedKey & locked_key, const FileSegmentMetadataPtr & segment_metadata)
{ return check_limits_satisfied() ? IterationResult::BREAK : iterate_func(locked_key, segment_metadata); },
lock);

max_size = max_size_;
max_elements = max_elements_;
return true;
}

void LRUFileCachePriority::LRUIterator::remove(const CacheGuard::Lock & lock)
{
assertValid();
Expand Down
2 changes: 2 additions & 0 deletions src/Interpreters/Cache/LRUFileCachePriority.h
Expand Up @@ -48,6 +48,8 @@ class LRUFileCachePriority final : public IFileCachePriority

void pop(const CacheGuard::Lock & lock) { remove(queue.begin(), lock); }

bool modifySizeLimits(size_t max_size_, size_t max_elements_, double size_ratio_, const CacheGuard::Lock &) override;

private:
void updateElementsCount(int64_t num);
void updateSize(int64_t size);
Expand Down
7 changes: 3 additions & 4 deletions src/Interpreters/Cache/Metadata.cpp
Expand Up @@ -687,7 +687,7 @@ void CacheMetadata::startup()
download_threads.emplace_back(std::make_shared<DownloadThread>());
download_threads.back()->thread = std::make_unique<ThreadFromGlobalPool>([this, thread = download_threads.back()] { downloadThreadFunc(thread->stop_flag); });
}
cleanup_thread = std::make_unique<ThreadFromGlobalPool>(std::function{ [this]{ cleanupThreadFunc(); }});
cleanup_thread = std::make_unique<ThreadFromGlobalPool>([this]{ cleanupThreadFunc(); });
}

void CacheMetadata::shutdown()
Expand All @@ -714,10 +714,10 @@ bool CacheMetadata::setBackgroundDownloadThreads(size_t threads_num)
if (threads_num == download_threads_num)
return false;

SCOPE_EXIT({ download_threads_num = download_threads.size(); });

if (threads_num > download_threads_num)
{
SCOPE_EXIT({ download_threads_num = download_threads.size(); });

size_t add_threads = threads_num - download_threads_num;
for (size_t i = 0; i < add_threads; ++i)
{
Expand Down Expand Up @@ -745,7 +745,6 @@ bool CacheMetadata::setBackgroundDownloadThreads(size_t threads_num)
}

download_queue->cv.notify_all();
SCOPE_EXIT({ download_threads_num = download_threads.size(); });

for (size_t i = 0; i < remove_threads; ++i)
{
Expand Down
22 changes: 19 additions & 3 deletions src/Interpreters/Cache/SLRUFileCachePriority.cpp
Expand Up @@ -21,14 +21,15 @@ namespace
SLRUFileCachePriority::SLRUFileCachePriority(
size_t max_size_,
size_t max_elements_,
double size_ratio)
double size_ratio_)
: IFileCachePriority(max_size_, max_elements_)
, size_ratio(size_ratio_)
, protected_queue(LRUFileCachePriority(getRatio(max_size_, size_ratio), getRatio(max_elements_, size_ratio)))
, probationary_queue(LRUFileCachePriority(getRatio(max_size_, 1 - size_ratio), getRatio(max_elements_, 1 - size_ratio)))
{
LOG_DEBUG(
log, "Using probationary queue size: {}, protected queue size: {}",
probationary_queue.getSizeLimit(), protected_queue.getSizeLimit());
probationary_queue.max_size, protected_queue.max_elements);
}

size_t SLRUFileCachePriority::getSize(const CacheGuard::Lock & lock) const
Expand Down Expand Up @@ -151,7 +152,7 @@ void SLRUFileCachePriority::increasePriority(SLRUIterator & iterator, const Cach
/// Entry is in probationary queue.
/// We need to move it to protected queue.
const size_t size = iterator.getEntry().size;
if (size > protected_queue.getSizeLimit())
if (size > protected_queue.getSizeLimit(lock))
{
/// Entry size is bigger than the whole protected queue limit.
/// This is only possible if protected_queue_size_limit is less than max_file_segment_size,
Expand Down Expand Up @@ -235,6 +236,21 @@ void SLRUFileCachePriority::shuffle(const CacheGuard::Lock & lock)
probationary_queue.shuffle(lock);
}

bool SLRUFileCachePriority::modifySizeLimits(
size_t max_size_, size_t max_elements_, double size_ratio_, const CacheGuard::Lock & lock)
{
if (max_size == max_size_ && max_elements == max_elements_ && size_ratio == size_ratio_)
return false; /// Nothing to change.

protected_queue.modifySizeLimits(getRatio(max_size_, size_ratio_), getRatio(max_elements_, size_ratio_), 0, lock);
probationary_queue.modifySizeLimits(getRatio(max_size_, 1 - size_ratio_), getRatio(max_elements_, 1 - size_ratio_), 0, lock);

max_size = max_size_;
max_elements = max_elements_;
size_ratio = size_ratio_;
return true;
}

SLRUFileCachePriority::SLRUIterator::SLRUIterator(
SLRUFileCachePriority * cache_priority_,
LRUFileCachePriority::LRUIterator && lru_iterator_,
Expand Down

0 comments on commit d4e71f9

Please sign in to comment.