Skip to content
This repository has been archived by the owner on Jul 19, 2021. It is now read-only.

Keep stdout stderr #522

Merged
merged 4 commits into from
Mar 11, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 25 additions & 6 deletions python/res/job_queue/ert_script.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,22 @@ def __init__(self, ert):

self.__is_cancelled = False
self.__failed = False
self._stdoutdata = ""
self._stderrdata = ""

@property
def stdoutdata(self):
""" @rtype: str """
if isinstance(self._stdoutdata, bytes):
self._stdoutdata = self._stdoutdata.decode()
return self._stdoutdata

@property
def stderrdata(self):
""" @rtype: str """
if isinstance(self._stderrdata, bytes):
self._stderrdata = self._stderrdata.decode()
return self._stderrdata

def isVerbose(self):
return self.__verbose
Expand Down Expand Up @@ -72,22 +88,25 @@ def initializeAndRun(self, argument_types, argument_values, verbose=False):
if not hasattr(self, "run"):
self.__failed = True
return "Script '%s' has not implemented a 'run' function" % self.__class__.__name__
return self.defaultStackTrace(e)
self.outputStackTrace(e)
return None
except KeyboardInterrupt:
return "Script '%s' cancelled (CTRL+C)" % self.__class__.__name__
except Exception as e:
return self.defaultStackTrace(e)
self.outputStackTrace(e)
return None
finally:
self.cleanup()


__module_count = 0 # Need to have unique modules in case of identical object naming in scripts

def defaultStackTrace(self, error):
sys.stderr.write("The script '%s' caused an error while running:\n" % self.__class__.__name__)
def outputStackTrace(self, error=None):
stack_trace = error or "".join(traceback.format_exception(*sys.exc_info()))
msg = "The script '{}' caused an error while running:\n{}"

sys.stderr.write(msg.format(self.__class__.__name__, stack_trace))
self.__failed = True
stack_trace = traceback.format_exception(sys.exc_type, sys.exc_value, sys.exc_traceback)
return "".join(stack_trace)

@staticmethod
def loadScriptFromFile(path):
Expand Down
18 changes: 15 additions & 3 deletions python/res/job_queue/external_ert_script.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from subprocess import Popen
import sys
from subprocess import Popen, PIPE
from res.job_queue import ErtScript


Expand All @@ -13,8 +14,19 @@ def __init__(self, ert, executable):
def run(self, *args):
command = [self.__executable]
command.extend([str(arg) for arg in args])
self.__job = Popen(command)
self.__job.wait() # This should not be here?

self.__job = Popen(command,
stdout = PIPE,
stderr = PIPE)

# The job will complete before stdout and stderr is returned
self._stdoutdata, self._stderrdata = self.__job.communicate()

sys.stdout.write(self._stdoutdata)

if self.__job.returncode != 0:
raise Exception(self._stderrdata)

return None

def cancel(self):
Expand Down
14 changes: 11 additions & 3 deletions python/res/job_queue/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ def __init__(self, src_file, job_list):
self.__running = False
self.__cancelled = False
self.__current_job = None
self.__status = {}

def __len__(self):
return self._count( )
Expand Down Expand Up @@ -58,6 +59,8 @@ def run(self, ert, verbose=False, context=None):
@type context: SubstitutionList
@rtype: bool
"""
# Reset status
self.__status = {}
self.__running = True
success = self._try_compile(context)
if not success:
Expand All @@ -72,9 +75,10 @@ def run(self, ert, verbose=False, context=None):
self.__current_job = job
if not self.__cancelled:
return_value = job.run(ert, args, verbose)

if job.hasFailed():
print(return_value)
self.__status[job.name()] = {'stdout': job.stdoutdata(),
'stderr': job.stderrdata(),
'completed': not job.hasFailed(),
'return': return_value}

self.__current_job = None
self.__running = False
Expand Down Expand Up @@ -104,6 +108,10 @@ def getLastError(self):
""" @rtype: ConfigError """
return self._get_last_error( )

def getJobsReport(self):
""" @rtype: {dict} """
return self.__status

@classmethod
def createCReference(cls, c_pointer, parent=None):
workflow = super(Workflow, cls).createCReference(c_pointer, parent)
Expand Down
7 changes: 7 additions & 0 deletions python/res/job_queue/workflow_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,13 @@ def hasFailed(self):
def free(self):
self._free( )

def stdoutdata(self):
""" @rtype: str """
return self.__script.stdoutdata

def stderrdata(self):
""" @rtype: str """
return self.__script.stderrdata

@classmethod
def createCReference(cls, c_pointer, parent=None):
Expand Down
4 changes: 4 additions & 0 deletions python/res/job_queue/workflow_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ def workflowResult(self):
""" @rtype: bool or None """
return self.__workflow_result

def workflowReport(self):
""" @rtype: {dict} """
return self.__workflow.getJobsReport()

def workflowError(self):
""" @rtype: str """
error = self.__workflow.getLastError()
Expand Down
14 changes: 14 additions & 0 deletions python/tests/res/job_queue/test_workflow_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,24 @@ def test_run_external_job(self):
argTypes = job.argumentTypes()
self.assertEqual( argTypes , [str , str] )
self.assertIsNone(job.run(None, ["test", "text"]))
self.assertEqual(job.stdoutdata(), "Hello World\n")

with open("test", "r") as f:
self.assertEqual(f.read(), "text")

def test_error_handling_external_job(self):

with TestAreaContext("python/job_queue/workflow_job") as work_area:
WorkflowCommon.createExternalDumpJob()

config = self._alloc_config()
job = self._alloc_from_file("DUMP", config, "dump_failing_job")

self.assertFalse(job.isInternal())
argTypes = job.argumentTypes()
self.assertIsNone(job.run(None, []))
self.assertTrue(job.stderrdata().startswith('Traceback'))


def test_run_internal_script(self):
with TestAreaContext("python/job_queue/workflow_job") as work_area:
Expand Down
11 changes: 11 additions & 0 deletions python/tests/res/job_queue/workflow_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,27 @@ def createExternalDumpJob():
f.write("MAX_ARG 2\n")
f.write("ARG_TYPE 0 STRING\n")

with open("dump_failing_job", "w") as f:
f.write("INTERNAL FALSE\n")
f.write("EXECUTABLE dump_failing.py\n")

with open("dump.py", "w") as f:
f.write("#!/usr/bin/env python\n")
f.write("import sys\n")
f.write("f = open('%s' % sys.argv[1], 'w')\n")
f.write("f.write('%s' % sys.argv[2])\n")
f.write("f.close()\n")
f.write("print(\"Hello World\")")

with open("dump_failing.py", "w") as f:
f.write("#!/usr/bin/env python\n")
f.write("print(\"Hello Failing\")\n")
f.write("raise Exception")

st = os.stat("dump.py")
os.chmod("dump.py", st.st_mode | stat.S_IEXEC) # | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
st = os.stat("dump_failing.py")
os.chmod("dump_failing.py", st.st_mode | stat.S_IEXEC)

with open("dump_workflow", "w") as f:
f.write("DUMP dump1 dump_text_1\n")
Expand Down