Skip to content

Commit

Permalink
Merge pull request #15 from datmo/task-stop
Browse files Browse the repository at this point in the history
adding task stop command
  • Loading branch information
asampat3090 committed Apr 24, 2018
2 parents 8ef6025 + 973425a commit 1a0a1c9
Show file tree
Hide file tree
Showing 7 changed files with 238 additions and 77 deletions.
41 changes: 28 additions & 13 deletions datmo/cli/command/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,13 @@ def run(self, **kwargs):
task_obj = self.task_controller.create(task_dict)

# Pass in the task
self.task_controller.run(task_obj.id, snapshot_dict=snapshot_dict)

try:
self.task_controller.run(task_obj.id, snapshot_dict=snapshot_dict)
except:
self.cli_helper.echo(__("error",
"cli.task.run",
task_obj.id))
return False
return task_obj.id

def ls(self, **kwargs):
Expand All @@ -80,19 +85,29 @@ def ls(self, **kwargs):
t.add_row([task_obj.id, task_obj.command, task_obj.status, task_obj.gpu,
task_obj.created_at.strftime("%Y-%m-%d %H:%M:%S")])
self.cli_helper.echo(t)

return True

# TODO: implement with proper task controller function
# def stop(self, **kwargs):
# id = kwargs.get('id', None)
# try:
# task_delete_dict = {"id": id}
# self.task_controller.delete(**task_delete_dict)
# except Exception:
# self.cli_helper.echo(__("error",
# "cli.task.delete"))
# return False
# return True
def stop(self, **kwargs):
id = kwargs.get('id', None)
task_stop_dict = {"id": id}
self.cli_helper.echo(__("info",
"cli.task.stop",
id))
try:
result = self.task_controller.stop(**task_stop_dict)
if not result:
self.cli_helper.echo(__("error",
"cli.task.stop",
id))
return result
except:
self.cli_helper.echo(__("error",
"cli.task.stop",
id))
return False





Expand Down
116 changes: 67 additions & 49 deletions datmo/cli/command/test/test_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,31 @@ def test_task_project_not_init(self):
def test_datmo_task_run(self):
self.__set_variables()

# Test failure case
self.task.parse([
"task",
"run"
])
failed = False
try:
_ = self.task.execute()
except:
failed = True
assert failed

# Test failure case execute
test_command = ["yo", "yo"]
self.task.parse([
"task",
"run",
test_command
])
result = self.task.execute()
assert not result

# Test success case
test_command = ["sh", "-c", "echo yo"]
test_gpu = True
test_gpu = True # TODO: implement in controller
test_ports = "8888:8888"
test_dockerfile = os.path.join(self.temp_dir, "Dockerfile")
test_interactive = True
Expand Down Expand Up @@ -127,54 +150,49 @@ def test_datmo_task_ls_invalid_arg(self):
exception_thrown = True
assert exception_thrown

# TODO: implement with proper task controller
# def test_datmo_task_stop(self):
# self.__set_variables()
#
# test_command = ["sh", "-c", "echo yo"]
# test_ports = "8888:8888"
# test_dockerfile = os.path.join(self.temp_dir, "Dockerfile")
#
# self.task.parse([
# "task",
# "run",
# "--gpu",
# "--ports", test_ports,
# "--env-def", test_dockerfile,
# "--interactive",
# test_command
# ])
#
# test_task_id = self.task.execute()
#
# self.task.parse([
# "task",
# "stop",
# "--id", test_task_id
# ])
#
# # test for desired side effects
# assert self.task.args.id == test_task_id
#
# # test when task id is passed to stop it
# task_stop_command = self.task.execute()
# assert task_stop_command == True
#
# # Passing wrong task id
# test_task_id = "task_id"
# self.task.parse([
# "task",
# "stop",
# "--id", test_task_id
# ])
#
# # test when wrong task id is passed to stop it
# failed = False
# try:
# self.task.execute()
# except:
# failed = True
# assert failed
def test_datmo_task_stop(self):
self.__set_variables()

test_command = ["sh", "-c", "echo yo"]
test_ports = "8888:8888"
test_dockerfile = os.path.join(self.temp_dir, "Dockerfile")

self.task.parse([
"task",
"run",
"--gpu",
"--ports", test_ports,
"--env-def", test_dockerfile,
"--interactive",
test_command
])

test_task_id = self.task.execute()

self.task.parse([
"task",
"stop",
"--id", test_task_id
])

# test for desired side effects
assert self.task.args.id == test_task_id

# test when task id is passed to stop it
task_stop_command = self.task.execute()
assert task_stop_command == True

# Passing wrong task id
test_task_id = "task_id"
self.task.parse([
"task",
"stop",
"--id", test_task_id
])

# test when wrong task id is passed to stop it
result = self.task.execute()
assert not result

def test_datmo_task_stop_invalid_arg(self):
self.__set_variables()
Expand Down
25 changes: 24 additions & 1 deletion datmo/core/controller/environment/environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,4 +269,27 @@ def delete(self, id):
delete_success = self.dal.environment.delete(environment_obj.id)

return file_collection_deleted and environment_artifacts_removed and \
delete_success
delete_success

def stop(self, run_id):
"""Stop the trace of running Environment
Parameters
----------
run_id : str
run id with specific environment to be stopped
Returns
-------
bool
True if success
Raises
------
DoesNotExistException
if the specified Environment does not exist.
"""
# Stop the instance(e.g. container) running using environment driver(e.g. docker)
stop_success = self.environment_driver.stop(run_id, force=True)

return stop_success
57 changes: 53 additions & 4 deletions datmo/core/controller/environment/test/test_environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@
from datmo.core.controller.project import ProjectController
from datmo.core.controller.environment.environment import \
EnvironmentController
from datmo.core.util.exceptions import EntityNotFound, \
DoesNotExistException
from datmo.core.util.exceptions import EntityNotFound


class TestEnvironmentController():
Expand Down Expand Up @@ -167,7 +166,6 @@ def test_run(self):
"api": False
}


# Create environment_driver definition
env_def_path = os.path.join(self.project.home,
"Dockerfile")
Expand Down Expand Up @@ -260,4 +258,55 @@ def test_delete(self):
thrown = True

assert result == True and \
thrown == True
thrown == True

def test_stop(self):
self.project.init("test5", "test description")

# Create environment definition
definition_filepath = os.path.join(self.environment.home,
"Dockerfile")
with open(definition_filepath, "w") as f:
f.write(str("FROM datmo/xgboost:cpu"))

run_options = {
"command": ["sh", "-c", "echo yo"],
"ports": ["8888:8888"],
"name": None,
"volumes": None,
"detach": False,
"stdin_open": False,
"tty": False,
"gpu": False,
"api": False
}

# Create environment_driver definition
env_def_path = os.path.join(self.project.home,
"Dockerfile")
with open(env_def_path, "w") as f:
f.write(str("FROM datmo/xgboost:cpu"))

input_dict = {
"definition_filepath": definition_filepath,
}

# Create environment in the project
environment_obj = self.environment.create(input_dict)

log_filepath = os.path.join(self.project.home,
"task.log")

# Build environment in the project
_ = self.environment.build(environment_obj.id)

# Run environment in the project
_, run_id, _ = \
self.environment.run(environment_obj.id, run_options, log_filepath)

# Stop the running environment
return_code = self.environment.stop(run_id)
assert return_code

# teardown
self.environment.delete(environment_obj.id)
36 changes: 29 additions & 7 deletions datmo/core/controller/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class TaskController(BaseController):
creates a Task object with the permanent parameters
_run_helper(environment_id, log_filepath, options)
helper for run to start environment and run with the appropriate parameters
run(self, task_id, dictionary=None)
run(self, id, dictionary=None)
runs the task and tracks the run, logs, inputs and outputs
list(session_id=None)
lists all tasks within the project given filters
Expand Down Expand Up @@ -141,7 +141,7 @@ def _run_helper(self, environment_id,

return return_code, container_id, logs

def run(self, task_id, snapshot_dict={"visible": False}, task_dict={}):
def run(self, id, snapshot_dict={"visible": False}, task_dict={}):
"""Run a task with parameters. If dictionary specified, create a new task with new run parameters.
Snapshot objects are created before and after the task to keep track of the state. During the run,
you can access task outputs using environment variable DATMO_TASK_DIR or `/task` which points to
Expand All @@ -150,7 +150,7 @@ def run(self, task_id, snapshot_dict={"visible": False}, task_dict={}):
Parameters
----------
task_id : str
id : str
id for the task you would like to run
snapshot_dict : dict
set of parameters to create a snapshot (see SnapshotController for details.
Expand All @@ -170,7 +170,7 @@ def run(self, task_id, snapshot_dict={"visible": False}, task_dict={}):
If there is any error in creating files for the task or downstream errors
"""
# Obtain Task to run
task_obj = self.dal.task.get_by_id(task_id)
task_obj = self.dal.task.get_by_id(id)

# Create Task directory for user during run
task_dirpath = os.path.join("datmo_tasks",
Expand Down Expand Up @@ -262,6 +262,28 @@ def list(self, session_id=None):
def delete(self, id):
if not id:
raise RequiredArgumentMissing(__("error",
"controller.task.delete.arg",
"id"))
return self.dal.task.delete(id)
"controller.task.delete.arg",
"id"))
return self.dal.task.delete(id)

def stop(self, id):
"""Stop and remove container for the task
Parameters
----------
id : str
id for the task you would like to stop
Returns
-------
return_code : bool
system return code of the stop
"""
if not id:
raise RequiredArgumentMissing(__("error",
"controller.task.stop.arg",
"id"))
task_obj = self.dal.task.get_by_id(id)
container_id = task_obj.container_id
return_code = self.environment.stop(container_id)
return return_code

0 comments on commit 1a0a1c9

Please sign in to comment.