diff --git a/.travis.yml b/.travis.yml index 42fd00fe7..927aebeb6 100644 --- a/.travis.yml +++ b/.travis.yml @@ -16,7 +16,7 @@ env: - OMPI_MCA_rmaps_base_oversubscribe=yes matrix: - MPI=mpich - - MPI=openmpi + #- MPI=openmpi matrix: allow_failures: @@ -45,30 +45,31 @@ before_install: - export PATH="$HOME/miniconda/bin:$PATH" - hash -r - conda config --set always_yes yes --set changeps1 no - - conda update -q conda + - conda update -q -y conda - conda info -a # For debugging any issues with conda - conda config --add channels conda-forge - conda create --yes --name condaenv python=$TRAVIS_PYTHON_VERSION - source activate condaenv install: + - conda install gcc_linux-64 - conda install $MPI - - pip install numpy - - pip install scipy - - pip install mpi4py - - pip install petsc petsc4py + - conda install --no-update-deps scipy # includes numpy + - conda install --no-update-deps mpi4py + - conda install --no-update-deps petsc4py petsc + - conda install --no-update-deps nlopt + # pip install these as the conda installs downgrade pytest on python3.4 - pip install pytest - pip install pytest-cov - - pip install pytest-timeout + - pip install pytest-timeout - pip install mock - pip install coveralls - - conda install --no-deps nlopt # For confirmation of MPI library being used. - python conda/find_mpi.py #locate compilers - mpiexec --version #Show MPI library details - pip install -e . -# Run test +# Run test (-z show output) script: - libensemble/tests/run-tests.sh -z diff --git a/conda/build-conda-package.sh b/conda/build-conda-package.sh deleted file mode 100755 index 7edfd57ab..000000000 --- a/conda/build-conda-package.sh +++ /dev/null @@ -1,5 +0,0 @@ -#To be implemented - -echo -e "\nBuild conda package is not yet implemented - this is a placeholder" -echo -e "To install dependencies run ./conda-install-deps.sh\n" - diff --git a/conda/conda-install-deps.sh b/conda/conda-install-deps.sh index 3c9fe5803..4fdcf577d 100755 --- a/conda/conda-install-deps.sh +++ b/conda/conda-install-deps.sh @@ -1,27 +1,62 @@ #!/usr/bin/env bash -#Note - This assumes miniconda - with anaconda some will already be installed -#You must have installed miniconda: https://conda.io/docs/install/quick.html +# Install dependencies for libEnsemble and tests in Conda. +# Note: libEnsemble itself is currently not in Conda +# To replicate travis builds - see directory run_travis_locally/ -#To create and activate new environment called py3.6 -#> conda create --name py3.6 python=3.6 -#> source activate py3.6 +# Note - This assumes miniconda - with anaconda some will already be installed +# You must have installed miniconda: https://conda.io/docs/install/quick.html -#To come out of env: source deactivate -#To see envs: conda info --envs +# To isolate from local installs run this before activating environment +# export PYTHONNOUSERSITE=1 + +# To create and activate new environment called py3.6 +# > conda create --name py3.6 python=3.6 +# > source activate py3.6 +# To enable running this script without prompts: +# > conda config --set always_yes yes +# Then source this script: . ./conda-install-deps.sh + +# To come out of env: source deactivate +# To see envs: conda info --envs + +# You may need to add the conda-forge chanel +# conda config --add channels conda-forge #-------------------------------------------------------------------------- -#Install packages for libensemble. Do in this order!!! - conda install --yes mpi4py - conda install --yes -c conda-forge petsc4py - conda install --yes -c conda-forge nlopt - conda install --yes pytest pytest-cov - conda install --yes scipy - - #To use dev version of mpi4py - #conda install --yes cython - #pip install git+https://bitbucket.org/mpi4py/mpi4py.git@master - - echo -e "\nDo 'conda list' to see installed packages" - echo -e "Do './run-tests.sh' to run the tests\n" +# Install packages for libensemble. Do in this order!!! + +# This should work for mpich as of v0.4.1 +# For other MPI libraries, some packages, such as mpi4py and PETSc may +# need to be pip installed. + +export MPI=MPICH +export LIBE_BRANCH=master + +conda install gcc_linux-64 || return +conda install $MPI || return +#conda install numpy || return #scipy includes numpy +conda install --no-update-deps scipy || return +conda install --no-update-deps mpi4py || return +conda install --no-update-deps petsc4py petsc || return +conda install --no-update-deps nlopt || return + +# pip install these as the conda installs downgrade pytest on python3.4 +pip install pytest || return +pip install pytest-cov || return +pip install pytest-timeout || return +pip install mock || return +# pip install coveralls || return # for online + +# Install libEnsemble + +# From source +git clone -b $LIBE_BRANCH https://github.com/Libensemble/libensemble.git || return +cd libensemble/ || return +pip install -e . || return +# OR +# pip install libEnsemble + +echo -e "\nDo 'conda list' to see installed packages" +echo -e "Do 'libensemble/tests/run-tests.sh' to run the tests\n" diff --git a/conda/replicate-travis-local.sh b/conda/replicate-travis-local.sh deleted file mode 100755 index a8094762c..000000000 --- a/conda/replicate-travis-local.sh +++ /dev/null @@ -1,108 +0,0 @@ -#!/usr/bin/env bash -# Script to run same steps as travis CI - at time of writing. -# Note: Travis steps are wrapped in some checks as a precaution -# Note: This does NOT guarantee Travis will pass -# Future - Separate kernel and call from travis and here. -# - Maybe use docker container like travis - - TRAVIS_PYTHON_VERSION=3.5 - RUN_TESTS_SCRIPT=../code/tests/run-tests.sh - - if [ -n $1 ]; then - TRAVIS_PYTHON_VERSION=$1 - fi; - - # Checks ----------------------------------------------------------------------------------------- - cont=true - - #Check can find run-tests.sh - normally run in conda/ dir. - if [ ! -e $RUN_TESTS_SCRIPT ]; then - echo -e 'Cannot find $RUN_TESTS_SCRIPT - aborting' - cont=false - fi; - - #Check for existing miniconda - if [ $cont = 'true' ]; then - # Prompt if miniconda already installed - up to user to delete - won't do automatically - # If miniconda2/miniconda3 etc dirs exist this should not remove - but will prepend on path. - miniconda_exists=false - #miniconda_name=miniconda - if [ -d ~/miniconda ]; then - miniconda_exists=true - echo -e 'A miniconda installation already exists - script will not delete it - carry on without re-install y/n?' - read var - if [ $var = 'y' ] || [ $var = 'Y' ]; then - echo -e 'Miniconda will still check for updates - but existing packages will not be removed' - cont=true; - else - echo -e 'Aborting - run: Remove ~/miniconda and re-run' - cont=false - fi; - else - cont=true; - fi; - fi; - - #Check if env called condaenv exists in this miniconda - if [ $cont = 'true' ]; then - if [ $miniconda_exists = 'true' ]; then - conda info -e - if [[ $(conda info -e | egrep condaenv) ]]; then - echo -e 'Environment condaenv already exists - remove y/n?' - read var - if [ $var = 'y' ] || [ $var = 'Y' ]; then - echo -e 'Removing condaenv' - if [[ $(conda remove --name condaenv --all) ]]; then - cont=true; - else - echo -e 'Failed to remove condaenv' - cont=false - fi; - else - echo -e 'Aborting - run: Remove env condaenv and re-run' - cont=false - fi; - fi; - fi; - fi; - - # Install and run tests -------------------------------------------------------------------------- - - if [ $cont = 'true' ]; then - echo -e "\nRunning Travis CI steps for python $TRAVIS_PYTHON_VERSION\n" - - if [ $miniconda_exists = "false" ]; then - #Do this conditionally because it saves some downloading if the version is the same. - if [[ "$TRAVIS_PYTHON_VERSION" == "2.7" ]]; then - wget https://repo.continuum.io/miniconda/Miniconda2-latest-Linux-x86_64.sh -O miniconda.sh; - else - wget https://repo.continuum.io/miniconda/Miniconda3-latest-Linux-x86_64.sh -O miniconda.sh; - fi; - fi; - - bash miniconda.sh -b -p $HOME/miniconda - export PATH="$HOME/miniconda/bin:$PATH" - hash -r - conda config --set always_yes yes --set changeps1 no - conda update -q conda - conda info -a # For debugging any issues with conda - conda create --yes --name condaenv python=$TRAVIS_PYTHON_VERSION - source activate condaenv - - # Currently mpi4py 2.0.0 fails with mpi_init error on some platforms need dev version from source. - # Means installing dependencies sep. including MPI lib. - #install: - conda install --yes mpi4py - conda install --yes -c conda-forge petsc4py - conda install --yes -c conda-forge nlopt - conda install --yes pytest pytest-cov - conda install --yes -c conda-forge coveralls - conda install --yes scipy - conda install --yes cython - pip install git+https://bitbucket.org/mpi4py/mpi4py.git@master - # python setup.py install - - # Run test - #script: - $RUN_TESTS_SCRIPT -fi; diff --git a/conda/run_travis_locally/build_mpich_libE.sh b/conda/run_travis_locally/build_mpich_libE.sh new file mode 100755 index 000000000..83c2b7915 --- /dev/null +++ b/conda/run_travis_locally/build_mpich_libE.sh @@ -0,0 +1,83 @@ +# Conda build of dependencies using mpich. +# Source script to maintain environment after running. Script stops if installs fail. +# Note for other MPIs may need to install some packages from source (eg. petsc) + +# -x echo commands +set -x # problem with this - loads of conda internal commands shown - overwhelming. + +export PYTHON_VERSION=3.7 # default - override with -p +export LIBE_BRANCH="develop" # default - override with -b + +export MPI=MPICH +export HYDRA_LAUNCHER=fork + +# Allow user to optionally set python version and branch +# E.g: ". ./build_mpich_libE.sh -p 3.4 -b feature/myfeature" +while getopts ":p:b:" opt; do + case $opt in + p) + echo "Parameter supplied for Python version: $OPTARG" >&2 + PYTHON_VERSION=$OPTARG + ;; + b) + echo "Parameter supplied for branch name: $OPTARG" >&2 + LIBE_BRANCH=${OPTARG} + ;; + \?) + echo "Invalid option supplied: -$OPTARG" >&2 + exit 1 + ;; + :) + echo "Option -$OPTARG requires an argument." >&2 + exit 1 + ;; + esac +done + +echo -e "\nBuilding libE with python $PYTHON_VERSION and branch ${LIBE_BRANCH}\n" + +sudo apt-get update + +# This works if not sourced but if sourced its no good. +# set -e + +sudo apt install gfortran || return +sudo apt install libblas-dev || return +sudo apt-get install liblapack-dev || return + +wget https://repo.continuum.io/miniconda/Miniconda3-latest-Linux-x86_64.sh -O miniconda.sh || return +bash miniconda.sh -b -p $HOME/miniconda || return +export PATH="$HOME/miniconda/bin:$PATH" || return +conda update -q -y conda + +conda config --add channels conda-forge || return +conda config --set always_yes yes --set changeps1 no || return +conda create --yes --name condaenv python=$PYTHON_VERSION || return + +source activate condaenv || return +wait + +conda install gcc_linux-64 || return +conda install $MPI || return +#conda install numpy || return #scipy includes numpy +conda install --no-update-deps scipy || return +conda install --no-update-deps mpi4py || return +conda install --no-update-deps petsc4py petsc || return +conda install --no-update-deps nlopt || return + +# pip install these as the conda installs downgrade pytest on python3.4 +pip install pytest || return +pip install pytest-cov || return +pip install pytest-timeout || return +pip install mock || return +pip install coveralls || return + +# Not required on travis +git clone -b $LIBE_BRANCH https://github.com/Libensemble/libensemble.git || return +cd libensemble/ || return +pip install -e . || return + +libensemble/tests/run-tests.sh + +echo -e "\n\nScript completed...\n\n" +set +ex diff --git a/conda/run_travis_locally/readme.md b/conda/run_travis_locally/readme.md new file mode 100644 index 000000000..9e2d95dca --- /dev/null +++ b/conda/run_travis_locally/readme.md @@ -0,0 +1,165 @@ +# How to install Travis Python container locally and build libEnsemble + +## Installing Travis Locally + +This explains how to replicate the Travis container environment locally. + +This can save a lot of git pushes and trial-and-error when there is problems in Travis builds that are not +observed locally. + +#### References: + +[How to Run TravisCI locally on Docker](https://medium.com/google-developers/how-to-run-travisci-locally-on-docker-822fc6b2db2e) + +[Travis CI Docker instructions](https://docs.travis-ci.com/user/common-build-problems/#troubleshooting-locally-in-a-docker-image) + +I did it as follows on Ubuntu 18.04. + +### Install Docker + +Here are the lines for Ubuntu. Go one at a time. Taken from [here](https://www.digitalocean.com/community/tutorials/how-to-install-and-use-docker-on-ubuntu-18-04) (see for details or if issues with these lines). + + sudo apt update + sudo apt install apt-transport-https ca-certificates curl software-properties-common + curl -fsSL https://download.docker.com/linux/ubuntu/gpg | sudo apt-key add - + sudo add-apt-repository "deb [arch=amd64] https://download.docker.com/linux/ubuntu bionic stable" + sudo apt update + apt-cache policy docker-ce + sudo apt install docker-ce + +Add user name to docker group so dont need sudo before docker: + + sudo usermod -aG docker ${USER} + su - ${USER} + id -nG # Confirm username added to docker users + + +### Install Travis Image for Python and create a container + +Install the Travis Docker image for Python and run the container (server): + + sudo docker run --name travis-debug-3.7-v1 -dit travisci/ci-garnet:packer-1512502276-986baf0 /sbin/init + +This might take a while if image is not downloaded. Once image is downloaded it will be run from cache. + +Note: +travisci/ci-garnet:packer-1512502276-986baf0 is the Travis Python image. +travis-debug-3.7-v1 is the name you are assigning to the new container made from the image. + +Alternative travisCI docker images can be found [here](https://hub.docker.com/r/travisci/ci-garnet/tags/). + + +### Start a shell session in the container + +Then open a shell in running container: + + sudo docker exec -it travis-debug-3.7-v1 bash -l + +Prompt should say travis@ rather than root@: + + +### Build and run libEnsemble for a given Python version + +You now want to copy the latest build script to the container and run it. + +The default build script is build_mpich_libE.sh. If this is not up to date, check +the installs against .travis.yml in the top level libEnsemble package directory. + + +#### Copy build script from host system to the running container + +Run the following **from your host machine environment**. This copies the given file into an existing +container named travis-debug-3.7-v1. Where /home/travis is target location in container +filesystem. + + docker cp build_mpich_libE.sh travis-debug-3.7-v1:/home/travis + +On the docker side you may need to set ownership of the script: + + chown travis:travis /home/travis/build_mpich_libE.sh + + +#### Run the libEnsemble build-and-run script + +Now, in the docker container, become user travis: + + su - travis + +Source the script, setting python version and libEnsemble branch if necessary (see script for defaults). +**Always source** to maintain environment variables after running (inc. miniconda path). The following +would run with Python 3.7 and test the libEnsemble branch hotfix/logging: + + . ./build_mpich_libE.sh -p 3.7 -b hotfix/logging + +Note: libEnsemble will be git cloned and checked out at the given branch. + +The script should build all dependencies and run tests. Having sourced the script, you should +be left in the same environment for debugging. The script should stop running if an install step fails. + +If the conda output is too verbose, remove the "set -x" line in the script. + + +### Saving images from built containers and re-starting containers + +To exit the container session (client). + exit # Travis user + exit # Exit running session (but container server is still running. + +Re-enter while background container still running: + + sudo docker exec -it travis-debug-3.7-v1 bash -l + su - travis + +Note that environment variables are not saved. + +To save the modified container as an image (with your changes to filesystem): + +Note: This will not by default save your environment variables + +First to check running containers (ie. running server - whether or not you are in a session): + + docker ps + +OR + + docker container ls + +The first column of output is the container ID. + +To save an image locally from this container: + + docker commit -m "Optional log message" / + +Where is your user name on the machine and is what you +want to call your new image. + +Now it should show if you do: + + docker images + +If it is saved you can stop the container (server) thats running and restart eg. + + docker stop travis-debug-3.7-v1 + +where travis-debug-3.7-v1 is the name you gave the container on the docker run command. + +You can restart from your new image with docker run and docker exec or to run server and run session in one: + + docker run -it / /bin/bash + +where / is as used above to save (or from first column in "docker images" output). + + +### Notes + +The CPU cores available will be all those on your machine, not just what Travis supports. +Parallel/threaded code run-time errors may well depend on timing of processes/threads and +so may not be replicated. At time of writing, Travis provides 2 cores. Resources in docker +container can be [restricted](https://docs.docker.com/config/containers/resource_constraints/). + +An alternative to this process is to log in to your Travis build and debug. For public +repositories, this requires contacting Travis support for a token that can only be used by +the given user. See [here](https://docs.travis-ci.com/user/running-build-in-debug-mode/). + + + diff --git a/docs/conf.py b/docs/conf.py index c8ec95e5b..fcbeefae9 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -90,9 +90,9 @@ def __getattr__(cls, name): # built documents. # # The short X.Y version. -version = '0.3.0' +version = '0.4.1' # The full version, including alpha/beta/rc tags. -release = '0.3.0' +release = '0.4.1' # The language for content autogenerated by Sphinx. Refer to documentation # for a list of supported languages. diff --git a/docs/data_structures/libE_specs.rst b/docs/data_structures/libE_specs.rst index a3973ca4b..f69768600 100644 --- a/docs/data_structures/libE_specs.rst +++ b/docs/data_structures/libE_specs.rst @@ -10,5 +10,3 @@ Specifications for libEnsemble:: libEnsemble communicator. Default: MPI.COMM_WORLD 'color' [int] : Communicator color. Default: 0 - 'queue_update_function' [func] : - Default: [] diff --git a/docs/index.rst b/docs/index.rst index 591d9e808..9be9a53a1 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -21,10 +21,11 @@ Library for managing ensemble-like collections of computations. :caption: User Guide: user_guide - libE_module + libE_module data_structures/data_structures user_funcs job_controller/jc_index + logging .. toctree:: :maxdepth: 2 diff --git a/docs/logging.rst b/docs/logging.rst new file mode 100644 index 000000000..fe1bf4ff5 --- /dev/null +++ b/docs/logging.rst @@ -0,0 +1,11 @@ +Logging +======= + +The libEnsemble logger uses the standard Python logging levels (DEBUG, INFO, WARNING, ERROR, CRITICAL). + +The default level is DEBUG as this can provide useful diagnostics. However, INFO level includes information such as how jobs are launched and when jobs are killed. libEnsemble produces logging to the file ensemble.log. + +The logging level can be changed at the top of the calling scripts E.g:: + + from libensemble import libE_logger + libE_logger.set_level('INFO') diff --git a/docs/release_notes.rst b/docs/release_notes.rst index 2469a97ef..2790f0a2a 100644 --- a/docs/release_notes.rst +++ b/docs/release_notes.rst @@ -2,6 +2,20 @@ Release Notes ============= + +Release 0.4.1 +------------- + +:Date: February 20, 2019 + + +* Logging no longer uses root logger (Also added option to change libEnsemble log level) (#105) +* Added wait_on_run option for job controller launch to block until jobs have started (#111) +* persis_info can be passed to sim as well as gen functions (#112) +* Post-processing scripts added to create performance/utilization graphs (#102) +* New scaling test added (not part of current CI test suite) (#114) + + Release 0.4.0 ------------- diff --git a/docs/sim_gen_alloc_funcs.rst b/docs/sim_gen_alloc_funcs.rst index 9dc3bc24f..6f253d24a 100644 --- a/docs/sim_gen_alloc_funcs.rst +++ b/docs/sim_gen_alloc_funcs.rst @@ -91,7 +91,7 @@ alloc_f API The alloc_f calculations will be called by libEnsemble with the following API:: - Work, persis_info = alloc_f(W, H, sim_specs, gen_specs, persis_info) + Work, persis_info = alloc_f(W, H, sim_specs, gen_specs, alloc_specs, persis_info) Parameters: *********** @@ -108,6 +108,9 @@ Parameters: **gen_specs**: :obj:`dict` :doc:`(example)` + **alloc_specs**: :obj:`dict` + :doc:`(example)` + **persis_info**: :obj:`dict` :doc:`(example)` diff --git a/docs/user_guide.rst b/docs/user_guide.rst index 6c9df22d2..1def2a0a6 100644 --- a/docs/user_guide.rst +++ b/docs/user_guide.rst @@ -114,5 +114,6 @@ a summary file is produced for each worker in directory named *libe_stat_files*. the run these are concatenated into *libe_summary.txt*. If a run aborts, the *libe_stat_files* directory will remain. -**ensemble.log**: This is the logging output from libEnsemble. The default logging is DEBUG level, as +**ensemble.log**: This is the logging output from libEnsemble. The default logging is at DEBUG level, as this can provide useful diagnostics. If this file is not removed, multiple runs will append output. +For more info, see :doc:`Logging`. diff --git a/libensemble/__init__.py b/libensemble/__init__.py index dc3a0c548..a646b46a1 100644 --- a/libensemble/__init__.py +++ b/libensemble/__init__.py @@ -4,6 +4,8 @@ Library for managing ensemble-like collections of computations. """ -__version__ = "0.4.0" +__version__ = "0.4.1" __author__ = 'Jeffrey Larson, Stephen Hudson and David Bindel' __credits__ = 'Argonne National Laboratory' + +from libensemble import libE_logger diff --git a/libensemble/alloc_funcs/fast_alloc.py b/libensemble/alloc_funcs/fast_alloc.py index abd78ae2b..2c33f051f 100644 --- a/libensemble/alloc_funcs/fast_alloc.py +++ b/libensemble/alloc_funcs/fast_alloc.py @@ -5,7 +5,7 @@ avail_worker_ids, sim_work, gen_work, count_gens -def give_sim_work_first(W, H, sim_specs, gen_specs, persis_info): +def give_sim_work_first(W, H, sim_specs, gen_specs, alloc_specs, persis_info): """ This allocation function gives (in order) entries in ``H`` to idle workers to evaluate in the simulation function. The fields in ``sim_specs['in']`` @@ -24,7 +24,7 @@ def give_sim_work_first(W, H, sim_specs, gen_specs, persis_info): if persis_info['next_to_give'] < len(H): # Give sim work if possible - sim_work(Work, i, sim_specs['in'], [persis_info['next_to_give']]) + sim_work(Work, i, sim_specs['in'], [persis_info['next_to_give']], []) persis_info['next_to_give'] += 1 elif gen_count < gen_specs.get('num_active_gens', gen_count+1): @@ -32,6 +32,6 @@ def give_sim_work_first(W, H, sim_specs, gen_specs, persis_info): # Give gen work persis_info['total_gen_calls'] += 1 gen_count += 1 - gen_work(Work, i, gen_specs['in'], persis_info[i], []) + gen_work(Work, i, gen_specs['in'], [], persis_info[i]) return Work, persis_info diff --git a/libensemble/alloc_funcs/fast_alloc_and_pausing.py b/libensemble/alloc_funcs/fast_alloc_and_pausing.py new file mode 100644 index 000000000..7d448c49c --- /dev/null +++ b/libensemble/alloc_funcs/fast_alloc_and_pausing.py @@ -0,0 +1,130 @@ +from __future__ import division +from __future__ import absolute_import + +import numpy as np +import sys + +from libensemble.alloc_funcs.support import \ + avail_worker_ids, sim_work, gen_work, count_gens + +def give_sim_work_first(W, H, sim_specs, gen_specs, alloc_specs, persis_info): + """ + This allocation function gives (in order) entries in ``H`` to idle workers + to evaluate in the simulation function. The fields in ``sim_specs['in']`` + are given. If all entries in `H` have been given a be evaluated, a worker + is told to call the generator function, provided this wouldn't result in + more than ``gen_specs['num_active_gen']`` active generators. Also allows + for a 'batch_mode'. + + When there are multiple objective components, this allocation function + does not evaluate further components for some point in the following + scenarios: + + alloc_specs['stop_on_NaNs']: True --- after a NaN has been found in returned in some + objective component + allocated['stop_partial_fvec_eval']: True --- after the value returned from + combine_component_func is larger than a known upper bound on the objective. + + :See: + ``/libensemble/tests/regression_tests/test_chwirut_uniform_sampling_one_residual_at_a_time.py`` + """ + + Work = {} + gen_count = count_gens(W) + + if len(H)!=persis_info['H_len']: + # Something new is in the history. + persis_info['need_to_give'].update(H['sim_id'][persis_info['H_len']:].tolist()) + persis_info['H_len']=len(H) + + for i in avail_worker_ids(W): + + pt_ids_to_pause = set() + + # Find indices of H that are not yet allocated + if len(persis_info['need_to_give']): + # Pause entries in H if one component is evaluated at a time and there are + # any NaNs for some components. + if 'stop_on_NaNs' in alloc_specs and alloc_specs['stop_on_NaNs']: + pt_ids_to_pause.update(H['pt_id'][np.isnan(H['f_i'])]) + + # Pause entries in H if a partial combine_component_func evaluation is + # worse than the best, known, complete evaluation (and the point is not a + # local_opt point). + if 'stop_partial_fvec_eval' in alloc_specs and alloc_specs['stop_partial_fvec_eval']: + pt_ids = np.unique(H['pt_id']) + + for j,pt_id in enumerate(pt_ids): + if (pt_id in persis_info['has_nan']) or \ + (pt_id in persis_info['complete']): + continue + + a1 = H['pt_id']==pt_id + if np.any(np.isnan(H['f_i'][a1])): + persis_info['has_nan'].add(pt_id) + continue + + if np.all(H['returned'][a1]): + persis_info['complete'].add(pt_id) + + if len(persis_info['complete']) and len(pt_ids)>1: + complete_fvals_flag = np.zeros(len(pt_ids),dtype=bool) + sys.stdout.flush() + complete_fvals_flag[list(persis_info['complete'])] = True + + # Ensure combine_component_func calculates partial fevals correctly + # with H['f_i'] = 0 for non-returned point + possibly_partial_fvals = np.array([gen_specs['combine_component_func'](H['f_i'][H['pt_id']==j]) for j in pt_ids]) + + best_complete = np.nanmin(possibly_partial_fvals[complete_fvals_flag]) + + worse_flag = np.zeros(len(pt_ids),dtype=bool) + for j in range(len(pt_ids)): + if not np.isnan(possibly_partial_fvals[j]) and possibly_partial_fvals[j] > best_complete: + worse_flag[j] = True + + # Pause incompete evaluations with worse_flag==True + pt_ids_to_pause.update(pt_ids[np.logical_and(worse_flag,~complete_fvals_flag)]) + + if not pt_ids_to_pause.issubset(persis_info['already_paused']): + persis_info['already_paused'].update(pt_ids_to_pause) + sim_ids_to_remove = np.in1d(H['pt_id'],list(pt_ids_to_pause)) + H['paused'][sim_ids_to_remove] = True + + persis_info['need_to_give'].difference(np.where(sim_ids_to_remove)[0]) + + if len(persis_info['need_to_give']) == 0: + continue + + next_row = persis_info['need_to_give'].pop() + sim_work(Work, i, sim_specs['in'], [next_row], []) + + elif gen_count < gen_specs.get('num_active_gens', gen_count+1): + lw = persis_info['last_worker'] + + last_size = persis_info.get('last_size') + if len(H): + # Don't give gen instances in batch mode if points are unfinished + if (gen_specs.get('batch_mode') + and not all(np.logical_or(H['returned'][last_size:], + H['paused'][last_size:]))): + break + # Don't call APOSMM if there are runs going but none need advancing + if len(persis_info[lw]['run_order']): + runs_needing_to_advance = np.zeros(len(persis_info[lw]['run_order']),dtype=bool) + for run,inds in enumerate(persis_info[lw]['run_order'].values()): + runs_needing_to_advance[run] = np.all(H['returned'][inds]) + + if not np.any(runs_needing_to_advance): + break + + persis_info['last_size'] = len(H) + + # Give gen work + persis_info['total_gen_calls'] += 1 + gen_count += 1 + gen_work(Work, i, gen_specs['in'], range(len(H)), persis_info[lw]) + + persis_info['last_worker'] = i + + return Work, persis_info diff --git a/libensemble/alloc_funcs/fast_alloc_to_aposmm.py b/libensemble/alloc_funcs/fast_alloc_to_aposmm.py index 4d15a96d1..f55c33d80 100644 --- a/libensemble/alloc_funcs/fast_alloc_to_aposmm.py +++ b/libensemble/alloc_funcs/fast_alloc_to_aposmm.py @@ -6,7 +6,7 @@ from libensemble.alloc_funcs.support import \ avail_worker_ids, sim_work, gen_work, count_gens -def give_sim_work_first(W, H, sim_specs, gen_specs, persis_info): +def give_sim_work_first(W, H, sim_specs, gen_specs, alloc_specs, persis_info): """ This allocation function gives (in order) entries in ``H`` to idle workers to evaluate in the simulation function. The fields in ``sim_specs['in']`` @@ -27,7 +27,7 @@ def give_sim_work_first(W, H, sim_specs, gen_specs, persis_info): # Find indices of H that are not yet allocated if persis_info['next_to_give'] < len(H): # Give sim work if possible - sim_work(Work, i, sim_specs['in'], [persis_info['next_to_give']]) + sim_work(Work, i, sim_specs['in'], [persis_info['next_to_give']], []) persis_info['next_to_give'] += 1 elif gen_count < gen_specs.get('num_active_gens', gen_count+1): @@ -54,7 +54,7 @@ def give_sim_work_first(W, H, sim_specs, gen_specs, persis_info): # Give gen work persis_info['total_gen_calls'] += 1 gen_count += 1 - gen_work(Work, i, gen_specs['in'], persis_info[lw], range(len(H))) + gen_work(Work, i, gen_specs['in'], range(len(H)), persis_info[lw]) persis_info['last_worker'] = i diff --git a/libensemble/alloc_funcs/give_sim_work_first.py b/libensemble/alloc_funcs/give_sim_work_first.py index 47b557016..a52e3c370 100644 --- a/libensemble/alloc_funcs/give_sim_work_first.py +++ b/libensemble/alloc_funcs/give_sim_work_first.py @@ -6,7 +6,7 @@ avail_worker_ids, sim_work, gen_work, count_gens -def give_sim_work_first(W, H, sim_specs, gen_specs, persis_info): +def give_sim_work_first(W, H, sim_specs, gen_specs, alloc_specs, persis_info): """ Decide what should be given to workers. This allocation function gives any available simulation work first, and only when all simulations are @@ -58,7 +58,7 @@ def give_sim_work_first(W, H, sim_specs, gen_specs, persis_info): break # Assign resources and mark tasks as allocated to workers - sim_work(Work, i, sim_specs['in'], sim_ids_to_send) + sim_work(Work, i, sim_specs['in'], sim_ids_to_send, persis_info[i]) H['allocated'][sim_ids_to_send] = True # Update resource records @@ -81,6 +81,6 @@ def give_sim_work_first(W, H, sim_specs, gen_specs, persis_info): # Give gen work gen_count += 1 - gen_work(Work, i, gen_specs['in'], persis_info[i], range(len(H))) + gen_work(Work, i, gen_specs['in'], range(len(H)), persis_info[i]) return Work, persis_info diff --git a/libensemble/alloc_funcs/inverse_bayes_allocf.py b/libensemble/alloc_funcs/inverse_bayes_allocf.py index e3e40f153..2e5477d75 100644 --- a/libensemble/alloc_funcs/inverse_bayes_allocf.py +++ b/libensemble/alloc_funcs/inverse_bayes_allocf.py @@ -8,7 +8,7 @@ avail_worker_ids, sim_work, gen_work, count_persis_gens -def only_persistent_gens_for_inverse_bayes(W, H, sim_specs, gen_specs, persis_info): +def only_persistent_gens_for_inverse_bayes(W, H, sim_specs, gen_specs, alloc_specs, persis_info): """ Starts up to gen_count number of persistent generators. These persistent generators produce points (x) in batches and subbatches. @@ -41,8 +41,8 @@ def only_persistent_gens_for_inverse_bayes(W, H, sim_specs, gen_specs, persis_in k = H['batch'][-1] H['weight'][(n*(k-1)):(n*k)] = H['weight'][(n*k):(n*(k+1))] - gen_work(Work, i, ['like'], persis_info[i], - np.atleast_1d(inds_to_send_back), persistent=True) + gen_work(Work, i, ['like'], np.atleast_1d(inds_to_send_back), + persis_info[i], persistent=True) task_avail = ~H['given'] for i in avail_worker_ids(W, persistent=False): @@ -52,14 +52,14 @@ def only_persistent_gens_for_inverse_bayes(W, H, sim_specs, gen_specs, persis_in sim_subbatches = H['subbatch'][task_avail] sim_inds = (sim_subbatches == np.min(sim_subbatches)) sim_ids_to_send = np.nonzero(task_avail)[0][sim_inds] - sim_work(Work, i, sim_specs['in'], np.atleast_1d(sim_ids_to_send)) + sim_work(Work, i, sim_specs['in'], np.atleast_1d(sim_ids_to_send), []) task_avail[sim_ids_to_send] = False elif gen_count == 0: # Finally, generate points since there is nothing else to do. gen_count += 1 - gen_work(Work, i, gen_specs['in'], persis_info[i], - [], persistent=True) + gen_work(Work, i, gen_specs['in'], [], persis_info[i], + persistent=True) return Work, persis_info diff --git a/libensemble/alloc_funcs/start_only_persistent.py b/libensemble/alloc_funcs/start_only_persistent.py index f6a9fdd89..6a0aa81b5 100644 --- a/libensemble/alloc_funcs/start_only_persistent.py +++ b/libensemble/alloc_funcs/start_only_persistent.py @@ -6,7 +6,7 @@ avail_worker_ids, sim_work, gen_work, count_persis_gens -def only_persistent_gens(W, H, sim_specs, gen_specs, persis_info): +def only_persistent_gens(W, H, sim_specs, gen_specs, alloc_specs, persis_info): """ This allocation function will give simulation work if possible, but otherwise start up to 1 persistent generator. If all points requested by @@ -29,7 +29,7 @@ def only_persistent_gens(W, H, sim_specs, gen_specs, persis_info): last_ind = np.nonzero(gen_inds)[0][last_time_pos] gen_work(Work, i, sim_specs['in'] + [n[0] for n in sim_specs['out']], - persis_info[i], np.atleast_1d(last_ind), persistent=True) + np.atleast_1d(last_ind), persis_info[i], persistent=True) task_avail = ~H['given'] for i in avail_worker_ids(W, persistent=False): @@ -37,13 +37,13 @@ def only_persistent_gens(W, H, sim_specs, gen_specs, persis_info): # perform sim evaluations from existing runs (if they exist). sim_ids_to_send = np.nonzero(task_avail)[0][0] # oldest point - sim_work(Work, i, sim_specs['in'], np.atleast_1d(sim_ids_to_send)) + sim_work(Work, i, sim_specs['in'], np.atleast_1d(sim_ids_to_send), persis_info[i]) task_avail[sim_ids_to_send] = False elif gen_count == 0: # Finally, generate points since there is nothing else to do. gen_count += 1 - gen_work(Work, i, gen_specs['in'], persis_info[i], - [], persistent=True) + gen_work(Work, i, gen_specs['in'], [], persis_info[i], + persistent=True) return Work, persis_info diff --git a/libensemble/alloc_funcs/start_persistent_local_opt_gens.py b/libensemble/alloc_funcs/start_persistent_local_opt_gens.py index f46dc8328..dd5ecfaf4 100644 --- a/libensemble/alloc_funcs/start_persistent_local_opt_gens.py +++ b/libensemble/alloc_funcs/start_persistent_local_opt_gens.py @@ -10,7 +10,7 @@ initialize_APOSMM, decide_where_to_start_localopt, update_history_dist -def start_persistent_local_opt_gens(W, H, sim_specs, gen_specs, persis_info): +def start_persistent_local_opt_gens(W, H, sim_specs, gen_specs, alloc_specs, persis_info): """ This allocation function will: @@ -52,14 +52,14 @@ def start_persistent_local_opt_gens(W, H, sim_specs, gen_specs, persis_info): last_ind = np.nonzero(gen_inds)[0][last_time_pos] gen_work(Work, i, sim_specs['in'] + [n[0] for n in sim_specs['out']], - persis_info[i], np.atleast_1d(last_ind), persistent=True) + np.atleast_1d(last_ind), persis_info[i], persistent=True) persis_info[i]['run_order'].append(last_ind) for i in avail_worker_ids(W, persistent=False): # Find candidates to start local opt runs if a sample has been evaluated if np.any(np.logical_and(~H['local_pt'], H['returned'])): - _, _, _, _, r_k, mu, nu = initialize_APOSMM(H, gen_specs) - update_history_dist(H, gen_specs, c_flag=False) + n, _, _, _, r_k, mu, nu = initialize_APOSMM(H, gen_specs) + update_history_dist(H, n, gen_specs, c_flag=False) starting_inds = decide_where_to_start_localopt(H, r_k, mu, nu) else: starting_inds = [] @@ -70,7 +70,7 @@ def start_persistent_local_opt_gens(W, H, sim_specs, gen_specs, persis_info): ind = starting_inds[np.argmin(H['f'][starting_inds])] gen_work(Work, i, sim_specs['in'] + [n[0] for n in sim_specs['out']], - persis_info[i], np.atleast_1d(ind), persistent=True) + np.atleast_1d(ind), persis_info[i], persistent=True) H['started_run'][ind] = 1 H['num_active_runs'][ind] += 1 @@ -85,7 +85,7 @@ def start_persistent_local_opt_gens(W, H, sim_specs, gen_specs, persis_info): if not np.any(q_inds_logical): q_inds_logical = task_avail sim_ids_to_send = np.nonzero(q_inds_logical)[0][0] # oldest point - sim_work(Work, i, sim_specs['in'], np.atleast_1d(sim_ids_to_send)) + sim_work(Work, i, sim_specs['in'], np.atleast_1d(sim_ids_to_send), []) task_avail[sim_ids_to_send] = False elif (gen_count == 0 @@ -94,6 +94,6 @@ def start_persistent_local_opt_gens(W, H, sim_specs, gen_specs, persis_info): # Finally, generate points since there is nothing else to do gen_count += 1 - gen_work(Work, i, gen_specs['in'], persis_info[i], []) + gen_work(Work, i, gen_specs['in'], [], persis_info[i]) return Work, persis_info diff --git a/libensemble/alloc_funcs/support.py b/libensemble/alloc_funcs/support.py index c817c4ad6..927003ffe 100644 --- a/libensemble/alloc_funcs/support.py +++ b/libensemble/alloc_funcs/support.py @@ -23,16 +23,16 @@ def count_persis_gens(W): return sum(W['persis_state'] == EVAL_GEN_TAG) -def sim_work(Work, i, H_fields, H_rows, **libE_info): +def sim_work(Work, i, H_fields, H_rows, persis_info, **libE_info): "Add sim work record to work array." libE_info['H_rows'] = H_rows Work[i] = {'H_fields': H_fields, - 'persis_info': {}, + 'persis_info': persis_info, 'tag': EVAL_SIM_TAG, 'libE_info': libE_info} -def gen_work(Work, i, H_fields, persis_info, H_rows, **libE_info): +def gen_work(Work, i, H_fields, H_rows, persis_info, **libE_info): "Add gen work record to work array." libE_info['H_rows'] = H_rows Work[i] = {'H_fields': H_fields, diff --git a/libensemble/balsam_controller.py b/libensemble/balsam_controller.py index a9e0506e6..05c49fda2 100644 --- a/libensemble/balsam_controller.py +++ b/libensemble/balsam_controller.py @@ -17,8 +17,7 @@ from balsam.core import models logger = logging.getLogger(__name__ + '(' + MPIResources.get_my_name() + ')') -#For debug messages in this module - uncomment -#(see libE.py to change root logging level) +#To change logging level for just this module #logger.setLevel(logging.DEBUG) @@ -89,6 +88,9 @@ def poll(self): "Balsam state {}".format(balsam_state)) self.state = 'UNKNOWN' + logger.info("Job {} ended with state {}". + format(self.name, self.state)) + elif balsam_state in models.ACTIVE_STATES: self.state = 'RUNNING' self.workdir = self.workdir or self.process.working_directory @@ -110,7 +112,8 @@ def kill(self, wait_time=None): #Could have Wait here and check with Balsam its killed - #but not implemented yet. - + + logger.info("Killing job {}".format(self.name)) self.state = 'USER_KILLED' self.finished = True self.calc_job_timing() @@ -214,7 +217,7 @@ def register_calc(self, full_path, calc_type='sim', desc=None): def launch(self, calc_type, num_procs=None, num_nodes=None, ranks_per_node=None, machinefile=None, app_args=None, stdout=None, stderr=None, stage_inout=None, - hyperthreads=False, test=False): + hyperthreads=False, test=False, wait_on_run=False): """Creates a new job, and either launches or schedules to launch in the job controller @@ -283,8 +286,11 @@ def launch(self, calc_type, num_procs=None, num_nodes=None, add_job_args['stage_out_files'] = "*.out" job.process = dag.add_job(**add_job_args) + + if (wait_on_run): + self._wait_on_run(job) - logger.debug("Added job to Balsam database {}: " + logger.info("Added job to Balsam database {}: " "Worker {} nodes {} ppn {}". format(job.name, self.workerID, num_nodes, ranks_per_node)) diff --git a/libensemble/controller.py b/libensemble/controller.py index 2f87a6498..2f936ca98 100644 --- a/libensemble/controller.py +++ b/libensemble/controller.py @@ -19,8 +19,7 @@ from libensemble.resources import Resources logger = logging.getLogger(__name__ + '(' + Resources.get_my_name() + ')') -#For debug messages in this module - uncomment -#(see libE.py to change root logging level) +#To change logging level for just this module #logger.setLevel(logging.DEBUG) STATES = """ @@ -32,6 +31,11 @@ USER_KILLED FAILED""".split() +NOT_STARTED_STATES = ''' +CREATED +WAITING +'''.split() + class JobControllerException(Exception): "Raised for any exception in the JobController" @@ -182,7 +186,7 @@ def poll(self): self.errcode = self.process.returncode self.success = (self.errcode == 0) self.state = 'FINISHED' if self.success else 'FAILED' - logger.debug("Job {} completed with errcode {} ({})". + logger.info("Job {} completed with errcode {} ({})". format(self.name, self.errcode, self.state)) def kill(self, wait_time=60): @@ -204,7 +208,7 @@ def kill(self, wait_time=60): "Attempting to kill job {} that has no process ID - " "check jobs been launched".format(self.name)) - logger.debug("Killing job {}".format(self.name)) + logger.info("Killing job {}".format(self.name)) launcher.cancel(self.process, wait_time) self.state = 'USER_KILLED' self.finished = True @@ -227,6 +231,16 @@ class JobController: """ controller = None + + + def _wait_on_run(self, job): + '''Called by launch when wait_on_run is True''' + start = time.time() + while job.state in NOT_STARTED_STATES: + time.sleep(0.2) + job.poll() + logger.debug("Job {} polled as {} after {} seconds".format(job.name, job.state, time.time()-start)) + def __init__(self): """Instantiate a new JobController instance. @@ -328,9 +342,10 @@ def set_workerID(self, workerid): def poll(self, job): "Polls a job" - job.poll(job) + job.poll() def kill(self, job): "Kill a job" jassert(isinstance(job, Job), "Invalid job has been provided") job.kill(self.wait_time) + diff --git a/libensemble/gen_funcs/aposmm.py b/libensemble/gen_funcs/aposmm.py index 7764bccf8..c760b6228 100644 --- a/libensemble/gen_funcs/aposmm.py +++ b/libensemble/gen_funcs/aposmm.py @@ -26,10 +26,9 @@ def aposmm_logic(H,persis_info,gen_specs,_): """ - APOSMM as a libEnsemble generation function. Coordinates multiple local - optimization runs, starting from points which do not have a better point - nearby them. This generation function produces/requires the following - fields in ``H``: + APOSMM coordinates multiple local optimization runs, starting from points + which do not have a better point nearby (within a distance ``r_k``). This + generation function produces/requires the following fields in ``H``: - ``'x' [n floats]``: Parameters being optimized over - ``'x_on_cube' [n floats]``: Parameters scaled to the unit cube @@ -43,6 +42,7 @@ def aposmm_logic(H,persis_info,gen_specs,_): - ``'started_run' [bool]``: True if point has started a local optimization run - ``'num_active_runs' [int]``: Counts number of non-terminated local runs the point is in - ``'local_min' [float]``: True if point has been ruled a local minima + - ``'sim_id' [int]``: Row number of entry in history and optionally @@ -50,11 +50,11 @@ def aposmm_logic(H,persis_info,gen_specs,_): - ``'f_i' [float]``: Value of ith objective component (if calculated one at a time) - ``'fvec' [m floats]``: All objective components (if calculated together) - ``'obj_component' [int]``: Index corresponding to value in ``'f_i``' - - ``'pt_id' [int]``: Identify the point + - ``'pt_id' [int]``: Identify the point (useful when evaluating different objective components for a given ``'x'``) When using libEnsemble to do individual objective component evaluations, APOSMM will return ``gen_specs['components']`` copies of each point, but - each component=0 version of the point will only be considered when + the component=0 entry of each point will only be considered when - deciding where to start a run, - best nearby point, @@ -81,9 +81,10 @@ def aposmm_logic(H,persis_info,gen_specs,_): - ``'mu' [float]``: Distance from the boundary that all localopt starting points must satisfy - ``'nu' [float]``: Distance from identified minima that all starting points must satisfy - ``'single_component_at_a_time' [bool]``: True if single objective components will be evaluated at a time - - ``'rk_const' [float]``: + - ``'rk_const' [float]``: Multiplier in front of the r_k value + - ``'max_active_runs' [int]``: Upper bound on the number of runs APOSMM is advancing - And ``gen_specs`` convergence tolerances for NLopt and PETSc/TAO: + And ``gen_specs`` convergence tolerances for NLopt and PETSc/TAO localopt_methods: - ``'fatol' [float]``: - ``'ftol_abs' [float]``: @@ -93,6 +94,20 @@ def aposmm_logic(H,persis_info,gen_specs,_): - ``'xtol_abs' [float]``: - ``'xtol_rel' [float]``: + + As a default, APOSMM starts a local optimization runs from a point that: + + - is not in an active local optimization run, + - is more than ``mu`` from the boundary (in the unit-cube domain), + - is more than ``nu`` from identified minima (in the unit-cube domain), + - does not have a better point within a distance ``r_k`` of it. + + If the above results in more than ``'max_active_runs'`` being advanced, the + best point in each run is determined and the dist_to_better is computed + (with inf being the value for the best run). Then those + ``'max_active_runs'`` runs with largest dist_to_better are advanced + (breaking ties arbitrarily). + :Note: ``gen_specs['combine_component_func']`` must be defined when there are multiple objective components. @@ -132,6 +147,13 @@ def aposmm_logic(H,persis_info,gen_specs,_): exit_code: 0 if a new localopt point has been found, otherwise it's the NLopt/POUNDERS code samples_needed: counts the number of additional uniformly drawn samples needed + + Description of persistent variables used to maintain the state of APOSMM + + persis_info['total_runs']: Running count of started and completed localopt runs + persis_info['run_order']: Sequence of indices of points in each unfinished run + persis_info['old_runs']: Sequence of indices of points in each finished run + """ n, n_s, c_flag, O, r_k, mu, nu = initialize_APOSMM(H, gen_specs) @@ -143,7 +165,7 @@ def aposmm_logic(H,persis_info,gen_specs,_): else: global x_new, pt_in_run, total_pts_in_run # Used to generate a next local opt point - updated_inds = update_history_dist(H, gen_specs, c_flag) + updated_inds = update_history_dist(H, n, gen_specs, c_flag) starting_inds = decide_where_to_start_localopt(H, r_k, mu, nu) updated_inds.update(starting_inds) @@ -156,17 +178,16 @@ def aposmm_logic(H,persis_info,gen_specs,_): H['num_active_runs'][ind] += 1 persis_info['run_order'][new_run_num] = [ind] - persis_info['active_runs'].update([new_run_num]) persis_info['total_runs'] +=1 num_runs = len(persis_info['run_order']) if 'max_active_runs' in gen_specs and gen_specs['max_active_runs'] < num_runs: - run_vals = np.zeros((num_runs,2),dtype=int) + run_vals = np.zeros((num_runs,2),dtype=int) # Stores run number and sim_id (i.e., the row in H) of the best point in each run for i,run in enumerate(persis_info['run_order'].keys()): run_vals[i,0] = run run_vals[i,1] = persis_info['run_order'][run][np.nanargmin(H['f'][persis_info['run_order'][run]])] - P = squareform(pdist(H['x_on_cube'][run_vals[:,1]], 'euclidean')) + P = squareform(pdist(H['x_on_cube'][run_vals[:,1]], 'euclidean')) # Pairwise distance between the best points in each run dist_to_better = np.inf*np.ones(num_runs) for i in range(num_runs): @@ -176,14 +197,14 @@ def aposmm_logic(H,persis_info,gen_specs,_): k_sorted = np.argpartition(-dist_to_better,kth=gen_specs['max_active_runs']-1) # Take max_active_runs largest - persis_info['active_runs'] = set(run_vals[k_sorted[:gen_specs['max_active_runs']],0].astype(int)) + active_runs = set(run_vals[k_sorted[:gen_specs['max_active_runs']],0].astype(int)) else: - persis_info['active_runs'] = set(persis_info['run_order'].keys()) + active_runs = set(persis_info['run_order'].keys()) inactive_runs = set() # Find next point in any uncompleted runs using information stored in persis_info - for run in persis_info['active_runs']: + for run in active_runs: if not np.all(H['returned'][persis_info['run_order'][run]]): continue # Can't advance this run since all of it's points haven't been returned. @@ -205,7 +226,6 @@ def aposmm_logic(H,persis_info,gen_specs,_): persis_info['run_order'][run].append(O['sim_id'][matching_ind[0]]) for i in inactive_runs: - persis_info['active_runs'].remove(i) old_run = persis_info['run_order'].pop(i) # Deletes any information about this run persis_info['old_runs'][i] = old_run @@ -303,7 +323,7 @@ def add_points_to_O(O, pts, H, gen_specs, c_flag, persis_info, local_flag=0, sor return persis_info -def update_history_dist(H, gen_specs, c_flag): +def update_history_dist(H, n, gen_specs, c_flag): """ Updates distances/indices after new points that have been evaluated. @@ -311,8 +331,6 @@ def update_history_dist(H, gen_specs, c_flag): ``/libensemble/alloc_funcs/start_persistent_local_opt_gens.py`` """ - n = len(H['x_on_cube'][0]) - updated_inds = set() new_inds = np.where(~H['known_to_aposmm'])[0] @@ -390,9 +408,16 @@ def update_history_optimal(x_opt, H, run_inds): Updated the history after any point has been declared a local minimum """ - opt_ind = np.where(np.logical_and(np.equal(x_opt,H['x_on_cube']).all(1),~np.isinf(H['f'])))[0] - assert len(opt_ind) == 1, "Why isn't there exactly one optimal point?" - assert opt_ind in run_inds, "Why isn't the run optimum a point in the run?" + # opt_ind = np.where(np.logical_and(np.equal(x_opt,H['x_on_cube']).all(1),~np.isinf(H['f'])))[0] # This fails on some problems. x_opt is 1e-16 away from the point that was given and opt_ind is empty + # assert len(opt_ind) == 1, "Why isn't there exactly one optimal point?" + # assert opt_ind in run_inds, "Why isn't the run optimum a point in the run?" + + dists = np.linalg.norm(H['x_on_cube'][run_inds]-x_opt,axis=1) + ind = np.argmin(dists) + opt_ind = run_inds[ind] + + assert dists[ind] <= 1e-15, "Why is the closest point to x_opt not within 1e-15?" + assert np.min(dists[run_inds != opt_ind]) >= 1e-15, "Why are there two points from the run within 1e-15 of x_opt?" H['local_min'][opt_ind] = 1 H['num_active_runs'][run_inds] -= 1 @@ -846,70 +871,6 @@ def initialize_APOSMM(H, gen_specs): return n, n_s, c_flag, O, r_k, mu, nu -def queue_update_function(H, gen_specs, persis_info): - """ - A specific queue update function that stops evaluations under a variety of - conditions - - gen_specs['stop_on_NaNs'] - gen_specs['stop_partial_fvec_eval'] - H['paused'] - """ - - if len(H)==persis_info['H_len']: - return persis_info - else: - persis_info['H_len']=len(H) - - pt_ids_to_pause = set() - - # Pause entries in H if one component is evaluated at a time and there are - # any NaNs for some components. - if 'stop_on_NaNs' in gen_specs and gen_specs['stop_on_NaNs']: - pt_ids_to_pause.update(H['pt_id'][np.isnan(H['f_i'])]) - - # Pause entries in H if a partial combine_component_func evaluation is - # worse than the best, known, complete evaluation (and the point is not a - # local_opt point). - if 'stop_partial_fvec_eval' in gen_specs and gen_specs['stop_partial_fvec_eval']: - pt_ids = np.unique(H['pt_id']) - - complete_fvals_flag = np.zeros(len(pt_ids),dtype=bool) - for i,pt_id in enumerate(pt_ids): - if pt_id in persis_info['has_nan']: - continue - - a1 = H['pt_id']==pt_id - if np.any(np.isnan(H['f_i'][a1])): - persis_info['has_nan'].add(pt_id) - continue - - if np.all(H['returned'][a1]): - complete_fvals_flag[i] = True - persis_info['complete'].add(pt_id) - - # complete_fvals_flag = np.array([np.all(H['returned'][H['pt_id']==i]) for i in pt_ids],dtype=bool) - - if np.any(complete_fvals_flag) and len(pt_ids)>1: - # Ensure combine_component_func calculates partial fevals correctly - # with H['f_i'] = 0 for non-returned point - possibly_partial_fvals = np.array([gen_specs['combine_component_func'](H['f_i'][H['pt_id']==i]) for i in pt_ids]) - - best_complete = np.nanmin(possibly_partial_fvals[complete_fvals_flag]) - - worse_flag = np.zeros(len(pt_ids),dtype=bool) - for i in range(len(pt_ids)): - if not np.isnan(possibly_partial_fvals[i]) and possibly_partial_fvals[i] > best_complete: - worse_flag[i] = True - - # Pause incompete evaluations with worse_flag==True - pt_ids_to_pause.update(pt_ids[np.logical_and(worse_flag,~complete_fvals_flag)]) - - if not pt_ids_to_pause.issubset(persis_info['already_paused']): - persis_info['already_paused'].update(pt_ids_to_pause) - H['paused'][np.in1d(H['pt_id'],list(pt_ids_to_pause))] = True - - return persis_info def display_exception(e): print(e.__doc__) diff --git a/libensemble/libE.py b/libensemble/libE.py index 7c1769ccc..7cc1872ba 100644 --- a/libensemble/libE.py +++ b/libensemble/libE.py @@ -20,7 +20,7 @@ # (Set above libe imports so errors in import are captured) # LEVEL: DEBUG/INFO/WARNING/ERROR #logging.basicConfig(level=logging.INFO, format='%(name)s (%(levelname)s): %(message)s') -logging.basicConfig(filename='ensemble.log', level=logging.DEBUG, format='%(name)s (%(levelname)s): %(message)s') +#logging.basicConfig(filename='ensemble.log', level=logging.DEBUG, format='%(name)s (%(levelname)s): %(message)s') from libensemble.history import History from libensemble.libE_manager import manager_main @@ -30,7 +30,7 @@ from libensemble.message_numbers import ABORT_ENSEMBLE logger = logging.getLogger(__name__) -#For debug messages in this module - uncomment (see libE.py to change root logging level) +#To change logging level for just this module #logger.setLevel(logging.DEBUG) @@ -187,6 +187,8 @@ def check_inputs(libE_specs, alloc_specs, sim_specs, gen_specs, exit_criteria, H if 'color' not in libE_specs: libE_specs['color'] = 0 + + assert libE_specs['comm'].Get_size() > 1, "Manager only - must be at least one worker (2 MPI tasks)" assert isinstance(sim_specs, dict), "sim_specs must be a dictionary" assert isinstance(gen_specs, dict), "gen_specs must be a dictionary" diff --git a/libensemble/libE_logger.py b/libensemble/libE_logger.py new file mode 100644 index 000000000..7603f80aa --- /dev/null +++ b/libensemble/libE_logger.py @@ -0,0 +1,14 @@ +import logging + +logger = logging.getLogger(__package__ ) +logger.setLevel(logging.DEBUG) +fh = logging.FileHandler('ensemble.log') +formatter = logging.Formatter('%(name)s (%(levelname)s): %(message)s') +fh.setFormatter(formatter) +logger.addHandler(fh) + +#logger.debug("Testing top level logger") + +def set_level(level): + numeric_level = getattr(logging, level.upper(), 10) + logger.setLevel(numeric_level) diff --git a/libensemble/libE_manager.py b/libensemble/libE_manager.py index be35a40b8..0a90955a4 100644 --- a/libensemble/libE_manager.py +++ b/libensemble/libE_manager.py @@ -355,17 +355,10 @@ def _final_receive_and_kill(self, persis_info): # --- Main loop - def _queue_update(self, H, persis_info): - "Call queue update function from libE_specs (if defined)" - if 'queue_update_function' not in self.libE_specs or not len(H): - return persis_info - qfun = self.libE_specs['queue_update_function'] - return qfun(H, self.gen_specs, persis_info) - def _alloc_work(self, H, persis_info): "Call work allocation function from alloc_specs" alloc_f = self.alloc_specs['alloc_f'] - return alloc_f(self.W, H, self.sim_specs, self.gen_specs, persis_info) + return alloc_f(self.W, H, self.sim_specs, self.gen_specs, self.alloc_specs, persis_info) def run(self, persis_info): "Run the manager." @@ -379,7 +372,6 @@ def run(self, persis_info): ### Continue receiving and giving until termination test is satisfied while not self.term_test(): persis_info = self._receive_from_workers(persis_info) - persis_info = self._queue_update(self.hist.trim_H(), persis_info) if any(self.W['active'] == 0): Work, persis_info = self._alloc_work(self.hist.trim_H(), persis_info) diff --git a/libensemble/libE_worker.py b/libensemble/libE_worker.py index f15c780d9..a82bd1b55 100644 --- a/libensemble/libE_worker.py +++ b/libensemble/libE_worker.py @@ -18,7 +18,8 @@ UNSET_TAG, STOP_TAG, CALC_EXCEPTION from libensemble.message_numbers import \ MAN_SIGNAL_FINISH, \ - MAN_SIGNAL_REQ_RESEND, MAN_SIGNAL_REQ_PICKLE_DUMP + MAN_SIGNAL_REQ_RESEND, MAN_SIGNAL_REQ_PICKLE_DUMP, \ + calc_type_strings from libensemble.loc_stack import LocationStack from libensemble.calc_info import CalcInfo @@ -26,8 +27,7 @@ from libensemble.resources import Resources logger = logging.getLogger(__name__ + '(' + Resources.get_my_name() + ')') -#For debug messages in this module - uncomment -# (see libE.py to change root logging level) +#To change logging level for just this module #logger.setLevel(logging.DEBUG) @@ -250,8 +250,7 @@ def run(self, Work, calc_in): calc_stats.calc_type = calc_type try: - if calc_type == EVAL_GEN_TAG: - logger.debug("Running generator") + logger.debug("Running {}".format(calc_type_strings[calc_type])) calc = self._run_calc[calc_type] with calc_stats.timer: with self.loc_stack.loc(calc_type): diff --git a/libensemble/mpi_controller.py b/libensemble/mpi_controller.py index 03f4d6990..34e8bf0e0 100644 --- a/libensemble/mpi_controller.py +++ b/libensemble/mpi_controller.py @@ -12,8 +12,7 @@ from libensemble.controller import JobController, Job, jassert logger = logging.getLogger(__name__ + '(' + MPIResources.get_my_name() + ')') -#For debug messages in this module - uncomment -#(see libE.py to change root logging level) +#To change logging level for just this module #logger.setLevel(logging.DEBUG) @@ -120,7 +119,7 @@ def _get_mpi_specs(self, num_procs, num_nodes, ranks_per_node, def launch(self, calc_type, num_procs=None, num_nodes=None, ranks_per_node=None, machinefile=None, app_args=None, stdout=None, stderr=None, stage_inout=None, - hyperthreads=False, test=False): + hyperthreads=False, test=False, wait_on_run=False): """Creates a new job, and either launches or schedules launch. The created job object is returned. @@ -163,6 +162,9 @@ def launch(self, calc_type, num_procs=None, num_nodes=None, test: boolean, optional Whether this is a test - No job will be launched. Instead runline is printed to logger (At INFO level). + + wait_on_run: boolean, optional + Whether to wait for job to be polled as RUNNING (or other active/end state) before continuing. Returns @@ -197,13 +199,15 @@ def launch(self, calc_type, num_procs=None, num_nodes=None, logger.info('Test selected: Not launching job') logger.info('runline args are {}'.format(runline)) else: - logger.debug("Launching job {}: {}". + logger.info("Launching job {}: {}". format(job.name, " ".join(runline))) #One line job.launch_time = time.time() job.process = launcher.launch(runline, cwd='./', stdout=open(job.stdout, 'w'), stderr=open(job.stderr, 'w'), start_new_session=True) + if (wait_on_run): + self._wait_on_run(job) self.list_of_jobs.append(job) return job diff --git a/libensemble/mpi_resources.py b/libensemble/mpi_resources.py index ccd4bce69..81789ca62 100644 --- a/libensemble/mpi_resources.py +++ b/libensemble/mpi_resources.py @@ -13,8 +13,7 @@ def rassert(test, *args): logger = logging.getLogger(__name__) -#For debug messages in this module - uncomment -#(see libE.py to change root logging level) +#To change logging level for just this module #logger.setLevel(logging.DEBUG) diff --git a/libensemble/resources.py b/libensemble/resources.py index bacf5a5fd..d5d450802 100644 --- a/libensemble/resources.py +++ b/libensemble/resources.py @@ -10,7 +10,7 @@ import subprocess logger = logging.getLogger(__name__) -#For debug messages in this module - uncomment (see libE.py to change root logging level) +#To change logging level for just this module #logger.setLevel(logging.DEBUG) class ResourcesException(Exception): pass diff --git a/libensemble/sim_funcs/one_d_func.py b/libensemble/sim_funcs/one_d_func.py new file mode 100644 index 000000000..996c3df23 --- /dev/null +++ b/libensemble/sim_funcs/one_d_func.py @@ -0,0 +1,24 @@ +""" +This module contains an example 1d function +""" +from __future__ import division +from __future__ import absolute_import + +__all__ = ['one_d_example'] + +import numpy as np + + +def one_d_example(x, persis_info, sim_specs, _): + """ + Evaluates the six hump camel function for a single point ``x``. + + :See: + ``/libensemble/libensemble/tests/regression_tests/test_fast_alloc.py`` + """ + + O = np.zeros(1,dtype=sim_specs['out']) + + O['f'] = np.linalg.norm(x) + + return O, persis_info diff --git a/libensemble/tests/regression_tests/test_1d_uniform_sampling.py b/libensemble/tests/regression_tests/test_1d_uniform_sampling.py new file mode 100644 index 000000000..6bfd5a736 --- /dev/null +++ b/libensemble/tests/regression_tests/test_1d_uniform_sampling.py @@ -0,0 +1,66 @@ +# """ +# Runs libEnsemble on the 6-hump camel problem. Documented here: +# https://www.sfu.ca/~ssurjano/camel6.html +# +# Execute via the following command: +# mpiexec -np 4 python3 test_6-hump_camel_uniform_sampling.py +# The number of concurrent evaluations of the objective function will be 4-1=3. +# """ + +from __future__ import division +from __future__ import absolute_import + +from mpi4py import MPI # for libE communicator +import sys, os # for adding to path +import numpy as np + +# Import libEnsemble main +from libensemble.libE import libE + +# Import sim_func +from libensemble.sim_funcs.one_d_func import one_d_example as sim_f + +# Import gen_func +from libensemble.gen_funcs.uniform_sampling import uniform_random_sample + +script_name = os.path.splitext(os.path.basename(__file__))[0] + +#State the objective function, its arguments, output, and necessary parameters (and their sizes) +sim_specs = {'sim_f': sim_f, # This is the function whose output is being minimized + 'in': ['x'], # These keys will be given to the above function + 'out': [('f',float), # This is the output from the function being minimized + ], + } + +# State the generating function, its arguments, output, and necessary parameters. +gen_specs = {'gen_f': uniform_random_sample, + 'in': ['sim_id'], + 'out': [('x',float,(1,))], + 'lb': np.array([-3]), + 'ub': np.array([ 3]), + 'gen_batch_size': 500, + 'save_every_k': 300 + } + +# Tell libEnsemble when to stop +exit_criteria = {'gen_max': 501} + +np.random.seed(1) +persis_info = {} +for i in range(MPI.COMM_WORLD.Get_size()): + persis_info[i] = {'rand_stream': np.random.RandomState(i)} + +# Perform the run +H, persis_info, flag = libE(sim_specs, gen_specs, exit_criteria, persis_info) + +if MPI.COMM_WORLD.Get_rank() == 0: + short_name = script_name.split("test_", 1).pop() + filename = short_name + '_results_History_length=' + str(len(H)) + '_evals=' + str(sum(H['returned'])) + '_ranks=' + str(MPI.COMM_WORLD.Get_size()) + print("\n\n\nRun completed.\nSaving results to file: " + filename) + np.save(filename, H) + + assert len(H)>= 501 + + print("\nlibEnsemble with Uniform random sampling has generated enough points") + + diff --git a/libensemble/tests/regression_tests/test_6-hump_camel_aposmm_LD_MMA.py b/libensemble/tests/regression_tests/test_6-hump_camel_aposmm_LD_MMA.py index 98b531f3a..880efe00b 100644 --- a/libensemble/tests/regression_tests/test_6-hump_camel_aposmm_LD_MMA.py +++ b/libensemble/tests/regression_tests/test_6-hump_camel_aposmm_LD_MMA.py @@ -96,8 +96,7 @@ persis_info = {'next_to_give':0} persis_info['total_gen_calls'] = 0 persis_info['last_worker'] = 0 - persis_info[0] = {'active_runs': set(), - 'run_order': {}, + persis_info[0] = {'run_order': {}, 'old_runs': {}, 'total_runs': 0, 'rand_stream': np.random.RandomState(1)} diff --git a/libensemble/tests/regression_tests/test_branin_aposmm.py b/libensemble/tests/regression_tests/test_branin_aposmm.py index e9b197b17..1b2c9484b 100644 --- a/libensemble/tests/regression_tests/test_branin_aposmm.py +++ b/libensemble/tests/regression_tests/test_branin_aposmm.py @@ -90,7 +90,6 @@ persis_info[i] = {'rand_stream': np.random.RandomState(i)} persis_info[1]['total_runs'] = 0 -persis_info[1]['active_runs'] = set() persis_info[1]['run_order'] = {} persis_info[1]['old_runs'] = {} persis_info[1]['total_runs'] = 0 diff --git a/libensemble/tests/regression_tests/test_chwirut_aposmm_one_residual_at_a_time.py b/libensemble/tests/regression_tests/test_chwirut_aposmm_one_residual_at_a_time.py index cd5268a4d..2a95edbf4 100644 --- a/libensemble/tests/regression_tests/test_chwirut_aposmm_one_residual_at_a_time.py +++ b/libensemble/tests/regression_tests/test_chwirut_aposmm_one_residual_at_a_time.py @@ -20,10 +20,10 @@ from libensemble.sim_funcs.chwirut1 import chwirut_eval # Import gen_func -from libensemble.gen_funcs.aposmm import aposmm_logic, queue_update_function +from libensemble.gen_funcs.aposmm import aposmm_logic # Import alloc_func -from libensemble.alloc_funcs.fast_alloc_to_aposmm import give_sim_work_first as alloc_f +from libensemble.alloc_funcs.fast_alloc_and_pausing import give_sim_work_first as alloc_f script_name = os.path.splitext(os.path.basename(__file__))[0] @@ -75,8 +75,6 @@ 'combine_component_func': lambda x: np.sum(np.power(x,2)), 'num_active_gens': 1, 'batch_mode': True, - 'stop_on_NaNs': True, - 'stop_partial_fvec_eval': True, } np.random.RandomState(0) @@ -85,11 +83,16 @@ exit_criteria = {'sim_max': max_sim_budget, # must be provided 'elapsed_wallclock_time': 300 } -alloc_specs = {'out':[('allocated',bool)], 'alloc_f':alloc_f} -libE_specs = {'queue_update_function': queue_update_function} +alloc_specs = {'out':[('allocated',bool)], + 'alloc_f':alloc_f, + 'stop_on_NaNs': True, + 'stop_partial_fvec_eval': True, + } + np.random.seed(1) -persis_info = {'next_to_give':0} +persis_info = {} +persis_info['need_to_give'] = set() persis_info['total_gen_calls'] = 0 persis_info['complete'] = set() persis_info['has_nan'] = set() @@ -100,13 +103,12 @@ persis_info[i] = {'rand_stream': np.random.RandomState(i)} persis_info['last_worker'] = 0 -persis_info[0] = {'active_runs': set(), - 'run_order': {}, +persis_info[0] = {'run_order': {}, 'old_runs': {}, 'total_runs': 0, 'rand_stream': np.random.RandomState(1)} # Perform the run -H, persis_info, flag = libE(sim_specs, gen_specs, exit_criteria, persis_info, alloc_specs, libE_specs) +H, persis_info, flag = libE(sim_specs, gen_specs, exit_criteria, persis_info, alloc_specs) if MPI.COMM_WORLD.Get_rank() == 0: assert flag == 0 diff --git a/libensemble/tests/regression_tests/test_chwirut_pounders.py b/libensemble/tests/regression_tests/test_chwirut_pounders.py index 7b4200b7e..75446e6df 100644 --- a/libensemble/tests/regression_tests/test_chwirut_pounders.py +++ b/libensemble/tests/regression_tests/test_chwirut_pounders.py @@ -20,7 +20,7 @@ from libensemble.sim_funcs.chwirut1 import chwirut_eval, EvaluateJacobian # Import gen_func -from libensemble.gen_funcs.aposmm import aposmm_logic, queue_update_function +from libensemble.gen_funcs.aposmm import aposmm_logic script_name = os.path.splitext(os.path.basename(__file__))[0] @@ -74,24 +74,18 @@ 'elapsed_wallclock_time': 300 } -libE_specs = {'queue_update_function': queue_update_function} np.random.seed(1) persis_info = {} -persis_info['complete'] = set() -persis_info['has_nan'] = set() -persis_info['already_paused'] = set() -persis_info['H_len'] = 0 for i in range(MPI.COMM_WORLD.Get_size()): persis_info[i] = {'rand_stream': np.random.RandomState(i)} -persis_info[1] = {'active_runs': set(), - 'run_order': {}, +persis_info[1] = {'run_order': {}, 'old_runs': {}, 'total_runs': 0, 'rand_stream': np.random.RandomState(1)} # Perform the run -H, persis_info, flag = libE(sim_specs, gen_specs, exit_criteria, persis_info, libE_specs=libE_specs) +H, persis_info, flag = libE(sim_specs, gen_specs, exit_criteria, persis_info) if MPI.COMM_WORLD.Get_rank() == 0: assert flag == 0 diff --git a/libensemble/tests/regression_tests/test_chwirut_uniform_sampling_one_residual_at_a_time.py b/libensemble/tests/regression_tests/test_chwirut_uniform_sampling_one_residual_at_a_time.py index 8862a0959..1e082b7bd 100644 --- a/libensemble/tests/regression_tests/test_chwirut_uniform_sampling_one_residual_at_a_time.py +++ b/libensemble/tests/regression_tests/test_chwirut_uniform_sampling_one_residual_at_a_time.py @@ -20,11 +20,11 @@ from libensemble.sim_funcs.chwirut1 import chwirut_eval # Import gen_func -from libensemble.gen_funcs.aposmm import aposmm_logic, queue_update_function +from libensemble.gen_funcs.aposmm import aposmm_logic from libensemble.gen_funcs.uniform_sampling import uniform_random_sample_obj_components # Import alloc_func -from libensemble.alloc_funcs.fast_alloc_to_aposmm import give_sim_work_first as alloc_f +from libensemble.alloc_funcs.fast_alloc_and_pausing import give_sim_work_first as alloc_f script_name = os.path.splitext(os.path.basename(__file__))[0] @@ -58,19 +58,21 @@ 'combine_component_func': lambda x: np.sum(np.power(x,2)), 'num_active_gens': 1, 'batch_mode': True, - 'stop_on_NaNs': True, - 'stop_partial_fvec_eval': True, } exit_criteria = {'sim_max': max_sim_budget, # must be provided 'elapsed_wallclock_time': 300 } -alloc_specs = {'out':[('allocated',bool)], 'alloc_f':alloc_f} +alloc_specs = {'out':[('allocated',bool)], + 'alloc_f':alloc_f, + 'stop_on_NaNs': True, + 'stop_partial_fvec_eval': True, + } -libE_specs = {'queue_update_function': queue_update_function} np.random.seed(1) -persis_info = {'next_to_give':0} +persis_info = {} +persis_info['need_to_give'] = set() persis_info['total_gen_calls'] = 0 persis_info['complete'] = set() persis_info['has_nan'] = set() @@ -81,13 +83,12 @@ persis_info[i] = {'rand_stream': np.random.RandomState(i)} persis_info['last_worker'] = 0 -persis_info[0] = {'active_runs': set(), - 'run_order': {}, +persis_info[0] = {'run_order': {}, 'old_runs': {}, 'total_runs': 0, 'rand_stream': np.random.RandomState(1)} # Perform the run -H, persis_info, flag = libE(sim_specs, gen_specs, exit_criteria, persis_info, alloc_specs, libE_specs) +H, persis_info, flag = libE(sim_specs, gen_specs, exit_criteria, persis_info, alloc_specs) if MPI.COMM_WORLD.Get_rank() == 0: assert flag == 0 diff --git a/libensemble/tests/scaling_tests/forces/build_forces.sh b/libensemble/tests/scaling_tests/forces/build_forces.sh new file mode 100755 index 000000000..1bd2be404 --- /dev/null +++ b/libensemble/tests/scaling_tests/forces/build_forces.sh @@ -0,0 +1,10 @@ +#!/bin/bash + +# GCC +mpicc -o forces.x forces.c -lm + +# Intel +# mpicc -o forces.x forces.c + +# Cray +# cc -O3 -o forces.x forces.c diff --git a/libensemble/tests/scaling_tests/forces/cleanup.sh b/libensemble/tests/scaling_tests/forces/cleanup.sh new file mode 100755 index 000000000..800764402 --- /dev/null +++ b/libensemble/tests/scaling_tests/forces/cleanup.sh @@ -0,0 +1,2 @@ +rm -r sim_* *.npy ensemble.log libe_stat_files/ forces.stat libe_summary.txt + diff --git a/libensemble/tests/scaling_tests/forces/forces.c b/libensemble/tests/scaling_tests/forces/forces.c new file mode 100755 index 000000000..a88277fe1 --- /dev/null +++ b/libensemble/tests/scaling_tests/forces/forces.c @@ -0,0 +1,371 @@ +/* -------------------------------------------------------------------- + Naive Electostatics Code Example + This is designed only as an artificial, highly conifurable test + code for a libEnsemble sim func. + + Particles position and charge are initiated by a random stream. + Particles are replicated on all ranks. + Each rank computes forces for a subset of particles. + Particle force arrays are allreduced across ranks. + + Run executable on N procs: + + mpirun -np N ./forces.x + + Author: S Hudson. +-------------------------------------------------------------------- */ + +#include +#include +#include +#include +#include + +#define min(a,b) \ + ({ __typeof__ (a) _a = (a); \ + __typeof__ (b) _b = (b); \ + _a < _b ? _a : _b; }) + +// Flags 0 or 1 +#define PRINT_PARTICLE_DECOMP 0 +#define PRINT_ALL_PARTICLES 0 + +static FILE* stat_fp; + + +typedef struct particle { + double p[3]; // Particle position + double f[3]; // Particle force + double q; // Particle charge +} particle; + + +// Seed RNG +int seed_rand(int seed) { + srand(seed); + return 0; +} + +// Return a random number from a persistent stream +//TODO Use parallel RNG - As replicated data can currently do on master rank. +double get_rand() { + double randnum; + randnum = (double)rand()/(double)(RAND_MAX + 1.0); //[0,1) + return randnum; +} + + +// Particles start at random locations in 10x10x10 cube +int build_system(int n, particle* parr) { + int q_range_low = -10; + int q_range_high = 10; + double extent = 10.0; + int i, dim; + + for(i=0; i=2) { + num_particles = atoi(argv[1]); // no. of particles + } + + if (argc >=3) { + num_steps = atoi(argv[2]); // no. of timesteps + } + + if (argc >=4) { + rand_seed = atoi(argv[3]); // RNG seed + seed_rand(rand_seed); + } + + particle* parr = malloc(num_particles * sizeof(particle)); + build_system(num_particles, parr); + //printf("\n"); + + ierr = MPI_Init(&argc, &argv); + ierr = MPI_Comm_rank(MPI_COMM_WORLD, &rank); + ierr = MPI_Comm_size(MPI_COMM_WORLD, &num_procs); + + if (rank == 0) { + printf("Particles: %d\n",num_particles); + printf("Timesteps: %d\n",num_steps); + printf("MPI Ranks: %d\n",num_procs); + printf("Random seed: %d\n",rand_seed); + } + + k = num_particles / num_procs; + m = num_particles % num_procs; //Remainder = no. procs with extra particle + p_lower = rank * k + min(rank, m); + p_upper = (rank + 1) * k + min(rank + 1, m); + local_n = p_upper - p_lower; + + if (PRINT_PARTICLE_DECOMP) { + MPI_Barrier(MPI_COMM_WORLD); + printf("Proc: %d has %d particles\n", rank, local_n); + } + MPI_Barrier(MPI_COMM_WORLD); + fflush(stdout); + + if (rank == 0) { + open_stat_file(); + } + + start = clock(); + for (step=0; step + + +#### Running with libEnsemble. + +A random sample of seeds is taken and used as imput to the sim func (forces miniapp). + +Modify build_forces.sh for target platform and run to build forces.x + + ./build_forces.sh + +To run with one manager and N-1 workers: + + mpirun -np N python run_libe_forces.py + +Application parameters can be adjusted in the file run_libe_forces.py. + +To remove output before the next run: + + ./cleanup.sh + + +### Using batch scripts + +theta_submit_balsam.sh: + +Example Theta submission script to run libEnsemble in central mode with MPI worker concurrency is included. + +To use script directly you will need to replace the following templated values: + + in the COBALT -A directive with your project ID. + + is the name of your conda environment (balsam must be installed if using MPI mode on Theta). + + The name of an initialized balsam database (with max_connections enough for number of workers) + +If these are set to true, the scripts must be in the directory above the run dir (or modify paths at bottom of script). +export LIBE_PLOTS=true # Require plot scripts (see at end) +export BALSAM_PLOTS=true # Require plot scripts (see at end) + + +To adjust the node/core/worker count. + +Note: The NUM_WORKERS variable is only currently used if libEnsemble is running on one node, in which case it should be one less than the number of nodes in the job allocation (leaving one dedicated node to run libEnsemble). If more workers are used then the variables NUM_NODES +and RANKS_PER_NODE need to be explicitly set (these are for libensemble which will require one task for the manager and the rest will be workers). The total node allocation (in the COBALT -n directive) will need to be the number of nodes for libEnsemble + number of nodes for each worker to launch jobs to. + + + diff --git a/libensemble/tests/scaling_tests/forces/run_libe_forces.py b/libensemble/tests/scaling_tests/forces/run_libe_forces.py new file mode 100644 index 000000000..3a46f3cd1 --- /dev/null +++ b/libensemble/tests/scaling_tests/forces/run_libe_forces.py @@ -0,0 +1,92 @@ +from __future__ import division +from __future__ import absolute_import + +import mpi4py +mpi4py.rc.recv_mprobe = False # Disable matching probes +from mpi4py import MPI +import sys, os +import numpy as np + +USE_BALSAM = False + +from forces_simf import run_forces # From current dir + +# Import libEnsemble modules +from libensemble.libE import libE +from libensemble.gen_funcs.uniform_sampling import uniform_random_sample + +# Get this script name (for output at end) +script_name = os.path.splitext(os.path.basename(__file__))[0] + +sim_app = os.path.join(os.getcwd(),'forces.x') +#print('sim_app is ',sim_app) + +# Normally would be pre-compiled +if not os.path.isfile('forces.x'): + if os.path.isfile('build_forces.sh'): + import subprocess + subprocess.check_call(['./build_forces.sh']) + +# Normally the sim_dir will exist with common input which is copied for each worker. Here it starts empty. +# Create if no ./sim dir. See sim_specs['sim_dir'] +if not os.path.isdir('./sim'): + os.mkdir('./sim') + +#Create job_controller and register sim to it. +if USE_BALSAM: + from libensemble.balsam_controller import BalsamJobController + jobctrl = BalsamJobController() +else: + from libensemble.mpi_controller import MPIJobController + jobctrl = MPIJobController() +jobctrl.register_calc(full_path=sim_app, calc_type='sim') + + +#todo - clarify difference sim 'in' and 'keys' + +#State the objective function, its arguments, output, and necessary parameters (and their sizes) +sim_specs = {'sim_f': run_forces, # This is the function whose output is being minimized (sim func) + 'in': ['x'], # Name of input data structure for sim func + 'out': [ # Output from sim func + ('energy',float), + ], + 'keys': ['seed'], # Key/keys for input data + 'sim_dir': './sim', # Simulation input dir to be copied for each worker (*currently empty) + 'simdir_basename': 'forces', # User attribute to name sim directories (forces_***) + 'cores': 2, # User attribute to set number of cores for sim func runs (optional) + 'sim_particles': 1e3, # User attribute for number of particles in simulations + 'sim_timesteps': 5, # User attribute for number of timesteps in simulations + 'sim_kill_minutes': 10.0 # User attribute for max time for simulations + } + +# State the generating function, its arguments, output, and necessary parameters. +gen_specs = {'gen_f': uniform_random_sample,# Generator function + 'in': ['sim_id'], # Generator input + 'out': [('x',float,(1,))], # Name, type and size of data produced (must match sim_specs 'in') + 'lb': np.array([0]), # List of lower bounds for random sample array (1D) + 'ub': np.array([32767]), # List of upper bounds for random sample array (1D) + 'gen_batch_size': 1000, # How many random samples to generate in one call + 'batch_mode': True, # If true wait for sims to process before generate more + 'num_active_gens':1, # Only one active generator at a time. + 'save_every_k': 1000 # Save every K steps + } + +# Maximum number of simulations +sim_max = 5 +exit_criteria = {'sim_max': sim_max} + +# Create a different random number stream for each worker (used by uniform_random_sample) +np.random.seed(1) +persis_info = {} +for i in range(MPI.COMM_WORLD.Get_size()): + persis_info[i] = {'rand_stream': np.random.RandomState(i)} + +H, persis_info, flag = libE(sim_specs, gen_specs, exit_criteria, persis_info=persis_info) + +#Save results to numpy file +if MPI.COMM_WORLD.Get_rank() == 0: + short_name = script_name.split("test_", 1).pop() + filename = short_name + '_results_History_length=' + str(len(H)) + '_evals=' + str(sum(H['returned'])) + print("\n\nRun completed.\nSaving results to file: " + filename) + np.save(filename, H) + diff --git a/libensemble/tests/unit_tests/test_allocation_funcs.py b/libensemble/tests/unit_tests/test_allocation_funcs.py index cacb0eb65..17089d93f 100644 --- a/libensemble/tests/unit_tests/test_allocation_funcs.py +++ b/libensemble/tests/unit_tests/test_allocation_funcs.py @@ -25,7 +25,7 @@ def test_decide_work_and_resources(): # Don't give out work when all workers are active W['active'] = 1 - Work, persis_info = al['alloc_f'](W, hist.H, sim_specs, gen_specs, {}) + Work, persis_info = al['alloc_f'](W, hist.H, sim_specs, gen_specs, al, {}) assert len(Work) == 0 # diff --git a/libensemble/tests/unit_tests/test_aposmm_logic.py b/libensemble/tests/unit_tests/test_aposmm_logic.py index af7fb0fd6..180f2ed79 100644 --- a/libensemble/tests/unit_tests/test_aposmm_logic.py +++ b/libensemble/tests/unit_tests/test_aposmm_logic.py @@ -86,36 +86,6 @@ def test_initialize_APOSMM(): al.initialize_APOSMM(hist.H,gen_specs_0) -def test_queue_update_function(): - - gen_specs_0 = {} - gen_specs_0['stop_on_NaNs'] = True - gen_specs_0['combine_component_func'] = np.linalg.norm - H = np.zeros(10, dtype=[('f_i',float),('returned',bool),('pt_id',int),('sim_id',int),('paused',bool)]) - - H['sim_id'] = np.arange(0,10) - H['pt_id'] = np.sort(np.concatenate([np.arange(0,5),np.arange(0,5)])) - - H['returned'][0:10:2] = 1 # All of the first components have been evaluated - H['returned'][1] = 1 - - H['f_i'][4] = np.nan - - persis_info = {} - persis_info['total_gen_calls'] = 0 - persis_info['complete'] = set() - persis_info['has_nan'] = set() - persis_info['already_paused'] = set() - persis_info['H_len'] = 0 - - _ = al.queue_update_function(H, gen_specs_0,persis_info) - assert np.all(H['paused'][4:6]) - - persis_info['H_len'] = 6 - gen_specs_0['stop_partial_fvec_eval'] = True - H['f_i'][6:10:2] = 0.5 - _ = al.queue_update_function(H, gen_specs_0,persis_info) - assert np.all(H['paused'][4:]) if __name__ == "__main__": @@ -129,5 +99,3 @@ def test_queue_update_function(): print('done') test_initialize_APOSMM() print('done') - test_queue_update_function() - print('done') diff --git a/libensemble/tests/unit_tests/test_jobcontroller.py b/libensemble/tests/unit_tests/test_jobcontroller.py index f94c2c266..d6483ef22 100644 --- a/libensemble/tests/unit_tests/test_jobcontroller.py +++ b/libensemble/tests/unit_tests/test_jobcontroller.py @@ -7,6 +7,8 @@ import pytest import socket from libensemble.controller import JobController +from libensemble.controller import NOT_STARTED_STATES + USE_BALSAM = False @@ -164,6 +166,20 @@ def test_launch_and_poll(): assert job.finished, "job.finished should be True. Returned " + str(job.finished) assert job.state == 'FINISHED', "job.state should be FINISHED. Returned " + str(job.state) +def test_launch_wait_on_run(): + """ Test of launching job with wait_on_run """ + print("\nTest: {}\n".format(sys._getframe().f_code.co_name)) + setup_job_controller() + jobctl = JobController.controller + cores = NCORES + args_for_sim = 'sleep 0.2' + job = jobctl.launch(calc_type='sim', num_procs=cores, app_args=args_for_sim, wait_on_run=True) + assert job.state not in NOT_STARTED_STATES, "Job should not be in a NOT_STARTED state. State: " + str(job.state) + jobctl.poll(job) + if not job.finished: + job = polling_loop(jobctl, job) + assert job.finished, "job.finished should be True. Returned " + str(job.finished) + assert job.state == 'FINISHED', "job.state should be FINISHED. Returned " + str(job.state) def test_kill_on_file(): """ Test of killing job based on something in output file""" @@ -488,6 +504,7 @@ def test_job_failure(): if __name__ == "__main__": #setup_module(__file__) test_launch_and_poll() + test_launch_wait_on_run() test_kill_on_file() test_kill_on_timeout() test_launch_and_poll_multijobs() diff --git a/libensemble/tests/unit_tests/test_libE_main.py b/libensemble/tests/unit_tests/test_libE_main.py index e59a02c59..7cd7d0956 100644 --- a/libensemble/tests/unit_tests/test_libE_main.py +++ b/libensemble/tests/unit_tests/test_libE_main.py @@ -9,6 +9,15 @@ import libensemble.tests.unit_tests.setup as setup from mpi4py import MPI +class Fake_MPI: + def Get_size(self): + return 2 + def Get_rank(self): + return 0 + def Barrier(self): + return 0 +fake_mpi = Fake_MPI() + al = {} libE_specs = {'comm':MPI.COMM_WORLD} fname_abort = 'libE_history_at_abort_0.npy' @@ -24,7 +33,8 @@ def test_manager_exception(): with mock.patch('libensemble.libE.comms_abort') as abortMock: abortMock.side_effect = Exception with pytest.raises(Exception, message='Expected exception'): - libE({'out':[('f',float)]},{'out':[('x',float)]},{'sim_max':1},libE_specs={'comm': MPI.COMM_WORLD}) + #libE({'out':[('f',float)]},{'out':[('x',float)]},{'sim_max':1},libE_specs={'comm': MPI.COMM_WORLD}) + libE({'out':[('f',float)]},{'out':[('x',float)]},{'sim_max':1},libE_specs={'comm': fake_mpi}) # Check npy file dumped assert os.path.isfile(fname_abort), "History file not dumped" os.remove(fname_abort) @@ -44,10 +54,22 @@ def test_exception_raising_manager(): # assert H==[] def test_checking_inputs(): - + # Don't take more points than there is space in history. sim_specs, gen_specs, exit_criteria = setup.make_criteria_and_specs_0() - + + # Should fail because only got a manager + H0 = {} + libE_specs = {'comm':MPI.COMM_WORLD} + try: + check_inputs(libE_specs, al, sim_specs, gen_specs, exit_criteria,H0) + except AssertionError: + assert 1 + else: + assert 0 + + libE_specs={'comm': fake_mpi} + H0 = np.zeros(3,dtype=sim_specs['out'] + gen_specs['out'] + [('returned',bool)]) # Should fail because H0 has points with 'return'==False try: diff --git a/libensemble/tests/unit_tests/test_logger.py b/libensemble/tests/unit_tests/test_logger.py new file mode 100644 index 000000000..e932e49e7 --- /dev/null +++ b/libensemble/tests/unit_tests/test_logger.py @@ -0,0 +1,29 @@ +#!/usr/bin/env python + +""" +Unit test of libensemble log functions. +""" + +import logging +from libensemble import libE_logger + +def test_setlogger(): + + libE_logger.set_level('INFO') + level = logging.getLogger('libensemble').getEffectiveLevel() + assert level==20, "Log level should be 20. Found: " + str(level) + + libE_logger.set_level('WARNING') + level = logging.getLogger('libensemble').getEffectiveLevel() + assert level==30, "Log level should be 30. Found: " + str(level) + + libE_logger.set_level('ERROR') + level = logging.getLogger('libensemble').getEffectiveLevel() + assert level==40, "Log level should be 40. Found: " + str(level) + + libE_logger.set_level('DEBUG') + level = logging.getLogger('libensemble').getEffectiveLevel() + assert level==10, "Log level should be 10. Found: " + str(level) + +if __name__ == "__main__": + test_setlogger() diff --git a/libensemble/timer.py b/libensemble/timer.py index 3e9f79d43..77877a246 100644 --- a/libensemble/timer.py +++ b/libensemble/timer.py @@ -38,12 +38,12 @@ def __str__(self): @property def date_start(self): """Return a string representing the start datetime.""" - return time.strftime("%Y-%m-%d %H:%M", time.localtime(self.tstart)) + return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(self.tstart)) @property def date_end(self): """Return a string representing the end datetime.""" - return time.strftime("%Y-%m-%d %H:%M", time.localtime(self.tend)) + return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(self.tend)) @property def elapsed(self): diff --git a/postproc_scripts/balsam/plot_jobs_v_time.py b/postproc_scripts/balsam/plot_jobs_v_time.py new file mode 100644 index 000000000..575172407 --- /dev/null +++ b/postproc_scripts/balsam/plot_jobs_v_time.py @@ -0,0 +1,21 @@ +import matplotlib +matplotlib.use('Agg') + +from balsam.core import models +from matplotlib import pyplot as plt + +fig, ax = plt.subplots() + +plt.title('Balsam: Jobs completed v Time') +plt.xlabel('Time of Day (H:M)') +plt.ylabel('Num. Jobs Completed (Accum)') + +times, throughputs = models.throughput_report() +plt.step(times, throughputs, where='post') + +import matplotlib.dates as mdates +myFmt = mdates.DateFormatter('%H:%M') +ax.xaxis.set_major_formatter(myFmt) +fig.autofmt_xdate() + +plt.savefig('balsam_jobs_v_time.png') diff --git a/postproc_scripts/balsam/plot_util_v_time.py b/postproc_scripts/balsam/plot_util_v_time.py new file mode 100644 index 000000000..d2fcc19c1 --- /dev/null +++ b/postproc_scripts/balsam/plot_util_v_time.py @@ -0,0 +1,21 @@ +import matplotlib +matplotlib.use('Agg') + +from balsam.core import models +from matplotlib import pyplot as plt + +fig, ax = plt.subplots() + +plt.title('Balsam Utilization: Running Jobs v Date/Time') +plt.xlabel('Time of Day (H:M)') +plt.ylabel('Num. Jobs Running') + +times, utilization = models.utilization_report() +plt.step(times, utilization, where='post') + +import matplotlib.dates as mdates +myFmt = mdates.DateFormatter('%H:%M') +ax.xaxis.set_major_formatter(myFmt) +fig.autofmt_xdate() + +plt.savefig('balsam_util_v_time.png') diff --git a/postproc_scripts/balsam/plot_waiting_v_time.py b/postproc_scripts/balsam/plot_waiting_v_time.py new file mode 100644 index 000000000..be306b9e2 --- /dev/null +++ b/postproc_scripts/balsam/plot_waiting_v_time.py @@ -0,0 +1,21 @@ +import matplotlib +matplotlib.use('Agg') + +from balsam.core import models +from matplotlib import pyplot as plt + +fig, ax = plt.subplots() + +plt.title('Balsam: Jobs in waiting state v Date/Time') +plt.xlabel('Time of Day (H:M)') +plt.ylabel('Num. Jobs Waiting') + +times, waiting = models.job_waiting_report() +plt.step(times, waiting, where='post') + +import matplotlib.dates as mdates +myFmt = mdates.DateFormatter('%H:%M') +ax.xaxis.set_major_formatter(myFmt) +fig.autofmt_xdate() + +plt.savefig('balsam_waiting_v_time.png') diff --git a/postproc_scripts/balsam/readme.rst b/postproc_scripts/balsam/readme.rst new file mode 100644 index 000000000..0ffc10e1a --- /dev/null +++ b/postproc_scripts/balsam/readme.rst @@ -0,0 +1,11 @@ +======================= +Balsam analysis scripts +======================= + +These scripts require an activated balsam database, and create plots as png files. + +* **plot_util_v_time.py**: Shows number of active Balsam launched jobs over time. + +* **plot_jobs_v_time.py**: Shows completed Balsam launched jobs over time. + +* **plot_waiting_v_time.py**: Shows number of jobs waiting in Balsam database over time. diff --git a/postproc_scripts/plot_libE_histogram.py b/postproc_scripts/plot_libE_histogram.py new file mode 100755 index 000000000..460760517 --- /dev/null +++ b/postproc_scripts/plot_libE_histogram.py @@ -0,0 +1,105 @@ +#!/usr/bin/env python + +#infile='testfile.txt' +infile='libe_summary.txt' +time_key='Time:' +status_key='Status:' +sim_only = True # Ignore generator times + +#States - could add multiple lines - eg Failed. +ran_ok = ['Completed'] # list of ok states +run_killed = ['killed'] # Currently searches for this word in string + +import sys +import numpy as np +import matplotlib.pyplot as plt +#import csv + +#todo: Make general search for any word from keyword list in a list. +def search_for_killed(sublist): + for i, val in enumerate(sublist): + if val.endswith(':'): + break # New key word found + else: + if val in run_killed: + return True + return False + + +def append_to_list(mylst, glob_list, found_time): + # Assumes Time comes first - else have to modify + if found_time: + mylst.append(glob_list[-1]) + else: + print('Error Status found before time - exiting') + sys.exit() + +active_line_count = 0 +in_times=[] +in_times_ran=[] +in_times_kill=[] + +# Read straight from libEnsemble summary file. +with open(infile) as f: + for line in f: + lst = line.split() + found_time = False + found_status = False + for i, val in enumerate(lst): + if val == time_key: + if sim_only and lst[i-1] != 'sim': + break + in_times.append(lst[i+1]) + found_time = True + if val == status_key: + if lst[i+1] in ran_ok: + append_to_list(in_times_ran,in_times,found_time) # Assumes Time comes first + elif search_for_killed(lst[i+1:len(lst)]): + append_to_list(in_times_kill,in_times,found_time) # Assumes Time comes first + else: + print('Error: Unkown status - rest of line: {}'.format(lst[i+1:len(lst)])) + sys.exit() + found_status = True + if found_time and found_status: + active_line_count += 1 + break + +# Read from modified csv file. +#with open('histo.csv', newline='') as csvfile: + #cin = csv.reader(csvfile, delimiter=',') + #for row in cin: + #in_times.append(row[0]) + #if row[1] == 'killed': + #in_times_kill.append(row[0]) + #else: + #in_times_ran.append(row[0]) + +print('Processed {} calcs'.format(active_line_count)) + +times = np.asarray(in_times, dtype=float) +times_ran = np.asarray(in_times_ran, dtype=float) +times_kill = np.asarray(in_times_kill, dtype=float) + +num_bins = 40 +binwidth = (times.max() - times.min()) / num_bins +bins=np.arange(min(times), max(times) + binwidth, binwidth) + +#plt.hist(times, bins, histtype='bar', rwidth=0.8) +p1 = plt.hist(times_ran, bins, label='Completed') +p2 = plt.hist(times_kill, bins, label='Killed') + +#plt.title('Theta Opal/libEnsemble Times: 127 Workers - sim_max 508') +if sim_only: + calc_type = 'sim' +else: + calc_type = 'calc' +title = 'libEnsemble histogram of ' + calc_type + ' times' + ' (' + str(active_line_count) + ' user calcs)' + +plt.title(title) +plt.xlabel('Calc run-time (sec)') +plt.ylabel('Count') +plt.grid(True) +plt.legend(loc='upper left') + +#plt.show() +plt.savefig('hist_completed_v_killed.png') diff --git a/postproc_scripts/plot_libe_calcs_util_v_time.py b/postproc_scripts/plot_libe_calcs_util_v_time.py new file mode 100644 index 000000000..54ae944dd --- /dev/null +++ b/postproc_scripts/plot_libe_calcs_util_v_time.py @@ -0,0 +1,53 @@ +infile='libe_summary.txt' +import pandas as pd + +run_stats = [] +with open(infile) as f: + #content = f.readlines() + for line in f: + lst = line.split() + foundstart = False + foundend = False + for i, val in enumerate(lst): + if val == 'Start:': + startdate = lst[i+1] + starttime = lst[i+2] + foundstart = True + if val == 'End:': + enddate = lst[i+1] + endtime = lst[i+2] + foundend = True + if foundstart and foundend: + run_datetime = {'start': startdate + ' ' + starttime, 'end': enddate + ' ' + endtime} + run_stats.append(run_datetime) + break + +df = pd.DataFrame(run_stats) +df['start'] = pd.to_datetime(df['start']) +df['end'] = pd.to_datetime(df['end']) +df = df.sort_values(by='start') + +time_start = df['start'][0] +dend = df.sort_values(by='end') +time_end = dend['end'].iloc[-1] +date_series = pd.date_range(time_start, time_end, freq='1S') + +counts=[] +for i in range(len(date_series)): + # Inclusive/exclusive to avoid multiple accounting - need high resolution + count = sum((df['start'] <= date_series[i]) & (date_series[i] < df['end'])) + counts.append(count) + +df_list = pd.DataFrame(date_series, columns=['datetime']) + +#df_count = pd.DataFrame([counts], columns=['count']) # List goes to single row by default +df_count = pd.DataFrame({'count': counts}) # Transpose to columns like this + + +final = df_list.join(df_count) +#print(final) + +import matplotlib.pyplot as plt +final.plot(x='datetime',y='count') +#plt._show() +plt.savefig('calcs_util_v_time.png') diff --git a/postproc_scripts/readme.rst b/postproc_scripts/readme.rst new file mode 100644 index 000000000..fe5dadb24 --- /dev/null +++ b/postproc_scripts/readme.rst @@ -0,0 +1,11 @@ +================ +Analysis scripts +================ + +Note that all scripts produce a file rather than opening a plot interactively. + +The following scripts must be run in the directory with **libe_summary.txt** file and extract plot informatin from that file. The information currently only pertains to the time spent in user sim or gen functions (not just the submission of jobs within those). + +* **plot_libe_calcs_util_v_time.py**: Extract worker utilization v time plot (with one second sampling). Shows number of workers running user sim or gen functions at any time. + +* **plot_libE_histogram.py**: Create histogram showing the number of completed/killed user calculations binned by run-time. diff --git a/setup.py b/setup.py index 59b907857..e53956f53 100644 --- a/setup.py +++ b/setup.py @@ -27,7 +27,7 @@ def run_tests(self): setup( name='libensemble', - version='0.4.0', + version='0.4.1', description='Library for managing ensemble-like collections of computations', url='https://github.com/Libensemble/libensemble', author='Jeffrey Larson, Stephen Hudson and David Bindel',