-
Notifications
You must be signed in to change notification settings - Fork 670
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Return error from JSON writer rather than panic #1205
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -38,7 +38,7 @@ | |
//! let a = Int32Array::from(vec![1, 2, 3]); | ||
//! let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap(); | ||
//! | ||
//! let json_rows = json::writer::record_batches_to_json_rows(&[batch]); | ||
//! let json_rows = json::writer::record_batches_to_json_rows(&[batch]).unwrap(); | ||
//! assert_eq!( | ||
//! serde_json::Value::Object(json_rows[1].clone()), | ||
//! serde_json::json!({"a": 2}), | ||
|
@@ -110,64 +110,64 @@ use serde_json::Value; | |
|
||
use crate::array::*; | ||
use crate::datatypes::*; | ||
use crate::error::Result; | ||
use crate::error::{ArrowError, Result}; | ||
use crate::record_batch::RecordBatch; | ||
|
||
fn primitive_array_to_json<T: ArrowPrimitiveType>(array: &ArrayRef) -> Vec<Value> { | ||
as_primitive_array::<T>(array) | ||
fn primitive_array_to_json<T: ArrowPrimitiveType>( | ||
array: &ArrayRef, | ||
) -> Result<Vec<Value>> { | ||
Ok(as_primitive_array::<T>(array) | ||
.iter() | ||
.map(|maybe_value| match maybe_value { | ||
Some(v) => v.into_json_value().unwrap_or(Value::Null), | ||
None => Value::Null, | ||
}) | ||
.collect() | ||
.collect()) | ||
} | ||
|
||
fn struct_array_to_jsonmap_array( | ||
array: &StructArray, | ||
row_count: usize, | ||
) -> Vec<JsonMap<String, Value>> { | ||
) -> Result<Vec<JsonMap<String, Value>>> { | ||
let inner_col_names = array.column_names(); | ||
|
||
let mut inner_objs = iter::repeat(JsonMap::new()) | ||
.take(row_count) | ||
.collect::<Vec<JsonMap<String, Value>>>(); | ||
|
||
array | ||
.columns() | ||
.iter() | ||
.enumerate() | ||
.for_each(|(j, struct_col)| { | ||
set_column_for_json_rows( | ||
&mut inner_objs, | ||
row_count, | ||
struct_col, | ||
inner_col_names[j], | ||
); | ||
}); | ||
|
||
inner_objs | ||
for (j, struct_col) in array.columns().iter().enumerate() { | ||
match set_column_for_json_rows( | ||
&mut inner_objs, | ||
row_count, | ||
struct_col, | ||
inner_col_names[j], | ||
) { | ||
Ok(_) => {} | ||
Err(e) => return Err(e), | ||
} | ||
} | ||
Ok(inner_objs) | ||
} | ||
|
||
/// Converts an arrow [`ArrayRef`] into a `Vec` of Serde JSON [`serde_json::Value`]'s | ||
pub fn array_to_json_array(array: &ArrayRef) -> Vec<Value> { | ||
pub fn array_to_json_array(array: &ArrayRef) -> Result<Vec<Value>> { | ||
match array.data_type() { | ||
DataType::Null => iter::repeat(Value::Null).take(array.len()).collect(), | ||
DataType::Boolean => as_boolean_array(array) | ||
DataType::Null => Ok(iter::repeat(Value::Null).take(array.len()).collect()), | ||
DataType::Boolean => Ok(as_boolean_array(array) | ||
.iter() | ||
.map(|maybe_value| match maybe_value { | ||
Some(v) => v.into(), | ||
None => Value::Null, | ||
}) | ||
.collect(), | ||
.collect()), | ||
|
||
DataType::Utf8 => as_string_array(array) | ||
DataType::Utf8 => Ok(as_string_array(array) | ||
.iter() | ||
.map(|maybe_value| match maybe_value { | ||
Some(v) => v.into(), | ||
None => Value::Null, | ||
}) | ||
.collect(), | ||
.collect()), | ||
DataType::Int8 => primitive_array_to_json::<Int8Type>(array), | ||
DataType::Int16 => primitive_array_to_json::<Int16Type>(array), | ||
DataType::Int32 => primitive_array_to_json::<Int32Type>(array), | ||
|
@@ -178,31 +178,29 @@ pub fn array_to_json_array(array: &ArrayRef) -> Vec<Value> { | |
DataType::UInt64 => primitive_array_to_json::<UInt64Type>(array), | ||
DataType::Float32 => primitive_array_to_json::<Float32Type>(array), | ||
DataType::Float64 => primitive_array_to_json::<Float64Type>(array), | ||
DataType::List(_) => as_list_array(array) | ||
DataType::List(_) => Ok(as_list_array(array) | ||
.iter() | ||
.map(|maybe_value| match maybe_value { | ||
Some(v) => Value::Array(array_to_json_array(&v)), | ||
Some(v) => Value::Array(array_to_json_array(&v).unwrap()), | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. When I change `unwrap()` to `?` There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Since this is inside a closure, that closure needs to have a return type of `Result`.
The There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Thanks, @jhorstmann! This really helps me! There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
The difference is that There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Thanks a lot ! |
||
None => Value::Null, | ||
}) | ||
.collect(), | ||
DataType::LargeList(_) => as_large_list_array(array) | ||
.collect()), | ||
DataType::LargeList(_) => Ok(as_large_list_array(array) | ||
.iter() | ||
.map(|maybe_value| match maybe_value { | ||
Some(v) => Value::Array(array_to_json_array(&v)), | ||
Some(v) => Value::Array(array_to_json_array(&v).unwrap()), | ||
None => Value::Null, | ||
}) | ||
.collect(), | ||
.collect()), | ||
DataType::Struct(_) => { | ||
let jsonmaps = | ||
struct_array_to_jsonmap_array(as_struct_array(array), array.len()); | ||
jsonmaps.into_iter().map(Value::Object).collect() | ||
} | ||
_ => { | ||
panic!( | ||
"Unsupported datatype for array conversion: {:#?}", | ||
array.data_type() | ||
); | ||
struct_array_to_jsonmap_array(as_struct_array(array), array.len())?; | ||
Ok(jsonmaps.into_iter().map(Value::Object).collect()) | ||
} | ||
t => Err(ArrowError::JsonError(format!( | ||
"data type {:?} not supported", | ||
t | ||
))), | ||
} | ||
} | ||
|
||
|
@@ -261,37 +259,37 @@ fn set_column_for_json_rows( | |
row_count: usize, | ||
array: &ArrayRef, | ||
col_name: &str, | ||
) { | ||
) -> Result<()> { | ||
match array.data_type() { | ||
DataType::Int8 => { | ||
set_column_by_primitive_type::<Int8Type>(rows, row_count, array, col_name) | ||
set_column_by_primitive_type::<Int8Type>(rows, row_count, array, col_name); | ||
} | ||
DataType::Int16 => { | ||
set_column_by_primitive_type::<Int16Type>(rows, row_count, array, col_name) | ||
set_column_by_primitive_type::<Int16Type>(rows, row_count, array, col_name); | ||
} | ||
DataType::Int32 => { | ||
set_column_by_primitive_type::<Int32Type>(rows, row_count, array, col_name) | ||
set_column_by_primitive_type::<Int32Type>(rows, row_count, array, col_name); | ||
} | ||
DataType::Int64 => { | ||
set_column_by_primitive_type::<Int64Type>(rows, row_count, array, col_name) | ||
set_column_by_primitive_type::<Int64Type>(rows, row_count, array, col_name); | ||
} | ||
DataType::UInt8 => { | ||
set_column_by_primitive_type::<UInt8Type>(rows, row_count, array, col_name) | ||
set_column_by_primitive_type::<UInt8Type>(rows, row_count, array, col_name); | ||
} | ||
DataType::UInt16 => { | ||
set_column_by_primitive_type::<UInt16Type>(rows, row_count, array, col_name) | ||
set_column_by_primitive_type::<UInt16Type>(rows, row_count, array, col_name); | ||
} | ||
DataType::UInt32 => { | ||
set_column_by_primitive_type::<UInt32Type>(rows, row_count, array, col_name) | ||
set_column_by_primitive_type::<UInt32Type>(rows, row_count, array, col_name); | ||
} | ||
DataType::UInt64 => { | ||
set_column_by_primitive_type::<UInt64Type>(rows, row_count, array, col_name) | ||
set_column_by_primitive_type::<UInt64Type>(rows, row_count, array, col_name); | ||
} | ||
DataType::Float32 => { | ||
set_column_by_primitive_type::<Float32Type>(rows, row_count, array, col_name) | ||
set_column_by_primitive_type::<Float32Type>(rows, row_count, array, col_name); | ||
} | ||
DataType::Float64 => { | ||
set_column_by_primitive_type::<Float64Type>(rows, row_count, array, col_name) | ||
set_column_by_primitive_type::<Float64Type>(rows, row_count, array, col_name); | ||
} | ||
DataType::Null => { | ||
// when value is null, we simply skip setting the key | ||
|
@@ -444,7 +442,7 @@ fn set_column_for_json_rows( | |
} | ||
DataType::Struct(_) => { | ||
let inner_objs = | ||
struct_array_to_jsonmap_array(as_struct_array(array), row_count); | ||
struct_array_to_jsonmap_array(as_struct_array(array), row_count)?; | ||
rows.iter_mut() | ||
.take(row_count) | ||
.zip(inner_objs.into_iter()) | ||
|
@@ -457,34 +455,34 @@ fn set_column_for_json_rows( | |
rows.iter_mut() | ||
.zip(listarr.iter()) | ||
.take(row_count) | ||
.for_each(|(row, maybe_value)| { | ||
.try_for_each(|(row, maybe_value)| -> Result<()> { | ||
if let Some(v) = maybe_value { | ||
row.insert( | ||
col_name.to_string(), | ||
Value::Array(array_to_json_array(&v)), | ||
Value::Array(array_to_json_array(&v)?), | ||
); | ||
} | ||
}); | ||
Ok(()) | ||
})?; | ||
} | ||
DataType::LargeList(_) => { | ||
let listarr = as_large_list_array(array); | ||
rows.iter_mut() | ||
.zip(listarr.iter()) | ||
.take(row_count) | ||
.for_each(|(row, maybe_value)| { | ||
.try_for_each(|(row, maybe_value)| -> Result<()> { | ||
if let Some(v) = maybe_value { | ||
row.insert( | ||
col_name.to_string(), | ||
Value::Array(array_to_json_array(&v)), | ||
); | ||
let val = array_to_json_array(&v)?; | ||
row.insert(col_name.to_string(), Value::Array(val)); | ||
} | ||
}); | ||
Ok(()) | ||
})?; | ||
} | ||
DataType::Dictionary(_, value_type) => { | ||
let slice = array.slice(0, row_count); | ||
let hydrated = crate::compute::kernels::cast::cast(&slice, value_type) | ||
.expect("cannot cast dictionary to underlying values"); | ||
set_column_for_json_rows(rows, row_count, &hydrated, col_name) | ||
set_column_for_json_rows(rows, row_count, &hydrated, col_name)?; | ||
} | ||
DataType::Map(_, _) => { | ||
let maparr = as_map_array(array); | ||
|
@@ -494,11 +492,14 @@ fn set_column_for_json_rows( | |
|
||
// Keys have to be strings to convert to json. | ||
if !matches!(keys.data_type(), DataType::Utf8) { | ||
panic!("Unsupported datatype: {:#?}", array.data_type()); | ||
return Err(ArrowError::JsonError(format!( | ||
"data type {:?} not supported in nested map for json writer", | ||
keys.data_type() | ||
))); | ||
} | ||
|
||
let keys = as_string_array(&keys); | ||
let values = array_to_json_array(&values); | ||
let values = array_to_json_array(&values)?; | ||
|
||
let mut kv = keys.iter().zip(values.into_iter()); | ||
|
||
|
@@ -522,34 +523,38 @@ fn set_column_for_json_rows( | |
} | ||
} | ||
_ => { | ||
panic!("Unsupported datatype: {:#?}", array.data_type()); | ||
return Err(ArrowError::JsonError(format!( | ||
"data type {:?} not supported in nested map for json writer", | ||
array.data_type() | ||
))) | ||
} | ||
} | ||
Ok(()) | ||
} | ||
|
||
/// Converts an arrow [`RecordBatch`] into a `Vec` of Serde JSON | ||
/// [`JsonMap`]s (objects) | ||
pub fn record_batches_to_json_rows( | ||
batches: &[RecordBatch], | ||
) -> Vec<JsonMap<String, Value>> { | ||
) -> Result<Vec<JsonMap<String, Value>>> { | ||
let mut rows: Vec<JsonMap<String, Value>> = iter::repeat(JsonMap::new()) | ||
.take(batches.iter().map(|b| b.num_rows()).sum()) | ||
.collect(); | ||
|
||
if !rows.is_empty() { | ||
let schema = batches[0].schema(); | ||
let mut base = 0; | ||
batches.iter().for_each(|batch| { | ||
for batch in batches { | ||
let row_count = batch.num_rows(); | ||
batch.columns().iter().enumerate().for_each(|(j, col)| { | ||
for (j, col) in batch.columns().iter().enumerate() { | ||
let col_name = schema.field(j).name(); | ||
set_column_for_json_rows(&mut rows[base..], row_count, col, col_name); | ||
}); | ||
set_column_for_json_rows(&mut rows[base..], row_count, col, col_name)? | ||
} | ||
base += row_count; | ||
}); | ||
} | ||
} | ||
|
||
rows | ||
Ok(rows) | ||
} | ||
|
||
/// This trait defines how to format a sequence of JSON objects to a | ||
|
@@ -683,7 +688,7 @@ where | |
|
||
/// Convert the [`RecordBatch`] into JSON rows, and write them to the output | ||
pub fn write_batches(&mut self, batches: &[RecordBatch]) -> Result<()> { | ||
for row in record_batches_to_json_rows(batches) { | ||
for row in record_batches_to_json_rows(batches)? { | ||
self.write_row(&Value::Object(row))?; | ||
} | ||
Ok(()) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This pattern is exactly what the `?` operator does. There is some nice coverage of this operator in "The Book": https://doc.rust-lang.org/book/ch09-02-recoverable-errors-with-result.html#a-shortcut-for-propagating-errors-the--operator
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@alamb Thanks! After this PR, I have learned more about the philosophy of Rust error handling.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Indeed -- we are all learning together! Thank you for your contributions