Skip to content

Commit

Permalink
Merge branch 'LIU-368' of https://github.com/icrar/daliuge into LIU-368
Browse files Browse the repository at this point in the history
  • Loading branch information
myxie committed May 17, 2024
2 parents bf3a8fc + 6207121 commit b4f23de
Show file tree
Hide file tree
Showing 6 changed files with 129 additions and 50 deletions.
1 change: 1 addition & 0 deletions daliuge-engine/dlg/data/drops/data_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
# @param group_end False/Boolean/ComponentParameter/NoPort/ReadWrite//False/False/Is this node the end of a group?
# @param streaming False/Boolean/ComponentParameter/NoPort/ReadWrite//False/False/Specifies whether this data component streams input and output data
# @param persist False/Boolean/ComponentParameter/NoPort/ReadWrite//False/False/Specifies whether this data component contains data that should not be deleted after execution
# @param expireAfterUse True/Boolean/ComponentParameter/NoPort/ReadWrite//False/False/Specifies whether the data of this component should be deleted once all of its consumers have finished using it
# @param dummy /Object/ApplicationArgument/InputOutput/ReadWrite//False/False/Dummy port
# @par EAGLE_END
class DataDROP(AbstractDROP):
Expand Down
17 changes: 12 additions & 5 deletions daliuge-engine/dlg/data/drops/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
# @param dropclass dlg.data.drops.file.FileDROP/String/ComponentParameter/NoPort/ReadWrite//False/False/Drop class
# @param streaming False/Boolean/ComponentParameter/NoPort/ReadWrite//False/False/Specifies whether this data component streams input and output data
# @param persist True/Boolean/ComponentParameter/NoPort/ReadWrite//False/False/Specifies whether this data component contains data that should not be deleted after execution
# @param expireAfterUse True/Boolean/ComponentParameter/NoPort/ReadWrite//False/False/Specifies whether the data of this component should be deleted once all of its consumers have finished using it
# @param data_volume 5/Float/ConstraintParameter/NoPort/ReadWrite//False/False/Estimated size of the data contained in this node
# @param group_end False/Boolean/ComponentParameter/NoPort/ReadWrite//False/False/Is this node the end of a group?
# @param dummy /Object/ApplicationArgument/InputOutput/ReadWrite//False/False/Dummy port
Expand All @@ -70,12 +71,18 @@ class FileDROP(DataDROP, PathBasedDrop):
check_filepath_exists = dlg_bool_param("check_filepath_exists", False)
# is_dir = dlg_bool_param("is_dir", False)

# Make sure files are not deleted by default and certainly not if they are
# marked to be persisted no matter what expireAfterUse said
def __init__(self, *args, **kwargs):
if "persist" not in kwargs:
kwargs["persist"] = True
if kwargs["persist"] and "lifespan" not in kwargs:
"""
Initialise default drop behaviour when it is completed with the following rules:
- "expireAfterUse": Remove the data from the workspace once it has been used
by all consumers. This is independent of the "persist" flag. This is false
by default for FileDrops.
"""

# 'lifespan' and 'expireAfterUse' are mutually exclusive
if "lifespan" not in kwargs and "expireAfterUse" not in kwargs:
kwargs["expireAfterUse"] = False
self.is_dir = False
super().__init__(*args, **kwargs)
Expand Down
3 changes: 1 addition & 2 deletions daliuge-engine/dlg/data/drops/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ def parse_pydata(pd_dict: dict) -> bytes:
# @param pydata None/String/ApplicationArgument/NoPort/ReadWrite//False/False/Data to be loaded into memory
# @param dummy /Object/ApplicationArgument/InputOutput/ReadWrite//False/False/Dummy port
# @param persist False/Boolean/ComponentParameter/NoPort/ReadWrite//False/False/Specifies whether this data component contains data that should not be deleted after execution
# @param expireAfterUse True/Boolean/ComponentParameter/NoPort/ReadWrite//False/False/Specifies whether the data of this component should be deleted once all of its consumers have finished using it
# @param data_volume 5/Float/ConstraintParameter/NoPort/ReadWrite//False/False/Estimated size of the data contained in this node
# @param group_end False/Boolean/ComponentParameter/NoPort/ReadWrite//False/False/Is this node the end of a group?
# @param streaming False/Boolean/ComponentParameter/NoPort/ReadWrite//False/False/Specifies whether this data component streams input and output data
Expand All @@ -103,8 +104,6 @@ def __init__(self, *args, **kwargs):
kwargs["persist"] = False
if "expireAfterUse" not in kwargs:
kwargs["expireAfterUse"] = True
if kwargs["persist"]:
kwargs["expireAfterUse"] = False
super().__init__(*args, **kwargs)

def initialize(self, **kwargs):
Expand Down
12 changes: 7 additions & 5 deletions daliuge-engine/dlg/drop.py
Original file line number Diff line number Diff line change
Expand Up @@ -352,9 +352,14 @@ def __init__(self, oid, uid, **kwargs):
"but they are mutually exclusive" % (self,),
)

self._expireAfterUse = self._popArg(kwargs, "expireAfterUse", False)
# If expireAfterUse is set by the user to be False, we do not want to initiate
# a timeout using lifespan, so we set the default for expireAfterUse to None
self._expireAfterUse = self._popArg(kwargs, "expireAfterUse", None)

# We only initiate the lifespan if the expireAfterUse flag has not been specified
# as an argument on the Drop.
self._expirationDate = -1
if not self._expireAfterUse:
if self._expireAfterUse is None:
lifespan = float(self._popArg(kwargs, "lifespan", -1))
if lifespan != -1:
self._expirationDate = time.time() + lifespan
Expand All @@ -367,9 +372,6 @@ def __init__(self, oid, uid, **kwargs):

# No DROP should be persisted unless stated otherwise; used for replication
self._persist: bool = self._popArg(kwargs, "persist", False)
# If DROP should be persisted, don't expire (delete) it.
if self._persist:
self._expireAfterUse = False

# Useful to have access to all EAGLE parameters without a prior knowledge
self._parameters = dict(kwargs)
Expand Down
13 changes: 11 additions & 2 deletions daliuge-engine/dlg/lifecycle/dlm.py
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,16 @@ def deleteExpiredDrops(self):
if drop.status == DROPStates.EXPIRED:
self._deleteDrop(drop)

def expireCompletedDrops(self):
def expireCompletedDrops(self) -> None:
"""
Completed drops that have the 'expireAfterUse' argument set are marked as
expired once all of their consumers have finished (or failed) executing.
Note: This operation occurs independently of the 'persist' argument.
Returns:
None
"""
now = time.time()
for drop in self._drops.values():

Expand All @@ -290,7 +299,7 @@ def expireCompletedDrops(self):

# Expire-after-use: mark as expired if all consumers
# are finished using this DROP
if not drop.persist and drop.expireAfterUse:
if drop.expireAfterUse:
allDone = all(
c.execStatus
in [AppDROPStates.FINISHED, AppDROPStates.ERROR]
Expand Down
133 changes: 97 additions & 36 deletions daliuge-engine/test/lifecycle/test_dlm.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
from dlg.apps.app_base import BarrierAppDROP
from dlg.data.drops.directorycontainer import DirectoryContainer
from dlg.data.drops.file import FileDROP
from dlg.data.drops.memory import InMemoryDROP
from dlg.droputils import DROPWaiterCtx
from dlg.lifecycle import dlm

Expand All @@ -61,7 +62,15 @@ def test_dropAddition(self):

def test_dropCompleteTriggersReplication(self):
with dlm.DataLifecycleManager(enable_drop_replication=True) as manager:
drop = FileDROP("oid:A", "uid:A1", expectedSize=1)

# By default a file is non-persistent
drop = FileDROP("oid:B", "uid:B1", expectedSize=1)
manager.addDrop(drop)
self._writeAndClose(drop)
self.assertEqual(DROPPhases.GAS, drop.phase)
self.assertEqual(1, len(manager.getDropUids(drop)))

drop = FileDROP("oid:A", "uid:A1", expectedSize=1, persist=True)
manager.addDrop(drop)
self._writeAndClose(drop)

Expand All @@ -70,12 +79,6 @@ def test_dropCompleteTriggersReplication(self):
self.assertEqual(DROPPhases.SOLID, drop.phase)
self.assertEqual(2, len(manager.getDropUids(drop)))

# Try the same with a non-persisted data object, it shouldn't be replicated
drop = FileDROP("oid:B", "uid:B1", expectedSize=1, persist=False)
manager.addDrop(drop)
self._writeAndClose(drop)
self.assertEqual(DROPPhases.GAS, drop.phase)
self.assertEqual(1, len(manager.getDropUids(drop)))

def test_expiringNormalDrop(self):
with dlm.DataLifecycleManager(check_period=0.5) as manager:
Expand Down Expand Up @@ -130,55 +133,113 @@ def test_cleanupExpiredDrops(self):
self.assertEqual(DROPStates.DELETED, drop.status)
self.assertFalse(drop.exists())

def test_expireAfterUse(self):
def test_expireAfterUseForFile(self):
    """
    Test default and non-default behaviour of the expireAfterUse flag
    for file drops.

    Default: expireAfterUse=False, so the file should still exist after
    it has been consumed.
    Non-default: expireAfterUse=True, so the drop is expired and its file
    deleted after it has been consumed.
    """
    # Local import: only needed to close the descriptors returned by mkstemp
    import os

    class MyApp(BarrierAppDROP):
        def run(self):
            pass

    with dlm.DataLifecycleManager(check_period=0.5, cleanup_period=2) as manager:

        # mkstemp() hands back an already-open OS-level descriptor; the test
        # only needs the path, so close it immediately to avoid leaking it.
        default_fd, default_name = tempfile.mkstemp()
        os.close(default_fd)
        # Default behaviour: no expireAfterUse given
        default = FileDROP("a", "a", filepath=default_name)

        expired_fd, expired_name = tempfile.mkstemp()
        os.close(expired_fd)
        # Non-default behaviour: remove the file after use
        expired = FileDROP("b", "b", filepath=expired_name, expireAfterUse=True)

        c = MyApp("c", "c")
        d = MyApp("d", "d")
        for data_drop in (default, expired):
            data_drop.addConsumer(c)
            data_drop.addConsumer(d)

        # Register every drop exactly once (both data drops and both consumers)
        for drop in (default, expired, c, d):
            manager.addDrop(drop)

        # Make sure all consumers are done
        with DROPWaiterCtx(self, [c, d], 1):
            default.setCompleted()
            expired.setCompleted()

        # Both files should be there right after use, but once the cleanup
        # period has elapsed only the default (non-expiring) one should remain
        self.assertTrue(default.exists())
        self.assertTrue(expired.exists())
        time.sleep(2.5)
        self.assertTrue(default.exists())
        self.assertFalse(expired.exists())
        default.delete()

def test_expireAfterUseForMemory(self):
"""
Simple test for the expireAfterUse flag. Two DROPs are created with
different values, and after they are used we check whether their data
is still there or not
Default: expireAfterUse=True, so the drop should not exist after it
has been consumed.
Non-default: expireAfterUse=False, so the drop will not be expired
after it is consumed.
"""

class MyApp(BarrierAppDROP):
def run(self):
pass

with dlm.DataLifecycleManager(check_period=0.5, cleanup_period=2) as manager:
a = DirectoryContainer(

# Check default behaviour - deleted for memory drops
default = InMemoryDROP(
"a",
"a",
persist=False,
expireAfterUse=True,
dirname=tempfile.mkdtemp(),
)
b_dirname = tempfile.mkdtemp()
b = DirectoryContainer(

# Non-default behaviour - memory is not deleted
non_expired = InMemoryDROP(
"b",
"b",
persist=False,
expireAfterUse=False,
dirname=b_dirname,
expireAfterUse=False
)
c = MyApp("c", "c")
d = MyApp("d", "d")
a.addConsumer(c)
a.addConsumer(d)
b.addConsumer(c)
b.addConsumer(d)

manager.addDrop(a)
manager.addDrop(b)
manager.addDrop(b)
default.addConsumer(c)
default.addConsumer(d)
non_expired.addConsumer(c)
non_expired.addConsumer(d)

manager.addDrop(default)
manager.addDrop(non_expired)
manager.addDrop(non_expired)
manager.addDrop(c)

# Make sure all consumers are done
with DROPWaiterCtx(self, [c, d], 1):
a.setCompleted()
b.setCompleted()
default.setCompleted()
non_expired.setCompleted()

# Both directories should be there, but after cleanup A's shouldn't
# Both drops should exist right after use, but after cleanup the default
# (expireAfterUse=True) one shouldn't be there anymore
self.assertTrue(a.exists())
self.assertTrue(b.exists())
self.assertTrue(default.exists())
self.assertTrue(non_expired.exists())
time.sleep(2.5)
self.assertFalse(a.exists())
self.assertTrue(b.exists())
b.delete()
self.assertFalse(default.exists())
self.assertTrue(non_expired.exists())
non_expired.delete()

0 comments on commit b4f23de

Please sign in to comment.