Skip to content

Commit

Permalink
Huffman
Browse files Browse the repository at this point in the history
  • Loading branch information
fat-crocodile committed May 21, 2014
1 parent 7643f21 commit cd362bc
Show file tree
Hide file tree
Showing 5 changed files with 479 additions and 0 deletions.
165 changes: 165 additions & 0 deletions huffman/bitstream.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
"""Read and write bitstreams"""

class InputStream(object):
    """Read an iterable of bytes as a stream of bits.

    Bits of every byte are consumed starting from the least significant
    one.  The source may yield one-character strings (as iterating a
    ``str`` does) or integers (as iterating ``bytes``/``bytearray`` does).

    Attributes:
        data: iterator over the source bytes.
        bit_index: number of bits already consumed from ``byte``.
        byte_index: number of source bytes that were completely consumed.
        byte: integer value of the partially consumed byte, or ``None``
            when the stream stands on a byte boundary.
    """

    def __init__(self, data):
        self.data = iter(data)
        self.bit_index = 0
        self.byte_index = 0
        self.byte = None

    def _next_byte(self):
        """Fetch the next source item and return it as an integer.

        Raises StopIteration when the source is exhausted.
        """
        item = next(self.data)
        # Integers come from bytes-like sources, characters from strings.
        return item if isinstance(item, int) else ord(item)

    def get_le(self, n):
        """Return an n-bit integer stored least significant bit first.

        The first bit read from the stream becomes bit 0 of the result
        (like Little Endian).  Raises StopIteration if the source ends
        before n bits were read; the stream state is not updated then.
        """
        if n == 0:
            return 0

        bit = self.bit_index
        # While n > 0 the current byte is always available (not None).
        byte = self.byte if self.byte is not None else self._next_byte()

        res = 0
        shift = 0

        if n >= 8 - bit:
            # Take everything that is left in the current byte.
            res += byte >> bit
            shift += 8 - bit
            n -= 8 - bit
            bit = 0
            byte = self._next_byte() if n > 0 else None
            self.byte_index += 1

        while n >= 8:
            # Whole bytes.
            res += byte << shift
            shift += 8
            n -= 8
            byte = self._next_byte() if n > 0 else None
            self.byte_index += 1

        if n > 0:
            # The remaining bits occupy only a part of the current byte.
            res += ((byte >> bit) & ((1 << n) - 1)) << shift
            bit += n

        self.bit_index = bit
        self.byte = byte
        return res

    def get_be(self, n):
        """Return an n-bit integer stored most significant bit first.

        The first bit read from the stream becomes the highest bit of the
        result (like Big Endian).  Raises StopIteration if the source ends
        before n bits were read; the stream state is not updated then.
        """
        if n == 0:
            return 0

        bit = self.bit_index
        byte = self.byte if self.byte is not None else self._next_byte()

        res = 0

        # Deliberately simple bit-by-bit loop.  A faster version would have
        # to reverse bits inside bytes (e.g. with a lookup table), which
        # makes the code noticeably harder to follow.
        while n > 0:
            res = res * 2 + ((byte >> bit) & 1)
            bit += 1
            n -= 1

            if bit == 8:
                bit = 0
                byte = self._next_byte() if n > 0 else None
                self.byte_index += 1

        self.bit_index = bit
        self.byte = byte
        return res

    def finish_byte(self):
        """Skip the unread tail of the current byte, if there is one."""
        if self.byte is not None:
            self.bit_index = 0
            self.byte = None
            self.byte_index += 1

    def get_byte(self):
        """Return the next whole byte as an integer.

        Raises Exception if the stream is not on a byte boundary and
        StopIteration if the source is exhausted.
        """
        if self.bit_index != 0:
            raise Exception('Alinment error')
        value = self._next_byte()
        self.byte_index += 1
        return value

    def get_bytes(self, n):
        """Generate up to n next bytes exactly as the source yields them.

        This is a generator, so the alignment check and the reads happen
        lazily, while the result is iterated.  Iteration simply stops
        early when the source runs out of data.

        Raises Exception (on first iteration) if the stream is not on a
        byte boundary.
        """
        if self.bit_index != 0:
            raise Exception('Alinment error')

        remaining = n
        while remaining > 0:
            try:
                item = next(self.data)
            except StopIteration:
                # A StopIteration escaping a generator body is turned into
                # RuntimeError by PEP 479; end the generator explicitly.
                return
            yield item
            self.byte_index += 1
            remaining -= 1

class OutputStream(object):
    """Collect a bit stream as a list of one-character strings.

    Bits are packed into every byte starting from the least significant
    one.

    Attributes:
        buffer: list of completed bytes, each stored as ``chr(value)``.
        byte: integer holding the bits of the byte being assembled.
        bit_index: number of bits already stored in ``byte``.
    """

    def __init__(self):
        self.buffer = []
        self.byte = 0
        self.bit_index = 0

    def put_le(self, v, n):
        """Put the n low bits of v into the stream, lowest bit first.

        This is the Little Endian-like layout: bit 0 of v is written first.
        """
        if n == 0:
            return
        bit = self.bit_index
        byte = self.byte
        res = []

        if n + bit >= 8:
            # Fill the current byte up to its end.
            delta = 8 - bit
            byte |= (v & ((1 << delta) - 1)) << bit
            bit = 0
            n -= delta
            # Drop the bits that were just written; the next ones to write
            # are now the lowest bits of v.
            v >>= delta
            res.append(byte)
            byte = 0

        while n >= 8:
            # Whole bytes.
            res.append(v & 0xFF)
            n -= 8
            v >>= 8

        if n > 0:
            # Leftover bits start a new, partially filled byte.
            byte |= (v & ((1 << n) - 1)) << bit
            bit += n

        self.bit_index = bit
        self.byte = byte
        self.buffer.extend(chr(x) for x in res)

    def put_be(self, v, n):
        """Put the n low bits of v into the stream, highest bit first.

        This is the Big Endian-like layout: bit n-1 of v is written first.
        """
        if n == 0:
            return
        bit = self.bit_index
        byte = self.byte

        while n > 0:
            byte |= ((v >> (n - 1)) & 1) << bit
            bit += 1
            if bit == 8:
                bit = 0
                self.buffer.append(chr(byte))
                byte = 0
            n -= 1

        self.bit_index = bit
        self.byte = byte

    def finish(self):
        """Flush the current byte and return to a byte boundary.

        Unused high bits of the flushed byte are zero.  Note that a byte is
        appended even when no bits are pending.
        """
        self.buffer.append(chr(self.byte))
        self.byte = 0
        # Reset the write position too, otherwise the next put_* call would
        # continue in the middle of a fresh byte.
        self.bit_index = 0

    def get(self):
        """Return the list of completed bytes and start a new empty buffer.

        Bits of a byte that is not finished yet stay in the stream.
        """
        t = self.buffer
        self.buffer = []
        return t

78 changes: 78 additions & 0 deletions huffman/bounded_huffman.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
"""Implement algorithm from
LAWRENCE L. LARMORE, DANIEL S. HIRSCHBERG
"A Fast Algorithm for Optimal Length-Limited Huffman Codes"
Journal of the Association for Computing Machinery, Vol. 37, No. 3, July 1990"""

def _merge_coins(c1, c2):
    """Merge two coins into one meta-coin.

    Each coin is a pair (weight, {base coin id --> height in tree}).  The
    result has the summed weight and, for every base coin id present in
    either input, the larger of its heights.  The inputs are not modified.
    """
    w = c1[0] + c2[0]

    d = dict(c1[1])
    for k, v in c2[1].items():
        d[k] = max(d.get(k, 0), v)

    return w, d


def _imerge(iter1, iter2, less_then=None):
    """Merge two sorted iterables into one sorted iterable (a generator).

    less_then(a, b) tells whether a, taken from iter1, must go before b,
    taken from iter2; it defaults to ``a < b``.  When it returns False the
    item from iter2 is emitted first, so on ties iter2 wins.
    """
    if less_then is None:
        less_then = lambda a, b: a < b

    # Private marker for "no pending item", so that None is a valid item.
    missing = object()
    i1 = i2 = missing
    iter1, iter2 = iter(iter1), iter(iter2)

    while True:
        if i1 is missing:
            try:
                i1 = next(iter1)
            except StopIteration:
                # iter1 is exhausted: flush whatever is left of iter2.
                if i2 is not missing:
                    yield i2
                for x in iter2:
                    yield x
                return
        if i2 is missing:
            try:
                i2 = next(iter2)
            except StopIteration:
                # iter2 is exhausted: i1 always holds a pending item here.
                yield i1
                for x in iter1:
                    yield x
                return

        if less_then(i1, i2):
            yield i1
            i1 = missing
        else:
            yield i2
            i2 = missing


def make_code(weights, limit):
    """Compute length-limited code lengths with the coin-merging scheme.

    Input:
      - weights: list of pairs (symbol, weight) in alphabetical order
        (symbols with weight 0 are allowed)
      - limit: code length limit
    Output:
      - list of pairs (symbol, code length) in the same order; symbols
        with weight 0 get length 0.  When only one symbol has a non-zero
        weight no coins are selected and it gets length 0 as well.
    Raises:
      - Exception when more than 2**limit symbols have a non-zero weight.
    """
    positioned_weights = sorted((w, n) for n, (_, w) in enumerate(weights) if w > 0)

    if len(positioned_weights) > 2**limit:
        raise Exception('there are no such code')

    coins = []

    for level in range(limit, 0, -1):
        # current level coins
        new_coins = [(w, {i: level}) for w, i in positioned_weights]
        # coins merged pairwise from the previous level; an odd last coin
        # is dropped.  Floor division keeps the count an integer.
        prev_coins = [_merge_coins(coins[2 * i], coins[2 * i + 1])
                      for i in range(len(coins) // 2)]
        # merge both sorted lists by weight
        coins = list(_imerge(prev_coins, new_coins, lambda x, y: x[0] < y[0]))

    res = [(s, 0) for s, _ in weights]

    # Take the 2n-2 cheapest coins; the code length of a symbol is the
    # largest height recorded for it among them.
    for i in range(len(positioned_weights) * 2 - 2):
        for k, v in coins[i][1].items():
            if res[k][1] < v:
                res[k] = (res[k][0], v)

    return res

89 changes: 89 additions & 0 deletions huffman/coder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
"""Decode/encode Huffman-encoded symbols from/to bitstream"""

from collections import defaultdict, namedtuple

# One row of a canonical Huffman table: all codes of one length.
# Codes of the row are the integers start_code .. end_code - 1.
_HuffmanRecord = namedtuple("_HuffmanRecord", "length start_code end_code symbols")


def _tables_from_lenghts(lens):
    """Make canonical Huffman code tables from a list of code lengths.

    Input: list of pairs (symbol, code length), ordered by alphabet;
           symbols whose length is not positive are skipped
    Output: list of _HuffmanRecord tuples
            (length, start_code, end_code, [symbols in alphabet order]),
            sorted by length
    """
    # make dict {len --> [list of symbols in alphabet order]}
    by_len = defaultdict(list)
    for symbol, length in lens:
        if length > 0:
            by_len[length].append(symbol)

    tables = []
    code = 0
    prev_len = 0

    # fill the table, going through the lengths that exist in the code
    for length in sorted(by_len):
        symbols = by_len[length]
        # extend the running code to the new length
        code <<= (length - prev_len)
        tables.append(_HuffmanRecord(length, code, code + len(symbols), symbols))
        code += len(symbols)
        prev_len = length

    return tables

class Decoder(object):
    """Decode symbols from a bitstream"""

    def __init__(self, lens):
        """Build decoding tables.

        lens is either a list of pairs (symbol, code length) ordered by
        alphabet, or a plain list of integer code lengths; in the latter
        case the symbols are the list indexes.
        """
        if lens and isinstance(lens[0], int):
            lens = list(enumerate(lens))

        self.tables = _tables_from_lenghts(lens)

    def get(self, bs):
        """Read the next Huffman-encoded symbol from bitstream bs.

        bs must provide get_be(n).  Returns the decoded symbol.
        Raises Exception('unknown code: ...') when the bits read do not
        match any code of the table.
        """
        tables = self.tables
        code = 0    # code read so far, aligned to the current record length
        index = 0   # current huffman record
        readed = 0  # number of bits already read

        while index < len(tables):
            record = tables[index]

            # read the bits missing up to the length of the current record
            delta = record.length - readed
            code += bs.get_be(delta)
            readed += delta

            if code < record.end_code:  # match
                return record.symbols[code - record.start_code]

            # skip the records whose codes are all smaller than what we have
            while index < len(tables) and tables[index].end_code <= code:
                index += 1
                # Extend the code to the next record length, unless the
                # table is over: then fall through to the error below.
                if index < len(tables):
                    code <<= (tables[index].length - tables[index - 1].length)

        raise Exception('unknown code: %d, %d' % (code, readed))

class Encoder(object):
    """Put symbols into a bitstream"""

    def __init__(self, lens):
        """Build the encoding dictionary.

        lens is either a list of pairs (symbol, code length) ordered by
        alphabet, or a plain list of integer code lengths; in the latter
        case the symbols are the list indexes.  An empty list gives an
        encoder without any codes.
        """
        if lens and isinstance(lens[0], int):
            lens = list(enumerate(lens))

        tables = _tables_from_lenghts(lens)
        # symbol --> (code value, code length in bits)
        self.code = {}

        for length, start, _, symbols in tables:
            # symbols of one length get consecutive codes
            for offset, symbol in enumerate(symbols):
                self.code[symbol] = (start + offset, length)

    def put(self, bs, c):
        """Put Huffman-encoded symbol c into bitstream bs.

        bs must provide put_be(value, bit count).  Raises KeyError when c
        has no code.
        """
        v, n = self.code[c]
        bs.put_be(v, n)


34 changes: 34 additions & 0 deletions huffman/huffman.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
"""Implement classical Huffman algorithm"""

def make_code(weights):
    """Compute Huffman code lengths.

    Input:
      - list of pairs (symbol, weight), symbols with weight 0 are allowed
    Output:
      - list of pairs (symbol, code length) in the same order; symbols
        with weight 0 get length 0.  When only one symbol has a non-zero
        weight nothing is merged and it gets length 0 as well.
    """
    res = [(s, 0) for s, _ in weights]

    # each item is:
    # (weight, [(symbol index, len), (symbol index, len), ... ])
    codes = [(w, [(i, 0)]) for i, (_, w) in enumerate(weights) if w > 0]
    if not codes:
        # nothing to encode: empty input or all weights are zero
        return res

    # heaviest first, so the two lightest items are always at the tail
    codes.sort(key=lambda x: x[0], reverse=True)

    while len(codes) > 1:
        # get two least popular symbols
        m1 = codes.pop()
        m2 = codes.pop()

        # merge them in one; every symbol inside goes one level deeper
        s = (m1[0] + m2[0], [(i, l + 1) for i, l in m1[1] + m2[1]])

        # insert the new meta-symbol, keeping the list sorted by weight;
        # among equal weights it is placed closest to the tail
        i = len(codes)
        while i > 0 and codes[i - 1][0] < s[0]:
            i -= 1

        codes.insert(i, s)

    # now all pairs (symbol index, code_len) are contained in codes[0][1]
    for i, l in codes[0][1]:
        res[i] = (res[i][0], l)

    return res
Loading

0 comments on commit cd362bc

Please sign in to comment.