Skip to content

Commit

Permalink
tfrun can bring all workers' log back (#82)
Browse files Browse the repository at this point in the history
  • Loading branch information
windreamer committed Jan 9, 2018
1 parent d7c8d49 commit e25a265
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 4 deletions.
9 changes: 8 additions & 1 deletion script/tfrun
Expand Up @@ -28,6 +28,7 @@ parser.add_argument('-v', '--verbose', action='store_true')
parser.add_argument('-V', '--volume', type=str, action='append')
parser.add_argument('-r', '--role', type=str, action='store')
parser.add_argument('-e', '--extra_config', type=str, action='store')
parser.add_argument('--worker-logs', type=str, help='Comma seperated worker ids, to bring logs back. "*" to bring all workers\' log back', default='0')
parser.add_argument('cmd', type=str)
parser.add_argument('args', nargs='*')

Expand Down Expand Up @@ -84,7 +85,13 @@ lfd.bind(('', 0))
addr = (socket.gethostname(), lfd.getsockname()[1])
lfd.listen(10)
fds = [lfd]
forward_addresses = {'/job:%s/task:%s' % (worker_name, 0): addr}

if args.worker_logs == '*':
forward_workers = range(nworker)
else:
forward_workers = map(int, args.worker_logs.split(','))

forward_addresses = {'/job:%s/task:%s' % (worker_name, i): addr for i in forward_workers}


with cluster(jobs_def, role=args.role, master=args.master,
Expand Down
19 changes: 16 additions & 3 deletions tfmesos/server.py
@@ -1,5 +1,6 @@
# coding: utf-8

import os
import sys
import socket
import subprocess
Expand Down Expand Up @@ -72,21 +73,33 @@ def main(argv):
worker_name = 'worker'
ps_hosts = ','.join(cluster_def[server_name])
worker_hosts = ','.join(cluster_def[worker_name])
env = os.environ.copy()
env['PYTHONUNBUFFERED'] = '1'
prefix = '[%s:%s] ' % (job_name, task_index)
prefix = prefix.encode('ascii')

cmd = cmd.format(
ps_hosts=ps_hosts, worker_hosts=worker_hosts,
job_name=job_name, task_index=task_index
)
out = os.fdopen(sys.stdout.fileno(), 'wb', 1)
try:
subprocess.check_call(cmd, shell=True, cwd=cwd, stdout=forward_fd)
p = subprocess.Popen(cmd, shell=True, cwd=cwd, stdout=subprocess.PIPE, bufsize=1, env=env)
for l in iter(p.stdout.readline, b''):
out.write(l)
if forward_fd:
forward_fd.send(prefix + l)

sys.exit(p.wait())
finally:
final_cmd = extra_config.get('finalizer')
if final_cmd is not None:
logger.info('Running clean up command {}'.format(final_cmd))
subprocess.check_call(final_cmd, shell=True)

if forward_fd:
forward_fd.close()
out.close()
if forward_fd:
forward_fd.close()


if __name__ == '__main__':
Expand Down

0 comments on commit e25a265

Please sign in to comment.