Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update to arrow-7.0.0 #1523

Merged
merged 8 commits into from
Jan 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions ballista-examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ rust-version = "1.57"
[dependencies]
datafusion = { path = "../datafusion" }
ballista = { path = "../ballista/rust/client", version = "0.6.0"}
prost = "0.8"
tonic = "0.5"
prost = "0.9"
tonic = "0.6"
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync"] }
futures = "0.3"
num_cpus = "1.13.0"
11 changes: 4 additions & 7 deletions ballista/rust/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,23 +35,20 @@ async-trait = "0.1.36"
futures = "0.3"
hashbrown = "0.11"
log = "0.4"
prost = "0.8"
prost = "0.9"
serde = {version = "1", features = ["derive"]}
sqlparser = "0.13"
tokio = "1.0"
tonic = "0.5"
tonic = "0.6"
uuid = { version = "0.8", features = ["v4"] }
chrono = { version = "0.4", default-features = false }

# workaround for https://github.com/apache/arrow-datafusion/issues/1498
# should be able to remove when we update arrow-flight
quote = "=1.0.10"
arrow-flight = { version = "6.4.0" }
arrow-flight = { version = "7.0.0" }

datafusion = { path = "../../../datafusion", version = "6.0.0" }

[dev-dependencies]
tempfile = "3"

[build-dependencies]
tonic-build = { version = "0.5" }
tonic-build = { version = "0.6" }
8 changes: 8 additions & 0 deletions ballista/rust/core/proto/ballista.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1015,6 +1015,7 @@ enum TimeUnit{
enum IntervalUnit{
YearMonth = 0;
DayTime = 1;
MonthDayNano = 2;
}

message Decimal{
Expand All @@ -1040,11 +1041,18 @@ message Struct{
repeated Field sub_field_types = 1;
}

enum UnionMode{
sparse = 0;
dense = 1;
}

message Union{
repeated Field union_types = 1;
UnionMode union_mode = 2;
}



message ScalarListValue{
ScalarType datatype = 1;
repeated ScalarValue values = 2;
Expand Down
14 changes: 9 additions & 5 deletions ballista/rust/core/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

//! Client API for sending requests to executors.

use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::{collections::HashMap, pin::Pin};
use std::{
convert::{TryFrom, TryInto},
Expand Down Expand Up @@ -135,24 +135,28 @@ impl BallistaClient {
}

struct FlightDataStream {
stream: Streaming<FlightData>,
stream: Mutex<Streaming<FlightData>>,
schema: SchemaRef,
}

impl FlightDataStream {
pub fn new(stream: Streaming<FlightData>, schema: SchemaRef) -> Self {
Self { stream, schema }
Self {
stream: Mutex::new(stream),
schema,
}
}
}

impl Stream for FlightDataStream {
type Item = ArrowResult<RecordBatch>;

fn poll_next(
mut self: std::pin::Pin<&mut Self>,
self: std::pin::Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
self.stream.poll_next_unpin(cx).map(|x| match x {
let mut stream = self.stream.lock().expect("mutex is bad");
stream.poll_next_unpin(cx).map(|x| match x {
Some(flight_data_chunk_result) => {
let converted_chunk = flight_data_chunk_result
.map_err(|e| ArrowError::from_external_error(Box::new(e)))
Expand Down
89 changes: 51 additions & 38 deletions ballista/rust/core/src/serde/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ mod roundtrip_tests {
use super::super::{super::error::Result, protobuf};
use crate::error::BallistaError;
use core::panic;
use datafusion::arrow::datatypes::UnionMode;
use datafusion::logical_plan::Repartition;
use datafusion::{
arrow::datatypes::{DataType, Field, IntervalUnit, Schema, TimeUnit},
Expand Down Expand Up @@ -413,25 +414,31 @@ mod roundtrip_tests {
true,
),
]),
DataType::Union(vec![
Field::new("nullable", DataType::Boolean, false),
Field::new("name", DataType::Utf8, false),
Field::new("datatype", DataType::Binary, false),
]),
DataType::Union(vec![
Field::new("nullable", DataType::Boolean, false),
Field::new("name", DataType::Utf8, false),
Field::new("datatype", DataType::Binary, false),
Field::new(
"nested_struct",
DataType::Struct(vec![
Field::new("nullable", DataType::Boolean, false),
Field::new("name", DataType::Utf8, false),
Field::new("datatype", DataType::Binary, false),
]),
true,
),
]),
DataType::Union(
vec![
Field::new("nullable", DataType::Boolean, false),
Field::new("name", DataType::Utf8, false),
Field::new("datatype", DataType::Binary, false),
],
UnionMode::Dense,
),
DataType::Union(
vec![
Field::new("nullable", DataType::Boolean, false),
Field::new("name", DataType::Utf8, false),
Field::new("datatype", DataType::Binary, false),
Field::new(
"nested_struct",
DataType::Struct(vec![
Field::new("nullable", DataType::Boolean, false),
Field::new("name", DataType::Utf8, false),
Field::new("datatype", DataType::Binary, false),
]),
true,
),
],
UnionMode::Sparse,
),
DataType::Dictionary(
Box::new(DataType::Utf8),
Box::new(DataType::Struct(vec![
Expand Down Expand Up @@ -558,25 +565,31 @@ mod roundtrip_tests {
true,
),
]),
DataType::Union(vec![
Field::new("nullable", DataType::Boolean, false),
Field::new("name", DataType::Utf8, false),
Field::new("datatype", DataType::Binary, false),
]),
DataType::Union(vec![
Field::new("nullable", DataType::Boolean, false),
Field::new("name", DataType::Utf8, false),
Field::new("datatype", DataType::Binary, false),
Field::new(
"nested_struct",
DataType::Struct(vec![
Field::new("nullable", DataType::Boolean, false),
Field::new("name", DataType::Utf8, false),
Field::new("datatype", DataType::Binary, false),
]),
true,
),
]),
DataType::Union(
vec![
Field::new("nullable", DataType::Boolean, false),
Field::new("name", DataType::Utf8, false),
Field::new("datatype", DataType::Binary, false),
],
UnionMode::Sparse,
),
DataType::Union(
vec![
Field::new("nullable", DataType::Boolean, false),
Field::new("name", DataType::Utf8, false),
Field::new("datatype", DataType::Binary, false),
Field::new(
"nested_struct",
DataType::Struct(vec![
Field::new("nullable", DataType::Boolean, false),
Field::new("name", DataType::Utf8, false),
Field::new("datatype", DataType::Binary, false),
]),
true,
),
],
UnionMode::Dense,
),
DataType::Dictionary(
Box::new(DataType::Utf8),
Box::new(DataType::Struct(vec![
Expand Down
25 changes: 17 additions & 8 deletions ballista/rust/core/src/serde/logical_plan/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
use super::super::proto_error;
use crate::serde::{byte_to_string, protobuf, BallistaError};
use datafusion::arrow::datatypes::{
DataType, Field, IntervalUnit, Schema, SchemaRef, TimeUnit,
DataType, Field, IntervalUnit, Schema, SchemaRef, TimeUnit, UnionMode,
};
use datafusion::datasource::file_format::avro::AvroFormat;
use datafusion::datasource::file_format::csv::CsvFormat;
Expand Down Expand Up @@ -60,6 +60,7 @@ impl protobuf::IntervalUnit {
match interval_unit {
IntervalUnit::YearMonth => protobuf::IntervalUnit::YearMonth,
IntervalUnit::DayTime => protobuf::IntervalUnit::DayTime,
IntervalUnit::MonthDayNano => protobuf::IntervalUnit::MonthDayNano,
}
}

Expand All @@ -71,6 +72,7 @@ impl protobuf::IntervalUnit {
Some(interval_unit) => Ok(match interval_unit {
protobuf::IntervalUnit::YearMonth => IntervalUnit::YearMonth,
protobuf::IntervalUnit::DayTime => IntervalUnit::DayTime,
protobuf::IntervalUnit::MonthDayNano => IntervalUnit::MonthDayNano,
}),
None => Err(proto_error(
"Error converting i32 to DateUnit: Passed invalid variant",
Expand Down Expand Up @@ -238,12 +240,19 @@ impl From<&DataType> for protobuf::arrow_type::ArrowTypeEnum {
.map(|field| field.into())
.collect::<Vec<_>>(),
}),
DataType::Union(union_types) => ArrowTypeEnum::Union(protobuf::Union {
union_types: union_types
.iter()
.map(|field| field.into())
.collect::<Vec<_>>(),
}),
DataType::Union(union_types, union_mode) => {
let union_mode = match union_mode {
UnionMode::Sparse => protobuf::UnionMode::Sparse,
UnionMode::Dense => protobuf::UnionMode::Dense,
};
ArrowTypeEnum::Union(protobuf::Union {
union_types: union_types
.iter()
.map(|field| field.into())
.collect::<Vec<_>>(),
union_mode: union_mode.into(),
})
}
DataType::Dictionary(key_type, value_type) => {
ArrowTypeEnum::Dictionary(Box::new(protobuf::Dictionary {
key: Some(Box::new(key_type.as_ref().into())),
Expand Down Expand Up @@ -387,7 +396,7 @@ impl TryFrom<&DataType> for protobuf::scalar_type::Datatype {
| DataType::FixedSizeList(_, _)
| DataType::LargeList(_)
| DataType::Struct(_)
| DataType::Union(_)
| DataType::Union(_, _)
| DataType::Dictionary(_, _)
| DataType::Map(_, _)
| DataType::Decimal(_, _) => {
Expand Down
20 changes: 16 additions & 4 deletions ballista/rust/core/src/serde/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

use std::{convert::TryInto, io::Cursor};

use datafusion::arrow::datatypes::UnionMode;
use datafusion::logical_plan::{JoinConstraint, JoinType, Operator};
use datafusion::physical_plan::aggregates::AggregateFunction;
use datafusion::physical_plan::window_functions::BuiltInWindowFunction;
Expand Down Expand Up @@ -246,13 +247,24 @@ impl TryInto<datafusion::arrow::datatypes::DataType>
.map(|field| field.try_into())
.collect::<Result<Vec<_>, _>>()?,
),
arrow_type::ArrowTypeEnum::Union(union) => DataType::Union(
union
arrow_type::ArrowTypeEnum::Union(union) => {
let union_mode = protobuf::UnionMode::from_i32(union.union_mode)
.ok_or_else(|| {
proto_error(
"Protobuf deserialization error: Unknown union mode type",
)
})?;
let union_mode = match union_mode {
protobuf::UnionMode::Dense => UnionMode::Dense,
protobuf::UnionMode::Sparse => UnionMode::Sparse,
};
let union_types = union
.union_types
.iter()
.map(|field| field.try_into())
.collect::<Result<Vec<_>, _>>()?,
),
.collect::<Result<Vec<_>, _>>()?;
DataType::Union(union_types, union_mode)
}
arrow_type::ArrowTypeEnum::Dictionary(dict) => {
let pb_key_datatype = dict
.as_ref()
Expand Down
6 changes: 3 additions & 3 deletions ballista/rust/executor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ edition = "2018"
snmalloc = ["snmalloc-rs"]

[dependencies]
arrow = { version = "6.4.0" }
arrow-flight = { version = "6.4.0" }
arrow = { version = "7.0.0" }
arrow-flight = { version = "7.0.0" }
anyhow = "1"
async-trait = "0.1.36"
ballista-core = { path = "../core", version = "0.6.0" }
Expand All @@ -43,7 +43,7 @@ snmalloc-rs = {version = "0.2", features= ["cache-friendly"], optional = true}
tempfile = "3"
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread"] }
tokio-stream = { version = "0.1", features = ["net"] }
tonic = "0.5"
tonic = "0.6"
uuid = { version = "0.8", features = ["v4"] }

[dev-dependencies]
Expand Down
6 changes: 3 additions & 3 deletions ballista/rust/scheduler/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,13 @@ http-body = "0.4"
hyper = "0.14.4"
log = "0.4"
parse_arg = "0.1.3"
prost = "0.8"
prost = "0.9"
rand = "0.8"
serde = {version = "1", features = ["derive"]}
sled_package = { package = "sled", version = "0.34", optional = true }
tokio = { version = "1.0", features = ["full"] }
tokio-stream = { version = "0.1", features = ["net"], optional = true }
tonic = "0.5"
tonic = "0.6"
tower = { version = "0.4" }
warp = "0.3"

Expand All @@ -60,7 +60,7 @@ uuid = { version = "0.8", features = ["v4"] }

[build-dependencies]
configure_me_codegen = "0.4.1"
tonic-build = { version = "0.5" }
tonic-build = { version = "0.6" }

[package.metadata.configure_me.bin]
scheduler = "scheduler_config_spec.toml"
2 changes: 1 addition & 1 deletion datafusion-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,5 @@ clap = "2.33"
rustyline = "9.0"
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync"] }
datafusion = { path = "../datafusion", version = "6.0.0" }
arrow = { version = "6.4.0" }
arrow = { version = "7.0.0" }
ballista = { path = "../ballista/rust/client", version = "0.6.0" }
6 changes: 3 additions & 3 deletions datafusion-examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ path = "examples/avro_sql.rs"
required-features = ["datafusion/avro"]

[dev-dependencies]
arrow-flight = { version = "6.4.0" }
arrow-flight = { version = "7.0.0" }
datafusion = { path = "../datafusion" }
prost = "0.8"
tonic = "0.5"
prost = "0.9"
tonic = "0.6"
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync"] }
futures = "0.3"
num_cpus = "1.13.0"
4 changes: 2 additions & 2 deletions datafusion/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ avro = ["avro-rs", "num-traits"]
[dependencies]
ahash = { version = "0.7", default-features = false }
hashbrown = { version = "0.11", features = ["raw"] }
arrow = { version = "6.4.0", features = ["prettyprint"] }
parquet = { version = "6.4.0", features = ["arrow"] }
arrow = { version = "7.0.0", features = ["prettyprint"] }
parquet = { version = "7.0.0", features = ["arrow"] }
sqlparser = "0.13"
paste = "^1.0"
num_cpus = "1.13.0"
Expand Down
Loading