Skip to content

Commit

Permalink
[WIP] improve parameters support of the callbacks (#64)
Browse files Browse the repository at this point in the history
Improve parameters support of the callbacks

- adaptation of parallelrunner.py to support all versions of Python
- minor bugs fixed
- ad hoc test implementation
---------

Co-authored-by: pagmatt <mattpagg@gmail.com>
Co-authored-by: Andrea Lacava <lacava.a@norhteastern.edu>
  • Loading branch information
3 people committed Apr 26, 2023
1 parent c682847 commit 242f98a
Show file tree
Hide file tree
Showing 6 changed files with 118 additions and 42 deletions.
4 changes: 2 additions & 2 deletions sem/__init__.py
Expand Up @@ -4,11 +4,11 @@
from .lptrunner import LptRunner
from .gridrunner import BUILD_GRID_PARAMS, SIMULATION_GRID_PARAMS
from .database import DatabaseManager
from .utils import list_param_combinations, automatic_parser, stdout_automatic_parser, only_load_some_files
from .utils import list_param_combinations, automatic_parser, stdout_automatic_parser, only_load_some_files, CallbackBase
from .cli import cli

__all__ = ('CampaignManager', 'SimulationRunner', 'ParallelRunner', 'LptRunner',
'DatabaseManager', 'list_param_combinations', 'automatic_parser',
'only_load_some_files')
'only_load_some_files', 'CallbackBase')

name = 'sem'
17 changes: 14 additions & 3 deletions sem/manager.py
Expand Up @@ -305,6 +305,10 @@ def run_simulations(self, param_list, show_progress=True, callbacks: list = [],
can be either a string or a number).
show_progress (bool): whether or not to show a progress bar with
percentage and expected remaining time.
callbacks (list): list of objects extending CallbackBase to be
triggered during the run.
stop_on_errors (bool): whether or not to stop the execution of the simulations
if an error occurs.
"""

# Make sure we have a runner to run simulations with.
Expand All @@ -315,7 +319,7 @@ def run_simulations(self, param_list, show_progress=True, callbacks: list = [],
" for this CampaignManager.")

# Return if the list is empty
if param_list == []:
if not param_list:
return

self.check_and_fill_parameters(param_list, needs_rngrun=True)
Expand Down Expand Up @@ -378,8 +382,7 @@ def run_and_save_results(self, result_generator, batch_results=True):
self.db.insert_results(results_batch)
self.db.write_to_disk()

def get_missing_simulations(self, param_list, runs=None,
with_time_estimate=False):
def get_missing_simulations(self, param_list, runs=None, with_time_estimate=False):
"""
Return a list of the simulations among the required ones that are not
available in the database.
Expand All @@ -390,6 +393,7 @@ def get_missing_simulations(self, param_list, runs=None,
runs (int): an integer representing how many repetitions are wanted
for each parameter combination, None if the dictionaries in
param_list already feature the desired RngRun value.
with_time_estimate (bool): a boolean representing ...
"""

params_to_simulate = []
Expand Down Expand Up @@ -452,6 +456,7 @@ def get_missing_simulations(self, param_list, runs=None,

def run_missing_simulations(self, param_list, runs=None,
condition_checking_function=None,
callbacks=[],
stop_on_errors=True):
"""
Run the simulations from the parameter list that are not yet available
Expand All @@ -471,6 +476,10 @@ def run_missing_simulations(self, param_list, runs=None,
runs (int): the number of runs to perform for each parameter
combination. This parameter is only allowed if the param_list
specification doesn't feature an 'RngRun' key already.
callbacks (list): list of objects extending CallbackBase to be
triggered during the run.
stop_on_errors (bool): whether or not to stop the execution of the simulations
if an error occurs.
"""
# Expand the parameter specification
param_list = list_param_combinations(param_list)
Expand Down Expand Up @@ -504,10 +513,12 @@ def run_missing_simulations(self, param_list, runs=None,
self.get_missing_simulations(param_list,
runs,
with_time_estimate=True),
callbacks=callbacks,
stop_on_errors=stop_on_errors)
else:
self.run_simulations(
self.get_missing_simulations(param_list, runs),
callbacks=callbacks,
stop_on_errors=stop_on_errors)

#####################
Expand Down
29 changes: 19 additions & 10 deletions sem/parallelrunner.py
@@ -1,25 +1,33 @@
from .runner import SimulationRunner
from multiprocessing import Pool

from .utils import CallbackBase
from multiprocessing.pool import ThreadPool as Pool
# We use ThreadPool so that all simulations share the process memory, which is what makes callbacks usable.
# This may eventually be improved with a fine-grained solution that checks whether any callbacks are present.

class ParallelRunner(SimulationRunner):

"""
A Runner which can perform simulations in parallel on the current machine.
"""
data_folder: str = None
stop_on_errors: bool = False
callbacks: [CallbackBase] = []

def run_simulations(self, parameter_list, data_folder, callbacks: list = [], stop_on_errors=False):
def run_simulations(self, parameter_list, data_folder, callbacks: [CallbackBase] = None, stop_on_errors=False):
"""
This function runs multiple simulations in parallel.
Args:
parameter_list (list): list of parameter combinations to simulate.
data_folder (str): folder in which to create output folders.
callbacks (list): list of callbacks to be triggered
stop_on_errors (bool): check whether simulation has to stop on errors or not
"""

for cb in callbacks:
cb.on_simulation_start(len(list(enumerate(parameter_list))))
cb.controlled_by_parent = True

if callbacks is not None:
for cb in callbacks:
cb.on_simulation_start(len(list(enumerate(parameter_list))))
cb.controlled_by_parent = True

self.data_folder = data_folder
self.stop_on_errors = stop_on_errors
Expand All @@ -29,10 +37,11 @@ def run_simulations(self, parameter_list, data_folder, callbacks: list = [], sto
parameter_list):
yield result

for cb in callbacks:
cb.on_simulation_end()
if callbacks is not None:
for cb in callbacks:
cb.on_simulation_end()

def launch_simulation(self, parameter, callbacks: list = []):
def launch_simulation(self, parameter):
"""
Launch a single simulation, using SimulationRunner's facilities.
Expand Down
34 changes: 21 additions & 13 deletions sem/runner.py
Expand Up @@ -8,6 +8,7 @@
import sys
from importlib.machinery import SourceFileLoader
import types
from .utils import CallbackBase

from tqdm import tqdm

Expand Down Expand Up @@ -289,22 +290,25 @@ def get_available_parameters(self):
# Simulation running #
######################

def run_simulations(self, parameter_list, data_folder, callbacks: list = [], stop_on_errors=False):
def run_simulations(self, parameter_list, data_folder, callbacks: [CallbackBase] = None, stop_on_errors=False):
"""
Run several simulations using a certain combination of parameters.
Yields results as simulations are completed.
Yield results as simulations are completed.
Args:
parameter_list (list): list of parameter combinations to simulate.
data_folder (str): folder in which to save subfolders containing
simulation output.
callbacks (list): list of callbacks to be triggered
stop_on_errors (bool): if true, when a simulation outputs an error the whole campaign will be stopped
"""

# Log simulation start if not already done by parent class
for cb in callbacks:
if not cb.is_controlled_by_parent():
cb.on_simulation_start(len(list(enumerate(parameter_list))))
if callbacks is not None:
for cb in callbacks:
if not cb.is_controlled_by_parent():
cb.on_simulation_start(len(list(enumerate(parameter_list))))

for _, parameter in enumerate(parameter_list):

Expand All @@ -319,16 +323,18 @@ def run_simulations(self, parameter_list, data_folder, callbacks: list = [], sto
parameter.items()]

# Run from dedicated temporary folder
current_result['meta']['id'] = str(uuid.uuid4())
sim_uuid = str(uuid.uuid4())
current_result['meta']['id'] = sim_uuid
temp_dir = os.path.join(data_folder, current_result['meta']['id'])
os.makedirs(temp_dir)

start = time.time() # Time execution
stdout_file_path = os.path.join(temp_dir, 'stdout')
stderr_file_path = os.path.join(temp_dir, 'stderr')

for cb in callbacks:
cb.on_run_start()
if callbacks is not None:
for cb in callbacks:
cb.on_run_start(parameter, sim_uuid)

with open(stdout_file_path, 'w') as stdout_file, open(
stderr_file_path, 'w') as stderr_file:
Expand All @@ -338,8 +344,9 @@ def run_simulations(self, parameter_list, data_folder, callbacks: list = [], sto
stderr=stderr_file)
end = time.time() # Time execution

for cb in callbacks:
cb.on_run_end(return_code, end-start)
if callbacks is not None:
for cb in callbacks:
cb.on_run_end(sim_uuid, return_code, end - start)

if return_code != 0:

Expand Down Expand Up @@ -370,6 +377,7 @@ def run_simulations(self, parameter_list, data_folder, callbacks: list = [], sto
yield current_result

# Log simulation end if not already done by parent class
for cb in callbacks:
if not cb.is_controlled_by_parent():
cb.on_simulation_end()
if callbacks is not None:
for cb in callbacks:
if not cb.is_controlled_by_parent():
cb.on_simulation_end()
36 changes: 23 additions & 13 deletions sem/utils.py
Expand Up @@ -316,7 +316,7 @@ class CallbackBase(ABC):
def __init__(self, verbose: int = 0):
super().__init__()
# Number of times the callback was called
self.controlled_by_parent = False
self.controlled_by_parent = False # type: bool
self.n_runs_over = 0 # type: int
self.n_runs_over_no_errors = 0 # type: int
self.n_runs_over_errors = 0 # type: int
Expand All @@ -329,7 +329,6 @@ def init_callback(self, controlled_by_parent) -> None:
Initialize the callback.
"""
self.controlled_by_parent = controlled_by_parent
self._init_callback()

def is_controlled_by_parent(self) -> bool:
"""
Expand All @@ -343,39 +342,50 @@ def on_simulation_start(self, n_runs_total) -> None:
self.n_runs_total = n_runs_total
self._on_simulation_start()

@abstractmethod
def _on_simulation_start(self) -> None:
pass

def on_run_start(self) -> None:
self._on_run_start()
def on_run_start(self, configuration, sim_uuid) -> None:
"""
Args:
configuration (dict): dictionary representing the combination of parameters simulated in this specific run.
sim_uuid (str): unique identifier string for the simulation. This value is used to name the result folder,
and it is referenced in the result JSON file.
"""
self._on_run_start(configuration, sim_uuid)

def _on_run_start(self) -> None:
@abstractmethod
def _on_run_start(self, configuration: dict, sim_uuid: str) -> None:
pass

@abstractmethod
def _on_run_end(self) -> bool:
def _on_run_end(self, sim_uuid: str, return_code: int, sim_time: int) -> bool:
"""
:return: False is intended to signal that the remaining simulations should be aborted early (NOTE: confirm that the runner actually honours this value).
# TODO maybe it does not make a lot of sense since this will be eventually overridden by the callback user
"""
return True

def on_run_end(self, return_code: int, sim_time: int) -> bool:
def on_run_end(self, sim_uuid: str, return_code: int, sim_time: int) -> bool:
"""
This method will be called when each simulation run finishes
# TODO maybe it does not make a lot of sense since this will be eventually overridden by the callback user
:return: If the callback returns False, a run has failed.
"""
self.n_runs_over += 1
self.run_sim_times.append(sim_time)
if (return_code == 0):
self.n_runs_over_no_errors = 0 # type: int
if return_code == 0:
self.n_runs_over_no_errors += 1 # type: int
else:
self.n_runs_over_errors = 0 # type: int
self._on_run_end()
self.n_runs_over_errors += 1 # type: int

return self._on_run_end(sim_uuid, return_code, sim_time)

def on_simulation_end(self) -> None:
self._on_simulation_end()
self._on_simulation_end()

@abstractmethod
def _on_simulation_end(self) -> None:
pass

Expand Down
40 changes: 39 additions & 1 deletion tests/test_utils.py
@@ -1,4 +1,4 @@
from sem import list_param_combinations, automatic_parser, stdout_automatic_parser
from sem import list_param_combinations, automatic_parser, stdout_automatic_parser, CallbackBase, CampaignManager
import json
import numpy as np
from operator import getitem
Expand Down Expand Up @@ -101,3 +101,41 @@ def test_automatic_parser(result):
[6, 7, 8, 9, 10]])
assert parsed['stderr'] == []


class TestCallback(CallbackBase):
    """Callback that records each hook invocation as a line of text.

    Every hook appends a fixed message to ``self.output``, so a test can
    compare the accumulated string against the expected sequence of calls.
    """

    # The class name starts with "Test": tell pytest not to collect this
    # class as a test case.
    __test__ = False

    def __init__(self):
        super().__init__(verbose=2)
        # Accumulated log of all the hooks triggered so far.
        self.output = ''

    def _on_simulation_start(self) -> None:
        """Record that the simulations are starting."""
        self.output = self.output + 'Starting the simulations!\n'

    def _on_simulation_end(self) -> None:
        """Record that the simulations are over."""
        self.output = self.output + 'Simulations are over!\n'

    def _on_run_start(self, configuration: dict, sim_uuid: str) -> None:
        """Record the start of a single run (the arguments are not logged)."""
        self.output = self.output + 'Start single run!\n'

    def _on_run_end(self, sim_uuid: str, return_code: int, sim_time: int) -> bool:
        """Record the end of a single run with its return code; return True."""
        self.output = self.output + f'Run ended! {return_code}\n'
        return True


def test_callback(ns_3_compiled, config, parameter_combination):
    """Check that callback hooks are triggered in the expected order.

    A campaign is created with the 'SimulationRunner' runner type and asked
    to run ``n_runs`` repetitions of a single parameter combination with a
    TestCallback attached. The text recorded by the callback must consist of
    the simulation-start message, one run-start/run-end pair per repetition
    (each run-end reporting return code 0), and the simulation-end message.

    Args:
        ns_3_compiled: pytest fixture, passed as first argument to
            CampaignManager.new.
        config: pytest fixture providing the 'script' and 'campaign_dir'
            entries used to create the campaign.
        parameter_combination: pytest fixture with the base parameter
            combination to simulate.
    """
    n_runs = 10
    cb = TestCallback()

    # Multiplication binds tighter than concatenation: only the per-run pair
    # of messages is repeated n_runs times.
    expected_output = ('Starting the simulations!\n'
                       + 'Start single run!\nRun ended! 0\n' * n_runs
                       + 'Simulations are over!\n')

    campaign = CampaignManager.new(ns_3_compiled, config['script'],
                                   config['campaign_dir'],
                                   runner_type='SimulationRunner',
                                   overwrite=True)

    # Build a new dict instead of updating the fixture in place, so the
    # extra 'RngRun' key cannot leak into other users of the fixture.
    params = dict(parameter_combination, RngRun=list(range(n_runs)))
    campaign.run_missing_simulations(param_list=[params], callbacks=[cb])

    assert cb.output == expected_output

0 comments on commit 242f98a

Please sign in to comment.