From d13588e130aa3963fab853a7d445d74b1177e0c3 Mon Sep 17 00:00:00 2001 From: kssenii Date: Thu, 5 Oct 2023 08:01:53 +0200 Subject: [PATCH] Fix --- src/Interpreters/Cache/FileCache.cpp | 3 +- src/Interpreters/Cache/FileSegment.cpp | 10 ++- src/Interpreters/Cache/FileSegment.h | 4 ++ src/Interpreters/Cache/Metadata.cpp | 86 ++++++++++++++------------ 4 files changed, 63 insertions(+), 40 deletions(-) diff --git a/src/Interpreters/Cache/FileCache.cpp b/src/Interpreters/Cache/FileCache.cpp index 3ed2c9c2dd60..5de24977db5f 100644 --- a/src/Interpreters/Cache/FileCache.cpp +++ b/src/Interpreters/Cache/FileCache.cpp @@ -521,7 +521,7 @@ KeyMetadata::iterator FileCache::addFileSegment( result_state = state; } - auto file_segment = std::make_shared(key, offset, size, result_state, settings, this, locked_key.getKeyMetadata()); + auto file_segment = std::make_shared(key, offset, size, result_state, settings, background_download_threads > 0, this, locked_key.getKeyMetadata()); auto file_segment_metadata = std::make_shared(std::move(file_segment)); auto [file_segment_metadata_it, inserted] = locked_key.getKeyMetadata()->emplace(offset, file_segment_metadata); @@ -1018,6 +1018,7 @@ void FileCache::loadMetadataForKeys(const fs::path & keys_dir) auto file_segment = std::make_shared(key, offset, size, FileSegment::State::DOWNLOADED, CreateFileSegmentSettings(segment_kind), + false, this, key_metadata, cache_it); diff --git a/src/Interpreters/Cache/FileSegment.cpp b/src/Interpreters/Cache/FileSegment.cpp index bb3216cb20e1..7559a4153161 100644 --- a/src/Interpreters/Cache/FileSegment.cpp +++ b/src/Interpreters/Cache/FileSegment.cpp @@ -44,6 +44,7 @@ FileSegment::FileSegment( size_t size_, State download_state_, const CreateFileSegmentSettings & settings, + bool background_download_enabled_, FileCache * cache_, std::weak_ptr key_metadata_, Priority::Iterator queue_iterator_) @@ -51,6 +52,7 @@ FileSegment::FileSegment( , segment_range(offset_, offset_ + size_ - 1) , segment_kind(settings.kind) , is_unbound(settings.unbounded) + , background_download_enabled(background_download_enabled_) , download_state(download_state_) , key_metadata(key_metadata_) , queue_iterator(queue_iterator_) @@ -652,7 +654,7 @@ void FileSegment::complete() if (is_last_holder) { - if (remote_file_reader) + if (background_download_enabled && remote_file_reader) { LOG_TEST( log, "Submitting file segment for background download " @@ -908,6 +910,12 @@ void FileSegment::use() } } +void FileSegment::disableBackgroundDownload() +{ + auto lock = lockFileSegment(); + background_download_enabled = false; +} + FileSegments::iterator FileSegmentsHolder::completeAndPopFrontImpl() { front().complete(); diff --git a/src/Interpreters/Cache/FileSegment.h b/src/Interpreters/Cache/FileSegment.h index 8948b67fe2a0..088df12f0a37 100644 --- a/src/Interpreters/Cache/FileSegment.h +++ b/src/Interpreters/Cache/FileSegment.h @@ -110,6 +110,7 @@ friend class FileCache; /// Because of reserved_size in tryReserve(). size_t size_, State download_state_, const CreateFileSegmentSettings & create_settings = {}, + bool background_download_enabled_ = false, FileCache * cache_ = nullptr, std::weak_ptr key_metadata_ = std::weak_ptr(), Priority::Iterator queue_iterator_ = Priority::Iterator{}); @@ -262,6 +263,8 @@ friend class FileCache; /// Because of reserved_size in tryReserve(). void setDownloadedSize(size_t delta); + void disableBackgroundDownload(); + private: String getDownloaderUnlocked(const FileSegmentGuard::Lock &) const; bool isDownloaderUnlocked(const FileSegmentGuard::Lock & segment_lock) const; @@ -290,6 +293,7 @@ friend class FileCache; /// Because of reserved_size in tryReserve(). /// Size of the segment is not known until it is downloaded and /// can be bigger than max_file_segment_size. const bool is_unbound = false; + bool background_download_enabled; std::atomic download_state; DownloaderId downloader_id; /// The one who prepares the download diff --git a/src/Interpreters/Cache/Metadata.cpp b/src/Interpreters/Cache/Metadata.cpp index f8fe3dd41851..cd4b1b6f3b06 100644 --- a/src/Interpreters/Cache/Metadata.cpp +++ b/src/Interpreters/Cache/Metadata.cpp @@ -542,7 +542,9 @@ void CacheMetadata::downloadThreadFunc() { if (holder) { - const auto & file_segment = holder->front(); + auto & file_segment = holder->front(); + file_segment.disableBackgroundDownload(); + LOG_ERROR( log, "Error during background download of {}:{} ({}): {}", file_segment.key(), file_segment.offset(), @@ -581,52 +583,60 @@ void CacheMetadata::downloadImpl(FileSegment & file_segment, std::optionalinternalBuffer().empty()) - { - if (!memory) - memory.emplace(DBMS_DEFAULT_BUFFER_SIZE); - reader->set(memory->data(), memory->size()); - } - - size_t offset = file_segment.getCurrentWriteOffset(); - if (offset != static_cast(reader->getPosition())) - reader->seek(offset, SEEK_SET); - - while (!reader->eof()) + try { - auto size = reader->available(); - - if (!file_segment.reserve(size)) + /// If remote_fs_read_method == 'threadpool', + /// reader itself never owns/allocates the buffer. + if (reader->internalBuffer().empty()) { - LOG_TEST( - log, "Failed to reserve space during background download " - "for {}:{} (downloaded size: {}/{})", - file_segment.key(), file_segment.offset(), - file_segment.getDownloadedSize(), file_segment.range().size()); - return; + if (!memory) + memory.emplace(DBMS_DEFAULT_BUFFER_SIZE); + reader->set(memory->data(), memory->size()); } - try - { - file_segment.write(reader->position(), size, offset); - offset += size; - reader->position() += size; - } - catch (ErrnoException & e) + size_t offset = file_segment.getCurrentWriteOffset(); + if (offset != static_cast(reader->getPosition())) + reader->seek(offset, SEEK_SET); + + while (!reader->eof()) { - int code = e.getErrno(); - if (code == /* No space left on device */28 || code == /* Quota exceeded */122) + auto size = reader->available(); + + if (!file_segment.reserve(size)) { - LOG_INFO(log, "Insert into cache is skipped due to insufficient disk space. ({})", e.displayText()); + LOG_TEST( + log, "Failed to reserve space during background download " + "for {}:{} (downloaded size: {}/{})", + file_segment.key(), file_segment.offset(), + file_segment.getDownloadedSize(), file_segment.range().size()); return; } - throw; + + try + { + file_segment.write(reader->position(), size, offset); + offset += size; + reader->position() += size; + } + catch (ErrnoException & e) + { + int code = e.getErrno(); + if (code == /* No space left on device */28 || code == /* Quota exceeded */122) + { + LOG_INFO(log, "Insert into cache is skipped due to insufficient disk space. ({})", e.displayText()); + return; + } + throw; + } } - } - LOG_TEST(log, "Downloaded file segment: {}", file_segment.getInfoForLog()); + LOG_TEST(log, "Downloaded file segment: {}", file_segment.getInfoForLog()); + } + catch (...) + { + file_segment.resetRemoteFileReader(); + throw; + } } void CacheMetadata::cancelDownload() @@ -802,7 +812,7 @@ void LockedKey::shrinkFileSegmentToDownloadedSize( metadata->file_segment = std::make_shared( getKey(), offset, downloaded_size, FileSegment::State::DOWNLOADED, - CreateFileSegmentSettings(file_segment->getKind()), + CreateFileSegmentSettings(file_segment->getKind()), false, file_segment->cache, key_metadata, file_segment->queue_iterator); if (diff)