-
Notifications
You must be signed in to change notification settings - Fork 4
/
reviewers.py
executable file
·321 lines (280 loc) · 11.1 KB
/
reviewers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
#!/usr/bin/env python3
import argparse
from collections import Counter
import json
import os
import pathlib
import subprocess
import sys
import typing # NOQA
from typing import List, Tuple
if sys.version_info < (3, 0): # NOQA pragma: no cover
raise SystemError("Must be using Python 3")
__version__ = '0.13.5'
STRIP_DOMAIN_USERNAMES = ['uber.com']
REVIEWERS_LIMIT = 7
class FindReviewers():
def __init__(self, config): # type: (Config) -> None
self.config = config
def get_reviewers(self): # type: () -> typing.Counter[str]
"""
All review classes should implement this and return a list of strings
representing reviewers
"""
raise NotImplementedError()
def run_command(self, command: List[str]) -> List[str]:
""" Wrapper for running external subprocesses """
process = subprocess.run(command, stdout=subprocess.PIPE)
data = process.stdout.decode("utf-8").strip()
if data:
return data.split('\n')
return []
def extract_username_from_email(self, email: str) -> str:
""" Given an email, extract the username for that email """
domain = email[email.find('@')+1:]
if domain in STRIP_DOMAIN_USERNAMES:
return email[:email.find('@')]
return email
def check_phabricator_activated(
self, username: str
) -> subprocess.Popen[bytes]:
""" Check whether a phabricator user has been activated by """
phab_command = ['arc', 'call-conduit', 'user.search']
request = '{"constraints": {"usernames": ["%s"]}}' % username
process = subprocess.Popen(
phab_command,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE)
process.communicate(input=request.encode("utf-8"))
return process
def parse_phabricator(self, username, process):
# type: (str, subprocess.Popen[bytes]) -> str
stdout, stderr = process.communicate()
if process.returncode != 0:
print("stdout: %s" % stdout.decode("utf-8"))
print("stderr: %s" % stderr.decode("utf-8"))
raise RuntimeError("Arc not able to call conduit")
output_str = stdout.decode("utf-8").strip()
phab_output = json.loads(output_str)
data = phab_output['response']['data']
if not data:
return username
roles = data[0]['fields']['roles']
if 'disabled' in roles:
return ''
return username
def filter_phabricator_activated(self, all_users: List[str]) -> List[str]:
limited_users = all_users[:REVIEWERS_LIMIT]
username_processes = [
(x, self.check_phabricator_activated(x)) for x in limited_users
]
usernames = [self.parse_phabricator(*x) for x in username_processes]
usernames = [x for x in usernames if x]
if len(usernames) < REVIEWERS_LIMIT:
for username in all_users[REVIEWERS_LIMIT:]:
check_proc = self.check_phabricator_activated(username)
username = self.parse_phabricator(username, check_proc)
if username:
usernames.append(username)
if len(usernames) >= REVIEWERS_LIMIT:
break
return usernames
class FindFileLogReviewers(FindReviewers):
def extract_username_from_shortlog(self, shortlog: str) -> Tuple[str, int]:
""" Given a line from a git shortlog, extract the username """
shortlog = shortlog.strip()
email = shortlog[shortlog.rfind("<")+1:]
email = email[:email.find(">")]
username = self.extract_username_from_email(email)
count = int(shortlog.split("\t")[0])
return username, count
def get_log_reviewers_from_file(self, file_paths):
# type: (List[str]) -> typing.Counter[str]
""" Find the reviewers based on the git log for a file """
git_shortlog_command = ['git', 'shortlog', '-sne']
if file_paths:
git_shortlog_command += ['--'] + file_paths
git_shortlog = self.run_command(git_shortlog_command)
users = dict(
self.extract_username_from_shortlog(shortlog)
for shortlog
in git_shortlog
)
users = {
reviewer: count for (reviewer, count)
in users.items() if reviewer
}
return Counter(users)
def get_changed_files(self) -> List[str]:
raise NotImplementedError()
def get_reviewers(self): # type: () -> typing.Counter[str]
""" Find the reviewers based on the git log of the diffed files """
changed_files = self.get_changed_files()
reviewers = self.get_log_reviewers_from_file(changed_files)
return reviewers
class FindLogReviewers(FindFileLogReviewers):
def get_changed_files(self) -> List[str]:
""" Find the changed files between current status and master """
branch = self.config.base_branch
git_diff_files_command = ['git', 'diff', branch, '--name-only']
git_diff_files = self.run_command(git_diff_files_command)
return git_diff_files
class FindHistoricalReviewers(FindFileLogReviewers):
def get_reviewers(self): # type: () -> typing.Counter[str]
reviewers = self.get_log_reviewers_from_file([])
return reviewers
class FindArcCommitReviewers(FindLogReviewers):
"""
Get reviewers based on arc commit messages, which list which users
have approved past diffs
"""
def get_log_reviewers_from_file(self, file_paths):
# type: (List[str]) -> typing.Counter[str]
command = ['git', 'log', '--all', '--'] + file_paths
git_commit_messages = self.run_command(command)
reviewers_identifier = 'Reviewed By: '
reviewers = Counter() # type: typing.Counter[str]
for line in git_commit_messages:
if reviewers_identifier not in line:
continue
line = line.replace(reviewers_identifier, '')
line_reviewers = line.split(', ')
line_reviewers = [r.strip() for r in line_reviewers]
reviewers.update(line_reviewers)
return reviewers
def show_reviewers(reviewer_list, copy_clipboard):
# type: (List[str], bool) -> None
""" Output the reviewers to stdout and optionally to OS clipboard """
reviewer_string = ", ".join(reviewer_list)
print(reviewer_string)
if not copy_clipboard:
return
try:
p = subprocess.Popen(
['pbcopy', 'w'],
stdin=subprocess.PIPE, close_fds=True
)
p.communicate(input=reviewer_string.encode('utf-8'))
except FileNotFoundError:
pass
def get_reviewers(config): # type: (Config) -> List[str]
""" Main function to get reviewers for a repository """
phabricator = False
finders = [
FindLogReviewers,
FindHistoricalReviewers,
FindArcCommitReviewers
]
reviewers = Counter() # type: typing.Counter[str]
for finder in finders:
finder_reviewers = finder(config).get_reviewers()
if config.verbose:
print(
"Reviewers from %s: %s" %
(finder.__name__, dict(finder_reviewers))
)
reviewers.update(finder_reviewers)
if finder == FindArcCommitReviewers and finder_reviewers:
phabricator = True
most_common = [x[0] for x in reviewers.most_common()]
most_common = [x for x in most_common if x not in config.ignores]
if phabricator:
most_common = FindArcCommitReviewers(config) \
.filter_phabricator_activated(most_common)
reviewers_list = most_common[:REVIEWERS_LIMIT]
return reviewers_list
class Config():
DEFAULT_GLOBAL_JSON = ".git/reviewers"
VERBOSE_DEFAULT = None
IGNORES_DEFAULT = ''
JSON_DEFAULT = ''
COPY_DEFAULT = None
BASE_BRANCH_DEFAULT = 'master'
def __init__(self) -> None:
self.verbose = False
self.ignores: List[str] = []
self.json = ''
self.copy = False
self.base_branch = 'master'
@staticmethod
def default_global_json():
# type: () -> str
"""
Return the path to the default config file for the current user
"""
home_dir = str(pathlib.Path.home())
json_path = os.path.join(home_dir, Config.DEFAULT_GLOBAL_JSON)
return json_path
def read_configs(self, args):
# type: (argparse.Namespace) -> None
""" Read config data """
self.read_from_json(Config.default_global_json())
self.read_from_json(args.json)
self.read_from_args(args)
def read_from_json(self, args_json):
# type: (str) -> None
""" Read configs from the json config file """
self.json = args_json
try:
with open(self.json, 'r') as config_handle:
config_data = config_handle.read()
config = json.loads(config_data)
except (FileNotFoundError, json.decoder.JSONDecodeError):
return
if type(config) is not dict:
return
self.verbose = config.get('verbose', self.verbose)
self.copy = config.get('copy', self.copy)
self.ignores += config.get('ignore', self.ignores)
self.base_branch = config.get('base_branch', self.base_branch)
def read_from_args(self, args):
# type: (argparse.Namespace) -> None
""" Parse configs by joining config file against argparse """
if args.verbose != Config.VERBOSE_DEFAULT:
self.verbose = args.verbose
if args.copy != Config.VERBOSE_DEFAULT:
self.copy = args.copy
if args.ignore != Config.IGNORES_DEFAULT:
self.ignores += args.ignore.split(',')
if args.base_branch != Config.BASE_BRANCH_DEFAULT:
self.base_branch = args.base_branch
def main() -> None:
""" Main entrypoint function to receive CLI arguments """
description = "Suggest reviewers for your diff.\n"
description += "https://github.com/albertyw/git-reviewers"
parser = argparse.ArgumentParser(description=description)
parser.add_argument(
'-v', '--version', action='version', version=__version__,
)
parser.add_argument(
'--verbose',
default=Config.VERBOSE_DEFAULT, action='store_true',
help='verbose mode',
)
parser.add_argument(
'-i', '--ignore',
default=Config.IGNORES_DEFAULT,
help='ignore a list of reviewers (comma separated)',
)
parser.add_argument(
'-j', '--json',
default=Config.JSON_DEFAULT,
help='json file to read configs from, overridden by CLI flags',
)
parser.add_argument(
'-c', '--copy',
default=Config.COPY_DEFAULT, action='store_true',
help='Copy the list of reviewers to clipboard, if available',
)
parser.add_argument(
'-b', '--base-branch',
default=Config.BASE_BRANCH_DEFAULT,
help='Compare against a base branch (default: master)',
)
args = parser.parse_args()
config = Config()
config.read_configs(args)
reviewers_list = get_reviewers(config)
show_reviewers(reviewers_list, config.copy)
if __name__ == "__main__":
main()