-
Notifications
You must be signed in to change notification settings - Fork 30
/
test_project.py
322 lines (273 loc) · 12.7 KB
/
test_project.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
"""
Tests for ProjectController
"""
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
import os
import tempfile
import platform
import time
import timeout_decorator
from io import open
try:
to_unicode = unicode
except NameError:
to_unicode = str
try:
def to_bytes(val):
return bytes(val)
to_bytes("test")
except TypeError:
def to_bytes(val):
return bytes(val, "utf-8")
to_bytes("test")
from datmo.config import Config
from datmo.core.controller.project import ProjectController
from datmo.core.controller.snapshot import SnapshotController
from datmo.core.controller.environment.environment import EnvironmentController
from datmo.core.controller.task import TaskController
from datmo.core.entity.snapshot import Snapshot
from datmo.core.entity.task import Task
from datmo.core.util.exceptions import ValidationFailed
from datmo.core.util.misc_functions import check_docker_inactive, pytest_docker_environment_failed_instantiation
# provide mountable tmp directory for docker
tempfile.tempdir = "/tmp" if not platform.system() == "Windows" else None
test_datmo_dir = os.environ.get('TEST_DATMO_DIR', tempfile.gettempdir())
class TestProjectController():
def setup_method(self):
self.temp_dir = tempfile.mkdtemp(dir=test_datmo_dir)
Config().set_home(self.temp_dir)
self.project_controller = ProjectController()
self.environment_ids = []
def teardown_method(self):
if not check_docker_inactive(test_datmo_dir):
self.project_controller = ProjectController()
if self.project_controller.is_initialized:
self.environment_controller = EnvironmentController()
for env_id in list(set(self.environment_ids)):
if not self.environment_controller.delete(env_id):
raise Exception
def test_init_failure_none(self):
# Test failed case
failed = False
try:
self.project_controller.init(None, None)
except ValidationFailed:
failed = True
assert failed
def test_init_failure_empty_str(self):
# Test failed case
failed = False
try:
self.project_controller.init("", "")
except ValidationFailed:
failed = True
assert failed
assert not self.project_controller.code_driver.is_initialized
assert not self.project_controller.file_driver.is_initialized
def test_init_failure_git_code_driver(self):
# Create a HEAD.lock file in .git to make GitCodeDriver.init() fail
if self.project_controller.code_driver.type == "git":
git_dir = os.path.join(
self.project_controller.code_driver.filepath, ".git")
os.makedirs(git_dir)
with open(os.path.join(git_dir, "HEAD.lock"), "a+") as f:
f.write(to_bytes("test"))
failed = False
try:
self.project_controller.init("test1", "test description")
except Exception:
failed = True
assert failed
assert not self.project_controller.code_driver.is_initialized
assert not self.project_controller.file_driver.is_initialized
def test_init_success(self):
result = self.project_controller.init("test1", "test description")
# Tested with is_initialized
assert self.project_controller.model.name == "test1"
assert self.project_controller.model.description == "test description"
assert result and self.project_controller.is_initialized
# Changeable by user, not tested in is_initialized
assert self.project_controller.current_session.name == "default"
# TODO: Test lower level functions (DAL, JSONStore, etc for interruptions)
# def test_init_with_interruption(self):
# # Reinitializing after timed interruption during init
# @timeout_decorator.timeout(0.001, use_signals=False)
# def timed_init_with_interruption():
# result = self.project_controller.init("test1", "test description")
# return result
#
# failed = False
# try:
# timed_init_with_interruption()
# except timeout_decorator.timeout_decorator.TimeoutError:
# failed = True
# # Tested with is_initialized
# assert failed
#
# # Reperforming init after a wait of 2 seconds
# time.sleep(2)
# result = self.project_controller.init("test2", "test description")
# # Tested with is_initialized
# assert self.project_controller.model.name == "test2"
# assert self.project_controller.model.description == "test description"
# assert result and self.project_controller.is_initialized
#
# # Changeable by user, not tested in is_initialized
# assert self.project_controller.current_session.name == "default"
def test_init_reinit_failure_empty_str(self):
_ = self.project_controller.init("test1", "test description")
failed = True
try:
self.project_controller.init("", "")
except Exception:
failed = True
assert failed
assert self.project_controller.model.name == "test1"
assert self.project_controller.model.description == "test description"
assert self.project_controller.code_driver.is_initialized
assert self.project_controller.file_driver.is_initialized
def test_init_reinit_success(self):
_ = self.project_controller.init("test1", "test description")
# Test out functionality for re-initialize project
result = self.project_controller.init("anything", "else")
assert self.project_controller.model.name == "anything"
assert self.project_controller.model.description == "else"
assert result == True
def test_cleanup_no_environment(self):
self.project_controller.init("test2", "test description")
result = self.project_controller.cleanup()
assert not self.project_controller.code_driver.is_initialized
assert not self.project_controller.file_driver.is_initialized
# Ensure that containers built with this image do not exist
# assert not self.project_controller.environment_driver.list_containers(filters={
# "ancestor": image_id
# })
assert result == True
@pytest_docker_environment_failed_instantiation(test_datmo_dir)
def test_cleanup_with_environment(self):
self.project_controller.init("test2", "test description")
result = self.project_controller.cleanup()
assert not self.project_controller.code_driver.is_initialized
assert not self.project_controller.file_driver.is_initialized
assert not self.project_controller.environment_driver.list_images(
"datmo-test2")
# Ensure that containers built with this image do not exist
# assert not self.project_controller.environment_driver.list_containers(filters={
# "ancestor": image_id
# })
assert result == True
def test_status_basic(self):
self.project_controller.init("test3", "test description")
status_dict, latest_snapshot, ascending_unstaged_task_list = \
self.project_controller.status()
assert status_dict
assert isinstance(status_dict, dict)
assert status_dict['name'] == "test3"
assert status_dict['description'] == "test description"
assert isinstance(status_dict['config'], dict)
assert not latest_snapshot
assert not ascending_unstaged_task_list
self.task_controller = TaskController()
# Create and run a task and test if unstaged task is shown
first_task = self.task_controller.create()
# Create task_dict
task_command = ["sh", "-c", "echo accuracy:0.45"]
task_dict = {"command_list": task_command}
# Create a file so it can create a snapshot for the task
env_def_path = os.path.join(self.task_controller.home, "Dockerfile")
with open(env_def_path, "wb") as f:
f.write(to_bytes("FROM python:3.5-alpine"))
updated_first_task = self.task_controller.run(
first_task.id, task_dict=task_dict)
after_snapshot_obj = self.task_controller.dal.snapshot.get_by_id(
updated_first_task.after_snapshot_id)
environment_obj = self.task_controller.dal.environment.get_by_id(
after_snapshot_obj.environment_id)
self.environment_ids.append(environment_obj.id)
status_dict, latest_snapshot, ascending_unstaged_task_list = \
self.project_controller.status()
assert status_dict
assert isinstance(status_dict, dict)
assert status_dict['name'] == "test3"
assert status_dict['description'] == "test description"
assert isinstance(status_dict['config'], dict)
assert not latest_snapshot
assert ascending_unstaged_task_list
assert updated_first_task in ascending_unstaged_task_list
@pytest_docker_environment_failed_instantiation(test_datmo_dir)
def test_status_snapshot_task(self):
self.project_controller.init("test4", "test description")
self.snapshot_controller = SnapshotController()
self.task_controller = TaskController()
# Create files to add
self.snapshot_controller.file_driver.create("dirpath1", directory=True)
self.snapshot_controller.file_driver.create("dirpath2", directory=True)
self.snapshot_controller.file_driver.create("filepath1")
# Create environment_driver definition
env_def_path = os.path.join(self.snapshot_controller.home,
"Dockerfile")
with open(env_def_path, "wb") as f:
f.write(to_bytes("FROM python:3.5-alpine"))
environment_paths = [env_def_path]
# Create config
config_filepath = os.path.join(self.snapshot_controller.home,
"config.json")
with open(config_filepath, "wb") as f:
f.write(to_bytes(str("{}")))
# Create stats
stats_filepath = os.path.join(self.snapshot_controller.home,
"stats.json")
with open(stats_filepath, "wb") as f:
f.write(to_bytes(str("{}")))
input_dict = {
"message":
"my test snapshot",
"paths": [
os.path.join(self.snapshot_controller.home, "dirpath1"),
os.path.join(self.snapshot_controller.home, "dirpath2"),
os.path.join(self.snapshot_controller.home, "filepath1")
],
"environment_paths":
environment_paths,
"config_filename":
config_filepath,
"stats_filename":
stats_filepath,
}
# Create snapshot in the project
first_snapshot = self.snapshot_controller.create(input_dict)
status_dict, latest_snapshot, ascending_unstaged_task_list = \
self.project_controller.status()
assert status_dict
assert isinstance(status_dict, dict)
assert status_dict['name'] == "test4"
assert status_dict['description'] == "test description"
assert isinstance(status_dict['config'], dict)
assert isinstance(latest_snapshot, Snapshot)
assert latest_snapshot.id == first_snapshot.id
assert not ascending_unstaged_task_list
# Create and run a task and test if unstaged task is shown
first_task = self.task_controller.create()
# Create task_dict
task_command = ["sh", "-c", "echo accuracy:0.45"]
task_dict = {"command_list": task_command}
updated_first_task = self.task_controller.run(
first_task.id, task_dict=task_dict)
after_snapshot_obj = self.task_controller.dal.snapshot.get_by_id(
updated_first_task.after_snapshot_id)
environment_obj = self.task_controller.dal.environment.get_by_id(
after_snapshot_obj.environment_id)
self.environment_ids.append(environment_obj.id)
status_dict, latest_snapshot, ascending_unstaged_task_list = \
self.project_controller.status()
assert status_dict
assert isinstance(status_dict, dict)
assert status_dict['name'] == "test4"
assert status_dict['description'] == "test description"
assert isinstance(status_dict['config'], dict)
assert isinstance(latest_snapshot, Snapshot)
assert latest_snapshot.id == first_snapshot.id
assert isinstance(ascending_unstaged_task_list[0], Task)
assert ascending_unstaged_task_list[0].id == updated_first_task.id