Skip to content

Commit

Permalink
functionality to ignore multiple dirty propagations from same source (#…
Browse files Browse the repository at this point in the history
…2694)

* functionality to ignore multiple dirtyprop from same source

Dirty propagation in ilastik goes depth-first. It is possible for dirtiness
to propagate to the same target via multiple upstream paths. This commit adds
the concept of a "modification time" (`mod_time` in the code) to encode sources
of dirtiness.
Operators will keep track of the last `mod_time` they've seen (during dirty
propagation) and can ignore it, if it is already known (see
`Operator.propagateDirtyIfNewModTime`).

In more complicated graphs like in autocontext, this solves heavy delays when
annotating.

  * all calls to `slot.setDirty` or `slot.setValue` without a mod_time will trigger
    a "new" mod_time.
  * `op._pending_dirty_mod_time` is only "active" during dirty prop
  * `op._previous_dirty_mod_time` holds the last completed dirtiness
    mod_time.
  * `op.propagateDirtyIfNewModTime`: note, this function will set all
     output slots dirty - for their entire extend (empty roi).
  • Loading branch information
k-dominik committed May 2, 2023
1 parent a7742df commit fd15ff5
Show file tree
Hide file tree
Showing 6 changed files with 241 additions and 26 deletions.
23 changes: 20 additions & 3 deletions lazyflow/operator.py
Expand Up @@ -255,6 +255,13 @@ def __init__(self, parent=None, graph=None):
self._debug_text = None
self._setup_count = 0

# keeps track of any setDirty calls on inputs slots
self._previous_dirty_mod_time = -1
# temporary variable during dirty notification to buffer the previous
# value. Used in `propagateDirtyIfNewModTime` in order to ignore multiple dirty
# notifications cause by the same upstream source
self._pending_dirty_mod_time = -1

@property
def children(self):
return list(self._children.keys())
Expand Down Expand Up @@ -624,10 +631,20 @@ def debug_text(self):
# return self._debug_text
return "setups: {}".format(self._setup_count)

def propagateDirtyIfNewModTime(self):
"""set all outputs dirty and ignore if _pending_dirty_mod_time is new
This ignores subsequent dirty prop from same source (=same _pending_dirty_mod_time)
Should only be called within `Operator.propagateDirty`.
See https://github.com/ilastik/ilastik/pull/2694
"""
assert self._pending_dirty_mod_time != -1
if self._pending_dirty_mod_time <= self._previous_dirty_mod_time:
return

# @debug_text.setter
# def debug_text(self, text):
# self._debug_text = text
for slot in self.outputs.values():
slot.setDirty((), _mod_time=self._pending_dirty_mod_time)


def format_operator_stack(tb):
Expand Down
2 changes: 1 addition & 1 deletion lazyflow/operators/classifierOperators.py
Expand Up @@ -240,7 +240,7 @@ def execute(self, slot, subindex, roi, result):
)

def propagateDirty(self, slot, subindex, roi):
self.Classifier.setDirty()
self.propagateDirtyIfNewModTime()


class OpTrainVectorwiseClassifierBlocked(Operator):
Expand Down
23 changes: 5 additions & 18 deletions lazyflow/operators/generic.py
Expand Up @@ -307,24 +307,11 @@ def propagateDirty(self, inputSlot, subindex, roi):
if not self.Output.ready():
# If we aren't even fully configured, there's no need to notify downstream slots about dirtiness
return
if inputSlot == self.AxisFlag or inputSlot == self.AxisIndex:
self.Output.setDirty(slice(None))

elif inputSlot == self.Images:
imageIndex = subindex[0]
axisflag = self.AxisFlag.value
axisIndex = self.Output.meta.axistags.index(axisflag)

if len(roi.start) == len(self.Output.meta.shape):
# axis is already in the input
roi.start[axisIndex] += self.intervals[imageIndex][0]
roi.stop[axisIndex] += self.intervals[imageIndex][0]
self.Output.setDirty(roi)
else:
# insert axis into roi
newroi = copy.copy(roi)
newroi = newroi.insertDim(axisIndex, self.intervals[imageIndex][0], self.intervals[imageIndex][0] + 1)
self.Output.setDirty(newroi)
if inputSlot in (self.AxisFlag, self.AxisIndex, self.Images):
# Any upstream change will cause the whole output to be set dirty
# Often enough this would happen eventually (e.g. stacking the output
# of different Filter operators, all connected to the same input).
self.propagateDirtyIfNewModTime()

else:
assert False, "Unknown input slot."
Expand Down
2 changes: 1 addition & 1 deletion lazyflow/operators/opCompressedUserLabelArray.py
Expand Up @@ -450,7 +450,7 @@ def _setInSlotInput(self, slot, subindex, roi, new_pixels):
max_label = max(max_label, cleaned_block_data.max())

# We could wait to send out one big dirty notification (instead of one per block),
# But that might result in a lot of unecessarily dirty pixels in cases when the
# But that might result in a lot of unnecessarily dirty pixels in cases when the
# new_pixels were mostly empty (such as when importing labels from disk).
# That's bad for downstream operators like OpFeatureMatrixCache
# So instead, we only send notifications for the blocks that were touched.
Expand Down
32 changes: 29 additions & 3 deletions lazyflow/slot.py
Expand Up @@ -37,6 +37,7 @@
from functools import partial, wraps
from contextlib import contextmanager
import warnings
import time

# SciPy
import numpy
Expand Down Expand Up @@ -884,32 +885,57 @@ def __call__(self, destination=None):
return destination

@is_setup_fn
def setDirty(self, *args, **kwargs):
def setDirty(self, *args, _mod_time: int = None, **kwargs):
"""This method is called by a partnering OutputSlot when its
content changes.
The 'key' parameter identifies the changed region
of an numpy.ndarray
Args:
* if args[0] is not a Roi instance, it is expected that a roi can be
constructed via self.rtype(self, *args, **kwargs)
* _mod_time: modification time, used to track changes from a single
source, that might propagate through the graph. Allows ignoring
recurrent notifications.
"""
assert (
self.operator is not None
), "Slot '{}' cannot be set dirty, slot not belonging to any actual operator instance".format(self.name)

if _mod_time is None:
if self._type == "output":
# if setDirty called outside of dirty propagation
# generate a new dirty time.
if self.operator._pending_dirty_mod_time == -1:
_mod_time = time.perf_counter_ns()
else:
_mod_time = self.operator._pending_dirty_mod_time
elif self._type == "input":
_mod_time = time.perf_counter_ns()

if self.stype.isConfigured():
if len(args) == 0 or not isinstance(args[0], rtype.Roi):
roi = self.rtype(self, *args, **kwargs)
else:
roi = args[0]

for c in self.downstream_slots:
c.setDirty(roi)
c.setDirty(roi, _mod_time=_mod_time)

# call callbacks
self._sig_dirty(self, roi)

if self._type == "input" and self.operator.configured():
self.operator.propagateDirty(self.top_level_slot, self.subindex, roi)
self.operator._pending_dirty_mod_time = _mod_time
try:
self.operator.propagateDirty(self.top_level_slot, self.subindex, roi)
finally:
self.operator._previous_dirty_mod_time = max(
self.operator._previous_dirty_mod_time, self.operator._pending_dirty_mod_time
)
self.operator._pending_dirty_mod_time = -1

def __iter__(self):
assert self.level >= 1
Expand Down
185 changes: 185 additions & 0 deletions tests/test_lazyflow/test_graph/test_dirty_modtime.py
@@ -0,0 +1,185 @@
from lazyflow.graph import Operator, InputSlot, OutputSlot

from unittest import mock
import pytest


class MockOp(Operator):
Input = InputSlot(value=(10,))
Output = OutputSlot()

def setupOutputs(self):
self.Output.meta.assignFrom(self.Input.meta)

def propagateDirty(self, slot, subindex, roi):
self.Output.setDirty(())

def execute(self, slot, subindex, roi):
pass


def test_op_mod_time(graph):
"""setDirty _mod_time modifies parent op correctly"""
op = MockOp(graph=graph)

assert op._pending_dirty_mod_time == -1

for mod_time in (1, 2, 13, 42):
op.Input.setDirty((), _mod_time=mod_time)
assert op._previous_dirty_mod_time == mod_time
assert op._pending_dirty_mod_time == -1


def test_op_lower_mod_time_does_not_modify(graph):
"""setDirty _mod_time modifies parent op correctly"""
op = MockOp(graph=graph)

assert op._pending_dirty_mod_time == -1

op.Input.setDirty((), _mod_time=42)
assert op._previous_dirty_mod_time == 42
assert op._pending_dirty_mod_time == -1

op.Input.setDirty((), _mod_time=41)
assert op._previous_dirty_mod_time == 42
assert op._pending_dirty_mod_time == -1


def test_op_mod_time_chain(graph):
"""mod_time is propagated to all ops in the chain"""
op1 = MockOp(graph=graph)
op2 = MockOp(graph=graph)
op2.Input.connect(op1.Output)

assert op1._previous_dirty_mod_time == -1
assert op2._previous_dirty_mod_time == -1

for mod_time in (1, 2, 13, 42):
op1.Input.setDirty((), _mod_time=mod_time)
assert op1._previous_dirty_mod_time == mod_time
assert op2._previous_dirty_mod_time == mod_time
assert op1._pending_dirty_mod_time == -1
assert op2._pending_dirty_mod_time == -1


def test_op_mod_time_source(graph):
"""setDirty on Input should always modify OPs ._previous_dirty_mod_time"""
op = MockOp(graph=graph)

assert op._previous_dirty_mod_time == -1
assert op._pending_dirty_mod_time == -1

op.Input.setDirty(())

assert op._previous_dirty_mod_time > -1


def test_op_output_dirty(graph):
"""setDirty on output should not modify OPs ._previous_dirty_mod_time"""
op = MockOp(graph=graph)

assert op._previous_dirty_mod_time == -1

op.Output.setDirty(())

assert op._previous_dirty_mod_time == -1


class SourceOp(Operator):
Input = InputSlot(value=(10,))

Output1 = OutputSlot()
Output2 = OutputSlot()

def setupOutputs(self):
self.Output1.meta.assignFrom(self.Input.meta)
self.Output2.meta.assignFrom(self.Input.meta)

def propagateDirty(self, slot, subindex, roi):
self.Output1.setDirty()
self.Output2.setDirty()

def execute(self, slot, subindex, roi):
pass


class AllDirtyOp(Operator):
Input1 = InputSlot()
Input2 = InputSlot()

Output = OutputSlot()

def setupOutputs(self):
self.Output.meta.assignFrom(self.Input1.meta)

def propagateDirty(self, slot, subindex, roi):
self.propagateDirtyIfNewModTime()

def execute(self, slot, subindex, roi):
pass


def test_op_all_dirty(graph):
"""using propagateDirtyIfNewModTime prevents subsequent dirty-prop with same mod_time"""
op_source = SourceOp(graph=graph)

op_all_dirty = AllDirtyOp(graph=graph)

dirty_cb = mock.Mock()
op_all_dirty.Output.notifyDirty(dirty_cb)

op_all_dirty.Input1.connect(op_source.Output1)
op_all_dirty.Input2.connect(op_source.Output2)

op_source.Input.setDirty((), _mod_time=42)

dirty_cb.assert_called_once()
assert op_all_dirty._previous_dirty_mod_time == 42


class MockOpLastDirty(Operator):
Input = InputSlot(value=(10,))
Output = OutputSlot()

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._previous_dirty_mod_time = 13

def setupOutputs(self):
self.Output.meta.assignFrom(self.Input.meta)

def propagateDirty(self, slot, subindex, roi):
assert self._previous_dirty_mod_time == 13
self.Output.setDirty(())

def execute(self, slot, subindex, roi):
pass


def test_previous_dirty_mod_time_set(graph):
op = MockOpLastDirty(graph=graph)
assert op._pending_dirty_mod_time == -1
assert op._previous_dirty_mod_time == 13

op.Input.setDirty((), _mod_time=42)
assert op._pending_dirty_mod_time == -1
assert op._previous_dirty_mod_time == 42


class MockOpLastDirtyEx(MockOpLastDirty):
def propagateDirty(self, slot, subindex, roi):
assert self._previous_dirty_mod_time == 13
self.Output.setDirty(())
raise ValueError()


def test_previous_dirty_mod_time_ex(graph):
op = MockOpLastDirtyEx(graph=graph)
assert op._pending_dirty_mod_time == -1
assert op._previous_dirty_mod_time == 13

with pytest.raises(ValueError):
op.Input.setDirty((), _mod_time=42)

assert op._pending_dirty_mod_time == -1
assert op._previous_dirty_mod_time == 42

0 comments on commit fd15ff5

Please sign in to comment.