Skip to content

Commit

Permalink
chore(query): support custom query graceful shutdown timeout (#14712)
Browse files Browse the repository at this point in the history
* chore(query): support custom query graceful shutdown timeout

* chore(query): support custom query graceful shutdown timeout

* chore(query): support custom query graceful shutdown timeout

* chore(query): support custom query graceful shutdown timeout
  • Loading branch information
zhang2014 committed Feb 23, 2024
1 parent 11ee029 commit 5c46a27
Show file tree
Hide file tree
Showing 18 changed files with 56 additions and 41 deletions.
2 changes: 1 addition & 1 deletion docker/query-config.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[query]
max_active_sessions = 256
wait_timeout_mills = 5000
shutdown_wait_timeout_ms = 5000

flight_api_address = "0.0.0.0:9090"
admin_api_address = "0.0.0.0:8080"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

[query]
max_active_sessions = 256
wait_timeout_mills = 5000
shutdown_wait_timeout_ms = 5000

# Databend Query http address.
# For admin RESET API.
Expand Down
2 changes: 1 addition & 1 deletion scripts/ci/deploy/config/databend-query-node-1.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

[query]
max_active_sessions = 256
wait_timeout_mills = 5000
shutdown_wait_timeout_ms = 5000

# For flight rpc.
flight_api_address = "0.0.0.0:9091"
Expand Down
2 changes: 1 addition & 1 deletion scripts/ci/deploy/config/databend-query-node-2.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

[query]
max_active_sessions = 256
wait_timeout_mills = 5000
shutdown_wait_timeout_ms = 5000

# For flight rpc.
flight_api_address = "0.0.0.0:9092"
Expand Down
2 changes: 1 addition & 1 deletion scripts/ci/deploy/config/databend-query-node-3.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

[query]
max_active_sessions = 256
wait_timeout_mills = 5000
shutdown_wait_timeout_ms = 5000

# For flight rpc.
flight_api_address = "0.0.0.0:9093"
Expand Down
2 changes: 1 addition & 1 deletion scripts/ci/deploy/config/databend-query-node-hive.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

[query]
max_active_sessions = 256
wait_timeout_mills = 5000
shutdown_wait_timeout_ms = 5000

# For flight rpc.
flight_api_address = "0.0.0.0:9090"
Expand Down
2 changes: 1 addition & 1 deletion scripts/ci/deploy/config/databend-query-node-native.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

[query]
max_active_sessions = 256
wait_timeout_mills = 5000
shutdown_wait_timeout_ms = 5000

# For flight rpc.
flight_api_address = "0.0.0.0:9091"
Expand Down
2 changes: 1 addition & 1 deletion scripts/ci/deploy/config/databend-query-node-share-1.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

[query]
max_active_sessions = 256
wait_timeout_mills = 5000
shutdown_wait_timeout_ms = 5000

# For flight rpc.
flight_api_address = "0.0.0.0:19091"
Expand Down
2 changes: 1 addition & 1 deletion scripts/ci/deploy/config/databend-query-node-share-2.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

[query]
max_active_sessions = 256
wait_timeout_mills = 5000
shutdown_wait_timeout_ms = 5000

# For flight rpc.
flight_api_address = "0.0.0.0:29091"
Expand Down
2 changes: 1 addition & 1 deletion scripts/ci/deploy/config/databend-query-node-share-3.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

[query]
max_active_sessions = 256
wait_timeout_mills = 5000
shutdown_wait_timeout_ms = 5000

# For flight rpc.
flight_api_address = "0.0.0.0:19391"
Expand Down
2 changes: 1 addition & 1 deletion scripts/distribution/configs/databend-query.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

[query]
max_active_sessions = 256
wait_timeout_mills = 5000
shutdown_wait_timeout_ms = 5000

# Internal flight rpc for cluster communication.
flight_api_address = "0.0.0.0:9091"
Expand Down
7 changes: 6 additions & 1 deletion src/binaries/query/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use std::env;
use std::time::Duration;

use databend_common_base::mem_allocator::GlobalAllocator;
use databend_common_base::runtime::GLOBAL_MEM_STAT;
Expand Down Expand Up @@ -348,7 +349,11 @@ pub async fn start_services(conf: &InnerConfig) -> Result<()> {
// for one shot background service, we need to drop it manually.
drop(shutdown_handle);
} else {
shutdown_handle.wait_for_termination_request().await;
let graceful_shutdown_timeout =
Some(Duration::from_millis(conf.query.shutdown_wait_timeout_ms));
shutdown_handle
.wait_for_termination_request(graceful_shutdown_timeout)
.await;
}
info!("Shutdown server.");
Ok(())
Expand Down
6 changes: 3 additions & 3 deletions src/query/config/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1490,7 +1490,7 @@ pub struct QueryConfig {
pub table_engine_memory_enabled: bool,

#[clap(long, value_name = "VALUE", default_value = "5000")]
pub wait_timeout_mills: u64,
pub shutdown_wait_timeout_ms: u64,

#[clap(long, value_name = "VALUE", default_value = "10000")]
pub max_query_log_size: usize,
Expand Down Expand Up @@ -1703,7 +1703,7 @@ impl TryInto<InnerQueryConfig> for QueryConfig {
rpc_tls_query_service_domain_name: self.rpc_tls_query_service_domain_name,
rpc_client_timeout_secs: self.rpc_client_timeout_secs,
table_engine_memory_enabled: self.table_engine_memory_enabled,
wait_timeout_mills: self.wait_timeout_mills,
shutdown_wait_timeout_ms: self.shutdown_wait_timeout_ms,
max_query_log_size: self.max_query_log_size,
databend_enterprise_license: self.databend_enterprise_license,
management_mode: self.management_mode,
Expand Down Expand Up @@ -1787,7 +1787,7 @@ impl From<InnerQueryConfig> for QueryConfig {
rpc_tls_query_service_domain_name: inner.rpc_tls_query_service_domain_name,
rpc_client_timeout_secs: inner.rpc_client_timeout_secs,
table_engine_memory_enabled: inner.table_engine_memory_enabled,
wait_timeout_mills: inner.wait_timeout_mills,
shutdown_wait_timeout_ms: inner.shutdown_wait_timeout_ms,
max_query_log_size: inner.max_query_log_size,
databend_enterprise_license: inner.databend_enterprise_license,
management_mode: inner.management_mode,
Expand Down
5 changes: 3 additions & 2 deletions src/query/config/src/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,8 @@ pub struct QueryConfig {
pub rpc_client_timeout_secs: u64,
/// Table engine memory enabled
pub table_engine_memory_enabled: bool,
pub wait_timeout_mills: u64,
/// Graceful shutdown timeout
pub shutdown_wait_timeout_ms: u64,
pub max_query_log_size: usize,
pub databend_enterprise_license: Option<String>,
/// If in management mode, only can do some meta level operations(database/table/user/stage etc.) with metasrv.
Expand Down Expand Up @@ -269,7 +270,7 @@ impl Default for QueryConfig {
rpc_tls_query_service_domain_name: "localhost".to_string(),
rpc_client_timeout_secs: 0,
table_engine_memory_enabled: true,
wait_timeout_mills: 5000,
shutdown_wait_timeout_ms: 5000,
max_query_log_size: 10_000,
databend_enterprise_license: None,
management_mode: false,
Expand Down
11 changes: 6 additions & 5 deletions src/query/service/src/servers/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::net::SocketAddr;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;

use databend_common_base::base::signal_stream;
use databend_common_base::base::DummySignalStream;
Expand Down Expand Up @@ -67,17 +68,17 @@ impl ShutdownHandle {
}

#[async_backtrace::framed]
pub async fn shutdown(&mut self, mut signal: SignalStream) {
pub async fn shutdown(&mut self, mut signal: SignalStream, timeout: Option<Duration>) {
self.shutdown_services(true).await;
ClusterDiscovery::instance()
.unregister_to_metastore(&mut signal)
.await;
self.sessions.graceful_shutdown(signal, 5).await;
self.sessions.graceful_shutdown(signal, timeout).await;
self.shutdown_services(false).await;
}

#[async_backtrace::framed]
pub async fn wait_for_termination_request(&mut self) {
pub async fn wait_for_termination_request(&mut self, timeout: Option<Duration>) {
match signal_stream() {
Err(cause) => {
error!("Cannot set shutdown signal handler, {:?}", cause);
Expand All @@ -91,7 +92,7 @@ impl ShutdownHandle {
self.shutdown
.compare_exchange(false, true, Ordering::SeqCst, Ordering::Acquire)
{
let shutdown_services = self.shutdown(stream);
let shutdown_services = self.shutdown(stream, timeout);
shutdown_services.await;
}
}
Expand All @@ -110,7 +111,7 @@ impl Drop for ShutdownHandle {
.compare_exchange(false, true, Ordering::SeqCst, Ordering::Acquire)
{
let signal_stream = DummySignalStream::create(SignalType::Exit);
futures::executor::block_on(self.shutdown(signal_stream));
futures::executor::block_on(self.shutdown(signal_stream, Some(Duration::from_secs(5))));
}
}
}
42 changes: 25 additions & 17 deletions src/query/service/src/sessions/session_mgr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,27 +200,35 @@ impl SessionManager {
pub fn graceful_shutdown(
&self,
mut signal: SignalStream,
timeout_secs: i32,
timeout: Option<Duration>,
) -> impl Future<Output = ()> {
let active_sessions = self.active_sessions.clone();
async move {
info!(
"Waiting {} secs for connections to close. You can press Ctrl + C again to force shutdown.",
timeout_secs
);
let mut signal = Box::pin(signal.next());

for _index in 0..timeout_secs {
if SessionManager::destroy_idle_sessions(&active_sessions) {
return;
}
if let Some(mut timeout) = timeout {
info!(
"Waiting {:?} for connections to close. You can press Ctrl + C again to force shutdown.",
timeout
);

let mut signal = Box::pin(signal.next());

let interval = Duration::from_secs(1);
let sleep = Box::pin(tokio::time::sleep(interval));
match futures::future::select(sleep, signal).await {
Either::Right((_, _)) => break,
Either::Left((_, reserve_signal)) => signal = reserve_signal,
};
while !timeout.is_zero() {
if SessionManager::destroy_idle_sessions(&active_sessions) {
return;
}

let interval = Duration::from_secs(1);
let sleep = Box::pin(tokio::time::sleep(interval));
match futures::future::select(sleep, signal).await {
Either::Right((_, _)) => break,
Either::Left((_, reserve_signal)) => signal = reserve_signal,
};

timeout = match timeout > Duration::from_secs(1) {
true => timeout - Duration::from_secs(1),
false => Duration::from_secs(0),
};
}
}

info!("Will shutdown forcefully.");
Expand Down
2 changes: 1 addition & 1 deletion src/query/service/tests/it/configs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -735,7 +735,7 @@ rpc_tls_server_key = ""
rpc_tls_query_server_root_ca_cert = ""
rpc_tls_query_service_domain_name = "localhost"
table_engine_memory_enabled = true
wait_timeout_mills = 5000
shutdown_wait_timeout_ms = 5000
max_query_log_size = 10000
management_mode = false
jwt_key_file = ""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,11 +121,11 @@ DB.Table: 'system'.'configs', Table: configs-table_id:1, ver:0, Engine: SystemCo
| 'query' | 'rpc_tls_server_key' | '' | '' |
| 'query' | 'share_endpoint_address' | '' | '' |
| 'query' | 'share_endpoint_auth_token_file' | '' | '' |
| 'query' | 'shutdown_wait_timeout_ms' | '5000' | '' |
| 'query' | 'table_engine_memory_enabled' | 'true' | '' |
| 'query' | 'tenant_id' | 'test' | '' |
| 'query' | 'udf_server_allow_list' | '' | '' |
| 'query' | 'users' | '{"name":"root","auth_type":"no_password","auth_string":null}' | '' |
| 'query' | 'wait_timeout_mills' | '5000' | '' |
| 'storage' | 'allow_insecure' | 'false' | '' |
| 'storage' | 'azblob.account_key' | '' | '' |
| 'storage' | 'azblob.account_name' | '' | '' |
Expand Down

0 comments on commit 5c46a27

Please sign in to comment.