Inserting only non-duplicate chunks in MV #54184

Merged · 6 commits · Sep 19, 2023
26 changes: 26 additions & 0 deletions src/Processors/Sinks/IOutputChunkGenerator.h
@@ -0,0 +1,26 @@
#pragma once

#include <Processors/Chunk.h>
#include <Interpreters/Context.h>

namespace DB
{

/// This interface is meant to be used by the SinkToStorage processor.
/// SinkToStorage delegates to it the creation of the data chunk that will be delivered to the next stages of the query pipeline.
/// The default implementation (createDefault() factory method) just forwards everything it receives.
class IOutputChunkGenerator
{
public:
static std::unique_ptr<IOutputChunkGenerator> createCopyRanges(bool deduplicate_later);
static std::unique_ptr<IOutputChunkGenerator> createDefault();

virtual ~IOutputChunkGenerator() = default;

virtual void onNewChunkArrived(Chunk chunk) = 0;
virtual void onRowsProcessed(size_t row_count, bool append) = 0;

virtual Chunk generateChunk() = 0;
};

}
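
As a reading aid for reviewers (not part of the PR; the free function below is illustrative), a minimal sketch of the calling sequence a sink is expected to follow against this interface:

#include <Processors/Sinks/IOutputChunkGenerator.h>

/// Illustrative driver, assuming only the interface declared above.
void driveGenerator(DB::IOutputChunkGenerator & generator, DB::Chunk chunk)
{
    /// 1. Register the incoming chunk before writing it to storage.
    generator.onNewChunkArrived(chunk.clone());

    /// 2. Report each processed range; `append` says whether the range was
    ///    actually inserted (true) or dropped as a duplicate (false).
    generator.onRowsProcessed(chunk.getNumRows(), /*append=*/ true);

    /// 3. Fetch the chunk that continues down the pipeline (e.g. into an MV).
    DB::Chunk forwarded = generator.generateChunk();
}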
91 changes: 91 additions & 0 deletions src/Processors/Sinks/OutputChunkGenerator.cpp
@@ -0,0 +1,91 @@
#include <Processors/Sinks/IOutputChunkGenerator.h>

namespace DB
{

/// Default implementation. Each chunk received is forwarded as-is to the next stages of the query pipeline.
class ForwardEverythingGenerator : public IOutputChunkGenerator
{
public:

explicit ForwardEverythingGenerator() = default;

void onNewChunkArrived(Chunk chunk) override
{
in_chunk = chunk.clone();
}

void onRowsProcessed(size_t /*row_count*/, bool /*append*/) override
{}

Chunk generateChunk() override
{
return std::move(in_chunk);
}

private:
Chunk in_chunk;
};

/// Specific implementation which generates a chunk with just a subset of the rows received originally.
/// Rows are assumed to be processed in the same order as they appear in the original chunk.
/// It is up to the client to decide how many rows to process at once, but after each range is processed,
/// onRowsProcessed() has to be called, indicating whether to append that range to the output chunk or not.
class CopyRangesGenerator : public IOutputChunkGenerator
{
public:
explicit CopyRangesGenerator() = default;

void onNewChunkArrived(Chunk chunk) override
{
out_cols = chunk.cloneEmptyColumns();
in_chunk = std::move(chunk);
row_offset = 0;
final_chunk_rows = 0;
}

void onRowsProcessed(size_t row_count, bool append) override
{
if (append)
{
const Columns& in_cols = in_chunk.getColumns();
for (size_t i = 0; i < out_cols.size(); i++)
{
out_cols[i]->insertRangeFrom(*(in_cols[i]), row_offset, row_count);
}
final_chunk_rows += row_count;
}

row_offset += row_count;
}

Chunk generateChunk() override
{
return Chunk(std::move(out_cols), final_chunk_rows);
}

private:
Chunk in_chunk;
MutableColumns out_cols;
size_t row_offset = 0;
size_t final_chunk_rows = 0;
};

std::unique_ptr<IOutputChunkGenerator> IOutputChunkGenerator::createCopyRanges(bool deduplicate_later)
{
// If the MV is responsible for deduplication, the block won't be considered a duplicate,
// so the default implementation, which forwards all the data, is used.
if (deduplicate_later)
{
return createDefault();
}

return std::make_unique<CopyRangesGenerator>();
}

std::unique_ptr<IOutputChunkGenerator> IOutputChunkGenerator::createDefault()
{
return std::make_unique<ForwardEverythingGenerator>();
}

}
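
To make CopyRangesGenerator concrete, a hedged walk-through (not part of the PR; the single-column chunk is an assumption made for the example): a 4-row chunk is processed as two 2-row ranges, the second range turns out to be a duplicate, so only the first range reaches the output.

#include <Columns/ColumnsNumber.h>
#include <Processors/Sinks/IOutputChunkGenerator.h>

void copyRangesExample()
{
    /// Build a 4-row chunk with one UInt64 column: [0, 1, 2, 3].
    auto col = DB::ColumnUInt64::create();
    for (UInt64 i = 0; i < 4; ++i)
        col->insertValue(i);
    DB::Columns columns;
    columns.emplace_back(std::move(col));

    auto generator = DB::IOutputChunkGenerator::createCopyRanges(/*deduplicate_later=*/ false);
    generator->onNewChunkArrived(DB::Chunk(std::move(columns), 4));

    generator->onRowsProcessed(2, /*append=*/ true);   /// rows [0, 2) were inserted
    generator->onRowsProcessed(2, /*append=*/ false);  /// rows [2, 4) were duplicates

    /// Only the appended range survives: `out` holds 2 rows (values 0 and 1).
    DB::Chunk out = generator->generateChunk();
}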
13 changes: 9 additions & 4 deletions src/Processors/Sinks/SinkToStorage.cpp
@@ -4,7 +4,12 @@
namespace DB
{

SinkToStorage::SinkToStorage(const Block & header) : ExceptionKeepingTransform(header, header, false) {}
SinkToStorage::SinkToStorage(const Block & header) : SinkToStorage(header, IOutputChunkGenerator::createDefault()) {}

SinkToStorage::SinkToStorage(const Block & header, std::unique_ptr<IOutputChunkGenerator> output_generator_)
: ExceptionKeepingTransform(header, header, false),
output_generator(std::move(output_generator_))
{ }

void SinkToStorage::onConsume(Chunk chunk)
{
@@ -15,15 +20,15 @@ void SinkToStorage::onConsume(Chunk chunk)
*/
Nested::validateArraySizes(getHeader().cloneWithColumns(chunk.getColumns()));

output_generator->onNewChunkArrived(chunk.clone());
consume(chunk.clone());
if (!lastBlockIsDuplicate())
cur_chunk = std::move(chunk);
}

SinkToStorage::GenerateResult SinkToStorage::onGenerate()
{
GenerateResult res;
res.chunk = std::move(cur_chunk);

res.chunk = output_generator->generateChunk();
res.is_done = true;
return res;
}
7 changes: 5 additions & 2 deletions src/Processors/Sinks/SinkToStorage.h
@@ -1,6 +1,7 @@
#pragma once
#include <Storages/TableLockHolder.h>
#include <Processors/Transforms/ExceptionKeepingTransform.h>
#include <Processors/Sinks/IOutputChunkGenerator.h>

namespace DB
{
@@ -13,21 +14,23 @@ friend class PartitionedSink;

public:
explicit SinkToStorage(const Block & header);
explicit SinkToStorage(const Block & header, std::unique_ptr<IOutputChunkGenerator> output_generator_);

const Block & getHeader() const { return inputs.front().getHeader(); }
void addTableLock(const TableLockHolder & lock) { table_locks.push_back(lock); }

protected:
virtual void consume(Chunk chunk) = 0;
virtual bool lastBlockIsDuplicate() const { return false; }

IOutputChunkGenerator& getOutputGenerator() { return *output_generator; }

private:
std::vector<TableLockHolder> table_locks;

void onConsume(Chunk chunk) override;
GenerateResult onGenerate() override;

Chunk cur_chunk;
std::unique_ptr<IOutputChunkGenerator> output_generator;
};

using SinkToStoragePtr = std::shared_ptr<SinkToStorage>;
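
For context, a hypothetical subclass (not in this PR; MySink and writeRange are made-up names) showing how a storage sink is meant to pass a generator through the new constructor and report ranges from consume():

class MySink : public DB::SinkToStorage
{
public:
    explicit MySink(const DB::Block & header)
        : DB::SinkToStorage(header, DB::IOutputChunkGenerator::createCopyRanges(/*deduplicate_later=*/ false))
    {}

    std::string getName() const override { return "MySink"; }

protected:
    void consume(DB::Chunk chunk) override
    {
        /// Hypothetical write that returns false when the block was deduplicated.
        const bool inserted = writeRange(chunk);
        getOutputGenerator().onRowsProcessed(chunk.getNumRows(), inserted);
    }

private:
    static bool writeRange(const DB::Chunk &) { return true; }
};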
25 changes: 6 additions & 19 deletions src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp
@@ -130,7 +130,8 @@ ReplicatedMergeTreeSinkImpl<async_insert>::ReplicatedMergeTreeSinkImpl(
bool majority_quorum,
ContextPtr context_,
bool is_attach_)
: SinkToStorage(metadata_snapshot_->getSampleBlock())
: SinkToStorage(metadata_snapshot_->getSampleBlock(),
IOutputChunkGenerator::createCopyRanges(context_->getSettingsRef().deduplicate_blocks_in_dependent_materialized_views))
, storage(storage_)
, metadata_snapshot(metadata_snapshot_)
, required_quorum_size(majority_quorum ? std::nullopt : std::make_optional<size_t>(quorum_size))
@@ -386,13 +387,7 @@ void ReplicatedMergeTreeSinkImpl<async_insert>::consume(Chunk chunk)
finishDelayedChunk(zookeeper);
delayed_chunk = std::make_unique<ReplicatedMergeTreeSinkImpl::DelayedChunk>();
delayed_chunk->partitions = std::move(partitions);

/// If deduplicated data should not be inserted into the MV, we need to set a proper
/// value for `last_block_is_duplicate`, which is possible only after the part is committed.
/// Otherwise we can delay the commit.
/// TODO: we can also delay the commit if there are no MVs.
if (!settings.deduplicate_blocks_in_dependent_materialized_views)
finishDelayedChunk(zookeeper);
finishDelayedChunk(zookeeper);

++num_blocks_processed;
}
@@ -403,8 +398,6 @@ void ReplicatedMergeTreeSinkImpl<false>::finishDelayedChunk(const ZooKeeperWithF
if (!delayed_chunk)
return;

last_block_is_duplicate = false;

for (auto & partition : delayed_chunk->partitions)
{
ProfileEventsScope scoped_attach(&partition.part_counters);
@@ -415,9 +408,10 @@

try
{
bool deduplicated = commitPart(zookeeper, part, partition.block_id, delayed_chunk->replicas_num, false).second;
const size_t rowsCount = partition.temp_part.part->rows_count;
const bool deduplicated = commitPart(zookeeper, part, partition.block_id, delayed_chunk->replicas_num, false).second;

last_block_is_duplicate = last_block_is_duplicate || deduplicated;
getOutputGenerator().onRowsProcessed(rowsCount, !deduplicated);

/// Set a special error code if the block is duplicate
int error = (deduplicate && deduplicated) ? ErrorCodes::INSERT_WAS_DEDUPLICATED : 0;
@@ -1092,13 +1086,6 @@ void ReplicatedMergeTreeSinkImpl<async_insert>::onStart()
storage.delayInsertOrThrowIfNeeded(&storage.partial_shutdown_event, context, true);
}

template<bool async_insert>
void ReplicatedMergeTreeSinkImpl<async_insert>::onFinish()
{
auto zookeeper = storage.getZooKeeper();
finishDelayedChunk(std::make_shared<ZooKeeperWithFaultInjection>(zookeeper));
}

template<bool async_insert>
void ReplicatedMergeTreeSinkImpl<async_insert>::waitForQuorum(
const ZooKeeperWithFaultInjectionPtr & zookeeper,
12 changes: 0 additions & 12 deletions src/Storages/MergeTree/ReplicatedMergeTreeSink.h
@@ -51,23 +51,12 @@ class ReplicatedMergeTreeSinkImpl : public SinkToStorage

void onStart() override;
void consume(Chunk chunk) override;
void onFinish() override;

String getName() const override { return "ReplicatedMergeTreeSink"; }

/// For ATTACHing existing data on filesystem.
bool writeExistingPart(MergeTreeData::MutableDataPartPtr & part);

/// For proper deduplication in MaterializedViews
bool lastBlockIsDuplicate() const override
{
/// If MV is responsible for deduplication, block is not considered duplicating.
if (context->getSettingsRef().deduplicate_blocks_in_dependent_materialized_views)
return false;

return last_block_is_duplicate;
}

struct DelayedChunk;
private:
using BlockIDsType = std::conditional_t<async_insert, std::vector<String>, String>;
@@ -122,7 +111,6 @@ class ReplicatedMergeTreeSinkImpl : public SinkToStorage
bool is_attach = false;
bool quorum_parallel = false;
const bool deduplicate = true;
bool last_block_is_duplicate = false;
UInt64 num_blocks_processed = 0;

using Logger = Poco::Logger;
14 changes: 14 additions & 0 deletions (new test reference file)
@@ -0,0 +1,14 @@
Initial
2020-01-01 13:00:00 24
Last block is duplicate
2020-01-01 13:00:00 24
2021-09-01 11:00:00 24
One block is duplicate (default setting)
2020-01-01 13:00:00 24
2021-09-01 11:00:00 24
2022-01-01 12:00:00 24
One block is duplicate (changed setting)
2020-01-01 13:00:00 24
2021-09-01 11:00:00 24
2022-01-01 12:00:00 24
2023-01-01 12:00:00 24
44 changes: 44 additions & 0 deletions (new test SQL file)
@@ -0,0 +1,44 @@
-- Tags: zookeeper

DROP TABLE IF EXISTS landing SYNC;
DROP TABLE IF EXISTS mv SYNC;

CREATE TABLE landing
(
`time` DateTime,
`number` Int64
)
ENGINE = ReplicatedReplacingMergeTree('/clickhouse/{database}/tables/landing/', 'r1')
PARTITION BY toYYYYMMDD(time)
ORDER BY time;

CREATE MATERIALIZED VIEW mv
ENGINE = ReplicatedSummingMergeTree('/clickhouse/{database}/tables/mv', 'r1')
PARTITION BY toYYYYMMDD(hour) ORDER BY hour
AS SELECT
toStartOfHour(time) AS hour,
sum(number) AS sum_amount
FROM landing GROUP BY hour;

SELECT 'Initial';
INSERT INTO landing VALUES ('2020-01-01 13:23:34', 24);
SELECT * FROM mv ORDER BY hour;

SELECT 'Last block is duplicate';
INSERT INTO landing VALUES ('2021-09-01 11:00:00', 24), ('2020-01-01 13:23:34', 24);
SELECT * FROM mv ORDER BY hour;

SELECT 'One block is duplicate (default setting)';
SET max_insert_delayed_streams_for_parallel_write = 0;
INSERT INTO landing VALUES ('2021-09-01 11:00:00', 24), ('2022-01-01 12:03:00', 24);
SELECT * FROM mv ORDER BY hour;

SELECT 'One block is duplicate (changed setting)';
SET max_insert_delayed_streams_for_parallel_write = 5;
INSERT INTO landing VALUES ('2021-09-01 11:00:00', 24), ('2023-01-01 12:03:00', 24);

SELECT * FROM mv ORDER BY hour;

DROP TABLE mv;
DROP TABLE landing;