Final (hopefully) formatting run with black
pritchardn committed May 6, 2022
1 parent 905de37 commit 0c31220
Showing 37 changed files with 622 additions and 398 deletions.
3 changes: 2 additions & 1 deletion daliuge-common/dlg/__init__.py
@@ -25,7 +25,8 @@
# set the version
try:
from dlg.common import version

__version__ = version.full_version
except:
# This can happen when running from source
__version__ = 'unknown'
__version__ = "unknown"
4 changes: 1 addition & 3 deletions daliuge-common/dlg/clients.py
@@ -169,9 +169,7 @@ def session_repro_status(self, sessionId):
"""
Returns the reproducibility status of session `sessionId`.
"""
status = self._get_json(
"/sessions/%s/repro/status" % (quote(sessionId),)
)
status = self._get_json("/sessions/%s/repro/status" % (quote(sessionId),))
logger.debug(
"Successfully read session %s reproducibility status (%s) from %s:%s",
sessionId,
13 changes: 7 additions & 6 deletions daliuge-common/dlg/common/__init__.py
@@ -74,7 +74,7 @@ class Categories:
Categories.PLASMA,
Categories.PLASMAFLIGHT,
Categories.PARSET,
Categories.ENVIRONMENTVARS
Categories.ENVIRONMENTVARS,
}
APP_DROP_TYPES = [
Categories.COMPONENT,
@@ -129,7 +129,7 @@ def _addSomething(self, other, key, IdText=None):
if key not in self:
self[key] = []
if other["oid"] not in self[key]:
append = {other["oid"]:IdText} if IdText else other["oid"]
append = {other["oid"]: IdText} if IdText else other["oid"]
self[key].append(append)

def addConsumer(self, other, IdText=None):
@@ -150,21 +150,22 @@ def addOutput(self, other, IdText=None):
def addProducer(self, other, IdText=None):
self._addSomething(other, "producers", IdText=IdText)


def _sanitize_links(links):
"""
Links can now be dictionaries, but we only need
the key.
"""
if isinstance(links,list):
if isinstance(links, list):
nlinks = []
for l in links:
if isinstance(l,dict): # could be a list of dicts
if isinstance(l, dict): # could be a list of dicts
nlinks.extend(list(l.keys()))
else:
nlinks.extend(l) if isinstance(l,list) else nlinks.append(l)
nlinks.extend(l) if isinstance(l, list) else nlinks.append(l)
return nlinks
elif isinstance(links, dict):
return list(links.keys()) if isinstance(links,dict) else links
return list(links.keys()) if isinstance(links, dict) else links


def get_roots(pg_spec):
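As a sanity check on the _sanitize_links changes above, here is a self-contained, lightly condensed copy of the reformatted function with hypothetical link values (the oids and event names are made up):

def _sanitize_links(links):
    # Links can now be dictionaries, but we only need the key.
    if isinstance(links, list):
        nlinks = []
        for l in links:
            if isinstance(l, dict):  # could be a list of dicts
                nlinks.extend(list(l.keys()))
            else:
                nlinks.extend(l) if isinstance(l, list) else nlinks.append(l)
        return nlinks
    elif isinstance(links, dict):
        return list(links.keys())

print(_sanitize_links([{"oid_a": "event"}, "oid_b", ["oid_c", "oid_d"]]))
# ['oid_a', 'oid_b', 'oid_c', 'oid_d']
print(_sanitize_links({"oid_a": "event", "oid_b": "event"}))
# ['oid_a', 'oid_b']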
2 changes: 1 addition & 1 deletion daliuge-common/dlg/common/reproducibility/constants.py
@@ -64,7 +64,7 @@ def rflag_caster(val, default=REPRO_DEFAULT):
out = ReproducibilityFlags(int(val))
except ValueError:
for rmode in ALL_RMODES:
if val == rmode.name or val == 'Reproducibility.'+rmode.name:
if val == rmode.name or val == "Reproducibility." + rmode.name:
out = rmode
return out
return default
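A short usage sketch of the branch changed above; the import path comes from this file's location, and RERUN is one of the flags referenced later in this commit (values shown are assumptions):

from dlg.common.reproducibility.constants import rflag_caster

print(rflag_caster(1))                        # cast from an int enum value (assuming 1 is valid)
print(rflag_caster("RERUN"))                  # bare flag name
print(rflag_caster("Reproducibility.RERUN"))  # prefixed form handled by the changed line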
54 changes: 30 additions & 24 deletions daliuge-common/dlg/common/reproducibility/reprodata_compare.py
@@ -33,7 +33,11 @@
import logging
import itertools

from dlg.common.reproducibility.constants import ALL_RMODES, rflag_caster, ReproducibilityFlags
from dlg.common.reproducibility.constants import (
ALL_RMODES,
rflag_caster,
ReproducibilityFlags,
)

logger = logging.getLogger(__name__)

@@ -47,7 +51,7 @@ def open_file(path: pathlib.Path):
"""
Opens the passed filepath, returns a dictionary of the contained rmode signatures
"""
with path.open('r', encoding='utf-8') as infile:
with path.open("r", encoding="utf-8") as infile:
data = json.load(infile)
if isinstance(data, list):
return data[-1]
@@ -58,7 +62,7 @@ def is_single(data):
"""
Determines if the passed reprodata contains several signatures, or a single signature.
"""
if data.get('rmode') == str(ReproducibilityFlags.ALL.value):
if data.get("rmode") == str(ReproducibilityFlags.ALL.value):
return False
return True

@@ -68,7 +72,7 @@ def process_single(data):
Processes reprodata containing a single signature.
Builds a small dictionary mapping the 'rmode' to the signature
"""
return {rflag_caster(data.get('rmode')).value: data.get('signature')}
return {rflag_caster(data.get("rmode")).value: data.get("signature")}


def process_multi(data):
@@ -79,7 +83,7 @@ def process_multi(data):
"""
out_data = {rmode.value: None for rmode in ALL_RMODES}
for rmode in ALL_RMODES:
out_data[rmode.value] = data.get(rmode.name, {}).get('signature')
out_data[rmode.value] = data.get(rmode.name, {}).get("signature")
return out_data


@@ -102,7 +106,7 @@ def process_directory(dirname: pathlib.Path):
Processes a directory assuming to contain reprodata.out file(s) referring to the same workflow.
"""
out_data = {}
for file in dirname.glob('*.out'):
for file in dirname.glob("*.out"):
new_data = process_file(file)
for rmode, sig in new_data.items():
if sig is not None:
@@ -119,8 +123,9 @@ def generate_comparison(data):
"""
outdata = {}
for combination in itertools.combinations(data.keys(), 2):
outdata[combination[0] + ':' + combination[1]] = compare_signatures(data[combination[0]],
data[combination[1]])
outdata[combination[0] + ":" + combination[1]] = compare_signatures(
data[combination[0]], data[combination[1]]
)
return outdata
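The reformatted call above pairs every workflow with every other via itertools.combinations. A minimal stand-in illustrating the key scheme (compare_signatures here is hypothetical, not the module's implementation):

import itertools

# Hypothetical per-workflow signature maps (rmode value -> signature hash).
data = {
    "wf_a": {"1": "abc"},
    "wf_b": {"1": "abc"},
    "wf_c": {"1": "zzz"},
}

def compare_signatures(sig1, sig2):
    # Stand-in: flag which rmode signatures agree between two workflows.
    return {rmode: sig1.get(rmode) == sig2.get(rmode) for rmode in sig1}

outdata = {
    first + ":" + second: compare_signatures(data[first], data[second])
    for first, second in itertools.combinations(data, 2)
}
print(sorted(outdata))  # ['wf_a:wf_b', 'wf_a:wf_c', 'wf_b:wf_c']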


@@ -140,10 +145,11 @@ def write_outfile(data, outfilepath, outfilesuffix="summary", verbose=False):
"""
Writes a dictionary to csv file.
"""
fieldnames = ['workflow'] + [rmode.name for rmode in ALL_RMODES]
with open(outfilepath + f'-{outfilesuffix}.csv', 'w+', newline='',
encoding='utf-8') as ofile:
writer = csv.writer(ofile, delimiter=',')
fieldnames = ["workflow"] + [rmode.name for rmode in ALL_RMODES]
with open(
outfilepath + f"-{outfilesuffix}.csv", "w+", newline="", encoding="utf-8"
) as ofile:
writer = csv.writer(ofile, delimiter=",")
writer.writerow(fieldnames)

for filepath, signature_data in data.items():
@@ -158,18 +164,18 @@ def write_comparison(data, outfilepath, verbose=False):
Writes comparison dictionary to csv file.
"""
if len(data) > 0:
write_outfile(data, outfilepath, 'comparison', verbose)
write_outfile(data, outfilepath, "comparison", verbose)


def write_outputs(data, comparisons, outfile_root='.', verbose=False):
def write_outputs(data, comparisons, outfile_root=".", verbose=False):
"""
Writes reprodata signatures for all workflows to a summary csv and comparison of these
signatures to a separate comparison csv.
"""
if verbose:
print(json.dumps(data, indent=4))
try:
write_outfile(data, outfile_root, outfilesuffix='summary', verbose=verbose)
write_outfile(data, outfile_root, outfilesuffix="summary", verbose=verbose)
except IOError:
logger.debug("Could not write summary csv")
try:
@@ -207,27 +213,27 @@ def _main(pathnames: list, outfilepath: str, verbose=False):
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument(
'filename',
"filename",
action="store",
default=None,
nargs='+',
nargs="+",
type=str,
help='The first filename or directory to access'
help="The first filename or directory to access",
)
parser.add_argument(
'-o',
'--outfile',
"-o",
"--outfile",
action="store",
default=".",
type=str,
help="Directory to write output files to"
help="Directory to write output files to",
)
parser.add_argument(
'-v',
'--verbose',
"-v",
"--verbose",
default=False,
action="store_true",
help="If set, will write output to standard out"
help="If set, will write output to standard out",
)
args = parser.parse_args()
_main(list(args.filename), args.outfile, args.verbose)
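To put the parser above in context, a hypothetical programmatic invocation; _main's signature is taken from the hunk header above, and the directory names are placeholders:

from dlg.common.reproducibility.reprodata_compare import _main

# Summarize signatures from two run directories and write the
# summary/comparison CSVs relative to the current directory.
_main(["run_a/", "run_b/"], ".", verbose=True)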
55 changes: 29 additions & 26 deletions daliuge-common/dlg/common/reproducibility/reproducibility.py
@@ -377,7 +377,7 @@ def build_lg_block_data(drop: dict, rmode=None):
lg_hash = drop["reprodata"][rmode.name]["lg_data"]["merkleroot"]
block_data.append(lg_hash)
for parenthash in sorted(
drop["reprodata"][rmode.name]["lg_parenthashes"].values()
drop["reprodata"][rmode.name]["lg_parenthashes"].values()
):
block_data.append(parenthash)
mtree = MerkleTree(block_data, common_hash)
@@ -411,7 +411,7 @@ def build_pgt_block_data(drop: dict, rmode=None):
if "lg_blockhash" in drop["reprodata"][rmode.name]:
block_data.append(drop["reprodata"][rmode.name]["lg_blockhash"])
for parenthash in sorted(
drop["reprodata"][rmode.name]["pgt_parenthashes"].values()
drop["reprodata"][rmode.name]["pgt_parenthashes"].values()
):
block_data.append(parenthash)
mtree = MerkleTree(block_data, common_hash)
@@ -441,7 +441,7 @@ def build_pg_block_data(drop: dict, rmode=None):
drop["reprodata"][rmode.name]["lg_blockhash"],
]
for parenthash in sorted(
drop["reprodata"][rmode.name]["pg_parenthashes"].values()
drop["reprodata"][rmode.name]["pg_parenthashes"].values()
):
block_data.append(parenthash)
mtree = MerkleTree(block_data, common_hash)
@@ -467,14 +467,17 @@ def build_rg_block_data(drop: dict, rmode=None):
drop["reprodata"]["rg_blockhash"] = mtree.merkle_root
else:
import json

block_data = [
drop["reprodata"][rmode.name].get("rg_data", {"merkleroot": b''})["merkleroot"],
drop["reprodata"][rmode.name].get("rg_data", {"merkleroot": b""})[
"merkleroot"
],
drop["reprodata"][rmode.name]["pg_blockhash"],
drop["reprodata"][rmode.name]["pgt_blockhash"],
drop["reprodata"][rmode.name]["lg_blockhash"],
]
for parenthash in sorted(
drop["reprodata"][rmode.name]["rg_parenthashes"].values()
drop["reprodata"][rmode.name]["rg_parenthashes"].values()
):
block_data.append(parenthash)
mtree = MerkleTree(block_data, common_hash)
@@ -530,9 +533,11 @@ def lg_build_blockdag(logical_graph: dict, level=None):
parenthash = {}
if rmode != ReproducibilityFlags.NOTHING:
if rmode == ReproducibilityFlags.REPRODUCE:
if dropset[did][0]["category"] in STORAGE_TYPES and (
dropset[did][1] == 0 or dropset[did][2] == 0
) and (did in roots or did in leaves):
if (
dropset[did][0]["category"] in STORAGE_TYPES
and (dropset[did][1] == 0 or dropset[did][2] == 0)
and (did in roots or did in leaves)
):
# Add my new hash to the parent-hash list
if did not in parenthash:
if level is None:
@@ -557,9 +562,7 @@
]
)
# parenthash.extend(dropset[did][0]['reprodata']['lg_parenthashes'])
if (
rmode != ReproducibilityFlags.REPRODUCE
): # Non-compressing behaviour
if rmode != ReproducibilityFlags.REPRODUCE: # Non-compressing behaviour
if level is None:
parenthash[did] = dropset[did][0]["reprodata"]["lg_blockhash"]
else:
@@ -640,7 +643,7 @@ def build_blockdag(drops: list, abstraction: str = "pgt", level=None):
dropset[did][2] += 1
neighbourset[did].append(dest)
if (
"consumers" in drop
"consumers" in drop
): # There may be some bizarre scenario when a drop has both
for dest in drop["consumers"]:
if isinstance(dest, dict):
@@ -670,20 +673,22 @@
if rmode == ReproducibilityFlags.REPRODUCE:
# WARNING: Hack! may break later, proceed with caution
if level is None:
category = dropset[did][0]["reprodata"]["lgt_data"][
"category"]
category = dropset[did][0]["reprodata"]["lgt_data"]["category"]
else:
category = dropset[did][0]["reprodata"][rmode.name]["lgt_data"][
"category"]
if category in STORAGE_TYPES and (
dropset[did][1] == 0 or dropset[did][2] == 0) and (
did in roots or did in leaves):
"category"
]
if (
category in STORAGE_TYPES
and (dropset[did][1] == 0 or dropset[did][2] == 0)
and (did in roots or did in leaves)
):
# Add my new hash to the parent-hash list
if did not in parenthash:
if level is None:
parenthash[did] = dropset[did][0]["reprodata"][
blockstr + "_blockhash"
]
]
else:
parenthash[did] = dropset[did][0]["reprodata"][
level.name
@@ -693,9 +698,7 @@
else:
# Add my parenthashes to the parent-hash list
if level is None:
parenthash.update(
dropset[did][0]["reprodata"][parentstr]
)
parenthash.update(dropset[did][0]["reprodata"][parentstr])
else:
parenthash.update(
dropset[did][0]["reprodata"][level.name][parentstr]
@@ -704,16 +707,16 @@
if level is None:
parenthash[did] = dropset[did][0]["reprodata"][
blockstr + "_blockhash"
]
]
else:
parenthash[did] = dropset[did][0]["reprodata"][level.name][
blockstr + "_blockhash"
]
]
# Add our new hash to the parent-hash list if on the critical path
if rmode == ReproducibilityFlags.RERUN:
if "iid" in dropset[did][0].keys():
if (
dropset[did][0]["iid"] == "0/0"
dropset[did][0]["iid"] == "0/0"
): # TODO: This is probably wrong
if level is None:
dropset[neighbour][0]["reprodata"][parentstr].update(
@@ -752,7 +755,7 @@ def build_blockdag(drops: list, abstraction: str = "pgt", level=None):
else:
leaves[i] = dropset[leaf][0]["reprodata"][level.name][
blockstr + "_blockhash"
]
]
return leaves, visited

# logger.info("BlockDAG Generated at" + abstraction + " level")
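The build_*_block_data functions touched above all share one pattern: fold a drop's own hashes together with its sorted parent hashes into a block hash. A stand-in sketch using hashlib (the module itself builds a MerkleTree with common_hash; this is only an illustration of the pattern):

import hashlib

def block_hash(own_hashes, parent_hashes):
    # Sorting parent hashes, as the code above does, makes the result
    # independent of the order in which parents were visited.
    block_data = list(own_hashes) + sorted(parent_hashes.values())
    digest = hashlib.sha256()
    for item in block_data:
        digest.update(item.encode("utf-8"))
    return digest.hexdigest()

print(block_hash(["lg_merkleroot"], {"parent_2": "h2", "parent_1": "h1"}))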
3 changes: 2 additions & 1 deletion daliuge-engine/dlg/__init__.py
@@ -25,7 +25,8 @@
# set the version
try:
from dlg.common import version

__version__ = version.full_version
except:
# This can happen when running from source
__version__ = 'unknown'
__version__ = "unknown"
