Skip to content

Commit

Permalink
Merge pull request #100 from datmo/task-after-snapshot
Browse files Browse the repository at this point in the history
adding try<>finally to finally update task obj in task run
  • Loading branch information
asampat3090 committed May 9, 2018
2 parents fae94b6 + c66319e commit 5370a94
Showing 1 changed file with 65 additions and 62 deletions.
127 changes: 65 additions & 62 deletions datmo/core/controller/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,69 +245,72 @@ def run(self, task_id, snapshot_dict=None, task_dict=None):
os.path.join(self.home, file_collection_obj.path),
os.path.join(self.home, task_obj.task_dirpath))

# Set the parameters set in the task
if task_obj.detach and task_obj.interactive:
raise TaskInteractiveDetachException(
__("error", "controller.task.run.args.detach.interactive"))
environment_run_options = {
"command": task_obj.command,
"ports": [] if task_obj.ports is None else task_obj.ports,
"name": "datmo-task-" + task_obj.id,
"volumes": {
os.path.join(self.home, task_obj.task_dirpath): {
'bind': '/task/',
'mode': 'rw'
return_code, run_id, logs = 0, None, None
try:
# Set the parameters set in the task
if task_obj.detach and task_obj.interactive:
raise TaskInteractiveDetachException(
__("error", "controller.task.run.args.detach.interactive"))
environment_run_options = {
"command": task_obj.command,
"ports": [] if task_obj.ports is None else task_obj.ports,
"name": "datmo-task-" + task_obj.id,
"volumes": {
os.path.join(self.home, task_obj.task_dirpath): {
'bind': '/task/',
'mode': 'rw'
},
self.home: {
'bind': '/home/',
'mode': 'rw'
}
},
self.home: {
'bind': '/home/',
'mode': 'rw'
}
},
"detach": task_obj.detach,
"stdin_open": task_obj.interactive,
"tty": task_obj.interactive,
"api": False
}

# Run environment via the helper function
return_code, run_id, logs = \
self._run_helper(before_snapshot_obj.environment_id,
environment_run_options,
os.path.join(self.home, task_obj.log_filepath))

# Create the after snapshot after execution is completed with new filepaths
after_snapshot_dict = snapshot_dict.copy()
after_snapshot_dict[
'message'] = "autogenerated snapshot created after task %s is run" % task_obj.id

# Add in absolute filepaths from running task directory
absolute_task_dir_path = os.path.join(self.home, task_obj.task_dirpath)
absolute_filepaths = []
for item in os.listdir(absolute_task_dir_path):
path = os.path.join(absolute_task_dir_path, item)
if os.path.isfile(path) or os.path.isdir(path):
absolute_filepaths.append(path)
after_snapshot_dict.update({
"filepaths": absolute_filepaths,
"environment_id": before_snapshot_obj.environment_id,
})
after_snapshot_obj = self.snapshot.create(after_snapshot_dict)

# (optional) Remove temporary task directory path
# Update the task with post-execution parameters
end_time = datetime.utcnow()
duration = (end_time - task_obj.start_time).total_seconds()
return self.dal.task.update({
"id": task_obj.id,
"after_snapshot_id": after_snapshot_obj.id,
"run_id": run_id,
"logs": logs,
"status": "SUCCESS" if return_code == 0 else "FAILED",
"results": self._parse_logs_for_results(logs),
# "results": task_obj.results, # TODO: update during run
"end_time": end_time,
"duration": duration
})
"detach": task_obj.detach,
"stdin_open": task_obj.interactive,
"tty": task_obj.interactive,
"api": False
}

# Run environment via the helper function
return_code, run_id, logs = \
self._run_helper(before_snapshot_obj.environment_id,
environment_run_options,
os.path.join(self.home, task_obj.log_filepath))
finally:
# Create the after snapshot after execution is completed with new filepaths
after_snapshot_dict = snapshot_dict.copy()
after_snapshot_dict[
'message'] = "autogenerated snapshot created after task %s is run" % task_obj.id

# Add in absolute filepaths from running task directory
absolute_task_dir_path = os.path.join(self.home,
task_obj.task_dirpath)
absolute_filepaths = []
for item in os.listdir(absolute_task_dir_path):
path = os.path.join(absolute_task_dir_path, item)
if os.path.isfile(path) or os.path.isdir(path):
absolute_filepaths.append(path)
after_snapshot_dict.update({
"filepaths": absolute_filepaths,
"environment_id": before_snapshot_obj.environment_id,
})
after_snapshot_obj = self.snapshot.create(after_snapshot_dict)

# (optional) Remove temporary task directory path
# Update the task with post-execution parameters
end_time = datetime.utcnow()
duration = (end_time - task_obj.start_time).total_seconds()
return self.dal.task.update({
"id": task_obj.id,
"after_snapshot_id": after_snapshot_obj.id,
"run_id": run_id,
"logs": logs,
"status": "SUCCESS" if return_code == 0 else "FAILED",
"results": self._parse_logs_for_results(logs),
# "results": task_obj.results, # TODO: update during run
"end_time": end_time,
"duration": duration
})

def list(self, session_id=None, sort_key=None, sort_order=None):
query = {}
Expand Down

0 comments on commit 5370a94

Please sign in to comment.