Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove useless code around locks #9907

Merged
merged 3 commits into from
Mar 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions dbms/src/DataStreams/PushingToViewsBlockOutputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream(
* Although now any insertion into the table is done via PushingToViewsBlockOutputStream,
* but it's clear that here is not the best place for this functionality.
*/
addTableLock(storage->lockStructureForShare(true, context.getInitialQueryId()));
addTableLock(storage->lockStructureForShare(context.getInitialQueryId()));

/// If the "root" table deduplactes blocks, there are no need to make deduplication for children
/// Moreover, deduplication for AggregatingMergeTree children could produce false positives due to low size of inserting blocks
Expand Down Expand Up @@ -54,7 +54,7 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream(

if (auto * materialized_view = dynamic_cast<StorageMaterializedView *>(dependent_table.get()))
{
addTableLock(materialized_view->lockStructureForShare(true, context.getInitialQueryId()));
addTableLock(materialized_view->lockStructureForShare(context.getInitialQueryId()));

StoragePtr inner_table = materialized_view->getTargetTable();
auto inner_table_id = inner_table->getStorageID();
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Databases/DatabaseMySQL.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ void DatabaseMySQL::cleanOutdatedTables()
++iterator;
else
{
const auto table_lock = (*iterator)->lockAlterIntention(RWLockImpl::NO_QUERY);
const auto table_lock = (*iterator)->lockAlterIntention();

(*iterator)->shutdown();
(*iterator)->is_dropped = true;
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Functions/FunctionJoinGet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ FunctionBaseImplPtr JoinGetOverloadResolver::build(const ColumnsWithTypeAndName
auto join = storage_join->getJoin();
DataTypes data_types(arguments.size());

auto table_lock = storage_join->lockStructureForShare(false, context.getInitialQueryId());
auto table_lock = storage_join->lockStructureForShare(context.getInitialQueryId());
for (size_t i = 0; i < arguments.size(); ++i)
data_types[i] = arguments[i].type;

Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Interpreters/InterpreterAlterQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ BlockIO InterpreterAlterQuery::execute()

if (!mutation_commands.empty())
{
auto table_lock_holder = table->lockStructureForShare(false /* because mutation is executed asyncronously */, context.getCurrentQueryId());
auto table_lock_holder = table->lockStructureForShare(context.getCurrentQueryId());
MutationsInterpreter(table, mutation_commands, context, false).validate(table_lock_holder);
table->mutate(mutation_commands, context);
}
Expand All @@ -101,15 +101,15 @@ BlockIO InterpreterAlterQuery::execute()
switch (command.type)
{
case LiveViewCommand::REFRESH:
live_view->refresh(context);
live_view->refresh();
break;
}
}
}

if (!alter_commands.empty())
{
auto table_lock_holder = table->lockAlterIntention(context.getCurrentQueryId());
auto table_lock_holder = table->lockAlterIntention();
StorageInMemoryMetadata metadata = table->getInMemoryMetadata();
alter_commands.validate(metadata, context);
alter_commands.prepare(metadata);
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Interpreters/InterpreterCreateQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ InterpreterCreateQuery::TableProperties InterpreterCreateQuery::setProperties(AS
StoragePtr as_storage = DatabaseCatalog::instance().getTable({as_database_name, create.as_table});

/// as_storage->getColumns() and setEngine(...) must be called under structure lock of other_table for CREATE ... AS other_table.
as_storage_lock = as_storage->lockStructureForShare(false, context.getCurrentQueryId());
as_storage_lock = as_storage->lockStructureForShare(context.getCurrentQueryId());
properties.columns = as_storage->getColumns();

/// Secondary indices make sense only for MergeTree family of storage engines.
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Interpreters/InterpreterDescribeQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ BlockInputStreamPtr InterpreterDescribeQuery::executeImpl()
table = DatabaseCatalog::instance().getTable(table_id);
}

auto table_lock = table->lockStructureForShare(false, context.getInitialQueryId());
auto table_lock = table->lockStructureForShare(context.getInitialQueryId());
columns = table->getColumns();
}

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Interpreters/InterpreterInsertQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ BlockIO InterpreterInsertQuery::execute()
BlockIO res;

StoragePtr table = getTable(query);
auto table_lock = table->lockStructureForShare(true, context.getInitialQueryId());
auto table_lock = table->lockStructureForShare(context.getInitialQueryId());

auto query_sample_block = getSampleBlock(query, table);
if (!query.table_function)
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Interpreters/InterpreterSelectQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ InterpreterSelectQuery::InterpreterSelectQuery(

if (storage)
{
table_lock = storage->lockStructureForShare(false, context->getInitialQueryId());
table_lock = storage->lockStructureForShare(context->getInitialQueryId());
table_id = storage->getStorageID();
}

Expand Down
15 changes: 5 additions & 10 deletions dbms/src/Storages/IStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -314,22 +314,20 @@ bool IStorage::isVirtualColumn(const String & column_name) const
return getColumns().get(column_name).is_virtual;
}

TableStructureReadLockHolder IStorage::lockStructureForShare(bool will_add_new_data, const String & query_id)
TableStructureReadLockHolder IStorage::lockStructureForShare(const String & query_id)
{
TableStructureReadLockHolder result;
if (will_add_new_data)
result.new_data_structure_lock = new_data_structure_lock->getLock(RWLockImpl::Read, query_id);
result.structure_lock = structure_lock->getLock(RWLockImpl::Read, query_id);

if (is_dropped)
throw Exception("Table is dropped", ErrorCodes::TABLE_IS_DROPPED);
return result;
}

TableStructureWriteLockHolder IStorage::lockAlterIntention(const String & query_id)
TableStructureWriteLockHolder IStorage::lockAlterIntention()
{
TableStructureWriteLockHolder result;
result.alter_intention_lock = alter_intention_lock->getLock(RWLockImpl::Write, query_id);
result.alter_lock = std::unique_lock(alter_lock);

if (is_dropped)
throw Exception("Table is dropped", ErrorCodes::TABLE_IS_DROPPED);
Expand All @@ -338,23 +336,20 @@ TableStructureWriteLockHolder IStorage::lockAlterIntention(const String & query_

void IStorage::lockStructureExclusively(TableStructureWriteLockHolder & lock_holder, const String & query_id)
{
if (!lock_holder.alter_intention_lock)
if (!lock_holder.alter_lock)
throw Exception("Alter intention lock for table " + getStorageID().getNameForLogs() + " was not taken. This is a bug.", ErrorCodes::LOGICAL_ERROR);

if (!lock_holder.new_data_structure_lock)
lock_holder.new_data_structure_lock = new_data_structure_lock->getLock(RWLockImpl::Write, query_id);
lock_holder.structure_lock = structure_lock->getLock(RWLockImpl::Write, query_id);
}

TableStructureWriteLockHolder IStorage::lockExclusively(const String & query_id)
{
TableStructureWriteLockHolder result;
result.alter_intention_lock = alter_intention_lock->getLock(RWLockImpl::Write, query_id);
result.alter_lock = std::unique_lock(alter_lock);

if (is_dropped)
throw Exception("Table is dropped", ErrorCodes::TABLE_IS_DROPPED);

result.new_data_structure_lock = new_data_structure_lock->getLock(RWLockImpl::Write, query_id);
result.structure_lock = structure_lock->getLock(RWLockImpl::Write, query_id);

return result;
Expand Down
11 changes: 3 additions & 8 deletions dbms/src/Storages/IStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -199,11 +199,11 @@ class IStorage : public std::enable_shared_from_this<IStorage>, public TypePromo
/// Acquire this lock if you need the table structure to remain constant during the execution of
/// the query. If will_add_new_data is true, this means that the query will add new data to the table
/// (INSERT or a parts merge).
TableStructureReadLockHolder lockStructureForShare(bool will_add_new_data, const String & query_id);
TableStructureReadLockHolder lockStructureForShare(const String & query_id);

/// Acquire this lock at the start of ALTER to lock out other ALTERs and make sure that only you
/// can modify the table structure. It can later be upgraded to the exclusive lock.
TableStructureWriteLockHolder lockAlterIntention(const String & query_id);
TableStructureWriteLockHolder lockAlterIntention();

/// Upgrade alter intention lock to the full exclusive structure lock. This is done by ALTER queries
/// to ensure that no other query uses the table structure and it can be safely changed.
Expand Down Expand Up @@ -472,12 +472,7 @@ class IStorage : public std::enable_shared_from_this<IStorage>, public TypePromo
/// If you hold this lock exclusively, you can be sure that no other structure modifying queries
/// (e.g. ALTER, DROP) are concurrently executing. But queries that only read table structure
/// (e.g. SELECT, INSERT) can continue to execute.
mutable RWLock alter_intention_lock = RWLockImpl::create();

/// It is taken for share for the entire INSERT query and the entire merge of the parts (for MergeTree).
/// ALTER COLUMN queries acquire an exclusive lock to ensure that no new parts with the old structure
/// are added to the table and thus the set of parts to modify doesn't change.
mutable RWLock new_data_structure_lock = RWLockImpl::create();
mutable std::mutex alter_lock;

/// Lock for the table column structure (names, types, etc.) and data path.
/// It is taken in exclusive mode by queries that modify them (e.g. RENAME, ALTER and DROP)
Expand Down
11 changes: 4 additions & 7 deletions dbms/src/Storages/LiveView/StorageLiveView.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -517,14 +517,11 @@ void StorageLiveView::drop(TableStructureWriteLockHolder &)
condition.notify_all();
}

void StorageLiveView::refresh(const Context & context)
void StorageLiveView::refresh()
{
auto alter_lock = lockAlterIntention(context.getCurrentQueryId());
{
std::lock_guard lock(mutex);
if (getNewBlocks())
condition.notify_all();
}
std::lock_guard lock(mutex);
if (getNewBlocks())
condition.notify_all();
}

Pipes StorageLiveView::read(
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/LiveView/StorageLiveView.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ friend class LiveViewBlockOutputStream;
void startup() override;
void shutdown() override;

void refresh(const Context & context);
void refresh();

Pipes read(
const Names & column_names,
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/MergeTree/DataPartsExchange.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & /*bo

try
{
auto storage_lock = data.lockStructureForShare(false, RWLockImpl::NO_QUERY);
auto storage_lock = data.lockStructureForShare(RWLockImpl::NO_QUERY);

MergeTreeData::DataPartPtr part = findPart(part_name);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ void ReplicatedMergeTreeCleanupThread::iterate()

{
/// TODO: Implement tryLockStructureForShare.
auto lock = storage.lockStructureForShare(false, "");
auto lock = storage.lockStructureForShare("");
storage.clearOldTemporaryDirectories();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ CheckResult ReplicatedMergeTreePartCheckThread::checkPart(const String & part_na
else if (part->name == part_name)
{
auto zookeeper = storage.getZooKeeper();
auto table_lock = storage.lockStructureForShare(false, RWLockImpl::NO_QUERY);
auto table_lock = storage.lockStructureForShare(RWLockImpl::NO_QUERY);

auto local_part_header = ReplicatedMergeTreePartHeader::fromColumnsAndChecksums(
part->getColumns(), part->checksums);
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/StorageBuffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ Pipes StorageBuffer::read(
if (destination.get() == this)
throw Exception("Destination table is myself. Read will cause infinite loop.", ErrorCodes::INFINITE_LOOP);

auto destination_lock = destination->lockStructureForShare(false, context.getCurrentQueryId());
auto destination_lock = destination->lockStructureForShare(context.getCurrentQueryId());

const bool dst_has_same_structure = std::all_of(column_names.begin(), column_names.end(), [this, destination](const String& column_name)
{
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/StorageMaterializedView.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ Pipes StorageMaterializedView::read(
const unsigned num_streams)
{
auto storage = getTargetTable();
auto lock = storage->lockStructureForShare(false, context.getCurrentQueryId());
auto lock = storage->lockStructureForShare(context.getCurrentQueryId());
if (query_info.order_by_optimizer)
query_info.input_sorting_info = query_info.order_by_optimizer->getInputOrder(storage);

Expand All @@ -200,7 +200,7 @@ Pipes StorageMaterializedView::read(
BlockOutputStreamPtr StorageMaterializedView::write(const ASTPtr & query, const Context & context)
{
auto storage = getTargetTable();
auto lock = storage->lockStructureForShare(true, context.getCurrentQueryId());
auto lock = storage->lockStructureForShare(context.getCurrentQueryId());
auto stream = storage->write(query, context);
stream->addTableLock(lock);
return stream;
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/StorageMerge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ StorageMerge::StorageListWithLocks StorageMerge::getSelectedTables(const String
{
auto & table = iterator->table();
if (table.get() != this)
selected_tables.emplace_back(table, table->lockStructureForShare(false, query_id), iterator->name());
selected_tables.emplace_back(table, table->lockStructureForShare(query_id), iterator->name());

iterator->next();
}
Expand All @@ -389,7 +389,7 @@ StorageMerge::StorageListWithLocks StorageMerge::getSelectedTables(const ASTPtr

if (storage.get() != this)
{
selected_tables.emplace_back(storage, storage->lockStructureForShare(false, query_id), iterator->name());
selected_tables.emplace_back(storage, storage->lockStructureForShare(query_id), iterator->name());
virtual_column->insert(iterator->name());
}

Expand Down
20 changes: 10 additions & 10 deletions dbms/src/Storages/StorageMergeTree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ void StorageMergeTree::alter(

DatabaseCatalog::instance().getDatabase(table_id.database_name)->alterTable(context, table_id.table_name, metadata);

/// We release all locks except alter_intention_lock which allows
/// We release all locks except alter_lock which allows
/// to execute alter queries sequentially
table_lock_holder.releaseAllExceptAlterIntention();

Expand Down Expand Up @@ -532,7 +532,7 @@ bool StorageMergeTree::merge(
bool deduplicate,
String * out_disable_reason)
{
auto table_lock_holder = lockStructureForShare(true, RWLockImpl::NO_QUERY);
auto table_lock_holder = lockStructureForShare(RWLockImpl::NO_QUERY);

FutureMergedMutatedPart future_part;

Expand Down Expand Up @@ -650,7 +650,7 @@ BackgroundProcessingPoolTaskResult StorageMergeTree::movePartsTask()

bool StorageMergeTree::tryMutatePart()
{
auto table_lock_holder = lockStructureForShare(true, RWLockImpl::NO_QUERY);
auto table_lock_holder = lockStructureForShare(RWLockImpl::NO_QUERY);
size_t max_ast_elements = global_context.getSettingsRef().max_expanded_ast_elements;

FutureMergedMutatedPart future_part;
Expand Down Expand Up @@ -775,7 +775,7 @@ BackgroundProcessingPoolTaskResult StorageMergeTree::mergeMutateTask()
{
{
/// TODO: Implement tryLockStructureForShare.
auto lock_structure = lockStructureForShare(false, "");
auto lock_structure = lockStructureForShare("");
clearOldPartsFromFilesystem();
clearOldTemporaryDirectories();
}
Expand Down Expand Up @@ -968,14 +968,14 @@ void StorageMergeTree::alterPartition(const ASTPtr & query, const PartitionComma

case PartitionCommand::FREEZE_PARTITION:
{
auto lock = lockStructureForShare(false, context.getCurrentQueryId());
auto lock = lockStructureForShare(context.getCurrentQueryId());
freezePartition(command.partition, command.with_name, context, lock);
}
break;

case PartitionCommand::FREEZE_ALL_PARTITIONS:
{
auto lock = lockStructureForShare(false, context.getCurrentQueryId());
auto lock = lockStructureForShare(context.getCurrentQueryId());
freezeAll(command.with_name, context, lock);
}
break;
Expand Down Expand Up @@ -1040,8 +1040,8 @@ void StorageMergeTree::attachPartition(const ASTPtr & partition, bool attach_par

void StorageMergeTree::replacePartitionFrom(const StoragePtr & source_table, const ASTPtr & partition, bool replace, const Context & context)
{
auto lock1 = lockStructureForShare(false, context.getCurrentQueryId());
auto lock2 = source_table->lockStructureForShare(false, context.getCurrentQueryId());
auto lock1 = lockStructureForShare(context.getCurrentQueryId());
auto lock2 = source_table->lockStructureForShare(context.getCurrentQueryId());

Stopwatch watch;
MergeTreeData & src_data = checkStructureAndGetMergeTreeData(source_table);
Expand Down Expand Up @@ -1111,8 +1111,8 @@ void StorageMergeTree::replacePartitionFrom(const StoragePtr & source_table, con

void StorageMergeTree::movePartitionToTable(const StoragePtr & dest_table, const ASTPtr & partition, const Context & context)
{
auto lock1 = lockStructureForShare(false, context.getCurrentQueryId());
auto lock2 = dest_table->lockStructureForShare(false, context.getCurrentQueryId());
auto lock1 = lockStructureForShare(context.getCurrentQueryId());
auto lock2 = dest_table->lockStructureForShare(context.getCurrentQueryId());

auto dest_table_storage = std::dynamic_pointer_cast<StorageMergeTree>(dest_table);
if (!dest_table_storage)
Expand Down