Skip to content

Commit

Permalink
Upgrade arrow
Browse files Browse the repository at this point in the history
fix decimal (#4)

Fix human error

Patch crates io to fix build (#5)

* fix decimal

* patch crate versions

Patch objectstore

Test in CI

Undo override?

Fix more errors

Fix last error?

Formatting

Clippy
  • Loading branch information
Brent Gardner committed Aug 2, 2022
1 parent 92e98df commit dae8678
Show file tree
Hide file tree
Showing 58 changed files with 442 additions and 360 deletions.
2 changes: 1 addition & 1 deletion benchmarks/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ simd = ["datafusion/simd"]
snmalloc = ["snmalloc-rs"]

[dependencies]
datafusion = { path = "../datafusion/core" }
datafusion = { path = "../datafusion/core", features = [], optional = false }
env_logger = "0.9"
futures = "0.3"
mimalloc = { version = "0.1", optional = true, default-features = false }
Expand Down
4 changes: 2 additions & 2 deletions datafusion-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ rust-version = "1.59"
readme = "README.md"

[dependencies]
arrow = { version = "19.0.0" }
arrow = { git = "https://github.com/apache/arrow-rs.git", rev = "6bb4b5ee16488c2a6427a5897bb6fbe334cc280e", features = [], optional = false }
clap = { version = "3", features = ["derive", "cargo"] }
datafusion = { path = "../datafusion/core", version = "10.0.0" }
datafusion = { path = "../datafusion/core", features = [], optional = false }
dirs = "4.0.0"
env_logger = "0.9"
mimalloc = { version = "0.1", default-features = false }
Expand Down
4 changes: 2 additions & 2 deletions datafusion-examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ path = "examples/avro_sql.rs"
required-features = ["datafusion/avro"]

[dev-dependencies]
arrow-flight = { version = "19.0.0" }
arrow-flight = { git = "https://github.com/apache/arrow-rs.git", rev = "6bb4b5ee16488c2a6427a5897bb6fbe334cc280e", features = [], optional = false }
async-trait = "0.1.41"
datafusion = { path = "../datafusion/core" }
datafusion = { path = "../datafusion/core", features = [], optional = false }
futures = "0.3"
num_cpus = "1.13.0"
prost = "0.10"
Expand Down
6 changes: 3 additions & 3 deletions datafusion/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@ jit = ["cranelift-module"]
pyarrow = ["pyo3"]

[dependencies]
arrow = { version = "19.0.0", features = ["prettyprint"] }
arrow = { git = "https://github.com/apache/arrow-rs.git", rev = "6bb4b5ee16488c2a6427a5897bb6fbe334cc280e", features = ["prettyprint"], optional = false }
avro-rs = { version = "0.13", features = ["snappy"], optional = true }
cranelift-module = { version = "0.86.1", optional = true }
object_store = { version = "0.3", optional = true }
object_store = { git = "https://github.com/apache/arrow-rs.git", rev = "6bb4b5ee16488c2a6427a5897bb6fbe334cc280e", features = [], optional = true }
ordered-float = "3.0"
parquet = { version = "19.0.0", features = ["arrow"], optional = true }
parquet = { git = "https://github.com/apache/arrow-rs.git", rev = "6bb4b5ee16488c2a6427a5897bb6fbe334cc280e", features = ["arrow"], optional = true }
pyo3 = { version = "0.16", optional = true }
sqlparser = "0.19"
22 changes: 11 additions & 11 deletions datafusion/common/src/scalar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use arrow::{
IntervalMonthDayNanoType, IntervalUnit, IntervalYearMonthType, TimeUnit,
TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
TimestampSecondType, UInt16Type, UInt32Type, UInt64Type, UInt8Type,
DECIMAL_MAX_PRECISION,
DECIMAL128_MAX_PRECISION,
},
util::decimal::{BasicDecimal, Decimal128},
};
Expand Down Expand Up @@ -611,7 +611,7 @@ impl ScalarValue {
scale: usize,
) -> Result<Self> {
// make sure the precision and scale is valid
if precision <= DECIMAL_MAX_PRECISION && scale <= precision {
if precision <= DECIMAL128_MAX_PRECISION && scale <= precision {
return Ok(ScalarValue::Decimal128(Some(value), precision, scale));
}
Err(DataFusionError::Internal(format!(
Expand Down Expand Up @@ -654,7 +654,7 @@ impl ScalarValue {
ScalarValue::Int32(_) => DataType::Int32,
ScalarValue::Int64(_) => DataType::Int64,
ScalarValue::Decimal128(_, precision, scale) => {
DataType::Decimal(*precision, *scale)
DataType::Decimal128(*precision, *scale)
}
ScalarValue::TimestampSecond(_, tz_opt) => {
DataType::Timestamp(TimeUnit::Second, tz_opt.clone())
Expand Down Expand Up @@ -935,7 +935,7 @@ impl ScalarValue {
}

let array: ArrayRef = match &data_type {
DataType::Decimal(precision, scale) => {
DataType::Decimal128(precision, scale) => {
let decimal_array =
ScalarValue::iter_to_decimal_array(scalars, precision, scale)?;
Arc::new(decimal_array)
Expand Down Expand Up @@ -1448,7 +1448,7 @@ impl ScalarValue {

Ok(match array.data_type() {
DataType::Null => ScalarValue::Null,
DataType::Decimal(precision, scale) => {
DataType::Decimal128(precision, scale) => {
ScalarValue::get_decimal_value_from_array(array, index, precision, scale)
}
DataType::Boolean => typed_cast!(array, index, BooleanArray, Boolean),
Expand Down Expand Up @@ -1899,7 +1899,7 @@ impl TryFrom<&DataType> for ScalarValue {
DataType::UInt16 => ScalarValue::UInt16(None),
DataType::UInt32 => ScalarValue::UInt32(None),
DataType::UInt64 => ScalarValue::UInt64(None),
DataType::Decimal(precision, scale) => {
DataType::Decimal128(precision, scale) => {
ScalarValue::Decimal128(None, *precision, *scale)
}
DataType::Utf8 => ScalarValue::Utf8(None),
Expand Down Expand Up @@ -2145,7 +2145,7 @@ mod tests {
#[test]
fn scalar_decimal_test() {
let decimal_value = ScalarValue::Decimal128(Some(123), 10, 1);
assert_eq!(DataType::Decimal(10, 1), decimal_value.get_datatype());
assert_eq!(DataType::Decimal128(10, 1), decimal_value.get_datatype());
let try_into_value: i128 = decimal_value.clone().try_into().unwrap();
assert_eq!(123_i128, try_into_value);
assert!(!decimal_value.is_null());
Expand All @@ -2163,14 +2163,14 @@ mod tests {
let array = decimal_value.to_array();
let array = array.as_any().downcast_ref::<Decimal128Array>().unwrap();
assert_eq!(1, array.len());
assert_eq!(DataType::Decimal(10, 1), array.data_type().clone());
assert_eq!(DataType::Decimal128(10, 1), array.data_type().clone());
assert_eq!(123i128, array.value(0).as_i128());

// decimal scalar to array with size
let array = decimal_value.to_array_of_size(10);
let array_decimal = array.as_any().downcast_ref::<Decimal128Array>().unwrap();
assert_eq!(10, array.len());
assert_eq!(DataType::Decimal(10, 1), array.data_type().clone());
assert_eq!(DataType::Decimal128(10, 1), array.data_type().clone());
assert_eq!(123i128, array_decimal.value(0).as_i128());
assert_eq!(123i128, array_decimal.value(9).as_i128());
// test eq array
Expand Down Expand Up @@ -2208,7 +2208,7 @@ mod tests {
// convert the vec to decimal array and check the result
let array = ScalarValue::iter_to_array(decimal_vec.into_iter()).unwrap();
assert_eq!(3, array.len());
assert_eq!(DataType::Decimal(10, 2), array.data_type().clone());
assert_eq!(DataType::Decimal128(10, 2), array.data_type().clone());

let decimal_vec = vec![
ScalarValue::Decimal128(Some(1), 10, 2),
Expand All @@ -2218,7 +2218,7 @@ mod tests {
];
let array = ScalarValue::iter_to_array(decimal_vec.into_iter()).unwrap();
assert_eq!(4, array.len());
assert_eq!(DataType::Decimal(10, 2), array.data_type().clone());
assert_eq!(DataType::Decimal128(10, 2), array.data_type().clone());

assert!(ScalarValue::try_new_decimal128(1, 10, 2)
.unwrap()
Expand Down
22 changes: 11 additions & 11 deletions datafusion/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,18 +55,18 @@ unicode_expressions = ["datafusion-physical-expr/regex_expressions", "datafusion

[dependencies]
ahash = { version = "0.7", default-features = false }
arrow = { version = "19.0.0", features = ["prettyprint"] }
arrow = { git = "https://github.com/apache/arrow-rs.git", rev = "6bb4b5ee16488c2a6427a5897bb6fbe334cc280e", features = ["prettyprint"], optional = false }
async-trait = "0.1.41"
avro-rs = { version = "0.13", features = ["snappy"], optional = true }
bytes = "1.1"
chrono = { version = "0.4", default-features = false }
datafusion-common = { path = "../common", version = "10.0.0", features = ["parquet", "object_store"] }
datafusion-expr = { path = "../expr", version = "10.0.0" }
datafusion-jit = { path = "../jit", version = "10.0.0", optional = true }
datafusion-optimizer = { path = "../optimizer", version = "10.0.0" }
datafusion-physical-expr = { path = "../physical-expr", version = "10.0.0" }
datafusion-row = { path = "../row", version = "10.0.0" }
datafusion-sql = { path = "../sql", version = "10.0.0" }
datafusion-common = { path = "../common", features = ["parquet", "object_store"], optional = false }
datafusion-expr = { path = "../expr", features = [], optional = false }
datafusion-jit = { path = "../jit", features = [], optional = true }
datafusion-optimizer = { path = "../optimizer", features = [], optional = false }
datafusion-physical-expr = { path = "../physical-expr", features = [], optional = false }
datafusion-row = { path = "../row", features = [], optional = false }
datafusion-sql = { path = "../sql", features = [], optional = false }
futures = "0.3"
glob = "0.3.0"
hashbrown = { version = "0.12", features = ["raw"] }
Expand All @@ -75,10 +75,10 @@ lazy_static = { version = "^1.4.0" }
log = "^0.4"
num-traits = { version = "0.2", optional = true }
num_cpus = "1.13.0"
object_store = "0.3.0"
object_store = { git = "https://github.com/apache/arrow-rs.git", rev = "6bb4b5ee16488c2a6427a5897bb6fbe334cc280e", features = [], optional = false }
ordered-float = "3.0"
parking_lot = "0.12"
parquet = { version = "19.0.0", features = ["arrow", "async"] }
parquet = { git = "https://github.com/apache/arrow-rs.git", rev = "6bb4b5ee16488c2a6427a5897bb6fbe334cc280e", features = ["arrow", "async"], optional = false }
paste = "^1.0"
pin-project-lite = "^0.2.7"
pyo3 = { version = "0.16", optional = true }
Expand All @@ -98,7 +98,7 @@ csv = "1.1.6"
ctor = "0.1.22"
doc-comment = "0.3"
env_logger = "0.9"
fuzz-utils = { path = "fuzz-utils" }
fuzz-utils = { path = "fuzz-utils", features = [], optional = false }

[[bench]]
harness = false
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/fuzz-utils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,6 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
arrow = { version = "19.0.0", features = ["prettyprint"] }
arrow = { git = "https://github.com/apache/arrow-rs.git", rev = "6bb4b5ee16488c2a6427a5897bb6fbe334cc280e", features = ["prettyprint"], optional = false }
env_logger = "0.9.0"
rand = "0.8"
10 changes: 4 additions & 6 deletions datafusion/core/src/avro_to_arrow/arrow_array_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,12 +101,10 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
"Failed to parse avro value: {:?}",
e
))),
other => {
return Err(ArrowError::ParseError(format!(
"Row needs to be of type object, got: {:?}",
other
)))
}
other => Err(ArrowError::ParseError(format!(
"Row needs to be of type object, got: {:?}",
other
))),
})
.collect::<ArrowResult<Vec<Vec<(String, Value)>>>>()?;
if rows.is_empty() {
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/src/avro_to_arrow/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ fn schema_to_field_with_props(
AvroSchema::Fixed { size, .. } => DataType::FixedSizeBinary(*size as i32),
AvroSchema::Decimal {
precision, scale, ..
} => DataType::Decimal(*precision, *scale),
} => DataType::Decimal128(*precision, *scale),
AvroSchema::Uuid => DataType::FixedSizeBinary(16),
AvroSchema::Date => DataType::Date32,
AvroSchema::TimeMillis => DataType::Time32(TimeUnit::Millisecond),
Expand Down Expand Up @@ -216,7 +216,7 @@ fn default_field_name(dt: &DataType) -> &str {
DataType::Union(_, _, _) => "union",
DataType::Dictionary(_, _) => "map",
DataType::Map(_, _) => unimplemented!("Map support not implemented"),
DataType::Decimal(_, _) => "decimal",
DataType::Decimal128(_, _) => "decimal",
DataType::Decimal256(_, _) => "decimal",
}
}
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/catalog/information_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,7 @@ impl InformationSchemaColumnsBuilder {
Float32 => (Some(24), Some(2), None),
// Numbers from postgres `double` type
Float64 => (Some(24), Some(2), None),
Decimal(precision, scale) => {
Decimal128(precision, scale) => {
(Some(*precision as u64), Some(10), Some(*scale as u64))
}
_ => (None, None, None),
Expand Down
27 changes: 22 additions & 5 deletions datafusion/core/src/datasource/file_format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,8 @@ mod tests {
use futures::StreamExt;
use object_store::local::LocalFileSystem;
use object_store::path::Path;
use object_store::{GetResult, ListResult};
use object_store::{GetResult, ListResult, MultipartId};
use tokio::io::AsyncWrite;

#[tokio::test]
async fn read_merged_batches() -> Result<()> {
Expand Down Expand Up @@ -649,6 +650,22 @@ mod tests {
Err(object_store::Error::NotImplemented)
}

// Test stub required by the `ObjectStore` trait (added in the arrow-rs
// revision this commit pins to). These tests never exercise multipart
// uploads, so the stub simply reports `NotImplemented`.
async fn put_multipart(
&self,
_location: &Path,
) -> object_store::Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)>
{
Err(object_store::Error::NotImplemented)
}

// Test stub: aborting a multipart upload is never triggered by these
// tests (see `put_multipart` above, which also returns `NotImplemented`),
// so this just satisfies the trait contract.
async fn abort_multipart(
&self,
_location: &Path,
_multipart_id: &MultipartId,
) -> object_store::Result<()> {
Err(object_store::Error::NotImplemented)
}

// Test stub: reads go through other code paths in these tests, so `get`
// is deliberately left unimplemented.
async fn get(&self, _location: &Path) -> object_store::Result<GetResult> {
Err(object_store::Error::NotImplemented)
}
Expand Down Expand Up @@ -1073,30 +1090,30 @@ mod tests {
assert_eq!(1, batches.len());
assert_eq!(1, batches[0].num_columns());
let column = batches[0].column(0);
assert_eq!(&DataType::Decimal(4, 2), column.data_type());
assert_eq!(&DataType::Decimal128(4, 2), column.data_type());

// parquet use the int64 as the physical type to store decimal
let exec = get_exec("int64_decimal.parquet", None, None).await?;
let batches = collect(exec, task_ctx.clone()).await?;
assert_eq!(1, batches.len());
assert_eq!(1, batches[0].num_columns());
let column = batches[0].column(0);
assert_eq!(&DataType::Decimal(10, 2), column.data_type());
assert_eq!(&DataType::Decimal128(10, 2), column.data_type());

// parquet use the fixed length binary as the physical type to store decimal
let exec = get_exec("fixed_length_decimal.parquet", None, None).await?;
let batches = collect(exec, task_ctx.clone()).await?;
assert_eq!(1, batches.len());
assert_eq!(1, batches[0].num_columns());
let column = batches[0].column(0);
assert_eq!(&DataType::Decimal(25, 2), column.data_type());
assert_eq!(&DataType::Decimal128(25, 2), column.data_type());

let exec = get_exec("fixed_length_decimal_legacy.parquet", None, None).await?;
let batches = collect(exec, task_ctx.clone()).await?;
assert_eq!(1, batches.len());
assert_eq!(1, batches[0].num_columns());
let column = batches[0].column(0);
assert_eq!(&DataType::Decimal(13, 2), column.data_type());
assert_eq!(&DataType::Decimal128(13, 2), column.data_type());

// parquet use the fixed length binary as the physical type to store decimal
// TODO: arrow-rs don't support convert the physical type of binary to decimal
Expand Down
6 changes: 3 additions & 3 deletions datafusion/core/src/physical_optimizer/pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1457,7 +1457,7 @@ mod tests {
// decimal(9,2)
let schema = Arc::new(Schema::new(vec![Field::new(
"s1",
DataType::Decimal(9, 2),
DataType::Decimal128(9, 2),
true,
)]));
// s1 > 5
Expand All @@ -1479,7 +1479,7 @@ mod tests {
// decimal(18,2)
let schema = Arc::new(Schema::new(vec![Field::new(
"s1",
DataType::Decimal(18, 2),
DataType::Decimal128(18, 2),
true,
)]));
// s1 > 5
Expand All @@ -1501,7 +1501,7 @@ mod tests {
// decimal(23,2)
let schema = Arc::new(Schema::new(vec![Field::new(
"s1",
DataType::Decimal(23, 2),
DataType::Decimal128(23, 2),
true,
)]));
// s1 > 5
Expand Down
18 changes: 17 additions & 1 deletion datafusion/core/src/physical_plan/file_format/chunked_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@ use bytes::Bytes;
use futures::stream::BoxStream;
use futures::StreamExt;
use object_store::path::Path;
use object_store::Result;
use object_store::{GetResult, ListResult, ObjectMeta, ObjectStore};
use object_store::{MultipartId, Result};
use std::fmt::{Debug, Display, Formatter};
use std::ops::Range;
use std::sync::Arc;
use tokio::io::AsyncWrite;

/// Wraps a [`ObjectStore`] and makes its get response return chunks
///
Expand Down Expand Up @@ -53,6 +54,21 @@ impl ObjectStore for ChunkedStore {
self.inner.put(location, bytes).await
}

/// Begin a multipart upload by delegating to the wrapped store.
///
/// `ChunkedStore` only alters read behavior (presumably chunking `get`
/// responses — confirm against the rest of this impl); writes pass
/// straight through to `self.inner`.
async fn put_multipart(
&self,
location: &Path,
) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
self.inner.put_multipart(location).await
}

/// Abort an in-progress multipart upload by delegating to the wrapped
/// store; `ChunkedStore` adds no behavior of its own on the write path.
async fn abort_multipart(
&self,
location: &Path,
multipart_id: &MultipartId,
) -> Result<()> {
self.inner.abort_multipart(location, multipart_id).await
}

async fn get(&self, location: &Path) -> Result<GetResult> {
let bytes = self.inner.get(location).await?.bytes().await?;
let mut offset = 0;
Expand Down

0 comments on commit dae8678

Please sign in to comment.