From a882ef295f5fdad5a805aa9ed6761ba1129ff081 Mon Sep 17 00:00:00 2001 From: vdimir Date: Fri, 28 Jul 2023 16:00:01 +0000 Subject: [PATCH 1/7] Query CHECK TABLE takes care about progress and cancellation --- src/Common/FileChecker.cpp | 22 ++- src/Common/FileChecker.h | 1 + src/Interpreters/InterpreterCheckQuery.cpp | 168 +++++++++++++++----- src/Storages/CheckResults.h | 5 + src/Storages/IStorage.cpp | 8 + src/Storages/IStorage.h | 4 +- src/Storages/StorageLog.cpp | 4 +- src/Storages/StorageLog.h | 2 +- src/Storages/StorageMergeTree.cpp | 20 ++- src/Storages/StorageMergeTree.h | 2 +- src/Storages/StorageProxy.h | 3 +- src/Storages/StorageReplicatedMergeTree.cpp | 13 +- src/Storages/StorageReplicatedMergeTree.h | 2 +- src/Storages/StorageStripeLog.cpp | 5 +- src/Storages/StorageStripeLog.h | 2 +- 15 files changed, 194 insertions(+), 67 deletions(-) diff --git a/src/Common/FileChecker.cpp b/src/Common/FileChecker.cpp index 876bc4e641cd..dc3800eb29af 100644 --- a/src/Common/FileChecker.cpp +++ b/src/Common/FileChecker.cpp @@ -84,10 +84,20 @@ size_t FileChecker::getTotalSize() const CheckResults FileChecker::check() const { - if (map.empty()) - return {}; - CheckResults results; + auto callback = [&results](const CheckResult & result, size_t) -> bool + { + results.push_back(result); + return true; + }; + check(callback); + return results; +} + +void FileChecker::check(CheckDataCallback check_callback) const +{ + if (map.empty()) + return; for (const auto & name_size : map) { @@ -101,14 +111,12 @@ CheckResults FileChecker::check() const String failure_message = exists ? ("Size of " + path + " is wrong. Size is " + toString(real_size) + " but should be " + toString(name_size.second)) : ("File " + path + " doesn't exist"); - results.emplace_back(name, false, failure_message); + check_callback(CheckResult(name, false, failure_message), map.size()); break; } - results.emplace_back(name, true, ""); + check_callback(CheckResult(name, true, ""), map.size()); } - - return results; } void FileChecker::repair() diff --git a/src/Common/FileChecker.h b/src/Common/FileChecker.h index bb0383e4b563..28dc17a4610a 100644 --- a/src/Common/FileChecker.h +++ b/src/Common/FileChecker.h @@ -29,6 +29,7 @@ class FileChecker /// Check the files whose parameters are specified in sizes.json CheckResults check() const; + void check(CheckDataCallback check_callback) const; /// Truncate files that have excessive size to the expected size. /// Throw exception if the file size is less than expected. diff --git a/src/Interpreters/InterpreterCheckQuery.cpp b/src/Interpreters/InterpreterCheckQuery.cpp index 333aed848734..00d2749eaf93 100644 --- a/src/Interpreters/InterpreterCheckQuery.cpp +++ b/src/Interpreters/InterpreterCheckQuery.cpp @@ -8,6 +8,7 @@ #include #include #include +#include #include @@ -17,63 +18,158 @@ namespace DB namespace { -NamesAndTypes getBlockStructure() +Block getBlockFromCheckResult(const CheckResults & check_results, bool check_query_single_value_result) { - return { + if (check_query_single_value_result) + { + bool result = std::all_of(check_results.begin(), check_results.end(), [] (const CheckResult & res) { return res.success; }); + return Block{{ColumnUInt8::create(1, static_cast(result)), std::make_shared(), "result"}}; + } + + NamesAndTypes block_structure = NamesAndTypes{ {"part_path", std::make_shared()}, {"is_passed", std::make_shared()}, {"message", std::make_shared()}, }; -} - -} + auto path_column = block_structure[0].type->createColumn(); + auto is_passed_column = block_structure[1].type->createColumn(); + auto message_column = block_structure[2].type->createColumn(); + for (const auto & check_result : check_results) + { + path_column->insert(check_result.fs_path); + is_passed_column->insert(static_cast(check_result.success)); + message_column->insert(check_result.failure_message); + } -InterpreterCheckQuery::InterpreterCheckQuery(const ASTPtr & query_ptr_, ContextPtr context_) : WithContext(context_), query_ptr(query_ptr_) -{ + return Block({ + {std::move(path_column), block_structure[0].type, block_structure[0].name}, + {std::move(is_passed_column), block_structure[1].type, block_structure[1].name}, + {std::move(message_column), block_structure[2].type, block_structure[2].name}, + }); } - -BlockIO InterpreterCheckQuery::execute() +class TableCheckResultSource : public ISource { - const auto & check = query_ptr->as(); - auto table_id = getContext()->resolveStorageID(check, Context::ResolveOrdinary); - - getContext()->checkAccess(AccessType::SHOW_TABLES, table_id); - StoragePtr table = DatabaseCatalog::instance().getTable(table_id, getContext()); - auto check_results = table->checkData(query_ptr, getContext()); - Block block; - if (getContext()->getSettingsRef().check_query_single_value_result) +public: + explicit TableCheckResultSource(const ASTPtr & query_ptr_, StoragePtr table_, bool check_query_single_value_result_, ContextPtr context_) + : ISource(getBlockFromCheckResult({}, check_query_single_value_result_).cloneEmpty()) + , query_ptr(query_ptr_) + , table(table_) + , context(context_) + , check_query_single_value_result(check_query_single_value_result_) { - bool result = std::all_of(check_results.begin(), check_results.end(), [] (const CheckResult & res) { return res.success; }); - auto column = ColumnUInt8::create(); - column->insertValue(static_cast(result)); - block = Block{{std::move(column), std::make_shared(), "result"}}; + worker_result = std::async(std::launch::async, [this]{ worker(); }); } - else + + String getName() const override { return "TableCheckResultSource"; } + +protected: + + std::optional tryGenerate() override { - auto block_structure = getBlockStructure(); - auto path_column = block_structure[0].type->createColumn(); - auto is_passed_column = block_structure[1].type->createColumn(); - auto message_column = block_structure[2].type->createColumn(); - for (const auto & check_result : check_results) + if (is_check_completed) + return {}; + + auto status = worker_result.wait_for(std::chrono::milliseconds(100)); + is_check_completed = (status == std::future_status::ready); + + if (is_check_completed) { - path_column->insert(check_result.fs_path); - is_passed_column->insert(static_cast(check_result.success)); - message_column->insert(check_result.failure_message); + worker_result.get(); + auto result_block = getBlockFromCheckResult(check_results, check_query_single_value_result); + check_results.clear(); + return Chunk(result_block.getColumns(), result_block.rows()); } - block = Block({ - {std::move(path_column), block_structure[0].type, block_structure[0].name}, - {std::move(is_passed_column), block_structure[1].type, block_structure[1].name}, - {std::move(message_column), block_structure[2].type, block_structure[2].name}}); + std::lock_guard lock(mutex); + progress(progress_rows, 0); + progress_rows = 0; + + if (check_query_single_value_result || check_results.empty()) + { + return Chunk(); + } + + auto result_block = getBlockFromCheckResult(check_results, check_query_single_value_result); + check_results.clear(); + return Chunk(result_block.getColumns(), result_block.rows()); } - BlockIO res; - res.pipeline = QueryPipeline(std::make_shared(std::move(block))); +private: + void worker() + { + table->checkData(query_ptr, context, + [this](const CheckResult & check_result, size_t new_total_rows) + { + if (isCancelled()) + return false; + + std::lock_guard lock(mutex); + if (new_total_rows > total_rows) + { + addTotalRowsApprox(new_total_rows - total_rows); + total_rows = new_total_rows; + } + progress_rows++; + + if (!check_result.success) + { + LOG_WARNING(&Poco::Logger::get("InterpreterCheckQuery"), + "Check query for table {} failed, path {}, reason: {}", + table->getStorageID().getNameForLogs(), + check_result.fs_path, + check_result.failure_message); + } + + check_results.push_back(check_result); + + bool should_continue = check_result.success || !check_query_single_value_result; + return should_continue; + }); + } + + ASTPtr query_ptr; + StoragePtr table; + ContextPtr context; + bool check_query_single_value_result; + + std::future worker_result; + + std::mutex mutex; + CheckResults check_results; + size_t progress_rows = 0; + size_t total_rows = 0; + + bool is_check_completed = false; +}; + +} + +InterpreterCheckQuery::InterpreterCheckQuery(const ASTPtr & query_ptr_, ContextPtr context_) + : WithContext(context_) + , query_ptr(query_ptr_) +{ +} + +BlockIO InterpreterCheckQuery::execute() +{ + const auto & check = query_ptr->as(); + const auto & context = getContext(); + auto table_id = context->resolveStorageID(check, Context::ResolveOrdinary); + + context->checkAccess(AccessType::SHOW_TABLES, table_id); + StoragePtr table = DatabaseCatalog::instance().getTable(table_id, context); + + BlockIO res; + { + bool check_query_single_value_result = context->getSettingsRef().check_query_single_value_result; + auto result_source = std::make_shared(query_ptr, table, check_query_single_value_result, context); + res.pipeline = QueryPipeline(result_source); + } return res; } diff --git a/src/Storages/CheckResults.h b/src/Storages/CheckResults.h index b342b014fa47..b84262b50c98 100644 --- a/src/Storages/CheckResults.h +++ b/src/Storages/CheckResults.h @@ -22,6 +22,11 @@ struct CheckResult {} }; +/// Process single result of checkData +/// Second argument is an estimated number of check results +/// Return true to continue checking, false to stop +using CheckDataCallback = std::function; + using CheckResults = std::vector; } diff --git a/src/Storages/IStorage.cpp b/src/Storages/IStorage.cpp index ae7659e074f5..467a34ca8553 100644 --- a/src/Storages/IStorage.cpp +++ b/src/Storages/IStorage.cpp @@ -273,6 +273,14 @@ bool IStorage::isStaticStorage() const return false; } +CheckResults IStorage::checkData(const ASTPtr & query, ContextPtr context) +{ + CheckResults results; + auto callback = [&](const CheckResult & result, size_t) { results.push_back(result); return true;}; + checkData(query, context, callback); + return results; +} + void IStorage::adjustCreateQueryForBackup(ASTPtr &) const { } diff --git a/src/Storages/IStorage.h b/src/Storages/IStorage.h index ec92f57aeda5..48ebf6b1242a 100644 --- a/src/Storages/IStorage.h +++ b/src/Storages/IStorage.h @@ -596,7 +596,9 @@ class IStorage : public std::enable_shared_from_this, public TypePromo virtual bool mayBenefitFromIndexForIn(const ASTPtr & /* left_in_operand */, ContextPtr /* query_context */, const StorageMetadataPtr & /* metadata_snapshot */) const { return false; } /// Checks validity of the data - virtual CheckResults checkData(const ASTPtr & /* query */, ContextPtr /* context */) { throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Check query is not supported for {} storage", getName()); } + virtual CheckResults checkData(const ASTPtr & /* query */, ContextPtr /* context */); + + virtual void checkData(const ASTPtr & /* query */, ContextPtr /* context */, CheckDataCallback /* callback */) { throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Check query is not supported for {} storage", getName()); } /// Checks that table could be dropped right now /// Otherwise - throws an exception with detailed information. diff --git a/src/Storages/StorageLog.cpp b/src/Storages/StorageLog.cpp index 87aa71f3e8d5..bb58e05d756c 100644 --- a/src/Storages/StorageLog.cpp +++ b/src/Storages/StorageLog.cpp @@ -866,13 +866,13 @@ SinkToStoragePtr StorageLog::write(const ASTPtr & /*query*/, const StorageMetada return std::make_shared(*this, metadata_snapshot, std::move(lock)); } -CheckResults StorageLog::checkData(const ASTPtr & /* query */, ContextPtr local_context) +void StorageLog::checkData(const ASTPtr & /* query */, ContextPtr local_context, CheckDataCallback check_callback) { ReadLock lock{rwlock, getLockTimeout(local_context)}; if (!lock) throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Lock timeout exceeded"); - return file_checker.check(); + file_checker.check(check_callback); } diff --git a/src/Storages/StorageLog.h b/src/Storages/StorageLog.h index f1d05ed39ac2..f2539a6c48cb 100644 --- a/src/Storages/StorageLog.h +++ b/src/Storages/StorageLog.h @@ -59,7 +59,7 @@ class StorageLog final : public IStorage, public WithMutableContext void rename(const String & new_path_to_table_data, const StorageID & new_table_id) override; - CheckResults checkData(const ASTPtr & query, ContextPtr local_context) override; + void checkData(const ASTPtr & query, ContextPtr local_context, CheckDataCallback check_callback) override; void truncate(const ASTPtr &, const StorageMetadataPtr &, ContextPtr, TableExclusiveLockHolder &) override; diff --git a/src/Storages/StorageMergeTree.cpp b/src/Storages/StorageMergeTree.cpp index ad9013d9f131..4754412a0dec 100644 --- a/src/Storages/StorageMergeTree.cpp +++ b/src/Storages/StorageMergeTree.cpp @@ -2197,9 +2197,8 @@ void StorageMergeTree::onActionLockRemove(StorageActionBlockType action_type) background_moves_assignee.trigger(); } -CheckResults StorageMergeTree::checkData(const ASTPtr & query, ContextPtr local_context) +void StorageMergeTree::checkData(const ASTPtr & query, ContextPtr local_context, CheckDataCallback check_callback) { - CheckResults results; DataPartsVector data_parts; if (const auto & check_query = query->as(); check_query.partition) { @@ -2224,12 +2223,16 @@ CheckResults StorageMergeTree::checkData(const ASTPtr & query, ContextPtr local_ part_mutable.writeChecksums(part->checksums, local_context->getWriteSettings()); part->checkMetadata(); - results.emplace_back(part->name, true, "Checksums recounted and written to disk."); + bool should_continue = check_callback(CheckResult(part->name, true, "Checksums recounted and written to disk."), data_parts.size()); + if (!should_continue) + break; } catch (const Exception & ex) { tryLogCurrentException(log, __PRETTY_FUNCTION__); - results.emplace_back(part->name, false, "Check of part finished with error: '" + ex.message() + "'"); + bool should_continue = check_callback(CheckResult(part->name, false, "Check of part finished with error: '" + ex.message() + "'"), data_parts.size()); + if (!should_continue) + break; } } else @@ -2238,15 +2241,18 @@ CheckResults StorageMergeTree::checkData(const ASTPtr & query, ContextPtr local_ { checkDataPart(part, true); part->checkMetadata(); - results.emplace_back(part->name, true, ""); + bool should_continue = check_callback(CheckResult(part->name, true, ""), data_parts.size()); + if (!should_continue) + break; } catch (const Exception & ex) { - results.emplace_back(part->name, false, ex.message()); + bool should_continue = check_callback(CheckResult(part->name, false, ex.message()), data_parts.size()); + if (!should_continue) + break; } } } - return results; } diff --git a/src/Storages/StorageMergeTree.h b/src/Storages/StorageMergeTree.h index c77e5140d75e..6b1b1b7b4154 100644 --- a/src/Storages/StorageMergeTree.h +++ b/src/Storages/StorageMergeTree.h @@ -108,7 +108,7 @@ class StorageMergeTree final : public MergeTreeData void onActionLockRemove(StorageActionBlockType action_type) override; - CheckResults checkData(const ASTPtr & query, ContextPtr context) override; + void checkData(const ASTPtr & query, ContextPtr context, CheckDataCallback check_callback) override; bool scheduleDataProcessingJob(BackgroundJobsAssignee & assignee) override; diff --git a/src/Storages/StorageProxy.h b/src/Storages/StorageProxy.h index 21ed4b91c621..5339656782ec 100644 --- a/src/Storages/StorageProxy.h +++ b/src/Storages/StorageProxy.h @@ -149,7 +149,8 @@ class StorageProxy : public IStorage return getNested()->mayBenefitFromIndexForIn(left_in_operand, query_context, metadata_snapshot); } - CheckResults checkData(const ASTPtr & query, ContextPtr context) override { return getNested()->checkData(query, context); } + void checkData(const ASTPtr & query, ContextPtr context, CheckDataCallback check_callback) override { getNested()->checkData(query, context, check_callback); } + void checkTableCanBeDropped() const override { getNested()->checkTableCanBeDropped(); } bool storesDataOnDisk() const override { return getNested()->storesDataOnDisk(); } Strings getDataPaths() const override { return getNested()->getDataPaths(); } diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 7fce373e26bc..71fbd009d705 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -8481,9 +8481,8 @@ void StorageReplicatedMergeTree::enqueuePartForCheck(const String & part_name, t part_check_thread.enqueuePart(part_name, delay_to_check_seconds); } -CheckResults StorageReplicatedMergeTree::checkData(const ASTPtr & query, ContextPtr local_context) +void StorageReplicatedMergeTree::checkData(const ASTPtr & query, ContextPtr local_context, CheckDataCallback check_callback) { - CheckResults results; DataPartsVector data_parts; if (const auto & check_query = query->as(); check_query.partition) { @@ -8500,17 +8499,19 @@ CheckResults StorageReplicatedMergeTree::checkData(const ASTPtr & query, Context { try { - results.push_back(part_check_thread.checkPartAndFix(part->name)); + bool should_continue = check_callback(part_check_thread.checkPartAndFix(part->name), data_parts.size()); + if (!should_continue) + break; } catch (const Exception & ex) { tryLogCurrentException(log, __PRETTY_FUNCTION__); - results.emplace_back(part->name, false, "Check of part finished with error: '" + ex.message() + "'"); + bool should_continue = check_callback(CheckResult(part->name, false, "Check of part finished with error: '" + ex.message() + "'"), data_parts.size()); + if (!should_continue) + break; } } } - - return results; } diff --git a/src/Storages/StorageReplicatedMergeTree.h b/src/Storages/StorageReplicatedMergeTree.h index 78ef39f032fc..6f9853122cbb 100644 --- a/src/Storages/StorageReplicatedMergeTree.h +++ b/src/Storages/StorageReplicatedMergeTree.h @@ -230,7 +230,7 @@ class StorageReplicatedMergeTree final : public MergeTreeData /// Add a part to the queue of parts whose data you want to check in the background thread. void enqueuePartForCheck(const String & part_name, time_t delay_to_check_seconds = 0); - CheckResults checkData(const ASTPtr & query, ContextPtr context) override; + void checkData(const ASTPtr & query, ContextPtr context, CheckDataCallback check_callback) override; /// Checks ability to use granularity bool canUseAdaptiveGranularity() const override; diff --git a/src/Storages/StorageStripeLog.cpp b/src/Storages/StorageStripeLog.cpp index 0bfef5ed5e58..5a3a007035c0 100644 --- a/src/Storages/StorageStripeLog.cpp +++ b/src/Storages/StorageStripeLog.cpp @@ -403,14 +403,13 @@ SinkToStoragePtr StorageStripeLog::write(const ASTPtr & /*query*/, const Storage return std::make_shared(*this, metadata_snapshot, std::move(lock)); } - -CheckResults StorageStripeLog::checkData(const ASTPtr & /* query */, ContextPtr local_context) +void StorageStripeLog::checkData(const ASTPtr & /* query */, ContextPtr local_context, CheckDataCallback check_callback) { ReadLock lock{rwlock, getLockTimeout(local_context)}; if (!lock) throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Lock timeout exceeded"); - return file_checker.check(); + file_checker.check(check_callback); } diff --git a/src/Storages/StorageStripeLog.h b/src/Storages/StorageStripeLog.h index f889a1de71bc..61ce49f43da8 100644 --- a/src/Storages/StorageStripeLog.h +++ b/src/Storages/StorageStripeLog.h @@ -53,7 +53,7 @@ friend class StripeLogSink; void rename(const String & new_path_to_table_data, const StorageID & new_table_id) override; - CheckResults checkData(const ASTPtr & query, ContextPtr ocal_context) override; + void checkData(const ASTPtr & query, ContextPtr ocal_context, CheckDataCallback check_callback) override; bool storesDataOnDisk() const override { return true; } Strings getDataPaths() const override { return {DB::fullPath(disk, table_path)}; } From 1aedc4e89292b39d29367fc41b3485c893c08be3 Mon Sep 17 00:00:00 2001 From: vdimir Date: Tue, 1 Aug 2023 10:57:59 +0000 Subject: [PATCH 2/7] Build proper pipeline for CHECK TABLE --- src/Common/FileChecker.cpp | 48 ++--- src/Common/FileChecker.h | 38 +++- src/Interpreters/InterpreterCheckQuery.cpp | 219 +++++++++++--------- src/Storages/CheckResults.h | 7 - src/Storages/IStorage.cpp | 13 +- src/Storages/IStorage.h | 41 +++- src/Storages/StorageLog.cpp | 9 +- src/Storages/StorageLog.h | 16 +- src/Storages/StorageMergeTree.cpp | 32 +-- src/Storages/StorageMergeTree.h | 29 ++- src/Storages/StorageProxy.h | 3 +- src/Storages/StorageReplicatedMergeTree.cpp | 38 ++-- src/Storages/StorageReplicatedMergeTree.h | 31 ++- src/Storages/StorageStripeLog.cpp | 9 +- src/Storages/StorageStripeLog.h | 17 +- 15 files changed, 371 insertions(+), 179 deletions(-) diff --git a/src/Common/FileChecker.cpp b/src/Common/FileChecker.cpp index dc3800eb29af..122ea83835da 100644 --- a/src/Common/FileChecker.cpp +++ b/src/Common/FileChecker.cpp @@ -82,41 +82,35 @@ size_t FileChecker::getTotalSize() const } -CheckResults FileChecker::check() const +FileChecker::DataValidationTasksPtr FileChecker::getDataValidationTasks() { - CheckResults results; - auto callback = [&results](const CheckResult & result, size_t) -> bool - { - results.push_back(result); - return true; - }; - check(callback); - return results; + return std::make_unique(map); } -void FileChecker::check(CheckDataCallback check_callback) const +CheckResult FileChecker::checkNextEntry(DataValidationTasksPtr & check_data_tasks, bool & has_nothing_to_do) const { - if (map.empty()) - return; - - for (const auto & name_size : map) + String name; + size_t expected_size; + bool is_finished = check_data_tasks->next(name, expected_size); + if (is_finished) { - const String & name = name_size.first; - String path = parentPath(files_info_path) + name; - bool exists = fileReallyExists(path); - auto real_size = exists ? getRealFileSize(path) : 0; /// No race condition assuming no one else is working with these files. + has_nothing_to_do = true; + return {}; + } - if (real_size != name_size.second) - { - String failure_message = exists - ? ("Size of " + path + " is wrong. Size is " + toString(real_size) + " but should be " + toString(name_size.second)) - : ("File " + path + " doesn't exist"); - check_callback(CheckResult(name, false, failure_message), map.size()); - break; - } + String path = parentPath(files_info_path) + name; + bool exists = fileReallyExists(path); + auto real_size = exists ? getRealFileSize(path) : 0; /// No race condition assuming no one else is working with these files. - check_callback(CheckResult(name, true, ""), map.size()); + if (real_size != expected_size) + { + String failure_message = exists + ? ("Size of " + path + " is wrong. Size is " + toString(real_size) + " but should be " + toString(expected_size)) + : ("File " + path + " doesn't exist"); + return CheckResult(name, false, failure_message); } + + return CheckResult(name, true, ""); } void FileChecker::repair() diff --git a/src/Common/FileChecker.h b/src/Common/FileChecker.h index 28dc17a4610a..8ffc310b84dc 100644 --- a/src/Common/FileChecker.h +++ b/src/Common/FileChecker.h @@ -3,6 +3,7 @@ #include #include #include +#include namespace Poco { class Logger; } @@ -28,8 +29,11 @@ class FileChecker bool empty() const { return map.empty(); } /// Check the files whose parameters are specified in sizes.json - CheckResults check() const; - void check(CheckDataCallback check_callback) const; + /// See comment in IStorage::checkDataNext + struct DataValidationTasks; + using DataValidationTasksPtr = std::unique_ptr; + DataValidationTasksPtr getDataValidationTasks(); + CheckResult checkNextEntry(DataValidationTasksPtr & check_data_tasks, bool & has_nothing_to_do) const; /// Truncate files that have excessive size to the expected size. /// Throw exception if the file size is less than expected. @@ -42,6 +46,36 @@ class FileChecker /// Returns total size of all files. size_t getTotalSize() const; + struct DataValidationTasks + { + DataValidationTasks(const std::map & map_) + : map(map_), it(map.begin()) + {} + + bool next(String & out_name, size_t & out_size) + { + std::lock_guard lock(mutex); + if (it == map.end()) + return true; + out_name = it->first; + out_size = it->second; + ++it; + return false; + } + + size_t size() const + { + std::lock_guard lock(mutex); + return std::distance(it, map.end()); + } + + const std::map & map; + + mutable std::mutex mutex; + using Iterator = std::map::const_iterator; + Iterator it; + }; + private: void load(); diff --git a/src/Interpreters/InterpreterCheckQuery.cpp b/src/Interpreters/InterpreterCheckQuery.cpp index 00d2749eaf93..113a5d9fc90b 100644 --- a/src/Interpreters/InterpreterCheckQuery.cpp +++ b/src/Interpreters/InterpreterCheckQuery.cpp @@ -11,139 +11,133 @@ #include #include +#include + +#include +#include + namespace DB { +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; +} + namespace { -Block getBlockFromCheckResult(const CheckResults & check_results, bool check_query_single_value_result) +Block getSingleValueBlock(UInt8 value) { - if (check_query_single_value_result) - { - bool result = std::all_of(check_results.begin(), check_results.end(), [] (const CheckResult & res) { return res.success; }); - return Block{{ColumnUInt8::create(1, static_cast(result)), std::make_shared(), "result"}}; - } + return Block{{ColumnUInt8::create(1, value), std::make_shared(), "result"}}; +} - NamesAndTypes block_structure = NamesAndTypes{ +Block getHeaderForCheckResult() +{ + auto names_and_types = NamesAndTypes{ {"part_path", std::make_shared()}, {"is_passed", std::make_shared()}, {"message", std::make_shared()}, }; - auto path_column = block_structure[0].type->createColumn(); - auto is_passed_column = block_structure[1].type->createColumn(); - auto message_column = block_structure[2].type->createColumn(); - - for (const auto & check_result : check_results) - { - path_column->insert(check_result.fs_path); - is_passed_column->insert(static_cast(check_result.success)); - message_column->insert(check_result.failure_message); - } return Block({ - {std::move(path_column), block_structure[0].type, block_structure[0].name}, - {std::move(is_passed_column), block_structure[1].type, block_structure[1].name}, - {std::move(message_column), block_structure[2].type, block_structure[2].name}, + {names_and_types[0].type->createColumn(), names_and_types[0].type, names_and_types[0].name}, + {names_and_types[1].type->createColumn(), names_and_types[1].type, names_and_types[1].name}, + {names_and_types[2].type->createColumn(), names_and_types[2].type, names_and_types[2].name}, }); } -class TableCheckResultSource : public ISource +Chunk getChunkFromCheckResult(const CheckResult & check_result) +{ + MutableColumns columns = getHeaderForCheckResult().cloneEmptyColumns(); + columns[0]->insert(check_result.fs_path); + columns[1]->insert(static_cast(check_result.success)); + columns[2]->insert(check_result.failure_message); + return Chunk(std::move(columns), 1); +} + +class TableCheckWorkerProcessor : public ISource { public: - explicit TableCheckResultSource(const ASTPtr & query_ptr_, StoragePtr table_, bool check_query_single_value_result_, ContextPtr context_) - : ISource(getBlockFromCheckResult({}, check_query_single_value_result_).cloneEmpty()) - , query_ptr(query_ptr_) + TableCheckWorkerProcessor(IStorage::DataValidationTasksPtr check_data_tasks_, StoragePtr table_) + : ISource(getHeaderForCheckResult()) , table(table_) - , context(context_) - , check_query_single_value_result(check_query_single_value_result_) + , check_data_tasks(check_data_tasks_) { - worker_result = std::async(std::launch::async, [this]{ worker(); }); } - String getName() const override { return "TableCheckResultSource"; } + String getName() const override { return "TableCheckWorkerProcessor"; } protected: std::optional tryGenerate() override { - - if (is_check_completed) + bool has_nothing_to_do = false; + auto check_result = table->checkDataNext(check_data_tasks, has_nothing_to_do); + if (has_nothing_to_do) return {}; - auto status = worker_result.wait_for(std::chrono::milliseconds(100)); - is_check_completed = (status == std::future_status::ready); + /// We can omit manual `progess` call, ISource will may count it automatically by returned chunk + /// However, we want to report only rows in progress + progress(1, 0); - if (is_check_completed) + if (!check_result.success) { - worker_result.get(); - auto result_block = getBlockFromCheckResult(check_results, check_query_single_value_result); - check_results.clear(); - return Chunk(result_block.getColumns(), result_block.rows()); + LOG_WARNING(&Poco::Logger::get("InterpreterCheckQuery"), + "Check query for table {} failed, path {}, reason: {}", + table->getStorageID().getNameForLogs(), + check_result.fs_path, + check_result.failure_message); } - std::lock_guard lock(mutex); - progress(progress_rows, 0); - progress_rows = 0; - - if (check_query_single_value_result || check_results.empty()) - { - return Chunk(); - } - - auto result_block = getBlockFromCheckResult(check_results, check_query_single_value_result); - check_results.clear(); - return Chunk(result_block.getColumns(), result_block.rows()); + return getChunkFromCheckResult(check_result); } private: - void worker() + StoragePtr table; + IStorage::DataValidationTasksPtr check_data_tasks; +}; + +class TableCheckResultEmitter : public IAccumulatingTransform +{ +public: + TableCheckResultEmitter() : IAccumulatingTransform(getHeaderForCheckResult(), getSingleValueBlock(1).cloneEmpty()) {} + + String getName() const override { return "TableCheckResultEmitter"; } + + void consume(Chunk chunk) override { - table->checkData(query_ptr, context, - [this](const CheckResult & check_result, size_t new_total_rows) - { - if (isCancelled()) - return false; - - std::lock_guard lock(mutex); - if (new_total_rows > total_rows) - { - addTotalRowsApprox(new_total_rows - total_rows); - total_rows = new_total_rows; - } - progress_rows++; - - if (!check_result.success) - { - LOG_WARNING(&Poco::Logger::get("InterpreterCheckQuery"), - "Check query for table {} failed, path {}, reason: {}", - table->getStorageID().getNameForLogs(), - check_result.fs_path, - check_result.failure_message); - } - - check_results.push_back(check_result); - - bool should_continue = check_result.success || !check_query_single_value_result; - return should_continue; - }); - } + if (result_value == 0) + return; - ASTPtr query_ptr; - StoragePtr table; - ContextPtr context; - bool check_query_single_value_result; + auto columns = chunk.getColumns(); + if (columns.size() != 3) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Wrong number of columns: {}", columns.size()); - std::future worker_result; + const auto * col = checkAndGetColumn(columns[1].get()); + for (size_t i = 0; i < col->size(); ++i) + { + if (col->getElement(i) == 0) + { + result_value = 0; + return; + } + } + } - std::mutex mutex; - CheckResults check_results; - size_t progress_rows = 0; - size_t total_rows = 0; + Chunk generate() override + { + if (is_valuer_emitted.exchange(true)) + return {}; + auto block = getSingleValueBlock(result_value); + return Chunk(block.getColumns(), block.rows()); + } - bool is_check_completed = false; +private: + std::atomic result_value{1}; + std::atomic_bool is_valuer_emitted{false}; }; } @@ -154,7 +148,6 @@ InterpreterCheckQuery::InterpreterCheckQuery(const ASTPtr & query_ptr_, ContextP { } - BlockIO InterpreterCheckQuery::execute() { const auto & check = query_ptr->as(); @@ -164,11 +157,51 @@ BlockIO InterpreterCheckQuery::execute() context->checkAccess(AccessType::SHOW_TABLES, table_id); StoragePtr table = DatabaseCatalog::instance().getTable(table_id, context); + auto check_data_tasks = table->getCheckTaskList(query_ptr, context); + + const auto & settings = context->getSettingsRef(); + BlockIO res; { - bool check_query_single_value_result = context->getSettingsRef().check_query_single_value_result; - auto result_source = std::make_shared(query_ptr, table, check_query_single_value_result, context); - res.pipeline = QueryPipeline(result_source); + auto processors = std::make_shared(); + + std::vector worker_ports; + + size_t num_streams = std::max(1, settings.max_threads); + + for (size_t i = 0; i < num_streams; ++i) + { + auto worker_processor = std::make_shared(check_data_tasks, table); + if (i == 0) + worker_processor->addTotalRowsApprox(check_data_tasks->size()); + worker_ports.emplace_back(&worker_processor->getPort()); + processors->emplace_back(worker_processor); + } + + OutputPort * resize_outport; + { + auto resize_processor = std::make_shared(getHeaderForCheckResult(), worker_ports.size(), 1); + + auto & resize_inputs = resize_processor->getInputs(); + auto resize_inport_it = resize_inputs.begin(); + for (size_t i = 0; i < worker_ports.size(); ++i, ++resize_inport_it) + connect(*worker_ports[i], *resize_inport_it); + + resize_outport = &resize_processor->getOutputs().front(); + processors->emplace_back(resize_processor); + } + + if (settings.check_query_single_value_result) + { + auto emitter_processor = std::make_shared(); + auto input_port = &emitter_processor->getInputPort(); + processors->emplace_back(emitter_processor); + + connect(*resize_outport, *input_port); + } + + res.pipeline = QueryPipeline(Pipe(std::move(processors))); + res.pipeline.setNumThreads(num_streams); } return res; } diff --git a/src/Storages/CheckResults.h b/src/Storages/CheckResults.h index b84262b50c98..2e4652fea293 100644 --- a/src/Storages/CheckResults.h +++ b/src/Storages/CheckResults.h @@ -22,11 +22,4 @@ struct CheckResult {} }; -/// Process single result of checkData -/// Second argument is an estimated number of check results -/// Return true to continue checking, false to stop -using CheckDataCallback = std::function; - -using CheckResults = std::vector; - } diff --git a/src/Storages/IStorage.cpp b/src/Storages/IStorage.cpp index 467a34ca8553..85299f63165a 100644 --- a/src/Storages/IStorage.cpp +++ b/src/Storages/IStorage.cpp @@ -273,12 +273,15 @@ bool IStorage::isStaticStorage() const return false; } -CheckResults IStorage::checkData(const ASTPtr & query, ContextPtr context) +IStorage::DataValidationTasksPtr IStorage::getCheckTaskList(const ASTPtr & /* query */, ContextPtr /* context */) { - CheckResults results; - auto callback = [&](const CheckResult & result, size_t) { results.push_back(result); return true;}; - checkData(query, context, callback); - return results; + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Check query is not supported for {} storage", getName()); +} + +CheckResult IStorage::checkDataNext(DataValidationTasksPtr & /* check_task_list */, bool & has_nothing_to_do) +{ + has_nothing_to_do = true; + return {}; } void IStorage::adjustCreateQueryForBackup(ASTPtr &) const diff --git a/src/Storages/IStorage.h b/src/Storages/IStorage.h index 48ebf6b1242a..b1e20c557821 100644 --- a/src/Storages/IStorage.h +++ b/src/Storages/IStorage.h @@ -595,10 +595,45 @@ class IStorage : public std::enable_shared_from_this, public TypePromo /// Provides a hint that the storage engine may evaluate the IN-condition by using an index. virtual bool mayBenefitFromIndexForIn(const ASTPtr & /* left_in_operand */, ContextPtr /* query_context */, const StorageMetadataPtr & /* metadata_snapshot */) const { return false; } - /// Checks validity of the data - virtual CheckResults checkData(const ASTPtr & /* query */, ContextPtr /* context */); - virtual void checkData(const ASTPtr & /* query */, ContextPtr /* context */, CheckDataCallback /* callback */) { throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Check query is not supported for {} storage", getName()); } + /** A list of tasks to check a validity of data. + * Each IStorage implementation may interpret this task in its own way. + * E.g. for some storages it to check data it need to check a list of files in filesystem, for others it can be a list of parts. + * Also it may hold resources (e.g. locks) required during check. + */ + struct DataValidationTasksBase + { + /// Number of entries left to check. + /// It decreases after each call to checkDataNext(). + virtual size_t size() const = 0; + virtual ~DataValidationTasksBase() = default; + }; + + using DataValidationTasksPtr = std::shared_ptr; + + virtual DataValidationTasksPtr getCheckTaskList(const ASTPtr & /* query */, ContextPtr /* context */); + + /** Executes one task from the list. + * If no tasks left, sets has_nothing_to_do to true. + * Note: Function `checkDataNext` is accessing `check_task_list` thread-safely, + * and can be called simultaneously for the same `getCheckTaskList` result + * to process different tasks in parallel. + * Usage: + * + * auto check_task_list = storage.getCheckTaskList(query, context); + * size_t total_tasks = check_task_list->size(); + * while (true) + * { + * size_t tasks_left = check_task_list->size(); + * std::cout << "Checking data: " << (total_tasks - tasks_left) << " / " << total_tasks << " tasks done." << std::endl; + * bool has_nothing_to_do = false; + * auto result = storage.checkDataNext(check_task_list, has_nothing_to_do); + * if (has_nothing_to_do) + * break; + * doSomething(result); + * } + */ + virtual CheckResult checkDataNext(DataValidationTasksPtr & check_task_list, bool & has_nothing_to_do); /// Checks that table could be dropped right now /// Otherwise - throws an exception with detailed information. diff --git a/src/Storages/StorageLog.cpp b/src/Storages/StorageLog.cpp index bb58e05d756c..6fb786927258 100644 --- a/src/Storages/StorageLog.cpp +++ b/src/Storages/StorageLog.cpp @@ -866,15 +866,18 @@ SinkToStoragePtr StorageLog::write(const ASTPtr & /*query*/, const StorageMetada return std::make_shared(*this, metadata_snapshot, std::move(lock)); } -void StorageLog::checkData(const ASTPtr & /* query */, ContextPtr local_context, CheckDataCallback check_callback) +IStorage::DataValidationTasksPtr StorageLog::getCheckTaskList(const ASTPtr & /* query */, ContextPtr local_context) { ReadLock lock{rwlock, getLockTimeout(local_context)}; if (!lock) throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Lock timeout exceeded"); - - file_checker.check(check_callback); + return std::make_unique(file_checker.getDataValidationTasks(), std::move(lock)); } +CheckResult StorageLog::checkDataNext(DataValidationTasksPtr & check_task_list, bool & has_nothing_to_do) +{ + return file_checker.checkNextEntry(static_cast(check_task_list.get())->file_checker_tasks, has_nothing_to_do); +} IStorage::ColumnSizeByName StorageLog::getColumnSizes() const { diff --git a/src/Storages/StorageLog.h b/src/Storages/StorageLog.h index f2539a6c48cb..95f95088aa2e 100644 --- a/src/Storages/StorageLog.h +++ b/src/Storages/StorageLog.h @@ -59,7 +59,8 @@ class StorageLog final : public IStorage, public WithMutableContext void rename(const String & new_path_to_table_data, const StorageID & new_table_id) override; - void checkData(const ASTPtr & query, ContextPtr local_context, CheckDataCallback check_callback) override; + DataValidationTasksPtr getCheckTaskList(const ASTPtr & query, ContextPtr context) override; + CheckResult checkDataNext(DataValidationTasksPtr & check_task_list, bool & has_nothing_to_do) override; void truncate(const ASTPtr &, const StorageMetadataPtr &, ContextPtr, TableExclusiveLockHolder &) override; @@ -142,6 +143,19 @@ class StorageLog final : public IStorage, public WithMutableContext std::atomic total_rows = 0; std::atomic total_bytes = 0; + struct DataValidationTasks : public IStorage::DataValidationTasksBase + { + DataValidationTasks(FileChecker::DataValidationTasksPtr file_checker_tasks_, ReadLock && lock_) + : file_checker_tasks(std::move(file_checker_tasks_)), lock(std::move(lock_)) + {} + + size_t size() const override { return file_checker_tasks->size(); } + + FileChecker::DataValidationTasksPtr file_checker_tasks; + /// Lock to prevent table modification while checking + ReadLock lock; + }; + FileChecker file_checker; const size_t max_compress_block_size; diff --git a/src/Storages/StorageMergeTree.cpp b/src/Storages/StorageMergeTree.cpp index 4754412a0dec..4593ed6c84a3 100644 --- a/src/Storages/StorageMergeTree.cpp +++ b/src/Storages/StorageMergeTree.cpp @@ -2197,7 +2197,7 @@ void StorageMergeTree::onActionLockRemove(StorageActionBlockType action_type) background_moves_assignee.trigger(); } -void StorageMergeTree::checkData(const ASTPtr & query, ContextPtr local_context, CheckDataCallback check_callback) +IStorage::DataValidationTasksPtr StorageMergeTree::getCheckTaskList(const ASTPtr & query, ContextPtr local_context) { DataPartsVector data_parts; if (const auto & check_query = query->as(); check_query.partition) @@ -2208,7 +2208,14 @@ void StorageMergeTree::checkData(const ASTPtr & query, ContextPtr local_context, else data_parts = getVisibleDataPartsVector(local_context); - for (auto & part : data_parts) + return std::make_unique(std::move(data_parts), local_context); +} + +CheckResult StorageMergeTree::checkDataNext(DataValidationTasksPtr & check_task_list, bool & has_nothing_to_do) +{ + auto local_context = static_cast(check_task_list.get())->context; + + if (auto part = static_cast(check_task_list.get())->next()) { /// If the checksums file is not present, calculate the checksums and write them to disk. static constexpr auto checksums_path = "checksums.txt"; @@ -2223,16 +2230,12 @@ void StorageMergeTree::checkData(const ASTPtr & query, ContextPtr local_context, part_mutable.writeChecksums(part->checksums, local_context->getWriteSettings()); part->checkMetadata(); - bool should_continue = check_callback(CheckResult(part->name, true, "Checksums recounted and written to disk."), data_parts.size()); - if (!should_continue) - break; + return CheckResult(part->name, true, "Checksums recounted and written to disk."); } catch (const Exception & ex) { tryLogCurrentException(log, __PRETTY_FUNCTION__); - bool should_continue = check_callback(CheckResult(part->name, false, "Check of part finished with error: '" + ex.message() + "'"), data_parts.size()); - if (!should_continue) - break; + return CheckResult(part->name, false, "Check of part finished with error: '" + ex.message() + "'"); } } else @@ -2241,18 +2244,19 @@ void StorageMergeTree::checkData(const ASTPtr & query, ContextPtr local_context, { checkDataPart(part, true); part->checkMetadata(); - bool should_continue = check_callback(CheckResult(part->name, true, ""), data_parts.size()); - if (!should_continue) - break; + return CheckResult(part->name, true, ""); } catch (const Exception & ex) { - bool should_continue = check_callback(CheckResult(part->name, false, ex.message()), data_parts.size()); - if (!should_continue) - break; + return CheckResult(part->name, false, ex.message()); } } } + else + { + has_nothing_to_do = true; + return {}; + } } diff --git a/src/Storages/StorageMergeTree.h b/src/Storages/StorageMergeTree.h index 6b1b1b7b4154..ec4796e49410 100644 --- a/src/Storages/StorageMergeTree.h +++ b/src/Storages/StorageMergeTree.h @@ -108,7 +108,8 @@ class StorageMergeTree final : public MergeTreeData void onActionLockRemove(StorageActionBlockType action_type) override; - void checkData(const ASTPtr & query, ContextPtr context, CheckDataCallback check_callback) override; + DataValidationTasksPtr getCheckTaskList(const ASTPtr & query, ContextPtr context) override; + CheckResult checkDataNext(DataValidationTasksPtr & check_task_list, bool & has_nothing_to_do) override; bool scheduleDataProcessingJob(BackgroundJobsAssignee & assignee) override; @@ -278,6 +279,32 @@ class StorageMergeTree final : public MergeTreeData friend class MergePlainMergeTreeTask; friend class MutatePlainMergeTreeTask; + struct DataValidationTasks : public IStorage::DataValidationTasksBase + { + DataValidationTasks(DataPartsVector && parts_, ContextPtr context_) + : parts(std::move(parts_)), it(parts.begin()), context(std::move(context_)) + {} + + DataPartPtr next() + { + std::lock_guard lock(mutex); + if (it == parts.end()) + return nullptr; + return *(it++); + } + + size_t size() const override + { + std::lock_guard lock(mutex); + return std::distance(it, parts.end()); + } + + mutable std::mutex mutex; + DataPartsVector parts; + DataPartsVector::const_iterator it; + + ContextPtr context; + }; protected: std::map getAlterMutationCommandsForPart(const DataPartPtr & part) const override; diff --git a/src/Storages/StorageProxy.h b/src/Storages/StorageProxy.h index 5339656782ec..ea908bea0328 100644 --- a/src/Storages/StorageProxy.h +++ b/src/Storages/StorageProxy.h @@ -149,7 +149,8 @@ class StorageProxy : public IStorage return getNested()->mayBenefitFromIndexForIn(left_in_operand, query_context, metadata_snapshot); } - void checkData(const ASTPtr & query, ContextPtr context, CheckDataCallback check_callback) override { getNested()->checkData(query, context, check_callback); } + DataValidationTasksPtr getCheckTaskList(const ASTPtr & query, ContextPtr context) override { return getNested()->getCheckTaskList(query, context); } + CheckResult checkDataNext(DataValidationTasksPtr & check_task_list, bool & has_nothing_to_do) override { return getNested()->checkDataNext(check_task_list, has_nothing_to_do); } void checkTableCanBeDropped() const override { getNested()->checkTableCanBeDropped(); } bool storesDataOnDisk() const override { return getNested()->storesDataOnDisk(); } diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 71fbd009d705..b04d824e323d 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -8481,7 +8481,7 @@ void StorageReplicatedMergeTree::enqueuePartForCheck(const String & part_name, t part_check_thread.enqueuePart(part_name, delay_to_check_seconds); } -void StorageReplicatedMergeTree::checkData(const ASTPtr & query, ContextPtr local_context, CheckDataCallback check_callback) +IStorage::DataValidationTasksPtr StorageReplicatedMergeTree::getCheckTaskList(const ASTPtr & query, ContextPtr local_context) { DataPartsVector data_parts; if (const auto & check_query = query->as(); check_query.partition) @@ -8492,26 +8492,30 @@ void StorageReplicatedMergeTree::checkData(const ASTPtr & query, ContextPtr loca else data_parts = getVisibleDataPartsVector(local_context); - { - auto part_check_lock = part_check_thread.pausePartsCheck(); + auto part_check_lock = part_check_thread.pausePartsCheck(); + return std::make_unique(std::move(data_parts), std::move(part_check_lock)); +} + +CheckResult StorageReplicatedMergeTree::checkDataNext(DataValidationTasksPtr & check_task_list, bool & has_nothing_to_do) +{ - for (auto & part : data_parts) + if (auto part = static_cast(check_task_list.get())->next()) + { + try { - try - { - bool should_continue = check_callback(part_check_thread.checkPartAndFix(part->name), data_parts.size()); - if (!should_continue) - break; - } - catch (const Exception & ex) - { - tryLogCurrentException(log, __PRETTY_FUNCTION__); - bool should_continue = check_callback(CheckResult(part->name, false, "Check of part finished with error: '" + ex.message() + "'"), data_parts.size()); - if (!should_continue) - break; - } + return CheckResult(part_check_thread.checkPartAndFix(part->name)); + } + catch (const Exception & ex) + { + tryLogCurrentException(log, __PRETTY_FUNCTION__); + return CheckResult(part->name, false, "Check of part finished with error: '" + ex.message() + "'"); } } + else + { + has_nothing_to_do = true; + return {}; + } } diff --git a/src/Storages/StorageReplicatedMergeTree.h b/src/Storages/StorageReplicatedMergeTree.h index 6f9853122cbb..2bc18aa3b0a3 100644 --- a/src/Storages/StorageReplicatedMergeTree.h +++ b/src/Storages/StorageReplicatedMergeTree.h @@ -230,7 +230,8 @@ class StorageReplicatedMergeTree final : public MergeTreeData /// Add a part to the queue of parts whose data you want to check in the background thread. void enqueuePartForCheck(const String & part_name, time_t delay_to_check_seconds = 0); - void checkData(const ASTPtr & query, ContextPtr context, CheckDataCallback check_callback) override; + DataValidationTasksPtr getCheckTaskList(const ASTPtr & query, ContextPtr context) override; + CheckResult checkDataNext(DataValidationTasksPtr & check_task_list, bool & has_nothing_to_do) override; /// Checks ability to use granularity bool canUseAdaptiveGranularity() const override; @@ -990,6 +991,34 @@ class StorageReplicatedMergeTree final : public MergeTreeData bool waitZeroCopyLockToDisappear(const ZeroCopyLock & lock, size_t milliseconds_to_wait) override; void startupImpl(bool from_attach_thread); + + struct DataValidationTasks : public IStorage::DataValidationTasksBase + { + explicit DataValidationTasks(DataPartsVector && parts_, std::unique_lock && parts_check_lock_) + : parts_check_lock(std::move(parts_check_lock_)), parts(std::move(parts_)), it(parts.begin()) + {} + + DataPartPtr next() + { + std::lock_guard lock(mutex); + if (it == parts.end()) + return nullptr; + return *(it++); + } + + size_t size() const override + { + std::lock_guard lock(mutex); + return std::distance(it, parts.end()); + } + + std::unique_lock parts_check_lock; + + mutable std::mutex mutex; + DataPartsVector parts; + DataPartsVector::const_iterator it; + }; + }; String getPartNamePossiblyFake(MergeTreeDataFormatVersion format_version, const MergeTreePartInfo & part_info); diff --git a/src/Storages/StorageStripeLog.cpp b/src/Storages/StorageStripeLog.cpp index 5a3a007035c0..c242830e657c 100644 --- a/src/Storages/StorageStripeLog.cpp +++ b/src/Storages/StorageStripeLog.cpp @@ -403,15 +403,18 @@ SinkToStoragePtr StorageStripeLog::write(const ASTPtr & /*query*/, const Storage return std::make_shared(*this, metadata_snapshot, std::move(lock)); } -void StorageStripeLog::checkData(const ASTPtr & /* query */, ContextPtr local_context, CheckDataCallback check_callback) +IStorage::DataValidationTasksPtr StorageStripeLog::getCheckTaskList(const ASTPtr & /* query */, ContextPtr local_context) { ReadLock lock{rwlock, getLockTimeout(local_context)}; if (!lock) throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Lock timeout exceeded"); - - file_checker.check(check_callback); + return std::make_unique(file_checker.getDataValidationTasks(), std::move(lock)); } +CheckResult StorageStripeLog::checkDataNext(DataValidationTasksPtr & check_task_list, bool & has_nothing_to_do) +{ + return file_checker.checkNextEntry(static_cast(check_task_list.get())->file_checker_tasks, has_nothing_to_do); +} void StorageStripeLog::truncate(const ASTPtr &, const StorageMetadataPtr &, ContextPtr, TableExclusiveLockHolder &) { diff --git a/src/Storages/StorageStripeLog.h b/src/Storages/StorageStripeLog.h index 61ce49f43da8..5d4e2fcbd3ab 100644 --- a/src/Storages/StorageStripeLog.h +++ b/src/Storages/StorageStripeLog.h @@ -53,7 +53,8 @@ friend class StripeLogSink; void rename(const String & new_path_to_table_data, const StorageID & new_table_id) override; - void checkData(const ASTPtr & query, ContextPtr ocal_context, CheckDataCallback check_callback) override; + DataValidationTasksPtr getCheckTaskList(const ASTPtr & query, ContextPtr context) override; + CheckResult checkDataNext(DataValidationTasksPtr & check_task_list, bool & has_nothing_to_do) override; bool storesDataOnDisk() const override { return true; } Strings getDataPaths() const override { return {DB::fullPath(disk, table_path)}; } @@ -93,6 +94,20 @@ friend class StripeLogSink; const DiskPtr disk; String table_path; + struct DataValidationTasks : public IStorage::DataValidationTasksBase + { + DataValidationTasks(FileChecker::DataValidationTasksPtr file_checker_tasks_, ReadLock && lock_) + : file_checker_tasks(std::move(file_checker_tasks_)), lock(std::move(lock_)) + {} + + size_t size() const override { return file_checker_tasks->size(); } + + FileChecker::DataValidationTasksPtr file_checker_tasks; + + /// Lock to prevent table modification while checking + ReadLock lock; + }; + String data_file_path; String index_file_path; FileChecker file_checker; From 11554fe9c0efae873fb112fbde718cd33217f4ed Mon Sep 17 00:00:00 2001 From: vdimir Date: Tue, 1 Aug 2023 12:39:26 +0000 Subject: [PATCH 3/7] Upd tests for CHECK TABLE --- .../0_stateless/00063_check_query.reference | 1 + .../queries/0_stateless/00063_check_query.sql | 3 ++ .../queries/0_stateless/00961_check_table.sql | 12 ++++---- ...1042_check_query_and_last_granule_size.sql | 12 ++++---- ...02235_check_table_sparse_serialization.sql | 3 +- .../02841_check_table_progress.reference | 2 ++ .../0_stateless/02841_check_table_progress.sh | 29 +++++++++++++++++++ 7 files changed, 48 insertions(+), 14 deletions(-) create mode 100644 tests/queries/0_stateless/02841_check_table_progress.reference create mode 100755 tests/queries/0_stateless/02841_check_table_progress.sh diff --git a/tests/queries/0_stateless/00063_check_query.reference b/tests/queries/0_stateless/00063_check_query.reference index 6ed281c757a9..e8183f05f5db 100644 --- a/tests/queries/0_stateless/00063_check_query.reference +++ b/tests/queries/0_stateless/00063_check_query.reference @@ -1,2 +1,3 @@ 1 1 +1 diff --git a/tests/queries/0_stateless/00063_check_query.sql b/tests/queries/0_stateless/00063_check_query.sql index e7362074a059..263cf94fb4af 100644 --- a/tests/queries/0_stateless/00063_check_query.sql +++ b/tests/queries/0_stateless/00063_check_query.sql @@ -8,6 +8,9 @@ INSERT INTO check_query_tiny_log VALUES (1, 'A'), (2, 'B'), (3, 'C'); CHECK TABLE check_query_tiny_log; +-- Settings and FORMAT are supported +CHECK TABLE check_query_tiny_log SETTINGS max_threads = 16; +CHECK TABLE check_query_tiny_log FORMAT Null SETTINGS max_threads = 8, check_query_single_value_result = 0; DROP TABLE IF EXISTS check_query_log; diff --git a/tests/queries/0_stateless/00961_check_table.sql b/tests/queries/0_stateless/00961_check_table.sql index 0e0b2c3b4837..079acc8cdbb2 100644 --- a/tests/queries/0_stateless/00961_check_table.sql +++ b/tests/queries/0_stateless/00961_check_table.sql @@ -3,29 +3,29 @@ DROP TABLE IF EXISTS mt_table; CREATE TABLE mt_table (d Date, key UInt64, data String) ENGINE = MergeTree() PARTITION BY toYYYYMM(d) ORDER BY key; -CHECK TABLE mt_table; +CHECK TABLE mt_table SETTINGS max_threads = 1; INSERT INTO mt_table VALUES (toDate('2019-01-02'), 1, 'Hello'), (toDate('2019-01-02'), 2, 'World'); -CHECK TABLE mt_table; +CHECK TABLE mt_table SETTINGS max_threads = 1; INSERT INTO mt_table VALUES (toDate('2019-01-02'), 3, 'quick'), (toDate('2019-01-02'), 4, 'brown'); SELECT '========'; -CHECK TABLE mt_table; +CHECK TABLE mt_table SETTINGS max_threads = 1; OPTIMIZE TABLE mt_table FINAL; SELECT '========'; -CHECK TABLE mt_table; +CHECK TABLE mt_table SETTINGS max_threads = 1; SELECT '========'; INSERT INTO mt_table VALUES (toDate('2019-02-03'), 5, '!'), (toDate('2019-02-03'), 6, '?'); -CHECK TABLE mt_table; +CHECK TABLE mt_table SETTINGS max_threads = 1; SELECT '========'; @@ -33,6 +33,6 @@ INSERT INTO mt_table VALUES (toDate('2019-02-03'), 7, 'jump'), (toDate('2019-02- OPTIMIZE TABLE mt_table FINAL; -CHECK TABLE mt_table PARTITION 201902; +CHECK TABLE mt_table PARTITION 201902 SETTINGS max_threads = 1; DROP TABLE IF EXISTS mt_table; diff --git a/tests/queries/0_stateless/01042_check_query_and_last_granule_size.sql b/tests/queries/0_stateless/01042_check_query_and_last_granule_size.sql index b66aff8384d7..eccb2d258781 100644 --- a/tests/queries/0_stateless/01042_check_query_and_last_granule_size.sql +++ b/tests/queries/0_stateless/01042_check_query_and_last_granule_size.sql @@ -7,11 +7,11 @@ CREATE TABLE check_query_test (SomeKey UInt64, SomeValue String) ENGINE = MergeT -- Rows in this table are short, so granularity will be 8192. INSERT INTO check_query_test SELECT number, toString(number) FROM system.numbers LIMIT 81920; -CHECK TABLE check_query_test; +CHECK TABLE check_query_test SETTINGS max_threads = 1; OPTIMIZE TABLE check_query_test; -CHECK TABLE check_query_test; +CHECK TABLE check_query_test SETTINGS max_threads = 1; DROP TABLE IF EXISTS check_query_test; @@ -21,18 +21,18 @@ CREATE TABLE check_query_test_non_adaptive (SomeKey UInt64, SomeValue String) EN INSERT INTO check_query_test_non_adaptive SELECT number, toString(number) FROM system.numbers LIMIT 81920; -CHECK TABLE check_query_test_non_adaptive; +CHECK TABLE check_query_test_non_adaptive SETTINGS max_threads = 1; OPTIMIZE TABLE check_query_test_non_adaptive; -CHECK TABLE check_query_test_non_adaptive; +CHECK TABLE check_query_test_non_adaptive SETTINGS max_threads = 1; INSERT INTO check_query_test_non_adaptive SELECT number, toString(number) FROM system.numbers LIMIT 77; -CHECK TABLE check_query_test_non_adaptive; +CHECK TABLE check_query_test_non_adaptive SETTINGS max_threads = 1; OPTIMIZE TABLE check_query_test_non_adaptive; -CHECK TABLE check_query_test_non_adaptive; +CHECK TABLE check_query_test_non_adaptive SETTINGS max_threads = 1; DROP TABLE IF EXISTS check_query_test_non_adaptive; diff --git a/tests/queries/0_stateless/02235_check_table_sparse_serialization.sql b/tests/queries/0_stateless/02235_check_table_sparse_serialization.sql index 0ac97404c46f..625be63e0c07 100644 --- a/tests/queries/0_stateless/02235_check_table_sparse_serialization.sql +++ b/tests/queries/0_stateless/02235_check_table_sparse_serialization.sql @@ -12,7 +12,6 @@ SELECT name, column, serialization_kind FROM system.parts_columns WHERE database = currentDatabase() AND table = 't_sparse_02235' ORDER BY name, column; -SET check_query_single_value_result = 0; -CHECK TABLE t_sparse_02235; +CHECK TABLE t_sparse_02235 SETTINGS check_query_single_value_result = 0, max_threads = 1; DROP TABLE t_sparse_02235; diff --git a/tests/queries/0_stateless/02841_check_table_progress.reference b/tests/queries/0_stateless/02841_check_table_progress.reference new file mode 100644 index 000000000000..541dab48def8 --- /dev/null +++ b/tests/queries/0_stateless/02841_check_table_progress.reference @@ -0,0 +1,2 @@ +Ok +Ok diff --git a/tests/queries/0_stateless/02841_check_table_progress.sh b/tests/queries/0_stateless/02841_check_table_progress.sh new file mode 100755 index 000000000000..166386b999b5 --- /dev/null +++ b/tests/queries/0_stateless/02841_check_table_progress.sh @@ -0,0 +1,29 @@ +#!/usr/bin/env bash + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. "$CURDIR"/../shell_config.sh + +${CLICKHOUSE_CLIENT} -q "DROP TABLE IF EXISTS t0"; +${CLICKHOUSE_CLIENT} -q "CREATE TABLE t0 (x UInt64, val String) ENGINE = MergeTree ORDER BY x PARTITION BY x % 100"; +${CLICKHOUSE_CLIENT} -q "INSERT INTO t0 SELECT sipHash64(number), randomPrintableASCII(1000) FROM numbers(1000)"; + + +# Check that we have at least 3 different values for read_rows +UNIQUE_VALUES=$( + ${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&send_progress_in_http_headers=1&http_headers_progress_interval_ms=0" -d @- <<< "CHECK TABLE t0" -v |& { + grep -F -e X-ClickHouse-Progress: -e X-ClickHouse-Summary: | grep -o '"read_rows"\s*:\s*"[0-9]*"' + } | uniq | wc -l +) + +[ "$UNIQUE_VALUES" -ge "3" ] && echo "Ok" || echo "Fail: got $UNIQUE_VALUES" + + +# Check that we have we have at least 100 total_rows_to_read (at least one check task per partition) +MAX_TOTAL_VALUE=$( + ${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&send_progress_in_http_headers=1&http_headers_progress_interval_ms=0" -d @- <<< "CHECK TABLE t0" -v |& { + grep -F -e X-ClickHouse-Progress: -e X-ClickHouse-Summary: | grep -o '"total_rows_to_read"\s*:\s*"[0-9]*"' | grep -o '[0-9]*' + } | sort -n | tail -1 +) + +[ "$MAX_TOTAL_VALUE" -ge "100" ] && echo "Ok" || echo "Fail: got $MAX_TOTAL_VALUE" From aeee50466cb237c0458c552a8e15d957dce013bc Mon Sep 17 00:00:00 2001 From: vdimir Date: Tue, 1 Aug 2023 12:39:49 +0000 Subject: [PATCH 4/7] Upd doc for CHECK TABLE --- .../sql-reference/statements/check-table.md | 97 ++++++++++++++----- 1 file changed, 75 insertions(+), 22 deletions(-) diff --git a/docs/en/sql-reference/statements/check-table.md b/docs/en/sql-reference/statements/check-table.md index 0209d59b018e..588be8417557 100644 --- a/docs/en/sql-reference/statements/check-table.md +++ b/docs/en/sql-reference/statements/check-table.md @@ -5,19 +5,38 @@ sidebar_label: CHECK TABLE title: "CHECK TABLE Statement" --- -Checks if the data in the table is corrupted. +The `CHECK TABLE` query in ClickHouse is used to perform a validation check on a specific table or its partitions. It ensures the integrity of the data by verifying the checksums and other internal data structures. -``` sql -CHECK TABLE [db.]name [PARTITION partition_expr] +Particularly it compares actual file sizes with the expected values which are stored on the server. If the file sizes do not match the stored values, it means the data is corrupted. This can be caused, for example, by a system crash during query execution. + +:::note +The `CHECK TABLE`` query may read all the data in the table and hold some resources, making it resource-intensive. +Consider the potential impact on performance and resource utilization before executing this query. +::: + +## Syntax + +The basic syntax of the query is as follows: + +```sql +CHECK TABLE table_name [PARTITION partition_expression] [FORMAT format] [SETTINGS check_query_single_value_result = (0|1) [, other_settings] ] ``` -The `CHECK TABLE` query compares actual file sizes with the expected values which are stored on the server. If the file sizes do not match the stored values, it means the data is corrupted. This can be caused, for example, by a system crash during query execution. +- `table_name`: Specifies the name of the table that you want to check. +- `partition_expression`: (Optional) If you want to check a specific partition of the table, you can use this expression to specify the partition. +- `FORMAT format`: (Optional) Allows you to specify the output format of the result. +- `SETTINGS`: (Optional) Allows additional settings. + - **`check_query_single_value_result`**: (Optional) This setting allows you to toggle between a detailed result (`0`) or a summarized result (`1`). + - Other settings (e.g. `max_threads` can be applied as well). + -The query response contains the `result` column with a single row. The row has a value of -[Boolean](../../sql-reference/data-types/boolean.md) type: +The query response depends on the value of contains `check_query_single_value_result` setting. +In case of `check_query_single_value_result = 1` only `result` column with a single row is returned. Value inside this row is `1` if the integrity check is passed and `0` if data is corrupted. -- 0 - The data in the table is corrupted. -- 1 - The data maintains integrity. +With `check_query_single_value_result = 0` the query returns the following columns: + - `part_path`: Indicates the path to the data part or file name. + - `is_passed`: Returns 1 if the check for this part is successful, 0 otherwise. + - `message`: Any additional messages related to the check, such as errors or success messages. The `CHECK TABLE` query supports the following table engines: @@ -26,39 +45,73 @@ The `CHECK TABLE` query supports the following table engines: - [StripeLog](../../engines/table-engines/log-family/stripelog.md) - [MergeTree family](../../engines/table-engines/mergetree-family/mergetree.md) -Performed over the tables with another table engines causes an exception. +Performed over the tables with another table engines causes an `NOT_IMPLEMETED` exception. Engines from the `*Log` family do not provide automatic data recovery on failure. Use the `CHECK TABLE` query to track data loss in a timely manner. -## Checking the MergeTree Family Tables +## Examples -For `MergeTree` family engines, if [check_query_single_value_result](../../operations/settings/settings.md#check_query_single_value_result) = 0, the `CHECK TABLE` query shows a check status for every individual data part of a table on the local server. +By default `CHECK TABLE` query shows the general table check status: ```sql -SET check_query_single_value_result = 0; CHECK TABLE test_table; ``` ```text -┌─part_path─┬─is_passed─┬─message─┐ -│ all_1_4_1 │ 1 │ │ -│ all_1_4_2 │ 1 │ │ -└───────────┴───────────┴─────────┘ +┌─result─┐ +│ 1 │ +└────────┘ ``` -If `check_query_single_value_result` = 1, the `CHECK TABLE` query shows the general table check status. +If you want to see the check status for every individual data part you may use `check_query_single_value_result` setting. + +Also, to check a specific partition of the table, you can use the `PARTITION` keyword. ```sql -SET check_query_single_value_result = 1; -CHECK TABLE test_table; +CHECK TABLE t0 PARTITION ID '201003' +FORMAT PrettyCompactMonoBlock +SETTINGS check_query_single_value_result = 0 ``` +Output: + ```text -┌─result─┐ -│ 1 │ -└────────┘ +┌─part_path────┬─is_passed─┬─message─┐ +│ 201003_7_7_0 │ 1 │ │ +│ 201003_3_3_0 │ 1 │ │ +└──────────────┴───────────┴─────────┘ ``` +### Receiving a 'Corrupted' Result + +:::warning +Disclaimer: The procedure described here, including the manual manipulating or removing files directly from the data directory, is for experimental or development environments only. Do **not** attempt this on a production server, as it may lead to data loss or other unintended consequences. +::: + +Remove the existing checksum file: + +```bash +rm /var/lib/clickhouse-server/data/default/t0/201003_3_3_0/checksums.txt +``` + +```sql +CHECK TABLE t0 PARTITION ID '201003' +FORMAT PrettyCompactMonoBlock +SETTINGS check_query_single_value_result = 0 + + +Output: + +```text +┌─part_path────┬─is_passed─┬─message──────────────────────────────────┐ +│ 201003_7_7_0 │ 1 │ │ +│ 201003_3_3_0 │ 1 │ Checksums recounted and written to disk. │ +└──────────────┴───────────┴──────────────────────────────────────────┘ +``` + +If the checksums.txt file is missing, it can be restored. It will be recalculated and rewritten during the execution of the CHECK TABLE command for the specific partition, and the status will still be reported as 'success.'" + + ## If the Data Is Corrupted If the table is corrupted, you can copy the non-corrupted data to another table. To do this: From 358ef26385fd3b56233babae4b9ab335c16cf6dd Mon Sep 17 00:00:00 2001 From: vdimir Date: Fri, 4 Aug 2023 09:22:00 +0000 Subject: [PATCH 5/7] small fixes for CHECK TABLE --- src/Interpreters/InterpreterCheckQuery.cpp | 6 +++--- src/Storages/StorageLog.cpp | 2 +- src/Storages/StorageMergeTree.cpp | 6 +++--- src/Storages/StorageReplicatedMergeTree.cpp | 2 +- src/Storages/StorageStripeLog.cpp | 2 +- 5 files changed, 9 insertions(+), 9 deletions(-) diff --git a/src/Interpreters/InterpreterCheckQuery.cpp b/src/Interpreters/InterpreterCheckQuery.cpp index 113a5d9fc90b..bd530654dd2b 100644 --- a/src/Interpreters/InterpreterCheckQuery.cpp +++ b/src/Interpreters/InterpreterCheckQuery.cpp @@ -129,7 +129,7 @@ class TableCheckResultEmitter : public IAccumulatingTransform Chunk generate() override { - if (is_valuer_emitted.exchange(true)) + if (is_value_emitted.exchange(true)) return {}; auto block = getSingleValueBlock(result_value); return Chunk(block.getColumns(), block.rows()); @@ -137,7 +137,7 @@ class TableCheckResultEmitter : public IAccumulatingTransform private: std::atomic result_value{1}; - std::atomic_bool is_valuer_emitted{false}; + std::atomic_bool is_value_emitted{false}; }; } @@ -194,7 +194,7 @@ BlockIO InterpreterCheckQuery::execute() if (settings.check_query_single_value_result) { auto emitter_processor = std::make_shared(); - auto input_port = &emitter_processor->getInputPort(); + auto * input_port = &emitter_processor->getInputPort(); processors->emplace_back(emitter_processor); connect(*resize_outport, *input_port); diff --git a/src/Storages/StorageLog.cpp b/src/Storages/StorageLog.cpp index 6fb786927258..0e9f83e886a9 100644 --- a/src/Storages/StorageLog.cpp +++ b/src/Storages/StorageLog.cpp @@ -876,7 +876,7 @@ IStorage::DataValidationTasksPtr StorageLog::getCheckTaskList(const ASTPtr & /* CheckResult StorageLog::checkDataNext(DataValidationTasksPtr & check_task_list, bool & has_nothing_to_do) { - return file_checker.checkNextEntry(static_cast(check_task_list.get())->file_checker_tasks, has_nothing_to_do); + return file_checker.checkNextEntry(assert_cast(check_task_list.get())->file_checker_tasks, has_nothing_to_do); } IStorage::ColumnSizeByName StorageLog::getColumnSizes() const diff --git a/src/Storages/StorageMergeTree.cpp b/src/Storages/StorageMergeTree.cpp index 4593ed6c84a3..97fc7a6731f1 100644 --- a/src/Storages/StorageMergeTree.cpp +++ b/src/Storages/StorageMergeTree.cpp @@ -2213,9 +2213,9 @@ IStorage::DataValidationTasksPtr StorageMergeTree::getCheckTaskList(const ASTPtr CheckResult StorageMergeTree::checkDataNext(DataValidationTasksPtr & check_task_list, bool & has_nothing_to_do) { - auto local_context = static_cast(check_task_list.get())->context; - - if (auto part = static_cast(check_task_list.get())->next()) + auto * data_validation_tasks = assert_cast(check_task_list.get()); + auto local_context = data_validation_tasks->context; + if (auto part = data_validation_tasks->next()) { /// If the checksums file is not present, calculate the checksums and write them to disk. static constexpr auto checksums_path = "checksums.txt"; diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index b04d824e323d..c08f1ebcc485 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -8499,7 +8499,7 @@ IStorage::DataValidationTasksPtr StorageReplicatedMergeTree::getCheckTaskList(co CheckResult StorageReplicatedMergeTree::checkDataNext(DataValidationTasksPtr & check_task_list, bool & has_nothing_to_do) { - if (auto part = static_cast(check_task_list.get())->next()) + if (auto part = assert_cast(check_task_list.get())->next()) { try { diff --git a/src/Storages/StorageStripeLog.cpp b/src/Storages/StorageStripeLog.cpp index c242830e657c..a3cbff961998 100644 --- a/src/Storages/StorageStripeLog.cpp +++ b/src/Storages/StorageStripeLog.cpp @@ -413,7 +413,7 @@ IStorage::DataValidationTasksPtr StorageStripeLog::getCheckTaskList(const ASTPtr CheckResult StorageStripeLog::checkDataNext(DataValidationTasksPtr & check_task_list, bool & has_nothing_to_do) { - return file_checker.checkNextEntry(static_cast(check_task_list.get())->file_checker_tasks, has_nothing_to_do); + return file_checker.checkNextEntry(assert_cast(check_task_list.get())->file_checker_tasks, has_nothing_to_do); } void StorageStripeLog::truncate(const ASTPtr &, const StorageMetadataPtr &, ContextPtr, TableExclusiveLockHolder &) From 50715ca4fc2682e810ec7cc3d74212fec8098e49 Mon Sep 17 00:00:00 2001 From: vdimir Date: Mon, 7 Aug 2023 11:57:11 +0000 Subject: [PATCH 6/7] fix doc links --- docs/en/sql-reference/statements/check-table.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/en/sql-reference/statements/check-table.md b/docs/en/sql-reference/statements/check-table.md index 588be8417557..45fc17b044a3 100644 --- a/docs/en/sql-reference/statements/check-table.md +++ b/docs/en/sql-reference/statements/check-table.md @@ -117,6 +117,6 @@ If the checksums.txt file is missing, it can be restored. It will be recalculate If the table is corrupted, you can copy the non-corrupted data to another table. To do this: 1. Create a new table with the same structure as damaged table. To do this execute the query `CREATE TABLE AS `. -2. Set the [max_threads](../../operations/settings/settings.md#settings-max_threads) value to 1 to process the next query in a single thread. To do this run the query `SET max_threads = 1`. +2. Set the [max_threads](/docs/en/operations/settings/settings.md/#settings-max_threads) value to 1 to process the next query in a single thread. To do this run the query `SET max_threads = 1`. 3. Execute the query `INSERT INTO SELECT * FROM `. This request copies the non-corrupted data from the damaged table to another table. Only the data before the corrupted part will be copied. 4. Restart the `clickhouse-client` to reset the `max_threads` value. From 1183dac293d5d474b8a9bbad34819d1ac14def44 Mon Sep 17 00:00:00 2001 From: vdimir Date: Tue, 8 Aug 2023 09:45:23 +0000 Subject: [PATCH 7/7] fix doc links --- docs/en/sql-reference/statements/check-table.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/en/sql-reference/statements/check-table.md b/docs/en/sql-reference/statements/check-table.md index 45fc17b044a3..db8c32249efb 100644 --- a/docs/en/sql-reference/statements/check-table.md +++ b/docs/en/sql-reference/statements/check-table.md @@ -117,6 +117,6 @@ If the checksums.txt file is missing, it can be restored. It will be recalculate If the table is corrupted, you can copy the non-corrupted data to another table. To do this: 1. Create a new table with the same structure as damaged table. To do this execute the query `CREATE TABLE AS `. -2. Set the [max_threads](/docs/en/operations/settings/settings.md/#settings-max_threads) value to 1 to process the next query in a single thread. To do this run the query `SET max_threads = 1`. +2. Set the `max_threads` value to 1 to process the next query in a single thread. To do this run the query `SET max_threads = 1`. 3. Execute the query `INSERT INTO SELECT * FROM `. This request copies the non-corrupted data from the damaged table to another table. Only the data before the corrupted part will be copied. 4. Restart the `clickhouse-client` to reset the `max_threads` value.