-
Notifications
You must be signed in to change notification settings - Fork 260
/
linter.py
242 lines (199 loc) · 8.62 KB
/
linter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
# Copyright (C) 2016 Adrien Vergé
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import re
import yaml
from yamllint import parser
# Two-way lookup table: severity number -> level name, followed by the
# reverse mapping level name -> severity number (None stands for "no level").
PROBLEM_LEVELS = dict(enumerate((None, 'warning', 'error')))
PROBLEM_LEVELS.update(
    {name: number for number, name in PROBLEM_LEVELS.items()})
class LintProblem(object):
    """Represents a linting problem found by yamllint.

    Two problems are equal when they share the same line, column and rule
    (the description is not compared). Problems are ordered by position:
    line first, then column.
    """

    def __init__(self, line, column, desc='<no description>', rule=None):
        #: Line on which the problem was found (starting at 1)
        self.line = line
        #: Column on which the problem was found (starting at 1)
        self.column = column
        #: Human-readable description of the problem
        self.desc = desc
        #: Identifier of the rule that detected the problem
        self.rule = rule
        #: Severity of the problem; not set by the constructor
        self.level = None

    @property
    def message(self):
        """Description of the problem, suffixed with the rule ID if any."""
        if self.rule is not None:
            return '{} ({})'.format(self.desc, self.rule)
        return self.desc

    def __eq__(self, other):
        # Objects lacking the compared attributes (None, strings, ...) are
        # simply "not comparable" instead of raising AttributeError.
        try:
            return (self.line == other.line and
                    self.column == other.column and
                    self.rule == other.rule)
        except AttributeError:
            return NotImplemented

    def __lt__(self, other):
        # Same fallback as __eq__: let Python handle foreign operand types.
        try:
            return (self.line < other.line or
                    (self.line == other.line and
                     self.column < other.column))
        except AttributeError:
            return NotImplemented

    def __repr__(self):
        return '%d:%d: %s' % (self.line, self.column, self.message)
def get_cosmetic_problems(buffer, conf, filepath):
    """Yield the problems reported by the rules enabled for ``filepath``.

    The buffer is walked element by element (tokens, comments, lines) and
    each element is handed to the rules of the matching TYPE. Every problem
    is tagged with the ID and configured level of the rule that found it.

    Problems are held back until the end of the current line so that
    ``# yamllint disable``, ``# yamllint enable`` and
    ``# yamllint disable-line`` comments can filter them out.

    :param buffer: YAML source to inspect
    :param conf: yamllint configuration object
    :param filepath: path passed to ``conf.enabled_rules()``
    """
    enabled_rules = conf.enabled_rules(filepath)

    # Group the rules by the kind of element they inspect
    rules_for_tokens = [r for r in enabled_rules if r.TYPE == 'token']
    rules_for_comments = [r for r in enabled_rules if r.TYPE == 'comment']
    rules_for_lines = [r for r in enabled_rules if r.TYPE == 'line']

    # One dict per token rule, passed back to that rule on every check
    context = {rule.ID: {} for rule in rules_for_tokens}

    class DisableDirective:
        def __init__(self):
            self.rules = set()
            self.all_rules = {r.ID for r in enabled_rules}

        def process_comment(self, comment):
            try:
                text = str(comment)
            except UnicodeError:
                return  # this certainly wasn't a yamllint directive comment

            if re.match(r'^# yamllint disable( rule:\S+)*\s*$', text):
                # Skip the 18 characters of '# yamllint disable'; what is
                # left is a list of ' rule:<id>' items (the first split item
                # is the empty string before the leading space).
                words = text[18:].rstrip().split(' ')
                rule_ids = [word[5:] for word in words[1:]]
                if not rule_ids:
                    self.rules = self.all_rules.copy()
                else:
                    self.rules.update(rule_id for rule_id in rule_ids
                                      if rule_id in self.all_rules)

            elif re.match(r'^# yamllint enable( rule:\S+)*\s*$', text):
                # Same parsing, after the 17 characters of '# yamllint enable'
                words = text[17:].rstrip().split(' ')
                rule_ids = [word[5:] for word in words[1:]]
                if not rule_ids:
                    self.rules.clear()
                else:
                    self.rules.difference_update(rule_ids)

        def is_disabled_by_directive(self, problem):
            return problem.rule in self.rules

    class DisableLineDirective(DisableDirective):
        def process_comment(self, comment):
            try:
                text = str(comment)
            except UnicodeError:
                return  # this certainly wasn't a yamllint directive comment

            if re.match(r'^# yamllint disable-line( rule:\S+)*\s*$', text):
                # Skip the 23 characters of '# yamllint disable-line'
                words = text[23:].rstrip().split(' ')
                rule_ids = [word[5:] for word in words[1:]]
                if not rule_ids:
                    self.rules = self.all_rules.copy()
                else:
                    self.rules.update(rule_id for rule_id in rule_ids
                                      if rule_id in self.all_rules)

    def checked(rule, *args):
        # Run one rule and tag each problem with the rule's ID and level
        rule_conf = conf.rules[rule.ID]
        for problem in rule.check(rule_conf, *args):
            problem.rule = rule.ID
            problem.level = rule_conf['level']
            yield problem

    # Problems are buffered and only released when the end of the line is
    # reached, so that directives found on the line can still disable them.
    pending = []
    disabled = DisableDirective()
    disabled_for_line = DisableLineDirective()
    disabled_for_next_line = DisableLineDirective()

    for elem in parser.token_or_comment_or_line_generator(buffer):
        if isinstance(elem, parser.Token):
            for rule in rules_for_tokens:
                pending.extend(checked(rule,
                                       elem.curr, elem.prev, elem.next,
                                       elem.nextnext,
                                       context[rule.ID]))

        elif isinstance(elem, parser.Comment):
            for rule in rules_for_comments:
                pending.extend(checked(rule, elem))

            disabled.process_comment(elem)
            # An inline comment applies to its own line, a standalone comment
            # to the line that follows it.
            if elem.is_inline():
                disabled_for_line.process_comment(elem)
            else:
                disabled_for_next_line.process_comment(elem)

        elif isinstance(elem, parser.Line):
            for rule in rules_for_lines:
                pending.extend(checked(rule, elem))

            # The Line element closes the current line: release what was
            # buffered, minus whatever the directives disabled.
            for problem in pending:
                if disabled_for_line.is_disabled_by_directive(problem):
                    continue
                if disabled.is_disabled_by_directive(problem):
                    continue
                yield problem

            disabled_for_line = disabled_for_next_line
            disabled_for_next_line = DisableLineDirective()
            pending = []
def get_syntax_error(buffer):
    """Return the YAML syntax error found in ``buffer``, or None.

    The buffer is run through ``yaml.parse()``; if that raises a
    ``MarkedYAMLError``, it is converted to a LintProblem of level 'error'.
    The mark's line and column are shifted by one to match the positions
    used by LintProblem (which start at 1).

    :param buffer: YAML source to parse
    """
    try:
        # Only the possible exception matters: consume the events and
        # discard them.
        for _ in yaml.parse(buffer, Loader=yaml.BaseLoader):
            pass
    except yaml.error.MarkedYAMLError as exc:
        line_no = exc.problem_mark.line + 1
        column_no = exc.problem_mark.column + 1
        description = 'syntax error: ' + exc.problem + ' (syntax)'
        syntax_problem = LintProblem(line_no, column_no, description)
        syntax_problem.level = 'error'
        return syntax_problem
    return None
def _run(buffer, conf, filepath):
    """Yield every problem found in ``buffer``, syntax error included.

    Nothing is yielded when the first line of the buffer is a
    ``# yamllint disable-file`` comment. Otherwise the cosmetic problems are
    yielded as they are produced, and the syntax error (if any) is inserted
    just before the first cosmetic problem located at or after its position.
    A cosmetic problem located exactly on the syntax error is dropped.

    :param buffer: YAML source (an indexable buffer, not a stream)
    :param conf: yamllint configuration object
    :param filepath: path forwarded to ``get_cosmetic_problems()``
    """
    assert hasattr(buffer, '__getitem__'), \
        '_run() argument must be a buffer, not a stream'

    first_line = next(parser.line_generator(buffer)).content
    if re.match(r'^#\s*yamllint disable-file\s*$', first_line):
        return

    # If the document contains a syntax error, save it and yield it at the
    # right line
    syntax_error = get_syntax_error(buffer)

    for problem in get_cosmetic_problems(buffer, conf, filepath):
        if syntax_error is not None:
            # Compare positions as (line, column) pairs. Testing line and
            # column independently would keep the syntax error waiting
            # whenever a later line has a problem at a smaller column.
            error_position = (syntax_error.line, syntax_error.column)
            problem_position = (problem.line, problem.column)

            if error_position <= problem_position:
                yield syntax_error
                syntax_error = None

                # If there is already a yamllint error at the same place,
                # discard it as it is probably redundant (and maybe it's
                # just a 'warning', in which case the script won't even exit
                # with a failure status).
                if error_position == problem_position:
                    continue

        yield problem

    # No cosmetic problem was located at or after the syntax error
    if syntax_error is not None:
        yield syntax_error
def run(input, conf, filepath=None):
    """Lints a YAML source.

    Returns a generator of LintProblem objects, or an empty tuple when the
    configuration reports ``filepath`` as ignored.

    :param input: buffer, string or stream to read from
    :param conf: yamllint configuration object
    :param filepath: path handed to the configuration (ignore check and
                     rule selection); optional
    :raises TypeError: if ``input`` is neither a string/bytes buffer nor an
                       object with a ``read`` attribute
    """
    if conf.is_file_ignored(filepath):
        return ()

    if isinstance(input, (bytes, str)):
        content = input
    elif hasattr(input, 'read'):  # Python 2's file or Python 3's io.IOBase
        # We need to have everything in memory to parse correctly
        content = input.read()
    else:
        raise TypeError('input should be a string or a stream')

    return _run(content, conf, filepath)