Skip to content

Commit

Permalink
Ensures graph reprodata is updated when built.
Browse files Browse the repository at this point in the history
  • Loading branch information
pritchardn committed Jun 28, 2022
1 parent 144c9e9 commit 538223a
Showing 1 changed file with 12 additions and 10 deletions.
22 changes: 12 additions & 10 deletions daliuge-engine/dlg/manager/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,9 @@ def handleEvent(self, evt):
)
if self._completed == self._nexpected:
logger.debug("Building Reproducibility BlockDAG")
init_runtime_repro_data(self._session._graph, self._session._graphreprodata)
self._session.graphreprodata = \
init_runtime_repro_data(self._session._graph, self._session._graphreprodata)[
"reprodata"]
self._session.reprostatus = True
self._session.write_reprodata()

Expand Down Expand Up @@ -322,7 +324,7 @@ def deploy(self, completedDrops=[], event_listeners=[], foreach=None):
# in reality this particular session is managing nothing
status = self.status
if (self._graph and status != SessionStates.BUILDING) or (
not self._graph and status != SessionStates.PRISTINE
not self._graph and status != SessionStates.PRISTINE
):
raise InvalidSessionState(
"Can't deploy this session in its current status: %d" % (status)
Expand Down Expand Up @@ -492,7 +494,7 @@ def add_node_subscriptions(self, relationships):

# We are in the event receiver side
if (rel.rel in evt_consumer and rel.lhs is local_uid) or (
rel.rel in evt_producer and rel.rhs is local_uid
rel.rel in evt_producer and rel.rhs is local_uid
):
dropsubs[remote_uid].add(local_uid)

Expand All @@ -515,7 +517,7 @@ def append_reprodata(self, oid, reprodata):
if self._graph[oid].get("reprodata") is None:
return
if self._graph[oid]["reprodata"]["rmode"] == str(
ReproducibilityFlags.ALL.value
ReproducibilityFlags.ALL.value
):
drop_reprodata = reprodata.get("data", {})
drop_hashes = reprodata.get("merkleroot", {})
Expand Down Expand Up @@ -557,9 +559,9 @@ def end(self):

def getGraphStatus(self):
if self.status not in (
SessionStates.RUNNING,
SessionStates.FINISHED,
SessionStates.CANCELLED,
SessionStates.RUNNING,
SessionStates.FINISHED,
SessionStates.CANCELLED,
):
raise InvalidSessionState(
"The session is currently not running, cannot get graph status"
Expand Down Expand Up @@ -594,9 +596,9 @@ def cancel(self):
dsDrop for dsDrop in downStreamDrops if isinstance(dsDrop, AbstractDROP)
]
if drop.status not in (
DROPStates.ERROR,
DROPStates.COMPLETED,
DROPStates.CANCELLED,
DROPStates.ERROR,
DROPStates.COMPLETED,
DROPStates.CANCELLED,
):
drop.cancel()
self.status = SessionStates.CANCELLED
Expand Down

0 comments on commit 538223a

Please sign in to comment.