Skip to content

Commit

Permalink
add test_delete_race_leftovers
Browse files Browse the repository at this point in the history
  • Loading branch information
vdimir committed Jul 9, 2023
1 parent 88911e1 commit 790b438
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 6 deletions.
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
<clickhouse>
<storage_configuration>
<disks>
<s3>
<s31>
<type>s3</type>
<endpoint>http://minio1:9001/root/data/</endpoint>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
</s3>
</s31>
<s32>
<type>s3</type>
<endpoint>http://minio1:9001/root/data2/</endpoint>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
</s32>
</disks>
<policies>
<two_disks>
Expand All @@ -15,10 +21,17 @@
<disk>default</disk>
</default>
<external>
<disk>s3</disk>
<disk>s31</disk>
</external>
</volumes>
</two_disks>
<one_disk>
<volumes>
<external>
<disk>s32</disk>
</external>
</volumes>
</one_disk>
</policies>
</storage_configuration>

Expand Down
91 changes: 88 additions & 3 deletions tests/integration/test_alter_moving_garbage/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def cluster():
def create_table(node, table_name, replicated, additional_settings):
settings = {
"storage_policy": "two_disks",
"old_parts_lifetime": 1,
"old_parts_lifetime": 0,
"index_granularity": 512,
"temporary_directories_lifetime": 0,
"merge_tree_clear_old_temporary_directories_interval_seconds": 1,
Expand Down Expand Up @@ -73,9 +73,13 @@ def create_table(node, table_name, replicated, additional_settings):
"allow_remote_fs_zero_copy_replication,replicated_engine",
[(False, False), (False, True), (True, True)],
)
def test_create_table(
def test_alter_moving(
cluster, allow_remote_fs_zero_copy_replication, replicated_engine
):
"""
Test that we correctly move parts during ALTER TABLE
"""

if replicated_engine:
nodes = list(cluster.instances.values())
else:
Expand Down Expand Up @@ -126,7 +130,7 @@ def alter():
partition = f"2021-01-{i:02d}"
try:
random.choice(nodes).query(
f"ALTER TABLE {table_name} MOVE PARTITION '{partition}' TO DISK 's3'",
f"ALTER TABLE {table_name} MOVE PARTITION '{partition}' TO DISK 's31'",
)
except QueryRuntimeException as e:
if "PART_IS_TEMPORARILY_LOCKED" in str(e):
Expand All @@ -153,3 +157,84 @@ def alter():
)

assert data_digest == "1000\n"


def test_delete_race_leftovers(cluster):
"""
Test that we correctly delete outdated parts and do not leave any leftovers on s3
"""

node = cluster.instances["node1"]

table_name = "test_delete_race_leftovers"
additional_settings = {
# use another disk not to interfere with other tests
"storage_policy": "one_disk",
# always remove parts in parallel
"concurrent_part_removal_threshold": 1,
}

create_table(
node, table_name, replicated=True, additional_settings=additional_settings
)

# Stop merges to have several small parts in active set
node.query(f"SYSTEM STOP MERGES {table_name}")

# Creare several small parts in one partition
for i in range(1, 11):
node.query(
f"INSERT INTO {table_name} SELECT toDate('2021-01-01'), number as id, toString(sipHash64(number, {i})) FROM numbers(10_000)"
)
table_digest_query = f"SELECT count(), sum(sipHash64(id, data)) FROM {table_name}"
table_digest = node.query(table_digest_query)

# Execute several noop deletes to have parts with updated mutation id without changes in data
# New parts will have symlinks to old parts
node.query(f"SYSTEM START MERGES {table_name}")
for i in range(10):
node.query(f"DELETE FROM {table_name} WHERE data = ''")

# Make existing parts outdated
# Also we don't want have changing parts set,
# because it will be difficult match objects on s3 and in remote_data_paths to check correctness
node.query(f"OPTIMIZE TABLE {table_name} FINAL")

inactive_parts_query = (
f"SELECT count() FROM system.parts "
f"WHERE not active AND table = '{table_name}' AND database = 'default'"
)

# Try to wait for deletion of outdated parts
# However, we do not want to wait too long
# If some parts are not deleted after several iterations, we will just continue
for i in range(20):
inactive_parts_count = int(node.query(inactive_parts_query).strip())
if inactive_parts_count == 0:
print(f"Inactive parts are deleted after {i} iterations")
break

print(f"Inactive parts count: {inactive_parts_count}")
time.sleep(5)

# Check that we correctly deleted all outdated parts and no leftovers on s3
known_remote_paths = set(
node.query(
f"SELECT remote_path FROM system.remote_data_paths WHERE disk_name = 's32'"
).splitlines()
)

all_remote_paths = set(
obj.object_name
for obj in cluster.minio_client.list_objects(
cluster.minio_bucket, "data2/", recursive=True
)
)

# Some blobs can be deleted after we listed remote_data_paths
# It's alright, thus we check only that all remote paths are known
# (in other words, all remote paths is subset of known paths)
assert all_remote_paths == {p for p in known_remote_paths if p in all_remote_paths}

# Check that we have all data
assert table_digest == node.query(table_digest_query)

0 comments on commit 790b438

Please sign in to comment.