Skip to content

Commit

Permalink
execution: unify docker cmdline usage (prepping for kubernetes drop-i…
Browse files Browse the repository at this point in the history
…n replacement)
  • Loading branch information
lukasheinrich committed Nov 9, 2017
1 parent ff680c9 commit 5ee0d76
Showing 1 changed file with 117 additions and 136 deletions.
253 changes: 117 additions & 136 deletions packtivity/handlers/execution_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,81 +22,59 @@ def sourcepath(path):
else:
return path

def state_context_to_mounts(state):
readwrites = state.readwrite
readonlies = state.readonly
mounts = ''
for rw in readwrites:
mounts += '-v {}:{}:rw'.format(sourcepath(os.path.abspath(rw)),rw)
for ro in readonlies:
mounts += ' -v {}:{}:ro'.format(sourcepath(ro),ro)
return mounts

def prepare_par_mounts(parmounts,state):
mounts = []
for i,x in enumerate(parmounts):
parmountfile = os.path.join(state.readwrite[0],'_yadage_parmount_{}.txt'.format(i))
with open(parmountfile,'w') as f:
f.write(x['mountcontent'])

mounts.append('{}:{}'.format(
mounts.append(' -v {}:{}'.format(
os.path.abspath(parmountfile),
x['mountpath']
))

return mounts

def state_context_to_mounts(state):
readwrites = state.readwrite
readonlies = state.readonly
mounts = ''
for rw in readwrites:
mounts += '-v {}:{}:rw'.format(sourcepath(os.path.abspath(rw)),rw)
for ro in readonlies:
mounts += ' -v {}:{}:ro'.format(sourcepath(ro),ro)
return mounts

def cvmfs_from_volume_plugin(command_line,cvmfs_repos = None):
def cvmfs_from_volume_plugin(cvmfs_repos = None):
if not cvmfs_repos:
cvmfs_repos = yaml.load(os.environ.get('PACKTIVITY_CVMFS_REPOS','null'))
if not cvmfs_repos:
cvmfs_repos = ['atlas.cern.ch','atlas-condb.cern.ch','sft.cern.ch']
command_line += ' --security-opt label:disable'
command_line = ' --security-opt label:disable'
for repo in cvmfs_repos:
command_line += ' --volume-driver cvmfs -v {cvmfs_repo}:/cvmfs/{cvmfs_repo}'.format(cvmfs_repo = repo)
return command_line

def cvmfs_from_external_mount(command_line):
command_line+=' -v {}:/cvmfs'.format(os.environ.get('PACKTIVITY_CVMFS_LOCATION','/cvmfs'))
return command_line

def prepare_docker(state,do_cvmfs,do_auth,par_mounts,log,metadata):
docker_mod = state_context_to_mounts(state)



if do_cvmfs:
cvmfs_source = os.environ.get('PACKTIVITY_CVMFS_SOURCE','external')
if cvmfs_source == 'external':
docker_mod = cvmfs_from_external_mount(docker_mod)
elif cvmfs_source == 'voldriver':
docker_mod = cvmfs_from_volume_plugin(docker_mod)
else:
raise RuntimeError('unknown CVMFS location requested')

if do_auth:
if 'PACKTIVITY_AUTH_LOCATION' not in os.environ:
docker_mod+=' -v /home/recast/recast_auth:/recast_auth'
else:
docker_mod+=' -v {}:/recast_auth'.format(os.environ['PACKTIVITY_AUTH_LOCATION'])


for x in par_mounts:
docker_mod+=' -v {}'.format(x)


cidfile = '{}/{}.cid'.format(state.metadir,metadata['name'])

if os.path.exists(cidfile):
log.warning('cid file %s seems to exist, container execution will crash',cidfile)
docker_mod += ' --cidfile {}'.format(cidfile)
def cvmfs_from_external_mount():
return ' -v {}:/cvmfs'.format(os.environ.get('PACKTIVITY_CVMFS_LOCATION','/cvmfs'))

def cvmfs_mount():
cvmfs_source = os.environ.get('PACKTIVITY_CVMFS_SOURCE','external')
if cvmfs_source == 'external':
return cvmfs_from_external_mount()
elif cvmfs_source == 'voldriver':
return cvmfs_from_volume_plugin()
else:
raise RuntimeError('unknown CVMFS location requested')

docker_mod += ' {}'.format(os.environ.get('PACKTIVITY_DOCKER_CMD_MOD',''))

return docker_mod
def auth_mount():
if 'PACKTIVITY_AUTH_LOCATION' not in os.environ:
return ' -v /home/recast/recast_auth:/recast_auth'
else:
return ' -v {}:/recast_auth'.format(os.environ['PACKTIVITY_AUTH_LOCATION'])

def prepare_docker_context(state,environment,log,metadata):
def resource_mounts(state,environment,log,metadata):
report = '''\n\
--------------
run in docker container image: {image}
Expand All @@ -113,9 +91,50 @@ def prepare_docker_context(state,environment,log,metadata):
do_auth = ('GRIDProxy' in environment['resources']) or ('KRB5Auth' in environment['resources'])
log.debug('do_auth: %s do_cvmfs: %s',do_auth,do_cvmfs)

par_mounts = prepare_par_mounts(environment['par_mounts'], state)

return prepare_docker(state,do_cvmfs,do_auth,par_mounts,log,metadata)
resource_mounts = ''
if do_cvmfs:
resource_mounts+=cvmfs_mount()

if do_auth:
resource_mounts+=auth_mount()

return resource_mounts

def docker_execution_cmdline(state,environment,log,metadata,combined_flags,cmd_argv):
quoted_string = ' '.join(map(pipes.quote,cmd_argv))

image = environment['image']
imagetag = environment['imagetag']

# generic non-volume mount flags
workdir_flag = '-w {}'.format(environment['workdir']) if environment['workdir'] is not None else ''

cidfile = '{}/{}.cid'.format(state.metadir,metadata['name'])
if os.path.exists(cidfile):
log.warning('cid file %s seems to exist, container execution will crash',cidfile)
cid_file = '--cidfile {}'.format(cidfile)

custom_mod = ' {}'.format(os.environ.get('PACKTIVITY_DOCKER_CMD_MOD',''))

# volume mounts (resources, parameter mounts and state mounts)
state_mounts = state_context_to_mounts(state)
rsrcs_mounts = resource_mounts(state,environment,log,metadata)

par_mounts = ' '.join(prepare_par_mounts(environment['par_mounts'], state))

return 'docker run {combined} {cid} {workdir} {custom} {state_mounts} {rsrcs} {par_mounts} {img}:{tag} {command}'.format(
combined = combined_flags,
cid = cid_file,
workdir = workdir_flag,
custom = custom_mod,
state_mounts = state_mounts,
rsrcs = rsrcs_mounts,
par_mounts = par_mounts,
img = image,
tag = imagetag,
command = quoted_string
)

def run_docker_with_script(state,environment,job,log,metadata):
script = job['script']
Expand All @@ -130,59 +149,12 @@ def run_docker_with_script(state,environment,job,log,metadata):
envmod = 'source {} && '.format(environment['envscript']) if environment['envscript'] else ''
indocker = envmod+indocker

try:
with logutils.setup_logging_topic(metadata,state,'run', return_logger = True) as runlog:
subcmd = docker_execution_cmdline(
state,environment,log,metadata,
combined_flags = '--rm -i',
cmd_argv = ['sh', '-c', indocker]
)
log.debug('running docker cmd: %s',subcmd)
proc = subprocess.Popen(shlex.split(subcmd), stdin = subprocess.PIPE, stderr = subprocess.STDOUT, stdout = subprocess.PIPE, bufsize=1, close_fds = True)

log.debug('started run subprocess with pid %s. now piping script',proc.pid)
proc.stdin.write(script.encode('utf-8'))
proc.stdin.close()
time.sleep(0.5)

for line in iter(proc.stdout.readline, b''):
runlog.info(line.strip())
while proc.poll() is None:
pass

proc.stdout.close()

log.debug('container execution finished. return code: %s',proc.returncode)
if proc.returncode:
log.error('non-zero return code raising exception')
raise subprocess.CalledProcessError(returncode = proc.returncode, cmd = subcmd)
log.debug('moving on from run')
except subprocess.CalledProcessError as exc:
log.exception('subprocess failed. code: %s, command %s',exc.returncode,exc.cmd)
raise RuntimeError('failed container execution.')
except:
log.exception("Unexpected error: %s",sys.exc_info())
raise
finally:

log.debug('finally for run')

def docker_execution_cmdline(state,environment,log,metadata,combined_flags,cmd_argv):
quoted_string = ' '.join(map(pipes.quote,cmd_argv))

image = environment['image']
imagetag = environment['imagetag']
docker_mod = prepare_docker_context(state,environment,log,metadata)
workdir_flag = '-w {}'.format(environment['workdir']) if environment['workdir'] is not None else ''

return 'docker run {} {} {} {}:{} {}'.format(
combined_flags,
workdir_flag,
docker_mod,
image,
imagetag,
quoted_string
docker_run_cmd_str = docker_execution_cmdline(
state,environment,log,metadata,
combined_flags = '--rm -i',
cmd_argv = ['sh', '-c', indocker]
)
execute_docker(metadata,state,log,docker_run_cmd_str,stdin_content=script)

def run_docker_with_oneliner(state,environment,command,log,metadata):
log.debug('''\n\
Expand All @@ -200,76 +172,86 @@ def run_docker_with_oneliner(state,environment,command,log,metadata):
combined_flags = '--rm',
cmd_argv = ['sh', '-c', in_docker_cmd]
)
docker_run_cmd(docker_run_cmd_str,log,state,metadata)
execute_docker(metadata,state,log,docker_run_cmd_str)

def docker_pull(docker_pull_cmd,log,state,metadata):
log.debug('container image pull command: \n %s',docker_pull_cmd)
def execute_docker(metadata,state,log,docker_run_cmd_str,stdin_content = None):
log.debug('container execution command: \n%s',docker_run_cmd_str)
if 'PACKTIVITY_DRYRUN' in os.environ:
return
try:
with logutils.setup_logging_topic(metadata,state,'pull', return_logger = True) as pulllog:
proc = subprocess.Popen(shlex.split(docker_pull_cmd), stderr = subprocess.STDOUT, stdout = subprocess.PIPE, bufsize=1, close_fds = True)
log.debug('started pull subprocess with pid %s. now wait to finish',proc.pid)
with logutils.setup_logging_topic(metadata,state,'run', return_logger = True) as runlog:

proc = None
if stdin_content:
log.debug('stdin: \n%s',stdin_content)
proc = subprocess.Popen(shlex.split(docker_run_cmd_str),
stdin = subprocess.PIPE,
stderr = subprocess.STDOUT,
stdout = subprocess.PIPE,
bufsize=1,
close_fds = True)
proc.stdin.write(stdin_content.encode('utf-8'))
proc.stdin.close()
else:
proc = subprocess.Popen(shlex.split(docker_run_cmd_str), stderr = subprocess.STDOUT, stdout = subprocess.PIPE, bufsize=1, close_fds = True)

log.debug('started run subprocess with pid %s. now wait to finish',proc.pid)
time.sleep(0.5)
log.debug('process children: %s',[x for x in psutil.Process(proc.pid).children(recursive = True)])

for line in iter(proc.stdout.readline, b''):
pulllog.info(line.strip())
runlog.info(line.strip())
while proc.poll() is None:
pass

proc.stdout.close()

log.debug('pull subprocess finished. return code: %s',proc.returncode)
log.debug('container execution subprocess finished. return code: %s',proc.returncode)
if proc.returncode:
log.error('non-zero return code raising exception')
raise subprocess.CalledProcessError(returncode = proc.returncode, cmd = docker_pull_cmd)
log.debug('moving on from pull')
except RuntimeError as e:
log.exception('caught RuntimeError')
raise e
raise subprocess.CalledProcessError(returncode = proc.returncode, cmd = docker_run_cmd_str)
log.debug('moving on from run')
except subprocess.CalledProcessError as exc:
log.exception('subprocess failed. code: %s, command %s',exc.returncode,exc.cmd)
raise RuntimeError('failed container image pull subprocess in docker_enc_handler.')
raise RuntimeError('failed container execution subprocess.')
except:
log.exception("Unexpected error: %s",sys.exc_info())
raise
finally:
log.debug('finally for pull')
log.debug('finally for run')

def docker_run_cmd(fullest_command,log,state,metadata):
log.debug('container execution command: \n%s',fullest_command)
def docker_pull(docker_pull_cmd,log,state,metadata):
log.debug('container image pull command: \n %s',docker_pull_cmd)
if 'PACKTIVITY_DRYRUN' in os.environ:
return
try:
with logutils.setup_logging_topic(metadata,state,'run', return_logger = True) as runlog:
proc = subprocess.Popen(shlex.split(fullest_command), stderr = subprocess.STDOUT, stdout = subprocess.PIPE, bufsize=1, close_fds = True)
log.debug('started run subprocess with pid %s. now wait to finish',proc.pid)
with logutils.setup_logging_topic(metadata,state,'pull', return_logger = True) as pulllog:
proc = subprocess.Popen(shlex.split(docker_pull_cmd), stderr = subprocess.STDOUT, stdout = subprocess.PIPE, bufsize=1, close_fds = True)
log.debug('started pull subprocess with pid %s. now wait to finish',proc.pid)
time.sleep(0.5)
log.debug('process children: %s',[x for x in psutil.Process(proc.pid).children(recursive = True)])

for line in iter(proc.stdout.readline, b''):
runlog.info(line.strip())
pulllog.info(line.strip())
while proc.poll() is None:
pass

proc.stdout.close()

log.debug('container execution subprocess finished. return code: %s',proc.returncode)
log.debug('pull subprocess finished. return code: %s',proc.returncode)
if proc.returncode:
log.error('non-zero return code raising exception')
raise subprocess.CalledProcessError(returncode = proc.returncode, cmd = fullest_command)
log.debug('moving on from run')
raise subprocess.CalledProcessError(returncode = proc.returncode, cmd = docker_pull_cmd)
log.debug('moving on from pull')
except RuntimeError as e:
log.exception('caught RuntimeError')
raise e
except subprocess.CalledProcessError as exc:
log.exception('subprocess failed. code: %s, command %s',exc.returncode,exc.cmd)
raise RuntimeError('failed container execution subprocess.')
raise RuntimeError('failed container image pull subprocess in docker_enc_handler.')
except:
log.exception("Unexpected error: %s",sys.exc_info())
raise
finally:
log.debug('finally for run')


log.debug('finally for pull')


@executor('docker-encapsulated')
Expand All @@ -288,7 +270,6 @@ def docker_enc_handler(environment,state,job,metadata):

if 'command' in job:
run_docker_with_oneliner(state,environment,job['command'],log,metadata)
log.debug('reached return for docker_enc_handler')
elif 'script' in job:
run_docker_with_script(state,environment,job,log,metadata)
else:
Expand Down

0 comments on commit 5ee0d76

Please sign in to comment.