Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/query/catalog/src/table_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ pub trait TableContext: Send + Sync {
/// This method builds a `dyn Table`, which provides table specific io methods the plan needs.
fn build_table_from_source_plan(&self, plan: &DataSourcePlan) -> Result<Arc<dyn Table>>;

fn incr_total_scan_value(&self, value: ProgressValues);
fn get_total_scan_value(&self) -> ProgressValues;

fn get_scan_progress(&self) -> Arc<Progress>;
fn get_scan_progress_value(&self) -> ProgressValues;
fn get_write_progress(&self) -> Arc<Progress>;
Expand Down
110 changes: 89 additions & 21 deletions src/query/service/src/servers/flight_sql/flight_sql_service/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;

use arrow_flight::FlightData;
use arrow_flight::SchemaAsIpc;
use arrow_ipc::writer;
use arrow_ipc::writer::IpcWriteOptions;
use arrow_schema::Schema as ArrowSchema;
use async_stream::stream;
use common_base::base::tokio;
use common_exception::ErrorCode;
use common_exception::Result;
use common_expression::DataBlock;
Expand All @@ -28,7 +30,10 @@ use common_sql::plans::Plan;
use common_sql::PlanExtras;
use common_sql::Planner;
use common_storages_fuse::TableContext;
use futures_util::StreamExt;
use futures::Stream;
use futures::StreamExt;
use serde::Deserialize;
use serde::Serialize;
use tonic::Status;

use super::status;
Expand All @@ -37,6 +42,9 @@ use super::FlightSqlServiceImpl;
use crate::interpreters::InterpreterFactory;
use crate::sessions::Session;

/// Marker byte placed in `FlightData::app_metadata` to indicate that the
/// message body carries progress information rather than result data.
static H_PROGRESS: u8 = 0x01;

impl FlightSqlServiceImpl {
pub(crate) fn schema_to_flight_data(data_schema: DataSchema) -> FlightData {
let arrow_schema = ArrowSchema::from(&data_schema);
Expand Down Expand Up @@ -98,49 +106,109 @@ impl FlightSqlServiceImpl {
Ok(affected_rows as i64)
}

#[async_backtrace::framed]
pub(super) async fn execute_query(
&self,
session: Arc<Session>,
plan: &Plan,
plan_extras: &PlanExtras,
) -> Result<DoGetStream> {
let is_native_client = session.get_status().read().is_native_client;

let context = session
.create_query_context()
.await
.map_err(|e| status!("Could not create_query_context", e))?;

context.attach_query_str(plan.to_string(), plan_extras.stament.to_mask_sql());
let interpreter = InterpreterFactory::get(context.clone(), plan).await?;

let data_schema = interpreter.schema();
let schema_flight_data = Self::schema_to_flight_data((*data_schema).clone());
let data_stream = interpreter.execute(context.clone()).await?;

let mut data_stream = interpreter.execute(context.clone()).await?;
let is_finished = Arc::new(AtomicBool::new(false));
let is_finished_clone = is_finished.clone();
let (sender, receiver) = tokio::sync::mpsc::channel(2);
let _ = sender
.send(Ok(Self::schema_to_flight_data((*data_schema).clone())))
.await;

let s1 = sender.clone();
tokio::spawn(async move {
let mut data_stream = data_stream;

let stream = stream! {
yield Ok(schema_flight_data);
while let Some(block) = data_stream.next().await {
match block {
Ok(block) => {
match Self::block_to_flight_data(block, &data_schema) {
Ok(flight_data) => {
yield Ok(flight_data)
}
Err(err) => {
yield Err(status!("Could not convert batches", err))
}
}
let res =
match FlightSqlServiceImpl::block_to_flight_data(block, &data_schema) {
Ok(flight_data) => Ok(flight_data),
Err(err) => Err(status!("Could not convert batches", err)),
};

let _ = s1.send(res).await;
}
Err(err) => {
yield Err(status!("Could not convert batches", err))
let _ = s1
.send(Err(status!("Could not convert batches", err)))
.await;
}
};
}
}
is_finished_clone.store(true, Ordering::SeqCst);
});

// to hold session ref until stream is all consumed
let _ = session.get_id();
};
if is_native_client {
tokio::spawn(async move {
let total_scan_value = context.get_total_scan_value();
let mut current_scan_value = context.get_scan_progress_value();

Ok(Box::pin(stream))
let mut interval = tokio::time::interval(tokio::time::Duration::from_millis(20));
while !is_finished.load(Ordering::SeqCst) {
interval.tick().await;

let progress = context.get_scan_progress_value();
if progress.rows == current_scan_value.rows {
continue;
}
current_scan_value = progress;

let progress = ProgressValue {
total_rows: total_scan_value.rows,
total_bytes: total_scan_value.bytes,

read_rows: current_scan_value.rows,
read_bytes: current_scan_value.bytes,
};

let progress = serde_json::to_vec(&progress).unwrap();
let progress_flight_data = FlightData {
app_metadata: vec![H_PROGRESS].into(),
data_body: progress.into(),
..Default::default()
};
let _ = sender.send(Ok(progress_flight_data)).await;
}
});
}

fn receiver_to_stream<T>(
receiver: tokio::sync::mpsc::Receiver<T>,
) -> impl Stream<Item = T> {
futures::stream::unfold(receiver, |mut receiver| async {
receiver.recv().await.map(|value| (value, receiver))
})
}

let st = receiver_to_stream(receiver);
Ok(Box::pin(st))
}
}

/// Progress snapshot that `execute_query` serializes to JSON and sends as the
/// body of a progress `FlightData` message.
#[derive(Serialize, Deserialize, Debug)]
struct ProgressValue {
// Totals are read once from `get_total_scan_value()` when the progress task starts.
pub total_rows: usize,
pub total_bytes: usize,

// Current values come from `get_scan_progress_value()` at the time of the report.
pub read_rows: usize,
pub read_bytes: usize,
}
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ impl FlightSqlService for FlightSqlServiceImpl {
Status,
> {
let remote_addr = request.remote_addr();

let (user, password) = FlightSqlServiceImpl::get_user_password(request.metadata())
.map_err(Status::invalid_argument)?;
let session = FlightSqlServiceImpl::auth_user_password(user, password, remote_addr).await?;
Expand All @@ -136,6 +137,10 @@ impl FlightSqlService for FlightSqlServiceImpl {
let metadata = MetadataValue::try_from(str)
.map_err(|_| Status::internal("authorization not parsable"))?;
resp.metadata_mut().insert("authorization", metadata);

session.get_status().write().is_native_client =
FlightSqlServiceImpl::get_header_value(request.metadata(), "bendsql").is_some();

self.sessions.insert(token, session);
Ok(resp)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,18 @@ impl FlightSqlServiceImpl {
}
}

/// Looks up `key` in the request metadata and returns its value as an owned
/// `String`.
///
/// Yields `None` when the key is absent or when `to_str()` rejects the value.
pub(super) fn get_header_value(metadata: &MetadataMap, key: &str) -> Option<String> {
    let raw = metadata.get(key)?;
    let text = raw.to_str().ok()?;
    Some(text.to_string())
}

pub(super) fn get_user_password(metadata: &MetadataMap) -> Result<(String, String), String> {
let basic = "Basic ";
let authorization = metadata
.get("authorization")
.ok_or("authorization field not present")?
.to_str()
.map_err(|e| format!("authorization not parsable: {}", e))?;
let authorization = Self::get_header_value(metadata, "authorization")
.ok_or_else(|| "authorization not parsable".to_string())?;

if !authorization.starts_with(basic) {
return Err(format!("Auth type not implemented: {authorization}"));
}
Expand Down
8 changes: 8 additions & 0 deletions src/query/service/src/sessions/query_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,14 @@ impl TableContext for QueryContext {
}
}

/// Adds `value` to the shared total-scan counters of this query.
fn incr_total_scan_value(&self, value: ProgressValues) {
    let totals = &*self.shared.total_scan_values;
    totals.incr(&value);
}

/// Returns the current values of the shared total-scan counters.
fn get_total_scan_value(&self) -> ProgressValues {
    let totals = &*self.shared.total_scan_values;
    totals.get_values()
}

/// Hands out another handle to the shared scan-progress tracker.
fn get_scan_progress(&self) -> Arc<Progress> {
    Arc::clone(&self.shared.scan_progress)
}
Expand Down
3 changes: 3 additions & 0 deletions src/query/service/src/sessions/query_ctx_shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ type DatabaseAndTable = (String, String, String);
/// FROM table_name_4;
/// For each subquery, they will share a runtime, session, progress, init_query_id
pub struct QueryContextShared {
/// total_scan_values for scan stats
pub(in crate::sessions) total_scan_values: Arc<Progress>,
/// scan_progress for scan metrics of datablocks (uncompressed)
pub(in crate::sessions) scan_progress: Arc<Progress>,
/// write_progress for write/commit metrics of datablocks (uncompressed)
Expand Down Expand Up @@ -96,6 +98,7 @@ impl QueryContextShared {
catalog_manager: CatalogManager::instance(),
data_operator: DataOperator::instance(),
init_query_id: Arc::new(RwLock::new(Uuid::new_v4().to_string())),
total_scan_values: Arc::new(Progress::create()),
scan_progress: Arc::new(Progress::create()),
result_progress: Arc::new(Progress::create()),
write_progress: Arc::new(Progress::create()),
Expand Down
2 changes: 2 additions & 0 deletions src/query/service/src/sessions/session_status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::time::Instant;
/// Mutable per-session status flags and timestamps.
pub struct SessionStatus {
// Initialized to `Instant::now()` by `Default`.
pub session_started_at: Instant,
// `None` by default; presumably set when a query finishes — confirm against the `impl`.
pub last_query_finished_at: Option<Instant>,
// `false` by default; the Flight SQL handshake sets it when the request
// metadata contains a `bendsql` header. `execute_query` reads it to decide
// whether to emit progress messages.
pub is_native_client: bool,
}

impl SessionStatus {
Expand All @@ -35,6 +36,7 @@ impl Default for SessionStatus {
SessionStatus {
session_started_at: Instant::now(),
last_query_finished_at: None,
is_native_client: false,
}
}
}
1 change: 1 addition & 0 deletions src/query/service/src/stream/processor_executor_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ impl PullingExecutorStream {
impl Stream for PullingExecutorStream {
type Item = Result<DataBlock>;

// The ctx can't be woken up, so we can't return Poll::Pending here
fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let self_ = Pin::get_mut(self);
match self_.executor.pull_data() {
Expand Down
8 changes: 8 additions & 0 deletions src/query/service/tests/it/storages/fuse/operations/commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,14 @@ impl TableContext for CtxDelegation {
todo!()
}

// Stub: this delegation does not implement total-scan accounting; calling it panics via `todo!()`.
fn incr_total_scan_value(&self, _value: ProgressValues) {
todo!()
}

// Stub: this delegation does not implement total-scan accounting; calling it panics via `todo!()`.
fn get_total_scan_value(&self) -> ProgressValues {
todo!()
}

// Stub: this delegation does not provide a scan-progress tracker; calling it panics via `todo!()`.
fn get_scan_progress(&self) -> Arc<Progress> {
todo!()
}
Expand Down
6 changes: 6 additions & 0 deletions src/query/sql/src/executor/table_read_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use std::collections::BTreeMap;
use std::sync::Arc;

use common_base::base::ProgressValues;
use common_catalog::plan::DataSourcePlan;
use common_catalog::plan::InternalColumn;
use common_catalog::plan::PartStatistics;
Expand Down Expand Up @@ -73,6 +74,11 @@ impl ToReadDataSourcePlan for dyn Table {
self.read_partitions(ctx.clone(), push_downs.clone()).await
}?;

ctx.incr_total_scan_value(ProgressValues {
rows: statistics.read_rows,
bytes: statistics.read_bytes,
});

// We need the partition sha256 to specify the result cache.
if ctx.get_settings().get_enable_query_result_cache()? {
let sha = parts.compute_sha256()?;
Expand Down