Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add methods to RunningJob to incrementally update cached stderr and t… #436

Closed
wants to merge 4 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 60 additions & 9 deletions genie-client/src/main/python/pygenie/jobs/running.py
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,62 @@ def status(self):
self._info['status'] = self._status

return self._status.upper() if self._status else None

@property
def stderr_url(self):
"""
Returns a url for the stderr of the job.
"""

return '{}/stderr'.format(self.output_uri.replace('/output/', '/file/', 1))

def update_stderr(self):
'''
Use the http Range header to incrementally update stderr output While
this version keeps all of stderr in memory, if it exceeds available
memory, we could instead use iterators to retrieve and print only new
lines, keeping only the number of bytes retrieved in memory.

Returns the updated fraction of stderr.
'''

headers = {
'user-agent': '/'.join([
socket.getfqdn(),
'nflx-genie-client',
pkg_resources.get_distribution('nflx-genie-client').version
]),
'Range': 'bytes={}-'.format(len(self._cached_stderr) if self._cached_stderr else 0)
}

r = requests.request('get', url=self.stderr_url(), headers=headers,
auth=self._adapter.auth_handler.auth, stream=False)

if r.status_code in (200, 206):
else:
print('\nError updating stderr: Status code {}'.format(r.status_code))

if self._cached_stderr:
self._cached_stderr += r.content
else
self._cached_stderr = r.content

return r.content

def watch_stderr(self, interval=1):
'''Tail output of a genie job's stderr to the terminal until the job is done.

Args:
interval (int or float, optional): Time between updates.
'''

while not self.is_done:
time.sleep(interval)
print(self.update_stderr())
sys.stdout.flush()

print(self.update_stderr())
print('\n\nGenie job {}. {}'.format(self.status, self.output_uri))

def stderr(self, iterator=False, **kwargs):
"""
Expand All @@ -536,15 +592,10 @@ def stderr(self, iterator=False, **kwargs):
str or iterator.
"""

if self.is_done:
if not self._cached_stderr:
self._cached_stderr = self._adapter.get_stderr(self._job_id,
**kwargs)
return self._cached_stderr.split('\n') if iterator \
else self._cached_stderr
return self._adapter.get_stderr(self._job_id,
iterator=iterator,
**kwargs)
self.update_stderr()

return self._cached_stderr.split('\n') if iterator \
else self._cached_stderr

@property
def stdout_url(self):
Expand Down