Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 14 additions & 25 deletions be/src/olap/olap_snapshot_converter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ OLAPStatus OlapSnapshotConverter::to_tablet_meta_pb(const OLAPHeaderMessage& ola
schema->set_bf_fpp(olap_header.bf_fpp());
schema->set_next_column_unique_id(olap_header.next_column_unique_id());


std::unordered_map<Version, RowsetMetaPB*, HashOfVersion> _rs_version_map;
for (auto& delta : olap_header.delta()) {
RowsetMetaPB* rowset_meta = tablet_meta_pb->add_rs_metas();
Expand All @@ -86,7 +85,6 @@ OLAPStatus OlapSnapshotConverter::to_tablet_meta_pb(const OLAPHeaderMessage& ola
convert_to_rowset_meta(delta, next_id, olap_header.tablet_id(), olap_header.schema_hash(), rowset_meta);
Version rowset_version = { delta.start_version(), delta.end_version() };
_rs_version_map[rowset_version] = rowset_meta;

}

for (auto& inc_delta : olap_header.incremental_delta()) {
Expand Down Expand Up @@ -141,24 +139,6 @@ OLAPStatus OlapSnapshotConverter::convert_to_pdelta(const RowsetMetaPB& rowset_m
return OLAP_SUCCESS;
}

// Fills a PPendingDelta message from a rowset meta protobuf.
//
// Copies the partition id, transaction id and creation time, then every
// pending segment group found in the rowset's serialized extra properties,
// and finally the delete predicate when the rowset meta carries one.
//
// @param rowset_meta_pb  source rowset meta
// @param pending_delta   destination message; must not be null (it is
//                        dereferenced unconditionally)
// @return always OLAP_SUCCESS
OLAPStatus OlapSnapshotConverter::convert_to_ppending_delta(const RowsetMetaPB& rowset_meta_pb, PPendingDelta* pending_delta) {
    pending_delta->set_partition_id(rowset_meta_pb.partition_id());
    pending_delta->set_transaction_id(rowset_meta_pb.txn_id());
    pending_delta->set_creation_time(rowset_meta_pb.creation_time());

    // NOTE(review): the result of ParseFromString is not checked, so a
    // malformed extra_properties blob is silently treated as whatever was
    // parsed before the failure.
    AlphaRowsetExtraMetaPB alpha_extra_meta;
    alpha_extra_meta.ParseFromString(rowset_meta_pb.extra_properties());
    for (auto& source_group : alpha_extra_meta.pending_segment_groups()) {
        *pending_delta->add_pending_segment_group() = source_group;
    }

    // The delete condition is only materialized when a predicate is present.
    if (!rowset_meta_pb.has_delete_predicate()) {
        return OLAP_SUCCESS;
    }
    *pending_delta->mutable_delete_condition() = rowset_meta_pb.delete_predicate();
    return OLAP_SUCCESS;
}

OLAPStatus OlapSnapshotConverter::convert_to_rowset_meta(const PDelta& delta,
int64_t rowset_id, int64_t tablet_id, int32_t schema_hash, RowsetMetaPB* rowset_meta_pb) {
rowset_meta_pb->set_rowset_id(rowset_id);
Expand Down Expand Up @@ -223,14 +203,23 @@ OLAPStatus OlapSnapshotConverter::convert_to_rowset_meta(const PPendingDelta& pe
int64_t data_size = 0;
AlphaRowsetExtraMetaPB extra_meta_pb;
for (auto& pending_segment_group : pending_delta.pending_segment_group()) {
PendingSegmentGroupPB* new_pending_segment_group = extra_meta_pb.add_pending_segment_groups();
*new_pending_segment_group = pending_segment_group;
SegmentGroupPB* new_segment_group = extra_meta_pb.add_segment_groups();
new_segment_group->set_segment_group_id(pending_segment_group.pending_segment_group_id());
new_segment_group->set_num_segments(pending_segment_group.num_segments());
new_segment_group->set_index_size(0);
new_segment_group->set_data_size(0);
new_segment_group->set_num_rows(0);
for (auto& pending_zone_map : pending_segment_group.zone_maps()) {
ZoneMap* zone_map = new_segment_group->add_zone_maps();
*zone_map = pending_zone_map;
}
new_segment_group->set_empty(pending_segment_group.empty());
PUniqueId* load_id = new_segment_group->mutable_load_id();
*load_id = pending_segment_group.load_id();

if (!pending_segment_group.empty()) {
empty = false;
}
// num_rows += pending_segment_group.num_rows();
// index_size += pending_segment_group.index_size();
// data_size += pending_segment_group.data_size();
}
std::string extra_properties;
extra_meta_pb.SerializeToString(&extra_properties);
Expand Down
2 changes: 0 additions & 2 deletions be/src/olap/olap_snapshot_converter.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,6 @@ class OlapSnapshotConverter {

OLAPStatus convert_to_pdelta(const RowsetMetaPB& rowset_meta_pb, PDelta* delta);

OLAPStatus convert_to_ppending_delta(const RowsetMetaPB& rowset_meta_pb, PPendingDelta* pending_delta);

OLAPStatus convert_to_rowset_meta(const PDelta& delta, int64_t rowset_id, int64_t tablet_id,
int32_t schema_hash, RowsetMetaPB* rowset_meta_pb);

Expand Down
133 changes: 33 additions & 100 deletions be/src/olap/rowset/alpha_rowset.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,32 +149,24 @@ void AlphaRowset::set_version_and_version_hash(Version version, VersionHash ver
return;
}

_is_pending_rowset = false;
AlphaRowsetMetaSharedPtr alpha_rowset_meta =
std::dynamic_pointer_cast<AlphaRowsetMeta>(_rowset_meta);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It may be better to call clear_load_id() here?

vector<SegmentGroupPB> published_segment_groups;
alpha_rowset_meta->get_segment_groups(&published_segment_groups);
int32_t segment_group_idx = 0;
for (auto segment_group : _segment_groups) {
segment_group->set_version(version);
segment_group->set_version_hash(version_hash);
SegmentGroupPB segment_group_pb;
segment_group_pb.set_segment_group_id(segment_group->segment_group_id());
segment_group_pb.set_num_segments(segment_group->num_segments());
segment_group_pb.set_index_size(segment_group->index_size());
segment_group_pb.set_data_size(segment_group->data_size());
segment_group_pb.set_num_rows(segment_group->num_rows());
const std::vector<KeyRange>& zone_maps = segment_group->get_zone_maps();
if (zone_maps.size() > 0) {
for (size_t i = 0; i < zone_maps.size(); ++i) {
ZoneMap* new_zone_map = segment_group_pb.add_zone_maps();
new_zone_map->set_min(zone_maps.at(i).first->to_string());
new_zone_map->set_max(zone_maps.at(i).second->to_string());
new_zone_map->set_null_flag(zone_maps.at(i).first->is_null());
}
}
segment_group_pb.set_empty(segment_group->empty());
alpha_rowset_meta->add_segment_group(segment_group_pb);
segment_group->set_pending_finished();
published_segment_groups.at(segment_group_idx).clear_load_id();
++segment_group_idx;
}
alpha_rowset_meta->clear_segment_group();
for (auto& segment_group_meta : published_segment_groups) {
alpha_rowset_meta->add_segment_group(segment_group_meta);
}
alpha_rowset_meta->clear_pending_segment_group();

_is_pending_rowset = false;
}

int64_t AlphaRowset::start_version() const {
Expand Down Expand Up @@ -391,19 +383,25 @@ bool AlphaRowset::check_path(const std::string& path) {
return valid_paths.find(path) != valid_paths.end();
}

OLAPStatus AlphaRowset::_init_non_pending_segment_groups() {
OLAPStatus AlphaRowset::_init_segment_groups() {
std::vector<SegmentGroupPB> segment_group_metas;
AlphaRowsetMetaSharedPtr _alpha_rowset_meta = std::dynamic_pointer_cast<AlphaRowsetMeta>(_rowset_meta);
_alpha_rowset_meta->get_segment_groups(&segment_group_metas);
for (auto& segment_group_meta : segment_group_metas) {
Version version = _rowset_meta->version();
int64_t version_hash = _rowset_meta->version_hash();
std::shared_ptr<SegmentGroup> segment_group(new(std::nothrow) SegmentGroup(_rowset_meta->tablet_id(),
_rowset_meta->rowset_id(), _schema, _rowset_path, version, version_hash,
std::shared_ptr<SegmentGroup> segment_group;
if (_is_pending_rowset) {
segment_group.reset(new SegmentGroup(_rowset_meta->tablet_id(),
_rowset_meta->rowset_id(), _schema, _rowset_path, false, segment_group_meta.segment_group_id(),
segment_group_meta.num_segments(), true,
_rowset_meta->partition_id(), _rowset_meta->txn_id()));
} else {
segment_group.reset(new SegmentGroup(_rowset_meta->tablet_id(),
_rowset_meta->rowset_id(), _schema, _rowset_path,
_rowset_meta->version(), _rowset_meta->version_hash(),
false, segment_group_meta.segment_group_id(), segment_group_meta.num_segments()));
}
if (segment_group == nullptr) {
LOG(WARNING) << "fail to create olap segment_group. [version='" << version.first
<< "-" << version.second << "' rowset_id='" << _rowset_meta->rowset_id() << "']";
LOG(WARNING) << "fail to create olap segment_group. rowset_id='" << _rowset_meta->rowset_id();
return OLAP_ERR_CREATE_FILE_ERROR;
}
if (segment_group_meta.has_empty()) {
Expand Down Expand Up @@ -447,71 +445,6 @@ OLAPStatus AlphaRowset::_init_non_pending_segment_groups() {
return OLAP_SUCCESS;
}

// Creates one SegmentGroup per pending segment group recorded in the alpha
// rowset meta and appends it to _segment_groups.
//
// For every pending segment group meta this:
//   * constructs a SegmentGroup from the rowset's tablet id, rowset id,
//     schema, path, partition id and txn id;
//   * copies the "empty" flag when the meta has one;
//   * loads the zone maps (min / max / null flag per key column) when present.
//
// @return OLAP_SUCCESS on success;
//         OLAP_ERR_CREATE_FILE_ERROR when a SegmentGroup cannot be allocated;
//         OLAP_ERR_TABLE_INDEX_VALIDATE_ERROR when the number of zone maps
//         differs from the schema's number of key columns;
//         the status returned by SegmentGroup::add_zone_maps when it fails;
//         OLAP_ERR_ENGINE_LOAD_INDEX_TABLE_ERROR when a cumulative rowset ends
//         up with more than one segment group.
OLAPStatus AlphaRowset::_init_pending_segment_groups() {
    std::vector<PendingSegmentGroupPB> pending_segment_group_metas;
    AlphaRowsetMetaSharedPtr _alpha_rowset_meta = std::dynamic_pointer_cast<AlphaRowsetMeta>(_rowset_meta);
    _alpha_rowset_meta->get_pending_segment_groups(&pending_segment_group_metas);
    for (const auto& pending_segment_group_meta : pending_segment_group_metas) {
        Version version = _rowset_meta->version();
        int64_t txn_id = _rowset_meta->txn_id();
        int64_t partition_id = _rowset_meta->partition_id();
        // Allocate with std::nothrow so that the nullptr check below is
        // reachable; a plain `new` would throw instead of returning null.
        std::shared_ptr<SegmentGroup> segment_group(new (std::nothrow) SegmentGroup(_rowset_meta->tablet_id(),
                _rowset_meta->rowset_id(), _schema, _rowset_path, false, pending_segment_group_meta.pending_segment_group_id(),
                pending_segment_group_meta.num_segments(), true, partition_id, txn_id));
        if (segment_group == nullptr) {
            LOG(WARNING) << "fail to create olap segment_group. [version='" << version.first
                         << "-" << version.second << "' rowset_id='" << _rowset_meta->rowset_id() << "']";
            return OLAP_ERR_CREATE_FILE_ERROR;
        }
        _segment_groups.push_back(segment_group);
        if (pending_segment_group_meta.has_empty()) {
            segment_group->set_empty(pending_segment_group_meta.empty());
        }

        if (pending_segment_group_meta.zone_maps_size() != 0) {
            size_t zone_maps_size = pending_segment_group_meta.zone_maps_size();
            size_t num_key_columns = _schema->num_key_columns();
            // Exactly one zone map is expected per key column.
            if (num_key_columns != zone_maps_size) {
                LOG(ERROR) << "column pruning size is error."
                           << "zone_maps_size=" << zone_maps_size << ", "
                           << "num_key_columns=" << num_key_columns;
                return OLAP_ERR_TABLE_INDEX_VALIDATE_ERROR;
            }
            std::vector<std::pair<std::string, std::string>> zone_map_strings(num_key_columns);
            std::vector<bool> null_vec(num_key_columns);
            for (size_t j = 0; j < num_key_columns; ++j) {
                const ZoneMap& zone_map = pending_segment_group_meta.zone_maps(j);
                zone_map_strings[j].first = zone_map.min();
                zone_map_strings[j].second = zone_map.max();
                // A missing null flag is treated as "no nulls".
                null_vec[j] = zone_map.has_null_flag() ? zone_map.null_flag() : false;
            }
            OLAPStatus status = segment_group->add_zone_maps(zone_map_strings, null_vec);
            if (status != OLAP_SUCCESS) {
                LOG(WARNING) << "segment group add column statistics failed, status:" << status;
                return status;
            }
        }
    }
    // A cumulative rowset must not carry more than one segment group.
    if (_is_cumulative_rowset && _segment_groups.size() > 1) {
        LOG(WARNING) << "invalid segment group meta for cumulative rowset. segment group size:"
                     << _segment_groups.size();
        return OLAP_ERR_ENGINE_LOAD_INDEX_TABLE_ERROR;
    }
    return OLAP_SUCCESS;
}

// Initializes the rowset's segment groups, dispatching on whether this rowset
// is pending: pending rowsets are built from the pending segment group metas,
// all others from the regular segment group metas.
//
// @return the status reported by the selected initializer
OLAPStatus AlphaRowset::_init_segment_groups() {
    return _is_pending_rowset ? _init_pending_segment_groups()
                              : _init_non_pending_segment_groups();
}

std::shared_ptr<SegmentGroup> AlphaRowset::_segment_group_with_largest_size() {
std::shared_ptr<SegmentGroup> largest_segment_group = nullptr;
size_t largest_segment_group_sizes = 0;
Expand All @@ -532,25 +465,25 @@ OLAPStatus AlphaRowset::reset_sizeinfo() {
if (!is_loaded()) {
RETURN_NOT_OK(load());
}
// std::vector<PendingSegmentGroupPB> pending_segment_group_metas;
std::vector<SegmentGroupPB> segment_group_metas;
AlphaRowsetMetaSharedPtr alpha_rowset_meta = std::dynamic_pointer_cast<AlphaRowsetMeta>(_rowset_meta);
// alpha_rowset_meta->get_pending_segment_groups(&pending_segment_group_metas);
alpha_rowset_meta->get_segment_groups(&segment_group_metas);
int32_t segment_group_idx = 0;
for (auto segment_group : _segment_groups) {
alpha_rowset_meta->set_data_disk_size(alpha_rowset_meta->data_disk_size() + segment_group->data_size());
alpha_rowset_meta->set_index_disk_size(alpha_rowset_meta->index_disk_size() + segment_group->index_size());
alpha_rowset_meta->set_total_disk_size(alpha_rowset_meta->total_disk_size()
+ segment_group->index_size() + segment_group->data_size());
alpha_rowset_meta->set_num_rows(alpha_rowset_meta->num_rows() + segment_group->num_rows());
// pending_segment_group_metas.at(segment_group_idx).set_index_size(segment_group->index_size());
// pending_segment_group_metas.at(segment_group_idx).set_data_size(segment_group->data_size());
// pending_segment_group_metas.at(segment_group_idx).set_num_rows(segment_group->num_rows());
segment_group_metas.at(segment_group_idx).set_index_size(segment_group->index_size());
segment_group_metas.at(segment_group_idx).set_data_size(segment_group->data_size());
segment_group_metas.at(segment_group_idx).set_num_rows(segment_group->num_rows());
++segment_group_idx;
}
// alpha_rowset_meta->clear_pending_segment_group();
// for (auto& pending_segment_group_meta : pending_segment_group_metas) {
// alpha_rowset_meta->add_pending_segment_group(pending_segment_group_meta);
// }
alpha_rowset_meta->clear_segment_group();
for (auto& segment_group_meta : segment_group_metas) {
alpha_rowset_meta->add_segment_group(segment_group_meta);
}
return OLAP_SUCCESS;
}

Expand Down
4 changes: 0 additions & 4 deletions be/src/olap/rowset/alpha_rowset.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,10 +122,6 @@ class AlphaRowset : public Rowset {
private:
OLAPStatus _init_segment_groups();

OLAPStatus _init_pending_segment_groups();

OLAPStatus _init_non_pending_segment_groups();

std::shared_ptr<SegmentGroup> _segment_group_with_largest_size();

private:
Expand Down
27 changes: 2 additions & 25 deletions be/src/olap/rowset/alpha_rowset_meta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,33 +38,10 @@ void AlphaRowsetMeta::add_segment_group(const SegmentGroupPB& segment_group) {
_serialize_extra_meta_pb();
}

// Appends a copy of every pending segment group stored in the extra meta
// protobuf to the caller's vector. Existing elements of the vector are kept.
//
// @param pending_segment_groups  output vector; must not be null (it is
//                                dereferenced unconditionally)
void AlphaRowsetMeta::get_pending_segment_groups(
        std::vector<PendingSegmentGroupPB>* pending_segment_groups) {
    const int group_count = _extra_meta_pb.pending_segment_groups_size();
    for (int idx = 0; idx < group_count; ++idx) {
        pending_segment_groups->push_back(_extra_meta_pb.pending_segment_groups(idx));
    }
}

void AlphaRowsetMeta::add_pending_segment_group(const PendingSegmentGroupPB& pending_segment_group) {
for (int i = 0; i < _extra_meta_pb.pending_segment_groups_size(); i++) {
const PendingSegmentGroupPB& present_segment_group = _extra_meta_pb.pending_segment_groups(i);
if (present_segment_group.pending_segment_group_id() ==
pending_segment_group.pending_segment_group_id()) {
LOG(WARNING) << "pending segment_group already exists in meta."
<< "rowset_id:" << rowset_id()
<< ", pending_segment_group_id: " << pending_segment_group.pending_segment_group_id();
return;
}
}
PendingSegmentGroupPB* new_pending_segment_group = _extra_meta_pb.add_pending_segment_groups();
*new_pending_segment_group = pending_segment_group;
void AlphaRowsetMeta::clear_segment_group() {
_extra_meta_pb.clear_segment_groups();
_serialize_extra_meta_pb();
}

void AlphaRowsetMeta::clear_pending_segment_group() {
_extra_meta_pb.clear_pending_segment_groups();
}

void AlphaRowsetMeta::_serialize_extra_meta_pb() {
std::string extra_properties;
_extra_meta_pb.SerializeToString(&extra_properties);
Expand Down
6 changes: 1 addition & 5 deletions be/src/olap/rowset/alpha_rowset_meta.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,7 @@ class AlphaRowsetMeta : public RowsetMeta {

void add_segment_group(const SegmentGroupPB& segment_group);

void get_pending_segment_groups(std::vector<PendingSegmentGroupPB>* pending_segment_groups);

void add_pending_segment_group(const PendingSegmentGroupPB& pending_segment_group);

void clear_pending_segment_group();
void clear_segment_group();

private:
void _serialize_extra_meta_pb();
Expand Down
Loading