Skip to content

Commit

Permalink
Return error from JSON writer rather than panic
Browse files Browse the repository at this point in the history
  • Loading branch information
Ted-Jiang committed Jan 19, 2022
1 parent 4218c74 commit 77d7977
Showing 1 changed file with 78 additions and 73 deletions.
151 changes: 78 additions & 73 deletions arrow/src/json/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
//! let a = Int32Array::from(vec![1, 2, 3]);
//! let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
//!
//! let json_rows = json::writer::record_batches_to_json_rows(&[batch]);
//! let json_rows = json::writer::record_batches_to_json_rows(&[batch]).unwrap();
//! assert_eq!(
//! serde_json::Value::Object(json_rows[1].clone()),
//! serde_json::json!({"a": 2}),
Expand Down Expand Up @@ -110,64 +110,64 @@ use serde_json::Value;

use crate::array::*;
use crate::datatypes::*;
use crate::error::Result;
use crate::error::{ArrowError, Result};
use crate::record_batch::RecordBatch;

fn primitive_array_to_json<T: ArrowPrimitiveType>(array: &ArrayRef) -> Vec<Value> {
as_primitive_array::<T>(array)
fn primitive_array_to_json<T: ArrowPrimitiveType>(
array: &ArrayRef,
) -> Result<Vec<Value>> {
Ok(as_primitive_array::<T>(array)
.iter()
.map(|maybe_value| match maybe_value {
Some(v) => v.into_json_value().unwrap_or(Value::Null),
None => Value::Null,
})
.collect()
.collect())
}

fn struct_array_to_jsonmap_array(
array: &StructArray,
row_count: usize,
) -> Vec<JsonMap<String, Value>> {
) -> Result<Vec<JsonMap<String, Value>>> {
let inner_col_names = array.column_names();

let mut inner_objs = iter::repeat(JsonMap::new())
.take(row_count)
.collect::<Vec<JsonMap<String, Value>>>();

array
.columns()
.iter()
.enumerate()
.for_each(|(j, struct_col)| {
set_column_for_json_rows(
&mut inner_objs,
row_count,
struct_col,
inner_col_names[j],
);
});

inner_objs
for (j, struct_col) in array.columns().iter().enumerate() {
match set_column_for_json_rows(
&mut inner_objs,
row_count,
struct_col,
inner_col_names[j],
) {
Ok(_) => {}
Err(e) => return Err(e),
}
}
Ok(inner_objs)
}

/// Converts an arrow [`ArrayRef`] into a `Vec` of Serde JSON [`serde_json::Value`]'s
pub fn array_to_json_array(array: &ArrayRef) -> Vec<Value> {
pub fn array_to_json_array(array: &ArrayRef) -> Result<Vec<Value>> {
match array.data_type() {
DataType::Null => iter::repeat(Value::Null).take(array.len()).collect(),
DataType::Boolean => as_boolean_array(array)
DataType::Null => Ok(iter::repeat(Value::Null).take(array.len()).collect()),
DataType::Boolean => Ok(as_boolean_array(array)
.iter()
.map(|maybe_value| match maybe_value {
Some(v) => v.into(),
None => Value::Null,
})
.collect(),
.collect()),

DataType::Utf8 => as_string_array(array)
DataType::Utf8 => Ok(as_string_array(array)
.iter()
.map(|maybe_value| match maybe_value {
Some(v) => v.into(),
None => Value::Null,
})
.collect(),
.collect()),
DataType::Int8 => primitive_array_to_json::<Int8Type>(array),
DataType::Int16 => primitive_array_to_json::<Int16Type>(array),
DataType::Int32 => primitive_array_to_json::<Int32Type>(array),
Expand All @@ -178,31 +178,29 @@ pub fn array_to_json_array(array: &ArrayRef) -> Vec<Value> {
DataType::UInt64 => primitive_array_to_json::<UInt64Type>(array),
DataType::Float32 => primitive_array_to_json::<Float32Type>(array),
DataType::Float64 => primitive_array_to_json::<Float64Type>(array),
DataType::List(_) => as_list_array(array)
DataType::List(_) => Ok(as_list_array(array)
.iter()
.map(|maybe_value| match maybe_value {
Some(v) => Value::Array(array_to_json_array(&v)),
Some(v) => Value::Array(array_to_json_array(&v).unwrap()),
None => Value::Null,
})
.collect(),
DataType::LargeList(_) => as_large_list_array(array)
.collect()),
DataType::LargeList(_) => Ok(as_large_list_array(array)
.iter()
.map(|maybe_value| match maybe_value {
Some(v) => Value::Array(array_to_json_array(&v)),
Some(v) => Value::Array(array_to_json_array(&v).unwrap()),
None => Value::Null,
})
.collect(),
.collect()),
DataType::Struct(_) => {
let jsonmaps =
struct_array_to_jsonmap_array(as_struct_array(array), array.len());
jsonmaps.into_iter().map(Value::Object).collect()
}
_ => {
panic!(
"Unsupported datatype for array conversion: {:#?}",
array.data_type()
);
struct_array_to_jsonmap_array(as_struct_array(array), array.len())?;
Ok(jsonmaps.into_iter().map(Value::Object).collect())
}
t => Err(ArrowError::JsonError(format!(
"data type {:?} not supported",
t
))),
}
}

Expand Down Expand Up @@ -261,37 +259,37 @@ fn set_column_for_json_rows(
row_count: usize,
array: &ArrayRef,
col_name: &str,
) {
) -> Result<()> {
match array.data_type() {
DataType::Int8 => {
set_column_by_primitive_type::<Int8Type>(rows, row_count, array, col_name)
set_column_by_primitive_type::<Int8Type>(rows, row_count, array, col_name);
}
DataType::Int16 => {
set_column_by_primitive_type::<Int16Type>(rows, row_count, array, col_name)
set_column_by_primitive_type::<Int16Type>(rows, row_count, array, col_name);
}
DataType::Int32 => {
set_column_by_primitive_type::<Int32Type>(rows, row_count, array, col_name)
set_column_by_primitive_type::<Int32Type>(rows, row_count, array, col_name);
}
DataType::Int64 => {
set_column_by_primitive_type::<Int64Type>(rows, row_count, array, col_name)
set_column_by_primitive_type::<Int64Type>(rows, row_count, array, col_name);
}
DataType::UInt8 => {
set_column_by_primitive_type::<UInt8Type>(rows, row_count, array, col_name)
set_column_by_primitive_type::<UInt8Type>(rows, row_count, array, col_name);
}
DataType::UInt16 => {
set_column_by_primitive_type::<UInt16Type>(rows, row_count, array, col_name)
set_column_by_primitive_type::<UInt16Type>(rows, row_count, array, col_name);
}
DataType::UInt32 => {
set_column_by_primitive_type::<UInt32Type>(rows, row_count, array, col_name)
set_column_by_primitive_type::<UInt32Type>(rows, row_count, array, col_name);
}
DataType::UInt64 => {
set_column_by_primitive_type::<UInt64Type>(rows, row_count, array, col_name)
set_column_by_primitive_type::<UInt64Type>(rows, row_count, array, col_name);
}
DataType::Float32 => {
set_column_by_primitive_type::<Float32Type>(rows, row_count, array, col_name)
set_column_by_primitive_type::<Float32Type>(rows, row_count, array, col_name);
}
DataType::Float64 => {
set_column_by_primitive_type::<Float64Type>(rows, row_count, array, col_name)
set_column_by_primitive_type::<Float64Type>(rows, row_count, array, col_name);
}
DataType::Null => {
// when value is null, we simply skip setting the key
Expand Down Expand Up @@ -444,7 +442,7 @@ fn set_column_for_json_rows(
}
DataType::Struct(_) => {
let inner_objs =
struct_array_to_jsonmap_array(as_struct_array(array), row_count);
struct_array_to_jsonmap_array(as_struct_array(array), row_count)?;
rows.iter_mut()
.take(row_count)
.zip(inner_objs.into_iter())
Expand All @@ -457,34 +455,34 @@ fn set_column_for_json_rows(
rows.iter_mut()
.zip(listarr.iter())
.take(row_count)
.for_each(|(row, maybe_value)| {
.try_for_each(|(row, maybe_value)| -> Result<()> {
if let Some(v) = maybe_value {
row.insert(
col_name.to_string(),
Value::Array(array_to_json_array(&v)),
Value::Array(array_to_json_array(&v)?),
);
}
});
Ok(())
})?;
}
DataType::LargeList(_) => {
let listarr = as_large_list_array(array);
rows.iter_mut()
.zip(listarr.iter())
.take(row_count)
.for_each(|(row, maybe_value)| {
.try_for_each(|(row, maybe_value)| -> Result<()> {
if let Some(v) = maybe_value {
row.insert(
col_name.to_string(),
Value::Array(array_to_json_array(&v)),
);
let val = array_to_json_array(&v)?;
row.insert(col_name.to_string(), Value::Array(val));
}
});
Ok(())
})?;
}
DataType::Dictionary(_, value_type) => {
let slice = array.slice(0, row_count);
let hydrated = crate::compute::kernels::cast::cast(&slice, value_type)
.expect("cannot cast dictionary to underlying values");
set_column_for_json_rows(rows, row_count, &hydrated, col_name)
set_column_for_json_rows(rows, row_count, &hydrated, col_name)?;
}
DataType::Map(_, _) => {
let maparr = as_map_array(array);
Expand All @@ -494,11 +492,14 @@ fn set_column_for_json_rows(

// Keys have to be strings to convert to json.
if !matches!(keys.data_type(), DataType::Utf8) {
panic!("Unsupported datatype: {:#?}", array.data_type());
return Err(ArrowError::JsonError(format!(
"data type {:?} not supported in nested map for json writer",
keys.data_type()
)));
}

let keys = as_string_array(&keys);
let values = array_to_json_array(&values);
let values = array_to_json_array(&values)?;

let mut kv = keys.iter().zip(values.into_iter());

Expand All @@ -522,34 +523,38 @@ fn set_column_for_json_rows(
}
}
_ => {
panic!("Unsupported datatype: {:#?}", array.data_type());
return Err(ArrowError::JsonError(format!(
"data type {:?} not supported in nested map for json writer",
array.data_type()
)))
}
}
Ok(())
}

/// Converts an arrow [`RecordBatch`] into a `Vec` of Serde JSON
/// [`JsonMap`]s (objects)
pub fn record_batches_to_json_rows(
batches: &[RecordBatch],
) -> Vec<JsonMap<String, Value>> {
) -> Result<Vec<JsonMap<String, Value>>> {
let mut rows: Vec<JsonMap<String, Value>> = iter::repeat(JsonMap::new())
.take(batches.iter().map(|b| b.num_rows()).sum())
.collect();

if !rows.is_empty() {
let schema = batches[0].schema();
let mut base = 0;
batches.iter().for_each(|batch| {
for batch in batches {
let row_count = batch.num_rows();
batch.columns().iter().enumerate().for_each(|(j, col)| {
for (j, col) in batch.columns().iter().enumerate() {
let col_name = schema.field(j).name();
set_column_for_json_rows(&mut rows[base..], row_count, col, col_name);
});
set_column_for_json_rows(&mut rows[base..], row_count, col, col_name)?
}
base += row_count;
});
}
}

rows
Ok(rows)
}

/// This trait defines how to format a sequence of JSON objects to a
Expand Down Expand Up @@ -683,7 +688,7 @@ where

/// Convert the [`RecordBatch`] into JSON rows, and write them to the output
pub fn write_batches(&mut self, batches: &[RecordBatch]) -> Result<()> {
for row in record_batches_to_json_rows(batches) {
for row in record_batches_to_json_rows(batches)? {
self.write_row(&Value::Object(row))?;
}
Ok(())
Expand Down

0 comments on commit 77d7977

Please sign in to comment.