Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Refactor] check and handle the error status for functions (#31463) #31466

Merged
merged 1 commit into from
Sep 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion be/src/agent/agent_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,7 @@ void run_compaction_task(const std::shared_ptr<CompactionTaskRequest>& agent_tas
for (auto tablet_id : compaction_req.tablet_ids) {
EngineManualCompactionTask engine_task(GlobalEnv::GetInstance()->compaction_mem_tracker(), tablet_id,
compaction_req.is_base_compaction);
StorageEngine::instance()->execute_task(&engine_task);
(void)StorageEngine::instance()->execute_task(&engine_task);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the usage of (void)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CI will enable the [[nodiscard]] check for Status return values soon. We must handle all status returns in the code explicitly. (void) means that it is safe to ignore the status, and it prevents the warning from the compiler.

}

task_status.__set_status_code(status_code);
Expand Down
7 changes: 4 additions & 3 deletions be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,8 @@ void* PushTaskWorkerPool::_worker_thread_callback(void* arg_this) {

EngineBatchLoadTask engine_task(push_req, &tablet_infos, agent_task_req->signature, &status,
GlobalEnv::GetInstance()->load_mem_tracker());
StorageEngine::instance()->execute_task(&engine_task);
// EngineBatchLoadTask execute always return OK
(void)(StorageEngine::instance()->execute_task(&engine_task));

if (status == STARROCKS_PUSH_HAD_LOADED) {
// remove the task and not return to fe
Expand Down Expand Up @@ -432,10 +433,10 @@ void* DeleteTaskWorkerPool::_worker_thread_callback(void* arg_this) {
LOG(INFO) << "get delete push task. signature: " << agent_task_req->signature << " priority: " << priority
<< " push_type: " << push_req.push_type;
std::vector<TTabletInfo> tablet_infos;

EngineBatchLoadTask engine_task(push_req, &tablet_infos, agent_task_req->signature, &status,
GlobalEnv::GetInstance()->load_mem_tracker());
StorageEngine::instance()->execute_task(&engine_task);
// EngineBatchLoadTask execute always return OK
(void)(StorageEngine::instance()->execute_task(&engine_task));

if (status == STARROCKS_PUSH_HAD_LOADED) {
// remove the task and not return to fe
Expand Down
4 changes: 2 additions & 2 deletions be/src/agent/utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ AgentStatus MasterServerClient::finish_task(const TFinishTaskRequest& request, T
client->finishTask(*result, request);
}
} catch (TException& e) {
client.reopen(config::thrift_rpc_timeout_ms);
(void)client.reopen(config::thrift_rpc_timeout_ms);
LOG(WARNING) << "Fail to finish_task. "
<< "host=" << network_address.hostname << ", port=" << network_address.port
<< ", error=" << e.what();
Expand Down Expand Up @@ -122,7 +122,7 @@ AgentStatus MasterServerClient::report(const TReportRequest& request, TMasterRes
}
}
} catch (TException& e) {
client.reopen(config::thrift_rpc_timeout_ms);
(void)client.reopen(config::thrift_rpc_timeout_ms);
LOG(WARNING) << "Fail to report to master. "
<< "host=" << network_address.hostname << ", port=" << network_address.port
<< ", code=" << client_status.code();
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/json_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ Status JsonReader::open() {
}

JsonReader::~JsonReader() {
close();
(void)close();
}

Status JsonReader::close() {
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/local_file_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ LocalFileWriter::LocalFileWriter(std::string path, int64_t start_offset)
: _path(std::move(path)), _start_offset(start_offset), _fp(nullptr) {}

LocalFileWriter::~LocalFileWriter() {
close();
(void)close();
}

Status LocalFileWriter::open() {
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/tablet_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -579,7 +579,7 @@ Status OlapTableSink::send_chunk(RuntimeState* state, Chunk* chunk) {
// automatic bucket
std::set<int64_t> immutable_partition_ids;
if (_tablet_sink_sender->get_immutable_partition_ids(&immutable_partition_ids)) {
_update_immutable_partition(immutable_partition_ids);
RETURN_IF_ERROR(_update_immutable_partition(immutable_partition_ids));
}

// _enable_automatic_partition is true means destination table using automatic partition
Expand Down
6 changes: 3 additions & 3 deletions be/src/http/action/stream_load.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ void StreamLoadAction::handle(HttpRequest* req) {

if (!ctx->status.ok() && ctx->status.code() != TStatusCode::PUBLISH_TIMEOUT) {
if (ctx->need_rollback) {
_exec_env->stream_load_executor()->rollback_txn(ctx);
(void)_exec_env->stream_load_executor()->rollback_txn(ctx);
ctx->need_rollback = false;
}
if (ctx->body_sink != nullptr) {
Expand Down Expand Up @@ -187,7 +187,7 @@ Status StreamLoadAction::_handle(StreamLoadContext* ctx) {
} else {
if (ctx->buffer != nullptr && ctx->buffer->pos > 0) {
ctx->buffer->flip();
ctx->body_sink->append(std::move(ctx->buffer));
RETURN_IF_ERROR(ctx->body_sink->append(std::move(ctx->buffer)));
ctx->buffer = nullptr;
}
RETURN_IF_ERROR(ctx->body_sink->finish());
Expand Down Expand Up @@ -240,7 +240,7 @@ int StreamLoadAction::on_header(HttpRequest* req) {
if (!st.ok()) {
ctx->status = st;
if (ctx->need_rollback) {
_exec_env->stream_load_executor()->rollback_txn(ctx);
(void)_exec_env->stream_load_executor()->rollback_txn(ctx);
ctx->need_rollback = false;
}
if (ctx->body_sink != nullptr) {
Expand Down
7 changes: 4 additions & 3 deletions be/src/http/action/transaction_stream_load.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ void TransactionStreamLoadAction::handle(HttpRequest* req) {

if (!ctx->status.ok()) {
if (ctx->need_rollback) {
_exec_env->transaction_mgr()->_rollback_transaction(ctx);
(void)_exec_env->transaction_mgr()->_rollback_transaction(ctx);
}
}

Expand All @@ -170,7 +170,8 @@ void TransactionStreamLoadAction::handle(HttpRequest* req) {
// For JSON, now the buffer contains a complete json.
if (ctx->buffer != nullptr && ctx->buffer->pos > 0) {
ctx->buffer->flip();
ctx->body_sink->append(std::move(ctx->buffer));
WARN_IF_ERROR(ctx->body_sink->append(std::move(ctx->buffer)),
"append MessageBodySink failed when handle TransactionStreamLoad");
ctx->buffer = nullptr;
}

Expand Down Expand Up @@ -231,7 +232,7 @@ int TransactionStreamLoadAction::on_header(HttpRequest* req) {
if (!st.ok()) {
ctx->status = st;
if (ctx->need_rollback) {
_exec_env->transaction_mgr()->_rollback_transaction(ctx);
(void)_exec_env->transaction_mgr()->_rollback_transaction(ctx);
}
auto resp = _exec_env->transaction_mgr()->_build_reply(TXN_LOAD, ctx);
ctx->lock.unlock();
Expand Down
13 changes: 7 additions & 6 deletions be/src/http/action/update_config_action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,16 +90,17 @@ Status UpdateConfigAction::update_config(const std::string& name, const std::str
}
});
_config_callback.emplace("max_compaction_concurrency", [&]() {
StorageEngine::instance()->compaction_manager()->update_max_threads(config::max_compaction_concurrency);
(void)StorageEngine::instance()->compaction_manager()->update_max_threads(
config::max_compaction_concurrency);
});
_config_callback.emplace("flush_thread_num_per_store", [&]() {
const size_t dir_cnt = StorageEngine::instance()->get_stores().size();
StorageEngine::instance()->memtable_flush_executor()->update_max_threads(
(void)StorageEngine::instance()->memtable_flush_executor()->update_max_threads(
config::flush_thread_num_per_store * dir_cnt);
StorageEngine::instance()->segment_replicate_executor()->update_max_threads(
(void)StorageEngine::instance()->segment_replicate_executor()->update_max_threads(
config::flush_thread_num_per_store * dir_cnt);
(void)StorageEngine::instance()->segment_flush_executor()->update_max_threads(
config::flush_thread_num_per_store * dir_cnt);
StorageEngine::instance()->segment_flush_executor()->update_max_threads(config::flush_thread_num_per_store *
dir_cnt);
});
_config_callback.emplace("update_compaction_num_threads_per_disk", [&]() {
StorageEngine::instance()->increase_update_compaction_thread(
Expand All @@ -113,7 +114,7 @@ Status UpdateConfigAction::update_config(const std::string& name, const std::str
}
});
_config_callback.emplace("update_memory_limit_percent", [&]() {
StorageEngine::instance()->update_manager()->update_primary_index_memory_limit(
(void)StorageEngine::instance()->update_manager()->update_primary_index_memory_limit(
config::update_memory_limit_percent);
#if defined(USE_STAROS) && !defined(BE_TEST)
_exec_env->lake_update_manager()->update_primary_index_memory_limit(config::update_memory_limit_percent);
Expand Down
10 changes: 5 additions & 5 deletions be/src/runtime/routine_load/data_consumer_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ Status KafkaDataConsumerGroup::start_all(StreamLoadContext* ctx) {
_queue.shutdown();
// cancel all consumers
for (auto& consumer : _consumers) {
consumer->cancel(ctx);
(void)consumer->cancel(ctx);
}

// waiting all threads finished
Expand All @@ -172,7 +172,7 @@ Status KafkaDataConsumerGroup::start_all(StreamLoadContext* ctx) {
// we need to commit and tell fe to move offset to the newest offset, otherwise, fe will retry consume.
for (auto& item : cmt_offset) {
if (item.second > ctx->kafka_info->cmt_offset[item.first]) {
kafka_pipe->finish();
RETURN_IF_ERROR(kafka_pipe->finish());
ctx->kafka_info->cmt_offset = std::move(cmt_offset);
ctx->receive_bytes = 0;
return Status::OK();
Expand All @@ -182,7 +182,7 @@ Status KafkaDataConsumerGroup::start_all(StreamLoadContext* ctx) {
return Status::Cancelled("Cancelled");
} else {
DCHECK(left_bytes < ctx->max_batch_size);
kafka_pipe->finish();
RETURN_IF_ERROR(kafka_pipe->finish());
ctx->kafka_info->cmt_offset = std::move(cmt_offset);
ctx->receive_bytes = ctx->max_batch_size - left_bytes;
return Status::OK();
Expand Down Expand Up @@ -359,7 +359,7 @@ Status PulsarDataConsumerGroup::start_all(StreamLoadContext* ctx) {
_queue.shutdown();
// cancel all consumers
for (auto& consumer : _consumers) {
consumer->cancel(ctx);
(void)consumer->cancel(ctx);
}

// waiting all threads finished
Expand All @@ -378,7 +378,7 @@ Status PulsarDataConsumerGroup::start_all(StreamLoadContext* ctx) {
return Status::Cancelled("Cancelled");
} else {
DCHECK(left_bytes < ctx->max_batch_size);
pulsar_pipe->finish();
RETURN_IF_ERROR(pulsar_pipe->finish());
ctx->pulsar_info->ack_offset = std::move(ack_offset);
ctx->receive_bytes = ctx->max_batch_size - left_bytes;
get_backlog_nums(ctx);
Expand Down
4 changes: 2 additions & 2 deletions be/src/runtime/routine_load/data_consumer_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ Status DataConsumerPool::get_consumer(StreamLoadContext* ctx, std::shared_ptr<Da
while (iter != std::end(_pool)) {
if ((*iter)->match(ctx)) {
VLOG(3) << "get an available data consumer from pool: " << (*iter)->id();
(*iter)->reset();
(void)(*iter)->reset();
*ret = *iter;
iter = _pool.erase(iter);
return Status::OK();
Expand Down Expand Up @@ -137,7 +137,7 @@ Status DataConsumerPool::get_consumer_grp(StreamLoadContext* ctx, std::shared_pt
void DataConsumerPool::return_consumer(const std::shared_ptr<DataConsumer>& consumer) {
std::unique_lock<std::mutex> l(_lock);

consumer->reset();
(void)consumer->reset();

if (_pool.size() == _max_pool_size) {
VLOG(3) << "data consumer pool is full: " << _pool.size() << "-" << _max_pool_size
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,7 @@ void RoutineLoadTaskExecutor::err_handler(StreamLoadContext* ctx, const Status&
LOG(WARNING) << err_msg << " " << ctx->brief();
ctx->status = st;
if (ctx->need_rollback) {
_exec_env->stream_load_executor()->rollback_txn(ctx);
(void)_exec_env->stream_load_executor()->rollback_txn(ctx);
ctx->need_rollback = false;
}
if (ctx->body_sink != nullptr) {
Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/stream_load/stream_load_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ class StreamLoadContext {

~StreamLoadContext() noexcept {
if (need_rollback) {
_exec_env->stream_load_executor()->rollback_txn(this);
(void)_exec_env->stream_load_executor()->rollback_txn(this);
need_rollback = false;
}

Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/stream_load/stream_load_pipe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ Status StreamLoadPipe::no_block_read(uint8_t* data, size_t* data_size, bool* eof
Status StreamLoadPipe::finish() {
if (_write_buf != nullptr) {
_write_buf->flip();
_append(_write_buf);
RETURN_IF_ERROR(_append(_write_buf));
_write_buf.reset();
}
{
Expand Down
6 changes: 3 additions & 3 deletions be/src/runtime/stream_load/transaction_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ Status TransactionMgr::begin_transaction(const HttpRequest* req, std::string* re
if (!st.ok()) {
ctx->status = st;
if (ctx->need_rollback) {
_rollback_transaction(ctx);
(void)_rollback_transaction(ctx);
}
}
LOG(INFO) << "new transaction manage request. " << ctx->brief() << ", tbl=" << ctx->table << " op=begin";
Expand Down Expand Up @@ -262,7 +262,7 @@ Status TransactionMgr::commit_transaction(const HttpRequest* req, std::string* r
LOG(ERROR) << "Fail to commit txn: " << st << " " << ctx->brief();
ctx->status = st;
if (ctx->need_rollback) {
_rollback_transaction(ctx);
(void)_rollback_transaction(ctx);
}
}
*resp = _build_reply(TXN_COMMIT, ctx);
Expand Down Expand Up @@ -331,7 +331,7 @@ Status TransactionMgr::_commit_transaction(StreamLoadContext* ctx, bool prepare)
// 1. finish stream pipe & wait it done
if (ctx->buffer != nullptr && ctx->buffer->pos > 0) {
ctx->buffer->flip();
ctx->body_sink->append(std::move(ctx->buffer));
RETURN_IF_ERROR(ctx->body_sink->append(std::move(ctx->buffer)));
ctx->buffer = nullptr;
}
RETURN_IF_ERROR(ctx->body_sink->finish());
Expand Down
4 changes: 2 additions & 2 deletions be/src/runtime/stream_load/transaction_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -188,10 +188,10 @@ class StreamContextMgr {
// 1. finish stream pipe & wait it done
if (ctx->buffer != nullptr && ctx->buffer->pos > 0) {
ctx->buffer->flip();
ctx->body_sink->append(std::move(ctx->buffer));
RETURN_IF_ERROR(ctx->body_sink->append(std::move(ctx->buffer)));
ctx->buffer = nullptr;
}
ctx->body_sink->finish();
RETURN_IF_ERROR(ctx->body_sink->finish());
} else {
std::string error_msg = fmt::format("stream load {} channel_id {}'s pipe doesn't exist", label,
std::to_string(channel_id));
Expand Down
2 changes: 1 addition & 1 deletion be/src/service/backend_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ void BackendServiceBase::open_scanner(TScanOpenResult& result_, const TScanOpenP
TStatus t_status;
TUniqueId fragment_instance_id = generate_uuid();
std::shared_ptr<ScanContext> p_context;
_exec_env->external_scan_context_mgr()->create_scan_context(&p_context);
(void)_exec_env->external_scan_context_mgr()->create_scan_context(&p_context);
p_context->fragment_instance_id = fragment_instance_id;
p_context->offset = 0;
p_context->last_access_time = time(nullptr);
Expand Down
2 changes: 1 addition & 1 deletion be/src/service/service_be/http_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ Status HttpServiceBE::start() {

// register pprof actions
if (!config::pprof_profile_dir.empty()) {
fs::create_directories(config::pprof_profile_dir);
RETURN_IF_ERROR(fs::create_directories(config::pprof_profile_dir));
}

auto* heap_action = new HeapAction();
Expand Down
2 changes: 1 addition & 1 deletion be/src/service/service_be/starrocks_be.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ void start_be(const std::vector<StorePath>& paths, bool as_cn) {

#if defined(WITH_CACHELIB) || defined(WITH_STARCACHE)
if (config::block_cache_enable) {
BlockCache::instance()->shutdown();
(void)BlockCache::instance()->shutdown();
LOG(INFO) << process_name << " exit step " << exit_step++ << ": block cache shutdown successfully";
}
#endif
Expand Down
4 changes: 2 additions & 2 deletions be/src/storage/binlog_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ Status BinlogBuilder::commit(BinlogBuildResult* result) {
if (_current_writer->file_size() > _params->max_file_size) {
// ignore close failure for the last writer because
// all binlog data has been committed (persisted)
_close_current_writer();
(void)_close_current_writer();
_current_writer.reset();
}

Expand Down Expand Up @@ -282,7 +282,7 @@ BinlogFileWriterPtr BinlogBuilder::discard_binlog_build_result(int64_t version,
// 2.1 reset the writer to the previous meta for reuse
st = current_active_writer->reset(params->active_file_meta.get());
if (!st.ok()) {
current_active_writer->close(false);
WARN_IF_ERROR(current_active_writer->close(false), "Fail to close active writer");
LOG(WARNING) << "Fail to reset active writer when discarding data for version " << version
<< ", file path: " << current_active_writer->file_path() << ", " << st;
} else {
Expand Down
2 changes: 1 addition & 1 deletion be/src/storage/binlog_file_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -550,7 +550,7 @@ StatusOr<std::shared_ptr<BinlogFileWriter>> BinlogFileWriter::reopen(int64_t fil
std::make_shared<BinlogFileWriter>(file_id, file_path, page_size, compression_type);
Status status = writer->init(previous_meta);
if (!status.ok()) {
writer->close(false);
(void)writer->close(false);
LOG(WARNING) << "Failed to reopen writer: " << file_path << ", " << status;
return status;
}
Expand Down
5 changes: 3 additions & 2 deletions be/src/storage/binlog_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ BinlogManager::BinlogManager(int64_t tablet_id, std::string path, int64_t max_fi
BinlogManager::~BinlogManager() {
std::unique_lock lock(_meta_lock);
if (_active_binlog_writer != nullptr) {
_active_binlog_writer->close(true);
WARN_IF_ERROR(_active_binlog_writer->close(true), "Close binlog writer failed");
_active_binlog_writer.reset();
}
}
Expand Down Expand Up @@ -569,9 +569,10 @@ BinlogRange BinlogManager::current_binlog_range() {
end->file_meta()->end_seq_id()};
}

// close_active_writer only used for UT
void BinlogManager::close_active_writer() {
if (_active_binlog_writer != nullptr) {
_active_binlog_writer->close(true);
(void)_active_binlog_writer->close(true);
_active_binlog_writer.reset();
}
}
Expand Down
1 change: 1 addition & 0 deletions be/src/storage/column_expr_predicate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ bool ColumnExprPredicate::zone_map_filter(const ZoneMapDetail& detail) const {
if (!evaluate(col.get(), selection, 0, size).ok()) {
return true;
}

for (uint16_t i = 0; i < size; i++) {
if (selection[i] != 0) {
return true;
Expand Down
2 changes: 1 addition & 1 deletion be/src/storage/column_predicate_rewriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ StatusOr<bool> ColumnPredicateRewriter::_rewrite_expr_predicate(ObjectPool* pool
builder.set_is_not_in(is_not_in);
builder.use_array_set(code_size);
DCHECK_IF_ERROR(builder.create());
builder.add_values(used_values, 0);
(void)builder.add_values(used_values, 0);
ExprContext* filter = builder.get_in_const_predicate();

DCHECK_IF_ERROR(filter->prepare(state));
Expand Down
2 changes: 1 addition & 1 deletion be/src/storage/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ Status Compaction::_merge_rowsets_vertically(size_t segment_iterator_num, Statis
bool is_key = (i == 0);
if (!is_key) {
// read mask buffer from the beginning
mask_buffer->flip_to_read();
RETURN_IF_ERROR(mask_buffer->flip_to_read());
}

Schema schema = ChunkHelper::convert_schema(tablet_schema, _column_groups[i]);
Expand Down
Loading
Loading