Skip to content
Permalink
Browse files
[feature] (vec) instead of converting line to src tuple for stream lo…
…ad in vectorized. (#9314)

Co-authored-by: xiepengcheng01 <xiepengcheng01@xafj-palo-rpm64.xafj.baidu.com>
  • Loading branch information
xiepengcheng01 and xiepengcheng01 committed May 9, 2022
1 parent d1b85d5 commit eec1dfde3a6b93eccc28fb1c0b58233987a2b856
Showing 11 changed files with 350 additions and 346 deletions.
@@ -107,10 +107,18 @@ Status BaseScanner::init_expr_ctxes() {

// preceding filter expr should be initialized by using `_row_desc`, which is the source row descriptor
if (!_pre_filter_texprs.empty()) {
RETURN_IF_ERROR(
Expr::create_expr_trees(_state->obj_pool(), _pre_filter_texprs, &_pre_filter_ctxs));
RETURN_IF_ERROR(Expr::prepare(_pre_filter_ctxs, _state, *_row_desc, _mem_tracker));
RETURN_IF_ERROR(Expr::open(_pre_filter_ctxs, _state));
if (_state->enable_vectorized_exec()) {
RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(
_state->obj_pool(), _pre_filter_texprs, &_vpre_filter_ctxs));
RETURN_IF_ERROR(vectorized::VExpr::prepare(_vpre_filter_ctxs, _state, *_row_desc,
_mem_tracker));
RETURN_IF_ERROR(vectorized::VExpr::open(_vpre_filter_ctxs, _state));
} else {
RETURN_IF_ERROR(Expr::create_expr_trees(_state->obj_pool(), _pre_filter_texprs,
&_pre_filter_ctxs));
RETURN_IF_ERROR(Expr::prepare(_pre_filter_ctxs, _state, *_row_desc, _mem_tracker));
RETURN_IF_ERROR(Expr::open(_pre_filter_ctxs, _state));
}
}

// Construct dest slots information
@@ -133,11 +141,22 @@ Status BaseScanner::init_expr_ctxes() {
<< ", name=" << slot_desc->col_name();
return Status::InternalError(ss.str());
}
ExprContext* ctx = nullptr;
RETURN_IF_ERROR(Expr::create_expr_tree(_state->obj_pool(), it->second, &ctx));
RETURN_IF_ERROR(ctx->prepare(_state, *_row_desc.get(), _mem_tracker));
RETURN_IF_ERROR(ctx->open(_state));
_dest_expr_ctx.emplace_back(ctx);

if (_state->enable_vectorized_exec()) {
vectorized::VExprContext* ctx = nullptr;
RETURN_IF_ERROR(
vectorized::VExpr::create_expr_tree(_state->obj_pool(), it->second, &ctx));
RETURN_IF_ERROR(ctx->prepare(_state, *_row_desc.get(), _mem_tracker));
RETURN_IF_ERROR(ctx->open(_state));
_dest_vexpr_ctx.emplace_back(ctx);
} else {
ExprContext* ctx = nullptr;
RETURN_IF_ERROR(Expr::create_expr_tree(_state->obj_pool(), it->second, &ctx));
RETURN_IF_ERROR(ctx->prepare(_state, *_row_desc.get(), _mem_tracker));
RETURN_IF_ERROR(ctx->open(_state));
_dest_expr_ctx.emplace_back(ctx);
}

if (has_slot_id_map) {
auto it = _params.dest_sid_to_src_sid_without_trans.find(slot_desc->id());
if (it == std::end(_params.dest_sid_to_src_sid_without_trans)) {
@@ -284,6 +303,10 @@ void BaseScanner::close() {
if (!_pre_filter_ctxs.empty()) {
Expr::close(_pre_filter_ctxs, _state);
}

if (_state->enable_vectorized_exec() && !_vpre_filter_ctxs.empty()) {
vectorized::VExpr::close(_vpre_filter_ctxs, _state);
}
}

} // namespace doris
@@ -20,6 +20,7 @@

#include "common/status.h"
#include "exprs/expr.h"
#include "vec/exprs/vexpr.h"
#include "runtime/tuple.h"
#include "util/runtime_profile.h"

@@ -52,7 +53,12 @@ class BaseScanner {
public:
BaseScanner(RuntimeState* state, RuntimeProfile* profile, const TBrokerScanRangeParams& params,
const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter);
virtual ~BaseScanner() { Expr::close(_dest_expr_ctx, _state); };
virtual ~BaseScanner() {
Expr::close(_dest_expr_ctx, _state);
if (_state->enable_vectorized_exec()) {
vectorized::VExpr::close(_dest_vexpr_ctx, _state);
}
};

virtual Status init_expr_ctxes();
// Open this scanner, will initialize information need to
@@ -62,8 +68,8 @@ class BaseScanner {
virtual Status get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof, bool* fill_tuple) = 0;

// Get next block
virtual Status get_next(std::vector<vectorized::MutableColumnPtr>& columns, bool* eof) {
return Status::NotSupported("Not Implemented get next");
virtual Status get_next(vectorized::Block* block, bool* eof) {
return Status::NotSupported("Not Implemented get block");
}

// Close this scanner
@@ -95,6 +101,9 @@ class BaseScanner {
// Dest tuple descriptor and dest expr context
const TupleDescriptor* _dest_tuple_desc;
std::vector<ExprContext*> _dest_expr_ctx;
// for vectorized
std::vector<vectorized::VExprContext*> _dest_vexpr_ctx;
std::vector<vectorized::VExprContext*> _vpre_filter_ctxs;
// the map values of dest slot id to src slot desc
// if there is not key of dest slot id in dest_sid_to_src_sid_without_trans, it will be set to nullptr
std::vector<SlotDescriptor*> _src_slot_descs_order_by_dest;
@@ -468,8 +468,7 @@ Status BrokerScanner::_convert_one_row(const Slice& line, Tuple* tuple, MemPool*
return fill_dest_tuple(tuple, tuple_pool, fill_tuple);
}

// Convert one row to this tuple
Status BrokerScanner::_line_to_src_tuple(const Slice& line) {
Status BrokerScanner::_line_split_to_values(const Slice& line) {
bool is_proto_format = _file_format_type == TFileFormatType::FORMAT_PROTO;
if (!is_proto_format && !validate_utf8(line.data, line.size)) {
RETURN_IF_ERROR(_state->append_error_msg_to_file(
@@ -546,6 +545,17 @@ Status BrokerScanner::_line_to_src_tuple(const Slice& line) {
}
}

_success = true;
return Status::OK();
}

// Convert one row to this tuple
Status BrokerScanner::_line_to_src_tuple(const Slice& line) {
RETURN_IF_ERROR(_line_split_to_values(line));
if (!_success) {
return Status::OK();
}

for (int i = 0; i < _split_values.size(); ++i) {
auto slot_desc = _src_slot_descs[i];
const Slice& value = _split_values[i];
@@ -560,11 +570,11 @@ Status BrokerScanner::_line_to_src_tuple(const Slice& line) {
str_slot->len = value.size;
}

const TBrokerRangeDesc& range = _ranges.at(_next_range - 1);
if (range.__isset.num_of_columns_from_file) {
fill_slots_of_columns_from_path(range.num_of_columns_from_file, columns_from_path);
fill_slots_of_columns_from_path(range.num_of_columns_from_file, range.columns_from_path);
}

_success = true;
return Status::OK();
}

@@ -65,8 +65,8 @@ class BrokerScanner : public BaseScanner {
virtual Status get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof,
bool* fill_tuple) override;

Status get_next(std::vector<vectorized::MutableColumnPtr>& columns, bool* eof) override {
return Status::NotSupported("Not Implemented get columns");
Status get_next(vectorized::Block* block, bool* eof) override {
return Status::NotSupported("Not Implemented get block");
}

// Close this scanner
@@ -78,6 +78,8 @@ class BrokerScanner : public BaseScanner {

Status _line_to_src_tuple(const Slice& line);

Status _line_split_to_values(const Slice& line);

private:
Status open_file_reader();
Status create_decompressor(TFileFormatType::type type);
@@ -61,43 +61,69 @@ Status VBrokerScanNode::get_next(RuntimeState* state, vectorized::Block* block,
return Status::OK();
}

std::shared_ptr<vectorized::Block> scanner_block;
{
std::unique_lock<std::mutex> l(_batch_queue_lock);
while (_process_status.ok() && !_runtime_state->is_cancelled() &&
_num_running_scanners > 0 && _block_queue.empty()) {
SCOPED_TIMER(_wait_scanner_timer);
_queue_reader_cond.wait_for(l, std::chrono::seconds(1));
}
if (!_process_status.ok()) {
// Some scanner process failed.
return _process_status;
const int batch_size = _runtime_state->batch_size();
while (true) {
std::shared_ptr<vectorized::Block> scanner_block;
{
std::unique_lock<std::mutex> l(_batch_queue_lock);
while (_process_status.ok() && !_runtime_state->is_cancelled() &&
_num_running_scanners > 0 && _block_queue.empty()) {
SCOPED_TIMER(_wait_scanner_timer);
_queue_reader_cond.wait_for(l, std::chrono::seconds(1));
}
if (!_process_status.ok()) {
// Some scanner process failed.
return _process_status;
}
if (_runtime_state->is_cancelled()) {
if (update_status(Status::Cancelled("Cancelled"))) {
_queue_writer_cond.notify_all();
}
return _process_status;
}
if (!_block_queue.empty()) {
scanner_block = _block_queue.front();
_block_queue.pop_front();
}
}
if (_runtime_state->is_cancelled()) {
if (update_status(Status::Cancelled("Cancelled"))) {
_queue_writer_cond.notify_all();

// All scanner has been finished, and all cached batch has been read
if (!scanner_block) {
if (_mutable_block && !_mutable_block->empty()) {
*block = _mutable_block->to_block();
reached_limit(block, eos);
LOG_IF(INFO, *eos) << "VBrokerScanNode ReachedLimit.";
}
return _process_status;
_scan_finished.store(true);
*eos = true;
return Status::OK();
}
if (!_block_queue.empty()) {
scanner_block = _block_queue.front();
_block_queue.pop_front();
// notify one scanner
_queue_writer_cond.notify_one();

if (UNLIKELY(!_mutable_block)) {
_mutable_block.reset(new MutableBlock(scanner_block->clone_empty()));
}
}

// All scanner has been finished, and all cached batch has been read
if (scanner_block == nullptr) {
_scan_finished.store(true);
*eos = true;
return Status::OK();
if (_mutable_block->rows() + scanner_block->rows() < batch_size) {
// merge scanner_block into _mutable_block
_mutable_block->add_rows(scanner_block.get(), 0, scanner_block->rows());
continue;
} else {
if (_mutable_block->empty()) {
// directly use scanner_block
*block = *scanner_block;
} else {
// copy _mutable_block firstly, then merge scanner_block into _mutable_block for next.
*block = _mutable_block->to_block();
_mutable_block->set_muatable_columns(scanner_block->clone_empty_columns());
_mutable_block->add_rows(scanner_block.get(), 0, scanner_block->rows());
}
break;
}
}

// notify one scanner
_queue_writer_cond.notify_one();

reached_limit(scanner_block.get(), eos);
*block = *scanner_block;

reached_limit(block, eos);
if (*eos) {
_scan_finished.store(true);
_queue_writer_cond.notify_all();
@@ -120,75 +146,53 @@ Status VBrokerScanNode::scanner_scan(const TBrokerScanRange& scan_range, Scanner
std::unique_ptr<BaseScanner> scanner = create_scanner(scan_range, counter);
RETURN_IF_ERROR(scanner->open());
bool scanner_eof = false;

const int batch_size = _runtime_state->batch_size();
size_t slot_num = _tuple_desc->slots().size();

while (!scanner_eof) {
std::shared_ptr<vectorized::Block> block(new vectorized::Block());
std::vector<vectorized::MutableColumnPtr> columns(slot_num);
for (int i = 0; i < slot_num; i++) {
columns[i] = _tuple_desc->slots()[i]->get_empty_mutable_column();
RETURN_IF_CANCELLED(_runtime_state);
// If we have finished all works
if (_scan_finished.load() || !_process_status.ok()) {
return Status::OK();
}

while (columns[0]->size() < batch_size && !scanner_eof) {
RETURN_IF_CANCELLED(_runtime_state);
// If we have finished all works
if (_scan_finished.load()) {
return Status::OK();
}

RETURN_IF_ERROR(scanner->get_next(columns, &scanner_eof));
if (scanner_eof) {
break;
}
std::shared_ptr<vectorized::Block> block(new vectorized::Block());
RETURN_IF_ERROR(scanner->get_next(block.get(), &scanner_eof));
if (block->rows() == 0) {
continue;
}
auto old_rows = block->rows();
RETURN_IF_ERROR(VExprContext::filter_block(_vconjunct_ctx_ptr, block.get(),
_tuple_desc->slots().size()));
counter->num_rows_unselected += old_rows - block->rows();
if (block->rows() == 0) {
continue;
}

if (!columns[0]->empty()) {
auto n_columns = 0;
for (const auto slot_desc : _tuple_desc->slots()) {
block->insert(ColumnWithTypeAndName(std::move(columns[n_columns++]),
slot_desc->get_data_type_ptr(),
slot_desc->col_name()));
}

auto old_rows = block->rows();

RETURN_IF_ERROR(VExprContext::filter_block(_vconjunct_ctx_ptr, block.get(),
_tuple_desc->slots().size()));

counter->num_rows_unselected += old_rows - block->rows();

std::unique_lock<std::mutex> l(_batch_queue_lock);
while (_process_status.ok() && !_scan_finished.load() &&
!_runtime_state->is_cancelled() &&
// stop pushing more batch if
// 1. too many batches in queue, or
// 2. at least one batch in queue and memory exceed limit.
(_block_queue.size() >= _max_buffered_batches ||
(mem_tracker()->any_limit_exceeded() && !_block_queue.empty()))) {
_queue_writer_cond.wait_for(l, std::chrono::seconds(1));
}
// Process already set failed, so we just return OK
if (!_process_status.ok()) {
return Status::OK();
}
// Scan already finished, just return
if (_scan_finished.load()) {
return Status::OK();
}
// Runtime state is canceled, just return cancel
if (_runtime_state->is_cancelled()) {
return Status::Cancelled("Cancelled");
}
// Queue size Must be smaller than _max_buffered_batches
_block_queue.push_back(block);

// Notify reader to
_queue_reader_cond.notify_one();
std::unique_lock<std::mutex> l(_batch_queue_lock);
while (_process_status.ok() && !_scan_finished.load() && !_runtime_state->is_cancelled() &&
// stop pushing more batch if
// 1. too many batches in queue, or
// 2. at least one batch in queue and memory exceed limit.
(_block_queue.size() >= _max_buffered_batches ||
(mem_tracker()->any_limit_exceeded() && !_block_queue.empty()))) {
_queue_writer_cond.wait_for(l, std::chrono::seconds(1));
}
}
// Process already set failed, so we just return OK
if (!_process_status.ok()) {
return Status::OK();
}
// Scan already finished, just return
if (_scan_finished.load()) {
return Status::OK();
}
// Runtime state is canceled, just return cancel
if (_runtime_state->is_cancelled()) {
return Status::Cancelled("Cancelled");
}
// Queue size Must be smaller than _max_buffered_batches
_block_queue.push_back(block);

// Notify reader to
_queue_reader_cond.notify_one();
}
return Status::OK();
}

@@ -51,6 +51,7 @@ class VBrokerScanNode final : public BrokerScanNode {
Status scanner_scan(const TBrokerScanRange& scan_range, ScannerCounter* counter);

std::deque<std::shared_ptr<vectorized::Block>> _block_queue;
std::unique_ptr<MutableBlock> _mutable_block;
};
} // namespace vectorized
} // namespace doris

0 comments on commit eec1dfd

Please sign in to comment.