Skip to content
This repository has been archived by the owner on Jan 31, 2020. It is now read-only.

Commit

Permalink
Merge pull request #37 from mark-burnett/matrix-parallel-by
Browse files Browse the repository at this point in the history
Add support for parallel by multiple times on the same input
  • Loading branch information
mark-burnett committed Oct 27, 2014
2 parents 72414a2 + d9d533c commit d8289a4
Show file tree
Hide file tree
Showing 7 changed files with 110 additions and 50 deletions.
21 changes: 7 additions & 14 deletions ptero_workflow/implementation/models/input_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,23 +55,16 @@ def get_data(self, colors, begins):
).filter(result.Result.color.in_(colors)).one()

indexes = self.parallel_indexes(colors, begins)

if indexes:
LOG.debug('%s[%s] parallel_depths=%s, colors=%s, begins=%s '
'-> indexes=%s',
self.source_task.name, self.source_property,
self.parallel_depths, colors, begins, indexes)
# XXX This will only work for completely orthogonal inputs.
assert len(indexes) == 1
return r.get_element(indexes[0])

else:
return r.data
LOG.debug('%s[%s] parallel_depths=%s, colors=%s, begins=%s '
'-> indexes=%s',
self.source_task.name, self.source_property,
self.parallel_depths, colors, begins, indexes)
return r.get_data(indexes)

def get_size(self, colors, begins):
indexes = self.parallel_indexes(colors, begins)
s = object_session(self)

r = s.query(result.Result
).filter_by(task=self.source_task, name=self.source_property
).filter(result.Result.color.in_(colors)).one()
return r.size
return r.get_size(indexes)
52 changes: 24 additions & 28 deletions ptero_workflow/implementation/models/json_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,44 +35,42 @@ def process_result_value(self, value, dialect):
return value


def get_data_element_brute_force(task, index):
return task.data[index]
def get_data_element_brute_force(task, indexes):
    """Return the element of ``task.data`` addressed by ``indexes``.

    Every entry of ``indexes`` is applied as one more subscript, outermost
    first, so an empty sequence returns ``task.data`` unchanged.
    """
    element = task.data
    for position in indexes:
        element = element[position]
    return element

def get_data_element_postgres_extensions(task, indexes):
if indexes:
q = task.__class__.data[indexes]
else:
q = task.__class__.data

def get_data_element_postgres_extensions(task, index):
s = object_session(task)
tup = s.query(task.__class__.data[index]).filter_by(id=task.id).one()
tup = s.query(q).filter_by(id=task.id).one()
return tup[0]


def get_data_size_brute_force(task):
return len(task.data)
def get_data_size_brute_force(task, indexes):
    """Return ``len()`` of the part of ``task.data`` addressed by ``indexes``.

    Every entry of ``indexes`` is applied as one more subscript, outermost
    first; with an empty sequence the length of ``task.data`` itself is
    returned.
    """
    selected = task.data
    for position in indexes:
        selected = selected[position]
    return len(selected)


# SQL function wrapper whose result is typed as Integer.  It is called as
# ``json_array_length(<column expression>)`` inside ``session.query(...)`` by
# get_data_size_postgres_extensions to obtain an array length from the query.
# NOTE(review): presumably GenericFunction is SQLAlchemy's and renders the
# function under this class name -- confirm against the imports, which are
# outside this view.
class json_array_length(GenericFunction):
type = Integer

def get_data_size_postgres_extensions(task):
s = object_session(task)
tup = s.query(json_array_length(task.__class__.data)
).filter_by(id=task.id).one()
return tup[0]


def get_referenced_element_brute_force(task, index):
from . import result
element_result_id = task.reference_ids[index]
def get_data_size_postgres_extensions(task, indexes):
if indexes:
q = task.__class__.data[indexes]
else:
q = task.__class__.data

s = object_session(task)
r = s.query(result.Result).filter_by(id=element_result_id).one()
return r.data

def get_referenced_element_postgres_extensions(task, index):
from . import result
s = object_session(task)
r = s.query(result.Result
).join(result.Result.id == task.__class__.reference_ids[index]
).filter(task.__class__.id == task.id).one()
return r.data
tup = s.query(json_array_length(q)).filter_by(id=task.id).one()
return tup[0]


if os.environ.get('PTERO_WORKFLOW_DB_STRING', 'sqlite://'
Expand All @@ -81,10 +79,8 @@ def get_referenced_element_postgres_extensions(task, index):
JSON = psqlJSON
get_data_element = get_data_element_postgres_extensions
get_data_size = get_data_size_postgres_extensions
get_referenced_element = get_referenced_element_brute_force

else:
JSON = JSONEncodedDict(1000)
get_data_element = get_data_element_brute_force
get_data_size = get_data_size_brute_force
get_referenced_element = get_referenced_element_brute_force
30 changes: 23 additions & 7 deletions ptero_workflow/implementation/models/result.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,11 @@ class ConcreteResult(Result):
'polymorphic_identity': 'concrete'
}

@property
def size(self):
return json_type.get_data_size(self)
# Return this result's data narrowed by ``indexes``; the lookup itself is
# delegated to json_type.get_data_element.
def get_data(self, indexes):
return json_type.get_data_element(self, indexes)

def get_element(self, index):
return json_type.get_data_element(self, index)
# Return the size of this result's data narrowed by ``indexes``; the
# computation is delegated to json_type.get_data_size.
def get_size(self, indexes):
return json_type.get_data_size(self, indexes)


class ArrayReferenceResult(Result):
Expand All @@ -76,5 +75,22 @@ def data(self):
results.append(s.query(Result).filter_by(id=rid).one())
return [r.data for r in results]

def get_element(self, index):
return json_type.get_referenced_element(self, index)
def get_data(self, indexes):
    """Return the data addressed by ``indexes``.

    With no indexes the whole ``self.data`` is returned.  Otherwise the
    first index picks one id out of ``self.reference_ids``, the matching
    ``Result`` row is loaded, and the remaining indexes are forwarded to
    that result's own ``get_data``.
    """
    if not indexes:
        return self.data

    session = object_session(self)
    referenced_id = self.reference_ids[indexes[0]]
    referenced = session.query(Result).filter_by(id=referenced_id).one()
    return referenced.get_data(indexes[1:])

def get_size(self, indexes):
    """Return the size of the data addressed by ``indexes``.

    With no indexes this is ``self.size``.  Otherwise the first index picks
    one id out of ``self.reference_ids``, the matching ``Result`` row is
    loaded, and the remaining indexes are forwarded to that result's own
    ``get_size``.
    """
    if not indexes:
        return self.size

    session = object_session(self)
    # Subscript with the first index only (indexes[0]), mirroring get_data;
    # subscripting with the whole ``indexes`` list is a TypeError on a list.
    referenced_id = self.reference_ids[indexes[0]]
    referenced = session.query(Result).filter_by(id=referenced_id).one()
    return referenced.get_size(indexes[1:])
4 changes: 3 additions & 1 deletion ptero_workflow/implementation/models/task/task_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,7 @@ def create_array_result(self, body_data, query_string_data):
output_name, [])
results = s.query(result.Result
).filter_by(task=source, name=name, parent_color=color
).order_by('color'
).all()

array_result = result.ArrayReferenceResult(task=source, name=name,
Expand Down Expand Up @@ -402,7 +403,8 @@ def failure_place_name(self):

def resolve_input_source(self, session, name, parallel_depths):
if self.parallel_by == name:
pdepths = parallel_depths + [self.parallel_depth]
pdepths = [self.parallel_depth] + parallel_depths

else:
pdepths = parallel_depths

Expand Down
2 changes: 2 additions & 0 deletions tests/api/v1/generator/base_case.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
class TestCaseMixin(object):
__metaclass__ = abc.ABCMeta

maxDiff = None

@property
def api_port(self):
return int(os.environ['PTERO_WORKFLOW_PORT'])
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"outputs": {
"out_matrix": [
["Simba", "Tabby", "Emilio"],
["Pluto", "Snoopy"]
]
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
{
"parallelBy": "in_matrix",

"tasks": {
"A": {
"methods": [
{
"name": "execute",
"service": "ShellCommand",
"parameters": {
"commandLine": ["cat"]
}
}
],
"parallelBy": "name"
}
},

"edges": [
{
"source": "input connector",
"destination": "A",
"sourceProperty": "in_matrix",
"destinationProperty": "name"
},

{
"source": "A",
"destination": "output connector",
"sourceProperty": "name",
"destinationProperty": "out_matrix"
}
],

"inputs": {
"in_matrix": [
["Simba", "Tabby", "Emilio"],
["Pluto", "Snoopy"]
]
},

"environment": {}
}

0 comments on commit d8289a4

Please sign in to comment.