Skip to content

Commit

Permalink
Merge pull request #42122 from ClickHouse/backport/22.9/41741
Browse files Browse the repository at this point in the history
Backport #41741 to 22.9: Fix intersecting parts
  • Loading branch information
tavplubix committed Oct 7, 2022
2 parents 17660b2 + 4bafedc commit f1af66b
Show file tree
Hide file tree
Showing 14 changed files with 353 additions and 20 deletions.
2 changes: 1 addition & 1 deletion src/Disks/ObjectStorages/DiskObjectStorageTransaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,7 @@ void DiskObjectStorageTransaction::moveFile(const String & from_path, const Stri
throw Exception("File already exists: " + to_path, ErrorCodes::FILE_ALREADY_EXISTS);

if (!metadata_storage.exists(from_path))
throw Exception(ErrorCodes::FILE_DOESNT_EXIST, "File {} doesn't exist, cannot move", to_path);
throw Exception(ErrorCodes::FILE_DOESNT_EXIST, "File {} doesn't exist, cannot move", from_path);

tx->moveFile(from_path, to_path);
}));
Expand Down
19 changes: 17 additions & 2 deletions src/Storages/MergeTree/DataPartStorageOnDisk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ namespace ErrorCodes
extern const int DIRECTORY_ALREADY_EXISTS;
extern const int NOT_ENOUGH_SPACE;
extern const int LOGICAL_ERROR;
extern const int FILE_DOESNT_EXIST;
}

DataPartStorageOnDisk::DataPartStorageOnDisk(VolumePtr volume_, std::string root_path_, std::string part_dir_)
Expand Down Expand Up @@ -249,9 +250,23 @@ void DataPartStorageOnDisk::remove(
can_remove_description.emplace(can_remove_callback());
disk->removeSharedRecursive(fs::path(to) / "", !can_remove_description->can_remove_anything, can_remove_description->files_not_to_remove);
}
catch (...)
catch (const Exception & e)
{
if (e.code() == ErrorCodes::FILE_DOESNT_EXIST)
{
LOG_ERROR(log, "Directory {} (part to remove) doesn't exist or one of nested files has gone. Most likely this is due to manual removing. This should be discouraged. Ignoring.", fullPath(disk, from));
return;
}
throw;
}
catch (const fs::filesystem_error & e)
{
LOG_ERROR(log, "Cannot recursively remove directory {}. Exception: {}", fullPath(disk, to), getCurrentExceptionMessage(false));
if (e.code() == std::errc::no_such_file_or_directory)
{
LOG_ERROR(log, "Directory {} (part to remove) doesn't exist or one of nested files has gone. "
"Most likely this is due to manual removing. This should be discouraged. Ignoring.", fullPath(disk, from));
return;
}
throw;
}
}
Expand Down
21 changes: 16 additions & 5 deletions src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ ReplicatedMergeTreePartCheckThread::MissingPartSearchResult ReplicatedMergeTreeP
String replica_path = storage.zookeeper_path + "/replicas/" + replica;

Strings parts = zookeeper->getChildren(replica_path + "/parts");
Strings parts_found;
for (const String & part_on_replica : parts)
{
auto part_on_replica_info = MergeTreePartInfo::fromPartName(part_on_replica, storage.format_version);
Expand All @@ -174,14 +175,22 @@ ReplicatedMergeTreePartCheckThread::MissingPartSearchResult ReplicatedMergeTreeP
if (part_info.contains(part_on_replica_info))
{
if (part_on_replica_info.min_block == part_info.min_block)
{
found_part_with_the_same_min_block = true;
parts_found.push_back(part_on_replica);
}
if (part_on_replica_info.max_block == part_info.max_block)
{
found_part_with_the_same_max_block = true;
parts_found.push_back(part_on_replica);
}

if (found_part_with_the_same_min_block && found_part_with_the_same_max_block)
{
/// FIXME It may never appear
LOG_INFO(log, "Found parts with the same min block and with the same max block as the missing part {} on replica {}. Hoping that it will eventually appear as a result of a merge.", part_name, replica);
LOG_INFO(log, "Found parts with the same min block and with the same max block as the missing part {} on replica {}. "
"Hoping that it will eventually appear as a result of a merge. Parts: {}",
part_name, replica, fmt::join(parts_found, ", "));
return MissingPartSearchResult::FoundAndDontNeedFetch;
}
}
Expand Down Expand Up @@ -209,17 +218,19 @@ void ReplicatedMergeTreePartCheckThread::searchForMissingPartAndFetchIfPossible(
/// If the part is in ZooKeeper, remove it from there and add the task to download it to the queue.
if (exists_in_zookeeper)
{
/// If part found on some other replica
if (missing_part_search_result == MissingPartSearchResult::FoundAndNeedFetch)
{
LOG_WARNING(log, "Part {} exists in ZooKeeper but not locally and found on other replica. Removing from ZooKeeper and queueing a fetch.", part_name);
storage.removePartAndEnqueueFetch(part_name);
}
else /// If we have covering part on other replica or part is lost forever we don't need to fetch anything
else
{
LOG_WARNING(log, "Part {} exists in ZooKeeper but not locally and not found on other replica. Removing it from ZooKeeper.", part_name);
storage.removePartFromZooKeeper(part_name);
}

/// We cannot simply remove part from ZooKeeper, because it may be removed from virtual_part,
/// so we have to create some entry in the queue. Maybe we will execute it (by fetching part or covering part from somewhere),
/// maybe will simply replace with empty part.
storage.removePartAndEnqueueFetch(part_name);
}

ProfileEvents::increment(ProfileEvents::ReplicatedPartChecksFailed);
Expand Down
2 changes: 1 addition & 1 deletion src/Storages/StorageReplicatedMergeTree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7674,7 +7674,7 @@ std::pair<bool, NameSet> StorageReplicatedMergeTree::unlockSharedDataByID(

if (!children.empty())
{
LOG_TRACE(logger, "Found {} ({}) zookeper locks for {}", children.size(), fmt::join(children, ", "), zookeeper_part_uniq_node);
LOG_TRACE(logger, "Found {} ({}) zookeeper locks for {}", children.size(), fmt::join(children, ", "), zookeeper_part_uniq_node);
part_has_no_more_locks = false;
continue;
}
Expand Down
3 changes: 0 additions & 3 deletions tests/config/install.sh
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,6 @@ fi

if [[ -n "$USE_S3_STORAGE_FOR_MERGE_TREE" ]] && [[ "$USE_S3_STORAGE_FOR_MERGE_TREE" -eq 1 ]]; then
ln -sf $SRC_PATH/config.d/s3_storage_policy_by_default.xml $DEST_SERVER_PATH/config.d/
# Too verbose logging in S3 tests
rm -f $DEST_SERVER_PATH/config.d/logger_test.xml
ln -sf $SRC_PATH/config.d/logger_trace.xml $DEST_SERVER_PATH/config.d/
fi

ARM="aarch64"
Expand Down
14 changes: 13 additions & 1 deletion tests/integration/test_lost_part/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ def test_lost_part_same_replica(start_cluster):
node1.query("ATTACH TABLE mt0")

node1.query("SYSTEM START MERGES mt0")
res, err = node1.http_query_and_get_answer_with_error("SYSTEM SYNC REPLICA mt0")
print("result: ", res)
print("error: ", res)

for i in range(10):
result = node1.query("SELECT count() FROM system.replication_queue")
Expand Down Expand Up @@ -133,6 +136,9 @@ def test_lost_part_other_replica(start_cluster):
node1.query("CHECK TABLE mt1")

node2.query("SYSTEM START REPLICATION QUEUES")
res, err = node1.http_query_and_get_answer_with_error("SYSTEM SYNC REPLICA mt1")
print("result: ", res)
print("error: ", res)

for i in range(10):
result = node2.query("SELECT count() FROM system.replication_queue")
Expand Down Expand Up @@ -190,6 +196,9 @@ def test_lost_part_mutation(start_cluster):
node1.query("CHECK TABLE mt2")

node1.query("SYSTEM START MERGES mt2")
res, err = node1.http_query_and_get_answer_with_error("SYSTEM SYNC REPLICA mt2")
print("result: ", res)
print("error: ", res)

for i in range(10):
result = node1.query("SELECT count() FROM system.replication_queue")
Expand Down Expand Up @@ -237,10 +246,13 @@ def test_lost_last_part(start_cluster):
node1.query("CHECK TABLE mt3")

node1.query("SYSTEM START MERGES mt3")
res, err = node1.http_query_and_get_answer_with_error("SYSTEM SYNC REPLICA mt3")
print("result: ", res)
print("error: ", res)

for i in range(10):
result = node1.query("SELECT count() FROM system.replication_queue")
assert int(result) <= 1, "Have a lot of entries in queue {}".format(
assert int(result) <= 2, "Have a lot of entries in queue {}".format(
node1.query("SELECT * FROM system.replication_queue FORMAT Vertical")
)
if node1.contains_in_log("Cannot create empty part") and node1.contains_in_log(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ function drop_db()
{
while true; do
database=$($CLICKHOUSE_CLIENT -q "select name from system.databases where name like '${CLICKHOUSE_DATABASE}%' order by rand() limit 1")
if [[ "$database" == "$CLICKHOUSE_DATABASE" ]]; then return; fi
if [ -z "$database" ]; then return; fi
if [[ "$database" == "$CLICKHOUSE_DATABASE" ]]; then continue; fi
if [ -z "$database" ]; then continue; fi
$CLICKHOUSE_CLIENT -n --query \
"drop database if exists $database" 2>&1| grep -Fa "Exception: "
sleep 0.$RANDOM
Expand All @@ -38,7 +38,7 @@ function sync_db()
{
while true; do
database=$($CLICKHOUSE_CLIENT -q "select name from system.databases where name like '${CLICKHOUSE_DATABASE}%' order by rand() limit 1")
if [ -z "$database" ]; then return; fi
if [ -z "$database" ]; then continue; fi
$CLICKHOUSE_CLIENT --receive_timeout=1 -q \
"system sync database replica $database" 2>&1| grep -Fa "Exception: " | grep -Fv TIMEOUT_EXCEEDED | grep -Fv "only with Replicated engine" | grep -Fv UNKNOWN_DATABASE
sleep 0.$RANDOM
Expand All @@ -49,7 +49,7 @@ function create_table()
{
while true; do
database=$($CLICKHOUSE_CLIENT -q "select name from system.databases where name like '${CLICKHOUSE_DATABASE}%' order by rand() limit 1")
if [ -z "$database" ]; then return; fi
if [ -z "$database" ]; then continue; fi
$CLICKHOUSE_CLIENT --distributed_ddl_task_timeout=0 -q \
"create table $database.rmt_${RANDOM}_${RANDOM}_${RANDOM} (n int) engine=ReplicatedMergeTree order by tuple() -- suppress $CLICKHOUSE_TEST_ZOOKEEPER_PREFIX" \
2>&1| grep -Fa "Exception: " | grep -Fv "Macro 'uuid' and empty arguments" | grep -Fv "Cannot enqueue query" | grep -Fv "ZooKeeper session expired" | grep -Fv UNKNOWN_DATABASE
Expand All @@ -61,9 +61,9 @@ function alter_table()
{
while true; do
table=$($CLICKHOUSE_CLIENT -q "select database || '.' || name from system.tables where database like '${CLICKHOUSE_DATABASE}%' order by rand() limit 1")
if [ -z "$table" ]; then return; fi
if [ -z "$table" ]; then continue; fi
$CLICKHOUSE_CLIENT --distributed_ddl_task_timeout=0 -q \
"alter table $table on cluster $database update n = n + (select max(n) from merge(REGEXP('${CLICKHOUSE_DATABASE}.*'), '.*')) where 1 settings allow_nondeterministic_mutations=1" \
"alter table $table update n = n + (select max(n) from merge(REGEXP('${CLICKHOUSE_DATABASE}.*'), '.*')) where 1 settings allow_nondeterministic_mutations=1" \
2>&1| grep -Fa "Exception: " | grep -Fv "Cannot enqueue query" | grep -Fv "ZooKeeper session expired" | grep -Fv UNKNOWN_DATABASE | grep -Fv UNKNOWN_TABLE | grep -Fv TABLE_IS_READ_ONLY
sleep 0.$RANDOM
done
Expand All @@ -73,7 +73,7 @@ function insert()
{
while true; do
table=$($CLICKHOUSE_CLIENT -q "select database || '.' || name from system.tables where database like '${CLICKHOUSE_DATABASE}%' order by rand() limit 1")
if [ -z "$table" ]; then return; fi
if [ -z "$table" ]; then continue; fi
$CLICKHOUSE_CLIENT -q \
"insert into $table values ($RANDOM)" 2>&1| grep -Fa "Exception: " | grep -Fv UNKNOWN_DATABASE | grep -Fv UNKNOWN_TABLE | grep -Fv TABLE_IS_READ_ONLY
done
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
1 1 all_0_1_1
1 2 all_0_1_1
2 1 all_0_0_0
2 2 all_1_1_0
0
3 1 all_0_2_2
3 2 all_0_2_2
3 3 all_0_2_2
4 1 all_0_2_2
4 2 all_0_2_2
4 3 all_0_2_2
46 changes: 46 additions & 0 deletions tests/queries/0_stateless/02369_lost_part_intersecting_merges.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
#!/usr/bin/env bash
# Tags: zookeeper

CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh

$CLICKHOUSE_CLIENT -q "drop table if exists rmt1;"
$CLICKHOUSE_CLIENT -q "drop table if exists rmt2;"

$CLICKHOUSE_CLIENT -q "create table rmt1 (n int) engine=ReplicatedMergeTree('/test/02369/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/{database}', '1') order by n;"
$CLICKHOUSE_CLIENT -q "create table rmt2 (n int) engine=ReplicatedMergeTree('/test/02369/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/{database}', '2') order by n;"

$CLICKHOUSE_CLIENT -q "insert into rmt1 values (1);"
$CLICKHOUSE_CLIENT -q "insert into rmt1 values (2);"

$CLICKHOUSE_CLIENT -q "system sync replica rmt1;"
$CLICKHOUSE_CLIENT -q "system sync replica rmt2;"
$CLICKHOUSE_CLIENT -q "system stop merges rmt2;"
$CLICKHOUSE_CLIENT -q "optimize table rmt1 final;"

$CLICKHOUSE_CLIENT -q "select 1, *, _part from rmt1 order by n;"
$CLICKHOUSE_CLIENT -q "select 2, *, _part from rmt2 order by n;"

path=$($CLICKHOUSE_CLIENT -q "select path from system.parts where database='$CLICKHOUSE_DATABASE' and table='rmt1' and name='all_0_1_1'")
# ensure that path is absolute before removing
$CLICKHOUSE_CLIENT -q "select throwIf(substring('$path', 1, 1) != '/', 'Path is relative: $path')" || exit
rm -rf $path

$CLICKHOUSE_CLIENT -q "select * from rmt1;" 2>/dev/null

$CLICKHOUSE_CLIENT -q "detach table rmt1;"
$CLICKHOUSE_CLIENT -q "attach table rmt1;"

$CLICKHOUSE_CLIENT -q "insert into rmt1 values (3);"
$CLICKHOUSE_CLIENT -q "system start merges rmt2;"
$CLICKHOUSE_CLIENT -q "system sync replica rmt1;"
$CLICKHOUSE_CLIENT -q "optimize table rmt1 final;"

$CLICKHOUSE_CLIENT -q "system sync replica rmt1;"
$CLICKHOUSE_CLIENT -q "system sync replica rmt2;"
$CLICKHOUSE_CLIENT -q "select 3, *, _part from rmt1 order by n;"
$CLICKHOUSE_CLIENT -q "select 4, *, _part from rmt2 order by n;"

$CLICKHOUSE_CLIENT -q "drop table rmt1;"
$CLICKHOUSE_CLIENT -q "drop table rmt2;"
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Replication did not hang: synced all replicas of alter_table
Consistency: 1
101 changes: 101 additions & 0 deletions tests/queries/0_stateless/02396_system_parts_race_condition_rm.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
#!/usr/bin/env bash
# Tags: race, zookeeper, no-parallel, disabled

CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
# shellcheck source=./replication.lib
. "$CURDIR"/replication.lib

set -e

# NOTE this test is copy of 00992_system_parts_race_condition_zookeeper_long, but with extra thread7

$CLICKHOUSE_CLIENT -n -q "
DROP TABLE IF EXISTS alter_table0;
DROP TABLE IF EXISTS alter_table1;
CREATE TABLE alter_table0 (a UInt8, b Int16, c Float32, d String, e Array(UInt8), f Nullable(UUID), g Tuple(UInt8, UInt16)) ENGINE = ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/alter_table', 'r1') ORDER BY a PARTITION BY b % 10 SETTINGS old_parts_lifetime = 1, cleanup_delay_period = 1, cleanup_delay_period_random_add = 0;
CREATE TABLE alter_table1 (a UInt8, b Int16, c Float32, d String, e Array(UInt8), f Nullable(UUID), g Tuple(UInt8, UInt16)) ENGINE = ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/alter_table', 'r2') ORDER BY a PARTITION BY b % 10 SETTINGS old_parts_lifetime = 1, cleanup_delay_period = 1, cleanup_delay_period_random_add = 0
"

function thread1()
{
# NOTE: database = $CLICKHOUSE_DATABASE is unwanted
while true; do $CLICKHOUSE_CLIENT --query "SELECT * FROM system.parts FORMAT Null"; done
}

function thread2()
{
while true; do $CLICKHOUSE_CLIENT -n --query "ALTER TABLE alter_table0 ADD COLUMN h String DEFAULT '0'; ALTER TABLE alter_table0 MODIFY COLUMN h UInt64; ALTER TABLE alter_table0 DROP COLUMN h;"; done
}

function thread3()
{
while true; do $CLICKHOUSE_CLIENT -q "INSERT INTO alter_table0 SELECT rand(1), rand(2), 1 / rand(3), toString(rand(4)), [rand(5), rand(6)], rand(7) % 2 ? NULL : generateUUIDv4(), (rand(8), rand(9)) FROM numbers(100000)"; done
}

function thread4()
{
while true; do $CLICKHOUSE_CLIENT -q "OPTIMIZE TABLE alter_table0 FINAL"; done
}

function thread5()
{
while true; do $CLICKHOUSE_CLIENT -q "ALTER TABLE alter_table0 DELETE WHERE cityHash64(a,b,c,d,e,g) % 1048576 < 524288"; done
}

function thread7()
{
while true; do
path=$($CLICKHOUSE_CLIENT -q "SELECT path FROM system.parts WHERE database='$CLICKHOUSE_DATABASE' AND table LIKE 'alter_table%' ORDER BY rand() LIMIT 1")
if [ -z "$path" ]; then continue; fi
# ensure that path is absolute before removing
$CLICKHOUSE_CLIENT -q "select throwIf(substring('$path', 1, 1) != '/', 'Path is relative: $path') format Null" || exit
rm -rf $path 2> /dev/null
sleep 0.$RANDOM;
done
}

# https://stackoverflow.com/questions/9954794/execute-a-shell-function-with-timeout
export -f thread1;
export -f thread2;
export -f thread3;
export -f thread4;
export -f thread5;
export -f thread7;

TIMEOUT=10

timeout $TIMEOUT bash -c thread1 2> /dev/null &
timeout $TIMEOUT bash -c thread2 2> /dev/null &
timeout $TIMEOUT bash -c thread3 2> /dev/null &
timeout $TIMEOUT bash -c thread4 2> /dev/null &
timeout $TIMEOUT bash -c thread5 2> /dev/null &

timeout $TIMEOUT bash -c thread1 2> /dev/null &
timeout $TIMEOUT bash -c thread2 2> /dev/null &
timeout $TIMEOUT bash -c thread3 2> /dev/null &
timeout $TIMEOUT bash -c thread4 2> /dev/null &
timeout $TIMEOUT bash -c thread5 2> /dev/null &

timeout $TIMEOUT bash -c thread1 2> /dev/null &
timeout $TIMEOUT bash -c thread2 2> /dev/null &
timeout $TIMEOUT bash -c thread3 2> /dev/null &
timeout $TIMEOUT bash -c thread4 2> /dev/null &
timeout $TIMEOUT bash -c thread5 2> /dev/null &

timeout $TIMEOUT bash -c thread1 2> /dev/null &
timeout $TIMEOUT bash -c thread2 2> /dev/null &
timeout $TIMEOUT bash -c thread3 2> /dev/null &
timeout $TIMEOUT bash -c thread4 2> /dev/null &
timeout $TIMEOUT bash -c thread5 2> /dev/null &

timeout $TIMEOUT bash -c thread7 &

wait
check_replication_consistency "alter_table" "count(), sum(a), sum(b), round(sum(c))"

$CLICKHOUSE_CLIENT -n -q "DROP TABLE alter_table0;" 2> >(grep -F -v 'is already started to be removing by another replica right now') &
$CLICKHOUSE_CLIENT -n -q "DROP TABLE alter_table1;" 2> >(grep -F -v 'is already started to be removing by another replica right now') &
wait
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Replication did not hang: synced all replicas of alter_table_
Consistency: 1
Loading

0 comments on commit f1af66b

Please sign in to comment.