
Conversation

@tustvold
Contributor

Which issue does this PR close?

Closes #.

Rationale for this change

Simplifies the logic in #6049 to make it a bit easier to see what is actually going on.

What changes are included in this PR?

Are these changes tested?

Are there any user-facing changes?

github-actions bot added the core (Core DataFusion crate) label on Apr 28, 2023
)?))
}
let schema = self.schema.clone();
let state = (data, self.batches[partition % batch_count].clone());
Contributor Author

The previous logic had a special case for when the partitioning matched, which effectively saved an atomic increment per batch. Given we are polling a dyn Stream here, I am very skeptical there is any real performance difference

Contributor

The performance difference is batch_count * acquire_lock; if that is acceptable, we can move forward with this. Do you think it would show up in the benchmarks? I am not quite familiar with that part.

let schema = self.schema.clone();
let state = (data, self.batches[partition % batch_count].clone());

let stream = futures::stream::unfold(state, |mut state| async move {
Contributor Author

unfold is incredibly useful. The only downside is that you can't name the resulting type, at least until we get existential types
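For readers unfamiliar with the pattern, here is a minimal, self-contained sketch (not the PR's code; it assumes only the futures and tokio crates): the unfold combinator's concrete type cannot be written out, so it is usually erased behind Box::pin.

use futures::{stream, Stream, StreamExt};
use std::pin::Pin;

// `unfold` turns a seed state into a stream, but the combinator's concrete
// type is opaque, so the usual workaround is a pinned boxed trait object.
fn countdown(from: u32) -> Pin<Box<dyn Stream<Item = u32> + Send>> {
    let s = stream::unfold(from, |n| async move {
        if n == 0 {
            None // returning None ends the stream
        } else {
            Some((n, n - 1)) // yield `n`, continue with the decremented state
        }
    });
    Box::pin(s) // erase the unnameable combinator type
}

#[tokio::main]
async fn main() {
    assert_eq!(countdown(3).collect::<Vec<_>>().await, vec![3, 2, 1]);
}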

Contributor

Now I get what you mean: if we do not hold the async lock in the state while it is acquired, the unfolding becomes possible. This substantially shrinks the code size. Cool pattern.

Contributor Author

@tustvold tustvold Apr 28, 2023

We could easily hold the lock in the state if we wanted to; I just didn't think it warranted the added complexity. You could definitely do something like

let stream = futures::stream::unfold(state, |mut state| async move {
    // Clone the Arc before `write_owned` (which consumes it) so `state` can
    // still be returned below; the guard must be mutable to push into the Vec.
    let mut locked = state.1.clone().write_owned().await;
    loop {
        let batch = match state.0.next().await {
            Some(Ok(batch)) => batch,
            Some(Err(e)) => return Some((Err(e), state)),
            None => return None,
        };
        locked.push(batch)
    }
});

Or even

let stream = futures::stream::unfold(state, |mut state| async move {
    let mut locked = state.1.write().await;
    loop {
        let batch = match state.0.next().await {
            Some(Ok(batch)) => batch,
            Some(Err(e)) => {
                drop(locked);
                return Some((Err(e), state))
            }
            None => {
                drop(locked);
                return None
            }
        };
        locked.push(batch)
    }
});

Ultimately, an uncontended lock is not going to matter for performance unless it is in a hot loop with no other branches
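For a rough sense of scale, a back-of-envelope sketch along these lines (assuming only the tokio crate; it times lock-plus-push, not the lock alone) typically comes out at tens of nanoseconds per acquisition on modern hardware, which lines up with the per-row difference reported in the benchmark further down:

use std::sync::Arc;
use std::time::Instant;
use tokio::sync::RwLock;

#[tokio::main]
async fn main() {
    let lock = Arc::new(RwLock::new(Vec::<u64>::new()));
    let iterations: u64 = 1_000_000;
    let start = Instant::now();
    for i in 0..iterations {
        // Acquire the write lock, push, and release, once per iteration.
        lock.write().await.push(i);
    }
    let nanos = start.elapsed().as_nanos() as f64 / iterations as f64;
    println!("~{nanos:.0} ns per uncontended write-lock + push");
}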

Contributor

LGTM, both implementations are neat (partitions matching & not matching). Thanks for the PR.

Contributor

@alamb alamb left a comment

Thank you @tustvold

This looks like a great simplification to me. cc @metesynnada and @ozankabak

@ozankabak
Contributor

Will take a look and measure whether the special casing is really useful or not

@alamb
Contributor

alamb commented Apr 30, 2023

Will take a look and measure whether the special casing is really useful or not

@ozankabak have you had a chance to review the performance of this PR? If so, is it acceptable to merge?

@ozankabak
Contributor

Will do tomorrow and let you know

@ozankabak
Contributor

We ran the benchmarks; @metesynnada will share them here today

@metesynnada
Contributor

TL;DR: we recommend retaining both versions of the locking mechanism, to cover both the case where the partitions match and the case where they do not.

So, we ran a benchmark to see how two different locking strategies perform when processing a stream of RecordBatch objects. Here are the two functions we compared:

  1. lock_only_once:
// Assumed imports (not shown in these snippets): futures::StreamExt for `.next()`,
// std::sync::Arc, tokio::sync::RwLock, arrow's RecordBatch and SchemaRef, and
// DataFusion's SendableRecordBatchStream and RecordBatchStreamAdapter.
fn lock_only_once(schema: SchemaRef, state: (SendableRecordBatchStream, Arc<RwLock<Vec<RecordBatch>>>)) -> SendableRecordBatchStream {
    let iter = futures::stream::unfold(state, |mut state| async move {
        let mut locked = state.1.write().await;
        loop {
            let batch = match state.0.next().await {
                Some(Ok(batch)) => batch,
                Some(Err(e)) => {
                    drop(locked);
                    return Some((Err(e), state))
                }
                None => {
                    drop(locked);
                    return None
                }
            };
            locked.push(batch)
        }
    });
    Box::pin(RecordBatchStreamAdapter::new(schema, iter))
}
  2. lock_multiple:
fn lock_multiple(schema: SchemaRef, state: (SendableRecordBatchStream, Arc<RwLock<Vec<RecordBatch>>>)) -> SendableRecordBatchStream {
    let iter = futures::stream::unfold(state, |mut state| async move {
        loop {
            let batch = match state.0.next().await {
                Some(Ok(batch)) => batch,
                Some(Err(e)) => return Some((Err(e), state)),
                None => return None,
            };
            state.1.write().await.push(batch)
        }
    });
    Box::pin(RecordBatchStreamAdapter::new(schema, iter))
}

We tested these functions with an input size of 10,000, and here's what we found:

Multiple lock strategy (lock_multiple function):

  • Mean execution time: 545.97 µs
  • Range: [544.78 µs, 547.26 µs]
  • A couple of outliers: 2 (2.00%) high mild

Single lock strategy (lock_only_once function):

  • Mean execution time: 330.21 µs
  • Range: [329.53 µs, 330.94 µs]
  • A few outliers: 7 (7.00%) in total
    • 1 (1.00%) low mild
    • 3 (3.00%) high mild
    • 3 (3.00%) high severe

So, it looks like the "Single lock" strategy (using the lock_only_once function) works better than the "Multiple lock" strategy (using the lock_multiple function) for the given input size of 10,000, as we expected. The "Single lock" strategy takes about 39.5% less time to complete compared to the "Multiple lock" strategy.
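For anyone who wants to reproduce a rough version of this comparison, a minimal harness along the following lines can drive both functions end to end. This is an illustrative sketch rather than the exact benchmark behind the numbers above, and it assumes the datafusion, futures, and tokio crates plus the two functions defined earlier in this comment.

use std::sync::Arc;
use std::time::Instant;

use datafusion::arrow::array::Int32Array;
use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::DataFusionError;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::SendableRecordBatchStream;
use futures::StreamExt;
use tokio::sync::RwLock;

// Build an input stream of `n` tiny batches that all share one schema.
fn input(schema: SchemaRef, n: usize) -> SendableRecordBatchStream {
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int32Array::from(vec![1]))],
    )
    .unwrap();
    let items = (0..n).map(move |_| Ok::<_, DataFusionError>(batch.clone()));
    Box::pin(RecordBatchStreamAdapter::new(
        schema,
        futures::stream::iter(items),
    ))
}

#[tokio::main]
async fn main() {
    let schema: SchemaRef =
        Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let n = 10_000;

    // lock_only_once: a single write-lock acquisition for the whole stream.
    let sink = Arc::new(RwLock::new(Vec::new()));
    let stream = lock_only_once(schema.clone(), (input(schema.clone(), n), sink.clone()));
    let start = Instant::now();
    let _ = stream.collect::<Vec<_>>().await; // drive the stream to completion
    println!("lock_only_once: {:?}, stored {} batches", start.elapsed(), sink.read().await.len());

    // lock_multiple: one acquisition per batch.
    let sink = Arc::new(RwLock::new(Vec::new()));
    let stream = lock_multiple(schema.clone(), (input(schema.clone(), n), sink.clone()));
    let start = Instant::now();
    let _ = stream.collect::<Vec<_>>().await;
    println!("lock_multiple: {:?}, stored {} batches", start.elapsed(), sink.read().await.len());
}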

@tustvold
Contributor Author

tustvold commented May 2, 2023

I can add the lock multiple approach if people feel strongly, although at roughly 20 ns per row (the ~216 µs difference spread over the 10,000-element input) it seems unlikely this would be visible in any practical workload. I wish our kernels were a similar order of magnitude 😅

@ozankabak
Contributor

Thank you! Since the code difference is small thanks to how you are leveraging unfold, let's add it and then this is good to go from our perspective.

@alamb
Contributor

alamb commented May 4, 2023

I made the change suggested by @metesynnada in #6154 (comment) in 01fa417, and merged up from main to try to nudge this PR along.

Co-authored-by: Metehan Yıldırım <100111937+metesynnada@users.noreply.github.com>
@ozankabak
Contributor

Seems like we have a whitespace issue making cargo fmt fail
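Running cargo fmt --all locally before pushing usually catches this.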

@alamb alamb merged commit 6118a93 into apache:main May 4, 2023
@alamb
Contributor

alamb commented May 4, 2023

Thanks @metesynnada @tustvold and @ozankabak

Here is another PR that I think simplifies and speeds up this code even more: #6236
