Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update get_command_from_result for ns-3.35+ #66

Closed
wants to merge 11 commits into from
12 changes: 10 additions & 2 deletions sem/__init__.py
Expand Up @@ -4,11 +4,19 @@
from .lptrunner import LptRunner
from .gridrunner import BUILD_GRID_PARAMS, SIMULATION_GRID_PARAMS
from .database import DatabaseManager
from .utils import list_param_combinations, automatic_parser, stdout_automatic_parser, only_load_some_files
<<<<<<< HEAD
from .utils import list_param_combinations, automatic_parser, stdout_automatic_parser, only_load_some_files, get_command_from_result, CallbackBase
=======
from .utils import list_param_combinations, automatic_parser, stdout_automatic_parser, only_load_some_files, get_command_from_result
>>>>>>> 627d998c7c55cc1901c8d9fbf7f443da5fe4d02e
from .cli import cli

__all__ = ('CampaignManager', 'SimulationRunner', 'ParallelRunner', 'LptRunner',
'DatabaseManager', 'list_param_combinations', 'automatic_parser',
'only_load_some_files')
<<<<<<< HEAD
'only_load_some_files', 'get_command_from_result', 'CallbackBase')
=======
'only_load_some_files', 'get_command_from_result')
>>>>>>> 627d998c7c55cc1901c8d9fbf7f443da5fe4d02e

name = 'sem'
2 changes: 2 additions & 0 deletions sem/cli.py
Expand Up @@ -235,9 +235,11 @@ def command(results_dir, result_id):

click.echo("Simulation command:")
click.echo(sem.utils.get_command_from_result(campaign.db.get_script(),
campaign.dir,
result))
click.echo("Debug command:")
click.echo(sem.utils.get_command_from_result(campaign.db.get_script(),
campaign.dir,
result,
debug=True))

Expand Down
21 changes: 17 additions & 4 deletions sem/manager.py
Expand Up @@ -290,7 +290,7 @@ def check_and_fill_parameters(self, param_list, needs_rngrun):
# Simulation running #
######################

def run_simulations(self, param_list, show_progress=True, stop_on_errors=True):
def run_simulations(self, param_list, show_progress=True, callbacks: list = [], stop_on_errors=True):
"""
Run several simulations specified by a list of parameter combinations.

Expand All @@ -305,6 +305,10 @@ def run_simulations(self, param_list, show_progress=True, stop_on_errors=True):
can be either a string or a number).
show_progress (bool): whether or not to show a progress bar with
percentage and expected remaining time.
callbacks (list): list of objects extending CallbackBase to be
triggered during the run.
stop_on_errors (bool): whether or not to stop the execution of the simulations
if an error occurs.
"""

# Make sure we have a runner to run simulations with.
Expand All @@ -315,7 +319,7 @@ def run_simulations(self, param_list, show_progress=True, stop_on_errors=True):
" for this CampaignManager.")

# Return if the list is empty
if param_list == []:
if not param_list:
return

self.check_and_fill_parameters(param_list, needs_rngrun=True)
Expand All @@ -339,12 +343,14 @@ def run_simulations(self, param_list, show_progress=True, stop_on_errors=True):
# computation is performed on this line.
results = self.runner.run_simulations(param_list,
self.db.get_data_dir(),
callbacks=callbacks,
stop_on_errors=stop_on_errors)

# Wrap the result generator in the progress bar generator.
if show_progress:
result_generator = tqdm(results, total=len(param_list),
unit='simulation',
smoothing=0,
desc='Running simulations')
else:
result_generator = results
Expand Down Expand Up @@ -377,8 +383,7 @@ def run_and_save_results(self, result_generator, batch_results=True):
self.db.insert_results(results_batch)
self.db.write_to_disk()

def get_missing_simulations(self, param_list, runs=None,
with_time_estimate=False):
def get_missing_simulations(self, param_list, runs=None, with_time_estimate=False):
"""
Return a list of the simulations among the required ones that are not
available in the database.
Expand All @@ -389,6 +394,7 @@ def get_missing_simulations(self, param_list, runs=None,
runs (int): an integer representing how many repetitions are wanted
for each parameter combination, None if the dictionaries in
param_list already feature the desired RngRun value.
with_time_estimate (bool): a boolean representing ...
"""

params_to_simulate = []
Expand Down Expand Up @@ -451,6 +457,7 @@ def get_missing_simulations(self, param_list, runs=None,

def run_missing_simulations(self, param_list, runs=None,
condition_checking_function=None,
callbacks=[],
stop_on_errors=True):
"""
Run the simulations from the parameter list that are not yet available
Expand All @@ -470,6 +477,10 @@ def run_missing_simulations(self, param_list, runs=None,
runs (int): the number of runs to perform for each parameter
combination. This parameter is only allowed if the param_list
specification doesn't feature an 'RngRun' key already.
callbacks (list): list of objects extending CallbackBase to be
triggered during the run.
stop_on_errors (bool): whether or not to stop the execution of the simulations
if an error occurs.
"""
# Expand the parameter specification
param_list = list_param_combinations(param_list)
Expand Down Expand Up @@ -503,10 +514,12 @@ def run_missing_simulations(self, param_list, runs=None,
self.get_missing_simulations(param_list,
runs,
with_time_estimate=True),
callbacks=callbacks,
stop_on_errors=stop_on_errors)
else:
self.run_simulations(
self.get_missing_simulations(param_list, runs),
callbacks=callbacks,
stop_on_errors=stop_on_errors)

#####################
Expand Down
25 changes: 22 additions & 3 deletions sem/parallelrunner.py
@@ -1,28 +1,46 @@
from .runner import SimulationRunner
from multiprocessing import Pool

from .utils import CallbackBase
from multiprocessing.pool import ThreadPool as Pool
# We use ThreadPool to share the process memory among the different simulations to enable the use of callbacks.
# This may be improved eventually using a grain-fined solution that checks the presence or not of callbacks

class ParallelRunner(SimulationRunner):

"""
A Runner which can perform simulations in parallel on the current machine.
"""
data_folder: str = None
stop_on_errors: bool = False
callbacks: [CallbackBase] = []

def run_simulations(self, parameter_list, data_folder, stop_on_errors=False):
def run_simulations(self, parameter_list, data_folder, callbacks: [CallbackBase] = None, stop_on_errors=False):
"""
This function runs multiple simulations in parallel.

Args:
parameter_list (list): list of parameter combinations to simulate.
data_folder (str): folder in which to create output folders.
callbacks (list): list of callbacks to be triggered
stop_on_errors (bool): check whether simulation has to stop on errors or not
"""

if callbacks is not None:
for cb in callbacks:
cb.on_simulation_start(len(list(enumerate(parameter_list))))
cb.controlled_by_parent = True

self.data_folder = data_folder
self.stop_on_errors = stop_on_errors
self.callbacks = callbacks
with Pool(processes=self.max_parallel_processes) as pool:
for result in pool.imap_unordered(self.launch_simulation,
parameter_list):
yield result

if callbacks is not None:
for cb in callbacks:
cb.on_simulation_end()

def launch_simulation(self, parameter):
"""
Launch a single simulation, using SimulationRunner's facilities.
Expand All @@ -35,4 +53,5 @@ def launch_simulation(self, parameter):
"""
return next(SimulationRunner.run_simulations(self, [parameter],
self.data_folder,
callbacks=self.callbacks,
stop_on_errors=self.stop_on_errors))
40 changes: 33 additions & 7 deletions sem/runner.py
Expand Up @@ -8,6 +8,7 @@
import sys
from importlib.machinery import SourceFileLoader
import types
from .utils import CallbackBase

from tqdm import tqdm

Expand Down Expand Up @@ -72,7 +73,7 @@ def __init__(self, path, script, optimized=True, skip_configuration=False,
build_status_fname = ".lock-ns3_%s_build" % sys.platform
build_status_path = os.path.join(path, build_status_fname)
else:
build_status_fname = "build-status.py"
build_status_fname = "build.py"
if optimized:
build_status_path = os.path.join(path,
'build/optimized/build-status.py')
Expand Down Expand Up @@ -289,17 +290,25 @@ def get_available_parameters(self):
# Simulation running #
######################

def run_simulations(self, parameter_list, data_folder, stop_on_errors=False):
def run_simulations(self, parameter_list, data_folder, callbacks: [CallbackBase] = None, stop_on_errors=False):
"""
Run several simulations using a certain combination of parameters.

Yields results as simulations are completed.
Yield results as simulations are completed.

Args:
parameter_list (list): list of parameter combinations to simulate.
data_folder (str): folder in which to save subfolders containing
simulation output.
callbacks (list): list of callbacks to be triggered
stop_on_errors (bool): if true, when a simulation outputs an error the whole campaign will be stopped
"""

# Log simulation start if not already done by parent class
if callbacks is not None:
for cb in callbacks:
if not cb.is_controlled_by_parent():
cb.on_simulation_start(len(list(enumerate(parameter_list))))

for _, parameter in enumerate(parameter_list):

Expand All @@ -314,13 +323,19 @@ def run_simulations(self, parameter_list, data_folder, stop_on_errors=False):
parameter.items()]

# Run from dedicated temporary folder
current_result['meta']['id'] = str(uuid.uuid4())
sim_uuid = str(uuid.uuid4())
current_result['meta']['id'] = sim_uuid
temp_dir = os.path.join(data_folder, current_result['meta']['id'])
os.makedirs(temp_dir)

start = time.time() # Time execution
stdout_file_path = os.path.join(temp_dir, 'stdout')
stderr_file_path = os.path.join(temp_dir, 'stderr')

if callbacks is not None:
for cb in callbacks:
cb.on_run_start(parameter, sim_uuid)

with open(stdout_file_path, 'w') as stdout_file, open(
stderr_file_path, 'w') as stderr_file:
return_code = subprocess.call(command, cwd=temp_dir,
Expand All @@ -329,11 +344,16 @@ def run_simulations(self, parameter_list, data_folder, stop_on_errors=False):
stderr=stderr_file)
end = time.time() # Time execution

if callbacks is not None:
for cb in callbacks:
cb.on_run_end(sim_uuid, return_code, end - start)

if return_code != 0:

with open(stdout_file_path, 'r') as stdout_file, open(
stderr_file_path, 'r') as stderr_file:
complete_command = sem.utils.get_command_from_result(self.script, current_result)
complete_command_debug = sem.utils.get_command_from_result(self.script, current_result, debug=True)
complete_command = sem.utils.get_command_from_result(self.script, self.path, current_result)
complete_command_debug = sem.utils.get_command_from_result(self.script, self.path, current_result, debug=True)
error_message = ('\nSimulation exited with an error.\n'
'Params: %s\n'
'Stderr: %s\n'
Expand All @@ -349,9 +369,15 @@ def run_simulations(self, parameter_list, data_folder, stop_on_errors=False):
complete_command_debug))
if stop_on_errors:
raise Exception(error_message)
print(error_message)
print(error_message)

current_result['meta']['elapsed_time'] = end-start
current_result['meta']['exitcode'] = return_code

yield current_result

# Log simulation start if not already done by parent class
if callbacks is not None:
for cb in callbacks:
if not cb.is_controlled_by_parent():
cb.on_simulation_end()