Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions be/src/exec/operator/olap_scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,8 @@ Status OlapScanLocalState::_init_profile() {
_lazy_read_seek_timer = ADD_TIMER(_segment_profile, "LazyReadSeekTime");
_lazy_read_seek_counter = ADD_COUNTER(_segment_profile, "LazyReadSeekCount", TUnit::UNIT);

_deferred_nested_read_timer = ADD_TIMER(_segment_profile, "DeferredNestedReadTime");

_output_col_timer = ADD_TIMER(_segment_profile, "OutputColumnTime");

_stats_filtered_counter = ADD_COUNTER(_segment_profile, "RowsStatsFiltered", TUnit::UNIT);
Expand Down
1 change: 1 addition & 0 deletions be/src/exec/operator/olap_scan_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ class OlapScanLocalState final : public ScanLocalState<OlapScanLocalState> {
RuntimeProfile::Counter* _lazy_read_timer = nullptr;
RuntimeProfile::Counter* _lazy_read_seek_timer = nullptr;
RuntimeProfile::Counter* _lazy_read_seek_counter = nullptr;
RuntimeProfile::Counter* _deferred_nested_read_timer = nullptr;

// total pages read
// used by segment v2
Expand Down
1 change: 1 addition & 0 deletions be/src/exec/scan/olap_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -770,6 +770,7 @@ void OlapScanner::_collect_profile_before_close() {
COUNTER_UPDATE(local_state->_predicate_column_read_seek_counter,
stats.predicate_column_read_seek_num);
COUNTER_UPDATE(local_state->_lazy_read_timer, stats.lazy_read_ns);
COUNTER_UPDATE(local_state->_deferred_nested_read_timer, stats.deferred_nested_read_ns);
COUNTER_UPDATE(local_state->_lazy_read_seek_timer, stats.block_lazy_read_seek_ns);
COUNTER_UPDATE(local_state->_lazy_read_seek_counter, stats.block_lazy_read_seek_num);
COUNTER_UPDATE(local_state->_output_col_timer, stats.output_col_ns);
Expand Down
5 changes: 5 additions & 0 deletions be/src/runtime/runtime_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -615,6 +615,11 @@ class RuntimeState {
_query_options.enable_aggregate_function_null_v2;
}

// Returns true only when the `enable_prune_nested_column` query option has
// been explicitly set (its `__isset` bit is on) and its value is true.
// An unset option is treated as disabled.
bool enable_prune_nested_column() const {
    if (!_query_options.__isset.enable_prune_nested_column) {
        return false;
    }
    return _query_options.enable_prune_nested_column;
}

bool is_read_csv_empty_line_as_null() const {
return _query_options.__isset.read_csv_empty_line_as_null &&
_query_options.read_csv_empty_line_as_null;
Expand Down
1 change: 1 addition & 0 deletions be/src/storage/olap_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,7 @@ struct OlapReaderStatistics {
int64_t lazy_read_ns = 0;
int64_t block_lazy_read_seek_num = 0;
int64_t block_lazy_read_seek_ns = 0;
int64_t deferred_nested_read_ns = 0;

int64_t raw_rows_read = 0;

Expand Down
553 changes: 417 additions & 136 deletions be/src/storage/segment/column_reader.cpp

Large diffs are not rendered by default.

133 changes: 132 additions & 1 deletion be/src/storage/segment/column_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@

#include <cstddef> // for size_t
#include <cstdint> // for uint32_t
#include <memory> // for unique_ptr
#include <map>
#include <memory> // for unique_ptr
#include <string>
#include <utility>
#include <vector>
Expand Down Expand Up @@ -424,18 +425,81 @@ class ColumnIterator {
// True when the current read mode is ReadMode::OFFSET_ONLY.
bool read_offset_only() const {
    return ReadMode::OFFSET_ONLY == _read_mode;
}
// True when the current read mode is ReadMode::NULL_MAP_ONLY.
bool read_null_map_only() const {
    return ReadMode::NULL_MAP_ONLY == _read_mode;
}

// Phase of a multi-pass read, selected through activate_read_phase().
// As interpreted by ColumnIterator::need_to_read():
//   FULL      - read every iterator that is not flagged SKIP_READING.
//   PREDICATE - read only iterators flagged READING_FOR_PREDICATE.
//   DEFERRED  - read only iterators flagged NEED_TO_READ; the Map/Struct/Array
//               iterators consult has_deferred_read_target() instead.
enum class ReadPhase : int { FULL, PREDICATE, DEFERRED };

// Switches this iterator to the given read phase.
// Entering the PREDICATE phase additionally clears the
// `has_deferred_defaults` marker of the nested read plan.
virtual void activate_read_phase(ReadPhase mode) {
    const bool entering_predicate = (ReadPhase::PREDICATE == mode);
    _read_phase = mode;
    if (entering_predicate) {
        _nested_read_plan.has_deferred_defaults = false;
    }
}

// Decides whether this iterator's data has to be read in the current phase:
//   FULL      -> anything that is not SKIP_READING
//   PREDICATE -> only READING_FOR_PREDICATE
//   DEFERRED  -> only NEED_TO_READ
// Any other phase value yields false.
virtual bool need_to_read() const {
    if (_read_phase == ReadPhase::FULL) {
        return _reading_flag != ReadingFlag::SKIP_READING;
    }
    if (_read_phase == ReadPhase::PREDICATE) {
        return _reading_flag == ReadingFlag::READING_FOR_PREDICATE;
    }
    if (_read_phase == ReadPhase::DEFERRED) {
        return _reading_flag == ReadingFlag::NEED_TO_READ;
    }
    return false;
}

// Tells whether the meta columns (e.g. the null map column or the offset
// column) have to be read in the current phase.
// A SKIP_READING iterator never reads them. Otherwise they are always read in
// the FULL and PREDICATE phases, and in the DEFERRED phase only when the
// iterator is not flagged READING_FOR_PREDICATE.
bool need_to_read_meta_columns() const {
    if (ReadingFlag::SKIP_READING == _reading_flag) {
        return false;
    }
    if (_read_phase == ReadPhase::FULL || _read_phase == ReadPhase::PREDICATE) {
        return true;
    }
    if (_read_phase == ReadPhase::DEFERRED) {
        return _reading_flag != ReadingFlag::READING_FOR_PREDICATE;
    }
    return false;
}

// Hook invoked when the deferred read of `dst` is finished.
// The base implementation only forwards to _drop_deferred_defaults(dst).
virtual void finish_deferred_read(MutableColumnPtr& dst) {
    _drop_deferred_defaults(dst);
}

// Applies `flag` to this iterator by forwarding to set_reading_flag().
// Virtual so that iterators owning nested iterators can override it.
virtual void set_reading_flag_recursively(ReadingFlag flag) {
    set_reading_flag(flag);
}

// Reports whether this iterator, or any iterator nested below it, still has
// data that must be materialized in the deferred phase. Branches that are
// predicate-only or meta-only are read before filtering and must not be read
// again in the lazy phase.
// The base implementation looks at this iterator's own flag only.
virtual bool has_deferred_read_target() const {
    const bool must_materialize = (ReadingFlag::NEED_TO_READ == _reading_flag);
    return must_materialize;
}

// Exposes the `pruned` marker stored in this iterator's nested read plan.
bool is_pruned() const {
    return _nested_read_plan.pruned;
}

protected:
void _append_deferred_defaults(MutableColumnPtr& dst, size_t count);

void _drop_deferred_defaults(MutableColumnPtr& dst);

// Checks sub access paths for OFFSET or NULL meta-only modes and
// updates _read_mode accordingly. Use the accessor helpers
// read_offset_only() / read_null_map_only() to query the current mode.
void _check_and_set_meta_read_mode(const TColumnAccessPaths& sub_all_access_paths);

Result<TColumnAccessPaths> _get_sub_access_paths(const TColumnAccessPaths& access_paths);
Result<TColumnAccessPaths> _normalize_access_paths(const TColumnAccessPaths& access_paths,
const bool is_predicate);
ColumnIteratorOptions _opts;

ReadingFlag _reading_flag {ReadingFlag::NORMAL_READING};
ReadMode _read_mode = ReadMode::DEFAULT;
ReadPhase _read_phase {ReadPhase::FULL};
std::string _column_name;

// Per-iterator bookkeeping for nested-column reads. Both markers start as false.
struct NestedReadPlan {
// Value reported by is_pruned().
// NOTE(review): presumably set when access-path pruning removed sub-columns of
// this iterator -- confirm against column_reader.cpp.
bool pruned {false};
// Reset to false every time the PREDICATE phase is activated.
// NOTE(review): presumably records that placeholder defaults were appended by
// _append_deferred_defaults() and still have to be removed by
// _drop_deferred_defaults() -- confirm against column_reader.cpp.
bool has_deferred_defaults {false};
};
NestedReadPlan _nested_read_plan;
};

// This iterator is used to read column data from file
Expand Down Expand Up @@ -633,6 +697,29 @@ class MapFileColumnIterator final : public ColumnIterator {

void remove_pruned_sub_iterators() override;

void activate_read_phase(ReadPhase mode) override;

// Map-specific read decision for the current phase. FULL and PREDICATE follow
// the reading flag directly; DEFERRED asks has_deferred_read_target().
bool need_to_read() const override {
    if (_read_phase == ReadPhase::DEFERRED) {
        // In the deferred phase the map is read only when at least one
        // key/value branch still has non-predicate data to materialize.
        return has_deferred_read_target();
    }
    if (_read_phase == ReadPhase::PREDICATE) {
        return _reading_flag == ReadingFlag::READING_FOR_PREDICATE;
    }
    if (_read_phase == ReadPhase::FULL) {
        return _reading_flag != ReadingFlag::SKIP_READING;
    }
    return false;
}

void finish_deferred_read(MutableColumnPtr& dst) override;

void set_reading_flag_recursively(ReadingFlag flag) override;

bool has_deferred_read_target() const override;

private:
std::shared_ptr<ColumnReader> _map_reader = nullptr;
ColumnIteratorUPtr _null_iterator;
Expand Down Expand Up @@ -682,6 +769,27 @@ class StructFileColumnIterator final : public ColumnIterator {
std::map<PrefetcherInitMethod, std::vector<SegmentPrefetcher*>>& prefetchers,
PrefetcherInitMethod init_method) override;

void activate_read_phase(ReadPhase mode) override;

// Struct-specific read decision for the current phase. FULL and PREDICATE
// follow the reading flag directly; DEFERRED asks has_deferred_read_target().
bool need_to_read() const override {
    if (_read_phase == ReadPhase::DEFERRED) {
        // In the deferred phase the struct is read only when at least one
        // nested branch still has non-predicate data to materialize.
        return has_deferred_read_target();
    }
    if (_read_phase == ReadPhase::PREDICATE) {
        return _reading_flag == ReadingFlag::READING_FOR_PREDICATE;
    }
    if (_read_phase == ReadPhase::FULL) {
        return _reading_flag != ReadingFlag::SKIP_READING;
    }
    return false;
}

void finish_deferred_read(MutableColumnPtr& dst) override;
void set_reading_flag_recursively(ReadingFlag flag) override;
bool has_deferred_read_target() const override;

private:
std::shared_ptr<ColumnReader> _struct_reader = nullptr;
ColumnIteratorUPtr _null_iterator;
Expand Down Expand Up @@ -729,6 +837,29 @@ class ArrayFileColumnIterator final : public ColumnIterator {
std::map<PrefetcherInitMethod, std::vector<SegmentPrefetcher*>>& prefetchers,
PrefetcherInitMethod init_method) override;

void activate_read_phase(ReadPhase mode) override;

// Array-specific read decision for the current phase. FULL and PREDICATE
// follow the reading flag directly; DEFERRED asks has_deferred_read_target().
bool need_to_read() const override {
    if (_read_phase == ReadPhase::DEFERRED) {
        // In the deferred phase the array is read only when its item branch
        // still has non-predicate data to materialize.
        return has_deferred_read_target();
    }
    if (_read_phase == ReadPhase::PREDICATE) {
        return _reading_flag == ReadingFlag::READING_FOR_PREDICATE;
    }
    if (_read_phase == ReadPhase::FULL) {
        return _reading_flag != ReadingFlag::SKIP_READING;
    }
    return false;
}

void finish_deferred_read(MutableColumnPtr& dst) override;

void set_reading_flag_recursively(ReadingFlag flag) override;

bool has_deferred_read_target() const override;

private:
std::shared_ptr<ColumnReader> _array_reader = nullptr;
std::unique_ptr<OffsetFileColumnIterator> _offset_iterator;
Expand Down
Loading
Loading