Skip to content

Commit

Permalink
Merge branch 'main' of https://github.com/StarRocks/starrocks into mo…
Browse files Browse the repository at this point in the history
…dify_partition_clause

� Conflicts:
�	fe/fe-core/src/main/java/com/starrocks/sql/analyzer/AlterTableStatementAnalyzer.java
�	fe/fe-core/src/main/java/com/starrocks/sql/ast/AstVisitor.java
�	fe/fe-core/src/main/java/com/starrocks/sql/parser/AstBuilder.java
�	fe/fe-core/src/main/java/com/starrocks/sql/parser/StarRocks.g4
  • Loading branch information
zhuxt2015 committed Jul 16, 2022
2 parents 9ff10ab + 5ab5166 commit 9de71de
Show file tree
Hide file tree
Showing 845 changed files with 46,780 additions and 4,565 deletions.
7 changes: 5 additions & 2 deletions .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
@@ -1,2 +1,5 @@
# StarRocks will be the owner of all codes
* @StarRocks/PMC
# committer will be the owner of all codes
* @StarRocks/starrocks-committer

# /docs/ belong to docs-maintainer
/docs/ @StarRocks/docs-maintainer
4 changes: 2 additions & 2 deletions be/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ add_library(crypto STATIC IMPORTED)
set_target_properties(crypto PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib/libcrypto.a)

set(AWSSDK_ROOT_DIR ${THIRDPARTY_DIR})
set(AWSSDK_COMMON_RUNTIME_LIBS "aws-crt-cpp;aws-c-auth;aws-c-cal;aws-c-common;aws-c-compression;aws-c-event-stream;aws-c-http;aws-c-io;aws-c-mqtt;aws-c-s3;aws-checksums;s2n")
set(AWSSDK_COMMON_RUNTIME_LIBS "aws-crt-cpp;aws-c-auth;aws-c-cal;aws-c-common;aws-c-compression;aws-c-event-stream;aws-c-http;aws-c-io;aws-c-mqtt;aws-c-s3;aws-checksums;s2n;aws-c-sdkutils")
foreach(lib IN ITEMS ${AWSSDK_COMMON_RUNTIME_LIBS})
list(APPEND CMAKE_PREFIX_PATH ${THIRDPARTY_DIR}/lib/${lib}/cmake)
list(APPEND CMAKE_PREFIX_PATH ${THIRDPARTY_DIR}/lib64/${lib}/cmake)
Expand Down Expand Up @@ -692,7 +692,7 @@ ENDFUNCTION()
# Add a benchmark to BE
FUNCTION(ADD_BE_BENCH TEST_NAME)

if (${WITH_BENCH} STREQUAL "OFF")
if (WITH_BENCH STREQUAL "OFF")
return()
endif()

Expand Down
56 changes: 40 additions & 16 deletions be/src/agent/agent_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@

namespace starrocks {

namespace {
constexpr size_t DEFAULT_DYNAMIC_THREAD_POOL_QUEUE_SIZE = 2048;
constexpr size_t MIN_CLONE_TASK_THREADS_IN_POOL = 2;
} // namespace

const uint32_t REPORT_TASK_WORKER_COUNT = 1;
const uint32_t REPORT_DISK_STATE_WORKER_COUNT = 1;
const uint32_t REPORT_OLAP_TABLE_WORKER_COUNT = 1;
Expand Down Expand Up @@ -70,6 +75,7 @@ class AgentServer::Impl {
ExecEnv* _exec_env;

std::unique_ptr<ThreadPool> _thread_pool_publish_version;
std::unique_ptr<ThreadPool> _thread_pool_clone;

std::unique_ptr<TaskWorkerPool> _create_tablet_workers;
std::unique_ptr<TaskWorkerPool> _drop_tablet_workers;
Expand Down Expand Up @@ -110,25 +116,43 @@ void AgentServer::Impl::init_or_die() {
}
}

auto st =
ThreadPoolBuilder("publish_version")
.set_min_threads(config::transaction_publish_version_worker_count)
.set_max_threads(config::transaction_publish_version_worker_count)
// The ideal queue size of threadpool should be larger than the maximum number of tablet of a partition.
// But it seems that there's no limit for the number of tablets of a partition.
// Since a large queue size brings a little overhead, a big one is chosen here.
.set_max_queue_size(2048)
.build(&_thread_pool_publish_version);
CHECK(st.ok()) << st;
#define BUILD_DYNAMIC_TASK_THREAD_POOL(name, min_threads, max_threads, queue_size, pool) \
do { \
auto st = ThreadPoolBuilder(name) \
.set_min_threads(min_threads) \
.set_max_threads(max_threads) \
.set_max_queue_size(queue_size) \
.build(&(pool)); \
CHECK(st.ok()) << st; \
} while (false)

// The ideal queue size of threadpool should be larger than the maximum number of tablet of a partition.
// But it seems that there's no limit for the number of tablets of a partition.
// Since a large queue size brings a little overhead, a big one is chosen here.
BUILD_DYNAMIC_TASK_THREAD_POOL("publish_version", config::transaction_publish_version_worker_count,
config::transaction_publish_version_worker_count,
DEFAULT_DYNAMIC_THREAD_POOL_QUEUE_SIZE, _thread_pool_publish_version);
#ifndef BE_TEST
// Currently FE can have at most num_of_storage_path * schedule_slot_num_per_path(default 2) clone tasks
// scheduled simultaneously, but previously we have only 3 clone worker threads by default,
// so this is to keep the dop of clone task handling in sync with FE.
//
// TODO(shangyiming): using dynamic thread pool to handle task directly instead of using TaskThreadPool
// Currently, the task submission and processing logic is deeply coupled with TaskThreadPool, change that will
// need to modify many interfaces. So for now we still use TaskThreadPool to submit clone tasks, but with
// only a single worker thread, then we use dynamic thread pool to handle the task concurrently in clone task
// callback, so that we can match the dop of FE clone task scheduling.
BUILD_DYNAMIC_TASK_THREAD_POOL("clone", MIN_CLONE_TASK_THREADS_IN_POOL,
_exec_env->store_paths().size() * config::parallel_clone_task_per_path,
DEFAULT_DYNAMIC_THREAD_POOL_QUEUE_SIZE, _thread_pool_clone);
#endif

// It is the same code to create workers of each type, so we use a macro
// to make code to be more readable.

#ifndef BE_TEST
#define CREATE_AND_START_POOL(type, pool_name, worker_num) \
pool_name.reset(new TaskWorkerPool(TaskWorkerPool::TaskWorkerType::type, _exec_env, worker_num)); \
pool_name->start();

#else
#define CREATE_AND_START_POOL(type, pool_name, worker_num)
#endif // BE_TEST
Expand All @@ -144,7 +168,7 @@ void AgentServer::Impl::init_or_die() {
CREATE_AND_START_POOL(DELETE, _delete_workers,
config::delete_worker_count_normal_priority + config::delete_worker_count_high_priority);
CREATE_AND_START_POOL(ALTER_TABLE, _alter_tablet_workers, config::alter_tablet_worker_count);
CREATE_AND_START_POOL(CLONE, _clone_workers, config::clone_worker_count);
CREATE_AND_START_POOL(CLONE, _clone_workers, 1);
CREATE_AND_START_POOL(STORAGE_MEDIUM_MIGRATE, _storage_medium_migrate_workers,
config::storage_medium_migrate_count);
CREATE_AND_START_POOL(CHECK_CONSISTENCY, _check_consistency_workers, config::check_consistency_worker_count);
Expand All @@ -165,6 +189,7 @@ AgentServer::Impl::~Impl() {
_thread_pool_publish_version->shutdown();

#ifndef BE_TEST
_thread_pool_clone->shutdown();
#define STOP_POOL(type, pool_name) pool_name->stop();
#else
#define STOP_POOL(type, pool_name)
Expand All @@ -191,8 +216,6 @@ AgentServer::Impl::~Impl() {
STOP_POOL(MOVE, _move_dir_workers);
STOP_POOL(UPDATE_TABLET_META_INFO, _update_tablet_meta_info_workers);
#undef STOP_POOL

_thread_pool_publish_version->wait();
}

// TODO(lingbin): each task in the batch may have it own status or FE must check and
Expand Down Expand Up @@ -407,10 +430,11 @@ ThreadPool* AgentServer::Impl::get_thread_pool(int type) const {
switch (type) {
case TTaskType::PUBLISH_VERSION:
return _thread_pool_publish_version.get();
case TTaskType::CLONE:
return _thread_pool_clone.get();
case TTaskType::CREATE:
case TTaskType::DROP:
case TTaskType::PUSH:
case TTaskType::CLONE:
case TTaskType::STORAGE_MEDIUM_MIGRATE:
case TTaskType::ROLLUP:
case TTaskType::SCHEMA_CHANGE:
Expand Down

0 comments on commit 9de71de

Please sign in to comment.