revert to columnar reads
andygrove committed Mar 14, 2019
1 parent c3f71d7 commit 5ce3086
Showing 2 changed files with 147 additions and 72 deletions.
rust/datafusion/src/datasource/parquet.rs (137 additions, 66 deletions)
@@ -28,13 +28,14 @@ use arrow::datatypes::{DataType, Field, Schema};
 use arrow::record_batch::RecordBatch;
 
 use parquet::basic;
+use parquet::column::reader::*;
+use parquet::data_type::ByteArray;
 use parquet::file::reader::*;
-use parquet::record::{Row, RowAccessor};
 use parquet::schema::types::Type;
 
 use crate::datasource::{RecordBatchIterator, Table};
 use crate::execution::error::{ExecutionError, Result};
-use arrow::builder::{BinaryBuilder, Float32Builder, Float64Builder, Int32Builder};
+use arrow::builder::{BinaryBuilder, Float64Builder, Int32Builder};
 
 pub struct ParquetTable {
     filename: String,
@@ -54,7 +55,6 @@ impl ParquetTable {
 }
 
 impl Table for ParquetTable {
-
     fn schema(&self) -> &Arc<Schema> {
         &self.schema
     }
@@ -78,10 +78,13 @@ pub struct ParquetFile {
     projection: Vec<usize>,
     batch_size: usize,
     current_row_group: Option<Box<RowGroupReader>>,
+    column_readers: Vec<ColumnReader>,
 }
 
 impl ParquetFile {
     pub fn open(file: File, projection: Option<Vec<usize>>) -> Result<Self> {
+        println!("open()");
+
         let reader = SerializedFileReader::new(file).unwrap();
 
         let metadata = reader.metadata();
@@ -103,13 +106,22 @@ impl ParquetFile {
                     }
                 };
 
+                let projected_fields: Vec<Field> = projection
+                    .iter()
+                    .map(|i| schema.fields()[*i].clone())
+                    .collect();
+
+                let projected_schema = Arc::new(Schema::new(projected_fields));
+                println!("projected schema: {:?}", projected_schema);
+
                 Ok(ParquetFile {
                     reader: reader,
                     row_group_index: 0,
-                    schema: Arc::new(schema),
+                    schema: projected_schema,
                     projection,
                     batch_size: 64 * 1024,
                     current_row_group: None,
+                    column_readers: vec![],
                 })
             }
             _ => Err(ExecutionError::General(
@@ -120,7 +132,17 @@ impl ParquetFile {
 
     fn load_next_row_group(&mut self) {
         if self.row_group_index < self.reader.num_row_groups() {
+            //println!("Loading row group {} of {}", self.row_group_index, self.reader.num_row_groups());
             let reader = self.reader.get_row_group(self.row_group_index).unwrap();
+
+            self.column_readers = vec![];
+
+            for i in &self.projection {
+                //TODO validate index in bounds
+                self.column_readers
+                    .push(reader.get_column_reader(*i).unwrap());
+            }
+
             self.current_row_group = Some(reader);
             self.row_group_index += 1;
         } else {
@@ -129,86 +151,134 @@ impl ParquetFile {
     }
 
     fn load_batch(&mut self) -> Result<Option<RecordBatch>> {
+        println!("load_batch()");
         match &self.current_row_group {
             Some(reader) => {
-                // read batch of rows into memory
-                // let parquet_projection = self.projection.iter().map(|i| reader.metadata().schema_descr().column(*i)).collect();
-                let mut row_iter = reader.get_row_iter(None).unwrap(); //TODO projection push down
-                let mut rows: Vec<Row> = Vec::with_capacity(self.batch_size);
-                while let Some(row) = row_iter.next() {
-                    if rows.len() == self.batch_size {
-                        break;
-                    }
-                    rows.push(row);
-                }
-                println!("Loaded {} rows into memory", rows.len());
-
-                // convert to columnar batch
-                let mut batch: Vec<Arc<Array>> =
-                    Vec::with_capacity(self.projection.len());
-                for i in &self.projection {
-                    let array: Arc<Array> = match self.schema.field(*i).data_type() {
-                        DataType::Int32 => {
-                            let mut builder = Int32Builder::new(rows.len());
-                            for row in &rows {
-                                //TODO null handling
-                                builder.append_value(row.get_int(*i).unwrap()).unwrap();
-                            }
-                            Arc::new(builder.finish())
-                        }
-                        DataType::Float32 => {
-                            let mut builder = Float32Builder::new(rows.len());
-                            for row in &rows {
-                                //TODO null handling
-                                builder.append_value(row.get_float(*i).unwrap()).unwrap();
-                            }
-                            Arc::new(builder.finish())
-                        }
-                        DataType::Float64 => {
-                            let mut builder = Float64Builder::new(rows.len());
-                            for row in &rows {
-                                //TODO null handling
-                                builder
-                                    .append_value(row.get_double(*i).unwrap())
-                                    .unwrap();
-                            }
-                            Arc::new(builder.finish())
-                        }
-                        DataType::Utf8 => {
-                            let mut builder = BinaryBuilder::new(rows.len());
-                            for row in &rows {
-                                //TODO null handling
-                                let bytes = row.get_bytes(*i).unwrap();
-                                builder
-                                    .append_string(
-                                        &String::from_utf8(bytes.data().to_vec())
-                                            .unwrap(),
-                                    )
-                                    .unwrap();
-                            }
-                            Arc::new(builder.finish())
-                        }
-                        other => {
-                            return Err(ExecutionError::NotImplemented(format!(
-                                "unsupported column reader type ({:?})",
-                                other
-                            )));
+                let mut batch: Vec<Arc<Array>> = Vec::with_capacity(reader.num_columns());
+                let mut row_count = 0;
+                for i in 0..self.column_readers.len() {
+                    let array: Arc<Array> = match self.column_readers[i] {
+                        ColumnReader::BoolColumnReader(ref mut _r) => {
+                            return Err(ExecutionError::NotImplemented(
+                                "unsupported column reader type (BOOL)".to_string(),
+                            ));
+                        }
+                        ColumnReader::Int32ColumnReader(ref mut r) => {
+                            let mut read_buffer: Vec<i32> =
+                                Vec::with_capacity(self.batch_size);
+
+                            for _ in 0..self.batch_size {
+                                read_buffer.push(0);
+                            }
+
+                            match r.read_batch(
+                                self.batch_size,
+                                None,
+                                None,
+                                &mut read_buffer,
+                            ) {
+                                //TODO this isn't handling null values
+                                Ok((count, _)) => {
+                                    println!("Read {} rows", count);
+                                    let mut builder = Int32Builder::new(count);
+                                    builder.append_slice(&read_buffer[0..count]).unwrap();
+                                    row_count = count;
+                                    Arc::new(builder.finish())
+                                }
+                                _ => {
+                                    return Err(ExecutionError::NotImplemented(format!(
+                                        "Error reading parquet batch (column {})",
+                                        i
+                                    )));
+                                }
+                            }
+                        }
+                        ColumnReader::Int64ColumnReader(ref mut _r) => {
+                            return Err(ExecutionError::NotImplemented(
+                                "unsupported column reader type (INT64)".to_string(),
+                            ));
+                        }
+                        ColumnReader::Int96ColumnReader(ref mut _r) => {
+                            return Err(ExecutionError::NotImplemented(
+                                "unsupported column reader type (INT96)".to_string(),
+                            ));
+                        }
+                        ColumnReader::FloatColumnReader(ref mut _r) => {
+                            return Err(ExecutionError::NotImplemented(
+                                "unsupported column reader type (FLOAT)".to_string(),
+                            ));
+                        }
+                        ColumnReader::DoubleColumnReader(ref mut r) => {
+                            let mut builder = Float64Builder::new(self.batch_size);
+                            let mut read_buffer: Vec<f64> =
+                                Vec::with_capacity(self.batch_size);
+                            match r.read_batch(
+                                self.batch_size,
+                                None,
+                                None,
+                                &mut read_buffer,
+                            ) {
+                                //TODO this isn't handling null values
+                                Ok((count, _)) => {
+                                    builder.append_slice(&read_buffer).unwrap();
+                                    row_count = count;
+                                    Arc::new(builder.finish())
+                                }
+                                _ => {
+                                    return Err(ExecutionError::NotImplemented(format!(
+                                        "Error reading parquet batch (column {})",
+                                        i
+                                    )));
+                                }
+                            }
+                        }
+                        ColumnReader::FixedLenByteArrayColumnReader(ref mut _r) => {
+                            return Err(ExecutionError::NotImplemented(
+                                "unsupported column reader type (FixedLenByteArray)"
+                                    .to_string(),
+                            ));
+                        }
+                        ColumnReader::ByteArrayColumnReader(ref mut r) => {
+                            let mut b: Vec<ByteArray> =
+                                Vec::with_capacity(self.batch_size);
+                            for _ in 0..self.batch_size {
+                                b.push(ByteArray::default());
+                            }
+                            match r.read_batch(self.batch_size, None, None, &mut b) {
+                                //TODO this isn't handling null values
+                                Ok((count, _)) => {
+                                    row_count = count;
+                                    //TODO this is horribly inefficient
+                                    let mut builder = BinaryBuilder::new(row_count);
+                                    for j in 0..row_count {
+                                        let foo = b[j].slice(0, b[j].len());
+                                        let bytes: &[u8] = foo.data();
+                                        let str =
+                                            String::from_utf8(bytes.to_vec()).unwrap();
+                                        builder.append_string(&str).unwrap();
+                                    }
+                                    Arc::new(builder.finish())
+                                }
+                                _ => {
+                                    return Err(ExecutionError::NotImplemented(format!(
+                                        "Error reading parquet batch (column {})",
+                                        i
+                                    )));
+                                }
+                            }
                         }
                     };
 
+                    println!("Adding array to batch");
                     batch.push(array);
                 }
 
-                println!("Loaded batch of {} rows", rows.len());
+                println!("Loaded batch of {} rows", row_count);
 
-                if rows.len() == 0 {
+                if row_count == 0 {
                     Ok(None)
                 } else {
-                    Ok(Some(RecordBatch::try_new(
-                        self.schema.projection(&self.projection)?,
-                        batch,
-                    )?))
+                    Ok(Some(RecordBatch::try_new(self.schema.clone(), batch)?))
                 }
             }
             _ => Ok(None),
@@ -342,6 +412,7 @@ mod tests {
     }
 
     fn load_table(name: &str) -> Box<Table> {
+        println!("load_table");
         let testdata = env::var("PARQUET_TEST_DATA").unwrap();
         let filename = format!("{}/{}", testdata, name);
         let table = ParquetTable::new(&filename);
rust/datafusion/tests/sql.rs (10 additions, 6 deletions)
@@ -26,21 +26,23 @@ extern crate datafusion;
 use arrow::array::*;
 use arrow::datatypes::{DataType, Field, Schema};
 
-use datafusion::datasource::parquet::ParquetTable;
-use datafusion::datasource::Table;
 use datafusion::execution::context::ExecutionContext;
 use datafusion::execution::relation::Relation;
+use datafusion::datasource::parquet::ParquetFile;
+use datafusion::datasource::parquet::ParquetTable;
+use datafusion::datasource::{RecordBatchIterator, Table};
 
 const DEFAULT_BATCH_SIZE: usize = 1024 * 1024;
 
 #[test]
 fn parquet_query() {
     let mut ctx = ExecutionContext::new();
-    ctx.register_table("alltypes_plain", load_parquet_table("alltypes_plain.parquet"));
+    ctx.register_table(
+        "alltypes_plain",
+        load_parquet_table("alltypes_plain.parquet"),
+    );
     let sql = "SELECT id, string_col FROM alltypes_plain";
     let actual = execute(&mut ctx, sql);
-    let expected = "tbd".to_string();
+    let expected = "4\t\"0\"\n5\t\"1\"\n6\t\"0\"\n7\t\"1\"\n2\t\"0\"\n3\t\"1\"\n0\t\"0\"\n1\t\"1\"\n".to_string();
     assert_eq!(expected, actual);
 }

@@ -187,7 +189,9 @@ fn load_parquet_table(name: &str) -> Rc<Table> {
 
 /// Execute query and return result set as tab delimited string
 fn execute(ctx: &mut ExecutionContext, sql: &str) -> String {
-    let results = ctx.sql(&sql, DEFAULT_BATCH_SIZE).unwrap();
+    let plan = ctx.create_logical_plan(&sql).unwrap();
+    println!("Plan: {:?}", plan);
+    let results = ctx.execute(&plan, DEFAULT_BATCH_SIZE).unwrap();
     result_str(&results)
 }

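For context, the core pattern this commit reverts to is the parquet crate's columnar ColumnReader::read_batch API, as used in load_batch above. A minimal sketch of that pattern against the same 2019-era parquet API (the helper name and the unwrap-based error handling are illustrative, not part of this commit):

    use std::fs::File;

    use parquet::column::reader::ColumnReader;
    use parquet::file::reader::{FileReader, SerializedFileReader};

    /// Hypothetical helper: read up to `batch_size` values from the first
    /// column of the first row group, assuming that column is INT32.
    fn read_int32_column(file: File, batch_size: usize) -> Vec<i32> {
        let reader = SerializedFileReader::new(file).unwrap();
        let row_group = reader.get_row_group(0).unwrap();

        // read_batch fills a pre-allocated buffer, so size it up front.
        let mut values = vec![0i32; batch_size];

        match row_group.get_column_reader(0).unwrap() {
            ColumnReader::Int32ColumnReader(ref mut r) => {
                // read_batch returns (values_read, levels_read); values_read
                // can be less than batch_size at the end of the row group.
                let (count, _) = r
                    .read_batch(batch_size, None, None, &mut values)
                    .unwrap();
                values.truncate(count);
                values
            }
            _ => panic!("expected an INT32 column"),
        }
    }

As in the diff, passing None for the definition and repetition levels sidesteps nulls entirely, which is what the "TODO this isn't handling null values" comments above are flagging.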
