Skip to content

Commit

Permalink
Merge pull request #2848 from btbest/remove-transaction
Browse files Browse the repository at this point in the history
Remove transaction
  • Loading branch information
btbest committed Apr 22, 2024
2 parents 60b582f + 4580a98 commit 723a458
Show file tree
Hide file tree
Showing 5 changed files with 5 additions and 201 deletions.
85 changes: 4 additions & 81 deletions lazyflow/graph.py
Expand Up @@ -19,103 +19,32 @@
# This information is also available on the ilastik web site at:
# http://ilastik.org/license/
###############################################################################
"""
This module implements the basic flow graph
of the lazyflow module.
Basic usage example:
---
import numpy
import lazyflow.graph
from lazyflow.operators.operators import OpArrayPiper
g = lazyflow.graph.Graph()
operator1 = OpArrayPiper(graph=g)
operator2 = OpArrayPiper(graph=g)
operator1.inputs["Input"].setValue(numpy.zeros((10,20,30), dtype=numpy.uint8))
operator2.inputs["Input"].connect(operator1.outputs["Output"])
result = operator2.outputs["Output"][:].wait()
---
"""

# Python
import sys
import copy
import functools
import collections
import itertools
import threading
import logging
from lazyflow.utility import OrderedSignal

logger = logging.getLogger(__name__)

# third-party
import psutil

if int(psutil.__version__.split(".")[0]) < 1 and int(psutil.__version__.split(".")[1]) < 3:
msg = "Lazyflow: Please install a psutil python module version of at least >= 0.3.0"
sys.stderr.write(msg)
logger.error(msg)
sys.exit(1)

# SciPy
import numpy

# lazyflow
from lazyflow import rtype
from lazyflow.request import Request
from lazyflow.stype import ArrayLike
from lazyflow.utility import slicingtools, Tracer, OrderedSignal, Singleton
from lazyflow.utility import slicingtools, Tracer, Singleton
from lazyflow.slot import InputSlot, OutputSlot, Slot
from lazyflow.operator import Operator, InputDict, OutputDict, OperatorMetaClass
from lazyflow.operatorWrapper import OperatorWrapper
from lazyflow.metaDict import MetaDict

logger = logging.getLogger(__name__)


class Graph:
"""
A Graph instance is shared by all connected operators and contains any
bookkeeping or globally accessible state needed by all operators/slots in the graph.
"""

class Transaction:
def __init__(self):
self._deferred_callbacks = None

@property
def active(self):
return self._deferred_callbacks is not None

def on_exit(self, fn):
assert self.active, "Cannot register callbacks on inactive transaction"
if fn in self._deferred_callbacks:
return
else:
self._deferred_callbacks.append(fn)

def __enter__(self):
assert not self.active, "Nested transactions are not supported"
self._deferred_callbacks = []

def __exit__(self, *args, **kw):
try:
for cb in self._deferred_callbacks:
cb()
finally:
self._deferred_callbacks = None

def __init__(self):
self._setup_depth = 0
self._sig_setup_complete = None
self._lock = threading.Lock()
self.transaction = self.Transaction()

def call_when_setup_finished(self, fn):
# The graph is considered in "setup" mode if any slot is executing a function that affects the state of the graph.
Expand All @@ -136,12 +65,6 @@ def call_when_setup_finished(self, fn):
# Subscribe to the next completion.
self._sig_setup_complete.subscribe(fn)

def maybe_call_within_transaction(self, fn):
if self.transaction.active:
self.transaction.on_exit(fn)
else:
fn()

class SetupDepthContext(object):
"""
A context manager to manage the "depth" of a setup operation.
Expand Down
9 changes: 0 additions & 9 deletions lazyflow/operator.py
Expand Up @@ -188,15 +188,6 @@ class Operator(metaclass=OperatorMetaClass):
inputs: InputDict
outputs: OutputDict

@property
def transaction(self):
"""
Create transaction for this operation deferring setupOutputs call
until transaction is finished
:returns: Transaction context manager
"""
return self.graph.transaction

def __new__(cls, *args, **kwargs):
##
# before __init__
Expand Down
12 changes: 1 addition & 11 deletions lazyflow/slot.py
Expand Up @@ -1335,17 +1335,7 @@ def _getInstance(self, operator, **init_kwarg_overrides):
s = OutputSlot(self.name, operator, **init_kwargs)
return s

def maybe_call_within_transaction(self, fn):
if self.graph:
self.graph.maybe_call_within_transaction(fn)
else:
fn()

def _changed(self):
self.maybe_call_within_transaction(self._changed_impl)

def _changed_impl(self):
oldMeta = self.meta
old_ready = self.ready()
if self.upstream_slot is not None and self.meta != self.upstream_slot.meta:
self.meta = self.upstream_slot.meta.copy()
Expand Down Expand Up @@ -1388,7 +1378,7 @@ def _configureOperator(self, slot, oldSize=0, newSize=0, notify=True):
"""
if self.operator is not None:
# check whether all slots are connected and notify operator
self.maybe_call_within_transaction(self.operator._setupOutputs)
self.operator._setupOutputs()

def _setupOutputs(self):
""""""
Expand Down
79 changes: 0 additions & 79 deletions tests/test_lazyflow/test_operators/testOperatorInterface.py
Expand Up @@ -680,85 +680,6 @@ def propagateDirty(self, slot, roid, index):
pass


class TestTransaction:
def testTransactionMultipleSetsOnSameSlot(self):
g = graph.Graph()
op = TransactionOp(graph=g)

with op.transaction:
op.Input1.setValue("val1")
op.Input1.setValue("val2")

op.setupOutputs.assert_called_once()

def testTransactionSetMultipleSlots(self):
input1, input2 = None, None

def fetch_values(self, *args, **kwargs):
nonlocal input1, input2
input1 = self.Input1.value
input2 = self.Input2.value

g = graph.Graph()
op = TransactionOp(graph=g)

setup_mock = mock.Mock()
setup_mock.side_effect = fetch_values

op.setupOutputs = types.MethodType(setup_mock, op)

with op.transaction:
op.Input1.setValue("val1")
op.Input2.setValue("val2")

op.setupOutputs.assert_called_once()
assert input1 == "val1"
assert input2 == "val2"

def testNestedTransactionFails(self):
g = graph.Graph()
op = TransactionOp(graph=g)

with op.transaction:
op.Input1.setValue("val1")

with pytest.raises(AssertionError):
with op.transaction:
op.Input2.setValue("val2")

def test_chain(self):
class OpA(graph.Operator):
Input = graph.InputSlot() # required slot

def setupOutputs(self):
pass

def propagateDirty(self, *a, **kw):
pass

class OpB(graph.Operator):
Input = graph.InputSlot() # required slot
Output = graph.OutputSlot()

setupOutputs = mock.Mock()

def propagateDirty(self, *a, **kw):
pass

g = graph.Graph()

op_a = OpA(graph=g)
op_b = OpB(graph=g)

op_b.Input.connect(op_a.Input)

with op_a.transaction:
op_a.Input.setValue("fadf")
op_b.setupOutputs.assert_not_called()

op_b.setupOutputs.assert_called_once()


class TestCompatibilityChecks:
class OpA(graph.Operator):
Output = graph.OutputSlot(stype=stype.ArrayLike)
Expand Down
21 changes: 0 additions & 21 deletions tests/test_lazyflow/test_operators/testOperatorWrapper.py
Expand Up @@ -174,24 +174,3 @@ def test_setValues(self):

assert wrappedCopier.Output[0].value == values[0]
assert wrappedCopier.Output[1].value == values[1]


class TestOperatorWrapperTransaction:
def test_transaction(self, graph):
class OpTest(Operator):
Input = InputSlot()
Output = OutputSlot()

setupOutputs = mock.Mock()

def propagateDirty(self, inputSlot, subindex, roi):
self.Output.setDirty(roi)

wrapped = OperatorWrapper(OpTest, graph=graph)
values = ["Subslot One", "Subslot Two"]

with wrapped.transaction:
wrapped.Input.setValues(values)
OpTest.setupOutputs.assert_not_called()

assert OpTest.setupOutputs.call_count == 2, "Should call setupOutputs for each lane on exit"

0 comments on commit 723a458

Please sign in to comment.