diff --git a/rust/datafusion/src/datasource/parquet.rs b/rust/datafusion/src/datasource/parquet.rs index 4447fbdc55932..cb8597febf525 100644 --- a/rust/datafusion/src/datasource/parquet.rs +++ b/rust/datafusion/src/datasource/parquet.rs @@ -35,7 +35,9 @@ use parquet::schema::types::Type; use crate::datasource::{RecordBatchIterator, Table}; use crate::execution::error::{ExecutionError, Result}; -use arrow::builder::{BinaryBuilder, Float64Builder, Int32Builder}; +use arrow::builder::BooleanBuilder; +use arrow::builder::Int64Builder; +use arrow::builder::{BinaryBuilder, Float32Builder, Float64Builder, Int32Builder}; pub struct ParquetTable { filename: String, @@ -112,7 +114,6 @@ impl ParquetFile { .collect(); let projected_schema = Arc::new(Schema::new(projected_fields)); - println!("projected schema: {:?}", projected_schema); Ok(ParquetFile { reader: reader, @@ -132,7 +133,6 @@ impl ParquetFile { fn load_next_row_group(&mut self) { if self.row_group_index < self.reader.num_row_groups() { - //println!("Loading row group {} of {}", self.row_group_index, self.reader.num_row_groups()); let reader = self.reader.get_row_group(self.row_group_index).unwrap(); self.column_readers = vec![]; @@ -151,17 +151,40 @@ impl ParquetFile { } fn load_batch(&mut self) -> Result> { - println!("load_batch()"); match &self.current_row_group { Some(reader) => { let mut batch: Vec> = Vec::with_capacity(reader.num_columns()); let mut row_count = 0; for i in 0..self.column_readers.len() { let array: Arc = match self.column_readers[i] { - ColumnReader::BoolColumnReader(ref mut _r) => { - return Err(ExecutionError::NotImplemented( - "unsupported column reader type (BOOL)".to_string(), - )); + ColumnReader::BoolColumnReader(ref mut r) => { + let mut read_buffer: Vec = + Vec::with_capacity(self.batch_size); + + for _ in 0..self.batch_size { + read_buffer.push(false); + } + + match r.read_batch( + self.batch_size, + None, + None, + &mut read_buffer, + ) { + //TODO this isn't handling null values + Ok((count, _)) => { + let mut builder = BooleanBuilder::new(count); + builder.append_slice(&read_buffer[0..count]).unwrap(); + row_count = count; + Arc::new(builder.finish()) + } + Err(e) => { + return Err(ExecutionError::NotImplemented(format!( + "Error reading parquet batch (column {}): {:?}", + i, e + ))); + } + } } ColumnReader::Int32ColumnReader(ref mut r) => { let mut read_buffer: Vec = @@ -179,39 +202,87 @@ impl ParquetFile { ) { //TODO this isn't handling null values Ok((count, _)) => { - println!("Read {} rows", count); let mut builder = Int32Builder::new(count); builder.append_slice(&read_buffer[0..count]).unwrap(); row_count = count; Arc::new(builder.finish()) } - _ => { + Err(e) => { return Err(ExecutionError::NotImplemented(format!( - "Error reading parquet batch (column {})", - i + "Error reading parquet batch (column {}): {:?}", + i, e ))); } } } - ColumnReader::Int64ColumnReader(ref mut _r) => { - return Err(ExecutionError::NotImplemented( - "unsupported column reader type (INT64)".to_string(), - )); + ColumnReader::Int64ColumnReader(ref mut r) => { + let mut read_buffer: Vec = + Vec::with_capacity(self.batch_size); + + for _ in 0..self.batch_size { + read_buffer.push(0); + } + + match r.read_batch( + self.batch_size, + None, + None, + &mut read_buffer, + ) { + //TODO this isn't handling null values + Ok((count, _)) => { + let mut builder = Int64Builder::new(count); + builder.append_slice(&read_buffer[0..count]).unwrap(); + row_count = count; + Arc::new(builder.finish()) + } + Err(e) => { + return Err(ExecutionError::NotImplemented(format!( + "Error reading parquet batch (column {}): {:?}", + i, e + ))); + } + } } ColumnReader::Int96ColumnReader(ref mut _r) => { return Err(ExecutionError::NotImplemented( "unsupported column reader type (INT96)".to_string(), )); } - ColumnReader::FloatColumnReader(ref mut _r) => { - return Err(ExecutionError::NotImplemented( - "unsupported column reader type (FLOAT)".to_string(), - )); + ColumnReader::FloatColumnReader(ref mut r) => { + let mut builder = Float32Builder::new(self.batch_size); + let mut read_buffer: Vec = + Vec::with_capacity(self.batch_size); + for _ in 0..self.batch_size { + read_buffer.push(0.0); + } + match r.read_batch( + self.batch_size, + None, + None, + &mut read_buffer, + ) { + //TODO this isn't handling null values + Ok((count, _)) => { + builder.append_slice(&read_buffer[0..count]).unwrap(); + row_count = count; + Arc::new(builder.finish()) + } + Err(e) => { + return Err(ExecutionError::NotImplemented(format!( + "Error reading parquet batch (column {}): {:?}", + i, e + ))); + } + } } ColumnReader::DoubleColumnReader(ref mut r) => { let mut builder = Float64Builder::new(self.batch_size); let mut read_buffer: Vec = Vec::with_capacity(self.batch_size); + for _ in 0..self.batch_size { + read_buffer.push(0.0); + } match r.read_batch( self.batch_size, None, @@ -220,14 +291,14 @@ impl ParquetFile { ) { //TODO this isn't handling null values Ok((count, _)) => { - builder.append_slice(&read_buffer).unwrap(); + builder.append_slice(&read_buffer[0..count]).unwrap(); row_count = count; Arc::new(builder.finish()) } - _ => { + Err(e) => { return Err(ExecutionError::NotImplemented(format!( - "Error reading parquet batch (column {})", - i + "Error reading parquet batch (column {}): {:?}", + i, e ))); } } @@ -251,11 +322,13 @@ impl ParquetFile { //TODO this is horribly inefficient let mut builder = BinaryBuilder::new(row_count); for j in 0..row_count { - let foo = b[j].slice(0, b[j].len()); - let bytes: &[u8] = foo.data(); - let str = - String::from_utf8(bytes.to_vec()).unwrap(); - builder.append_string(&str).unwrap(); + let slice = b[j].slice(0, b[j].len()); + builder + .append_string( + &String::from_utf8(slice.data().to_vec()) + .unwrap(), + ) + .unwrap(); } Arc::new(builder.finish()) } @@ -354,11 +427,43 @@ impl RecordBatchIterator for ParquetFile { #[cfg(test)] mod tests { use super::*; + use arrow::array::BooleanArray; + use arrow::array::Float32Array; + use arrow::array::Float64Array; + use arrow::array::Int64Array; use arrow::array::{BinaryArray, Int32Array}; use std::env; #[test] - fn read_read_i32_column() { + fn read_bool_alltypes_plain_parquet() { + let table = load_table("alltypes_plain.parquet"); + + let projection = Some(vec![1]); + let scan = table.scan(&projection, 1024).unwrap(); + let mut it = scan.borrow_mut(); + let batch = it.next().unwrap().unwrap(); + + assert_eq!(1, batch.num_columns()); + assert_eq!(8, batch.num_rows()); + + let array = batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let mut values: Vec = vec![]; + for i in 0..batch.num_rows() { + values.push(array.value(i)); + } + + assert_eq!( + "[true, false, true, false, true, false, true, false]", + format!("{:?}", values) + ); + } + + #[test] + fn read_i32_alltypes_plain_parquet() { let table = load_table("alltypes_plain.parquet"); let projection = Some(vec![0]); @@ -383,7 +488,63 @@ mod tests { } #[test] - fn read_read_string_column() { + fn read_f32_alltypes_plain_parquet() { + let table = load_table("alltypes_plain.parquet"); + + let projection = Some(vec![6]); + let scan = table.scan(&projection, 1024).unwrap(); + let mut it = scan.borrow_mut(); + let batch = it.next().unwrap().unwrap(); + + assert_eq!(1, batch.num_columns()); + assert_eq!(8, batch.num_rows()); + + let array = batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let mut values: Vec = vec![]; + for i in 0..batch.num_rows() { + values.push(array.value(i)); + } + + assert_eq!( + "[0.0, 1.1, 0.0, 1.1, 0.0, 1.1, 0.0, 1.1]", + format!("{:?}", values) + ); + } + + #[test] + fn read_f64_alltypes_plain_parquet() { + let table = load_table("alltypes_plain.parquet"); + + let projection = Some(vec![7]); + let scan = table.scan(&projection, 1024).unwrap(); + let mut it = scan.borrow_mut(); + let batch = it.next().unwrap().unwrap(); + + assert_eq!(1, batch.num_columns()); + assert_eq!(8, batch.num_rows()); + + let array = batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let mut values: Vec = vec![]; + for i in 0..batch.num_rows() { + values.push(array.value(i)); + } + + assert_eq!( + "[0.0, 10.1, 0.0, 10.1, 0.0, 10.1, 0.0, 10.1]", + format!("{:?}", values) + ); + } + + #[test] + fn read_utf8_alltypes_plain_parquet() { let table = load_table("alltypes_plain.parquet"); let projection = Some(vec![9]); @@ -400,7 +561,7 @@ mod tests { .downcast_ref::() .unwrap(); let mut values: Vec = vec![]; - for i in 0..8 { + for i in 0..batch.num_rows() { let str: String = String::from_utf8(array.value(i).to_vec()).unwrap(); values.push(str); } @@ -411,12 +572,39 @@ mod tests { ); } + #[test] + fn read_int64_nullable_impala_parquet() { + let table = load_table("nullable.impala.parquet"); + + let projection = Some(vec![0]); + let scan = table.scan(&projection, 1024).unwrap(); + let mut it = scan.borrow_mut(); + let batch = it.next().unwrap().unwrap(); + + assert_eq!(1, batch.num_columns()); + assert_eq!(7, batch.num_rows()); + + let array = batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let mut values: Vec = vec![]; + for i in 0..batch.num_rows() { + values.push(array.value(i)); + } + + assert_eq!("[1, 2, 3, 4, 5, 6, 7]", format!("{:?}", values)); + } + fn load_table(name: &str) -> Box { - println!("load_table"); let testdata = env::var("PARQUET_TEST_DATA").unwrap(); let filename = format!("{}/{}", testdata, name); let table = ParquetTable::new(&filename); - println!("{:?}", table.schema()); + println!("Loading file {} with schema:", name); + for field in table.schema().fields() { + println!("\t{:?}", field); + } Box::new(table) } }