Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
260 changes: 179 additions & 81 deletions rust/arrow/src/json/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,11 @@
//! let batch = json.next().unwrap().unwrap();
//! ```

use std::fs::File;
use indexmap::map::IndexMap as HashMap;
use indexmap::set::IndexSet as HashSet;
use std::io::{BufRead, BufReader, Read, Seek, SeekFrom};
use std::sync::Arc;

use indexmap::map::IndexMap as HashMap;
use indexmap::set::IndexSet as HashSet;
use serde_json::Value;

use crate::array::*;
Expand Down Expand Up @@ -157,9 +156,64 @@ fn generate_schema(spec: HashMap<String, HashSet<DataType>>) -> Result<Arc<Schem
/// `max_read_records` controlling the maximum number of records to read.
///
/// If `max_read_records` is not set, the whole file is read to infer its field types.
fn infer_json_schema(file: File, max_read_records: Option<usize>) -> Result<Arc<Schema>> {
///
/// Contrary to [`infer_json_schema`], this function will seek back to the start of the `reader`.
/// That way, the `reader` can be used immediately afterwards to create a [`Reader`].
///
/// # Examples
/// ```
/// use std::fs::File;
/// use std::io::BufReader;
/// use arrow::json::reader::infer_json_schema_from_seekable;
///
/// let file = File::open("test/data/mixed_arrays.json").unwrap();
/// // file's cursor's offset at 0
/// let mut reader = BufReader::new(file);
/// let inferred_schema = infer_json_schema_from_seekable(&mut reader, None).unwrap();
/// // file's cursor's offset automatically set at 0
/// ```
pub fn infer_json_schema_from_seekable<R: Read + Seek>(
reader: &mut BufReader<R>,
max_read_records: Option<usize>,
) -> Result<Arc<Schema>> {
let schema = infer_json_schema(reader, max_read_records);
// return the reader seek back to the start
reader.seek(SeekFrom::Start(0))?;

schema
}

/// Infer the fields of a JSON file by reading the first n records of the buffer, with
/// `max_read_records` controlling the maximum number of records to read.
///
/// If `max_read_records` is not set, the whole file is read to infer its field types.
///
/// This function will not seek back to the start of the `reader`. The user has to manage the
/// original file's cursor. This function is useful when the `reader`'s cursor is not available
/// (does not implement [`Seek`]), such is the case for compressed streams decoders.
///
/// # Examples
/// ```
/// use std::fs::File;
/// use std::io::{BufReader, SeekFrom, Seek};
/// use flate2::read::GzDecoder;
/// use arrow::json::reader::infer_json_schema;
///
/// let mut file = File::open("test/data/mixed_arrays.json.gz").unwrap();
///
/// // file's cursor's offset at 0
/// let mut reader = BufReader::new(GzDecoder::new(&file));
/// let inferred_schema = infer_json_schema(&mut reader, None).unwrap();
/// // cursor's offset at end of file
///
/// // seek back to start so that the original file is usable again
/// file.seek(SeekFrom::Start(0)).unwrap();
/// ```
pub fn infer_json_schema<R: Read>(
reader: &mut BufReader<R>,
max_read_records: Option<usize>,
) -> Result<Arc<Schema>> {
let mut values: HashMap<String, HashSet<DataType>> = HashMap::new();
let mut reader = BufReader::new(file.try_clone()?);

let mut line = String::new();
for _ in 0..max_read_records.unwrap_or(std::usize::MAX) {
Expand Down Expand Up @@ -302,12 +356,7 @@ fn infer_json_schema(file: File, max_read_records: Option<usize>) -> Result<Arc<
};
}

let schema = generate_schema(values)?;

// return the reader seek back to the start
reader.into_inner().seek(SeekFrom::Start(0))?;

Ok(schema)
generate_schema(values)
}

/// JSON file reader
Expand All @@ -328,17 +377,12 @@ impl<R: Read> Reader<R> {
/// If reading a `File`, you can customise the Reader, such as to enable schema
/// inference, use `ReaderBuilder`.
pub fn new(
reader: BufReader<R>,
reader: R,
schema: Arc<Schema>,
batch_size: usize,
projection: Option<Vec<String>>,
) -> Self {
Self {
schema,
projection,
reader,
batch_size,
}
Self::from_buf_reader(BufReader::new(reader), schema, batch_size, projection)
}

/// Returns the schema of the reader, useful for getting the schema without reading
Expand All @@ -364,6 +408,23 @@ impl<R: Read> Reader<R> {
}
}

/// Create a new JSON Reader from a `BufReader<R: Read>`
///
/// To customize the schema, such as to enable schema inference, use `ReaderBuilder`
pub fn from_buf_reader(
reader: BufReader<R>,
schema: Arc<Schema>,
batch_size: usize,
projection: Option<Vec<String>>,
) -> Self {
Self {
schema,
projection,
reader,
batch_size,
}
}

/// Read the next batch of records
pub fn next(&mut self) -> Result<Option<RecordBatch>> {
let mut rows: Vec<Value> = Vec::with_capacity(self.batch_size);
Expand Down Expand Up @@ -752,14 +813,16 @@ impl ReaderBuilder {
}

/// Create a new `Reader` from the `ReaderBuilder`
pub fn build<R: Read>(self, file: File) -> Result<Reader<File>> {
pub fn build<R: Read + Seek>(self, source: R) -> Result<Reader<R>> {
let mut buf_reader = BufReader::new(source);

// check if schema should be inferred
let schema = match self.schema {
Some(schema) => schema,
None => infer_json_schema(file.try_clone()?, self.max_records)?,
None => infer_json_schema_from_seekable(&mut buf_reader, self.max_records)?,
};
let buf_reader = BufReader::new(file);
Ok(Reader::new(

Ok(Reader::from_buf_reader(
buf_reader,
schema,
self.batch_size,
Expand All @@ -773,6 +836,8 @@ mod tests {
use crate::datatypes::DataType::Dictionary;

use super::*;
use flate2::read::GzDecoder;
use std::fs::File;

#[test]
fn test_json_basic() {
Expand Down Expand Up @@ -901,7 +966,7 @@ mod tests {
]);

let mut reader: Reader<File> = Reader::new(
BufReader::new(File::open("test/data/basic.json").unwrap()),
File::open("test/data/basic.json").unwrap(),
Arc::new(schema.clone()),
1024,
None,
Expand Down Expand Up @@ -953,7 +1018,7 @@ mod tests {
]);

let mut reader: Reader<File> = Reader::new(
BufReader::new(File::open("test/data/basic.json").unwrap()),
File::open("test/data/basic.json").unwrap(),
Arc::new(schema),
1024,
Some(vec!["a".to_string(), "c".to_string()]),
Expand Down Expand Up @@ -1083,63 +1148,74 @@ mod tests {
.unwrap();
let batch = reader.next().unwrap().unwrap();

assert_eq!(4, batch.num_columns());
assert_eq!(4, batch.num_rows());

let schema = batch.schema();

let a = schema.column_with_name("a").unwrap();
assert_eq!(&DataType::Int64, a.1.data_type());
let b = schema.column_with_name("b").unwrap();
assert_eq!(
&DataType::List(Box::new(DataType::Float64)),
b.1.data_type()
);
let c = schema.column_with_name("c").unwrap();
assert_eq!(
&DataType::List(Box::new(DataType::Boolean)),
c.1.data_type()
);
let d = schema.column_with_name("d").unwrap();
assert_eq!(&DataType::List(Box::new(DataType::Utf8)), d.1.data_type());

let bb = batch
.column(b.0)
.as_any()
.downcast_ref::<ListArray>()
.unwrap();
let bb = bb.values();
let bb = bb.as_any().downcast_ref::<Float64Array>().unwrap();
assert_eq!(10, bb.len());
assert_eq!(4.0, bb.value(9));

let cc = batch
.column(c.0)
.as_any()
.downcast_ref::<ListArray>()
.unwrap();
let cc = cc.values();
let cc = cc.as_any().downcast_ref::<BooleanArray>().unwrap();
assert_eq!(6, cc.len());
assert_eq!(false, cc.value(0));
assert_eq!(false, cc.value(3));
assert_eq!(false, cc.is_valid(2));
assert_eq!(false, cc.is_valid(4));

let dd = batch
.column(d.0)
.as_any()
.downcast_ref::<ListArray>()
.unwrap();
let dd = dd.values();
let dd = dd.as_any().downcast_ref::<StringArray>().unwrap();
assert_eq!(7, dd.len());
assert_eq!(false, dd.is_valid(1));
assert_eq!("text", dd.value(2));
assert_eq!("1", dd.value(3));
assert_eq!("false", dd.value(4));
assert_eq!("array", dd.value(5));
assert_eq!("2.4", dd.value(6));
let mut file = File::open("test/data/mixed_arrays.json.gz").unwrap();
let mut reader = BufReader::new(GzDecoder::new(&file));
let schema = infer_json_schema(&mut reader, None).unwrap();
file.seek(SeekFrom::Start(0)).unwrap();

let reader = BufReader::new(GzDecoder::new(&file));
let mut reader = Reader::from_buf_reader(reader, schema, 64, None);
let batch_gz = reader.next().unwrap().unwrap();

for batch in vec![batch, batch_gz] {
assert_eq!(4, batch.num_columns());
assert_eq!(4, batch.num_rows());

let schema = batch.schema();

let a = schema.column_with_name("a").unwrap();
assert_eq!(&DataType::Int64, a.1.data_type());
let b = schema.column_with_name("b").unwrap();
assert_eq!(
&DataType::List(Box::new(DataType::Float64)),
b.1.data_type()
);
let c = schema.column_with_name("c").unwrap();
assert_eq!(
&DataType::List(Box::new(DataType::Boolean)),
c.1.data_type()
);
let d = schema.column_with_name("d").unwrap();
assert_eq!(&DataType::List(Box::new(DataType::Utf8)), d.1.data_type());

let bb = batch
.column(b.0)
.as_any()
.downcast_ref::<ListArray>()
.unwrap();
let bb = bb.values();
let bb = bb.as_any().downcast_ref::<Float64Array>().unwrap();
assert_eq!(10, bb.len());
assert_eq!(4.0, bb.value(9));

let cc = batch
.column(c.0)
.as_any()
.downcast_ref::<ListArray>()
.unwrap();
let cc = cc.values();
let cc = cc.as_any().downcast_ref::<BooleanArray>().unwrap();
assert_eq!(6, cc.len());
assert_eq!(false, cc.value(0));
assert_eq!(false, cc.value(3));
assert_eq!(false, cc.is_valid(2));
assert_eq!(false, cc.is_valid(4));

let dd = batch
.column(d.0)
.as_any()
.downcast_ref::<ListArray>()
.unwrap();
let dd = dd.values();
let dd = dd.as_any().downcast_ref::<StringArray>().unwrap();
assert_eq!(7, dd.len());
assert_eq!(false, dd.is_valid(1));
assert_eq!("text", dd.value(2));
assert_eq!("1", dd.value(3));
assert_eq!("false", dd.value(4));
assert_eq!("array", dd.value(5));
assert_eq!("2.4", dd.value(6));
}
}

#[test]
Expand Down Expand Up @@ -1303,4 +1379,26 @@ mod tests {

assert_eq!(vec![5, 5, 2], num_records);
}

#[test]
fn test_json_infer_schema() {
let schema = Schema::new(vec![
Field::new("a", DataType::Int64, true),
Field::new("b", DataType::List(Box::new(DataType::Float64)), true),
Field::new("c", DataType::List(Box::new(DataType::Boolean)), true),
Field::new("d", DataType::List(Box::new(DataType::Utf8)), true),
]);

let mut reader =
BufReader::new(File::open("test/data/mixed_arrays.json").unwrap());
let inferred_schema = infer_json_schema_from_seekable(&mut reader, None).unwrap();

assert_eq!(inferred_schema, Arc::new(schema.clone()));

let file = File::open("test/data/mixed_arrays.json.gz").unwrap();
let mut reader = BufReader::new(GzDecoder::new(&file));
let inferred_schema = infer_json_schema(&mut reader, None).unwrap();

assert_eq!(inferred_schema, Arc::new(schema));
}
}
Binary file added rust/arrow/test/data/mixed_arrays.json.gz
Binary file not shown.