New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rework timely layers #201

frankmcsherry opened this Issue Nov 5, 2018 · 0 comments


None yet
1 participant

frankmcsherry commented Nov 5, 2018

This is a proposal for a different layering of the timely dataflow, intended to tease apart different levels of functionality and allow us to compartmentalize some aspects a bit better (e.g. progress tracking).

It seems like we could have a layering that includes the following three layers:

  1. Communication fabric.
    This is essentially the current timely::communication module, except rather than baking in "what to do with messages" we allow the user to specify this as part of receiving the messages. I'm thinking of a trait replacing Allocate that looks something like:

    trait Fabric {
        fn index(&self) -> usize;
        fn peers(&self) -> usize;
        // This may have a hard time being generic...
        type Message;
        /// Analogous to `allocate`. 
        fn channel(&mut self) -> (Vec<Box<Push<Self::Message>>>, Box<Pull<Self::Message>>);
        /// Analogous to `pre_work`, this responds to each received message.
        fn receive<F: Fn(Header, Self::Message)>(&mut self, action: F);

    The idea is that the work currently done by the various communicators, putting received messages into per-channel queues, gets handled in receive instead (or even avoiding that step if at all possible; you could in principle put map and filter operators right in receive).

    One downside of the above is that the associated type Message can't easily be generic, which may mean that we need to use Box<Any> to transit typed allocations (or roll our own variant). This is not currently required, but if we want the receive logic to notice the arrival of each extra-worker message, we need to do something here. We could have two lines of communication: typed channels and a "please schedule me" message, but we could also start here and treat the other as an optimization. This choice also removes some opportunistic concurrency from the processing (a worker would only see those messages that had been received by the time receive was last called, and not any received while it is doing work).

  2. Tasklet scheduling.
    The above has no clue about the relationship between messages and what you might do with them. This next layer would introduce the idea of a "process" and its "threads", for lack of better terms (things that are currently "dataflow" and "scopes/operators" respectively). Actually, it seems that fiber is perhaps the better term. In particular, this layer understands "addresses" in the form &[usize] indicating a path along nested tasklets to something that needs to run (again, nested scopes / operators in a dataflow).

    The main feature this layer adds is event-based scheduling. I'm imagining that what is currently Worker would have a new method

    fn activate(&mut self, address: &[usize]);

    and that what is currently the Operate trait will have its methods push_external_progress and pull_internal_progress replaced by

    fn schedule(&mut self, activations: &mut Activations) -> bool;

    where Activations is a tree-structured representation of which children need processing. I'm thinking something like:

    /// Tree-shaped requested activations.
    struct Activations {
        activated: Vec<usize>,
        children: Vec<Activations>,

    where the activated member indicates which of children need to be scheduled. This could all change if we figure out that scheduling a tasklet needs more information about what to do (right now all are scheduled, with no information).

  3. Timely dataflow (above + progress)

    The above has no information about dataflow structure or progress information. I'm a bit torn about whether we should have a pure dataflow layer, and put progress on top of that, or have a fused layer that does both of them (certain concepts, like "termination", only result from progress tracking).

    In any case, we would want to layer on top of the tasklets above dataflow structure and progress tracking. Whereas this was previously part of the scheduling (i.e. calls to pull_internal_progress) it is now on the side, with the intent being that a scope shares some Rc<RefCell<ChangeBatch<T>>> with its managed children, and atomically pulls this information out at various moments.

    Previously this happened as part of scheduling because there was (and still is, I think) no way that progress information could manifest without scheduling the operator, and no reason not to harvest progress information once reported. The decoupling probably isn't mandatory, and is proposed mainly to help clarify the specific obligations surround progress tracking (it can be collected and updated independent of operator scheduling, even if we don't have a great reason to do that).

I'm not exactly sure of the plan of attack on this issue, probably just starting by rewriting bits of the lowest layer and seeing what breaks, then moving up. Posting for now to solicit comments!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment