Skip to content

Commit

Permalink
dump pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
lgbo-ustc committed Jan 8, 2024
1 parent 6feafb6 commit 7fb6fa5
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 0 deletions.
26 changes: 26 additions & 0 deletions cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
#include <Processors/Transforms/MaterializingTransform.h>
#include <QueryPipeline/Pipe.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <QueryPipeline/printPipeline.h>
#include <Storages/CustomStorageMergeTree.h>
#include <Storages/IStorage.h>
#include <Storages/MergeTree/MergeTreeData.h>
Expand Down Expand Up @@ -2102,6 +2103,8 @@ SharedContextHolder SerializedPlanParser::shared_context;

LocalExecutor::~LocalExecutor()
{
if (context->getConfigRef().getBool("dump_pipeline", false))
LOG_INFO(&Poco::Logger::get("LocalExecutor"), "Dump pipeline:\n{}", dumpPipeline());
if (spark_buffer)
{
ch_column_to_spark_row->freeMem(spark_buffer->address, spark_buffer->size);
Expand Down Expand Up @@ -2234,6 +2237,29 @@ LocalExecutor::LocalExecutor(QueryContext & _query_context, ContextPtr context_)
{
}

std::string LocalExecutor::dumpPipeline()
{
const auto & processors = query_pipeline.getProcessors();
for (auto & processor : processors)
{
DB::WriteBufferFromOwnString buffer;
auto data_stats = processor->getProcessorDataStats();
buffer << "(";
buffer << "\nexcution time: " << processor->getElapsedUs() << " us.";
buffer << "\ninput wait time: " << processor->getInputWaitElapsedUs() << " us.";
buffer << "\noutput wait time: " << processor->getOutputWaitElapsedUs() << " us.";
buffer << "\ninput rows: " << data_stats.input_rows;
buffer << "\ninput bytes: " << data_stats.input_bytes;
buffer << "\noutput rows: " << data_stats.output_rows;
buffer << "\noutput bytes: " << data_stats.output_bytes;
buffer << ")";
processor->setDescription(buffer.str());
}
DB::WriteBufferFromOwnString out;
DB::printPipeline(processors, out);
return out.str();
}

NonNullableColumnsResolver::NonNullableColumnsResolver(
const DB::Block & header_, SerializedPlanParser & parser_, const substrait::Expression & cond_rel_)
: header(header_), parser(parser_), cond_rel(cond_rel_)
Expand Down
3 changes: 3 additions & 0 deletions cpp-ch/local-engine/Parser/SerializedPlanParser.h
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,9 @@ class LocalExecutor : public BlockIterator
DB::QueryPlanPtr current_query_plan;
RelMetricPtr metric;
std::vector<QueryPlanPtr> extra_plan_holder;

/// Dump processor runtime information to log
std::string dumpPipeline();
};


Expand Down

0 comments on commit 7fb6fa5

Please sign in to comment.