diff --git a/daliuge-engine/dlg/manager/session.py b/daliuge-engine/dlg/manager/session.py index 2ea5e307a..58debcfea 100644 --- a/daliuge-engine/dlg/manager/session.py +++ b/daliuge-engine/dlg/manager/session.py @@ -102,7 +102,9 @@ def handleEvent(self, evt): ) if self._completed == self._nexpected: logger.debug("Building Reproducibility BlockDAG") - init_runtime_repro_data(self._session._graph, self._session._graphreprodata) + self._session.graphreprodata = \ + init_runtime_repro_data(self._session._graph, self._session._graphreprodata)[ + "reprodata"] self._session.reprostatus = True self._session.write_reprodata() @@ -322,7 +324,7 @@ def deploy(self, completedDrops=[], event_listeners=[], foreach=None): # in reality this particular session is managing nothing status = self.status if (self._graph and status != SessionStates.BUILDING) or ( - not self._graph and status != SessionStates.PRISTINE + not self._graph and status != SessionStates.PRISTINE ): raise InvalidSessionState( "Can't deploy this session in its current status: %d" % (status) @@ -492,7 +494,7 @@ def add_node_subscriptions(self, relationships): # We are in the event receiver side if (rel.rel in evt_consumer and rel.lhs is local_uid) or ( - rel.rel in evt_producer and rel.rhs is local_uid + rel.rel in evt_producer and rel.rhs is local_uid ): dropsubs[remote_uid].add(local_uid) @@ -515,7 +517,7 @@ def append_reprodata(self, oid, reprodata): if self._graph[oid].get("reprodata") is None: return if self._graph[oid]["reprodata"]["rmode"] == str( - ReproducibilityFlags.ALL.value + ReproducibilityFlags.ALL.value ): drop_reprodata = reprodata.get("data", {}) drop_hashes = reprodata.get("merkleroot", {}) @@ -557,9 +559,9 @@ def end(self): def getGraphStatus(self): if self.status not in ( - SessionStates.RUNNING, - SessionStates.FINISHED, - SessionStates.CANCELLED, + SessionStates.RUNNING, + SessionStates.FINISHED, + SessionStates.CANCELLED, ): raise InvalidSessionState( "The session is currently not running, cannot get graph status" @@ -594,9 +596,9 @@ def cancel(self): dsDrop for dsDrop in downStreamDrops if isinstance(dsDrop, AbstractDROP) ] if drop.status not in ( - DROPStates.ERROR, - DROPStates.COMPLETED, - DROPStates.CANCELLED, + DROPStates.ERROR, + DROPStates.COMPLETED, + DROPStates.CANCELLED, ): drop.cancel() self.status = SessionStates.CANCELLED