Skip to content

Commit

Permalink
Properly handling incoming link options
Browse files Browse the repository at this point in the history
  • Loading branch information
vmagamedov committed May 28, 2016
1 parent 1a1cc24 commit 790cdae
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 13 deletions.
20 changes: 12 additions & 8 deletions hiku/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,14 @@ def link_result_to_ids(is_list, to_list, result):
return [result]


def get_options(graph_obj, query_obj):
_options = query_obj.options or {}
options = {}
for opt in graph_obj.options.values():
options[opt.name] = _options.get(opt.name, opt.default)
return options


class Query(Workflow):

def __init__(self, queue, task_set, root):
Expand Down Expand Up @@ -159,11 +167,9 @@ def process_edge(self, edge, pattern, ids):
self._process_edge_link(edge, _gl, _ql, ids)
))
else:
# TODO: validate query_link.options according to the
# graph_link.options
if graph_link.options:
fut = self._task_set.submit(graph_link.func,
query_link.options)
options = get_options(graph_link, query_link)
fut = self._task_set.submit(graph_link.func, options)
else:
fut = self._task_set.submit(graph_link.func)
self._queue.add_callback(fut, (
Expand All @@ -173,11 +179,9 @@ def process_edge(self, edge, pattern, ids):

def _process_edge_link(self, edge, graph_link, query_link, ids):
reqs = link_reqs(self._result, edge, graph_link, ids)
# TODO: validate query_link.options according to the
# graph_link.options
if graph_link.options:
fut = self._task_set.submit(graph_link.func, reqs,
query_link.options)
options = get_options(graph_link, query_link)
fut = self._task_set.submit(graph_link.func, reqs, options)
else:
fut = self._task_set.submit(graph_link.func, reqs)
self._queue.add_callback(fut, (
Expand Down
33 changes: 28 additions & 5 deletions tests/test_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ def _patch(func):
]),
Link('f', _(query_link1), edge='c', requires=None, to_list=True),
Link('g', _(query_link2), edge='c', requires=None, to_list=True),
Link('h', _(query_link1), edge='c', requires=None, to_list=True,
options=[Option('foo')]),
Link('k', _(query_link1), edge='c', requires=None, to_list=True,
options=[Option('foo', default=1)]),
])

thread_pool = ThreadPoolExecutor(2)
Expand Down Expand Up @@ -119,13 +123,32 @@ def testFieldOptions(self):
query.Field('a', options={'foo': 'bar'}),
])

@patch.object(TEST_ENV.fields['f'], 'options', {'foo': Option('foo')})
def testLinkOptions(self):
def testLinkOption(self):
with _patch(query_link1) as ql1, _patch(query_fields1) as qf1:
ql1.return_value = [1]
qf1.return_value = [['d1']]
result = self.execute('[{(:f {:foo "bar"}) [:d]}]')
self.assertResult(result, {'f': [{'d': 'd1'}]})
result = self.execute('[{(:h {:foo 5}) [:d]}]')
self.assertResult(result, {'h': [{'d': 'd1'}]})
with reqs_eq_patcher():
ql1.assert_called_once_with({'foo': 'bar'})
ql1.assert_called_once_with({'foo': 5})
qf1.assert_called_once_with([query.Field('d')], [1])

def testLinkOptionDefault(self):
with _patch(query_link1) as ql1, _patch(query_fields1) as qf1:
ql1.return_value = [1]
qf1.return_value = [['d1']]
result = self.execute('[{:k [:d]}]')
self.assertResult(result, {'k': [{'d': 'd1'}]})
with reqs_eq_patcher():
ql1.assert_called_once_with({'foo': 1})
qf1.assert_called_once_with([query.Field('d')], [1])

def testLinkOptionUnknown(self):
with _patch(query_link1) as ql1, _patch(query_fields1) as qf1:
ql1.return_value = [1]
qf1.return_value = [['d1']]
result = self.execute('[{(:k {:foo 2 :bar 3}) [:d]}]')
self.assertResult(result, {'k': [{'d': 'd1'}]})
with reqs_eq_patcher():
ql1.assert_called_once_with({'foo': 2})
qf1.assert_called_once_with([query.Field('d')], [1])

0 comments on commit 790cdae

Please sign in to comment.