Skip to content

Commit

Permalink
Merge f4ef66f into 998f7bd
Browse files Browse the repository at this point in the history
  • Loading branch information
atronchi committed Jan 3, 2017
2 parents 998f7bd + f4ef66f commit ae14ee7
Showing 1 changed file with 60 additions and 9 deletions.
69 changes: 60 additions & 9 deletions genie-client/src/main/python/pygenie/jobs/running.py
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,62 @@ def status(self):
self._info['status'] = self._status

return self._status.upper() if self._status else None

@property
def stderr_url(self):
"""
Returns a url for the stderr of the job.
"""

return '{}/stderr'.format(self.output_uri.replace('/output/', '/file/', 1))

def update_stderr(self):
'''
Use the http Range header to incrementally update stderr output While
this version keeps all of stderr in memory, if it exceeds available
memory, we could instead use iterators to retrieve and print only new
lines, keeping only the number of bytes retrieved in memory.
Returns the updated fraction of stderr.
'''

headers = {
'user-agent': '/'.join([
socket.getfqdn(),
'nflx-genie-client',
pkg_resources.get_distribution('nflx-genie-client').version
]),
'Range': 'bytes={}-'.format(len(self._cached_stderr) if self._cached_stderr else 0)
}

r = requests.request('get', url=self.stderr_url(), headers=headers,
auth=self._adapter.auth_handler.auth, stream=False)

if r.status_code in (200, 206):
else:
print('\nError updating stderr: Status code {}'.format(r.status_code))

if self._cached_stderr:
self._cached_stderr += r.content
else
self._cached_stderr = r.content

return r.content

def watch_stderr(self, interval=1):
'''Tail output of a genie job's stderr to the terminal until the job is done.
Args:
interval (int or float, optional): Time between updates.
'''

while not self.is_done:
time.sleep(interval)
print(self.update_stderr())
sys.stdout.flush()

print(self.update_stderr())
print('\n\nGenie job {}. {}'.format(self.status, self.output_uri))

def stderr(self, iterator=False, **kwargs):
"""
Expand All @@ -536,15 +592,10 @@ def stderr(self, iterator=False, **kwargs):
str or iterator.
"""

if self.is_done:
if not self._cached_stderr:
self._cached_stderr = self._adapter.get_stderr(self._job_id,
**kwargs)
return self._cached_stderr.split('\n') if iterator \
else self._cached_stderr
return self._adapter.get_stderr(self._job_id,
iterator=iterator,
**kwargs)
self.update_stderr()

return self._cached_stderr.split('\n') if iterator \
else self._cached_stderr

@property
def stdout_url(self):
Expand Down

0 comments on commit ae14ee7

Please sign in to comment.