-
Notifications
You must be signed in to change notification settings - Fork 1.3k
/
Collectors.py
316 lines (269 loc) · 12.7 KB
/
Collectors.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
import functools
import os
import pkg_resources
import itertools
from pyprint.NullPrinter import NullPrinter
from coalib.bears.BEAR_KIND import BEAR_KIND
from coalib.collecting.Importers import iimport_objects
from coala_utils.decorators import yield_once
from coalib.output.printers.LOG_LEVEL import LOG_LEVEL
from coalib.parsing.Globbing import fnmatch, iglob, glob_escape
from coalib.output.printers.LogPrinter import LogPrinter
def _get_kind(bear_class):
try:
return bear_class.kind()
except NotImplementedError:
return None
def _import_bears(file_path, kinds):
# recursive imports:
for bear_list in iimport_objects(file_path,
names='__additional_bears__',
types=list):
for bear_class in bear_list:
if _get_kind(bear_class) in kinds:
yield bear_class
# normal import
for bear_class in iimport_objects(file_path,
attributes='kind',
local=True):
if _get_kind(bear_class) in kinds:
yield bear_class
@yield_once
def icollect(file_paths, ignored_globs=None, match_cache={}):
"""
Evaluate globs in file paths and return all matching files.
:param file_paths: File path or list of such that can include globs
:param ignored_globs: List of globs to ignore when matching files
:param match_cache: Dictionary to use for caching results
:return: Iterator that yields tuple of path of a matching
file, the glob where it was found
"""
if isinstance(file_paths, str):
file_paths = [file_paths]
for file_path in file_paths:
if file_path not in match_cache:
match_cache[file_path] = list(iglob(file_path))
for match in match_cache[file_path]:
if not ignored_globs or not fnmatch(match, ignored_globs):
yield match, file_path
def collect_files(file_paths, log_printer, ignored_file_paths=None,
limit_file_paths=None, section_name=''):
"""
Evaluate globs in file paths and return all matching files
:param file_paths: File path or list of such that can include globs
:param ignored_file_paths: List of globs that match to-be-ignored files
:param limit_file_paths: List of globs that the files are limited to
:param section_name: Name of currently executing section
:return: List of paths of all matching files
"""
limit_fnmatch = (functools.partial(fnmatch, globs=limit_file_paths)
if limit_file_paths else lambda fname: True)
valid_files = list(filter(lambda fname: os.path.isfile(fname[0]),
icollect(file_paths, ignored_file_paths)))
# Find globs that gave no files and warn the user
if valid_files:
collected_files, file_globs_with_files = zip(*valid_files)
else:
collected_files, file_globs_with_files = [], []
_warn_if_unused_glob(log_printer, file_paths, file_globs_with_files,
'No files matching \'{}\' were found. '
'If this rule is not required, you can remove it '
'from section [' + section_name + '] in your '
'.coafile to deactivate this warning.')
limited_files = list(filter(limit_fnmatch, collected_files))
return limited_files
def collect_dirs(dir_paths, ignored_dir_paths=None):
"""
Evaluate globs in directory paths and return all matching directories
:param dir_paths: File path or list of such that can include globs
:param ignored_dir_paths: List of globs that match to-be-ignored dirs
:return: List of paths of all matching directories
"""
valid_dirs = list(filter(lambda fname: os.path.isdir(fname[0]),
icollect(dir_paths, ignored_dir_paths)))
if valid_dirs:
collected_dirs, _ = zip(*valid_dirs)
return list(collected_dirs)
else:
return []
@yield_once
def icollect_bears(bear_dir_glob, bear_globs, kinds, log_printer):
"""
Collect all bears from bear directories that have a matching kind.
:param bear_dir_glob: Directory globs or list of such that can contain bears
:param bear_globs: Globs of bears to collect
:param kinds: List of bear kinds to be collected
:param log_printer: Log_printer to handle logging
:return: Iterator that yields a tuple with bear class and
which bear_glob was used to find that bear class.
"""
for bear_dir, dir_glob in filter(lambda x: os.path.isdir(x[0]),
icollect(bear_dir_glob)):
# Since we get a real directory here and since we
# pass this later to iglob, we need to escape this.
bear_dir = glob_escape(bear_dir)
for bear_glob in bear_globs:
for matching_file in iglob(
os.path.join(bear_dir, bear_glob + '.py')):
try:
for bear in _import_bears(matching_file, kinds):
yield bear, bear_glob
except pkg_resources.VersionConflict as exception:
log_printer.log_exception(
('Unable to collect bears from {file} because there '
'is a conflict with the version of a dependency '
'you have installed. This may be resolved by '
'creating a separate virtual environment for coala '
'or running `pip install \"{pkg}\"`. Be aware that '
'the latter solution might break other python '
'packages that depend on the currently installed '
'version.').format(file=matching_file,
pkg=exception.req),
exception, log_level=LOG_LEVEL.WARNING)
except BaseException as exception:
log_printer.log_exception(
'Unable to collect bears from {file}. Probably the '
'file is malformed or the module code raises an '
'exception.'.format(file=matching_file),
exception,
log_level=LOG_LEVEL.WARNING)
def collect_bears(bear_dirs, bear_globs, kinds, log_printer,
warn_if_unused_glob=True):
"""
Collect all bears from bear directories that have a matching kind
matching the given globs.
:param bear_dirs: Directory name or list of such that can contain
bears.
:param bear_globs: Globs of bears to collect.
:param kinds: List of bear kinds to be collected.
:param log_printer: log_printer to handle logging.
:param warn_if_unused_glob: True if warning message should be shown if a
glob didn't give any bears.
:return: Tuple of list of matching bear classes based on
kind. The lists are in the same order as kinds.
"""
bears_found = tuple([] for i in range(len(kinds)))
bear_globs_with_bears = set()
for bear, glob in icollect_bears(bear_dirs, bear_globs, kinds, log_printer):
index = kinds.index(_get_kind(bear))
bears_found[index].append(bear)
bear_globs_with_bears.add(glob)
if warn_if_unused_glob:
_warn_if_unused_glob(log_printer, bear_globs, bear_globs_with_bears,
'No bears matching \'{}\' were found. Make sure '
'you have coala-bears installed or you have typed '
'the name correctly.')
return bears_found
def filter_section_bears_by_languages(bears, languages):
"""
Filters the bears by languages.
:param bears: The dictionary of the sections as keys and list of
bears as values.
:param languages: Languages that bears are being filtered on.
:return: New dictionary with filtered out bears that don't match
any language from languages.
"""
new_bears = {}
# All bears with "all" languages supported shall be shown
languages = set(language.lower() for language in languages) | {'all'}
for section in bears.keys():
new_bears[section] = tuple(
bear for bear in bears[section]
if {language.lower() for language in bear.LANGUAGES} & languages)
return new_bears
def filter_capabilities_by_languages(bears, languages):
"""
Filters the bears capabilities by languages.
:param bears: Dictionary with sections as keys and list of bears as
values.
:param languages: Languages that bears are being filtered on.
:return: New dictionary with languages as keys and their bears
capabilities as values. The capabilities are stored in a
tuple of two elements where the first one represents
what the bears can detect, and the second one what they
can fix.
"""
languages = set(language.lower() for language in languages)
language_bears_capabilities = {language: (
set(), set()) for language in languages}
for section_bears in bears.values():
for bear in section_bears:
bear_language = (
({language.lower() for language in bear.LANGUAGES} | {'all'}) &
languages)
language = bear_language.pop() if bear_language else ''
capabilities = (language_bears_capabilities[language]
if language else tuple())
language_bears_capabilities.update(
{language: (capabilities[0] | bear.can_detect,
capabilities[1] | bear.CAN_FIX)}
if language else {})
return language_bears_capabilities
def get_all_bears():
"""
Get a ``list`` of all available bears.
"""
from coalib.settings.Section import Section
printer = LogPrinter(NullPrinter())
local_bears, global_bears = collect_bears(
Section('').bear_dirs(),
['**'],
[BEAR_KIND.LOCAL, BEAR_KIND.GLOBAL],
printer,
warn_if_unused_glob=False)
return list(itertools.chain(local_bears, global_bears))
def get_all_bears_names():
"""
Get a ``list`` of names of all available bears.
"""
return [bear.name for bear in get_all_bears()]
def collect_all_bears_from_sections(sections, log_printer):
"""
Collect all kinds of bears from bear directories given in the sections.
:param sections: List of sections so bear_dirs are taken into account
:param log_printer: Log_printer to handle logging
:return: Tuple of dictionaries of local and global bears.
The dictionary key is section class and
dictionary value is a list of Bear classes
"""
local_bears = {}
global_bears = {}
for section in sections:
bear_dirs = sections[section].bear_dirs()
local_bears[section], global_bears[section] = collect_bears(
bear_dirs,
['**'],
[BEAR_KIND.LOCAL, BEAR_KIND.GLOBAL],
log_printer,
warn_if_unused_glob=False)
return local_bears, global_bears
def _warn_if_unused_glob(log_printer, globs, used_globs, message):
"""
Warn if a glob has not been used.
:param log_printer: The log_printer to handle logging.
:param globs: List of globs that were expected to be used.
:param used_globs: List of globs that were actually used.
:param message: Warning message to display if a glob is unused.
The glob which was unused will be added using
.format()
"""
unused_globs = set(globs) - set(used_globs)
for glob in unused_globs:
log_printer.warn(message.format(glob))
def collect_registered_bears_dirs(entrypoint):
"""
Searches setuptools for the entrypoint and returns the bear
directories given by the module.
:param entrypoint: The entrypoint to find packages with.
:return: List of bear directories.
"""
collected_dirs = []
for ep in pkg_resources.iter_entry_points(entrypoint):
registered_package = None
try:
registered_package = ep.load()
except pkg_resources.DistributionNotFound:
continue
collected_dirs.append(os.path.abspath(
os.path.dirname(registered_package.__file__)))
return collected_dirs