Memory leak in differential #219

Closed
ryzhyk opened this issue Oct 3, 2019 · 3 comments · Fixed by #220

ryzhyk commented Oct 3, 2019

I believe we ran into a memory leak in DD master. It seems to happen when the input fed so far is not enough to activate some of the operators in the program, but we keep advancing the timestamp on the input sessions. Here is a minimal failing example based on examples/bfs.rs. When I run it, the memory usage of the program grows without bound.

Note that uncommenting the line marked with

// !!! UNCOMMENTING THIS LINE ELIMINATES THE LEAK !!!

eliminates the leak, with the memory usage remaining constant.

extern crate timely;
extern crate differential_dataflow;

use timely::dataflow::*;
use timely::dataflow::operators::probe::Handle;

use differential_dataflow::input::Input;
use differential_dataflow::Collection;
use differential_dataflow::operators::*;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::logging::DifferentialEvent;

type Node = u32;
type Edge = (Node, Node);

fn main() {

    // define a new computational scope, in which to run BFS
    timely::execute_from_args(std::env::args(), move |worker| {

        if let Ok(addr) = ::std::env::var("DIFFERENTIAL_LOG_ADDR") {

            eprintln!("enabled DIFFERENTIAL logging to {}", addr);

            if let Ok(stream) = ::std::net::TcpStream::connect(&addr) {
                let writer = ::timely::dataflow::operators::capture::EventWriter::new(stream);
                let mut logger = ::timely::logging::BatchLogger::new(writer);
                worker.log_register().insert::<DifferentialEvent,_>("differential/arrange", move |time, data|
                    logger.publish_batch(time, data)
                );
            }
            else {
                panic!("Could not connect to differential log address: {:?}", addr);
            }
        }

        // define BFS dataflow; return handles to roots and edges inputs
        let mut probe = Handle::new();
        let (mut roots, mut graph) = worker.dataflow(|scope| {

            let (root_input, roots) = scope.new_collection();
            let (edge_input, graph) = scope.new_collection();

            let result = bfs(&graph, &roots);

            result.map(|(_,l)| l)
                  .consolidate()
                  .probe_with(&mut probe);

            (root_input, edge_input)
        });

        roots.insert(0);
        roots.close();

        // !!! UNCOMMENTING THIS LINE ELIMINATES THE LEAK !!!
        //graph.insert((0,1));
        graph.advance_to(1);
        graph.flush();

        loop {
            let time = *graph.time();
            graph.advance_to(time + 1);
            graph.flush();
            while probe.less_than(graph.time()) {
                worker.step_or_park(None);
            }
        }

    }).unwrap();
}

// returns pairs (n, s) indicating node n can be reached from a root in s steps.
fn bfs<G: Scope>(edges: &Collection<G, Edge>, roots: &Collection<G, Node>) -> Collection<G, (Node, u32)>
where G::Timestamp: Lattice+Ord {

    // initialize roots as reaching themselves at distance 0
    let nodes = roots.map(|x| (x, 0));

    // repeatedly update minimal distances each node can be reached from each root
    nodes.iterate(|inner| {

        let edges = edges.enter(&inner.scope());
        let nodes = nodes.enter(&inner.scope());

        inner.join_map(&edges, |_k,l,d| (*d, l+1))
             .concat(&nodes)
             .reduce(|_, s, t| t.push((*s[0].0, 1)))
     })
}

I realize this looks artificial, but we also observed this in more realistic scenarios, where some inputs of a computation simply happen to remain empty. I don't believe the bug existed in 0.9, but I haven't bisected exactly when it was introduced.
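
For anyone who wants to confirm the growth from inside the process rather than with an external tool, here is a minimal sketch of a resident-memory probe. It is not part of the reproduction above: the helper names `parse_vm_rss_kb` and `current_rss_kb` are made up for illustration, and it assumes Linux, where `/proc/self/status` reports a `VmRSS` line.

```rust
use std::fs;

/// Extracts the `VmRSS` value (in kB) from the text of a `/proc/<pid>/status` file.
/// Hypothetical helper, not part of timely or differential.
fn parse_vm_rss_kb(status: &str) -> Option<u64> {
    status
        .lines()
        .find(|line| line.starts_with("VmRSS:"))
        .and_then(|line| line.split_whitespace().nth(1))
        .and_then(|kb| kb.parse().ok())
}

/// Resident set size of the current process in kB.
/// Assumption: Linux only; returns None where /proc is unavailable.
fn current_rss_kb() -> Option<u64> {
    fs::read_to_string("/proc/self/status")
        .ok()
        .and_then(|text| parse_vm_rss_kb(&text))
}

fn main() {
    match current_rss_kb() {
        Some(kb) => eprintln!("resident memory: {} kB", kb),
        None => eprintln!("resident memory not available on this platform"),
    }
}
```

Calling something like `current_rss_kb()` every few thousand rounds inside the `loop` should be enough to see whether the number keeps climbing or levels off.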

@frankmcsherry
@mbudiu-vmw
@ekcs

frankmcsherry commented Oct 4, 2019

On it this weekend!

frankmcsherry commented Oct 4, 2019

Yup, this reproduces. I'll get to work on it, but my guess is that this relates to the formation and cleaning up (or not) of empty batches. They follow a different code path because they cannot be sent along timely channels (as we may not hold capabilities for them).
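
To make the guess above concrete, here is a toy model of how empty batches could pile up. It is only an illustration of the hypothesis, not differential's actual trace implementation: `Batch`, `LeakyTrace`, `CompactingTrace`, and `run_rounds` are all hypothetical names, and real batches and merging are considerably more involved.

```rust
/// Toy batch: covers the half-open time interval [lower, upper) and holds `updates` records.
#[derive(Debug, Clone, PartialEq)]
struct Batch {
    lower: u64,
    upper: u64,
    updates: usize,
}

/// Toy trace that records one batch per round and never cleans up empty ones.
struct LeakyTrace {
    batches: Vec<Batch>,
}

impl LeakyTrace {
    fn seal(&mut self, upper: u64, updates: usize) {
        let lower = self.batches.last().map_or(0, |b| b.upper);
        self.batches.push(Batch { lower, upper, updates });
    }
}

/// Toy trace that folds an empty batch into its predecessor by extending its interval.
struct CompactingTrace {
    batches: Vec<Batch>,
}

impl CompactingTrace {
    fn seal(&mut self, upper: u64, updates: usize) {
        if updates == 0 {
            if let Some(last) = self.batches.last_mut() {
                // Nothing new to store: widen the previous batch instead of adding one.
                last.upper = upper;
                return;
            }
        }
        let lower = self.batches.last().map_or(0, |b| b.upper);
        self.batches.push(Batch { lower, upper, updates });
    }
}

/// Advances both toy traces through `rounds` rounds with no input at all and
/// returns how many batches each one ends up holding.
fn run_rounds(rounds: u64) -> (usize, usize) {
    let mut leaky = LeakyTrace { batches: Vec::new() };
    let mut compacting = CompactingTrace { batches: Vec::new() };
    for time in 1..=rounds {
        leaky.seal(time, 0);
        compacting.seal(time, 0);
    }
    (leaky.batches.len(), compacting.batches.len())
}

fn main() {
    let (leaky, compacting) = run_rounds(1000);
    println!("leaky: {} batches, compacting: {} batches", leaky, compacting);
}
```

In this model the leaky variant holds one batch per round while the compacting one stays at a single batch, which matches the shape of the symptom (growth proportional to the number of `advance_to` calls with no data). Whether the real leak works this way is exactly what remains to be checked.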

frankmcsherry commented Oct 4, 2019

A simpler reproduction (doesn't grow as fast, but ... yeah). Also, there seems to be a leak with distinct_total with or without the input, so that's obviously worrying.

extern crate timely;
extern crate differential_dataflow;

use timely::dataflow::operators::probe::Handle;

use differential_dataflow::input::Input;
use differential_dataflow::operators::*;

fn main() {

    // TODO: distinct leaks without arguments (i.e. no input).
    // TODO: distinct_total seems to leak in each case (!!!).

    timely::execute_from_args(std::env::args(), move |worker| {

        let mut probe = Handle::new();
        let mut graph = worker.dataflow::<usize,_,_>(|scope| {
            let (edge_input, graph) = scope.new_collection::<i32,isize>();
            graph.distinct().probe_with(&mut probe);
            edge_input
        });

        // Insert an input with any arguments (and not otherwise).
        if std::env::args().count() > 1 {
            graph.insert(0);
        }

        loop {
            let time = *graph.time();
            graph.advance_to(time + 1);
            graph.flush();
            while probe.less_than(graph.time()) {
                worker.step_or_park(None);
            }
        }

    }).unwrap();
}