Skip to content

Commit

Permalink
Do not set extra attributes to Application object, use Run object ins…
Browse files Browse the repository at this point in the history
…tead.

Fixes issue #535
  • Loading branch information
arcimboldo committed Aug 18, 2016
1 parent 7fbcc93 commit b43ad26
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 29 deletions.
24 changes: 12 additions & 12 deletions gc3libs/backends/ec2.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ def __init__(self, name,

self.region = ec2_region

# Mapping of job.ec2_instance_id => LRMS
# Mapping of job.execution.ec2_instance_id => LRMS
self.subresources = {}

auth = self._auth_fn()
Expand Down Expand Up @@ -647,7 +647,7 @@ def get_user_data_for_job(self, job):

@same_docstring_as(LRMS.cancel_job)
def cancel_job(self, app):
resource = self._get_subresource(self._get_vm(app.ec2_instance_id))
resource = self._get_subresource(self._get_vm(app.execution.ec2_instance_id))
return resource.cancel_job(app)

@same_docstring_as(LRMS.get_resource_status)
Expand Down Expand Up @@ -766,22 +766,22 @@ def get_resource_status(self):
@same_docstring_as(LRMS.get_results)
def get_results(self, app, download_dir, overwrite=False,
changed_only=True):
subresource = self._get_subresource(self._get_vm(app.ec2_instance_id))
subresource = self._get_subresource(self._get_vm(app.execution.ec2_instance_id))
return subresource.get_results(app, download_dir,
overwrite=overwrite,
changed_only=changed_only)

@same_docstring_as(LRMS.update_job_state)
def update_job_state(self, app):
if app.ec2_instance_id not in self.subresources:
if app.execution.ec2_instance_id not in self.subresources:
try:
self.subresources[app.ec2_instance_id] = self._get_subresource(
self._get_vm(app.ec2_instance_id))
self.subresources[app.execution.ec2_instance_id] = self._get_subresource(
self._get_vm(app.execution.ec2_instance_id))
except InstanceNotFound as ex:
gc3libs.log.error(
"Changing state of task '%s' to TERMINATED since EC2 "
"instance '%s' does not exist anymore.",
app.execution.lrms_jobid, app.ec2_instance_id)
app.execution.lrms_jobid, app.execution.ec2_instance_id)
app.execution.state = Run.State.TERMINATED
raise ex
except UnrecoverableError as ex:
Expand All @@ -791,7 +791,7 @@ def update_job_state(self, app):
app.execution.state = Run.State.UNKNOWN
raise ex

return self.subresources[app.ec2_instance_id].update_job_state(app)
return self.subresources[app.execution.ec2_instance_id].update_job_state(app)

def submit_job(self, job):
"""
Expand Down Expand Up @@ -868,7 +868,7 @@ def submit_job(self, job):
or vm.instance_type != instance_type):
continue
resource.submit_job(job)
job.ec2_instance_id = vm_id
job.execution.ec2_instance_id = vm_id
job.changed = True
gc3libs.log.info(
"Job successfully submitted to remote resource %s.",
Expand Down Expand Up @@ -930,7 +930,7 @@ def submit_job(self, job):
@same_docstring_as(LRMS.peek)
def peek(self, app, remote_filename, local_file, offset=0, size=None):
resource = self._get_subresource(
self._get_vm(app.ec2_instance_id))
self._get_vm(app.execution.ec2_instance_id))
return resource.peek(app, remote_filename, local_file, offset, size)

def validate_data(self, data_file_list=None):
Expand Down Expand Up @@ -960,7 +960,7 @@ def free(self, app):

# freeing the resource from the application is now needed as
# the same instanc may run multiple applications
resource = self._get_subresource(self._get_vm(app.ec2_instance_id))
resource = self._get_subresource(self._get_vm(app.execution.ec2_instance_id))
resource.free(app)

# FIXME: current approach in terminating running instances:
Expand All @@ -969,7 +969,7 @@ def free(self, app):
resource.get_resource_status()
if len(resource.job_infos) == 0:
# turn VM off
vm = self._get_vm(app.ec2_instance_id)
vm = self._get_vm(app.execution.ec2_instance_id)
gc3libs.log.info("VM instance %s at %s is no longer needed."
" Terminating.", vm.id, vm.public_dns_name)
del self.subresources[vm.id]
Expand Down
34 changes: 17 additions & 17 deletions gc3libs/backends/openstack.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ def __init__(self, name,
if self.subresource_type not in available_subresource_types:
raise UnrecoverableError("Invalid resource type: %s" % self.type)

# Mapping of job.os_instance_id => LRMS
# Mapping of job.execution.instance_id => LRMS
self.subresources = {}

auth = self._auth_fn()
Expand Down Expand Up @@ -627,7 +627,7 @@ def get_user_data_for_job(self, job):
def cancel_job(self, app):
try:
subresource = self._get_subresource(
self._get_vm(app.os_instance_id))
self._get_vm(app.execution.os_instance_id))
return subresource.cancel_job(app)
except InstanceNotFound:
# ignore -- if this VM exists no more, we need not cancel any job
Expand Down Expand Up @@ -762,18 +762,18 @@ def get_results(self, app, download_dir, overwrite=False,
changed_only=True):
try:
subresource = self._get_subresource(
self._get_vm(app.os_instance_id))
self._get_vm(app.execution.os_instance_id))
except InstanceNotFound:
gc3libs.log.error(
"Changing state of task '%s' to TERMINATED since OpenStack"
" instance '%s' does not exist anymore.",
app.execution.lrms_jobid, app.os_instance_id)
app.execution.lrms_jobid, app.execution.os_instance_id)
app.execution.state = Run.State.TERMINATED
app.execution.signal = Run.Signals.RemoteError
app.execution.history.append(
"State changed to TERMINATED since OpenStack"
" instance '%s' does not exist anymore."
% (app.os_instance_id,))
% (app.execution.os_instance_id,))
raise UnrecoverableDataStagingError(
"VM where job was running is no longer available")
except UnrecoverableError as err:
Expand All @@ -793,21 +793,21 @@ def get_results(self, app, download_dir, overwrite=False,
@same_docstring_as(LRMS.update_job_state)
def update_job_state(self, app):
self._connect()
if app.os_instance_id not in self.subresources:
if app.execution.os_instance_id not in self.subresources:
try:
self.subresources[app.os_instance_id] = self._get_subresource(
self._get_vm(app.os_instance_id))
self.subresources[app.execution.os_instance_id] = self._get_subresource(
self._get_vm(app.execution.os_instance_id))
except InstanceNotFound:
gc3libs.log.error(
"Changing state of task '%s' to TERMINATED since OpenStack"
" instance '%s' does not exist anymore.",
app.execution.lrms_jobid, app.os_instance_id)
app.execution.lrms_jobid, app.execution.os_instance_id)
app.execution.state = Run.State.TERMINATED
app.execution.signal = Run.Signals.RemoteError
app.execution.history.append(
"State changed to TERMINATED since OpenStack"
" instance '%s' does not exist anymore."
% (app.os_instance_id,))
% (app.execution.os_instance_id,))
raise
except UnrecoverableError as err:
gc3libs.log.error(
Expand All @@ -820,7 +820,7 @@ def update_job_state(self, app):
" an OpenStack API error (%s: %s)."
% (err.__class__.__name__, err))
raise
return self.subresources[app.os_instance_id].update_job_state(app)
return self.subresources[app.execution.os_instance_id].update_job_state(app)

def submit_job(self, job):
"""
Expand Down Expand Up @@ -889,7 +889,7 @@ def submit_job(self, job):
if vm.image['id'] != image_id:
continue
subresource.submit_job(job)
job.os_instance_id = vm_id
job.execution.instance_id = vm_id
job.changed = True
gc3libs.log.info(
"Job successfully submitted to remote resource %s.",
Expand Down Expand Up @@ -954,18 +954,18 @@ def submit_job(self, job):
def peek(self, app, remote_filename, local_file, offset=0, size=None):
try:
subresource = self._get_subresource(
self._get_vm(app.os_instance_id))
self._get_vm(app.execution.os_instance_id))
except InstanceNotFound:
gc3libs.log.error(
"Changing state of task '%s' to TERMINATED since OpenStack"
" instance '%s' does not exist anymore.",
app.execution.lrms_jobid, app.os_instance_id)
app.execution.lrms_jobid, app.execution.os_instance_id)
app.execution.state = Run.State.TERMINATED
app.execution.signal = Run.Signals.RemoteError
app.execution.history.append(
"State changed to TERMINATED since OpenStack"
" instance '%s' does not exist anymore."
% (app.os_instance_id,))
% (app.execution.os_instance_id,))
raise UnrecoverableDataStagingError(
"VM where job was running is no longer available.")
return subresource.peek(app, remote_filename, local_file, offset, size)
Expand Down Expand Up @@ -999,7 +999,7 @@ def free(self, app):
# the same instanc may run multiple applications
try:
subresource = self._get_subresource(
self._get_vm(app.os_instance_id))
self._get_vm(app.execution.os_instance_id))
except InstanceNotFound:
# ignore -- if the instance is no more, there is
# nothing we should free
Expand All @@ -1012,7 +1012,7 @@ def free(self, app):
subresource.get_resource_status()
if len(subresource.job_infos) == 0:
# turn VM off
vm = self._get_vm(app.os_instance_id)
vm = self._get_vm(app.execution.os_instance_id)

gc3libs.log.info("VM instance %s at %s is no longer needed."
" Terminating.", vm.id, vm.preferred_ip)
Expand Down

0 comments on commit b43ad26

Please sign in to comment.