forked from pantsbuild/pants
-
Notifications
You must be signed in to change notification settings - Fork 0
/
rules.py
148 lines (121 loc) · 4.87 KB
/
rules.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
# Copyright 2021 Pants project contributors (see CONTRIBUTORS.md).
# Licensed under the Apache License, Version 2.0 (see LICENSE).
from dataclasses import dataclass
from typing import Tuple
from pants.backend.python.lint.autoflake.skip_field import SkipAutoflakeField
from pants.backend.python.lint.autoflake.subsystem import Autoflake
from pants.backend.python.target_types import InterpreterConstraintsField, PythonSourceField
from pants.backend.python.util_rules import pex
from pants.backend.python.util_rules.pex import PexRequest, VenvPex, VenvPexProcess
from pants.core.goals.fmt import FmtRequest, FmtResult
from pants.core.goals.lint import LintResult, LintResults, LintTargetsRequest
from pants.core.util_rules.source_files import SourceFiles, SourceFilesRequest
from pants.engine.fs import Digest
from pants.engine.process import FallibleProcessResult, Process, ProcessResult
from pants.engine.rules import Get, MultiGet, collect_rules, rule
from pants.engine.target import FieldSet, Target
from pants.engine.unions import UnionRule
from pants.util.logging import LogLevel
from pants.util.strutil import pluralize
@dataclass(frozen=True)
class AutoflakeFieldSet(FieldSet):
    """Field set describing the target fields the Autoflake rules read."""

    # Only the Python source field is listed as required.
    required_fields = (PythonSourceField,)

    source: PythonSourceField
    interpreter_constraints: InterpreterConstraintsField

    @classmethod
    def opt_out(cls, tgt: Target) -> bool:
        """Return the value of the target's ``SkipAutoflakeField``."""
        skip_field = tgt.get(SkipAutoflakeField)
        return skip_field.value
class AutoflakeRequest(FmtRequest, LintTargetsRequest):
    """Request type shared by the Autoflake format and lint rules."""

    # Field set used to select and describe the targets in this request.
    field_set_type = AutoflakeFieldSet
    # Reported as the formatter/linter name in results.
    name = "autoflake"
@dataclass(frozen=True)
class SetupRequest:
    """Input to ``setup_autoflake``: the request plus the mode to run in."""

    # The Autoflake request whose field sets will be processed.
    request: AutoflakeRequest
    # True selects the `--check` flag; False selects `--in-place`.
    check_only: bool
@dataclass(frozen=True)
class Setup:
    """Output of ``setup_autoflake``: a ready-to-run process and its baseline."""

    # The fully configured Autoflake process.
    process: Process
    # Digest of the files given to the process as input.
    original_digest: Digest
def generate_argv(
    source_files: SourceFiles, autoflake: Autoflake, *, check_only: bool
) -> Tuple[str, ...]:
    """Assemble the command-line arguments for one Autoflake invocation.

    The result is, in order: the mode flag (`--check` when `check_only` is
    true, otherwise `--in-place`), `--remove-all-unused-imports`, every entry
    of `autoflake.args`, and finally every path in `source_files.files`.
    """
    mode_flag = "--check" if check_only else "--in-place"
    return (
        mode_flag,
        "--remove-all-unused-imports",
        *autoflake.args,
        *source_files.files,
    )
@rule(level=LogLevel.DEBUG)
async def setup_autoflake(setup_request: SetupRequest, autoflake: Autoflake) -> Setup:
    """Prepare the Autoflake process for the field sets in `setup_request`.

    Requests the Autoflake `VenvPex` and the `SourceFiles` of the field sets in
    one `MultiGet`, then builds the `Process` that runs Autoflake over them.
    When the request carries a `prior_formatter_result`, that object supplies
    the input digest and output file list instead of the source files'
    snapshot. The returned `Setup` records the digest that was used as input.
    """
    request = setup_request.request

    pex_get = Get(
        VenvPex,
        PexRequest(
            output_filename="autoflake.pex",
            internal_only=True,
            requirements=autoflake.pex_requirements(),
            interpreter_constraints=autoflake.interpreter_constraints,
            main=autoflake.main,
        ),
    )
    sources_get = Get(
        SourceFiles,
        SourceFilesRequest(field_set.source for field_set in request.field_sets),
    )
    source_files, autoflake_pex = await MultiGet(sources_get, pex_get)

    # Prefer the output of an earlier formatter over the files as found on disk.
    prior_result = request.prior_formatter_result
    if prior_result is not None:
        input_snapshot = prior_result
    else:
        input_snapshot = source_files.snapshot

    # Note: the file arguments come from `source_files`, not `input_snapshot`.
    argv = generate_argv(source_files, autoflake, check_only=setup_request.check_only)
    description = f"Run Autoflake on {pluralize(len(request.field_sets), 'file')}."

    process = await Get(
        Process,
        VenvPexProcess(
            autoflake_pex,
            argv=argv,
            input_digest=input_snapshot.digest,
            output_files=input_snapshot.files,
            description=description,
            level=LogLevel.DEBUG,
        ),
    )
    return Setup(process, original_digest=input_snapshot.digest)
@rule(desc="Format with Autoflake", level=LogLevel.DEBUG)
async def autoflake_fmt(request: AutoflakeRequest, autoflake: Autoflake) -> FmtResult:
    """Run Autoflake with `check_only=False` and wrap the outcome in a `FmtResult`.

    Returns a skipped result without running anything when `autoflake.skip` is
    set. The process is requested as a `ProcessResult`.
    """
    if autoflake.skip:
        return FmtResult.skip(formatter_name=request.name)

    fmt_setup = await Get(Setup, SetupRequest(request, check_only=False))
    process_result = await Get(ProcessResult, Process, fmt_setup.process)

    return FmtResult.from_process_result(
        process_result,
        original_digest=fmt_setup.original_digest,
        formatter_name=request.name,
        strip_chroot_path=True,
    )
@rule(desc="Lint with autoflake", level=LogLevel.DEBUG)
async def autoflake_lint(request: AutoflakeRequest, autoflake: Autoflake) -> LintResults:
    """Run Autoflake with `check_only=True` and report the outcome as `LintResults`.

    Returns empty results without running anything when `autoflake.skip` is
    set. The process is requested as a `FallibleProcessResult`, and its exit
    code is passed through to the `LintResult`.
    """
    if autoflake.skip:
        return LintResults([], linter_name=request.name)

    lint_setup = await Get(Setup, SetupRequest(request, check_only=True))
    process_result = await Get(FallibleProcessResult, Process, lint_setup.process)

    # Drop the "No issues detected!" lines so only other output is reported.
    stdout_lines = process_result.stdout.decode().splitlines()
    reported_stdout = "\n".join(
        line for line in stdout_lines if line != "No issues detected!"
    )
    reported_stderr = process_result.stderr.decode()

    lint_result = LintResult(process_result.exit_code, reported_stdout, reported_stderr)
    return LintResults([lint_result], linter_name=request.name)
def rules():
    """Return this module's rules, its union registrations, and the PEX rules.

    The list holds, in order: everything from `collect_rules()`, the
    registration of `AutoflakeRequest` under `FmtRequest` and under
    `LintTargetsRequest`, and everything from `pex.rules()`.
    """
    registered = list(collect_rules())
    registered.append(UnionRule(FmtRequest, AutoflakeRequest))
    registered.append(UnionRule(LintTargetsRequest, AutoflakeRequest))
    registered.extend(pex.rules())
    return registered