Skip to content

Commit

Permalink
Update to arrow-7.0.0 (#1523)
Browse files Browse the repository at this point in the history
* Update tonic/prost deps

* Update to arrow 7.0.0-SNAPSHOT

* Update datafusion and tests for arrow changes

* fix doc tests

* Update avro support

* Use released arrow 7.0.0
  • Loading branch information
alamb committed Jan 12, 2022
1 parent e1e7b86 commit 14176ff
Show file tree
Hide file tree
Showing 21 changed files with 182 additions and 101 deletions.
4 changes: 2 additions & 2 deletions ballista-examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ rust-version = "1.57"
[dependencies]
datafusion = { path = "../datafusion" }
ballista = { path = "../ballista/rust/client", version = "0.6.0"}
prost = "0.8"
tonic = "0.5"
prost = "0.9"
tonic = "0.6"
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync"] }
futures = "0.3"
num_cpus = "1.13.0"
11 changes: 4 additions & 7 deletions ballista/rust/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,23 +35,20 @@ async-trait = "0.1.36"
futures = "0.3"
hashbrown = "0.11"
log = "0.4"
prost = "0.8"
prost = "0.9"
serde = {version = "1", features = ["derive"]}
sqlparser = "0.13"
tokio = "1.0"
tonic = "0.5"
tonic = "0.6"
uuid = { version = "0.8", features = ["v4"] }
chrono = { version = "0.4", default-features = false }

# workaround for https://github.com/apache/arrow-datafusion/issues/1498
# should be able to remove when we update arrow-flight
quote = "=1.0.10"
arrow-flight = { version = "6.4.0" }
arrow-flight = { version = "7.0.0" }

datafusion = { path = "../../../datafusion", version = "6.0.0" }

[dev-dependencies]
tempfile = "3"

[build-dependencies]
tonic-build = { version = "0.5" }
tonic-build = { version = "0.6" }
8 changes: 8 additions & 0 deletions ballista/rust/core/proto/ballista.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1015,6 +1015,7 @@ enum TimeUnit{
enum IntervalUnit{
YearMonth = 0;
DayTime = 1;
MonthDayNano = 2;
}

message Decimal{
Expand All @@ -1040,11 +1041,18 @@ message Struct{
repeated Field sub_field_types = 1;
}

enum UnionMode{
sparse = 0;
dense = 1;
}

message Union{
repeated Field union_types = 1;
UnionMode union_mode = 2;
}



message ScalarListValue{
ScalarType datatype = 1;
repeated ScalarValue values = 2;
Expand Down
14 changes: 9 additions & 5 deletions ballista/rust/core/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

//! Client API for sending requests to executors.

use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::{collections::HashMap, pin::Pin};
use std::{
convert::{TryFrom, TryInto},
Expand Down Expand Up @@ -135,24 +135,28 @@ impl BallistaClient {
}

struct FlightDataStream {
stream: Streaming<FlightData>,
stream: Mutex<Streaming<FlightData>>,
schema: SchemaRef,
}

impl FlightDataStream {
pub fn new(stream: Streaming<FlightData>, schema: SchemaRef) -> Self {
Self { stream, schema }
Self {
stream: Mutex::new(stream),
schema,
}
}
}

impl Stream for FlightDataStream {
type Item = ArrowResult<RecordBatch>;

fn poll_next(
mut self: std::pin::Pin<&mut Self>,
self: std::pin::Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
self.stream.poll_next_unpin(cx).map(|x| match x {
let mut stream = self.stream.lock().expect("mutex is bad");
stream.poll_next_unpin(cx).map(|x| match x {
Some(flight_data_chunk_result) => {
let converted_chunk = flight_data_chunk_result
.map_err(|e| ArrowError::from_external_error(Box::new(e)))
Expand Down
89 changes: 51 additions & 38 deletions ballista/rust/core/src/serde/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ mod roundtrip_tests {
use super::super::{super::error::Result, protobuf};
use crate::error::BallistaError;
use core::panic;
use datafusion::arrow::datatypes::UnionMode;
use datafusion::logical_plan::Repartition;
use datafusion::{
arrow::datatypes::{DataType, Field, IntervalUnit, Schema, TimeUnit},
Expand Down Expand Up @@ -413,25 +414,31 @@ mod roundtrip_tests {
true,
),
]),
DataType::Union(vec![
Field::new("nullable", DataType::Boolean, false),
Field::new("name", DataType::Utf8, false),
Field::new("datatype", DataType::Binary, false),
]),
DataType::Union(vec![
Field::new("nullable", DataType::Boolean, false),
Field::new("name", DataType::Utf8, false),
Field::new("datatype", DataType::Binary, false),
Field::new(
"nested_struct",
DataType::Struct(vec![
Field::new("nullable", DataType::Boolean, false),
Field::new("name", DataType::Utf8, false),
Field::new("datatype", DataType::Binary, false),
]),
true,
),
]),
DataType::Union(
vec![
Field::new("nullable", DataType::Boolean, false),
Field::new("name", DataType::Utf8, false),
Field::new("datatype", DataType::Binary, false),
],
UnionMode::Dense,
),
DataType::Union(
vec![
Field::new("nullable", DataType::Boolean, false),
Field::new("name", DataType::Utf8, false),
Field::new("datatype", DataType::Binary, false),
Field::new(
"nested_struct",
DataType::Struct(vec![
Field::new("nullable", DataType::Boolean, false),
Field::new("name", DataType::Utf8, false),
Field::new("datatype", DataType::Binary, false),
]),
true,
),
],
UnionMode::Sparse,
),
DataType::Dictionary(
Box::new(DataType::Utf8),
Box::new(DataType::Struct(vec![
Expand Down Expand Up @@ -558,25 +565,31 @@ mod roundtrip_tests {
true,
),
]),
DataType::Union(vec![
Field::new("nullable", DataType::Boolean, false),
Field::new("name", DataType::Utf8, false),
Field::new("datatype", DataType::Binary, false),
]),
DataType::Union(vec![
Field::new("nullable", DataType::Boolean, false),
Field::new("name", DataType::Utf8, false),
Field::new("datatype", DataType::Binary, false),
Field::new(
"nested_struct",
DataType::Struct(vec![
Field::new("nullable", DataType::Boolean, false),
Field::new("name", DataType::Utf8, false),
Field::new("datatype", DataType::Binary, false),
]),
true,
),
]),
DataType::Union(
vec![
Field::new("nullable", DataType::Boolean, false),
Field::new("name", DataType::Utf8, false),
Field::new("datatype", DataType::Binary, false),
],
UnionMode::Sparse,
),
DataType::Union(
vec![
Field::new("nullable", DataType::Boolean, false),
Field::new("name", DataType::Utf8, false),
Field::new("datatype", DataType::Binary, false),
Field::new(
"nested_struct",
DataType::Struct(vec![
Field::new("nullable", DataType::Boolean, false),
Field::new("name", DataType::Utf8, false),
Field::new("datatype", DataType::Binary, false),
]),
true,
),
],
UnionMode::Dense,
),
DataType::Dictionary(
Box::new(DataType::Utf8),
Box::new(DataType::Struct(vec![
Expand Down
25 changes: 17 additions & 8 deletions ballista/rust/core/src/serde/logical_plan/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
use super::super::proto_error;
use crate::serde::{byte_to_string, protobuf, BallistaError};
use datafusion::arrow::datatypes::{
DataType, Field, IntervalUnit, Schema, SchemaRef, TimeUnit,
DataType, Field, IntervalUnit, Schema, SchemaRef, TimeUnit, UnionMode,
};
use datafusion::datasource::file_format::avro::AvroFormat;
use datafusion::datasource::file_format::csv::CsvFormat;
Expand Down Expand Up @@ -60,6 +60,7 @@ impl protobuf::IntervalUnit {
match interval_unit {
IntervalUnit::YearMonth => protobuf::IntervalUnit::YearMonth,
IntervalUnit::DayTime => protobuf::IntervalUnit::DayTime,
IntervalUnit::MonthDayNano => protobuf::IntervalUnit::MonthDayNano,
}
}

Expand All @@ -71,6 +72,7 @@ impl protobuf::IntervalUnit {
Some(interval_unit) => Ok(match interval_unit {
protobuf::IntervalUnit::YearMonth => IntervalUnit::YearMonth,
protobuf::IntervalUnit::DayTime => IntervalUnit::DayTime,
protobuf::IntervalUnit::MonthDayNano => IntervalUnit::MonthDayNano,
}),
None => Err(proto_error(
"Error converting i32 to DateUnit: Passed invalid variant",
Expand Down Expand Up @@ -238,12 +240,19 @@ impl From<&DataType> for protobuf::arrow_type::ArrowTypeEnum {
.map(|field| field.into())
.collect::<Vec<_>>(),
}),
DataType::Union(union_types) => ArrowTypeEnum::Union(protobuf::Union {
union_types: union_types
.iter()
.map(|field| field.into())
.collect::<Vec<_>>(),
}),
DataType::Union(union_types, union_mode) => {
let union_mode = match union_mode {
UnionMode::Sparse => protobuf::UnionMode::Sparse,
UnionMode::Dense => protobuf::UnionMode::Dense,
};
ArrowTypeEnum::Union(protobuf::Union {
union_types: union_types
.iter()
.map(|field| field.into())
.collect::<Vec<_>>(),
union_mode: union_mode.into(),
})
}
DataType::Dictionary(key_type, value_type) => {
ArrowTypeEnum::Dictionary(Box::new(protobuf::Dictionary {
key: Some(Box::new(key_type.as_ref().into())),
Expand Down Expand Up @@ -387,7 +396,7 @@ impl TryFrom<&DataType> for protobuf::scalar_type::Datatype {
| DataType::FixedSizeList(_, _)
| DataType::LargeList(_)
| DataType::Struct(_)
| DataType::Union(_)
| DataType::Union(_, _)
| DataType::Dictionary(_, _)
| DataType::Map(_, _)
| DataType::Decimal(_, _) => {
Expand Down
20 changes: 16 additions & 4 deletions ballista/rust/core/src/serde/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

use std::{convert::TryInto, io::Cursor};

use datafusion::arrow::datatypes::UnionMode;
use datafusion::logical_plan::{JoinConstraint, JoinType, Operator};
use datafusion::physical_plan::aggregates::AggregateFunction;
use datafusion::physical_plan::window_functions::BuiltInWindowFunction;
Expand Down Expand Up @@ -246,13 +247,24 @@ impl TryInto<datafusion::arrow::datatypes::DataType>
.map(|field| field.try_into())
.collect::<Result<Vec<_>, _>>()?,
),
arrow_type::ArrowTypeEnum::Union(union) => DataType::Union(
union
arrow_type::ArrowTypeEnum::Union(union) => {
let union_mode = protobuf::UnionMode::from_i32(union.union_mode)
.ok_or_else(|| {
proto_error(
"Protobuf deserialization error: Unknown union mode type",
)
})?;
let union_mode = match union_mode {
protobuf::UnionMode::Dense => UnionMode::Dense,
protobuf::UnionMode::Sparse => UnionMode::Sparse,
};
let union_types = union
.union_types
.iter()
.map(|field| field.try_into())
.collect::<Result<Vec<_>, _>>()?,
),
.collect::<Result<Vec<_>, _>>()?;
DataType::Union(union_types, union_mode)
}
arrow_type::ArrowTypeEnum::Dictionary(dict) => {
let pb_key_datatype = dict
.as_ref()
Expand Down
6 changes: 3 additions & 3 deletions ballista/rust/executor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ edition = "2018"
snmalloc = ["snmalloc-rs"]

[dependencies]
arrow = { version = "6.4.0" }
arrow-flight = { version = "6.4.0" }
arrow = { version = "7.0.0" }
arrow-flight = { version = "7.0.0" }
anyhow = "1"
async-trait = "0.1.36"
ballista-core = { path = "../core", version = "0.6.0" }
Expand All @@ -43,7 +43,7 @@ snmalloc-rs = {version = "0.2", features= ["cache-friendly"], optional = true}
tempfile = "3"
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread"] }
tokio-stream = { version = "0.1", features = ["net"] }
tonic = "0.5"
tonic = "0.6"
uuid = { version = "0.8", features = ["v4"] }

[dev-dependencies]
Expand Down
6 changes: 3 additions & 3 deletions ballista/rust/scheduler/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,13 @@ http-body = "0.4"
hyper = "0.14.4"
log = "0.4"
parse_arg = "0.1.3"
prost = "0.8"
prost = "0.9"
rand = "0.8"
serde = {version = "1", features = ["derive"]}
sled_package = { package = "sled", version = "0.34", optional = true }
tokio = { version = "1.0", features = ["full"] }
tokio-stream = { version = "0.1", features = ["net"], optional = true }
tonic = "0.5"
tonic = "0.6"
tower = { version = "0.4" }
warp = "0.3"

Expand All @@ -60,7 +60,7 @@ uuid = { version = "0.8", features = ["v4"] }

[build-dependencies]
configure_me_codegen = "0.4.1"
tonic-build = { version = "0.5" }
tonic-build = { version = "0.6" }

[package.metadata.configure_me.bin]
scheduler = "scheduler_config_spec.toml"
2 changes: 1 addition & 1 deletion datafusion-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,5 @@ clap = "2.33"
rustyline = "9.0"
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync"] }
datafusion = { path = "../datafusion", version = "6.0.0" }
arrow = { version = "6.4.0" }
arrow = { version = "7.0.0" }
ballista = { path = "../ballista/rust/client", version = "0.6.0" }
6 changes: 3 additions & 3 deletions datafusion-examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ path = "examples/avro_sql.rs"
required-features = ["datafusion/avro"]

[dev-dependencies]
arrow-flight = { version = "6.4.0" }
arrow-flight = { version = "7.0.0" }
datafusion = { path = "../datafusion" }
prost = "0.8"
tonic = "0.5"
prost = "0.9"
tonic = "0.6"
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync"] }
futures = "0.3"
num_cpus = "1.13.0"
4 changes: 2 additions & 2 deletions datafusion/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ avro = ["avro-rs", "num-traits"]
[dependencies]
ahash = { version = "0.7", default-features = false }
hashbrown = { version = "0.11", features = ["raw"] }
arrow = { version = "6.4.0", features = ["prettyprint"] }
parquet = { version = "6.4.0", features = ["arrow"] }
arrow = { version = "7.0.0", features = ["prettyprint"] }
parquet = { version = "7.0.0", features = ["arrow"] }
sqlparser = "0.13"
paste = "^1.0"
num_cpus = "1.13.0"
Expand Down
Loading

0 comments on commit 14176ff

Please sign in to comment.