Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions be/src/olap/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ Status Compaction::merge_input_rowsets() {
}

RowsetWriterContext ctx;
ctx.input_rs_readers = input_rs_readers;
RETURN_IF_ERROR(construct_output_rowset_writer(ctx));

// write merged rows to output rowset
Expand Down
3 changes: 3 additions & 0 deletions be/src/olap/rowset/rowset_writer_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,9 @@ struct RowsetWriterContext {
// For remote rowset
std::optional<StorageResource> storage_resource;

// Input rowset readers, used to collect segment statistics during compaction
std::vector<RowsetReaderSharedPtr> input_rs_readers;

bool is_local_rowset() const { return !storage_resource; }

std::string segment_path(int seg_id) const {
Expand Down
146 changes: 146 additions & 0 deletions be/src/olap/rowset/segment_v2/column_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
#include "olap/rowset/segment_v2/bloom_filter.h"
#include "olap/rowset/segment_v2/bloom_filter_index_reader.h"
#include "olap/rowset/segment_v2/encoding_info.h" // for EncodingInfo
#include "olap/rowset/segment_v2/hierarchical_data_reader.h"
#include "olap/rowset/segment_v2/inverted_index_file_reader.h"
#include "olap/rowset/segment_v2/inverted_index_reader.h"
#include "olap/rowset/segment_v2/page_decoder.h"
Expand All @@ -52,6 +53,7 @@
#include "olap/rowset/segment_v2/page_pointer.h" // for PagePointer
#include "olap/rowset/segment_v2/row_ranges.h"
#include "olap/rowset/segment_v2/segment.h"
#include "olap/rowset/segment_v2/variant_column_writer_impl.h"
#include "olap/rowset/segment_v2/zone_map_index.h"
#include "olap/tablet_schema.h"
#include "olap/types.h" // for TypeInfo
Expand Down Expand Up @@ -220,6 +222,146 @@ Status ColumnReader::create_agg_state(const ColumnReaderOptions& opts, const Col
agg_state_type->get_name(), int(type));
}

// Looks up `relative_path` in the sub-column reader tree via find_leaf() and
// hands back whatever node pointer that lookup produces.
const SubcolumnColumnReaders::Node* VariantColumnReader::get_reader_by_path(
        const vectorized::PathInData& relative_path) const {
    const SubcolumnColumnReaders::Node* leaf = _subcolumn_readers->find_leaf(relative_path);
    return leaf;
}

Status VariantColumnReader::new_iterator(ColumnIterator** iterator,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: function 'new_iterator' has cognitive complexity of 51 (threshold 50) [readability-function-cognitive-complexity]

Status VariantColumnReader::new_iterator(ColumnIterator** iterator,
                            ^
Additional context

be/src/olap/rowset/segment_v2/column_reader.cpp:237: +1, including nesting penalty of 0, nesting level increased to 1

    if (node != nullptr) {
    ^

be/src/olap/rowset/segment_v2/column_reader.cpp:238: +2, including nesting penalty of 1, nesting level increased to 2

        if (node->is_leaf_node()) {
        ^

be/src/olap/rowset/segment_v2/column_reader.cpp:242: +3, including nesting penalty of 2, nesting level increased to 3

            RETURN_IF_ERROR(node->data.reader->new_iterator(iterator));
            ^

be/src/common/status.h:632: expanded from macro 'RETURN_IF_ERROR'

    do {                                \
    ^

be/src/olap/rowset/segment_v2/column_reader.cpp:242: +4, including nesting penalty of 3, nesting level increased to 4

            RETURN_IF_ERROR(node->data.reader->new_iterator(iterator));
            ^

be/src/common/status.h:634: expanded from macro 'RETURN_IF_ERROR'

        if (UNLIKELY(!_status_.ok())) { \
        ^

be/src/olap/rowset/segment_v2/column_reader.cpp:243: +1, nesting level increased to 2

        } else {
          ^

be/src/olap/rowset/segment_v2/column_reader.cpp:247: +3, including nesting penalty of 2, nesting level increased to 3

            if (!_sparse_column_set_in_stats.empty()) {
            ^

be/src/olap/rowset/segment_v2/column_reader.cpp:250: +4, including nesting penalty of 3, nesting level increased to 4

                RETURN_IF_ERROR(_sparse_column_reader->new_iterator(&iter));
                ^

be/src/common/status.h:632: expanded from macro 'RETURN_IF_ERROR'

    do {                                \
    ^

be/src/olap/rowset/segment_v2/column_reader.cpp:250: +5, including nesting penalty of 4, nesting level increased to 5

                RETURN_IF_ERROR(_sparse_column_reader->new_iterator(&iter));
                ^

be/src/common/status.h:634: expanded from macro 'RETURN_IF_ERROR'

        if (UNLIKELY(!_status_.ok())) { \
        ^

be/src/olap/rowset/segment_v2/column_reader.cpp:255: +3, including nesting penalty of 2, nesting level increased to 3

                    (relative_path == root->path) ? HierarchicalDataReader::ReadType::MERGE_ROOT
                                                  ^

be/src/olap/rowset/segment_v2/column_reader.cpp:257: +3, including nesting penalty of 2, nesting level increased to 3

            RETURN_IF_ERROR(HierarchicalDataReader::create(iterator, relative_path, node, root,
            ^

be/src/common/status.h:632: expanded from macro 'RETURN_IF_ERROR'

    do {                                \
    ^

be/src/olap/rowset/segment_v2/column_reader.cpp:257: +4, including nesting penalty of 3, nesting level increased to 4

            RETURN_IF_ERROR(HierarchicalDataReader::create(iterator, relative_path, node, root,
            ^

be/src/common/status.h:634: expanded from macro 'RETURN_IF_ERROR'

        if (UNLIKELY(!_status_.ok())) { \
        ^

be/src/olap/rowset/segment_v2/column_reader.cpp:260: +1, nesting level increased to 1

    } else {
      ^

be/src/olap/rowset/segment_v2/column_reader.cpp:261: +2, including nesting penalty of 1, nesting level increased to 2

        if (_sparse_column_set_in_stats.contains(StringRef {relative_path.get_path()}) ||
        ^

be/src/olap/rowset/segment_v2/column_reader.cpp:266: +3, including nesting penalty of 2, nesting level increased to 3

            RETURN_IF_ERROR(_sparse_column_reader->new_iterator(&inner_iter));
            ^

be/src/common/status.h:632: expanded from macro 'RETURN_IF_ERROR'

    do {                                \
    ^

be/src/olap/rowset/segment_v2/column_reader.cpp:266: +4, including nesting penalty of 3, nesting level increased to 4

            RETURN_IF_ERROR(_sparse_column_reader->new_iterator(&inner_iter));
            ^

be/src/common/status.h:634: expanded from macro 'RETURN_IF_ERROR'

        if (UNLIKELY(!_status_.ok())) { \
        ^

be/src/olap/rowset/segment_v2/column_reader.cpp:269: +1, nesting level increased to 2

        } else {
          ^

be/src/olap/rowset/segment_v2/column_reader.cpp:272: +3, including nesting penalty of 2, nesting level increased to 3

            RETURN_IF_ERROR(Segment::new_default_iterator(target_col, &iter));
            ^

be/src/common/status.h:632: expanded from macro 'RETURN_IF_ERROR'

    do {                                \
    ^

be/src/olap/rowset/segment_v2/column_reader.cpp:272: +4, including nesting penalty of 3, nesting level increased to 4

            RETURN_IF_ERROR(Segment::new_default_iterator(target_col, &iter));
            ^

be/src/common/status.h:634: expanded from macro 'RETURN_IF_ERROR'

        if (UNLIKELY(!_status_.ok())) { \
        ^

const TabletColumn& target_col) {
    // Creates the iterator used to read `target_col` from this variant column and stores it
    // in `*iterator` as a raw pointer.
    //
    // Dispatch, based on the path of `target_col` relative to the variant root:
    //   * path not found in the sub-column tree:
    //       - read it out of the sparse column when the stats list the path, or when the stats
    //         set exceeds VariantStatistics::MAX_SPARSE_DATA_STATISTICS_SIZE;
    //       - otherwise fall back to a default iterator.
    //   * path is a leaf node: read the extracted column directly.
    //   * path is an inner node: build a hierarchical reader (optionally with the sparse column).
    //
    // Returns Status::OK() on success, or the first error reported by a callee.

    // root column use unique id, leaf column use parent_unique_id
    // NOTE(review): path_info_ptr() is dereferenced before has_path_info() is consulted below;
    // confirm callers never pass a column whose path info pointer is null.
    auto relative_path = target_col.path_info_ptr()->copy_pop_front();
    const auto* root = _subcolumn_readers->get_root();
    const auto* node =
            target_col.has_path_info() ? _subcolumn_readers->find_exact(relative_path) : nullptr;

    if (node == nullptr) {
        const bool read_from_sparse =
                _sparse_column_set_in_stats.contains(StringRef {relative_path.get_path()}) ||
                _sparse_column_set_in_stats.size() >
                        VariantStatistics::MAX_SPARSE_DATA_STATISTICS_SIZE;
        if (read_from_sparse) {
            // Sparse column exists or reached sparse size limit, read sparse column
            ColumnIterator* inner_iter = nullptr;
            RETURN_IF_ERROR(_sparse_column_reader->new_iterator(&inner_iter));
            *iterator = new SparseColumnExtractReader(relative_path.get_path(),
                                                      std::unique_ptr<ColumnIterator>(inner_iter));
            return Status::OK();
        }
        // The path is neither materialized nor recorded in the sparse stats (and the stats limit
        // was not reached), so the target path does not exist: hand out a default iterator.
        std::unique_ptr<ColumnIterator> default_iter;
        RETURN_IF_ERROR(Segment::new_default_iterator(target_col, &default_iter));
        *iterator = default_iter.release();
        return Status::OK();
    }

    if (node->is_leaf_node()) {
        // Node contains a column without any child sub columns and no corresponding sparse
        // columns: read the extracted column directly. The node found above is reused instead
        // of looking the path up a second time and dereferencing an unchecked result.
        RETURN_IF_ERROR(node->data.reader->new_iterator(iterator));
        return Status::OK();
    }

    // Node contains a column with children columns or has corresponding sparse columns:
    // create a reader over the hierarchical data.
    std::unique_ptr<ColumnIterator> sparse_iter;
    if (!_sparse_column_set_in_stats.empty()) {
        // Sparse column exists or reached sparse size limit, read sparse column
        ColumnIterator* raw_sparse_iter = nullptr;
        RETURN_IF_ERROR(_sparse_column_reader->new_iterator(&raw_sparse_iter));
        sparse_iter.reset(raw_sparse_iter);
    }
    // Reading the full path of the variant uses MERGE_ROOT, any other path uses READ_DIRECT
    const HierarchicalDataReader::ReadType read_type =
            (relative_path == root->path) ? HierarchicalDataReader::ReadType::MERGE_ROOT
                                          : HierarchicalDataReader::ReadType::READ_DIRECT;
    RETURN_IF_ERROR(HierarchicalDataReader::create(iterator, relative_path, node, root, read_type,
                                                   std::move(sparse_iter)));
    return Status::OK();
}

// Initializes this reader from the segment footer.
//
// Every footer column whose unique id equals that of `footer.columns(column_id)` is treated as
// part of this variant column:
//   * the column whose path equals SPARSE_COLUMN_PATH becomes `_sparse_column_reader`;
//   * the column with an empty relative path becomes the root of the sub-column tree;
//   * every other column is added to the tree under its path relative to the root.
// Finally, the paths listed in the column's variant statistics are recorded in
// `_sparse_column_set_in_stats`.
//
// Readers are created with footer.num_rows(); the `num_rows` parameter is not consulted here.
// Returns Status::OK() on success, or the first error reported by ColumnReader::create().
Status VariantColumnReader::init(const ColumnReaderOptions& opts, const SegmentFooterPB& footer,
                                 uint32_t column_id, uint64_t num_rows,
                                 io::FileReaderSPtr file_reader) {
    _subcolumn_readers = std::make_unique<SubcolumnColumnReaders>();

    const ColumnMetaPB& self_column_pb = footer.columns(column_id);
    for (const ColumnMetaPB& column_pb : footer.columns()) {
        if (column_pb.unique_id() != self_column_pb.unique_id()) {
            continue;
        }
        DCHECK(column_pb.has_column_path_info());
        vectorized::PathInData path;
        path.from_protobuf(column_pb.column_path_info());

        // The sparse column gets its own dedicated reader; build it exactly once.
        if (path.get_path() == SPARSE_COLUMN_PATH) {
            RETURN_IF_ERROR(ColumnReader::create(opts, column_pb, footer.num_rows(), file_reader,
                                                 &_sparse_column_reader));
            continue;
        }

        std::unique_ptr<ColumnReader> reader;
        RETURN_IF_ERROR(
                ColumnReader::create(opts, column_pb, footer.num_rows(), file_reader, &reader));

        // Make sure a (possibly still empty) root exists before attaching anything to the tree.
        auto relative_path = path.copy_pop_front();
        if (_subcolumn_readers->get_root() == nullptr) {
            _subcolumn_readers->create_root(SubcolumnReader {nullptr, nullptr});
        }
        if (relative_path.empty()) {
            // Root column: fill the root node with the real reader and data type.
            _subcolumn_readers->get_mutable_root()->modify_to_scalar(SubcolumnReader {
                    std::move(reader),
                    vectorized::DataTypeFactory::instance().create_data_type(column_pb)});
        } else {
            // Extracted sub column: register it under its path relative to the root.
            _subcolumn_readers->add(
                    relative_path,
                    SubcolumnReader {
                            std::move(reader),
                            vectorized::DataTypeFactory::instance().create_data_type(column_pb)});
        }
    }

    // Record which paths the statistics report as stored in the sparse column.
    // NOTE(review): the set stores StringRef views onto the key strings of the footer's
    // statistics map, so the footer must outlive this reader — confirm with the caller.
    if (self_column_pb.has_variant_statistics()) {
        const auto& variant_stats = self_column_pb.variant_statistics();
        for (const auto& [path, _] : variant_stats.sparse_column_non_null_size()) {
            _sparse_column_set_in_stats.emplace(path.data(), path.size());
        }
    }
    return Status::OK();
}

// Builds a VariantColumnReader, initializes it from `footer` for the column at `column_id`,
// and on success transfers ownership to `*reader`. On failure the error from init() is
// returned and `*reader` is left untouched.
Status ColumnReader::create_variant(const ColumnReaderOptions& opts, const SegmentFooterPB& footer,
                                    uint32_t column_id, uint64_t num_rows,
                                    const io::FileReaderSPtr& file_reader,
                                    std::unique_ptr<ColumnReader>* reader) {
    auto variant_reader = std::make_unique<VariantColumnReader>();
    RETURN_IF_ERROR(variant_reader->init(opts, footer, column_id, num_rows, file_reader));
    *reader = std::move(variant_reader);
    return Status::OK();
}

// Footer-based factory: picks the reader implementation for the column at `column_id`.
// Variant columns need the whole footer (their extracted sub columns live there), so they go
// through create_variant(); every other type is built from its own column meta.
Status ColumnReader::create(const ColumnReaderOptions& opts, const SegmentFooterPB& footer,
                            uint32_t column_id, uint64_t num_rows,
                            const io::FileReaderSPtr& file_reader,
                            std::unique_ptr<ColumnReader>* reader) {
    const auto& column_meta = footer.columns(column_id);
    const bool is_variant =
            static_cast<FieldType>(column_meta.type()) == FieldType::OLAP_FIELD_TYPE_VARIANT;
    if (is_variant) {
        // create variant column reader with extracted columns info in footer
        return create_variant(opts, footer, column_id, num_rows, file_reader, reader);
    }
    return ColumnReader::create(opts, column_meta, num_rows, file_reader, reader);
}

Status ColumnReader::create(const ColumnReaderOptions& opts, const ColumnMetaPB& meta,
uint64_t num_rows, const io::FileReaderSPtr& file_reader,
std::unique_ptr<ColumnReader>* reader) {
Expand Down Expand Up @@ -706,6 +848,10 @@ Status ColumnReader::seek_at_or_before(ordinal_t ordinal, OrdinalPageIndexIterat
return Status::OK();
}

// Overload taking the target column. This implementation does not use `col`;
// it simply forwards to the single-argument new_iterator().
Status ColumnReader::new_iterator(ColumnIterator** iterator, const TabletColumn& col) {
    return this->new_iterator(iterator);
}

Status ColumnReader::new_iterator(ColumnIterator** iterator) {
if (is_empty()) {
*iterator = new EmptyFileColumnIterator();
Expand Down
37 changes: 36 additions & 1 deletion be/src/olap/rowset/segment_v2/column_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
#include "olap/rowset/segment_v2/page_handle.h" // for PageHandle
#include "olap/rowset/segment_v2/page_pointer.h"
#include "olap/rowset/segment_v2/parsed_page.h" // for ParsedPage
#include "olap/rowset/segment_v2/stream_reader.h"
#include "olap/types.h"
#include "olap/utils.h"
#include "util/once.h"
Expand Down Expand Up @@ -78,6 +79,8 @@ class InvertedIndexFileReader;
class PageDecoder;
class RowRanges;
class ZoneMapIndexReader;
// struct SubcolumnReader;
// using SubcolumnColumnReaders = vectorized::SubcolumnsTree<SubcolumnReader>;

struct ColumnReaderOptions {
// whether verify checksum when read page
Expand Down Expand Up @@ -112,11 +115,16 @@ struct ColumnIteratorOptions {
// This will cache data shared by all reader
class ColumnReader : public MetadataAdder<ColumnReader> {
public:
ColumnReader() = default;
// Create an initialized ColumnReader in *reader.
// This should be a lightweight operation without I/O.
static Status create(const ColumnReaderOptions& opts, const ColumnMetaPB& meta,
uint64_t num_rows, const io::FileReaderSPtr& file_reader,
std::unique_ptr<ColumnReader>* reader);
static Status create(const ColumnReaderOptions& opts, const SegmentFooterPB& footer,
uint32_t column_id, uint64_t num_rows,
const io::FileReaderSPtr& file_reader,
std::unique_ptr<ColumnReader>* reader);
static Status create_array(const ColumnReaderOptions& opts, const ColumnMetaPB& meta,
const io::FileReaderSPtr& file_reader,
std::unique_ptr<ColumnReader>* reader);
Expand All @@ -129,11 +137,16 @@ class ColumnReader : public MetadataAdder<ColumnReader> {
static Status create_agg_state(const ColumnReaderOptions& opts, const ColumnMetaPB& meta,
uint64_t num_rows, const io::FileReaderSPtr& file_reader,
std::unique_ptr<ColumnReader>* reader);
static Status create_variant(const ColumnReaderOptions& opts, const SegmentFooterPB& footer,
uint32_t column_id, uint64_t num_rows,
const io::FileReaderSPtr& file_reader,
std::unique_ptr<ColumnReader>* reader);
enum DictEncodingType { UNKNOWN_DICT_ENCODING, PARTIAL_DICT_ENCODING, ALL_DICT_ENCODING };

virtual ~ColumnReader();
~ColumnReader() override;

// create a new column iterator. Client should delete returned iterator
virtual Status new_iterator(ColumnIterator** iterator, const TabletColumn& col);
Status new_iterator(ColumnIterator** iterator);
Status new_array_iterator(ColumnIterator** iterator);
Status new_struct_iterator(ColumnIterator** iterator);
Expand Down Expand Up @@ -283,6 +296,28 @@ class ColumnReader : public MetadataAdder<ColumnReader> {
DorisCallOnce<Status> _set_dict_encoding_type_once;
};

// ColumnReader for a variant column. It keeps a tree of readers for the extracted sub columns
// plus one reader for the sparse column, and chooses between them in new_iterator().
class VariantColumnReader : public ColumnReader {
public:
VariantColumnReader() = default;

// Populates the sub-column reader tree, the sparse column reader and the sparse-path set
// from `footer`, for the variant column located at footer.columns(column_id).
Status init(const ColumnReaderOptions& opts, const SegmentFooterPB& footer, uint32_t column_id,
uint64_t num_rows, io::FileReaderSPtr file_reader);
// Creates an iterator for the path described by `col` and stores it in *iterator.
Status new_iterator(ColumnIterator** iterator, const TabletColumn& col) override;

// Returns the result of looking up `relative_path` with find_leaf() in the sub-column tree.
const SubcolumnColumnReaders::Node* get_reader_by_path(
const vectorized::PathInData& relative_path) const;

~VariantColumnReader() override = default;

private:
// Tree of readers for the root and the extracted sub columns; created in init().
std::unique_ptr<SubcolumnColumnReaders> _subcolumn_readers;
// Reader for the sparse column; only set by init() when the footer contains that column.
std::unique_ptr<ColumnReader> _sparse_column_reader;
// Paths that the statistics record as stored in the sparse column; used to decide whether
// to read from the sparse column. StringRef is used to reduce memory usage.
// Notice: the referenced strings must not be released before this ColumnReader is
// destructed.
std::unordered_set<StringRef> _sparse_column_set_in_stats;
};

// Base iterator to read one column data
class ColumnIterator {
public:
Expand Down
6 changes: 5 additions & 1 deletion be/src/olap/rowset/segment_v2/column_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1168,7 +1168,11 @@ VariantColumnWriter::VariantColumnWriter(const ColumnWriterOptions& opts,
const TabletColumn* column, std::unique_ptr<Field> field)
: ColumnWriter(std::move(field), opts.meta->is_nullable()) {
_impl = std::make_unique<VariantColumnWriterImpl>(opts, column);
};
}

// Initialization is delegated entirely to the writer implementation object;
// its status is returned unchanged.
Status VariantColumnWriter::init() {
    Status impl_status = _impl->init();
    return impl_status;
}

Status VariantColumnWriter::append_data(const uint8_t** ptr, size_t num_rows) {
_next_rowid += num_rows;
Expand Down
4 changes: 3 additions & 1 deletion be/src/olap/rowset/segment_v2/column_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ struct ColumnWriterOptions {
io::FileWriter* file_writer = nullptr;
CompressionTypePB compression_type = UNKNOWN_COMPRESSION;
RowsetWriterContext* rowset_ctx = nullptr;
// Input rowset readers, used to collect segment statistics during compaction
std::vector<RowsetReaderSharedPtr> input_rs_readers;
std::string to_string() const {
std::stringstream ss;
ss << std::boolalpha << "meta=" << meta->DebugString()
Expand Down Expand Up @@ -480,7 +482,7 @@ class VariantColumnWriter : public ColumnWriter {

~VariantColumnWriter() override = default;

Status init() override { return Status::OK(); }
Status init() override;

Status append_data(const uint8_t** ptr, size_t num_rows) override;

Expand Down
Loading