# Incremental stderr support for a Genie job, from the change to
# genie-client/src/main/python/pygenie/jobs/running.py.  These are methods of
# the job class in that module (each takes ``self``); they are laid out at
# column 0 here because the enclosing class header is not part of this text.


@property
def stderr_url(self):
    """
    Returns a url for the stderr of the job.

    The url is ``output_uri`` with its first ``/output/`` segment replaced
    by ``/file/`` and ``/stderr`` appended.
    """

    return '{}/stderr'.format(self.output_uri.replace('/output/', '/file/', 1))


def update_stderr(self):
    """
    Use the HTTP Range header to incrementally update the cached stderr.

    Only the bytes past what is already cached are requested.  This version
    keeps all of stderr in memory; if that ever exceeds available memory,
    iterators could be used instead to retrieve and print only new lines,
    keeping just the number of bytes retrieved.

    Returns:
        The newly retrieved portion of stderr, in the same type as the
        response body.  Empty when there is nothing new or the request
        failed (a failure is also reported on stdout).
    """

    cached = self._cached_stderr
    # The cache holds the raw response body, so its length is the byte
    # offset of the first byte we have not seen yet.
    offset = len(cached) if cached else 0

    headers = {
        'user-agent': '/'.join([
            socket.getfqdn(),
            'nflx-genie-client',
            pkg_resources.get_distribution('nflx-genie-client').version
        ]),
        'Range': 'bytes={}-'.format(offset)
    }

    # stderr_url is a property, so it must be read, not called.
    r = requests.request('get', url=self.stderr_url, headers=headers,
                         auth=self._adapter.auth_handler.auth, stream=False)

    body = r.content
    nothing_new = body[:0]  # empty value of the same type as the body

    if r.status_code == 206:
        # Partial content: the body is exactly the bytes from offset onwards.
        self._cached_stderr = cached + body if cached else body
        return body

    if r.status_code == 200:
        # The server ignored the Range header and sent the whole file;
        # replace the cache rather than appending, or output is duplicated.
        self._cached_stderr = body
        return body[offset:]

    if r.status_code == 416:
        # Range not satisfiable: no bytes exist at or past offset yet.
        # This is the normal "no new output" case while polling.
        return nothing_new

    # Any other status is a real failure; never cache the error body.
    print('\nError updating stderr: Status code {}'.format(r.status_code))
    return nothing_new


def watch_stderr(self, interval=1):
    """
    Tail a genie job's stderr to the terminal until the job is done.

    Args:
        interval (int or float, optional): Time between updates.
    """

    def emit(chunk):
        # Write the chunk verbatim: print() would add a newline after every
        # poll and break up lines that arrive across two requests.
        if not chunk:
            return
        if isinstance(chunk, bytes) and not isinstance(chunk, str):
            # Python 3 response bodies are bytes; decode for display only.
            chunk = chunk.decode('utf-8', 'replace')
        sys.stdout.write(chunk)
        sys.stdout.flush()

    while not self.is_done:
        time.sleep(interval)
        emit(self.update_stderr())

    # One last fetch picks up anything written just before the job finished.
    emit(self.update_stderr())
    print('\n\nGenie job {}. {}'.format(self.status, self.output_uri))


def stderr(self, iterator=False, **kwargs):
    """
    Get the stderr of the job, refreshing the cached copy first.

    Args:
        iterator (bool, optional): When True, return the output split on
            newlines as a list of lines instead of a single value.
        **kwargs: Accepted for backwards compatibility; not used.

    Returns:
        str or iterator.
    """

    self.update_stderr()

    cached = self._cached_stderr
    if cached is None:
        # Nothing has been retrieved yet (e.g. the first request failed).
        return [] if iterator else ''

    if not iterator:
        return cached

    # Split with a separator of the same type as the cached data.
    newline = b'\n' if isinstance(cached, bytes) else '\n'
    return cached.split(newline)