Upgrade datafusion version to 27.0.0
yukkit committed Jul 6, 2023
1 parent f3c68dd commit c4814f8
Showing 73 changed files with 1,554 additions and 941 deletions.
332 changes: 224 additions & 108 deletions Cargo.lock

Large diffs are not rendered by default.

16 changes: 8 additions & 8 deletions Cargo.toml
@@ -55,11 +55,11 @@ crossbeam-channel = "0.5"
 ctrlc = "3"
 dashmap = "5.2"
 derive_builder = "0.11"
-arrow = { version = "36.0.0", features = ["prettyprint"] }
-arrow-schema = { version = "36.0.0", features = ["serde"] }
-arrow-flight = { version = "36.0.0" }
-datafusion-proto = { git = "https://github.com/cnosdb/arrow-datafusion.git", branch = "22.0.0" }
-datafusion = { git = "https://github.com/cnosdb/arrow-datafusion.git", branch = "22.0.0" }
+arrow = { version = "42.0.0", features = ["prettyprint"] }
+arrow-schema = { version = "42.0.0", features = ["serde"] }
+arrow-flight = { version = "42.0.0" }
+datafusion-proto = { git = "https://github.com/cnosdb/arrow-datafusion.git", branch = "27.0.0" }
+datafusion = { git = "https://github.com/cnosdb/arrow-datafusion.git", branch = "27.0.0" }
 diff = "0.1.13"
 dirs = "4.0.0"
 env_logger = "0.9"
@@ -77,7 +77,7 @@ nom = "7.1.1"
 num-traits = "0.2.14"
 num_cpus = "1.13.0"
 num_enum = "0.5.7"
-object_store = { version = "0.5.2", features = ["aws", "gcp", "azure"] }
+object_store = { version = "0.6.1", features = ["aws", "gcp", "azure"] }
 once_cell = "1.12.0"
 openraft = { git = "https://github.com/datafuselabs/openraft", rev = "914fcb4dad32a2f187b808298048e9e8b912977f", features = ["serde"] }
 openssl = { version = "0.10.48", features = ["vendored"] }
@@ -117,8 +117,8 @@ tokio = { version = "1.21" }
 tokio-stream = "0.1"
 tokio-util = { version = "0.7.0" }
 toml = "0.5.9"
-tonic = "0.8"
-tonic-build = "0.8"
+tonic = "0.9"
+tonic-build = "0.9"
 tower = "0.4.13"
 tracing = "0.1.35"
 tracing-appender = "0.2.2"
27 changes: 15 additions & 12 deletions client/src/print_format.rs
@@ -33,7 +33,9 @@ macro_rules! batches_to_json {
         let mut bytes = vec![];
         {
             let mut writer = $WRITER::new(&mut bytes);
-            writer.write_batches($batches)?;
+            for batch in $batches {
+                writer.write(batch)?;
+            }
             writer.finish()?;
         }
         String::from_utf8(bytes).map_err(|e| DataFusionError::Execution(e.to_string()))?
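
The hunk above tracks an arrow-json writer API change: `write_batches(&[RecordBatch])` is gone, and batches are written one at a time. A minimal sketch of the new call pattern, assuming the arrow 42 writer API; the schema and the `batches_to_ndjson` helper are hypothetical:

    use std::sync::Arc;

    use datafusion::arrow::array::Int32Array;
    use datafusion::arrow::datatypes::{DataType, Field, Schema};
    use datafusion::arrow::json::LineDelimitedWriter;
    use datafusion::arrow::record_batch::RecordBatch;

    // Serialize each batch as newline-delimited JSON using the per-batch API.
    fn batches_to_ndjson(batches: &[RecordBatch]) -> Result<String, Box<dyn std::error::Error>> {
        let mut bytes = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut bytes);
            for batch in batches {
                writer.write(batch)?; // was: writer.write_batches(batches)?
            }
            writer.finish()?;
        }
        Ok(String::from_utf8(bytes)?)
    }

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
        let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))])?;
        print!("{}", batches_to_ndjson(&[batch])?);
        Ok(())
    }
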
@@ -63,7 +65,9 @@ impl PrintFormat {
             Self::Csv => println!("{}", print_batches_with_sep(batches, b',')?),
             Self::Tsv => println!("{}", print_batches_with_sep(batches, b'\t')?),
             Self::Table => pretty::print_batches(batches)?,
-            Self::Json => println!("{}", batches_to_json!(ArrayWriter, batches)),
+            Self::Json => {
+                println!("{}", batches_to_json!(ArrayWriter, batches))
+            }
             Self::NdJson => {
                 println!("{}", batches_to_json!(LineDelimitedWriter, batches))
             }
@@ -88,7 +92,6 @@ mod tests {

     use datafusion::arrow::array::Int32Array;
     use datafusion::arrow::datatypes::{DataType, Field, Schema};
-    use datafusion::from_slice::FromSlice;

     use super::*;

@@ -106,15 +109,15 @@
         let batch = RecordBatch::try_new(
             schema,
             vec![
-                Arc::new(Int32Array::from_slice([1, 2, 3])),
-                Arc::new(Int32Array::from_slice([4, 5, 6])),
-                Arc::new(Int32Array::from_slice([7, 8, 9])),
+                Arc::new(Int32Array::from(vec![1, 2, 3])),
+                Arc::new(Int32Array::from(vec![4, 5, 6])),
+                Arc::new(Int32Array::from(vec![7, 8, 9])),
             ],
         )
         .unwrap();

-        let batches = vec![batch];
-        let r = print_batches_with_sep(&batches, b',').unwrap();
+        let batches = &[batch];
+        let r = print_batches_with_sep(batches, b',').unwrap();
         assert_eq!("a,b,c\n1,4,7\n2,5,8\n3,6,9\n", r);
     }

@@ -136,14 +139,14 @@
         let batch = RecordBatch::try_new(
             schema,
             vec![
-                Arc::new(Int32Array::from_slice([1, 2, 3])),
-                Arc::new(Int32Array::from_slice([4, 5, 6])),
-                Arc::new(Int32Array::from_slice([7, 8, 9])),
+                Arc::new(Int32Array::from(vec![1, 2, 3])),
+                Arc::new(Int32Array::from(vec![4, 5, 6])),
+                Arc::new(Int32Array::from(vec![7, 8, 9])),
             ],
         )
         .unwrap();

-        let batches = vec![batch];
+        let batches = vec![&batch];
         let r = batches_to_json!(ArrayWriter, &batches);
         assert_eq!(
             "[{\"a\":1,\"b\":4,\"c\":7},{\"a\":2,\"b\":5,\"c\":8},{\"a\":3,\"b\":6,\"c\":9}]",
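
The test changes follow from datafusion dropping its `FromSlice` helper trait; arrays are now built with arrow's plain `From` impls. A small sketch of the migration, assuming only the standard `From<Vec<i32>>` constructor (values are hypothetical):

    use datafusion::arrow::array::Int32Array;

    fn main() {
        // before (datafusion <= 22): Int32Array::from_slice([1, 2, 3])
        let a = Int32Array::from(vec![1, 2, 3]);
        assert_eq!(a.value(0), 1);
        println!("{a:?}");
    }
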
4 changes: 2 additions & 2 deletions common/models/src/arrow_array.rs
@@ -1,4 +1,4 @@
-use arrow_schema::{Field, Schema};
+use arrow_schema::Schema;
 use datafusion::arrow::array::{
     ArrayBuilder, BooleanBuilder, Float64Builder, Int64Builder, StringBuilder,
     TimestampMicrosecondBuilder, TimestampMillisecondBuilder, TimestampNanosecondBuilder,
@@ -51,7 +51,7 @@ pub fn build_arrow_array_builders(
     schema
         .fields()
         .iter()
-        .map(|f: &Field| build_arrow_array_builder(f.data_type(), batch_size))
+        .map(|f| build_arrow_array_builder(f.data_type(), batch_size))
         .collect::<Result<Vec<_>, ArrowError>>()
 }

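
Dropping the `|f: &Field|` annotation follows from the arrow `Fields` rework (assumed to land between arrow 36 and 42): `Schema::fields()` now yields `&FieldRef` (an `&Arc<Field>`) rather than `&Field`, so the explicitly typed closure no longer compiles while the inferred one does. A minimal sketch under that assumption, with a hypothetical schema:

    use arrow_schema::{DataType, Field, Schema};

    fn main() {
        let schema = Schema::new(vec![
            Field::new("time", DataType::Int64, false),
            Field::new("value", DataType::Float64, true),
        ]);
        let types: Vec<&DataType> = schema
            .fields()
            .iter()
            .map(|f| f.data_type()) // `f` is `&FieldRef`; `|f: &Field|` no longer type-checks
            .collect();
        println!("{types:?}");
    }
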
7 changes: 6 additions & 1 deletion common/models/src/schema.rs
@@ -26,7 +26,7 @@ use datafusion::datasource::file_format::json::JsonFormat;
 use datafusion::datasource::file_format::parquet::ParquetFormat;
 use datafusion::datasource::file_format::FileFormat;
 use datafusion::datasource::listing::ListingOptions;
-use datafusion::error::Result as DataFusionResult;
+use datafusion::error::{DataFusionError, Result as DataFusionResult};
 use datafusion::prelude::Column;
 use datafusion::scalar::ScalarValue;
 use derive_builder::Builder;
@@ -127,6 +127,11 @@ impl ExternalTableSchema {
             FileType::JSON => {
                 Arc::new(JsonFormat::default().with_file_compression_type(file_compression_type))
             }
+            FileType::ARROW => {
+                return Err(DataFusionError::NotImplemented(
+                    "Arrow external table.".to_string(),
+                ))
+            }
         };

         let options = ListingOptions::new(file_format)
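
The added arm is needed because the upgraded `FileType` enum gains an `ARROW` variant, so the exhaustive match no longer compiles without handling it. A hedged sketch of such a match; the `datafusion::common::FileType` import path and the exact variant set are assumptions about datafusion 27:

    // Hedged sketch: exhaustively matching the datafusion 27 `FileType` enum,
    // rejecting the new ARROW variant the same way the diff above does.
    use datafusion::common::FileType; // path assumed
    use datafusion::error::{DataFusionError, Result};

    fn extension_for(file_type: &FileType) -> Result<&'static str> {
        match file_type {
            FileType::CSV => Ok(".csv"),
            FileType::JSON => Ok(".json"),
            FileType::PARQUET => Ok(".parquet"),
            FileType::AVRO => Ok(".avro"),
            FileType::ARROW => Err(DataFusionError::NotImplemented(
                "Arrow external table.".to_string(),
            )),
        }
    }
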
