Skip to content

Commit

Permalink
Merge pull request #49203 from kssenii/fix-system-error-from-cache
Browse files Browse the repository at this point in the history
Catch exceptions from create_directories in fs cache
  • Loading branch information
kssenii committed Apr 27, 2023
2 parents ed97e46 + 8ba9ab6 commit 28dee37
Showing 1 changed file with 38 additions and 32 deletions.
70 changes: 38 additions & 32 deletions src/Interpreters/Cache/FileCache.cpp
Expand Up @@ -497,62 +497,68 @@ FileCache::FileSegmentCell * FileCache::addCell(
/// Create a file segment cell and put it in `files` map by [key][offset].

if (!size)
return nullptr; /// Empty files are not cached.
throw Exception(ErrorCodes::LOGICAL_ERROR, "Zero size files are not allowed");

if (files[key].contains(offset))
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Cache cell already exists for key: `{}`, offset: {}, size: {}.\nCurrent cache structure: {}",
key.toString(), offset, size, dumpStructureUnlocked(key, cache_lock));

auto skip_or_download = [&]() -> FileSegmentPtr
FileSegment::State result_state = state;
if (state == FileSegment::State::EMPTY && enable_cache_hits_threshold)
{
FileSegment::State result_state = state;
if (state == FileSegment::State::EMPTY && enable_cache_hits_threshold)
auto record = stash_records.find({key, offset});

if (record == stash_records.end())
{
auto record = stash_records.find({key, offset});
auto priority_iter = stash_priority->add(key, offset, 0, cache_lock);
stash_records.insert({{key, offset}, priority_iter});

if (record == stash_records.end())
if (stash_priority->getElementsNum(cache_lock) > max_stash_element_size)
{
auto priority_iter = stash_priority->add(key, offset, 0, cache_lock);
stash_records.insert({{key, offset}, priority_iter});

if (stash_priority->getElementsNum(cache_lock) > max_stash_element_size)
{
auto remove_priority_iter = stash_priority->getLowestPriorityWriteIterator(cache_lock);
stash_records.erase({remove_priority_iter->key(), remove_priority_iter->offset()});
remove_priority_iter->removeAndGetNext(cache_lock);
}

/// For segments that do not reach the download threshold,
/// we do not download them, but directly read them
result_state = FileSegment::State::SKIP_CACHE;
auto remove_priority_iter = stash_priority->getLowestPriorityWriteIterator(cache_lock);
stash_records.erase({remove_priority_iter->key(), remove_priority_iter->offset()});
remove_priority_iter->removeAndGetNext(cache_lock);
}
else
{
auto priority_iter = record->second;
priority_iter->use(cache_lock);

result_state = priority_iter->hits() >= enable_cache_hits_threshold
? FileSegment::State::EMPTY
: FileSegment::State::SKIP_CACHE;
}
/// For segments that do not reach the download threshold,
/// we do not download them, but directly read them
result_state = FileSegment::State::SKIP_CACHE;
}
else
{
auto priority_iter = record->second;
priority_iter->use(cache_lock);

return std::make_shared<FileSegment>(offset, size, key, this, result_state, settings);
};
result_state = priority_iter->hits() >= enable_cache_hits_threshold
? FileSegment::State::EMPTY
: FileSegment::State::SKIP_CACHE;
}
}

FileSegmentCell cell(skip_or_download(), this, cache_lock);
auto & offsets = files[key];

if (offsets.empty())
{
auto key_path = getPathInLocalCache(key);

if (!fs::exists(key_path))
fs::create_directories(key_path);
{
try
{
fs::create_directories(key_path);
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
result_state = FileSegment::State::SKIP_CACHE;
}
}
}

auto file_segment = std::make_shared<FileSegment>(offset, size, key, this, result_state, settings);
FileSegmentCell cell(std::move(file_segment), this, cache_lock);

auto [it, inserted] = offsets.insert({offset, std::move(cell)});
if (!inserted)
throw Exception(
Expand Down

0 comments on commit 28dee37

Please sign in to comment.