Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Http temporary buffer integration with fs cache #48664

Merged
merged 4 commits into from
Apr 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
12 changes: 8 additions & 4 deletions src/Disks/DiskLocal.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,22 @@
#include <Disks/ObjectStorages/LocalObjectStorage.h>
#include <Disks/ObjectStorages/DiskObjectStorage.h>
#include <Disks/ObjectStorages/FakeMetadataStorageFromDisk.h>
#include <Disks/TemporaryFileOnDisk.h>

#include <fstream>
#include <unistd.h>
#include <fcntl.h>
#include <sys/stat.h>

#include <Disks/DiskFactory.h>
#include <Disks/IO/WriteBufferFromTemporaryFile.h>

#include <Common/randomSeed.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromTemporaryFile.h>
#include <IO/WriteHelpers.h>
#include <Common/logger_useful.h>


namespace CurrentMetrics
{
extern const Metric DiskSpaceReservedForMerge;
Expand Down Expand Up @@ -582,13 +585,14 @@ struct DiskWriteCheckData
}
};

bool DiskLocal::canWrite() const noexcept
bool DiskLocal::canWrite() noexcept
try
{
static DiskWriteCheckData data;
String tmp_template = fs::path(disk_path) / "";
{
auto buf = WriteBufferFromTemporaryFile::create(tmp_template);
auto disk_ptr = std::static_pointer_cast<DiskLocal>(shared_from_this());
auto tmp_file = std::make_unique<TemporaryFileOnDisk>(disk_ptr);
auto buf = std::make_unique<WriteBufferFromTemporaryFile>(std::move(tmp_file));
buf->write(data.data, data.PAGE_SIZE_IN_BYTES);
buf->sync();
}
Expand Down
2 changes: 1 addition & 1 deletion src/Disks/DiskLocal.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ class DiskLocal : public IDisk
/// rudimentary. The more advanced choice would be using
/// https://github.com/smartmontools/smartmontools. However, it's good enough for now.
bool canRead() const noexcept;
bool canWrite() const noexcept;
bool canWrite() noexcept;

DiskObjectStoragePtr createDiskObjectStorage() override;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include <IO/WriteBufferFromTemporaryFile.h>
#include <Disks/IO/WriteBufferFromTemporaryFile.h>
#include <IO/ReadBufferFromFile.h>
#include <Disks/TemporaryFileOnDisk.h>

#include <fcntl.h>

Expand All @@ -12,18 +13,12 @@ namespace ErrorCodes
extern const int CANNOT_SEEK_THROUGH_FILE;
}


WriteBufferFromTemporaryFile::WriteBufferFromTemporaryFile(std::unique_ptr<PocoTemporaryFile> && tmp_file_)
: WriteBufferFromFile(tmp_file_->path(), DBMS_DEFAULT_BUFFER_SIZE, O_RDWR | O_TRUNC | O_CREAT, /* throttler= */ {}, 0600), tmp_file(std::move(tmp_file_))
{}


WriteBufferFromTemporaryFile::Ptr WriteBufferFromTemporaryFile::create(const std::string & tmp_dir)
WriteBufferFromTemporaryFile::WriteBufferFromTemporaryFile(TemporaryFileOnDiskHolder && tmp_file_)
: WriteBufferFromFile(tmp_file_->getPath(), DBMS_DEFAULT_BUFFER_SIZE, O_RDWR | O_TRUNC | O_CREAT, /* throttler= */ {}, 0600)
, tmp_file(std::move(tmp_file_))
{
return Ptr{new WriteBufferFromTemporaryFile(createTemporaryFile(tmp_dir))};
}


class ReadBufferFromTemporaryWriteBuffer : public ReadBufferFromFile
{
public:
Expand All @@ -40,11 +35,11 @@ class ReadBufferFromTemporaryWriteBuffer : public ReadBufferFromFile
return std::make_shared<ReadBufferFromTemporaryWriteBuffer>(fd, file_name, std::move(origin->tmp_file));
}

ReadBufferFromTemporaryWriteBuffer(int fd_, const std::string & file_name_, std::unique_ptr<PocoTemporaryFile> && tmp_file_)
ReadBufferFromTemporaryWriteBuffer(int fd_, const std::string & file_name_, TemporaryFileOnDiskHolder && tmp_file_)
: ReadBufferFromFile(fd_, file_name_), tmp_file(std::move(tmp_file_))
{}

std::unique_ptr<PocoTemporaryFile> tmp_file;
TemporaryFileOnDiskHolder tmp_file;
};


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,22 @@
namespace DB
{

class TemporaryFileOnDisk;
using TemporaryFileOnDiskHolder = std::unique_ptr<TemporaryFileOnDisk>;

/// Rereadable WriteBuffer, could be used as disk buffer
/// Creates unique temporary in directory (and directory itself)
class WriteBufferFromTemporaryFile : public WriteBufferFromFile, public IReadableWriteBuffer
{
public:
using Ptr = std::shared_ptr<WriteBufferFromTemporaryFile>;

static Ptr create(const std::string & tmp_dir);
explicit WriteBufferFromTemporaryFile(TemporaryFileOnDiskHolder && tmp_file_);

~WriteBufferFromTemporaryFile() override;

private:
explicit WriteBufferFromTemporaryFile(std::unique_ptr<PocoTemporaryFile> && tmp_file);

std::shared_ptr<ReadBuffer> getReadBufferImpl() override;

std::unique_ptr<PocoTemporaryFile> tmp_file;
TemporaryFileOnDiskHolder tmp_file;

friend class ReadBufferFromTemporaryWriteBuffer;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@
#include <stdexcept>
#include <IO/CascadeWriteBuffer.h>
#include <IO/MemoryReadWriteBuffer.h>
#include <IO/WriteBufferFromTemporaryFile.h>
#include <IO/WriteBufferFromString.h>
#include <IO/ConcatReadBuffer.h>
#include <IO/copyData.h>
#include <Common/typeid_cast.h>
#include <Disks/DiskLocal.h>
#include <Disks/IO/WriteBufferFromTemporaryFile.h>
#include <Disks/TemporaryFileOnDisk.h>

#include <filesystem>

namespace fs = std::filesystem;
Expand All @@ -22,6 +25,26 @@ static std::string makeTestArray(size_t size)
return res;
}

class TestCascadeWriteBufferWithDisk : public testing::Test
{
public:
constexpr static auto tmp_root = "tmp/RereadWithTemporaryFileWriteBuffer/";

void SetUp() override
{
fs::create_directories(tmp_root);
disk = std::make_shared<DB::DiskLocal>("local_disk", tmp_root, 0);
}

void TearDown() override
{
disk.reset();
fs::remove_all(tmp_root);
}

DB::DiskPtr disk;
};

static void testCascadeBufferRedability(
std::string data,
CascadeWriteBuffer::WriteBufferPtrs && arg1,
Expand Down Expand Up @@ -206,17 +229,18 @@ TEST(MemoryWriteBuffer, WriteAndReread)
}


TEST(TemporaryFileWriteBuffer, WriteAndReread)
TEST_F(TestCascadeWriteBufferWithDisk, WriteAndReread)
try
{
for (size_t s = 0; s < 2500000; s += 500000)
{
std::string tmp_template = "tmp/TemporaryFileWriteBuffer/";
std::string data = makeTestArray(s);

auto buf = WriteBufferFromTemporaryFile::create(tmp_template);
auto tmp_file = std::make_unique<TemporaryFileOnDisk>(disk);
auto buf = std::make_shared<WriteBufferFromTemporaryFile>(std::move(tmp_file));
buf->write(data.data(), data.size());

std::string tmp_template = TestCascadeWriteBufferWithDisk::tmp_root;
std::string tmp_filename = buf->getFileName();
ASSERT_EQ(tmp_template, tmp_filename.substr(0, tmp_template.size()));

Expand All @@ -243,25 +267,23 @@ catch (...)
}


TEST(CascadeWriteBuffer, RereadWithTemporaryFileWriteBuffer)
TEST_F(TestCascadeWriteBufferWithDisk, RereadWithTemporaryFileWriteBuffer)
try
{
const std::string tmp_template = "tmp/RereadWithTemporaryFileWriteBuffer/";

for (size_t s = 0; s < 4000000; s += 1000000)
{
testCascadeBufferRedability(makeTestArray(s),
{},
{
[=] (auto) { return WriteBufferFromTemporaryFile::create(tmp_template); }
[=, this] (auto) { return std::make_shared<WriteBufferFromTemporaryFile>(std::make_unique<TemporaryFileOnDisk>(disk)); }
});

testCascadeBufferRedability(makeTestArray(s),
{
std::make_shared<MemoryWriteBuffer>(std::max(1ul, s/3ul), 2, 1.5),
},
{
[=] (auto) { return WriteBufferFromTemporaryFile::create(tmp_template); }
[=, this] (auto) { return std::make_shared<WriteBufferFromTemporaryFile>(std::make_unique<TemporaryFileOnDisk>(disk)); }
});
}
}
Expand Down
20 changes: 19 additions & 1 deletion src/Interpreters/Cache/WriteBufferToFileSegment.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include <Interpreters/Cache/WriteBufferToFileSegment.h>
#include <Interpreters/Cache/FileSegment.h>
#include <IO/SwapHelper.h>
#include <IO/ReadBufferFromFile.h>

#include <base/scope_guard.h>

Expand All @@ -11,11 +12,23 @@ namespace DB

namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int NOT_ENOUGH_SPACE;
}

WriteBufferToFileSegment::WriteBufferToFileSegment(FileSegment * file_segment_)
: WriteBufferFromFileDecorator(file_segment_->detachWriter()), file_segment(file_segment_)
: WriteBufferFromFileDecorator(file_segment_->detachWriter())
, file_segment(file_segment_)
{
}

WriteBufferToFileSegment::WriteBufferToFileSegment(FileSegmentsHolder && segment_holder_)
: WriteBufferFromFileDecorator(
segment_holder_.file_segments.size() == 1
? segment_holder_.file_segments.front()->detachWriter()
: throw Exception(ErrorCodes::LOGICAL_ERROR, "WriteBufferToFileSegment can be created only from single segment"))
, file_segment(segment_holder_.file_segments.front().get())
, segment_holder(std::move(segment_holder_))
{
}

Expand Down Expand Up @@ -52,6 +65,11 @@ void WriteBufferToFileSegment::nextImpl()
file_segment->setDownloadedSize(bytes_to_write);
}

std::shared_ptr<ReadBuffer> WriteBufferToFileSegment::getReadBufferImpl()
{
finalize();
return std::make_shared<ReadBufferFromFile>(file_segment->getPathInLocalCache());
}

WriteBufferToFileSegment::~WriteBufferToFileSegment()
{
Expand Down
13 changes: 12 additions & 1 deletion src/Interpreters/Cache/WriteBufferToFileSegment.h
Original file line number Diff line number Diff line change
@@ -1,23 +1,34 @@
#pragma once

#include <IO/WriteBufferFromFileDecorator.h>
#include <Interpreters/Cache/FileSegment.h>
#include <IO/IReadableWriteBuffer.h>

namespace DB
{

class FileSegment;

class WriteBufferToFileSegment : public WriteBufferFromFileDecorator
class WriteBufferToFileSegment : public WriteBufferFromFileDecorator, public IReadableWriteBuffer
{
public:
explicit WriteBufferToFileSegment(FileSegment * file_segment_);
explicit WriteBufferToFileSegment(FileSegmentsHolder && segment_holder);

void nextImpl() override;

~WriteBufferToFileSegment() override;

private:

std::shared_ptr<ReadBuffer> getReadBufferImpl() override;

/// Reference to the file segment in segment_holder if owned by this WriteBufferToFileSegment
/// or to the external file segment passed to the constructor
FileSegment * file_segment;

/// Empty if file_segment is not owned by this WriteBufferToFileSegment
FileSegmentsHolder segment_holder;
};


Expand Down