Permalink
Browse files

nci connector

  • Loading branch information...
ianedwardthomas committed Dec 4, 2013
1 parent b367ffb commit a8404e9c488d4203f5ba71e467589437d7e8f678
@@ -248,6 +248,7 @@ def post_list(self, request, **kwargs):
'hrmc': self._post_to_hrmc,
'sweep': self._post_to_sweep_hrmc,
'sweep_make': self._post_to_sweep_make,
'sweep_vasp': self._post_to_sweep_vasp,
'copydir': self._post_to_copy,
'remotemake': self._post_to_remotemake}
@@ -373,6 +374,7 @@ def validate_input(self, data, directive_name):
# continue;
validator = subtype_validation[param.subtype][1]
logger.debug("validator=%s" % validator)
current_subtype = param.subtype
logger.debug(current_subtype)
if current_subtype == 'storage_bdpurl' or current_subtype == 'nectar_platform' or current_subtype == 'platform':
@@ -391,6 +393,13 @@ def _post_to_sweep_make(self, bundle, directive):
directive=directive,
subdirective="remotemake")
def _post_to_sweep_vasp(self, bundle, directive):
return self._post_to_sweep(bundle=bundle,
directive=directive,
subdirective="vasp")
def _post_to_sweep(self, bundle, directive, subdirective):
logger.debug("_post_to_sweep for %s" % subdirective)
platform = 'local'
@@ -802,6 +802,8 @@ def make_dynamic_field(parameter, **kwargs):
computation_ns += 'nectar'
elif comp_platform == 'sweep_make':
computation_ns += 'nci'
elif comp_platform == 'sweep_vasp':
computation_ns += 'nci'
else:
logger.warn("unknown computation platform")
@@ -291,6 +291,18 @@ def setup(self):
u'output_location': {'type':models.ParameterName.STRING, 'subtype':'storage_bdpurl', 'initial':'file://local@127.0.0.1/sweep', 'description':'Output Location', 'ranking': 2, 'help_text': 'Storage platform name with optional offset path: e.g., storage_home/myexperiment'}
}
],
u'http://rmit.edu.au/schemas/input/vasp':
[u'VASP Smart Connector',
{
u'ncpus': {'type':models.ParameterName.NUMERIC, 'subtype':'whole', 'initial':16, 'description':'Number of CPUs', 'ranking':1, 'help_text':''},
u'project': {'type':models.ParameterName.STRING, 'subtype': 'string', 'initial':'h72', 'description':'Project Identifier', 'ranking':2, 'help_text':''},
u'job_name': {'type':models.ParameterName.STRING, 'subtype': 'string', 'initial':'Si-FCC', 'description':'Job Name', 'ranking':3, 'help_text':''},
u'queue': {'type':models.ParameterName.STRING, 'subtype': 'string', 'initial':'express', 'description':'Task Queue to use', 'ranking':4, 'help_text':''},
u'walltime': {'type':models.ParameterName.STRING, 'subtype': 'string', 'initial':'00:10:00', 'description':'Wall Time', 'ranking':5, 'help_text':''},
u'mem': {'type':models.ParameterName.STRING, 'subtype': 'string', 'initial':'16GB', 'description':'Memory', 'ranking':6, 'help_text':''},
u'max_iteration': {'type':models.ParameterName.NUMERIC, 'subtype':'whole', 'description':'Maximum no. iterations', 'ranking':7, 'initial': 10, 'help_text':'Computation ends when either convergence or maximum iteration reached'},
}
],
u'http://rmit.edu.au/schemas/input/system/compplatform':
[u'Computation Platform',
{
@@ -748,7 +760,7 @@ def setup(self):
print "done"
sweep, _ = models.Directive.objects.get_or_create(name="sweep",
description="HRMC Sweep Connector")
defaults={'description': "HRMC Sweep Connector"})
sweep_stage, _ = models.Stage.objects.get_or_create(name="sweep",
description="Sweep Test",
@@ -764,7 +776,6 @@ def setup(self):
{
u'random_numbers': 'file://127.0.0.1/randomnums.txt'
},
})
# FIXME: tasks.progress_context does not load up composite stage settings
@@ -775,14 +786,39 @@ def setup(self):
self.setup_sweep_remotemake(nci_platform)
self.define_vasp(nci_platform)
self.setup_sweep_vasp(nci_platform)
self.setup_directive_args()
print "done"
def setup_sweep_remotemake(self, local_platform):
def setup_sweep_vasp(self, local_platform):
sweep, _ = models.Directive.objects.get_or_create(name="sweep_vasp",
defaults={'description': "VASP Sweep Connector"})
sweep_stage, _ = models.Stage.objects.get_or_create(name="sweep_make",
description="Sweep Test",
package="bdphpcprovider.smartconnectorscheduler.stages.sweep.Sweep",
order=100)
sweep_stage.update_settings({
# FIXME: move random_numbers into system schema
u'http://rmit.edu.au/schemas/system':
{
u'random_numbers': 'file://127.0.0.1/randomnums.txt'
},
})
# FIXME: tasks.progress_context does not load up composite stage settings
comm, _ = models.Command.objects.get_or_create(platform=local_platform,
directive=sweep, stage=sweep_stage)
def setup_sweep_remotemake(self, local_platform):
sweep, _ = models.Directive.objects.get_or_create(name="sweep_make",
description="Remote Make Sweep Connector")
defaults={'description': "Remote Make Sweep Connector"})
sweep_stage, _ = models.Stage.objects.get_or_create(name="sweep_make",
description="Sweep Test",
@@ -801,9 +837,8 @@ def setup_sweep_remotemake(self, local_platform):
comm, _ = models.Command.objects.get_or_create(platform=local_platform,
directive=sweep, stage=sweep_stage)
def setup_directive_args(self):
sweep, _ = models.Directive.objects.get_or_create(name="sweep")
sweep = models.Directive.objects.get(name="sweep")
RMIT_SCHEMA = "http://rmit.edu.au/schemas"
for i, sch in enumerate([
@@ -818,7 +853,7 @@ def setup_directive_args(self):
schema = models.Schema.objects.get(namespace=sch)
das, _ = models.DirectiveArgSet.objects.get_or_create(directive=sweep, order=i, schema=schema)
sweep_make, _ = models.Directive.objects.get_or_create(name="sweep_make")
sweep_make = models.Directive.objects.get(name="sweep_make")
RMIT_SCHEMA = "http://rmit.edu.au/schemas"
for i, sch in enumerate([
@@ -830,9 +865,101 @@ def setup_directive_args(self):
schema = models.Schema.objects.get(namespace=sch)
das, _ = models.DirectiveArgSet.objects.get_or_create(directive=sweep_make, order=i, schema=schema)
sweep_make = models.Directive.objects.get(name="sweep_vasp")
RMIT_SCHEMA = "http://rmit.edu.au/schemas"
for i, sch in enumerate([
RMIT_SCHEMA + "/input/system/compplatform",
RMIT_SCHEMA + "/input/system",
RMIT_SCHEMA + "/input/vasp",
RMIT_SCHEMA + "/input/mytardis",
RMIT_SCHEMA + "/input/sweep"
]):
schema = models.Schema.objects.get(namespace=sch)
das, _ = models.DirectiveArgSet.objects.get_or_create(directive=sweep_make, order=i, schema=schema)
def define_vasp(self, nci_platform):
vasp, _ = models.Directive.objects.get_or_create(
name="vasp", defaults={'description': "VASP Connector", 'hidden':True})
smartpack = "bdphpcprovider.smartconnectorscheduler.stages"
self.upload_makefile = smartpack + ".make.movement.MakeUploadStage"
self.download_makefile = smartpack + ".make.movement.MakeDownloadStage"
self.remotemake_stage = smartpack + ".make.remotemake.MakeRunStage"
self.make_finished_stage = smartpack + ".make.makefinished.MakeFinishedStage"
vasp_composite_stage, _ = models.Stage.objects.get_or_create(
name="vasp_connector",
description="VASP Connector",
package=self.parallel_package,
order=0)
vasp_composite_stage.update_settings({})
# TODO: need to build specific upload/download stages because no way
# adapt to different connectors yet...
# copies input files + makefile to remote system
upload_makefile_stage, _ = models.Stage.objects.get_or_create(
name="upload_makefile",
description="upload payload to remote",
package=self.upload_makefile,
parent=vasp_composite_stage,
order=1)
upload_makefile_stage.update_settings(
{
'http://rmit.edu.au/schemas/remotemake/config':
{
u'payload_destination': 'iet595/remotemake',
u'payload_source': 'file://127.0.0.1/local/vasppayload',
}
})
# executes make with run target
remotemake_stage, _ = models.Stage.objects.get_or_create(
name="make",
description="Makefile execution stage",
package=self.remotemake_stage,
parent=vasp_composite_stage,
order=2)
remotemake_stage.update_settings({})
# executes make with finished target and repeats until finished.
make_finished_stage, _ = models.Stage.objects.get_or_create(
name="makefinished",
description="Makefile execution stage",
package=self.make_finished_stage,
parent=vasp_composite_stage,
order=3)
logger.debug('make_finished_stage=%s' % str(make_finished_stage))
make_finished_stage.update_settings({})
# # copies input files + makefile to remote system
# download_makefile_stage, _ = models.Stage.objects.get_or_create(
# name="download_makefile",
# description="download payload to remote",
# package=self.download_makefile,
# parent=vasp_composite_stage,
# order=4)
# download_makefile_stage.update_settings({})
# FIXME: not clear wether we need to store platform in command
# as different stages make run on different platforms.
comm, _ = models.Command.objects.get_or_create(
platform=nci_platform,
directive=vasp,
stage=vasp_composite_stage)
# RMIT_SCHEMA = "http://rmit.edu.au/schemas"
# for i, sch in enumerate([
# RMIT_SCHEMA + "/input/system",
# RMIT_SCHEMA + "/input/mytardis",
# RMIT_SCHEMA + "/input/sweep"
# ]):
# schema = models.Schema.objects.get(namespace=sch)
# das, _ = models.DirectiveArgSet.objects.get_or_create(directive=vasp, order=i, schema=schema)
def define_remote_make(self, nci_platform):
remote_make, _ = models.Directive.objects.get_or_create(name="remotemake", description="Remote execution of a Makefile")
remote_make, _ = models.Directive.objects.get_or_create(name="remotemake", defaults={'description': "Remote execution of a Makefile"})
smartpack = "bdphpcprovider.smartconnectorscheduler.stages"
self.upload_makefile = smartpack + ".make.movement.MakeUploadStage"
self.download_makefile = smartpack + ".make.movement.MakeDownloadStage"
@@ -861,7 +988,7 @@ def define_remote_make(self, nci_platform):
'http://rmit.edu.au/schemas/remotemake/config':
{
u'payload_destination': 'iet595/remotemake',
u'payload_source': 'file://127.0.0.1/local/vasppayload',
u'payload_source': 'file://127.0.0.1/local/testpayload',
}
})
# executes make with run target
@@ -76,11 +76,14 @@ def process(self, run_settings):
settings = setup_settings(run_settings)
logger.debug("settings=%s" % settings)
_upload_payload(settings, settings['payload_source'])
map_initial_location = "%s/initial" % settings['input_location']
values_map = _load_values_map(settings, map_initial_location)
logger.debug("values_map=%s" % values_map)
_upload_payload(settings, settings['payload_source'], values_map)
_upload_variations_inputs(
settings,
settings['input_location'])
settings['input_location'], values_map)
smartconnector.info(run_settings, "1: upload done")
@@ -120,7 +123,7 @@ def _get_dest_bdp_url(settings):
str(settings['contextid'])))
def _upload_payload(settings, source_url):
def _upload_payload(settings, source_url, values_map):
encoded_s_url = get_url_with_pkey(settings, source_url)
logger.debug("encoded_s_url=%s" % encoded_s_url)
@@ -138,14 +141,30 @@ def _upload_payload(settings, source_url):
dest_url, is_relative_path=True, ip_address=settings['host'])
hrmcstages.copy_directories(encoded_s_url, encoded_d_url)
# context = _load_values_map(settings, source_url_initial)
# logger.debug("context=%s" % context)
for content_fname, content in _instantiate_context(
source_url,
settings,
values_map).items():
content_url = smartconnector.get_url_with_pkey(
settings,
os.path.join(dest_url, content_fname),
is_relative_path=True, ip_address=settings['host'])
logger.debug("content_url=%s" % content_url)
hrmcstages.put_file(content_url, content.encode('utf-8'))
logger.debug("done payload upload")
def _upload_variations_inputs(settings, source_url):
def _upload_variations_inputs(settings, source_url, values_map):
#variation_map = {'a': [3]}
variation_map = ast.literal_eval(settings['sweep_map'])
logger.debug("variation_map=%s" % variation_map)
# variation_map = ast.literal_eval(settings['sweep_map'])
# logger.debug("variation_map=%s" % variation_map)
bdp_username = settings['bdp_username']
@@ -159,9 +178,6 @@ def _upload_variations_inputs(settings, source_url):
encoded_s_url = get_url_with_pkey(settings, source_url_initial)
logger.debug("encoded_s_url=%s" % encoded_s_url)
context = _load_values_map(settings, source_url_initial)
logger.debug("context=%s" % context)
dest_url = _get_dest_bdp_url(settings)
computation_platform_url = settings['comp_platform_url']
@@ -179,7 +195,7 @@ def _upload_variations_inputs(settings, source_url):
for content_fname, content in _instantiate_context(
source_url_initial,
settings,
context).items():
values_map).items():
content_url = smartconnector.get_url_with_pkey(
settings,
@@ -188,7 +204,7 @@ def _upload_variations_inputs(settings, source_url):
logger.debug("content_url=%s" % content_url)
hrmcstages.put_file(content_url, content.encode('utf-8'))
_save_values(settings, dest_url, context)
_save_values(settings, dest_url, values_map)
logger.debug("done input upload")
Oops, something went wrong.

0 comments on commit a8404e9

Please sign in to comment.