/
processor_local.py
275 lines (216 loc) · 10.7 KB
/
processor_local.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
#
# Copyright 2018-2021 Elyra Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import os
import papermill
import time
from abc import ABC, abstractmethod
from elyra.pipeline import PipelineProcessor, PipelineProcessorResponse, Operation
from elyra.util.path import get_absolute_path
from jupyter_server.gateway.managers import GatewayClient
from subprocess import run, CalledProcessError, PIPE
from traitlets import log
from typing import Dict, List
class LocalPipelineProcessor(PipelineProcessor):
    """
    Executes a pipeline on the local host.

    The scope of this runtime is limited to automating the execution of the
    pipeline's files as if each were run manually.  Any additional support is
    out of scope: a notebook that does not succeed under `run all` fails the
    same way here, runtime-specific capabilities or relationships are not
    supported, and any node properties other than the file to execute are
    ignored.
    Note: Execution happens in-place and a ledger of runs will be available at $TMPFILE/elyra/pipeline-name-<timestamp>
    """
    _operation_processor_registry: Dict
    _type = 'local'

    def __init__(self, root_dir, **kwargs):
        super().__init__(root_dir, **kwargs)
        # One processor per supported operation classifier, keyed by name.
        processors = (NotebookOperationProcessor(self.root_dir),
                      PythonScriptOperationProcessor(self.root_dir),
                      RScriptOperationProcessor(self.root_dir))
        self._operation_processor_registry = {p.operation_name: p for p in processors}

    @property
    def type(self):
        return self._type

    def process(self, pipeline):
        """
        Process a pipeline locally.

        Operations are ordered topologically from the pipeline's dependency
        graph, then each is delegated to the processor registered for its
        classifier (e.g. papermill for notebooks).
        """
        self.log_pipeline_info(pipeline.name, "processing pipeline")
        t0_all = time.time()

        # Topological order guarantees each operation's dependencies ran first.
        ordered_operations = PipelineProcessor._sort_operations(pipeline.operations)
        for operation in ordered_operations:
            try:
                t0 = time.time()
                processor = self._operation_processor_registry[operation.classifier]
                processor.process(operation)
                self.log_pipeline_info(pipeline.name, f"completed {operation.filename}",
                                       operation_name=operation.name,
                                       duration=(time.time() - t0))
            except Exception as ex:
                # Any failure aborts the run; wrap it with the operation's name.
                raise RuntimeError(f'Error processing operation {operation.name} {str(ex)}') from ex

        self.log_pipeline_info(pipeline.name, "pipeline processed", duration=(time.time() - t0_all))
        return LocalPipelineProcessorResponse()

    def export(self, pipeline, pipeline_export_format, pipeline_export_path, overwrite):
        # Local pipelines have no exportable representation.
        raise NotImplementedError('Local pipelines does not support export functionality')
class LocalPipelineProcessorResponse(PipelineProcessorResponse):
    """Response produced by a successful local pipeline run.

    Local execution has no run URL or object-storage location, so the
    base-class fields are filled with empty strings.
    """
    _type = 'local'

    def __init__(self):
        # No run URL, object-storage URL, or path exists for local runs.
        super().__init__('', '', '')

    @property
    def type(self):
        return self._type
class OperationProcessor(ABC):
    """Abstract base for executors of a single pipeline operation."""

    _operation_name: str = None

    def __init__(self):
        self.log = log.get_logger()

    @property
    def operation_name(self) -> str:
        """Classifier name this processor handles (set by subclasses)."""
        return self._operation_name

    @abstractmethod
    def process(self, operation: Operation):
        raise NotImplementedError

    @staticmethod
    def _collect_envs(operation: Operation) -> Dict:
        """Build the environment for the operation's kernel/subprocess.

        Starts from this process's own environment, marks the run as local
        via ELYRA_RUNTIME_ENV, then layers on the operation's declared
        environment variables (which therefore take precedence).
        """
        envs = dict(os.environ)  # Make sure this process's env is "available" in the kernel subprocess
        envs['ELYRA_RUNTIME_ENV'] = "local"  # Special case
        envs.update(operation.env_vars_as_dict())
        return envs
class FileOperationProcessor(OperationProcessor):
    """Base class for processors that execute a single file (notebook or
    script) resolved against the server's root directory.
    """

    # Maximum length of an error message surfaced to the client before truncation.
    MAX_ERROR_LEN: int = 80

    def __init__(self, root_dir: str):
        super().__init__()
        self._root_dir = root_dir

    @abstractmethod
    def process(self, operation: Operation):
        raise NotImplementedError

    def get_valid_filepath(self, op_filename: str) -> str:
        """Resolve op_filename against the root directory and validate it.

        :raises FileNotFoundError: if the resolved path does not exist
        :raises ValueError: if the resolved path is not a regular file
        """
        filepath = get_absolute_path(self._root_dir, op_filename)
        if not os.path.exists(filepath):
            raise FileNotFoundError(f'Could not find {filepath}')
        if not os.path.isfile(filepath):
            raise ValueError(f'Not a file: {filepath}')
        return filepath

    def log_and_raise(self, file_name: str, ex: Exception) -> None:
        """Log and raise the exception that occurs when processing file_name.
        If the exception's message is longer than MAX_ERROR_LEN, it will be
        truncated with an ellipsis (...) when raised. The complete message
        will be logged.
        """
        self.log.error(f'Error executing {file_name}: {str(ex)}')
        truncated_msg = FileOperationProcessor._truncate_msg(str(ex))
        raise RuntimeError(f'({file_name}): {truncated_msg}') from ex

    @staticmethod
    def _truncate_msg(msg: str) -> str:
        """Truncate msg so the result stays close to MAX_ERROR_LEN.

        Messages no longer than MAX_ERROR_LEN are returned unchanged. Longer
        messages are cut at the last space before MAX_ERROR_LEN and an
        ellipsis (...) is appended so the cut doesn't land in the middle of
        a word; as a result the returned string can be up to MAX_ERROR_LEN+2
        characters long.
        """
        # Fix: was 'len(msg) < MAX_ERROR_LEN', which needlessly truncated a
        # message of exactly MAX_ERROR_LEN characters (and could even make
        # it longer by appending '...').
        if len(msg) <= FileOperationProcessor.MAX_ERROR_LEN:
            return msg
        # Locate the last whitespace before the limit and truncate there.
        last_space = msg.rfind(' ', 0, FileOperationProcessor.MAX_ERROR_LEN)
        if last_space >= 0:
            return msg[:last_space] + "..."
        # No space found: hard-cut at the limit.
        return msg[:FileOperationProcessor.MAX_ERROR_LEN]
class NotebookOperationProcessor(FileOperationProcessor):
    """Executes a notebook operation in-place using papermill."""

    _operation_name = 'execute-notebook-node'

    def process(self, operation: Operation):
        filepath = self.get_valid_filepath(operation.filename)
        file_dir = os.path.dirname(filepath)
        file_name = os.path.basename(filepath)

        self.log.debug(f'Processing notebook: {filepath}')

        # The ElyraEngine is essentially the default papermill engine, but it
        # also forwards environment variables to the kernel process via
        # 'kernel_env'.  The working directory is set both for papermill
        # itself ('cwd') and for the kernel process ('kernel_cwd'); the
        # latter matters when Enterprise Gateway is configured, in which case
        # 'kernel_manager_class' is also pointed at GatewayKernelManager so
        # notebooks run as they would outside of Elyra.
        exec_kwargs = {
            'engine_name': "ElyraEngine",
            'cwd': file_dir,  # For local operations, papermill runs from this dir
            'kernel_cwd': file_dir,
            'kernel_env': OperationProcessor._collect_envs(operation),
        }
        if GatewayClient.instance().gateway_enabled:
            exec_kwargs['kernel_manager_class'] = 'jupyter_server.gateway.managers.GatewayKernelManager'

        t0 = time.time()
        try:
            # Execute in-place: the output notebook overwrites the input.
            papermill.execute_notebook(filepath, filepath, **exec_kwargs)
        except papermill.PapermillExecutionError as pmee:
            self.log.error(f'Error executing {file_name} in cell {pmee.exec_count}: ' +
                           f'{str(pmee.ename)} {str(pmee.evalue)}')
            raise RuntimeError(f'({file_name}) in cell {pmee.exec_count}: ' +
                               f'{str(pmee.ename)} {str(pmee.evalue)}') from pmee
        except Exception as ex:
            self.log_and_raise(file_name, ex)

        duration = time.time() - t0
        self.log.debug(f'Execution of {file_name} took {duration:.3f} secs.')
class ScriptOperationProcessor(FileOperationProcessor):
    """Executes a script operation by launching an interpreter subprocess."""

    _script_type: str = None

    def get_argv(self, filepath) -> List[str]:
        """Return the command line used to run *filepath*; subclass hook."""
        raise NotImplementedError

    def process(self, operation: Operation):
        filepath = self.get_valid_filepath(operation.filename)
        file_dir = os.path.dirname(filepath)
        file_name = os.path.basename(filepath)

        self.log.debug(f'Processing {self._script_type} script: {filepath}')

        argv = self.get_argv(filepath)
        envs = OperationProcessor._collect_envs(operation)

        t0 = time.time()
        try:
            # check=True raises CalledProcessError on a non-zero exit status;
            # stderr is captured so it can be surfaced in the error message.
            run(argv, cwd=file_dir, env=envs, check=True, stderr=PIPE)
        except CalledProcessError as cpe:
            error_msg = str(cpe.stderr.decode())
            self.log.error(f'Error executing {file_name}: {error_msg}')

            # Surface only the tail of stderr: everything after the last
            # newline preceding the last occurrence of 'Error'.
            error_trim_index = error_msg.rfind('\n', 0, error_msg.rfind('Error'))
            if error_trim_index == -1:
                raise RuntimeError(f'({file_name})') from cpe
            raise RuntimeError(f'({file_name}): {error_msg[error_trim_index:].strip()}') from cpe
        except Exception as ex:
            self.log_and_raise(file_name, ex)

        duration = time.time() - t0
        self.log.debug(f'Execution of {file_name} took {duration:.3f} secs.')
class PythonScriptOperationProcessor(ScriptOperationProcessor):
    """Runs a Python script via the 'python3' interpreter."""

    _operation_name = 'execute-python-node'
    _script_type = 'Python'

    def get_argv(self, file_path) -> List[str]:
        """Build the 'python3' command line for *file_path*."""
        # NOTE(review): '--PYTHONHOME <dir>' is passed as arguments to the
        # script itself, not as an environment setting — presumably consumed
        # by the script; preserved as-is. TODO confirm intent.
        script_dir = os.path.dirname(file_path)
        return ['python3', file_path, '--PYTHONHOME', script_dir]
class RScriptOperationProcessor(ScriptOperationProcessor):
    """Runs an R script via the 'Rscript' interpreter."""

    _operation_name = 'execute-r-node'
    _script_type = 'R'

    def get_argv(self, file_path) -> List[str]:
        """Build the 'Rscript' command line for *file_path*."""
        argv = ['Rscript', file_path]
        return argv