Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix async insert with deduplication for ReplicatedMergeTree using merging algorithms #51676

Merged
merged 7 commits into from
Jul 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 0 additions & 1 deletion src/Processors/Transforms/ExpressionTransform.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ void ConvertingTransform::onConsume(Chunk chunk)
expression->execute(block, num_rows);

chunk.setColumns(block.getColumns(), num_rows);
chunk.setChunkInfo(chunk.getChunkInfo());
cur_chunk = std::move(chunk);
}

Expand Down
5 changes: 5 additions & 0 deletions src/Storages/MergeTree/MergeTreeDataWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ class MergeTreeDataWriter
*/
TemporaryPart writeTempPart(BlockWithPartition & block, const StorageMetadataPtr & metadata_snapshot, ContextPtr context);

/// Returns the merging mode (Ordinary, Collapsing, Summing, ...) configured for the
/// underlying MergeTree table. The replicated sink uses this to detect whether a
/// merging algorithm may rewrite the inserted block (and thus invalidate the async
/// insert offsets kept for deduplication).
/// Pure accessor with no side effects; ignoring the result is always a mistake.
[[nodiscard]] MergeTreeData::MergingParams::Mode getMergingMode() const
{
    return data.merging_params.mode;
}

TemporaryPart writeTempPartWithoutPrefix(BlockWithPartition & block, const StorageMetadataPtr & metadata_snapshot, int64_t block_number, ContextPtr context);

/// For insertion.
Expand Down
80 changes: 54 additions & 26 deletions src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
#include <Common/SipHash.h>
#include <Common/ZooKeeper/KeeperException.h>
#include <Common/ThreadFuzzer.h>
#include <Storages/MergeTree/MergeAlgorithm.h>
#include <Storages/MergeTree/MergeTreeDataWriter.h>
#include <Storages/MergeTree/AsyncBlockIDsCache.h>
#include <DataTypes/ObjectUtils.h>
#include <Core/Block.h>
Expand Down Expand Up @@ -54,6 +56,9 @@ struct ReplicatedMergeTreeSinkImpl<async_insert>::DelayedChunk
UInt64 elapsed_ns;
BlockIDsType block_id;
BlockWithPartition block_with_partition;
/// Some merging algorithms can modify the block, which loses the information about the async insert offsets.
/// When preprocessing or filtering data for async insert deduplication we want to use the initial, unmerged block.
std::optional<BlockWithPartition> unmerged_block_with_partition;
std::unordered_map<String, std::vector<size_t>> block_id_to_offset_idx;
ProfileEvents::Counters part_counters;

Expand All @@ -63,12 +68,14 @@ struct ReplicatedMergeTreeSinkImpl<async_insert>::DelayedChunk
UInt64 elapsed_ns_,
BlockIDsType && block_id_,
BlockWithPartition && block_,
std::optional<BlockWithPartition> && unmerged_block_with_partition_,
ProfileEvents::Counters && part_counters_)
: log(log_),
temp_part(std::move(temp_part_)),
elapsed_ns(elapsed_ns_),
block_id(std::move(block_id_)),
block_with_partition(std::move(block_)),
unmerged_block_with_partition(std::move(unmerged_block_with_partition_)),
part_counters(std::move(part_counters_))
{
initBlockIDMap();
Expand Down Expand Up @@ -113,6 +120,7 @@ struct ReplicatedMergeTreeSinkImpl<async_insert>::DelayedChunk
{
if constexpr (async_insert)
{
auto * current_block_with_partition = unmerged_block_with_partition.has_value() ? &unmerged_block_with_partition.value() : &block_with_partition;
std::vector<size_t> offset_idx;
for (const auto & raw_path : block_paths)
{
Expand All @@ -127,14 +135,14 @@ struct ReplicatedMergeTreeSinkImpl<async_insert>::DelayedChunk
}
std::sort(offset_idx.begin(), offset_idx.end());

auto & offsets = block_with_partition.offsets;
auto & offsets = current_block_with_partition->offsets;
size_t idx = 0, remove_count = 0;
auto it = offset_idx.begin();
std::vector<size_t> new_offsets;
std::vector<String> new_block_ids;

/// construct filter
size_t rows = block_with_partition.block.rows();
size_t rows = current_block_with_partition->block.rows();
auto filter_col = ColumnUInt8::create(rows, 1u);
ColumnUInt8::Container & vec = filter_col->getData();
UInt8 * pos = vec.data();
Expand Down Expand Up @@ -162,18 +170,21 @@ struct ReplicatedMergeTreeSinkImpl<async_insert>::DelayedChunk

LOG_TRACE(log, "New block IDs: {}, new offsets: {}, size: {}", toString(new_block_ids), toString(new_offsets), new_offsets.size());

block_with_partition.offsets = std::move(new_offsets);
current_block_with_partition->offsets = std::move(new_offsets);
block_id = std::move(new_block_ids);
auto cols = block_with_partition.block.getColumns();
auto cols = current_block_with_partition->block.getColumns();
for (auto & col : cols)
{
col = col->filter(vec, rows - remove_count);
}
block_with_partition.block.setColumns(cols);
current_block_with_partition->block.setColumns(cols);

LOG_TRACE(log, "New block rows {}", block_with_partition.block.rows());
LOG_TRACE(log, "New block rows {}", current_block_with_partition->block.rows());

initBlockIDMap();

if (unmerged_block_with_partition.has_value())
block_with_partition.block = unmerged_block_with_partition->block;
}
else
{
Expand Down Expand Up @@ -202,7 +213,7 @@ std::vector<Int64> testSelfDeduplicate(std::vector<Int64> data, std::vector<size
BlockWithPartition block1(std::move(block), Row(), std::move(offsets));
ProfileEvents::Counters profile_counters;
ReplicatedMergeTreeSinkImpl<true>::DelayedChunk::Partition part(
&Poco::Logger::get("testSelfDeduplicate"), MergeTreeDataWriter::TemporaryPart(), 0, std::move(hashes), std::move(block1), std::move(profile_counters));
&Poco::Logger::get("testSelfDeduplicate"), MergeTreeDataWriter::TemporaryPart(), 0, std::move(hashes), std::move(block1), std::nullopt, std::move(profile_counters));

part.filterSelfDuplicate();

Expand Down Expand Up @@ -235,8 +246,10 @@ namespace
{
SipHash hash;
for (size_t i = start; i < offset; ++i)
{
for (const auto & col : cols)
col->updateHashWithValue(i, hash);
}
union
{
char bytes[16];
Expand Down Expand Up @@ -432,8 +445,18 @@ void ReplicatedMergeTreeSinkImpl<async_insert>::consume(Chunk chunk)
ProfileEvents::Counters part_counters;
auto profile_events_scope = std::make_unique<ProfileEventsScope>(&part_counters);

/// Write part to the filesystem under temporary name. Calculate a checksum.
/// Some merging algorithms can modify the block, which loses the information about the async insert offsets.
/// When preprocessing or filtering data for async insert deduplication we want to use the initial, unmerged block.
std::optional<BlockWithPartition> unmerged_block;

if constexpr (async_insert)
{
/// we copy everything but offsets which we move because they are only used by async insert
if (settings.optimize_on_insert && storage.writer.getMergingMode() != MergeTreeData::MergingParams::Mode::Ordinary)
unmerged_block.emplace(Block(current_block.block), Row(current_block.partition), std::move(current_block.offsets));
}

/// Write part to the filesystem under temporary name. Calculate a checksum.
auto temp_part = storage.writer.writeTempPart(current_block, metadata_snapshot, context);

/// If optimize_on_insert setting is true, current_block could become empty after merge
Expand All @@ -446,31 +469,35 @@ void ReplicatedMergeTreeSinkImpl<async_insert>::consume(Chunk chunk)
if constexpr (async_insert)
{
/// TODO consider insert_deduplication_token
block_id = getHashesForBlocks(current_block, temp_part.part->info.partition_id);
block_id = getHashesForBlocks(unmerged_block.has_value() ? *unmerged_block : current_block, temp_part.part->info.partition_id);
LOG_TRACE(log, "async insert part, part id {}, block id {}, offsets {}, size {}", temp_part.part->info.partition_id, toString(block_id), toString(current_block.offsets), current_block.offsets.size());
}
else if (deduplicate)
else
{
String block_dedup_token;

/// We add the hash from the data and partition identifier to deduplication ID.
/// That is, do not insert the same data to the same partition twice.
if (deduplicate)
{
String block_dedup_token;

const String & dedup_token = settings.insert_deduplication_token;
if (!dedup_token.empty())
/// We add the hash from the data and partition identifier to deduplication ID.
/// That is, do not insert the same data to the same partition twice.

const String & dedup_token = settings.insert_deduplication_token;
if (!dedup_token.empty())
{
/// multiple blocks can be inserted within the same insert query
/// an ordinal number is added to dedup token to generate a distinctive block id for each block
block_dedup_token = fmt::format("{}_{}", dedup_token, chunk_dedup_seqnum);
++chunk_dedup_seqnum;
}

block_id = temp_part.part->getZeroLevelPartBlockID(block_dedup_token);
LOG_DEBUG(log, "Wrote block with ID '{}', {} rows{}", block_id, current_block.block.rows(), quorumLogMessage(replicas_num));
}
else
{
/// multiple blocks can be inserted within the same insert query
/// an ordinal number is added to dedup token to generate a distinctive block id for each block
block_dedup_token = fmt::format("{}_{}", dedup_token, chunk_dedup_seqnum);
++chunk_dedup_seqnum;
LOG_DEBUG(log, "Wrote block with {} rows{}", current_block.block.rows(), quorumLogMessage(replicas_num));
}

block_id = temp_part.part->getZeroLevelPartBlockID(block_dedup_token);
LOG_DEBUG(log, "Wrote block with ID '{}', {} rows{}", block_id, current_block.block.rows(), quorumLogMessage(replicas_num));
}
else
{
LOG_DEBUG(log, "Wrote block with {} rows{}", current_block.block.rows(), quorumLogMessage(replicas_num));
}

profile_events_scope.reset();
Expand Down Expand Up @@ -501,6 +528,7 @@ void ReplicatedMergeTreeSinkImpl<async_insert>::consume(Chunk chunk)
elapsed_ns,
std::move(block_id),
std::move(current_block),
std::move(unmerged_block),
std::move(part_counters) /// profile_events_scope must be reset here.
));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
string1
------------
string1
------------
string1
string1
string2
------------
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
#!/usr/bin/env bash
# Regression test: async inserts with deduplication on a ReplicatedCollapsingMergeTree.
# Merging algorithms (e.g. collapsing) may rewrite the inserted block on insert, which
# previously broke the per-sub-insert offsets that async insert deduplication relies on.

CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh

${CLICKHOUSE_CLIENT} -q "DROP TABLE IF EXISTS 02810_async_insert_dedup_collapsing"
${CLICKHOUSE_CLIENT} -q "CREATE TABLE 02810_async_insert_dedup_collapsing (stringvalue String, sign Int8) ENGINE = ReplicatedCollapsingMergeTree('/clickhouse/{database}/02810_async_insert_dedup', 'r1', sign) ORDER BY stringvalue"

url="${CLICKHOUSE_URL}&async_insert=1&wait_for_async_insert=1&async_insert_busy_timeout_ms=3000&async_insert_deduplicate=1"

# insert value with same key and sign so it's collapsed on insert
${CLICKHOUSE_CURL} -sS "$url" -d "INSERT INTO 02810_async_insert_dedup_collapsing VALUES ('string1', 1)" &
${CLICKHOUSE_CURL} -sS "$url" -d "INSERT INTO 02810_async_insert_dedup_collapsing VALUES ('string1', 1)" &

wait

${CLICKHOUSE_CLIENT} -q "SELECT stringvalue FROM 02810_async_insert_dedup_collapsing ORDER BY stringvalue"
${CLICKHOUSE_CLIENT} -q "SELECT '------------'"

# trigger the same collapsing algorithm, but this time the blocks should also be deduplicated
${CLICKHOUSE_CURL} -sS "$url" -d "INSERT INTO 02810_async_insert_dedup_collapsing VALUES ('string1', 1)" &
${CLICKHOUSE_CURL} -sS "$url" -d "INSERT INTO 02810_async_insert_dedup_collapsing VALUES ('string1', 1)" &

wait

${CLICKHOUSE_CLIENT} -q "SELECT stringvalue FROM 02810_async_insert_dedup_collapsing ORDER BY stringvalue"
${CLICKHOUSE_CLIENT} -q "SELECT '------------'"

# mixed case: partial overlap between concurrent inserts exercises per-offset dedup
${CLICKHOUSE_CURL} -sS "$url" -d "INSERT INTO 02810_async_insert_dedup_collapsing VALUES ('string2', 1)" &
${CLICKHOUSE_CURL} -sS "$url" -d "INSERT INTO 02810_async_insert_dedup_collapsing VALUES ('string2', 1), ('string1', 1)" &
${CLICKHOUSE_CURL} -sS "$url" -d "INSERT INTO 02810_async_insert_dedup_collapsing VALUES ('string2', 1)" &

wait

${CLICKHOUSE_CLIENT} -q "SELECT stringvalue FROM 02810_async_insert_dedup_collapsing ORDER BY stringvalue"
${CLICKHOUSE_CLIENT} -q "SELECT '------------'"

${CLICKHOUSE_CLIENT} -q "DROP TABLE 02810_async_insert_dedup_collapsing"