-
Notifications
You must be signed in to change notification settings - Fork 6.6k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add tests for cancelling backup/restore queries.
- Loading branch information
Showing
6 changed files
with
300 additions
and
6 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
7 changes: 7 additions & 0 deletions
7
tests/integration/test_backup_restore_new/configs/slow_backups.xml
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
<clickhouse> | ||
<backups> | ||
<test_inject_sleep>true</test_inject_sleep> | ||
</backups> | ||
<backup_threads>2</backup_threads> | ||
<restore_threads>2</restore_threads> | ||
</clickhouse> |
199 changes: 199 additions & 0 deletions
199
tests/integration/test_backup_restore_new/test_cancel_backup.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,199 @@ | ||
import pytest | ||
from helpers.cluster import ClickHouseCluster | ||
from helpers.test_tools import TSV, assert_eq_with_retry | ||
import uuid | ||
|
||
|
||
cluster = ClickHouseCluster(__file__) | ||
|
||
main_configs = [ | ||
"configs/backups_disk.xml", | ||
"configs/slow_backups.xml", | ||
] | ||
|
||
node = cluster.add_instance( | ||
"node", | ||
main_configs=main_configs, | ||
external_dirs=["/backups/"], | ||
) | ||
|
||
|
||
@pytest.fixture(scope="module", autouse=True) | ||
def start_cluster(): | ||
try: | ||
cluster.start() | ||
yield cluster | ||
finally: | ||
cluster.shutdown() | ||
|
||
|
||
@pytest.fixture(autouse=True) | ||
def drop_after_test(): | ||
try: | ||
yield | ||
finally: | ||
node.query("DROP TABLE IF EXISTS tbl SYNC") | ||
|
||
|
||
# Generate the backup name. | ||
def get_backup_name(backup_id): | ||
return f"Disk('backups', '{backup_id}')" | ||
|
||
|
||
# Start making a backup asynchronously. | ||
def start_backup(backup_id): | ||
node.query( | ||
f"BACKUP TABLE tbl TO {get_backup_name(backup_id)} SETTINGS id='{backup_id}' ASYNC" | ||
) | ||
|
||
assert ( | ||
node.query(f"SELECT status FROM system.backups WHERE id='{backup_id}'") | ||
== "CREATING_BACKUP\n" | ||
) | ||
assert ( | ||
node.query( | ||
f"SELECT count() FROM system.processes WHERE query_kind='Backup' AND query LIKE '%{backup_id}%'" | ||
) | ||
== "1\n" | ||
) | ||
|
||
|
||
# Wait for the backup to be completed. | ||
def wait_backup(backup_id): | ||
assert_eq_with_retry( | ||
node, | ||
f"SELECT status FROM system.backups WHERE id='{backup_id}'", | ||
"BACKUP_CREATED", | ||
retry_count=60, | ||
sleep_time=5, | ||
) | ||
|
||
backup_duration = int( | ||
node.query( | ||
f"SELECT end_time - start_time FROM system.backups WHERE id='{backup_id}'" | ||
) | ||
) | ||
assert backup_duration >= 3 # Backup is not expected to be too quick in this test. | ||
|
||
|
||
# Cancel the specified backup. | ||
def cancel_backup(backup_id): | ||
node.query( | ||
f"KILL QUERY WHERE query_kind='Backup' AND query LIKE '%{backup_id}%' SYNC" | ||
) | ||
assert ( | ||
node.query(f"SELECT status FROM system.backups WHERE id='{backup_id}'") | ||
== "BACKUP_FAILED\n" | ||
) | ||
expected_error = "QUERY_WAS_CANCELLED" | ||
assert expected_error in node.query( | ||
f"SELECT error FROM system.backups WHERE id='{backup_id}'" | ||
) | ||
assert ( | ||
node.query( | ||
f"SELECT count() FROM system.processes WHERE query_kind='Backup' AND query LIKE '%{backup_id}%'" | ||
) | ||
== "0\n" | ||
) | ||
node.query("SYSTEM FLUSH LOGS") | ||
kill_duration_ms = int( | ||
node.query( | ||
f"SELECT query_duration_ms FROM system.query_log WHERE query_kind='KillQuery' AND query LIKE '%{backup_id}%' AND type='QueryFinish'" | ||
) | ||
) | ||
assert kill_duration_ms < 2000 # Query must be cancelled quickly | ||
|
||
|
||
# Start restoring from a backup. | ||
def start_restore(restore_id, backup_id): | ||
node.query( | ||
f"RESTORE TABLE tbl FROM {get_backup_name(backup_id)} SETTINGS id='{restore_id}' ASYNC" | ||
) | ||
|
||
assert ( | ||
node.query(f"SELECT status FROM system.backups WHERE id='{restore_id}'") | ||
== "RESTORING\n" | ||
) | ||
assert ( | ||
node.query( | ||
f"SELECT count() FROM system.processes WHERE query_kind='Restore' AND query LIKE '%{restore_id}%'" | ||
) | ||
== "1\n" | ||
) | ||
|
||
|
||
# Wait for the restore operation to be completed. | ||
def wait_restore(restore_id): | ||
assert_eq_with_retry( | ||
node, | ||
f"SELECT status FROM system.backups WHERE id='{restore_id}'", | ||
"RESTORED", | ||
retry_count=60, | ||
sleep_time=5, | ||
) | ||
|
||
restore_duration = int( | ||
node.query( | ||
f"SELECT end_time - start_time FROM system.backups WHERE id='{restore_id}'" | ||
) | ||
) | ||
assert ( | ||
restore_duration >= 3 | ||
) # Restore is not expected to be too quick in this test. | ||
|
||
|
||
# Cancel the specified restore operation. | ||
def cancel_restore(restore_id): | ||
node.query( | ||
f"KILL QUERY WHERE query_kind='Restore' AND query LIKE '%{restore_id}%' SYNC" | ||
) | ||
assert ( | ||
node.query(f"SELECT status FROM system.backups WHERE id='{restore_id}'") | ||
== "RESTORE_FAILED\n" | ||
) | ||
expected_error = "QUERY_WAS_CANCELLED" | ||
assert expected_error in node.query( | ||
f"SELECT error FROM system.backups WHERE id='{restore_id}'" | ||
) | ||
assert ( | ||
node.query( | ||
f"SELECT count() FROM system.processes WHERE query_kind='Restore' AND query LIKE '%{restore_id}%'" | ||
) | ||
== "0\n" | ||
) | ||
node.query("SYSTEM FLUSH LOGS") | ||
kill_duration_ms = int( | ||
node.query( | ||
f"SELECT query_duration_ms FROM system.query_log WHERE query_kind='KillQuery' AND query LIKE '%{restore_id}%' AND type='QueryFinish'" | ||
) | ||
) | ||
assert kill_duration_ms < 2000 # Query must be cancelled quickly | ||
|
||
|
||
def test_cancel_backup(): | ||
# We use partitioning so backups would contain more files. | ||
node.query( | ||
"CREATE TABLE tbl (x UInt64) ENGINE=MergeTree() ORDER BY tuple() PARTITION BY x%5" | ||
) | ||
|
||
node.query(f"INSERT INTO tbl SELECT number FROM numbers(500)") | ||
|
||
try_backup_id_1 = uuid.uuid4().hex | ||
start_backup(try_backup_id_1) | ||
cancel_backup(try_backup_id_1) | ||
|
||
backup_id = uuid.uuid4().hex | ||
start_backup(backup_id) | ||
wait_backup(backup_id) | ||
|
||
node.query(f"DROP TABLE tbl SYNC") | ||
|
||
try_restore_id_1 = uuid.uuid4().hex | ||
start_restore(try_restore_id_1, backup_id) | ||
cancel_restore(try_restore_id_1) | ||
|
||
node.query(f"DROP TABLE tbl SYNC") | ||
|
||
restore_id = uuid.uuid4().hex | ||
start_restore(restore_id, backup_id) | ||
wait_restore(restore_id) |
71 changes: 71 additions & 0 deletions
71
tests/integration/test_backup_restore_on_cluster/test_kill_backup.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,71 @@ | ||
from time import sleep | ||
import pytest | ||
import re | ||
import os.path | ||
import random, string | ||
from helpers.cluster import ClickHouseCluster | ||
from helpers.test_tools import TSV, assert_eq_with_retry | ||
|
||
|
||
cluster = ClickHouseCluster(__file__) | ||
|
||
main_configs = [ | ||
"configs/inject_sleep.xml" | ||
"configs/backups_disk.xml", | ||
] | ||
|
||
node1 = cluster.add_instance( | ||
"node1", | ||
main_configs=main_configs, | ||
external_dirs=["/backups/"], | ||
) | ||
|
||
|
||
@pytest.fixture(scope="module", autouse=True) | ||
def start_cluster(): | ||
try: | ||
cluster.start() | ||
yield cluster | ||
finally: | ||
cluster.shutdown() | ||
|
||
|
||
@pytest.fixture(autouse=True) | ||
def drop_after_test(): | ||
try: | ||
yield | ||
finally: | ||
node1.query("DROP TABLE IF EXISTS tbl SYNC") | ||
|
||
|
||
backup_id_counter = 0 | ||
|
||
|
||
def new_backup_name(): | ||
global backup_id_counter | ||
backup_id_counter += 1 | ||
return f"Disk('backups', '{backup_id_counter}')" | ||
|
||
|
||
|
||
def test_kill_backup(): | ||
node1.query( | ||
"CREATE TABLE tbl (x UInt64) ENGINE=MergeTree() ORDER BY tuple()" | ||
) | ||
|
||
node1.query( | ||
"INSERT INTO tbl SELECT number as x FROM numbers(1000000)" | ||
) | ||
|
||
backup_name = new_backup_name() | ||
|
||
# Make backup on node 1. | ||
node1.query(f"BACKUP TABLE tbl TO {backup_name} ASYNC") | ||
|
||
# Drop table on both nodes. | ||
node1.query(f"DROP TABLE tbl SYNC") | ||
|
||
# Restore from backup on node2. | ||
node1.query(f"RESTORE TABLE tbl FROM {backup_name} ASYNC") | ||
|
||
node1.query("KILL QUERY WHERE query_kind='Restore') |