Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not take lock for shared context in setTempDataOnDisk #48219

Merged
merged 4 commits into from
Apr 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
66 changes: 50 additions & 16 deletions src/Interpreters/Context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,11 @@ struct ContextSharedPart : boost::noncopyable
ConfigurationPtr config; /// Global configuration settings.

String tmp_path; /// Path to the temporary files that occur when processing the request.
TemporaryDataOnDiskScopePtr temp_data_on_disk; /// Temporary files that occur when processing the request accounted here.

/// All temporary files that occur when processing the requests accounted here.
/// Child scopes for more fine-grained accounting are created per user/query/etc.
/// Initialized once during server startup.
TemporaryDataOnDiskScopePtr root_temp_data_on_disk;

mutable std::unique_ptr<EmbeddedDictionaries> embedded_dictionaries; /// Metrica's dictionaries. Have lazy initialization.
mutable std::unique_ptr<ExternalDictionariesLoader> external_dictionaries_loader;
Expand Down Expand Up @@ -744,25 +748,35 @@ Strings Context::getWarnings() const
}

/// TODO: remove, use `getTempDataOnDisk`
VolumePtr Context::getTemporaryVolume() const
VolumePtr Context::getGlobalTemporaryVolume() const
{
auto lock = getLock();
if (shared->temp_data_on_disk)
return shared->temp_data_on_disk->getVolume();
/// Calling this method we just bypass the `temp_data_on_disk` and write to the file on the volume directly.
/// Volume is the same for `root_temp_data_on_disk` (always set) and `temp_data_on_disk` (if it's set).
if (shared->root_temp_data_on_disk)
return shared->root_temp_data_on_disk->getVolume();
return nullptr;
}

TemporaryDataOnDiskScopePtr Context::getTempDataOnDisk() const
{
auto lock = getLock();
if (this->temp_data_on_disk)
return this->temp_data_on_disk;
return shared->temp_data_on_disk;

auto lock = getLock();
return shared->root_temp_data_on_disk;
}

void Context::setTempDataOnDisk(TemporaryDataOnDiskScopePtr temp_data_on_disk_)
TemporaryDataOnDiskScopePtr Context::getSharedTempDataOnDisk() const
{
auto lock = getLock();
return shared->root_temp_data_on_disk;
}

void Context::setTempDataOnDisk(TemporaryDataOnDiskScopePtr temp_data_on_disk_)
{
/// It's set from `ProcessList::insert` in `executeQueryImpl` before query execution
/// so no races with `getTempDataOnDisk` which is called from query execution.
this->temp_data_on_disk = std::move(temp_data_on_disk_);
}

Expand All @@ -772,7 +786,7 @@ void Context::setPath(const String & path)

shared->path = path;

if (shared->tmp_path.empty() && !shared->temp_data_on_disk)
if (shared->tmp_path.empty() && !shared->root_temp_data_on_disk)
shared->tmp_path = shared->path + "tmp/";

if (shared->flags_path.empty())
Expand Down Expand Up @@ -828,6 +842,11 @@ static VolumePtr createLocalSingleDiskVolume(const std::string & path)

void Context::setTemporaryStoragePath(const String & path, size_t max_size)
{
auto lock = getLock();

if (shared->root_temp_data_on_disk)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Temporary storage is already set");

shared->tmp_path = path;
if (!shared->tmp_path.ends_with('/'))
shared->tmp_path += '/';
Expand All @@ -839,17 +858,23 @@ void Context::setTemporaryStoragePath(const String & path, size_t max_size)
setupTmpPath(shared->log, disk->getPath());
}

shared->temp_data_on_disk = std::make_shared<TemporaryDataOnDiskScope>(volume, max_size);
shared->root_temp_data_on_disk = std::make_shared<TemporaryDataOnDiskScope>(volume, max_size);
}

void Context::setTemporaryStoragePolicy(const String & policy_name, size_t max_size)
{
std::lock_guard lock(shared->storage_policies_mutex);
StoragePolicyPtr tmp_policy;
{
/// lock in required only for accessing `shared->merge_tree_storage_policy_selector`
/// StoragePolicy itself is immutable.
std::lock_guard storage_policies_lock(shared->storage_policies_mutex);
tmp_policy = getStoragePolicySelector(storage_policies_lock)->get(policy_name);
}

StoragePolicyPtr tmp_policy = getStoragePolicySelector(lock)->get(policy_name);
if (tmp_policy->getVolumes().size() != 1)
throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG,
throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG,
"Policy '{}' is used temporary files, such policy should have exactly one volume", policy_name);

VolumePtr volume = tmp_policy->getVolume(0);

if (volume->getDisks().empty())
Expand All @@ -874,12 +899,21 @@ void Context::setTemporaryStoragePolicy(const String & policy_name, size_t max_s
setupTmpPath(shared->log, disk->getPath());
}

shared->temp_data_on_disk = std::make_shared<TemporaryDataOnDiskScope>(volume, max_size);
}
auto lock = getLock();

if (shared->root_temp_data_on_disk)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Temporary storage is already set");

shared->root_temp_data_on_disk = std::make_shared<TemporaryDataOnDiskScope>(volume, max_size);
}

void Context::setTemporaryStorageInCache(const String & cache_disk_name, size_t max_size)
{
auto lock = getLock();

if (shared->root_temp_data_on_disk)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Temporary storage is already set");

auto disk_ptr = getDisk(cache_disk_name);
if (!disk_ptr)
throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG, "Disk '{}' is not found", cache_disk_name);
Expand All @@ -896,7 +930,7 @@ void Context::setTemporaryStorageInCache(const String & cache_disk_name, size_t

shared->tmp_path = file_cache->getBasePath();
VolumePtr volume = createLocalSingleDiskVolume(shared->tmp_path);
shared->temp_data_on_disk = std::make_shared<TemporaryDataOnDiskScope>(volume, file_cache.get(), max_size);
shared->root_temp_data_on_disk = std::make_shared<TemporaryDataOnDiskScope>(volume, file_cache.get(), max_size);
}

void Context::setFlagsPath(const String & path)
Expand Down Expand Up @@ -3335,7 +3369,7 @@ void Context::shutdown()
}

/// Special volumes might also use disks that require shutdown.
auto & tmp_data = shared->temp_data_on_disk;
auto & tmp_data = shared->root_temp_data_on_disk;
if (tmp_data && tmp_data->getVolume())
{
auto & disks = tmp_data->getVolume()->getDisks();
Expand Down
3 changes: 2 additions & 1 deletion src/Interpreters/Context.h
Original file line number Diff line number Diff line change
Expand Up @@ -476,9 +476,10 @@ class Context: public std::enable_shared_from_this<Context>
/// A list of warnings about server configuration to place in `system.warnings` table.
Strings getWarnings() const;

VolumePtr getTemporaryVolume() const; /// TODO: remove, use `getTempDataOnDisk`
VolumePtr getGlobalTemporaryVolume() const; /// TODO: remove, use `getTempDataOnDisk`

TemporaryDataOnDiskScopePtr getTempDataOnDisk() const;
TemporaryDataOnDiskScopePtr getSharedTempDataOnDisk() const;
void setTempDataOnDisk(TemporaryDataOnDiskScopePtr temp_data_on_disk_);

void setPath(const String & path);
Expand Down
2 changes: 1 addition & 1 deletion src/Interpreters/JoinedTables.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ std::shared_ptr<TableJoin> JoinedTables::makeTableJoin(const ASTSelectQuery & se

auto settings = context->getSettingsRef();
MultiEnum<JoinAlgorithm> join_algorithm = settings.join_algorithm;
auto table_join = std::make_shared<TableJoin>(settings, context->getTemporaryVolume());
auto table_join = std::make_shared<TableJoin>(settings, context->getGlobalTemporaryVolume());

const ASTTablesInSelectQueryElement * ast_join = select_query.join();
const auto & table_to_join = ast_join->table_expression->as<ASTTableExpression &>();
Expand Down
2 changes: 1 addition & 1 deletion src/Interpreters/MergeJoin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1045,7 +1045,7 @@ std::shared_ptr<Block> MergeJoin::loadRightBlock(size_t pos) const

void MergeJoin::initRightTableWriter()
{
disk_writer = std::make_unique<SortedBlocksWriter>(size_limits, table_join->getTemporaryVolume(),
disk_writer = std::make_unique<SortedBlocksWriter>(size_limits, table_join->getGlobalTemporaryVolume(),
right_sample_block, right_sort_description, max_rows_in_right_block, max_files_to_merge,
table_join->temporaryFilesCodec());
disk_writer->addBlocks(right_blocks);
Expand Down
2 changes: 1 addition & 1 deletion src/Interpreters/ProcessList.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -625,7 +625,7 @@ ProcessListForUser::ProcessListForUser(ContextPtr global_context, ProcessList *
if (global_context)
{
size_t size_limit = global_context->getSettingsRef().max_temporary_data_on_disk_size_for_user;
user_temp_data_on_disk = std::make_shared<TemporaryDataOnDiskScope>(global_context->getTempDataOnDisk(), size_limit);
user_temp_data_on_disk = std::make_shared<TemporaryDataOnDiskScope>(global_context->getSharedTempDataOnDisk(), size_limit);
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/Interpreters/TableJoin.h
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ class TableJoin
JoinStrictness strictness() const { return table_join.strictness; }
bool sameStrictnessAndKind(JoinStrictness, JoinKind) const;
const SizeLimits & sizeLimits() const { return size_limits; }
VolumePtr getTemporaryVolume() { return tmp_volume; }
VolumePtr getGlobalTemporaryVolume() { return tmp_volume; }

bool isEnabledAlgorithm(JoinAlgorithm val) const
{
Expand Down
2 changes: 1 addition & 1 deletion src/Planner/PlannerJoinTree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -994,7 +994,7 @@ JoinTreeQueryPlan buildQueryPlanForJoinNode(const QueryTreeNodePtr & join_table_
}
}

auto table_join = std::make_shared<TableJoin>(settings, query_context->getTemporaryVolume());
auto table_join = std::make_shared<TableJoin>(settings, query_context->getGlobalTemporaryVolume());
table_join->getTableJoin() = join_node.toASTTableJoin()->as<ASTTableJoin &>();
table_join->getTableJoin().kind = join_kind;

Expand Down
2 changes: 1 addition & 1 deletion src/Server/HTTPHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -623,7 +623,7 @@ void HTTPHandler::processQuery(

if (buffer_until_eof)
{
const std::string tmp_path(server.context()->getTemporaryVolume()->getDisk()->getPath());
const std::string tmp_path(server.context()->getGlobalTemporaryVolume()->getDisk()->getPath());
const std::string tmp_path_template(fs::path(tmp_path) / "http_buffers/");

auto create_tmp_disk_buffer = [tmp_path_template] (const WriteBufferPtr &)
Expand Down
2 changes: 1 addition & 1 deletion src/Storages/MergeTree/MergeTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare()
ctx->compression_codec = global_ctx->data->getCompressionCodecForPart(
global_ctx->merge_list_element_ptr->total_size_bytes_compressed, global_ctx->new_data_part->ttl_infos, global_ctx->time_of_merge);

ctx->tmp_disk = global_ctx->context->getTemporaryVolume()->getDisk();
ctx->tmp_disk = global_ctx->context->getGlobalTemporaryVolume()->getDisk();

switch (global_ctx->chosen_merge_algorithm)
{
Expand Down
4 changes: 2 additions & 2 deletions src/Storages/StorageMemory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ namespace

void StorageMemory::backupData(BackupEntriesCollector & backup_entries_collector, const String & data_path_in_backup, const std::optional<ASTs> & /* partitions */)
{
auto temp_disk = backup_entries_collector.getContext()->getTemporaryVolume()->getDisk(0);
auto temp_disk = backup_entries_collector.getContext()->getGlobalTemporaryVolume()->getDisk(0);
auto max_compress_block_size = backup_entries_collector.getContext()->getSettingsRef().max_compress_block_size;
backup_entries_collector.addBackupEntries(
std::make_shared<MemoryBackup>(getInMemoryMetadataPtr(), data.get(), data_path_in_backup, temp_disk, max_compress_block_size)
Expand All @@ -417,7 +417,7 @@ void StorageMemory::restoreDataFromBackup(RestorerFromBackup & restorer, const S
if (!restorer.isNonEmptyTableAllowed() && total_size_bytes)
RestorerFromBackup::throwTableIsNotEmpty(getStorageID());

auto temp_disk = restorer.getContext()->getTemporaryVolume()->getDisk(0);
auto temp_disk = restorer.getContext()->getGlobalTemporaryVolume()->getDisk(0);

restorer.addDataRestoreTask(
[storage = std::static_pointer_cast<StorageMemory>(shared_from_this()), backup, data_path_in_backup, temp_disk]
Expand Down