Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix not started database shutdown deadlock #59137

Merged
merged 5 commits into from
Jan 26, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 0 additions & 1 deletion programs/server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1748,7 +1748,6 @@ try
LOG_INFO(log, "Stopping AsyncLoader.");

// Waits for all currently running jobs to finish and does not run any other pending jobs.
// Pending jobs will be canceled and destructed later by `load_metadata_tasks` dtor.
global_context->getAsyncLoader().stop();
);

Expand Down
31 changes: 24 additions & 7 deletions src/Databases/DatabaseAtomic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ String DatabaseAtomic::getTableDataPath(const ASTCreateQuery & query) const

void DatabaseAtomic::drop(ContextPtr)
{
waitDatabaseStarted(false);
waitDatabaseStarted();
assert(TSA_SUPPRESS_WARNING_FOR_READ(tables).empty());
try
{
Expand Down Expand Up @@ -115,7 +115,7 @@ StoragePtr DatabaseAtomic::detachTable(ContextPtr /* context */, const String &

void DatabaseAtomic::dropTable(ContextPtr local_context, const String & table_name, bool sync)
{
waitDatabaseStarted(false);
waitDatabaseStarted();
auto table = tryGetTable(table_name, local_context);
/// Remove the inner table (if any) to avoid deadlock
/// (due to attempt to execute DROP from the worker thread)
Expand Down Expand Up @@ -179,7 +179,7 @@ void DatabaseAtomic::renameTable(ContextPtr local_context, const String & table_
if (exchange && !supportsAtomicRename())
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "RENAME EXCHANGE is not supported");

waitDatabaseStarted(false);
waitDatabaseStarted();

auto & other_db = dynamic_cast<DatabaseAtomic &>(to_database);
bool inside_database = this == &other_db;
Expand Down Expand Up @@ -468,13 +468,30 @@ LoadTaskPtr DatabaseAtomic::startupDatabaseAsync(AsyncLoader & async_loader, Loa
for (const auto & table : table_names)
tryCreateSymlink(table.first, table.second, true);
});
std::scoped_lock lock(mutex);
return startup_atomic_database_task = makeLoadTask(async_loader, {job});
}

void DatabaseAtomic::waitDatabaseStarted(bool no_throw) const
void DatabaseAtomic::waitDatabaseStarted() const
{
if (startup_atomic_database_task)
waitLoad(currentPoolOr(TablesLoaderForegroundPoolId), startup_atomic_database_task, no_throw);
LoadTaskPtr task;
{
std::scoped_lock lock(mutex);
task = startup_atomic_database_task;
}
if (task)
waitLoad(currentPoolOr(TablesLoaderForegroundPoolId), task, false);
}

/// Releases the Atomic-specific startup task held by this database and then
/// delegates to DatabaseOrdinary::stopLoading() for the tasks owned by the base class.
void DatabaseAtomic::stopLoading()
{
// Local holder: starts out default-constructed and receives the member's task via swap below.
LoadTaskPtr stop_atomic_database;
{
// `startup_atomic_database_task` is annotated TSA_GUARDED_BY(mutex), so it is only touched under the lock.
// After the swap the member holds the default-constructed value.
std::scoped_lock lock(mutex);
stop_atomic_database.swap(startup_atomic_database_task);
}
// The reference is dropped only after the lock scope above has ended.
// NOTE(review): presumably dropping the last reference cancels the pending job and waits for a running one,
// and doing that outside `mutex` avoids blocking on a job that itself needs `mutex` — confirm against LoadTask.
stop_atomic_database.reset();
DatabaseOrdinary::stopLoading();
}

void DatabaseAtomic::tryCreateSymlink(const String & table_name, const String & actual_data_path, bool if_data_path_exist)
Expand Down Expand Up @@ -544,7 +561,7 @@ void DatabaseAtomic::renameDatabase(ContextPtr query_context, const String & new
{
/// CREATE, ATTACH, DROP, DETACH and RENAME DATABASE must hold DDLGuard

waitDatabaseStarted(false);
waitDatabaseStarted();

bool check_ref_deps = query_context->getSettingsRef().check_referential_table_dependencies;
bool check_loading_deps = !check_ref_deps && query_context->getSettingsRef().check_table_dependencies;
Expand Down
5 changes: 3 additions & 2 deletions src/Databases/DatabaseAtomic.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ class DatabaseAtomic : public DatabaseOrdinary
void beforeLoadingMetadata(ContextMutablePtr context, LoadingStrictnessLevel mode) override;

LoadTaskPtr startupDatabaseAsync(AsyncLoader & async_loader, LoadJobSet startup_after, LoadingStrictnessLevel mode) override;
void waitDatabaseStarted(bool no_throw) const override;
void waitDatabaseStarted() const override;
void stopLoading() override;

/// Atomic database cannot be detached if there is a detached table which is still in use
void assertCanBeDetached(bool cleanup) override;
Expand Down Expand Up @@ -87,7 +88,7 @@ class DatabaseAtomic : public DatabaseOrdinary
String path_to_metadata_symlink;
const UUID db_uuid;

LoadTaskPtr startup_atomic_database_task;
LoadTaskPtr startup_atomic_database_task TSA_GUARDED_BY(mutex);
};

}
12 changes: 6 additions & 6 deletions src/Databases/DatabaseOnDisk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ DatabaseOnDisk::DatabaseOnDisk(

void DatabaseOnDisk::shutdown()
{
waitDatabaseStarted(/* no_throw = */ true);
stopLoading();
DatabaseWithOwnTablesBase::shutdown();
}

Expand Down Expand Up @@ -196,7 +196,7 @@ void DatabaseOnDisk::createTable(
throw Exception(
ErrorCodes::TABLE_ALREADY_EXISTS, "Table {}.{} already exists", backQuote(getDatabaseName()), backQuote(table_name));

waitDatabaseStarted(false);
waitDatabaseStarted();

String table_metadata_path = getObjectMetadataPath(table_name);

Expand Down Expand Up @@ -287,7 +287,7 @@ void DatabaseOnDisk::commitCreateTable(const ASTCreateQuery & query, const Stora

void DatabaseOnDisk::detachTablePermanently(ContextPtr query_context, const String & table_name)
{
waitDatabaseStarted(false);
waitDatabaseStarted();

auto table = detachTable(query_context, table_name);

Expand All @@ -305,7 +305,7 @@ void DatabaseOnDisk::detachTablePermanently(ContextPtr query_context, const Stri

void DatabaseOnDisk::dropTable(ContextPtr local_context, const String & table_name, bool /*sync*/)
{
waitDatabaseStarted(false);
waitDatabaseStarted();

String table_metadata_path = getObjectMetadataPath(table_name);
String table_metadata_path_drop = table_metadata_path + drop_suffix;
Expand Down Expand Up @@ -391,7 +391,7 @@ void DatabaseOnDisk::renameTable(
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Moving tables between databases of different engines is not supported");
}

waitDatabaseStarted(false);
waitDatabaseStarted();

auto table_data_relative_path = getTableDataPath(table_name);
TableExclusiveLockHolder table_lock;
Expand Down Expand Up @@ -534,7 +534,7 @@ ASTPtr DatabaseOnDisk::getCreateDatabaseQuery() const

void DatabaseOnDisk::drop(ContextPtr local_context)
{
waitDatabaseStarted(false);
waitDatabaseStarted();

assert(TSA_SUPPRESS_WARNING_FOR_READ(tables).empty());
if (local_context->getSettingsRef().force_remove_data_recursively_on_drop)
Expand Down
33 changes: 29 additions & 4 deletions src/Databases/DatabaseOrdinary.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ LoadTaskPtr DatabaseOrdinary::startupDatabaseAsync(
// 1) startup should be done after tables loading
// 2) load or startup errors for tables should not lead to not starting up the whole database
});
std::scoped_lock lock(mutex);
return startup_database_task = makeLoadTask(async_loader, {job});
}

Expand All @@ -255,11 +256,35 @@ void DatabaseOrdinary::waitTableStarted(const String & name) const
waitLoad(currentPoolOr(TablesLoaderForegroundPoolId), task);
}

void DatabaseOrdinary::waitDatabaseStarted(bool no_throw) const
void DatabaseOrdinary::waitDatabaseStarted() const
{
/// Prioritize load and startup of all tables and database itself and wait for them synchronously
if (startup_database_task)
waitLoad(currentPoolOr(TablesLoaderForegroundPoolId), startup_database_task, no_throw);
LoadTaskPtr task;
{
std::scoped_lock lock(mutex);
task = startup_database_task;
}
if (task)
waitLoad(currentPoolOr(TablesLoaderForegroundPoolId), task);
}

/// Takes ownership of every load/startup task stored in this database (per-table load tasks,
/// per-table startup tasks and the database startup task) and releases them in a fixed order.
void DatabaseOrdinary::stopLoading()
{
// Local holders: start out empty and receive the members' contents via swap below.
std::unordered_map<String, LoadTaskPtr> stop_load_table;
std::unordered_map<String, LoadTaskPtr> stop_startup_table;
LoadTaskPtr stop_startup_database;
{
// All three members are annotated TSA_GUARDED_BY(mutex); they are swapped out under the lock,
// which leaves the members empty / default-constructed.
std::scoped_lock lock(mutex);
stop_load_table.swap(load_table);
stop_startup_table.swap(startup_table);
stop_startup_database.swap(startup_database_task);
}

// Cancel pending tasks and wait for currently running tasks.
// Note that the order must be the reverse of the order in which the tasks were created,
// to make sure no dependent task is run after we have waited for the current task.
// The releases below happen after the lock scope above has ended.
// NOTE(review): presumably this is deliberate so that a running job that needs `mutex` cannot block shutdown — confirm.
stop_startup_database.reset();
stop_startup_table.clear();
stop_load_table.clear();
}

DatabaseTablesIteratorPtr DatabaseOrdinary::getTablesIterator(ContextPtr local_context, const DatabaseOnDisk::FilterByNameFunction & filter_by_table_name) const
Expand All @@ -272,7 +297,7 @@ DatabaseTablesIteratorPtr DatabaseOrdinary::getTablesIterator(ContextPtr local_c

void DatabaseOrdinary::alterTable(ContextPtr local_context, const StorageID & table_id, const StorageInMemoryMetadata & metadata)
{
waitDatabaseStarted(false);
waitDatabaseStarted();

String table_name = table_id.table_name;

Expand Down
5 changes: 3 additions & 2 deletions src/Databases/DatabaseOrdinary.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ class DatabaseOrdinary : public DatabaseOnDisk

void waitTableStarted(const String & name) const override;

void waitDatabaseStarted(bool no_throw) const override;
void waitDatabaseStarted() const override;
void stopLoading() override;

LoadTaskPtr startupDatabaseAsync(AsyncLoader & async_loader, LoadJobSet startup_after, LoadingStrictnessLevel mode) override;

Expand All @@ -76,7 +77,7 @@ class DatabaseOrdinary : public DatabaseOnDisk

std::unordered_map<String, LoadTaskPtr> load_table TSA_GUARDED_BY(mutex);
std::unordered_map<String, LoadTaskPtr> startup_table TSA_GUARDED_BY(mutex);
LoadTaskPtr startup_database_task;
LoadTaskPtr startup_database_task TSA_GUARDED_BY(mutex);
std::atomic<size_t> total_tables_to_startup{0};
std::atomic<size_t> tables_started{0};
AtomicStopwatch startup_watch;
Expand Down
43 changes: 30 additions & 13 deletions src/Databases/DatabaseReplicated.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -576,13 +576,30 @@ LoadTaskPtr DatabaseReplicated::startupDatabaseAsync(AsyncLoader & async_loader,
ddl_worker->startup();
ddl_worker_initialized = true;
});
std::scoped_lock lock(mutex);
return startup_replicated_database_task = makeLoadTask(async_loader, {job});
}

void DatabaseReplicated::waitDatabaseStarted(bool no_throw) const
void DatabaseReplicated::waitDatabaseStarted() const
{
if (startup_replicated_database_task)
waitLoad(currentPoolOr(TablesLoaderForegroundPoolId), startup_replicated_database_task, no_throw);
LoadTaskPtr task;
{
std::scoped_lock lock(mutex);
task = startup_replicated_database_task;
}
if (task)
waitLoad(currentPoolOr(TablesLoaderForegroundPoolId), task);
}

/// Releases the Replicated-specific startup task held by this database and then
/// delegates to DatabaseAtomic::stopLoading() for the tasks owned by the base classes.
void DatabaseReplicated::stopLoading()
{
// Local holder: starts out default-constructed and receives the member's task via swap below.
LoadTaskPtr stop_startup_replicated_database;
{
// `startup_replicated_database_task` is annotated TSA_GUARDED_BY(mutex), so it is only touched under the lock.
// After the swap the member holds the default-constructed value.
std::scoped_lock lock(mutex);
stop_startup_replicated_database.swap(startup_replicated_database_task);
}
// The reference is dropped only after the lock scope above has ended.
// NOTE(review): presumably dropping the last reference cancels the pending job and waits for a running one — confirm against LoadTask.
stop_startup_replicated_database.reset();
DatabaseAtomic::stopLoading();
}

bool DatabaseReplicated::checkDigestValid(const ContextPtr & local_context, bool debug_check /* = true */) const
Expand Down Expand Up @@ -743,7 +760,7 @@ void DatabaseReplicated::checkQueryValid(const ASTPtr & query, ContextPtr query_

BlockIO DatabaseReplicated::tryEnqueueReplicatedDDL(const ASTPtr & query, ContextPtr query_context, QueryFlags flags)
{
waitDatabaseStarted(false);
waitDatabaseStarted();

if (query_context->getCurrentTransaction() && query_context->getSettingsRef().throw_on_unsupported_query_inside_transaction)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Distributed DDL queries inside transactions are not supported");
Expand Down Expand Up @@ -807,7 +824,7 @@ static UUID getTableUUIDIfReplicated(const String & metadata, ContextPtr context

void DatabaseReplicated::recoverLostReplica(const ZooKeeperPtr & current_zookeeper, UInt32 our_log_ptr, UInt32 & max_log_ptr)
{
waitDatabaseStarted(false);
waitDatabaseStarted();

is_recovering = true;
SCOPE_EXIT({ is_recovering = false; });
Expand Down Expand Up @@ -1258,7 +1275,7 @@ void DatabaseReplicated::drop(ContextPtr context_)
return;
}

waitDatabaseStarted(false);
waitDatabaseStarted();

auto current_zookeeper = getZooKeeper();
current_zookeeper->set(replica_path, DROPPED_MARK, -1);
Expand All @@ -1277,7 +1294,7 @@ void DatabaseReplicated::drop(ContextPtr context_)

void DatabaseReplicated::stopReplication()
{
waitDatabaseStarted(/* no_throw = */ true);
stopLoading();
if (ddl_worker)
ddl_worker->shutdown();
}
Expand All @@ -1293,7 +1310,7 @@ void DatabaseReplicated::shutdown()

void DatabaseReplicated::dropTable(ContextPtr local_context, const String & table_name, bool sync)
{
waitDatabaseStarted(false);
waitDatabaseStarted();

auto txn = local_context->getZooKeeperMetadataTransaction();
assert(!ddl_worker || !ddl_worker->isCurrentlyActive() || txn || startsWith(table_name, ".inner_id."));
Expand Down Expand Up @@ -1337,7 +1354,7 @@ void DatabaseReplicated::renameTable(ContextPtr local_context, const String & ta
if (exchange && !to_database.isTableExist(to_table_name, local_context))
throw Exception(ErrorCodes::UNKNOWN_TABLE, "Table {} does not exist", to_table_name);

waitDatabaseStarted(false);
waitDatabaseStarted();

String statement = readMetadataFile(table_name);
String statement_to;
Expand Down Expand Up @@ -1439,7 +1456,7 @@ bool DatabaseReplicated::canExecuteReplicatedMetadataAlter() const

void DatabaseReplicated::detachTablePermanently(ContextPtr local_context, const String & table_name)
{
waitDatabaseStarted(false);
waitDatabaseStarted();

auto txn = local_context->getZooKeeperMetadataTransaction();
assert(!ddl_worker->isCurrentlyActive() || txn);
Expand All @@ -1464,7 +1481,7 @@ void DatabaseReplicated::detachTablePermanently(ContextPtr local_context, const

void DatabaseReplicated::removeDetachedPermanentlyFlag(ContextPtr local_context, const String & table_name, const String & table_metadata_path, bool attach)
{
waitDatabaseStarted(false);
waitDatabaseStarted();

auto txn = local_context->getZooKeeperMetadataTransaction();
assert(!ddl_worker->isCurrentlyActive() || txn);
Expand Down Expand Up @@ -1502,7 +1519,7 @@ String DatabaseReplicated::readMetadataFile(const String & table_name) const
std::vector<std::pair<ASTPtr, StoragePtr>>
DatabaseReplicated::getTablesForBackup(const FilterByNameFunction & filter, const ContextPtr &) const
{
waitDatabaseStarted(false);
waitDatabaseStarted();

/// Here we read metadata from ZooKeeper. We could do that by simple call of DatabaseAtomic::getTablesForBackup() however
/// reading from ZooKeeper is better because thus we won't be dependent on how fast the replication queue of this database is.
Expand Down Expand Up @@ -1545,7 +1562,7 @@ void DatabaseReplicated::createTableRestoredFromBackup(
std::shared_ptr<IRestoreCoordination> restore_coordination,
UInt64 timeout_ms)
{
waitDatabaseStarted(false);
waitDatabaseStarted();

/// Because of the replication multiple nodes can try to restore the same tables again and failed with "Table already exists"
/// because of some table could be restored already on other node and then replicated to this node.
Expand Down
5 changes: 3 additions & 2 deletions src/Databases/DatabaseReplicated.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,8 @@ class DatabaseReplicated : public DatabaseAtomic
UInt64 getMetadataHash(const String & table_name) const;
bool checkDigestValid(const ContextPtr & local_context, bool debug_check = true) const TSA_REQUIRES(metadata_mutex);

void waitDatabaseStarted(bool no_throw) const override;
void waitDatabaseStarted() const override;
void stopLoading() override;

String zookeeper_path;
String shard_name;
Expand Down Expand Up @@ -155,7 +156,7 @@ class DatabaseReplicated : public DatabaseAtomic

mutable ClusterPtr cluster;

LoadTaskPtr startup_replicated_database_task;
LoadTaskPtr startup_replicated_database_task TSA_GUARDED_BY(mutex);
};

}
7 changes: 5 additions & 2 deletions src/Databases/IDatabase.h
Original file line number Diff line number Diff line change
Expand Up @@ -219,8 +219,11 @@ class IDatabase : public std::enable_shared_from_this<IDatabase>
virtual void waitTableStarted(const String & /*name*/) const {}

/// Waits for the database to be started up, i.e. task returned by `startupDatabaseAsync()` is done
/// NOTE: `no_throw` wait should be used during shutdown to (1) prevent race with startup and (2) avoid exceptions if startup failed
virtual void waitDatabaseStarted(bool /*no_throw*/) const {}
virtual void waitDatabaseStarted() const {}

/// Cancels all load and startup tasks and waits for currently running tasks to finish.
/// Should be used during shutdown to (1) prevent race with startup, (2) stop any not yet started task and (3) avoid exceptions if startup failed
virtual void stopLoading() {}

/// Check the existence of the table in memory (attached).
virtual bool isTableExist(const String & name, ContextPtr context) const = 0;
Expand Down