-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Let osc handle xz compressed control files
- Loading branch information
1 parent
d3cec0b
commit 96be165
Showing
2 changed files
with
229 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,223 @@ | ||
|
||
from __future__ import print_function | ||
|
||
from . import ar | ||
import os.path | ||
import re | ||
import tarfile | ||
import StringIO | ||
from . import packagequery | ||
|
||
HAVE_LZMA = True | ||
try: | ||
import lzma | ||
except ImportError: | ||
HAVE_LZMA = False | ||
|
||
class DebError(packagequery.PackageError): | ||
pass | ||
|
||
class DebQuery(packagequery.PackageQuery, packagequery.PackageQueryResult): | ||
|
||
default_tags = ('package', 'version', 'release', 'epoch', 'architecture', 'description', | ||
'provides', 'depends', 'pre_depends', 'conflicts', 'breaks') | ||
|
||
def __init__(self, fh): | ||
self.__file = fh | ||
self.__path = os.path.abspath(fh.name) | ||
self.filename_suffix = 'deb' | ||
self.fields = {} | ||
|
||
def read(self, all_tags=False, self_provides=True, *extra_tags): | ||
arfile = ar.Ar(fh = self.__file) | ||
arfile.read() | ||
debbin = arfile.get_file('debian-binary') | ||
if debbin is None: | ||
raise DebError(self.__path, 'no debian binary') | ||
if debbin.read() != '2.0\n': | ||
raise DebError(self.__path, 'invalid debian binary format') | ||
control = arfile.get_file('control.tar.gz') | ||
if control is not None: | ||
# XXX: python2.4 relies on a name | ||
tar = tarfile.open(name='control.tar.gz', fileobj=control) | ||
else: | ||
control = arfile.get_file('control.tar.xz') | ||
if control is None: | ||
raise DebError(self.__path, 'missing control.tar') | ||
if not HAVE_LZMA: | ||
raise DebError(self.__path, 'can\'t open control.tar.xz without python-lzma') | ||
decompressed = lzma.decompress(control.read()) | ||
tar = tarfile.open(name="control.tar.xz", | ||
fileobj=StringIO.StringIO(decompressed)) | ||
try: | ||
name = './control' | ||
# workaround for python2.4's tarfile module | ||
if 'control' in tar.getnames(): | ||
name = 'control' | ||
control = tar.extractfile(name) | ||
except KeyError: | ||
raise DebError(self.__path, | ||
'missing \'control\' file in control.tar') | ||
self.__parse_control(control, all_tags, self_provides, *extra_tags) | ||
return self | ||
|
||
def __parse_control(self, control, all_tags=False, self_provides=True, *extra_tags): | ||
data = control.readline().strip() | ||
while data: | ||
field, val = re.split(':\s*', data.strip(), 1) | ||
data = control.readline() | ||
while data and re.match('\s+', data): | ||
val += '\n' + data.strip() | ||
data = control.readline().rstrip() | ||
field = field.replace('-', '_').lower() | ||
if field in self.default_tags + extra_tags or all_tags: | ||
# a hyphen is not allowed in dict keys | ||
self.fields[field] = val | ||
versrel = self.fields['version'].rsplit('-', 1) | ||
if len(versrel) == 2: | ||
self.fields['version'] = versrel[0] | ||
self.fields['release'] = versrel[1] | ||
else: | ||
self.fields['release'] = None | ||
verep = self.fields['version'].split(':', 1) | ||
if len(verep) == 2: | ||
self.fields['epoch'] = verep[0] | ||
self.fields['version'] = verep[1] | ||
else: | ||
self.fields['epoch'] = '0' | ||
self.fields['provides'] = [ i.strip() for i in re.split(',\s*', self.fields.get('provides', '')) if i ] | ||
self.fields['depends'] = [ i.strip() for i in re.split(',\s*', self.fields.get('depends', '')) if i ] | ||
self.fields['pre_depends'] = [ i.strip() for i in re.split(',\s*', self.fields.get('pre_depends', '')) if i ] | ||
self.fields['conflicts'] = [ i.strip() for i in re.split(',\s*', self.fields.get('conflicts', '')) if i ] | ||
self.fields['breaks'] = [ i.strip() for i in re.split(',\s*', self.fields.get('breaks', '')) if i ] | ||
self.fields['recommends'] = [ i.strip() for i in re.split(',\s*', self.fields.get('recommends', '')) if i ] | ||
self.fields['suggests'] = [ i.strip() for i in re.split(',\s*', self.fields.get('suggests', '')) if i ] | ||
self.fields['enhances'] = [ i.strip() for i in re.split(',\s*', self.fields.get('enhances', '')) if i ] | ||
if self_provides: | ||
# add self provides entry | ||
self.fields['provides'].append('%s (= %s)' % (self.name(), '-'.join(versrel))) | ||
|
||
def vercmp(self, debq): | ||
res = cmp(int(self.epoch()), int(debq.epoch())) | ||
if res != 0: | ||
return res | ||
res = DebQuery.debvercmp(self.version(), debq.version()) | ||
if res != None: | ||
return res | ||
res = DebQuery.debvercmp(self.release(), debq.release()) | ||
return res | ||
|
||
def name(self): | ||
return self.fields['package'] | ||
|
||
def version(self): | ||
return self.fields['version'] | ||
|
||
def release(self): | ||
return self.fields['release'] | ||
|
||
def epoch(self): | ||
return self.fields['epoch'] | ||
|
||
def arch(self): | ||
return self.fields['architecture'] | ||
|
||
def description(self): | ||
return self.fields['description'] | ||
|
||
def path(self): | ||
return self.__path | ||
|
||
def provides(self): | ||
return self.fields['provides'] | ||
|
||
def requires(self): | ||
return self.fields['depends'] + self.fields['pre_depends'] | ||
|
||
def conflicts(self): | ||
return self.fields['conflicts'] + self.fields['breaks'] | ||
|
||
def obsoletes(self): | ||
return [] | ||
|
||
def recommends(self): | ||
return self.fields['recommends'] | ||
|
||
def suggests(self): | ||
return self.fields['suggests'] | ||
|
||
def supplements(self): | ||
# a control file has no notion of "supplements" | ||
return [] | ||
|
||
def enhances(self): | ||
return self.fields['enhances'] | ||
|
||
def gettag(self, num): | ||
return self.fields.get(num, None) | ||
|
||
def canonname(self): | ||
return DebQuery.filename(self.name(), self.epoch(), self.version(), self.release(), self.arch()) | ||
|
||
@staticmethod | ||
def query(filename, all_tags = False, *extra_tags): | ||
f = open(filename, 'rb') | ||
debq = DebQuery(f) | ||
debq.read(all_tags, *extra_tags) | ||
f.close() | ||
return debq | ||
|
||
@staticmethod | ||
def debvercmp(ver1, ver2): | ||
""" | ||
implementation of dpkg's version comparison algorithm | ||
""" | ||
# 32 is arbitrary - it is needed for the "longer digit string wins" handling | ||
# (found this nice approach in Build/Deb.pm (build package)) | ||
ver1 = re.sub('(\d+)', lambda m: (32 * '0' + m.group(1))[-32:], ver1) | ||
ver2 = re.sub('(\d+)', lambda m: (32 * '0' + m.group(1))[-32:], ver2) | ||
vers = map(lambda x, y: (x or '', y or ''), ver1, ver2) | ||
for v1, v2 in vers: | ||
if v1 == v2: | ||
continue | ||
if (v1.isalpha() and v2.isalpha()) or (v1.isdigit() and v2.isdigit()): | ||
res = cmp(v1, v2) | ||
if res != 0: | ||
return res | ||
else: | ||
if v1 == '~' or not v1: | ||
return -1 | ||
elif v2 == '~' or not v2: | ||
return 1 | ||
ord1 = ord(v1) | ||
if not (v1.isalpha() or v1.isdigit()): | ||
ord1 += 256 | ||
ord2 = ord(v2) | ||
if not (v2.isalpha() or v2.isdigit()): | ||
ord2 += 256 | ||
if ord1 > ord2: | ||
return 1 | ||
else: | ||
return -1 | ||
return 0 | ||
|
||
@staticmethod | ||
def filename(name, epoch, version, release, arch): | ||
if release: | ||
return '%s_%s-%s_%s.deb' % (name, version, release, arch) | ||
else: | ||
return '%s_%s_%s.deb' % (name, version, arch) | ||
|
||
if __name__ == '__main__': | ||
import sys | ||
try: | ||
debq = DebQuery.query(sys.argv[1]) | ||
except DebError as e: | ||
print(e.msg) | ||
sys.exit(2) | ||
print(debq.name(), debq.version(), debq.release(), debq.arch()) | ||
print(debq.description()) | ||
print('##########') | ||
print('\n'.join(debq.provides())) | ||
print('##########') | ||
print('\n'.join(debq.requires())) |