Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: introduce new Output with OutputMeta #3466

Merged
merged 5 commits into from
Mar 11, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 4 additions & 4 deletions benchmarks/src/bin/nyc-taxi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use client::api::v1::column::Values;
use client::api::v1::{
Column, ColumnDataType, ColumnDef, CreateTableExpr, InsertRequest, InsertRequests, SemanticType,
};
use client::{Client, Database, Output, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use client::{Client, Database, OutputData, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use futures_util::TryStreamExt;
use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
Expand Down Expand Up @@ -502,9 +502,9 @@ async fn do_query(num_iter: usize, db: &Database, table_name: &str) {
for i in 0..num_iter {
let now = Instant::now();
let res = db.sql(&query).await.unwrap();
match res {
Output::AffectedRows(_) | Output::RecordBatches(_) => (),
Output::Stream(stream, _) => {
match res.data {
OutputData::AffectedRows(_) | OutputData::RecordBatches(_) => (),
OutputData::Stream(stream) => {
stream.try_collect::<Vec<_>>().await.unwrap();
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/client/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ impl Database {
reason: "Expect 'AffectedRows' Flight messages to be the one and the only!"
}
);
Ok(Output::AffectedRows(rows))
Ok(Output::new_with_affectedrows(rows))
}
FlightMessage::Recordbatch(_) | FlightMessage::Metrics(_) => {
IllegalFlightMessagesSnafu {
Expand Down Expand Up @@ -340,7 +340,7 @@ impl Database {
output_ordering: None,
metrics: Default::default(),
};
Ok(Output::new_stream(Box::pin(record_batch_stream)))
Ok(Output::new_with_stream(Box::pin(record_batch_stream)))
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use api::v1::greptime_response::Response;
use api::v1::{AffectedRows, GreptimeResponse};
pub use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_error::status_code::StatusCode;
pub use common_query::Output;
pub use common_query::{Output, OutputData, OutputMeta};
pub use common_recordbatch::{RecordBatches, SendableRecordBatchStream};
use snafu::OptionExt;

Expand Down
9 changes: 4 additions & 5 deletions src/cmd/src/cli/export.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ use async_trait::async_trait;
use clap::{Parser, ValueEnum};
use client::api::v1::auth_header::AuthScheme;
use client::api::v1::Basic;
use client::{Client, Database, DEFAULT_SCHEMA_NAME};
use common_query::Output;
use client::{Client, Database, OutputData, DEFAULT_SCHEMA_NAME};
use common_recordbatch::util::collect;
use common_telemetry::{debug, error, info, warn};
use datatypes::scalars::ScalarVector;
Expand Down Expand Up @@ -142,7 +141,7 @@ impl Export {
.with_context(|_| RequestDatabaseSnafu {
sql: "show databases".to_string(),
})?;
let Output::Stream(stream, _) = result else {
let OutputData::Stream(stream) = result.data else {
NotDataFromOutputSnafu.fail()?
};
let record_batch = collect(stream)
Expand Down Expand Up @@ -183,7 +182,7 @@ impl Export {
.sql(&sql)
.await
.with_context(|_| RequestDatabaseSnafu { sql })?;
let Output::Stream(stream, _) = result else {
let OutputData::Stream(stream) = result.data else {
NotDataFromOutputSnafu.fail()?
};
let Some(record_batch) = collect(stream)
Expand Down Expand Up @@ -235,7 +234,7 @@ impl Export {
.sql(&sql)
.await
.with_context(|_| RequestDatabaseSnafu { sql })?;
let Output::Stream(stream, _) = result else {
let OutputData::Stream(stream) = result.data else {
NotDataFromOutputSnafu.fail()?
};
let record_batch = collect(stream)
Expand Down
10 changes: 5 additions & 5 deletions src/cmd/src/cli/repl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::time::Instant;
use catalog::kvbackend::{
CachedMetaKvBackend, CachedMetaKvBackendBuilder, KvBackendCatalogManager,
};
use client::{Client, Database, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use client::{Client, Database, OutputData, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_base::Plugins;
use common_error::ext::ErrorExt;
use common_query::Output;
Expand Down Expand Up @@ -184,15 +184,15 @@ impl Repl {
}
.context(RequestDatabaseSnafu { sql: &sql })?;

let either = match output {
Output::Stream(s, _) => {
let either = match output.data {
OutputData::Stream(s) => {
let x = RecordBatches::try_collect(s)
.await
.context(CollectRecordBatchesSnafu)?;
Either::Left(x)
}
Output::RecordBatches(x) => Either::Left(x),
Output::AffectedRows(rows) => Either::Right(rows),
OutputData::RecordBatches(x) => Either::Left(x),
OutputData::AffectedRows(rows) => Either::Right(rows),
};

let end = Instant::now();
Expand Down
80 changes: 64 additions & 16 deletions src/common/query/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,38 +30,86 @@ pub mod prelude;
mod signature;
use sqlparser_derive::{Visit, VisitMut};

// sql output
pub enum Output {
/// new Output struct with output data(previously Output) and output meta
#[derive(Debug)]
pub struct Output {
pub data: OutputData,
pub meta: OutputMeta,
}

/// Original Output struct
/// carrying result data to response/client/user interface
pub enum OutputData {
AffectedRows(usize),
RecordBatches(RecordBatches),
Stream(SendableRecordBatchStream, Option<Arc<dyn PhysicalPlan>>),
Stream(SendableRecordBatchStream),
}

/// OutputMeta stores meta information produced/generated during the execution
#[derive(Debug, Default)]
pub struct OutputMeta {
pub plan: Option<Arc<dyn PhysicalPlan>>,
shuiyisong marked this conversation as resolved.
Show resolved Hide resolved
pub cost: usize,
}

impl Output {
// helper function to build original `Output::Stream`
pub fn new_stream(stream: SendableRecordBatchStream) -> Self {
Output::Stream(stream, None)
pub fn new_with_affectedrows(affected_rows: usize) -> Self {
Self {
data: OutputData::AffectedRows(affected_rows),
meta: Default::default(),
}
}

pub fn new_with_recordbatches(recordbatches: RecordBatches) -> Self {
shuiyisong marked this conversation as resolved.
Show resolved Hide resolved
Self {
data: OutputData::RecordBatches(recordbatches),
meta: Default::default(),
}
}

pub fn new_with_stream(stream: SendableRecordBatchStream) -> Self {
Self {
data: OutputData::Stream(stream),
meta: Default::default(),
}
}

pub fn new(data: OutputData, meta: OutputMeta) -> Self {
Self { data, meta }
}
}

impl Debug for Output {
impl Debug for OutputData {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
Output::AffectedRows(rows) => write!(f, "Output::AffectedRows({rows})"),
Output::RecordBatches(recordbatches) => {
write!(f, "Output::RecordBatches({recordbatches:?})")
OutputData::AffectedRows(rows) => write!(f, "OutputData::AffectedRows({rows})"),
OutputData::RecordBatches(recordbatches) => {
write!(f, "OutputData::RecordBatches({recordbatches:?})")
}
Output::Stream(_, df) => {
if df.is_some() {
write!(f, "Output::Stream(<stream>, Some<physical_plan>)")
} else {
write!(f, "Output::Stream(<stream>)")
}
OutputData::Stream(_) => {
write!(f, "OutputData::Stream(<stream>)")
}
}
}
}

impl OutputMeta {
pub fn new(plan: Option<Arc<dyn PhysicalPlan>>, cost: usize) -> Self {
Self { plan, cost }
}

pub fn new_with_plan(plan: Arc<dyn PhysicalPlan>) -> Self {
Self {
plan: Some(plan),
cost: 0,
}
}

pub fn new_with_cost(cost: usize) -> Self {
Self { plan: None, cost }
}
}

pub use datafusion::physical_plan::ExecutionPlan as DfPhysicalPlan;

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Visit, VisitMut)]
Expand Down
16 changes: 9 additions & 7 deletions src/common/test-util/src/recordbatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.

use client::Database;
use common_query::Output;
use common_query::OutputData;
use common_recordbatch::util;

pub enum ExpectedOutput<'a> {
Expand All @@ -23,22 +23,24 @@ pub enum ExpectedOutput<'a> {

pub async fn execute_and_check_output(db: &Database, sql: &str, expected: ExpectedOutput<'_>) {
let output = db.sql(sql).await.unwrap();
let output = output.data;

match (&output, expected) {
(Output::AffectedRows(x), ExpectedOutput::AffectedRows(y)) => {
(OutputData::AffectedRows(x), ExpectedOutput::AffectedRows(y)) => {
assert_eq!(*x, y, "actual: \n{}", x)
}
(Output::RecordBatches(_), ExpectedOutput::QueryResult(x))
| (Output::Stream(_, _), ExpectedOutput::QueryResult(x)) => {
(OutputData::RecordBatches(_), ExpectedOutput::QueryResult(x))
| (OutputData::Stream(_), ExpectedOutput::QueryResult(x)) => {
check_output_stream(output, x).await
}
_ => panic!(),
}
}

pub async fn check_output_stream(output: Output, expected: &str) {
pub async fn check_output_stream(output: OutputData, expected: &str) {
let recordbatches = match output {
Output::Stream(stream, _) => util::collect_batches(stream).await.unwrap(),
Output::RecordBatches(recordbatches) => recordbatches,
OutputData::Stream(stream) => util::collect_batches(stream).await.unwrap(),
OutputData::RecordBatches(recordbatches) => recordbatches,
_ => unreachable!(),
};
let pretty_print = recordbatches.pretty_print().unwrap();
Expand Down
8 changes: 4 additions & 4 deletions src/datanode/src/region_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use common_error::ext::BoxedError;
use common_error::status_code::StatusCode;
use common_query::logical_plan::Expr;
use common_query::physical_plan::DfPhysicalPlanAdapter;
use common_query::{DfPhysicalPlan, Output};
use common_query::{DfPhysicalPlan, OutputData};
use common_recordbatch::SendableRecordBatchStream;
use common_runtime::Runtime;
use common_telemetry::tracing::{self, info_span};
Expand Down Expand Up @@ -651,11 +651,11 @@ impl RegionServerInner {
.await
.context(ExecuteLogicalPlanSnafu)?;

match result {
Output::AffectedRows(_) | Output::RecordBatches(_) => {
match result.data {
OutputData::AffectedRows(_) | OutputData::RecordBatches(_) => {
UnsupportedOutputSnafu { expected: "stream" }.fail()
}
Output::Stream(stream, _) => Ok(stream),
OutputData::Stream(stream) => Ok(stream),
}
}

Expand Down
9 changes: 5 additions & 4 deletions src/frontend/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use api::v1::meta::Role;
use async_trait::async_trait;
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
use catalog::CatalogManagerRef;
use client::OutputData;
use common_base::Plugins;
use common_config::KvBackendConfig;
use common_error::ext::BoxedError;
Expand Down Expand Up @@ -401,13 +402,13 @@ impl SqlQueryHandler for Instance {

/// Attaches a timer to the output and observes it once the output is exhausted.
pub fn attach_timer(output: Output, timer: HistogramTimer) -> Output {
match output {
Output::AffectedRows(_) | Output::RecordBatches(_) => output,
Output::Stream(stream, plan) => {
match output.data {
OutputData::AffectedRows(_) | OutputData::RecordBatches(_) => output,
OutputData::Stream(stream) => {
let stream = OnDone::new(stream, move || {
timer.observe_duration();
});
Output::Stream(Box::pin(stream), plan)
Output::new(OutputData::Stream(Box::pin(stream)), output.meta)
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/instance/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ impl GrpcQueryHandler for Instance {
.statement_executor
.create_table_inner(&mut expr, None, &ctx)
.await?;
Output::AffectedRows(0)
Output::new_with_affectedrows(0)
}
DdlExpr::Alter(expr) => self.statement_executor.alter_table_inner(expr).await?,
DdlExpr::CreateDatabase(expr) => {
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/instance/opentsdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ impl OpentsdbProtocolHandler for Instance {
.map_err(BoxedError::new)
.context(servers::error::ExecuteGrpcQuerySnafu)?;

Ok(match output {
common_query::Output::AffectedRows(rows) => rows,
Ok(match output.data {
common_query::OutputData::AffectedRows(rows) => rows,
_ => unreachable!(),
})
}
Expand Down
3 changes: 2 additions & 1 deletion src/frontend/src/instance/prom_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use api::prom_store::remote::{Query, QueryResult, ReadRequest, ReadResponse, Wri
use api::v1::RowInsertRequests;
use async_trait::async_trait;
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
use client::OutputData;
use common_catalog::format_full_table_name;
use common_error::ext::BoxedError;
use common_query::prelude::GREPTIME_PHYSICAL_TABLE;
Expand Down Expand Up @@ -77,7 +78,7 @@ fn negotiate_response_type(accepted_response_types: &[i32]) -> ServerResult<Resp
}

async fn to_query_result(table_name: &str, output: Output) -> ServerResult<QueryResult> {
let Output::Stream(stream, _) = output else {
let OutputData::Stream(stream) = output.data else {
unreachable!()
};
let recordbatches = RecordBatches::try_collect(stream)
Expand Down
3 changes: 2 additions & 1 deletion src/operator/src/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ impl Deleter {
.await?;

let affected_rows = self.do_request(deletes, &ctx).await?;
Ok(Output::AffectedRows(affected_rows as _))

Ok(Output::new_with_affectedrows(affected_rows))
}

pub async fn handle_table_delete(
Expand Down
6 changes: 3 additions & 3 deletions src/operator/src/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ impl Inserter {
.await?;

let affected_rows = self.do_request(inserts, &ctx).await?;
Ok(Output::AffectedRows(affected_rows as _))
Ok(Output::new_with_affectedrows(affected_rows))
}

/// Handle row inserts request with metric engine.
Expand Down Expand Up @@ -149,7 +149,7 @@ impl Inserter {
.await?;

let affected_rows = self.do_request(inserts, &ctx).await?;
Ok(Output::AffectedRows(affected_rows as _))
Ok(Output::new_with_affectedrows(affected_rows))
}

pub async fn handle_table_insert(
Expand Down Expand Up @@ -185,7 +185,7 @@ impl Inserter {
.await?;

let affected_rows = self.do_request(inserts, ctx).await?;
Ok(Output::AffectedRows(affected_rows as _))
Ok(Output::new_with_affectedrows(affected_rows))
}
}

Expand Down