add step_or_park #260

Open · wants to merge 6 commits into base: master
@frankmcsherry (Member) commented Apr 4, 2019

This PR introduces the step_or_park(Duration) method to a worker.

The design is that the communication fabric allocator now has a method

await_events(&self, duration: Duration)

which is allowed to park the thread for at most duration, or until self.events() has some events to report (these are the events that prompt the scheduling of operators). The default implementation is a no-op (it does not park the thread), but the single-threaded communicator has an implementation that parks itself if its event queue is empty. The plan is to add implementations for the other communicators so that eventually all of them behave well.
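
A rough sketch of that hook, as I imagine it landing on the allocator trait (illustrative only; the real trait carries the allocation methods too):

    use std::time::Duration;

    // Sketch: the communication allocator grows a parking hook.
    trait Allocate {
        /// May park the calling thread for at most `duration`, or until
        /// `self.events()` has events to report. The default does not park.
        fn await_events(&self, _duration: Duration) {
            // Returning immediately is always correct, just less
            // efficient than sleeping until there is work.
        }
    }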

The worker now has a method

step_or_park(&mut self, duration: Duration) -> bool

which drains events from the communicator and, just before it is about to schedule its activations, checks whether it has any; if it has none, it calls into await_events(duration).
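
For example (a sketch; the 100ms figure is arbitrary, and the loop runs as long as the worker reports incomplete dataflows, as with step()):

    use std::time::Duration;

    timely::execute_from_args(std::env::args(), |worker| {
        // ... build dataflows here ...

        // Each call performs a round of work; when nothing is scheduled
        // it may park the thread for up to 100ms before returning.
        while worker.step_or_park(Duration::from_millis(100)) { }
    }).unwrap();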

I haven't really thought through the concurrency design here. It is possible that there is a race condition somewhere, but we can take a look at it and see whether it does the right thing in principle.

cc @comnik @ryzhyk @benesch

@benesch (Contributor) commented Apr 4, 2019

🎉

N00b question though: what mandates the timeout? Are there cases where an operator that has work to do won't receive an event and so won't get scheduled, so the timeout is a backup trigger? Or is it just to support operators like the Kafka source where we're responsible for polling an upstream data source at some frequency?

@frankmcsherry (Member, Author) commented Apr 4, 2019

> Or is it just to support operators like the Kafka source where we're responsible for polling an upstream data source at some frequency?

More for this. I suppose we could take an Option<Duration> if we wanted to let folks say "I really don't need to be woken up again, thanks!".

> Are there cases where an operator that has work to do won't receive an event and so won't get scheduled,

I hope this doesn't happen. The only reason we will visit an operator is that it is in a list of operators to schedule, so if we don't have it in a list we aren't going to find it there when we come back in a few seconds.

@benesch (Contributor) commented Apr 4, 2019

> I suppose we could take an Option<Duration> if we wanted to let folks say "I really don't need to be woken up again, thanks!".

Since Materialize will always be polling Kafka (there is absolutely no support for pushing messages to consumers that I'm aware of), it certainly doesn't need such a feature, but I do think taking an Option<Duration> would increase the understandability of the API by just a hair.

@frankmcsherry (Member, Author) commented Apr 4, 2019

Another option, to parallel the two methods in std::thread, would be to have two methods step_or_park and step_or_park_timeout(Duration).
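
Concretely, something like this (a sketch with a toy stand-in for the worker, just to show the shape):

    use std::time::Duration;

    struct Worker;

    impl Worker {
        // Shared core; an Option distinguishes the two public methods.
        fn step_or_park_core(&mut self, _duration: Option<Duration>) -> bool {
            // ... drain events, schedule activations, or park ...
            false
        }

        /// Analogue of std::thread::park: no timeout.
        pub fn step_or_park(&mut self) -> bool {
            self.step_or_park_core(None)
        }

        /// Analogue of std::thread::park_timeout.
        pub fn step_or_park_timeout(&mut self, duration: Duration) -> bool {
            self.step_or_park_core(Some(duration))
        }
    }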

@comnik (Contributor) commented Apr 4, 2019

Option<Duration> would mirror mio's polling API (although I do not know how popular mio is anymore, without any fancy async layers on top...), but I like the std::thread API for clarity. In general I think it would be helpful to support a fully blocking mode.

Polling sources have been the more pressing issue in actual use, but how would a source defer its re-activation with this? Wouldn't there always be an activation pending and therefore no chance for the worker to actually park itself?

I think the scheduling workaround we discussed and implemented for declarative recently also provides a bit more flexibility when working with heterogeneous sources that might have different polling frequencies? But that can still be added as a layer on top.

Maybe I'm also misunderstanding how this is supposed to be used.

In general: excited for this!

@comnik referenced this pull request Apr 4, 2019: Expose status of activations #255 (Closed)

@frankmcsherry (Member, Author) commented Apr 4, 2019

I think the intended use is: you have your BinaryHeap<(Duration, Activator)>, and each time around the loop you peek at the front to see how long until the next event in the heap (the duration minus "now"). You would then call step_or_park_timeout(that_diff) to make sure you wake up roughly around the time you would want to dequeue that scheduled event.

Alternately, you could have another thread driving the wake-ups, but this is how you could do it with just this one thread. I think, right?

Edit: And polling sources would cease re-activating themselves, and use the mechanisms you use in declarative instead: re-activate if they found data, and if they don't find data in a polling step put themselves on your timer queue (which then drives the _timeout duration).
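
Spelled out with the two-method spelling from above, that single-thread driver loop might look like this (a sketch: worker is the timely worker, activators is assumed to hold the activators your polling sources registered, indexed by the heap entries, and I key the heap on Instant rather than Duration to keep the comparison simple):

    use std::cmp::Reverse;
    use std::collections::BinaryHeap;
    use std::time::Instant;

    // Min-heap of (deadline, index into `activators`).
    let mut timer: BinaryHeap<Reverse<(Instant, usize)>> = BinaryHeap::new();

    loop {
        let now = Instant::now();
        // Wake every source whose deadline has passed; a source that
        // polls and finds nothing re-enqueues itself with a new deadline.
        while timer.peek().map_or(false, |Reverse((t, _))| *t <= now) {
            let Reverse((_, idx)) = timer.pop().unwrap();
            activators[idx].activate();
        }
        // Park for at most the time until the next deadline, if any.
        match timer.peek() {
            Some(Reverse((next, _))) => { worker.step_or_park_timeout(*next - now); }
            None => { worker.step_or_park(); }
        }
    }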

@comnik (Contributor) commented Apr 4, 2019

It is late; I had too much pizza. That would work, I think. David has a test case for a polling source up and running, if I'm not mistaken; we could verify pretty easily, either on the branch or once it lands on master.

@frankmcsherry (Member, Author) commented Apr 5, 2019

I went with an Option<Duration> argument, as it seems that internally there would need to be a step_or_park_core(Option<Duration>) method, and at that point one might as well make it public and have people use that. We can easily wrap it, but I didn't want to pollute the API with several ways of doing the same thing.
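
In other words, the single method is step_or_park(&mut self, duration: Option<Duration>) -> bool, and callers pick among the behaviors like so (a sketch; my reading is that a zero duration behaves essentially like step()):

    use std::time::Duration;

    timely::execute_from_args(std::env::args(), |worker| {
        worker.step_or_park(None);                              // park until woken
        worker.step_or_park(Some(Duration::from_millis(100)));  // park at most 100ms
        worker.step_or_park(Some(Duration::from_millis(0)));    // never park
    }).unwrap();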

frankmcsherry added some commits Apr 5, 2019

@frankmcsherry (Member, Author) commented Apr 5, 2019

The most recent commit changes timely's shutdown behavior to step_or_park(None), allowing workers to park as the computation shuts down. This could be "surprising" to people who were using timely's shutdown to drive their computation.

My reasoning is that the polite parking shutdown is probably a better default, and if you need high performance shutdown, or need to spin for whatever reason, you can replicate that logic yourself. Any thoughts about that?

@comnik (Contributor) commented Apr 5, 2019

Can you explain a bit more about how that changes shutdown behaviour? I don't quite follow what "polite parking shutdown" looks like in practice. Is that referring to workers that can go to sleep while others finish up?

@frankmcsherry (Member, Author) commented Apr 5, 2019

If you peek at what happens when you call timely::execute(..), it looks a bit like this:

    initialize_from(builders, others, move |allocator| {
        let mut worker = Worker::new(allocator);
        let result = func(&mut worker);
        while worker.step_or_park(None) { }
        result
    })

except that step_or_park(None) used to be a step(). What that meant was that once we had finished with func, the closure supplied to execute, the worker would spin waiting for all of its dataflows to shut down, burning 100% CPU per worker. It now could be (except that it won't be, until communication gets fixed) workers that go to sleep, waiting for things to wake them up and give them work.
