103 changes: 100 additions & 3 deletions datafusion/core/benches/merge.rs
@@ -69,6 +69,8 @@

use std::sync::Arc;

use arrow::array::DictionaryArray;
use arrow::datatypes::Int32Type;
use arrow::{
array::{Float64Array, Int64Array, StringArray, UInt64Array},
compute::{self, SortOptions, TakeOptions},
@@ -113,9 +115,13 @@ const INPUT_SIZE: u64 = 100000;
lazy_static! {
static ref I64_STREAMS: Vec<Vec<RecordBatch>> = i64_streams();
static ref F64_STREAMS: Vec<Vec<RecordBatch>> = f64_streams();
// TODO: add dictionary encoded values

static ref UTF8_LOW_CARDINALITY_STREAMS: Vec<Vec<RecordBatch>> = utf8_low_cardinality_streams();
static ref UTF8_HIGH_CARDINALITY_STREAMS: Vec<Vec<RecordBatch>> = utf8_high_cardinality_streams();

static ref DICTIONARY_STREAMS: Vec<Vec<RecordBatch>> = dictionary_streams();
static ref DICTIONARY_TUPLE_STREAMS: Vec<Vec<RecordBatch>> = dictionary_tuple_streams();
static ref MIXED_DICTIONARY_TUPLE_STREAMS: Vec<Vec<RecordBatch>> = mixed_dictionary_tuple_streams();
// * (string(low), string(low), string(high)) -- tpch q1 + iox
static ref UTF8_TUPLE_STREAMS: Vec<Vec<RecordBatch>> = utf8_tuple_streams();
// * (f64, string, string, int) -- tpch q2
@@ -154,6 +160,22 @@ fn criterion_benchmark(c: &mut Criterion) {
b.iter(move || case.run())
});

c.bench_function("merge utf8 dictionary", |b| {
let case = MergeBenchCase::new(&DICTIONARY_STREAMS);

b.iter(move || case.run())
});

c.bench_function("merge utf8 dictionary tuple", |b| {
let case = MergeBenchCase::new(&DICTIONARY_TUPLE_STREAMS);
b.iter(move || case.run())
});

c.bench_function("merge mixed utf8 dictionary tuple", |b| {
let case = MergeBenchCase::new(&MIXED_DICTIONARY_TUPLE_STREAMS);
b.iter(move || case.run())
});

c.bench_function("merge mixed tuple", |b| {
let case = MergeBenchCase::new(&MIXED_TUPLE_STREAMS);

@@ -330,6 +352,79 @@ fn mixed_tuple_streams() -> Vec<Vec<RecordBatch>> {
split_batch(batch)
}

/// Create a batch of (utf8_dict)
fn dictionary_streams() -> Vec<Vec<RecordBatch>> {
let mut gen = DataGenerator::new();
let values = gen.utf8_low_cardinality_values();
let dictionary: DictionaryArray<Int32Type> =
values.iter().map(Option::as_deref).collect();

let batch =
RecordBatch::try_from_iter(vec![("dict", Arc::new(dictionary) as _)]).unwrap();

split_batch(batch)
}

/// Create a batch of (utf8_dict, utf8_dict, utf8_dict)
fn dictionary_tuple_streams() -> Vec<Vec<RecordBatch>> {
let mut gen = DataGenerator::new();
let mut tuples: Vec<_> = gen
.utf8_low_cardinality_values()
.into_iter()
.zip(gen.utf8_low_cardinality_values())
.zip(gen.utf8_low_cardinality_values())
.collect();
tuples.sort_unstable();

let (tuples, c): (Vec<_>, Vec<_>) = tuples.into_iter().unzip();
let (a, b): (Vec<_>, Vec<_>) = tuples.into_iter().unzip();

let a: DictionaryArray<Int32Type> = a.iter().map(Option::as_deref).collect();
let b: DictionaryArray<Int32Type> = b.iter().map(Option::as_deref).collect();
let c: DictionaryArray<Int32Type> = c.iter().map(Option::as_deref).collect();

let batch = RecordBatch::try_from_iter(vec![
("a", Arc::new(a) as _),
("b", Arc::new(b) as _),
("c", Arc::new(c) as _),
])
.unwrap();

split_batch(batch)
}

/// Create a batch of (utf8_dict, utf8_dict, utf8_dict, i64)
fn mixed_dictionary_tuple_streams() -> Vec<Vec<RecordBatch>> {
let mut gen = DataGenerator::new();
let mut tuples: Vec<_> = gen
.utf8_low_cardinality_values()
.into_iter()
.zip(gen.utf8_low_cardinality_values())
.zip(gen.utf8_low_cardinality_values())
.zip(gen.i64_values())
.collect();
tuples.sort_unstable();

let (tuples, d): (Vec<_>, Vec<_>) = tuples.into_iter().unzip();
let (tuples, c): (Vec<_>, Vec<_>) = tuples.into_iter().unzip();
let (a, b): (Vec<_>, Vec<_>) = tuples.into_iter().unzip();

let a: DictionaryArray<Int32Type> = a.iter().map(Option::as_deref).collect();
let b: DictionaryArray<Int32Type> = b.iter().map(Option::as_deref).collect();
let c: DictionaryArray<Int32Type> = c.iter().map(Option::as_deref).collect();
let d: Int64Array = d.into_iter().collect();

let batch = RecordBatch::try_from_iter(vec![
("a", Arc::new(a) as _),
("b", Arc::new(b) as _),
("c", Arc::new(c) as _),
("d", Arc::new(d) as _),
])
.unwrap();

split_batch(batch)
}

/// Encapsulates creating data for this test
struct DataGenerator {
rng: StdRng,
@@ -363,13 +458,15 @@ impl DataGenerator {

/// array of low cardinality (100 distinct) values
fn utf8_low_cardinality_values(&mut self) -> Vec<Option<Arc<str>>> {
let strings = (0..100).map(|s| format!("value{}", s)).collect::<Vec<_>>();
let strings = (0..100)
.map(|s| format!("value{}", s).into())
.collect::<Vec<_>>();

// pick from the 100 strings randomly
let mut input = (0..INPUT_SIZE)
.map(|_| {
let idx = self.rng.gen_range(0..strings.len());
let s = Arc::from(strings[idx].as_str());
let s = Arc::clone(&strings[idx]);
Some(s)
})
.collect::<Vec<_>>();
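
For context, a minimal standalone sketch (not part of this diff) of the pattern the new generator functions rely on: arrow's DictionaryArray<Int32Type> can be collected directly from an iterator of Option<&str>, interning repeated values so each row stores only an Int32 key, and the resulting array can be wrapped in a RecordBatch with RecordBatch::try_from_iter. The function name, column name, and values below are illustrative only.

use std::sync::Arc;

use arrow::array::{ArrayRef, DictionaryArray};
use arrow::datatypes::Int32Type;
use arrow::record_batch::RecordBatch;

fn dictionary_example() {
    // Repeated low-cardinality values collapse into a single dictionary entry;
    // each row stores only an Int32 key pointing into the dictionary.
    let values = vec![Some("value0"), Some("value1"), None, Some("value0")];
    let dict: DictionaryArray<Int32Type> = values.into_iter().collect();

    // Wrap the array in a single-column batch, as the generators above do.
    let batch =
        RecordBatch::try_from_iter(vec![("dict", Arc::new(dict) as ArrayRef)]).unwrap();
    assert_eq!(batch.num_rows(), 4);
}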