show status of workflows (#71)
* show status of workflows.

* fix bugs

* change scheduler converged to complete. add complete for stage_scheduler

* fix == None

Co-authored-by: Han Wang <wang_han@iapcm.ac.cn>
wanghan-iapcm and Han Wang committed Sep 11, 2022
1 parent 0a27bf8 commit 74290e1
Showing 8 changed files with 507 additions and 35 deletions.
32 changes: 28 additions & 4 deletions dpgen2/entrypoint/main.py
@@ -18,6 +9 @@
submit_concurrent_learning,
resubmit_concurrent_learning,
)
from .status import (
status,
)
from dpgen2 import (
__version__
)
@@ -48,7 +51,7 @@ def main_parser() -> argparse.ArgumentParser:
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser_run.add_argument(
"INPUT", help="the input file in json format defining the workflow."
"CONFIG", help="the config file in json format defining the workflow."
)

parser_resubmit = subparsers.add_parser(
@@ -57,7 +60,7 @@
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser_resubmit.add_argument(
"INPUT", help="the input file in json format defining the workflow."
"CONFIG", help="the config file in json format defining the workflow."
)
parser_resubmit.add_argument(
"ID", help="the ID of the existing workflow."
@@ -69,6 +72,18 @@
"--reuse", type=str, nargs='+', default=None, help="specify which Steps to reuse."
)

parser_status = subparsers.add_parser(
"status",
help="Print the status (stage, iteration, convergence) of the DPGEN2 workflow",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser_status.add_argument(
"CONFIG", help="the config file in json format."
)
parser_status.add_argument(
"ID", help="the ID of the existing workflow."
)

# --version
parser.add_argument(
'--version',
@@ -102,16 +117,25 @@ def main():
dict_args = vars(args)

if args.command == "submit":
with open(args.INPUT) as fp:
with open(args.CONFIG) as fp:
config = json.load(fp)
submit_concurrent_learning(config)
elif args.command == "resubmit":
with open(args.INPUT) as fp:
with open(args.CONFIG) as fp:
config = json.load(fp)
wfid = args.ID
resubmit_concurrent_learning(
config, wfid, list_steps=args.list, reuse=args.reuse,
)
elif args.command == "status":
with open(args.CONFIG) as fp:
config = json.load(fp)
wfid = args.ID
status(
wfid, config,
)
elif args.command is None:
pass
else:
raise RuntimeError(f"unknown command {args.command}")
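
For reference, a minimal usage sketch of the new sub-command (not part of the commit); the config file name and workflow ID below are placeholders, and `args.command` is assumed to be the dispatch attribute used by `main()` above.

```python
# Hypothetical invocation of the new "status" sub-command.
# "config.json" and "dpgen-abc123" are placeholder values.
from dpgen2.entrypoint.main import main_parser

parser = main_parser()
args = parser.parse_args(["status", "config.json", "dpgen-abc123"])
# the positional arguments land on args.CONFIG and args.ID
print(args.command, args.CONFIG, args.ID)
```

On the command line this presumably corresponds to something like `dpgen2 status CONFIG ID`.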

32 changes: 32 additions & 0 deletions dpgen2/entrypoint/status.py
@@ -0,0 +1,32 @@
import logging
from dflow import (
Workflow,
)
from dpgen2.utils import (
dflow_config,
)
from dpgen2.utils.dflow_query import (
get_last_scheduler,
)
from typing import (
Optional, Dict, Union, List,
)

def status(
workflow_id,
wf_config : Optional[Dict] = {},
):
dflow_config_data = wf_config.get('dflow_config', None)
dflow_config(dflow_config_data)

wf = Workflow(id=workflow_id)

wf_keys = wf.query_keys_of_steps()

scheduler = get_last_scheduler(wf, wf_keys)

if scheduler is not None:
ptr_str = scheduler.print_convergence()
print(ptr_str)
else:
logging.warn('no scheduler is finished')
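
A hedged sketch of calling the new entry point programmatically; the workflow ID and config dict below are placeholders, not real artifacts.

```python
# Placeholder values; a real call passes the ID of a submitted workflow
# and the parsed contents of the CONFIG json file.
from dpgen2.entrypoint.status import status

wf_config = {}   # may carry a "dflow_config" section, forwarded to dflow_config()
status("dpgen-abc123", wf_config)   # prints the convergence table, or warns if no scheduler has finished
```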
35 changes: 26 additions & 9 deletions dpgen2/exploration/scheduler/convergence_check_stage_scheduler.py
@@ -26,20 +34 @@ def __init__(
self.max_numb_iter = max_numb_iter
self.fatal_at_max = fatal_at_max
self.nxt_iter = 0
self.conv = False
self.reached_max_iter = False
self.complete_ = False
self.reports = []

def complete(self):
return self.complete_

def converged(self):
return self.conv

def reached_max_iteration(self):
return self.reached_max_iter

def plan_next_iteration(
self,
hist_reports : List[ExplorationReport] = [],
report : ExplorationReport = None,
trajs : List[Path] = None,
) -> Tuple[bool, ExplorationTaskGroup, ConfSelector] :
if report is None:
converged = False
stg_complete = False
self.conv = stg_complete
lmp_task_grp = self.stage.make_task()
ret_selector = self.selector
else :
converged = report.accurate_ratio() >= self.conv_accuracy
if not converged:
stg_complete = report.accurate_ratio() >= self.conv_accuracy
self.conv = stg_complete
if not stg_complete:
# check if we have any candidate to improve the quality of the model
if report.candidate_ratio() == 0.0:
raise FatalError(
@@ -49,20 +63,23 @@ def plan_next_iteration(
'improved and the iteration would not end. '
'Please try to increase the higher trust levels. '
)
# if not converged, check max iter
# if not stg_complete, check max iter
if self.max_numb_iter is not None and self.nxt_iter == self.max_numb_iter:
self.reached_max_iter = True
if self.fatal_at_max:
raise FatalError('reached maximal number of iterations')
else:
converged = True
stg_complete = True
# make lmp tasks
if converged:
# if converged, no more lmp task
if stg_complete:
# if stg_complete, no more lmp task
lmp_task_grp = None
ret_selector = None
else :
lmp_task_grp = self.stage.make_task()
ret_selector = self.selector
self.reports.append(report)
self.nxt_iter += 1
return converged, lmp_task_grp, ret_selector
self.complete_ = stg_complete
return stg_complete, lmp_task_grp, ret_selector
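
The accessors added above separate "the stage is finished" from "the stage actually converged". A small illustrative helper showing the intended distinction (an assumption of typical use, not part of the commit):

```python
def describe_stage(sch) -> str:
    """Summarize a stage using the accessors introduced in this commit.

    `sch` is assumed to expose complete(), converged() and
    reached_max_iteration(), as ConvergenceCheckStageScheduler now does.
    """
    if not sch.complete():
        return "stage still running"
    if sch.converged():
        return "stage complete: accuracy threshold reached"
    if sch.reached_max_iteration():
        return "stage complete: stopped at max_numb_iter without converging"
    return "stage complete"
```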

122 changes: 112 additions & 10 deletions dpgen2/exploration/scheduler/scheduler.py
@@ -1,3 +1,5 @@
import numpy as np

from typing import (
List,
Tuple,
@@ -24,9 +26,9 @@ def __init__(
self,
):
self.stage_schedulers = []
self.stage_reports = [[]]
self.cur_stage = 0
self.iteration = -1
self.complete_ = False

def add_stage_scheduler(
self,
@@ -44,6 +46,7 @@ def add_stage_scheduler(
"""
self.stage_schedulers.append(stage_scheduler)
self.complete_ = False
return self

def get_stage(self):
@@ -64,6 +67,13 @@ def get_iteration(self):
"""
return self.iteration

def complete(self):
"""
Tell if all stages are converged.
"""
return self.complete_

def plan_next_iteration(
self,
report : ExplorationReport = None,
@@ -81,8 +91,8 @@ def plan_next_iteration(
Returns
-------
converged: bool
If DPGEN converges.
complete: bool
If all the DPGEN stages complete.
task: ExplorationTaskGroup
A `ExplorationTaskGroup` defining the exploration of the next iteration. Should be `None` if converged.
conf_selector: ConfSelector
@@ -91,26 +101,118 @@
"""

try:
converged, lmp_task_grp, conf_selector = \
stg_complete, lmp_task_grp, conf_selector = \
self.stage_schedulers[self.cur_stage].plan_next_iteration(
self.stage_reports[self.cur_stage],
report,
trajs,
)
self.stage_reports[self.cur_stage].append(report)
except FatalError as e:
raise FatalError(f'stage {self.cur_stage}: ' + str(e))

if converged:
if stg_complete:
self.cur_stage += 1
self.stage_reports.append([])
if self.cur_stage < len(self.stage_schedulers):
# goes to next stage
return self.plan_next_iteration()
else:
# all stages converged
# all stages complete
self.complete_ = True
return True, None, None,
else :
self.iteration += 1
return converged, lmp_task_grp, conf_selector
return stg_complete, lmp_task_grp, conf_selector


def get_stage_of_iterations(self):
"""
Get the stage index and the index in the stage of iterations.
"""
stages = self.stage_schedulers
n_stage_iters = []
for ii in range(self.get_stage() + 1):
if ii < len(stages) and len(stages[ii].reports) > 0:
n_stage_iters.append(len(stages[ii].reports))
cumsum_stage_iters = np.cumsum(n_stage_iters)

max_iter = self.get_iteration()
if self.complete() or max_iter == -1:
max_iter += 1
stage_idx = []
idx_in_stage = []
iter_idx = []
for ii in range(max_iter):
idx = np.searchsorted(cumsum_stage_iters, ii+1)
stage_idx.append(idx)
if idx > 0:
idx_in_stage.append(ii - cumsum_stage_iters[idx-1])
else :
idx_in_stage.append(ii)
iter_idx.append(ii)
assert( len(stage_idx) == max_iter)
assert( len(idx_in_stage) == max_iter)
assert( len(iter_idx) == max_iter)
return stage_idx, idx_in_stage, iter_idx
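
To see how the cumulative sums and `np.searchsorted` recover the stage index for each global iteration, here is a toy walk-through (numbers assumed for illustration: two stages that recorded 3 and 2 reports):

```python
import numpy as np

# Toy numbers: stage 0 ran 3 iterations, stage 1 ran 2.
n_stage_iters = [3, 2]
cumsum_stage_iters = np.cumsum(n_stage_iters)        # -> array([3, 5])

for ii in range(5):                                  # global iteration index
    idx = np.searchsorted(cumsum_stage_iters, ii + 1)
    idx_in_stage = ii - cumsum_stage_iters[idx - 1] if idx > 0 else ii
    print(ii, idx, idx_in_stage)
# prints (iter, stage, index-in-stage):
# 0 0 0 / 1 0 1 / 2 0 2 / 3 1 0 / 4 1 1
```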


def get_convergence_ratio(self):
"""
Get the accurate, candidate and failed ratios of the iterations
Returns
-------
accu np.ndarray
The accurate ratio. length of array the same as # iterations.
cand np.ndarray
The candidate ratio. length of array the same as # iterations.
fail np.ndarray
The failed ratio. length of array the same as # iterations.
"""
stages = self.stage_schedulers
stag_idx, idx_in_stag, iter_idx = self.get_stage_of_iterations()
accu = []
cand = []
fail = []
for ii in range(np.size(iter_idx)):
accu.append(stages[stag_idx[ii]].reports[idx_in_stag[ii]].accurate_ratio())
cand.append(stages[stag_idx[ii]].reports[idx_in_stag[ii]].candidate_ratio())
fail.append(stages[stag_idx[ii]].reports[idx_in_stag[ii]].failed_ratio())
return np.array(accu), np.array(cand), np.array(fail)

def _print_prev_summary(self, prev_stg_idx):
if prev_stg_idx >= 0:
yes = 'YES' if self.stage_schedulers[prev_stg_idx].converged() else 'NO '
rmx = 'YES' if self.stage_schedulers[prev_stg_idx].reached_max_iteration() else 'NO '
return f'# Stage {prev_stg_idx:4d} converged {yes} reached max numb iterations {rmx}'
else:
return None

def print_convergence(self):
spaces = [8, 8, 8, 10, 10, 10]
fmt_str = ' '.join([f'%{ii}s' for ii in spaces])
fmt_flt = '%.4f'
header_str = '#' + fmt_str % ('stage', 'id_stg.', 'iter.', 'accu.', 'cand.', 'fail.')
ret = [header_str]

stage_idx, idx_in_stage, iter_idx = self.get_stage_of_iterations()
accu, cand, fail = self.get_convergence_ratio()

iidx = 0
prev_stg_idx = -1
for iidx in range(len(accu)):
if stage_idx[iidx] != prev_stg_idx:
if prev_stg_idx >= 0:
ret.append(self._print_prev_summary(prev_stg_idx))
ret.append(f'# Stage {stage_idx[iidx]:4d} ' + '-'*20)
prev_stg_idx = stage_idx[iidx]
ret.append(' ' + fmt_str % (
str(stage_idx[iidx]), str(idx_in_stage[iidx]), str(iidx),
fmt_flt%(accu[iidx]*1),
fmt_flt%(cand[iidx]*1),
fmt_flt%(fail[iidx]*1),
))
if self.complete():
if prev_stg_idx >= 0:
ret.append(self._print_prev_summary(prev_stg_idx))
ret.append(f'# All stages converged')
return '\n'.join(ret + [''])
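
For orientation, a minimal sketch of the table layout produced by `print_convergence()`; the ratio values are made up for illustration only, not results from any real workflow.

```python
# Re-derives the header/row layout used above with illustrative numbers.
spaces = [8, 8, 8, 10, 10, 10]
fmt_str = ' '.join([f'%{ii}s' for ii in spaces])
print('#' + fmt_str % ('stage', 'id_stg.', 'iter.', 'accu.', 'cand.', 'fail.'))
print(' ' + fmt_str % ('0', '0', '0', '%.4f' % 0.9000, '%.4f' % 0.0800, '%.4f' % 0.0200))
# Expected output (approximately):
# #   stage  id_stg.    iter.      accu.      cand.      fail.
#         0        0        0     0.9000     0.0800     0.0200
```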
19 changes: 16 additions & 3 deletions dpgen2/exploration/scheduler/stage_scheduler.py
@@ -16,10 +21 @@ class StageScheduler(ABC):
The scheduler for an exploration stage.
"""

@abstractmethod
def converged(self):
"""
Tell if the stage is converged
Returns
-------
converged bool
the convergence
"""
pass

@abstractmethod
def plan_next_iteration(
self,
hist_reports : List[ExplorationReport],
report : ExplorationReport,
trajs : List[Path],
) -> Tuple[bool, ExplorationTaskGroup, ConfSelector] :
@@ -39,8 +50,10 @@ def plan_next_iteration(
Returns
-------
converged: bool
If the stage converged.
stg_complete: bool
If the stage completed. Two cases may happen:
1. converged.
2. when not fatal_at_max, not converged but reached max number of iterations.
task: ExplorationTaskGroup
A `ExplorationTaskGroup` defining the exploration of the next iteration. Should be `None` if the stage is converged.
conf_selector: ConfSelector
2 changes: 1 addition & 1 deletion dpgen2/utils/dflow_query.py
@@ -25,7 +25,7 @@ def get_last_scheduler(
return None
else:
skey = sorted(scheduler_keys)[-1]
step = wf.query_step(key=skey)
step = wf.query_step(key=skey)[0]
return step.outputs.parameters['exploration_scheduler'].value
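
The one-line fix reflects that, in the dflow version targeted here, `Workflow.query_step(key=...)` apparently returns a list of matching steps rather than a single step; a hedged sketch of the resulting access pattern:

```python
from dflow import Workflow

def last_scheduler_value(wf: Workflow, skey: str):
    # query_step(key=...) is taken to return a list of matching steps
    # (the reason for the [0] added in this commit); the scheduler object
    # is stored as an output parameter of that step.
    step = wf.query_step(key=skey)[0]
    return step.outputs.parameters['exploration_scheduler'].value
```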


