Skip to content

Commit

Permalink
Merge pull request #28 from Autodesk/job-retrieval
Browse files Browse the repository at this point in the history
Retrieve jobs with engine.get_job
  • Loading branch information
avirshup committed Jun 11, 2018
2 parents 46e1859 + 7216593 commit 65a854b
Show file tree
Hide file tree
Showing 9 changed files with 299 additions and 145 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,5 @@ codeship.aes
.cache
__pycache__
README.rst
.pytest_cache
.remote
25 changes: 25 additions & 0 deletions pyccc/engines/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,14 @@ class EngineBase(object):
This class defines the implementation only - you instantiate one of its subclasses
"""

# Capability flag; None here is a placeholder that concrete engines replace
# (the Docker engine sets True, the Subprocess engine sets False).
USES_IMAGES = None
"bool: subclasses should set this to indicate whether they use the `job.image` field"

# Capability flag; None here is a placeholder that concrete engines replace
# (the Docker engine sets True, the Subprocess engine sets False).
ABSPATHS = None
"""bool: subclasses should set this to indicate whether files can
be referenced via absolute path"""


hostname = 'not specified' # this should be overridden in subclass init methods

def __call__(self, *args, **kwargs):
Expand Down Expand Up @@ -59,6 +67,23 @@ def launch(self, image, command, **kwargs):
else:
return Job(self, image, command, **kwargs)

def get_job(self, jobid):
    """ Return a Job object for this job.

    The returned object will be suitable for retrieving output, but depending on the engine,
    may not populate all fields used at launch time (such as `job.inputs`, `job.commands`, etc.)

    This base implementation is only a placeholder: it always raises
    ``NotImplementedError``. Engines that can look up existing jobs override it.

    Args:
        jobid (Any): job id object

    Returns:
        pyccc.job.Job: job object for this job id

    Raises:
        pyccc.exceptions.JobNotFound: if no job could be located for this jobid
        NotImplementedError: if this engine type does not implement job retrieval
    """
    # Name the concrete engine type so the caller can tell which engine lacks support
    raise NotImplementedError(
        'Job retrieval is not implemented for engine type "%s"' % type(self).__name__)

def submit(self, job):
"""
submit job to engine
Expand Down
78 changes: 63 additions & 15 deletions pyccc/engines/dockerengine.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,19 @@

import subprocess

import docker
import docker.errors

from .. import docker_utils as du, DockerMachineError
from .. import utils, files, status
from .. import utils, files, status, exceptions
from . import EngineBase


class Docker(EngineBase):
""" A compute engine - uses a docker server to run jobs
"""
USES_IMAGES = True
ABSPATHS = True

def __init__(self, client=None, workingdir='/workingdir'):
""" Initialization:
Expand Down Expand Up @@ -61,6 +65,51 @@ def test_connection(self):
version = self.client.version()
return version

def get_job(self, jobid):
    """ Return a Job object for the requested job id.

    The returned object will be suitable for retrieving output, but depending on the engine,
    may not populate all fields used at launch time (such as `job.inputs`, `job.commands`, etc.)

    The job is rebuilt from the container's inspection data: ``command``, ``env`` and
    ``workingdir`` are read from its ``Config`` section, and the full inspection
    result is stored as ``job.rundata.container``.

    Args:
        jobid (str): container id

    Returns:
        pyccc.job.Job: job object for this container

    Raises:
        pyccc.exceptions.JobNotFound: if no job could be located for this jobid
    """
    import shlex
    from pyccc.job import Job

    job = Job(engine=self)
    job.jobid = job.rundata.containerid = jobid
    try:
        jobdata = self.client.inspect_container(job.jobid)
    except docker.errors.NotFound:
        raise exceptions.JobNotFound(
            'The daemon could not find container "%s"' % job.jobid)

    config = jobdata['Config']
    # A container created without an explicit command reports Cmd as null;
    # normalize to an empty list so the length check and concatenation below are safe
    cmd = config['Cmd'] or []
    entrypoint = config['Entrypoint']

    if len(cmd) == 3 and cmd[0:2] == ['sh', '-c']:
        # Unwrap the "sh -c '<command>'" form back into the bare command string
        cmd = cmd[2]
    elif entrypoint is not None:
        cmd = entrypoint + cmd

    if isinstance(cmd, list):
        # Flatten an argv-style list into a single shell-quoted command line
        cmd = ' '.join(shlex.quote(x) for x in cmd)

    job.command = cmd
    job.env = config['Env']
    job.workingdir = config['WorkingDir']
    job.rundata.container = jobdata

    return job


def submit(self, job):
""" Submit job to the engine
Expand All @@ -76,10 +125,10 @@ def submit(self, job):

container_args = self._generate_container_args(job)

job.container = self.client.create_container(job.imageid, **container_args)
self.client.start(job.container)
job.containerid = job.container['Id']
job.jobid = job.containerid
job.rundata.container = self.client.create_container(job.imageid, **container_args)
self.client.start(job.rundata.container)
job.rundata.containerid = job.rundata.container['Id']
job.jobid = job.rundata.containerid

def _generate_container_args(self, job):
container_args = dict(command="sh -c '%s'" % job.command,
Expand All @@ -104,7 +153,6 @@ def _generate_container_args(self, job):
bind = '%s:%s:%s' % (volume, mountpoint, mode)
else:
mountpoint = mount
mode = None
bind = '%s:%s' % (volume, mountpoint)

volumes.append(mountpoint)
Expand All @@ -117,29 +165,29 @@ def _generate_container_args(self, job):
return container_args

def wait(self, job):
stat = self.client.wait(job.container)
stat = self.client.wait(job.rundata.container)
if isinstance(stat, int): # i.e., docker<3
return stat
else: # i.e., docker>=3
return stat['StatusCode']

def kill(self, job):
self.client.kill(job.container)
self.client.kill(job.rundata.container)

def get_status(self, job):
inspect = self.client.inspect_container(job.containerid)
inspect = self.client.inspect_container(job.rundata.containerid)
if inspect['State']['Running']:
return status.RUNNING
else:
return status.FINISHED

def get_directory(self, job, path):
docker_host = du.kwargs_from_client(self.client)
remotedir = files.DockerArchive(docker_host, job.containerid, path)
remotedir = files.DockerArchive(docker_host, job.rundata.containerid, path)
return remotedir

def _list_output_files(self, job):
docker_diff = self.client.diff(job.container)
docker_diff = self.client.diff(job.rundata.container)
if docker_diff is None:
return {}

Expand All @@ -159,11 +207,11 @@ def _list_output_files(self, job):
else:
relative_path = filename

remotefile = files.LazyDockerCopy(docker_host, job.containerid, filename)
remotefile = files.LazyDockerCopy(docker_host, job.rundata.containerid, filename)
output_files[relative_path] = remotefile
return output_files

def _get_final_stds(self, job):
stdout = self.client.logs(job.container, stdout=True, stderr=False)
stderr = self.client.logs(job.container, stdout=False, stderr=True)
stdout = self.client.logs(job.rundata.container, stdout=True, stderr=False)
stderr = self.client.logs(job.rundata.container, stdout=False, stderr=True)
return stdout.decode('utf-8'), stderr.decode('utf-8')
47 changes: 25 additions & 22 deletions pyccc/engines/subproc.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import subprocess
import locale

from pyccc import utils as utils, files
from pyccc import utils as utils, files, exceptions
from . import EngineBase, status


Expand All @@ -29,13 +29,15 @@ class Subprocess(EngineBase):
For now, don't return anything until job completes"""

hostname = 'local'
USES_IMAGES = False
ABSPATHS = False

def __init__(self, *args, **kwargs):
    """ Initialize the base engine, then record the locale's preferred encoding.

    All positional and keyword arguments are forwarded unchanged to the parent
    initializer. The encoding is stored as ``self.term_encoding``.
    """
    super().__init__(*args, **kwargs)
    preferred = locale.getpreferredencoding()
    self.term_encoding = preferred

def get_status(self, job):
if job.subproc.poll() is None:
if job.rundata.subproc.poll() is None:
return status.RUNNING
else:
return status.FINISHED
Expand All @@ -50,37 +52,38 @@ def get_engine_description(self, job):
"""
Return a text description for the UI
"""
return 'Local subprocess %s' % job.subproc.pid
return 'Local subprocess %s' % job.rundata.subproc

def launch(self, image=None, command=None, **kwargs):
    """ Launch a job, tolerating calls that omit the image argument.

    When ``command`` is None, the value passed as ``image`` is used as the command.
    The parent ``launch`` is always called with the placeholder image name
    ``'no_image'``.

    Args:
        image: used as the command when ``command`` is None, otherwise unused
        command: command to run
        **kwargs: forwarded to the parent ``launch``

    Returns:
        whatever the parent ``launch`` returns
    """
    effective_command = image if command is None else command
    return super(Subprocess, self).launch('no_image', effective_command, **kwargs)

def get_job(self, jobid):
    """ Job retrieval is not available for this engine.

    Args:
        jobid: ignored

    Raises:
        NotImplementedError: always
    """
    message = "Cannot retrieve jobs with the subprocess engine"
    raise NotImplementedError(message)

def submit(self, job):
self._check_job(job)
if job.workingdir is None:
job.workingdir = utils.make_local_temp_dir()
job.rundata.localdir = utils.make_local_temp_dir()

assert os.path.isabs(job.workingdir)
assert os.path.isabs(job.rundata.localdir)
if job.inputs:
for filename, f in job.inputs.items():
targetpath = self._check_file_is_under_workingdir(filename, job.workingdir)
targetpath = self._check_file_is_under_workingdir(filename, job.rundata.localdir)
f.put(targetpath)

subenv = os.environ.copy()
subenv['PYTHONIOENCODING'] = 'utf-8'
if job.env:
subenv.update(job.env)
job.subproc = subprocess.Popen(job.command,
shell=True,
cwd=job.workingdir,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=subenv)
job.jobid = job.subproc.pid
job._started = True
return job.subproc.pid
job.rundata.subproc = subprocess.Popen(job.command,
shell=True,
cwd=job.rundata.localdir,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=subenv)
job.jobid = job.rundata.subproc.pid
return job.rundata.subproc.pid

@staticmethod
def _check_file_is_under_workingdir(filename, wdir):
Expand All @@ -93,22 +96,22 @@ def _check_file_is_under_workingdir(filename, wdir):
wdir = os.path.realpath(wdir)
common = os.path.commonprefix([wdir, targetpath])
if len(common) < len(wdir):
raise ValueError(
raise exceptions.PathError(
"The subprocess engine does not support input files with absolute paths")
return p

def kill(self, job):
job.subproc.terminate()
job.rundata.subproc.terminate()

def wait(self, job):
return job.subproc.wait()
return job.rundata.subproc.wait()

def get_directory(self, job, path):
targetpath = self._check_file_is_under_workingdir(path, job.workingdir)
targetpath = self._check_file_is_under_workingdir(path, job.rundata.localdir)
return files.LocalDirectoryReference(targetpath)

def _list_output_files(self, job, dir=None):
if dir is None: dir = job.workingdir
if dir is None: dir = job.rundata.localdir
filenames = {}
for fname in os.listdir(dir):
abs_path = '%s/%s' % (dir, fname)
Expand All @@ -124,6 +127,6 @@ def _list_output_files(self, job, dir=None):

def _get_final_stds(self, job):
strings = []
for fileobj in (job.subproc.stdout, job.subproc.stderr):
for fileobj in (job.rundata.subproc.stdout, job.rundata.subproc.stderr):
strings.append(fileobj.read().decode('utf-8'))
return strings
8 changes: 8 additions & 0 deletions pyccc/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,12 @@ def __init__(self, engine):

class DockerMachineError(Exception):
    """ Failures related to connecting to docker machines.
    """

class PathError(Exception):
    """ The engine can't fulfill the requested input or output filesystem path.

    For example, the subprocess engine raises this when a file path does not
    resolve to a location under the job's working directory.
    """

class JobNotFound(Exception):
    """ The requested job was not found.

    Raised by ``engine.get_job`` implementations; for example, the docker engine
    raises it when the daemon has no container with the requested id.
    """
16 changes: 10 additions & 6 deletions pyccc/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@

import fnmatch

from mdtcollections import DotDict

import pyccc
from pyccc import files, status
from pyccc.utils import *
Expand All @@ -30,7 +32,9 @@
def exports(o):
    """ Decorator that adds ``o.__name__`` to this module's ``__all__``.

    Args:
        o: any object with a ``__name__`` attribute (e.g. a function or class)

    Returns:
        the same object, unchanged
    """
    public_name = o.__name__
    __all__.append(public_name)
    return o
__all__ = []


__all__ = ['Job']


class EngineFunction(object):
Expand Down Expand Up @@ -95,10 +99,11 @@ def __init__(self, engine=None,
self.command = if_not_none(command, '')
self.engine_options = if_not_none(engine_options, {})
self.workingdir = workingdir
self.env = env
self.rundata = DotDict()
self.env = if_not_none(env, {})

self.inputs = inputs
if self.inputs is not None: # translate strings into file objects
self.inputs = if_not_none(inputs, {})
if self.inputs: # translate strings into file objects
for filename, fileobj in inputs.items():
if isinstance(fileobj, basestring):
self.inputs[filename] = files.StringContainer(fileobj)
Expand All @@ -118,7 +123,6 @@ def __init__(self, engine=None,

def _reset(self):
self._submitted = False
self._started = False
self._final_stdout = None
self._final_stderr = None
self._finished = False
Expand All @@ -136,7 +140,7 @@ def _reset(self):

def __str__(self):
desc = ['Job "%s" status:%s' % (self.name, self.status)]
if self.jobid: desc.append('jobid:%s' % self.jobid)
if self.jobid: desc.append('jobid:%s' % (self.jobid,) )
if self.engine: desc.append('engine:%s' % type(self.engine).__name__)
return ' '.join(desc)

Expand Down

0 comments on commit 65a854b

Please sign in to comment.