/
utils.py
407 lines (315 loc) · 12.5 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
# -*- coding: utf-8 -*-
"""Utility belt for working with ``pyang`` and ``pyangext``."""
import io
import logging
from os.path import isfile
from warnings import warn
from six import StringIO
from pyang import Context, FileRepository
from pyang.error import err_level, err_to_str, error_codes
from pyang.translators import yang
from pyang.yang_parser import YangParser
from .definitions import PREFIX_SEPARATOR
__all__ = [
'create_context',
'compare_prefixed',
'qualify_str',
'select',
'find',
'dump',
'check',
'parse',
'walk',
]
logging.basicConfig(level=logging.INFO)
logging.captureWarnings(True)
LOGGER = logging.getLogger(__name__)
DEFAULT_OPTIONS = {
'path': [],
'deviations': [],
'features': [],
'format': 'yang',
'keep_comments': True,
'no_path_recurse': False,
'trim_yin': False,
'yang_canonical': False,
'yang_remove_unused_imports': False,
# -- errors
'ignore_error_tags': [],
'ignore_errors': [],
'list_errors': True,
'print_error_code': False,
'errors': [],
'warnings': [code for code, desc in error_codes.items() if desc[0] > 4],
'verbose': True,
}
"""Default options for pyang command line"""
_COPY_OPTIONS = [
'canonical',
'max_line_len',
'max_identifier_len',
'trim_yin',
'lax_xpath_checks',
'strict',
]
"""copy options to pyang context options"""
class objectify(object): # pylint: disable=invalid-name
"""Utility for providing object access syntax (.attr) to dicts"""
def __init__(self, *args, **kwargs):
for entry in args:
self.__dict__.update(entry)
self.__dict__.update(kwargs)
def __getattr__(self, _):
return None
def __setattr__(self, attr, value):
self.__dict__[attr] = value
def _parse_features_string(feature_str):
if feature_str.find(':') == -1:
return (feature_str, [])
[module_name, rest] = feature_str.split(':', 1)
if rest == '':
return (module_name, [])
features = rest.split(',')
return (module_name, features)
def create_context(path='.', *options, **kwargs):
"""Generates a pyang context.
The dict options and keyword arguments are similar to the command
line options for ``pyang``. For ``plugindir`` use env var
``PYANG_PLUGINPATH``. For ``path`` option use the argument with the
same name, or ``PYANG_MODPATH`` env var.
Arguments:
path (str): location of YANG modules.
(Join string with ``os.pathsep`` for multiple locations).
Default is the current working dir.
*options: list of dicts, with options to be passed to context.
See bellow.
**kwargs: similar to ``options`` but have a higher precedence.
See bellow.
Keyword Arguments:
print_error_code (bool): On errors, print the error code instead
of the error message. Default ``False``.
warnings (list): If contains ``error``, treat all warnings
as errors, except any other error code in the list.
If contains ``none``, do not report any warning.
errors (list): Treat each error code container as an error.
ignore_error_tags (list): Ignore error code.
(For a list of error codes see ``pyang --list-errors``).
ignore_errors (bool): Ignore all errors. Default ``False``.
canonical (bool): Validate the module(s) according to the
canonical YANG order. Default ``False``.
yang_canonical (bool): Print YANG statements according to the
canonical order. Default ``False``.
yang_remove_unused_imports (bool): Remove unused import statements
when printing YANG. Default ``False``.
trim_yin (bool): In YIN input modules, trim whitespace
in textual arguments. Default ``False``.
lax_xpath_checks (bool): Lax check of XPath expressions.
Default ``False``.
strict (bool): Force strict YANG compliance. Default ``False``.
max_line_len (int): Maximum line length allowed. Disabled by default.
max_identifier_len (int): Maximum identifier length allowed.
Disabled by default.
features (list): Features to support, default all.
Format ``<modname>:[<feature>,]*``.
keep_comments (bool): Do not discard comments. Default ``True``.
no_path_recurse (bool): Do not recurse into directories
in the yang path. Default ``False``.
Returns:
pyang.Context: Context object for ``pyang`` usage
"""
# deviations (list): Deviation module (NOT CURRENTLY WORKING).
opts = objectify(DEFAULT_OPTIONS, *options, **kwargs)
repo = FileRepository(path, no_path_recurse=opts.no_path_recurse)
ctx = Context(repo)
ctx.opts = opts
for attr in _COPY_OPTIONS:
setattr(ctx, attr, getattr(opts, attr))
# make a map of features to support, per module (taken from pyang bin)
for feature_name in opts.features:
(module_name, features) = _parse_features_string(feature_name)
ctx.features[module_name] = features
# apply deviations (taken from pyang bin)
for file_name in opts.deviations:
with io.open(file_name, "r", encoding="utf-8") as fd:
module = ctx.add_module(file_name, fd.read())
if module is not None:
ctx.deviation_modules.append(module)
return ctx
def qualify_str(arg, prefix_sep=PREFIX_SEPARATOR):
"""Transform prefixed strings in tuple ``(prefix, string)``"""
response = arg if isinstance(arg, tuple) else tuple(arg.split(prefix_sep))
if len(response) == 2:
return response
return ('', response[0])
def compare_prefixed(arg1, arg2,
prefix_sep=PREFIX_SEPARATOR, ignore_prefix=False):
"""Compare 2 arguments : prefixed strings or tuple ``(prefix, string)``
Arguments:
arg1 (str or tuple): first argument
arg2 (str or tuple): first argument
prefix_sep (str): prefix string separator (default: ``':'``)
Returns:
bool
"""
cmp1 = qualify_str(arg1, prefix_sep=prefix_sep)
cmp2 = qualify_str(arg2, prefix_sep=prefix_sep)
if ignore_prefix:
return cmp1[-1:] == cmp2[-1:]
return cmp1 == cmp2
def select(statements, keyword=None, arg=None, ignore_prefix=False):
"""Given a list of statements filter by keyword, or argument or both.
Arguments:
statements (list of pyang.statements.Statement):
list of statements to be filtered.
keyword (str): if specified the statements should have this keyword
arg (str): if specified the statements should have this argument
``keyword`` and ``arg`` can be also used as keyword arguments.
Returns:
list: nodes that matches the conditions
"""
response = []
for item in statements:
if (keyword and keyword != item.keyword and
not compare_prefixed(
keyword, item.raw_keyword, ignore_prefix=ignore_prefix)):
continue
if (arg and arg != item.arg and
not compare_prefixed(
arg, item.arg, ignore_prefix=ignore_prefix)):
continue
response.append(item)
return response
def find(parent, keyword=None, arg=None, ignore_prefix=False):
"""Select all sub-statements by keyword, or argument or both.
See Also:
function :func:`select`
"""
return select(parent.substmts, keyword, arg, ignore_prefix)
def walk(parent, select=lambda x: x, apply=lambda x: x, key='substmts'):
# pylint: disable=redefined-builtin,redefined-outer-name
"""Recursivelly find nodes and/or apply a function to them.
Arguments:
parent (pyang.statements.Statement): root of the subtree were
the search will take place.
select: optional callable that receives a node and returns a bool
(True if the node matches the criteria)
apply: optional callable that are going to be applied to the node
if it matches the criteria
key (str): property where the children nodes are stored,
default is ``substmts``
Returns:
list: results collected from the apply function
"""
results = []
if select(parent):
results.append(apply(parent))
if hasattr(parent, key):
children = getattr(parent, key)
for child in children:
results.extend(walk(child, select, apply, key))
return results
def dump(node, file_obj=None, prev_indent='', indent_string=' ', ctx=None):
"""Generate a string representation of an abstract syntax tree.
Arguments:
node (pyang.statements.Statement): object to be represented
file_obj (file): *file-like* object where the representation
will be dumped. If nothing is passed, the method returns
a string
Keyword Arguments:
prev_indent (str): string to be added to the produced indentation
indent_string (str): string to be used as indentation
ctx (pyang.Context): context object used to generate string
representation. If no context is passed, a dummy object
is used with default configuration
Returns:
str: text content if ``file_obj`` is not specified
"""
# create a buffer to allow string return if no file_obj given
_file_obj = file_obj or StringIO()
# process AST
yang.emit_stmt(
ctx or create_context(), node, _file_obj, 1, None,
prev_indent, indent_string)
# one-liners <3: if no file_obj get buffer content and close it!
return file_obj or (_file_obj.getvalue(), _file_obj.close())[0]
def check(ctx, rescue=False):
"""Check existence of errors or warnings in context.
Code mostly borrowed from ``pyang`` script.
Arguments:
ctx (pyang.Context): pyang context to be checked.
Keyword Arguments:
rescue (bool): if ``True``, no exception/warning will be raised.
Raises:
SyntaxError: if errors detected
Warns:
SyntaxWarning: if warnings detected
Returns:
tuple: (list of errors, list of warnings), if ``rescue`` is ``True``
"""
errors = []
warnings = []
opts = ctx.opts
if opts.ignore_errors:
return (errors, warnings)
for (epos, etag, eargs) in ctx.errors:
if (hasattr(opts, 'ignore_error_tags') and
etag in opts.ignore_error_tags):
continue
if not ctx.implicit_errors and hasattr(epos.top, 'i_modulename'):
# this module was added implicitly (by import); skip this error
# the code includes submodules
continue
elevel = err_level(etag) # elevel 4 -> warning
explain = err_to_str(etag, eargs)
reason = etag if opts.print_error_code else explain
if 'unexpected keyword "description"' in reason:
# TODO: WTF pyang bug??
elevel = 4
message = '({}) {}'.format(str(epos), reason)
if (elevel >= 4 or etag in opts.warnings) and etag not in opts.errors:
if 'error' in opts.warnings and etag not in opts.warnings:
pass
elif 'none' in opts.warnings:
continue
else:
warnings.append(message)
continue
errors.append(message)
if rescue:
return (errors, warnings)
if warnings:
for message in warnings:
warn(message, SyntaxWarning)
if errors:
raise SyntaxError('\n'.join(errors))
return (errors, warnings)
def parse(text, ctx=None):
"""Parse a YANG statement into an Abstract Syntax subtree.
Arguments:
text (str): file name for a YANG module or text
ctx (optional pyang.Context): context used to validate text
Returns:
pyang.statements.Statement: Abstract syntax subtree
Note:
The ``parse`` function can be used to parse small amounts of text.
If yout plan to parse an entire YANG (sub)module, please use instead::
ast = ctx.add_module(module_name, text_contet)
It is also well known that ``parse`` function cannot solve
YANG deviations yet.
"""
parser = YangParser()
filename = 'parser-input'
ctx_ = ctx or create_context()
if isfile(text):
filename = text
with open(filename, 'r') as fp:
text = fp.read()
# ensure reported errors are just from parsing
old_errors = ctx_.errors
ctx_.errors = []
ast = parser.parse(ctx_, filename, text)
# look for errors and warnings
check(ctx_)
# restore other errors
ctx_.errors = old_errors
return ast