Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 14 additions & 17 deletions datafusion/src/row/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,10 @@ pub fn write_batch_unchecked(
let mut writer = RowWriter::new(&schema);
let mut current_offset = offset;
let mut offsets = vec![];
let columns = batch.columns();
for cur_row in row_idx..batch.num_rows() {
offsets.push(current_offset);
let row_width = write_row(&mut writer, cur_row, batch);
let row_width = write_row(&mut writer, cur_row, &schema, columns);
output[current_offset..current_offset + row_width]
.copy_from_slice(writer.get_row());
current_offset += row_width;
Expand Down Expand Up @@ -106,6 +107,7 @@ pub fn write_batch_unchecked_jit(

#[cfg(feature = "jit")]
/// bench interpreted version write
#[inline(never)]
pub fn bench_write_batch(
batches: &[Vec<RecordBatch>],
schema: Arc<Schema>,
Expand All @@ -114,8 +116,9 @@ pub fn bench_write_batch(
let mut lengths = vec![];

for batch in batches.iter().flatten() {
let columns = batch.columns();
for cur_row in 0..batch.num_rows() {
let row_width = write_row(&mut writer, cur_row, batch);
let row_width = write_row(&mut writer, cur_row, &schema, columns);
lengths.push(row_width);
writer.reset()
}
Expand All @@ -126,6 +129,7 @@ pub fn bench_write_batch(

#[cfg(feature = "jit")]
/// bench jit version write
#[inline(never)]
pub fn bench_write_batch_jit(
batches: &[Vec<RecordBatch>],
schema: Arc<Schema>,
Expand Down Expand Up @@ -337,26 +341,19 @@ impl RowWriter {
}

/// Stitch attributes of tuple in `batch` at `row_idx` and returns the tuple width
fn write_row(row: &mut RowWriter, row_idx: usize, batch: &RecordBatch) -> usize {
fn write_row(
row: &mut RowWriter,
row_idx: usize,
schema: &Arc<Schema>,
columns: &[ArrayRef],
) -> usize {
// Get the row from the batch denoted by row_idx
if row.null_free {
for ((i, f), col) in batch
.schema()
.fields()
.iter()
.enumerate()
.zip(batch.columns().iter())
{
for ((i, f), col) in schema.fields().iter().enumerate().zip(columns.iter()) {
write_field(i, row_idx, col, f.data_type(), row);
}
} else {
for ((i, f), col) in batch
.schema()
.fields()
.iter()
.enumerate()
.zip(batch.columns().iter())
{
for ((i, f), col) in schema.fields().iter().enumerate().zip(columns.iter()) {
if !col.is_null(row_idx) {
row.set_non_null_at(i);
write_field(i, row_idx, col, f.data_type(), row);
Expand Down