forked from dask/dask
-
Notifications
You must be signed in to change notification settings - Fork 0
/
compression.py
99 lines (74 loc) · 2.7 KB
/
compression.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
from __future__ import print_function, division, absolute_import
import bz2
import sys
import zlib
from toolz import identity
from ..compatibility import gzip_compress, gzip_decompress, GzipFile
from ..utils import ignoring
def noop_file(file, **kwargs):
    """Return ``file`` unchanged, ignoring any keyword arguments.

    Used as the ``None`` ("no compression") entry of the ``files`` and
    ``seekable_files`` registries in this module.

    Parameters
    ----------
    file : object
        Object to hand back; it is neither inspected nor copied.
    **kwargs
        Accepted and discarded, so the function can be called with the same
        keyword options as the other registered file wrappers.

    Returns
    -------
    object
        The very same ``file`` object that was passed in.
    """
    return file
# Registries keyed by compression name.  The ``None`` key stands for
# "no compression" and always maps to a pass-through callable.

# name -> function applied to a whole payload to compress it
compress = {
    'gzip': gzip_compress,
    'zlib': zlib.compress,
    'bz2': bz2.compress,
    None: identity,
}

# name -> function applied to a whole payload to decompress it
decompress = {
    'gzip': gzip_decompress,
    'zlib': zlib.decompress,
    'bz2': bz2.decompress,
    None: identity,
}

# name -> callable taking an open file object (plus keyword options) and
# returning the wrapped file object
files = {
    'gzip': lambda f, **kwargs: GzipFile(fileobj=f, **kwargs),
    None: noop_file,
}

# name -> file wrapper for seekable access (judging by the name; only the
# pass-through is registered here)
seekable_files = {
    None: noop_file,
}
# Optional codecs.  Each group below runs under ``ignoring(ImportError)`` so
# that a codec whose backing library cannot be imported is simply left out
# of the registries.
with ignoring(ImportError):
    import snappy
    compress['snappy'] = snappy.compress
    decompress['snappy'] = snappy.decompress

with ignoring(ImportError):
    import lz4
    try:
        # Current python-lz4 releases expose the block codec as
        # ``lz4.block``; the top-level ``LZ4_compress``/``LZ4_uncompress``
        # names no longer exist there, and looking them up would raise
        # AttributeError, which ``ignoring(ImportError)`` does not cover.
        from lz4 import block as _lz4_block
    except ImportError:
        # Legacy python-lz4: only the top-level functions are available.
        compress['lz4'] = lz4.LZ4_compress
        decompress['lz4'] = lz4.LZ4_uncompress
    else:
        compress['lz4'] = _lz4_block.compress
        decompress['lz4'] = _lz4_block.decompress

with ignoring(ImportError):
    from ..compatibility import LZMAFile, lzma_compress, lzma_decompress
    compress['xz'] = lzma_compress
    decompress['xz'] = lzma_decompress
    files['xz'] = LZMAFile
# Seekable xz files actually tend to scan the whole file - see `get_xz_blocks`
# with ignoring(ImportError):
#     import lzma
#     seekable_files['xz'] = lzma.LZMAFile
#
# with ignoring(ImportError):
#     import lzmaffi
#     seekable_files['xz'] = lzmaffi.LZMAFile
# Register ``bz2.BZ2File`` as a file wrapper on Python 3 only.
# NOTE(review): presumably because the Python 2 ``BZ2File`` accepts only a
# filename rather than an open file object -- confirm.
# ``bz2`` is already imported at the top of this module, so it is not
# imported again here.
if sys.version_info[0] >= 3:
    files['bz2'] = bz2.BZ2File
def get_xz_blocks(fp):
    """Describe the blocks of an xz file by decoding the index at its end.

    Starting from the end of ``fp`` the function walks backwards: it skips
    trailing 4-byte groups of zero bytes (counted as stream padding), reads
    ``STREAM_HEADER_SIZE`` bytes as the stream footer, and then reads the
    ``backward_size`` bytes that precede the footer as the index.  Decoding
    is delegated to ``lzmaffi``, which is imported lazily so the module can
    be imported without it.

    Parameters
    ----------
    fp : file-like
        Binary file object supporting ``seek``, ``tell`` and ``read``.
        Its position is moved by this call and is not restored.

    Returns
    -------
    dict
        ``'offsets'`` -- ``compressed_file_offset`` of every index entry,
        ``'lengths'`` -- ``unpadded_size`` of every index entry,
        ``'check'`` -- the ``check`` attribute of the decoded stream footer.

    Raises
    ------
    lzmaffi.LZMAError
        If the file is shorter than ``2 * STREAM_HEADER_SIZE`` bytes.
    """
    from lzmaffi import (STREAM_HEADER_SIZE, decode_stream_footer,
                         decode_index, LZMAError)

    def _peek(f, size):
        # Read ``size`` bytes, then step back ``size`` bytes again.
        data = f.read(size)
        f.seek(-size, 1)
        return data

    fp.seek(0, 2)  # whence=2: jump to the end of the file
    if fp.tell() < 2 * STREAM_HEADER_SIZE:
        raise LZMAError("file too small")

    # read stream paddings (4 bytes each)
    # NOTE(review): the backwards walk has no lower bound; input made up
    # solely of zero bytes is not rejected here -- confirm callers never
    # pass such a file.
    fp.seek(-4, 1)
    padding = 0
    while _peek(fp, 4) == b'\x00\x00\x00\x00':
        fp.seek(-4, 1)
        padding += 4

    # ``fp`` now points at the last 4 bytes before the padding; move back to
    # where the STREAM_HEADER_SIZE-byte footer begins.
    fp.seek(-STREAM_HEADER_SIZE + 4, 1)
    stream_flags = decode_stream_footer(_peek(fp, STREAM_HEADER_SIZE))

    # The index sits immediately before the footer.
    fp.seek(-stream_flags.backward_size, 1)
    index = decode_index(_peek(fp, stream_flags.backward_size), padding)

    # Collect both columns in one pass, reading each entry's attributes
    # while the iteration is positioned on it.
    offsets = []
    lengths = []
    for _, block in index:
        offsets.append(block.compressed_file_offset)
        lengths.append(block.unpadded_size)

    return {'offsets': offsets,
            'lengths': lengths,
            'check': stream_flags.check}
def xz_decompress(data, check):
    """Decompress one xz block using ``lzmaffi``.

    ``lzmaffi`` is imported lazily so that this module can be imported
    without it.

    Parameters
    ----------
    data : bytes
        The block to decode: its header followed by the compressed payload.
        ``len(data)`` is passed to the decompressor as ``unpadded_size``.
    check
        Forwarded unchanged as the decompressor's ``check`` argument.
        NOTE(review): presumably the ``'check'`` value returned by
        ``get_xz_blocks`` -- confirm against callers.

    Returns
    -------
    The result of ``LZMADecompressor.decompress`` on the payload that
    follows the block header.
    """
    from lzmaffi import decode_block_header_size, LZMADecompressor, FORMAT_BLOCK

    # The header size is decoded from the first byte of the block.
    header_size = decode_block_header_size(data[:1])
    header = data[:header_size]

    decompressor = LZMADecompressor(format=FORMAT_BLOCK, header=header,
                                    unpadded_size=len(data), check=check)
    # Everything after the header bytes actually present is the payload.
    payload = data[len(header):]
    return decompressor.decompress(payload)