Skip to content

Commit

Permalink
Add folded mode to resubmit (#198)
Browse files Browse the repository at this point in the history
Signed-off-by: zjgemi <liuxin_zijian@163.com>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
  • Loading branch information
zjgemi and pre-commit-ci[bot] committed Mar 16, 2024
1 parent 2583850 commit fa4a4cf
Show file tree
Hide file tree
Showing 4 changed files with 197 additions and 12 deletions.
7 changes: 7 additions & 0 deletions dpgen2/entrypoint/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,12 @@ def main_parser() -> argparse.ArgumentParser:
action="store_true",
help="if set then keep schedule of the old workflow. otherwise use the schedule defined in the input file",
)
parser_resubmit.add_argument(
"-f",
"--fold",
action="store_true",
help="if set then super OPs are folded to be reused in the new workflow",
)

##########################################
# show key
Expand Down Expand Up @@ -339,6 +345,7 @@ def main():
list_steps=args.list,
reuse=args.reuse,
replace_scheduler=(not args.keep_schedule),
fold=args.fold,
)
elif args.command == "status":
with open(args.CONFIG) as fp:
Expand Down
3 changes: 2 additions & 1 deletion dpgen2/entrypoint/showkey.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ def showkey(
global_config_workflow(wf_config)

wf = Workflow(id=wf_id)
all_step_keys = get_resubmit_keys(wf)
folded_keys = get_resubmit_keys(wf)
all_step_keys = sum(folded_keys.values(), [])
prt_str = print_keys_in_nice_format(
all_step_keys,
["run-train", "run-lmp", "run-fp"],
Expand Down
90 changes: 79 additions & 11 deletions dpgen2/entrypoint/submit.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import logging
import os
import pickle
import re
from pathlib import (
Path,
)
Expand Down Expand Up @@ -732,32 +733,83 @@ def print_list_steps(


def successful_step_keys(wf):
all_step_keys_ = wf.query_keys_of_steps()
wf_info = wf.query()
all_step_keys = []
for ii in all_step_keys_:
if wf_info.get_step(key=ii)[0]["phase"] == "Succeeded":
all_step_keys.append(ii)
for step in wf.query_step():
if step.key is not None and step.phase == "Succeeded":
all_step_keys.append(step.key)
return all_step_keys


def get_superop(key):
if "prep-train" in key:
return key.replace("prep-train", "prep-run-train")
elif "run-train-" in key:
return re.sub("run-train-[0-9]*", "prep-run-train", key)
elif "prep-lmp" in key:
return key.replace("prep-lmp", "prep-run-explore")
elif "run-lmp-" in key:
return re.sub("run-lmp-[0-9]*", "prep-run-explore", key)
elif "prep-fp" in key:
return key.replace("prep-fp", "prep-run-fp")
elif "run-fp-" in key:
return re.sub("run-fp-[0-9]*", "prep-run-fp", key)
elif "prep-caly-input" in key:
return key.replace("prep-caly-input", "prep-run-explore")
elif "collect-run-calypso-" in key:
return re.sub("collect-run-calypso-[0-9]*-[0-9]*", "prep-run-explore", key)
elif "prep-run-dp-optim-" in key:
return re.sub("prep-run-dp-optim-[0-9]*-[0-9]*", "prep-run-explore", key)
elif "run-caly-model-devi" in key:
return key.replace("run-caly-model-devi", "prep-run-explore")
return None


def fold_keys(all_step_keys):
folded_keys = {}
for key in all_step_keys:
is_superop = False
for superop in ["prep-run-train", "prep-run-explore", "prep-run-fp"]:
if superop in key:
if key not in folded_keys:
folded_keys[key] = []
is_superop = True
break
if is_superop:
continue
superop = get_superop(key)
# if its super OP is succeeded, fold it into its super OP
if superop is not None and superop in all_step_keys:
if superop not in folded_keys:
folded_keys[superop] = []
folded_keys[superop].append(key)
else:
folded_keys[key] = [key]
for k, v in folded_keys.items():
if v == []:
folded_keys[k] = [k]
return folded_keys


def get_resubmit_keys(
wf,
):
all_step_keys = successful_step_keys(wf)
all_step_keys = matched_step_key(
all_step_keys,
[
"prep-run-train",
"prep-train",
"run-train",
"modify-train-script",
"prep-caly-input",
"collect-run-calypso",
"prep-run-dp-optim",
"run-caly-model-devi",
"prep-run-explore",
"prep-lmp",
"run-lmp",
"select-confs",
"prep-run-fp",
"prep-fp",
"run-fp",
"collect-data",
Expand All @@ -769,7 +821,8 @@ def get_resubmit_keys(
all_step_keys,
["run-train", "run-lmp", "run-fp"],
)
return all_step_keys
folded_keys = fold_keys(all_step_keys)
return folded_keys


def resubmit_concurrent_learning(
Expand All @@ -778,13 +831,15 @@ def resubmit_concurrent_learning(
list_steps=False,
reuse=None,
replace_scheduler=False,
fold=False,
):
wf_config = normalize_args(wf_config)

global_config_workflow(wf_config)

old_wf = Workflow(id=wfid)
all_step_keys = get_resubmit_keys(old_wf)
folded_keys = get_resubmit_keys(old_wf)
all_step_keys = sum(folded_keys.values(), [])

if list_steps:
prt_str = print_keys_in_nice_format(
Expand All @@ -796,10 +851,23 @@ def resubmit_concurrent_learning(
if reuse is None:
return None
reuse_idx = expand_idx(reuse)
reuse_step = []
old_wf_info = old_wf.query()
for ii in reuse_idx:
reuse_step += old_wf_info.get_step(key=all_step_keys[ii])
reused_keys = [all_step_keys[ii] for ii in reuse_idx]
if fold:
reused_folded_keys = {}
for key in reused_keys:
superop = get_superop(key)
if superop is not None:
if superop not in reused_folded_keys:
reused_folded_keys[superop] = []
reused_folded_keys[superop].append(key)
else:
reused_folded_keys[key] = [key]
for k, v in reused_folded_keys.items():
# reuse the super OP iif all steps within it are reused
if v != [k] and k in folded_keys and set(v) == set(folded_keys[k]):
reused_folded_keys[k] = [k]
reused_keys = sum(reused_folded_keys.values(), [])
reuse_step = old_wf.query_step(key=reused_keys)

wf = submit_concurrent_learning(
wf_config,
Expand Down
109 changes: 109 additions & 0 deletions tests/entrypoint/test_submit.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from dpgen2.entrypoint.submit import (
copy_scheduler_plans,
expand_idx,
fold_keys,
print_list_steps,
submit_concurrent_learning,
update_reuse_step_scheduler,
Expand Down Expand Up @@ -956,3 +957,111 @@ def test(self):
PRINT ARG=restraint.bias
"""
)


def test_fold_keys_lmp():
all_step_keys = [
"init--scheduler",
"init--id",
"iter-000000--prep-run-train",
"iter-000000--prep-train",
"iter-000000--run-train-0000",
"iter-000000--run-train-0001",
"iter-000000--run-train-0002",
"iter-000000--run-train-0003",
"iter-000000--prep-run-explore",
"iter-000000--prep-lmp",
"iter-000000--run-lmp-000000",
"iter-000000--run-lmp-000001",
"iter-000000--select-confs",
"iter-000000--prep-run-fp",
"iter-000000--prep-fp",
"iter-000000--run-fp-000000",
"iter-000000--run-fp-000001",
"iter-000000--run-fp-000002",
]
folded_keys = fold_keys(all_step_keys)
assert folded_keys == {
"init--scheduler": ["init--scheduler"],
"init--id": ["init--id"],
"iter-000000--prep-run-train": [
"iter-000000--prep-train",
"iter-000000--run-train-0000",
"iter-000000--run-train-0001",
"iter-000000--run-train-0002",
"iter-000000--run-train-0003",
],
"iter-000000--prep-run-explore": [
"iter-000000--prep-lmp",
"iter-000000--run-lmp-000000",
"iter-000000--run-lmp-000001",
],
"iter-000000--select-confs": ["iter-000000--select-confs"],
"iter-000000--prep-run-fp": [
"iter-000000--prep-fp",
"iter-000000--run-fp-000000",
"iter-000000--run-fp-000001",
"iter-000000--run-fp-000002",
],
}


def test_fold_keys_caly():
all_step_keys = [
"init--scheduler",
"init--id",
"iter-000000--prep-run-train",
"iter-000000--prep-train",
"iter-000000--run-train-0000",
"iter-000000--run-train-0001",
"iter-000000--run-train-0002",
"iter-000000--run-train-0003",
"iter-000000--prep-run-explore",
"iter-000000--prep-caly-input",
"iter-000000--prep-run-dp-optim-000000-0",
"iter-000000--prep-run-dp-optim-000000-1",
"iter-000000--prep-run-dp-optim-000001-0",
"iter-000000--prep-run-dp-optim-000001-1",
"iter-000000--collect-run-calypso-000000-0",
"iter-000000--collect-run-calypso-000000-1",
"iter-000000--collect-run-calypso-000001-0",
"iter-000000--collect-run-calypso-000001-1",
"iter-000000--run-caly-model-devi",
"iter-000000--select-confs",
"iter-000000--prep-run-fp",
"iter-000000--prep-fp",
"iter-000000--run-fp-000000",
"iter-000000--run-fp-000001",
"iter-000000--run-fp-000002",
]
folded_keys = fold_keys(all_step_keys)
assert folded_keys == {
"init--scheduler": ["init--scheduler"],
"init--id": ["init--id"],
"iter-000000--prep-run-train": [
"iter-000000--prep-train",
"iter-000000--run-train-0000",
"iter-000000--run-train-0001",
"iter-000000--run-train-0002",
"iter-000000--run-train-0003",
],
"iter-000000--prep-run-explore": [
"iter-000000--prep-caly-input",
"iter-000000--prep-run-dp-optim-000000-0",
"iter-000000--prep-run-dp-optim-000000-1",
"iter-000000--prep-run-dp-optim-000001-0",
"iter-000000--prep-run-dp-optim-000001-1",
"iter-000000--collect-run-calypso-000000-0",
"iter-000000--collect-run-calypso-000000-1",
"iter-000000--collect-run-calypso-000001-0",
"iter-000000--collect-run-calypso-000001-1",
"iter-000000--run-caly-model-devi",
],
"iter-000000--select-confs": ["iter-000000--select-confs"],
"iter-000000--prep-run-fp": [
"iter-000000--prep-fp",
"iter-000000--run-fp-000000",
"iter-000000--run-fp-000001",
"iter-000000--run-fp-000002",
],
}

0 comments on commit fa4a4cf

Please sign in to comment.