Skip to content

Commit

Permalink
Adds subtlety to how replicate_sci/total work - non-data internal nodes
Browse files Browse the repository at this point in the history
still append their reprodata at the physical level.
  • Loading branch information
pritchardn committed Jun 17, 2022
1 parent 3a112a4 commit 4828312
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 18 deletions.
28 changes: 20 additions & 8 deletions daliuge-common/dlg/common/reproducibility/reproducibility.py
Original file line number Diff line number Diff line change
Expand Up @@ -674,16 +674,16 @@ def build_blockdag(drops: list, abstraction: str = "pgt", level=None):
for neighbour in neighbourset[did]:
dropset[neighbour][1] -= 1
parenthash = {}
# WARNING: Hack! may break later, proceed with caution
if level is None:
category = dropset[did][0]["reprodata"]["lgt_data"]["category"]
else:
category = dropset[did][0]["reprodata"][rmode.name]["lgt_data"][
"category"
]
if rmode != ReproducibilityFlags.NOTHING:
if rmode in [ReproducibilityFlags.REPRODUCE, ReproducibilityFlags.REPLICATE_SCI,
ReproducibilityFlags.REPLICATE_TOTAL]:
# WARNING: Hack! may break later, proceed with caution
if level is None:
category = dropset[did][0]["reprodata"]["lgt_data"]["category"]
else:
category = dropset[did][0]["reprodata"][rmode.name]["lgt_data"][
"category"
]
if (
category in STORAGE_TYPES
and (dropset[did][1] == 0 or dropset[did][2] == 0)
Expand All @@ -709,7 +709,8 @@ def build_blockdag(drops: list, abstraction: str = "pgt", level=None):
parenthash.update(
dropset[did][0]["reprodata"][level.name][parentstr]
)
if rmode not in [ReproducibilityFlags.REPRODUCE]:
if rmode not in [ReproducibilityFlags.REPRODUCE, ReproducibilityFlags.REPLICATE_SCI,
ReproducibilityFlags.REPLICATE_TOTAL]:
if level is None:
parenthash[did] = dropset[did][0]["reprodata"][
blockstr + "_blockhash"
Expand All @@ -719,6 +720,17 @@ def build_blockdag(drops: list, abstraction: str = "pgt", level=None):
blockstr + "_blockhash"
]
# Add our new hash to the parent-hash list if on the critical path
if rmode in [ReproducibilityFlags.REPLICATE_SCI,
ReproducibilityFlags.REPLICATE_TOTAL]:
if category not in STORAGE_TYPES:
if level is None:
parenthash[did] = dropset[did][0]["reprodata"][
blockstr + "_blockhash"
]
else:
parenthash[did] = dropset[did][0]["reprodata"][level.name][
blockstr + "_blockhash"
]
if rmode == ReproducibilityFlags.RERUN:
if "iid" in dropset[did][0].keys():
if (
Expand Down
20 changes: 10 additions & 10 deletions daliuge-engine/test/reproducibility/test_pg_blockdag.py
Original file line number Diff line number Diff line change
Expand Up @@ -635,11 +635,9 @@ def test_pg_blockdag_twostart(self):
pgr = _init_pgraph_twostart(self.rmode)
leaves = build_blockdag(pgr, "pg")[0]
parenthashes = list(pgr[1]["reprodata"]["pg_parenthashes"].values())
self.assertTrue(
len(leaves) == 1
and len(parenthashes) == 2
and parenthashes[0] == parenthashes[1]
)
self.assertTrue(len(leaves) == 1)
self.assertTrue(len(parenthashes) == 2)
self.assertTrue(parenthashes[0] == parenthashes[1])

def test_pg_blockdag_twoend(self):
"""
Expand Down Expand Up @@ -691,10 +689,9 @@ def test_data_sandwich(self):
"""
pgr = _init_pgraph_data_sandwich(self.rmode)
build_blockdag(pgr, "pg")
sourcehash = pgr[1]["reprodata"]["pg_blockhash"]
sourcehash = pgr[0]["reprodata"]["pg_blockhash"]
parenthashes = list(pgr[2]["reprodata"]["pg_parenthashes"].values())
self.assertTrue(sourcehash in parenthashes)
print(parenthashes)
self.assertTrue(len(parenthashes) > 0)


Expand Down Expand Up @@ -952,7 +949,7 @@ def test_data_fan(self):
for rmode in ALL_RMODES:
build_blockdag(pgr, "pg", rmode)
if rmode in [ReproducibilityFlags.REPRODUCE, ReproducibilityFlags.REPLICATE_SCI,
ReproducibilityFlags.REPLICATE_TOTAL]:
ReproducibilityFlags.REPLICATE_TOTAL]:
sourcehash = pgr[0]["reprodata"][rmode.name]["pg_blockhash"]
else:
sourcehash = pgr[1]["reprodata"][rmode.name]["pg_blockhash"]
Expand Down Expand Up @@ -1017,9 +1014,12 @@ def test_computation_sandwich(self):
parenthashes = list(
pgr[2]["reprodata"][rmode.name]["pg_parenthashes"].values()
)
if rmode != ReproducibilityFlags.REPRODUCE:
if rmode not in [ReproducibilityFlags.REPRODUCE, ReproducibilityFlags.REPLICATE_SCI,
ReproducibilityFlags.REPLICATE_TOTAL]:
self.assertTrue(
sourcehash in parenthashes)
self.assertTrue(len(parenthashes) > 0)
else:
if rmode in [ReproducibilityFlags.REPLICATE_SCI, ReproducibilityFlags.REPLICATE_TOTAL]:
self.assertTrue(len(parenthashes) > 0)
elif rmode == ReproducibilityFlags.REPRODUCE:
self.assertTrue(len(parenthashes) == 0)
1 change: 1 addition & 0 deletions daliuge-engine/test/reproducibility/test_toposort.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
"reprodata": {
"rmode": "1",
"lg_blockhash": "123",
"lgt_data": {"category": "Component"},
"pgt_data": {"merkleroot": "456"},
"pgt_parenthashes": {},
"pgt_blockhash": "135",
Expand Down

0 comments on commit 4828312

Please sign in to comment.