refactor: introduce new Output with OutputMeta (#3466)
* refactor: introduce new output struct

* chore: add helper function

* chore: update comment

* chore: update commit

Co-authored-by: Ruihang Xia <waynestxia@gmail.com>

* chore: rename according to cr

---------

Co-authored-by: Ruihang Xia <waynestxia@gmail.com>
shuiyisong and waynexia committed Mar 11, 2024
1 parent 8c37c3f commit 0bb9497
Showing 54 changed files with 807 additions and 592 deletions.
8 changes: 4 additions & 4 deletions benchmarks/src/bin/nyc-taxi.rs
@@ -29,7 +29,7 @@ use client::api::v1::column::Values;
 use client::api::v1::{
     Column, ColumnDataType, ColumnDef, CreateTableExpr, InsertRequest, InsertRequests, SemanticType,
 };
-use client::{Client, Database, Output, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
+use client::{Client, Database, OutputData, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
 use futures_util::TryStreamExt;
 use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
 use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
@@ -502,9 +502,9 @@ async fn do_query(num_iter: usize, db: &Database, table_name: &str) {
     for i in 0..num_iter {
         let now = Instant::now();
         let res = db.sql(&query).await.unwrap();
-        match res {
-            Output::AffectedRows(_) | Output::RecordBatches(_) => (),
-            Output::Stream(stream, _) => {
+        match res.data {
+            OutputData::AffectedRows(_) | OutputData::RecordBatches(_) => (),
+            OutputData::Stream(stream) => {
                 stream.try_collect::<Vec<_>>().await.unwrap();
             }
         }
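For callers, the migration is mechanical: match on `output.data` instead of on `Output` itself, and drop the second field from the `Stream` pattern, since the physical plan moved to `output.meta`. A minimal sketch of the pattern, mirroring the benchmark change above (the `run_query` helper is hypothetical; it assumes a connected `Database`):

```rust
use client::{Database, OutputData};
use futures_util::TryStreamExt;

// Hypothetical helper showing the post-refactor consumption pattern.
async fn run_query(db: &Database, sql: &str) {
    let output = db.sql(sql).await.unwrap();
    match output.data {
        // Already materialized; nothing to drive.
        OutputData::AffectedRows(_) | OutputData::RecordBatches(_) => (),
        // Streams must be polled to completion; execution metadata now
        // lives in `output.meta` instead of in the enum variant.
        OutputData::Stream(stream) => {
            stream.try_collect::<Vec<_>>().await.unwrap();
        }
    }
}
```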
4 changes: 2 additions & 2 deletions src/client/src/database.rs
@@ -307,7 +307,7 @@ impl Database {
                     reason: "Expect 'AffectedRows' Flight messages to be the one and the only!"
                 }
             );
-            Ok(Output::AffectedRows(rows))
+            Ok(Output::new_with_affected_rows(rows))
         }
         FlightMessage::Recordbatch(_) | FlightMessage::Metrics(_) => {
             IllegalFlightMessagesSnafu {
@@ -340,7 +340,7 @@ impl Database {
                 output_ordering: None,
                 metrics: Default::default(),
             };
-            Ok(Output::new_stream(Box::pin(record_batch_stream)))
+            Ok(Output::new_with_stream(Box::pin(record_batch_stream)))
         }
     }
 }
2 changes: 1 addition & 1 deletion src/client/src/lib.rs
@@ -26,7 +26,7 @@ use api::v1::greptime_response::Response;
 use api::v1::{AffectedRows, GreptimeResponse};
 pub use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
 use common_error::status_code::StatusCode;
-pub use common_query::Output;
+pub use common_query::{Output, OutputData, OutputMeta};
 pub use common_recordbatch::{RecordBatches, SendableRecordBatchStream};
 use snafu::OptionExt;
9 changes: 4 additions & 5 deletions src/cmd/src/cli/export.rs
@@ -19,8 +19,7 @@ use async_trait::async_trait;
 use clap::{Parser, ValueEnum};
 use client::api::v1::auth_header::AuthScheme;
 use client::api::v1::Basic;
-use client::{Client, Database, DEFAULT_SCHEMA_NAME};
-use common_query::Output;
+use client::{Client, Database, OutputData, DEFAULT_SCHEMA_NAME};
 use common_recordbatch::util::collect;
 use common_telemetry::{debug, error, info, warn};
 use datatypes::scalars::ScalarVector;
@@ -142,7 +141,7 @@ impl Export {
             .with_context(|_| RequestDatabaseSnafu {
                 sql: "show databases".to_string(),
             })?;
-        let Output::Stream(stream, _) = result else {
+        let OutputData::Stream(stream) = result.data else {
            NotDataFromOutputSnafu.fail()?
        };
        let record_batch = collect(stream)
@@ -183,7 +182,7 @@ impl Export {
            .sql(&sql)
            .await
            .with_context(|_| RequestDatabaseSnafu { sql })?;
-        let Output::Stream(stream, _) = result else {
+        let OutputData::Stream(stream) = result.data else {
            NotDataFromOutputSnafu.fail()?
        };
        let Some(record_batch) = collect(stream)
@@ -235,7 +234,7 @@ impl Export {
            .sql(&sql)
            .await
            .with_context(|_| RequestDatabaseSnafu { sql })?;
-        let Output::Stream(stream, _) = result else {
+        let OutputData::Stream(stream) = result.data else {
            NotDataFromOutputSnafu.fail()?
        };
        let record_batch = collect(stream)
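Every query site in `export.rs` uses the same `let ... else` guard: bind the stream or bail out. A hedged sketch of that guard outside the export tool (the `fetch_all` helper is hypothetical, and the error type is simplified to `String`):

```rust
use client::{Database, OutputData};
use common_recordbatch::util::collect;

// Hypothetical helper: accept only a streaming result, reject the rest.
async fn fetch_all(db: &Database, sql: &str) -> Result<usize, String> {
    let result = db.sql(sql).await.map_err(|e| e.to_string())?;
    // `let ... else` replaces the old `Output::Stream(stream, _)` pattern;
    // the plan slot is gone from the variant, so only the stream is bound.
    let OutputData::Stream(stream) = result.data else {
        return Err("expected a stream output".to_string());
    };
    let batches = collect(stream).await.map_err(|e| e.to_string())?;
    Ok(batches.len())
}
```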
10 changes: 5 additions & 5 deletions src/cmd/src/cli/repl.rs
@@ -19,7 +19,7 @@ use std::time::Instant;
 use catalog::kvbackend::{
     CachedMetaKvBackend, CachedMetaKvBackendBuilder, KvBackendCatalogManager,
 };
-use client::{Client, Database, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
+use client::{Client, Database, OutputData, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
 use common_base::Plugins;
 use common_error::ext::ErrorExt;
 use common_query::Output;
@@ -184,15 +184,15 @@ impl Repl {
         }
         .context(RequestDatabaseSnafu { sql: &sql })?;
 
-        let either = match output {
-            Output::Stream(s, _) => {
+        let either = match output.data {
+            OutputData::Stream(s) => {
                 let x = RecordBatches::try_collect(s)
                     .await
                     .context(CollectRecordBatchesSnafu)?;
                 Either::Left(x)
             }
-            Output::RecordBatches(x) => Either::Left(x),
-            Output::AffectedRows(rows) => Either::Right(rows),
+            OutputData::RecordBatches(x) => Either::Left(x),
+            OutputData::AffectedRows(rows) => Either::Right(rows),
         };
 
         let end = Instant::now();
81 changes: 65 additions & 16 deletions src/common/query/src/lib.rs
@@ -30,38 +30,87 @@ pub mod prelude;
 mod signature;
 use sqlparser_derive::{Visit, VisitMut};
 
-// sql output
-pub enum Output {
+/// new Output struct with output data(previously Output) and output meta
+#[derive(Debug)]
+pub struct Output {
+    pub data: OutputData,
+    pub meta: OutputMeta,
+}
+
+/// Original Output struct
+/// carrying result data to response/client/user interface
+pub enum OutputData {
     AffectedRows(usize),
     RecordBatches(RecordBatches),
-    Stream(SendableRecordBatchStream, Option<Arc<dyn PhysicalPlan>>),
+    Stream(SendableRecordBatchStream),
 }
 
+/// OutputMeta stores meta information produced/generated during the execution
+#[derive(Debug, Default)]
+pub struct OutputMeta {
+    /// May exist for query output. One can retrieve execution metrics from this plan.
+    pub plan: Option<Arc<dyn PhysicalPlan>>,
+    pub cost: usize,
+}
+
 impl Output {
-    // helper function to build original `Output::Stream`
-    pub fn new_stream(stream: SendableRecordBatchStream) -> Self {
-        Output::Stream(stream, None)
+    pub fn new_with_affected_rows(affected_rows: usize) -> Self {
+        Self {
+            data: OutputData::AffectedRows(affected_rows),
+            meta: Default::default(),
+        }
+    }
+
+    pub fn new_with_record_batches(recordbatches: RecordBatches) -> Self {
+        Self {
+            data: OutputData::RecordBatches(recordbatches),
+            meta: Default::default(),
+        }
+    }
+
+    pub fn new_with_stream(stream: SendableRecordBatchStream) -> Self {
+        Self {
+            data: OutputData::Stream(stream),
+            meta: Default::default(),
+        }
+    }
+
+    pub fn new(data: OutputData, meta: OutputMeta) -> Self {
+        Self { data, meta }
     }
 }
 
-impl Debug for Output {
+impl Debug for OutputData {
     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
         match self {
-            Output::AffectedRows(rows) => write!(f, "Output::AffectedRows({rows})"),
-            Output::RecordBatches(recordbatches) => {
-                write!(f, "Output::RecordBatches({recordbatches:?})")
+            OutputData::AffectedRows(rows) => write!(f, "OutputData::AffectedRows({rows})"),
+            OutputData::RecordBatches(recordbatches) => {
+                write!(f, "OutputData::RecordBatches({recordbatches:?})")
             }
-            Output::Stream(_, df) => {
-                if df.is_some() {
-                    write!(f, "Output::Stream(<stream>, Some<physical_plan>)")
-                } else {
-                    write!(f, "Output::Stream(<stream>)")
-                }
+            OutputData::Stream(_) => {
+                write!(f, "OutputData::Stream(<stream>)")
             }
         }
     }
 }
 
+impl OutputMeta {
+    pub fn new(plan: Option<Arc<dyn PhysicalPlan>>, cost: usize) -> Self {
+        Self { plan, cost }
+    }
+
+    pub fn new_with_plan(plan: Arc<dyn PhysicalPlan>) -> Self {
+        Self {
+            plan: Some(plan),
+            cost: 0,
+        }
+    }
+
+    pub fn new_with_cost(cost: usize) -> Self {
+        Self { plan: None, cost }
+    }
+}
+
 pub use datafusion::physical_plan::ExecutionPlan as DfPhysicalPlan;
 
 #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Visit, VisitMut)]
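This file is the heart of the refactor: `Output` now pairs an `OutputData` payload with an `OutputMeta` that carries the optional physical plan and a cost counter, instead of smuggling the plan inside the `Stream` variant. A small sketch of the constructors introduced above (the `demo` function and the cost value are illustrative, not part of the commit):

```rust
use common_query::{Output, OutputData, OutputMeta};
use common_recordbatch::RecordBatches;

// Hedged sketch of the new construction paths.
fn demo(recordbatches: RecordBatches) {
    // Convenience constructors: meta defaults to
    // `OutputMeta { plan: None, cost: 0 }`.
    let ddl = Output::new_with_affected_rows(0);
    let query = Output::new_with_record_batches(recordbatches);

    // When execution metadata is known, attach it explicitly.
    let meta = OutputMeta::new_with_cost(42); // illustrative cost
    let with_meta = Output::new(ddl.data, meta);

    // Payload and metadata are read independently now.
    match with_meta.data {
        OutputData::AffectedRows(rows) => println!("affected: {rows}"),
        OutputData::RecordBatches(_) | OutputData::Stream(_) => {}
    }
    // `meta.plan`, when present, exposes the plan for execution metrics.
    assert!(query.meta.plan.is_none());
}
```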
16 changes: 9 additions & 7 deletions src/common/test-util/src/recordbatch.rs
@@ -13,7 +13,7 @@
 // limitations under the License.
 
 use client::Database;
-use common_query::Output;
+use common_query::OutputData;
 use common_recordbatch::util;
 
 pub enum ExpectedOutput<'a> {
@@ -23,22 +23,24 @@ pub enum ExpectedOutput<'a> {
 
 pub async fn execute_and_check_output(db: &Database, sql: &str, expected: ExpectedOutput<'_>) {
     let output = db.sql(sql).await.unwrap();
+    let output = output.data;
 
     match (&output, expected) {
-        (Output::AffectedRows(x), ExpectedOutput::AffectedRows(y)) => {
+        (OutputData::AffectedRows(x), ExpectedOutput::AffectedRows(y)) => {
             assert_eq!(*x, y, "actual: \n{}", x)
         }
-        (Output::RecordBatches(_), ExpectedOutput::QueryResult(x))
-        | (Output::Stream(_, _), ExpectedOutput::QueryResult(x)) => {
+        (OutputData::RecordBatches(_), ExpectedOutput::QueryResult(x))
+        | (OutputData::Stream(_), ExpectedOutput::QueryResult(x)) => {
             check_output_stream(output, x).await
         }
         _ => panic!(),
     }
 }
 
-pub async fn check_output_stream(output: Output, expected: &str) {
+pub async fn check_output_stream(output: OutputData, expected: &str) {
     let recordbatches = match output {
-        Output::Stream(stream, _) => util::collect_batches(stream).await.unwrap(),
-        Output::RecordBatches(recordbatches) => recordbatches,
+        OutputData::Stream(stream) => util::collect_batches(stream).await.unwrap(),
+        OutputData::RecordBatches(recordbatches) => recordbatches,
         _ => unreachable!(),
     };
     let pretty_print = recordbatches.pretty_print().unwrap();
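A hedged usage sketch for these helpers in a test. The SQL statements, the expected pretty-printed table, and the `common_test_util` crate path are assumptions based on this diff, not taken from the commit:

```rust
use client::Database;
use common_test_util::recordbatch::{execute_and_check_output, ExpectedOutput};

// Hypothetical smoke test against a connected `Database`.
async fn smoke_test(db: &Database) {
    // Writes are checked against an affected-row count...
    execute_and_check_output(db, "INSERT INTO demo VALUES (1)", ExpectedOutput::AffectedRows(1))
        .await;
    // ...while queries are compared against pretty-printed record batches.
    let expected = "+---+\n| a |\n+---+\n| 1 |\n+---+"; // illustrative layout
    execute_and_check_output(db, "SELECT 1 AS a", ExpectedOutput::QueryResult(expected)).await;
}
```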
8 changes: 4 additions & 4 deletions src/datanode/src/region_server.rs
@@ -27,7 +27,7 @@ use common_error::ext::BoxedError;
 use common_error::status_code::StatusCode;
 use common_query::logical_plan::Expr;
 use common_query::physical_plan::DfPhysicalPlanAdapter;
-use common_query::{DfPhysicalPlan, Output};
+use common_query::{DfPhysicalPlan, OutputData};
 use common_recordbatch::SendableRecordBatchStream;
 use common_runtime::Runtime;
 use common_telemetry::tracing::{self, info_span};
@@ -651,11 +651,11 @@ impl RegionServerInner {
             .await
             .context(ExecuteLogicalPlanSnafu)?;
 
-        match result {
-            Output::AffectedRows(_) | Output::RecordBatches(_) => {
+        match result.data {
+            OutputData::AffectedRows(_) | OutputData::RecordBatches(_) => {
                 UnsupportedOutputSnafu { expected: "stream" }.fail()
             }
-            Output::Stream(stream, _) => Ok(stream),
+            OutputData::Stream(stream) => Ok(stream),
         }
     }
 
9 changes: 5 additions & 4 deletions src/frontend/src/instance.rs
@@ -28,6 +28,7 @@ use api::v1::meta::Role;
 use async_trait::async_trait;
 use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
 use catalog::CatalogManagerRef;
+use client::OutputData;
 use common_base::Plugins;
 use common_config::KvBackendConfig;
 use common_error::ext::BoxedError;
@@ -401,13 +402,13 @@ impl SqlQueryHandler for Instance {
 
 /// Attaches a timer to the output and observes it once the output is exhausted.
 pub fn attach_timer(output: Output, timer: HistogramTimer) -> Output {
-    match output {
-        Output::AffectedRows(_) | Output::RecordBatches(_) => output,
-        Output::Stream(stream, plan) => {
+    match output.data {
+        OutputData::AffectedRows(_) | OutputData::RecordBatches(_) => output,
+        OutputData::Stream(stream) => {
             let stream = OnDone::new(stream, move || {
                 timer.observe_duration();
             });
-            Output::Stream(Box::pin(stream), plan)
+            Output::new(OutputData::Stream(Box::pin(stream)), output.meta)
         }
     }
 }
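Note how the stream arm rebuilds the `Output` around the wrapped stream while carrying `output.meta` forward, where the old code had to thread the `plan` through the enum variant. A possible call site, assuming `attach_timer` is reachable at `frontend::instance::attach_timer` and a `prometheus::Histogram` is at hand (both the module path and the metric handle are assumptions):

```rust
use common_query::Output;
use frontend::instance::attach_timer;
use prometheus::Histogram;

// Sketch: time a query. For `AffectedRows`/`RecordBatches` the timer is
// observed when it is dropped; for `Stream` outputs, `OnDone` defers the
// observation until the stream is fully consumed. Either way,
// `output.meta` (plan + cost) is preserved unchanged.
fn observe_query(output: Output, histogram: &Histogram) -> Output {
    let timer = histogram.start_timer();
    attach_timer(output, timer)
}
```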
2 changes: 1 addition & 1 deletion src/frontend/src/instance/grpc.rs
@@ -113,7 +113,7 @@ impl GrpcQueryHandler for Instance {
                     .statement_executor
                     .create_table_inner(&mut expr, None, &ctx)
                     .await?;
-                Output::AffectedRows(0)
+                Output::new_with_affected_rows(0)
             }
             DdlExpr::Alter(expr) => self.statement_executor.alter_table_inner(expr).await?,
             DdlExpr::CreateDatabase(expr) => {
4 changes: 2 additions & 2 deletions src/frontend/src/instance/opentsdb.rs
@@ -47,8 +47,8 @@ impl OpentsdbProtocolHandler for Instance {
             .map_err(BoxedError::new)
             .context(servers::error::ExecuteGrpcQuerySnafu)?;
 
-        Ok(match output {
-            common_query::Output::AffectedRows(rows) => rows,
+        Ok(match output.data {
+            common_query::OutputData::AffectedRows(rows) => rows,
             _ => unreachable!(),
         })
     }
3 changes: 2 additions & 1 deletion src/frontend/src/instance/prom_store.rs
@@ -19,6 +19,7 @@ use api::prom_store::remote::{Query, QueryResult, ReadRequest, ReadResponse, Wri
 use api::v1::RowInsertRequests;
 use async_trait::async_trait;
 use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
+use client::OutputData;
 use common_catalog::format_full_table_name;
 use common_error::ext::BoxedError;
 use common_query::prelude::GREPTIME_PHYSICAL_TABLE;
@@ -77,7 +78,7 @@ fn negotiate_response_type(accepted_response_types: &[i32]) -> ServerResult<Resp
 }
 
 async fn to_query_result(table_name: &str, output: Output) -> ServerResult<QueryResult> {
-    let Output::Stream(stream, _) = output else {
+    let OutputData::Stream(stream) = output.data else {
         unreachable!()
     };
     let recordbatches = RecordBatches::try_collect(stream)
3 changes: 2 additions & 1 deletion src/operator/src/delete.rs
@@ -91,7 +91,8 @@ impl Deleter {
             .await?;
 
         let affected_rows = self.do_request(deletes, &ctx).await?;
-        Ok(Output::AffectedRows(affected_rows as _))
+
+        Ok(Output::new_with_affected_rows(affected_rows))
     }
 
     pub async fn handle_table_delete(
6 changes: 3 additions & 3 deletions src/operator/src/insert.rs
@@ -111,7 +111,7 @@ impl Inserter {
             .await?;
 
         let affected_rows = self.do_request(inserts, &ctx).await?;
-        Ok(Output::AffectedRows(affected_rows as _))
+        Ok(Output::new_with_affected_rows(affected_rows))
     }
 
     /// Handle row inserts request with metric engine.
@@ -149,7 +149,7 @@ impl Inserter {
             .await?;
 
         let affected_rows = self.do_request(inserts, &ctx).await?;
-        Ok(Output::AffectedRows(affected_rows as _))
+        Ok(Output::new_with_affected_rows(affected_rows))
     }
 
     pub async fn handle_table_insert(
@@ -185,7 +185,7 @@ impl Inserter {
             .await?;
 
         let affected_rows = self.do_request(inserts, ctx).await?;
-        Ok(Output::AffectedRows(affected_rows as _))
+        Ok(Output::new_with_affected_rows(affected_rows))
     }
 }
