Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions be/src/olap/rowset/segment_v2/hierarchical_data_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ class HierarchicalDataReader : public ColumnIterator {

// copy container variant to dst variant, todo avoid copy
variant.insert_range_from(container_variant, 0, nrows);
variant.set_num_rows(nrows);
// variant.set_num_rows(nrows);
_rows_read += nrows;
variant.finalize();
#ifndef NDEBUG
Expand Down Expand Up @@ -180,7 +180,7 @@ class ExtractReader : public ColumnIterator {
private:
Status extract_to(vectorized::MutableColumnPtr& dst, size_t nrows);

const TabletColumn& _col;
TabletColumn _col;
// may shared among different column iterators
std::unique_ptr<StreamReader> _root_reader;
};
Expand Down
75 changes: 75 additions & 0 deletions be/src/olap/rowset/segment_v2/segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,12 @@
#include "util/slice.h" // Slice
#include "vec/columns/column.h"
#include "vec/common/string_ref.h"
#include "vec/core/field.h"
#include "vec/data_types/data_type.h"
#include "vec/data_types/data_type_factory.hpp"
#include "vec/data_types/data_type_nullable.h"
#include "vec/data_types/data_type_object.h"
#include "vec/json/path_in_data.h"
#include "vec/olap/vgeneric_iterators.h"

namespace doris {
Expand Down Expand Up @@ -672,5 +674,78 @@ bool Segment::is_same_file_col_type_with_expected(int32_t cid, const Schema& sch
return (!file_column_type) || (file_column_type && file_column_type->equals(*expected_type));
}

std::shared_ptr<const vectorized::IDataType> Segment::get_data_type_of(
        vectorized::PathInData path, bool ignore_children) const {
    // Look the path up among the sub-columns known to this segment.
    const auto leaf = _sub_column_tree.find_leaf(path);

    // The stored file type is only usable when the leaf exists and either the
    // caller does not care about children or the leaf has none.
    const bool use_file_type = leaf && (ignore_children || leaf->children.empty());
    if (!use_file_type) {
        // The path has children or is missing in storage: treat it as a variant.
        return std::make_shared<vectorized::DataTypeObject>();
    }
    return leaf->data.file_column_type;
}

/// Reads one row of the column described by `slot` from this segment and appends
/// it to `result`.
///
/// @param schema        tablet schema used to resolve the slot to a column.
/// @param slot          slot descriptor; a non-empty `column_paths()` selects a
///                      variant sub-column, otherwise a regular column is read.
/// @param row_id        row id passed to the column iterator of this segment.
/// @param result        destination column; one value is appended on success.
/// @param stats         reader statistics handed to the column iterator.
/// @param iterator_hint in/out iterator cache: created and initialised here when
///                      it is null, reused as-is otherwise.
/// @return Status::OK() on success, InternalError when the slot cannot be resolved
///         in `schema`, or the first error reported while creating/using the iterator.
///
/// NOTE(review): a non-null `iterator_hint` is used without any validation, so the
/// caller is presumably responsible for passing an iterator that was created for
/// the same slot of the same segment -- confirm against callers.
Status Segment::seek_and_read_by_rowid(const TabletSchema& schema, SlotDescriptor* slot,
                                       uint32_t row_id, vectorized::MutableColumnPtr& result,
                                       OlapReaderStatistics& stats,
                                       std::unique_ptr<ColumnIterator>& iterator_hint) {
    StorageReadOptions storage_read_opt;
    storage_read_opt.io_ctx.reader_type = ReaderType::READER_QUERY;
    io::IOContext io_ctx;
    io_ctx.reader_type = ReaderType::READER_QUERY;
    segment_v2::ColumnIteratorOptions opt {
            .file_reader = file_reader().get(),
            .stats = &stats,
            .use_page_cache = !config::disable_storage_page_cache,
            .io_ctx = io_ctx,
    };
    std::vector<segment_v2::rowid_t> single_row_loc {row_id};

    if (!slot->column_paths().empty()) {
        // Variant sub-column: read into a temporary column of the storage type,
        // then move the single value into `result` as a Field.
        const auto root_name = schema.column_by_uid(slot->col_unique_id()).name_lower_case();
        vectorized::PathInData path(root_name, slot->column_paths());
        auto storage_type = get_data_type_of(path, false);
        // Must be checked before the first dereference below.
        DCHECK(storage_type != nullptr);
        vectorized::MutableColumnPtr file_storage_column = storage_type->create_column();
        TabletColumn column = TabletColumn::create_materialized_variant_column(
                root_name, slot->column_paths(), slot->col_unique_id());
        if (iterator_hint == nullptr) {
            RETURN_IF_ERROR(new_column_iterator(column, &iterator_hint, &storage_read_opt));
            RETURN_IF_ERROR(iterator_hint->init(opt));
        }
        RETURN_IF_ERROR(
                iterator_hint->read_by_rowids(single_row_loc.data(), 1, file_storage_column));
        // Get its inner field, for the JSONB case.
        vectorized::Field field = remove_nullable(storage_type)->get_default();
        file_storage_column->get(0, field);
        result->insert(field);
        return Status::OK();
    }

    // Regular column. Prefer the unique id when the slot carries one (light schema
    // change), and fall back to the column name otherwise.
    const int index = slot->col_unique_id() >= 0 ? schema.field_index(slot->col_unique_id())
                                                 : schema.field_index(slot->col_name());
    if (index < 0) {
        std::stringstream ss;
        ss << "field name is invalid. field=" << slot->col_name()
           << ", field_name_to_index=" << schema.get_all_field_names();
        return Status::InternalError(ss.str());
    }
    if (iterator_hint == nullptr) {
        RETURN_IF_ERROR(
                new_column_iterator(schema.column(index), &iterator_hint, &storage_read_opt));
        RETURN_IF_ERROR(iterator_hint->init(opt));
    }
    RETURN_IF_ERROR(iterator_hint->read_by_rowids(single_row_loc.data(), 1, result));
    return Status::OK();
}

} // namespace segment_v2
} // namespace doris
11 changes: 11 additions & 0 deletions be/src/olap/rowset/segment_v2/segment.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,14 @@
#include "olap/rowset/segment_v2/page_handle.h"
#include "olap/schema.h"
#include "olap/tablet_schema.h"
#include "runtime/descriptors.h"
#include "util/once.h"
#include "util/slice.h"
#include "vec/columns/column.h"
#include "vec/columns/subcolumn_tree.h"
#include "vec/data_types/data_type.h"
#include "vec/data_types/data_type_nullable.h"
#include "vec/json/path_in_data.h"

namespace doris {
namespace vectorized {
Expand Down Expand Up @@ -124,6 +129,10 @@ class Segment : public std::enable_shared_from_this<Segment> {

Status read_key_by_rowid(uint32_t row_id, std::string* key);

// Read the single row `row_id` of the column described by `slot` and append it to `result`.
// `schema` is used to resolve the slot to a column; `stats` is passed to the column iterator.
// `iterator_hint` is an in/out iterator cache: when it is null a new column iterator is
// created and initialised into it, otherwise the given iterator is reused as-is.
Status seek_and_read_by_rowid(const TabletSchema& schema, SlotDescriptor* slot, uint32_t row_id,
vectorized::MutableColumnPtr& result, OlapReaderStatistics& stats,
std::unique_ptr<ColumnIterator>& iterator_hint);

Status load_index();

Status load_pk_index_and_bf();
Expand All @@ -147,6 +156,8 @@ class Segment : public std::enable_shared_from_this<Segment> {
std::shared_ptr<const vectorized::IDataType> get_data_type_of(const Field& filed,
bool ignore_children) const;

// Get the data type stored in this segment for the sub-column at `path`.
// Returns the file column type when the path is found and either `ignore_children`
// is true or the found node has no children; otherwise returns a variant
// (DataTypeObject) type.
std::shared_ptr<const vectorized::IDataType> get_data_type_of(vectorized::PathInData path,
bool ignore_children) const;
// If column in segment is the same type in schema
bool is_same_file_col_type_with_expected(int32_t cid, const Schema& schema,
bool ignore_children) const;
Expand Down
14 changes: 14 additions & 0 deletions be/src/olap/tablet_schema.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,20 @@ void TabletColumn::init_from_pb(const ColumnPB& column) {
}
}

TabletColumn TabletColumn::create_materialized_variant_column(const std::string& root,
                                                              const std::vector<std::string>& paths,
                                                              int32_t parent_unique_id) {
    // Full path of the sub-column: the root column name followed by the nested keys.
    vectorized::PathInData full_path(root, paths);

    // A nullable VARIANT column with no unique id of its own (-1); it is tied to
    // the column it was extracted from through the parent unique id.
    TabletColumn variant_col;
    variant_col.set_type(FieldType::OLAP_FIELD_TYPE_VARIANT);
    variant_col.set_is_nullable(true);
    variant_col.set_unique_id(-1);
    variant_col.set_parent_unique_id(parent_unique_id);

    // The column is both addressed and named by its full path.
    variant_col.set_path_info(full_path);
    variant_col.set_name(full_path.get_path());
    return variant_col;
}

void TabletColumn::to_schema_pb(ColumnPB* column) const {
column->set_unique_id(_unique_id);
column->set_name(_col_name);
Expand Down
5 changes: 5 additions & 0 deletions be/src/olap/tablet_schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,11 @@ class TabletColumn {
_type == FieldType::OLAP_FIELD_TYPE_QUANTILE_STATE ||
_type == FieldType::OLAP_FIELD_TYPE_AGG_STATE;
}
// Build the column that represents a materialized variant sub-column.
// Such columns do not exist in the frontend schema info, so we need to
// add them into tablet_schema for later column indexing.
// `root` is the name of the root column and `paths` the nested keys below it; together
// they form the path info and the name of the returned column.
// `parent_unique_id` is stored as the parent unique id; the column's own unique id is -1.
// The returned column is nullable and of type OLAP_FIELD_TYPE_VARIANT.
static TabletColumn create_materialized_variant_column(const std::string& root,
const std::vector<std::string>& paths,
int32_t parent_unique_id);
bool has_default_value() const { return _has_default_value; }
std::string default_value() const { return _default_value; }
size_t length() const { return _length; }
Expand Down
6 changes: 5 additions & 1 deletion be/src/runtime/descriptors.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ SlotDescriptor::SlotDescriptor(const PSlotDescriptor& pdesc)
_field_idx(-1),
_is_materialized(pdesc.is_materialized()),
_is_key(pdesc.is_key()),
_need_materialize(true) {}
_need_materialize(true),
_column_paths(pdesc.column_paths().begin(), pdesc.column_paths().end()) {}

void SlotDescriptor::to_protobuf(PSlotDescriptor* pslot) const {
pslot->set_id(_id);
Expand All @@ -100,6 +101,9 @@ void SlotDescriptor::to_protobuf(PSlotDescriptor* pslot) const {
pslot->set_col_unique_id(_col_unique_id);
pslot->set_is_key(_is_key);
pslot->set_col_type(_col_type);
for (const std::string& path : _column_paths) {
pslot->add_column_paths(path);
}
}

vectorized::MutableColumnPtr SlotDescriptor::get_empty_mutable_column() const {
Expand Down
90 changes: 62 additions & 28 deletions be/src/service/internal_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <butil/errno.h>
#include <butil/iobuf.h>
#include <fcntl.h>
#include <fmt/core.h>
#include <gen_cpp/MasterService_types.h>
#include <gen_cpp/PaloInternalService_types.h>
#include <gen_cpp/PlanNodes_types.h>
Expand All @@ -46,6 +47,7 @@
#include <set>
#include <sstream>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

Expand All @@ -69,6 +71,7 @@
#include "olap/rowset/segment_v2/common.h"
#include "olap/rowset/segment_v2/inverted_index_desc.h"
#include "olap/rowset/segment_v2/segment.h"
#include "olap/rowset/segment_v2/segment_iterator.h"
#include "olap/segment_loader.h"
#include "olap/storage_engine.h"
#include "olap/tablet.h"
Expand Down Expand Up @@ -1596,6 +1599,38 @@ auto scope_timer_run(Func fn, int64_t* cost) -> decltype(fn()) {
return res;
}

// Identifies a cached column iterator: one entry per (tablet, rowset, segment, slot).
struct IteratorKey {
    int64_t tablet_id;
    RowsetId rowset_id;
    uint64_t segment_id;
    int slot_id;

    // Key equality used by the unordered map; two keys match only when every
    // component matches.
    bool operator==(const IteratorKey& rhs) const {
        if (tablet_id != rhs.tablet_id) {
            return false;
        }
        if (!(rowset_id == rhs.rowset_id)) {
            return false;
        }
        if (segment_id != rhs.segment_id) {
            return false;
        }
        return slot_id == rhs.slot_id;
    }
};

// Hash functor for IteratorKey: chains HashUtil::hash64 over every key component,
// feeding the running hash back in as the seed of the next step.
struct HashOfIteratorKey {
    size_t operator()(const IteratorKey& key) const {
        size_t seed = 0;
        // Folds the raw bytes of one field into the running hash.
        auto mix = [&seed](const auto& field) {
            seed = HashUtil::hash64(&field, sizeof(field), seed);
        };
        mix(key.tablet_id);
        mix(key.rowset_id.hi);
        mix(key.rowset_id.mi);
        mix(key.rowset_id.lo);
        mix(key.segment_id);
        mix(key.slot_id);
        return seed;
    }
};

// Value stored per IteratorKey: a reusable column iterator plus the segment it reads from.
// NOTE(review): members are destroyed in reverse declaration order, so `segment` is
// released before `iterator`; confirm ColumnIterator's destructor does not touch the segment.
struct IteratorItem {
// Lazily created iterator, passed to Segment::seek_and_read_by_rowid as its iterator hint.
std::unique_ptr<ColumnIterator> iterator;
// for holding the reference of segment to avoid use after release
SegmentSharedPtr segment;
};

Status PInternalServiceImpl::_multi_get(const PMultiGetRequest& request,
PMultiGetResponse* response) {
OlapReaderStatistics stats;
Expand All @@ -1620,6 +1655,7 @@ Status PInternalServiceImpl::_multi_get(const PMultiGetRequest& request,
full_read_schema.append_column(TabletColumn(column_pb));
}

std::unordered_map<IteratorKey, IteratorItem, HashOfIteratorKey> iterator_map;
// read row by row
for (size_t i = 0; i < request.row_locs_size(); ++i) {
const auto& row_loc = request.row_locs(i);
Expand Down Expand Up @@ -1658,8 +1694,8 @@ Status PInternalServiceImpl::_multi_get(const PMultiGetRequest& request,
},
&acquire_segments_ms));
// find segment
auto it = std::find_if(segment_cache.get_segments().begin(),
segment_cache.get_segments().end(),
auto it = std::find_if(segment_cache.get_segments().cbegin(),
segment_cache.get_segments().cend(),
[&row_loc](const segment_v2::SegmentSharedPtr& seg) {
return seg->id() == row_loc.segment_id();
});
Expand Down Expand Up @@ -1687,33 +1723,28 @@ Status PInternalServiceImpl::_multi_get(const PMultiGetRequest& request,
if (result_block.is_empty_column()) {
result_block = vectorized::Block(desc.slots(), request.row_locs().size());
}
VLOG_DEBUG << "Read row location "
<< fmt::format("{}, {}, {}, {}", row_location.tablet_id,
row_location.row_location.rowset_id.to_string(),
row_location.row_location.segment_id,
row_location.row_location.row_id);
for (int x = 0; x < desc.slots().size(); ++x) {
int index = -1;
if (desc.slots()[x]->col_unique_id() >= 0) {
// light sc enabled
index = full_read_schema.field_index(desc.slots()[x]->col_unique_id());
} else {
index = full_read_schema.field_index(desc.slots()[x]->col_name());
}
if (index < 0) {
std::stringstream ss;
ss << "field name is invalid. field=" << desc.slots()[x]->col_name()
<< ", field_name_to_index=" << full_read_schema.get_all_field_names();
return Status::InternalError(ss.str());
}
std::unique_ptr<segment_v2::ColumnIterator> column_iterator;
auto row_id = static_cast<segment_v2::rowid_t>(row_loc.ordinal_id());
vectorized::MutableColumnPtr column =
result_block.get_by_position(x).column->assume_mutable();
RETURN_IF_ERROR(
segment->new_column_iterator(full_read_schema.column(index), &column_iterator));
segment_v2::ColumnIteratorOptions opt;
opt.file_reader = segment->file_reader().get();
opt.stats = &stats;
opt.use_page_cache = !config::disable_storage_page_cache;
column_iterator->init(opt);
std::vector<segment_v2::rowid_t> single_row_loc {
static_cast<segment_v2::rowid_t>(row_loc.ordinal_id())};
RETURN_IF_ERROR(column_iterator->read_by_rowids(single_row_loc.data(), 1, column));
IteratorKey iterator_key {.tablet_id = tablet->tablet_id(),
.rowset_id = rowset_id,
.segment_id = row_loc.segment_id(),
.slot_id = desc.slots()[x]->id()};
IteratorItem& iterator_item = iterator_map[iterator_key];
if (iterator_item.segment == nullptr) {
// hold the reference
iterator_map[iterator_key].segment = segment;
}
segment = iterator_item.segment;
RETURN_IF_ERROR(segment->seek_and_read_by_rowid(full_read_schema, desc.slots()[x],
row_id, column, stats,
iterator_item.iterator));
}
}
// serialize block if not empty
Expand All @@ -1733,11 +1764,13 @@ Status PInternalServiceImpl::_multi_get(const PMultiGetRequest& request,
"hit_cached_pages:{}, total_pages_read:{}, compressed_bytes_read:{}, "
"io_latency:{}ns, "
"uncompressed_bytes_read:{},"
"bytes_read:{},"
"acquire_tablet_ms:{}, acquire_rowsets_ms:{}, acquire_segments_ms:{}, "
"lookup_row_data_ms:{}",
stats.cached_pages_num, stats.total_pages_num, stats.compressed_bytes_read,
stats.io_ns, stats.uncompressed_bytes_read, acquire_tablet_ms,
acquire_rowsets_ms, acquire_segments_ms, lookup_row_data_ms);
stats.io_ns, stats.uncompressed_bytes_read, stats.bytes_read,
acquire_tablet_ms, acquire_rowsets_ms, acquire_segments_ms,
lookup_row_data_ms);
return Status::OK();
}

Expand All @@ -1746,6 +1779,7 @@ void PInternalServiceImpl::multiget_data(google::protobuf::RpcController* contro
PMultiGetResponse* response,
google::protobuf::Closure* done) {
bool ret = _light_work_pool.try_offer([request, response, done, this]() {
signal::set_signal_task_id(request->query_id());
// multi get data by rowid
MonotonicStopWatch watch;
watch.start();
Expand Down
9 changes: 5 additions & 4 deletions be/src/vec/columns/column_object.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -642,11 +642,12 @@ void ColumnObject::for_each_subcolumn(ColumnCallback callback) {
}

void ColumnObject::insert_from(const IColumn& src, size_t n) {
const auto& src_v = assert_cast<const ColumnObject&>(src);
const auto* src_v = check_and_get_column<ColumnObject>(src);
// optimize when src and this column are scalar variant, since try_insert is inefficiency
if (src_v.is_scalar_variant() && is_scalar_variant() &&
src_v.get_root_type()->equals(*get_root_type()) && src_v.is_finalized() && is_finalized()) {
assert_cast<ColumnNullable&>(*get_root()).insert_from(*src_v.get_root(), n);
if (src_v != nullptr && src_v->is_scalar_variant() && is_scalar_variant() &&
src_v->get_root_type()->equals(*get_root_type()) && src_v->is_finalized() &&
is_finalized()) {
assert_cast<ColumnNullable&>(*get_root()).insert_from(*src_v->get_root(), n);
++num_rows;
return;
}
Expand Down
Loading