Skip to content

Commit

Permalink
Merge pull request #246 from datmo/run-data
Browse files Browse the repository at this point in the history
adding data option for runs and workspaces
  • Loading branch information
shabazpatel committed Jul 29, 2018
2 parents 6d5d3c1 + 3a0df38 commit e61d3cc
Show file tree
Hide file tree
Showing 19 changed files with 522 additions and 44 deletions.
17 changes: 13 additions & 4 deletions datmo/cli/command/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,11 @@

from datmo.config import Config
from datmo.core.util.i18n import get as __
from datmo.core.util.exceptions import ClassMethodNotFound
from datmo.core.util.exceptions import ClassMethodNotFound, PathDoesNotExist
from datmo.cli.parser import get_datmo_parser
from datmo.core.controller.task import TaskController
from datmo.core.util.logger import DatmoLogger
from datmo.core.util.misc_functions import parameterized

from datmo.core.util.misc_functions import parameterized, parse_paths

class BaseCommand(object):
def __init__(self, cli_helper):
Expand Down Expand Up @@ -99,7 +98,7 @@ def execute(self):
method_result = method(**command_args)
return method_result

def task_run_helper(self, task_dict, snapshot_dict, error_identifier):
def task_run_helper(self, task_dict, snapshot_dict, error_identifier, data_paths=None):
"""
Run task with given parameters and provide error identifier
Expand All @@ -111,6 +110,8 @@ def task_run_helper(self, task_dict, snapshot_dict, error_identifier):
input snapshot dictionary for task run controller
error_identifier : str
identifier to print error
data_paths : list
list of data paths being passed for task run
Returns
-------
Expand All @@ -125,6 +126,14 @@ def task_run_helper(self, task_dict, snapshot_dict, error_identifier):
# Pass in the task
status = "NOT STARTED"
try:
if data_paths:
try:
_, _, task_dict['data_file_path_map'], task_dict['data_directory_path_map'] = \
parse_paths(self.task_controller.home, data_paths, '/data')
except PathDoesNotExist as e:
self.cli_helper.echo(__("error", "cli.run.parse.paths", str(e)))
return False

updated_task_obj = self.task_controller.run(
task_obj.id, snapshot_dict=snapshot_dict, task_dict=task_dict)
status = "SUCCESS"
Expand Down
6 changes: 5 additions & 1 deletion datmo/cli/command/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,9 +307,11 @@ def run(self, **kwargs):
else:
task_dict['command_list'] = kwargs['cmd']

data_paths = kwargs['data']
# Run task and return Task object result
task_obj = self.task_run_helper(task_dict, snapshot_dict,
"cli.run.run")
"cli.run.run",
data_paths=data_paths)
if not task_obj:
return False
# Creating the run object
Expand Down Expand Up @@ -405,6 +407,8 @@ def rerun(self, **kwargs):
"interactive": task_obj.interactive,
"mem_limit": task_obj.mem_limit,
"command_list": command,
"data_file_path_map": task_obj.data_file_path_map,
"data_directory_path_map": task_obj.data_directory_path_map,
"workspace": task_obj.workspace
}
# Run task and return Task object result
Expand Down
69 changes: 69 additions & 0 deletions datmo/cli/command/tests/test_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,75 @@ def test_run(self):
# test when all is passed to stop all
self.run_command.execute()

@pytest_docker_environment_failed_instantiation(test_datmo_dir)
def test_run_data_dir(self):
# TODO: Adding test with `--interactive` argument and terminate inside container
self.__set_variables()
# Test success case
test_filename = "script.py"
test_command = ["python", test_filename]
test_dockerfile = os.path.join(self.temp_dir, "Dockerfile")
test_mem_limit = "4g"
# Test success for run with directory being passed
test_data_dir_1 = os.path.join(tempfile.mkdtemp(dir=test_datmo_dir), "data1")
os.mkdir(test_data_dir_1)
test_data_dir_2 = os.path.join(tempfile.mkdtemp(dir=test_datmo_dir), "data2")
os.mkdir(test_data_dir_2)
with open(os.path.join(test_data_dir_1, "file.txt"), "wb") as f:
f.write(to_bytes('my initial line in 1\n'))
with open(os.path.join(test_data_dir_2, "file.txt"), "wb") as f:
f.write(to_bytes('my initial line in 2\n'))
test_filename = "script.py"
test_filepath = os.path.join(self.temp_dir, test_filename)
with open(test_filepath, "wb") as f:
f.write(to_bytes("import os\n"))
f.write(to_bytes("print('hello')\n"))
f.write(to_bytes("import shutil\n"))

f.write(
to_bytes(
"with open(os.path.join('/data', 'data1', 'file.txt'), 'a') as f:\n"
))
f.write(to_bytes(" f.write('my test file in 1')\n"))

f.write(
to_bytes(
"with open(os.path.join('/data', 'data2', 'file.txt'), 'a') as f:\n"
))
f.write(to_bytes(" f.write('my test file in 2')\n"))

self.run_command.parse([
"run", "--environment-paths", test_dockerfile,
"--data", test_data_dir_1, "--data", test_data_dir_2,
"--mem-limit", test_mem_limit, test_command
])

# test proper execution of run command
result = self.run_command.execute()
time.sleep(1)
assert result
assert isinstance(result, RunObject)
assert result.logs
assert result.status == "SUCCESS"
assert result.start_time
assert result.end_time
assert result.duration
assert result.core_snapshot_id
assert result.core_snapshot_id == result.after_snapshot_id
assert result.environment_id
assert "my initial line in 1" in open(os.path.join(test_data_dir_1,
"file.txt"), "r").read()
assert "my test file in 1" in open(os.path.join(test_data_dir_1,
"file.txt"), "r").read()
assert "my initial line in 2" in open(os.path.join(test_data_dir_2,
"file.txt"), "r").read()
assert "my test file in 2" in open(os.path.join(test_data_dir_2,
"file.txt"), "r").read()
# teardown
self.run_command.parse(["stop", "--all"])
# test when all is passed to stop all
self.run_command.execute()

@pytest_docker_environment_failed_instantiation(test_datmo_dir)
def test_run_string_command(self):
# TODO: Adding test with `--interactive` argument and terminate inside container
Expand Down
16 changes: 12 additions & 4 deletions datmo/cli/command/workspace.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,12 @@ def notebook(self, **kwargs):
"mem_limit": kwargs["mem_limit"],
"workspace": "notebook"
}
data_paths = kwargs['data']
self.cli_helper.echo(__("info", "cli.workspace.run.notebook"))
# Run task and return Task object result
return self.task_run_helper(task_dict, snapshot_dict,
"cli.workspace.notebook")
"cli.workspace.notebook",
data_paths=data_paths)

@Helper.notify_environment_active(TaskController)
@Helper.notify_no_project_found
Expand All @@ -52,10 +54,12 @@ def jupyterlab(self, **kwargs):
"mem_limit": kwargs["mem_limit"],
"workspace": "jupyterlab"
}
data_paths = kwargs['data']
self.cli_helper.echo(__("info", "cli.workspace.run.jupyterlab"))
# Run task and return Task object result
return self.task_run_helper(task_dict, snapshot_dict,
"cli.workspace.jupyterlab")
"cli.workspace.jupyterlab",
data_paths=data_paths)


@Helper.notify_environment_active(TaskController)
Expand All @@ -77,9 +81,11 @@ def terminal(self, **kwargs):
"command_list": ["/bin/bash"]
}

data_paths = kwargs['data']
# Run task and return Task object result
return self.task_run_helper(task_dict, snapshot_dict,
"cli.workspace.terminal")
"cli.workspace.terminal",
data_paths=data_paths)

@Helper.notify_environment_active(TaskController)
@Helper.notify_no_project_found
Expand All @@ -104,7 +110,9 @@ def rstudio(self, **kwargs):
kwargs["mem_limit"],
"workspace": "rstudio"
}
data_paths = kwargs['data']
self.cli_helper.echo(__("info", "cli.workspace.run.rstudio"))
# Run task and return Task object result
return self.task_run_helper(task_dict, snapshot_dict,
"cli.workspace.rstudio")
"cli.workspace.rstudio",
data_paths=data_paths)
50 changes: 50 additions & 0 deletions datmo/cli/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,16 @@ def get_datmo_parser():
help=
"maximum amount of memory the notebook environment can use (these options take a positive integer, followed by a suffix of b, k, m, g, to indicate bytes, kilobytes, megabytes, or gigabytes)"
)
notebook_parser.add_argument(
"--data",
dest="data",
default=None,
action="append",
type=str,
help=
"list of absolute or relative filepath and/or dirpaths for data; can specify destination names"
" with '>' (e.g. /path/to/dir, /path/to/dir>newdir, /path/to/file)"
)

# Jupyterlab
jupyterlab_parser = subparsers.add_parser(
Expand Down Expand Up @@ -81,6 +91,16 @@ def get_datmo_parser():
help=
"maximum amount of memory the jupyterlab environment can use (these options take a positive integer, followed by a suffix of b, k, m, g, to indicate bytes, kilobytes, megabytes, or gigabytes)"
)
jupyterlab_parser.add_argument(
"--data",
dest="data",
default=None,
action="append",
type=str,
help=
"list of absolute or relative filepath and/or dirpaths for data; can specify destination names"
" with '>' (e.g. /path/to/dir, /path/to/dir>newdir, /path/to/file)"
)

# Terminal
terminal_parser = subparsers.add_parser("terminal", help="To run terminal")
Expand Down Expand Up @@ -112,6 +132,16 @@ def get_datmo_parser():
help=
"maximum amount of memory the terminal environment can use (these options take a positive integer, followed by a suffix of b, k, m, g, to indicate bytes, kilobytes, megabytes, or gigabytes)"
)
terminal_parser.add_argument(
"--data",
dest="data",
default=None,
action="append",
type=str,
help=
"list of absolute or relative filepath and/or dirpaths for data; can specify destination names"
" with '>' (e.g. /path/to/dir, /path/to/dir>newdir, /path/to/file)"
)

# Rstudio
rstudio_parser = subparsers.add_parser(
Expand Down Expand Up @@ -139,6 +169,16 @@ def get_datmo_parser():
help=
"maximum amount of memory the rstudio environment can use (these options take a positive integer, followed by a suffix of b, k, m, g, to indicate bytes, kilobytes, megabytes, or gigabytes)"
)
rstudio_parser.add_argument(
"--data",
dest="data",
default=None,
action="append",
type=str,
help=
"list of absolute or relative filepath and/or dirpaths for data; can specify destination names"
" with '>' (e.g. /path/to/dir, /path/to/dir>newdir, /path/to/file)"
)

# Run
run_parser = subparsers.add_parser("run", help="run module")
Expand Down Expand Up @@ -193,6 +233,16 @@ def get_datmo_parser():
nargs="?",
default=None,
help="command to run within environment")
run_parser.add_argument(
"--data",
dest="data",
default=None,
action="append",
type=str,
help=
"list of absolute or relative filepath and/or dirpaths for data; can specify destination names"
" with '>' (e.g. /path/to/dir, /path/to/dir>newdir, /path/to/file)"
)

# List runs
ls_runs_parser = subparsers.add_parser(
Expand Down
8 changes: 5 additions & 3 deletions datmo/core/controller/file/driver/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,13 +222,15 @@ def create_collection(self, paths):
self.ensure_collections_dir()
temp_collection_path = get_datmo_temp_path(self.root)

_, _, files_rel, dirs_rel = parse_paths(self.root, paths, temp_collection_path)

filehash = self.calculate_hash_paths(paths, temp_collection_path)

# Move contents to folder with filehash as name and remove temp_collection_path
collection_path = os.path.join(self.datmo_directory, "collections",
filehash)
if os.path.isdir(collection_path):
return filehash
return filehash, files_rel, dirs_rel
# raise FileStructureError("exception.file.create_collection", {
# "exception": "File collection with id already exists."
# })
Expand All @@ -246,11 +248,11 @@ def create_collection(self, paths):

# Removing temp collection path
shutil.rmtree(temp_collection_path)
return filehash
return filehash, files_rel, dirs_rel

def calculate_hash_paths(self, paths, directory):
try:
files, dirs = parse_paths(self.root, paths, directory)
files, dirs, _, _ = parse_paths(self.root, paths, directory)
except PathDoesNotExist as e:
raise PathDoesNotExist(
__("error",
Expand Down

0 comments on commit e61d3cc

Please sign in to comment.