In [1]:
from collections import deque
from bitarray import bitarray
from unidecode import unidecode
import os

In [15]:
# Static Huffman coder. Input is transliterated to ASCII with unidecode, so
# only single-byte symbols ever reach the tree and the output file.
class StaticHuffman:
    """Two-pass Huffman coder that stores its code tree in front of the payload.

    Bit layout of an encoded file: ``1...1 | tree | encoded text``.
    The leading run of ones pads the whole stream to a byte boundary. The
    tree always starts with a ``0`` bit (its root is an internal node), which
    is how the decoder finds the end of the padding.

    Attributes:
        root: root ``_Node`` of the Huffman tree, or ``None`` before building.
        leafs: maps each symbol to its leaf ``_Node``.
        encodings: maps each symbol to its code as a ``bitarray``.
    """

    class _Node:
        """Tree node: leaves carry ``letter``, internal nodes carry children."""

        def __init__(self, weight, left=None, right=None, parent=None, letter=None):
            self.left = left
            self.right = right
            self.letter = letter
            self.parent = parent
            self.weight = weight

    def __init__(self, text=None, file_name=None):
        """Optionally build the code for ``text`` and write it to ``file_name``.

        Args:
            text: text to build the code from. It is passed through
                ``unidecode`` first. Falsy text leaves the coder empty.
            file_name: if given (and a tree was built), the encoded text is
                written to this path.
        """
        self.root = None
        self.leafs = dict()
        self.encodings = dict()

        if text:
            # Build and encode from the same transliterated string; encoding
            # the raw text would hit symbols that are not in the tree.
            ascii_text = unidecode(text)
            if ascii_text:
                self._build(ascii_text)
                if file_name:
                    self.encode_to_file(ascii_text, file_name)

    def _cache_leaf_representations(self):
        """Recompute ``self.encodings`` by walking from every leaf to the root."""
        # Drop codes left over from a previous tree.
        self.encodings.clear()
        for letter, leaf in self.leafs.items():
            path = []
            node = leaf
            while node.parent is not None:
                # Left edge -> 0 bit, right edge -> 1 bit.
                path.append(node is not node.parent.left)
                node = node.parent
            # The walk was leaf-to-root; codes are read root-to-leaf.
            self.encodings[letter] = bitarray(reversed(path))

    def _build(self, text):
        """Build the Huffman tree and the per-symbol codes for ``text``.

        Uses the two-queue construction: leaves sorted by weight in one queue,
        merged nodes appended to a second queue (they are produced in
        non-decreasing weight order), so the minimum is always at a front.

        Raises:
            ValueError: if ``text`` is empty.
        """
        if not text:
            raise ValueError('Cannot build a Huffman tree from empty text')

        alphabet_statistics = dict()
        for a in text:
            alphabet_statistics[a] = alphabet_statistics.get(a, 0) + 1

        leafs = [self._Node(w, letter=l) for (l, w) in alphabet_statistics.items()]
        self.leafs = {l.letter: l for l in leafs}

        if len(leafs) == 1:
            # A lone leaf would be the root: its code would be empty and the
            # serialized tree would start with a 1 bit, which the decoder
            # cannot tell apart from padding. Pair it with a zero-weight
            # filler leaf so the root is an internal node. The filler is kept
            # out of self.leafs because it is not a symbol of the text.
            filler_letter = '\x00' if leafs[0].letter != '\x00' else '\x01'
            leafs.append(self._Node(0, letter=filler_letter))

        leafs.sort(key=lambda x: x.weight)
        left = deque(leafs)
        right = deque()

        def get_min(left, right):
            """Pop the lighter front node; ties go to the leaf queue."""
            if len(left) == 0:
                return right.popleft()
            if len(right) == 0:
                return left.popleft()

            if left[0].weight <= right[0].weight:
                return left.popleft()
            return right.popleft()

        # Build Huffman tree
        while len(left) + len(right) > 1:
            n1 = get_min(left, right)
            n2 = get_min(left, right)
            top = self._Node(n1.weight + n2.weight, left=n1, right=n2)
            n1.parent = top
            n2.parent = top
            right.append(top)

        self.root = right.popleft()
        # Create encodings for each leaf
        self._cache_leaf_representations()

    # deprecated
    def _encode(self, c):
        """Return the leaf node for symbol ``c`` (kept for old callers)."""
        return self.leafs[c]

    def _encode_tree(self):
        """Serialize the tree in pre-order.

        An internal node is written as a ``0`` bit followed by its left and
        right subtrees; a leaf is written as a ``1`` bit followed by the
        8 bits of its symbol.
        """
        s = [self.root]
        encoded_tree = bitarray()

        while s:
            node = s.pop()
            if node.letter is None:
                encoded_tree.append(False)
                # Push right first so the left subtree is emitted first.
                s.append(node.right)
                s.append(node.left)
            else:
                encoded_tree.append(True)
                # The reader consumes exactly 8 bits per symbol, so refuse
                # anything that does not fit in one ASCII byte.
                encoded_tree.frombytes(node.letter.encode('ascii'))
        return encoded_tree

    def _decode_tree(self, encoded_data):
        """Rebuild the tree from the head of ``encoded_data``.

        Skips the leading padding ones, parses the pre-order tree, fills
        ``self.leafs`` and ``self.encodings``, and returns the remaining bits
        (the encoded text). A stream that ends early is not handled
        specially.

        Raises:
            RuntimeError: if this instance already holds a tree.
        """
        if self.root is not None:
            raise RuntimeError('Tree should be empty')

        offset = 0

        # skip padding
        while encoded_data[offset]:
            offset += 1

        self.root = self._Node(0)
        s = [self.root]
        while s:
            node = s.pop()
            if encoded_data[offset]:
                # Leaf marker: the next 8 bits are the symbol.
                node.letter = encoded_data[offset + 1:offset + 9].tobytes().decode()
                self.leafs[node.letter] = node
                offset += 9
            else:
                left = self._Node(0, parent=node)
                right = self._Node(0, parent=node)

                node.left = left
                node.right = right

                # Same order as the writer: left subtree is parsed first.
                s.append(right)
                s.append(left)
                offset += 1

        self._cache_leaf_representations()

        return encoded_data[offset:]

    # holds data in the following format: 111..+tree+encoded_text, where 111.. is padding
    def encode_to_file(self, text, file_name):
        """Encode ``text`` with the current tree and write it to ``file_name``.

        ``text`` is transliterated with ``unidecode`` before encoding, the
        same way the constructor does before building the tree.

        Raises:
            RuntimeError: if ``file_name`` is ``None`` or no tree was built.
        """
        if file_name is None:
            raise RuntimeError('Specify file_name to be the name of file to write to')
        if self.root is None:
            raise RuntimeError('Tre should be builded beforehand')

        # Encode everything before touching the file, so a failure (e.g. a
        # symbol missing from the code) does not leave a truncated file.
        tree_encoded = self._encode_tree()
        encoded_text = bitarray()
        encoded_text.encode(self.encodings, unidecode(text))

        # len() instead of the deprecated bitarray.length().
        padding = -(len(tree_encoded) + len(encoded_text)) % 8
        payload = bitarray([True] * padding) + tree_encoded + encoded_text

        with open(file_name, 'wb') as f:
            payload.tofile(f)

    @staticmethod
    def decode(file):
        """Read a file written by ``encode_to_file`` and return its text."""
        encoded_data = bitarray()
        with open(file, 'rb') as f:
            encoded_data.fromfile(f)

        hc = StaticHuffman()
        payload = hc._decode_tree(encoded_data)
        return ''.join(payload.decode(hc.encodings))

In [16]:
# Smoke test: build a code for "aabc" and write the encoded form to 'test_file'.
h = StaticHuffman("aabc", file_name='test_file')

In [17]:
# Decode 'test_file' again; the cell output should match the encoded text.
StaticHuffman.decode('test_file')

'aabc'

## Tests

In [18]:
def test_on_file(file_name):
    encoded_file = file_name + '_encoded'
    with open(file_name) as f:
        text = f.read()
        
        print('Encode time:')
        %timeit h = StaticHuffman(text, file_name=encoded_file)
        h = StaticHuffman(text, file_name=encoded_file)
        print('Decode time: ')
        %timeit StaticHuffman.decode(encoded_file)
        
        print(f'Are equal? {StaticHuffman.decode(encoded_file) == text}')
        
        print(f'Compresion factor is {1 - os.path.getsize(encoded_file) / os.path.getsize(file_name)}')

In [19]:
# Static coder benchmark; expects a file named '1kB_file' in the working directory.
test_on_file('1kB_file')

Encode time:
515 µs ± 8.06 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
Decode time: 
269 µs ± 14.7 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
Are equal? True
Compresion factor is 0.419103313840156


In [20]:
# Static coder benchmark; expects a file named '10kB_file' in the working directory.
test_on_file('10kB_file')

Encode time:
2.24 ms ± 109 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
Decode time: 
695 µs ± 9.09 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
Are equal? True
Compresion factor is 0.46096943741483354


In [21]:
# Static coder benchmark; expects a file named '100kB_file' in the working directory.
test_on_file('100kB_file')

Encode time:
17.4 ms ± 210 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
Decode time: 
5.08 ms ± 60.6 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
Are equal? True
Compresion factor is 0.4645448746062757


In [22]:
# Static coder benchmark; expects a file named '1MB_file' in the working directory.
test_on_file('1MB_file')

Encode time:
168 ms ± 5.1 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
Decode time: 
49.8 ms ± 1.39 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
Are equal? True
Compresion factor is 0.46557590762036327


## Adaptive Huffman 

In [23]:
class AdaptiveHuffman:
    """One-pass (adaptive) Huffman coder with a not-yet-transmitted (NYT) leaf.

    Encoder and decoder grow the same tree symbol by symbol. The very first
    symbol is sent as 8 raw bits; a known symbol is sent as its current code;
    a new symbol is sent as the NYT code followed by its 8 raw bits. Input is
    transliterated with ``unidecode`` so every symbol fits in one byte.

    Attributes:
        root: root node of the tree, ``None`` until the first symbol.
        NYT: the zero-weight escape leaf.
        nodes: all non-NYT nodes in creation order, permuted by ``swap``;
            ``find_leader`` scans this list front to back.
        leafs: maps each seen symbol (and ``'NYT'``) to its leaf node.
    """

    class _Node:
        """Tree node: leaves carry ``letter``, internal nodes carry children."""

        def __init__(self, weight, left=None, right=None, parent=None, letter=None):
            self.left = left
            self.right = right
            self.letter = letter
            self.parent = parent
            self.weight = weight

        def __repr__(self):
            return self.letter if self.letter else 'None'

    def __init__(self):
        self._reset()

    def _reset(self):
        """Return the model to its initial state (empty tree, lone NYT leaf)."""
        self.root = None
        self.NYT = self._Node(0, letter='NYT')
        self.nodes = []
        self.leafs = dict()
        self.leafs['NYT'] = self.NYT

    @staticmethod
    def _raw_bits(symbol):
        """Return the 8-bit representation of a single ASCII ``symbol``."""
        bits = bitarray()
        bits.frombytes(symbol.encode('ascii'))
        return bits

    def find_leader(self, w):
        """Return the first node in ``self.nodes`` with weight ``w``, else ``None``."""
        for i in self.nodes:
            if i.weight == w:
                return i

    def swap(self, node1, node2):
        """Exchange the positions of two nodes in ``self.nodes`` and in the tree.

        Both nodes must have a parent.

        NOTE: when the nodes are siblings and ``node1`` is the right child,
        the two ``swap_branch`` calls cancel out and the child pointers end up
        unchanged (only the list order changes). Encoder and decoder share
        this behaviour, so round trips are unaffected; it is kept as is
        because changing it would change the encoded bitstream.
        """
        idx1 = self.nodes.index(node1)
        idx2 = self.nodes.index(node2)

        self.nodes[idx1], self.nodes[idx2] = self.nodes[idx2], self.nodes[idx1]

        tmp = node1.parent
        node1.parent = node2.parent
        node2.parent = tmp

        def swap_branch(node1, node2):
            # node1 now hangs under node2's old parent: take over whichever
            # child slot still points at node2.
            if node1.parent.left is node2:
                node1.parent.left = node1
            else:
                node1.parent.right = node1
        swap_branch(node1, node2)
        swap_branch(node2, node1)

    def update(self, letter):
        """Account for one more occurrence of ``letter`` in the model.

        A new symbol splits the NYT leaf into an internal node with NYT on
        the left and the new leaf on the right (both created with weight 1).
        Then, walking towards the root, each node is swapped with the leader
        of its weight class (unless they are the same node or parent/child)
        and its weight is incremented.
        """
        node = None
        if letter not in self.leafs:
            intermediate = self._Node(1, parent=self.NYT.parent)

            leaf = self._Node(1, parent=intermediate, letter=letter)
            self.leafs[leaf.letter] = leaf

            intermediate.right = leaf
            intermediate.left = self.NYT
            self.NYT.parent = intermediate

            if self.root is None:
                self.root = intermediate
            else:
                # NYT is always a left child, so the new node takes that slot.
                intermediate.parent.left = intermediate

            self.nodes.append(intermediate)
            self.nodes.append(leaf)

            # The two new nodes already have their final weight.
            node = intermediate.parent
        else:
            node = self.leafs[letter]

        while node is not None:
            leader = self.find_leader(node.weight)

            if (leader is not node) and (leader.parent is not node) and (node.parent is not leader):
                self.swap(leader, node)

            node.weight += 1
            node = node.parent

    def encode_leaf(self, letter):
        """Return the current code of ``letter`` (a key of ``self.leafs``)."""
        e = []
        node = self.leafs[letter]
        while node.parent is not None:
            # Left edge -> 0 bit, right edge -> 1 bit.
            e.append(node is not node.parent.left)
            node = node.parent
        # Collected leaf-to-root; the code is read root-to-leaf.
        return bitarray(reversed(e))

    def encode(self, text, file_name):
        """Encode ``text`` and write the result to ``file_name``.

        The model is reset first: the decoder always starts from an empty
        model, so encoding on top of earlier state would produce a file that
        cannot be decoded. The stream is padded to a whole byte with repeated
        (and finally truncated) NYT codes; the decoder drops a trailing NYT
        that is not followed by 8 bits, and a trailing partial code.
        """
        self._reset()

        encoded_text = bitarray()
        for symbol in unidecode(text):
            if self.root is None:
                # Very first symbol: no tree yet, send it raw.
                encoded_text += self._raw_bits(symbol)
            elif symbol in self.leafs:
                encoded_text += self.encode_leaf(symbol)
            else:
                encoded_text += self.encode_leaf('NYT')
                encoded_text += self._raw_bits(symbol)
            self.update(symbol)

        # len() instead of the deprecated bitarray.length().
        padding = -len(encoded_text) % 8
        if padding:
            # padding > 0 implies at least one symbol was encoded, so NYT has
            # a parent and its code is non-empty.
            nyt_bits = self.encode_leaf('NYT')
            filler = bitarray()
            while len(filler) < padding:
                filler += nyt_bits
            encoded_text += filler[:padding]

        with open(file_name, 'wb') as f:
            encoded_text.tofile(f)

    def decode(self, file_name):
        """Decode a file written by ``encode`` and return its text.

        The model is reset first so the tree evolves exactly as it did while
        encoding. An empty file decodes to ``''``.
        """
        self._reset()

        encoded_text = bitarray()
        with open(file_name, 'rb') as f:
            encoded_text.fromfile(f)

        total = len(encoded_text)
        if total < 8:
            return ''

        # The first symbol is always stored as 8 raw bits.
        first = encoded_text[:8].tobytes().decode()
        result = [first]
        self.update(first)
        offset = 8

        node = self.root
        while offset < total:
            # Follow the bits down to a leaf (or until the data runs out).
            while node.letter is None and offset < total:
                if encoded_text[offset]:
                    node = node.right
                else:
                    node = node.left
                offset += 1

            if node.letter is None:
                # Ran out of bits inside a code: trailing padding.
                break

            if node.letter == 'NYT':
                if offset + 8 <= total:
                    l = encoded_text[offset:offset + 8].tobytes().decode()
                    offset += 8
                else:
                    # NYT without a full raw symbol after it: padding.
                    break
            else:
                l = node.letter

            node = self.root

            result.append(l)
            self.update(l)

        return ''.join(result)

In [24]:
# Round-trip smoke test: encode with one instance, decode with a fresh one.
ah = AdaptiveHuffman()
ah.encode('aabc', file_name='test_file')
ah2 = AdaptiveHuffman()
ah2.decode('test_file')

'aabc'

In [26]:
def test_adaptive_on_file(file_name):
    encoded_file = file_name + '_encoded'
    with open(file_name) as f:
        text = f.read()
        
        print('Encode time:')
        h = AdaptiveHuffman()
        %timeit AdaptiveHuffman().encode(text, file_name=encoded_file)
        h.encode(text, file_name=encoded_file)
        print('Decode time: ')
        %timeit AdaptiveHuffman().decode(encoded_file)
        
        print(f'Are equal? {AdaptiveHuffman().decode(encoded_file) == text}')
        
        print(f'Compresion factor is {1 - os.path.getsize(encoded_file) / os.path.getsize(file_name)}')

In [27]:
# Adaptive coder benchmark; expects a file named '1kB_file' in the working directory.
test_adaptive_on_file('1kB_file')

Encode time:
7.64 ms ± 104 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
Decode time: 
7.31 ms ± 453 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
Are equal? True
Compresion factor is 0.42105263157894735


In [28]:
# Adaptive coder benchmark; expects a file named '10kB_file' in the working directory.
test_adaptive_on_file('10kB_file')

Encode time:
73.2 ms ± 3.77 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
Decode time: 
65.2 ms ± 2.88 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
Are equal? True
Compresion factor is 0.46048277204594124


In [29]:
# Adaptive coder benchmark; expects a file named '100kB_file' in the working directory.
test_adaptive_on_file('100kB_file')

Encode time:
641 ms ± 6.66 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
Decode time: 
608 ms ± 32.2 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
Are equal? True
Compresion factor is 0.4644751006738168


In [30]:
# Adaptive coder benchmark; expects a file named '1MB_file' in the working directory.
test_adaptive_on_file('1MB_file')

Encode time:
6.55 s ± 366 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
Decode time: 
5.94 s ± 150 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
Are equal? True
Compresion factor is 0.4655499941196287
