Skip to content

Commit

Permalink
next step to make expansions work
Browse files Browse the repository at this point in the history
  • Loading branch information
rkoschmitzky committed Feb 8, 2021
1 parent 289622c commit c78acf3
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 13 deletions.
25 changes: 17 additions & 8 deletions dispatch/trixterdispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,13 +177,23 @@ def dispatch(self, nodes):
code = "from jobtronaut.author import (Task, ProcessorDefinition)"

for hierarchy_node in all_hierarchy_nodes:
template = TaskTemplate(hierarchy_node.getName())

name = hierarchy_node.getName()
# figure out if the hierarchy is the direct descendant of a Root within expansion
_incoming = hierarchy_node.getChild("in").getInput() if hierarchy_node.getChild("in") else None
if _incoming:
node = _incoming.node()
if isinstance(node, Root):
arguments_in = _incoming.node().getChild("arguments_in")
if arguments_in.getInput() and hasattr(arguments_in.getInput().source().node(), "expansions"):
name = node.getChild("type").getValue()

template = TaskTemplate(name)
template.required_tasks = JobtronautDispatcher.get_required_tasks(hierarchy_node, scriptnode)

for processor_node in JobtronautDispatcher.get_processors(hierarchy_node):
processor = ProcessorDefinitionTemplate(processor_node.getChild("type").getValue())
processor.scope = list(processor_node.getChild("scope").getValue())

processor.parameters = self._get_named_values(processor_node, "parameters", ignore_if_default=True)

template.argument_processors.append(processor)
Expand Down Expand Up @@ -241,7 +251,6 @@ def get_processors(startnode):
current_plug = current_node.getChild("in").getInput()
return processors


@staticmethod
def get_required_tasks(startnode, scriptnode):
graphgadget = GafferUI.GraphGadget(scriptnode)
Expand Down Expand Up @@ -271,21 +280,23 @@ def _get_nodes(current):
Gaffer.Plug.Direction.Out,
1
)])

# Sorting by the x position is the expected behaviour for serial execution.
# We assume that the x ordering of downstream nodes is the determining
# factor for execution order.
downstream_nodes = sorted(downstream_nodes, key=lambda node: graphgadget.getNodePosition(node).x)

for node in downstream_nodes:
if isinstance(node, Gaffer.Dot):
required_tasks.append(_reduce_hierarchy_levels(_get_nodes(node)))
elif isinstance(node, Serial):
required_tasks.append(_reduce_hierarchy_levels(Tuple(_get_nodes(node))))
elif isinstance(node, Parallel):
required_tasks.append(_reduce_hierarchy_levels(List(_get_nodes(node))))
elif isinstance(node, (HierarchyTask, JobtronautTask)):
elif isinstance(node, HierarchyTask):
required_tasks.append(node.getName())
elif isinstance(node, JobtronautTask):
required_tasks.append(node.getChild("type").getValue())
elif isinstance(node, Root):
required_tasks.append(node.getChild("arguments_in").getInput().getName())

return _reduce_hierarchy_levels(required_tasks)

Expand All @@ -294,8 +305,6 @@ def _get_nodes(current):
# Make sure that we don't end up with a single required task without any wrapping List or Tuple
return required_tasks if isinstance(required_tasks, (List, Tuple)) else [required_tasks]



@staticmethod
def initialize(parent_plug):
pass
Expand Down
20 changes: 15 additions & 5 deletions nodes/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ def constructor(self, node, serialisation):


class ArgumentsPlug(Gaffer.Plug):
def __init__( self, name="ArgumentsPlug", direction=Gaffer.Plug.Direction.In, flags = Gaffer.Plug.Flags.Default ) :
def __init__(self, name="ArgumentsPlug", direction=Gaffer.Plug.Direction.In, flags=Gaffer.Plug.Flags.Default):
Gaffer.Plug.__init__(self, name, direction, flags)
self.inputHasBeenSet = False

Expand Down Expand Up @@ -151,7 +151,7 @@ def createCounterpart(self, name, direction):


class ProcessorPlug(Gaffer.Plug):
def __init__( self, name="ProcessorPlug", direction=Gaffer.Plug.Direction.In, flags = Gaffer.Plug.Flags.Default ) :
def __init__(self, name="ProcessorPlug", direction=Gaffer.Plug.Direction.In, flags=Gaffer.Plug.Flags.Default):
Gaffer.Plug.__init__(self, name, direction, flags)
self.inputHasBeenSet = False

Expand Down Expand Up @@ -266,6 +266,7 @@ def _on_plug_input_changed(self, plug):
if self.ignore_changed_inputs_signal:
return

# TODO: we should use mock.patch_object instead
with temporary_attribute_value(self, "ignore_changed_inputs_signal", True):
self._disconnect_all_tasks()

Expand Down Expand Up @@ -353,8 +354,9 @@ def __init__(self, name, task_name):

Gaffer.Metadata.registerValue(self, "description", plugin.description)

expansions = get_expand_task_names(plugin)
for expansion in expansions:
self.expansions = get_expand_task_names(plugin)

for expansion in self.expansions:
expand_plug = GafferDispatch.TaskNode.TaskPlug(expansion.root, Gaffer.Plug.Direction.Out)
Gaffer.Metadata.registerPlugValue(expand_plug, "nodule:type", "GafferUI::StandardNodule")
Gaffer.Metadata.registerPlugValue(expand_plug, "nodule:color", _TASK_IN_OUT_COLOR)
Expand Down Expand Up @@ -549,7 +551,15 @@ def __init__(self, name="HierarchyTask"):
Gaffer.Metadata.registerPlugValue(processor_plug, "noduleLayout:section", "right")
Gaffer.Metadata.registerPlugValue(processor_plug, "plugValueWidget:type", "")
self.addChild(processor_plug)


# TODO: potentially we should also set the type not only the name
def _on_plug_input_changed(self, plug):
node = plug.getInput().source().node()
if node and hasattr(node, "expansions"):
name = plug.getInput().getName()
self.setName(name)

return

class Root(GafferTaskNodeBase):
def __init__(self, name="Root"):
Expand Down

0 comments on commit c78acf3

Please sign in to comment.