From f0b29028551235c3efa3a906dbd4d6335d6977f1 Mon Sep 17 00:00:00 2001 From: sundyli <543950155@qq.com> Date: Mon, 3 Apr 2023 15:02:50 +0800 Subject: [PATCH 1/3] feat: client support flight progress --- cli/Cargo.toml | 18 ++-- cli/README.md | 12 +-- cli/src/display.rs | 253 ++++++++++++++++++++++++++++++++++++++++++++- cli/src/session.rs | 100 +++++------------- 4 files changed, 294 insertions(+), 89 deletions(-) diff --git a/cli/Cargo.toml b/cli/Cargo.toml index 387ab264b..6024777a3 100644 --- a/cli/Cargo.toml +++ b/cli/Cargo.toml @@ -11,9 +11,10 @@ repository = "https://github.com/datafuselabs/databend-client" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -arrow = "35.0.0" -arrow-cast = { version = "35.0.0", features = ["prettyprint"] } -arrow-flight = { version = "35.0.0", features = ["flight-sql-experimental"] } +arrow = { git = "https://github.com/apache/arrow-rs", rev = "ecd44fd" } +arrow-cast = { git = "https://github.com/apache/arrow-rs", rev = "ecd44fd", features = ["prettyprint"] } +arrow-flight = { git = "https://github.com/apache/arrow-rs", rev = "ecd44fd", features = ["flight-sql-experimental"] } + atty = "0.2.14" futures = { version = "0.3", default-features = false, features = ["alloc"] } rustyline = "11.0.0" @@ -26,8 +27,15 @@ tokio = { version = "1.26", features = [ "parking_lot", ] } +async-trait = "0.1.68" clap = { version = "4.1.0", features = ["derive"] } comfy-table = "6.1.4" +indicatif = "0.17.3" +logos = "0.12.1" +serde = "1.0.159" +serde_json = "1.0.95" +strum = "0.24" +strum_macros = "0.24" tonic = { version = "0.8", default-features = false, features = [ "transport", "codegen", @@ -35,10 +43,6 @@ tonic = { version = "0.8", default-features = false, features = [ "prost", ] } -logos = "0.12.1" -strum = "0.24" -strum_macros = "0.24" - [[bin]] name = "bendsql" path = "src/main.rs" diff --git a/cli/README.md b/cli/README.md index 6a0f99559..e9d117372 100644 --- a/cli/README.md 
+++ b/cli/README.md @@ -1,23 +1,23 @@ -# databend-cli   +# bendsql   ## Install ```sh -cargo install databend-cli +cargo install bendsql ``` ## Usage ``` -> databend-cli --help -Usage: databend-cli <--user |--password |--host |--port > +> bendsql --help +Usage: bendsql <--user |--password |--host |--port > ``` ## Examples ### REPL ```sql -❯ databend-cli -h arch -u sundy -p abc --port 8900 +❯ bendsql -h arch -u sundy -p abc --port 8900 Welcome to Arrow CLI. Connecting to http://arch:8900/ as user sundy. @@ -52,7 +52,7 @@ Bye ### StdIn Pipe ```bash -❯ echo "select number from numbers(3)" | databend-cli -h arch -u sundy -p abc --port 8900 +❯ echo "select number from numbers(3)" | bendsql -h arch -u sundy -p abc --port 8900 0 1 2 diff --git a/cli/src/display.rs b/cli/src/display.rs index d8f603c5a..7ac25d259 100644 --- a/cli/src/display.rs +++ b/cli/src/display.rs @@ -12,10 +12,229 @@ // See the License for the specific language governing permissions and // limitations under the License. -use arrow::{datatypes::DataType, error::ArrowError, record_batch::RecordBatch}; +use std::{collections::HashMap, fmt::Write, sync::Arc}; + +use arrow::{ + array::{Array, ArrayDataBuilder, LargeBinaryArray, LargeStringArray}, + csv::WriterBuilder, + datatypes::{DataType, Field, Schema}, + error::ArrowError, + record_batch::RecordBatch, +}; +use arrow_flight::{utils::flight_data_to_arrow_batch, FlightData}; use comfy_table::{Cell, CellAlignment, Table}; use arrow_cast::display::{ArrayFormatter, FormatOptions}; +use futures::StreamExt; +use serde::{Deserialize, Serialize}; +use tokio::time::Instant; +use tonic::Streaming; + +use indicatif::{ProgressBar, ProgressState, ProgressStyle}; + +#[async_trait::async_trait] +pub trait ChunkDisplay { + async fn display(&mut self) -> Result<(), ArrowError>; + fn total_rows(&self) -> usize; +} + +pub struct ReplDisplay { + schema: Schema, + stream: Streaming, + + rows: usize, + progress: Option, + start: Instant, +} + +impl ReplDisplay { + pub fn 
new(schema: Schema, start: Instant, stream: Streaming) -> Self { + Self { + schema, + stream, + rows: 0, + progress: None, + start, + } + } +} + +#[async_trait::async_trait] +impl ChunkDisplay for ReplDisplay { + async fn display(&mut self) -> Result<(), ArrowError> { + let mut batches = Vec::new(); + let mut progress: ProgressValue; + + while let Some(datum) = self.stream.next().await { + match datum { + Ok(datum) => { + if datum.app_metadata[..] == [0x01] { + progress = serde_json::from_slice(&datum.data_body) + .map_err(|err| ArrowError::ExternalError(Box::new(err)))?; + + match self.progress.as_mut() { + Some(pb) => { + pb.set_position(progress.read_bytes as u64); + pb.set_message(format!( + "{}/{} ({} rows/s)", + humanize_count(progress.read_rows as f64), + humanize_count(progress.total_rows as f64), + humanize_count( + progress.read_rows as f64 / pb.elapsed().as_secs_f64() + ) + )); + } + None => { + let pb = ProgressBar::new(progress.total_bytes as u64); + pb.set_style(ProgressStyle::with_template("{spinner:.green} [{elapsed_precise}] {bytes}/{total_bytes}({bytes_per_sec}) {msg} {wide_bar:.green/blue} ({eta})") + .unwrap() + .with_key("eta", |state: &ProgressState, w: &mut dyn Write| write!(w, "{:.1}s", state.eta().as_secs_f64()).unwrap()) + .progress_chars("██-")); + + pb.set_position(progress.read_bytes as u64); + pb.set_message(format!( + "{}/{} ({} rows/s)", + humanize_count(progress.read_rows as f64), + humanize_count(progress.total_rows as f64), + humanize_count( + progress.read_rows as f64 / pb.elapsed().as_secs_f64() + ) + )); + + self.progress = Some(pb); + } + } + } else { + let dicitionaries_by_id = HashMap::new(); + let batch = flight_data_to_arrow_batch( + &datum, + Arc::new(self.schema.clone()), + &dicitionaries_by_id, + )?; + + self.rows += batch.num_rows(); + + let batch = normalize_record_batch(&batch)?; + batches.push(batch); + } + } + Err(err) => { + eprintln!("error: {:?}", err.message()); + } + } + } + + if let Some(pb) = 
self.progress.take() { + pb.finish_and_clear(); + } + print_batches(&batches)?; + + println!(); + + println!( + "{} rows in set ({:.3} sec)", + self.rows, + self.start.elapsed().as_secs_f64() + ); + println!(); + + Ok(()) + } + + fn total_rows(&self) -> usize { + self.rows + } +} + +#[derive(Serialize, Deserialize, Debug, Default)] +struct ProgressValue { + pub total_rows: usize, + pub total_bytes: usize, + + pub read_rows: usize, + pub read_bytes: usize, +} + +pub struct FormatDisplay { + schema: Schema, + stream: Streaming, +} + +impl FormatDisplay { + pub fn new(schema: Schema, stream: Streaming) -> Self { + Self { schema, stream } + } +} + +#[async_trait::async_trait] +impl ChunkDisplay for FormatDisplay { + async fn display(&mut self) -> Result<(), ArrowError> { + let mut bytes = vec![]; + let builder = WriterBuilder::new() + .has_headers(false) + .with_delimiter(b'\t'); + let mut writer = builder.build(&mut bytes); + + while let Some(datum) = self.stream.next().await { + match datum { + Ok(datum) => { + if datum.app_metadata[..] 
!= [0x01] { + let dicitionaries_by_id = HashMap::new(); + let batch = flight_data_to_arrow_batch( + &datum, + Arc::new(self.schema.clone()), + &dicitionaries_by_id, + )?; + let batch = normalize_record_batch(&batch)?; + writer.write(&batch)?; + } + } + Err(err) => { + eprintln!("error: {:?}", err.message()); + } + } + } + + let formatted = std::str::from_utf8(writer.into_inner()) + .map_err(|e| ArrowError::CsvError(e.to_string()))?; + println!("{}", formatted); + Ok(()) + } + + fn total_rows(&self) -> usize { + 0 + } +} + +fn normalize_record_batch(batch: &RecordBatch) -> Result { + let num_columns = batch.num_columns(); + let mut columns = Vec::with_capacity(num_columns); + let mut fields = Vec::with_capacity(num_columns); + + for i in 0..num_columns { + let field = batch.schema().field(i).clone(); + let array = batch.column(i); + if let Some(binary_array) = array.as_any().downcast_ref::() { + let data = binary_array.data().clone(); + let builder = ArrayDataBuilder::from(data).data_type(DataType::LargeUtf8); + let data = builder.build()?; + + let utf8_array = LargeStringArray::from(data); + + columns.push(Arc::new(utf8_array) as Arc); + fields.push( + Field::new(field.name(), DataType::LargeUtf8, field.is_nullable()) + .with_metadata(field.metadata().clone()), + ); + } else { + columns.push(array.clone()); + fields.push(field); + } + } + + let schema = Schema::new(fields); + RecordBatch::try_new(Arc::new(schema), columns) +} /// Prints a visual representation of record batches to stdout pub fn print_batches(results: &[RecordBatch]) -> Result<(), ArrowError> { @@ -97,3 +316,35 @@ pub(crate) fn format_error(error: ArrowError) -> String { other => format!("{}", other), } } + +pub fn humanize_count(num: f64) -> String { + if num == 0.0 { + return String::from("0"); + } + + let negative = if num.is_sign_positive() { "" } else { "-" }; + let num = num.abs(); + let units = [ + "", + " thousand", + " million", + " billion", + " trillion", + " quadrillion", + ]; + + if num 
< 1_f64 { + return format!("{}{:.2}", negative, num); + } + let delimiter = 1000_f64; + let exponent = std::cmp::min( + (num.ln() / delimiter.ln()).floor() as i32, + (units.len() - 1) as i32, + ); + let pretty_bytes = format!("{:.2}", num / delimiter.powi(exponent)) + .parse::() + .unwrap() + * 1_f64; + let unit = units[exponent as usize]; + format!("{}{}{}", negative, pretty_bytes, unit) +} diff --git a/cli/src/session.rs b/cli/src/session.rs index 377bc2212..1a0852beb 100644 --- a/cli/src/session.rs +++ b/cli/src/session.rs @@ -12,30 +12,27 @@ // See the License for the specific language governing permissions and // limitations under the License. -use arrow::array::{Array, ArrayDataBuilder, LargeBinaryArray, LargeStringArray}; -use arrow::csv::WriterBuilder; -use arrow::datatypes::{DataType, Field, Schema}; use arrow::error::ArrowError; -use arrow::record_batch::RecordBatch; +use arrow::ipc::convert::fb_to_schema; +use arrow::ipc::root_as_message; + use arrow_flight::sql::client::FlightSqlServiceClient; -use arrow_flight::utils::flight_data_to_batches; -use arrow_flight::FlightData; use futures::TryStreamExt; use rustyline::config::Builder; use rustyline::error::ReadlineError; use rustyline::history::DefaultHistory; use rustyline::{CompletionType, Editor}; use std::io::BufRead; -use std::sync::Arc; + use tokio::time::Instant; -use tonic::transport::Endpoint; +use tonic::transport::{Channel, Endpoint}; -use crate::display::{format_error, print_batches}; +use crate::display::{format_error, ChunkDisplay, FormatDisplay, ReplDisplay}; use crate::helper::CliHelper; use crate::token::{TokenKind, Tokenizer}; pub struct Session { - client: FlightSqlServiceClient, + client: FlightSqlServiceClient, is_repl: bool, prompt: String, } @@ -52,12 +49,16 @@ impl Session { .await .map_err(|err| ArrowError::IoError(err.to_string()))?; + let mut client = FlightSqlServiceClient::new(channel); + if is_repl { println!("Welcome to databend-cli."); println!("Connecting to {} as user {}.", 
endpoint.uri(), user); println!(); } - let mut client = FlightSqlServiceClient::new(channel); + + // enable progress + client.set_header("bendsql", "1"); let _token = client.handshake(user, password).await.unwrap(); let prompt = format!("{} :) ", endpoint.uri().host().unwrap()); @@ -171,50 +172,29 @@ impl Session { .as_ref() .ok_or_else(|| ArrowError::IoError("Ticket is empty".to_string()))?; - let flight_data = self.client.do_get(ticket.clone()).await?; - let flight_data: Vec = flight_data.try_collect().await.unwrap(); - let batches = flight_data_to_batches(&flight_data)?; - let batches = batches - .iter() - .map(normalize_record_batch) - .collect::, ArrowError>>()?; + let mut flight_data = self.client.do_get(ticket.clone()).await?; + let datum = flight_data.try_next().await.unwrap().unwrap(); - if is_repl { - print_batches(batches.as_slice())?; + let message = root_as_message(&datum.data_header[..]) + .map_err(|_| ArrowError::CastError("Cannot get root as message".to_string()))?; + let ipc_schema = message + .header_as_schema() + .ok_or_else(|| ArrowError::CastError("Cannot get header as Schema".to_string()))?; - println!(); + let schema = fb_to_schema(ipc_schema); - let rows: usize = batches.iter().map(|b| b.num_rows()).sum(); - println!( - "{} rows in set ({:.3} sec)", - rows, - start.elapsed().as_secs_f64() - ); - println!(); + if is_repl { + let mut displayer = ReplDisplay::new(schema, start, flight_data); + displayer.display().await?; } else { - let res = print_batches_with_sep(batches.as_slice(), b'\t')?; - print!("{res}"); + let mut displayer = FormatDisplay::new(schema, flight_data); + displayer.display().await?; } Ok(false) } } -fn print_batches_with_sep(batches: &[RecordBatch], delimiter: u8) -> Result { - let mut bytes = vec![]; - { - let builder = WriterBuilder::new() - .has_headers(false) - .with_delimiter(delimiter); - let mut writer = builder.build(&mut bytes); - for batch in batches { - writer.write(batch)?; - } - } - let formatted = 
String::from_utf8(bytes).map_err(|e| ArrowError::CsvError(e.to_string()))?; - Ok(formatted) -} - fn get_history_path() -> String { format!( "{}/.databend_history", @@ -222,36 +202,6 @@ fn get_history_path() -> String { ) } -fn normalize_record_batch(batch: &RecordBatch) -> Result { - let num_columns = batch.num_columns(); - let mut columns = Vec::with_capacity(num_columns); - let mut fields = Vec::with_capacity(num_columns); - - for i in 0..num_columns { - let field = batch.schema().field(i).clone(); - let array = batch.column(i); - if let Some(binary_array) = array.as_any().downcast_ref::() { - let data = binary_array.data().clone(); - let builder = ArrayDataBuilder::from(data).data_type(DataType::LargeUtf8); - let data = builder.build()?; - - let utf8_array = LargeStringArray::from(data); - - columns.push(Arc::new(utf8_array) as Arc); - fields.push( - Field::new(field.name(), DataType::LargeUtf8, field.is_nullable()) - .with_metadata(field.metadata().clone()), - ); - } else { - columns.push(array.clone()); - fields.push(field); - } - } - - let schema = Schema::new(fields); - RecordBatch::try_new(Arc::new(schema), columns) -} - #[derive(PartialEq, Eq)] pub enum QueryKind { Query, From 82b10e39b11ed76b6ed575a300f705ffdb70c2af Mon Sep 17 00:00:00 2001 From: sundyli <543950155@qq.com> Date: Mon, 3 Apr 2023 15:32:31 +0800 Subject: [PATCH 2/3] feat: client support flight progress --- cli/src/display.rs | 59 ++++++++++++++++++++-------------------------- 1 file changed, 26 insertions(+), 33 deletions(-) diff --git a/cli/src/display.rs b/cli/src/display.rs index 7ac25d259..b6445880f 100644 --- a/cli/src/display.rs +++ b/cli/src/display.rs @@ -30,7 +30,7 @@ use serde::{Deserialize, Serialize}; use tokio::time::Instant; use tonic::Streaming; -use indicatif::{ProgressBar, ProgressState, ProgressStyle}; +use indicatif::{HumanBytes, ProgressBar, ProgressState, ProgressStyle}; #[async_trait::async_trait] pub trait ChunkDisplay { @@ -63,7 +63,7 @@ impl ReplDisplay { impl 
ChunkDisplay for ReplDisplay { async fn display(&mut self) -> Result<(), ArrowError> { let mut batches = Vec::new(); - let mut progress: ProgressValue; + let mut progress = ProgressValue::default(); while let Some(datum) = self.stream.next().await { match datum { @@ -71,39 +71,28 @@ impl ChunkDisplay for ReplDisplay { if datum.app_metadata[..] == [0x01] { progress = serde_json::from_slice(&datum.data_body) .map_err(|err| ArrowError::ExternalError(Box::new(err)))?; - - match self.progress.as_mut() { - Some(pb) => { - pb.set_position(progress.read_bytes as u64); - pb.set_message(format!( - "{}/{} ({} rows/s)", - humanize_count(progress.read_rows as f64), - humanize_count(progress.total_rows as f64), - humanize_count( - progress.read_rows as f64 / pb.elapsed().as_secs_f64() - ) - )); - } - None => { - let pb = ProgressBar::new(progress.total_bytes as u64); - pb.set_style(ProgressStyle::with_template("{spinner:.green} [{elapsed_precise}] {bytes}/{total_bytes}({bytes_per_sec}) {msg} {wide_bar:.green/blue} ({eta})") + if self.progress.as_mut().is_none() { + let pb = ProgressBar::new(progress.total_bytes as u64); + pb.set_style(ProgressStyle::with_template("{spinner:.green} [{elapsed_precise}] {msg} {wide_bar:.cyan/blue} ({eta})") .unwrap() .with_key("eta", |state: &ProgressState, w: &mut dyn Write| write!(w, "{:.1}s", state.eta().as_secs_f64()).unwrap()) .progress_chars("██-")); - - pb.set_position(progress.read_bytes as u64); - pb.set_message(format!( - "{}/{} ({} rows/s)", - humanize_count(progress.read_rows as f64), - humanize_count(progress.total_rows as f64), - humanize_count( - progress.read_rows as f64 / pb.elapsed().as_secs_f64() - ) - )); - - self.progress = Some(pb); - } + self.progress = Some(pb); } + + let pb = self.progress.as_mut().unwrap(); + pb.set_position(progress.read_bytes as u64); + pb.set_message(format!( + "{}/{} ({} rows/s), {}/{} ({} /s)", + humanize_count(progress.read_rows as f64), + humanize_count(progress.total_rows as f64), + 
humanize_count(progress.read_rows as f64 / pb.elapsed().as_secs_f64()), + HumanBytes(progress.read_bytes as u64), + HumanBytes(progress.total_bytes as u64), + HumanBytes( + (progress.read_bytes as f64 / pb.elapsed().as_secs_f64()) as u64 + ) + )); } else { let dicitionaries_by_id = HashMap::new(); let batch = flight_data_to_arrow_batch( @@ -132,9 +121,13 @@ impl ChunkDisplay for ReplDisplay { println!(); println!( - "{} rows in set ({:.3} sec)", + "{} rows result set in {:.3} sec. Processed {} rows, {} ({} rows/s, {}/s)", self.rows, - self.start.elapsed().as_secs_f64() + self.start.elapsed().as_secs_f64(), + humanize_count(progress.total_rows as f64), + HumanBytes(progress.total_bytes as u64), + humanize_count(progress.total_rows as f64 / self.start.elapsed().as_secs_f64()), + HumanBytes((progress.total_bytes as f64 / self.start.elapsed().as_secs_f64()) as u64), ); println!(); From c6dff7def8199a15dc2f39f97c5fd5fb71836b0d Mon Sep 17 00:00:00 2001 From: sundyli <543950155@qq.com> Date: Mon, 3 Apr 2023 15:47:53 +0800 Subject: [PATCH 3/3] feat: update ciker --- cli/src/display.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cli/src/display.rs b/cli/src/display.rs index b6445880f..18e411a41 100644 --- a/cli/src/display.rs +++ b/cli/src/display.rs @@ -76,7 +76,7 @@ impl ChunkDisplay for ReplDisplay { pb.set_style(ProgressStyle::with_template("{spinner:.green} [{elapsed_precise}] {msg} {wide_bar:.cyan/blue} ({eta})") .unwrap() .with_key("eta", |state: &ProgressState, w: &mut dyn Write| write!(w, "{:.1}s", state.eta().as_secs_f64()).unwrap()) - .progress_chars("██-")); + .progress_chars("█▓▒░ ")); self.progress = Some(pb); }