/
encoders.py
144 lines (131 loc) · 4.35 KB
/
encoders.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
'''
Created on 2015/11/26
:author: hubo
'''
import zlib
import codecs
from .gzipheader import header, tail
import time
def unicode_encoder(encoding, errors = 'strict'):
    '''
    Build a stateful text-to-bytes encoder for *encoding*.

    :param encoding: codec name understood by the ``codecs`` module
    :param errors: error handling scheme handed to the incremental encoder
    :returns: the bound ``encode(input, final)`` method of a fresh
              incremental encoder
    '''
    encoder_class = codecs.getincrementalencoder(encoding)
    incremental = encoder_class(errors)
    return incremental.encode
def unicode_decoder(encoding, errors = 'strict'):
    '''
    Build a stateful bytes-to-text decoder for *encoding*.

    :param encoding: codec name understood by the ``codecs`` module
    :param errors: error handling scheme handed to the incremental decoder
    :returns: the bound ``decode(input, final)`` method of a fresh
              incremental decoder
    '''
    decoder_class = codecs.getincrementaldecoder(encoding)
    incremental = decoder_class(errors)
    return incremental.decode
def donothing_encoder(x, iseof):
    '''
    Pass-through encoder: hand the input chunk back untouched.

    :param x: data chunk
    :param iseof: end-of-stream flag; accepted so the signature matches the
                  other encoders, but not used
    :returns: *x* itself
    '''
    return x
if str is not bytes:
    # ``str`` is a text type here, so the native-string codecs are the
    # incremental unicode codecs.
    str_encoder = unicode_encoder
    str_decoder = unicode_decoder
else:
    # ``str`` is already the byte string type (Python 2): nothing to convert,
    # so both factories return the pass-through encoder.
    def str_encoder(encoding, errors = 'strict'):
        '''
        Return an encoder for native ``str``; arguments are ignored.
        '''
        return donothing_encoder
    def str_decoder(encoding, errors = 'strict'):
        '''
        Return a decoder for native ``str``; arguments are ignored.
        '''
        return donothing_encoder
def deflate_encoder(level = None):
    '''
    Create a streaming compressor backed by ``zlib.compressobj``.

    :param level: compression level passed to ``zlib.compressobj``; when
                  ``None`` the zlib default is used
    :returns: a function ``enc(data, final)`` that returns the compressed
              bytes available for *data*, and additionally flushes the
              compressor when *final* is true
    '''
    compressor = zlib.compressobj() if level is None else zlib.compressobj(level)
    def enc(data, final):
        compressed = compressor.compress(data)
        if not final:
            return compressed
        # Last chunk: drain whatever the compressor is still holding.
        return compressed + compressor.flush()
    return enc
def deflate_decoder(wbits = None):
    '''
    Create a streaming decompressor backed by ``zlib.decompressobj``.

    :param wbits: window-bits argument passed to ``zlib.decompressobj``;
                  when ``None`` the zlib default is used
    :returns: a function ``enc(data, final)`` that returns the decompressed
              bytes available for *data*, and additionally flushes the
              decompressor when *final* is true
    '''
    decompressor = zlib.decompressobj() if wbits is None else zlib.decompressobj(wbits)
    def enc(data, final):
        plain = decompressor.decompress(data)
        if not final:
            return plain
        # Last chunk: drain whatever the decompressor is still holding.
        return plain + decompressor.flush()
    return enc
def _tobytes(s, encoding = 'utf-8'):
if s is bytes:
return s
else:
return s.encode(encoding)
class GzipEncoder(object):
    '''
    Streaming gzip writer.

    The first call to :meth:`enc` emits a header built from the ``header``
    definition, every call emits raw-deflate compressed data, and the final
    call appends a trailer built from the ``tail`` definition carrying the
    CRC32 and length of all input seen.
    '''
    def __init__(self, fname = 'tmp', level = 9):
        '''
        :param fname: file name stored in the header (converted with ``_tobytes``)
        :param level: compression level passed to ``zlib.compressobj``
        '''
        # Running CRC32 and byte count of the uncompressed input.
        self.crc = 0
        self.size = 0
        self.fname = _tobytes(fname)
        self.level = level
        # Becomes True once the header has been emitted.
        self.writeheader = False
    def enc(self, data, final):
        '''
        Compress one chunk.

        :param data: uncompressed bytes
        :param final: true for the last chunk; flushes the compressor and
                      appends the trailer
        :returns: the output bytes produced by this call
        '''
        pieces = []
        if not self.writeheader:
            gz_header = header.new()
            gz_header.mtime = int(time.time())
            gz_header.fname = self.fname
            pieces.append(header.tobytes(gz_header))
            # Negative window bits: raw deflate, since the gzip framing is
            # written by this class itself.
            self.compobj = zlib.compressobj(self.level, zlib.DEFLATED, -zlib.MAX_WBITS, zlib.DEF_MEM_LEVEL)
            self.writeheader = True
        pieces.append(self.compobj.compress(data))
        self.crc = zlib.crc32(data, self.crc) & 0xffffffff
        self.size += len(data)
        if not final:
            return b''.join(pieces)
        pieces.append(self.compobj.flush())
        gz_tail = tail.new()
        gz_tail.crc32 = self.crc
        gz_tail.isize = self.size
        pieces.append(tail.tobytes(gz_tail))
        return b''.join(pieces)
def gzip_encoder(fname = 'tmp', level = 9):
    '''
    Create a streaming gzip encoder function.

    :param fname: file name handed to ``GzipEncoder``
    :param level: compression level handed to ``GzipEncoder``
    :returns: the bound ``enc(data, final)`` method of a new ``GzipEncoder``
    '''
    encoder = GzipEncoder(fname, level)
    return encoder.enc
class GzipDecoder(object):
    '''
    Streaming gzip reader.

    Input is accumulated in ``self.buffer`` and consumed by a three-state
    loop: header (``readheader``), trailer (``readtail``), or compressed
    body (neither flag set). After a trailer is verified the decoder goes
    back to expecting a header, so several members in a row are accepted.
    '''
    def __init__(self):
        # Running CRC32 and byte count of the decompressed output of the
        # current member.
        self.crc = 0
        self.size = 0
        # State flags; the decoder starts out expecting a header.
        self.readheader = True
        self.readtail = False
        # Input received but not yet consumed.
        self.buffer = b''
    def _emit(self, chunk, output):
        '''
        Fold *chunk* into the running CRC/size and append it to *output*.
        '''
        self.crc = zlib.crc32(chunk, self.crc) & 0xFFFFFFFF
        self.size += len(chunk)
        output.append(chunk)
    def enc(self, data, final):
        '''
        Decode one chunk.

        :param data: compressed input bytes
        :param final: true for the last chunk
        :returns: the decompressed bytes produced by this call
        :raises ValueError: on an unsupported header, a CRC/size mismatch,
                            or when *final* is set with input left over or
                            a member still incomplete
        '''
        output = []
        self.buffer += data
        while True:
            if self.readheader:
                parsed = header.parse(self.buffer)
                if parsed is None:
                    # Not enough data to parse a header yet.
                    break
                gz_header, consumed = parsed
                if gz_header.id1 != 0x1f or gz_header.id2 != 0x8b or gz_header.cm != 8:
                    raise ValueError('Unsupported format')
                # Negative window bits: the body is a raw deflate stream.
                self.decompobj = zlib.decompressobj(-zlib.MAX_WBITS)
                self.buffer = self.buffer[consumed:]
                self.readheader = False
                continue
            if self.readtail:
                parsed = tail.parse(self.buffer)
                if parsed is None:
                    # Not enough data to parse the trailer yet.
                    break
                gz_tail, consumed = parsed
                if gz_tail.crc32 != self.crc or gz_tail.isize != self.size:
                    raise ValueError('Checksum not met')
                # Reset the bookkeeping and expect a new header next.
                self.crc = 0
                self.size = 0
                self.buffer = self.buffer[consumed:]
                self.readtail = False
                self.readheader = True
                continue
            # Body state: feed everything buffered to the decompressor.
            self._emit(self.decompobj.decompress(self.buffer), output)
            leftover = self.decompobj.unused_data
            if not leftover:
                # All buffered input was consumed; wait for more.
                self.buffer = b''
                break
            # Bytes past the end of the deflate stream: keep them for the
            # trailer parser and drain the decompressor.
            self.buffer = leftover
            self._emit(self.decompobj.flush(), output)
            self.readtail = True
        if final and (self.buffer or not self.readheader):
            raise ValueError('Unexpected EOF')
        return b''.join(output)
def gzip_decoder():
    '''
    Create a streaming gzip decoder function.

    :returns: the bound ``enc(data, final)`` method of a new ``GzipDecoder``
    '''
    decoder = GzipDecoder()
    return decoder.enc