Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reset downloader for cache file segment in TemporaryFileStream #48386

Merged
merged 6 commits into from
Apr 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 7 additions & 3 deletions src/Interpreters/Cache/FileSegment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ void FileSegment::resetDownloadingStateUnlocked([[maybe_unused]] std::unique_loc

size_t current_downloaded_size = getDownloadedSizeUnlocked(segment_lock);
/// range().size() can equal 0 in case of write-though cache.
if (current_downloaded_size != 0 && current_downloaded_size == range().size())
if (!is_unbound && current_downloaded_size != 0 && current_downloaded_size == range().size())
setDownloadedUnlocked(segment_lock);
else
setDownloadState(State::PARTIALLY_DOWNLOADED);
Expand Down Expand Up @@ -343,7 +343,7 @@ void FileSegment::write(const char * from, size_t size, size_t offset)
ErrorCodes::LOGICAL_ERROR,
"Not enough space is reserved. Available: {}, expected: {}", free_reserved_size, size);

if (current_downloaded_size == range().size())
if (!is_unbound && current_downloaded_size == range().size())
throw Exception(ErrorCodes::LOGICAL_ERROR, "File segment is already fully downloaded");

if (!cache_writer)
Expand Down Expand Up @@ -689,7 +689,8 @@ String FileSegment::getInfoForLogUnlocked(std::unique_lock<std::mutex> & segment
info << "first non-downloaded offset: " << getFirstNonDownloadedOffsetUnlocked(segment_lock) << ", ";
info << "caller id: " << getCallerId() << ", ";
info << "detached: " << is_detached << ", ";
info << "kind: " << toString(segment_kind);
info << "kind: " << toString(segment_kind) << ", ";
info << "unbound: " << is_unbound;

return info.str();
}
Expand Down Expand Up @@ -785,6 +786,7 @@ FileSegmentPtr FileSegment::getSnapshot(const FileSegmentPtr & file_segment, std
snapshot->downloaded_size = file_segment->getDownloadedSizeUnlocked(segment_lock);
snapshot->download_state = file_segment->download_state;
snapshot->segment_kind = file_segment->getKind();
snapshot->is_unbound = file_segment->is_unbound;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


return snapshot;
}
Expand Down Expand Up @@ -905,6 +907,8 @@ String FileSegmentsHolder::toString()
if (!ranges.empty())
ranges += ", ";
ranges += file_segment->range().toString();
if (file_segment->is_unbound)
ranges += "(unbound)";
}
return ranges;
}
Expand Down
1 change: 1 addition & 0 deletions src/Interpreters/Cache/FileSegment.h
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ friend class StorageSystemFilesystemCache;

FileSegmentKind getKind() const { return segment_kind; }
bool isPersistent() const { return segment_kind == FileSegmentKind::Persistent; }
bool isUnbound() const { return is_unbound; }

using UniqueId = std::pair<FileCacheKey, size_t>;
UniqueId getUniqueId() const { return std::pair(key(), offset()); }
Expand Down
13 changes: 9 additions & 4 deletions src/Interpreters/Cache/WriteBufferToFileSegment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
#include <Interpreters/Cache/FileSegment.h>
#include <IO/SwapHelper.h>

#include <base/scope_guard.h>

#include <Common/logger_useful.h>

namespace DB
Expand All @@ -10,20 +12,23 @@ namespace DB
namespace ErrorCodes
{
extern const int NOT_ENOUGH_SPACE;
extern const int LOGICAL_ERROR;
}

WriteBufferToFileSegment::WriteBufferToFileSegment(FileSegment * file_segment_)
: WriteBufferFromFileDecorator(file_segment_->detachWriter()), file_segment(file_segment_)
{
auto downloader = file_segment->getOrSetDownloader();
if (downloader != FileSegment::getCallerId())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Failed to set a downloader. ({})", file_segment->getInfoForLog());
}

/// If it throws an exception, the file segment will be incomplete, so you should not use it in the future.
void WriteBufferToFileSegment::nextImpl()
{
auto downloader [[maybe_unused]] = file_segment->getOrSetDownloader();
chassert(downloader == FileSegment::getCallerId());

SCOPE_EXIT({
file_segment->completePartAndResetDownloader();
});

size_t bytes_to_write = offset();

/// In case of an error, we don't need to finalize the file segment
Expand Down
25 changes: 21 additions & 4 deletions src/Interpreters/tests/gtest_lru_file_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -562,7 +562,7 @@ TEST_F(FileCacheTest, writeBuffer)
DB::FileCache cache(cache_base_path, settings);
cache.initialize();

auto write_to_cache = [&cache](const String & key, const Strings & data)
auto write_to_cache = [&cache](const String & key, const Strings & data, bool flush)
{
CreateFileSegmentSettings segment_settings;
segment_settings.kind = FileSegmentKind::Temporary;
Expand All @@ -572,22 +572,39 @@ TEST_F(FileCacheTest, writeBuffer)
EXPECT_EQ(holder.file_segments.size(), 1);
auto & segment = holder.file_segments.front();
WriteBufferToFileSegment out(segment.get());
std::list<std::thread> threads;
std::mutex mu;
for (const auto & s : data)
out.write(s.data(), s.size());
{
/// Write from diffetent threads to check
/// that no assertions inside cache related to downloaderId are triggered
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

btw may be make a stateless test too? seems good to have

Copy link
Member Author

@vdimir vdimir Apr 5, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This case is not to easy to reproduce in stateless. But another task is to enable feature in all s3 stateless tests #48425 (and maybe add some specific ones to that run)

threads.emplace_back([&]
{
std::unique_lock lock(mu);
out.write(s.data(), s.size());
/// test different buffering scenarios
if (flush)
{
out.next();
}
});
}
for (auto & t : threads)
t.join();
return holder;
};

std::vector<fs::path> file_segment_paths;
{
auto holder = write_to_cache("key1", {"abc", "defg"});
auto holder = write_to_cache("key1", {"abc", "defg"}, false);
file_segment_paths.emplace_back(holder.file_segments.front()->getPathInLocalCache());

ASSERT_EQ(fs::file_size(file_segment_paths.back()), 7);
ASSERT_TRUE(holder.file_segments.front()->range() == FileSegment::Range(0, 7));
ASSERT_EQ(cache.getUsedCacheSize(), 7);

{
auto holder2 = write_to_cache("key2", {"1", "22", "333", "4444", "55555"});
auto holder2 = write_to_cache("key2", {"1", "22", "333", "4444", "55555"}, true);
file_segment_paths.emplace_back(holder2.file_segments.front()->getPathInLocalCache());

ASSERT_EQ(fs::file_size(file_segment_paths.back()), 15);
Expand Down
2 changes: 2 additions & 0 deletions src/Storages/System/StorageSystemFilesystemCache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ NamesAndTypesList StorageSystemFilesystemCache::getNamesAndTypes()
{"downloaded_size", std::make_shared<DataTypeUInt64>()},
{"persistent", std::make_shared<DataTypeNumber<UInt8>>()},
{"kind", std::make_shared<DataTypeString>()},
{"unbound", std::make_shared<DataTypeNumber<UInt8>>()},
};
}

Expand Down Expand Up @@ -62,6 +63,7 @@ void StorageSystemFilesystemCache::fillData(MutableColumns & res_columns, Contex
res_columns[8]->insert(file_segment->getDownloadedSize());
res_columns[9]->insert(file_segment->isPersistent());
res_columns[10]->insert(toString(file_segment->getKind()));
res_columns[11]->insert(file_segment->isUnbound());
}
}
}
Expand Down