Skip to content

Commit

Permalink
perf: fix tokio trace filter & config more perf parameter (#1372)
Browse files Browse the repository at this point in the history
  • Loading branch information
roseboy-liu committed Jul 26, 2023
1 parent d00a38f commit 9a42b66
Show file tree
Hide file tree
Showing 28 changed files with 133 additions and 40 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ async-recursion = "1.0.0"
async-stream = "0.3"
async-trait = "0.1"
backtrace = "0.3"
base64 = { version = "0.13" }
base64 = "0.13"
bincode = "1.3.3"
blake3 = "1.3.3"
byteorder = "1.4.3"
Expand Down
2 changes: 1 addition & 1 deletion client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ datafusion = { workspace = true }
dirs = { workspace = true }
env_logger = { workspace = true }
rustyline = { workspace = true }
tokio = { workspace = true, features = ["macros", "rt", "rt-multi-thread", "sync", "parking_lot"] }
tokio = { workspace = true, features = ["macros", "rt", "rt-multi-thread", "sync", "parking_lot", "tracing"] }
async-backtrace = { workspace = true, optional = true }
walkdir = { workspace = true }
anyhow = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion common/lru_cache/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ harness = false
[dependencies]
async-backtrace = { workspace = true, optional = true }
parking_lot = { workspace = true }
tokio = { workspace = true, features = ["sync"] }
tokio = { workspace = true, features = ["sync", "tracing"] }
tracing = { workspace = true }
utils = { path = "../utils" }

Expand Down
2 changes: 1 addition & 1 deletion common/models/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ rand = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
snafu = { workspace = true }
tokio = { workspace = true, features = ["full"] }
tokio = { workspace = true, features = ["full", "tracing"] }
tokio-stream = { workspace = true }
tokio-util = { workspace = true }
uuid = { workspace = true }
Expand Down
1 change: 0 additions & 1 deletion common/trace/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,6 @@ pub fn init_global_tracing(
let guards = vec![guard];

let registry_builder = Registry::default()
.with(env_filter(tracing_level))
.with(ErrorLayer::default())
.with(formatting_layer)
.with(file_layer)
Expand Down
2 changes: 1 addition & 1 deletion common/utils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ trace = { path = "../trace" }

async-backtrace = { workspace = true, optional = true }
libc = { workspace = true }
tokio = { workspace = true, features = ["full"] }
tokio = { workspace = true, features = ["full", "tracing"] }

[target.'cfg(unix)'.dependencies]
pprof = { workspace = true, features = ["flamegraph", "protobuf-codec", "frame-pointer"] }
Expand Down
1 change: 1 addition & 0 deletions config/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ serde = { workspace = true }
toml = { workspace = true }
async-backtrace = { workspace = true, optional = true }
sys-info = {workspace = true}
num_cpus = {workspace = true}

[features]
default = []
Expand Down
10 changes: 9 additions & 1 deletion config/config.toml
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@

#reporting_disabled = false
#node_id = 100

Expand All @@ -12,6 +11,10 @@ max_server_connections = 10240
query_sql_limit = 16777216 # 16 * 1024 * 1024
write_sql_limit = 167772160 # 160 * 1024 * 1024
auth_enabled = false
read_timeout_ms = 3000
write_timeout_ms = 3000
stream_trigger_cpu = 1
stream_executor_cpu = 2

[storage]

Expand Down Expand Up @@ -83,9 +86,14 @@ path = 'data/wal'
## The maximum amount of immutable caches.
#max_immutable_number = 4

## The number of memcache partitions; defaults to the number of CPUs.
# partition = 16

[log]
level = 'info'
path = 'data/log'
## Tokio trace is turned off by default; uncomment the line below to enable it.
#tokio_trace = { addr = "127.0.0.1:6669" }

[security]
# [security.tls_config]
Expand Down
9 changes: 6 additions & 3 deletions config/config_8902.toml
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@

#reporting_disabled = false
host = "localhost"

Expand All @@ -10,8 +9,12 @@ host = "localhost"
[query]
max_server_connections = 10240
query_sql_limit = 16777216 # 16 * 1024 * 1024
write_sql_limit = 167772160 # 160 * 1024 * 1024
write_sql_limit = 167772160 # 160 * 1024 * 1024
auth_enabled = false
read_timeout_ms = 3000
write_timeout_ms = 3000
stream_trigger_cpu = 1
stream_executor_cpu = 2

[storage]
# Directory for summary: $path/summary/
Expand Down Expand Up @@ -41,7 +44,7 @@ sync_interval = "0"
[cache]
max_buffer_size = "128M" # 134217728
max_immutable_number = 4

partition = 16 # memcache partition number (defaults to the number of CPUs when unset)
[log]
level = 'info'
path = '/tmp/cnosdb/1001/log'
Expand Down
9 changes: 6 additions & 3 deletions config/config_8912.toml
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@

#reporting_disabled = false
host = "localhost"

Expand All @@ -10,9 +9,12 @@ host = "localhost"
[query]
max_server_connections = 10240
query_sql_limit = 16777216 # 16 * 1024 * 1024
write_sql_limit = 167772160 # 160 * 1024 * 1024
write_sql_limit = 167772160 # 160 * 1024 * 1024
auth_enabled = false
store_metrics = true
read_timeout_ms = 3000
write_timeout_ms = 3000
stream_trigger_cpu = 1
stream_executor_cpu = 2

[storage]
# Directory for summary: $path/summary/
Expand Down Expand Up @@ -42,6 +44,7 @@ sync_interval = "0"
[cache]
max_buffer_size = "128M" # 134217728
max_immutable_number = 4
partition = 16 # memcache partition number (defaults to the number of CPUs when unset)

[log]
level = 'info'
Expand Down
19 changes: 18 additions & 1 deletion config/src/cache_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ pub struct CacheConfig {
pub max_buffer_size: u64,
#[serde(default = "CacheConfig::default_max_immutable_number")]
pub max_immutable_number: u16,
#[serde(default = "CacheConfig::default_partitions")]
pub partition: usize,
}

impl CacheConfig {
Expand All @@ -21,6 +23,9 @@ impl CacheConfig {
/// Serde default for `max_immutable_number`, used when the key is absent
/// from the `[cache]` section: 4.
fn default_max_immutable_number() -> u16 {
4
}
/// Serde default for `partition`, used when the key is absent from the
/// `[cache]` section: the value reported by `num_cpus::get()`.
fn default_partitions() -> usize {
num_cpus::get()
}

pub fn override_by_env(&mut self) {
if let Ok(size) = std::env::var("CNOSDB_CACHE_MAX_BUFFER_SIZE") {
Expand All @@ -29,6 +34,9 @@ impl CacheConfig {
if let Ok(size) = std::env::var("CNOSDB_CACHE_MAX_IMMUTABLE_NUMBER") {
self.max_immutable_number = size.parse::<u16>().unwrap();
}
if let Ok(size) = std::env::var("CNOSDB_CACHE_PARTITIONS") {
self.partition = size.parse::<usize>().unwrap();
}
}
}

Expand All @@ -37,6 +45,7 @@ impl Default for CacheConfig {
Self {
max_buffer_size: Self::default_max_buffer_size(),
max_immutable_number: Self::default_max_immutable_number(),
partition: Self::default_partitions(),
}
}
}
Expand All @@ -58,12 +67,20 @@ impl CheckConfig for CacheConfig {
.is_none()
{
ret.add_error(CheckConfigItemResult {
config: config_name,
config: config_name.clone(),
item: "max_immutable_number".to_string(),
message: "'max_immutable_number' maybe too big( ('max_immutable_number' + 1) * 'max_buffer_size' caused an overflow)".to_string(),
});
}

if self.partition > 1024 {
ret.add_warn(CheckConfigItemResult {
config: config_name,
item: "partition".to_string(),
message: "'partition' maybe too big(more than 1024)".to_string(),
});
}

if ret.is_empty() {
None
} else {
Expand Down
1 change: 1 addition & 0 deletions config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,7 @@ max_immutable_number = 4
[log]
level = 'info'
path = 'data/log'
tokio_trace = { addr = "127.0.0.1:6669" }
[security]
# [security.tls_config]
Expand Down
35 changes: 34 additions & 1 deletion config/src/query_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ pub struct QueryConfig {
pub read_timeout_ms: u64,
#[serde(default = "QueryConfig::default_write_timeout_ms")]
pub write_timeout_ms: u64,
#[serde(default = "QueryConfig::default_stream_trigger_cpu")]
pub stream_trigger_cpu: usize,
#[serde(default = "QueryConfig::default_stream_executor_cpu")]
pub stream_executor_cpu: usize,
}

impl QueryConfig {
Expand All @@ -44,6 +48,12 @@ impl QueryConfig {
/// Serde default for `write_timeout_ms`, used when the key is absent from
/// the `[query]` section: 3000 (milliseconds, going by the field name).
fn default_write_timeout_ms() -> u64 {
3 * 1000
}
/// Serde default for `stream_trigger_cpu`, used when the key is absent from
/// the `[query]` section: 1.
fn default_stream_trigger_cpu() -> usize {
1
}
/// Serde default for `stream_executor_cpu`, used when the key is absent from
/// the `[query]` section: 2.
fn default_stream_executor_cpu() -> usize {
2
}

pub fn override_by_env(&mut self) {
if let Ok(size) = std::env::var("MAX_SERVER_CONNECTIONS") {
Expand All @@ -64,6 +74,12 @@ impl QueryConfig {
if let Ok(size) = std::env::var("WRITE_TIMEOUT_MS") {
self.write_timeout_ms = size.parse::<u64>().unwrap();
}
if let Ok(size) = std::env::var("STREAM_TRIGGER_CPU") {
self.stream_trigger_cpu = size.parse::<usize>().unwrap();
}
if let Ok(size) = std::env::var("STREAM_EXECUTOR_CPU") {
self.stream_executor_cpu = size.parse::<usize>().unwrap();
}
}
}

Expand All @@ -76,6 +92,8 @@ impl Default for QueryConfig {
auth_enabled: Self::default_auth_enabled(),
read_timeout_ms: Self::default_read_timeout_ms(),
write_timeout_ms: Self::default_write_timeout_ms(),
stream_trigger_cpu: Self::default_stream_trigger_cpu(),
stream_executor_cpu: Self::default_stream_executor_cpu(),
}
}
}
Expand Down Expand Up @@ -117,12 +135,27 @@ impl CheckConfig for QueryConfig {

if self.write_timeout_ms < 10 {
ret.add_warn(CheckConfigItemResult {
config: config_name,
config: config_name.clone(),
item: "write_timeout_ms".to_string(),
message: "'write_timeout_ms' maybe too small(less than 10)".to_string(),
})
}

if self.stream_executor_cpu > 1024 {
ret.add_warn(CheckConfigItemResult {
config: config_name.clone(),
item: "stream_executor_cpu".to_string(),
message: "'stream_executor_cpu' maybe too big(more than 1024)".to_string(),
})
}
if self.stream_trigger_cpu > 1024 {
ret.add_warn(CheckConfigItemResult {
config: config_name,
item: "stream_trigger_cpu".to_string(),
message: "'stream_trigger_cpu' maybe too big(more than 1024)".to_string(),
})
}

if ret.is_empty() {
None
} else {
Expand Down
2 changes: 1 addition & 1 deletion coordinator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ tracing = { workspace = true }
tracing-futures = { workspace = true }
serde_json = { workspace = true }
serde = { workspace = true, features = ["derive"] }
tokio = { workspace = true, features = ["macros", "net", "parking_lot", "rt-multi-thread", "signal", "sync", "time"] }
tokio = { workspace = true, features = ["macros", "net", "parking_lot", "rt-multi-thread", "signal", "sync", "time", "tracing"] }
tokio-stream = { workspace = true, features = ["net"] }
tokio-util = { workspace = true }
futures = { workspace = true, features = ["alloc"] }
Expand Down
4 changes: 2 additions & 2 deletions query_server/query/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ coordinator = { path = "../../coordinator" }
protocol_parser = { path = "../../common/protocol_parser" }
memory_pool = { path = "../../common/memory_pool" }
spi = { path = "../spi" }
metrics = {path = "../../common/metrics"}
metrics = { path = "../../common/metrics" }


async-trait = { workspace = true }
Expand All @@ -29,7 +29,7 @@ num_cpus = { workspace = true }
parking_lot = { workspace = true }
paste = { workspace = true }
pin-project = { workspace = true }
tokio = { workspace = true, features = ["full"] }
tokio = { workspace = true, features = ["full", "tracing"] }
tokio-util = { workspace = true }
rand = { workspace = true }
serde = { workspace = true }
Expand Down
12 changes: 8 additions & 4 deletions query_server/query/src/execution/factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use spi::query::logical_planner::{Plan, QueryPlan};
use spi::query::optimizer::Optimizer;
use spi::query::scheduler::SchedulerRef;
use spi::QueryError;
use tskv::kv_option::QueryOptions;

use super::query::SqlQueryExecution;
use super::stream::trigger::executor::{TriggerExecutorFactory, TriggerExecutorFactoryRef};
Expand Down Expand Up @@ -35,17 +36,20 @@ impl SqlQueryExecutionFactory {
scheduler: SchedulerRef,
query_tracker: Arc<QueryTracker>,
stream_checker_manager: StreamCheckerManagerRef,
config: Arc<QueryOptions>,
) -> Self {
// TODO configurable
// Only do periodic scheduling, no need for many threads
let trigger_executor_runtime = DedicatedExecutor::new("stream-trigger", 4);
let trigger_executor_runtime =
DedicatedExecutor::new("stream-trigger", config.stream_trigger_cpu);
let trigger_executor_factory = Arc::new(TriggerExecutorFactory::new(Arc::new(
trigger_executor_runtime,
)));

// TODO configurable
// perform stream-related preparations, not actual operator execution
let runtime = Arc::new(DedicatedExecutor::new("stream-executor", num_cpus::get()));
let runtime = Arc::new(DedicatedExecutor::new(
"stream-executor",
config.stream_executor_cpu,
));

Self {
optimizer,
Expand Down
1 change: 1 addition & 0 deletions query_server/query/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ pub async fn make_cnosdbms(
scheduler,
query_tracker.clone(),
Arc::new(stream_checker_manager),
options.query.clone(),
));

let meta_manager = coord.meta_manager();
Expand Down
2 changes: 1 addition & 1 deletion query_server/sqllogicaltests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ edition.workspace = true
trace = { path = "../../common/trace" }
sqllogictest = { workspace = true }
async-trait = { workspace = true }
tokio = { workspace = true }
tokio = { workspace = true, features = ["full", "tracing"] }
arrow = { workspace = true }
arrow-flight = { workspace = true, features = ["flight-sql-experimental"] }
tonic = { workspace = true, features = ["transport", "tls"] }
Expand Down
2 changes: 1 addition & 1 deletion query_server/test/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ diff = { workspace = true }
reqwest = { workspace = true }
serde = { workspace = true }
snafu = { workspace = true }
tokio = { workspace = true, features = ["full"] }
tokio = { workspace = true, features = ["full", "tracing"] }
toml = { workspace = true }
walkdir = { workspace = true }
async-backtrace = { workspace = true, optional = true }
Expand Down
2 changes: 1 addition & 1 deletion tskv/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ sled = { workspace = true }
snafu = { workspace = true }
snap = { workspace = true }
static_assertions = { workspace = true }
tokio = { workspace = true, features = ["full"] }
tokio = { workspace = true, features = ["full", "tracing"] }
tokio-util = { workspace = true }
walkdir = { workspace = true }
zstd = { workspace = true }
Expand Down

0 comments on commit 9a42b66

Please sign in to comment.