Merge branch 'master' into cooldown_replica_as_clone_dst_tablet
platoneko committed Sep 1, 2023
2 parents b652347 + c31cb5f commit 1477542
Showing 993 changed files with 36,565 additions and 11,418 deletions.
67 changes: 32 additions & 35 deletions be/src/agent/heartbeat_server.cpp
@@ -62,6 +62,7 @@ void HeartbeatServer::heartbeat(THeartbeatResult& heartbeat_result,
<< "host:" << master_info.network_address.hostname
<< ", port:" << master_info.network_address.port
<< ", cluster id:" << master_info.cluster_id
<< ", frontend_info:" << PrintFrontendInfos(master_info.frontend_infos)
<< ", counter:" << google::COUNTER << ", BE start time: " << _be_epoch;

MonotonicStopWatch watch;
@@ -92,20 +93,17 @@ Status HeartbeatServer::_heartbeat(const TMasterInfo& master_info) {
if (_master_info->cluster_id == -1) {
LOG(INFO) << "get first heartbeat. update cluster id";
// write and update cluster id
auto st = _olap_engine->set_cluster_id(master_info.cluster_id);
if (!st.ok()) {
LOG(WARNING) << "fail to set cluster id. status=" << st;
return Status::InternalError("fail to set cluster id.");
} else {
_master_info->cluster_id = master_info.cluster_id;
LOG(INFO) << "record cluster id. host: " << master_info.network_address.hostname
<< ". port: " << master_info.network_address.port
<< ". cluster id: " << master_info.cluster_id;
}
RETURN_IF_ERROR(_olap_engine->set_cluster_id(master_info.cluster_id));

_master_info->cluster_id = master_info.cluster_id;
LOG(INFO) << "record cluster id. host: " << master_info.network_address.hostname
<< ". port: " << master_info.network_address.port
<< ". cluster id: " << master_info.cluster_id
<< ". frontend_infos: " << PrintFrontendInfos(master_info.frontend_infos);
} else {
if (_master_info->cluster_id != master_info.cluster_id) {
LOG(WARNING) << "invalid cluster id: " << master_info.cluster_id << ". ignore.";
return Status::InternalError("invalid cluster id. ignore.");
return Status::InternalError("invalid cluster id. ignore. cluster_id={}",
master_info.cluster_id);
}
}

@@ -132,10 +130,9 @@ Status HeartbeatServer::_heartbeat(const TMasterInfo& master_info) {
std::vector<InetAddress> hosts;
status = get_hosts(&hosts);
if (!status.ok() || hosts.empty()) {
std::stringstream ss;
ss << "the status was not ok when get_hosts, error is " << status.to_string();
LOG(WARNING) << ss.str();
return Status::InternalError(ss.str());
return Status::InternalError("get_hosts() failed, error: {}", status.to_string());
}

//step4: check if the IP of FQDN belongs to the current machine and update BackendOptions._s_localhost
@@ -149,12 +146,10 @@ Status HeartbeatServer::_heartbeat(const TMasterInfo& master_info) {
}

if (!set_new_localhost) {
std::stringstream ss;
ss << "the host recorded in master is " << master_info.backend_ip
<< ", but we cannot found the local ip that mapped to that host."
<< BackendOptions::get_localhost();
LOG(WARNING) << ss.str();
return Status::InternalError(ss.str());
return Status::InternalError(
"the host recorded in master is {}, but we cannot find a local IP that "
"maps to that host. backend={}",
master_info.backend_ip, BackendOptions::get_localhost());
}
} else {
// if it is an IP, skip the checks and use it as-is
@@ -179,12 +174,11 @@ Status HeartbeatServer::_heartbeat(const TMasterInfo& master_info) {
<< ". port: " << _master_info->network_address.port
<< ". epoch: " << _fe_epoch;
} else {
LOG(WARNING) << "epoch is not greater than local. ignore heartbeat. host: "
<< _master_info->network_address.hostname
<< " port: " << _master_info->network_address.port
<< " local epoch: " << _fe_epoch
<< " received epoch: " << master_info.epoch;
return Status::InternalError("epoch is not greater than local. ignore heartbeat.");
return Status::InternalError(
"epoch is not greater than local. ignore heartbeat. host: {}, port: {}, local "
"epoch: {}, received epoch: {}",
_master_info->network_address.hostname, _master_info->network_address.port,
_fe_epoch, master_info.epoch);
}
} else {
// when the Master FE restarts, host and port remain the same, but the epoch increases.
@@ -200,9 +194,8 @@ Status HeartbeatServer::_heartbeat(const TMasterInfo& master_info) {
_master_info->__set_token(master_info.token);
LOG(INFO) << "get token. token: " << _master_info->token;
} else if (_master_info->token != master_info.token) {
LOG(WARNING) << "invalid token. local_token:" << _master_info->token
<< ". token:" << master_info.token;
return Status::InternalError("invalid token.");
return Status::InternalError("invalid token. local_token: {}, token: {}",
_master_info->token, master_info.token);
}
}

@@ -219,6 +212,10 @@ Status HeartbeatServer::_heartbeat(const TMasterInfo& master_info) {
_master_info->__set_backend_id(master_info.backend_id);
}

if (master_info.__isset.frontend_infos) {
ExecEnv::GetInstance()->update_frontends(master_info.frontend_infos);
}

if (need_report) {
LOG(INFO) << "Master FE is changed or restarted. report tablet and disk info immediately";
_olap_engine->notify_listeners();
@@ -228,8 +225,8 @@ Status HeartbeatServer::_heartbeat(const TMasterInfo& master_info) {
}

Status create_heartbeat_server(ExecEnv* exec_env, uint32_t server_port,
ThriftServer** thrift_server, uint32_t worker_thread_num,
TMasterInfo* local_master_info) {
std::unique_ptr<ThriftServer>* thrift_server,
uint32_t worker_thread_num, TMasterInfo* local_master_info) {
HeartbeatServer* heartbeat_server = new HeartbeatServer(local_master_info);
if (heartbeat_server == nullptr) {
return Status::InternalError("Get heartbeat server failed");
@@ -240,8 +237,8 @@ Status create_heartbeat_server(ExecEnv* exec_env, uint32_t server_port,
std::shared_ptr<HeartbeatServer> handler(heartbeat_server);
std::shared_ptr<HeartbeatServiceProcessor::TProcessor> server_processor(
new HeartbeatServiceProcessor(handler));
*thrift_server =
new ThriftServer("heartbeat", server_processor, server_port, worker_thread_num);
*thrift_server = std::make_unique<ThriftServer>("heartbeat", server_processor, server_port,
worker_thread_num);
return Status::OK();
}
} // namespace doris
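
A recurring cleanup in this file: error paths that previously composed a message with std::stringstream, logged it, and returned a bare Status are collapsed into a single fmt-style Status::InternalError that carries the details, leaving logging to the caller. A minimal before/after sketch of the pattern (illustrative shape, not lines copied from the diff):

// Before: the message is duplicated, once for the log and once for the status.
std::stringstream ss;
ss << "invalid token. local_token:" << local_token << ". token:" << token;
LOG(WARNING) << ss.str();
return Status::InternalError(ss.str());

// After: a single fmt-style call; the returned status carries the details.
return Status::InternalError("invalid token. local_token: {}, token: {}",
                             local_token, token);
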
4 changes: 2 additions & 2 deletions be/src/agent/heartbeat_server.h
@@ -66,6 +66,6 @@ class HeartbeatServer : public HeartbeatServiceIf {
}; // class HeartBeatServer

Status create_heartbeat_server(ExecEnv* exec_env, uint32_t heartbeat_server_port,
ThriftServer** heart_beat_server, uint32_t worker_thread_num,
TMasterInfo* local_master_info);
std::unique_ptr<ThriftServer>* heart_beat_server,
uint32_t worker_thread_num, TMasterInfo* local_master_info);
} // namespace doris
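
With the signature change, the caller now receives an owning std::unique_ptr<ThriftServer> instead of a raw pointer it must delete. A sketch of an adapted call site, assuming the surrounding wiring (exec_env, master_info, and the existing heartbeat_service_port / heartbeat_service_thread_count config options) purely for illustration:

// Hypothetical call site inside a function returning Status; not part of this diff.
std::unique_ptr<ThriftServer> heartbeat_server;
RETURN_IF_ERROR(create_heartbeat_server(exec_env, config::heartbeat_service_port,
                                        &heartbeat_server,
                                        config::heartbeat_service_thread_count,
                                        &master_info));
RETURN_IF_ERROR(heartbeat_server->start());
// heartbeat_server is destroyed automatically when it goes out of scope.
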
24 changes: 12 additions & 12 deletions be/src/agent/task_worker_pool.cpp
@@ -355,7 +355,7 @@ void TaskWorkerPool::_alter_inverted_index_worker_thread_callback() {
alter_inverted_index_rq.tablet_id);
if (tablet_ptr != nullptr) {
EngineIndexChangeTask engine_task(alter_inverted_index_rq);
status = _env->storage_engine()->execute_task(&engine_task);
status = StorageEngine::instance()->execute_task(&engine_task);
} else {
status =
Status::NotFound("could not find tablet {}", alter_inverted_index_rq.tablet_id);
@@ -570,7 +570,7 @@ void TaskWorkerPool::_check_consistency_worker_thread_callback() {
EngineChecksumTask engine_task(check_consistency_req.tablet_id,
check_consistency_req.schema_hash,
check_consistency_req.version, &checksum);
Status status = _env->storage_engine()->execute_task(&engine_task);
Status status = StorageEngine::instance()->execute_task(&engine_task);
if (!status.ok()) {
LOG_WARNING("failed to check consistency")
.tag("signature", agent_task_req.signature)
@@ -667,7 +667,7 @@ void TaskWorkerPool::_report_disk_state_worker_thread_callback() {
request.__isset.disks = true;

std::vector<DataDirInfo> data_dir_infos;
_env->storage_engine()->get_all_data_dir_info(&data_dir_infos, true /* update */);
StorageEngine::instance()->get_all_data_dir_info(&data_dir_infos, true /* update */);

for (auto& root_path_info : data_dir_infos) {
TDisk disk;
@@ -1305,7 +1305,7 @@ void CreateTableTaskPool::_create_tablet_worker_thread_callback() {

std::vector<TTabletInfo> finish_tablet_infos;
VLOG_NOTICE << "create tablet: " << create_tablet_req;
Status status = _env->storage_engine()->create_tablet(create_tablet_req, profile);
Status status = StorageEngine::instance()->create_tablet(create_tablet_req, profile);
if (!status.ok()) {
DorisMetrics::instance()->create_tablet_requests_failed->increment(1);
LOG_WARNING("failed to create tablet, reason={}", status.to_string())
@@ -1384,7 +1384,7 @@ void DropTableTaskPool::_drop_tablet_worker_thread_callback() {
// if tablet is dropped by fe, then the related txn should also be removed
StorageEngine::instance()->txn_manager()->force_rollback_tablet_related_txns(
dropped_tablet->data_dir()->get_meta(), drop_tablet_req.tablet_id,
drop_tablet_req.schema_hash, dropped_tablet->tablet_uid());
dropped_tablet->tablet_uid());
LOG_INFO("successfully drop tablet")
.tag("signature", agent_task_req.signature)
.tag("tablet_id", drop_tablet_req.tablet_id);
@@ -1470,7 +1470,7 @@ void PushTaskPool::_push_worker_thread_callback() {
std::vector<TTabletInfo> tablet_infos;

EngineBatchLoadTask engine_task(push_req, &tablet_infos);
auto status = _env->storage_engine()->execute_task(&engine_task);
auto status = StorageEngine::instance()->execute_task(&engine_task);

// Return result to fe
TFinishTaskRequest finish_task_request;
@@ -1539,7 +1539,7 @@ void PublishVersionTaskPool::_publish_version_worker_thread_callback() {
error_tablet_ids.clear();
EnginePublishVersionTask engine_task(publish_version_req, &error_tablet_ids,
&succ_tablet_ids, &discontinuous_version_tablets);
status = _env->storage_engine()->execute_task(&engine_task);
status = StorageEngine::instance()->execute_task(&engine_task);
if (status.ok()) {
break;
} else if (status.is<PUBLISH_VERSION_NOT_CONTINUOUS>()) {
@@ -1664,11 +1664,11 @@ void ClearTransactionTaskPool::_clear_transaction_task_worker_thread_callback()
// If it is not greater than zero, no need to execute
// the following clear_transaction_task() function.
if (!clear_transaction_task_req.partition_id.empty()) {
_env->storage_engine()->clear_transaction_task(
StorageEngine::instance()->clear_transaction_task(
clear_transaction_task_req.transaction_id,
clear_transaction_task_req.partition_id);
} else {
_env->storage_engine()->clear_transaction_task(
StorageEngine::instance()->clear_transaction_task(
clear_transaction_task_req.transaction_id);
}
LOG(INFO) << "finish to clear transaction task. signature=" << agent_task_req.signature
@@ -1765,7 +1765,7 @@ void AlterTableTaskPool::_alter_tablet(const TAgentTaskRequest& agent_task_req,
new_tablet_id = agent_task_req.alter_tablet_req_v2.new_tablet_id;
new_schema_hash = agent_task_req.alter_tablet_req_v2.new_schema_hash;
EngineAlterTabletTask engine_task(agent_task_req.alter_tablet_req_v2);
status = _env->storage_engine()->execute_task(&engine_task);
status = StorageEngine::instance()->execute_task(&engine_task);
}

if (status.ok()) {
@@ -1866,7 +1866,7 @@ void CloneTaskPool::_clone_worker_thread_callback() {
std::vector<TTabletInfo> tablet_infos;
EngineCloneTask engine_task(clone_req, _master_info, agent_task_req.signature,
&tablet_infos);
auto status = _env->storage_engine()->execute_task(&engine_task);
auto status = StorageEngine::instance()->execute_task(&engine_task);
// Return result to fe
TFinishTaskRequest finish_task_request;
finish_task_request.__set_backend(BackendOptions::get_local_backend());
@@ -1923,7 +1923,7 @@ void StorageMediumMigrateTaskPool::_storage_medium_migrate_worker_thread_callback() {
auto status = _check_migrate_request(storage_medium_migrate_req, tablet, &dest_store);
if (status.ok()) {
EngineStorageMigrationTask engine_task(tablet, dest_store);
status = _env->storage_engine()->execute_task(&engine_task);
status = StorageEngine::instance()->execute_task(&engine_task);
}
if (!status.ok()) {
LOG_WARNING("failed to migrate storage medium")
17 changes: 16 additions & 1 deletion be/src/common/config.cpp
@@ -286,6 +286,9 @@ DEFINE_mInt32(default_num_rows_per_column_file_block, "1024");
DEFINE_mInt32(pending_data_expire_time_sec, "1800");
// inc_rowset snapshot rs sweep time interval
DEFINE_mInt32(tablet_rowset_stale_sweep_time_sec, "300");
// tablet stale rowset sweep by threshold size
DEFINE_Bool(tablet_rowset_stale_sweep_by_size, "false");
DEFINE_mInt32(tablet_rowset_stale_sweep_threshold_size, "100");
// garbage sweep policy
DEFINE_Int32(max_garbage_sweep_interval, "3600");
DEFINE_Int32(min_garbage_sweep_interval, "180");
@@ -297,6 +300,7 @@ DEFINE_mInt32(trash_file_expire_time_sec, "259200");
// minimum file descriptor number
// modify them upon necessity
DEFINE_Int32(min_file_descriptor_number, "60000");
DEFINE_mBool(disable_segment_cache, "false");
DEFINE_Int64(index_stream_cache_capacity, "10737418240");
DEFINE_String(row_cache_mem_limit, "20%");

@@ -483,6 +487,8 @@ DEFINE_mInt32(stream_load_record_batch_size, "50");
DEFINE_Int32(stream_load_record_expire_time_secs, "28800");
// time interval to clean expired stream load records
DEFINE_mInt64(clean_stream_load_record_interval_secs, "1800");
// The buffer size to store stream table function schema info
DEFINE_Int64(stream_tvf_buffer_size, "1048576"); // 1MB

// OlapTableSink sender's send interval, should be less than the real response time of a tablet writer rpc.
// You may need to lower the speed when the sink receiver bes are too busy.
@@ -845,6 +851,9 @@ DEFINE_Validator(jsonb_type_length_soft_limit_bytes,
// is greater than object_pool_buffer_size, release the object in the unused_object_pool.
DEFINE_Int32(object_pool_buffer_size, "100");

// Threshold of reading a small file into memory
DEFINE_mInt32(in_memory_file_size, "1048576"); // 1MB

// ParquetReaderWrap prefetch buffer size
DEFINE_Int32(parquet_reader_max_buffer_size, "50");
// Max size of parquet page header in bytes
@@ -1070,6 +1079,8 @@ DEFINE_mString(user_files_secure_path, "${DORIS_HOME}");

DEFINE_Int32(partition_topn_partition_threshold, "1024");

DEFINE_Int32(fe_expire_duration_seconds, "60");

#ifdef BE_TEST
// test s3
DEFINE_String(test_s3_resource, "resource");
@@ -1507,7 +1518,11 @@ std::vector<std::vector<std::string>> get_config_info() {
_config.push_back(it.first);

_config.push_back(field_it->second.type);
_config.push_back(it.second);
if (0 == strcmp(field_it->second.type, "bool")) {
_config.push_back(it.second == "1" ? "true" : "false");
} else {
_config.push_back(it.second);
}
_config.push_back(field_it->second.valmutable ? "true" : "false");

configs.push_back(_config);
14 changes: 14 additions & 0 deletions be/src/common/config.h
@@ -332,6 +332,9 @@ DECLARE_mInt32(default_num_rows_per_column_file_block);
DECLARE_mInt32(pending_data_expire_time_sec);
// inc_rowset snapshot rs sweep time interval
DECLARE_mInt32(tablet_rowset_stale_sweep_time_sec);
// tablet stale rowset sweep by threshold size
DECLARE_Bool(tablet_rowset_stale_sweep_by_size);
DECLARE_mInt32(tablet_rowset_stale_sweep_threshold_size);
// garbage sweep policy
DECLARE_Int32(max_garbage_sweep_interval);
DECLARE_Int32(min_garbage_sweep_interval);
@@ -344,6 +347,7 @@ DECLARE_mInt32(trash_file_expire_time_sec);
// minimum file descriptor number
// modify them upon necessity
DECLARE_Int32(min_file_descriptor_number);
DECLARE_mBool(disable_segment_cache);
DECLARE_Int64(index_stream_cache_capacity);
DECLARE_String(row_cache_mem_limit);

@@ -356,6 +360,7 @@ DECLARE_Int32(storage_page_cache_shard_size);
// all storage page cache will be divided into data_page_cache and index_page_cache
DECLARE_Int32(index_page_cache_percentage);
// whether to disable page cache feature in storage
// TODO delete it. Divided into Data page, Index page, pk index page
DECLARE_Bool(disable_storage_page_cache);
// whether to disable row cache feature in storage
DECLARE_Bool(disable_storage_row_cache);
@@ -534,6 +539,8 @@ DECLARE_mInt32(stream_load_record_batch_size);
DECLARE_Int32(stream_load_record_expire_time_secs);
// time interval to clean expired stream load records
DECLARE_mInt64(clean_stream_load_record_interval_secs);
// The buffer size to store stream table function schema info
DECLARE_Int64(stream_tvf_buffer_size);

// OlapTableSink sender's send interval, should be less than the real response time of a tablet writer rpc.
// You may need to lower the speed when the sink receiver bes are too busy.
@@ -892,6 +899,9 @@ DECLARE_mInt32(jsonb_type_length_soft_limit_bytes);
// is greater than object_pool_buffer_size, release the object in the unused_object_pool.
DECLARE_Int32(object_pool_buffer_size);

// Threshold of reading a small file into memory
DECLARE_mInt32(in_memory_file_size);

// ParquetReaderWrap prefetch buffer size
DECLARE_Int32(parquet_reader_max_buffer_size);
// Max size of parquet page header in bytes
@@ -1129,6 +1139,10 @@ DECLARE_mString(user_files_secure_path);
// and if this threshold is exceeded, the remaining data will be pass through to other node directly.
DECLARE_Int32(partition_topn_partition_threshold);

// If an FE's frontend info has not been updated for more than fe_expire_duration_seconds,
// it is regarded as an abnormal FE, and the BE will cancel queries related to that FE.
DECLARE_Int32(fe_expire_duration_seconds);

#ifdef BE_TEST
// test s3
DECLARE_String(test_s3_resource);
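
Taken together with the new update_frontends() call in the heartbeat path, fe_expire_duration_seconds lets the BE treat an FE whose info has gone stale as abnormal. A minimal sketch of the liveness check this enables (the FrontendInfo fields here are assumed for illustration, not this commit's exact struct):

#include <cstdint>

// Assumed shape: the BE records, per FE, when that FE's info was last updated.
struct FrontendInfo {
    int64_t last_updated_ms = 0;
};

// True if the FE has not refreshed within expire_secs; the BE would then
// cancel queries coordinated by that FE.
bool is_frontend_expired(const FrontendInfo& fe, int64_t now_ms, int32_t expire_secs) {
    return now_ms - fe.last_updated_ms > static_cast<int64_t>(expire_secs) * 1000;
}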
