Rename setup_db -> create_db, add integration tests, update README
comnik committed Oct 1, 2018
1 parent 8d2703c commit 3021874
Showing 4 changed files with 129 additions and 41 deletions.
58 changes: 20 additions & 38 deletions README.md
@@ -24,51 +24,33 @@ The server executable accepts two sets of arguments separated by `--`,
one for [configuring timely dataflow](https://github.com/frankmcsherry/timely-dataflow)
and the other for configuring the server.

-Server configuration:
+## Configuration

-OPTION | DESCRIPTION | DEFAULT
---port | port to listen at | 6262
---enable-cli | accept commands via stdin? | false
+OPTION | DESCRIPTION | DEFAULT
+--port | port to listen at | 6262
+--enable-cli | accept commands via stdin? | false
+--enable-history | keep full traces | false

Logging at a specific level can be enabled by setting the `RUST_LOG`
-environment variable to
-
-    RUST_LOG=server=info
+environment variable to `RUST_LOG=server=info`.

## Documentation

-Read the [high-level motivation for this project](https://www.nikolasgoebel.com/2018/09/13/incremental-datalog.html).

-Architectural decisions are documented in the [docs/adr/](docs/adr/)
-sub-directory.

-The synthesizer supports the following operators:
+Read the [high-level motivation for this
+project](https://www.nikolasgoebel.com/2018/09/13/incremental-datalog.html).

-```
-Plan := Project Plan Var
-     | 'Union Var Plan+
-     | 'Join Plan Plan Var
-     | 'Not Plan
-     | 'Lookup Entity Attribute Var
-     | 'Entity Entity Var Var
-     | 'HasAttr Var Attribute Var
-     | 'Filter Var Attribute Value
-     | 'RuleExpr String Var+
-```
+Important architectural decisions are documented in the
+[docs/adr/](docs/adr/) sub-directory.

-The following sample plan is intended to be an example of what a
-non-trivial query plan looks like. It also illustrates that query
-plans are cumbersome to write manually (i.e. without a parser like
-[clj-3df](https://github.com/comnik/clj-3df)), because symbols and
-attributes are encoded by numerical ids.
+Documentation for this package can be built via `cargo doc --no-deps`
+and viewed in a browser via `cargo doc --no-deps --open`. Please refer
+to `declarative_dataflow::plan::Plan` for documentation on the
+available operators. The [tests/](tests/) directory contains usage
+examples.

-``` json
-{"Union": [[0, 1]
-           [{"HasAttr": [0, 600, 1]}
-            {"Join": [{"HasAttr": [2, 400, 1]}
-                      {"RuleExpr": ["propagate", [0, 2]]} 2]}]]}
-```
+## Front ends

-Please also refer to the [clj-3df
-repository](https://github.com/comnik/clj-3df) for a roadmap of
-planned features.
+Query plans are rather cumbersome to write manually and do not map to
+any interesting, higher-level semantics. Currently we provide a
+[Datalog front end](https://github.com/comnik/clj-3df) written in
+Clojure.
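
For a sense of how the two argument sets documented above fit together, here is a hypothetical invocation; the `server` binary target follows from `src/bin/server.rs`, `-w` is timely's worker-count flag, and the specific values are illustrative assumptions rather than anything taken from this commit:

```
RUST_LOG=server=info cargo run --bin server -- -w 2 -- --port 6262 --enable-cli
```

Cargo swallows the first `--`; the binary then splits its own arguments at the remaining `--`, handing `-w 2` to timely dataflow and the rest to the server.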
4 changes: 2 additions & 2 deletions src/bin/server.rs
@@ -39,7 +39,7 @@ use slab::Slab;

use ws::connection::{Connection, ConnEvent};

-use declarative_dataflow::{Context, Plan, Rule, Entity, Attribute, Value, Datom, setup_db, implement};
+use declarative_dataflow::{Context, Plan, Rule, Entity, Attribute, Value, Datom, create_db, implement};

// mod sequencer;
// use sequencer::{Sequencer};
@@ -117,7 +117,7 @@ fn main() {

    // setup interpreter context
    let mut ctx = worker.dataflow(|scope| {
-        let (input_handle, db) = setup_db(scope);
+        let (input_handle, db) = create_db(scope);

        Context { db, input_handle, queries: HashMap::new(), }
    });
2 changes: 1 addition & 1 deletion src/lib.rs
@@ -296,7 +296,7 @@ pub fn implement<A: Allocate, T: Timestamp+Lattice>(
}

/// Create a new DB instance and interactive session.
-pub fn setup_db<A: Allocate, T: Timestamp+Lattice>(scope: &mut Child<Worker<A>, T>) -> (InputSession<T, Datom, isize>, DB<T>) {
+pub fn create_db<A: Allocate, T: Timestamp+Lattice>(scope: &mut Child<Worker<A>, T>) -> (InputSession<T, Datom, isize>, DB<T>) {
    let (input_handle, datoms) = scope.new_collection::<Datom, isize>();
    let db = DB {
        e_av: datoms.map(|Datom(e, a, v)| (e, (a, v))).arrange_by_key().trace,
106 changes: 106 additions & 0 deletions tests/query_test.rs
@@ -0,0 +1,106 @@
extern crate timely;
extern crate declarative_dataflow;

use std::collections::HashMap;
use std::sync::mpsc::channel;

use timely::Configuration;
use timely::dataflow::ProbeHandle;

use declarative_dataflow::{Context, Plan, Rule, Datom, Value, create_db, implement};
use declarative_dataflow::plan::{Project, Join};

#[test]
fn match_ea() {
    timely::execute(Configuration::Thread, move |worker| {

        let n = 1;
        let attr_name = 100;

        // [:find ?n :where [1 :name ?n]]
        let plan = Plan::MatchEA(1, attr_name, n);

        let mut probe = ProbeHandle::new();
        let mut ctx = worker.dataflow(|scope| {
            let (input_handle, db) = create_db(scope);
            Context { db, input_handle, queries: HashMap::new(), }
        });

        let (send_results, results) = channel();

        worker.dataflow::<usize, _, _>(|scope| {
            let query_name = "match_ea";
            let rules = vec![Rule { name: query_name.to_string(), plan: plan }];
            let publish = vec![query_name.to_string()];

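            // `implement` synthesizes a dataflow for each of the given rules and
            // returns a map from published rule names to importable relations.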
            let mut rel_map = implement(rules, publish, scope, &mut ctx, &mut probe);

            rel_map.get_mut(query_name).unwrap().import(scope)
                .as_collection(|tuple, _| tuple.clone())
                .inspect(move |x| { send_results.send((x.0.clone(), x.2)).unwrap(); })
                .probe_with(&mut probe);
        });

        ctx.input_handle.insert(Datom(1, attr_name, Value::String("Dipper".to_string())));
        ctx.input_handle.insert(Datom(1, attr_name, Value::String("Alias".to_string())));
        ctx.input_handle.insert(Datom(2, attr_name, Value::String("Mabel".to_string())));
        ctx.input_handle.advance_to(1);
        ctx.input_handle.flush();

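        // Step the worker until the probe confirms that all inputs up to
        // the current epoch have been processed.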
        while probe.less_than(ctx.input_handle.time()) { worker.step(); }

        assert_eq!(results.recv().unwrap(), (vec![Value::String("Alias".to_string())], 1));
        assert_eq!(results.recv().unwrap(), (vec![Value::String("Dipper".to_string())], 1));

    }).unwrap();
}

#[test]
fn join() {
    timely::execute(Configuration::Thread, move |worker| {

        let (e, a, n) = (1, 2, 3);
        let (attr_name, attr_age) = (100, 200);

        // [:find ?e ?n ?a :where [?e :age ?a] [?e :name ?n]]
        let plan = Plan::Project(Project {
            variables: vec![e, n, a],
            plan: Box::new(Plan::Join(Join {
                variables: vec![e],
                left_plan: Box::new(Plan::MatchA(e, attr_name, n)),
                right_plan: Box::new(Plan::MatchA(e, attr_age, a)),
            }))
        });

        let mut probe = ProbeHandle::new();
        let mut ctx = worker.dataflow(|scope| {
            let (input_handle, db) = create_db(scope);
            Context { db, input_handle, queries: HashMap::new(), }
        });

        let (send_results, results) = channel();

        worker.dataflow::<usize, _, _>(|scope| {
            let query_name = "join";
            let rules = vec![Rule { name: query_name.to_string(), plan: plan }];
            let publish = vec![query_name.to_string()];

            let mut rel_map = implement(rules, publish, scope, &mut ctx, &mut probe);

            rel_map.get_mut(query_name).unwrap().import(scope)
                .as_collection(|tuple, _| tuple.clone())
                .inspect(move |x| { send_results.send((x.0.clone(), x.2)).unwrap(); })
                .probe_with(&mut probe);
        });

        ctx.input_handle.insert(Datom(1, attr_name, Value::String("Dipper".to_string())));
        ctx.input_handle.insert(Datom(1, attr_age, Value::Number(12)));
        ctx.input_handle.advance_to(1);
        ctx.input_handle.flush();

        while probe.less_than(ctx.input_handle.time()) { worker.step(); }

        assert_eq!(results.recv().unwrap(), (vec![Value::Eid(1), Value::String("Dipper".to_string()), Value::Number(12)], 1));

    }).unwrap();
}
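
Since Cargo treats files under [tests/](tests/) as integration tests, the new suite should run on its own with the standard test command (assuming a default project layout):

```
cargo test --test query_test
```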
