Skip to content

Commit

Permalink
Merge pull request #51622 from kssenii/system-sync-cache
Browse files Browse the repository at this point in the history
Add `system sync filesystem cache` command
  • Loading branch information
kssenii committed Aug 24, 2023
2 parents 28c5725 + c8a12f7 commit 1877065
Show file tree
Hide file tree
Showing 16 changed files with 301 additions and 73 deletions.
1 change: 1 addition & 0 deletions src/Access/Common/AccessType.h
Expand Up @@ -153,6 +153,7 @@ enum class AccessType
M(SYSTEM_DROP_QUERY_CACHE, "SYSTEM DROP QUERY, DROP QUERY CACHE, DROP QUERY", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_DROP_COMPILED_EXPRESSION_CACHE, "SYSTEM DROP COMPILED EXPRESSION, DROP COMPILED EXPRESSION CACHE, DROP COMPILED EXPRESSIONS", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_DROP_FILESYSTEM_CACHE, "SYSTEM DROP FILESYSTEM CACHE, DROP FILESYSTEM CACHE", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_SYNC_FILESYSTEM_CACHE, "SYSTEM REPAIR FILESYSTEM CACHE, REPAIR FILESYSTEM CACHE, SYNC FILESYSTEM CACHE", GLOBAL, SYSTEM) \
M(SYSTEM_DROP_SCHEMA_CACHE, "SYSTEM DROP SCHEMA CACHE, DROP SCHEMA CACHE", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_DROP_S3_CLIENT_CACHE, "SYSTEM DROP S3 CLIENT, DROP S3 CLIENT CACHE", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_DROP_CACHE, "DROP CACHE", GROUP, SYSTEM) \
Expand Down
49 changes: 18 additions & 31 deletions src/Interpreters/Cache/FileCache.cpp
Expand Up @@ -42,6 +42,7 @@ size_t roundUpToMultiple(size_t num, size_t multiple)
{
return roundDownToMultiple(num + multiple - 1, multiple);
}

}

namespace DB
Expand Down Expand Up @@ -174,31 +175,6 @@ FileSegments FileCache::getImpl(const LockedKey & locked_key, const FileSegment:
"Cannot have zero size downloaded file segments. {}",
file_segment->getInfoForLog());
}

#ifndef NDEBUG
/**
* Check that in-memory state of the cache is consistent with the state on disk.
* Check only in debug build, because such checks can be done often and can be quite
* expensive compared to overall query execution time.
*/

fs::path path = file_segment->getPathInLocalCache();
if (!fs::exists(path))
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"File path does not exist, but file has DOWNLOADED state. {}",
file_segment->getInfoForLog());
}

if (fs::file_size(path) == 0)
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Cannot have zero size downloaded file segments. {}",
file_segment->getInfoForLog());
}
#endif
}
}
else
Expand Down Expand Up @@ -1037,7 +1013,7 @@ void FileCache::deactivateBackgroundOperations()
cleanup_thread->join();
}

FileSegmentsHolderPtr FileCache::getSnapshot()
FileSegments FileCache::getSnapshot()
{
assertInitialized();
#ifndef NDEBUG
Expand All @@ -1050,19 +1026,19 @@ FileSegmentsHolderPtr FileCache::getSnapshot()
for (const auto & [_, file_segment_metadata] : locked_key)
file_segments.push_back(FileSegment::getSnapshot(file_segment_metadata->file_segment));
});
return std::make_unique<FileSegmentsHolder>(std::move(file_segments), /* complete_on_dtor */false);
return file_segments;
}

FileSegmentsHolderPtr FileCache::getSnapshot(const Key & key)
FileSegments FileCache::getSnapshot(const Key & key)
{
FileSegments file_segments;
auto locked_key = metadata.lockKeyMetadata(key, CacheMetadata::KeyNotFoundPolicy::THROW_LOGICAL);
for (const auto & [_, file_segment_metadata] : *locked_key->getKeyMetadata())
file_segments.push_back(FileSegment::getSnapshot(file_segment_metadata->file_segment));
return std::make_unique<FileSegmentsHolder>(std::move(file_segments));
return file_segments;
}

FileSegmentsHolderPtr FileCache::dumpQueue()
FileSegments FileCache::dumpQueue()
{
assertInitialized();

Expand All @@ -1073,7 +1049,7 @@ FileSegmentsHolderPtr FileCache::dumpQueue()
return PriorityIterationResult::CONTINUE;
}, lockCache());

return std::make_unique<FileSegmentsHolder>(std::move(file_segments));
return file_segments;
}

std::vector<String> FileCache::tryGetCachePaths(const Key & key)
Expand Down Expand Up @@ -1148,4 +1124,15 @@ FileCache::QueryContextHolderPtr FileCache::getQueryContextHolder(
return std::make_unique<QueryContextHolder>(query_id, this, std::move(context));
}

/// Calls LockedKey::sync() for every key that metadata.iterate() visits and
/// returns all file segments reported by those calls, in visiting order.
FileSegments FileCache::sync()
{
    FileSegments all_broken;
    metadata.iterate([&](LockedKey & locked_key)
    {
        /// Append the per-key result element by element, keeping its order.
        auto broken_in_key = locked_key.sync();
        for (const auto & segment : broken_in_key)
            all_broken.push_back(segment);
    });
    return all_broken;
}

}
8 changes: 5 additions & 3 deletions src/Interpreters/Cache/FileCache.h
Expand Up @@ -124,11 +124,11 @@ class FileCache : private boost::noncopyable

bool tryReserve(FileSegment & file_segment, size_t size, FileCacheReserveStat & stat);

FileSegmentsHolderPtr getSnapshot();
FileSegments getSnapshot();

FileSegmentsHolderPtr getSnapshot(const Key & key);
FileSegments getSnapshot(const Key & key);

FileSegmentsHolderPtr dumpQueue();
FileSegments dumpQueue();

void deactivateBackgroundOperations();

Expand All @@ -150,6 +150,8 @@ class FileCache : private boost::noncopyable

CacheGuard::Lock lockCache() const;

FileSegments sync();

private:
using KeyAndOffset = FileCacheKeyAndOffset;

Expand Down
11 changes: 9 additions & 2 deletions src/Interpreters/Cache/FileSegment.cpp
Expand Up @@ -882,8 +882,15 @@ void FileSegment::setDetachedState(const FileSegmentGuard::Lock & lock)
key_metadata.reset();
cache = nullptr;
queue_iterator = nullptr;
cache_writer.reset();
remote_file_reader.reset();
try
{
cache_writer.reset();
remote_file_reader.reset();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}

void FileSegment::detach(const FileSegmentGuard::Lock & lock, const LockedKey &)
Expand Down
109 changes: 88 additions & 21 deletions src/Interpreters/Cache/Metadata.cpp
Expand Up @@ -128,7 +128,7 @@ bool KeyMetadata::createBaseDirectory()
return true;
}

std::string KeyMetadata::getFileSegmentPath(const FileSegment & file_segment)
std::string KeyMetadata::getFileSegmentPath(const FileSegment & file_segment) const
{
return fs::path(key_path)
/ CacheMetadata::getFileNameForFileSegment(file_segment.offset(), file_segment.getKind());
Expand Down Expand Up @@ -704,57 +704,72 @@ bool LockedKey::removeAllFileSegments(bool if_releasable)
return removed_all;
}

KeyMetadata::iterator LockedKey::removeFileSegment(size_t offset)
KeyMetadata::iterator LockedKey::removeFileSegment(size_t offset, bool can_be_broken)
{
auto it = key_metadata->find(offset);
if (it == key_metadata->end())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "There is no offset {}", offset);

auto file_segment = it->second->file_segment;
return removeFileSegmentImpl(it, file_segment->lock());
return removeFileSegmentImpl(it, file_segment->lock(), can_be_broken);
}

KeyMetadata::iterator LockedKey::removeFileSegment(size_t offset, const FileSegmentGuard::Lock & segment_lock)
KeyMetadata::iterator LockedKey::removeFileSegment(size_t offset, const FileSegmentGuard::Lock & segment_lock, bool can_be_broken)
{
auto it = key_metadata->find(offset);
if (it == key_metadata->end())
throw Exception(ErrorCodes::LOGICAL_ERROR, "There is no offset {}", offset);
throw Exception(ErrorCodes::BAD_ARGUMENTS, "There is no offset {} in key {}", offset, getKey());

return removeFileSegmentImpl(it, segment_lock);
return removeFileSegmentImpl(it, segment_lock, can_be_broken);
}

KeyMetadata::iterator LockedKey::removeFileSegmentImpl(KeyMetadata::iterator it, const FileSegmentGuard::Lock & segment_lock)
KeyMetadata::iterator LockedKey::removeFileSegmentImpl(KeyMetadata::iterator it, const FileSegmentGuard::Lock & segment_lock, bool can_be_broken)
{
auto file_segment = it->second->file_segment;

LOG_DEBUG(
key_metadata->log, "Remove from cache. Key: {}, offset: {}, size: {}",
getKey(), file_segment->offset(), file_segment->reserved_size);

chassert(file_segment->assertCorrectnessUnlocked(segment_lock));
chassert(can_be_broken || file_segment->assertCorrectnessUnlocked(segment_lock));

if (file_segment->queue_iterator)
file_segment->queue_iterator->invalidate();

file_segment->detach(segment_lock, *this);

const auto path = key_metadata->getFileSegmentPath(*file_segment);
bool exists = fs::exists(path);
if (exists)
try
{
fs::remove(path);
const auto path = key_metadata->getFileSegmentPath(*file_segment);
bool exists = fs::exists(path);
if (exists)
{
fs::remove(path);

/// Clear OpenedFileCache to avoid reading from incorrect file descriptor.
int flags = file_segment->getFlagsForLocalRead();
/// Files are created with flags from file_segment->getFlagsForLocalRead()
/// plus optionally O_DIRECT is added, depends on query setting, so remove both.
OpenedFileCache::instance().remove(path, flags);
OpenedFileCache::instance().remove(path, flags | O_DIRECT);
/// Clear OpenedFileCache to avoid reading from incorrect file descriptor.
int flags = file_segment->getFlagsForLocalRead();
/// Files are created with flags from file_segment->getFlagsForLocalRead()
/// plus optionally O_DIRECT is added, depends on query setting, so remove both.
OpenedFileCache::instance().remove(path, flags);
OpenedFileCache::instance().remove(path, flags | O_DIRECT);

LOG_TEST(key_metadata->log, "Removed file segment at path: {}", path);
LOG_TEST(key_metadata->log, "Removed file segment at path: {}", path);
}
else if (file_segment->downloaded_size && !can_be_broken)
{
#ifdef ABORT_ON_LOGICAL_ERROR
throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected path {} to exist", path);
#else
LOG_WARNING(key_metadata->log, "Expected path {} to exist, while removing {}:{}",
path, getKey(), file_segment->offset());
#endif
}
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
chassert(false);
}
else if (file_segment->downloaded_size)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected path {} to exist", path);

return key_metadata->erase(it);
}
Expand Down Expand Up @@ -870,4 +885,56 @@ std::string LockedKey::toString() const
return result;
}

/// Compares each file segment of this key that reports a non-zero downloaded size
/// with the file at its path. A segment is removed from the key metadata when its
/// file does not exist or when the file size differs from the downloaded size.
///
/// Returns snapshots of the removed file segments.
/// Throws LOGICAL_ERROR if a file segment found in the metadata is detached.
FileSegments LockedKey::sync()
{
    FileSegments removed_snapshots;

    auto segment_it = key_metadata->begin();
    while (segment_it != key_metadata->end())
    {
        auto segment = segment_it->second->file_segment;

        if (segment->isDetached())
        {
            throw Exception(
                ErrorCodes::LOGICAL_ERROR,
                "File segment has unexpected state: DETACHED ({})", segment->getInfoForLog());
        }

        /// Segments with zero downloaded size are skipped without touching the filesystem.
        if (segment->getDownloadedSize(false) == 0)
        {
            ++segment_it;
            continue;
        }

        /// Take the snapshot before removal; removal returns the iterator to continue from.
        auto drop_segment = [&](bool can_be_broken)
        {
            removed_snapshots.push_back(FileSegment::getSnapshot(segment));
            segment_it = removeFileSegment(segment->offset(), segment->lock(), can_be_broken);
        };

        const auto segment_path = key_metadata->getFileSegmentPath(*segment);
        if (fs::exists(segment_path))
        {
            const size_t actual_size = fs::file_size(segment_path);
            const size_t expected_size = segment->getDownloadedSize(false);

            if (actual_size != expected_size)
            {
                LOG_WARNING(
                    key_metadata->log,
                    "File segment has unexpected size. Having {}, expected {} ({})",
                    actual_size, expected_size, segment->getInfoForLog());

                drop_segment(/* can_be_broken */false);
            }
            else
            {
                ++segment_it;
            }
        }
        else
        {
            LOG_WARNING(
                key_metadata->log,
                "File segment has DOWNLOADED state, but file does not exist ({})",
                segment->getInfoForLog());

            drop_segment(/* can_be_broken */true);
        }
    }

    return removed_snapshots;
}

}
10 changes: 6 additions & 4 deletions src/Interpreters/Cache/Metadata.h
Expand Up @@ -73,7 +73,7 @@ struct KeyMetadata : public std::map<size_t, FileSegmentMetadataPtr>,

bool createBaseDirectory();

std::string getFileSegmentPath(const FileSegment & file_segment);
std::string getFileSegmentPath(const FileSegment & file_segment) const;

private:
KeyState key_state = KeyState::ACTIVE;
Expand Down Expand Up @@ -192,8 +192,8 @@ struct LockedKey : private boost::noncopyable

bool removeAllFileSegments(bool if_releasable = true);

KeyMetadata::iterator removeFileSegment(size_t offset, const FileSegmentGuard::Lock &);
KeyMetadata::iterator removeFileSegment(size_t offset);
KeyMetadata::iterator removeFileSegment(size_t offset, const FileSegmentGuard::Lock &, bool can_be_broken = false);
KeyMetadata::iterator removeFileSegment(size_t offset, bool can_be_broken = false);

void shrinkFileSegmentToDownloadedSize(size_t offset, const FileSegmentGuard::Lock &);

Expand All @@ -207,10 +207,12 @@ struct LockedKey : private boost::noncopyable

void markAsRemoved();

FileSegments sync();

std::string toString() const;

private:
KeyMetadata::iterator removeFileSegmentImpl(KeyMetadata::iterator it, const FileSegmentGuard::Lock &);
KeyMetadata::iterator removeFileSegmentImpl(KeyMetadata::iterator it, const FileSegmentGuard::Lock &, bool can_be_broken = false);

const std::shared_ptr<KeyMetadata> key_metadata;
KeyGuard::Lock lock; /// `lock` must be destructed before `key_metadata`.
Expand Down

0 comments on commit 1877065

Please sign in to comment.