Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 11 additions & 7 deletions cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@ repository = "https://github.com/datafuselabs/databend-client"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
arrow = "35.0.0"
arrow-cast = { version = "35.0.0", features = ["prettyprint"] }
arrow-flight = { version = "35.0.0", features = ["flight-sql-experimental"] }
arrow = { git = "https://github.com/apache/arrow-rs", rev = "ecd44fd" }
arrow-cast = { git = "https://github.com/apache/arrow-rs", rev = "ecd44fd", features = ["prettyprint"] }
arrow-flight = { git = "https://github.com/apache/arrow-rs", rev = "ecd44fd", features = ["flight-sql-experimental"] }

atty = "0.2.14"
futures = { version = "0.3", default-features = false, features = ["alloc"] }
rustyline = "11.0.0"
Expand All @@ -26,19 +27,22 @@ tokio = { version = "1.26", features = [
"parking_lot",
] }

async-trait = "0.1.68"
clap = { version = "4.1.0", features = ["derive"] }
comfy-table = "6.1.4"
indicatif = "0.17.3"
logos = "0.12.1"
serde = "1.0.159"
serde_json = "1.0.95"
strum = "0.24"
strum_macros = "0.24"
tonic = { version = "0.8", default-features = false, features = [
"transport",
"codegen",
"tls",
"prost",
] }

logos = "0.12.1"
strum = "0.24"
strum_macros = "0.24"

[[bin]]
name = "bendsql"
path = "src/main.rs"
Expand Down
12 changes: 6 additions & 6 deletions cli/README.md
Original file line number Diff line number Diff line change
@@ -1,23 +1,23 @@
# databend-cli  
# bendsql  

## Install

```sh
cargo install databend-cli
cargo install bendsql
```

## Usage

```
> databend-cli --help
Usage: databend-cli <--user <USER>|--password <PASSWORD>|--host <HOST>|--port <PORT>>
> bendsql --help
Usage: bendsql <--user <USER>|--password <PASSWORD>|--host <HOST>|--port <PORT>>
```

## Examples

### REPL
```sql
databend-cli -h arch -u sundy -p abc --port 8900
bendsql -h arch -u sundy -p abc --port 8900
Welcome to Arrow CLI.
Connecting to http://arch:8900/ as user sundy.

Expand Down Expand Up @@ -52,7 +52,7 @@ Bye
### StdIn Pipe

```bash
❯ echo "select number from numbers(3)" | databend-cli -h arch -u sundy -p abc --port 8900
❯ echo "select number from numbers(3)" | bendsql -h arch -u sundy -p abc --port 8900
0
1
2
Expand Down
246 changes: 245 additions & 1 deletion cli/src/display.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,222 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use arrow::{datatypes::DataType, error::ArrowError, record_batch::RecordBatch};
use std::{collections::HashMap, fmt::Write, sync::Arc};

use arrow::{
array::{Array, ArrayDataBuilder, LargeBinaryArray, LargeStringArray},
csv::WriterBuilder,
datatypes::{DataType, Field, Schema},
error::ArrowError,
record_batch::RecordBatch,
};
use arrow_flight::{utils::flight_data_to_arrow_batch, FlightData};
use comfy_table::{Cell, CellAlignment, Table};

use arrow_cast::display::{ArrayFormatter, FormatOptions};
use futures::StreamExt;
use serde::{Deserialize, Serialize};
use tokio::time::Instant;
use tonic::Streaming;

use indicatif::{HumanBytes, ProgressBar, ProgressState, ProgressStyle};

/// A sink that drains a Flight result stream and renders it to stdout.
///
/// Implementations decide the output format (pretty table vs. TSV) and
/// whether to show live progress.
#[async_trait::async_trait]
pub trait ChunkDisplay {
    /// Consume the whole stream and print the result.
    async fn display(&mut self) -> Result<(), ArrowError>;
    /// Number of data rows seen (0 when the implementation does not count).
    fn total_rows(&self) -> usize;
}

/// Interactive (REPL) renderer: shows a live progress bar while the query
/// runs, then pretty-prints all received batches and a throughput summary.
pub struct ReplDisplay {
    schema: Schema,
    stream: Streaming<FlightData>,

    // Data rows received so far.
    rows: usize,
    // Created lazily on the first progress frame; cleared when done.
    progress: Option<ProgressBar>,
    // Query start time, used for the final elapsed/throughput line.
    start: Instant,
}

impl ReplDisplay {
pub fn new(schema: Schema, start: Instant, stream: Streaming<FlightData>) -> Self {
Self {
schema,
stream,
rows: 0,
progress: None,
start,
}
}
}

#[async_trait::async_trait]
impl ChunkDisplay for ReplDisplay {
    /// Consume the Flight stream: frames whose `app_metadata` is `[0x01]`
    /// carry JSON-encoded progress and drive the progress bar; all other
    /// frames are decoded into `RecordBatch`es and buffered. When the
    /// stream ends, pretty-print the batches plus a summary line.
    async fn display(&mut self) -> Result<(), ArrowError> {
        let mut batches = Vec::new();
        let mut progress = ProgressValue::default();

        while let Some(datum) = self.stream.next().await {
            match datum {
                Ok(datum) => {
                    if datum.app_metadata[..] == [0x01] {
                        // Progress frame: body is a JSON-encoded ProgressValue.
                        progress = serde_json::from_slice(&datum.data_body)
                            .map_err(|err| ArrowError::ExternalError(Box::new(err)))?;
                        if self.progress.is_none() {
                            let pb = ProgressBar::new(progress.total_bytes as u64);
                            pb.set_style(ProgressStyle::with_template("{spinner:.green} [{elapsed_precise}] {msg} {wide_bar:.cyan/blue} ({eta})")
                                .unwrap()
                                .with_key("eta", |state: &ProgressState, w: &mut dyn Write| write!(w, "{:.1}s", state.eta().as_secs_f64()).unwrap())
                                .progress_chars("█▓▒░ "));
                            self.progress = Some(pb);
                        }

                        let pb = self.progress.as_mut().unwrap();
                        pb.set_position(progress.read_bytes as u64);
                        pb.set_message(format!(
                            "{}/{} ({} rows/s), {}/{} ({} /s)",
                            humanize_count(progress.read_rows as f64),
                            humanize_count(progress.total_rows as f64),
                            humanize_count(progress.read_rows as f64 / pb.elapsed().as_secs_f64()),
                            HumanBytes(progress.read_bytes as u64),
                            HumanBytes(progress.total_bytes as u64),
                            HumanBytes(
                                (progress.read_bytes as f64 / pb.elapsed().as_secs_f64()) as u64
                            )
                        ));
                    } else {
                        // Data frame: decode into a RecordBatch. No dictionary
                        // batches are expected on this stream, so the map is empty.
                        let dictionaries_by_id = HashMap::new();
                        let batch = flight_data_to_arrow_batch(
                            &datum,
                            Arc::new(self.schema.clone()),
                            &dictionaries_by_id,
                        )?;

                        self.rows += batch.num_rows();

                        let batch = normalize_record_batch(&batch)?;
                        batches.push(batch);
                    }
                }
                Err(err) => {
                    // Report and keep draining the stream after a transport error.
                    eprintln!("error: {:?}", err.message());
                }
            }
        }

        if let Some(pb) = self.progress.take() {
            pb.finish_and_clear();
        }
        print_batches(&batches)?;

        println!();

        println!(
            "{} rows result set in {:.3} sec. Processed {} rows, {} ({} rows/s, {}/s)",
            self.rows,
            self.start.elapsed().as_secs_f64(),
            humanize_count(progress.total_rows as f64),
            // Fixed: this slot reports processed *bytes*; it previously
            // printed total_rows, mislabelling a row count as a byte size.
            HumanBytes(progress.total_bytes as u64),
            humanize_count(progress.total_rows as f64 / self.start.elapsed().as_secs_f64()),
            HumanBytes((progress.total_bytes as f64 / self.start.elapsed().as_secs_f64()) as u64),
        );
        println!();

        Ok(())
    }

    /// Number of data rows received so far.
    fn total_rows(&self) -> usize {
        self.rows
    }
}

/// Query progress counters, decoded from the JSON body of Flight messages
/// whose `app_metadata` is `[0x01]`.
#[derive(Serialize, Deserialize, Debug, Default)]
struct ProgressValue {
    pub total_rows: usize,
    pub total_bytes: usize,

    pub read_rows: usize,
    pub read_bytes: usize,
}

/// Non-interactive renderer: writes result batches to stdout as
/// tab-separated values (no header, no progress bar).
pub struct FormatDisplay {
    schema: Schema,
    stream: Streaming<FlightData>,
}

impl FormatDisplay {
pub fn new(schema: Schema, stream: Streaming<FlightData>) -> Self {
Self { schema, stream }
}
}

#[async_trait::async_trait]
impl ChunkDisplay for FormatDisplay {
    /// Drain the stream, writing every data frame as tab-separated values
    /// (no header row) and skipping progress frames
    /// (`app_metadata == [0x01]`); print the accumulated text at the end.
    async fn display(&mut self) -> Result<(), ArrowError> {
        let mut bytes = vec![];
        let builder = WriterBuilder::new()
            .has_headers(false)
            .with_delimiter(b'\t');
        let mut writer = builder.build(&mut bytes);

        while let Some(datum) = self.stream.next().await {
            match datum {
                Ok(datum) => {
                    if datum.app_metadata[..] != [0x01] {
                        // No dictionary batches are expected on this stream.
                        let dictionaries_by_id = HashMap::new();
                        let batch = flight_data_to_arrow_batch(
                            &datum,
                            Arc::new(self.schema.clone()),
                            &dictionaries_by_id,
                        )?;
                        let batch = normalize_record_batch(&batch)?;
                        writer.write(&batch)?;
                    }
                }
                Err(err) => {
                    // Report and keep draining the stream after a transport error.
                    eprintln!("error: {:?}", err.message());
                }
            }
        }

        let formatted = std::str::from_utf8(writer.into_inner())
            .map_err(|e| ArrowError::CsvError(e.to_string()))?;
        println!("{}", formatted);
        Ok(())
    }

    /// Rows are not counted in this mode.
    fn total_rows(&self) -> usize {
        0
    }
}

/// Return a copy of `batch` in which every `LargeBinary` column is
/// reinterpreted as `LargeUtf8`; all other columns pass through untouched.
/// The underlying offset/value buffers are reused as-is — only the declared
/// data type (and the matching schema field) changes.
fn normalize_record_batch(batch: &RecordBatch) -> Result<RecordBatch, ArrowError> {
    let n = batch.num_columns();
    let mut new_columns: Vec<Arc<dyn Array>> = Vec::with_capacity(n);
    let mut new_fields = Vec::with_capacity(n);

    for col in 0..n {
        let field = batch.schema().field(col).clone();
        let array = batch.column(col);
        match array.as_any().downcast_ref::<LargeBinaryArray>() {
            Some(binary) => {
                // Rebuild the array data under a LargeUtf8 type; buffers
                // carry over unchanged.
                let rebuilt = ArrayDataBuilder::from(binary.data().clone())
                    .data_type(DataType::LargeUtf8)
                    .build()?;
                new_columns.push(Arc::new(LargeStringArray::from(rebuilt)) as Arc<dyn Array>);
                new_fields.push(
                    Field::new(field.name(), DataType::LargeUtf8, field.is_nullable())
                        .with_metadata(field.metadata().clone()),
                );
            }
            None => {
                new_columns.push(array.clone());
                new_fields.push(field);
            }
        }
    }

    RecordBatch::try_new(Arc::new(Schema::new(new_fields)), new_columns)
}

/// Prints a visual representation of record batches to stdout
pub fn print_batches(results: &[RecordBatch]) -> Result<(), ArrowError> {
Expand Down Expand Up @@ -97,3 +309,35 @@ pub(crate) fn format_error(error: ArrowError) -> String {
other => format!("{}", other),
}
}

/// Render a count as a short human-readable string scaled by powers of
/// 1000, e.g. `1234567.0` -> `"1.23 million"`.
///
/// Values below 1 keep two decimals (`0.5` -> `"0.50"`); values at or
/// above one quintillion are capped at the "quadrillion" unit. Negative
/// inputs keep their sign; exactly zero renders as `"0"`.
pub fn humanize_count(num: f64) -> String {
    if num == 0.0 {
        return String::from("0");
    }

    let negative = if num.is_sign_positive() { "" } else { "-" };
    let num = num.abs();
    let units = [
        "",
        " thousand",
        " million",
        " billion",
        " trillion",
        " quadrillion",
    ];

    if num < 1_f64 {
        return format!("{}{:.2}", negative, num);
    }
    let delimiter = 1000_f64;
    // Scale exponent, capped so arbitrarily large values still map onto
    // the last unit instead of indexing out of bounds.
    let exponent = std::cmp::min(
        (num.ln() / delimiter.ln()).floor() as i32,
        (units.len() - 1) as i32,
    );
    // Round to two decimals, then re-parse to drop trailing zeros
    // ("1.50" -> "1.5", "999.00" -> "999"). The former `* 1_f64` no-op
    // has been removed.
    let pretty = format!("{:.2}", num / delimiter.powi(exponent))
        .parse::<f64>()
        .unwrap();
    format!("{}{}{}", negative, pretty, units[exponent as usize])
}
Loading