Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -534,7 +534,7 @@ void TaskWorkerPool::_alter_tablet(const TAgentTaskRequest& agent_task_req, int6
if (sc_status.precise_code() == OLAP_ERR_DATA_QUALITY_ERR) {
error_msgs.push_back("The data quality does not satisfy, please check your data. ");
}
status = Status::DataQualityError("The data quality does not satisfy");
status = sc_status;
} else {
status = Status::OK();
}
Expand Down Expand Up @@ -619,7 +619,7 @@ void TaskWorkerPool::_push_worker_thread_callback() {
agent_task_req = _tasks[index];
push_req = agent_task_req.push_req;
_tasks.erase(_tasks.begin() + index);
} while (0);
} while (false);

if (index < 0) {
// there is no high priority task in queue
Expand Down Expand Up @@ -1739,7 +1739,7 @@ void TaskWorkerPool::_storage_medium_migrate_v2(const TAgentTaskRequest& agent_t
if (sc_status.precise_code() == OLAP_ERR_DATA_QUALITY_ERR) {
error_msgs.push_back("The data quality does not satisfy, please check your data. ");
}
status = Status::DataQualityError("The data quality does not satisfy");
status = sc_status;
} else {
status = Status::OK();
}
Expand Down
4 changes: 3 additions & 1 deletion be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,9 @@ CONF_Bool(enable_low_cardinality_optimize, "true");
// whether disable automatic compaction task
CONF_mBool(disable_auto_compaction, "false");
// whether enable vectorized compaction
CONF_Bool(enable_vectorized_compaction, "true");
CONF_Bool(enable_vectorized_compaction, "false");
// whether enable vectorized schema change
CONF_Bool(enable_vectorized_alter_table, "true");
// check the configuration of auto compaction in seconds when auto compaction disabled
CONF_mInt32(check_auto_compaction_interval_seconds, "5");

Expand Down
5 changes: 4 additions & 1 deletion be/src/olap/column_mapping.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
#include <gen_cpp/Exprs_types.h>

#include <memory>

#include "olap/tablet_schema.h"
namespace doris {

class WrapperField;
Expand All @@ -36,8 +38,9 @@ struct ColumnMapping {
// materialize view transform function used in schema change
std::string materialized_function;
std::shared_ptr<TExpr> expr;
const TabletColumn* new_column;
};

using SchemaMapping = std::vector<ColumnMapping>;

} // namespace doris
} // namespace doris
32 changes: 14 additions & 18 deletions be/src/olap/push_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <iostream>
#include <sstream>

#include "common/object_pool.h"
#include "common/status.h"
#include "exec/parquet_scanner.h"
#include "olap/row.h"
Expand All @@ -32,11 +33,6 @@
#include "olap/tablet.h"
#include "runtime/exec_env.h"

using std::list;
using std::map;
using std::string;
using std::vector;

namespace doris {

// Process push command, the main logical is as follows:
Expand All @@ -60,6 +56,9 @@ Status PushHandler::process_streaming_ingestion(TabletSharedPtr tablet, const TP

Status res = Status::OK();
_request = request;

DescriptorTbl::create(&_pool, _request.desc_tbl, &_desc_tbl);

std::vector<TabletVars> tablet_vars(1);
tablet_vars[0].tablet = tablet;
res = _do_streaming_ingestion(tablet, request, push_type, &tablet_vars, tablet_info_vec);
Expand Down Expand Up @@ -315,16 +314,15 @@ Status PushHandler::_convert_v2(TabletSharedPtr cur_tablet, TabletSharedPtr new_
// 5. Convert data for schema change tables
VLOG_TRACE << "load to related tables of schema_change if possible.";
if (new_tablet != nullptr) {
auto schema_change_handler = SchemaChangeHandler::instance();
res = schema_change_handler->schema_version_convert(cur_tablet, new_tablet, cur_rowset,
new_rowset);
res = SchemaChangeHandler::schema_version_convert(cur_tablet, new_tablet, cur_rowset,
new_rowset, *_desc_tbl);
if (!res.ok()) {
LOG(WARNING) << "failed to change schema version for delta."
<< "[res=" << res << " new_tablet='" << new_tablet->full_name()
<< "']";
}
}
} while (0);
} while (false);

VLOG_TRACE << "convert delta file end. res=" << res << ", tablet=" << cur_tablet->full_name()
<< ", processed_rows" << num_rows;
Expand Down Expand Up @@ -456,16 +454,15 @@ Status PushHandler::_convert(TabletSharedPtr cur_tablet, TabletSharedPtr new_tab
// 7. Convert data for schema change tables
VLOG_TRACE << "load to related tables of schema_change if possible.";
if (new_tablet != nullptr) {
auto schema_change_handler = SchemaChangeHandler::instance();
res = schema_change_handler->schema_version_convert(cur_tablet, new_tablet, cur_rowset,
new_rowset);
res = SchemaChangeHandler::schema_version_convert(cur_tablet, new_tablet, cur_rowset,
new_rowset, *_desc_tbl);
if (!res.ok()) {
LOG(WARNING) << "failed to change schema version for delta."
<< "[res=" << res << " new_tablet='" << new_tablet->full_name()
<< "']";
}
}
} while (0);
} while (false);

SAFE_DELETE(reader);
VLOG_TRACE << "convert delta file end. res=" << res << ", tablet=" << cur_tablet->full_name()
Expand Down Expand Up @@ -502,7 +499,7 @@ IBinaryReader* IBinaryReader::create(bool need_decompress) {
return reader;
}

BinaryReader::BinaryReader() : IBinaryReader(), _row_buf(nullptr), _row_buf_size(0) {}
BinaryReader::BinaryReader() : _row_buf(nullptr), _row_buf_size(0) {}

Status BinaryReader::init(TabletSharedPtr tablet, BinaryFile* file) {
Status res = Status::OK();
Expand All @@ -527,7 +524,7 @@ Status BinaryReader::init(TabletSharedPtr tablet, BinaryFile* file) {

_tablet = tablet;
_ready = true;
} while (0);
} while (false);

if (!res.ok()) {
SAFE_DELETE_ARRAY(_row_buf);
Expand Down Expand Up @@ -637,8 +634,7 @@ Status BinaryReader::next(RowCursor* row) {
}

LzoBinaryReader::LzoBinaryReader()
: IBinaryReader(),
_row_buf(nullptr),
: _row_buf(nullptr),
_row_compressed_buf(nullptr),
_row_info_buf(nullptr),
_max_row_num(0),
Expand Down Expand Up @@ -670,7 +666,7 @@ Status LzoBinaryReader::init(TabletSharedPtr tablet, BinaryFile* file) {

_tablet = tablet;
_ready = true;
} while (0);
} while (false);

if (!res.ok()) {
SAFE_DELETE_ARRAY(_row_info_buf);
Expand Down
3 changes: 3 additions & 0 deletions be/src/olap/push_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ class PushHandler {
// mainly tablet_id, version and delta file path
TPushReq _request;

ObjectPool _pool;
DescriptorTbl* _desc_tbl = nullptr;

int64_t _write_bytes = 0;
int64_t _write_rows = 0;
DISALLOW_COPY_AND_ASSIGN(PushHandler);
Expand Down
34 changes: 21 additions & 13 deletions be/src/olap/reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@
#include <charconv>
#include <unordered_set>

#include "common/status.h"
#include "olap/bloom_filter_predicate.h"
#include "olap/collect_iterator.h"
#include "olap/comparison_predicate.h"
#include "olap/in_list_predicate.h"
#include "olap/null_predicate.h"
#include "olap/olap_common.h"
#include "olap/row.h"
#include "olap/row_block.h"
#include "olap/row_cursor.h"
Expand Down Expand Up @@ -105,14 +107,14 @@ TabletReader::~TabletReader() {
}
}

Status TabletReader::init(const ReaderParams& read_params) {
Status TabletReader::init(const ReaderParams& read_params, bool is_alter_table) {
#ifndef NDEBUG
_predicate_mem_pool.reset(new MemPool("TabletReader:" + read_params.tablet->full_name()));
#else
_predicate_mem_pool.reset(new MemPool());
#endif

Status res = _init_params(read_params);
Status res = _init_params(read_params, is_alter_table);
if (!res.ok()) {
LOG(WARNING) << "fail to init reader when init params. res:" << res
<< ", tablet_id:" << read_params.tablet->tablet_id()
Expand Down Expand Up @@ -232,7 +234,7 @@ Status TabletReader::_capture_rs_readers(const ReaderParams& read_params,
return Status::OK();
}

Status TabletReader::_init_params(const ReaderParams& read_params) {
Status TabletReader::_init_params(const ReaderParams& read_params, bool is_alter_table) {
read_params.check_validation();

_direct_mode = read_params.direct_mode;
Expand All @@ -244,7 +246,7 @@ Status TabletReader::_init_params(const ReaderParams& read_params) {
_init_conditions_param(read_params);
_init_load_bf_columns(read_params);

Status res = _init_delete_condition(read_params);
Status res = _init_delete_condition(read_params, is_alter_table);
if (!res.ok()) {
LOG(WARNING) << "fail to init delete param. res = " << res;
return res;
Expand Down Expand Up @@ -316,7 +318,8 @@ Status TabletReader::_init_return_columns(const ReaderParams& read_params) {
}
VLOG_NOTICE << "return column is empty, using full column as default.";
} else if ((read_params.reader_type == READER_CUMULATIVE_COMPACTION ||
read_params.reader_type == READER_BASE_COMPACTION) &&
read_params.reader_type == READER_BASE_COMPACTION ||
read_params.reader_type == READER_ALTER_TABLE) &&
!read_params.return_columns.empty()) {
_return_columns = read_params.return_columns;
for (auto id : read_params.return_columns) {
Expand Down Expand Up @@ -833,24 +836,29 @@ void TabletReader::_init_load_bf_columns(const ReaderParams& read_params, Condit
}
}

Status TabletReader::_init_delete_condition(const ReaderParams& read_params) {
Status TabletReader::_init_delete_condition(const ReaderParams& read_params, bool is_alter_table) {
if (read_params.reader_type == READER_CUMULATIVE_COMPACTION) {
return Status::OK();
}
Status ret;
{
std::shared_lock rdlock(_tablet->get_header_lock());
ret = _delete_handler.init(_tablet->tablet_schema(), _tablet->delete_predicates(),
read_params.version.second, this);
}
// Only BASE_COMPACTION need set filter_delete = true
// other reader type:
// QUERY will filter the row in query layer to keep right result use where clause.
// CUMULATIVE_COMPACTION will lost the filter_delete info of base rowset
if (read_params.reader_type == READER_BASE_COMPACTION) {
_filter_delete = true;
}
return ret;

auto delete_init = [&]() -> Status {
return _delete_handler.init(_tablet->tablet_schema(), _tablet->delete_predicates(),
read_params.version.second, this);
};

if (is_alter_table) {
return delete_init();
}

std::shared_lock rdlock(_tablet->get_header_lock());
return delete_init();
}

} // namespace doris
8 changes: 4 additions & 4 deletions be/src/olap/reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ class TabletReader {
virtual ~TabletReader();

// Initialize TabletReader with tablet, data version and fetch range.
virtual Status init(const ReaderParams& read_params);
virtual Status init(const ReaderParams& read_params, bool is_alter_table = false);

// Read next row with aggregation.
// Return OLAP_SUCCESS and set `*eof` to false when next row is read into `row_cursor`.
Expand All @@ -131,7 +131,7 @@ class TabletReader {

uint64_t filtered_rows() const {
return _stats.rows_del_filtered + _stats.rows_conditions_filtered +
_stats.rows_vec_del_cond_filtered;
_stats.rows_vec_del_cond_filtered + _stats.rows_vec_cond_filtered;
}

void set_batch_size(int batch_size) { _batch_size = batch_size; }
Expand All @@ -144,7 +144,7 @@ class TabletReader {
friend class vectorized::VCollectIterator;
friend class DeleteHandler;

Status _init_params(const ReaderParams& read_params);
Status _init_params(const ReaderParams& read_params, bool lock_header);

Status _capture_rs_readers(const ReaderParams& read_params,
std::vector<RowsetReaderSharedPtr>* valid_rs_readers);
Expand Down Expand Up @@ -173,7 +173,7 @@ class TabletReader {
ColumnPredicate* _parse_to_predicate(
const std::pair<std::string, std::shared_ptr<IBloomFilterFuncBase>>& bloom_filter);

Status _init_delete_condition(const ReaderParams& read_params);
Status _init_delete_condition(const ReaderParams& read_params, bool lock_header);

Status _init_return_columns(const ReaderParams& read_params);
void _init_seek_columns();
Expand Down
4 changes: 2 additions & 2 deletions be/src/olap/row_block2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -614,7 +614,7 @@ Status RowBlockV2::_append_data_to_column(const ColumnVectorBatch* batch, size_t
}

Status RowBlockV2::convert_to_vec_block(vectorized::Block* block) {
DCHECK_LE(block->columns(), _schema.column_ids().size());
DCHECK(block->columns() <= _schema.column_ids().size());
for (int i = 0; i < block->columns(); ++i) {
auto cid = _schema.column_ids()[i];
auto column = (*std::move(block->get_by_position(i).column)).assume_mutable();
Expand Down Expand Up @@ -642,7 +642,7 @@ std::string RowBlockRow::debug_string() const {
ss << "]";
return ss.str();
}
std::string RowBlockV2::debug_string() {
std::string RowBlockV2::debug_string() const {
std::stringstream ss;
for (int i = 0; i < num_rows(); ++i) {
ss << row(i).debug_string();
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/row_block2.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ class RowBlockV2 {
}
_delete_state = delete_state;
}
std::string debug_string();
std::string debug_string() const;

private:
Status _copy_data_to_column(int cid, vectorized::MutableColumnPtr& mutable_column_ptr);
Expand Down
3 changes: 2 additions & 1 deletion be/src/olap/rowset/beta_rowset_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ class BetaRowsetReader : public RowsetReader {

// Return the total number of filtered rows, will be used for validation of schema change
int64_t filtered_rows() override {
return _stats->rows_del_filtered + _stats->rows_conditions_filtered;
return _stats->rows_del_filtered + _stats->rows_conditions_filtered +
_stats->rows_vec_del_cond_filtered + _stats->rows_vec_cond_filtered;
}

RowsetTypePB type() const override { return RowsetTypePB::BETA_ROWSET; }
Expand Down
Loading