Skip to content

Commit

Permalink
Update to arrow 31 (#4927)
Browse files Browse the repository at this point in the history
* Update to arrow 31

* Test fixes

* Use upstream concat_batches
  • Loading branch information
tustvold committed Jan 17, 2023
1 parent 097523f commit 4623166
Show file tree
Hide file tree
Showing 18 changed files with 82 additions and 98 deletions.
4 changes: 2 additions & 2 deletions benchmarks/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,14 @@ simd = ["datafusion/simd"]
snmalloc = ["snmalloc-rs"]

[dependencies]
-arrow = "30.0.1"
+arrow = "31.0.0"
datafusion = { path = "../datafusion/core", version = "16.0.0", features = ["scheduler"] }
env_logger = "0.10"
futures = "0.3"
mimalloc = { version = "0.1", optional = true, default-features = false }
num_cpus = "1.13.0"
object_store = "0.5.0"
-parquet = "30.0.1"
+parquet = "31.0.0"
parquet-test-utils = { path = "../parquet-test-utils/", version = "0.1.0" }
rand = "0.8.4"
serde = { version = "1.0.136", features = ["derive"] }
Expand Down
62 changes: 31 additions & 31 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion datafusion-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ rust-version = "1.62"
readme = "README.md"

[dependencies]
-arrow = "30.0.1"
+arrow = "31.0.0"
async-trait = "0.1.41"
clap = { version = "3", features = ["derive", "cargo"] }
datafusion = { path = "../datafusion/core", version = "16.0.0" }
Expand Down
4 changes: 2 additions & 2 deletions datafusion-examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ path = "examples/avro_sql.rs"
required-features = ["datafusion/avro"]

[dev-dependencies]
-arrow = "30.0.1"
-arrow-flight = "30.0.1"
+arrow = "31.0.0"
+arrow-flight = "31.0.0"
async-trait = "0.1.41"
datafusion = { path = "../datafusion/core" }
datafusion-common = { path = "../datafusion/common" }
Expand Down
4 changes: 2 additions & 2 deletions datafusion/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@ pyarrow = ["pyo3", "arrow/pyarrow"]

[dependencies]
apache-avro = { version = "0.14", default-features = false, features = ["snappy"], optional = true }
-arrow = { version = "30.0.1", default-features = false }
+arrow = { version = "31.0.0", default-features = false }
chrono = { version = "0.4", default-features = false }
cranelift-module = { version = "0.89.0", optional = true }
num_cpus = "1.13.0"
object_store = { version = "0.5.0", default-features = false, optional = true }
-parquet = { version = "30.0.1", default-features = false, optional = true }
+parquet = { version = "31.0.0", default-features = false, optional = true }
pyo3 = { version = "0.17.1", optional = true }
sqlparser = "0.30"
6 changes: 3 additions & 3 deletions datafusion/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ unicode_expressions = ["datafusion-physical-expr/regex_expressions", "datafusion
[dependencies]
ahash = { version = "0.8", default-features = false, features = ["runtime-rng"] }
apache-avro = { version = "0.14", optional = true }
-arrow = { version = "30.0.1", features = ["prettyprint"] }
+arrow = { version = "31.0.0", features = ["prettyprint"] }
async-compression = { version = "0.3.14", features = ["bzip2", "gzip", "xz", "futures-io", "tokio"], optional = true }
async-trait = "0.1.41"
bytes = "1.1"
Expand All @@ -83,7 +83,7 @@ num-traits = { version = "0.2", optional = true }
num_cpus = "1.13.0"
object_store = "0.5.3"
parking_lot = "0.12"
-parquet = { version = "30.0.1", features = ["arrow", "async"] }
+parquet = { version = "31.0.0", features = ["arrow", "async"] }
paste = "^1.0"
percent-encoding = "2.2.0"
pin-project-lite = "^0.2.7"
Expand All @@ -102,7 +102,7 @@ xz2 = { version = "0.1", optional = true }


[dev-dependencies]
-arrow = { version = "30.0.1", features = ["prettyprint", "dyn_cmp_dict"] }
+arrow = { version = "31.0.0", features = ["prettyprint", "dyn_cmp_dict"] }
async-trait = "0.1.53"
criterion = "0.4"
csv = "1.1.6"
Expand Down
22 changes: 3 additions & 19 deletions datafusion/core/src/physical_plan/coalesce_batches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,9 @@ use crate::physical_plan::{
};

use crate::execution::context::TaskContext;
-use arrow::compute::kernels::concat::concat;
use arrow::datatypes::SchemaRef;
use arrow::error::Result as ArrowResult;
-use arrow::record_batch::{RecordBatch, RecordBatchOptions};
+use arrow::record_batch::RecordBatch;
use futures::stream::{Stream, StreamExt};
use log::trace;

Expand Down Expand Up @@ -281,28 +280,13 @@ pub fn concat_batches(
batches: &[RecordBatch],
row_count: usize,
) -> ArrowResult<RecordBatch> {
-if batches.is_empty() {
-return Ok(RecordBatch::new_empty(schema.clone()));
-}
-let mut arrays = Vec::with_capacity(schema.fields().len());
-for i in 0..schema.fields().len() {
-let array = concat(
-&batches
-.iter()
-.map(|batch| batch.column(i).as_ref())
-.collect::<Vec<_>>(),
-)?;
-arrays.push(array);
-}
trace!(
"Combined {} batches containing {} rows",
batches.len(),
row_count
);
-
-let options = RecordBatchOptions::new().with_row_count(Some(row_count));
-
-RecordBatch::try_new_with_options(schema.clone(), arrays, &options)
+let b = arrow::compute::concat_batches(schema, batches)?;
+Ok(b)
}

#[cfg(test)]
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_plan/file_format/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,7 @@ mod tests {
let err = it.next().await.unwrap().unwrap_err().to_string();
assert_eq!(
err,
-"Csv error: incorrect number of fields, expected 14 got 13"
+"Csv error: incorrect number of fields for line 1, expected 14 got 13"
);
Ok(())
}
Expand Down
50 changes: 25 additions & 25 deletions datafusion/core/tests/sql/timestamp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -534,11 +534,11 @@ async fn timestamp_minmax() -> Result<()> {
let sql = "SELECT MIN(table_a.ts), MAX(table_b.ts) FROM table_a, table_b";
let actual = execute_to_batches(&ctx, sql).await;
let expected = vec![
-"+-------------------------+----------------------------------+",
-"| MIN(table_a.ts)         | MAX(table_b.ts)                  |",
-"+-------------------------+----------------------------------+",
-"| 2020-09-08T11:42:29.190 | 2020-09-08T13:42:29.190855+00:00 |",
-"+-------------------------+----------------------------------+",
+"+-------------------------+-----------------------------+",
+"| MIN(table_a.ts)         | MAX(table_b.ts)             |",
+"+-------------------------+-----------------------------+",
+"| 2020-09-08T11:42:29.190 | 2020-09-08T13:42:29.190855Z |",
+"+-------------------------+-----------------------------+",
];
assert_batches_eq!(expected, &actual);

Expand All @@ -560,19 +560,19 @@ async fn timestamp_coercion() -> Result<()> {
let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b order by table_a.ts desc, table_b.ts desc";
let actual = execute_to_batches(&ctx, sql).await;
let expected = vec![
-"+---------------------------+-------------------------------+-------------------------+",
-"| ts                        | ts                            | table_a.ts = table_b.ts |",
-"+---------------------------+-------------------------------+-------------------------+",
-"| 2020-09-08T13:42:29+00:00 | 2020-09-08T13:42:29.190+00:00 | true                    |",
-"| 2020-09-08T13:42:29+00:00 | 2020-09-08T12:42:29.190+00:00 | false                   |",
-"| 2020-09-08T13:42:29+00:00 | 2020-09-08T11:42:29.190+00:00 | false                   |",
-"| 2020-09-08T12:42:29+00:00 | 2020-09-08T13:42:29.190+00:00 | false                   |",
-"| 2020-09-08T12:42:29+00:00 | 2020-09-08T12:42:29.190+00:00 | true                    |",
-"| 2020-09-08T12:42:29+00:00 | 2020-09-08T11:42:29.190+00:00 | false                   |",
-"| 2020-09-08T11:42:29+00:00 | 2020-09-08T13:42:29.190+00:00 | false                   |",
-"| 2020-09-08T11:42:29+00:00 | 2020-09-08T12:42:29.190+00:00 | false                   |",
-"| 2020-09-08T11:42:29+00:00 | 2020-09-08T11:42:29.190+00:00 | true                    |",
-"+---------------------------+-------------------------------+-------------------------+",
+"+----------------------+--------------------------+-------------------------+",
+"| ts                   | ts                       | table_a.ts = table_b.ts |",
+"+----------------------+--------------------------+-------------------------+",
+"| 2020-09-08T13:42:29Z | 2020-09-08T13:42:29.190Z | true                    |",
+"| 2020-09-08T13:42:29Z | 2020-09-08T12:42:29.190Z | false                   |",
+"| 2020-09-08T13:42:29Z | 2020-09-08T11:42:29.190Z | false                   |",
+"| 2020-09-08T12:42:29Z | 2020-09-08T13:42:29.190Z | false                   |",
+"| 2020-09-08T12:42:29Z | 2020-09-08T12:42:29.190Z | true                    |",
+"| 2020-09-08T12:42:29Z | 2020-09-08T11:42:29.190Z | false                   |",
+"| 2020-09-08T11:42:29Z | 2020-09-08T13:42:29.190Z | false                   |",
+"| 2020-09-08T11:42:29Z | 2020-09-08T12:42:29.190Z | false                   |",
+"| 2020-09-08T11:42:29Z | 2020-09-08T11:42:29.190Z | true                    |",
+"+----------------------+--------------------------+-------------------------+",
];
assert_batches_eq!(expected, &actual);
}
Expand Down Expand Up @@ -1539,13 +1539,13 @@ async fn cast_timestamp_to_timestamptz() -> Result<()> {
let actual = execute_to_batches(&ctx, sql).await;

let expected = vec![
-"+----------------------------------+---------------------------------------+",
-"| table_a.ts                       | arrowtypeof(table_a.ts)               |",
-"+----------------------------------+---------------------------------------+",
-"| 2020-09-08T13:42:29.190855+00:00 | Timestamp(Nanosecond, Some(\"+00:00\")) |",
-"| 2020-09-08T12:42:29.190855+00:00 | Timestamp(Nanosecond, Some(\"+00:00\")) |",
-"| 2020-09-08T11:42:29.190855+00:00 | Timestamp(Nanosecond, Some(\"+00:00\")) |",
-"+----------------------------------+---------------------------------------+",
+"+-----------------------------+---------------------------------------+",
+"| table_a.ts                  | arrowtypeof(table_a.ts)               |",
+"+-----------------------------+---------------------------------------+",
+"| 2020-09-08T13:42:29.190855Z | Timestamp(Nanosecond, Some(\"+00:00\")) |",
+"| 2020-09-08T12:42:29.190855Z | Timestamp(Nanosecond, Some(\"+00:00\")) |",
+"| 2020-09-08T11:42:29.190855Z | Timestamp(Nanosecond, Some(\"+00:00\")) |",
+"+-----------------------------+---------------------------------------+",
];
assert_batches_eq!(expected, &actual);

Expand Down
2 changes: 1 addition & 1 deletion datafusion/expr/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ path = "src/lib.rs"

[dependencies]
ahash = { version = "0.8", default-features = false, features = ["runtime-rng"] }
-arrow = { version = "30.0.1", default-features = false }
+arrow = { version = "31.0.0", default-features = false }
datafusion-common = { path = "../common", version = "16.0.0" }
log = "^0.4"
sqlparser = "0.30"

0 comments on commit 4623166

Please sign in to comment.