Merge pull request #51676 from ClickHouse/fix-async-insert-dedup-for-merging-algorithms

Fix async insert with deduplication for ReplicatedMergeTree using merging algorithms
antonio2368 committed Jul 1, 2023
2 parents 8b4fda9 + 4f59261 commit 3e8358e
Showing 5 changed files with 106 additions and 27 deletions.
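In short: with async_insert_deduplicate enabled, the sink hashes each async-insert sub-block (delimited by the offsets stored in BlockWithPartition) to build deduplication block IDs. When optimize_on_insert is on, engines whose merging algorithm is not Ordinary (Collapsing, Replacing, Summing, ...) may rewrite or drop rows inside writeTempPart(), so those offsets no longer match the written block and deduplication misbehaves. The fix keeps a copy of the unmerged block and hashes/filters against it. A condensed sketch of the new flow in ReplicatedMergeTreeSinkImpl::consume, simplified from the diff below:

    std::optional<BlockWithPartition> unmerged_block;
    if constexpr (async_insert)
    {
        /// Snapshot the block before any insert-time merge; move the offsets,
        /// since only async inserts use them.
        if (settings.optimize_on_insert
            && storage.writer.getMergingMode() != MergeTreeData::MergingParams::Mode::Ordinary)
            unmerged_block.emplace(Block(current_block.block), Row(current_block.partition),
                                   std::move(current_block.offsets));
    }

    /// writeTempPart() may collapse rows of current_block ...
    auto temp_part = storage.writer.writeTempPart(current_block, metadata_snapshot, context);

    /// ... so compute the per-sub-block hashes from the block whose offsets are still valid.
    block_id = getHashesForBlocks(unmerged_block.has_value() ? *unmerged_block : current_block,
                                  temp_part.part->info.partition_id);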
1 change: 0 additions & 1 deletion src/Processors/Transforms/ExpressionTransform.cpp
@@ -39,7 +39,6 @@ void ConvertingTransform::onConsume(Chunk chunk)
expression->execute(block, num_rows);

chunk.setColumns(block.getColumns(), num_rows);
- chunk.setChunkInfo(chunk.getChunkInfo());
cur_chunk = std::move(chunk);
}

5 changes: 5 additions & 0 deletions src/Storages/MergeTree/MergeTreeDataWriter.h
@@ -80,6 +80,11 @@ class MergeTreeDataWriter
*/
TemporaryPart writeTempPart(BlockWithPartition & block, const StorageMetadataPtr & metadata_snapshot, ContextPtr context);

+ MergeTreeData::MergingParams::Mode getMergingMode() const
+ {
+     return data.merging_params.mode;
+ }

TemporaryPart writeTempPartWithoutPrefix(BlockWithPartition & block, const StorageMetadataPtr & metadata_snapshot, int64_t block_number, ContextPtr context);

/// For insertion.
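The new getMergingMode() accessor lets the sink ask whether the target table's engine can rewrite rows during an insert-time merge; only the Ordinary mode is guaranteed to leave the inserted rows untouched. A sketch of the guard built on it (the real call site is in ReplicatedMergeTreeSink.cpp below; offsets_may_go_stale is an illustrative name, not from the diff):

    /// Offsets recorded for async insert dedup can only become stale if the
    /// engine merges on insert and its merging algorithm is not Ordinary.
    const bool offsets_may_go_stale = settings.optimize_on_insert
        && storage.writer.getMergingMode() != MergeTreeData::MergingParams::Mode::Ordinary;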
80 changes: 54 additions & 26 deletions src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp
@@ -7,6 +7,8 @@
#include <Common/SipHash.h>
#include <Common/ZooKeeper/KeeperException.h>
#include <Common/ThreadFuzzer.h>
+ #include <Storages/MergeTree/MergeAlgorithm.h>
+ #include <Storages/MergeTree/MergeTreeDataWriter.h>
#include <Storages/MergeTree/AsyncBlockIDsCache.h>
#include <DataTypes/ObjectUtils.h>
#include <Core/Block.h>
@@ -54,6 +56,9 @@ struct ReplicatedMergeTreeSinkImpl<async_insert>::DelayedChunk
UInt64 elapsed_ns;
BlockIDsType block_id;
BlockWithPartition block_with_partition;
+ /// Some merging algorithms can modify the block, which loses the information about the async insert offsets;
+ /// when preprocessing or filtering data for async insert deduplication we want to use the initial, unmerged block.
+ std::optional<BlockWithPartition> unmerged_block_with_partition;
std::unordered_map<String, std::vector<size_t>> block_id_to_offset_idx;
ProfileEvents::Counters part_counters;

@@ -63,12 +68,14 @@ struct ReplicatedMergeTreeSinkImpl<async_insert>::DelayedChunk
UInt64 elapsed_ns_,
BlockIDsType && block_id_,
BlockWithPartition && block_,
+ std::optional<BlockWithPartition> && unmerged_block_with_partition_,
ProfileEvents::Counters && part_counters_)
: log(log_),
temp_part(std::move(temp_part_)),
elapsed_ns(elapsed_ns_),
block_id(std::move(block_id_)),
block_with_partition(std::move(block_)),
+ unmerged_block_with_partition(std::move(unmerged_block_with_partition_)),
part_counters(std::move(part_counters_))
{
initBlockIDMap();
@@ -113,6 +120,7 @@ struct ReplicatedMergeTreeSinkImpl<async_insert>::DelayedChunk
{
if constexpr (async_insert)
{
+ auto * current_block_with_partition = unmerged_block_with_partition.has_value() ? &unmerged_block_with_partition.value() : &block_with_partition;
std::vector<size_t> offset_idx;
for (const auto & raw_path : block_paths)
{
@@ -127,14 +135,14 @@
}
std::sort(offset_idx.begin(), offset_idx.end());

- auto & offsets = block_with_partition.offsets;
+ auto & offsets = current_block_with_partition->offsets;
size_t idx = 0, remove_count = 0;
auto it = offset_idx.begin();
std::vector<size_t> new_offsets;
std::vector<String> new_block_ids;

/// construct filter
- size_t rows = block_with_partition.block.rows();
+ size_t rows = current_block_with_partition->block.rows();
auto filter_col = ColumnUInt8::create(rows, 1u);
ColumnUInt8::Container & vec = filter_col->getData();
UInt8 * pos = vec.data();
@@ -162,18 +170,21 @@

LOG_TRACE(log, "New block IDs: {}, new offsets: {}, size: {}", toString(new_block_ids), toString(new_offsets), new_offsets.size());

- block_with_partition.offsets = std::move(new_offsets);
+ current_block_with_partition->offsets = std::move(new_offsets);
block_id = std::move(new_block_ids);
- auto cols = block_with_partition.block.getColumns();
+ auto cols = current_block_with_partition->block.getColumns();
for (auto & col : cols)
{
col = col->filter(vec, rows - remove_count);
}
- block_with_partition.block.setColumns(cols);
+ current_block_with_partition->block.setColumns(cols);

LOG_TRACE(log, "New block rows {}", block_with_partition.block.rows());
LOG_TRACE(log, "New block rows {}", current_block_with_partition->block.rows());

initBlockIDMap();

+ if (unmerged_block_with_partition.has_value())
+     block_with_partition.block = unmerged_block_with_partition->block;
}
else
{
@@ -202,7 +213,7 @@ std::vector<Int64> testSelfDeduplicate(std::vector<Int64> data, std::vector<size
BlockWithPartition block1(std::move(block), Row(), std::move(offsets));
ProfileEvents::Counters profile_counters;
ReplicatedMergeTreeSinkImpl<true>::DelayedChunk::Partition part(
&Poco::Logger::get("testSelfDeduplicate"), MergeTreeDataWriter::TemporaryPart(), 0, std::move(hashes), std::move(block1), std::move(profile_counters));
&Poco::Logger::get("testSelfDeduplicate"), MergeTreeDataWriter::TemporaryPart(), 0, std::move(hashes), std::move(block1), std::nullopt, std::move(profile_counters));

part.filterSelfDuplicate();

@@ -235,8 +246,10 @@ namespace
{
SipHash hash;
for (size_t i = start; i < offset; ++i)
+ {
for (const auto & col : cols)
col->updateHashWithValue(i, hash);
+ }
union
{
char bytes[16];
@@ -432,8 +445,18 @@ void ReplicatedMergeTreeSinkImpl<async_insert>::consume(Chunk chunk)
ProfileEvents::Counters part_counters;
auto profile_events_scope = std::make_unique<ProfileEventsScope>(&part_counters);

- /// Write part to the filesystem under temporary name. Calculate a checksum.
+ /// Some merging algorithms can modify the block, which loses the information about the async insert offsets;
+ /// when preprocessing or filtering data for async insert deduplication we want to use the initial, unmerged block.
+ std::optional<BlockWithPartition> unmerged_block;
+
+ if constexpr (async_insert)
+ {
+     /// we copy everything but the offsets, which we move because they are only used by async inserts
+     if (settings.optimize_on_insert && storage.writer.getMergingMode() != MergeTreeData::MergingParams::Mode::Ordinary)
+         unmerged_block.emplace(Block(current_block.block), Row(current_block.partition), std::move(current_block.offsets));
+ }
+
+ /// Write part to the filesystem under temporary name. Calculate a checksum.
auto temp_part = storage.writer.writeTempPart(current_block, metadata_snapshot, context);

/// If optimize_on_insert setting is true, current_block could become empty after merge
Expand All @@ -446,31 +469,35 @@ void ReplicatedMergeTreeSinkImpl<async_insert>::consume(Chunk chunk)
if constexpr (async_insert)
{
/// TODO consider insert_deduplication_token
- block_id = getHashesForBlocks(current_block, temp_part.part->info.partition_id);
+ block_id = getHashesForBlocks(unmerged_block.has_value() ? *unmerged_block : current_block, temp_part.part->info.partition_id);
LOG_TRACE(log, "async insert part, part id {}, block id {}, offsets {}, size {}", temp_part.part->info.partition_id, toString(block_id), toString(current_block.offsets), current_block.offsets.size());
}
- else if (deduplicate)
+ else
  {
-     String block_dedup_token;
-
-     /// We add the hash from the data and partition identifier to deduplication ID.
-     /// That is, do not insert the same data to the same partition twice.
-
-     const String & dedup_token = settings.insert_deduplication_token;
-     if (!dedup_token.empty())
-     {
-         /// multiple blocks can be inserted within the same insert query
-         /// an ordinal number is added to dedup token to generate a distinctive block id for each block
-         block_dedup_token = fmt::format("{}_{}", dedup_token, chunk_dedup_seqnum);
-         ++chunk_dedup_seqnum;
-     }
-
-     block_id = temp_part.part->getZeroLevelPartBlockID(block_dedup_token);
-     LOG_DEBUG(log, "Wrote block with ID '{}', {} rows{}", block_id, current_block.block.rows(), quorumLogMessage(replicas_num));
- }
- else
- {
-     LOG_DEBUG(log, "Wrote block with {} rows{}", current_block.block.rows(), quorumLogMessage(replicas_num));
+     if (deduplicate)
+     {
+         String block_dedup_token;
+
+         /// We add the hash from the data and partition identifier to deduplication ID.
+         /// That is, do not insert the same data to the same partition twice.
+
+         const String & dedup_token = settings.insert_deduplication_token;
+         if (!dedup_token.empty())
+         {
+             /// multiple blocks can be inserted within the same insert query
+             /// an ordinal number is added to dedup token to generate a distinctive block id for each block
+             block_dedup_token = fmt::format("{}_{}", dedup_token, chunk_dedup_seqnum);
+             ++chunk_dedup_seqnum;
+         }
+
+         block_id = temp_part.part->getZeroLevelPartBlockID(block_dedup_token);
+         LOG_DEBUG(log, "Wrote block with ID '{}', {} rows{}", block_id, current_block.block.rows(), quorumLogMessage(replicas_num));
+     }
+     else
+     {
+         LOG_DEBUG(log, "Wrote block with {} rows{}", current_block.block.rows(), quorumLogMessage(replicas_num));
+     }
  }

profile_events_scope.reset();
@@ -501,6 +528,7 @@
elapsed_ns,
std::move(block_id),
std::move(current_block),
+ std::move(unmerged_block),
std::move(part_counters) /// profile_events_scope must be reset here.
));
}
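To make the offset bookkeeping in the duplicate-filtering code above concrete (the same logic exercised by filterSelfDuplicate in the test helper), here is a self-contained sketch with plain STL types; makeRowFilter is an illustrative name, not a ClickHouse function. It treats offsets[i] as one past the last row of sub-block i, just as the hashing loop above does (for (size_t i = start; i < offset; ++i)): dropping a duplicate sub-block means zeroing its row range in the filter and shifting every surviving later offset down by the number of rows removed so far.

    #include <algorithm>
    #include <cstddef>
    #include <cstdint>
    #include <iostream>
    #include <vector>

    // duplicate_idx must be sorted, mirroring the std::sort(offset_idx...) in the diff.
    std::vector<uint8_t> makeRowFilter(
        const std::vector<size_t> & offsets,
        const std::vector<size_t> & duplicate_idx,
        std::vector<size_t> & new_offsets)
    {
        size_t rows = offsets.empty() ? 0 : offsets.back();
        std::vector<uint8_t> filter(rows, 1);
        size_t removed = 0;
        auto it = duplicate_idx.begin();
        for (size_t i = 0; i < offsets.size(); ++i)
        {
            size_t start = (i == 0) ? 0 : offsets[i - 1];
            if (it != duplicate_idx.end() && *it == i)
            {
                // Zero out the duplicate sub-block's rows and remember how many we cut.
                std::fill(filter.begin() + start, filter.begin() + offsets[i], 0);
                removed += offsets[i] - start;
                ++it;
            }
            else
            {
                // Surviving sub-block: its end offset shifts down by the rows removed so far.
                new_offsets.push_back(offsets[i] - removed);
            }
        }
        return filter;
    }

    int main()
    {
        // Three sub-blocks of sizes 2, 3 and 1; the middle one is a duplicate.
        std::vector<size_t> offsets{2, 5, 6}, new_offsets;
        auto filter = makeRowFilter(offsets, {1}, new_offsets);
        for (auto f : filter)
            std::cout << int(f) << ' ';   // prints: 1 1 0 0 0 1
        std::cout << "| new offsets:";
        for (auto o : new_offsets)
            std::cout << ' ' << o;        // prints: 2 3
        std::cout << '\n';
    }

Applying the filter to every column (col->filter(vec, rows - remove_count) in the diff) then yields a block containing only the non-duplicate sub-blocks, with new_offsets describing them.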
8 changes: 8 additions & 0 deletions (new file: test reference output)
@@ -0,0 +1,8 @@
string1
------------
string1
------------
string1
string1
string2
------------
39 changes: 39 additions & 0 deletions (new file: test script)
@@ -0,0 +1,39 @@
#!/usr/bin/env bash

CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh

${CLICKHOUSE_CLIENT} -q "DROP TABLE IF EXISTS 02810_async_insert_dedup_collapsing"
${CLICKHOUSE_CLIENT} -q "CREATE TABLE 02810_async_insert_dedup_collapsing (stringvalue String, sign Int8) ENGINE = ReplicatedCollapsingMergeTree('/clickhouse/{database}/02810_async_insert_dedup', 'r1', sign) ORDER BY stringvalue"

url="${CLICKHOUSE_URL}&async_insert=1&wait_for_async_insert=1&async_insert_busy_timeout_ms=3000&async_insert_deduplicate=1"

# insert values with the same key and sign so they are collapsed on insert
${CLICKHOUSE_CURL} -sS "$url" -d "INSERT INTO 02810_async_insert_dedup_collapsing VALUES ('string1', 1)" &
${CLICKHOUSE_CURL} -sS "$url" -d "INSERT INTO 02810_async_insert_dedup_collapsing VALUES ('string1', 1)" &

wait

${CLICKHOUSE_CLIENT} -q "SELECT stringvalue FROM 02810_async_insert_dedup_collapsing ORDER BY stringvalue"
${CLICKHOUSE_CLIENT} -q "SELECT '------------'"

# trigger the same collapsing algorithm but also deduplication
${CLICKHOUSE_CURL} -sS "$url" -d "INSERT INTO 02810_async_insert_dedup_collapsing VALUES ('string1', 1)" &
${CLICKHOUSE_CURL} -sS "$url" -d "INSERT INTO 02810_async_insert_dedup_collapsing VALUES ('string1', 1)" &

wait

${CLICKHOUSE_CLIENT} -q "SELECT stringvalue FROM 02810_async_insert_dedup_collapsing ORDER BY stringvalue"
${CLICKHOUSE_CLIENT} -q "SELECT '------------'"

${CLICKHOUSE_CURL} -sS "$url" -d "INSERT INTO 02810_async_insert_dedup_collapsing VALUES ('string2', 1)" &
${CLICKHOUSE_CURL} -sS "$url" -d "INSERT INTO 02810_async_insert_dedup_collapsing VALUES ('string2', 1), ('string1', 1)" &
${CLICKHOUSE_CURL} -sS "$url" -d "INSERT INTO 02810_async_insert_dedup_collapsing VALUES ('string2', 1)" &

wait

${CLICKHOUSE_CLIENT} -q "SELECT stringvalue FROM 02810_async_insert_dedup_collapsing ORDER BY stringvalue"
${CLICKHOUSE_CLIENT} -q "SELECT '------------'"

${CLICKHOUSE_CLIENT} -q "DROP TABLE 02810_async_insert_dedup_collapsing"
