Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 61 additions & 4 deletions datafusion/core/src/physical_plan/sorts/merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,24 @@ struct SortPreservingMergeStream<C> {
/// the path from bottom to top is visited, keeping the number of
/// comparisons close to the theoretical limit of `log(S)`.
///
/// The current implementation uses a vector to store the tree.
/// Conceptually, it looks like this (assuming 8 streams):
///
/// ```text
///     0 (winner)
///
///     1
///    / \
///   2   3
///  / \ / \
/// 4  5 6  7
/// ```
///
/// The element at index 0 in the vector is the current winner. The element
/// at index 1 is the root of the loser tree, the element at index 2 is the
/// left child of the root, the element at index 3 is the right child of
/// the root, and so on.
///
/// reference: <https://en.wikipedia.org/wiki/K-way_merge_algorithm#Tournament_Tree>
loser_tree: Vec<usize>,

Expand Down Expand Up @@ -242,22 +260,61 @@ impl<C: Cursor> SortPreservingMergeStream<C> {
}
}

/// Maps a cursor (input stream) index to the loser tree node at which that
/// cursor first competes.
///
/// The returned node is the starting point of the walk towards the winner
/// slot (index 0) for that cursor. It is not always a true leaf: when the
/// number of cursors/streams is not a power of two, some nodes are
/// half-nodes that take one stream directly and one child node as input.
/// The loser tree is unbalanced in that case, but it still works correctly.
///
/// For example, with 5 streams the loser tree looks like this:
///
/// ```text
///              0 (winner)
///
///              1
///            /   \
///           2     3
///          / \   / \
///         4   |  |  |
///        / \  |  |  |
/// ------+---+-+--+--+---- Below is not a part of loser tree
///      S3  S4 S0 S1 S2
/// ```
///
/// `S0`, `S1`, ... `S4` are the streams (read: stream at index 0, stream at
/// index 1, etc.)
///
/// Zooming in on node 2 as an example: its inputs are the next item of `S0`
/// and the winner of the (`S3`, `S4`) comparison held at node 4. The loser
/// of that comparison stays stored in node 4.
#[inline]
fn lt_leaf_node_index(&self, cursor_index: usize) -> usize {
    // Streams conceptually sit in slots `len..2 * len` directly below the
    // tree; halving a slot number yields the node right above it.
    let stream_slot = self.cursors.len() + cursor_index;
    stream_slot / 2
}

/// Returns the index of the parent of `node_idx` in the loser tree.
///
/// The parent of a node is its index halved (rounding down), so the parent
/// of node 1 is index 0.
#[inline]
fn lt_parent_node_index(&self, node_idx: usize) -> usize {
    // A right shift by one is integer division by two for `usize`.
    node_idx >> 1
}

/// Attempts to initialize the loser tree with one value from each
/// non-exhausted input, if possible
fn init_loser_tree(&mut self) {
// Init loser tree
self.loser_tree = vec![usize::MAX; self.cursors.len()];
for i in 0..self.cursors.len() {
let mut winner = i;
let mut cmp_node = (self.cursors.len() + i) / 2;
let mut cmp_node = self.lt_leaf_node_index(i);
while cmp_node != 0 && self.loser_tree[cmp_node] != usize::MAX {
let challenger = self.loser_tree[cmp_node];
if self.is_gt(winner, challenger) {
self.loser_tree[cmp_node] = winner;
winner = challenger;
}

cmp_node /= 2;
cmp_node = self.lt_parent_node_index(cmp_node);
}
self.loser_tree[cmp_node] = winner;
}
Expand All @@ -268,14 +325,14 @@ impl<C: Cursor> SortPreservingMergeStream<C> {
fn update_loser_tree(&mut self) {
let mut winner = self.loser_tree[0];
// Replace overall winner by walking tree of losers
let mut cmp_node = (self.cursors.len() + winner) / 2;
let mut cmp_node = self.lt_leaf_node_index(winner);
while cmp_node != 0 {
let challenger = self.loser_tree[cmp_node];
if self.is_gt(winner, challenger) {
self.loser_tree[cmp_node] = winner;
winner = challenger;
}
cmp_node /= 2;
cmp_node = self.lt_parent_node_index(cmp_node);
}
self.loser_tree[0] = winner;
self.loser_tree_adjusted = true;
Expand Down