From c7a9991adb70f89f69249eadddeeac9442a341c3 Mon Sep 17 00:00:00 2001 From: sundyli <543950155@qq.com> Date: Mon, 3 Apr 2023 14:46:44 +0800 Subject: [PATCH 1/3] feat: flight sql support send progress into client --- src/query/catalog/src/table_context.rs | 3 + .../flight_sql/flight_sql_service/query.rs | 110 ++++++++++++++---- .../flight_sql/flight_sql_service/service.rs | 5 + .../flight_sql/flight_sql_service/session.rs | 15 ++- src/query/service/src/sessions/query_ctx.rs | 8 ++ .../service/src/sessions/query_ctx_shared.rs | 3 + .../service/src/sessions/session_status.rs | 2 + .../src/stream/processor_executor_stream.rs | 1 + src/query/sql/src/executor/table_read_plan.rs | 6 + 9 files changed, 127 insertions(+), 26 deletions(-) diff --git a/src/query/catalog/src/table_context.rs b/src/query/catalog/src/table_context.rs index 14498150aa1cd..373ff936dd866 100644 --- a/src/query/catalog/src/table_context.rs +++ b/src/query/catalog/src/table_context.rs @@ -78,6 +78,9 @@ pub trait TableContext: Send + Sync { /// This method builds a `dyn Table`, which provides table specific io methods the plan needs. fn build_table_from_source_plan(&self, plan: &DataSourcePlan) -> Result>; + fn incr_total_scan_value(&self, value: ProgressValues); + fn get_total_scan_value(&self) -> ProgressValues; + fn get_scan_progress(&self) -> Arc; fn get_scan_progress_value(&self) -> ProgressValues; fn get_write_progress(&self) -> Arc; diff --git a/src/query/service/src/servers/flight_sql/flight_sql_service/query.rs b/src/query/service/src/servers/flight_sql/flight_sql_service/query.rs index 5b3d0fe2c080f..e28f0c30e490a 100644 --- a/src/query/service/src/servers/flight_sql/flight_sql_service/query.rs +++ b/src/query/service/src/servers/flight_sql/flight_sql_service/query.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::atomic::AtomicBool; +use std::sync::atomic::Ordering; use std::sync::Arc; use arrow_flight::FlightData; @@ -19,7 +21,7 @@ use arrow_flight::SchemaAsIpc; use arrow_ipc::writer; use arrow_ipc::writer::IpcWriteOptions; use arrow_schema::Schema as ArrowSchema; -use async_stream::stream; +use common_base::base::tokio; use common_exception::ErrorCode; use common_exception::Result; use common_expression::DataBlock; @@ -28,7 +30,10 @@ use common_sql::plans::Plan; use common_sql::PlanExtras; use common_sql::Planner; use common_storages_fuse::TableContext; -use futures_util::StreamExt; +use futures::Stream; +use futures::StreamExt; +use serde::Deserialize; +use serde::Serialize; use tonic::Status; use super::status; @@ -98,13 +103,14 @@ impl FlightSqlServiceImpl { Ok(affected_rows as i64) } - #[async_backtrace::framed] pub(super) async fn execute_query( &self, session: Arc, plan: &Plan, plan_extras: &PlanExtras, ) -> Result { + let is_native_client = session.get_status().read().is_native_client; + let context = session .create_query_context() .await @@ -112,35 +118,97 @@ impl FlightSqlServiceImpl { context.attach_query_str(plan.to_string(), plan_extras.stament.to_mask_sql()); let interpreter = InterpreterFactory::get(context.clone(), plan).await?; + let data_schema = interpreter.schema(); - let schema_flight_data = Self::schema_to_flight_data((*data_schema).clone()); + let data_stream = interpreter.execute(context.clone()).await?; + + let is_finished = Arc::new(AtomicBool::new(false)); + let is_finished_clone = is_finished.clone(); + let (sender, receiver) = tokio::sync::mpsc::channel(2); + let _ = sender + .send(Ok(Self::schema_to_flight_data((*data_schema).clone()))) + .await; - let mut data_stream = interpreter.execute(context.clone()).await?; + let s1 = sender.clone(); + tokio::spawn(async move { + let mut data_stream = data_stream; - let stream = stream! { - yield Ok(schema_flight_data); while let Some(block) = data_stream.next().await { match block { Ok(block) => { - match Self::block_to_flight_data(block, &data_schema) { - Ok(flight_data) => { - yield Ok(flight_data) - } - Err(err) => { - yield Err(status!("Could not convert batches", err)) - } - } + let res = + match FlightSqlServiceImpl::block_to_flight_data(block, &data_schema) { + Ok(flight_data) => Ok(flight_data), + Err(err) => Err(status!("Could not convert batches", err)), + }; + + let _ = s1.send(res).await; } Err(err) => { - yield Err(status!("Could not convert batches", err)) + let _ = s1 + .send(Err(status!("Could not convert batches", err))) + .await; } - }; + } } + is_finished_clone.store(true, Ordering::SeqCst); + }); + + if is_native_client { + tokio::spawn(async move { + let total_scan_value = context.get_total_scan_value(); + let mut current_scan_value = context.get_scan_progress_value(); + + let mut interval = tokio::time::interval(tokio::time::Duration::from_millis(20)); + while !is_finished.load(Ordering::SeqCst) { + interval.tick().await; - // to hold session ref until stream is all consumed - let _ = session.get_id(); - }; + let progress = context.get_scan_progress_value(); + if progress.rows == current_scan_value.rows { + continue; + } + current_scan_value = progress; + + let progress = ProgressValue { + total_rows: total_scan_value.rows, + total_bytes: total_scan_value.bytes, + + read_rows: current_scan_value.rows, + read_bytes: current_scan_value.bytes, + }; + + let progress = serde_json::to_vec(&progress).unwrap(); + let progress_flight_data = FlightData { + app_metadata: vec![0x01].into(), + data_body: progress.into(), + ..Default::default() + }; + let _ = sender.send(Ok(progress_flight_data)).await; + } + }); + } + + fn receiver_to_stream( + receiver: tokio::sync::mpsc::Receiver, + ) -> impl Stream { + futures::stream::unfold(receiver, |mut receiver| async { + match receiver.recv().await { + Some(value) => Some((value, receiver)), + None => None, + } + }) + } - Ok(Box::pin(stream)) + let st = receiver_to_stream(receiver); + Ok(Box::pin(st)) } } + +#[derive(Serialize, Deserialize, Debug)] +struct ProgressValue { + pub total_rows: usize, + pub total_bytes: usize, + + pub read_rows: usize, + pub read_bytes: usize, +} diff --git a/src/query/service/src/servers/flight_sql/flight_sql_service/service.rs b/src/query/service/src/servers/flight_sql/flight_sql_service/service.rs index 04e83959a2e63..f09fab68f9e16 100644 --- a/src/query/service/src/servers/flight_sql/flight_sql_service/service.rs +++ b/src/query/service/src/servers/flight_sql/flight_sql_service/service.rs @@ -120,6 +120,7 @@ impl FlightSqlService for FlightSqlServiceImpl { Status, > { let remote_addr = request.remote_addr(); + let (user, password) = FlightSqlServiceImpl::get_user_password(request.metadata()) .map_err(Status::invalid_argument)?; let session = FlightSqlServiceImpl::auth_user_password(user, password, remote_addr).await?; @@ -136,6 +137,10 @@ impl FlightSqlService for FlightSqlServiceImpl { let metadata = MetadataValue::try_from(str) .map_err(|_| Status::internal("authorization not parsable"))?; resp.metadata_mut().insert("authorization", metadata); + + session.get_status().write().is_native_client = + FlightSqlServiceImpl::get_header_value(request.metadata(), "Bendsql").is_some(); + self.sessions.insert(token, session); Ok(resp) } diff --git a/src/query/service/src/servers/flight_sql/flight_sql_service/session.rs b/src/query/service/src/servers/flight_sql/flight_sql_service/session.rs index 455abf72bcfd0..6cbf903a1cbf7 100644 --- a/src/query/service/src/servers/flight_sql/flight_sql_service/session.rs +++ b/src/query/service/src/servers/flight_sql/flight_sql_service/session.rs @@ -54,13 +54,18 @@ impl FlightSqlServiceImpl { } } + pub(super) fn get_header_value(metadata: &MetadataMap, key: &str) -> Option { + metadata + .get(key) + .and_then(|v| v.to_str().ok()) + .map(|v| v.to_string()) + } + pub(super) fn get_user_password(metadata: &MetadataMap) -> Result<(String, String), String> { let basic = "Basic "; - let authorization = metadata - .get("authorization") - .ok_or("authorization field not present")? - .to_str() - .map_err(|e| format!("authorization not parsable: {}", e))?; + let authorization = Self::get_header_value(metadata, "authorization") + .ok_or_else(|| "authorization not parsable".to_string())?; + if !authorization.starts_with(basic) { return Err(format!("Auth type not implemented: {authorization}")); } diff --git a/src/query/service/src/sessions/query_ctx.rs b/src/query/service/src/sessions/query_ctx.rs index 9dbea6e9d3062..fe0c7678ae177 100644 --- a/src/query/service/src/sessions/query_ctx.rs +++ b/src/query/service/src/sessions/query_ctx.rs @@ -236,6 +236,14 @@ impl TableContext for QueryContext { } } + fn incr_total_scan_value(&self, value: ProgressValues) { + self.shared.total_scan_values.as_ref().incr(&value); + } + + fn get_total_scan_value(&self) -> ProgressValues { + self.shared.total_scan_values.as_ref().get_values() + } + fn get_scan_progress(&self) -> Arc { self.shared.scan_progress.clone() } diff --git a/src/query/service/src/sessions/query_ctx_shared.rs b/src/query/service/src/sessions/query_ctx_shared.rs index b94b863b3dcf6..3374a80ef90b5 100644 --- a/src/query/service/src/sessions/query_ctx_shared.rs +++ b/src/query/service/src/sessions/query_ctx_shared.rs @@ -55,6 +55,8 @@ type DatabaseAndTable = (String, String, String); /// FROM table_name_4; /// For each subquery, they will share a runtime, session, progress, init_query_id pub struct QueryContextShared { + /// total_scan_values for scan stats + pub(in crate::sessions) total_scan_values: Arc, /// scan_progress for scan metrics of datablocks (uncompressed) pub(in crate::sessions) scan_progress: Arc, /// write_progress for write/commit metrics of datablocks (uncompressed) @@ -96,6 +98,7 @@ impl QueryContextShared { catalog_manager: CatalogManager::instance(), data_operator: DataOperator::instance(), init_query_id: Arc::new(RwLock::new(Uuid::new_v4().to_string())), + total_scan_values: Arc::new(Progress::create()), scan_progress: Arc::new(Progress::create()), result_progress: Arc::new(Progress::create()), write_progress: Arc::new(Progress::create()), diff --git a/src/query/service/src/sessions/session_status.rs b/src/query/service/src/sessions/session_status.rs index 67a4582b1dcb2..2ded5ae4d57e3 100644 --- a/src/query/service/src/sessions/session_status.rs +++ b/src/query/service/src/sessions/session_status.rs @@ -17,6 +17,7 @@ use std::time::Instant; pub struct SessionStatus { pub session_started_at: Instant, pub last_query_finished_at: Option, + pub is_native_client: bool, } impl SessionStatus { @@ -35,6 +36,7 @@ impl Default for SessionStatus { SessionStatus { session_started_at: Instant::now(), last_query_finished_at: None, + is_native_client: false, } } } diff --git a/src/query/service/src/stream/processor_executor_stream.rs b/src/query/service/src/stream/processor_executor_stream.rs index 64aa770872c81..3dd85519ddc63 100644 --- a/src/query/service/src/stream/processor_executor_stream.rs +++ b/src/query/service/src/stream/processor_executor_stream.rs @@ -36,6 +36,7 @@ impl PullingExecutorStream { impl Stream for PullingExecutorStream { type Item = Result; + // The ctx can't be wake up, so we can't return Poll::Pending here fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { let self_ = Pin::get_mut(self); match self_.executor.pull_data() { diff --git a/src/query/sql/src/executor/table_read_plan.rs b/src/query/sql/src/executor/table_read_plan.rs index 1ae96a33dc9ea..4f3233b267ec7 100644 --- a/src/query/sql/src/executor/table_read_plan.rs +++ b/src/query/sql/src/executor/table_read_plan.rs @@ -15,6 +15,7 @@ use std::collections::BTreeMap; use std::sync::Arc; +use common_base::base::ProgressValues; use common_catalog::plan::DataSourcePlan; use common_catalog::plan::InternalColumn; use common_catalog::plan::PartStatistics; @@ -73,6 +74,11 @@ impl ToReadDataSourcePlan for dyn Table { self.read_partitions(ctx.clone(), push_downs.clone()).await }?; + ctx.incr_total_scan_value(ProgressValues { + rows: statistics.read_rows, + bytes: statistics.read_bytes, + }); + // We need the partition sha256 to specify the result cache. if ctx.get_settings().get_enable_query_result_cache()? { let sha = parts.compute_sha256()?; From e258ad607a5d74303a3a86abd0f815f1be55a98b Mon Sep 17 00:00:00 2001 From: sundyli <543950155@qq.com> Date: Mon, 3 Apr 2023 14:57:43 +0800 Subject: [PATCH 2/3] feat: flight sql support send progress into client --- .../src/servers/flight_sql/flight_sql_service/query.rs | 5 ++++- .../src/servers/flight_sql/flight_sql_service/service.rs | 2 +- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/src/query/service/src/servers/flight_sql/flight_sql_service/query.rs b/src/query/service/src/servers/flight_sql/flight_sql_service/query.rs index e28f0c30e490a..5a4111f41d8d7 100644 --- a/src/query/service/src/servers/flight_sql/flight_sql_service/query.rs +++ b/src/query/service/src/servers/flight_sql/flight_sql_service/query.rs @@ -42,6 +42,9 @@ use super::FlightSqlServiceImpl; use crate::interpreters::InterpreterFactory; use crate::sessions::Session; +/// A app_metakey which indicates the data is a progress type +static H_PROGRESS: u8 = 0x01; + impl FlightSqlServiceImpl { pub(crate) fn schema_to_flight_data(data_schema: DataSchema) -> FlightData { let arrow_schema = ArrowSchema::from(&data_schema); @@ -179,7 +182,7 @@ impl FlightSqlServiceImpl { let progress = serde_json::to_vec(&progress).unwrap(); let progress_flight_data = FlightData { - app_metadata: vec![0x01].into(), + app_metadata: vec![H_PROGRESS].into(), data_body: progress.into(), ..Default::default() }; diff --git a/src/query/service/src/servers/flight_sql/flight_sql_service/service.rs b/src/query/service/src/servers/flight_sql/flight_sql_service/service.rs index f09fab68f9e16..2b0d0bb34e2cb 100644 --- a/src/query/service/src/servers/flight_sql/flight_sql_service/service.rs +++ b/src/query/service/src/servers/flight_sql/flight_sql_service/service.rs @@ -139,7 +139,7 @@ impl FlightSqlService for FlightSqlServiceImpl { resp.metadata_mut().insert("authorization", metadata); session.get_status().write().is_native_client = - FlightSqlServiceImpl::get_header_value(request.metadata(), "Bendsql").is_some(); + FlightSqlServiceImpl::get_header_value(request.metadata(), "bendsql").is_some(); self.sessions.insert(token, session); Ok(resp) From 86a222d6a94891cbac4ac3e34837b6f3bd8218ad Mon Sep 17 00:00:00 2001 From: sundyli <543950155@qq.com> Date: Mon, 3 Apr 2023 15:11:35 +0800 Subject: [PATCH 3/3] feat: client support flight progress --- .../src/servers/flight_sql/flight_sql_service/query.rs | 5 +---- .../service/tests/it/storages/fuse/operations/commit.rs | 8 ++++++++ 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/src/query/service/src/servers/flight_sql/flight_sql_service/query.rs b/src/query/service/src/servers/flight_sql/flight_sql_service/query.rs index 5a4111f41d8d7..1efa4a046ca52 100644 --- a/src/query/service/src/servers/flight_sql/flight_sql_service/query.rs +++ b/src/query/service/src/servers/flight_sql/flight_sql_service/query.rs @@ -195,10 +195,7 @@ impl FlightSqlServiceImpl { receiver: tokio::sync::mpsc::Receiver, ) -> impl Stream { futures::stream::unfold(receiver, |mut receiver| async { - match receiver.recv().await { - Some(value) => Some((value, receiver)), - None => None, - } + receiver.recv().await.map(|value| (value, receiver)) }) } diff --git a/src/query/service/tests/it/storages/fuse/operations/commit.rs b/src/query/service/tests/it/storages/fuse/operations/commit.rs index 9baba65137ed7..478313d305aed 100644 --- a/src/query/service/tests/it/storages/fuse/operations/commit.rs +++ b/src/query/service/tests/it/storages/fuse/operations/commit.rs @@ -371,6 +371,14 @@ impl TableContext for CtxDelegation { todo!() } + fn incr_total_scan_value(&self, _value: ProgressValues) { + todo!() + } + + fn get_total_scan_value(&self) -> ProgressValues { + todo!() + } + fn get_scan_progress(&self) -> Arc { todo!() }