Skip to content

Commit

Permalink
Implement function slice for RecordBatch (#490)
Browse files Browse the repository at this point in the history
* Implement RecordBatch::slice()

* optimize

* optimize

* add test case

* fix clippy
  • Loading branch information
b41sh committed Jun 25, 2021
1 parent 7bea1f6 commit de62168
Showing 1 changed file with 84 additions and 7 deletions.
91 changes: 84 additions & 7 deletions arrow/src/record_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,31 @@ impl RecordBatch {
&self.columns[..]
}

/// Return a new `RecordBatch` with the same schema in which every column
/// is sliced according to `offset` and `length`.
///
/// A batch whose schema has no fields can only be sliced with
/// `offset + length == 0`; the result is then a new empty batch.
///
/// # Panics
///
/// Panics if `offset + length` overflows `usize`, if it is greater than
/// [`RecordBatch::num_rows`], or, for a batch whose schema has no fields,
/// if it is not zero.
pub fn slice(&self, offset: usize, length: usize) -> RecordBatch {
    // Reject overflowing ranges up front: in builds without overflow checks
    // `offset + length` would wrap around and could slip past the bounds
    // assertions below with an out-of-range `offset`.
    assert!(
        offset.checked_add(length).is_some(),
        "offset + length overflows usize"
    );

    if self.schema.fields().is_empty() {
        // No columns to slice, so only the empty range is accepted.
        assert!((offset + length) == 0);
        return RecordBatch::new_empty(self.schema.clone());
    }
    assert!((offset + length) <= self.num_rows());

    // Apply the same row range to every column.
    let columns = self
        .columns()
        .iter()
        .map(|column| column.slice(offset, length))
        .collect();

    Self {
        schema: self.schema.clone(),
        columns,
    }
}

/// Create a `RecordBatch` from an iterable list of pairs of the
/// form `(field_name, array)`, with the same requirements on
/// fields and arrays as [`RecordBatch::try_new`]. This method is
Expand Down Expand Up @@ -414,16 +439,68 @@ mod tests {
let record_batch =
RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)])
.unwrap();
check_batch(record_batch)
check_batch(record_batch, 5)
}

fn check_batch(record_batch: RecordBatch) {
assert_eq!(5, record_batch.num_rows());
fn check_batch(record_batch: RecordBatch, num_rows: usize) {
assert_eq!(num_rows, record_batch.num_rows());
assert_eq!(2, record_batch.num_columns());
assert_eq!(&DataType::Int32, record_batch.schema().field(0).data_type());
assert_eq!(&DataType::Utf8, record_batch.schema().field(1).data_type());
assert_eq!(5, record_batch.column(0).data().len());
assert_eq!(5, record_batch.column(1).data().len());
assert_eq!(num_rows, record_batch.column(0).data().len());
assert_eq!(num_rows, record_batch.column(1).data().len());
}

/// Slices a two-column, eight-row batch with two in-range windows and checks
/// the schema and row counts, then requests a window that runs past the end
/// of the batch, which is the call expected to trigger the panic.
#[test]
#[should_panic(expected = "assertion failed: (offset + length) <= self.num_rows()")]
fn create_record_batch_slice() {
    let schema = Schema::new(vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Utf8, false),
    ]);
    let expected_schema = schema.clone();

    let ints = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8]);
    let strings = StringArray::from(vec!["a", "b", "c", "d", "e", "f", "h", "i"]);

    let batch =
        RecordBatch::try_new(Arc::new(schema), vec![Arc::new(ints), Arc::new(strings)])
            .unwrap();

    // In-range windows: the slice keeps the schema and has exactly `length` rows.
    for (offset, length) in [(2, 5), (2, 0)].iter().copied() {
        let sliced = batch.slice(offset, length);

        assert_eq!(sliced.schema().as_ref(), &expected_schema);
        check_batch(sliced, length);
    }

    // 2 + 10 exceeds the 8 available rows, so this call must panic.
    let _out_of_range = batch.slice(2, 10);
}

/// Slices a batch built from a schema without fields: the empty range is
/// accepted and yields a batch with no fields, while any non-empty range is
/// the call expected to trigger the panic.
#[test]
#[should_panic(expected = "assertion failed: (offset + length) == 0")]
fn create_record_batch_slice_empty_batch() {
    let empty_batch = RecordBatch::new_empty(Arc::new(Schema::new(vec![])));

    // The only valid request on a field-less batch.
    let sliced = empty_batch.slice(0, 0);
    assert_eq!(0, sliced.schema().fields().len());

    // offset + length == 3 is non-zero, so this call must panic.
    let _non_empty_range = empty_batch.slice(1, 2);
}

#[test]
Expand All @@ -445,7 +522,7 @@ mod tests {
Field::new("b", DataType::Utf8, false),
]);
assert_eq!(record_batch.schema().as_ref(), &expected_schema);
check_batch(record_batch);
check_batch(record_batch, 5);
}

#[test]
Expand All @@ -465,7 +542,7 @@ mod tests {
Field::new("b", DataType::Utf8, true),
]);
assert_eq!(record_batch.schema().as_ref(), &expected_schema);
check_batch(record_batch);
check_batch(record_batch, 5);
}

#[test]
Expand Down

0 comments on commit de62168

Please sign in to comment.