Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion be/src/olap/merger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ Status Merger::vertical_compact_one_group(
CompactionSampleInfo* sample_info) {
// build tablet reader
VLOG_NOTICE << "vertical compact one group, max_rows_per_segment=" << max_rows_per_segment;
vectorized::VerticalBlockReader reader(row_source_buf);
vectorized::VerticalBlockReader reader(row_source_buf, true);
TabletReader::ReaderParams reader_params;
reader_params.is_key_column_group = is_key;
reader_params.key_group_cluster_key_idxes = key_group_cluster_key_idxes;
Expand Down
3 changes: 3 additions & 0 deletions be/src/olap/olap_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,9 @@ struct OlapReaderStatistics {

// total read bytes in memory
int64_t bytes_read = 0;
int64_t cache_bytes_read = 0;
int64_t local_bytes_read = 0;
int64_t s3_bytes_read = 0;

int64_t block_fetch_ns = 0; // time of rowset reader's `next_batch()` call
int64_t block_load_ns = 0;
Expand Down
13 changes: 11 additions & 2 deletions be/src/olap/rowset/segment_v2/column_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ Status ColumnReader::new_inverted_index_iterator(

Status ColumnReader::read_page(const ColumnIteratorOptions& iter_opts, const PagePointer& pp,
PageHandle* handle, Slice* page_body, PageFooterPB* footer,
BlockCompressionCodec* codec) const {
BlockCompressionCodec* codec) {
iter_opts.sanity_check();
PageReadOptions opts {
.verify_checksum = _opts.verify_checksum,
Expand All @@ -369,7 +369,12 @@ Status ColumnReader::read_page(const ColumnIteratorOptions& iter_opts, const Pag
};
// index page should not pre decode
if (iter_opts.type == INDEX_PAGE) opts.pre_decode = false;
return PageIO::read_and_decompress_page(opts, handle, page_body, footer);
Status s = PageIO::read_and_decompress_page(opts, handle, page_body, footer);
_compaction_io_time_ns += opts.stats->io_ns;
_compaction_cache_bytes += opts.stats->cache_bytes_read;
_compaction_local_bytes += opts.stats->local_bytes_read;
_compaction_s3_bytes += opts.stats->s3_bytes_read;
return s;
}

Status ColumnReader::get_row_ranges_by_zone_map(
Expand Down Expand Up @@ -1352,6 +1357,10 @@ Status FileColumnIterator::_read_data_page(const OrdinalPageIndexIterator& iter)
_opts.type = DATA_PAGE;
RETURN_IF_ERROR(
_reader->read_page(_opts, iter.page(), &handle, &page_body, &footer, _compress_codec));
_compaction_io_time_ns += _reader->get_compaction_io_time_ns();
_compaction_cache_bytes += _reader->get_compaction_cache_bytes();
_compaction_s3_bytes += _reader->get_compaction_s3_bytes();
_compaction_local_bytes += _reader->get_compaction_local_bytes();
// parse data page
RETURN_IF_ERROR(ParsedPage::create(std::move(handle), page_body, footer.data_page_footer(),
_reader->encoding_info(), iter.page(), iter.page_index(),
Expand Down
38 changes: 36 additions & 2 deletions be/src/olap/rowset/segment_v2/column_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ class ColumnReader : public MetadataAdder<ColumnReader> {
// read a page from file into a page handle
Status read_page(const ColumnIteratorOptions& iter_opts, const PagePointer& pp,
PageHandle* handle, Slice* page_body, PageFooterPB* footer,
BlockCompressionCodec* codec) const;
BlockCompressionCodec* codec);

bool is_nullable() const { return _meta_is_nullable; }

Expand Down Expand Up @@ -206,6 +206,22 @@ class ColumnReader : public MetadataAdder<ColumnReader> {

FieldType get_meta_type() { return _meta_type; }

int64_t get_compaction_io_time_ns() const {
return _compaction_io_time_ns;
}

int64_t get_compaction_cache_bytes() const {
return _compaction_cache_bytes;
}

int64_t get_compaction_local_bytes() const {
return _compaction_local_bytes;
}

int64_t get_compaction_s3_bytes() const {
return _compaction_s3_bytes;
}

private:
ColumnReader(const ColumnReaderOptions& opts, const ColumnMetaPB& meta, uint64_t num_rows,
io::FileReaderSPtr file_reader);
Expand Down Expand Up @@ -254,6 +270,11 @@ class ColumnReader : public MetadataAdder<ColumnReader> {
bool _use_index_page_cache;
int _be_exec_version = -1;

int64_t _compaction_io_time_ns;
int64_t _compaction_cache_bytes;
int64_t _compaction_local_bytes;
int64_t _compaction_s3_bytes;

PagePointer _meta_dict_page;
CompressionTypePB _meta_compression;

Expand Down Expand Up @@ -342,6 +363,11 @@ class ColumnIterator {

virtual bool is_all_dict_encoding() const { return false; }

virtual int64_t get_compaction_io_time_ns() const { return 0; }
virtual int64_t get_compaction_cache_bytes() const { return 0; }
virtual int64_t get_compaction_local_bytes() const { return 0; }
virtual int64_t get_compaction_s3_bytes() const { return 0; }

protected:
ColumnIteratorOptions _opts;
};
Expand Down Expand Up @@ -389,12 +415,20 @@ class FileColumnIterator final : public ColumnIterator {

bool is_all_dict_encoding() const override { return _is_all_dict_encoding; }

int64_t get_compaction_io_time_ns() const override { return _compaction_io_time_ns; }
int64_t get_compaction_cache_bytes() const override { return _compaction_cache_bytes; }
int64_t get_compaction_local_bytes() const override { return _compaction_local_bytes; }
int64_t get_compaction_s3_bytes() const override { return _compaction_s3_bytes; }

private:
void _seek_to_pos_in_page(ParsedPage* page, ordinal_t offset_in_page) const;
Status _load_next_page(bool* eos);
Status _read_data_page(const OrdinalPageIndexIterator& iter);
Status _read_dict_data();

int64_t _compaction_io_time_ns = 0;
int64_t _compaction_cache_bytes = 0;
int64_t _compaction_local_bytes = 0;
int64_t _compaction_s3_bytes = 0;
ColumnReader* _reader = nullptr;

// iterator owned compress codec, should NOT be shared by threads, initialized in init()
Expand Down
8 changes: 7 additions & 1 deletion be/src/olap/rowset/segment_v2/page_io.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <string>
#include <utility>

#include "cloud/config.h"
#include "common/logging.h"
#include "gutil/strings/substitute.h"
#include "io/fs/file_reader.h"
Expand Down Expand Up @@ -133,6 +134,7 @@ Status PageIO::read_and_decompress_page_(const PageReadOptions& opts, PageHandle
footer_size, opts.file_reader->path().native());
}
*body = Slice(page_slice.data, page_slice.size - 4 - footer_size);
opts.stats->cache_bytes_read += page_slice.size;
return Status::OK();
}

Expand All @@ -154,8 +156,12 @@ Status PageIO::read_and_decompress_page_(const PageReadOptions& opts, PageHandle
&opts.io_ctx));
DCHECK_EQ(bytes_read, page_size);
opts.stats->compressed_bytes_read += page_size;
if (config::is_cloud_mode()) {
opts.stats->s3_bytes_read += bytes_read;
} else {
opts.stats->local_bytes_read += bytes_read;
}
}

if (opts.verify_checksum) {
uint32_t expect = decode_fixed32_le((uint8_t*)page_slice.data + page_slice.size - 4);
uint32_t actual = crc32c::Value(page_slice.data, page_slice.size - 4);
Expand Down
22 changes: 22 additions & 0 deletions be/src/olap/rowset/segment_v2/segment_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include <utility>
#include <vector>

#include "cloud/config.h"
#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/config.h"
#include "common/consts.h"
Expand Down Expand Up @@ -1895,6 +1896,27 @@ Status SegmentIterator::_read_columns_by_rowids(std::vector<ColumnId>& read_colu

RETURN_IF_ERROR(_column_iterators[cid]->read_by_rowids(rowids.data(), select_size,
_current_return_columns[cid]));
if (_is_compaction) {
_compaction_io_time_ns += _column_iterators[cid]->get_compaction_io_time_ns();
_compaction_cache_bytes += _column_iterators[cid]->get_compaction_cache_bytes();
_compaction_local_bytes += _column_iterators[cid]->get_compaction_local_bytes();
_compaction_s3_bytes += _column_iterators[cid]->get_compaction_s3_bytes();
if (config::is_cloud_mode()) {
int64_t total_bytes = _compaction_cache_bytes + _compaction_s3_bytes;
double cache_percent = total_bytes > 0 ? (_compaction_cache_bytes * 100.0 / total_bytes) : 0;
double s3_percent = 1 - cache_percent;
LOG(INFO) << "Compaction IO time: " << _compaction_io_time_ns << " ns, "
<< "Cache bytes: " << _compaction_cache_bytes << " bytes (" << std::fixed << std::setprecision(2) << cache_percent << "%), "
<< "S3 bytes: " << _compaction_s3_bytes << " bytes (" << std::fixed << std::setprecision(2) << s3_percent << "%)";
} else {
int64_t total_bytes = _compaction_cache_bytes + _compaction_local_bytes;
double cache_percent = total_bytes > 0 ? (_compaction_cache_bytes * 100.0 / total_bytes) : 0;
double local_percent = 1 - cache_percent;
LOG(INFO) << "Compaction IO time: " << _compaction_io_time_ns << " ns, "
<< "Cache bytes: " << _compaction_cache_bytes << " bytes (" << std::fixed << std::setprecision(2) << cache_percent << "%), "
<< "Local bytes: " << _compaction_local_bytes << " bytes (" << std::fixed << std::setprecision(2) << local_percent << "%)";
}
}
}

return Status::OK();
Expand Down
31 changes: 30 additions & 1 deletion be/src/olap/rowset/segment_v2/segment_iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -161,9 +161,38 @@ class SegmentIterator : public RowwiseIterator {
[](const auto& iterator) { return iterator != nullptr; });
}

int64_t get_compaction_io_time_ns() const {
return _compaction_io_time_ns;
}

int64_t get_compaction_cache_bytes() const {
return _compaction_cache_bytes;
}

int64_t get_compaction_local_bytes() const {
return _compaction_local_bytes;
}

int64_t get_compaction_s3_bytes() const {
return _compaction_s3_bytes;
}

bool get_is_compaction() const {
return _is_compaction;
}

void set_is_compaction(bool is_compaction) {
_is_compaction = is_compaction;
}


private:
bool _is_compaction = false;
int64_t _compaction_io_time_ns = 0;
int64_t _compaction_cache_bytes = 0;
int64_t _compaction_local_bytes = 0;
int64_t _compaction_s3_bytes = 0;
Status _next_batch_internal(vectorized::Block* block);

template <typename Container>
bool _update_profile(RuntimeProfile* profile, const Container& predicates,
const std::string& title) {
Expand Down
7 changes: 7 additions & 0 deletions be/src/vec/olap/vertical_block_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include "olap/rowset/rowset.h"
#include "olap/rowset/rowset_reader.h"
#include "olap/rowset/rowset_reader_context.h"
#include "olap/rowset/segment_v2/segment_iterator.h"
#include "olap/tablet_schema.h"
#include "vec/aggregate_functions/aggregate_function_reader.h"
#include "vec/columns/column_nullable.h"
Expand Down Expand Up @@ -84,6 +85,12 @@ Status VerticalBlockReader::_get_segment_iterators(const ReaderParams& read_para
bool use_cache = !rs_split.rs_reader->rowset()->is_local();
RETURN_IF_ERROR(rs_split.rs_reader->get_segment_iterators(&_reader_context, segment_iters,
use_cache));
for (auto& iter : *segment_iters) {
auto* seg_iter = dynamic_cast<SegmentIterator*>(iter.get());
if (seg_iter != nullptr) {
seg_iter->set_is_compaction(_is_compaction);
}
}
// if segments overlapping, all segment iterator should be inited in
// heap merge iterator. If segments are none overlapping, only first segment of this
// rowset will be inited and push to heap, other segment will be inited later when current
Expand Down
5 changes: 3 additions & 2 deletions be/src/vec/olap/vertical_block_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ class RowSourcesBuffer;

class VerticalBlockReader final : public TabletReader {
public:
VerticalBlockReader(RowSourcesBuffer* row_sources_buffer)
: _row_sources_buffer(row_sources_buffer) {
VerticalBlockReader(RowSourcesBuffer* row_sources_buffer, bool is_compaction = false)
: _row_sources_buffer(row_sources_buffer), _is_compaction(is_compaction) {
_id = nextId++;
}

Expand Down Expand Up @@ -103,6 +103,7 @@ class VerticalBlockReader final : public TabletReader {
Status (VerticalBlockReader::*_next_block_func)(Block* block, bool* eof) = nullptr;

RowSourcesBuffer* _row_sources_buffer;
bool _is_compaction;
ColumnPtr _delete_filter_column;

// for agg mode
Expand Down