Backport #47010 to 23.2: Fix bug in zero-copy replication disk choice during fetch #47754

Merged 1 commit on Mar 22, 2023
24 changes: 20 additions & 4 deletions src/Storages/MergeTree/DataPartsExchange.cpp
@@ -173,6 +173,7 @@ void Service::processQuery(const HTMLForm & params, ReadBuffer & /*body*/, Write
writeUUIDText(part->uuid, out);

String remote_fs_metadata = parse<String>(params.get("remote_fs_metadata", ""));

std::regex re("\\s*,\\s*");
Strings capability(
std::sregex_token_iterator(remote_fs_metadata.begin(), remote_fs_metadata.end(), re, -1),
@@ -477,6 +478,22 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchSelectedPart(

int server_protocol_version = parse<int>(in->getResponseCookie("server_protocol_version", "0"));

String remote_fs_metadata = parse<String>(in->getResponseCookie("remote_fs_metadata", ""));

DiskPtr preffered_disk = disk;

if (!preffered_disk)
{
for (const auto & disk_candidate : data.getDisks())
{
if (toString(disk_candidate->getDataSourceDescription().type) == remote_fs_metadata)
{
preffered_disk = disk_candidate;
break;
}
}
}

ReservationPtr reservation;
size_t sum_files_size = 0;
if (server_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE)
@@ -496,11 +513,12 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchSelectedPart(
LOG_TRACE(log, "Disk for fetch is not provided, reserving space using storage balanced reservation");
reservation
= data.balancedReservation(metadata_snapshot, sum_files_size, 0, part_name, part_info, {}, tagger_ptr, &ttl_infos, true);

if (!reservation)
{
LOG_TRACE(log, "Disk for fetch is not provided, reserving space using TTL rules");
reservation
= data.reserveSpacePreferringTTLRules(metadata_snapshot, sum_files_size, ttl_infos, std::time(nullptr), 0, true);
= data.reserveSpacePreferringTTLRules(metadata_snapshot, sum_files_size, ttl_infos, std::time(nullptr), 0, true, preffered_disk);
}
}
}
Expand All @@ -525,7 +543,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchSelectedPart(
if (!disk)
{
disk = reservation->getDisk();
LOG_INFO(log, "Disk for fetch is not provided, getting disk from reservation {} with type {}", disk->getName(), toString(disk->getDataSourceDescription().type));
LOG_TEST(log, "Disk for fetch is not provided, getting disk from reservation {} with type '{}'", disk->getName(), toString(disk->getDataSourceDescription().type));
}
else
{
@@ -552,8 +570,6 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchSelectedPart(
if (server_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_UUID)
readUUIDText(part_uuid, *in);

String remote_fs_metadata = parse<String>(in->getResponseCookie("remote_fs_metadata", ""));

size_t projections = 0;
if (server_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_PROJECTION)
readBinary(projections, *in);
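Taken together, the C++ change is small: Service::processQuery only gains a blank line, while on the fetch side the parsing of the remote_fs_metadata response cookie moves from after the reservation (the removal in the last hunk) to before it. When no disk is passed in explicitly, the fetcher now scans data.getDisks() for a disk whose data source type matches the remote filesystem advertised by the sender and hands that preference to reserveSpacePreferringTTLRules; the reservation log message is also downgraded from LOG_INFO to LOG_TEST. Below is a minimal sketch of the selection logic, written in Python for brevity rather than the actual C++, with stand-in names such as data_source_type and explicit_disk:

from collections import namedtuple

# Illustrative sketch of the disk-choice logic added above (not the actual C++ API).
def choose_preferred_disk(explicit_disk, remote_fs_metadata, all_disks):
    """Prefer an explicitly requested disk; otherwise take the first disk whose
    data source type matches what the sending replica advertised (e.g. "s3")."""
    if explicit_disk is not None:
        return explicit_disk
    for disk in all_disks:
        if disk.data_source_type == remote_fs_metadata:
            return disk
    # No match: the caller falls back to the balanced / TTL-based reservation.
    return None

# Example: with the cookie set to "s3", the S3-backed disk wins over the local one.
Disk = namedtuple("Disk", ["name", "data_source_type"])
disks = [Disk("default", "local"), Disk("s3", "s3")]
assert choose_preferred_disk(None, "s3", disks).name == "s3"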
1 change: 1 addition & 0 deletions tests/integration/test_zero_copy_fetch/__init__.py
@@ -0,0 +1 @@
#!/usr/bin/env python3
30 changes: 30 additions & 0 deletions tests/integration/test_zero_copy_fetch/configs/storage_conf.xml
@@ -0,0 +1,30 @@
<clickhouse>
<storage_configuration>
<disks>
<s3>
<type>s3</type>
<endpoint>http://minio1:9001/root/data/</endpoint>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
</s3>
</disks>
<policies>
<s3>
<volumes>
<default>
<disk>default</disk>
</default>
<main>
<disk>s3</disk>
<prefer_not_to_merge>False</prefer_not_to_merge>
<perform_ttl_move_on_insert>True</perform_ttl_move_on_insert>
</main>
</volumes>
</s3>
</policies>
</storage_configuration>

<merge_tree>
<allow_remote_fs_zero_copy_replication>true</allow_remote_fs_zero_copy_replication>
</merge_tree>
</clickhouse>
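The configuration above declares an S3-backed disk named s3 next to the implicit local default disk, groups them into an s3 storage policy with a default volume (local disk) and a main volume (the S3 disk), and enables zero-copy replication for MergeTree tables. A quick sanity check, sketched here rather than part of the PR, assuming a ClickHouse instance handle such as the node1 defined in test.py below:

def show_s3_policy(node):
    # List the volumes of the 's3' policy once storage_conf.xml has been loaded.
    # node is a ClickHouseCluster instance handle, e.g. node1 from test.py below.
    print(node.query(
        "SELECT policy_name, volume_name, disks FROM system.storage_policies "
        "WHERE policy_name = 's3'"
    ))
    # Expected: one row for the 'default' volume and one for the 'main' (S3) volume.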
104 changes: 104 additions & 0 deletions tests/integration/test_zero_copy_fetch/test.py
@@ -0,0 +1,104 @@
#!/usr/bin/env python3

import logging
import random
import string
import time

import pytest
from helpers.cluster import ClickHouseCluster


cluster = ClickHouseCluster(__file__)


@pytest.fixture(scope="module")
def started_cluster():
try:

cluster.add_instance(
"node1",
main_configs=["configs/storage_conf.xml"],
with_minio=True,
with_zookeeper=True,
)
cluster.add_instance(
"node2",
main_configs=["configs/storage_conf.xml"],
with_minio=True,
with_zookeeper=True,
)
cluster.start()

yield cluster
finally:
cluster.shutdown()


def test_fetch_correct_volume(started_cluster):
node1 = cluster.instances["node1"]
node2 = cluster.instances["node2"]

node1.query(
"""
CREATE TABLE test1 (EventDate Date, CounterID UInt32)
ENGINE = ReplicatedMergeTree('/clickhouse-tables/test1', 'r1')
PARTITION BY toMonday(EventDate)
ORDER BY (CounterID, EventDate)
SETTINGS index_granularity = 8192, storage_policy = 's3'"""
)

node1.query(
"INSERT INTO test1 SELECT toDate('2023-01-01') + toIntervalDay(number), number + 1000 from system.numbers limit 20"
)

def get_part_to_disk(query_result):
part_to_disk = {}
for row in query_result.strip().split("\n"):
print(row)
disk, part = row.split("\t")
part_to_disk[part] = disk
return part_to_disk

part_to_disk = get_part_to_disk(
node1.query(
"SELECT disk_name, name FROM system.parts where table = 'test1' and active"
)
)
for disk in part_to_disk.values():
assert disk == "default"

node1.query("ALTER TABLE test1 MOVE PARTITION '2022-12-26' TO DISK 's3'")
node1.query("ALTER TABLE test1 MOVE PARTITION '2023-01-02' TO DISK 's3'")
node1.query("ALTER TABLE test1 MOVE PARTITION '2023-01-09' TO DISK 's3'")

part_to_disk = get_part_to_disk(
node1.query(
"SELECT disk_name, name FROM system.parts where table = 'test1' and active"
)
)
assert part_to_disk["20221226_0_0_0"] == "s3"
assert part_to_disk["20230102_0_0_0"] == "s3"
assert part_to_disk["20230109_0_0_0"] == "s3"
assert part_to_disk["20230116_0_0_0"] == "default"

node2.query(
"""
CREATE TABLE test1 (EventDate Date, CounterID UInt32)
ENGINE = ReplicatedMergeTree('/clickhouse-tables/test1', 'r2')
PARTITION BY toMonday(EventDate)
ORDER BY (CounterID, EventDate)
SETTINGS index_granularity = 8192, storage_policy = 's3'"""
)

node2.query("SYSTEM SYNC REPLICA test1")

part_to_disk = get_part_to_disk(
node2.query(
"SELECT disk_name, name FROM system.parts where table = 'test1' and active"
)
)
assert part_to_disk["20221226_0_0_0"] == "s3"
assert part_to_disk["20230102_0_0_0"] == "s3"
assert part_to_disk["20230109_0_0_0"] == "s3"
assert part_to_disk["20230116_0_0_0"] == "default"
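A possible follow-up assertion, sketched here and not part of the original test: confirm that the disks on the fetching replica have the expected types, so the checks above really cover a fetch that lands on the S3-backed disk. This assumes the type column of system.disks reports 'local' and 's3' for the two disks.

def check_disk_types(node):
    # node is a ClickHouse instance handle, e.g. node2 above; this check would be
    # called at the end of test_fetch_correct_volume.
    rows = node.query("SELECT name, type FROM system.disks").strip().split("\n")
    types = dict(row.split("\t") for row in rows)
    assert types.get("default") == "local"
    assert types.get("s3") == "s3"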