Skip to content

Commit

Permalink
Add tidb executor id in Join's log (pingcap#7964)
Browse files Browse the repository at this point in the history
  • Loading branch information
windtalker authored and JaySon-Huang committed Aug 18, 2023
1 parent cf80175 commit be77811
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 14 deletions.
1 change: 1 addition & 0 deletions dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
Expand Up @@ -346,6 +346,7 @@ void DAGQueryBlockInterpreter::handleJoin(
match_helper_name,
flag_mapped_entry_helper_name,
0,
0,
context.isTest());

recordJoinExecuteInfo(tiflash_join.build_side_index, join_ptr);
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Planner/PhysicalPlanNode.cpp
Expand Up @@ -35,7 +35,7 @@ PhysicalPlanNode::PhysicalPlanNode(
, type(type_)
, schema(schema_)
, fine_grained_shuffle(fine_grained_shuffle_)
, log(Logger::get(req_id, type_.toString(), executor_id_))
, log(Logger::get(fmt::format("{}_{}_{}", req_id, type_.toString(), executor_id_)))
{}

String PhysicalPlanNode::toString()
Expand Down
10 changes: 6 additions & 4 deletions dbms/src/Flash/Planner/Plans/PhysicalJoin.cpp
@@ -1,4 +1,4 @@
// Copyright 2023 PingCAP, Inc.
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -131,9 +131,10 @@ PhysicalPlanNodePtr PhysicalJoin::build(

const Settings & settings = context.getSettingsRef();
size_t max_bytes_before_external_join = settings.max_bytes_before_external_join;
auto join_req_id = fmt::format("{}_{}", log->identifier(), executor_id);
SpillConfig build_spill_config(
context.getTemporaryPath(),
fmt::format("{}_hash_join_0_build", log->identifier()),
fmt::format("{}_0_build", join_req_id),
settings.max_cached_data_bytes_in_spiller,
settings.max_spilled_rows_per_file,
settings.max_spilled_bytes_per_file,
Expand All @@ -142,7 +143,7 @@ PhysicalPlanNodePtr PhysicalJoin::build(
settings.max_block_size);
SpillConfig probe_spill_config(
context.getTemporaryPath(),
fmt::format("{}_hash_join_0_probe", log->identifier()),
fmt::format("{}_0_probe", join_req_id),
settings.max_cached_data_bytes_in_spiller,
settings.max_spilled_rows_per_file,
settings.max_spilled_bytes_per_file,
Expand All @@ -169,7 +170,7 @@ PhysicalPlanNodePtr PhysicalJoin::build(
build_key_names,
tiflash_join.kind,
tiflash_join.strictness,
log->identifier(),
join_req_id,
fine_grained_shuffle.enable(),
fine_grained_shuffle.stream_count,
max_bytes_before_external_join,
Expand All @@ -184,6 +185,7 @@ PhysicalPlanNodePtr PhysicalJoin::build(
match_helper_name,
flag_mapped_entry_helper_name,
0,
0,
context.isTest(),
runtime_filter_list);

Expand Down
19 changes: 11 additions & 8 deletions dbms/src/Interpreters/Join.cpp
Expand Up @@ -141,6 +141,7 @@ Join::Join(
const String & match_helper_name_,
const String & flag_mapped_entry_helper_name_,
size_t restore_round_,
size_t restore_part,
bool is_test_,
const std::vector<RuntimeFilterPtr> & runtime_filter_list_)
: restore_round(restore_round_)
Expand All @@ -149,6 +150,7 @@ Join::Join(
, kind(kind_)
, strictness(strictness_)
, original_strictness(strictness)
, join_req_id(req_id)
, may_probe_side_expanded_after_join(mayProbeSideExpandedAfterJoin(kind, strictness))
, key_names_left(key_names_left_)
, key_names_right(key_names_right_)
Expand All @@ -166,7 +168,9 @@ Join::Join(
: std::max(1, max_block_size / 10))
, tidb_output_column_names(tidb_output_column_names_)
, is_test(is_test_)
, log(Logger::get(req_id))
, log(Logger::get(
restore_round == 0 ? join_req_id
: fmt::format("{}_round_{}_part_{}", join_req_id, restore_round, restore_part)))
, enable_fine_grained_shuffle(enable_fine_grained_shuffle_)
, fine_grained_shuffle_count(fine_grained_shuffle_count_)
{
Expand Down Expand Up @@ -348,21 +352,19 @@ void Join::setSampleBlock(const Block & block)
sample_block_with_columns_to_add.insert(ColumnWithTypeAndName(Join::match_helper_type, match_helper_name));
}

std::shared_ptr<Join> Join::createRestoreJoin(size_t max_bytes_before_external_join_)
std::shared_ptr<Join> Join::createRestoreJoin(size_t max_bytes_before_external_join_, size_t restore_partition_id)
{
return std::make_shared<Join>(
key_names_left,
key_names_right,
kind,
original_strictness,
log->identifier(),
join_req_id,
false,
0,
max_bytes_before_external_join_,
hash_join_spill_context->createBuildSpillConfig(
fmt::format("{}_hash_join_{}_build", log->identifier(), restore_round + 1)),
hash_join_spill_context->createProbeSpillConfig(
fmt::format("{}_hash_join_{}_probe", log->identifier(), restore_round + 1)),
hash_join_spill_context->createBuildSpillConfig(fmt::format("{}_{}_build", join_req_id, restore_round + 1)),
hash_join_spill_context->createProbeSpillConfig(fmt::format("{}_{}_probe", join_req_id, restore_round + 1)),
join_restore_concurrency,
tidb_output_column_names,
collators,
Expand All @@ -372,6 +374,7 @@ std::shared_ptr<Join> Join::createRestoreJoin(size_t max_bytes_before_external_j
match_helper_name,
flag_mapped_entry_helper_name,
restore_round + 1,
restore_partition_id,
is_test);
}

Expand Down Expand Up @@ -2183,7 +2186,7 @@ std::optional<RestoreInfo> Join::getOneRestoreStream(size_t max_block_size_)
auto new_max_bytes_before_external_join = static_cast<size_t>(
hash_join_spill_context->getOperatorSpillThreshold()
* (static_cast<double>(restore_join_build_concurrency) / build_concurrency));
restore_join = createRestoreJoin(std::max(1, new_max_bytes_before_external_join));
restore_join = createRestoreJoin(std::max(1, new_max_bytes_before_external_join), spilled_partition_index);
restore_join->initBuild(build_sample_block, restore_join_build_concurrency);
restore_join->setInitActiveBuildThreads();
restore_join->initProbe(probe_sample_block, restore_join_build_concurrency);
Expand Down
4 changes: 3 additions & 1 deletion dbms/src/Interpreters/Join.h
Expand Up @@ -172,6 +172,7 @@ class Join
const String & match_helper_name_ = "",
const String & flag_mapped_entry_helper_name_ = "",
size_t restore_round = 0,
size_t restore_partition_id = 0,
bool is_test = true,
const std::vector<RuntimeFilterPtr> & runtime_filter_list_ = dummy_runtime_filter_list);

Expand Down Expand Up @@ -315,6 +316,7 @@ class Join
ASTTableJoin::Strictness strictness;
bool has_other_condition;
ASTTableJoin::Strictness original_strictness;
String join_req_id;
const bool may_probe_side_expanded_after_join;

/// Names of key columns (columns for equi-JOIN) in "left" table (in the order they appear in USING clause).
Expand Down Expand Up @@ -481,7 +483,7 @@ class Join
void releaseAllPartitions();

void spillMostMemoryUsedPartitionIfNeed(size_t stream_index);
std::shared_ptr<Join> createRestoreJoin(size_t max_bytes_before_external_join_);
std::shared_ptr<Join> createRestoreJoin(size_t max_bytes_before_external_join_, size_t restore_partition_id);

void workAfterBuildFinish(size_t stream_index);
void workAfterProbeFinish(size_t stream_index);
Expand Down

0 comments on commit be77811

Please sign in to comment.