Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add system sync filesystem cache command #51622

Merged
merged 31 commits into from Aug 24, 2023
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
72d1834
system sync filesystem cache
kssenii Jun 29, 2023
c8ab68a
Add test
kssenii Jun 30, 2023
0c03b16
Rename test
kssenii Jun 30, 2023
31a87c6
Update InterpreterSystemQuery.cpp
kssenii Jul 1, 2023
80dabd1
Merge remote-tracking branch 'upstream/master' into system-sync-cache
kssenii Jul 5, 2023
6089243
Update .reference for tests, fix merge
kssenii Jul 5, 2023
2b02148
Fix unit test
kssenii Jul 21, 2023
5d167b4
Merge remote-tracking branch 'upstream/master' into system-sync-cache
kssenii Jul 31, 2023
7a631be
Merge remote-tracking branch 'origin/system-sync-cache' into system-s…
kssenii Jul 31, 2023
812fbee
Better
kssenii Jul 31, 2023
0ec11a2
Merge branch 'master' into system-sync-cache
kssenii Aug 3, 2023
2c485e7
Merge remote-tracking branch 'upstream/master' into system-sync-cache
kssenii Aug 8, 2023
43ed765
Fix
kssenii Aug 8, 2023
9b6e230
Update 02810_system_sync_filesystem_cache.sh
kssenii Aug 9, 2023
408003f
Merge branch 'master' into system-sync-cache
kssenii Aug 9, 2023
6448034
Update Metadata.cpp
kssenii Aug 10, 2023
aed955a
Merge branch 'master' into system-sync-cache
kssenii Aug 10, 2023
3925a16
Merge branch 'master' into system-sync-cache
kssenii Aug 10, 2023
b609c9b
Fix
kssenii Aug 11, 2023
d3c3ddb
Fix
kssenii Aug 11, 2023
a3d3864
Fix
kssenii Aug 12, 2023
134db24
Update 02810_system_sync_filesystem_cache.sh
kssenii Aug 12, 2023
8535e23
Merge remote-tracking branch 'upstream/master' into system-sync-cache
kssenii Aug 13, 2023
2d74472
Merge remote-tracking branch 'upstream/master' into system-sync-cache
kssenii Aug 15, 2023
247abe0
Merge remote-tracking branch 'upstream/master' into system-sync-cache
kssenii Aug 19, 2023
aeebedf
Merge branch 'master' into system-sync-cache
kssenii Aug 19, 2023
55769cd
Merge branch 'master' into system-sync-cache
kssenii Aug 20, 2023
6bd65be
Update registerDiskCache.cpp
kssenii Aug 20, 2023
325e2d3
Merge remote-tracking branch 'upstream/master' into system-sync-cache
kssenii Aug 23, 2023
c47f5eb
Minor change
kssenii Aug 23, 2023
c8a12f7
Merge remote-tracking branch 'upstream/master' into system-sync-cache
kssenii Aug 23, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions src/Access/Common/AccessType.h
Expand Up @@ -153,6 +153,7 @@ enum class AccessType
M(SYSTEM_DROP_QUERY_CACHE, "SYSTEM DROP QUERY, DROP QUERY CACHE, DROP QUERY", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_DROP_COMPILED_EXPRESSION_CACHE, "SYSTEM DROP COMPILED EXPRESSION, DROP COMPILED EXPRESSION CACHE, DROP COMPILED EXPRESSIONS", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_DROP_FILESYSTEM_CACHE, "SYSTEM DROP FILESYSTEM CACHE, DROP FILESYSTEM CACHE", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_SYNC_FILESYSTEM_CACHE, "SYSTEM REPAIR FILESYSTEM CACHE, REPAIR FILESYSTEM CACHE, SYNC FILESYSTEM CACHE", GLOBAL, SYSTEM) \
M(SYSTEM_DROP_SCHEMA_CACHE, "SYSTEM DROP SCHEMA CACHE, DROP SCHEMA CACHE", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_DROP_S3_CLIENT_CACHE, "SYSTEM DROP S3 CLIENT, DROP S3 CLIENT CACHE", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_DROP_CACHE, "DROP CACHE", GROUP, SYSTEM) \
Expand Down
2 changes: 2 additions & 0 deletions src/Disks/ObjectStorages/Cached/registerDiskCache.cpp
Expand Up @@ -45,6 +45,8 @@ void registerDiskCache(DiskFactory & factory, bool /* global_skip_access_check *
else if (fs::path(file_cache_settings.base_path).is_relative())
file_cache_settings.base_path = fs::path(context->getPath()) / "caches" / file_cache_settings.base_path;

file_cache_settings.base_path = fs::absolute(file_cache_settings.base_path);

auto cache = FileCacheFactory::instance().getOrCreate(name, file_cache_settings);
auto disk = disk_it->second;
if (!dynamic_cast<const DiskObjectStorage *>(disk.get()))
Expand Down
24 changes: 18 additions & 6 deletions src/Interpreters/Cache/FileCache.cpp
Expand Up @@ -42,6 +42,7 @@ size_t roundUpToMultiple(size_t num, size_t multiple)
{
return roundDownToMultiple(num + multiple - 1, multiple);
}

}

namespace DB
Expand Down Expand Up @@ -1050,7 +1051,7 @@ void FileCache::cleanupThreadFunc()
cleanup_task->scheduleAfter(delayed_cleanup_interval_ms);
}

FileSegmentsHolderPtr FileCache::getSnapshot()
FileSegments FileCache::getSnapshot()
{
assertInitialized();
#ifndef NDEBUG
Expand All @@ -1063,19 +1064,19 @@ FileSegmentsHolderPtr FileCache::getSnapshot()
for (const auto & [_, file_segment_metadata] : locked_key)
file_segments.push_back(FileSegment::getSnapshot(file_segment_metadata->file_segment));
});
return std::make_unique<FileSegmentsHolder>(std::move(file_segments), /* complete_on_dtor */false);
return file_segments;
}

FileSegmentsHolderPtr FileCache::getSnapshot(const Key & key)
FileSegments FileCache::getSnapshot(const Key & key)
{
FileSegments file_segments;
auto locked_key = metadata.lockKeyMetadata(key, CacheMetadata::KeyNotFoundPolicy::THROW);
for (const auto & [_, file_segment_metadata] : *locked_key->getKeyMetadata())
file_segments.push_back(FileSegment::getSnapshot(file_segment_metadata->file_segment));
return std::make_unique<FileSegmentsHolder>(std::move(file_segments));
return file_segments;
}

FileSegmentsHolderPtr FileCache::dumpQueue()
FileSegments FileCache::dumpQueue()
{
assertInitialized();

Expand All @@ -1086,7 +1087,7 @@ FileSegmentsHolderPtr FileCache::dumpQueue()
return PriorityIterationResult::CONTINUE;
}, lockCache());

return std::make_unique<FileSegmentsHolder>(std::move(file_segments));
return file_segments;
}

std::vector<String> FileCache::tryGetCachePaths(const Key & key)
Expand Down Expand Up @@ -1161,4 +1162,15 @@ FileCache::QueryContextHolderPtr FileCache::getQueryContextHolder(
return std::make_unique<QueryContextHolder>(query_id, this, std::move(context));
}

/// Calls LockedKey::sync() for every locked key handed to the callback by `metadata.iterate`
/// and returns all file segments those calls reported, concatenated in iteration order.
FileSegments FileCache::sync()
{
    FileSegments all_broken;

    auto collect_broken = [&all_broken](LockedKey & locked_key)
    {
        /// Per-key result; appended to the overall list below.
        auto key_broken = locked_key.sync();
        all_broken.insert(all_broken.end(), key_broken.begin(), key_broken.end());
    };
    metadata.iterate(collect_broken);

    return all_broken;
}

}
8 changes: 5 additions & 3 deletions src/Interpreters/Cache/FileCache.h
Expand Up @@ -102,11 +102,11 @@ class FileCache : private boost::noncopyable

bool tryReserve(FileSegment & file_segment, size_t size);

FileSegmentsHolderPtr getSnapshot();
FileSegments getSnapshot();

FileSegmentsHolderPtr getSnapshot(const Key & key);
FileSegments getSnapshot(const Key & key);

FileSegmentsHolderPtr dumpQueue();
FileSegments dumpQueue();
Comment on lines +127 to +131
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it okay to make a copy of the entire list? At first glance, seems expensive.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dumpQueue is used for unit tests only, and getSnapshot is only for system tables. Also, afaik the result is moved and not copied, as the function returns a local variable.


void cleanup();

Expand All @@ -130,6 +130,8 @@ class FileCache : private boost::noncopyable

CacheGuard::Lock lockCache() const;

FileSegments sync();

private:
using KeyAndOffset = FileCacheKeyAndOffset;

Expand Down
83 changes: 78 additions & 5 deletions src/Interpreters/Cache/Metadata.cpp
Expand Up @@ -25,6 +25,7 @@ namespace DB
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int BAD_ARGUMENTS;
}

FileSegmentMetadata::FileSegmentMetadata(FileSegmentPtr && file_segment_)
Expand Down Expand Up @@ -116,7 +117,7 @@ bool KeyMetadata::createBaseDirectory()
return true;
}

std::string KeyMetadata::getFileSegmentPath(const FileSegment & file_segment)
std::string KeyMetadata::getFileSegmentPath(const FileSegment & file_segment) const
{
return fs::path(key_path)
/ CacheMetadata::getFileNameForFileSegment(file_segment.offset(), file_segment.getKind());
Expand Down Expand Up @@ -586,19 +587,34 @@ void LockedKey::removeAllReleasable()
}
}

KeyMetadata::iterator LockedKey::removeFileSegment(size_t offset, const FileSegmentGuard::Lock & segment_lock)
/// Removes the file segment registered at `offset`, taking the segment's lock here.
/// Throws BAD_ARGUMENTS if the key metadata has no entry for `offset`.
/// `can_be_broken` is forwarded unchanged to removeFileSegmentImpl.
/// Returns the iterator produced by removeFileSegmentImpl.
KeyMetadata::iterator LockedKey::removeFileSegment(size_t offset, bool can_be_broken)
{
    const auto found = key_metadata->find(offset);
    if (found != key_metadata->end())
    {
        /// Local copy of the segment pointer; its lock() result is passed on as the segment lock.
        auto segment = found->second->file_segment;
        return removeFileSegmentImpl(found, segment->lock(), can_be_broken);
    }

    throw Exception(ErrorCodes::BAD_ARGUMENTS, "There is no offset {}", offset);
}

/// Removes the file segment registered at `offset` using a segment lock supplied by the caller.
/// Throws LOGICAL_ERROR if the key metadata has no entry for `offset`.
/// `segment_lock` and `can_be_broken` are forwarded unchanged to removeFileSegmentImpl.
/// Returns the iterator produced by removeFileSegmentImpl.
KeyMetadata::iterator LockedKey::removeFileSegment(size_t offset, const FileSegmentGuard::Lock & segment_lock, bool can_be_broken)
{
    const auto found = key_metadata->find(offset);
    const bool is_known_offset = found != key_metadata->end();

    if (!is_known_offset)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "There is no offset {}", offset);

    return removeFileSegmentImpl(found, segment_lock, can_be_broken);
}

KeyMetadata::iterator LockedKey::removeFileSegmentImpl(KeyMetadata::iterator it, const FileSegmentGuard::Lock & segment_lock, bool can_be_broken)
{
auto file_segment = it->second->file_segment;

LOG_DEBUG(
key_metadata->log, "Remove from cache. Key: {}, offset: {}, size: {}",
getKey(), offset, file_segment->reserved_size);
getKey(), file_segment->offset(), file_segment->reserved_size);

chassert(file_segment->assertCorrectnessUnlocked(segment_lock));
chassert(can_be_broken || file_segment->assertCorrectnessUnlocked(segment_lock));

if (file_segment->queue_iterator)
file_segment->queue_iterator->invalidate();
Expand All @@ -620,7 +636,7 @@ KeyMetadata::iterator LockedKey::removeFileSegment(size_t offset, const FileSegm

LOG_TEST(key_metadata->log, "Removed file segment at path: {}", path);
}
else if (file_segment->downloaded_size)
else if (file_segment->downloaded_size && !can_be_broken)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected path {} to exist", path);

return key_metadata->erase(it);
Expand Down Expand Up @@ -737,6 +753,63 @@ std::string LockedKey::toString() const
return result;
}

/// Compares every file segment of this key that reports a non-zero downloaded size with the
/// file at its local cache path, and returns snapshots of the segments found inconsistent.
/// Snapshots are taken before the segment is modified or removed.
///
/// Handling per case:
///  - file does not exist: the segment is removed from the key metadata;
///  - file is smaller than the downloaded size: the segment is kept, its downloaded size is
///    lowered to the file size and its state is set to PARTIALLY_DOWNLOADED_NO_CONTINUATION;
///  - file is larger than the downloaded size: the segment is removed and the file is deleted.
///
/// Segments with zero downloaded size and segments whose file size matches are left untouched.
FileSegments LockedKey::sync()
{
    FileSegments broken;
    for (auto it = key_metadata->begin(); it != key_metadata->end();)
    {
        auto file_segment = it->second->file_segment;
        if (file_segment->getDownloadedSize(false) == 0)
        {
            /// Nothing is claimed to be on disk, so there is nothing to verify.
            ++it;
            continue;
        }

        const auto & path = file_segment->getPathInLocalCache();
        if (!fs::exists(path))
        {
            LOG_WARNING(
                key_metadata->log,
                "File segment has DOWNLOADED state, but file does not exist ({})",
                file_segment->getInfoForLog());

            broken.push_back(FileSegment::getSnapshot(file_segment));

            /// The segment is known to be broken here: it has a non-zero downloaded size but no file.
            /// Removal must therefore be told so (`can_be_broken = true`); with `false` the removal
            /// code throws LOGICAL_ERROR ("Expected path {} to exist") for exactly this state.
            it = removeFileSegment(file_segment->offset(), file_segment->lock(), /* can_be_broken */ true);
            continue;
        }

        const size_t actual_size = fs::file_size(path);
        const size_t expected_size = file_segment->getDownloadedSize(false);

        if (actual_size == expected_size)
        {
            ++it;
            continue;
        }

        LOG_WARNING(
            key_metadata->log,
            "File segment has unexpected size. Having {}, expected {} ({})",
            actual_size, expected_size, file_segment->getInfoForLog());

        broken.push_back(FileSegment::getSnapshot(file_segment));

        auto file_segment_lock = file_segment->lock();
        if (actual_size < expected_size)
        {
            /// Less data on disk than recorded: keep the segment, but shrink the recorded
            /// size to what the file actually contains.
            file_segment->downloaded_size = actual_size;
            file_segment->download_state = FileSegment::State::PARTIALLY_DOWNLOADED_NO_CONTINUATION;
            ++it;
        }
        else
        {
            /// More data on disk than recorded: drop the segment and delete the file.
            it = removeFileSegment(file_segment->offset(), file_segment_lock, false);
            fs::remove(path);
        }
    }
    return broken;
}

void CleanupQueue::add(const FileCacheKey & key)
{
std::lock_guard lock(mutex);
Expand Down
11 changes: 8 additions & 3 deletions src/Interpreters/Cache/Metadata.h
Expand Up @@ -69,7 +69,7 @@ struct KeyMetadata : public std::map<size_t, FileSegmentMetadataPtr>,

bool createBaseDirectory();

std::string getFileSegmentPath(const FileSegment & file_segment);
std::string getFileSegmentPath(const FileSegment & file_segment) const;

private:
KeyState key_state = KeyState::ACTIVE;
Expand All @@ -87,7 +87,7 @@ struct CacheMetadata : public std::unordered_map<FileCacheKey, KeyMetadataPtr>,
{
public:
using Key = FileCacheKey;
using IterateCacheMetadataFunc = std::function<void(const LockedKey &)>;
using IterateCacheMetadataFunc = std::function<void(LockedKey &)>;

explicit CacheMetadata(const std::string & path_);

Expand Down Expand Up @@ -171,7 +171,8 @@ struct LockedKey : private boost::noncopyable

void removeAllReleasable();

KeyMetadata::iterator removeFileSegment(size_t offset, const FileSegmentGuard::Lock &);
KeyMetadata::iterator removeFileSegment(size_t offset, const FileSegmentGuard::Lock &, bool can_be_broken = false);
KeyMetadata::iterator removeFileSegment(size_t offset, bool can_be_broken = false);

void shrinkFileSegmentToDownloadedSize(size_t offset, const FileSegmentGuard::Lock &);

Expand All @@ -185,9 +186,13 @@ struct LockedKey : private boost::noncopyable

void markAsRemoved();

FileSegments sync();

std::string toString() const;

private:
KeyMetadata::iterator removeFileSegmentImpl(KeyMetadata::iterator it, const FileSegmentGuard::Lock &, bool can_be_broken = true);

const std::shared_ptr<KeyMetadata> key_metadata;
KeyGuard::Lock lock; /// `lock` must be destructed before `key_metadata`.
};
Expand Down
34 changes: 34 additions & 0 deletions src/Interpreters/InterpreterSystemQuery.cpp
Expand Up @@ -53,6 +53,7 @@
#include <Storages/StorageS3.h>
#include <Storages/StorageURL.h>
#include <Storages/HDFS/StorageHDFS.h>
#include <Storages/System/StorageSystemFilesystemCache.h>
#include <Parsers/ASTSystemQuery.h>
#include <Parsers/ASTDropQuery.h>
#include <Parsers/ASTCreateQuery.h>
Expand Down Expand Up @@ -375,6 +376,38 @@ BlockIO InterpreterSystemQuery::execute()
}
break;
}
case Type::SYNC_FILESYSTEM_CACHE:
{
getContext()->checkAccess(AccessType::SYSTEM_SYNC_FILESYSTEM_CACHE);

ColumnsDescription columns{StorageSystemFilesystemCache::getNamesAndTypes()};
Block sample_block;
for (const auto & column : columns)
sample_block.insert({column.type->createColumn(), column.type, column.name});

MutableColumns res_columns = sample_block.cloneEmptyColumns();

if (query.filesystem_cache_name.empty())
{
auto caches = FileCacheFactory::instance().getAll();
for (const auto & [cache_name, cache_data] : caches)
{
auto file_segments = cache_data->cache->sync();
StorageSystemFilesystemCache::fillDataImpl(res_columns, cache_data->cache, cache_name, file_segments);
}
}
else
{
auto cache = FileCacheFactory::instance().getByName(query.filesystem_cache_name).cache;
auto file_segments = cache->sync();
StorageSystemFilesystemCache::fillDataImpl(res_columns, cache, query.filesystem_cache_name, file_segments);
}

size_t num_rows = res_columns[0]->size();
auto source = std::make_shared<SourceFromSingleChunk>(sample_block, Chunk(std::move(res_columns), num_rows));
result.pipeline = QueryPipeline(std::move(source));
break;
}
case Type::DROP_SCHEMA_CACHE:
{
getContext()->checkAccess(AccessType::SYSTEM_DROP_SCHEMA_CACHE);
Expand Down Expand Up @@ -1015,6 +1048,7 @@ AccessRightsElements InterpreterSystemQuery::getRequiredAccessForDDLOnCluster()
case Type::DROP_INDEX_MARK_CACHE:
case Type::DROP_INDEX_UNCOMPRESSED_CACHE:
case Type::DROP_FILESYSTEM_CACHE:
case Type::SYNC_FILESYSTEM_CACHE:
case Type::DROP_SCHEMA_CACHE:
#if USE_AWS_S3
case Type::DROP_S3_CLIENT_CACHE:
Expand Down
24 changes: 19 additions & 5 deletions src/Interpreters/tests/gtest_lru_file_cache.cpp
Expand Up @@ -69,13 +69,16 @@ fs::path caches_dir = fs::current_path() / "lru_cache_test";
std::string cache_base_path = caches_dir / "cache1" / "";


void assertEqual(const HolderPtr & holder, const Ranges & expected_ranges, const States & expected_states = {})
void assertEqual(FileSegments::const_iterator segments_begin, FileSegments::const_iterator segments_end, size_t segments_size, const Ranges & expected_ranges, const States & expected_states = {})
{
std::cerr << "Holder: " << holder->toString() << "\n";
ASSERT_EQ(holder->size(), expected_ranges.size());
std::cerr << "File segments: ";
for (auto it = segments_begin; it != segments_end; ++it)
std::cerr << (*it)->range().toString() << ", ";

ASSERT_EQ(segments_size, expected_ranges.size());

if (!expected_states.empty())
ASSERT_EQ(holder->size(), expected_states.size());
ASSERT_EQ(segments_size, expected_states.size());

auto get_expected_state = [&](size_t i)
{
Expand All @@ -86,14 +89,25 @@ void assertEqual(const HolderPtr & holder, const Ranges & expected_ranges, const
};

size_t i = 0;
for (const auto & file_segment : *holder)
for (auto it = segments_begin; it != segments_end; ++it)
{
const auto & file_segment = *it;
ASSERT_EQ(file_segment->range(), expected_ranges[i]);
ASSERT_EQ(file_segment->state(), get_expected_state(i));
++i;
}
}

/// Overload for a plain FileSegments container: forwards its iterator range and size
/// to the iterator-based assertEqual.
void assertEqual(const FileSegments & file_segments, const Ranges & expected_ranges, const States & expected_states = {})
{
    const auto first = file_segments.begin();
    const auto last = file_segments.end();
    const auto count = file_segments.size();

    assertEqual(first, last, count, expected_ranges, expected_states);
}

/// Overload for a file segments holder pointer: forwards the holder's iterator range and size
/// to the iterator-based assertEqual.
void assertEqual(const FileSegmentsHolderPtr & file_segments, const Ranges & expected_ranges, const States & expected_states = {})
{
    const auto first = file_segments->begin();
    const auto last = file_segments->end();
    const auto count = file_segments->size();

    assertEqual(first, last, count, expected_ranges, expected_states);
}

FileSegment & get(const HolderPtr & holder, int i)
{
auto it = std::next(holder->begin(), i);
Expand Down
1 change: 1 addition & 0 deletions src/Parsers/ASTSystemQuery.h
Expand Up @@ -80,6 +80,7 @@ class ASTSystemQuery : public IAST, public ASTQueryWithOnCluster
UNFREEZE,
ENABLE_FAILPOINT,
DISABLE_FAILPOINT,
SYNC_FILESYSTEM_CACHE,
END
};

Expand Down