Add tests for cancelling backup/restore queries.
vitlibar committed Jan 15, 2024
1 parent 3d7a17a commit 5eafd54
Showing 5 changed files with 229 additions and 6 deletions.
19 changes: 15 additions & 4 deletions src/Backups/BackupsWorker.cpp
@@ -339,10 +339,11 @@ class BackupsWorker::ThreadPools
};


BackupsWorker::BackupsWorker(ContextPtr global_context, size_t num_backup_threads, size_t num_restore_threads, bool allow_concurrent_backups_, bool allow_concurrent_restores_)
BackupsWorker::BackupsWorker(ContextPtr global_context, size_t num_backup_threads, size_t num_restore_threads, bool allow_concurrent_backups_, bool allow_concurrent_restores_, bool test_inject_sleep_)
    : thread_pools(std::make_unique<ThreadPools>(num_backup_threads, num_restore_threads))
    , allow_concurrent_backups(allow_concurrent_backups_)
    , allow_concurrent_restores(allow_concurrent_restores_)
    , test_inject_sleep(test_inject_sleep_)
    , log(&Poco::Logger::get("BackupsWorker"))
{
    backup_log = global_context->getBackupLog();
@@ -700,9 +701,10 @@ void BackupsWorker::writeBackupEntries(
if (process_list_element)
process_list_element->checkTimeLimit();

sleepForSeconds(5);

backup->writeFile(file_info, std::move(entry));

maybeSleepForTesting();

// Update metadata
if (!internal)
{
@@ -715,7 +717,6 @@ void BackupsWorker::writeBackupEntries(
backup->getCompressedSize(),
0, 0);
}

}
catch (...)
{
@@ -1022,6 +1023,9 @@ void BackupsWorker::restoreTablesData(const OperationID & restore_id, BackupPtr
process_list_element->checkTimeLimit();

std::move(task)();

maybeSleepForTesting();

setNumFilesAndSize(
restore_id,
backup->getNumFiles(),
@@ -1142,6 +1146,13 @@ void BackupsWorker::setNumFilesAndSize(const OperationID & id, size_t num_files,
}


void BackupsWorker::maybeSleepForTesting()
{
    if (test_inject_sleep)
        sleepForSeconds(1);
}


void BackupsWorker::wait(const OperationID & id, bool rethrow_exception)
{
    std::unique_lock lock{infos_mutex};
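
The idea behind the change above: writeBackupEntries() and restoreTablesData() already call process_list_element->checkTimeLimit() on every iteration, which throws QUERY_WAS_CANCELLED once the query has been killed, and the new maybeSleepForTesting() stretches each iteration by one second when backups.test_inject_sleep is enabled, so a concurrently issued KILL QUERY reliably lands while the operation is still in flight. A minimal sketch of that pattern, assuming a running integration-test node like the one configured further below:

# Sketch only; `node` is assumed to be a ClickHouseCluster instance and 'demo1' a made-up id.
backup_id = "demo1"
# Start the backup asynchronously; with test_inject_sleep enabled it stays in flight for a while.
node.query(f"BACKUP TABLE tbl TO Disk('backups', '{backup_id}') SETTINGS id='{backup_id}' ASYNC")
# Cancel it while it is still running; SYNC makes KILL QUERY wait until the query is gone.
node.query(f"KILL QUERY WHERE query_kind='Backup' AND query LIKE '%{backup_id}%' SYNC")
# The operation ends up as BACKUP_FAILED with a QUERY_WAS_CANCELLED error.
print(node.query(f"SELECT status, error FROM system.backups WHERE id='{backup_id}'"))
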
7 changes: 6 additions & 1 deletion src/Backups/BackupsWorker.h
@@ -35,7 +35,7 @@ using QueryStatusPtr = std::shared_ptr<QueryStatus>;
class BackupsWorker
{
public:
    BackupsWorker(ContextPtr global_context, size_t num_backup_threads, size_t num_restore_threads, bool allow_concurrent_backups_, bool allow_concurrent_restores_);
    BackupsWorker(ContextPtr global_context, size_t num_backup_threads, size_t num_restore_threads, bool allow_concurrent_backups_, bool allow_concurrent_restores_, bool test_inject_sleep_);
    ~BackupsWorker();

    /// Waits until all tasks have been completed.
@@ -95,11 +95,16 @@ class BackupsWorker
    enum class ThreadPoolId;
    ThreadPool & getThreadPool(ThreadPoolId thread_pool_id);

    /// Waits for some time if `test_inject_sleep` is true.
    void maybeSleepForTesting();

    class ThreadPools;
    std::unique_ptr<ThreadPools> thread_pools;

    const bool allow_concurrent_backups;
    const bool allow_concurrent_restores;
    const bool test_inject_sleep;

    Poco::Logger * log;

    std::unordered_map<BackupOperationID, BackupOperationInfo> infos;
3 changes: 2 additions & 1 deletion src/Interpreters/Context.cpp
@@ -2535,12 +2535,13 @@ BackupsWorker & Context::getBackupsWorker() const
        const auto & config = getConfigRef();
        const bool allow_concurrent_backups = config.getBool("backups.allow_concurrent_backups", true);
        const bool allow_concurrent_restores = config.getBool("backups.allow_concurrent_restores", true);
        const bool test_inject_sleep = config.getBool("backups.test_inject_sleep", false);

        const auto & settings_ref = getSettingsRef();
        UInt64 backup_threads = config.getUInt64("backup_threads", settings_ref.backup_threads);
        UInt64 restore_threads = config.getUInt64("restore_threads", settings_ref.restore_threads);

        shared->backups_worker.emplace(getGlobalContext(), backup_threads, restore_threads, allow_concurrent_backups, allow_concurrent_restores);
        shared->backups_worker.emplace(getGlobalContext(), backup_threads, restore_threads, allow_concurrent_backups, allow_concurrent_restores, test_inject_sleep);
    });

    return *shared->backups_worker;
7 changes: 7 additions & 0 deletions tests/integration/test_backup_restore_new/configs/slow_backups.xml
@@ -0,0 +1,7 @@
<clickhouse>
    <backups>
        <test_inject_sleep>true</test_inject_sleep>
    </backups>
    <backup_threads>2</backup_threads>
    <restore_threads>2</restore_threads>
</clickhouse>
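
This configuration (presumably the configs/slow_backups.xml file that the test below loads) is what makes the timing assertions in the test feasible: every backup entry written and every restore task executed picks up an extra one-second sleep, and the backup/restore thread pools are capped at two threads. A rough sanity check of the ">= 3 seconds" duration bound asserted below, under an assumed per-part file count:

# Illustrative arithmetic only; files_per_part is an assumption, not a number taken from the commit.
parts = 5             # PARTITION BY x % 5 over numbers(500) yields 5 parts
files_per_part = 2    # assume at least a couple of backup entries per part
sleep_per_file_s = 1  # injected by maybeSleepForTesting() when test_inject_sleep is true
threads = 2           # backup_threads / restore_threads from this config
print(parts * files_per_part * sleep_per_file_s / threads)  # 5.0 -- comfortably above 3 seconds
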
199 changes: 199 additions & 0 deletions tests/integration/test_backup_restore_new/test_cancel_backup.py
@@ -0,0 +1,199 @@
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import TSV, assert_eq_with_retry
import uuid


cluster = ClickHouseCluster(__file__)

main_configs = [
    "configs/backups_disk.xml",
    "configs/slow_backups.xml",
]

node = cluster.add_instance(
    "node",
    main_configs=main_configs,
    external_dirs=["/backups/"],
)


@pytest.fixture(scope="module", autouse=True)
def start_cluster():
    try:
        cluster.start()
        yield cluster
    finally:
        cluster.shutdown()


@pytest.fixture(autouse=True)
def drop_after_test():
    try:
        yield
    finally:
        node.query("DROP TABLE IF EXISTS tbl SYNC")


# Generate the backup name.
def get_backup_name(backup_id):
return f"Disk('backups', '{backup_id}')"


# Start making a backup asynchronously.
def start_backup(backup_id):
    node.query(
        f"BACKUP TABLE tbl TO {get_backup_name(backup_id)} SETTINGS id='{backup_id}' ASYNC"
    )

    assert (
        node.query(f"SELECT status FROM system.backups WHERE id='{backup_id}'")
        == "CREATING_BACKUP\n"
    )
    assert (
        node.query(
            f"SELECT count() FROM system.processes WHERE query_kind='Backup' AND query LIKE '%{backup_id}%'"
        )
        == "1\n"
    )


# Wait for the backup to be completed.
def wait_backup(backup_id):
    assert_eq_with_retry(
        node,
        f"SELECT status FROM system.backups WHERE id='{backup_id}'",
        "BACKUP_CREATED",
        retry_count=60,
        sleep_time=5,
    )

    backup_duration = int(
        node.query(
            f"SELECT end_time - start_time FROM system.backups WHERE id='{backup_id}'"
        )
    )
    assert backup_duration >= 3  # Backup is not expected to be too quick in this test.


# Cancel the specified backup.
def cancel_backup(backup_id):
    node.query(
        f"KILL QUERY WHERE query_kind='Backup' AND query LIKE '%{backup_id}%' SYNC"
    )
    assert (
        node.query(f"SELECT status FROM system.backups WHERE id='{backup_id}'")
        == "BACKUP_FAILED\n"
    )
    expected_error = "QUERY_WAS_CANCELLED"
    assert expected_error in node.query(
        f"SELECT error FROM system.backups WHERE id='{backup_id}'"
    )
    assert (
        node.query(
            f"SELECT count() FROM system.processes WHERE query_kind='Backup' AND query LIKE '%{backup_id}%'"
        )
        == "0\n"
    )
    node.query("SYSTEM FLUSH LOGS")
    kill_duration_ms = int(
        node.query(
            f"SELECT query_duration_ms FROM system.query_log WHERE query_kind='KillQuery' AND query LIKE '%{backup_id}%' AND type='QueryFinish'"
        )
    )
    assert kill_duration_ms < 2000  # Query must be cancelled quickly


# Start restoring from a backup.
def start_restore(restore_id, backup_id):
    node.query(
        f"RESTORE TABLE tbl FROM {get_backup_name(backup_id)} SETTINGS id='{restore_id}' ASYNC"
    )

    assert (
        node.query(f"SELECT status FROM system.backups WHERE id='{restore_id}'")
        == "RESTORING\n"
    )
    assert (
        node.query(
            f"SELECT count() FROM system.processes WHERE query_kind='Restore' AND query LIKE '%{restore_id}%'"
        )
        == "1\n"
    )


# Wait for the restore operation to be completed.
def wait_restore(restore_id):
    assert_eq_with_retry(
        node,
        f"SELECT status FROM system.backups WHERE id='{restore_id}'",
        "RESTORED",
        retry_count=60,
        sleep_time=5,
    )

    restore_duration = int(
        node.query(
            f"SELECT end_time - start_time FROM system.backups WHERE id='{restore_id}'"
        )
    )
    assert (
        restore_duration >= 3
    )  # Restore is not expected to be too quick in this test.


# Cancel the specified restore operation.
def cancel_restore(restore_id):
    node.query(
        f"KILL QUERY WHERE query_kind='Restore' AND query LIKE '%{restore_id}%' SYNC"
    )
    assert (
        node.query(f"SELECT status FROM system.backups WHERE id='{restore_id}'")
        == "RESTORE_FAILED\n"
    )
    expected_error = "QUERY_WAS_CANCELLED"
    assert expected_error in node.query(
        f"SELECT error FROM system.backups WHERE id='{restore_id}'"
    )
    assert (
        node.query(
            f"SELECT count() FROM system.processes WHERE query_kind='Restore' AND query LIKE '%{restore_id}%'"
        )
        == "0\n"
    )
    node.query("SYSTEM FLUSH LOGS")
    kill_duration_ms = int(
        node.query(
            f"SELECT query_duration_ms FROM system.query_log WHERE query_kind='KillQuery' AND query LIKE '%{restore_id}%' AND type='QueryFinish'"
        )
    )
    assert kill_duration_ms < 2000  # Query must be cancelled quickly


def test_cancel_backup():
    # We use partitioning so that the backup contains more files.
    node.query(
        "CREATE TABLE tbl (x UInt64) ENGINE=MergeTree() ORDER BY tuple() PARTITION BY x%5"
    )

    node.query("INSERT INTO tbl SELECT number FROM numbers(500)")

    try_backup_id_1 = uuid.uuid4().hex
    start_backup(try_backup_id_1)
    cancel_backup(try_backup_id_1)

    backup_id = uuid.uuid4().hex
    start_backup(backup_id)
    wait_backup(backup_id)

    node.query("DROP TABLE tbl SYNC")

    try_restore_id_1 = uuid.uuid4().hex
    start_restore(try_restore_id_1, backup_id)
    cancel_restore(try_restore_id_1)

    node.query("DROP TABLE tbl SYNC")

    restore_id = uuid.uuid4().hex
    start_restore(restore_id, backup_id)
    wait_restore(restore_id)
