forked from pantsbuild/pants
/
unpack_remote_sources_base.py
159 lines (128 loc) · 6.15 KB
/
unpack_remote_sources_base.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
# coding=utf-8
# Copyright 2018 Pants project contributors (see CONTRIBUTORS.md).
# Licensed under the Apache License, Version 2.0 (see LICENSE).
from __future__ import absolute_import, division, print_function, unicode_literals
import logging
import os
import re
from abc import abstractmethod
from future.utils import text_type
from twitter.common.dirutil.fileset import fnmatch_translate_extended
from pants.base.build_environment import get_buildroot
from pants.base.exceptions import TaskError
from pants.task.task import Task
from pants.util.meta import AbstractClass, classproperty
from pants.util.objects import datatype
logger = logging.getLogger(__name__)
class UnpackedArchives(datatype([('found_files', tuple), ('rel_unpack_dir', text_type)])):
    """Value object describing the result of unpacking one remote archive.

    `found_files` is normalized to a tuple of relative paths and
    `rel_unpack_dir` to text, so instances hash/compare consistently.
    """

    def __new__(cls, found_files, rel_unpack_dir):
        # Coerce eagerly so the stored fields always have the declared types.
        files = tuple(found_files)
        unpack_dir = text_type(rel_unpack_dir)
        return super(UnpackedArchives, cls).__new__(cls, files, unpack_dir)
class UnpackRemoteSourcesBase(Task, AbstractClass):
    """Base for tasks that unpack remotely-fetched archives into per-target dirs.

    Subclasses supply `source_target_constraint` (which targets to process) and
    `unpack_target()` (how to unpack one of them). This base drives cache
    invalidation, walks each unpack directory, and registers an
    `UnpackedArchives` entry in the product map for every processed target.
    """

    @property
    def cache_target_dirs(self):
        # NOTE: was `cls` — this is an instance property, so the receiver is `self`.
        # The per-target results dir is the unpack destination, so cache it.
        return True

    @classmethod
    def product_types(cls):
        return [UnpackedArchives]

    @classproperty
    def source_target_constraint(cls):
        """Return a type constraint which is evaluated to determine "source" targets for this task.

        :return: :class:`pants.util.objects.TypeConstraint`
        """
        raise NotImplementedError()

    @abstractmethod
    def unpack_target(self, unpackable_target, unpack_dir):
        """Unpack the remote resources indicated by `unpackable_target` into `unpack_dir`."""
        # NOTE: `self` was missing from the original signature even though this is
        # invoked as an instance method (see `execute()`), so it has been added.

    @property
    def _unpacked_sources_product(self):
        # Lazily-initialized product dict mapping target -> UnpackedArchives.
        return self.context.products.get_data(UnpackedArchives, lambda: {})

    @classmethod
    def _file_filter(cls, filename, include_patterns, exclude_patterns):
        """:returns: `True` if the file should be allowed through the filter."""
        logger.debug('filename: {}'.format(filename))
        # Excludes take precedence over includes.
        if any(p.match(filename) for p in exclude_patterns):
            return False
        # An empty include list admits everything that was not excluded.
        if include_patterns and not any(p.match(filename) for p in include_patterns):
            return False
        return True

    class InvalidPatternError(Exception):
        """Raised if a pattern can't be compiled for including or excluding args"""

    @classmethod
    def compile_patterns(cls, patterns, field_name="Unknown", spec="Unknown"):
        """Compile fnmatch-style patterns into regex objects.

        :param patterns: Iterable of fnmatch-style pattern strings.
        :param str field_name: Name of the originating field, used in error messages.
        :param str spec: Address spec of the originating target, used in error messages.
        :raises InvalidPatternError: If any pattern fails to translate or compile.
        :returns: List of compiled regex pattern objects.
        """
        compiled_patterns = []
        for p in patterns:
            try:
                compiled_patterns.append(re.compile(fnmatch_translate_extended(p)))
            except (TypeError, re.error) as e:
                raise cls.InvalidPatternError(
                    'In {spec}, "{field_value}" in {field_name} can\'t be compiled: {msg}'
                    .format(field_name=field_name, field_value=p, spec=spec, msg=e))
        return compiled_patterns

    @classmethod
    def _calculate_unpack_filter(cls, includes=None, excludes=None, spec=None):
        """Take regex patterns and return a filter function.

        :param list includes: List of include patterns to pass to _file_filter.
        :param list excludes: List of exclude patterns to pass to _file_filter.
        :param str spec: Address spec used only for error reporting.
        """
        include_patterns = cls.compile_patterns(includes or [],
                                                field_name='include_patterns',
                                                spec=spec)
        logger.debug('include_patterns: {}'
                     .format(list(p.pattern for p in include_patterns)))
        exclude_patterns = cls.compile_patterns(excludes or [],
                                                field_name='exclude_patterns',
                                                spec=spec)
        logger.debug('exclude_patterns: {}'
                     .format(list(p.pattern for p in exclude_patterns)))
        return lambda f: cls._file_filter(f, include_patterns, exclude_patterns)

    @classmethod
    def get_unpack_filter(cls, unpackable_target):
        """Calculate a filter function from the include/exclude patterns of a Target.

        :param ImportRemoteSourcesMixin unpackable_target: A target with include_patterns and
                                                           exclude_patterns attributes.
        """
        # TODO: we may be able to make use of glob matching in the engine to avoid doing this filtering.
        return cls._calculate_unpack_filter(includes=unpackable_target.payload.include_patterns,
                                            excludes=unpackable_target.payload.exclude_patterns,
                                            spec=unpackable_target.address.spec)

    class DuplicateUnpackedSourcesError(TaskError):
        """Raised when a second unpack dir is registered for the same target."""

    def _traverse_unpacked_dir(self, unpack_dir):
        """Walk `unpack_dir`, returning (files relative to it, dir relative to buildroot)."""
        found_files = []
        # The directory list from os.walk is unused; only files are collected.
        for root, _, files in os.walk(unpack_dir):
            for f in files:
                relpath = os.path.relpath(os.path.join(root, f), unpack_dir)
                found_files.append(relpath)
        rel_unpack_dir = os.path.relpath(unpack_dir, get_buildroot())
        return found_files, rel_unpack_dir

    def _add_unpacked_sources_for_target(self, target, unpack_dir):
        """Record `unpack_dir`'s contents as the UnpackedArchives product for `target`.

        :raises DuplicateUnpackedSourcesError: If `target` already has sources registered.
        """
        maybe_existing_sources = self._unpacked_sources_product.get(target, None)
        if maybe_existing_sources:
            raise self.DuplicateUnpackedSourcesError(
                "Target {} must not have any unpacked sources already registered!\n"
                "The existing value was: {}\n"
                "The second unpacked directory registered was: {}"
                .format(target, maybe_existing_sources, unpack_dir))
        found_files, rel_unpack_dir = self._traverse_unpacked_dir(unpack_dir)
        self.context.log.debug('target: {}, rel_unpack_dir: {}, found_files: {}'
                               .format(target, rel_unpack_dir, found_files))
        self._unpacked_sources_product[target] = UnpackedArchives(found_files, rel_unpack_dir)

    class MissingUnpackedDirsError(Exception):
        """Raised if a directory that is expected to be unpacked doesn't exist."""

    def execute(self):
        # Unpack only invalid (changed) targets, then register products for all
        # targets so cached results are exposed too.
        with self.invalidated(self.get_targets(self.source_target_constraint.satisfied_by),
                              fingerprint_strategy=self.get_fingerprint_strategy(),
                              invalidate_dependents=True) as invalidation_check:
            for vt in invalidation_check.invalid_vts:
                self.unpack_target(vt.target, vt.results_dir)
            for vt in invalidation_check.all_vts:
                self._add_unpacked_sources_for_target(vt.target, vt.results_dir)