
improvements #72

Merged · 20 commits merged into cda-group:master from channel_batching on Feb 22, 2020
Conversation

@Max-Meldrum (Member) commented on Feb 20, 2020

This turned into a bit of a jumbo PR. Anyway, it adds the following:

  • Adds batching to Channel Strategies (closes Add batching at the channel level #68)
  • Remove the ChannelStrategy trait object in favour of enum dispatch (see the sketch after this list)
  • Add docs to the more stable parts
  • Fix flight serde and add benches for it.
  • Add SourceContext that all sources can use. (closes Add Stream source macro/abstraction #33)
  • Clean sources and rework them to use the SourceContext
  • Remove unused deps
  • Add an experiments lib containing a simple pipeline used to profile arcon.
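For illustration, here is a minimal sketch of the enum-dispatch idea, with hypothetical strategy types (this is not arcon's actual ChannelStrategy): replacing a boxed trait object with an enum lets the hot path be a plain match instead of a vtable call.

```rust
// Hypothetical sketch of enum dispatch vs. a boxed trait object.
// The variant and method names are illustrative, not arcon's actual API.
enum ChannelStrategy<A> {
    Forward(Forward<A>),
    KeyBy(KeyBy<A>),
}

impl<A> ChannelStrategy<A> {
    fn add(&mut self, event: A) {
        // Static dispatch via match; no Box allocation or vtable indirection.
        match self {
            ChannelStrategy::Forward(s) => s.add(event),
            ChannelStrategy::KeyBy(s) => s.add(event),
        }
    }
}

struct Forward<A> { buffer: Vec<A> }
impl<A> Forward<A> {
    fn add(&mut self, event: A) { self.buffer.push(event); }
}

struct KeyBy<A> { buffer: Vec<A> }
impl<A> KeyBy<A> {
    fn add(&mut self, event: A) { self.buffer.push(event); }
}
```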

These are some results from running the perf binary in the experiments lib (on my laptop). The old version does not have the updated kompact where the heap-allocation issue is fixed. There are a lot of settings that can be adjusted in perf, but this was run with the defaults: 10M total elements, and a batch size of 1024 for the new version.

Arcon Ver   Execution Time   Throughput   Type
Old         5774 ms          2.9M         Not Pinned
Old         7947 ms          2.1M         Pinned
New         298 ms           28M          Not Pinned
New         186 ms           33M          Pinned

There are still many things that can be improved and cleaned up, but we are making progress :)

🍺

let output: proc_macro2::TokenStream = {
    quote! {
        #[derive(Clone, Abomonation)]
        #item
        impl #impl_generics ArconType for #name #ty_generics #where_clause {}
        impl ::std::hash::Hash for #name {
Contributor:

I strongly feel that we should have our own trait for getting a key out of a data type rather than (ab)using Hash. This makes the type unusable as a hashmap key in user code.
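For illustration, such a key-extraction trait could look roughly like this (the trait, method, and struct names are hypothetical, not part of arcon):

```rust
// Hypothetical key-extraction trait, separate from std::hash::Hash.
// Names are illustrative only.
pub trait GetKey {
    /// Return the key used for KeyBy-style partitioning.
    fn get_key(&self) -> u64;
}

struct Reading {
    sensor_id: u64,
    value: f32,
}

impl GetKey for Reading {
    fn get_key(&self) -> u64 {
        // Only the partitioning key is involved; the type's own Hash impl
        // (if any) stays untouched for regular hashmap/hashset use.
        self.sensor_id
    }
}
```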

Member Author:

Yeah, we could do that. I think I forced Hash upon ArconType to make it work with the ChannelStrategy enum, as the KeyBy strategy requires Hash.

Btw, why would it make it unusable in a hashmap?

Contributor:

I actually don't know if it would become truly unusable, but I'm worried that this may cause some problems. There may not be a huge problem with a hashmap, because the user probably wouldn't use that type as a key, but imagine wanting to store values of that type in a hashset. Narrowing the Hash impl to use only one field may introduce unnecessary collisions there.

Member Author:

I'll open an issue about a custom trait for extracting keys. But merging this for now.

tmp_buf.put_slice(&bytes);
if let Some((msg, _)) = unsafe { abomonation::decode::<ArconMessage<A>>(&mut tmp_buf) } {
    Ok(msg.clone())
Contributor:

Unnecessary clone

Member Author:

abomonation::decode returns a reference, so we need to clone in order to return an owned object. But that whole function is a bit of a mess.
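For context, abomonation::decode hands back a borrowed value, so producing an owned message requires a clone. A simplified sketch of the pattern (not arcon's actual deserialization code):

```rust
use abomonation::{decode, Abomonation};

// Simplified sketch of the decode-then-clone pattern discussed above;
// not arcon's actual deserialization function.
fn decode_owned<T: Abomonation + Clone>(buf: &mut [u8]) -> Option<T> {
    // `decode` returns a reference into `buf` (plus the remaining bytes),
    // so an owned value requires a clone.
    unsafe { decode::<T>(buf) }.map(|(value, _rest)| value.clone())
}
```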

Contributor:

Ah, never mind. I'm used to deserialization copying stuff, and msg was only used in one place :P

/// A batch size indicating when the channel should flush data
batch_size: usize,
/// A counter keeping track of how many events there are in the buffer
buffer_counter: usize,
Contributor:

Isn't buffer.len() enough?

Member Author:

yes, good point.

pub struct ArconMessage<A: ArconType> {
    pub event: ArconEvent<A>,
    /// Buffer of ArconEvents
    pub events: Vec<ArconEvent<A>>,
Contributor:

I wonder if using smallvec or something would make sense here as an optimization for the case where no batching is done - that would avoid an allocation.
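For illustration, a rough sketch of the suggestion using the smallvec crate (the struct and field names are hypothetical, not arcon's ArconMessage):

```rust
use smallvec::SmallVec;

// Illustrative sketch only: keep up to one event inline, spilling to the
// heap only when batching actually produces more than one event.
pub struct Message<A> {
    pub events: SmallVec<[A; 1]>,
    pub sender: u64,
}

fn single<A>(event: A, sender: u64) -> Message<A> {
    let mut events = SmallVec::new();
    events.push(event); // stays inline, no heap allocation
    Message { events, sender }
}
```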

Member Author:

Yeah, could be worth looking at. If I remember correctly, I had some issues with custom types and smallvec. But could have been me doing it wrong.

///
/// The `Hash` impl rounds the floats down to an integer and then hashes it.
#[derive(Clone, prost::Message, Abomonation)]
pub struct ArconF32 {
Contributor:

Again, the wrapper wouldn't be needed if we had our own trait for extracting the key.

Channel::Local(actor_ref) => {
    actor_ref.tell(message);
}
Channel::Remote(actor_path, flight_serde, dispatcher_source) => match &flight_serde {
Contributor:

You can rewrite this to

Channel::Remote(actor_path, FlightSerde::Unsafe, dispatcher_source) => { /*...*/ }
Channel::Remote(actor_path, FlightSerde::Reliable, dispatcher_source) => { /*...*/ }

to reduce nesting

/// Determines how the `Operator` processes Watermarks
///
/// The function either returns None or a Vec of ArconEvents (e.g., Window Computation)
fn handle_watermark(&mut self, watermark: Watermark) -> Option<Vec<ArconEvent<OUT>>>;
Contributor:

Is there a semantic difference between None and an empty Vec here? Because if that's just an optimization, it's an unnecessary one, as the Rust docs specifically mention that Vec::new() does not allocate :)

Member Author:

No, not really. I just thought it was nicer to return None rather than a Vec.

But then again, the whole Operator trait is a bit confusing. handle_element now takes a ChannelStrategy parameter rather than, in most cases, returning a single element using vec![elem]. I left the others as they were since they are not called as often.

TL;DR: The Operator trait needs some more love.
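For illustration, a rough sketch of the shape being described, with hypothetical types and signatures (this is not arcon's exact Operator trait):

```rust
// Hypothetical shapes, loosely following the discussion above; not arcon's exact API.
struct Watermark { time: u64 }
struct Element<T> { data: T, timestamp: u64 }

enum Event<T> {
    Element(Element<T>),
    Watermark(Watermark),
}

struct ChannelStrategy<T> { out: Vec<Event<T>> }
impl<T> ChannelStrategy<T> {
    fn add(&mut self, event: Event<T>) { self.out.push(event); }
}

trait Operator<IN, OUT> {
    // Elements are pushed straight into the channel strategy instead of
    // being returned one at a time wrapped in a Vec.
    fn handle_element(&mut self, element: Element<IN>, strategy: &mut ChannelStrategy<OUT>);

    // Watermarks still return their output; an empty Vec would work just as
    // well as None, since Vec::new() does not allocate.
    fn handle_watermark(&mut self, watermark: Watermark) -> Option<Vec<Event<OUT>>>;
}
```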

@Max-Meldrum merged commit 772cfb8 into cda-group:master on Feb 22, 2020
@Max-Meldrum deleted the channel_batching branch on April 30, 2020