Skip to content

Commit

Permalink
response to the review 2
Browse files Browse the repository at this point in the history
Signed-off-by: freemandealer <freeman.zhang1992@gmail.com>
  • Loading branch information
freemandealer committed Nov 1, 2022
1 parent 557e26c commit 8a6d425
Show file tree
Hide file tree
Showing 7 changed files with 50 additions and 88 deletions.
3 changes: 0 additions & 3 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -301,9 +301,6 @@ CONF_mInt32(quick_compaction_max_threads, "10");
// Thread count to do tablet meta checkpoint, -1 means use the data directories count.
CONF_Int32(max_meta_checkpoint_threads, "-1");

// This config can be set to limit thread number in smallcompaction thread pool.
CONF_mInt32(quick_compaction_max_threads, "10");

// This config can be set to limit thread number in segcompaction thread pool.
CONF_mInt32(seg_compaction_max_threads, "10");

Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/olap_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -696,7 +696,7 @@ Status StorageEngine::submit_quick_compaction_task(TabletSharedPtr tablet) {

Status StorageEngine::_handle_seg_compaction(BetaRowsetWriter* writer,
SegCompactionCandidatesSharedPtr segments) {
writer->segcompaction(segments);
writer->compact_segments(segments);
// return OK here. error will be reported via BetaRowsetWriter::_segcompaction_status
return Status::OK();
}
Expand Down
71 changes: 45 additions & 26 deletions be/src/olap/rowset/beta_rowset_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,9 @@ vectorized::VMergeIterator* BetaRowsetWriter::_get_segcompaction_reader(
auto s = merge_itr->init(read_options);
if (!s.ok()) {
LOG(WARNING) << "failed to init iterator: " << s.to_string();
for (auto& itr : iterators) {
delete itr;
}
return nullptr;
}

Expand All @@ -163,7 +166,7 @@ std::unique_ptr<segment_v2::SegmentWriter> BetaRowsetWriter::_create_segcompacti
status = _create_segment_writer_for_segcompaction(&writer, begin, end);
if (status != Status::OK() || writer == nullptr) {
LOG(ERROR) << "failed to create segment writer for begin:" << begin << " end:" << end
<< " path:" << writer->get_data_dir()->path();
<< " path:" << writer->get_data_dir()->path() << " status:" << status;
return nullptr;
} else {
return writer;
Expand Down Expand Up @@ -191,9 +194,11 @@ Status BetaRowsetWriter::_rename_compacted_segments(int64_t begin, int64_t end)
auto src_seg_path = BetaRowset::local_segment_path_segcompacted(_context.rowset_dir,
_context.rowset_id, begin, end);
auto dst_seg_path = BetaRowset::segment_file_path(_context.rowset_dir, _context.rowset_id,
_num_segcompacted++);
_num_segcompacted++);
ret = rename(src_seg_path.c_str(), dst_seg_path.c_str());
if (ret) {
LOG(WARNING) << "failed to rename " << src_seg_path << " to " << dst_seg_path
<< ". ret:" << ret << " errno:" << errno;
return Status::OLAPInternalError(OLAP_ERR_ROWSET_RENAME_FILE_FAILED);
}
return Status::OK();
Expand All @@ -208,9 +213,9 @@ Status BetaRowsetWriter::_rename_compacted_segment_plain(uint64_t seg_id) {
auto src_seg_path =
BetaRowset::segment_file_path(_context.rowset_dir, _context.rowset_id, seg_id);
auto dst_seg_path = BetaRowset::segment_file_path(_context.rowset_dir, _context.rowset_id,
_num_segcompacted);
LOG(INFO) << "segcompaction skip this segment. rename " << src_seg_path << " to "
<< dst_seg_path;
_num_segcompacted);
VLOG_DEBUG << "segcompaction skip this segment. rename " << src_seg_path << " to "
<< dst_seg_path;
{
std::lock_guard<std::mutex> lock(_segid_statistics_map_mutex);
DCHECK_EQ(_segid_statistics_map.find(seg_id) == _segid_statistics_map.end(), false);
Expand All @@ -223,15 +228,16 @@ Status BetaRowsetWriter::_rename_compacted_segment_plain(uint64_t seg_id) {
++_num_segcompacted;
ret = rename(src_seg_path.c_str(), dst_seg_path.c_str());
if (ret) {
LOG(WARNING) << "failed to rename " << src_seg_path << " to " << dst_seg_path;
LOG(WARNING) << "failed to rename " << src_seg_path << " to " << dst_seg_path
<< ". ret:" << ret << " errno:" << errno;
return Status::OLAPInternalError(OLAP_ERR_ROWSET_RENAME_FILE_FAILED);
}
return Status::OK();
}

void BetaRowsetWriter::_clear_statistics_for_deleting_segments_unsafe(uint64_t begin,
uint64_t end) {
LOG(INFO) << "_segid_statistics_map clear record segid range from:" << begin << " to:" << end;
VLOG_DEBUG << "_segid_statistics_map clear record segid range from:" << begin << " to:" << end;
for (int i = begin; i <= end; ++i) {
_segid_statistics_map.erase(i);
}
Expand Down Expand Up @@ -265,13 +271,13 @@ Status BetaRowsetWriter::_check_correctness(std::unique_ptr<OlapReaderStatistics
return Status::OK();
}

Status BetaRowsetWriter::_do_segcompaction(SegCompactionCandidatesSharedPtr segments) {
Status BetaRowsetWriter::_do_compact_segments(SegCompactionCandidatesSharedPtr segments) {
SCOPED_ATTACH_TASK(StorageEngine::instance()->segcompaction_mem_tracker(),
ThreadContext::TaskType::COMPACTION);
// throttle segcompaction task if memory depleted.
if (MemTrackerLimiter::sys_mem_exceed_limit_check(GB_EXCHANGE_BYTE)) {
LOG(WARNING) << "skip segcompaction due to memory shortage";
return Status::MemoryLimitExceeded("skip segcompaction due to memory shortage");
return Status::OLAPInternalError(OLAP_ERR_FETCH_MEMORY_EXCEEDED);
}
uint64_t begin = (*(segments->begin()))->id();
uint64_t end = (*(segments->end() - 1))->id();
Expand Down Expand Up @@ -321,6 +327,11 @@ Status BetaRowsetWriter::_do_segcompaction(SegCompactionCandidatesSharedPtr segm
_clear_statistics_for_deleting_segments_unsafe(begin, end);
}
RETURN_NOT_OK(_flush_segment_writer(&writer));

if (_segcompaction_file_writer != nullptr) {
_segcompaction_file_writer->close();
}

RETURN_NOT_OK(_delete_original_segments(begin, end));
RETURN_NOT_OK(_rename_compacted_segments(begin, end));

Expand All @@ -343,11 +354,12 @@ Status BetaRowsetWriter::_do_segcompaction(SegCompactionCandidatesSharedPtr segm
return Status::OK();
}

void BetaRowsetWriter::segcompaction(SegCompactionCandidatesSharedPtr segments) {
Status status = _do_segcompaction(segments);
void BetaRowsetWriter::compact_segments(SegCompactionCandidatesSharedPtr segments) {
Status status = _do_compact_segments(segments);
if (!status.ok()) {
int16_t errcode = status.precise_code();
switch (errcode) {
case OLAP_ERR_FETCH_MEMORY_EXCEEDED:
case OLAP_ERR_SEGCOMPACTION_INIT_READER:
case OLAP_ERR_SEGCOMPACTION_INIT_WRITER:
LOG(WARNING) << "segcompaction failed, try next time:" << status;
Expand All @@ -359,8 +371,11 @@ void BetaRowsetWriter::segcompaction(SegCompactionCandidatesSharedPtr segments)
}
}
DCHECK_EQ(_is_doing_segcompaction, true);
_is_doing_segcompaction = false;
_segcompacting_cond.notify_all();
{
std::lock_guard lk(_is_doing_segcompaction_lock);
_is_doing_segcompaction = false;
_segcompacting_cond.notify_all();
}
}

Status BetaRowsetWriter::_load_noncompacted_segments(
Expand Down Expand Up @@ -402,8 +417,8 @@ Status BetaRowsetWriter::_find_longest_consecutive_small_segment(
for (auto& segment : all_segments) {
ss_all << "[id:" << segment->id() << " num_rows:" << segment->num_rows() << "]";
}
LOG(INFO) << "all noncompacted segments num:" << all_segments.size()
<< " list of segments:" << ss_all.str();
VLOG_DEBUG << "all noncompacted segments num:" << all_segments.size()
<< " list of segments:" << ss_all.str();

bool is_terminated_by_big = false;
bool let_big_terminate = false;
Expand All @@ -430,7 +445,7 @@ Status BetaRowsetWriter::_find_longest_consecutive_small_segment(
return Status::OK();
}
if (s == 1) { // poor bachelor, let it go
LOG(INFO) << "only one candidate segment";
VLOG_DEBUG << "only one candidate segment";
RETURN_NOT_OK(_rename_compacted_segment_plain(_segcompacted_point++));
segments->clear();
return Status::OK();
Expand All @@ -439,14 +454,14 @@ Status BetaRowsetWriter::_find_longest_consecutive_small_segment(
for (auto& segment : (*segments.get())) {
ss << "[id:" << segment->id() << " num_rows:" << segment->num_rows() << "]";
}
LOG(INFO) << "candidate segments num:" << s << " list of candidates:" << ss.str();
VLOG_DEBUG << "candidate segments num:" << s << " list of candidates:" << ss.str();
return Status::OK();
}

Status BetaRowsetWriter::_get_segcompaction_candidates(SegCompactionCandidatesSharedPtr& segments,
bool is_last) {
if (is_last) {
LOG(INFO) << "segcompaction last few segments";
VLOG_DEBUG << "segcompaction last few segments";
// currently we only rename remaining segments to reduce wait time
// so that transaction can be committed ASAP
RETURN_NOT_OK(_load_noncompacted_segments(segments.get()));
Expand All @@ -461,7 +476,7 @@ Status BetaRowsetWriter::_get_segcompaction_candidates(SegCompactionCandidatesSh
}

bool BetaRowsetWriter::_check_and_set_is_doing_segcompaction() {
std::unique_lock<std::mutex> l(_is_doing_segcompaction_lock);
std::lock_guard<std::mutex> l(_is_doing_segcompaction_lock);
if (!_is_doing_segcompaction) {
_is_doing_segcompaction = true;
return true;
Expand Down Expand Up @@ -491,7 +506,11 @@ Status BetaRowsetWriter::_segcompaction_if_necessary() {
}
}
}
_is_doing_segcompaction = false;
{
std::lock_guard lk(_is_doing_segcompaction_lock);
_is_doing_segcompaction = false;
_segcompacting_cond.notify_all();
}
return status;
}

Expand Down Expand Up @@ -700,7 +719,7 @@ RowsetSharedPtr BetaRowsetWriter::build() {
Status status;
status = _wait_flying_segcompaction();
if (!status.ok()) {
LOG(WARNING) << "segcompaction failed when build new rowset, res=" << status;
LOG(WARNING) << "segcompaction failed when build new rowset 1st wait, res=" << status;
return nullptr;
}
status = _segcompaction_ramaining_if_necessary();
Expand All @@ -710,7 +729,7 @@ RowsetSharedPtr BetaRowsetWriter::build() {
}
status = _wait_flying_segcompaction();
if (!status.ok()) {
LOG(WARNING) << "segcompaction failed when build new rowset, res=" << status;
LOG(WARNING) << "segcompaction failed when build new rowset 2nd wait, res=" << status;
return nullptr;
}

Expand All @@ -731,8 +750,8 @@ RowsetSharedPtr BetaRowsetWriter::build() {
}

RowsetSharedPtr rowset;
status = RowsetFactory::create_rowset(_context.tablet_schema, _context.rowset_dir,
_rowset_meta, &rowset);
status = RowsetFactory::create_rowset(_context.tablet_schema, _context.rowset_dir, _rowset_meta,
&rowset);
if (!status.ok()) {
LOG(WARNING) << "rowset init failed when build new rowset, res=" << status;
return nullptr;
Expand Down Expand Up @@ -908,8 +927,8 @@ Status BetaRowsetWriter::_flush_segment_writer(std::unique_ptr<segment_v2::Segme
CHECK_EQ(_segid_statistics_map.find(segid) == _segid_statistics_map.end(), true);
_segid_statistics_map.emplace(segid, segstat);
}
LOG(INFO) << "_segid_statistics_map add new record. segid:" << segid << " row_num:" << row_num
<< " data_size:" << segment_size << " index_size:" << index_size;
VLOG_DEBUG << "_segid_statistics_map add new record. segid:" << segid << " row_num:" << row_num
<< " data_size:" << segment_size << " index_size:" << index_size;

writer->reset();
if (flush_size) {
Expand Down
4 changes: 2 additions & 2 deletions be/src/olap/rowset/beta_rowset_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class BetaRowsetWriter : public RowsetWriter {
return Status::OK();
}

void segcompaction(SegCompactionCandidatesSharedPtr segments);
void compact_segments(SegCompactionCandidatesSharedPtr segments);

private:
template <typename RowType>
Expand Down Expand Up @@ -118,7 +118,7 @@ class BetaRowsetWriter : public RowsetWriter {
uint64_t row_count, uint64_t begin, uint64_t end);
bool _check_and_set_is_doing_segcompaction();

Status _do_segcompaction(SegCompactionCandidatesSharedPtr segments);
Status _do_compact_segments(SegCompactionCandidatesSharedPtr segments);

private:
RowsetWriterContext _context;
Expand Down
1 change: 0 additions & 1 deletion be/src/olap/storage_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
#include "util/countdown_latch.h"
#include "util/thread.h"
#include "util/threadpool.h"
//#include "olap/rowset/beta_rowset_writer.h"

namespace doris {

Expand Down
55 changes: 1 addition & 54 deletions be/src/vec/olap/vgeneric_iterators.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,58 +82,6 @@ Status VStatisticsIterator::next_batch(Block* block) {
return Status::EndOfFile("End of VStatisticsIterator");
}

VStatisticsIterator::~VStatisticsIterator() {
for (auto& pair : _column_iterators_map) {
delete pair.second;
}
}

Status VStatisticsIterator::init(const StorageReadOptions& opts) {
if (!_init) {
_push_down_agg_type_opt = opts.push_down_agg_type_opt;

for (size_t i = 0; i < _schema.num_column_ids(); i++) {
auto cid = _schema.column_id(i);
auto unique_id = _schema.column(cid)->unique_id();
if (_column_iterators_map.count(unique_id) < 1) {
RETURN_IF_ERROR(_segment->new_column_iterator(
opts.tablet_schema->column(cid), &_column_iterators_map[unique_id]));
}
_column_iterators.push_back(_column_iterators_map[unique_id]);
}

_target_rows = _push_down_agg_type_opt == TPushAggOp::MINMAX ? 2 : _segment->num_rows();
_init = true;
}

return Status::OK();
}

Status VStatisticsIterator::next_batch(Block* block) {
DCHECK(block->columns() == _column_iterators.size());
if (_output_rows < _target_rows) {
block->clear_column_data();
auto columns = block->mutate_columns();

size_t size = _push_down_agg_type_opt == TPushAggOp::MINMAX
? 2
: std::min(_target_rows - _output_rows, MAX_ROW_SIZE_IN_COUNT);
if (_push_down_agg_type_opt == TPushAggOp::COUNT) {
size = std::min(_target_rows - _output_rows, MAX_ROW_SIZE_IN_COUNT);
for (int i = 0; i < block->columns(); ++i) {
columns[i]->resize(size);
}
} else {
for (int i = 0; i < block->columns(); ++i) {
_column_iterators[i]->next_batch_of_zone_map(&size, columns[i]);
}
}
_output_rows += size;
return Status::OK();
}
return Status::EndOfFile("End of VStatisticsIterator");
}

Status VMergeIteratorContext::block_reset(const std::shared_ptr<Block>& block) {
if (!*block) {
const Schema& schema = _iter->schema();
Expand All @@ -146,8 +94,7 @@ Status VMergeIteratorContext::block_reset(const std::shared_ptr<Block>& block) {
}
auto column = data_type->create_column();
column->reserve(_block_row_max);
block->insert(
ColumnWithTypeAndName(std::move(column), data_type, column_desc->name()));
block->insert(ColumnWithTypeAndName(std::move(column), data_type, column_desc->name()));
}
} else {
block->clear_column_data();
Expand Down
2 changes: 1 addition & 1 deletion be/test/olap/segcompaction_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ class SegCompactionTest : public testing::Test {
rowset_writer_context->tablet_schema_hash = 1111;
rowset_writer_context->partition_id = 10;
rowset_writer_context->rowset_type = BETA_ROWSET;
rowset_writer_context->tablet_path = lTestDir;
rowset_writer_context->rowset_dir = lTestDir;
rowset_writer_context->rowset_state = VISIBLE;
rowset_writer_context->tablet_schema = tablet_schema;
rowset_writer_context->version.first = 10;
Expand Down

0 comments on commit 8a6d425

Please sign in to comment.