Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Better updated ports check in processors #7988

Merged
merged 10 commits into from
Dec 5, 2019
238 changes: 127 additions & 111 deletions dbms/src/Processors/Executors/PipelineExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
#include <ext/scope_guard.h>
#include <Common/CurrentThread.h>

#include <boost/lockfree/queue.hpp>
#include <Common/Stopwatch.h>
#include <Processors/ISource.h>
#include <Common/setThreadName.h>
Expand Down Expand Up @@ -52,26 +51,25 @@ bool PipelineExecutor::addEdges(UInt64 node)

const IProcessor * cur = graph[node].processor;

auto add_edge = [&](auto & from_port, const IProcessor * to_proc, Edges & edges)
auto add_edge = [&](auto & from_port, const IProcessor * to_proc, Edges & edges,
bool is_backward, UInt64 input_port_number, UInt64 output_port_number,
std::vector<void *> * update_list)
{
auto it = processors_map.find(to_proc);
if (it == processors_map.end())
throwUnknownProcessor(to_proc, cur, true);

UInt64 proc_num = it->second;
Edge * edge_ptr = nullptr;

for (auto & edge : edges)
if (edge.to == proc_num)
edge_ptr = &edge;

if (!edge_ptr)
{
edge_ptr = &edges.emplace_back();
edge_ptr->to = proc_num;
if (edge.to == proc_num)
throw Exception("Multiple edges are not allowed for the same processors.", ErrorCodes::LOGICAL_ERROR);
}

from_port.setVersion(&edge_ptr->version);
auto & edge = edges.emplace_back(proc_num, is_backward, input_port_number, output_port_number, update_list);

from_port.setUpdateInfo(&edge.update_info);
};

bool was_edge_added = false;
Expand All @@ -83,10 +81,11 @@ bool PipelineExecutor::addEdges(UInt64 node)
{
was_edge_added = true;

for (auto it = std::next(inputs.begin(), from_input); it != inputs.end(); ++it)
for (auto it = std::next(inputs.begin(), from_input); it != inputs.end(); ++it, ++from_input)
{
const IProcessor * proc = &it->getOutputPort().getProcessor();
add_edge(*it, proc, graph[node].backEdges);
auto output_port_number = proc->getOutputPortNumber(&it->getOutputPort());
add_edge(*it, proc, graph[node].backEdges, true, from_input, output_port_number, &graph[node].post_updated_input_ports);
}
}

Expand All @@ -97,10 +96,11 @@ bool PipelineExecutor::addEdges(UInt64 node)
{
was_edge_added = true;

for (auto it = std::next(outputs.begin(), from_output); it != outputs.end(); ++it)
for (auto it = std::next(outputs.begin(), from_output); it != outputs.end(); ++it, ++from_output)
{
const IProcessor * proc = &it->getInputPort().getProcessor();
add_edge(*it, proc, graph[node].directEdges);
auto input_port_number = proc->getInputPortNumber(&it->getInputPort());
add_edge(*it, proc, graph[node].directEdges, false, input_port_number, from_output, &graph[node].post_updated_output_ports);
}
}

Expand Down Expand Up @@ -131,6 +131,7 @@ void PipelineExecutor::addChildlessProcessorsToStack(Stack & stack)
if (graph[proc].directEdges.empty())
{
stack.push(proc);
/// Do not lock the mutex, as this function is executed in a single thread.
graph[proc].status = ExecStatus::Preparing;
}
}
Expand Down Expand Up @@ -195,9 +196,20 @@ void PipelineExecutor::expandPipeline(Stack & stack, UInt64 pid)
UInt64 num_processors = processors.size();
for (UInt64 node = 0; node < num_processors; ++node)
{
size_t num_direct_edges = graph[node].directEdges.size();
size_t num_back_edges = graph[node].backEdges.size();

if (addEdges(node))
{
if (graph[node].status == ExecStatus::Idle || graph[node].status == ExecStatus::New)
std::lock_guard guard(graph[node].status_mutex);

for (; num_back_edges < graph[node].backEdges.size(); ++num_back_edges)
graph[node].updated_input_ports.emplace_back(num_back_edges);

for (; num_direct_edges < graph[node].directEdges.size(); ++num_direct_edges)
graph[node].updated_output_ports.emplace_back(num_direct_edges);

if (graph[node].status == ExecStatus::Idle)
{
graph[node].status = ExecStatus::Preparing;
stack.push(node);
Expand All @@ -212,138 +224,142 @@ bool PipelineExecutor::tryAddProcessorToStackIfUpdated(Edge & edge, Stack & stac

auto & node = graph[edge.to];

ExecStatus status = node.status.load();
std::lock_guard guard(node.status_mutex);

/// Don't add processor if nothing was read from port.
if (status != ExecStatus::New && edge.version == edge.prev_version)
return false;
ExecStatus status = node.status;

if (status == ExecStatus::Finished)
return false;

/// Signal that node need to be prepared.
node.need_to_be_prepared = true;
edge.prev_version = edge.version;

/// Try to get ownership for node.
if (edge.backward)
node.updated_output_ports.push_back(edge.output_port_number);
else
node.updated_input_ports.push_back(edge.input_port_number);

/// Assume that current status is New or Idle. Otherwise, can't prepare node.
if (status != ExecStatus::New)
status = ExecStatus::Idle;

/// Statuses but New and Idle are not interesting because they own node.
/// Prepare will be called in owning thread before changing status.
while (!node.status.compare_exchange_weak(status, ExecStatus::Preparing))
if (!(status == ExecStatus::New || status == ExecStatus::Idle) || !node.need_to_be_prepared)
return false;

stack.push(edge.to);
return true;
if (status == ExecStatus::Idle)
{
node.status = ExecStatus::Preparing;
stack.push(edge.to);
return true;
}

return false;
}

bool PipelineExecutor::prepareProcessor(UInt64 pid, Stack & children, Stack & parents, size_t thread_number, bool async)
{
/// In this method we have ownership on node.
auto & node = graph[pid];

bool need_traverse = false;
bool need_expand_pipeline = false;

std::vector<Edge *> updated_back_edges;
std::vector<Edge *> updated_direct_edges;

{
/// Stopwatch watch;

/// Disable flag before prepare call. Otherwise, we can skip prepare request.
/// Prepare can be called more times than needed, but it's ok.
node.need_to_be_prepared = false;
std::lock_guard guard(node.status_mutex);

auto status = node.processor->prepare();
auto status = node.processor->prepare(node.updated_input_ports, node.updated_output_ports);
node.updated_input_ports.clear();
node.updated_output_ports.clear();

/// node.execution_state->preparation_time_ns += watch.elapsed();
node.last_processor_status = status;
}

auto add_neighbours_to_prepare_queue = [&] ()
{
for (auto & edge : node.backEdges)
tryAddProcessorToStackIfUpdated(edge, parents);

for (auto & edge : node.directEdges)
tryAddProcessorToStackIfUpdated(edge, children);
};
switch (node.last_processor_status)
{
case IProcessor::Status::NeedData:
case IProcessor::Status::PortFull:
{
need_traverse = true;
node.status = ExecStatus::Idle;
break;
}
case IProcessor::Status::Finished:
{
need_traverse = true;
node.status = ExecStatus::Finished;
break;
}
case IProcessor::Status::Ready:
{
node.status = ExecStatus::Executing;
return true;
}
case IProcessor::Status::Async:
{
throw Exception("Async is temporary not supported.", ErrorCodes::LOGICAL_ERROR);

auto try_release_ownership = [&] ()
{
/// This function can be called after expandPipeline, where the node from the outer scope is no longer valid.
auto & node_ = graph[pid];
ExecStatus expected = ExecStatus::Idle;
node_.status = ExecStatus::Idle;
// node.status = ExecStatus::Executing;
// addAsyncJob(pid);
// break;
}
case IProcessor::Status::Wait:
{
if (!async)
throw Exception("Processor returned status Wait before Async.", ErrorCodes::LOGICAL_ERROR);
break;
}
case IProcessor::Status::ExpandPipeline:
{
need_expand_pipeline = true;
break;
}
}

if (node_.need_to_be_prepared)
if (need_traverse)
{
while (!node_.status.compare_exchange_weak(expected, ExecStatus::Preparing))
if (!(expected == ExecStatus::Idle) || !node_.need_to_be_prepared)
return;
for (auto & edge_id : node.post_updated_input_ports)
{
auto edge = static_cast<Edge *>(edge_id);
updated_back_edges.emplace_back(edge);
edge->update_info.trigger();
}

for (auto & edge_id : node.post_updated_output_ports)
{
auto edge = static_cast<Edge *>(edge_id);
updated_direct_edges.emplace_back(edge);
edge->update_info.trigger();
}

children.push(pid);
node.post_updated_input_ports.clear();
node.post_updated_output_ports.clear();
}
};
}

switch (node.last_processor_status)
if (need_traverse)
{
case IProcessor::Status::NeedData:
case IProcessor::Status::PortFull:
{
add_neighbours_to_prepare_queue();
try_release_ownership();
for (auto & edge : updated_back_edges)
tryAddProcessorToStackIfUpdated(*edge, parents);

break;
}
case IProcessor::Status::Finished:
{
add_neighbours_to_prepare_queue();
node.status = ExecStatus::Finished;
break;
}
case IProcessor::Status::Ready:
{
node.status = ExecStatus::Executing;
return true;
}
case IProcessor::Status::Async:
{
throw Exception("Async is temporary not supported.", ErrorCodes::LOGICAL_ERROR);
for (auto & edge : updated_direct_edges)
tryAddProcessorToStackIfUpdated(*edge, children);
}

// node.status = ExecStatus::Executing;
// addAsyncJob(pid);
// break;
}
case IProcessor::Status::Wait:
{
if (!async)
throw Exception("Processor returned status Wait before Async.", ErrorCodes::LOGICAL_ERROR);
break;
}
case IProcessor::Status::ExpandPipeline:
{
executor_contexts[thread_number]->task_list.emplace_back(
if (need_expand_pipeline)
{
executor_contexts[thread_number]->task_list.emplace_back(
node.execution_state.get(),
&parents
);
);

ExpandPipelineTask * desired = &executor_contexts[thread_number]->task_list.back();
ExpandPipelineTask * expected = nullptr;
ExpandPipelineTask * desired = &executor_contexts[thread_number]->task_list.back();
ExpandPipelineTask * expected = nullptr;

while (!expand_pipeline_task.compare_exchange_strong(expected, desired))
{
doExpandPipeline(expected, true);
expected = nullptr;
}
while (!expand_pipeline_task.compare_exchange_strong(expected, desired))
{
doExpandPipeline(expected, true);
expected = nullptr;
}

doExpandPipeline(desired, true);
doExpandPipeline(desired, true);

/// The node is no longer valid after the pipeline was expanded.
graph[pid].need_to_be_prepared = true;
try_release_ownership();
break;
}
/// Add itself back to be prepared again.
children.push(pid);
}

return false;
Expand Down Expand Up @@ -427,7 +443,7 @@ void PipelineExecutor::execute(size_t num_threads)

bool all_processors_finished = true;
for (auto & node : graph)
if (node.status != ExecStatus::Finished)
if (node.status != ExecStatus::Finished) /// Single thread, do not hold mutex
all_processors_finished = false;

if (!all_processors_finished)
Expand Down