Skip to content

Commit

Permalink
Merge pull request #207 from ICRAR/liu-308-persist-flag
Browse files Browse the repository at this point in the history
LIU-308: Rename precious flag to persist
  • Loading branch information
juliancarrivick authored Oct 24, 2022
2 parents 42ab4fd + af90a56 commit e77aaa2
Show file tree
Hide file tree
Showing 25 changed files with 296 additions and 253 deletions.
6 changes: 3 additions & 3 deletions OpenAPI/tests/test.graph
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
"collapsed": true,
"flipPorts": false,
"streaming": false,
"precious": false,
"persist": false,
"subject": null,
"expanded": false,
"readonly": true,
Expand Down Expand Up @@ -236,7 +236,7 @@
"collapsed": true,
"flipPorts": false,
"streaming": false,
"precious": false,
"persist": false,
"subject": null,
"expanded": false,
"readonly": true,
Expand Down Expand Up @@ -338,7 +338,7 @@
"collapsed": true,
"flipPorts": false,
"streaming": false,
"precious": false,
"persist": false,
"subject": null,
"expanded": false,
"readonly": true,
Expand Down
8 changes: 4 additions & 4 deletions daliuge-engine/dlg/data/drops/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,11 @@ class FileDROP(DataDROP, PathBasedDrop):
check_filepath_exists = dlg_bool_param("check_filepath_exists", False)

# Make sure files are not deleted by default and certainly not if they are
# marked as precious no matter what expireAfterUse said
# marked to be persisted no matter what expireAfterUse said
def __init__(self, *args, **kwargs):
if "precious" not in kwargs:
kwargs["precious"] = True
if kwargs["precious"] and "lifespan" not in kwargs:
if "persist" not in kwargs:
kwargs["persist"] = True
if kwargs["persist"] and "lifespan" not in kwargs:
kwargs["expireAfterUse"] = False
super().__init__(*args, **kwargs)

Expand Down
6 changes: 3 additions & 3 deletions daliuge-engine/dlg/data/drops/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,11 @@ class InMemoryDROP(DataDROP):

# Allow in-memory drops to be automatically removed by default
def __init__(self, *args, **kwargs):
if "precious" not in kwargs:
kwargs["precious"] = False
if "persist" not in kwargs:
kwargs["persist"] = False
if "expireAfterUse" not in kwargs:
kwargs["expireAfterUse"] = True
if kwargs["precious"]:
if kwargs["persist"]:
kwargs["expireAfterUse"] = False
super().__init__(*args, **kwargs)

Expand Down
14 changes: 7 additions & 7 deletions daliuge-engine/dlg/drop.py
Original file line number Diff line number Diff line change
Expand Up @@ -348,10 +348,10 @@ def __init__(self, oid, uid, **kwargs):
if "expectedSize" in kwargs and kwargs["expectedSize"]:
self._expectedSize = int(kwargs.pop("expectedSize"))

# No DROP is precious unless stated otherwise; used for replication
self._precious = self._popArg(kwargs, "precious", False)
# If DROP is precious, don't expire (delete) it.
if self._precious:
# No DROP should be persisted unless stated otherwise; used for replication
self._persist: bool = self._popArg(kwargs, "persist", False)
# If DROP should be persisted, don't expire (delete) it.
if self._persist:
self._expireAfterUse = False

# Useful to have access to all EAGLE parameters without a prior knowledge
Expand Down Expand Up @@ -798,11 +798,11 @@ def size(self, size):
self._size = size

@property
def precious(self):
def persist(self):
"""
Whether this DROP should be considered as 'precious' or not
Whether this DROP should be considered persisted after completion
"""
return self._precious
return self._persist

@property
def status(self):
Expand Down
4 changes: 2 additions & 2 deletions daliuge-engine/dlg/droputils.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ def copyDropContents(source: DataDROP, target: DataDROP, bufsize=65536):
return


def getUpstreamObjects(drop):
def getUpstreamObjects(drop: AbstractDROP):
"""
Returns a list of all direct "upstream" DROPs for the given+
DROP. An DROP A is "upstream" with respect to DROP B if
Expand All @@ -179,7 +179,7 @@ def getUpstreamObjects(drop):
In practice if A is an upstream DROP of B means that it must be moved
to the COMPLETED state before B can do so.
"""
upObjs = []
upObjs: list[AbstractDROP] = []
if isinstance(drop, AppDROP):
upObjs += drop.inputs
upObjs += drop.streamingInputs
Expand Down
18 changes: 15 additions & 3 deletions daliuge-engine/dlg/graph_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
from .common import Categories
from .ddap_protocol import DROPRel, DROPLinkType
from .drop import (
AbstractDROP,
ContainerDROP,
LINKTYPE_NTO1_PROPERTY,
LINKTYPE_1TON_APPEND_METHOD,
Expand Down Expand Up @@ -326,7 +327,7 @@ def createGraphFromDropSpecList(dropSpecList, session=None):

# We're done! Return the roots of the graph to the caller
logger.info("Calculating graph roots")
roots = []
roots: list[AbstractDROP] = []
for drop in drops.values():
if not droputils.getUpstreamObjects(drop):
roots.append(drop)
Expand All @@ -345,7 +346,7 @@ def _createData(dropSpec, dryRun=False, session=None):
module = importlib.import_module(".".join(parts[:-1]))
storageType = getattr(module, parts[-1])
else:
# Fall back to old behaviour or to FileDROP
# Fall back to old behaviour or to FileDROP
# if nothing else is specified
if "storage" in dropSpec:
storageType = STORAGE_TYPES[dropSpec["storage"]]
Expand Down Expand Up @@ -424,18 +425,29 @@ def _getIds(dropSpec):


def _getKwargs(dropSpec):
kwargs = dict(dropSpec)

REMOVE = [
"oid",
"uid",
"app",
"appclass",
]
kwargs = dict(dropSpec)
for kw in REMOVE:
if kw in kwargs:
del kwargs[kw]

RENAME = {
"precious": "persist",
}
for find, replace in RENAME.items():
if find in kwargs:
kwargs[replace] = kwargs[find]
del kwargs[find]

for name, spec in dropSpec.get("applicationArgs", dict()).items():
kwargs[name] = spec["value"]

return kwargs


Expand Down
17 changes: 9 additions & 8 deletions daliuge-engine/dlg/lifecycle/dlm.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
SDP_REQ-252, and the need for data tracing, provenance and access control,
SKA1-SYS_REQ-2821, SDP_REQ-255
3. Migration between storage layers, includes SDP_REQ-263
4. Replication/distribution including resilience (preciousness) support,
4. Replication/distribution including resilience (persistence) support,
incl. SKA1-SYS_REQ-2350, SDP_REQ-260 - 262
5. Retirement of expired (temporary) data, incl. SDP_REQ-256."
Expand Down Expand Up @@ -126,9 +126,10 @@

from . import registry
from .hsm import manager
from .hsm.store import AbstractStore
from .. import droputils
from ..ddap_protocol import DROPStates, DROPPhases, AppDROPStates
from ..drop import ContainerDROP
from ..drop import AbstractDROP, ContainerDROP

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -222,7 +223,7 @@ def __init__(self, check_period=0, cleanup_period=0, enable_drop_replication=Fal
# instead of _drops.itervalues() to get a full, thread-safe copy of the
# dictionary values. Maybe there's a better approach for thread-safety
# here
self._drops = {}
self._drops: dict[str, AbstractDROP] = {}

self._check_period = check_period
self._cleanup_period = cleanup_period
Expand Down Expand Up @@ -290,7 +291,7 @@ def expireCompletedDrops(self):

# Expire-after-use: mark as expired if all consumers
# are finished using this DROP
if not drop.precious and drop.expireAfterUse:
if not drop.persist and drop.expireAfterUse:
allDone = all(
c.execStatus in [AppDROPStates.FINISHED, AppDROPStates.ERROR]
for c in drop.consumers
Expand Down Expand Up @@ -482,8 +483,8 @@ def handleCompletedDrop(self, uid):
return

drop = self._drops[uid]
if drop.precious and self.isReplicable(drop):
logger.debug("Replicating %r because it's precious", drop)
if drop.persist and self.isReplicable(drop):
logger.debug("Replicating %r because it's marked to be persisted", drop)
try:
self.replicateDrop(drop)
except:
Expand Down Expand Up @@ -532,7 +533,7 @@ def replicateDrop(self, drop):
def getDropUids(self, drop):
return self._reg.getDropUids(drop)

def _replicate(self, drop, store):
def _replicate(self, drop: AbstractDROP, store: AbstractStore):

# Dummy, but safe, new UID
newUid = "uid:" + "".join(
Expand All @@ -546,7 +547,7 @@ def _replicate(self, drop, store):

# For the time being we manually copy the contents of the current DROP into it
newDrop = store.createDrop(
drop.oid, newUid, expectedSize=drop.size, precious=drop.precious
drop.oid, newUid, expectedSize=drop.size, persist=drop.persist
)
droputils.copyDropContents(drop, newDrop)

Expand Down
12 changes: 6 additions & 6 deletions daliuge-engine/test/lifecycle/test_dlm.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ def test_dropCompleteTriggersReplication(self):
self.assertEqual(DROPPhases.SOLID, drop.phase)
self.assertEqual(2, len(manager.getDropUids(drop)))

# Try the same with a non-precious data object, it shouldn't be replicated
drop = FileDROP("oid:B", "uid:B1", expectedSize=1, precious=False)
# Try the same with a non-persisted data object, it shouldn't be replicated
drop = FileDROP("oid:B", "uid:B1", expectedSize=1, persist=False)
manager.addDrop(drop)
self._writeAndClose(drop)
self.assertEqual(DROPPhases.GAS, drop.phase)
Expand All @@ -93,7 +93,7 @@ def test_expiringNormalDrop(self):
def test_lostDrop(self):
with dlm.DataLifecycleManager(check_period=0.5) as manager:
drop = FileDROP(
"oid:A", "uid:A1", expectedSize=1, lifespan=10, precious=False
"oid:A", "uid:A1", expectedSize=1, lifespan=10, persist=False
)
manager.addDrop(drop)
self._writeAndClose(drop)
Expand All @@ -110,7 +110,7 @@ def test_lostDrop(self):
def test_cleanupExpiredDrops(self):
with dlm.DataLifecycleManager(check_period=0.5, cleanup_period=2) as manager:
drop = FileDROP(
"oid:A", "uid:A1", expectedSize=1, lifespan=1, precious=False
"oid:A", "uid:A1", expectedSize=1, lifespan=1, persist=False
)
manager.addDrop(drop)
self._writeAndClose(drop)
Expand Down Expand Up @@ -140,13 +140,13 @@ def test_expireAfterUse(self):
a = DirectoryContainer(
"a",
"a",
precious=False,
persist=False,
expireAfterUse=True,
dirname=tempfile.mkdtemp(),
)
b_dirname = tempfile.mkdtemp()
b = DirectoryContainer(
"b", "b", precious=False, expireAfterUse=False, dirname=b_dirname
"b", "b", persist=False, expireAfterUse=False, dirname=b_dirname
)
c = BarrierAppDROP("c", "c")
d = BarrierAppDROP("d", "d")
Expand Down
23 changes: 23 additions & 0 deletions daliuge-engine/test/test_graph_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,3 +186,26 @@ def test_applicationArgs(self):
self.assertEqual(app.integer, True)
self.assertEqual(app.low, 34)
self.assertEqual(app.high, 3456)

def test_backwardsCompatibilityWithPreciousFlag(self):
"""
Ensure that precious is detected and exposed as the persist flag
"""
testCases = [
("precious", True),
("precious", False),
("persist", True),
("persist", False),
(None, False), # Default of False is specific to MemoryDrops
]
for key, value in testCases:
with self.subTest(key=key, value=value):
dropSpec = {"oid": "A", "type": "data", "storage": Categories.MEMORY}
if key is not None:
dropSpec[key] = value

graph = graph_loader.createGraphFromDropSpecList([dropSpec])
data = graph[0]
self.assertIsInstance(data, InMemoryDROP)
self.assertEqual(value, data.persist)
self.assertFalse(hasattr(data, 'precious'))
7 changes: 7 additions & 0 deletions daliuge-translator/dlg/dropmake/lg.graph.schema
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,13 @@
"expanded": {
"type": "boolean"
},
"precious": {
"type": "boolean",
"deprecated": true
},
"persist": {
"type": "boolean"
},
"inputApplicationName": {
"type": "string"
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
"showPorts": true,
"flipPorts": false,
"streaming": false,
"precious": false,
"persist": false,
"subject": null,
"selected": true,
"expanded": false,
Expand Down Expand Up @@ -124,7 +124,7 @@
"showPorts": false,
"flipPorts": false,
"streaming": false,
"precious": false,
"persist": false,
"subject": null,
"selected": false,
"expanded": false,
Expand Down
Loading

0 comments on commit e77aaa2

Please sign in to comment.