Skip to content

Commit

Permalink
Merge 8bb8ec7 into 591063f
Browse files Browse the repository at this point in the history
  • Loading branch information
orbitfold committed Jul 31, 2020
2 parents 591063f + 8bb8ec7 commit aabeea4
Show file tree
Hide file tree
Showing 15 changed files with 1,818 additions and 51 deletions.
1,408 changes: 1,408 additions & 0 deletions docs/mega_tutorial.ipynb

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docs/tutorial_files/easyvvuq_fusion_tutorial.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -3537,7 +3537,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.3"
"version": "3.6.9"
}
},
"nbformat": 4,
Expand Down
18 changes: 18 additions & 0 deletions docs/tutorial_files/kubernetes/epidemic.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Pod manifest for the epidemic tutorial: one container that runs the
# epidemic simulation on a JSON input file and prints the resulting CSV.
apiVersion: v1
kind: Pod
metadata:
name: epidemic
spec:
# NOTE(review): presumably "Never" so a completed run is not started again - confirm.
restartPolicy: Never
containers:
- name: epidemic
image: orbitfold/easyvvuq:latest
# The single string in "args" is handed to "/bin/sh -c" as the script to run.
command: ["/bin/sh", "-c"]
# Reads /config/epidemic_in.json, writes out.csv, then cats out.csv to stdout.
args: ["python3 /EasyVVUQ/docs/epidemic/epidemic.py /config/epidemic_in.json out.csv && cat out.csv"]
# The input file is expected under /config, where the volume below is mounted.
volumeMounts:
- name: config-volume
mountPath: /config
volumes:
# Volume backed by the ConfigMap named "example.json".
- name: config-volume
configMap:
name: example.json
2 changes: 2 additions & 0 deletions easyvvuq/actions/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from .base import BaseAction
from .execute_local import ExecuteLocal
from .execute_kubernetes import ExecuteKubernetes
from .action_statuses import ActionStatuses

__copyright__ = """
Expand Down
67 changes: 67 additions & 0 deletions easyvvuq/actions/action_statuses.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
__copyright__ = """
Copyright 2020 Vytautas Jancauskas
This file is part of EasyVVUQ
EasyVVUQ is free software: you can redistribute it and/or modify
it under the terms of the Lesser GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
EasyVVUQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
__license__ = "LGPL"


class ActionStatuses:
    """A class that tracks the statuses of a list of actions.

    Every tracked status object must provide three methods: ``finished()``,
    ``succeeded()`` and ``finalise()``. A status starts out in ``active`` and
    is moved to ``finished`` or ``failed`` by `poll`.

    Attributes
    ----------
    active : list
        statuses that have not been observed to finish yet
    finished : list
        statuses that finished, reported success and were finalised
    failed : list
        statuses that finished without reporting success
    """

    def __init__(self):
        self.active = []
        self.finished = []
        self.failed = []

    def add(self, action_status):
        """Add a new action status to the list of active ones.

        Parameters
        ----------
        action_status : ActionStatus
            an object representing an action's status
        """
        self.active.append(action_status)

    def poll(self):
        """Iterate over active actions and sort the ones that are done.

        An action that has finished executing and reports success is
        finalised and moved to ``finished``. An action that has finished
        without reporting success is moved to ``failed`` (it is not
        finalised). Everything else stays in ``active``.

        If ``finished()``, ``succeeded()`` or ``finalise()`` raises, the
        exception propagates, but the bookkeeping stays consistent: statuses
        already sorted during this call are removed from ``active``, while
        the status that raised and all statuses not yet examined remain
        ``active`` so that a later call to `poll` can retry them.
        """
        pending = self.active
        still_active = []
        processed = 0
        try:
            for status in pending:
                if not status.finished():
                    still_active.append(status)
                elif status.succeeded():
                    status.finalise()
                    self.finished.append(status)
                else:
                    self.failed.append(status)
                # Only count a status once it has been fully dealt with, so
                # one that raised above is kept by the slice in `finally`.
                processed += 1
        finally:
            # After a complete pass the slice is empty and this is simply the
            # list of statuses that are still running.
            self.active = still_active + pending[processed:]

    def stats(self):
        """Return the number of active, finished and failed jobs.

        Returns
        -------
        dict
            a dictionary with the keys 'active', 'finished' and 'failed'
            mapping to the length of the corresponding list
        """
        return {'active': len(self.active),
                'finished': len(self.finished),
                'failed': len(self.failed)}
180 changes: 180 additions & 0 deletions easyvvuq/actions/execute_kubernetes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
"""Provides an action element to execute a simulation on a Kubernetes cluster
and retrieve the output.
Examples
--------
"""

import copy
import logging
import os
import uuid

import yaml
from kubernetes import config
from kubernetes.client import Configuration, V1ConfigMap, V1ObjectMeta
from kubernetes.client.api import core_v1_api

from . import BaseAction

__copyright__ = """
Copyright 2020 Vytautas Jancauskas
This file is part of EasyVVUQ
EasyVVUQ is free software: you can redistribute it and/or modify
it under the terms of the Lesser GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
EasyVVUQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
__license__ = "LGPL"

logger = logging.getLogger(__name__)


class ActionStatusKubernetes():
    """Provides a way to track the status of an on-going Kubernetes
    action.

    Parameters
    ----------
    api : CoreV1Api
        will be used to communicate with the cluster
    pod_name : str
        pod identifier
    config_names : list of (str, str) tuples or list of str
        ConfigMaps to delete when the action is finalised. Each entry is
        either a ``(file_name, config_map_name)`` pair (the form passed in by
        ``ExecuteKubernetes.act_on_dir``) or a bare ConfigMap name.
    namespace : str
        Kubernetes namespace
    outfile : str
        a filename to write the output of the simulation
    """

    def __init__(self, api, pod_name, config_names, namespace, outfile):
        self.core_v1 = api
        self.pod_name = pod_name
        self.config_names = config_names
        self.namespace = namespace
        self.outfile = outfile
        # Set to True by finished() once the pod phase is seen as 'Succeeded'.
        self._succeeded = False

    def finished(self):
        """Will return True if the pod has finished, otherwise will return False.

        Queries the cluster for the pod on every call. Any phase other than
        'Pending' or 'Running' counts as finished; when the phase is
        'Succeeded' this is remembered for `succeeded`.
        """
        resp = self.core_v1.read_namespaced_pod(
            name=self.pod_name, namespace=self.namespace)
        phase = resp.status.phase
        if phase in ('Pending', 'Running'):
            return False
        if phase == 'Succeeded':
            self._succeeded = True
        return True

    def finalise(self):
        """Will read the logs from the Kubernetes pod, output them to a file and
        delete the Kubernetes resources we have allocated.

        Raises
        ------
        RuntimeError
            if the pod has not finished or did not finish successfully
        """
        if not (self.finished() and self.succeeded()):
            raise RuntimeError("Cannot finalise an Action that hasn't finished.")
        log_ = self.core_v1.read_namespaced_pod_log(
            self.pod_name, namespace=self.namespace)
        with open(self.outfile, 'w') as fd:
            fd.write(log_)
        for entry in self.config_names:
            # Accept both the documented bare-name form and the
            # (file_name, config_map_name) pairs that are actually passed in.
            if isinstance(entry, str):
                config_map_name = entry
            else:
                _, config_map_name = entry
            self.core_v1.delete_namespaced_config_map(
                config_map_name, namespace=self.namespace)
        self.core_v1.delete_namespaced_pod(
            self.pod_name, namespace=self.namespace)

    def succeeded(self):
        """Will return True if the pod has finished successfully, otherwise will return False.
        If the job hasn't finished yet will return False.

        This does not contact the cluster; it reports what the most recent
        call to `finished` observed.
        """
        return self._succeeded


class ExecuteKubernetes(BaseAction):
    """Provides an action element that runs a simulation as a Kubernetes Pod.

    The Pod description is read from a YAML file once, at construction time.
    For every run directory, `act_on_dir` uploads the input files as
    ConfigMaps, mounts them under ``/config/`` in the first container and
    creates a uniquely named Pod from a copy of that description.

    Parameters
    ----------
    pod_config : str
        Filename of the YAML file with the Kubernetes Pod configuration.
    input_file_names : list of str
        A list of input file names for your simulation.
    output_file_name : str
        An output file name for the output of the simulation.
    namespace : str, optional
        Kubernetes namespace in which the ConfigMaps and Pods are created.
        Defaults to ``'default'``.
    """

    def __init__(self, pod_config, input_file_names, output_file_name,
                 namespace='default'):
        if os.name == 'nt':
            msg = ('Local execution is provided for testing on Posix systems '
                   'only. We detect you are using Windows.')
            logger.error(msg)
            raise NotImplementedError(msg)
        with open(pod_config, 'r') as fd:
            # NOTE(review): BaseLoader appears to leave every scalar as a
            # string - confirm the API accepts the resulting Pod description.
            self.dep = yaml.load(fd, Loader=yaml.BaseLoader)
        self.input_file_names = input_file_names
        self.output_file_name = output_file_name
        self.namespace = namespace
        config.load_kube_config()
        # Start from the configuration that load_kube_config() just installed.
        # NOTE(review): on newer client releases a bare Configuration() is
        # understood not to copy the default any more, which would discard
        # the loaded kubeconfig - confirm against the installed version.
        try:
            c = Configuration.get_default_copy()
        except AttributeError:
            c = Configuration()
        # NOTE(review): this looks like it disables TLS hostname verification
        # for the API server - confirm it is wanted outside test clusters.
        c.assert_hostname = False
        Configuration.set_default(c)
        self.core_v1 = core_v1_api.CoreV1Api()

    def create_volumes(self, file_names, dep):
        """Create descriptions of Volumes that will hold the input files.

        One ConfigMap-backed volume is described per input file and mounted
        read-only at ``/config/<basename of the file>`` in the first
        container. `dep` is modified in place: its ``spec.volumes`` and the
        first container's ``volumeMounts`` are replaced.

        Parameters
        ----------
        file_names : list of (str, str) tuples
            ``(file_name, config_map_name)`` pairs
        dep : dict
            the Pod description to modify
        """
        volumes = []
        volume_mounts = []
        for file_name, id_ in file_names:
            base_name = os.path.basename(file_name)
            volumes.append({'name': id_ + '-volume', 'configMap': {'name': id_}})
            volume_mounts.append({'name': id_ + '-volume',
                                  'mountPath': os.path.join('/config/', base_name),
                                  'subPath': base_name,
                                  'readOnly': True})
        dep['spec']['volumes'] = volumes
        dep['spec']['containers'][0]['volumeMounts'] = volume_mounts

    def create_config_maps(self, file_names):
        """Create Kubernetes ConfigMaps for the input files to the simulation.

        Each file is read as text and stored in its own ConfigMap under a
        single key, the file's basename.

        Parameters
        ----------
        file_names : list of (str, str) tuples
            ``(file_name, config_map_name)`` pairs
        """
        for file_name, id_ in file_names:
            with open(file_name, 'r') as fd:
                data = fd.read()
            metadata = V1ObjectMeta(
                name=id_,
                namespace=self.namespace
            )
            configmap = V1ConfigMap(
                api_version='v1',
                kind='ConfigMap',
                data={os.path.basename(file_name): data},
                metadata=metadata
            )
            self.core_v1.create_namespaced_config_map(
                namespace=self.namespace, body=configmap)

    def act_on_dir(self, target_dir):
        """Executes a dockerized simulation on input files found in `target_dir`.

        Parameters
        ----------
        target_dir : str
            Directory in which to execute simulation.

        Returns
        -------
        ActionStatusKubernetes
            an object for tracking the Pod that was created; its output file
            is `output_file_name` inside `target_dir`
        """
        # Every input file gets a freshly generated ConfigMap name.
        file_names = [(os.path.join(target_dir, input_file_name), str(uuid.uuid4()))
                      for input_file_name in self.input_file_names]
        # Deep copy: the nested 'metadata' and 'spec' dicts are modified
        # below and must not leak back into the shared template.
        dep = copy.deepcopy(self.dep)
        self.create_config_maps(file_names)
        self.create_volumes(file_names, dep)
        pod_name = str(uuid.uuid4())
        dep['metadata']['name'] = pod_name
        self.core_v1.create_namespaced_pod(body=dep, namespace=self.namespace)
        return ActionStatusKubernetes(
            self.core_v1, pod_name, file_names, self.namespace,
            os.path.join(target_dir, self.output_file_name))
10 changes: 8 additions & 2 deletions easyvvuq/campaign.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from easyvvuq.constants import default_campaign_prefix, Status
from easyvvuq.data_structs import RunInfo, CampaignInfo, AppInfo
from easyvvuq.sampling import BaseSamplingElement
from easyvvuq.actions import ActionStatuses

__copyright__ = """
Expand Down Expand Up @@ -630,16 +631,21 @@ def apply_for_each_run_dir(self, action, status=Status.ENCODED):
action : the action to be applied to each run directory
The function to be applied to each run directory. func() will
be called with the run directory path as its only argument.
status : Status
Will apply the action only to those runs whose status is as specified
Returns
-------
action_statuses: ActionStatuses
An object containing ActionStatus instances to track action execution
"""

# Loop through all runs in this campaign with status ENCODED, and
# run the specified action on each run's dir
action_statuses = ActionStatuses()
for run_id, run_data in self.campaign_db.runs(status=status, app_id=self._active_app['id']):
logger.info("Applying " + action.__module__ + " to " + run_data['run_dir'])
action.act_on_dir(run_data['run_dir'])
action_statuses.add(action.act_on_dir(run_data['run_dir']))
return action_statuses

def collate(self):
"""Combine the output from all runs associated with the current app.
Expand Down
9 changes: 2 additions & 7 deletions easyvvuq/sampling/replica_sampler.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ def __init__(self, sampler, ensemble_col='ensemble'):
self.history.append(sample)
self.size = len(self.history)
self.cycle = cycle(self.history)
self.ensemble = 0
self.counter = 0

def is_finite(self):
Expand All @@ -51,12 +50,8 @@ def n_samples(self):

def __next__(self):
params = dict(next(self.cycle))
if self.counter < self.size - 1:
self.counter += 1
else:
self.counter = 0
self.ensemble += 1
params[self.ensemble_col] = self.ensemble
params[self.ensemble_col] = self.counter
self.counter = (self.counter + 1) % self.size
return params

def is_restartable(self):
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ jsonpickle
cerberus
dask[complete]
dask_jobqueue
dask-kubernetes
cloudpickle
scikit-learn
jinja2
fsspec
msmb_theme
kubernetes
6 changes: 6 additions & 0 deletions tests/kubernetes/epidemic.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"grid_size" : 10,
"n" : 20,
"duration" : 28,
"mortality" : 0.1
}
11 changes: 11 additions & 0 deletions tests/kubernetes/epidemic.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# Pod manifest for the Kubernetes test: one container that runs the epidemic
# simulation on a JSON input file and prints the resulting CSV.
# NOTE(review): no volumes are declared here; presumably this file is used as
# the pod_config template of ExecuteKubernetes, which fills in the volumes
# and volumeMounts itself - confirm against the test that loads it.
apiVersion: v1
kind: Pod
metadata:
name: epidemic
spec:
restartPolicy: Never
containers:
- name: epidemic
image: orbitfold/easyvvuq:latest
# The single string in "args" is handed to "/bin/sh -c" as the script to run.
command: ["/bin/sh", "-c"]
# Reads /config/epidemic_in.json, writes out.csv, then cats out.csv to stdout.
args: ["python3 /EasyVVUQ/docs/epidemic/epidemic.py /config/epidemic_in.json out.csv && cat out.csv"]
25 changes: 25 additions & 0 deletions tests/test_actions_action_statuses.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import pytest
from unittest.mock import MagicMock
from easyvvuq.actions import ActionStatuses


def test_action_status_kubernetes():
    """Poll three mocked statuses and check how they are sorted.

    One status is still running, one has finished without success and one
    has finished successfully; only the last one may be finalised.
    """
    # (finished, succeeded) as reported by each mocked status, in add order
    outcomes = [(False, False), (True, False), (True, True)]
    mocks = []
    for is_finished, is_success in outcomes:
        mock_status = MagicMock()
        mock_status.finished.return_value = is_finished
        mock_status.succeeded.return_value = is_success
        mocks.append(mock_status)

    tracker = ActionStatuses()
    for mock_status in mocks:
        tracker.add(mock_status)
    tracker.poll()

    counts = tracker.stats()
    assert counts['active'] == 1
    assert counts['finished'] == 1
    assert counts['failed'] == 1

    still_running, unsuccessful, successful = mocks
    assert not still_running.finalise.called
    assert not unsuccessful.finalise.called
    assert successful.finalise.called

0 comments on commit aabeea4

Please sign in to comment.