This repository has been archived by the owner on Oct 31, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 14
/
pkgfile.py
executable file
·427 lines (381 loc) · 16.8 KB
/
pkgfile.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
#!/usr/bin/python3
###
# pkgfile.py -- search the arch repo to see what package owns a file
# This program is a part of pkgtools
#
# Copyright (C) 2010 solsTiCe d'Hiver <solstice.dhiver@gmail.com>
# Copyright (C) 2008-2011 Daenyth <Daenyth+Arch _AT_ gmail _DOT_ com>
#
# Pkgtools is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# Pkgtools is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
##
import re
import glob
import os
import sys
import optparse
import subprocess
import urllib.request, urllib.error, urllib.parse
import tarfile
import time
import pkgfile
# Version string; main() reports it through the option parser's --version.
VERSION = '22'
# Directory holding the system-wide configuration files read by load_config().
CONFIG_DIR = '/etc/pkgtools'
# Default directory for the cached <repo>.files.tar.gz lists; used as the
# default `filelist_dir` of update_repo(), list_files() and query_pkg().
FILELIST_DIR = '/var/cache/pkgtools/lists'
def find_dbpath():
    '''Return pacman's database path as reported by ``pacman -Tv --debug``.

    pacman's stdout and stderr are merged and scanned for the first line
    starting with ``DB Path``; everything after the first colon on that line
    is returned with surrounding whitespace removed.

    Raises RuntimeError when no usable ``DB Path`` line is found.
    '''
    p = subprocess.Popen(['pacman', '-Tv', '--debug'],
                         stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    # Replace undecodable bytes instead of crashing on localized output.
    output = p.communicate()[0].decode('utf-8', 'replace')
    for line in output.splitlines():
        if not line.startswith('DB Path'):
            continue
        # Split on the first colon only, so a path that itself contains ':'
        # is returned whole.
        _, sep, value = line.partition(':')
        if sep:
            return value.strip()
    raise RuntimeError("Unable to determine pacman DB path")
def parse_config(filename, options=None, comment_char='#', option_char='='):
    '''Parse a simple ``key=value`` config file into a dict.

    filename     -- path of the file to read
    options      -- optional dict to fill in (and return); a new dict is
                    created when omitted
    comment_char -- everything from the first occurrence of this string to
                    the end of a line is ignored
    option_char  -- separator between the option name and its value

    Values have surrounding whitespace and quotes removed and are stored as
    int when they parse as one, otherwise as str. Lines without
    ``option_char`` are skipped. A file that cannot be opened or read is
    silently ignored, so the caller gets back ``options`` unchanged.
    '''
    # Borrowed at http://www.decalage.info/en/python/configparser
    # another option is http://mail.python.org/pipermail/python-dev/2002-November/029987.html
    if options is None:
        options = {}
    try:
        with open(filename) as f:
            for line in f:
                # Drop the comment part, if any.
                line = line.partition(comment_char)[0]
                option, sep, value = line.partition(option_char)
                if not sep:
                    continue
                option = option.strip()
                # Strip whitespace first so a tab before a trailing comment
                # does not shield the closing quote from being removed.
                value = value.strip().strip('"\' ')
                try:
                    options[option] = int(value)
                except ValueError:
                    options[option] = value
    except IOError:
        # Missing or unreadable config files are not an error.
        pass
    return options
def load_config(conf_file, options=None):
    '''Load ``conf_file`` from CONFIG_DIR, then overlay the per-user copy.

    conf_file -- bare file name of the config file (e.g. 'pkgfile.conf')
    options   -- optional dict of pre-set values to start from

    The per-user file is looked up in ``$XDG_CONFIG_HOME/pkgtools``; when
    XDG_CONFIG_HOME is unset or empty, ``~/.config`` is used instead, as the
    XDG Base Directory specification prescribes. Values from the per-user
    file override the system-wide ones. Returns the resulting dict.
    '''
    options = parse_config(os.path.join(CONFIG_DIR, conf_file), options=options)
    # An empty variable must be treated like an unset one; otherwise the
    # path below would be resolved relative to the current directory.
    xdg_config_home = os.getenv('XDG_CONFIG_HOME') or \
        os.path.join(os.path.expanduser('~'), '.config')
    xdg_conf_file = os.path.join(xdg_config_home, 'pkgtools', conf_file)
    if os.path.exists(xdg_conf_file):
        options.update(parse_config(xdg_conf_file))
    return options
def die(n=-1, msg='Unknown error'):
    '''Write ``msg`` to stderr and terminate the program with exit code ``n``.'''
    # TODO: All calls to die() should probably just be exceptions
    print(msg, file=sys.stderr)
    raise SystemExit(n)
def print_pkg(pkg):
    '''Pretty print a pkg dict, mimicking ``pacman -Qi`` output.

    pkg -- mapping of attribute name to value. Only the attributes listed in
           PKG_ATTRS are printed, in that order; attributes missing from
           ``pkg`` are skipped and attributes set to None are shown as ``--``.

    Every line has the form ``<Label padded to a common width>: <value>``.
    A blank line is printed after the last attribute.
    '''
    PKG_ATTRS = ('name', 'version', 'url', 'license', 'groups', 'provides',
                 'depends', 'optdepends', 'conflicts', 'replaces',
                 'isize', 'packager', 'arch', 'installdate', 'builddate', 'desc')
    LIST_ATTRS = ('groups', 'license', 'replaces', 'depends', 'conflicts', 'provides')
    WIDTH = max(len(i) for i in PKG_ATTRS) + 1
    # all attributes are not printed
    for attr in PKG_ATTRS:
        field = attr.capitalize().ljust(WIDTH)
        try:
            value = pkg[attr]
        except KeyError:
            continue
        if value is None:
            print('%s: --' % field)
        elif attr in ('csize', 'isize'):
            # Sizes are stored in bytes and shown in whole kilobytes.
            print('%s: %d k' % (field, value/1024))
        elif attr in LIST_ATTRS:
            print('%s: %s' % (field, ' '.join(value)))
        elif attr == 'optdepends':
            # One optional dependency per line, aligned under the first value.
            print('%s: %s' % (field, ('\n'+(WIDTH+2)*' ').join(value)))
        elif attr == 'builddate':
            try:
                stamp = time.strftime('%a, %d %b %Y %H:%M:%S',
                                      time.localtime(value))
            except (ValueError, OverflowError, OSError):
                # localtime() rejects out-of-range timestamps with any of
                # these; keep the label aligned with the other lines.
                print('%s: error !' % field)
            else:
                print('%s: %s' % (field, stamp))
        elif attr == 'backup':
            # Not reachable while 'backup' is absent from PKG_ATTRS; kept so
            # that adding it there works as intended.
            s = field + ':'
            if value:
                for i in value:
                    s += '\n'+': '.join(i.split('\t')) +'\n'
            else:
                s += ' --'
            print(s)
        else:
            print('%s: %s' % (field, value))
    print()
def get_mirrorlist():
    """Return a list of (reponame, mirror_url) for all mirrors known to pacman.

    Runs ``pacman -T --debug`` and collects, in output order, every line of
    the form ``... adding new server URL to database '<repo>': <url>``.
    The same repo appears once per configured mirror.
    """
    p = subprocess.Popen(['pacman', '-T', '--debug'],
                         stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    # Replace undecodable bytes: one stray non-UTF-8 byte in pacman's output
    # must not abort the whole mirror discovery.
    output = p.communicate()[0].decode('utf-8', 'replace')
    server = re.compile(r'.*adding new server URL to database \'(.*)\': (.*)')
    found = (server.match(line) for line in output.splitlines())
    return [(m.group(1), m.group(2)) for m in found if m]
def _remote_is_newer(last_modified, local_mtime):
    '''Tell whether a Last-Modified header value is newer than ``local_mtime``.'''
    import email.utils
    if last_modified is None:
        return True
    # parsedate_tz/mktime_tz are locale-independent and honour the header's
    # time zone (GMT), unlike strptime() + mktime() which treat it as local.
    parsed = email.utils.parsedate_tz(last_modified)
    if parsed is None:
        # Unparseable header: download rather than crash or skip.
        return True
    try:
        return email.utils.mktime_tz(parsed) > local_mtime
    except (OverflowError, ValueError):
        return True


def _fetch_filelist(options, fileslist, dbfile):
    '''Download ``fileslist`` to ``dbfile`` if it is newer than the local copy or forced.'''
    try:
        local_mtime = os.path.getmtime(dbfile)
    except OSError:
        local_mtime = 0  # fake a very old date if dbfile doesn't exist
    # The context manager closes the connection on every exit path.
    with urllib.request.urlopen(fileslist, timeout=30) as conn:
        newer = _remote_is_newer(conn.headers['last-modified'], local_mtime)
        if not (newer or options.update > 1):
            print(' No update available')
            return
        if options.verbose:
            print(' Downloading %s ...' % fileslist)
        # Download next to the target and rename afterwards, so a failed
        # transfer never leaves a truncated list with a fresh mtime behind.
        partfile = dbfile + '.part'
        try:
            with open(partfile, 'wb') as f:
                f.write(conn.read())
            os.replace(partfile, dbfile)
        finally:
            if os.path.exists(partfile):
                os.unlink(partfile)


def update_repo(options, target_repos=None, filelist_dir=FILELIST_DIR):
    '''Download .files.tar.gz for each repo found in pacman config or the one specified.

    options      -- parsed command-line options; ``verbose`` and ``update``
                    are read (``update`` > 1 forces a download)
    target_repos -- name of the single repo to update, or None for all
    filelist_dir -- directory holding the cached ``<repo>.files.tar.gz``

    For every repo the mirrors are tried in the order pacman lists them until
    one succeeds. Cached lists whose repo is not known to pacman are deleted.
    Exits through die() when ``filelist_dir`` cannot be created or accessed.
    '''
    if not os.path.exists(filelist_dir):
        print(' Warning: %s does not exist. Creating it.' % filelist_dir, file=sys.stderr)
        try:
            # makedirs: the parent directory may be missing as well.
            os.makedirs(filelist_dir, 0o755)
        except OSError:
            # TODO: raise UpdateFailedError("Failed to create filelist dir: e")...
            die(1, 'Error: Can\'t create %s directory' % filelist_dir)

    # Check here first since we squash IOError later, and it goes into a long
    # loop of repeated errors.
    if not os.access(filelist_dir, os.F_OK|os.R_OK|os.W_OK|os.X_OK):
        die(1, 'Error: %s is not accessible' % filelist_dir)

    mirror_list = get_mirrorlist()
    repo_done = set()
    for repo, mirror in mirror_list:
        if target_repos is not None and repo != target_repos:
            continue
        if repo in repo_done:
            # An earlier mirror already served this repo.
            continue
        print(':: Checking [%s] for files list ...' % repo)
        repofile = '%s.files.tar.gz' % repo
        fileslist = mirror + '/' + repofile
        dbfile = os.path.join(filelist_dir, repofile)
        try:
            if options.verbose:
                print(' Trying mirror %s ...' % mirror)
            _fetch_filelist(options, fileslist, dbfile)
            repo_done.add(repo)
        except IOError as e:
            # Fall through to the next mirror of this repo.
            if options.verbose:
                print(' Warning: could not retrieve %s' % fileslist, file=sys.stderr)
                print(" " + str(e), file=sys.stderr)
            else:
                print(' Warning: could not retrieve file list.', file=sys.stderr)

    # remove left-over db (for example for repo removed from pacman config)
    # XXX: This should probably be in some type of behavior like pacman -Scc (pkgfile -c[c]?)
    registered_repos = set(os.path.join(filelist_dir, r[0]+'.files.tar.gz') for r in mirror_list)
    for r in glob.glob(os.path.join(filelist_dir, '*.files.tar.gz')):
        if r not in registered_repos:
            print(':: Deleting %s' % r)
            os.unlink(r)
# A path component named "bin" or "sbin" followed by at least one character.
_BIN_DIR_RE = re.compile(r'(?:^|/)s?bin/.')


def is_binary(path):
    """Tell whether ``path`` lies inside a bin/ or sbin/ directory (used by -b).

    ``path`` may be str or bytes; bytes are decoded as UTF-8 with undecodable
    bytes dropped. Returns True when some component of the path is exactly
    ``bin`` or ``sbin`` and something follows it, False otherwise.
    """
    if isinstance(path, bytes):
        path = path.decode('utf-8', errors='ignore')
    return _BIN_DIR_RE.search(path) is not None
def list_files(pkgname, options, filelist_dir=FILELIST_DIR):
    '''List the files of every package matching ``pkgname``.

    pkgname      -- package name or pattern, optionally as ``repo/name``
                    (the repo part then overrides ``options.repo``)
    options      -- parsed command-line options; ``repo``, ``glob``,
                    ``regex`` and ``binaries`` are read
    filelist_dir -- directory holding the cached ``<repo>.files.tar.gz``

    Prints one ``<package> /<path>`` line per file (only bin/sbin files with
    ``options.binaries``) and a "not found" message when no package matched.
    Exits through die() on an unknown repo or an invalid pattern.
    '''
    target_repo = options.repo
    if '/' in pkgname:
        res = pkgname.split('/')
        if len(res) > 2:
            # XXX: This behavior seems to duplicate the -R switch. Maybe we
            # should pick one and forbid the other. Probably this is the
            # behavior that should be removed
            print('If given foo/bar, assume "bar" package in "foo" repo', file=sys.stderr)
            return
        target_repo, pkg = res
    else:
        pkg = pkgname

    if target_repo:
        tmp = os.path.join(filelist_dir, '%s.files.tar.gz' % target_repo)
        if not os.path.exists(tmp):
            die(1, 'Error: %s repo does not exist' % target_repo)
        repo_list = [tmp]
    else:
        repo_list = glob.glob(os.path.join(filelist_dir, '*.files.tar.gz'))

    if options.glob:
        match_type = pkgfile.MATCH_SHELL
    elif options.regex:
        match_type = pkgfile.MATCH_REGEX
    else:
        match_type = pkgfile.MATCH_SIMPLE
    try:
        search = pkgfile.Search(match_type, pkgfile.SEARCH_PACKAGE, pkg)
    except pkgfile.RegexError:
        die(1, 'Error: invalid pattern or regular expression')

    found_pkg = False
    for dbfile in repo_list:
        # Matches are dicts, which Python 3 cannot order; sort them by name.
        for match in sorted(search(dbfile), key=lambda m: m['name']):
            # A matching package counts as found even when -b filters out
            # all of its files.
            found_pkg = True
            for file_ in sorted(match['files']):
                # Drop undecodable bytes, as query_pkg() does, instead of
                # aborting on one odd file name.
                filename = file_.decode('utf-8', 'ignore')
                if options.binaries and not is_binary(filename):
                    continue
                print('%s /%s' % (match['name'], filename))

    if not found_pkg:
        print('Package "%s" not found' % pkg, end='')
        if target_repo:
            print(' in [%s] repo ' % target_repo, end=' ')
        print()
def query_pkg(filename, options, filelist_dir=FILELIST_DIR):
'''search package with a file matching filename

Builds a pkgfile.Search from the glob/regex flags in `options`, runs it
against the cached `<repo>.files.tar.gz` lists in `filelist_dir` (only the
one named by `options.repo` when that is set) and prints the matches.
The `binaries`, `info` and `verbose` attributes of `options` select what is
printed. Exits through die() on an invalid pattern or an unknown repo.

NOTE(review): this copy of the file has lost its indentation, so the nesting
of several statements below cannot be read off the text -- confirm against
the upstream source before restructuring.
'''
try:
# Default: match against the file name only; glob and regex patterns are
# matched against the path instead.
search_type = pkgfile.SEARCH_FILENAME
if options.glob:
match_type = pkgfile.MATCH_SHELL
search_type = pkgfile.SEARCH_PATH
elif options.regex:
match_type = pkgfile.MATCH_REGEX
search_type = pkgfile.SEARCH_PATH
else:
match_type = pkgfile.MATCH_SIMPLE
# A leading '/' switches to a path search and is stripped from the pattern.
# NOTE(review): not visible here whether this applies to every mode or only
# to the simple-match branch above -- TODO confirm.
if filename.startswith('/'):
search_type = pkgfile.SEARCH_PATH
filename = filename.lstrip('/')
search = pkgfile.Search(match_type, search_type, filename)
except pkgfile.RegexError:
die(1, 'Error: invalid pattern or regular expression')
# Restrict the search to one repo's list when options.repo is set, otherwise
# use every cached list found in filelist_dir.
target_repos = options.repo
if target_repos:
tmp = os.path.join(filelist_dir, '%s.files.tar.gz' % target_repos)
if not os.path.exists(tmp):
die(1, 'Error: %s repo does not exist' % target_repos)
repo_list = [tmp]
else:
repo_list = glob.glob(os.path.join(filelist_dir, '*.files.tar.gz'))
for dbfile in repo_list:
# search the package name that have a filename
matches = search(dbfile)
# Repo name is the list's file name without the '.files.tar.gz' suffix.
repo = os.path.basename(dbfile).replace('.files.tar.gz', '')
# NOTE(review): `res` is never read in the visible code.
res = []
for match in matches:
files = match['files']
# With -b keep only the files for which is_binary() is true.
if options.binaries:
files = list(filter(is_binary, files))
if files != []:
if options.info:
# presumably pkg_info returns one package dict per requested name; the
# first one is printed -- verify against the pkgfile module
pkg = pkgfile.pkg_info(dbfile, [bytes(match['name'], 'utf-8')])[0]
print_pkg(pkg)
if options.verbose:
print('\n'.join('%s/%s : /%s' % (repo, match['name'], f.decode('utf-8', 'ignore')) for f in files))
print()
else:
if options.verbose:
print('\n'.join('%s/%s (%s) : /%s' % (repo, match['name'], match['version'], f.decode('utf-8', 'ignore')) for f in files))
else:
print('%s/%s' % (repo, match['name']))
def main():
    '''Parse the command line and run the requested action.

    Exactly one action runs, chosen in this order: -u/--update,
    -l/--list, then -i/--info or -s/--search (search is on by default).
    The first positional argument is the target: optional for --update
    (repo name), mandatory for the other actions.
    '''
    # This section is here for backward compatibility
    dict_options = load_config('pkgfile.conf')
    try:
        # str(): parse_config() stores purely numeric values as int.
        filelist_dir = os.path.expanduser(str(dict_options['FILELIST_DIR']).rstrip('/'))
    except KeyError:
        filelist_dir = FILELIST_DIR
    # PKGTOOLS_DIR is meaningless here
    # CONFIG_DIR is useless
    # RATELIMIT is not used yet
    # options are:
    # * use wget
    # * make a throttling urlretrieve
    # * use urlgrabber
    # * use pycurl
    # CMD_SEARCH_ENABLED is not used here
    # UPDATE_CRON neither

    usage = '%prog [ACTIONS] [OPTIONS] filename'
    parser = optparse.OptionParser(usage=usage, version='%%prog %s' % VERSION)

    # actions
    actions = optparse.OptionGroup(parser, 'ACTIONS')
    actions.add_option('-i', '--info', dest='info', action='store_true',
            default=False, help='provides information about the package owning a file')
    actions.add_option('-l', '--list', dest='list', action='store_true',
            default=False, help='list files of a given package; similar to "pacman -Ql"')
    actions.add_option('-s', '--search', dest='search', action='store_true',
            default=True, help='search which package owns a file')
    actions.add_option('-u', '--update', dest='update', action='count',
            default=0, help='update to the latest filelist. This requires write permission to %s' % filelist_dir)
    parser.add_option_group(actions)

    # options
    parser.add_option('-b', '--binaries', dest='binaries', action='store_true',
            default=False, help='only show files in a {s}bin/ directory. Works with -s, -l')
    # NOTE(review): case_sensitive is parsed but not read anywhere in this file.
    parser.add_option('-c', '--case-sensitive', dest='case_sensitive', action='store_true',
            default=False, help='make searches case sensitive')
    parser.add_option('-g', '--glob', dest='glob', action='store_true',
            default=False, help='allow the use of * and ? as wildcards.')
    parser.add_option('-r', '--regex', dest='regex', action='store_true',
            default=False, help='allow the use of regex in searches')
    parser.add_option('-R', '--repo', dest='repo', action='store',
            default='', help='search only in the specified repository')
    parser.add_option('-v', '--verbose', dest='verbose', action='store_true',
            default=False, help='enable verbose output')

    (options, args) = parser.parse_args()

    if options.glob and options.regex:
        die(1, 'Error: -g/--glob and -r/--regex are exclusive.')

    # Look at the argument list directly instead of catching IndexError
    # around the calls: an IndexError raised inside an action must not be
    # mistaken for a missing target.
    target = args[0] if args else None

    if options.update:
        update_repo(options, target_repos=target, filelist_dir=filelist_dir)
    elif options.list:
        if target is None:
            parser.print_help()
            die(1, 'Error: No target specified')
        list_files(target, options, filelist_dir=filelist_dir)
    elif options.info or options.search:
        if target is None:
            parser.print_help()
            die(1, 'Error: No target specified')
        query_pkg(target, options, filelist_dir=filelist_dir)
if __name__ == '__main__':
    # This will ensure that any files we create are readable by normal users
    # TODO: Move to more relevent section
    os.umask(0o022)
    try:
        main()
    except KeyboardInterrupt:
        # Report the interruption on stderr and through the exit status
        # (128 + SIGINT) instead of exiting successfully.
        print('Aborted', file=sys.stderr)
        sys.exit(130)