Skip to content

Commit

Permalink
threadpool, coprocessor: set stack size to 10 MB (tikv#2431)
Browse files Browse the repository at this point in the history
  • Loading branch information
BusyJay committed Oct 28, 2017
1 parent 49229e8 commit 64bffdf
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 7 deletions.
3 changes: 3 additions & 0 deletions etc/config-template.toml
Expand Up @@ -35,6 +35,9 @@
# max count of tasks being handled, new tasks will be rejected.
# end-point-max-tasks = 2000

# stack size of endpoint, complicated tasks may involve very deep recursion.
# end-point-stack-size = "10MB"

# set attributes about this server, e.g. { zone = "us-west-1", disk = "ssd" }.
# labels = {}

Expand Down
3 changes: 3 additions & 0 deletions src/coprocessor/endpoint.rs
Expand Up @@ -175,16 +175,19 @@ impl Host {
thd_name!("endpoint-normal-pool"),
CopContextFactory { sender: r.clone() },
).thread_count(cfg.end_point_concurrency)
.stack_size(cfg.end_point_stack_size.0 as usize)
.build(),
low_priority_pool: ThreadPoolBuilder::new(
thd_name!("endpoint-low-pool"),
CopContextFactory { sender: r.clone() },
).thread_count(cfg.end_point_concurrency)
.stack_size(cfg.end_point_stack_size.0 as usize)
.build(),
high_priority_pool: ThreadPoolBuilder::new(
thd_name!("endpoint-high-pool"),
CopContextFactory { sender: r.clone() },
).thread_count(cfg.end_point_concurrency)
.stack_size(cfg.end_point_stack_size.0 as usize)
.build(),
}
}
Expand Down
17 changes: 17 additions & 0 deletions src/server/config.rs
Expand Up @@ -32,6 +32,9 @@ const DEFAULT_GRPC_CONCURRENT_STREAM: usize = 1024;
const DEFAULT_GRPC_RAFT_CONN_NUM: usize = 10;
const DEFAULT_GRPC_STREAM_INITIAL_WINDOW_SIZE: u64 = 2 * 1024 * 1024;
const DEFAULT_MESSAGES_PER_TICK: usize = 4096;
// Enpoints may occur very deep recursion,
// so enlarge their stack size to 10 MB.
const DEFAULT_ENDPOINT_STACK_SIZE_MB: u64 = 10;

// Assume a request can be finished in 1ms, a request at position x will wait about
// 0.001 * x secs to be actual started. A server-is-busy error will trigger 2 seconds
Expand Down Expand Up @@ -60,6 +63,7 @@ pub struct Config {
pub grpc_stream_initial_window_size: ReadableSize,
pub end_point_concurrency: usize,
pub end_point_max_tasks: usize,
pub end_point_stack_size: ReadableSize,
// Server labels to specify some attributes about this server.
#[serde(with = "config::order_map_serde")]
pub labels: HashMap<String, String>,
Expand All @@ -86,6 +90,7 @@ impl Default for Config {
grpc_stream_initial_window_size: ReadableSize(DEFAULT_GRPC_STREAM_INITIAL_WINDOW_SIZE),
end_point_concurrency: concurrency,
end_point_max_tasks: DEFAULT_MAX_RUNNING_TASK_COUNT,
end_point_stack_size: ReadableSize::mb(DEFAULT_ENDPOINT_STACK_SIZE_MB),
}
}
}
Expand Down Expand Up @@ -114,6 +119,14 @@ impl Config {
return Err(box_err!("server.end-point-max-tasks should not be 0."));
}

// 2MB is the default stack size for threads in rust, but endpoints may occur
// very deep recursion, 2MB considered too small.
//
// See more: https://doc.rust-lang.org/std/thread/struct.Builder.html#method.stack_size
if self.end_point_stack_size.0 < ReadableSize::mb(2).0 {
return Err(box_err!("server.end-point-stack-size is too small."));
}

for (k, v) in &self.labels {
validate_label(k, "key")?;
validate_label(v, "value")?;
Expand Down Expand Up @@ -169,6 +182,10 @@ mod tests {
invalid_cfg.end_point_concurrency = 0;
assert!(invalid_cfg.validate().is_err());

let mut invalid_cfg = cfg.clone();
invalid_cfg.end_point_stack_size = ReadableSize::mb(1);
assert!(invalid_cfg.validate().is_err());

let mut invalid_cfg = cfg.clone();
invalid_cfg.end_point_max_tasks = 0;
assert!(invalid_cfg.validate().is_err());
Expand Down
24 changes: 17 additions & 7 deletions src/util/threadpool.rs
Expand Up @@ -96,6 +96,7 @@ pub struct ThreadPoolBuilder<C, F> {
name: String,
thread_count: usize,
tasks_per_tick: usize,
stack_size: Option<usize>,
factory: F,
_ctx: PhantomData<C>,
}
Expand All @@ -112,6 +113,7 @@ impl<C: Context + 'static, F: ContextFactory<C>> ThreadPoolBuilder<C, F> {
name: name,
thread_count: DEFAULT_THREAD_COUNT,
tasks_per_tick: DEFAULT_TASKS_PER_TICK,
stack_size: None,
factory: factory,
_ctx: PhantomData,
}
Expand All @@ -127,11 +129,17 @@ impl<C: Context + 'static, F: ContextFactory<C>> ThreadPoolBuilder<C, F> {
self
}

pub fn stack_size(mut self, size: usize) -> ThreadPoolBuilder<C, F> {
self.stack_size = Some(size);
self
}

pub fn build(self) -> ThreadPool<C> {
ThreadPool::new(
self.name,
self.thread_count,
self.tasks_per_tick,
self.stack_size,
self.factory,
)
}
Expand Down Expand Up @@ -160,6 +168,7 @@ where
name: String,
num_threads: usize,
tasks_per_tick: usize,
stack_size: Option<usize>,
f: C,
) -> ThreadPool<Ctx> {
assert!(num_threads >= 1);
Expand All @@ -175,13 +184,14 @@ where
let state = state.clone();
let task_num = task_count.clone();
let ctx = f.create();
let thread = Builder::new()
.name(name.clone())
.spawn(move || {
let mut worker = Worker::new(state, task_num, tasks_per_tick, ctx);
worker.run();
})
.unwrap();
let mut tb = Builder::new().name(name.clone());
if let Some(stack_size) = stack_size {
tb = tb.stack_size(stack_size);
}
let thread = tb.spawn(move || {
let mut worker = Worker::new(state, task_num, tasks_per_tick, ctx);
worker.run();
}).unwrap();
threads.push(thread);
}

Expand Down
1 change: 1 addition & 0 deletions tests/config/mod.rs
Expand Up @@ -63,6 +63,7 @@ fn test_serde_custom_tikv_config() {
grpc_stream_initial_window_size: ReadableSize(12_345),
end_point_concurrency: 12,
end_point_max_tasks: 12,
end_point_stack_size: ReadableSize::mb(12),
};
value.metric = MetricConfig {
interval: ReadableDuration::secs(12),
Expand Down
1 change: 1 addition & 0 deletions tests/config/test-custom.toml
Expand Up @@ -12,6 +12,7 @@ grpc-raft-conn-num = 123
grpc-stream-initial-window-size = 12345
end-point-concurrency = 12
end-point-max-tasks = 12
end-point-stack-size = "12MB"

[server.labels]
a = "b"
Expand Down

0 comments on commit 64bffdf

Please sign in to comment.