Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

LIU-257 Skipped and cancelled drops fliing reproducibility data #161

Merged
merged 10 commits into from
Jun 30, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,8 @@ def write_outfile(data, outfilepath, outfilesuffix="summary", verbose=False):
writer.writerow(fieldnames)

for filepath, signature_data in data.items():
if signature_data == {}:
continue
row = [filepath] + [signature_data[rmode.value] for rmode in ALL_RMODES]
writer.writerow(row)
if verbose:
Expand Down
9 changes: 4 additions & 5 deletions daliuge-common/dlg/common/reproducibility/reproducibility.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,14 +92,13 @@ def accumulate_lg_drop_data(drop: dict, level: ReproducibilityFlags):
raise NotImplementedError(
f"Reproducibility level {level.name} not yet supported"
)
category_type = drop.get(
"categoryType", ""
) # Made conditional to support older graphs
category = drop.get("category", "")

# Cheeky way to get field list into dicts. map(dict, drop...) makes a copy
fields = {e.pop("name"): e["value"] for e in map(dict, drop["fields"])}
lg_fields = lg_block_fields(category_type, category, level)
fields = {e.pop("name"): e["value"] for e in map(dict, drop.get("fields", {}))}
app_fields = {e.pop("name"): e["value"] for e in map(dict, drop.get("applicationArgs", {}))}
fields.update(app_fields)
lg_fields = lg_block_fields(category, level, app_fields.keys())
data = extract_fields(fields, lg_fields)
return data

Expand Down
37 changes: 18 additions & 19 deletions daliuge-common/dlg/common/reproducibility/reproducibility_fields.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,34 +85,29 @@ def lgt_block_fields(rmode: ReproducibilityFlags):
return data


def lg_block_fields(
category: Categories, category_type: str, rmode: ReproducibilityFlags
):
def lg_block_fields(category_type: str, rmode: ReproducibilityFlags, custom_fields=None):
"""
Collects dict of fields and operations for all drop types at the lg layer for
the supplied reproducibility standard.
:param category: The broad type of drop
:param category_type: The specific type of drop
:param rmode: The reproducibility level in question
:param custom_fields: Additional application args (used in custom components)
:return: Dictionary of <str, FieldOp> pairs
"""
data = {}
if rmode in (
ReproducibilityFlags.NOTHING,
ReproducibilityFlags.RERUN,
ReproducibilityFlags.REPRODUCE,
ReproducibilityFlags.REPLICATE_SCI,
ReproducibilityFlags.NOTHING,
ReproducibilityFlags.RERUN,
ReproducibilityFlags.REPRODUCE,
ReproducibilityFlags.REPLICATE_SCI,
):
return data
# Drop category considerations
if category == "Application":
data["execution_time"] = FieldOps.STORE
data["num_cpus"] = FieldOps.STORE
elif category == "Group":
data["inputApplicationName"] = FieldOps.STORE
data["inputApplicationType"] = FieldOps.STORE
elif category == Categories.DATA: # An anomaly, I know
data["data_volume"] = FieldOps.STORE
# Drop category considerations - Just try to get everything we can, will be filtered later
data["execution_time"] = FieldOps.STORE
data["num_cpus"] = FieldOps.STORE
data["inputApplicationName"] = FieldOps.STORE
data["inputApplicationType"] = FieldOps.STORE
data["data_volume"] = FieldOps.STORE

# Drop type considerations
if category_type == Categories.START:
Expand All @@ -126,8 +121,8 @@ def lg_block_fields(
elif category_type == Categories.FILE:
data["check_filepath_exists"] = FieldOps.STORE
if rmode in (
ReproducibilityFlags.RECOMPUTE,
ReproducibilityFlags.REPLICATE_COMP,
ReproducibilityFlags.RECOMPUTE,
ReproducibilityFlags.REPLICATE_COMP,
):
data["filepath"] = FieldOps.STORE
data["dirname"] = FieldOps.STORE
Expand Down Expand Up @@ -188,6 +183,10 @@ def lg_block_fields(
data["libpath"] = FieldOps.STORE
elif category_type == Categories.DYNLIB_PROC_APP:
data["libpath"] = FieldOps.STORE
if custom_fields is not None and rmode in (
ReproducibilityFlags.RECOMPUTE, ReproducibilityFlags.REPLICATE_COMP):
for name in custom_fields:
data[name] = FieldOps.STORE
return data


Expand Down
8 changes: 7 additions & 1 deletion daliuge-engine/dlg/drop.py
Original file line number Diff line number Diff line change
Expand Up @@ -1139,12 +1139,14 @@ def cancel(self):
if self.status in [DROPStates.INITIALIZED, DROPStates.WRITING]:
self._closeWriters()
self.status = DROPStates.CANCELLED
self.completedrop()

def skip(self):
"""Moves this drop to the SKIPPED state closing any writers we opened"""
if self.status in [DROPStates.INITIALIZED, DROPStates.WRITING]:
self._closeWriters()
self.status = DROPStates.SKIPPED
self.completedrop()

@property
def node(self):
Expand Down Expand Up @@ -1851,7 +1853,11 @@ def dataURL(self) -> str:
def generate_reproduce_data(self):
from .droputils import allDropContents

data = allDropContents(self, self.size)
data = b""
try:
data = allDropContents(self, self.size)
except Exception:
logger.debug("Could not read drop reproduce data")
return {"data_hash": common_hash(data)}


Expand Down
29 changes: 16 additions & 13 deletions daliuge-engine/dlg/manager/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,13 @@ def handleEvent(self, evt):
"%d/%d drops filed reproducibility", self._completed, self._nexpected
)
if self._completed == self._nexpected:
logger.debug("Building Reproducibility BlockDAG")
init_runtime_repro_data(self._session._graph, self._session._graphreprodata)
self._session.reprostatus = True
self._session.write_reprodata()
if not self._session.reprostatus:
logger.debug("Building Reproducibility BlockDAG")
new_reprodata = init_runtime_repro_data(self._session._graph, self._session._graphreprodata).get("reprodata", {})
logger.debug("Reprodata for %s is %s", self._session.sessionId, json.dumps(new_reprodata))
self._session._graphreprodata = new_reprodata
self._session.reprostatus = True
self._session.write_reprodata()


class EndListener(object):
Expand Down Expand Up @@ -322,7 +325,7 @@ def deploy(self, completedDrops=[], event_listeners=[], foreach=None):
# in reality this particular session is managing nothing
status = self.status
if (self._graph and status != SessionStates.BUILDING) or (
not self._graph and status != SessionStates.PRISTINE
not self._graph and status != SessionStates.PRISTINE
):
raise InvalidSessionState(
"Can't deploy this session in its current status: %d" % (status)
Expand Down Expand Up @@ -492,7 +495,7 @@ def add_node_subscriptions(self, relationships):

# We are in the event receiver side
if (rel.rel in evt_consumer and rel.lhs is local_uid) or (
rel.rel in evt_producer and rel.rhs is local_uid
rel.rel in evt_producer and rel.rhs is local_uid
):
dropsubs[remote_uid].add(local_uid)

Expand All @@ -515,7 +518,7 @@ def append_reprodata(self, oid, reprodata):
if self._graph[oid].get("reprodata") is None:
return
if self._graph[oid]["reprodata"]["rmode"] == str(
ReproducibilityFlags.ALL.value
ReproducibilityFlags.ALL.value
):
drop_reprodata = reprodata.get("data", {})
drop_hashes = reprodata.get("merkleroot", {})
Expand Down Expand Up @@ -557,9 +560,9 @@ def end(self):

def getGraphStatus(self):
if self.status not in (
SessionStates.RUNNING,
SessionStates.FINISHED,
SessionStates.CANCELLED,
SessionStates.RUNNING,
SessionStates.FINISHED,
SessionStates.CANCELLED,
):
raise InvalidSessionState(
"The session is currently not running, cannot get graph status"
Expand Down Expand Up @@ -594,9 +597,9 @@ def cancel(self):
dsDrop for dsDrop in downStreamDrops if isinstance(dsDrop, AbstractDROP)
]
if drop.status not in (
DROPStates.ERROR,
DROPStates.COMPLETED,
DROPStates.CANCELLED,
DROPStates.ERROR,
DROPStates.COMPLETED,
DROPStates.CANCELLED,
):
drop.cancel()
self.status = SessionStates.CANCELLED
Expand Down
2 changes: 1 addition & 1 deletion daliuge-engine/test/manager/test_rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ def test_reprodata_get(self):
self.assertEqual(
{"a": {"oid": "a", "storage": "Memory", "type": "plain"}}, response["graph"]
)
self.assertIsNone(response["reprodata"])
self.assertEqual({}, response["reprodata"])

def test_reprostatus_get(self):
# Test with reprodata
Expand Down