In [1]:
from time import time
from collections import defaultdict
from functools import cmp_to_key
from operator import itemgetter
import array
from sys import byteorder
import struct

import numpy as np
import pandas as pd

# Utils

In [2]:
def read_file(name, root_path='./data/lesson01/', extension='.txt'):
    """Return the raw content of a data file as ``bytes``.

    The path is built by plain concatenation: ``root_path + name + extension``.

    Parameters
    ----------
    name : str
        File name without directory and without extension.
    root_path : str, optional
        Directory prefix, including the trailing path separator.
    extension : str, optional
        File suffix, including the leading dot.

    Returns
    -------
    bytes
        The complete file content, undecoded (a BOM, if present, is kept).

    Raises
    ------
    OSError
        If the file cannot be opened or read.
    """
    # Binary mode: the content is returned exactly as stored, without decoding.
    # The context manager closes the handle even if read() raises.
    with open(root_path + name + extension, 'rb') as file:
        return file.read()

In [3]:
def get_entropy(iterable):
    """Return the Shannon entropy, in bits per item, of the items in `iterable`.

    Parameters
    ----------
    iterable : iterable of hashable
        Any iterable (list, bytes, str, generator, ...). It is consumed once.

    Returns
    -------
    float
        ``-sum(p * log2(p))`` over the relative frequencies ``p`` of the
        distinct items; ``0.0`` for an empty input.
    """
    frequency = defaultdict(int)
    for item in iterable:
        frequency[item] += 1

    if not frequency:
        return 0.0

    # Derive the total from the counts instead of len(iterable), so that
    # generators and other unsized iterables are accepted as well.
    num_items = sum(frequency.values())
    probabilities = [freq/num_items for freq in frequency.values()]

    return -sum(prob*np.log2(prob) for prob in probabilities)

# Read data

In [4]:
# Corpus names; each one is also the base name of the file handed to read_file.
labels = ['czech', 'german', 'english', 'french', 'hungarian']
# File contents, in the same order as `labels`.
datasets = list(map(read_file, labels))

# Burrows–Wheeler transform (BWT)

In [5]:
class BwtEncoder(object):
    """Burrows-Wheeler transform of an indexable sequence (str, bytes, list).

    Attributes
    ----------
    text : sequence
        The input, stored unchanged.
    T : list of int
        Start offsets of all cyclic rotations of `text`, in sorted rotation
        order. Identical rotations keep their offset order (stable sort).
    """

    def __init__(self, text):
        self.text = text
        n = len(text)

        # Compare two rotations symbol by symbol, addressing them by start
        # offset. Materialising every rotation as its own string ran out of
        # memory on the real corpora. This comparator is slow: sorting the
        # Czech dataset took 3.81 s here versus 21 ms in C#.
        def compare(a, b):
            for i in range(n):
                c1, c2 = text[(a+i) % n], text[(b+i) % n]
                if c1 < c2:
                    return -1
                if c1 > c2:
                    return 1

            return 0

        self.T = sorted(range(n), key=cmp_to_key(compare))

    @property
    def F(self):
        """First column of the sorted rotation matrix (list of symbols)."""
        return [self.text[i] for i in self.T]

    @property
    def L(self):
        """Last column of the sorted rotation matrix: the BWT output."""
        n = len(self.text)
        # The symbol cyclically preceding offset i. The parentheses matter:
        # `i-1%n` parses as `i-(1%n)` and only worked through negative indexing.
        return [self.text[(i - 1) % n] for i in self.T]

    @property
    def I(self):
        """Row of the sorted matrix holding the unrotated text (offset 0).

        Raises ValueError when `text` is empty.
        """
        return self.T.index(0)

In [6]:
class BwtDecoder(object):
    """Inverse Burrows-Wheeler transform.

    Built from the last column `L` of the sorted rotation matrix and the row
    index `I` of the original text in that matrix.

    Attributes
    ----------
    L, I : as passed in.
    reversed_T : list of int
        ``reversed_T[k]`` is the position in `L` of the k-th symbol of the
        stably sorted column.
    T : list of int
        Inverse permutation of `reversed_T`.
    """

    def __init__(self, L, I):
        size = len(L)

        # Stable sort of the positions of L by the symbol stored there.
        order = sorted(range(size), key=lambda pos: L[pos])

        # Invert the permutation: position in L -> rank in the sorted column.
        inverse = [0] * size
        for rank, pos in enumerate(order):
            inverse[pos] = rank

        self.L = L
        self.I = I
        self.reversed_T = order
        self.T = inverse

    @property
    def F(self):
        """First (sorted) column, rebuilt from L via `reversed_T`."""
        return [self.L[pos] for pos in self.reversed_T]

    def reconstruct(self):
        """Return the original sequence as a list of symbols."""
        symbols = []
        row = self.I
        # Walking T from row I yields the text from its last symbol backwards.
        for _ in range(len(self.T)):
            symbols.append(self.L[row])
            row = self.T[row]
        symbols.reverse()
        return symbols

In [7]:
# Small hand-checkable example for the BWT round trip.
encoder = BwtEncoder('swiss miss')

In [8]:
# The decoder needs only the last column L and the row index I of the original text.
decoder = BwtDecoder(encoder.L, encoder.I)

In [9]:
# reconstruct() returns a list of symbols; join them to compare with the input string.
''.join(decoder.reconstruct())

'swiss miss'

In [10]:
# Time the BWT of a full corpus: datasets[3] is 'french' in `labels` order.
%time encoder = BwtEncoder(datasets[3])

Wall time: 5.16 s


In [11]:
# Decoder for the corpus encoded in the previous cell.
decoder = BwtDecoder(encoder.L, encoder.I)

In [12]:
# Round-trip check. `encoder` was built from datasets[3], so the
# reconstruction must be compared with that same dataset (not datasets[0]).
# bytes() replaces array.array(...).tostring(), which was removed in Python 3.9.
bytes(decoder.reconstruct()) == datasets[3]

True

# Move-to-front (MTF) transform

In [13]:
class MoveToFrontEncoder(object):
    """Move-to-front encoder over a fixed initial alphabet.

    Parameters
    ----------
    alphabet : iterable, optional
        Initial symbol order. Any iterable of symbols is accepted (list,
        tuple, str, bytes, ...). Defaults to the byte values 0..255.
    """

    def __init__(self, alphabet=None):
        # A fresh list per instance; a list literal in the signature would be
        # one object shared by every encoder created without an argument.
        self.alphabet = list(range(256)) if alphabet is None else alphabet

    def encode(self, values):
        """Lazily yield, for each value, its current position in the queue.

        After a value is emitted it is moved to the front of the queue.
        Every call starts from a fresh copy of the initial alphabet.

        Raises
        ------
        ValueError
            When a value is not part of the alphabet.
        """
        # list() copies and also accepts alphabets that have no .copy().
        queue = list(self.alphabet)

        for value in values:
            i = queue.index(value)

            if i > 0:
                queue.insert(0, queue.pop(i))

            yield i
            
# class MoveToFrontUtf8Encoder(object):
#     def encode(self, values):
#         queue = []
        
#         for value in values:
#             try:
#                 i = queue.index(value)
#                 queue.pop(i)
#             except ValueError:
#                 i = int.from_bytes(value.encode(), byteorder=byteorder)
            
#             queue.insert(0, value)
            
#             yield i

In [14]:
class MoveToFrontDecoder(object):
    """Move-to-front decoder; inverse of the encoder for the same alphabet.

    Parameters
    ----------
    alphabet : iterable, optional
        Initial symbol order; it must match the order used for encoding. Any
        iterable of symbols is accepted. Defaults to the byte values 0..255.
    """

    def __init__(self, alphabet=None):
        # A fresh list per instance; a list literal in the signature would be
        # one object shared by every decoder created without an argument.
        self.alphabet = list(range(256)) if alphabet is None else alphabet

    def decode(self, values):
        """Lazily yield the symbol found at each queue position in `values`.

        After a symbol is emitted it is moved to the front of the queue.
        Every call starts from a fresh copy of the initial alphabet.

        Raises
        ------
        IndexError
            When a position lies outside the alphabet.
        """
        # list() copies and also accepts alphabets that have no .copy().
        queue = list(self.alphabet)

        for i in values:
            char = queue[i]
            if i > 0:
                queue.pop(i)
                queue.insert(0, char)

            yield char
            
# class MoveToFrontUtf8Decoder(object):
#     def decode(self, values):
#         queue = []
        
#         for i in values:
#             try:
#                 char = queue[i]
#                 if i > 0:
#                     queue.pop(i)
#             except IndexError:
#                 if i < 256:
#                     char = chr(i)
#                 else:
#                     char = struct.pack('H', i).decode('utf-8')
#             if i > 0:
#                 queue.insert(0, char)

#             yield char

In [15]:
# Small 8-symbol alphabet so the move-to-front output can be followed by hand.
enc = MoveToFrontEncoder(['a', 'b', 'c', 'd', 'm', 'n', 'o', 'p'])

In [16]:
# encode() is a generator; materialise it. Each code is the symbol's queue position before it is moved to the front.
encoded = list(enc.encode('abcddcbamnopponm'))
encoded

[0, 1, 2, 3, 0, 1, 2, 3, 4, 5, 6, 7, 0, 1, 2, 3]

In [17]:
# The decoder is given the same initial alphabet order as the encoder above.
dec = MoveToFrontDecoder(['a', 'b', 'c', 'd', 'm', 'n', 'o', 'p'])

In [18]:
# Decode the codes printed by the encoder cell back into a string.
decoded = ''.join(dec.decode([0, 1, 2, 3, 0, 1, 2, 3, 4, 5, 6, 7, 0, 1, 2, 3]))
decoded 

'abcddcbamnopponm'

In [19]:
# Round-trip check: the decoded string must equal the sample that was encoded.
decoded == 'abcddcbamnopponm'

True

# Results: entropy before and after BWT + move-to-front

In [20]:
# One row per corpus: entropy of the raw bytes ('before'), entropy of the
# BWT + move-to-front output ('after'), whether decoding restores the input
# exactly, and the wall-clock seconds for the whole round trip.
df = pd.DataFrame(columns=['before', 'after', 'decompression_ok', 'time'], index=labels)

for label, text in zip(labels, datasets):
    t0 = time()
    H_before = get_entropy(text)

    # Compress: BWT last column, then move-to-front codes.
    bwt_encoder = BwtEncoder(text)
    mtf_encoder = MoveToFrontEncoder()
    encoded = list(mtf_encoder.encode(bwt_encoder.L))

    H_after = get_entropy(encoded)

    # Decompress: undo move-to-front, then invert the BWT.
    mtf_decoder = MoveToFrontDecoder()
    decoded_L = list(mtf_decoder.decode(encoded))
    bwt_decoder = BwtDecoder(decoded_L, bwt_encoder.I)
    # bytes() replaces array.array('B', ...).tostring(), which was removed in
    # Python 3.9; it builds the same byte string from the list of ints.
    decoded_text = bytes(bwt_decoder.reconstruct())

    is_ok = text == decoded_text

    df.loc[label] = [H_before, H_after, is_ok, time()-t0]

In [21]:
# Display the per-corpus summary table.
df

Unnamed: 0,before,after,decompression_ok,time
czech,5.20571,2.86037,True,6.22195
german,4.85786,2.74431,True,8.76844
english,4.70253,2.72626,True,6.66557
french,4.99521,2.63374,True,5.89897
hungarian,4.96675,3.01661,True,6.84108
