Skip to content

Commit

Permalink
ARROW-10818: [Rust] Implement DecimalType
Browse files Browse the repository at this point in the history
  • Loading branch information
ovr committed Jan 24, 2021
1 parent 10f4ada commit b352649
Show file tree
Hide file tree
Showing 22 changed files with 1,134 additions and 88 deletions.
1 change: 1 addition & 0 deletions rust/arrow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ flatbuffers = "^0.8"
hex = "0.4"
prettytable-rs = { version = "0.8.0", optional = true }
lexical-core = "^0.7"
num-bigint = "0.3"

[features]
default = []
Expand Down
42 changes: 25 additions & 17 deletions rust/arrow/src/array/array_binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,18 @@
use std::fmt;
use std::mem;
use std::{any::Any, iter::FromIterator};
use std::{
convert::{From, TryInto},
sync::Arc,
};
use std::{convert::From, sync::Arc};

use super::{
array::print_long_array, raw_pointer::RawPtrBox, Array, ArrayData, ArrayDataRef,
FixedSizeListArray, GenericBinaryIter, GenericListArray, OffsetSizeTrait,
};
use crate::buffer::Buffer;
use crate::util::bit_util;
use crate::{buffer::MutableBuffer, datatypes::DataType};
use crate::{buffer::Buffer, datatypes::ArrowDecimalType};
use crate::{
buffer::MutableBuffer, datatypes::DataType, datatypes::Int128DecimalType,
datatypes::ToByteSlice,
};

/// Like OffsetSizeTrait, but specialized for Binary
// This allow us to expose a constant datatype for the GenericBinaryArray
Expand Down Expand Up @@ -485,9 +485,9 @@ pub struct DecimalArray {
}

impl DecimalArray {
/// Returns the element at index `i` as i128.
pub fn value(&self, i: usize) -> i128 {
pub fn value(&self, i: usize) -> Int128DecimalType {
assert!(i < self.data.len(), "DecimalArray out of bounds access");

let offset = i.checked_add(self.data.offset()).unwrap();
let raw_val = unsafe {
let pos = self.value_offset_at(offset);
Expand All @@ -496,11 +496,12 @@ impl DecimalArray {
(self.value_offset_at(offset + 1) - pos) as usize,
)
};
let as_array = raw_val.try_into();
match as_array {
Ok(v) if raw_val.len() == 16 => i128::from_le_bytes(v),
_ => panic!("DecimalArray elements are not 128bit integers."),
}

Int128DecimalType::from_bytes_with_precision_scale(
raw_val,
self.precision,
self.scale,
)
}

/// Returns the offset for the element at index `i`.
Expand Down Expand Up @@ -987,13 +988,20 @@ mod tests {
192, 219, 180, 17, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 64, 36, 75, 238, 253,
255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255,
];
let array_data = ArrayData::builder(DataType::Decimal(23, 6))
let array_data = ArrayData::builder(DataType::Decimal(23, 0))
.len(2)
.add_buffer(Buffer::from(&values[..]))
.build();
let decimal_array = DecimalArray::from(array_data);
assert_eq!(8_887_000_000, decimal_array.value(0));
assert_eq!(-8_887_000_000, decimal_array.value(1));

assert_eq!(
Int128DecimalType::new(8_887_000_000_i128, 23, 0),
decimal_array.value(0)
);
assert_eq!(
Int128DecimalType::new(-8_887_000_000_i128, 23, 0),
decimal_array.value(1)
);
assert_eq!(16, decimal_array.value_length());
}

Expand All @@ -1009,7 +1017,7 @@ mod tests {
.build();
let arr = DecimalArray::from(array_data);
assert_eq!(
"DecimalArray<23, 6>\n[\n 8887000000,\n -8887000000,\n]",
"DecimalArray<23, 6>\n[\n Decimal<23, 6>(\"8887.000000\"),\n Decimal<23, 6>(\"-8887.000000\"),\n]",
format!("{:?}", arr)
);
}
Expand Down
40 changes: 20 additions & 20 deletions rust/arrow/src/array/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1206,11 +1206,18 @@ impl DecimalBuilder {
///
/// Automatically calls the `append` method to delimit the slice appended in as a
/// distinct array element.
pub fn append_value(&mut self, value: i128) -> Result<()> {
let value_as_bytes = Self::from_i128_to_fixed_size_bytes(
value,
self.builder.value_length() as usize,
)?;
pub fn append_value(&mut self, value: Int128DecimalType) -> Result<()> {
if self.scale != value.scale || self.precision != value.precision {
return Err(ArrowError::InvalidArgumentError(format!(
"Value Int128DecimalType<{}, {}> does not have the same scale as DecimalBuilder<{}, {}>",
value.precision,
value.scale,
self.precision,
self.scale,
)));
}

let value_as_bytes = value.to_byte_slice();
if self.builder.value_length() != value_as_bytes.len() as i32 {
return Err(ArrowError::InvalidArgumentError(
"Byte slice does not have the same length as DecimalBuilder value lengths".to_string()
Expand All @@ -1222,17 +1229,6 @@ impl DecimalBuilder {
self.builder.append(true)
}

/// Encodes `v` as a `size`-byte little-endian two's-complement integer.
///
/// # Errors
///
/// Returns `ArrowError::InvalidArgumentError` when `size` is larger than 16, or when
/// `v` cannot be represented in `size` bytes without losing information.
fn from_i128_to_fixed_size_bytes(v: i128, size: usize) -> Result<Vec<u8>> {
    if size > 16 {
        return Err(ArrowError::InvalidArgumentError(
            "DecimalBuilder only supports values up to 16 bytes.".to_string(),
        ));
    }

    // Little-endian layout: the low-order bytes come first, so a narrower
    // encoding keeps the *leading* `size` bytes, not the trailing ones.
    let bytes = v.to_le_bytes();
    let truncated = &bytes[..size];

    // Sign-extend the truncated bytes back to 128 bits; if the round trip does
    // not reproduce `v`, the dropped bytes carried information.
    let is_negative = truncated.last().map_or(false, |b| b & 0x80 != 0);
    let mut widened = if is_negative { [0xFF_u8; 16] } else { [0_u8; 16] };
    widened[..size].copy_from_slice(truncated);
    if i128::from_le_bytes(widened) != v {
        return Err(ArrowError::InvalidArgumentError(format!(
            "Value {} does not fit into {} bytes",
            v, size
        )));
    }

    Ok(truncated.to_vec())
}

/// Append a null value to the array.
pub fn append_null(&mut self) -> Result<()> {
let length: usize = self.builder.value_length() as usize;
Expand Down Expand Up @@ -2755,14 +2751,18 @@ mod tests {

#[test]
fn test_decimal_builder() {
let mut builder = DecimalBuilder::new(30, 23, 6);
let mut builder = DecimalBuilder::new(30, 23, 0);

builder.append_value(8_887_000_000).unwrap();
builder
.append_value(Int128DecimalType::new(8_887_000_000, 23, 0))
.unwrap();
builder.append_null().unwrap();
builder.append_value(-8_887_000_000).unwrap();
builder
.append_value(Int128DecimalType::new(-8_887_000_000, 23, 0))
.unwrap();
let decimal_array: DecimalArray = builder.finish();

assert_eq!(&DataType::Decimal(23, 6), decimal_array.data_type());
assert_eq!(&DataType::Decimal(23, 0), decimal_array.data_type());
assert_eq!(3, decimal_array.len());
assert_eq!(1, decimal_array.null_count());
assert_eq!(32, decimal_array.value_offset(2));
Expand Down
8 changes: 5 additions & 3 deletions rust/arrow/src/array/equal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use super::{

use crate::{
buffer::Buffer,
datatypes::{ArrowPrimitiveType, DataType, IntervalUnit},
datatypes::{ArrowPrimitiveType, DataType, Int128DecimalType, IntervalUnit},
};

mod boolean;
Expand Down Expand Up @@ -733,11 +733,13 @@ mod tests {
}

fn create_decimal_array(data: &[Option<i128>]) -> ArrayDataRef {
let mut builder = DecimalBuilder::new(20, 23, 6);
let mut builder = DecimalBuilder::new(20, 23, 0);

for d in data {
if let Some(v) = d {
builder.append_value(*v).unwrap();
builder
.append_value(Int128DecimalType::new(*v, 23, 0))
.unwrap();
} else {
builder.append_null().unwrap();
}
Expand Down
20 changes: 14 additions & 6 deletions rust/arrow/src/array/equal_json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ impl JsonEqual for DecimalArray {
JString(s) => {
self.is_valid(i)
&& (s
.parse::<i128>()
.parse::<Int128DecimalType>()
.map_or_else(|_| false, |v| v == self.value(i)))
}
JNull => self.is_null(i),
Expand Down Expand Up @@ -899,10 +899,14 @@ mod tests {
#[test]
fn test_decimal_json_equal() {
// Test the equal case
let mut builder = DecimalBuilder::new(30, 23, 6);
builder.append_value(1_000).unwrap();
let mut builder = DecimalBuilder::new(30, 23, 0);
builder
.append_value(Int128DecimalType::new(1_000, 23, 0))
.unwrap();
builder.append_null().unwrap();
builder.append_value(-250).unwrap();
builder
.append_value(Int128DecimalType::new(-250, 23, 0))
.unwrap();
let arrow_array: DecimalArray = builder.finish();
let json_array: Value = serde_json::from_str(
r#"
Expand All @@ -918,9 +922,13 @@ mod tests {
assert!(json_array.eq(&arrow_array));

// Test unequal case
builder.append_value(1_000).unwrap();
builder
.append_value(Int128DecimalType::new(-1_000, 23, 0))
.unwrap();
builder.append_null().unwrap();
builder.append_value(55).unwrap();
builder
.append_value(Int128DecimalType::new(-55, 23, 0))
.unwrap();
let arrow_array: DecimalArray = builder.finish();
let json_array: Value = serde_json::from_str(
r#"
Expand Down
63 changes: 63 additions & 0 deletions rust/arrow/src/compute/kernels/cast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,19 @@ pub fn can_cast_types(from_type: &DataType, to_type: &DataType) -> bool {
(Utf8, _) => DataType::is_numeric(to_type),
(_, Utf8) => DataType::is_numeric(from_type) || from_type == &Binary,

// start decimal casts
(UInt8, Decimal(_, _)) => true,
(UInt16, Decimal(_, _)) => true,
(UInt32, Decimal(_, _)) => true,
(UInt64, Decimal(_, _)) => true,
(Int8, Decimal(_, _)) => true,
(Int16, Decimal(_, _)) => true,
(Int32, Decimal(_, _)) => true,
(Int64, Decimal(_, _)) => true,
(Float32, Decimal(_, _)) => true,
(Float64, Decimal(_, _)) => true,
// end decimal casts

// start numeric casts
(UInt8, UInt16) => true,
(UInt8, UInt32) => true,
Expand Down Expand Up @@ -443,6 +456,17 @@ pub fn cast(array: &ArrayRef, to_type: &DataType) -> Result<ArrayRef> {
))),
},

// start decimal casts
// NOTE(review): `can_cast_types` also reports Float32/Float64 -> Decimal as
// supported, but no arm handles them here — confirm whether that is intended.
(Int8, Decimal(p, s)) => cast_numeric_to_decimal::<Int8Type>(array, *p, *s),
(Int16, Decimal(p, s)) => cast_numeric_to_decimal::<Int16Type>(array, *p, *s),
(Int32, Decimal(p, s)) => cast_numeric_to_decimal::<Int32Type>(array, *p, *s),
(Int64, Decimal(p, s)) => cast_numeric_to_decimal::<Int64Type>(array, *p, *s),
// The variants are spelled `UInt*`; a misspelled name in a pattern is a
// catch-all binding and would swallow every remaining `(_, Decimal)` cast.
(UInt8, Decimal(p, s)) => cast_numeric_to_decimal::<UInt8Type>(array, *p, *s),
(UInt16, Decimal(p, s)) => cast_numeric_to_decimal::<UInt16Type>(array, *p, *s),
(UInt32, Decimal(p, s)) => cast_numeric_to_decimal::<UInt32Type>(array, *p, *s),
(UInt64, Decimal(p, s)) => cast_numeric_to_decimal::<UInt64Type>(array, *p, *s),
// end decimal casts

// start numeric casts
(UInt8, UInt16) => cast_numeric_arrays::<UInt8Type, UInt16Type>(array),
(UInt8, UInt32) => cast_numeric_arrays::<UInt8Type, UInt32Type>(array),
Expand Down Expand Up @@ -849,6 +873,35 @@ where
Ok(Arc::new(PrimitiveArray::<TO>::from(data)) as ArrayRef)
}

fn cast_numeric_to_decimal<FROM>(
from: &ArrayRef,
precision: usize,
scale: usize,
) -> Result<ArrayRef>
where
FROM: ArrowNumericType,
FROM::Native: num::NumCast,
{
let values = from
.as_any()
.downcast_ref::<PrimitiveArray<FROM>>()
.unwrap();
let mut builder = DecimalBuilder::new(values.len(), precision, scale);

for maybe_value in values.iter() {
match maybe_value {
Some(v) => builder.append_value(Int128DecimalType::new(
100000000000000000_i128,
precision,
scale,
))?,
None => builder.append_null()?,
};
}

Ok(Arc::new(builder.finish()))
}

/// Convert Array into a PrimitiveArray of type, and apply numeric cast
fn cast_numeric_arrays<FROM, TO>(from: &ArrayRef) -> Result<ArrayRef>
where
Expand Down Expand Up @@ -1205,6 +1258,16 @@ mod tests {
assert!(9.0 - c.value(4) < f64::EPSILON);
}

// Casting an integer array to Decimal should render the value with `scale`
// fractional digits.
// NOTE(review): the test name says i64 but the input is an Int32Array —
// confirm which source type was intended.
#[test]
fn test_cast_i64_to_decimal() {
    let input: ArrayRef = Arc::new(Int32Array::from(vec![5]));
    let casted = cast(&input, &DataType::Decimal(5, 10)).unwrap();
    let decimals = casted
        .as_any()
        .downcast_ref::<DecimalArray>()
        .unwrap();

    assert_eq!("5.0000000000", decimals.value(0).to_string());
}

#[test]
fn test_cast_i32_to_u8() {
let a = Int32Array::from(vec![-5, 6, -7, 8, 100000000]);
Expand Down
49 changes: 48 additions & 1 deletion rust/arrow/src/csv/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,15 @@ use std::sync::Arc;

use csv as csv_crate;

use crate::array::{ArrayRef, BooleanArray, PrimitiveArray, StringArray};
use crate::datatypes::*;
use crate::error::{ArrowError, Result};
use crate::record_batch::RecordBatch;
use crate::{
array::{
ArrayRef, BooleanArray, DecimalArray, DecimalBuilder, PrimitiveArray, StringArray,
},
buffer,
};

use self::csv_crate::{ByteRecord, StringRecord};

Expand Down Expand Up @@ -413,6 +418,9 @@ fn parse(
let field = &fields[i];
match field.data_type() {
&DataType::Boolean => build_boolean_array(line_number, rows, i),
&DataType::Decimal(p, s) => {
build_decimal_array(line_number, rows, i, p, s)
}
&DataType::Int8 => {
build_primitive_array::<Int8Type>(line_number, rows, i)
}
Expand Down Expand Up @@ -613,6 +621,45 @@ fn build_primitive_array<T: ArrowPrimitiveType + Parser>(
.map(|e| Arc::new(e) as ArrayRef)
}

/// Parses a specific column (`col_idx`) of `rows` into a `DecimalArray` with
/// the given `precision` and `scale`.
///
/// A missing or empty field becomes a null entry; every other field is parsed
/// with `Int128DecimalType::parse`.
///
/// # Errors
///
/// Returns `ArrowError::ParseError` naming the value, the column and the line
/// (`line_number + row_index`) when a non-empty field cannot be parsed, and
/// propagates any error reported by the builder.
fn build_decimal_array(
    line_number: usize,
    rows: &[StringRecord],
    col_idx: usize,
    precision: usize,
    scale: usize,
) -> Result<ArrayRef> {
    let mut builder = DecimalBuilder::new(rows.len(), precision, scale);

    for (row_index, row) in rows.iter().enumerate() {
        match row.get(col_idx) {
            // Only non-empty fields are parsed; an empty field must produce
            // exactly one null and must not fall through to the parser.
            Some(s) if !s.is_empty() => {
                let parsed = Int128DecimalType::parse(s, precision, scale).map_err(|_| {
                    ArrowError::ParseError(format!(
                        // TODO: we should surface the underlying error here.
                        "Error while parsing value {} for column {} at line {}",
                        s,
                        col_idx,
                        line_number + row_index
                    ))
                })?;

                builder.append_value(parsed)?;
            }
            _ => builder.append_null()?,
        }
    }

    Ok(Arc::new(builder.finish()))
}

// parses a specific column (col_idx) into an Arrow Array.
fn build_boolean_array(
line_number: usize,
Expand Down
Loading

0 comments on commit b352649

Please sign in to comment.