Skip to content

Commit

Permalink
Refactor script processors, include brief detail on generic errors (#…
Browse files Browse the repository at this point in the history
…1485)

It introduces a new base class named ScriptOperationProcessor
from which the existingPythonScriptOperationProcessor and
RScriptOperationProcessor classes now derive. This base class
contains 90% of the applicable code with the subclasses providing
their name and argument vectors.

It introduces a log_and_raise() method on the base 
FIleOperationProcessor class that is available to all file-based
operations. Building on the work done in #1411, this method
checks the length of the error message and truncates it
to around the max (80), replacing overflow with ellipses (...).

Adds a test that removes the kernel metadata from a notebook
node and ensures the appropriate error is raised.
  • Loading branch information
kevin-bates committed Mar 29, 2021
1 parent 2421627 commit 9132c16
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 58 deletions.
93 changes: 53 additions & 40 deletions elyra/pipeline/processor_local.py
Expand Up @@ -23,7 +23,7 @@
from jupyter_server.gateway.managers import GatewayClient
from subprocess import run, CalledProcessError, PIPE
from traitlets import log
from typing import Dict
from typing import Dict, List


class LocalPipelineProcessor(PipelineProcessor):
Expand All @@ -42,7 +42,7 @@ class LocalPipelineProcessor(PipelineProcessor):
_type = 'local'

def __init__(self, root_dir, **kwargs):
super(LocalPipelineProcessor, self).__init__(root_dir, **kwargs)
super().__init__(root_dir, **kwargs)
notebook_op_processor = NotebookOperationProcessor(self.root_dir)
python_op_processor = PythonScriptOperationProcessor(self.root_dir)
r_op_processor = RScriptOperationProcessor(self.root_dir)
Expand Down Expand Up @@ -118,8 +118,10 @@ def process(self, operation: Operation):

class FileOperationProcessor(OperationProcessor):

MAX_ERROR_LEN: int = 80

def __init__(self, root_dir: str):
super(FileOperationProcessor, self).__init__()
super().__init__()
self._root_dir = root_dir

@property
Expand All @@ -130,21 +132,46 @@ def operation_name(self) -> str:
def process(self, operation: Operation):
raise NotImplementedError

def get_valid_filepath(self, op_filename):
def get_valid_filepath(self, op_filename: str) -> str:
filepath = get_absolute_path(self._root_dir, op_filename)
if not os.path.exists(filepath):
raise FileNotFoundError(f'Could not find {filepath}')
if not os.path.isfile(filepath):
raise ValueError(f'Not a file: {filepath}')
return filepath

def log_and_raise(self, file_name: str, ex: Exception) -> None:
"""Log and raise the exception that occurs when processing file_name.
If the exception's message is longer than MAX_ERROR_LEN, it will be
truncated with an ellipses (...) when raised. The complete message
will be logged.
"""
self.log.error(f'Error executing {file_name}: {str(ex)}')
truncated_msg = FileOperationProcessor._truncate_msg(str(ex))
raise RuntimeError(f'({file_name}): {truncated_msg}') from ex

@staticmethod
def _truncate_msg(msg: str) -> str:
"""Truncates the msg string to be less that MAX_ERROR_LEN.
If msg is longer than MAX_ERROR_LEN, the first space is found from the right,
then ellipses (...) are appended to that location so that they don't appear
in the middle of a word. As a result, the truncation could result in lengths
less than the max+2.
"""
if len(msg) < FileOperationProcessor.MAX_ERROR_LEN:
return msg
# locate the first whitespace from the 80th character and truncate from there
last_space = msg.rfind(' ', 0, FileOperationProcessor.MAX_ERROR_LEN)
if last_space >= 0:
return msg[:last_space] + "..."
return msg[:FileOperationProcessor.MAX_ERROR_LEN]


class NotebookOperationProcessor(FileOperationProcessor):
_operation_name = 'execute-notebook-node'

def __init__(self, root_dir: str):
super(NotebookOperationProcessor, self).__init__(root_dir)

def process(self, operation: Operation):
filepath = self.get_valid_filepath(operation.filename)

Expand Down Expand Up @@ -183,29 +210,29 @@ def process(self, operation: Operation):
raise RuntimeError(f'({file_name}) in cell {pmee.exec_count}: ' +
f'{str(pmee.ename)} {str(pmee.evalue)}') from pmee
except Exception as ex:
self.log.error(f'Error executing {file_name}: {str(ex)}')
raise RuntimeError(f'({file_name})') from ex
self.log_and_raise(file_name, ex)

t1 = time.time()
duration = (t1 - t0)
self.log.debug(f'Execution of {file_name} took {duration:.3f} secs.')


class PythonScriptOperationProcessor(FileOperationProcessor):
_operation_name = 'execute-python-node'
class ScriptOperationProcessor(FileOperationProcessor):

_script_type: str = None

def __init__(self, root_dir):
super(PythonScriptOperationProcessor, self).__init__(root_dir)
def get_argv(self, filepath) -> List[str]:
raise NotImplementedError

def process(self, operation: Operation):
filepath = self.get_valid_filepath(operation.filename)

file_dir = os.path.dirname(filepath)
file_name = os.path.basename(filepath)

self.log.debug(f'Processing python script: {filepath}')
self.log.debug(f'Processing {self._script_type} script: {filepath}')

argv = ['python3', filepath, '--PYTHONHOME', file_dir]
argv = self.get_argv(filepath)

envs = os.environ.copy() # Make sure this process's env is "available" in subprocess
envs.update(operation.env_vars_as_dict())
Expand All @@ -222,38 +249,24 @@ def process(self, operation: Operation):
else:
raise RuntimeError(f'({file_name})') from cpe
except Exception as ex:
self.log.error(f'Error executing {file_name}: {str(ex)}')
raise RuntimeError(f'({file_name})') from ex
self.log_and_raise(file_name, ex)

t1 = time.time()
duration = (t1 - t0)
self.log.debug(f'Execution of {file_name} took {duration:.3f} secs.')


class RScriptOperationProcessor(FileOperationProcessor):
_operation_name = 'execute-r-node'

def __init__(self, root_dir):
super(RScriptOperationProcessor, self).__init__(root_dir)

def process(self, operation: Operation):
filepath = self.get_valid_filepath(operation.filename)

file_dir = os.path.dirname(filepath)
file_name = os.path.basename(filepath)
class PythonScriptOperationProcessor(ScriptOperationProcessor):
_operation_name = 'execute-python-node'
_script_type = 'Python'

self.log.debug(f'Processing R script: {filepath}')
def get_argv(self, file_path) -> List[str]:
return ['python3', file_path, '--PYTHONHOME', os.path.dirname(file_path)]

argv = ['Rscript', filepath]

envs = os.environ.copy() # Make sure this process's env is "available" in subprocess
envs.update(operation.env_vars_as_dict())
t0 = time.time()
try:
run(argv, cwd=file_dir, env=envs, check=True)
except Exception as ex:
raise RuntimeError(f'Internal error executing {filepath}: {ex}') from ex
class RScriptOperationProcessor(ScriptOperationProcessor):
_operation_name = 'execute-r-node'
_script_type = 'R'

t1 = time.time()
duration = (t1 - t0)
self.log.debug(f'Execution of {file_name} took {duration:.3f} secs.')
def get_argv(self, file_path) -> List[str]:
return ['Rscript', file_path]
18 changes: 0 additions & 18 deletions elyra/pipeline/tests/resources/node_util/node.ipynb
Expand Up @@ -11,24 +11,6 @@
"- `OUTPUT_FILENAMES`: (Optional) A SEMI-COLON-separated list of filenames. Each entry can include a _relative_ path as a prefix to the filename. Each file is NOT expected to exist, but will be created as a function of the notebook's execution."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"!pip3 install --upgrade pygithub"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from github import Github"
]
},
{
"cell_type": "code",
"execution_count": null,
Expand Down
24 changes: 24 additions & 0 deletions elyra/pipeline/tests/test_pipeline_processor_local.py
Expand Up @@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
import nbformat
import os
import pytest

Expand Down Expand Up @@ -93,6 +94,29 @@ def test_pipeline_execution(pipeline_dir):
assert os.path.exists(os.path.join(pipeline_dir, output))


def test_pipeline_execution_missing_kernelspec(pipeline_dir):
# Construct 4-node pipeline consisting of 3 notebooks and 1 python script.
# This pipeline is "diamond shaped" with node1 feeding nodes 2 and 3, each then
# feeding node4.
node1 = NotebookNode("node1", num_outputs=2)
node2 = PythonNode("node2", num_outputs=2, input_nodes=[node1])
node3 = NotebookNode("node3", num_outputs=2, input_nodes=[node1])
node4 = NotebookNode("node4", num_outputs=2, input_nodes=[node2, node3])
nodes = [node1, node2, node3, node4]

pipeline = construct_pipeline("p1", nodes=nodes, location=pipeline_dir)

node1nb_file = os.path.join(pipeline_dir, pipeline.operations[node1.id].filename)
nb = nbformat.read(node1nb_file, 4)
nb['metadata'].pop('kernelspec')
nbformat.write(nb, node1nb_file)

with pytest.raises(RuntimeError) as e:
LocalPipelineProcessor(pipeline_dir).process(pipeline)
assert 'Error processing operation node1 (node1.ipynb): No kernel ' \
'name found in notebook and no override provided.' in str(e.value)


def test_pipeline_execution_bad_notebook(pipeline_dir):
# Construct 4-node pipeline where node 3 (nodebook) produces a failure
node1 = NotebookNode("node1", num_outputs=2)
Expand Down

0 comments on commit 9132c16

Please sign in to comment.