Skip to content

Commit

Permalink
code cleanup
Browse files: browse the repository at this point in the history
  • Loading branch information
andygrove committed Mar 14, 2019
1 parent 639e13e commit 9d3047a
Showing 1 changed file with 38 additions and 109 deletions.
147 changes: 38 additions & 109 deletions rust/datafusion/src/datasource/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ use parquet::file::reader::*;

use crate::datasource::{RecordBatchIterator, ScanResult, Table};
use crate::execution::error::{ExecutionError, Result};
use arrow::array::BinaryArray;
use arrow::builder::{BinaryBuilder, Int64Builder};
use parquet::data_type::Int96;
use parquet::reader::schema::parquet_to_arrow_schema;
Expand Down Expand Up @@ -82,60 +81,31 @@ pub struct ParquetFile {
column_readers: Vec<ColumnReader>,
}

/// Builds an Arrow `BinaryArray` from the first `row_count` entries of `b`.
///
/// Each entry's bytes are checked to be valid UTF-8 and then appended to a
/// `BinaryBuilder` via `append_string`.
///
/// # Errors
///
/// Propagates any error returned by the builder while appending a value.
///
/// # Panics
///
/// Panics if `row_count` is greater than `b.len()`, or if any of the first
/// `row_count` entries does not contain valid UTF-8.
fn create_binary_array(b: &[ByteArray], row_count: usize) -> Result<Arc<BinaryArray>> {
    let mut builder = BinaryBuilder::new(b.len());
    // Slice once so the bounds check happens a single time, before any value
    // has been appended.
    for value in &b[..row_count] {
        // Validate the bytes in place instead of copying them into a
        // temporary `Vec<u8>`/`String` that is only borrowed afterwards.
        let text = std::str::from_utf8(value.data())
            .expect("Parquet byte array value is not valid UTF-8");
        builder.append_string(text)?;
    }
    Ok(Arc::new(builder.finish()))
}

macro_rules! read_binary_column {
($SELF:ident, $R:ident, $INDEX:expr) => {{
//TODO: should be able to get num_rows in row group instead of defaulting to batch size
let mut read_buffer: Vec<ByteArray> =
Vec::with_capacity($SELF.batch_size);
for _ in 0..$SELF.batch_size {
read_buffer.push(ByteArray::default());
}
if $SELF.projection_schema.field($INDEX).is_nullable() {

let mut def_levels: Vec<i16> = Vec::with_capacity($SELF.batch_size);
for _ in 0..$SELF.batch_size {
def_levels.push(0);
}

let (values_read, levels_read) = $R.read_batch(
$SELF.batch_size,
Some(&mut def_levels),
None,
&mut read_buffer,
)?;
if values_read == levels_read {
create_binary_array(&read_buffer, values_read)?
vec![ByteArray::default(); $SELF.batch_size];
let mut def_levels: Vec<i16> = vec![0; $SELF.batch_size];
let (_, levels_read) = $R.read_batch(
$SELF.batch_size,
Some(&mut def_levels),
None,
&mut read_buffer,
)?;
let mut builder = BinaryBuilder::new(levels_read);
let mut value_index = 0;
for i in 0..levels_read {
if def_levels[i] > 0 {
builder.append_string(
&String::from_utf8(read_buffer[value_index].data().to_vec()).unwrap(),
)?;
value_index += 1;
} else {
let mut builder = BinaryBuilder::new(levels_read);
let mut value_index = 0;
for i in 0..levels_read {
if def_levels[i] > 0 {
builder.append_string(&String::from_utf8(read_buffer[value_index].data().to_vec()).unwrap())?;
value_index += 1;
} else {
builder.append_null()?;
}
}
Arc::new(builder.finish())
builder.append_null()?;
}
} else {
let (values_read, _) = $R.read_batch(
$SELF.batch_size,
None,
None,
&mut read_buffer,
)?;
create_binary_array(&read_buffer, values_read)?
}
}}
Arc::new(builder.finish())
}};
}

trait ArrowReader<T>
Expand Down Expand Up @@ -165,10 +135,7 @@ where
let mut read_buffer: Vec<P::T> = vec![A::default_value().into(); batch_size];

if is_nullable {
let mut def_levels: Vec<i16> = Vec::with_capacity(batch_size);
for _ in 0..batch_size {
def_levels.push(0);
}
let mut def_levels: Vec<i16> = vec![0; batch_size];

let (values_read, levels_read) = self.read_batch(
batch_size,
Expand Down Expand Up @@ -308,67 +275,29 @@ impl ParquetFile {
}
ColumnReader::Int96ColumnReader(ref mut r) => {
let mut read_buffer: Vec<Int96> =
Vec::with_capacity(self.batch_size);
vec![Int96::new(); self.batch_size];

for _ in 0..self.batch_size {
read_buffer.push(Int96::new());
}

if self.projection_schema.field(i).is_nullable() {
let mut def_levels: Vec<i16> =
Vec::with_capacity(self.batch_size);
for _ in 0..self.batch_size {
def_levels.push(0);
}
let (values_read, levels_read) = r.read_batch(
self.batch_size,
Some(&mut def_levels),
None,
&mut read_buffer,
)?;

if values_read == levels_read {
let mut builder = Int64Builder::new(values_read);
for i in 0..values_read {
builder.append_value(convert_int96_timestamp(
read_buffer[i].data(),
))?;
}
Arc::new(builder.finish())
} else {
let mut builder = Int64Builder::new(levels_read);
let mut value_index = 0;
for i in 0..levels_read {
if def_levels[i] > 0 {
builder.append_value(
convert_int96_timestamp(
read_buffer[value_index].data(),
),
)?;
value_index += 1;
} else {
builder.append_null()?;
}
}
Arc::new(builder.finish())
}
} else {
let (values_read, _) = r.read_batch(
self.batch_size,
None,
None,
&mut read_buffer,
)?;

let mut builder = Int64Builder::new(values_read);

for i in 0..values_read {
let mut def_levels: Vec<i16> = vec![0; self.batch_size];
let (_, levels_read) = r.read_batch(
self.batch_size,
Some(&mut def_levels),
None,
&mut read_buffer,
)?;

let mut builder = Int64Builder::new(levels_read);
let mut value_index = 0;
for i in 0..levels_read {
if def_levels[i] > 0 {
builder.append_value(convert_int96_timestamp(
read_buffer[i].data(),
read_buffer[value_index].data(),
))?;
value_index += 1;
} else {
builder.append_null()?;
}
Arc::new(builder.finish())
}
Arc::new(builder.finish())
}
ColumnReader::FloatColumnReader(ref mut r) => {
ArrowReader::<Float32Type>::read(
Expand Down

0 comments on commit 9d3047a

Please sign in to comment.