diff --git a/multiml/agent/basic/grid_search.py b/multiml/agent/basic/grid_search.py index 150645c..1d87544 100644 --- a/multiml/agent/basic/grid_search.py +++ b/multiml/agent/basic/grid_search.py @@ -20,15 +20,15 @@ def __init__(self, **kwargs): def execute(self): """Execute grid scan agent.""" if not self._multiprocessing: - for counter, subtasktuples in enumerate(self.task_scheduler): - self._storegate.compile() - result = self.execute_subtasktuples(subtasktuples, counter) - self._history.append(result) + for job_id, subtasktuples in enumerate(self.task_scheduler): + if self._num_trials is None: + result = self.execute_subtasktuples(subtasktuples, job_id) + self._history.append(result) - logger.counter(counter + 1, - len(self.task_scheduler), - divide=1, - message=f'metric={result["metric_value"]}') + else: + for trial_id in range(self._num_trials): + result = self.execute_subtasktuples(subtasktuples, job_id, trial_id) + self._history.append(result) else: # multiprocessing if self._storegate.backend not in ('numpy', 'hybrid'): @@ -39,7 +39,12 @@ def execute(self): queue = ctx.Queue() args = [] - for counter, subtasktuples in enumerate(self.task_scheduler): - args.append([subtasktuples, counter]) + for job_id, subtasktuples in enumerate(self.task_scheduler): + if self._num_trials is None: + args.append([subtasktuples, job_id, None]) + + else: + for trial_id in range(self._num_trials): + args.append([subtasktuples, job_id, trial_id]) self.execute_pool_jobs(ctx, queue, args) diff --git a/multiml/agent/basic/random_search.py b/multiml/agent/basic/random_search.py index 2f12045..cddf326 100644 --- a/multiml/agent/basic/random_search.py +++ b/multiml/agent/basic/random_search.py @@ -15,10 +15,6 @@ def __init__(self, samplings=None, seed=0, metric_type=None, - num_workers=None, - context='spawn', - dump_all_results=False, - disable_tqdm=True, **kwargs): """Initialize simple agent. @@ -28,33 +24,15 @@ def __init__(self, seed (int): seed of random samplings. metric_type (str): 'min' or 'max' for indicating direction of metric optimization. If it is None, ``type`` is retrieved from metric class instance. - num_workers (int or list): number of workers for multiprocessing or lsit of GPU ids. - If ``num_workers`` is given, multiprocessing is enabled. - context (str): fork (default) or spawn. - dump_all_results (bool): dump all results or not. - disable_tqdm (bool): enable tqdm bar. 
""" super().__init__(**kwargs) if samplings is None: samplings = [0] - self._history = [] self._samplings = samplings self._seed = seed - self._dump_all_results = dump_all_results - self._disable_tqdm = disable_tqdm self._task_prod = self.task_scheduler.get_all_subtasks_with_hps() - self._num_workers = num_workers - self._context = context - self._multiprocessing = False - - if self._num_workers is not None: - self._multiprocessing = True - - if isinstance(self._num_workers, int): - self._num_workers = list(range(self._num_workers)) - if metric_type is None: self._metric_type = self._metric.type else: @@ -62,16 +40,6 @@ def __init__(self, random.seed(seed) - @property - def history(self): - """Return history of execution.""" - return self._history - - @history.setter - def history(self, history): - """Set history of execution.""" - self._history = history - @logger.logging def execute(self): """Execute simple agent.""" @@ -80,10 +48,18 @@ def execute(self): self._samplings = random.choices(samples, k=self._samplings) if not self._multiprocessing: - for counter, index in enumerate(self._samplings): + for job_id, index in enumerate(self._samplings): subtasktuples = self.task_scheduler[index] - result = self.execute_subtasktuples(subtasktuples, counter) - self._history.append(result) + + if self._num_trials is None: + result = self.execute_subtasktuples(subtasktuples, job_id) + self._history.append(result) + + else: + for trial_id in range(self._num_trials): + result = self.execute_subtasktuples(subtasktuples, job_id, trial_id) + self._history.append(result) + else: # multiprocessing if self._storegate.backend not in ('numpy', 'hybrid'): @@ -94,106 +70,59 @@ def execute(self): queue = ctx.Queue() args = [] - for counter, index in enumerate(self._samplings): + for job_id, index in enumerate(self._samplings): subtasktuples = self.task_scheduler[index] - args.append([subtasktuples, counter]) + + if self._num_trials is None: + args.append([subtasktuples, job_id, None]) + else: + for trial_id in range(self._num_trials): + args.append([subtasktuples, job_id, trial_id]) self.execute_pool_jobs(ctx, queue, args) @logger.logging def finalize(self): - """Finalize grid scan agent.""" - if self._result is None: + """Finalize random search agent.""" + if (self._result is None) and (self._history): + metric_values_dict = {} + results_dict = {} + + for history in self._history: + job_id = history['job_id'] - metrics = [result['metric_value'] for result in self._history] + if job_id in metric_values_dict: + metric_values_dict[job_id].append(history['metric_value']) + else: + metric_values_dict[job_id] = [history['metric_value']] + results_dict[job_id] = history + + metric_value_dict = {} + for key, value in metric_values_dict.items(): + metric_value_dict[key] = sum(value) / len(value) + results_dict[key]['metric_value'] = sum(value) / len(value) + if self._num_trials is not None: + results_dict[key]['metric_values'] = value + if self._metric_type == 'max': - index = metrics.index(max(metrics)) + job_id = max(metric_value_dict, key=metric_value_dict.get) elif self._metric_type == 'min': - index = metrics.index(min(metrics)) + job_id = min(metric_value_dict, key=metric_value_dict.get) else: raise NotImplementedError(f'{self._metric_type} is not valid option.') - self._result = self._history[index] + self._result = results_dict[job_id] - # print results - if self._dump_all_results: - header = f'All result of {self.__class__.__name__}' - data = [] - for result in self._history: - names, tmp_data = 
self._print_result(result)
-            data += tmp_data + ['-']
-            logger.table(header=header, names=names, data=data, max_length=40)
-        self.saver['history'] = self._history
+            # print results
+            if self._dump_all_results:
+                header = f'All result of {self.__class__.__name__}'
+                data = []
+                for job_id, result in results_dict.items():
+                    names, tmp_data = self._print_result(result)
+                    data += tmp_data + ['-']
+                logger.table(header=header, names=names, data=data, max_length=40)
+            self.saver['history'] = list(results_dict.values())
 
         super().finalize()
 
-    def execute_jobs(self, ctx, queue, args):
-        """(expert method) Execute multiprocessing jobs."""
-        jobs = []
-
-        for pargs in args:
-            process = ctx.Process(target=self.execute_wrapper, args=(queue, *pargs), daemon=False)
-            jobs.append(process)
-            process.start()
-
-        for job in jobs:
-            job.join()
-
-        while not queue.empty():
-            self._history.append(queue.get())
-
-    def execute_pool_jobs(self, ctx, queue, args):
-        """(expert method) Execute multiprocessing pool jobs."""
-        jobs = copy.deepcopy(args)
-
-        pool = [0] * len(self._num_workers)
-        num_jobs = len(jobs)
-        all_done = False
-
-        pbar_args = dict(ncols=80, total=num_jobs, disable=self._disable_tqdm)
-        with tqdm(**pbar_args) as pbar:
-            while not all_done:
-                time.sleep(1)
-
-                if len(jobs) == 0:
-                    done = True
-                    for ii, process in enumerate(pool):
-                        if (process != 0) and (process.is_alive()):
-                            done = False
-                    all_done = done
-
-                else:
-                    for ii, process in enumerate(pool):
-                        if len(jobs) == 0:
-                            continue
-
-                        if (process == 0) or (not process.is_alive()):
-                            time.sleep(0.05)
-                            job_arg = jobs.pop(0)
-                            pool[ii] = ctx.Process(target=self.execute_wrapper,
-                                                   args=(queue, *job_arg, ii),
-                                                   daemon=False)
-                            pool[ii].start()
-                            pbar.update(1)
-
-                            if self._disable_tqdm:
-                                logger.info(f'launch process ({num_jobs - len(jobs)}/{num_jobs})')
-
-                while not queue.empty():
-                    self._history.append(queue.get())
-
-        while not queue.empty():
-            self._history.append(queue.get())
-
-    def execute_wrapper(self, queue, subtasktuples, counter, cuda_id):
-        """(expert method) Wrapper method to execute multiprocessing pipeline."""
-        for subtasktuple in subtasktuples:
-            subtasktuple.env.pool_id = (cuda_id, self._num_workers, len(self.task_scheduler))
-
-        result = self.execute_subtasktuples(subtasktuples, counter)
-
-        if self._dump_all_results:
-            self.saver[f'history_{counter}'] = result
-
-        queue.put(result)
diff --git a/multiml/agent/basic/sequential.py b/multiml/agent/basic/sequential.py
index a8fbe65..ed1ed86 100644
--- a/multiml/agent/basic/sequential.py
+++ b/multiml/agent/basic/sequential.py
@@ -1,8 +1,13 @@
 """SequentialAgent module."""
 import copy
+import multiprocessing as mp
+import time
+
+import numpy as np
+from tqdm import tqdm
 
 from multiml import logger
 from multiml.agent.basic import BaseAgent
 
 
 class SequentialAgent(BaseAgent):
     """Agent to execute sequential tasks.
@@ -23,6 +28,10 @@ def __init__(self,
                  diff_pretrain=False,
                  diff_task_args=None,
                  num_trials=None,
+                 dump_all_results=False,
+                 num_workers=None,
+                 context='spawn',
+                 disable_tqdm=True,
                  **kwargs):
         """Initialize sequential agent.
 
@@ -34,16 +43,35 @@ def __init__(self,
                ``ConnectionTask()``.
            diff_task_args (dict): arbitrary args passed to ``ConnectionTask()``.
            num_trials (int): number of trials. Average value of trials is used as final metric.
+           dump_all_results (bool): dump all results or not.
+           num_workers (int or list): number of workers for multiprocessing, or list of GPU ids.
+               If ``num_workers`` is given, multiprocessing is enabled.
+           context (str): spawn (default) or fork.
+           disable_tqdm (bool): disable tqdm progress bar if True.
         """
         if diff_task_args is None:
             diff_task_args = {}
 
         super().__init__(**kwargs)
         self._result = None
+        self._history = []
         self._differentiable = differentiable
         self._diff_pretrain = diff_pretrain
         self._diff_task_args = diff_task_args
         self._num_trials = num_trials
+        self._dump_all_results = dump_all_results
+        self._num_workers = num_workers
+        self._disable_tqdm = disable_tqdm
+
+        self._context = context
+        self._multiprocessing = False
+
+        if self._num_workers is not None:
+            self._multiprocessing = True
+
+        if isinstance(self._num_workers, int):
+            self._num_workers = list(range(self._num_workers))
 
     @property
     def result(self):
@@ -55,6 +83,16 @@ def result(self, result):
         """Set result of execution."""
         self._result = result
 
+    @property
+    def history(self):
+        """Return history of execution."""
+        return self._history
+
+    @history.setter
+    def history(self, history):
+        """Set history of execution."""
+        self._history = history
+
     @logger.logging
     def execute(self):
         """Execute sequential agent."""
@@ -62,50 +100,156 @@ def execute(self):
             raise ValueError('Multiple subtasks or hyperparameters are defined.')
 
         subtasktuples = self.task_scheduler[0]
-        self._result = self.execute_subtasktuples(subtasktuples, 0)
+
+        if self._num_trials is None:
+            self._result = self.execute_subtasktuples(subtasktuples, 0)
+
+        else:
+            if self._multiprocessing:
+                if self._storegate.backend not in ('numpy', 'hybrid'):
+                    raise NotImplementedError(
+                        'multiprocessing is supported only for numpy and hybrid backends')
+
+                ctx = mp.get_context(self._context)
+                queue = ctx.Queue()
+                args = []
+
+                for trial_id in range(self._num_trials):
+                    args.append([subtasktuples, 0, trial_id])
+
+                self.execute_pool_jobs(ctx, queue, args)
+
+            else:
+                for trial_id in range(self._num_trials):
+                    result = self.execute_subtasktuples(subtasktuples, 0, trial_id)
+                    self._history.append(result)
 
     @logger.logging
     def finalize(self):
         """Finalize sequential agent."""
-        if self._result is None:
+        if (self._result is None) and (not self._history):
             logger.warn(f'No result at finalize of {self.__class__.__name__}')
+            return
 
-        else:
-            header = f'Result of {self.__class__.__name__}'
-            names, data = self._print_result(self._result)
-            logger.table(header=header, names=names, data=data, max_length=40)
-            self.saver['result'] = self._result
+        if self._result is None:
+            # average the metric over trials; keep per-trial values as well
+            metric_values = []
+            for history in self._history:
+                metric_values.append(history['metric_value'])
+
+            self._result = self._history[0]
+            self._result['metric_value'] = sum(metric_values) / self._num_trials
+            self._result['metric_values'] = metric_values
+
+        header = f'Result of {self.__class__.__name__}'
+        names, data = self._print_result(self._result)
+        logger.table(header=header, names=names, data=data, max_length=40)
+        self.saver['result'] = self._result
 
+    def execute_jobs(self, ctx, queue, args):
+        """(expert method) Execute multiprocessing jobs."""
+        jobs = []
+
+        for pargs in args:
+            process = ctx.Process(target=self.execute_wrapper, args=(queue, *pargs), daemon=False)
+            jobs.append(process)
+            process.start()
+
+        for job in jobs:
+            job.join()
 
-    def execute_subtasktuples(self, subtasktuples, counter):
+        while not queue.empty():
+            self._history.append(queue.get())
+
+    def execute_pool_jobs(self, ctx, queue, args):
+        """(expert method) Execute multiprocessing pool jobs."""
+        jobs = copy.deepcopy(args)
+
+        pool = [0] * len(self._num_workers)
+        num_jobs = len(jobs)
+        all_done = False
+
+        pbar_args = dict(ncols=80,
total=num_jobs, disable=self._disable_tqdm) + with tqdm(**pbar_args) as pbar: + while not all_done: + time.sleep(1) + + if len(jobs) == 0: + done = True + for ii, process in enumerate(pool): + if (process != 0) and (process.is_alive()): + done = False + all_done = done + + else: + for ii, process in enumerate(pool): + if len(jobs) == 0: + continue + + if (process == 0) or (not process.is_alive()): + time.sleep(0.05) + job_arg = jobs.pop(0) + pool[ii] = ctx.Process(target=self.execute_wrapper, + args=(queue, *job_arg, ii), + daemon=False) + pool[ii].start() + pbar.update(1) + + if self._disable_tqdm: + logger.info(f'launch process ({num_jobs - len(jobs)}/{num_jobs})') + + while not queue.empty(): + self._history.append(queue.get()) + + while not queue.empty(): + self._history.append(queue.get()) + + + def execute_wrapper(self, queue, subtasktuples, job_id, trial_id, cuda_id): + """(expert method) Wrapper method to execute multiprocessing pipeline.""" + for subtasktuple in subtasktuples: + subtasktuple.env.pool_id = (cuda_id, self._num_workers, len(self.task_scheduler)) + + result = self.execute_subtasktuples(subtasktuples, job_id, trial_id) + + if self._dump_all_results: + history_id = f'history_{job_id}' + if trial_id is not None: + history_id += f'_{trial_id}' + + self.saver[history_id] = result + + queue.put(result) + + + def execute_subtasktuples(self, subtasktuples, job_id, trial_id=None): """Execute given subtasktuples.""" if self._differentiable is None: fn_execute = self.execute_pipeline else: fn_execute = self.execute_differentiable - if self._num_trials is None: - return fn_execute(subtasktuples, counter) - else: - metric_values = [] - for ii in range(self._num_trials): - result = fn_execute(subtasktuples, counter, ii) - metric_values.append(result['metric_value']) - - result['metric_values'] = metric_values - result['metric_value'] = sum(metric_values) / len(metric_values) + return fn_execute(subtasktuples, job_id, trial_id) - return result - - def execute_pipeline(self, subtasktuples, counter, trial=None): + def execute_pipeline(self, subtasktuples, job_id, trial_id=None): """Execute pipeline.""" result = { 'task_ids': [], 'subtask_ids': [], - 'job_ids': [], - 'trial_ids': [], + 'job_id': None, + 'trial_id': None, 'subtask_hps': [], 'metric_value': None } + + result['job_id'] = job_id + result['trial_id'] = trial_id for subtasktuple in subtasktuples: task_id = subtasktuple.task_id @@ -116,15 +260,13 @@ def execute_pipeline(self, subtasktuples, counter, trial=None): subtask_env.subtask_id = subtask_id subtask_env.saver = self._saver subtask_env.storegate = self._storegate - subtask_env.job_id = counter - subtask_env.trial_id = trial + subtask_env.job_id = job_id + subtask_env.trial_id = trial_id subtask_env.set_hps(subtask_hps) self._execute_subtask(subtasktuple) result['task_ids'].append(task_id) result['subtask_ids'].append(subtask_id) - result['job_ids'].append(counter) - result['trial_ids'].append(trial) result['subtask_hps'].append(subtask_hps) self._metric.storegate = self._storegate @@ -132,17 +274,20 @@ def execute_pipeline(self, subtasktuples, counter, trial=None): return result - def execute_differentiable(self, subtasktuples, counter, trial=None): + def execute_differentiable(self, subtasktuples, job_id, trial_id=None): """Execute connection model.""" result = { 'task_ids': [], 'subtask_ids': [], - 'job_ids': [], - 'trial_ids': [], + 'job_id': None, + 'trial_id': None, 'subtask_hps': [], 'metric_value': None } + result['job_id'] = job_id + result['trial_id'] = trial_id 
+ if self._diff_pretrain: for subtasktuple in subtasktuples: task_id = subtasktuple.task_id @@ -153,15 +298,13 @@ def execute_differentiable(self, subtasktuples, counter, trial=None): subtask_env.subtask_id = subtask_id subtask_env.saver = self._saver subtask_env.storegate = self._storegate - subtask_env.job_id = counter - subtask_env.trial_id = trial + subtask_env.job_id = job_id + subtask_env.trial_id = trial_id subtask_env.set_hps(subtask_hps) self._execute_subtask(subtasktuple) result['task_ids'].append(task_id) result['subtask_ids'].append(subtask_id) - result['job_ids'].append(counter) - result['trial_ids'].append(trial) result['subtask_hps'].append(subtask_hps) subtasks = [v.env for v in subtasktuples] @@ -232,7 +375,13 @@ def _print_result(self, result): metric_data = [] for index, idata in enumerate(data): if index == 0: - metric_data.append(idata + [f'{result["metric_value"]}']) + metric_value = f'{result["metric_value"]}' + if 'metric_values' in result: + metric_std = np.array(result['metric_values']) + metric_std = metric_std.std() + metric_value += f' +- {metric_std}' + + metric_data.append(idata + [f'{metric_value}']) else: metric_data.append(idata + ['']) diff --git a/multiml/task/pytorch/pytorch_base.py b/multiml/task/pytorch/pytorch_base.py index c4d3992..4610a66 100644 --- a/multiml/task/pytorch/pytorch_base.py +++ b/multiml/task/pytorch/pytorch_base.py @@ -665,10 +665,12 @@ def _select_pred_data(self, y_pred): return [y_pred[index] for index in self._pred_index] def _get_pbar_description(self, epoch, phase): - if self.trial_id is None: + if (self.job_id is None) and (self.trial_id is None): return f'Epoch [{epoch: >4}/{self._num_epochs}] {phase.ljust(5)}' + elif (self.trial_id is None): + return f'Epoch [{epoch: >4}/{self._num_epochs},{self.job_id: >2}] {phase.ljust(5)}' else: - return f'Epoch [{epoch: >4}/{self._num_epochs},{self.trial_id+1: >2}] {phase.ljust(5)}' + return f'Epoch [{epoch: >4}/{self._num_epochs},{self.job_id: >2},{self.trial_id+1: >2}] {phase.ljust(5)}' def _disable_tqdm(self): disable_tqdm = True
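
For reference, the patched interface can be driven roughly as follows. This is a minimal sketch, not part of the patch: the import path is inferred from the file layout, and the remaining wiring passed through **kwargs (task scheduler, storegate, metric, saver, ...) is assumed to be configured as elsewhere in multiml.

    from multiml.agent.basic import RandomSearchAgent

    # Only the arguments introduced or moved by this patch are shown; all other
    # required setup is assumed to exist.
    agent = RandomSearchAgent(
        samplings=10,            # number of random hyperparameter samples
        num_trials=3,            # repeat each job 3 times; the metric is averaged
        num_workers=4,           # enable multiprocessing with worker ids 0..3
        context='spawn',         # multiprocessing start method (patch default)
        dump_all_results=True,   # save each trial result via the saver
    )

    agent.execute()
    agent.finalize()

    # finalize() selects the best job by the trial-averaged metric; per-trial
    # values are kept under 'metric_values' when num_trials is set.
    print(agent.result['metric_value'])
    print(agent.result.get('metric_values'))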