/
policyenforce.py
executable file
·318 lines (276 loc) · 11.2 KB
/
policyenforce.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
#!/usr/bin/env python3
#
# This is a simple policy enforcement script for the PostgreSQL project
# git server.
# It supports a couple of different policies that can be enforced on
# all commits.
#
# Note: the script (not surprisingly) uses the git commands in pipes, so git
# needs to be available in the path.
#
# Copyright (C) 2010-2013 PostgreSQL Global Development Group
# Author: Magnus Hagander <magnus@hagander.net>
#
# Released under the PostgreSQL license
#
#
# The script is configured by having a file named policyenforce.ini in the same
# directory as the script. See the README file for information about the contents.
#
import sys
import os.path
import re
from subprocess import Popen, PIPE
from configparser import ConfigParser
import codecs
#
# Load the global config
#
cfgname = "%s/policyenforce.ini" % os.path.dirname(sys.argv[0])
if not os.path.isfile(cfgname):
raise Exception("Config file '%s' is missing!" % cfgname)
c = ConfigParser()
with codecs.open(cfgname, 'r', encoding='utf8') as f:
c.read_file(f)
# Figure out if we should do debugging
try:
debug = int(c.get('policyenforce', 'debug'))
except Exception as e:
print("Except: %s" % e)
debug = 1
class PolicyObject(object):
def _enforce(self, policyname):
"""
Check if a specific policy should be enforced, returning True/False
"""
try:
enf = int(c.get("policies", policyname))
if enf == 1:
return True
return False
except Exception as e:
return False
def _enforce_str(self, policyname):
"""
Check if a specific policy should be enforced, returning a string
containing the value of the policy, or None if the policy is not
specified or empty.
"""
try:
enf = c.get("policies", policyname).strip()
if enf == "":
return None
return enf
except Exception as e:
return None
class Commit(PolicyObject):
"""
This class wraps a single commit, and the checking of policies on it.
"""
def __init__(self, commitid):
"""
Initialize and load basic information about a commit. Takes the SHA-1
of a commit as parameter (should in theory work with other types of
references as well).
"""
self.commitid = commitid
self.tree = None
self.parent = []
self.author = None
self.committer = None
# Get the basic info about the commit using git cat-file
p = Popen("git cat-file commit %s" % commitid, shell=True, stdout=PIPE)
for l in p.stdout:
l = l.decode('utf8', errors='ignore')
if re.match(r'^(\s+)$', l):
break
elif l.startswith("tree "):
self.tree = l[5:].strip()
elif l.startswith("parent "):
self.parent.append(l[7:].strip())
elif l.startswith("author "):
self.author = self._parse_author(l[7:].strip())
elif l.startswith("committer "):
self.committer = self._parse_author(l[10:].strip())
elif l.startswith("gpgsig "):
pass
else:
raise Exception("Unknown commit info line for commit %s: %s" % (commitid, l))
p.stdout.close()
# Verify that the basic information we retrieved is complete.
if not self.tree:
raise Exception("Commit %s has no tree" % commitid)
if len(self.parent) == 0:
raise Exception("Commit %s has no parent(s)" % commitid)
if not self.author:
raise Exception("Commit %s has no author" % commitid)
if not self.committer:
raise Exception("Commit %s has no committer" % commitid)
def _parse_author(self, authorstring):
"""
Parse an author record from a git object. Expects the format
name <email> number timezone
Returns the "name <email>" part.
"""
m = re.search(r'^([^<]+ <[^>]+>) \d+ [+-]\d{4}$', authorstring)
if not m:
raise Exception("User '%s' on commit %s does not follow format rules." % (authorstring, self.commitid))
return m.group(1)
def _policyfail(self, msg):
"""
Indicate that a commit violated a policy, and abort the program with the
appropriate exitcode.
"""
print("Commit %s violates the policy: %s" % (self.commitid, msg))
sys.exit(1)
def check_policies(self):
"""
For this commit, check all policies that are enabled in the configuration
file.
"""
if self._enforce("nomerge"):
# Merge commits always have more than one parent
if len(self.parent) != 1:
self._policyfail("No merge commits allowed")
if self._enforce("committerequalsauthor"):
if self.author != self.committer:
self._policyfail("Author (%s) must be equal to committer (%s)" % (self.author, self.committer))
if self._enforce("committerlist"):
# Enforce specific committer listed in config file.
self.enforce_user(self.committer, 'Committer')
if self._enforce("authorlist"):
# Enforce specific author is listed in config file (as committer).
self.enforce_user(self.author, 'Author')
if self._enforce("signcommits"):
# Enforce that all commits are signed
e = os.environ.copy()
e['GNUPGHOME'] = c.get('policyenforce', 'gpghome')
p = Popen("git verify-commit %s" % self.commitid, shell=True, stderr=PIPE, env=e)
for l in p.stderr:
if l.startswith('gpg: Good signature from'):
if debug:
print("Signature verified: %s" % l)
break
else:
self._policyfail("Commit is not signed by a trusted key")
def enforce_user(self, user, usertype):
# We do this by splitting the name again, and doing a lookup
# match on that.
m = re.search('^([^<]+) <([^>]+)>', user)
if not m:
raise Exception("%s '%s' for commit %s does not follow format rules." % (usertype, user, self.commitid))
uname = str(m.group(1)).lower()
if not c.has_option('committers', uname):
self._policyfail("%s %s not listed in committers section" % (usertype, uname))
if not c.get('committers', uname) == m.group(2):
self._policyfail("%s %s has wrong email (%s, should be %s)" % (
usertype, uname, m.group(2), c.get('committers', uname)))
class Tag(PolicyObject):
def __init__(self, ref, name):
self.ref = ref
self.name = name
def check_policies(self):
if self._enforce("nolightweighttag"):
# A lightweight tag is a tag object, a "heavy" tag is
# a commit object.
p = Popen("git cat-file -t %s" % self.ref, shell=True, stdout=PIPE)
t = p.stdout.read().strip()
p.stdout.close()
if t == "commit":
self._policyfail("No lightweight tags allowed")
if self._enforce("signtags"):
# Enforce that all tags are signed
e = os.environ.copy()
e['GNUPGHOME'] = c.get('policyenforce', 'gpghome')
p = Popen("git verify-tag %s" % self.ref, shell=True, stderr=PIPE, env=e)
for l in p.stderr:
if l.startswith('gpg: Good signature from'):
if debug:
print("Signature verified: %s" % l)
break
else:
self._policyfail("Tag is not signed by a trusted key")
def _policyfail(self, msg):
"""
Indicate that a tag violated a policy, and abort the program with the
appropriate exitcode.
"""
print("Tag %s violates the policy: %s" % (self.name, msg))
sys.exit(1)
class Branch(PolicyObject):
def __init__(self, ref, name):
self.ref = ref
self.name = name
def check_create(self):
if self._enforce("nobranchcreate"):
self._policyfail("No branch creation allowed")
if self._enforce_str("branchnamefilter"):
# All branch names starts with refs/heads/, so just remove that
# when doing the regexp match
if not re.match(self._enforce_str("branchnamefilter"),
self.name[len("refs/heads/"):]):
self._policyfail("Branch name does not match allowed regexp")
def check_remove(self):
if self._enforce("nobranchdelete"):
self._policyfail("No branch removal allowed")
def _policyfail(self, msg):
"""
Indicate that a branch violated a policy, and abort the program with the
appropriate exitcode.
"""
print("Branch %s violates the policy: %s" % (self.name, msg))
sys.exit(1)
class ForcePush(PolicyObject):
def __init__(self, ref, oldobj, newobj):
self.name = ref
self.old = oldobj
self.new = newobj
def check_force(self):
patterns = self._enforce_str("forcepushbranches")
if not patterns:
# If not configured, then all branches are accepted
return
for p in patterns.split(','):
if re.fullmatch(p, self.name[len("refs/heads/"):]):
return
# With no match on the branch name that means that *if* this is a force-push, we should
# reject it. So figure out if it is.
p = Popen("git merge-base {} {}".format(self.old, self.new), shell=True, stdout=PIPE)
merge = p.stdout.read().decode('utf8', 'ignore').strip()
if merge != self.old:
print("Forced pushes are not allowed on branch {}".format(self.name[len("refs/heads/"):]))
sys.exit(1)
if __name__ == "__main__":
# Get a list of refs on stdin, do something smart with it
ref = sys.argv[1]
oldobj = sys.argv[2]
newobj = sys.argv[3]
if oldobj == "".zfill(40):
# old object being all zeroes means a new branch or tag was created
if ref.startswith("refs/heads/"):
# It's a branch!
Branch(newobj, ref).check_create()
elif ref.startswith("refs/tags/"):
# It's a tag!
Tag(newobj, ref).check_policies()
else:
raise Exception("Unknown branch/tag type %s" % ref)
elif newobj == "".zfill(40):
# new object being all zeroes means a branch was removed
Branch(newobj, ref).check_remove()
else:
# These are both real objects.
# If force push protection is configured, make sure this is not a force-push.
ForcePush(ref, oldobj, newobj).check_force()
# Now use git rev-list to identify exactly which ones they are,
# and apply policies as needed.
p = Popen("git rev-list %s..%s" % (oldobj, newobj), shell=True, stdout=PIPE)
for l in p.stdout:
if debug:
print("Checking commit %s" % l.decode('utf8', errors='ignore').strip())
Commit(l.decode('utf8', errors='ignore').strip()).check_policies()
if debug:
print("Commit ok.")
if debug:
print("In debugging mode, refusing push unconditionally.")
sys.exit(1)