Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 36 additions & 16 deletions be/src/exec/operator/olap_scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -600,6 +600,18 @@ Status OlapScanLocalState::_init_scanners(std::list<ScannerSPtr>* scanners) {
}

bool enable_parallel_scan = state()->enable_parallel_scan();
auto resolve_binlog_scan_type = [](const TPaloScanRange& scan_range) {
if (scan_range.__isset.binlog_scan_type) {
return scan_range.binlog_scan_type;
}
return TBinlogScanType::NONE;
};
auto resolve_binlog_read_source = [](const TPaloScanRange& scan_range) {
if (scan_range.__isset.binlog_read_source) {
return scan_range.binlog_read_source;
}
return TBinlogReadSource::NONE;
};
bool read_row_binlog =
p._olap_scan_node.__isset.read_row_binlog && p._olap_scan_node.read_row_binlog;

Expand Down Expand Up @@ -671,11 +683,10 @@ Status OlapScanLocalState::_init_scanners(std::list<ScannerSPtr>* scanners) {

int scanners_per_tablet = std::max(1, 64 / (int)_scan_ranges.size());
for (size_t scan_range_idx = 0; scan_range_idx < _scan_ranges.size(); scan_range_idx++) {
const auto& palo_scan_range = *_scan_ranges[scan_range_idx];
int64_t version = 0;
std::from_chars(_scan_ranges[scan_range_idx]->version.data(),
_scan_ranges[scan_range_idx]->version.data() +
_scan_ranges[scan_range_idx]->version.size(),
version);
std::from_chars(palo_scan_range.version.data(),
palo_scan_range.version.data() + palo_scan_range.version.size(), version);
std::vector<std::unique_ptr<doris::OlapScanRange>>* ranges = &_cond_ranges;
int size_based_scanners_per_tablet = 1;

Expand Down Expand Up @@ -703,18 +714,27 @@ Status OlapScanLocalState::_init_scanners(std::list<ScannerSPtr>* scanners) {
for (auto& split : _read_sources[scan_range_idx].rs_splits) {
split.rs_reader = split.rs_reader->clone();
}
auto scanner =
OlapScanner::create_shared(this, OlapScanner::Params {
state(),
_scanner_profile.get(),
scanner_ranges,
_tablets[scan_range_idx].tablet,
version,
_read_sources[scan_range_idx],
p._limit,
p._olap_scan_node.is_preaggregation,
read_row_binlog,
});

auto scanner = OlapScanner::create_shared(
this, OlapScanner::Params {
state(),
_scanner_profile.get(),
scanner_ranges,
_tablets[scan_range_idx].tablet,
version,
_read_sources[scan_range_idx],
p._limit,
p._olap_scan_node.is_preaggregation,
read_row_binlog,
resolve_binlog_scan_type(palo_scan_range),
resolve_binlog_read_source(palo_scan_range),
palo_scan_range.__isset.start_tso
? std::make_optional(palo_scan_range.start_tso)
: std::nullopt,
palo_scan_range.__isset.end_tso
? std::make_optional(palo_scan_range.end_tso)
: std::nullopt,
});
RETURN_IF_ERROR(scanner->init(state(), _conjuncts));
scanners->push_back(std::move(scanner));
}
Expand Down
107 changes: 105 additions & 2 deletions be/src/exec/scan/olap_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include "common/logging.h"
#include "common/metrics/doris_metrics.h"
#include "core/block/block.h"
#include "core/data_type/data_type_number.h"
#include "exec/common/variant_util.h"
#include "exec/operator/olap_scan_operator.h"
#include "exec/scan/scan_node.h"
Expand All @@ -51,12 +52,14 @@
#include "runtime/runtime_profile.h"
#include "runtime/runtime_state.h"
#include "service/backend_options.h"
#include "storage/binlog.h"
#include "storage/id_manager.h"
#include "storage/index/inverted/inverted_index_profile.h"
#include "storage/iterator/block_reader.h"
#include "storage/olap_common.h"
#include "storage/olap_tuple.h"
#include "storage/olap_utils.h"
#include "storage/predicate/predicate_creator.h"
#include "storage/storage_engine.h"
#include "storage/tablet/tablet_schema.h"
#ifndef NDEBUG
Expand Down Expand Up @@ -98,7 +101,11 @@ OlapScanner::OlapScanner(ScanLocalStateBase* parent, OlapScanner::Params&& param
.score_runtime {},
.collection_statistics {},
.ann_topn_runtime {},
.condition_cache_digest = parent->get_condition_cache_digest()}) {
.condition_cache_digest = parent->get_condition_cache_digest(),
.binlog_scan_type = params.binlog_scan_type,
.binlog_read_source = params.binlog_read_source}),
_start_tso(params.start_tso),
_end_tso(params.end_tso) {
_tablet_reader_params.set_read_source(std::move(params.read_source),
_state->skip_delete_bitmap());
_has_prepared = false;
Expand Down Expand Up @@ -287,6 +294,49 @@ Status OlapScanner::_open_impl(RuntimeState* state) {
return Status::OK();
}

Status OlapScanner::_init_row_binlog_tso_predicates() {
if (_tablet_reader_params.reader_type != ReaderType::READER_BINLOG) {
return Status::OK();
}

if (!_start_tso.has_value() && !_end_tso.has_value()) {
return Status::OK();
}

auto& tablet_schema = _tablet_reader_params.tablet_schema;
int32_t tso_index = tablet_schema->field_index(std::string(kRowBinlogTimestampColName));
if (tso_index < 0) {
auto source_tablet_schema = _tablet_reader_params.tablet->row_binlog_tablet_schema();
const int32_t source_tso_index =
source_tablet_schema->field_index(std::string(kRowBinlogTimestampColName));
if (source_tso_index < 0) {
return Status::InternalError("Column {} not found in tablet schema",
std::string(kRowBinlogTimestampColName));
}
tablet_schema->append_column(TabletColumn(source_tablet_schema->column(source_tso_index)));
tso_index = tablet_schema->field_index(std::string(kRowBinlogTimestampColName));
}
if (tso_index < 0) {
return Status::InternalError("Column {} not found in tablet schema after append",
std::string(kRowBinlogTimestampColName));
}

auto data_type = std::make_shared<DataTypeInt64>();
if (_start_tso.has_value()) {
Field start_value =
Field::create_field<TYPE_BIGINT>(extract_tso_physical_time(*_start_tso));
_tablet_reader_params.predicates.push_back(create_comparison_predicate<PredicateType::GE>(
tso_index, std::string(kRowBinlogTimestampColName), data_type, start_value, false));
}
if (_end_tso.has_value()) {
Field end_value = Field::create_field<TYPE_BIGINT>(extract_tso_physical_time(*_end_tso));
_tablet_reader_params.predicates.push_back(create_comparison_predicate<PredicateType::LE>(
tso_index, std::string(kRowBinlogTimestampColName), data_type, end_value, false));
}

return Status::OK();
}

// it will be called under tablet read lock because capture rs readers need
Status OlapScanner::_init_tablet_reader_params(
const phmap::flat_hash_map<int, SlotDescriptor*>& slot_id_to_slot_desc,
Expand Down Expand Up @@ -371,7 +421,49 @@ Status OlapScanner::_init_tablet_reader_params(
_tablet_reader_params.origin_return_columns = &_return_columns;
_tablet_reader_params.tablet_columns_convert_to_null_set = &_tablet_columns_convert_to_null_set;

if (_tablet_reader_params.direct_mode) {
auto add_return_column_if_absent = [&](uint32_t cid) {
if (std::find(_tablet_reader_params.return_columns.begin(),
_tablet_reader_params.return_columns.end(),
cid) == _tablet_reader_params.return_columns.end()) {
_tablet_reader_params.return_columns.push_back(cid);
}
};

const bool need_before_columns =
_tablet_reader_params.binlog_scan_type == TBinlogScanType::MIN_DELTA ||
(_tablet_reader_params.binlog_scan_type == TBinlogScanType::DETAIL &&
_tablet_reader_params.binlog_read_source == TBinlogReadSource::CHANGES);
if (need_before_columns) {
for (size_t i = 0; i < tablet_schema->num_key_columns(); ++i) {
add_return_column_if_absent(static_cast<uint32_t>(i));
}
for (auto cid : _return_columns) {
add_return_column_if_absent(cid);
}

if (int32_t op_idx = tablet_schema->field_index(std::string(kRowBinlogOpColName));
op_idx >= 0) {
add_return_column_if_absent(static_cast<uint32_t>(op_idx));
}
if (int32_t lsn_idx = tablet_schema->field_index(std::string(kRowBinlogLsnColName));
lsn_idx >= 0) {
add_return_column_if_absent(static_cast<uint32_t>(lsn_idx));
}

for (auto cid : _return_columns) {
if (cid >= tablet_schema->num_key_columns()) {
const auto& col_name = tablet_schema->column(cid).name();
std::string before_col_name;
before_col_name.append("__BEFORE__");
before_col_name.append(col_name);
before_col_name.append("__");
if (int32_t before_idx = tablet_schema->field_index(before_col_name);
before_idx >= 0) {
add_return_column_if_absent(static_cast<uint32_t>(before_idx));
}
}
}
} else if (_tablet_reader_params.direct_mode) {
_tablet_reader_params.return_columns = _return_columns;
} else {
// we need to fetch all key columns to do the right aggregation on storage engine side.
Expand Down Expand Up @@ -424,6 +516,17 @@ Status OlapScanner::_init_tablet_reader_params(
}
}

RETURN_IF_ERROR(_init_row_binlog_tso_predicates());

if (_tablet_reader_params.binlog_scan_type != TBinlogScanType::NONE) {
_tablet_reader_params.read_orderby_key = true;
_tablet_reader_params.read_orderby_key_reverse = false;
_tablet_reader_params.read_orderby_key_num_prefix_columns = 0;
_tablet_reader_params.read_orderby_key_limit = 0;
_tablet_reader_params.force_key_ordered_read = true;
_tablet_reader_params.topn_filter_source_node_ids.clear();
}

_tablet_reader_params.use_page_cache = _state->enable_page_cache();

DBUG_EXECUTE_IF("NewOlapScanner::_init_tablet_reader_params.block", DBUG_BLOCK);
Expand Down
9 changes: 9 additions & 0 deletions be/src/exec/scan/olap_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <cstddef>
#include <map>
#include <memory>
#include <optional>
#include <string>
#include <unordered_set>
#include <utility>
Expand Down Expand Up @@ -69,6 +70,10 @@ class OlapScanner : public Scanner {
int64_t limit;
bool aggregation;
bool read_row_binlog = false;
TBinlogScanType::type binlog_scan_type = TBinlogScanType::NONE;
TBinlogReadSource::type binlog_read_source = TBinlogReadSource::NONE;
std::optional<int64_t> start_tso;
std::optional<int64_t> end_tso;
};

OlapScanner(ScanLocalStateBase* parent, Params&& params);
Expand All @@ -95,6 +100,8 @@ class OlapScanner : public Scanner {
predicates,
const std::vector<FunctionFilter>& function_filters);

[[nodiscard]] Status _init_row_binlog_tso_predicates();

[[nodiscard]] Status _init_return_columns();
[[nodiscard]] Status _init_variant_columns();
#ifndef NDEBUG
Expand All @@ -105,6 +112,8 @@ class OlapScanner : public Scanner {

TabletReader::ReaderParams _tablet_reader_params;
std::unique_ptr<TabletReader> _tablet_reader;
std::optional<int64_t> _start_tso;
std::optional<int64_t> _end_tso;

int64_t _bytes_read_from_local = 0;
int64_t _bytes_read_from_remote = 0;
Expand Down
15 changes: 13 additions & 2 deletions be/src/exec/scan/parallel_scanner_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -255,8 +255,19 @@ std::shared_ptr<OlapScanner> ParallelScannerBuilder::_build_scanner(
BaseTabletSPtr tablet, int64_t version, const std::vector<OlapScanRange*>& key_ranges,
TabletReadSource&& read_source) {
OlapScanner::Params params {
_state, _scanner_profile.get(), key_ranges, std::move(tablet),
version, std::move(read_source), _limit, _is_preaggregation,
_state,
_scanner_profile.get(),
key_ranges,
std::move(tablet),
version,
std::move(read_source),
_limit,
_is_preaggregation,
false,
TBinlogScanType::NONE,
TBinlogReadSource::NONE,
std::nullopt,
std::nullopt,
};
return OlapScanner::create_shared(_parent, std::move(params));
}
Expand Down
2 changes: 2 additions & 0 deletions be/src/storage/binlog.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ constexpr std::string_view kBinlogDataPrefix = "binlog_data_";
constexpr std::string_view kRowBinlogPrefix = "binlog_row_";
constexpr std::string_view kRowBinlogLsnColName = "__DORIS_BINLOG_LSN__";
constexpr std::string_view kRowBinlogTimestampColName = "__DORIS_BINLOG_TIMESTAMP__";
constexpr std::string_view kRowBinlogOpColName = "__DORIS_BINLOG_OP__";

constexpr int64_t kBinlogLsnAutoIncId = -1;
// used in file directory
constexpr std::string_view FDRowBinlogSuffix = "_row_binlog";
Expand Down
Loading
Loading