
added initial vasp run

ianedwardthomas committed Feb 27, 2014
1 parent c55f6ab commit c846159d66a35b30622c9715b7cefb189f95c613
Showing with 1,219 additions and 720 deletions.
  1. +241 −313 chiminey/corestages/execute.py
  2. +26 −25 chiminey/corestages/strategies/asynchronouswaitstrategy.py
  3. +22 −3 chiminey/corestages/strategies/cloudstrategy.py
  4. +202 −0 chiminey/corestages/strategies/clusterbootstrapstrategy.py
  5. +383 −0 chiminey/corestages/strategies/clusterschedulestrategy.py
  6. +52 −8 chiminey/corestages/strategies/clusterstrategy.py
  7. +1 −1 chiminey/corestages/strategies/strategy.py
  8. +1 −1 chiminey/corestages/strategies/synchronouswaitstrategy.py
  9. +29 −30 chiminey/corestages/sweep.py
  10. +1 −1 chiminey/corestages/wait.py
  11. +4 −1 chiminey/examples/hrmc2/hrmcconverge.py
  12. +11 −51 chiminey/examples/hrmc2/hrmcexecute.py
  13. +58 −46 chiminey/examples/hrmc2/hrmcparent.py
  14. +15 −17 chiminey/examples/hrmc2/hrmctransform.py
  15. +5 −4 chiminey/examples/vasp/vaspsweep.py
  16. +5 −209 chiminey/examples/vasp/vasptransform.py
  17. +9 −0 chiminey/smartconnectorscheduler/admin.py
  18. +1 −1 chiminey/smartconnectorscheduler/management/commands/coreinitial.py
  19. +9 −4 chiminey/smartconnectorscheduler/management/commands/createuser.py
  20. +2 −2 chiminey/smartconnectorscheduler/management/commands/vaspinitial.py
  21. +7 −3 chiminey/storage/storage.py
  22. +1 −0 input_hrmc/values
  23. +15 −0 payload_vasp/Makefile
  24. +7 −0 payload_vasp/bootstrap_done.sh
  25. +13 −0 payload_vasp/process_payload/Makefile
  26. +40 −0 payload_vasp/process_payload/process_running_done.sh
  27. +3 −0 payload_vasp/process_payload/process_schedule_done.sh
  28. +1 −0 payload_vasp/process_payload/start_process_schedule.sh
  29. +9 −0 payload_vasp/process_payload/start_running_process.sh
  30. +27 −0 payload_vasp/schedule_done.sh
  31. +6 −0 payload_vasp/start_bootstrap.sh
  32. +13 −0 payload_vasp/start_schedule.sh

Large diffs are not rendered by default; only a subset of the changed files is shown below.

chiminey/corestages/strategies/asynchronouswaitstrategy.py
@@ -30,16 +30,17 @@
 logger = logging.getLogger(__name__)

 RMIT_SCHEMA = "http://rmit.edu.au/schemas"


 class AsynchronousWaitStrategy(Strategy):

-    def is_job_finished(self, ip_address, process_id, retry_left, settings, relative_path_suffix):
+    def is_job_finished(self, wait_class, ip_address, process_id, retry_left, settings, relative_path_suffix):
         """
         Return True if package job on instance_id has is_job_finished
         """
         # TODO: maybe this should be a reusable library method?
         ip = ip_address
         logger.debug("ip=%s" % ip)
         curr_username = settings['username']
-        settings['username'] = 'root'
+        # settings['username'] = 'root'
         #relative_path = settings['type'] + '@' + settings['payload_destination'] + "/" + process_id
         relative_path = settings['type'] + '@' + os.path.join(relative_path_suffix, process_id)
         destination = get_url_with_credentials(settings,
@@ -66,42 +67,42 @@ def is_job_finished(self, ip_address, process_id, retry_left, settings, relative_path_suffix):
             logger.debug('error is = %s' % e)
             process_failed = False
             node_failed = False
-            logger.debug('Is there error? %s' % self.failure_detector.failed_ssh_connection(e))
-            if self.failure_detector.failed_ssh_connection(e):
-                node = [x for x in self.created_nodes if x[1] == ip_address]
-                self.failed_processes = self.ftmanager.manage_failed_process(
+            logger.debug('Is there error? %s' % wait_class.failure_detector.failed_ssh_connection(e))
+            if wait_class.failure_detector.failed_ssh_connection(e):
+                node = [x for x in wait_class.created_nodes if x[1] == ip_address]
+                wait_class.failed_processes = wait_class.ftmanager.manage_failed_process(
                     settings, process_id, node[0], node[0][0], ip_address,
-                    self.failed_nodes, self.executed_procs, self.current_processes,
-                    self.all_processes, self.procs_2b_rescheduled)
-                #self.procs_2b_rescheduled.extend(rescheduled_prcs)
+                    wait_class.failed_nodes, wait_class.executed_procs, wait_class.current_processes,
+                    wait_class.all_processes, wait_class.procs_2b_rescheduled)
+                #wait_class.procs_2b_rescheduled.extend(rescheduled_prcs)
                 '''
-                if self.failure_detector.node_terminated(settings, node[0][0]):
-                    if not self.failure_detector.recorded_failed_node(
-                            self.failed_nodes, ip_address):
-                        self.failed_nodes.append(node[0])
+                if wait_class.failure_detector.node_terminated(settings, node[0][0]):
+                    if not wait_class.failure_detector.recorded_failed_node(
+                            wait_class.failed_nodes, ip_address):
+                        wait_class.failed_nodes.append(node[0])
                     node_failed = True
                 else:
                     if not retry_left:
                         process_failed = True
                     else:
-                        process_lists = [self.executed_procs, self.current_processes,
-                                         self.all_processes]
-                        self.ftmanager.decrease_max_retry(
+                        process_lists = [wait_class.executed_procs, wait_class.current_processes,
+                                         wait_class.all_processes]
+                        wait_class.ftmanager.decrease_max_retry(
                             process_lists, ip_address, process_id)
                 # Failure management
                 if node_failed or process_failed:
-                    process_lists = [self.executed_procs,
-                                     self.current_processes, self.all_processes]
+                    process_lists = [wait_class.executed_procs,
+                                     wait_class.current_processes, wait_class.all_processes]
                     if node_failed:
-                        self.ftmanager.flag_all_processes(process_lists, ip_address)
+                        wait_class.ftmanager.flag_all_processes(process_lists, ip_address)
                     elif process_failed:
-                        self.ftmanager.flag_this_process(
+                        wait_class.ftmanager.flag_this_process(
                             process_lists, ip_address, process_id)
-                self.failed_processes = self.ftmanager.\
-                    get_total_failed_processes(self.executed_procs)
-                if self.reschedule_failed_procs:
-                    self.ftmanager.collect_failed_processes(
-                        self.executed_procs, self.procs_2b_rescheduled)
+                wait_class.failed_processes = wait_class.ftmanager.\
+                    get_total_failed_processes(wait_class.executed_procs)
+                if wait_class.reschedule_failed_procs:
+                    wait_class.ftmanager.collect_failed_processes(
+                        wait_class.executed_procs, wait_class.procs_2b_rescheduled)
                 '''
             else:

chiminey/corestages/strategies/cloudstrategy.py
@@ -74,13 +74,32 @@ def create_resource(self, local_settings):

     def set_bootstrap_settings(self, run_settings, local_settings):
         super(CloudStrategy, self).set_bootstrap_settings(run_settings, local_settings)
-        bootstrap.set_bootstrap_settings(run_settings, local_settings)
+        try:
+            payload_source = getval(
+                run_settings, '%s/stages/setup/payload_source' % RMIT_SCHEMA)
+        except SettingNotFoundException:
+            pass
+        if payload_source:
+            bootstrap.set_bootstrap_settings(run_settings, local_settings)

     def start_multi_bootstrap_task(self, settings, relative_path_suffix):
-        bootstrap.start_multi_bootstrap_task(settings, relative_path_suffix)
+        try:
+            payload_source = settings['payload_source']
+        except IndexError:
+            pass
+        if payload_source:
+            bootstrap.start_multi_bootstrap_task(settings, relative_path_suffix)

     def complete_bootstrap(self, bootstrap_class, local_settings):
-        bootstrap.complete_bootstrap(bootstrap_class, local_settings)
+        try:
+            payload_source = local_settings['payload_source']
+        except IndexError:
+            pass
+        if payload_source:
+            bootstrap.complete_bootstrap(bootstrap_class, local_settings)

     def set_schedule_settings(self, run_settings, local_settings):
         super(CloudStrategy, self).set_schedule_settings(run_settings, local_settings)

chiminey/corestages/strategies/clusterbootstrapstrategy.py
@@ -0,0 +1,202 @@
+# Copyright (C) 2014, RMIT University
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to
+# deal in the Software without restriction, including without limitation the
+# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
+# sell copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+# IN THE SOFTWARE.
+
+import ast
+import logging
+
+from chiminey.cloudconnection import get_registered_vms
+from chiminey.reliabilityframework import FTManager
+from chiminey import messages
+from chiminey.runsettings import getval, update
+from chiminey.smartconnectorscheduler.errors \
+    import InsufficientResourceError, NoRegisteredVMError, VMTerminatedError
+from chiminey.storage import get_url_with_credentials, copy_directories, get_make_path
+from chiminey.sshconnection import open_connection
+from chiminey.compute import run_command_with_status, run_make
+
+logger = logging.getLogger(__name__)
+
+RMIT_SCHEMA = "http://rmit.edu.au/schemas"
+
+
+def set_bootstrap_settings(run_settings, local_settings):
+    #logger.debug('in=%s' % run_settings)
+    update(local_settings, run_settings,
+           '%s/stages/setup/payload_source' % RMIT_SCHEMA,
+           '%s/stages/setup/payload_destination' % RMIT_SCHEMA,
+           '%s/stages/create/created_nodes' % RMIT_SCHEMA,
+           '%s/system/contextid' % RMIT_SCHEMA
+           )
+    local_settings['bdp_username'] = getval(run_settings, '%s/bdp_userprofile/username' % RMIT_SCHEMA)
+    #logger.debug('out=%s' % local_settings)
+
+
+def start_multi_bootstrap_task(settings, relative_path_suffix):
+    """
+    Run the package on each of the nodes in the group and grab
+    any output as needed
+    """
+    #nodes = get_registered_vms(settings)
+    nodes = ast.literal_eval(settings['created_nodes'])
+    logger.debug("nodes=%s" % nodes)
+    requested_nodes = 0
+    maketarget_nodegroup_pair = {}
+
+    # TODO: need testcases for following code
+    if not maketarget_nodegroup_pair:
+        EMPTY_MAKE_TARGET = ''
+        requested_nodes = len(nodes)
+        maketarget_nodegroup_pair[EMPTY_MAKE_TARGET] = requested_nodes
+    else:
+        for i in maketarget_nodegroup_pair.keys():
+            requested_nodes += maketarget_nodegroup_pair[i]
+        if requested_nodes > len(nodes):
+            message = "Requested nodes %d; but available nodes %s " \
+                % (requested_nodes, len(nodes))
+            logger.exception(message)
+            raise InsufficientResourceError(message)
+    logger.info("Requested nodes %d: \nAvailable nodes %s "
+                % (requested_nodes, len(nodes)))
+
+    logger.debug('starting setup')
+    for make_target in maketarget_nodegroup_pair:
+        for i in range(0, maketarget_nodegroup_pair[make_target]):
+            instance = nodes[0]
+            node_ip = instance[1]
+            logger.debug("node_ip=%s" % node_ip)
+            logger.debug('constructing source')
+            source = get_url_with_credentials(settings, settings['payload_source'])
+            logger.debug('source=%s' % source)
+            #relative_path = '%s@%s' % (settings['type'], settings['payload_destination'])
+            relative_path = '%s@%s' % (settings['type'], relative_path_suffix)
+            destination = get_url_with_credentials(settings, relative_path,
+                                                   is_relative_path=True,
+                                                   ip_address=node_ip)
+            logger.debug("Source %s" % source)
+            logger.debug("Destination %s" % destination)
+            logger.debug("Relative path %s" % relative_path)
+            _start_bootstrap(instance, node_ip, settings, source, destination)
+            nodes.pop(0)
+
+
+def _start_bootstrap(instance, ip, settings, source, destination):
+    """
+    Start the task on the instance, then return
+    """
+    logger.info("run_task %s" % str(instance))
+    copy_directories(source, destination)
+    makefile_path = get_make_path(destination)
+    # TODO, FIXME: need to have timeout for yum install make
+    # and then test can access, otherwise, loop.
+    install_make = 'yum install -y make'
+    command_out = ''
+    errs = ''
+    logger.debug("starting command for %s" % ip)
+    ssh = ''
+    try:
+        ssh = open_connection(ip_address=ip, settings=settings)
+        command_out, errs = run_command_with_status(ssh, install_make)
+        logger.debug("command_out1=(%s, %s)" % (command_out, errs))
+        run_make(ssh, makefile_path, 'start_bootstrap')
+    except Exception, e:  # fixme: consider using reliability framework
+        logger.error(e)
+        raise
+    finally:
+        if ssh:
+            ssh.close()
+
+
+def complete_bootstrap(bootstrap_class, local_settings):
+    try:
+        nodes = ast.literal_eval(local_settings['created_nodes'])
+        logger.debug("nodes=%s" % nodes)
+        running_created_nodes = [x for x in bootstrap_class.created_nodes if str(x[3]) == 'running']
+        if len(nodes) < len(running_created_nodes):
+            raise VMTerminatedError
+    except NoRegisteredVMError as e:
+        logger.debug('NoRegisteredVMError detected')
+        ftmanager = FTManager()
+        ftmanager.manage_failure(e, stage_class=bootstrap_class, settings=local_settings)
+    except VMTerminatedError as e:
+        logger.debug('VMTerminatedError detected')
+        ftmanager = FTManager()
+        ftmanager.manage_failure(e, stage_class=bootstrap_class, settings=local_settings)
+    for node in nodes:
+        node_ip = node[1]
+        if (node_ip in [x[1] for x in bootstrap_class.bootstrapped_nodes if x[1] == node_ip]):
+            continue
+        relative_path_suffix = bootstrap_class.get_relative_output_path(local_settings)
+        relative_path = "%s@%s" % (local_settings['type'],
+                                   relative_path_suffix)
+        destination = get_url_with_credentials(local_settings,
+                                               relative_path,
+                                               is_relative_path=True,
+                                               ip_address=node_ip)
+        logger.debug("Relative path %s" % relative_path)
+        logger.debug("Destination %s" % destination)
+        try:
+            fin = _is_bootstrap_complete(node_ip, local_settings, destination)
+        except IOError, e:
+            logger.error(e)
+            fin = False
+        except Exception as e:
+            logger.error(e)
+            fin = False
+            ftmanager = FTManager()
+            ftmanager.manage_failure(e, stage_class=bootstrap_class, vm_ip=node_ip,
+                                     vm_id=node[0], settings=local_settings)
+        logger.debug("fin=%s" % fin)
+        if fin:
+            print "done."
+            logger.debug("node=%s" % str(node))
+            logger.debug("bootstrapped_nodes=%s" % bootstrap_class.bootstrapped_nodes)
+            if not (node_ip in [x[1]
+                                for x in bootstrap_class.bootstrapped_nodes
+                                if x[1] == node_ip]):
+                logger.debug('new ip = %s' % node_ip)
+                bootstrap_class.bootstrapped_nodes.append(
+                    [node[0], node_ip, node[2], 'running'])
+            else:
+                logger.info("We have already "
+                            + "bootstrapped node %s" % node_ip)
+            messages.info_context(local_settings['contextid'],
+                                  "bootstrapping nodes (%s nodes done)"
+                                  % len(bootstrap_class.bootstrapped_nodes))
+        else:
+            print "job still running on %s" % node_ip
+
+
+def _is_bootstrap_complete(ip, settings, destination):
+    """
+    Return True if package job on instance_id has is_job_finished
+    """
+    ssh = open_connection(ip_address=ip, settings=settings)
+    makefile_path = get_make_path(destination)
+    (command_out, err) = run_make(ssh, makefile_path, 'bootstrap_done')
+    if command_out:
+        logger.debug("command_out = %s" % command_out)
+        for line in command_out:
+            if 'Environment Setup Completed' in line:
+                return True
+    else:
+        logger.warn(err)
+    return False