Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport #41741 to 22.9: Fix intersecting parts #42122

Merged
merged 1 commit into from
Oct 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/Disks/ObjectStorages/DiskObjectStorageTransaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,7 @@ void DiskObjectStorageTransaction::moveFile(const String & from_path, const Stri
throw Exception("File already exists: " + to_path, ErrorCodes::FILE_ALREADY_EXISTS);

if (!metadata_storage.exists(from_path))
throw Exception(ErrorCodes::FILE_DOESNT_EXIST, "File {} doesn't exist, cannot move", to_path);
throw Exception(ErrorCodes::FILE_DOESNT_EXIST, "File {} doesn't exist, cannot move", from_path);

tx->moveFile(from_path, to_path);
}));
Expand Down
19 changes: 17 additions & 2 deletions src/Storages/MergeTree/DataPartStorageOnDisk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ namespace ErrorCodes
extern const int DIRECTORY_ALREADY_EXISTS;
extern const int NOT_ENOUGH_SPACE;
extern const int LOGICAL_ERROR;
extern const int FILE_DOESNT_EXIST;
}

DataPartStorageOnDisk::DataPartStorageOnDisk(VolumePtr volume_, std::string root_path_, std::string part_dir_)
Expand Down Expand Up @@ -249,9 +250,23 @@ void DataPartStorageOnDisk::remove(
can_remove_description.emplace(can_remove_callback());
disk->removeSharedRecursive(fs::path(to) / "", !can_remove_description->can_remove_anything, can_remove_description->files_not_to_remove);
}
catch (...)
catch (const Exception & e)
{
if (e.code() == ErrorCodes::FILE_DOESNT_EXIST)
{
LOG_ERROR(log, "Directory {} (part to remove) doesn't exist or one of nested files has gone. Most likely this is due to manual removing. This should be discouraged. Ignoring.", fullPath(disk, from));
return;
}
throw;
}
catch (const fs::filesystem_error & e)
{
LOG_ERROR(log, "Cannot recursively remove directory {}. Exception: {}", fullPath(disk, to), getCurrentExceptionMessage(false));
if (e.code() == std::errc::no_such_file_or_directory)
{
LOG_ERROR(log, "Directory {} (part to remove) doesn't exist or one of nested files has gone. "
"Most likely this is due to manual removing. This should be discouraged. Ignoring.", fullPath(disk, from));
return;
}
throw;
}
}
Expand Down
21 changes: 16 additions & 5 deletions src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ ReplicatedMergeTreePartCheckThread::MissingPartSearchResult ReplicatedMergeTreeP
String replica_path = storage.zookeeper_path + "/replicas/" + replica;

Strings parts = zookeeper->getChildren(replica_path + "/parts");
Strings parts_found;
for (const String & part_on_replica : parts)
{
auto part_on_replica_info = MergeTreePartInfo::fromPartName(part_on_replica, storage.format_version);
Expand All @@ -174,14 +175,22 @@ ReplicatedMergeTreePartCheckThread::MissingPartSearchResult ReplicatedMergeTreeP
if (part_info.contains(part_on_replica_info))
{
if (part_on_replica_info.min_block == part_info.min_block)
{
found_part_with_the_same_min_block = true;
parts_found.push_back(part_on_replica);
}
if (part_on_replica_info.max_block == part_info.max_block)
{
found_part_with_the_same_max_block = true;
parts_found.push_back(part_on_replica);
}

if (found_part_with_the_same_min_block && found_part_with_the_same_max_block)
{
/// FIXME It may never appear
LOG_INFO(log, "Found parts with the same min block and with the same max block as the missing part {} on replica {}. Hoping that it will eventually appear as a result of a merge.", part_name, replica);
LOG_INFO(log, "Found parts with the same min block and with the same max block as the missing part {} on replica {}. "
"Hoping that it will eventually appear as a result of a merge. Parts: {}",
part_name, replica, fmt::join(parts_found, ", "));
return MissingPartSearchResult::FoundAndDontNeedFetch;
}
}
Expand Down Expand Up @@ -209,17 +218,19 @@ void ReplicatedMergeTreePartCheckThread::searchForMissingPartAndFetchIfPossible(
/// If the part is in ZooKeeper, remove it from there and add the task to download it to the queue.
if (exists_in_zookeeper)
{
/// If part found on some other replica
if (missing_part_search_result == MissingPartSearchResult::FoundAndNeedFetch)
{
LOG_WARNING(log, "Part {} exists in ZooKeeper but not locally and found on other replica. Removing from ZooKeeper and queueing a fetch.", part_name);
storage.removePartAndEnqueueFetch(part_name);
}
else /// If we have covering part on other replica or part is lost forever we don't need to fetch anything
else
{
LOG_WARNING(log, "Part {} exists in ZooKeeper but not locally and not found on other replica. Removing it from ZooKeeper.", part_name);
storage.removePartFromZooKeeper(part_name);
}

/// We cannot simply remove part from ZooKeeper, because it may be removed from virtual_part,
/// so we have to create some entry in the queue. Maybe we will execute it (by fetching part or covering part from somewhere),
/// maybe will simply replace with empty part.
storage.removePartAndEnqueueFetch(part_name);
}

ProfileEvents::increment(ProfileEvents::ReplicatedPartChecksFailed);
Expand Down
2 changes: 1 addition & 1 deletion src/Storages/StorageReplicatedMergeTree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7674,7 +7674,7 @@ std::pair<bool, NameSet> StorageReplicatedMergeTree::unlockSharedDataByID(

if (!children.empty())
{
LOG_TRACE(logger, "Found {} ({}) zookeper locks for {}", children.size(), fmt::join(children, ", "), zookeeper_part_uniq_node);
LOG_TRACE(logger, "Found {} ({}) zookeeper locks for {}", children.size(), fmt::join(children, ", "), zookeeper_part_uniq_node);
part_has_no_more_locks = false;
continue;
}
Expand Down
3 changes: 0 additions & 3 deletions tests/config/install.sh
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,6 @@ fi

if [[ -n "$USE_S3_STORAGE_FOR_MERGE_TREE" ]] && [[ "$USE_S3_STORAGE_FOR_MERGE_TREE" -eq 1 ]]; then
ln -sf $SRC_PATH/config.d/s3_storage_policy_by_default.xml $DEST_SERVER_PATH/config.d/
# Too verbose logging in S3 tests
rm -f $DEST_SERVER_PATH/config.d/logger_test.xml
ln -sf $SRC_PATH/config.d/logger_trace.xml $DEST_SERVER_PATH/config.d/
fi

ARM="aarch64"
Expand Down
14 changes: 13 additions & 1 deletion tests/integration/test_lost_part/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ def test_lost_part_same_replica(start_cluster):
node1.query("ATTACH TABLE mt0")

node1.query("SYSTEM START MERGES mt0")
res, err = node1.http_query_and_get_answer_with_error("SYSTEM SYNC REPLICA mt0")
print("result: ", res)
print("error: ", res)

for i in range(10):
result = node1.query("SELECT count() FROM system.replication_queue")
Expand Down Expand Up @@ -133,6 +136,9 @@ def test_lost_part_other_replica(start_cluster):
node1.query("CHECK TABLE mt1")

node2.query("SYSTEM START REPLICATION QUEUES")
res, err = node1.http_query_and_get_answer_with_error("SYSTEM SYNC REPLICA mt1")
print("result: ", res)
print("error: ", res)

for i in range(10):
result = node2.query("SELECT count() FROM system.replication_queue")
Expand Down Expand Up @@ -190,6 +196,9 @@ def test_lost_part_mutation(start_cluster):
node1.query("CHECK TABLE mt2")

node1.query("SYSTEM START MERGES mt2")
res, err = node1.http_query_and_get_answer_with_error("SYSTEM SYNC REPLICA mt2")
print("result: ", res)
print("error: ", res)

for i in range(10):
result = node1.query("SELECT count() FROM system.replication_queue")
Expand Down Expand Up @@ -237,10 +246,13 @@ def test_lost_last_part(start_cluster):
node1.query("CHECK TABLE mt3")

node1.query("SYSTEM START MERGES mt3")
res, err = node1.http_query_and_get_answer_with_error("SYSTEM SYNC REPLICA mt3")
print("result: ", res)
print("error: ", res)

for i in range(10):
result = node1.query("SELECT count() FROM system.replication_queue")
assert int(result) <= 1, "Have a lot of entries in queue {}".format(
assert int(result) <= 2, "Have a lot of entries in queue {}".format(
node1.query("SELECT * FROM system.replication_queue FORMAT Vertical")
)
if node1.contains_in_log("Cannot create empty part") and node1.contains_in_log(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ function drop_db()
{
while true; do
database=$($CLICKHOUSE_CLIENT -q "select name from system.databases where name like '${CLICKHOUSE_DATABASE}%' order by rand() limit 1")
if [[ "$database" == "$CLICKHOUSE_DATABASE" ]]; then return; fi
if [ -z "$database" ]; then return; fi
if [[ "$database" == "$CLICKHOUSE_DATABASE" ]]; then continue; fi
if [ -z "$database" ]; then continue; fi
$CLICKHOUSE_CLIENT -n --query \
"drop database if exists $database" 2>&1| grep -Fa "Exception: "
sleep 0.$RANDOM
Expand All @@ -38,7 +38,7 @@ function sync_db()
{
while true; do
database=$($CLICKHOUSE_CLIENT -q "select name from system.databases where name like '${CLICKHOUSE_DATABASE}%' order by rand() limit 1")
if [ -z "$database" ]; then return; fi
if [ -z "$database" ]; then continue; fi
$CLICKHOUSE_CLIENT --receive_timeout=1 -q \
"system sync database replica $database" 2>&1| grep -Fa "Exception: " | grep -Fv TIMEOUT_EXCEEDED | grep -Fv "only with Replicated engine" | grep -Fv UNKNOWN_DATABASE
sleep 0.$RANDOM
Expand All @@ -49,7 +49,7 @@ function create_table()
{
while true; do
database=$($CLICKHOUSE_CLIENT -q "select name from system.databases where name like '${CLICKHOUSE_DATABASE}%' order by rand() limit 1")
if [ -z "$database" ]; then return; fi
if [ -z "$database" ]; then continue; fi
$CLICKHOUSE_CLIENT --distributed_ddl_task_timeout=0 -q \
"create table $database.rmt_${RANDOM}_${RANDOM}_${RANDOM} (n int) engine=ReplicatedMergeTree order by tuple() -- suppress $CLICKHOUSE_TEST_ZOOKEEPER_PREFIX" \
2>&1| grep -Fa "Exception: " | grep -Fv "Macro 'uuid' and empty arguments" | grep -Fv "Cannot enqueue query" | grep -Fv "ZooKeeper session expired" | grep -Fv UNKNOWN_DATABASE
Expand All @@ -61,9 +61,9 @@ function alter_table()
{
while true; do
table=$($CLICKHOUSE_CLIENT -q "select database || '.' || name from system.tables where database like '${CLICKHOUSE_DATABASE}%' order by rand() limit 1")
if [ -z "$table" ]; then return; fi
if [ -z "$table" ]; then continue; fi
$CLICKHOUSE_CLIENT --distributed_ddl_task_timeout=0 -q \
"alter table $table on cluster $database update n = n + (select max(n) from merge(REGEXP('${CLICKHOUSE_DATABASE}.*'), '.*')) where 1 settings allow_nondeterministic_mutations=1" \
"alter table $table update n = n + (select max(n) from merge(REGEXP('${CLICKHOUSE_DATABASE}.*'), '.*')) where 1 settings allow_nondeterministic_mutations=1" \
2>&1| grep -Fa "Exception: " | grep -Fv "Cannot enqueue query" | grep -Fv "ZooKeeper session expired" | grep -Fv UNKNOWN_DATABASE | grep -Fv UNKNOWN_TABLE | grep -Fv TABLE_IS_READ_ONLY
sleep 0.$RANDOM
done
Expand All @@ -73,7 +73,7 @@ function insert()
{
while true; do
table=$($CLICKHOUSE_CLIENT -q "select database || '.' || name from system.tables where database like '${CLICKHOUSE_DATABASE}%' order by rand() limit 1")
if [ -z "$table" ]; then return; fi
if [ -z "$table" ]; then continue; fi
$CLICKHOUSE_CLIENT -q \
"insert into $table values ($RANDOM)" 2>&1| grep -Fa "Exception: " | grep -Fv UNKNOWN_DATABASE | grep -Fv UNKNOWN_TABLE | grep -Fv TABLE_IS_READ_ONLY
done
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
1 1 all_0_1_1
1 2 all_0_1_1
2 1 all_0_0_0
2 2 all_1_1_0
0
3 1 all_0_2_2
3 2 all_0_2_2
3 3 all_0_2_2
4 1 all_0_2_2
4 2 all_0_2_2
4 3 all_0_2_2
46 changes: 46 additions & 0 deletions tests/queries/0_stateless/02369_lost_part_intersecting_merges.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
#!/usr/bin/env bash
# Tags: zookeeper

CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh

$CLICKHOUSE_CLIENT -q "drop table if exists rmt1;"
$CLICKHOUSE_CLIENT -q "drop table if exists rmt2;"

$CLICKHOUSE_CLIENT -q "create table rmt1 (n int) engine=ReplicatedMergeTree('/test/02369/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/{database}', '1') order by n;"
$CLICKHOUSE_CLIENT -q "create table rmt2 (n int) engine=ReplicatedMergeTree('/test/02369/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/{database}', '2') order by n;"

$CLICKHOUSE_CLIENT -q "insert into rmt1 values (1);"
$CLICKHOUSE_CLIENT -q "insert into rmt1 values (2);"

$CLICKHOUSE_CLIENT -q "system sync replica rmt1;"
$CLICKHOUSE_CLIENT -q "system sync replica rmt2;"
$CLICKHOUSE_CLIENT -q "system stop merges rmt2;"
$CLICKHOUSE_CLIENT -q "optimize table rmt1 final;"

$CLICKHOUSE_CLIENT -q "select 1, *, _part from rmt1 order by n;"
$CLICKHOUSE_CLIENT -q "select 2, *, _part from rmt2 order by n;"

path=$($CLICKHOUSE_CLIENT -q "select path from system.parts where database='$CLICKHOUSE_DATABASE' and table='rmt1' and name='all_0_1_1'")
# ensure that path is absolute before removing
$CLICKHOUSE_CLIENT -q "select throwIf(substring('$path', 1, 1) != '/', 'Path is relative: $path')" || exit
rm -rf $path

$CLICKHOUSE_CLIENT -q "select * from rmt1;" 2>/dev/null

$CLICKHOUSE_CLIENT -q "detach table rmt1;"
$CLICKHOUSE_CLIENT -q "attach table rmt1;"

$CLICKHOUSE_CLIENT -q "insert into rmt1 values (3);"
$CLICKHOUSE_CLIENT -q "system start merges rmt2;"
$CLICKHOUSE_CLIENT -q "system sync replica rmt1;"
$CLICKHOUSE_CLIENT -q "optimize table rmt1 final;"

$CLICKHOUSE_CLIENT -q "system sync replica rmt1;"
$CLICKHOUSE_CLIENT -q "system sync replica rmt2;"
$CLICKHOUSE_CLIENT -q "select 3, *, _part from rmt1 order by n;"
$CLICKHOUSE_CLIENT -q "select 4, *, _part from rmt2 order by n;"

$CLICKHOUSE_CLIENT -q "drop table rmt1;"
$CLICKHOUSE_CLIENT -q "drop table rmt2;"
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Replication did not hang: synced all replicas of alter_table
Consistency: 1
101 changes: 101 additions & 0 deletions tests/queries/0_stateless/02396_system_parts_race_condition_rm.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
#!/usr/bin/env bash
# Tags: race, zookeeper, no-parallel, disabled

CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
# shellcheck source=./replication.lib
. "$CURDIR"/replication.lib

set -e

# NOTE this test is copy of 00992_system_parts_race_condition_zookeeper_long, but with extra thread7

$CLICKHOUSE_CLIENT -n -q "
DROP TABLE IF EXISTS alter_table0;
DROP TABLE IF EXISTS alter_table1;

CREATE TABLE alter_table0 (a UInt8, b Int16, c Float32, d String, e Array(UInt8), f Nullable(UUID), g Tuple(UInt8, UInt16)) ENGINE = ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/alter_table', 'r1') ORDER BY a PARTITION BY b % 10 SETTINGS old_parts_lifetime = 1, cleanup_delay_period = 1, cleanup_delay_period_random_add = 0;
CREATE TABLE alter_table1 (a UInt8, b Int16, c Float32, d String, e Array(UInt8), f Nullable(UUID), g Tuple(UInt8, UInt16)) ENGINE = ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/alter_table', 'r2') ORDER BY a PARTITION BY b % 10 SETTINGS old_parts_lifetime = 1, cleanup_delay_period = 1, cleanup_delay_period_random_add = 0
"

function thread1()
{
# NOTE: database = $CLICKHOUSE_DATABASE is unwanted
while true; do $CLICKHOUSE_CLIENT --query "SELECT * FROM system.parts FORMAT Null"; done
}

function thread2()
{
while true; do $CLICKHOUSE_CLIENT -n --query "ALTER TABLE alter_table0 ADD COLUMN h String DEFAULT '0'; ALTER TABLE alter_table0 MODIFY COLUMN h UInt64; ALTER TABLE alter_table0 DROP COLUMN h;"; done
}

function thread3()
{
while true; do $CLICKHOUSE_CLIENT -q "INSERT INTO alter_table0 SELECT rand(1), rand(2), 1 / rand(3), toString(rand(4)), [rand(5), rand(6)], rand(7) % 2 ? NULL : generateUUIDv4(), (rand(8), rand(9)) FROM numbers(100000)"; done
}

function thread4()
{
while true; do $CLICKHOUSE_CLIENT -q "OPTIMIZE TABLE alter_table0 FINAL"; done
}

function thread5()
{
while true; do $CLICKHOUSE_CLIENT -q "ALTER TABLE alter_table0 DELETE WHERE cityHash64(a,b,c,d,e,g) % 1048576 < 524288"; done
}

function thread7()
{
while true; do
path=$($CLICKHOUSE_CLIENT -q "SELECT path FROM system.parts WHERE database='$CLICKHOUSE_DATABASE' AND table LIKE 'alter_table%' ORDER BY rand() LIMIT 1")
if [ -z "$path" ]; then continue; fi
# ensure that path is absolute before removing
$CLICKHOUSE_CLIENT -q "select throwIf(substring('$path', 1, 1) != '/', 'Path is relative: $path') format Null" || exit
rm -rf $path 2> /dev/null
sleep 0.$RANDOM;
done
}

# https://stackoverflow.com/questions/9954794/execute-a-shell-function-with-timeout
export -f thread1;
export -f thread2;
export -f thread3;
export -f thread4;
export -f thread5;
export -f thread7;

TIMEOUT=10

timeout $TIMEOUT bash -c thread1 2> /dev/null &
timeout $TIMEOUT bash -c thread2 2> /dev/null &
timeout $TIMEOUT bash -c thread3 2> /dev/null &
timeout $TIMEOUT bash -c thread4 2> /dev/null &
timeout $TIMEOUT bash -c thread5 2> /dev/null &

timeout $TIMEOUT bash -c thread1 2> /dev/null &
timeout $TIMEOUT bash -c thread2 2> /dev/null &
timeout $TIMEOUT bash -c thread3 2> /dev/null &
timeout $TIMEOUT bash -c thread4 2> /dev/null &
timeout $TIMEOUT bash -c thread5 2> /dev/null &

timeout $TIMEOUT bash -c thread1 2> /dev/null &
timeout $TIMEOUT bash -c thread2 2> /dev/null &
timeout $TIMEOUT bash -c thread3 2> /dev/null &
timeout $TIMEOUT bash -c thread4 2> /dev/null &
timeout $TIMEOUT bash -c thread5 2> /dev/null &

timeout $TIMEOUT bash -c thread1 2> /dev/null &
timeout $TIMEOUT bash -c thread2 2> /dev/null &
timeout $TIMEOUT bash -c thread3 2> /dev/null &
timeout $TIMEOUT bash -c thread4 2> /dev/null &
timeout $TIMEOUT bash -c thread5 2> /dev/null &

timeout $TIMEOUT bash -c thread7 &

wait
check_replication_consistency "alter_table" "count(), sum(a), sum(b), round(sum(c))"

$CLICKHOUSE_CLIENT -n -q "DROP TABLE alter_table0;" 2> >(grep -F -v 'is already started to be removing by another replica right now') &
$CLICKHOUSE_CLIENT -n -q "DROP TABLE alter_table1;" 2> >(grep -F -v 'is already started to be removing by another replica right now') &
wait
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Replication did not hang: synced all replicas of alter_table_
Consistency: 1