Merge pull request #57565 from ClickHouse/backport/23.8/57275
Backport #57275 to 23.8: Background merges correctly use temporary data storage in the cache
alexey-milovidov committed Jan 5, 2024
2 parents 9b1e39b + e61a040 commit 0d9ee48
Showing 13 changed files with 72 additions and 32 deletions.
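
In substance, the vertical-merge path stops carving its rows_sources scratch file directly out of the global temporary volume and instead asks TemporaryDataOnDisk for a raw stream, which lands in the filesystem cache when temporary_data_in_cache is configured. Condensed from the MergeTask.cpp hunks below (arguments elided with /* ... */ for brevity):

// Before: scratch file allocated straight on the global temporary volume.
ctx->tmp_disk = global_ctx->context->getGlobalTemporaryVolume()->getDisk();
ctx->rows_sources_file = createTemporaryFile(ctx->tmp_disk->getPath());
ctx->rows_sources_uncompressed_write_buf = ctx->tmp_disk->writeFile(/* ... */);

// After: TemporaryDataOnDisk chooses between the filesystem cache and a volume.
ctx->tmp_disk = std::make_unique<TemporaryDataOnDisk>(global_ctx->context->getTempDataOnDisk());
ctx->rows_sources_uncompressed_write_buf = ctx->tmp_disk->createRawStream();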
6 changes: 3 additions & 3 deletions src/Disks/IO/WriteBufferFromTemporaryFile.cpp
@@ -22,7 +22,7 @@ WriteBufferFromTemporaryFile::WriteBufferFromTemporaryFile(TemporaryFileOnDiskHo
class ReadBufferFromTemporaryWriteBuffer : public ReadBufferFromFile
{
public:
-static ReadBufferPtr createFrom(WriteBufferFromTemporaryFile * origin)
+static std::unique_ptr<ReadBufferFromTemporaryWriteBuffer> createFrom(WriteBufferFromTemporaryFile * origin)
{
int fd = origin->getFD();
std::string file_name = origin->getFileName();
@@ -32,7 +32,7 @@ class ReadBufferFromTemporaryWriteBuffer : public ReadBufferFromFile
throwFromErrnoWithPath("Cannot reread temporary file " + file_name, file_name,
ErrorCodes::CANNOT_SEEK_THROUGH_FILE);

-return std::make_shared<ReadBufferFromTemporaryWriteBuffer>(fd, file_name, std::move(origin->tmp_file));
+return std::make_unique<ReadBufferFromTemporaryWriteBuffer>(fd, file_name, std::move(origin->tmp_file));
}

ReadBufferFromTemporaryWriteBuffer(int fd_, const std::string & file_name_, TemporaryFileOnDiskHolder && tmp_file_)
@@ -43,7 +43,7 @@ class ReadBufferFromTemporaryWriteBuffer : public ReadBufferFromFile
};


-ReadBufferPtr WriteBufferFromTemporaryFile::getReadBufferImpl()
+std::unique_ptr<ReadBuffer> WriteBufferFromTemporaryFile::getReadBufferImpl()
{
/// ignore buffer, write all data to file and reread it
finalize();
2 changes: 1 addition & 1 deletion src/Disks/IO/WriteBufferFromTemporaryFile.h
@@ -21,7 +21,7 @@ class WriteBufferFromTemporaryFile : public WriteBufferFromFile, public IReadabl
~WriteBufferFromTemporaryFile() override;

private:
-std::shared_ptr<ReadBuffer> getReadBufferImpl() override;
+std::unique_ptr<ReadBuffer> getReadBufferImpl() override;

TemporaryFileOnDiskHolder tmp_file;

2 changes: 1 addition & 1 deletion src/Disks/tests/gtest_cascade_and_memory_write_buffer.cpp
@@ -69,7 +69,7 @@ static void testCascadeBufferRedability(
auto rbuf = wbuf_readable.tryGetReadBuffer();
ASSERT_FALSE(!rbuf);

-concat.appendBuffer(wrapReadBufferPointer(rbuf));
+concat.appendBuffer(wrapReadBufferPointer(std::move(rbuf)));
}

std::string decoded_data;
4 changes: 2 additions & 2 deletions src/IO/IReadableWriteBuffer.h
@@ -8,7 +8,7 @@ namespace DB
struct IReadableWriteBuffer
{
/// At the first time returns getReadBufferImpl(). Next calls return nullptr.
-inline std::shared_ptr<ReadBuffer> tryGetReadBuffer()
+inline std::unique_ptr<ReadBuffer> tryGetReadBuffer()
{
if (!can_reread)
return nullptr;
@@ -24,7 +24,7 @@ struct IReadableWriteBuffer
/// Creates read buffer from current write buffer.
/// Returned buffer points to the first byte of original buffer.
/// Original stream becomes invalid.
-virtual std::shared_ptr<ReadBuffer> getReadBufferImpl() = 0;
+virtual std::unique_ptr<ReadBuffer> getReadBufferImpl() = 0;

bool can_reread = true;
};
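
The single-use contract documented above is why unique_ptr is the right return type: the writer hands its accumulated data to exactly one reader, and any later call yields nullptr. A minimal standalone sketch of that pattern in plain standard C++ (illustrative stand-in types, not the ClickHouse buffer classes):

#include <iostream>
#include <memory>
#include <sstream>
#include <string>

/// Illustrative stand-in for IReadableWriteBuffer: the accumulated data can be
/// converted into a reader exactly once; subsequent calls return nullptr.
struct ReadableStringWriter
{
    void write(const std::string & s) { data += s; }

    std::unique_ptr<std::istringstream> tryGetReadBuffer()
    {
        if (!can_reread)
            return nullptr;
        can_reread = false;
        /// Ownership of the buffered data moves into the single reader.
        return std::make_unique<std::istringstream>(std::move(data));
    }

    std::string data;
    bool can_reread = true;
};

int main()
{
    ReadableStringWriter writer;
    writer.write("hello");

    auto reader = writer.tryGetReadBuffer();      /// sole owner of the reader
    std::string word;
    *reader >> word;
    std::cout << word << '\n';                    /// prints "hello"

    auto again = writer.tryGetReadBuffer();       /// nullptr: data already handed off
    std::cout << (again ? "reusable" : "single-use") << '\n';
}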
4 changes: 2 additions & 2 deletions src/IO/MemoryReadWriteBuffer.cpp
@@ -124,11 +124,11 @@ void MemoryWriteBuffer::addChunk()
}


-std::shared_ptr<ReadBuffer> MemoryWriteBuffer::getReadBufferImpl()
+std::unique_ptr<ReadBuffer> MemoryWriteBuffer::getReadBufferImpl()
{
finalize();

-auto res = std::make_shared<ReadBufferFromMemoryWriteBuffer>(std::move(*this));
+auto res = std::make_unique<ReadBufferFromMemoryWriteBuffer>(std::move(*this));

/// invalidate members
chunk_list.clear();
2 changes: 1 addition & 1 deletion src/IO/MemoryReadWriteBuffer.h
@@ -38,7 +38,7 @@ class MemoryWriteBuffer : public WriteBuffer, public IReadableWriteBuffer, boost

void finalizeImpl() override { /* no op */ }

-std::shared_ptr<ReadBuffer> getReadBufferImpl() override;
+std::unique_ptr<ReadBuffer> getReadBufferImpl() override;

const size_t max_total_size;
const size_t initial_chunk_size;
4 changes: 2 additions & 2 deletions src/Interpreters/Cache/WriteBufferToFileSegment.cpp
@@ -80,10 +80,10 @@ void WriteBufferToFileSegment::nextImpl()
file_segment->setDownloadedSize(bytes_to_write);
}

-std::shared_ptr<ReadBuffer> WriteBufferToFileSegment::getReadBufferImpl()
+std::unique_ptr<ReadBuffer> WriteBufferToFileSegment::getReadBufferImpl()
{
finalize();
-return std::make_shared<ReadBufferFromFile>(file_segment->getPathInLocalCache());
+return std::make_unique<ReadBufferFromFile>(file_segment->getPathInLocalCache());
}

WriteBufferToFileSegment::~WriteBufferToFileSegment()
2 changes: 1 addition & 1 deletion src/Interpreters/Cache/WriteBufferToFileSegment.h
@@ -20,7 +20,7 @@ class WriteBufferToFileSegment : public WriteBufferFromFileDecorator, public IRe

private:

-std::shared_ptr<ReadBuffer> getReadBufferImpl() override;
+std::unique_ptr<ReadBuffer> getReadBufferImpl() override;

/// Reference to the file segment in segment_holder if owned by this WriteBufferToFileSegment
/// or to the external file segment passed to the constructor
6 changes: 3 additions & 3 deletions src/Interpreters/TemporaryDataOnDisk.cpp
@@ -55,17 +55,17 @@ TemporaryDataOnDisk::TemporaryDataOnDisk(TemporaryDataOnDiskScopePtr parent_, Cu
, current_metric_scope(metric_scope)
{}

-WriteBufferPtr TemporaryDataOnDisk::createRawStream(size_t max_file_size)
+std::unique_ptr<WriteBufferFromFileBase> TemporaryDataOnDisk::createRawStream(size_t max_file_size)
{
if (file_cache)
{
auto holder = createCacheFile(max_file_size);
-return std::make_shared<WriteBufferToFileSegment>(std::move(holder));
+return std::make_unique<WriteBufferToFileSegment>(std::move(holder));
}
else if (volume)
{
auto tmp_file = createRegularFile(max_file_size);
-return std::make_shared<WriteBufferFromTemporaryFile>(std::move(tmp_file));
+return std::make_unique<WriteBufferFromTemporaryFile>(std::move(tmp_file));
}

throw Exception(ErrorCodes::LOGICAL_ERROR, "TemporaryDataOnDiskScope has no cache and no volume");
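
Put together, the write/re-read round trip that the merge code relies on looks roughly like this. The snippet is assembled from the pieces in this diff rather than quoted from the codebase; `context` stands for a Context pointer and `payload` is a hypothetical std::string:

auto tmp_data = std::make_unique<TemporaryDataOnDisk>(context->getTempDataOnDisk());

/// Write side: the raw stream is a WriteBufferToFileSegment when a file cache is
/// configured, otherwise a WriteBufferFromTemporaryFile on a volume.
std::unique_ptr<WriteBufferFromFileBase> out = tmp_data->createRawStream();
out->write(payload.data(), payload.size());   /// `payload` is a hypothetical buffer of bytes

/// Read side: both implementations expose IReadableWriteBuffer, so the finished
/// writer can be turned into a reader over the same bytes exactly once.
auto * readable = dynamic_cast<IReadableWriteBuffer *>(out.get());
std::unique_ptr<ReadBuffer> in = readable ? readable->tryGetReadBuffer() : nullptr;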
2 changes: 1 addition & 1 deletion src/Interpreters/TemporaryDataOnDisk.h
@@ -95,7 +95,7 @@ class TemporaryDataOnDisk : private TemporaryDataOnDiskScope
/// 1) it doesn't account data in parent scope
/// 2) returned buffer owns resources (instead of TemporaryDataOnDisk itself)
/// If max_file_size > 0, then check that there's enough space on the disk and throw an exception in case of lack of free space
-WriteBufferPtr createRawStream(size_t max_file_size = 0);
+std::unique_ptr<WriteBufferFromFileBase> createRawStream(size_t max_file_size = 0);

std::vector<TemporaryFileStream *> getStreams() const;
bool empty() const;
28 changes: 23 additions & 5 deletions src/Storages/MergeTree/MergeTask.cpp
@@ -11,6 +11,7 @@

#include <DataTypes/ObjectUtils.h>
#include <DataTypes/Serializations/SerializationInfo.h>
+#include <IO/IReadableWriteBuffer.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Storages/MergeTree/MergeTreeSequentialSource.h>
@@ -278,7 +279,7 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare()
ctx->compression_codec = global_ctx->data->getCompressionCodecForPart(
global_ctx->merge_list_element_ptr->total_size_bytes_compressed, global_ctx->new_data_part->ttl_infos, global_ctx->time_of_merge);

-ctx->tmp_disk = global_ctx->context->getGlobalTemporaryVolume()->getDisk();
+ctx->tmp_disk = std::make_unique<TemporaryDataOnDisk>(global_ctx->context->getTempDataOnDisk());

switch (global_ctx->chosen_merge_algorithm)
{
@@ -292,8 +293,7 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare()
}
case MergeAlgorithm::Vertical :
{
-ctx->rows_sources_file = createTemporaryFile(ctx->tmp_disk->getPath());
-ctx->rows_sources_uncompressed_write_buf = ctx->tmp_disk->writeFile(fileName(ctx->rows_sources_file->path()), DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Rewrite, global_ctx->context->getWriteSettings());
+ctx->rows_sources_uncompressed_write_buf = ctx->tmp_disk->createRawStream();
ctx->rows_sources_write_buf = std::make_unique<CompressedWriteBuffer>(*ctx->rows_sources_uncompressed_write_buf);

MergeTreeDataPartInMemory::ColumnToSize local_merged_column_to_size;
@@ -395,7 +395,6 @@ MergeTask::StageRuntimeContextPtr MergeTask::ExecuteAndFinalizeHorizontalPart::g

new_ctx->rows_sources_write_buf = std::move(ctx->rows_sources_write_buf);
new_ctx->rows_sources_uncompressed_write_buf = std::move(ctx->rows_sources_uncompressed_write_buf);
-new_ctx->rows_sources_file = std::move(ctx->rows_sources_file);
new_ctx->column_sizes = std::move(ctx->column_sizes);
new_ctx->compression_codec = std::move(ctx->compression_codec);
new_ctx->tmp_disk = std::move(ctx->tmp_disk);
@@ -506,7 +505,26 @@ bool MergeTask::VerticalMergeStage::prepareVerticalMergeForAllColumns() const
"of bytes written to rows_sources file ({}). It is a bug.",
sum_input_rows_exact, input_rows_filtered, rows_sources_count);

-ctx->rows_sources_read_buf = std::make_unique<CompressedReadBufferFromFile>(ctx->tmp_disk->readFile(fileName(ctx->rows_sources_file->path())));
+/// TemporaryDataOnDisk::createRawStream returns WriteBufferFromFile implementing IReadableWriteBuffer
+/// and we expect to get ReadBufferFromFile here.
+/// So, it's relatively safe to use dynamic_cast here and downcast to ReadBufferFromFile.
+auto * wbuf_readable = dynamic_cast<IReadableWriteBuffer *>(ctx->rows_sources_uncompressed_write_buf.get());
+std::unique_ptr<ReadBuffer> reread_buf = wbuf_readable ? wbuf_readable->tryGetReadBuffer() : nullptr;
+if (!reread_buf)
+throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot read temporary file {}", ctx->rows_sources_uncompressed_write_buf->getFileName());
+auto * reread_buffer_raw = dynamic_cast<ReadBufferFromFile *>(reread_buf.get());
+if (!reread_buffer_raw)
+{
+const auto & reread_buf_ref = *reread_buf;
+throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected ReadBufferFromFile, but got {}", demangle(typeid(reread_buf_ref).name()));
+}
+/// Move ownership from std::unique_ptr<ReadBuffer> to std::unique_ptr<ReadBufferFromFile> for CompressedReadBufferFromFile.
+/// First, release ownership from unique_ptr to base type.
+reread_buf.release(); /// NOLINT(bugprone-unused-return-value): we already have the pointer value in `reread_buffer_raw`
+/// Then, move ownership to unique_ptr to concrete type.
+std::unique_ptr<ReadBufferFromFile> reread_buffer_from_file(reread_buffer_raw);
+/// CompressedReadBufferFromFile expects std::unique_ptr<ReadBufferFromFile> as argument.
+ctx->rows_sources_read_buf = std::make_unique<CompressedReadBufferFromFile>(std::move(reread_buffer_from_file));

/// For external cycle
global_ctx->gathering_column_names_size = global_ctx->gathering_column_names.size();
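
The ownership hand-off at the end of the hunk above — dynamic_cast to check the concrete type, release() on the base unique_ptr, then re-owning through the derived unique_ptr — is a general C++ pattern. A self-contained sketch with illustrative types (not the ClickHouse buffer classes):

#include <iostream>
#include <memory>
#include <stdexcept>
#include <string>
#include <typeinfo>

struct Reader { virtual ~Reader() = default; };
struct FileReader : Reader { void seek_to_start() {} };

/// Downcast a unique_ptr<Reader> to unique_ptr<FileReader>, transferring ownership.
/// On a type mismatch the base pointer still owns the object, so nothing leaks.
std::unique_ptr<FileReader> takeAsFileReader(std::unique_ptr<Reader> base)
{
    auto * derived = dynamic_cast<FileReader *>(base.get());
    if (!derived)
        throw std::runtime_error(std::string("Expected FileReader, but got ") + typeid(*base).name());
    base.release();                               /// give up ownership of the base handle...
    return std::unique_ptr<FileReader>(derived);  /// ...and re-own the same object as the derived type
}

int main()
{
    std::unique_ptr<Reader> generic = std::make_unique<FileReader>();
    auto file_reader = takeAsFileReader(std::move(generic));
    file_reader->seek_to_start();
    std::cout << "ownership transferred intact\n";
}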
33 changes: 23 additions & 10 deletions src/Storages/MergeTree/MergeTask.h
@@ -1,21 +1,36 @@
#pragma once

#include <list>
#include <memory>

#include <Common/filesystemHelpers.h>

#include <Compression/CompressedReadBuffer.h>
#include <Compression/CompressedReadBufferFromFile.h>

#include <Interpreters/TemporaryDataOnDisk.h>

#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/Transforms/ColumnGathererTransform.h>

#include <QueryPipeline/QueryPipeline.h>

#include <Storages/MergeTree/ColumnSizeEstimator.h>
#include <Storages/MergeTree/FutureMergedMutatedPart.h>
#include <Storages/MergeTree/IExecutableTask.h>
#include <Storages/MergeTree/MergeProgress.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/IMergedBlockOutputStream.h>
#include <Storages/MergeTree/MergedBlockOutputStream.h>
#include <Storages/MergeTree/FutureMergedMutatedPart.h>
#include <Storages/MergeTree/ColumnSizeEstimator.h>
#include <Storages/MergeTree/MergedColumnOnlyOutputStream.h>
#include <Storages/MergeTree/MergeProgress.h>
#include <Storages/MergeTree/MergeTreeData.h>

#include <Processors/Transforms/ColumnGathererTransform.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <QueryPipeline/QueryPipeline.h>
#include <Compression/CompressedReadBufferFromFile.h>
#include <Common/filesystemHelpers.h>

#include <memory>
#include <list>


namespace DB
{
@@ -193,13 +208,12 @@ class MergeTask
bool need_prefix;
MergeTreeData::MergingParams merging_params{};

-DiskPtr tmp_disk{nullptr};
+TemporaryDataOnDiskPtr tmp_disk{nullptr};
DiskPtr disk{nullptr};
bool need_remove_expired_values{false};
bool force_ttl{false};
CompressionCodecPtr compression_codec{nullptr};
size_t sum_input_rows_upper_bound{0};
-std::unique_ptr<PocoTemporaryFile> rows_sources_file{nullptr};
std::unique_ptr<WriteBufferFromFileBase> rows_sources_uncompressed_write_buf{nullptr};
std::unique_ptr<WriteBuffer> rows_sources_write_buf{nullptr};
std::optional<ColumnSizeEstimator> column_sizes{};
@@ -262,12 +276,11 @@ class MergeTask
struct VerticalMergeRuntimeContext : public IStageRuntimeContext
{
/// Begin dependencies from previous stage
-std::unique_ptr<PocoTemporaryFile> rows_sources_file;
std::unique_ptr<WriteBufferFromFileBase> rows_sources_uncompressed_write_buf{nullptr};
std::unique_ptr<WriteBuffer> rows_sources_write_buf{nullptr};
std::optional<ColumnSizeEstimator> column_sizes;
CompressionCodecPtr compression_codec;
-DiskPtr tmp_disk{nullptr};
+TemporaryDataOnDiskPtr tmp_disk{nullptr};
std::list<DB::NameAndTypePair>::const_iterator it_name_and_type;
size_t column_num_for_vertical_merge{0};
bool read_with_direct_io{false};
9 changes: 9 additions & 0 deletions tests/integration/test_s3_zero_copy_ttl/configs/s3.xml
@@ -7,6 +7,13 @@
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
</s3_disk>
+<s3_cache_disk>
+<!-- used as a temporary data storage -->
+<type>cache</type>
+<disk>s3_disk</disk>
+<path>./s3_cache_disk/</path>
+<max_size>10Gi</max_size>
+</s3_cache_disk>
</disks>

<policies>
@@ -35,5 +42,7 @@
<ratio_of_defaults_for_sparse_serialization>1.0</ratio_of_defaults_for_sparse_serialization>
</merge_tree>

+<temporary_data_in_cache>s3_cache_disk</temporary_data_in_cache>

<allow_remove_stale_moving_parts>true</allow_remove_stale_moving_parts>
</clickhouse>
