/
localDocker.py
288 lines (251 loc) · 9.67 KB
/
localDocker.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
#
# localDocker.py - Implements the Tango VMMS interface to run Tango jobs in
# docker containers. In this context, VMs are docker containers.
#
import random
import subprocess
import re
import time
import logging
import threading
import os
import sys
import shutil
import config
from tangoObjects import TangoMachine
def timeout(command, time_out=1):
    """timeout - Run a unix command with a timeout.

    The command's stdout and stderr are discarded.

    Args:
        command: Argument list handed to subprocess.Popen.
        time_out: Approximate number of seconds to wait for the command,
            counted in steps of config.Config.TIMER_POLL_INTERVAL.

    Returns:
        -1 if the command was still running when the timeout expired (the
        command is then killed), otherwise the return value from the
        command, which is typically 0 for success, 1-255 for failure.
    """
    # subprocess.DEVNULL discards the output without leaving an unclosed
    # file object behind on every call.
    p = subprocess.Popen(
        command, stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT
    )

    # Wait for the command to complete
    t = 0.0
    while t < time_out and p.poll() is None:
        time.sleep(config.Config.TIMER_POLL_INTERVAL)
        t += config.Config.TIMER_POLL_INTERVAL

    # Determine why the while loop terminated
    returncode = p.poll()
    if returncode is not None:
        return returncode

    # Timed out: hard-kill the command (SIGKILL on POSIX).
    try:
        p.kill()
    except OSError:
        # Best effort; the process may have exited in the meantime.
        pass
    # Reap the child so it does not linger as a zombie process.
    p.wait()
    return -1
def timeoutWithReturnStatus(command, time_out, returnValue=0):
    """timeoutWithReturnStatus - Run a Unix command with a timeout,
    re-running it until the expected value is returned by the command.

    The command's stdout and stderr are discarded.

    Args:
        command: Argument list handed to subprocess.Popen.
        time_out: Approximate number of seconds to keep trying, counted in
            steps of config.Config.TIMER_POLL_INTERVAL.
        returnValue: Exit status that counts as success.

    Returns:
        returnValue as soon as a run of the command exits with it. On
        timeout, the last exit status obtained from the command, or -1 if
        no run of the command finished before the timeout.
    """

    def launch():
        # subprocess.DEVNULL avoids leaking one open file object per attempt.
        return subprocess.Popen(
            command, stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT
        )

    # Remember the most recent real exit status so that a timeout can report
    # it instead of the "still running" poll result.
    last_code = -1
    p = launch()
    t = 0.0
    while t < time_out:
        ret = p.poll()
        if ret is None:
            # Still running: wait one poll interval.
            time.sleep(config.Config.TIMER_POLL_INTERVAL)
            t += config.Config.TIMER_POLL_INTERVAL
        elif ret == returnValue:
            return ret
        else:
            # Finished with an unexpected status: record it and retry.
            last_code = ret
            p = launch()

    # Timed out. The current attempt may have finished just now.
    final = p.poll()
    if final is not None:
        return final

    # Otherwise do not leave the attempt running in the background.
    try:
        p.kill()
    except OSError:
        # Best effort; the process may have exited in the meantime.
        pass
    p.wait()
    return last_code
#
# User defined exceptions (none are defined in this module)
#
class LocalDocker(object):
    """Tango VMMS implementation in which every "VM" is a docker container.

    Each job gets a directory under config.Config.DOCKER_VOLUME_PATH that is
    named after the instance and bind-mounted into the container; input
    files are placed there and the feedback file is collected from there.
    """

    def __init__(self):
        """Set up the logger and check that the docker volume path is
        configured.

        Logs the problem and exits the process with status 1 when
        config.Config.DOCKER_VOLUME_PATH is empty or cannot be read.
        """
        self.log = logging.getLogger("LocalDocker")
        try:
            # Check that the docker constants are defined in config
            if not config.Config.DOCKER_VOLUME_PATH:
                raise Exception("DOCKER_VOLUME_PATH not defined in config.")
        except Exception as e:
            self.log.error(str(e))
            # sys.exit is always available, unlike the site-provided exit().
            sys.exit(1)

    def instanceName(self, id, name):
        """instanceName - Constructs a VM instance name. Always use
        this function when you need a VM instance name. Never generate
        instance names manually.

        Returns "<config.Config.PREFIX>-<id>-<name>".
        """
        return "%s-%s-%s" % (config.Config.PREFIX, id, name)

    def getVolumePath(self, instanceName):
        """Return the directory for instanceName under
        config.Config.DOCKER_VOLUME_PATH, with a trailing path separator.
        """
        # Last empty string to cause trailing '/'
        return os.path.join(config.Config.DOCKER_VOLUME_PATH, instanceName, "")

    def getDockerVolumePath(self, dockerPath, instanceName):
        """Return the directory for instanceName under dockerPath, with a
        trailing path separator.
        """
        # Last empty string to cause trailing '/'
        return os.path.join(dockerPath, instanceName, "")

    def domainName(self, vm):
        """Returns the domain name that is stored in the vm
        instance.
        """
        return vm.domain_name

    #
    # VMMS API functions
    #
    def initializeVM(self, vm):
        """initializeVM - Nothing to do for initializeVM; returns vm."""
        return vm

    def waitVM(self, vm, max_secs):
        """waitVM - Nothing to do for waitVM"""
        return

    def copyIn(self, vm, inputFiles):
        """copyIn - Create a directory to be mounted as a volume
        for the docker containers. Copy input files to this directory.

        Each input file is copied from its localFile to destFile inside the
        volume. Returns 0. Raises FileExistsError if the volume directory
        already exists.
        """
        instanceName = self.instanceName(vm.id, vm.image)
        volumePath = self.getVolumePath(instanceName)

        # Create a fresh volume
        os.makedirs(volumePath)
        for file in inputFiles:
            destPath = volumePath + file.destFile
            # destFile may point into a subdirectory of the volume, so make
            # sure the directory that will hold the copy exists.
            os.makedirs(os.path.dirname(destPath), exist_ok=True)
            shutil.copy(file.localFile, destPath)
            self.log.debug(
                "Copied in file %s to %s" % (file.localFile, destPath)
            )
        return 0

    def runJob(self, vm, runTimeout, maxOutputFileSize, disableNetwork):
        """runJob - Run a docker container by doing the follows:
        - mount directory corresponding to this job to /home/mount
          in the container
        - run autodriver with corresponding ulimits and timeout as
          autolab user

        The docker command is given twice runTimeout to finish. Returns the
        exit status of the docker command, or -1 if it had to be killed.
        """
        instanceName = self.instanceName(vm.id, vm.image)
        volumePath = self.getVolumePath(instanceName)
        # When this environment variable is set, the bind mount source is
        # built from it instead of from config.Config.DOCKER_VOLUME_PATH.
        hostVolumeRoot = os.getenv("DOCKER_TANGO_HOST_VOLUME_PATH")
        if hostVolumeRoot:
            volumePath = self.getDockerVolumePath(hostVolumeRoot, instanceName)

        args = ["docker", "run", "--name", instanceName, "-v"]
        args = args + ["%s:%s" % (volumePath, "/home/mount")]
        if vm.cores:
            args = args + [f"--cpus={vm.cores}"]
        if vm.memory:
            args = args + ["-m", f"{vm.memory}m"]
        if disableNetwork:
            args = args + ["--network", "none"]
        args = args + [vm.image]
        args = args + ["sh", "-c"]

        # NOTE(review): the maxOutputFileSize argument is not used here; the
        # limit comes from config.Config.MAX_OUTPUT_FILE_SIZE. Confirm
        # whether the argument was meant to take precedence.
        autodriverCmd = (
            "autodriver -u %d -f %d -t %d -o %d autolab > output/feedback 2>&1"
            % (
                config.Config.VM_ULIMIT_USER_PROC,
                config.Config.VM_ULIMIT_FILE_SIZE,
                runTimeout,
                config.Config.MAX_OUTPUT_FILE_SIZE,
            )
        )
        # Copy the inputs out of the mount, run autodriver as the autolab
        # user, then copy the feedback back into the mounted volume.
        args = args + [
            'cp -r mount/* autolab/; su autolab -c "%s"; '
            "cp output/feedback mount/feedback" % autodriverCmd
        ]

        self.log.debug("Running job: %s" % str(args))
        ret = timeout(args, runTimeout * 2)
        self.log.debug("runJob returning %d" % ret)
        return ret

    def copyOut(self, vm, destFile):
        """copyOut - Copy the autograder feedback from container to
        destFile on the Tango host. Then, destroy that container.
        Containers are never reused.

        Returns 0.
        """
        instanceName = self.instanceName(vm.id, vm.image)
        volumePath = self.getVolumePath(instanceName)
        shutil.move(volumePath + "feedback", destFile)
        self.log.debug("Copied feedback file to %s" % destFile)
        self.destroyVM(vm)
        return 0

    def destroyVM(self, vm):
        """destroyVM - Delete the docker container and its volume
        directory, if that directory exists.
        """
        instanceName = self.instanceName(vm.id, vm.image)
        volumeRoot = self.getVolumePath("")

        # Do a hard kill on corresponding docker container.
        # Return status does not matter.
        timeout(["docker", "rm", "-f", instanceName], config.Config.DOCKER_RM_TIMEOUT)

        # Destroy corresponding volume if it exists.
        if instanceName in os.listdir(volumeRoot):
            shutil.rmtree(volumeRoot + instanceName)
            self.log.debug("Deleted volume %s" % instanceName)
        return

    def safeDestroyVM(self, vm):
        """safeDestroyVM - Delete the docker container and make
        sure it is removed.

        Keeps calling destroyVM until existsVM reports the container gone;
        logs an error and gives up after config.Config.DESTROY_SECS seconds.
        """
        start_time = time.time()
        while self.existsVM(vm):
            if time.time() - start_time > config.Config.DESTROY_SECS:
                self.log.error("Failed to safely destroy container %s" % vm.name)
                return
            self.destroyVM(vm)
        return

    def getVMs(self):
        """getVMs - Lists the volume directories and builds one
        TangoMachine for every directory named like an instance
        ("<PREFIX>-<id>-<image>").

        The id is stored as a string. Entries that lack the "-<image>" part
        are skipped.
        """
        machines = []
        volumePath = self.getVolumePath("")
        prefix = "%s-" % config.Config.PREFIX
        for volume in os.listdir(volumePath):
            # Plain prefix test: the prefix is literal text, not a regex.
            if not volume.startswith(prefix):
                continue
            # Split only on the first '-' after the prefix, so that a '-'
            # inside the prefix or the image name does not shift the fields.
            vmId, sep, image = volume[len(prefix):].partition("-")
            if not sep:
                continue
            machine = TangoMachine()
            machine.vmms = "localDocker"
            machine.name = volume
            machine.id = vmId
            machine.image = image
            machines.append(machine)
        return machines

    def existsVM(self, vm):
        """existsVM - Executes `docker inspect CONTAINER`, which returns
        a non-zero status upon not finding a container.

        Returns True when the inspect command exits with status 0.
        """
        # The container is named from vm.image everywhere else in this
        # class (runJob, destroyVM), so the lookup must use the same name.
        instanceName = self.instanceName(vm.id, vm.image)
        ret = timeout(["docker", "inspect", instanceName])
        return ret == 0

    def getImages(self):
        """getImages - Executes `docker images` and returns a list of
        images that can be used to boot a docker container with. This
        function is a lot of parsing and so can break easily.

        Only the last '/'-separated component of each repository name is
        kept, and duplicates are removed.
        """
        result = set()
        o = subprocess.check_output(["docker", "images"]).decode("utf-8")
        # The first line is the column header.
        for row in o.splitlines()[1:]:
            columns = row.split()
            if not columns:
                continue
            # Keep what follows the last '/' of the repository column.
            result.add(columns[0].rsplit("/", 1)[-1])
        return list(result)

    def getPartialOutput(self, vm):
        """getPartialOutput - Get the partial output of a job.
        Gets the first MAX_OUTPUT_FILE_SIZE bytes of autograde/output.log
        from the container, with stderr of the command mixed in.

        It does not check if the docker container exists before executing.
        Raises subprocess.CalledProcessError if the docker command exits
        with a non-zero status.
        """
        instanceName = self.instanceName(vm.id, vm.image)
        # Pass an argument list (no shell) so that characters in the
        # instance name can never be interpreted as shell syntax.
        cmd = [
            "docker",
            "exec",
            instanceName,
            "head",
            "-c",
            str(config.Config.MAX_OUTPUT_FILE_SIZE),
            "autograde/output.log",
        ]
        raw = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
        # head -c cuts on a byte boundary and may split a multi-byte
        # character; replace undecodable bytes instead of raising.
        return raw.decode("utf-8", errors="replace")