Huffman-encode the three files produced by the high-level compression pipeline

Huffman

In [1]:
# Code retrieved from https://www.geeksforgeeks.org/huffman-coding-in-python/

# Python program for Huffman Coding
import heapq
import collections
import pickle
import os

class Node:
    """A node of a binary Huffman tree.

    Leaves carry a ``symbol``; nodes created without one keep ``symbol``
    as ``None``. ``left`` and ``right`` start out as ``None`` and are
    assigned by whoever links nodes together.
    """

    def __init__(self, symbol=None, frequency=None):
        self.symbol, self.frequency = symbol, frequency
        self.left = self.right = None

    def __lt__(self, other):
        # Ordering looks at frequency only, which is what lets heapq
        # keep nodes in a priority queue.
        mine, theirs = self.frequency, other.frequency
        return mine < theirs

def build_huffman_tree(freq_dict):
    """Build a Huffman tree from a ``symbol -> frequency`` mapping.

    Args:
        freq_dict: mapping whose keys are the symbols and whose values are
            their frequencies (e.g. a ``collections.Counter``).

    Returns:
        The root ``Node`` of the tree, or ``None`` when ``freq_dict`` is
        empty. When there is exactly one symbol, the returned root is an
        internal node whose ``left`` child is that symbol's leaf, so the
        symbol gets the one-bit code "0" instead of an empty code.
    """
    # One leaf per symbol.
    priority_queue = [Node(symbol, freq) for symbol, freq in freq_dict.items()]

    # Nothing to encode: there is no tree (previously an IndexError).
    if not priority_queue:
        return None

    # A lone leaf as root would yield a zero-length code, i.e. an encoded
    # stream with no bits at all. Hang it under a parent instead.
    if len(priority_queue) == 1:
        only_leaf = priority_queue[0]
        root = Node(frequency=only_leaf.frequency)
        root.left = only_leaf
        return root

    heapq.heapify(priority_queue)

    # Repeatedly merge the two least frequent nodes until one root remains.
    while len(priority_queue) > 1:
        left_child = heapq.heappop(priority_queue)
        right_child = heapq.heappop(priority_queue)
        merged_node = Node(frequency=left_child.frequency + right_child.frequency)
        merged_node.left = left_child
        merged_node.right = right_child
        heapq.heappush(priority_queue, merged_node)

    return priority_queue[0]

def generate_huffman_codes(node, code="", huffman_codes=None):
    """Collect the bit-string code of every symbol below ``node``.

    Going to a ``left`` child appends "0" to the code, going to a ``right``
    child appends "1". Every node whose ``symbol`` is not ``None`` gets an
    entry in the result.

    Args:
        node: root of the (sub)tree to walk; ``None`` is accepted and
            contributes nothing.
        code: prefix already accumulated on the way to ``node``.
        huffman_codes: optional dict to fill in place. When omitted, a new
            dict is created for this call, so results of separate calls
            never share state.

    Returns:
        The dict mapping each symbol to its code string (the same object as
        ``huffman_codes`` when one was passed in).
    """
    if huffman_codes is None:
        huffman_codes = {}

    # Explicit stack instead of recursion: a very skewed tree can be deeper
    # than the interpreter's recursion limit.
    pending = [(node, code)]
    while pending:
        current, prefix = pending.pop()
        if current is None:
            continue
        if current.symbol is not None:
            huffman_codes[current.symbol] = prefix
        # Push right first so the left subtree is visited first (pre-order).
        pending.append((current.right, prefix + "1"))
        pending.append((current.left, prefix + "0"))

    return huffman_codes

def print_tree(node, prefix="", is_left=True):
    """Print the tree rooted at ``node`` as an indented outline.

    Each node is printed on its own line: ``prefix``, then a branch marker
    ("├─0 " for a left child, "└─1 " for a right child), then the symbol
    if the node has one. ``None`` nodes print nothing.
    """
    if node is None:
        return

    branch = "├─0 " if is_left else "└─1 "
    label = "" if node.symbol is None else str(node.symbol)
    print(prefix + branch + label)

    # Nodes without children end the outline here.
    if not (node.left or node.right):
        return

    # Below a left branch the vertical rule continues; below a right
    # branch only blank padding is needed.
    child_prefix = prefix + ("│   " if is_left else "    ")
    for child, child_is_left in ((node.left, True), (node.right, False)):
        print_tree(child, child_prefix, child_is_left)

Huffman-encode the high-level compressed files

In [2]:
# Which high-level output to encode.
sample = 1
k = 30
target = 'deltas'

path = 'compressed/neural/sample'+str(sample)+'/k'+str(k)

# newline='' keeps the raw line endings; they are stripped per row below.
# The input is only needed for this read, so it is closed right away.
with open(path + '/high_level_compress/'+target+'.csv', newline='') as f:
    lines = f.readlines()
if not lines:
    raise ValueError('nothing to encode: ' + target + '.csv is empty')

# Parse every CSV row once; the frequency count and the encoder both use it.
rows = [
    list(map(int, l.replace('\r', '').replace('\n', '').split(',')))
    for l in lines
]

# Flatten the rows into one symbol stream, with '\n' as the row separator.
# The slice drops the separator that would otherwise follow the last row.
content = []
for row in rows:
    content.extend(row)
    content.append('\n')
content = content[:-1]

root = build_huffman_tree(collections.Counter(content))
# Hand in a fresh dict so the table holds only this tree's symbols.
huffman_codes = generate_huffman_codes(root, "", {})

# Make sure the output directory exists before writing into it.
os.makedirs(path + '/low_level_compress', exist_ok=True)

# Store the tree together with the symbol count; the decoder needs the
# count to know where the real data ends and the padding bits begin.
filename = path+'/low_level_compress/'+target+'_huffman_tree'
with open(filename, 'wb') as file:
    pickle.dump((root, len(content)), file)

with open(path + '/low_level_compress/huffman_encoded_'+target, 'wb') as file:
    leftover = ''
    last_index = len(rows) - 1
    for index, row in enumerate(rows):
        # Every row but the last is followed by the '\n' separator symbol.
        symbols = row + ['\n'] if index < last_index else row
        bits = leftover + ''.join(huffman_codes[s] for s in symbols)

        # Write all complete bytes in one call; carry the tail (< 8 bits)
        # over to the next row.
        whole = len(bits) - len(bits) % 8
        if whole:
            file.write(int(bits[:whole], 2).to_bytes(whole // 8, 'big'))
        leftover = bits[whole:]

    # Pad the final partial byte with zero bits on the right.
    if leftover:
        file.write(bytes([int(leftover.ljust(8, '0'), 2)]))
    

Decode the Huffman-encoded files for verification

In [3]:
path = 'compressed/neural/sample'+str(sample)+'/k'+str(k) + '/low_level_compress/'

# NOTE: pickle.load can execute arbitrary code contained in the file; only
# load tree files that were produced by the encoding cell of this notebook.
with open(path + target + '_huffman_tree', 'rb') as file:
    # The pickle holds the tree root and the number of encoded symbols.
    root2, length = pickle.load(file)

with open(path + 'huffman_encoded_' + target, 'rb') as file:
    encoded = file.read()

# Decode the bit stream back into a flat list of symbols.
symbols = []
cur = root2
if root2 is not None and root2.symbol is not None:
    # The root itself is a leaf: every symbol has a zero-length code, so
    # the stream carries no bits and the data is `length` copies of it.
    symbols = [root2.symbol] * length
elif root2 is not None:
    for byte in encoded:
        if len(symbols) == length:
            break
        for i in range(7, -1, -1):
            if len(symbols) == length:
                break  # whatever is left in this byte is padding
            cur = cur.left if (byte >> i) & 1 == 0 else cur.right
            if cur is None:
                raise ValueError('encoded data does not match the Huffman tree')
            # Emit as soon as a leaf is reached. Waiting for the next bit
            # would lose the final symbol when the stream ends exactly on
            # a byte boundary.
            if cur.symbol is not None:
                symbols.append(cur.symbol)
                cur = root2
count = len(symbols)

# Split the symbol stream into rows at the '\n' separators.
decoded_lines = []
decoded_line = []
for sym in symbols:
    if sym != '\n':
        decoded_line.append(sym)
    else:
        decoded_lines.append(decoded_line)
        decoded_line = []
if decoded_line:
    decoded_lines.append(decoded_line)

# Compare against the original CSV rows; a differing row count is a failure
# too, not just differing row contents.
result = 'ENCODED CORRECTLY!'
if len(decoded_lines) != len(lines):
    result = 'ENCODED WRONG!!'
else:
    for original_line, decoded in zip(lines, decoded_lines):
        if list(map(int, original_line.split(','))) != decoded:
            result = 'ENCODED WRONG!!'
            break
print(result)

ENCODED CORRECTLY!


In [163]:
# Show the last original row and the last decoded row for a visual check.
print([int(value) for value in lines[-1].split(',')])
print(decoded_lines[-1])

[1, 1, 1, 2, 1, 1, 1, 1, 1, 6, 1, 4, 1, 2, 2, 2, 1, 1, 1, 1, 1, 1, 1, 3, 1, 3, 1, 1, 1, 1, 4, 1, 10, 1, 1, 1, 2, 1, 1, 1, 1, 1, 1, 4, 1, 1, 2, 1, 1, 1, 2, 2, 1, 1, 1, 2, 1, 1, 1, 2, 3, 24, 36, 4, 124, 20, 21, 1, 1, 1, 1, 2, 1, 1, 4, 2, 8, 9, 1, 1, 1, 1, 1, 1, 1, 1, 2, 1, 1, 7, 3, 1, 1, 4, 1, 1, 6, 1, 1, 1, 2, 1, 1, 1, 1, 2, 1, 1, 2, 2, 1, 1, 1, 1, 2, 2, 1, 1, 1, 1, 1, 1, 1, 1, 3, 1, 1, 2, 1, 3, 1, 2, 1, 1, 2, 1, 7, 3, 1, 1, 1, 1, 1, 1, 1, 1, 4, 1, 1, 1, 1, 2, 1, 1, 1, 1, 2, 1, 4, 1, 3, 1, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 1, 1, 2, 1, 1, 2, 1, 1, 4, 2, 1, 1, 1, 1, 1, 1, 2, 1, 1, 1, 1, 1, 1, 2, 2, 1, 1, 1, 1, 1, 1, 1, 2, 1, 1, 1, 7, 1, 3, 1, 1, 1, 2, 1, 1, 4, 1, 2, 1, 1, 1, 2, 1, 1, 1, 2, 1, 2, 1, 1, 1, 1, 1, 5, 1, 1, 17, 1, 1, 1, 3, 1, 1, 1, 1, 1, 1, 3, 1, 3, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 1, 1, 2, 1, 1, 1, 6, 1, 1, 1, 1, 1, 1, 1, 2, 1, 2, 1, 1, 1, 2, 1, 1, 1, 3, 1, 1, 1, 1, 1, 1, 1, 7, 5, 2, 3, 9, 13, 1, 17, 2, 1, 3, 1, 1, 1, 1, 1, 2, 1, 1, 1, 2, 1, 1, 2, 1, 2, 1, 1, 7,