Skip to content

Commit

Permalink
[BugFix] Acquire rowsets at querying (#13830)
Browse files Browse the repository at this point in the history
(cherry picked from commit c0a1b94)

# Conflicts:
#	be/src/exec/pipeline/scan/olap_scan_context.cpp
#	be/src/exec/pipeline/scan/olap_scan_context.h
#	be/src/exec/pipeline/scan/olap_scan_prepare_operator.cpp
#	be/src/exec/pipeline/scan/olap_scan_prepare_operator.h
  • Loading branch information
ZiheLiu authored and mergify[bot] committed Nov 22, 2022
1 parent aa56518 commit 6c5e2f8
Show file tree
Hide file tree
Showing 5 changed files with 415 additions and 0 deletions.
132 changes: 132 additions & 0 deletions be/src/exec/pipeline/scan/olap_scan_context.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
// This file is licensed under the Elastic License 2.0. Copyright 2021-present, StarRocks Inc.

#include "exec/pipeline/scan/olap_scan_context.h"

#include "exec/vectorized/olap_scan_node.h"
#include "exprs/vectorized/runtime_filter_bank.h"
#include "storage/tablet.h"

namespace starrocks::pipeline {

using namespace vectorized;

/// OlapScanContext.
void OlapScanContext::attach_shared_input(int32_t operator_seq, int32_t source_index) {
auto key = std::make_pair(operator_seq, source_index);
VLOG_ROW << fmt::format("attach_shared_input ({}, {}), active {}", operator_seq, source_index,
_active_inputs.size());
_active_inputs.emplace(key);
}

void OlapScanContext::detach_shared_input(int32_t operator_seq, int32_t source_index) {
auto key = std::make_pair(operator_seq, source_index);
VLOG_ROW << fmt::format("detach_shared_input ({}, {}), remain {}", operator_seq, source_index,
_active_inputs.size());
_active_inputs.erase(key);
}

bool OlapScanContext::has_active_input() const {
return !_active_inputs.empty();
}

BalancedChunkBuffer& OlapScanContext::get_shared_buffer() {
return _chunk_buffer;
}

Status OlapScanContext::prepare(RuntimeState* state) {
return Status::OK();
}

void OlapScanContext::close(RuntimeState* state) {
_chunk_buffer.close();
for (const auto& rowsets_per_tablet : _tablet_rowsets) {
Rowset::release_readers(rowsets_per_tablet);
}
}

Status OlapScanContext::capture_tablet_rowsets(const std::vector<TInternalScanRange*>& olap_scan_ranges) {
_tablet_rowsets.resize(olap_scan_ranges.size());
_tablets.resize(olap_scan_ranges.size());
for (int i = 0; i < olap_scan_ranges.size(); ++i) {
auto* scan_range = olap_scan_ranges[i];

int64_t version = strtoul(scan_range->version.c_str(), nullptr, 10);
ASSIGN_OR_RETURN(TabletSharedPtr tablet, vectorized::OlapScanNode::get_tablet(scan_range));

// Capture row sets of this version tablet.
{
std::shared_lock l(tablet->get_header_lock());
RETURN_IF_ERROR(tablet->capture_consistent_rowsets(Version(0, version), &_tablet_rowsets[i]));
Rowset::acquire_readers(_tablet_rowsets[i]);
}

_tablets[i] = std::move(tablet);
}

return Status::OK();
}

Status OlapScanContext::parse_conjuncts(RuntimeState* state, const std::vector<ExprContext*>& runtime_in_filters,
RuntimeFilterProbeCollector* runtime_bloom_filters) {
const TOlapScanNode& thrift_olap_scan_node = _scan_node->thrift_olap_scan_node();
const TupleDescriptor* tuple_desc = state->desc_tbl().get_tuple_descriptor(thrift_olap_scan_node.tuple_id);

// Get _conjunct_ctxs.
_conjunct_ctxs = _scan_node->conjunct_ctxs();
_conjunct_ctxs.insert(_conjunct_ctxs.end(), runtime_in_filters.begin(), runtime_in_filters.end());

// eval_const_conjuncts.
Status status;
RETURN_IF_ERROR(vectorized::OlapScanConjunctsManager::eval_const_conjuncts(_conjunct_ctxs, &status));
if (!status.ok()) {
return status;
}

// Init _conjuncts_manager.
vectorized::OlapScanConjunctsManager& cm = _conjuncts_manager;
cm.conjunct_ctxs_ptr = &_conjunct_ctxs;
cm.tuple_desc = tuple_desc;
cm.obj_pool = &_obj_pool;
cm.key_column_names = &thrift_olap_scan_node.key_column_name;
cm.runtime_filters = runtime_bloom_filters;
cm.runtime_state = state;

const TQueryOptions& query_options = state->query_options();
int32_t max_scan_key_num;
if (query_options.__isset.max_scan_key_num && query_options.max_scan_key_num > 0) {
max_scan_key_num = query_options.max_scan_key_num;
} else {
max_scan_key_num = config::max_scan_key_num;
}
bool enable_column_expr_predicate = false;
if (thrift_olap_scan_node.__isset.enable_column_expr_predicate) {
enable_column_expr_predicate = thrift_olap_scan_node.enable_column_expr_predicate;
}

// Parse conjuncts via _conjuncts_manager.
RETURN_IF_ERROR(cm.parse_conjuncts(true, max_scan_key_num, enable_column_expr_predicate));

// Get key_ranges and not_push_down_conjuncts from _conjuncts_manager.
RETURN_IF_ERROR(_conjuncts_manager.get_key_ranges(&_key_ranges));
_conjuncts_manager.get_not_push_down_conjuncts(&_not_push_down_conjuncts);

_dict_optimize_parser.set_mutable_dict_maps(state, state->mutable_query_global_dict_map());
_dict_optimize_parser.rewrite_conjuncts(&_not_push_down_conjuncts, state);

return Status::OK();
}

/// OlapScanContextFactory.
OlapScanContextPtr OlapScanContextFactory::get_or_create(int32_t driver_sequence) {
DCHECK_LT(driver_sequence, _dop);
// ScanOperators sharing one morsel use the same context.
int32_t idx = _shared_morsel_queue ? 0 : driver_sequence;
DCHECK_LT(idx, _contexts.size());

if (_contexts[idx] == nullptr) {
_contexts[idx] = std::make_shared<OlapScanContext>(_scan_node, _dop, _shared_scan, _chunk_buffer);
}
return _contexts[idx];
}

} // namespace starrocks::pipeline
125 changes: 125 additions & 0 deletions be/src/exec/pipeline/scan/olap_scan_context.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
// This file is licensed under the Elastic License 2.0. Copyright 2021-present, StarRocks Inc.

#pragma once

#include <mutex>

#include "exec/pipeline/context_with_dependency.h"
#include "exec/pipeline/scan/balanced_chunk_buffer.h"
#include "exec/vectorized/olap_scan_prepare.h"
#include "runtime/global_dict/parser.h"
#include "util/phmap/phmap_fwd_decl.h"

namespace starrocks {

class ScanNode;
class Tablet;
using TabletSharedPtr = std::shared_ptr<Tablet>;
class Rowset;
using RowsetSharedPtr = std::shared_ptr<Rowset>;

namespace vectorized {
class RuntimeFilterProbeCollector;
}

namespace pipeline {

class OlapScanContext;
using OlapScanContextPtr = std::shared_ptr<OlapScanContext>;
class OlapScanContextFactory;
using OlapScanContextFactoryPtr = std::shared_ptr<OlapScanContextFactory>;

using namespace vectorized;

class OlapScanContext final : public ContextWithDependency {
public:
explicit OlapScanContext(vectorized::OlapScanNode* scan_node, int32_t dop, bool shared_scan,
BalancedChunkBuffer& chunk_buffer)
: _scan_node(scan_node), _chunk_buffer(chunk_buffer), _shared_scan(shared_scan) {}

Status prepare(RuntimeState* state);
void close(RuntimeState* state) override;

void set_prepare_finished() { _is_prepare_finished.store(true, std::memory_order_release); }
bool is_prepare_finished() const { return _is_prepare_finished.load(std::memory_order_acquire); }

Status parse_conjuncts(RuntimeState* state, const std::vector<ExprContext*>& runtime_in_filters,
RuntimeFilterProbeCollector* runtime_bloom_filters);

vectorized::OlapScanNode* scan_node() const { return _scan_node; }
vectorized::OlapScanConjunctsManager& conjuncts_manager() { return _conjuncts_manager; }
const std::vector<ExprContext*>& not_push_down_conjuncts() const { return _not_push_down_conjuncts; }
const std::vector<std::unique_ptr<OlapScanRange>>& key_ranges() const { return _key_ranges; }
BalancedChunkBuffer& get_chunk_buffer() { return _chunk_buffer; }

// Shared scan states
bool is_shared_scan() const { return _shared_scan; }
// Attach and detach to account the active input for shared chunk buffer
void attach_shared_input(int32_t operator_seq, int32_t source_index);
void detach_shared_input(int32_t operator_seq, int32_t source_index);
bool has_active_input() const;
BalancedChunkBuffer& get_shared_buffer();

Status capture_tablet_rowsets(const std::vector<TInternalScanRange*>& olap_scan_ranges);
const std::vector<TabletSharedPtr>& tablets() const { return _tablets; }
const std::vector<std::vector<RowsetSharedPtr>>& tablet_rowsets() const { return _tablet_rowsets; };

private:
vectorized::OlapScanNode* _scan_node;

std::vector<ExprContext*> _conjunct_ctxs;
vectorized::OlapScanConjunctsManager _conjuncts_manager;
// The conjuncts couldn't push down to storage engine
std::vector<ExprContext*> _not_push_down_conjuncts;
std::vector<std::unique_ptr<OlapScanRange>> _key_ranges;
vectorized::DictOptimizeParser _dict_optimize_parser;
ObjectPool _obj_pool;

// For shared_scan mechanism
using ActiveInputKey = std::pair<int32_t, int32_t>;
using ActiveInputSet = phmap::parallel_flat_hash_set<
ActiveInputKey, typename phmap::Hash<ActiveInputKey>, typename phmap::EqualTo<ActiveInputKey>,
typename std::allocator<ActiveInputKey>, NUM_LOCK_SHARD_LOG, std::mutex, true>;
BalancedChunkBuffer& _chunk_buffer; // Shared Chunk buffer for all scan operators, owned by OlapScanContextFactory.
ActiveInputSet _active_inputs; // Maintain the active chunksource
bool _shared_scan; // Enable shared_scan

std::atomic<bool> _is_prepare_finished{false};

// The row sets of tablets will become stale and be deleted, if compaction occurs
// and these row sets aren't referenced, which will typically happen when the tablets
// of the left table are compacted at building the right hash table. Therefore, reference
// the row sets into _tablet_rowsets in the preparation phase to avoid the row sets being deleted.
std::vector<TabletSharedPtr> _tablets;
std::vector<std::vector<RowsetSharedPtr>> _tablet_rowsets;
};

// OlapScanContextFactory creates different contexts for each scan operator, if _shared_scan is false.
// Otherwise, it outputs the same context for each scan operator.
class OlapScanContextFactory {
public:
OlapScanContextFactory(vectorized::OlapScanNode* const scan_node, int32_t dop, bool shared_morsel_queue,
bool shared_scan, ChunkBufferLimiterPtr chunk_buffer_limiter)
: _scan_node(scan_node),
_dop(dop),
_shared_morsel_queue(shared_morsel_queue),
_shared_scan(shared_scan),
_chunk_buffer(shared_scan ? BalanceStrategy::kRoundRobin : BalanceStrategy::kDirect, dop,
std::move(chunk_buffer_limiter)),
_contexts(shared_morsel_queue ? 1 : dop) {}

OlapScanContextPtr get_or_create(int32_t driver_sequence);

private:
vectorized::OlapScanNode* const _scan_node;
const int32_t _dop;
const bool _shared_morsel_queue; // Whether the scan operators share a morsel queue.
const bool _shared_scan; // Whether the scan operators share a chunk buffer.
BalancedChunkBuffer _chunk_buffer; // Shared Chunk buffer for all the scan operators.

std::vector<OlapScanContextPtr> _contexts;
};

} // namespace pipeline

} // namespace starrocks
99 changes: 99 additions & 0 deletions be/src/exec/pipeline/scan/olap_scan_prepare_operator.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// This file is licensed under the Elastic License 2.0. Copyright 2021-present, StarRocks Inc.

#include "exec/pipeline/scan/olap_scan_prepare_operator.h"

#include "exec/vectorized/olap_scan_node.h"
#include "storage/storage_engine.h"

namespace starrocks::pipeline {

/// OlapScanPrepareOperator
OlapScanPrepareOperator::OlapScanPrepareOperator(OperatorFactory* factory, int32_t id, const string& name,
int32_t plan_node_id, int32_t driver_sequence, OlapScanContextPtr ctx)
: SourceOperator(factory, id, name, plan_node_id, driver_sequence), _ctx(std::move(ctx)) {
_ctx->ref();
}

OlapScanPrepareOperator::~OlapScanPrepareOperator() {
auto* state = runtime_state();
if (state == nullptr) {
return;
}

_ctx->unref(state);
}

Status OlapScanPrepareOperator::prepare(RuntimeState* state) {
RETURN_IF_ERROR(SourceOperator::prepare(state));

RETURN_IF_ERROR(_ctx->prepare(state));
RETURN_IF_ERROR(_ctx->capture_tablet_rowsets(_morsel_queue->olap_scan_ranges()));

return Status::OK();
}

void OlapScanPrepareOperator::close(RuntimeState* state) {
SourceOperator::close(state);
}

bool OlapScanPrepareOperator::has_output() const {
return !is_finished();
}

bool OlapScanPrepareOperator::is_finished() const {
return _ctx->is_prepare_finished() || _ctx->is_finished();
}

StatusOr<vectorized::ChunkPtr> OlapScanPrepareOperator::pull_chunk(RuntimeState* state) {
Status status = _ctx->parse_conjuncts(state, runtime_in_filters(), runtime_bloom_filters());

_morsel_queue->set_key_ranges(_ctx->key_ranges());
_morsel_queue->set_tablets(_ctx->tablets());
_morsel_queue->set_tablet_rowsets(_ctx->tablet_rowsets());

_ctx->set_prepare_finished();
if (!status.ok()) {
_ctx->set_finished();
return status;
}

return nullptr;
}

/// OlapScanPrepareOperatorFactory
OlapScanPrepareOperatorFactory::OlapScanPrepareOperatorFactory(int32_t id, int32_t plan_node_id,
vectorized::OlapScanNode* const scan_node,
OlapScanContextFactoryPtr ctx_factory)
: SourceOperatorFactory(id, "olap_scan_prepare", plan_node_id),
_scan_node(scan_node),
_ctx_factory(std::move(ctx_factory)) {}

Status OlapScanPrepareOperatorFactory::prepare(RuntimeState* state) {
RETURN_IF_ERROR(SourceOperatorFactory::prepare(state));

const auto& conjunct_ctxs = _scan_node->conjunct_ctxs();
const auto& tolap_scan_node = _scan_node->thrift_olap_scan_node();
auto tuple_desc = state->desc_tbl().get_tuple_descriptor(tolap_scan_node.tuple_id);

vectorized::DictOptimizeParser::rewrite_descriptor(state, conjunct_ctxs, tolap_scan_node.dict_string_id_to_int_ids,
&(tuple_desc->decoded_slots()));

RETURN_IF_ERROR(Expr::prepare(conjunct_ctxs, state));
RETURN_IF_ERROR(Expr::open(conjunct_ctxs, state));

return Status::OK();
}

void OlapScanPrepareOperatorFactory::close(RuntimeState* state) {
const auto& conjunct_ctxs = _scan_node->conjunct_ctxs();
Expr::close(conjunct_ctxs, state);

SourceOperatorFactory::close(state);
}

OperatorPtr OlapScanPrepareOperatorFactory::create(int32_t degree_of_parallelism, int32_t driver_sequence) {
return std::make_shared<OlapScanPrepareOperator>(this, _id, _name, _plan_node_id, driver_sequence,
_ctx_factory->get_or_create(driver_sequence));
}

} // namespace starrocks::pipeline

0 comments on commit 6c5e2f8

Please sign in to comment.