Skip to content

comnik/timely-speculation

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

3 Commits
 
 
 
 
 
 
 
 

Repository files navigation

Timely Speculation

Experimenting with speculative results in timely.

The idea here was to take timely's Aggregate operator and give it a second input, that is only used to propagate a speculative frontier. The implementation is a complete hack but revolves around a separate notificator giving results without affecting operator state:

spec_notif.for_each(|time,_,_| {
    if let Some(aggs) = aggregates.get(time.time()) {
        let mut session = output.session(&time);
        for (key, agg) in aggs {
            session.give(emit((*key).clone(), (*agg).clone()));
        }
    }
});

Once the actual input advances, the actual notificator triggers the correct outputs and cleans up as usual.

// pop completed aggregates, send along whatever
notificator.for_each(|time,_,_| {
    if let Some(aggs) = aggregates.remove(time.time()) {
        let mut session = output.session(&time);
        for (key, agg) in aggs {
            session.give(emit(key, agg));
        }
    }
});

Without speculative results, we can close windows as such:

cap = cap.delayed(&RootTimestamp::new(window));

Doing so means that we'll have to discard late arrivals. Ofcourse eventually we will have to do this, to avoid operator state growing unbounded. A speculative frontier allows us to advance the (otherwise non-contributing) speculative input and get results, before downgrading our capability to input late data.

The demo implemented in src/main.rs is based on the excellent "Dataflow: A Unified Model for Batch and Streaming Data Processing" talk by Frances Perry.

About

Experimenting with speculative results in timely.

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

 
 
 

Contributors

Languages