Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 15 additions & 14 deletions be/src/olap/schema_change.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -963,14 +963,8 @@ Status RowBlockAllocator::allocate(RowBlock** row_block, size_t num_rows, bool n
size_t row_block_size = _row_len * num_rows;

if (_memory_limitation > 0 && _tracker->consumption() + row_block_size > _memory_limitation) {
LOG(WARNING)
<< "RowBlockAllocator::alocate() memory exceeded. "
<< "m_memory_allocated=" << _tracker->consumption() << " "
<< "mem limit for schema change=" << _memory_limitation << " "
<< "You can increase the memory "
<< "by changing the Config.memory_limitation_per_thread_for_schema_change_bytes";
*row_block = nullptr;
return Status::OLAPInternalError(OLAP_ERR_INPUT_PARAMETER_ERROR);
return Status::OLAPInternalError(OLAP_ERR_FETCH_MEMORY_EXCEEDED);
}

// TODO(lijiao) : Why abandon the original m_row_block_buffer
Expand Down Expand Up @@ -1356,11 +1350,15 @@ Status SchemaChangeWithSorting::_inner_process(RowsetReaderSharedPtr rowset_read
RowBlock* ref_row_block = nullptr;
rowset_reader->next_block(&ref_row_block);
while (ref_row_block != nullptr && ref_row_block->has_remaining()) {
if (!_row_block_allocator->allocate(&new_row_block, ref_row_block->row_block_info().row_num,
true)) {
LOG(WARNING) << "failed to allocate RowBlock.";
return Status::OLAPInternalError(OLAP_ERR_INPUT_PARAMETER_ERROR);
} else {
auto st = _row_block_allocator->allocate(&new_row_block,
ref_row_block->row_block_info().row_num, true);
// if OLAP_ERR_FETCH_MEMORY_EXCEEDED == st.precise_code()
// that mean RowBlockAllocator::alocate() memory exceeded.
// But we can flush row_block_arr if row_block_arr is not empty.
// Don't return directly.
if (OLAP_ERR_MALLOC_ERROR == st.precise_code()) {
return Status::OLAPInternalError(OLAP_ERR_MALLOC_ERROR);
} else if (st) {
// do memory check for sorting, in case schema change task fail at row block sorting because of
// not doing internal sorting first
if (!_row_block_allocator->is_memory_enough_for_sorting(
Expand All @@ -1375,8 +1373,11 @@ Status SchemaChangeWithSorting::_inner_process(RowsetReaderSharedPtr rowset_read
if (new_row_block == nullptr) {
if (row_block_arr.empty()) {
LOG(WARNING) << "Memory limitation is too small for Schema Change."
<< "memory_limitation=" << _memory_limitation;
return Status::OLAPInternalError(OLAP_ERR_INPUT_PARAMETER_ERROR);
<< "memory_limitation=" << _memory_limitation
<< "You can increase the memory "
<< "by changing the "
"Config.memory_limitation_per_thread_for_schema_change_bytes";
return Status::OLAPInternalError(OLAP_ERR_FETCH_MEMORY_EXCEEDED);
}

// enter here while memory limitation is reached.
Expand Down