Skip to content
This repository has been archived by the owner on Jan 31, 2020. It is now read-only.

Nested workflows pass inputs correctly #4

Merged
merged 3 commits into from
May 14, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions ptero_workflow/implementation/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,9 @@ def _save_workflow(self, workflow_data):
}

workflow.root_operation = operations.create_operation('root', root_data)
workflow.root_operation.children['input connector'].set_outputs(
workflow_data['inputs'])

workflow.input_holder_operation = operations.create_input_holder(
workflow.root_operation, workflow_data['inputs'])

self.session.add(workflow)
self.session.commit()
Expand Down
94 changes: 75 additions & 19 deletions ptero_workflow/implementation/models/operation.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import simplejson


__all__ = ['Operation']
__all__ = ['Operation', 'InputHolderOperation']


LOG = logging.getLogger(__file__)
Expand Down Expand Up @@ -59,24 +59,28 @@ def to_dict(self):
def _as_dict_data(self):
return {}

@property
def unique_name(self):
return '-'.join(['op', str(self.id), self.name.replace(' ', '_')])

@property
def success_place_name(self):
return 'op-%d-success' % self.id
return '%s-success' % self.unique_name

def success_place_pair_name(self, op):
return 'op-%d-succes-for-%d' % (self.id, op.id)
return '%s-success-for-%s' % (self.unique_name, op.unique_name)

@property
def ready_place_name(self):
return 'op-%d-ready' % self.id
return '%s-ready' % self.unique_name

@property
def response_wait_place_name(self):
return 'op-%d-response-wait' % self.id
return '%s-response-wait' % self.unique_name

@property
def response_callback_place_name(self):
return 'op-%d-response-callback' % self.id
return '%s-response-callback' % self.unique_name

def notify_callback_url(self, event):
return 'http://%s:%d/v1/callbacks/operations/%d/events/%s' % (
Expand Down Expand Up @@ -173,11 +177,29 @@ def get_inputs(self):
def get_input(self, name):
return self.get_inputs()[name]


def execute(self, inputs):
pass


class InputHolderOperation(Operation):
__tablename__ = 'operation_input_holder'

id = Column(Integer, ForeignKey('operation.id'), primary_key=True)

__mapper_args__ = {
'polymorphic_identity': '__input_holder',
}

def get_inputs(self):
raise RuntimeError()

def get_input(self, name):
raise RuntimeError()

def get_petri_transitions(self):
return []


class InputConnectorOperation(Operation):
__tablename__ = 'operation_input_connector'

Expand All @@ -187,11 +209,29 @@ class InputConnectorOperation(Operation):
'polymorphic_identity': 'input connector',
}

def get_output(self, name):
return self.get_inputs().get(name)

def get_outputs(self):
return self.get_inputs()

def get_inputs(self):
return self.parent.get_inputs()

def get_input(self, name):
return self.parent.get_input(name)

def get_petri_transitions(self):
return [{
'inputs': [self.success_place_name],
'outputs': [self.success_place_pair_name(o) for o in self.output_ops],
}]
return [
{
'inputs': [self.parent.ready_place_name],
'outputs': [self.success_place_name],
},
{
'inputs': [self.success_place_name],
'outputs': [self.success_place_pair_name(o) for o in self.output_ops],
}
]


class OutputConnectorOperation(Operation):
Expand Down Expand Up @@ -222,28 +262,44 @@ class ModelOperation(Operation):
'polymorphic_identity': 'model',
}

def get_inputs(self):
return self.children['input connector'].get_inputs()

def get_input(self, name):
return self.children['input connector'].get_input(name)

def get_output(self, name):
return self.children['output connector'].get_output(name)

def get_outputs(self):
return self.children['output connector'].get_outputs()

def get_petri_transitions(self):
return [{
result = []

if self.input_ops:
result.append({
'inputs': [o.success_place_pair_name(self) for o in self.input_ops],
'outputs': [self.ready_place_name],
})

if self.output_ops:
success_outputs = [self.success_place_pair_name(o) for o in self.output_ops]
if self.parent:
success_outputs.append(self.success_place_pair_name(self.parent))
result.append({
'inputs': [self.success_place_name],
'outputs': success_outputs,
})

result.append({
'inputs': [o.success_place_pair_name(self)
for o in self.real_child_ops],
'outputs': [self.success_place_name],
'action': {
'type': 'notify',
'url': self.notify_callback_url('done'),
},
}]
})

for child in self.children.itervalues():
result.extend(child.get_petri_transitions())

return result


class CommandOperation(Operation):
Expand Down
15 changes: 9 additions & 6 deletions ptero_workflow/implementation/models/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,20 @@ class Workflow(Base):
root_operation_id = Column(Integer,
ForeignKey('operation.id'), nullable=False)

root_operation = relationship('Operation', backref='workflow')
root_operation = relationship('Operation', backref='workflow',
foreign_keys=[root_operation_id])

input_holder_operation_id = Column(Integer,
ForeignKey('operation.id'), nullable=False)

input_holder_operation = relationship('InputHolderOperation',
foreign_keys=[input_holder_operation_id])

net_key = Column(Text, unique=True)

@property
def start_place_name(self):
LOG.debug('root_op: %d', self.root_operation.id)
LOG.debug('root input_connector: %d',
self.root_operation.children['input connector'].id)
return self.root_operation.children['input connector'
].success_place_name
return self.root_operation.ready_place_name

@property
def links(self):
Expand Down
8 changes: 8 additions & 0 deletions ptero_workflow/implementation/operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,11 @@ def create_operation(name, operation_data, parent=None):
_build_model_operation(operation_data, operation=operation)

return operation

def create_input_holder(root, inputs):
operation = models.InputHolderOperation(name='input_holder')
operation.set_outputs(inputs)
for i in inputs.iterkeys():
models.Link(source_operation=operation, destination_operation=root,
source_property=i, destination_property=i)
return operation
4 changes: 0 additions & 4 deletions ptero_workflow/implementation/translator.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,4 @@ def build_petri_net(workflow):
'transitions': workflow.root_operation.get_petri_transitions(),
}

for operation in workflow.root_operation.children.itervalues():
transitions = operation.get_petri_transitions()
data['transitions'].extend(transitions)

return data
6 changes: 6 additions & 0 deletions tests/api/v1/system_tests/nested/result.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"outputs": {
"outer_out_c_1": "ferret",
"outer_out_c_2": "badger"
}
}
78 changes: 78 additions & 0 deletions tests/api/v1/system_tests/nested/submit.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
{
"operations": {
"A": {
"type": "pass-through"
},
"Inner": {
"type": "model",
"operations": {
"B": {
"type": "pass-through"
}
},
"links": [
{
"source": "input connector",
"destination": "B",
"source_property": "inner_in_b",
"destination_property": "param"
},
{
"source": "B",
"destination": "output connector",
"source_property": "param",
"destination_property": "inner_out_b"
}
]
},
"C": {
"type": "pass-through"
}
},

"links": [
{
"source": "input connector",
"destination": "A",
"source_property": "outer_in_a",
"destination_property": "param"
},
{
"source": "input connector",
"destination": "Inner",
"source_property": "outer_in_inner",
"destination_property": "inner_in_b"
},
{
"source": "A",
"destination": "C",
"source_property": "param",
"destination_property": "param_1"
},
{
"source": "Inner",
"destination": "C",
"source_property": "inner_out_b",
"destination_property": "param_2"
},
{
"source": "C",
"destination": "output connector",
"source_property": "param_1",
"destination_property": "outer_out_c_1"
},
{
"source": "C",
"destination": "output connector",
"source_property": "param_2",
"destination_property": "outer_out_c_2"
}
],

"inputs": {
"outer_in_a": "ferret",
"outer_in_inner": "badger"
},

"environment": {}
}
6 changes: 6 additions & 0 deletions tests/api/v1/system_tests/split_passthrough/result.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"outputs": {
"out_1": "parrots",
"out_2": "toucans"
}
}
59 changes: 59 additions & 0 deletions tests/api/v1/system_tests/split_passthrough/submit.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
{
"operations": {
"A": {
"type": "pass-through"
},
"B": {
"type": "pass-through"
},
"C": {
"type": "pass-through"
}
},

"links": [
{
"source": "input connector",
"destination": "A",
"source_property": "in_1",
"destination_property": "param_1"
},
{
"source": "input connector",
"destination": "A",
"source_property": "in_2",
"destination_property": "param_2"
},
{
"source": "A",
"destination": "B",
"source_property": "param_1",
"destination_property": "param"
},
{
"source": "A",
"destination": "C",
"source_property": "param_2",
"destination_property": "param"
},
{
"source": "B",
"destination": "output connector",
"source_property": "param",
"destination_property": "out_1"
},
{
"source": "C",
"destination": "output connector",
"source_property": "param",
"destination_property": "out_2"
}
],

"inputs": {
"in_1": "parrots",
"in_2": "toucans"
},

"environment": {}
}