Skip to content

Commit

Permalink
[BugFix] Acquire rowsets at querying (backport #13830) (#14046)
Browse files Browse the repository at this point in the history
  • Loading branch information
ZiheLiu committed Nov 25, 2022
1 parent a8f30a0 commit c6dcbdb
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 36 deletions.
27 changes: 27 additions & 0 deletions be/src/exec/pipeline/scan/olap_scan_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

#include "exec/vectorized/olap_scan_node.h"
#include "exprs/vectorized/runtime_filter_bank.h"
#include "storage/tablet.h"

namespace starrocks::pipeline {

Expand All @@ -20,6 +21,32 @@ Status OlapScanContext::prepare(RuntimeState* state) {
// Tears down the scan context: closes the conjunct expression contexts obtained
// from the scan node, then calls Rowset::release_readers on each per-tablet
// rowset list stored in _tablet_rowsets.
void OlapScanContext::close(RuntimeState* state) {
    // Close the scan node's conjunct contexts first.
    const auto& scan_conjuncts = _scan_node->conjunct_ctxs();
    Expr::close(scan_conjuncts, state);

    // Walk the captured rowsets tablet by tablet and release their readers.
    for (auto tablet_it = _tablet_rowsets.cbegin(); tablet_it != _tablet_rowsets.cend(); ++tablet_it) {
        Rowset::release_readers(*tablet_it);
    }
}

// For every scan range, looks up its tablet, captures the consistent rowsets for
// Version(0, <scan range version>) into _tablet_rowsets[i], acquires readers on
// those rowsets, and stores the tablet in _tablets[i].
//
// @param olap_scan_ranges scan ranges to capture; entry i fills _tablets[i] and
//                         _tablet_rowsets[i].
// @return Status::OK() on success; otherwise the first error produced by
//         OlapScanNode::get_tablet or Tablet::capture_consistent_rowsets, in
//         which case the remaining scan ranges are not processed.
Status OlapScanContext::capture_tablet_rowsets(const std::vector<TInternalScanRange*>& olap_scan_ranges) {
    const size_t num_scan_ranges = olap_scan_ranges.size();
    _tablet_rowsets.resize(num_scan_ranges);
    _tablets.resize(num_scan_ranges);

    // Use an unsigned index so the loop bound comparison does not mix signed and
    // unsigned operands.
    for (size_t i = 0; i < num_scan_ranges; ++i) {
        auto* scan_range = olap_scan_ranges[i];

        // The version arrives as a decimal string in the scan range.
        int64_t version = strtoul(scan_range->version.c_str(), nullptr, 10);
        ASSIGN_OR_RETURN(TabletSharedPtr tablet, vectorized::OlapScanNode::get_tablet(scan_range));

        // Capture row sets of this version tablet.
        {
            // Capture and reader acquisition both happen under a shared hold of
            // the tablet header lock.
            std::shared_lock l(tablet->get_header_lock());
            RETURN_IF_ERROR(tablet->capture_consistent_rowsets(Version(0, version), &_tablet_rowsets[i]));
            Rowset::acquire_readers(_tablet_rowsets[i]);
        }

        _tablets[i] = std::move(tablet);
    }

    return Status::OK();
}

Status OlapScanContext::parse_conjuncts(RuntimeState* state, const std::vector<ExprContext*>& runtime_in_filters,
Expand Down
15 changes: 15 additions & 0 deletions be/src/exec/pipeline/scan/olap_scan_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@
namespace starrocks {

class ScanNode;
class Tablet;
using TabletSharedPtr = std::shared_ptr<Tablet>;
class Rowset;
using RowsetSharedPtr = std::shared_ptr<Rowset>;

namespace vectorized {
class RuntimeFilterProbeCollector;
Expand Down Expand Up @@ -38,6 +42,10 @@ class OlapScanContext final : public ContextWithDependency {
// Read-only access to the expression contexts stored in _not_push_down_conjuncts.
const std::vector<ExprContext*>& not_push_down_conjuncts() const { return _not_push_down_conjuncts; }
// Read-only access to the scan key ranges stored in _key_ranges.
const std::vector<std::unique_ptr<OlapScanRange>>& key_ranges() const { return _key_ranges; }

// Captures the tablet and its rowsets for each of the given scan ranges, filling
// _tablets and _tablet_rowsets. Returns a non-OK Status if any capture fails.
Status capture_tablet_rowsets(const std::vector<TInternalScanRange*>& olap_scan_ranges);
// Read-only access to the tablets stored in _tablets.
const std::vector<TabletSharedPtr>& tablets() const { return _tablets; }
// Read-only access to the rowsets stored in _tablet_rowsets (one inner vector per tablet).
const std::vector<std::vector<RowsetSharedPtr>>& tablet_rowsets() const { return _tablet_rowsets; }

private:
vectorized::OlapScanNode* _scan_node;

Expand All @@ -50,6 +58,13 @@ class OlapScanContext final : public ContextWithDependency {
ObjectPool _obj_pool;

std::atomic<bool> _is_prepare_finished{false};

// If compaction occurs while a tablet's row sets are not referenced, those row sets
// become stale and are deleted. This typically happens when the tablets of the left
// table are compacted while the right hash table is being built. Therefore, hold
// references to the row sets in _tablet_rowsets from the preparation phase onward,
// so that they are not deleted.
std::vector<TabletSharedPtr> _tablets;
std::vector<std::vector<RowsetSharedPtr>> _tablet_rowsets;
};

} // namespace pipeline
Expand Down
32 changes: 6 additions & 26 deletions be/src/exec/pipeline/scan/olap_scan_prepare_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@ OlapScanPrepareOperator::~OlapScanPrepareOperator() {
// Prepares the operator: runs SourceOperator::prepare first and propagates any
// error from it, then prepares the scan context and captures the tablet rowsets.
// NOTE(review): this listing appears to show removed and added diff lines
// together without +/- markers — confirm against the committed file.
Status OlapScanPrepareOperator::prepare(RuntimeState* state) {
RETURN_IF_ERROR(SourceOperator::prepare(state));

// NOTE(review): the next two lines look like the pre-change version, in which the
// operator captured the rowsets itself and then returned the context's prepare status.
RETURN_IF_ERROR(_capture_tablet_rowsets());
return _ctx->prepare(state);
// NOTE(review): the lines below look like the post-change version: prepare the
// context, then have the context capture rowsets for the morsel queue's scan ranges.
// As literally written they are unreachable after the return above.
RETURN_IF_ERROR(_ctx->prepare(state));
RETURN_IF_ERROR(_ctx->capture_tablet_rowsets(_morsel_queue->olap_scan_ranges()));

return Status::OK();
}

void OlapScanPrepareOperator::close(RuntimeState* state) {
Expand All @@ -46,8 +48,8 @@ StatusOr<vectorized::ChunkPtr> OlapScanPrepareOperator::pull_chunk(RuntimeState*
Status status = _ctx->parse_conjuncts(state, runtime_in_filters(), runtime_bloom_filters());

_morsel_queue->set_key_ranges(_ctx->key_ranges());
_morsel_queue->set_tablets(_tablets);
_morsel_queue->set_tablet_rowsets(_tablet_rowsets);
_morsel_queue->set_tablets(_ctx->tablets());
_morsel_queue->set_tablet_rowsets(_ctx->tablet_rowsets());

_ctx->set_prepare_finished();
if (!status.ok()) {
Expand All @@ -58,28 +60,6 @@ StatusOr<vectorized::ChunkPtr> OlapScanPrepareOperator::pull_chunk(RuntimeState*
return nullptr;
}

// For every scan range of the morsel queue, looks up its tablet, captures the
// consistent rowsets for Version(0, <scan range version>) into
// _tablet_rowsets[i], and stores the tablet in _tablets[i].
//
// @return Status::OK() on success; otherwise the first error produced by
//         OlapScanNode::get_tablet or Tablet::capture_consistent_rowsets, in
//         which case the remaining scan ranges are not processed.
Status OlapScanPrepareOperator::_capture_tablet_rowsets() {
    auto olap_scan_ranges = _morsel_queue->olap_scan_ranges();
    const size_t num_scan_ranges = olap_scan_ranges.size();
    _tablet_rowsets.resize(num_scan_ranges);
    _tablets.resize(num_scan_ranges);

    // Use an unsigned index so the loop bound comparison does not mix signed and
    // unsigned operands.
    for (size_t i = 0; i < num_scan_ranges; ++i) {
        auto* scan_range = olap_scan_ranges[i];

        // The version arrives as a decimal string in the scan range.
        int64_t version = strtoul(scan_range->version.c_str(), nullptr, 10);
        ASSIGN_OR_RETURN(TabletSharedPtr tablet, vectorized::OlapScanNode::get_tablet(scan_range));

        // Capture row sets of this version tablet.
        {
            // The capture happens under a shared hold of the tablet header lock.
            std::shared_lock l(tablet->get_header_lock());
            RETURN_IF_ERROR(tablet->capture_consistent_rowsets(Version(0, version), &_tablet_rowsets[i]));
        }

        _tablets[i] = std::move(tablet);
    }

    return Status::OK();
}

/// OlapScanPrepareOperatorFactory
OlapScanPrepareOperatorFactory::OlapScanPrepareOperatorFactory(int32_t id, int32_t plan_node_id, OlapScanContextPtr ctx)
: SourceOperatorFactory(id, "olap_scan_prepare", plan_node_id), _ctx(std::move(ctx)) {}
Expand Down
10 changes: 0 additions & 10 deletions be/src/exec/pipeline/scan/olap_scan_prepare_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,6 @@ class OlapScanPrepareOperator final : public SourceOperator {
StatusOr<vectorized::ChunkPtr> pull_chunk(RuntimeState* state) override;

private:
Status _capture_tablet_rowsets();

private:
// If compaction occurs while a tablet's row sets are not referenced, those row sets
// become stale and are deleted. This typically happens when the tablets of the left
// table are compacted while the right hash table is being built. Therefore, hold
// references to the row sets in _tablet_rowsets from the preparation phase onward,
// so that they are not deleted.
std::vector<TabletSharedPtr> _tablets;
std::vector<std::vector<RowsetSharedPtr>> _tablet_rowsets;

OlapScanContextPtr _ctx;
};

Expand Down
5 changes: 5 additions & 0 deletions be/src/exec/vectorized/olap_scan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,10 @@ Status OlapScanNode::close(RuntimeState* state) {
release_large_columns<BinaryColumn>(runtime_state()->chunk_size() * 512);
}

for (const auto& rowsets_per_tablet : _tablet_rowsets) {
Rowset::release_readers(rowsets_per_tablet);
}

return ScanNode::close(state);
}

Expand Down Expand Up @@ -670,6 +674,7 @@ Status OlapScanNode::_capture_tablet_rowsets() {
{
std::shared_lock l(tablet->get_header_lock());
RETURN_IF_ERROR(tablet->capture_consistent_rowsets(Version(0, version), &_tablet_rowsets[i]));
Rowset::acquire_readers(_tablet_rowsets[i]);
}
}

Expand Down

0 comments on commit c6dcbdb

Please sign in to comment.