In [1]:
import heapq
from collections import defaultdict

In [2]:
class Node:
    """One node of a Huffman tree.

    Leaves hold a character in ``char``; the internal nodes created by the
    tree builder hold ``None`` there. ``freq`` is the weight used for ordering.
    """

    def __init__(self, char, freq):
        # Children start out empty and are attached after construction.
        self.left = self.right = None
        self.char, self.freq = char, freq

    def __lt__(self, other):
        """Compare by frequency only, which is all ``heapq`` needs for a min-heap."""
        lighter = self.freq < other.freq
        return lighter

In [3]:
def build_frequency_dict(text):
    """Count how many times each character occurs in ``text``.

    Args:
        text: Iterable of hashable symbols (a ``str`` in this notebook).

    Returns:
        A ``defaultdict(int)`` mapping each symbol to its count, with keys in
        order of first appearance.
    """
    counts = defaultdict(int)
    for symbol in text:
        counts[symbol] = counts[symbol] + 1
    return counts

In [4]:
def build_huffman_tree(frequency):
    """Build a Huffman tree from a symbol -> count mapping and return its root.

    The two lowest-frequency nodes are repeatedly popped from a min-heap and
    merged under a new internal node (``char`` is ``None``) whose frequency is
    their sum, until a single node remains.

    Args:
        frequency: Mapping of symbol to occurrence count.

    Returns:
        The root ``Node``. With exactly one distinct symbol the root is that
        lone leaf; with an empty mapping the result is ``None``.
    """
    heap = [Node(char, freq) for char, freq in frequency.items()]
    if not heap:
        # Nothing to encode. generate_codes() treats a None root as a no-op,
        # so callers get an empty code table instead of an IndexError.
        return None
    heapq.heapify(heap)

    while len(heap) > 1:
        left = heapq.heappop(heap)
        right = heapq.heappop(heap)

        internal = Node(None, left.freq + right.freq)
        internal.left = left
        internal.right = right

        heapq.heappush(heap, internal)

    return heap[0]

In [5]:
def generate_codes(root, current_code, codes):
    """Fill ``codes`` with the bit string of every leaf reachable from ``root``.

    The tree is walked depth-first; going left appends "0" and going right
    appends "1" to the path collected so far.

    Args:
        root: Node to start from. ``None`` is ignored.
        current_code: Bit string accumulated on the way down to ``root``.
        codes: Dict updated in place, mapping symbol -> bit string.
    """
    if root is None:
        return

    if root.char is not None:
        # When the whole tree is a single leaf the path is empty. An empty
        # code would encode every symbol as zero bits and could never be
        # decoded, so give that lone symbol the one-bit code "0".
        codes[root.char] = current_code or "0"
        return

    generate_codes(root.left, current_code + "0", codes)
    generate_codes(root.right, current_code + "1", codes)

In [6]:
def compress(text):
    """Huffman-encode ``text`` and pack the resulting bits into bytes.

    Args:
        text: The string to encode.

    Returns:
        Tuple ``(compressed, codes, padding)``: the packed ``bytearray``, the
        symbol -> bit-string table used, and the number of zero bits appended
        to fill the last byte.
    """
    codes = {}
    generate_codes(build_huffman_tree(build_frequency_dict(text)), "", codes)

    bits = "".join(map(codes.__getitem__, text))
    # The padding is always between 1 and 8: a bit stream that is already
    # byte-aligned still gets one full byte of zeros appended.
    padding = 8 - len(bits) % 8
    bits = bits + "0" * padding

    compressed = bytearray(
        int(bits[start:start + 8], 2) for start in range(0, len(bits), 8)
    )
    return compressed, codes, padding

In [7]:
def decompress(compressed, codes, padding):
    """Decode bytes produced by ``compress`` back into the original text.

    Args:
        compressed: Iterable of byte values holding the packed bit stream.
        codes: Symbol -> bit-string table that was used for encoding.
        padding: Number of filler bits at the end of the stream to discard.

    Returns:
        The decoded string.
    """
    reversed_codes = {code: char for char, code in codes.items()}
    binary = "".join(format(byte, '08b') for byte in compressed)
    if padding:
        # Slice only when there is something to drop: binary[:-0] is
        # binary[:0], which would throw the whole stream away.
        binary = binary[:-padding]

    # Collect the symbols and join once instead of growing a string.
    decoded = []
    current_code = ""
    for bit in binary:
        current_code += bit
        if current_code in reversed_codes:
            decoded.append(reversed_codes[current_code])
            current_code = ""

    return "".join(decoded)

In [8]:
# Example usage and testing
def main():
    """Round-trip a sample sentence through compress/decompress and print a report."""
    original_text = "this is an example of a huffman tree"
    print(f"Original text: {original_text}")

    compressed, codes, padding = compress(original_text)
    compressed_size = len(compressed)
    # The ratio counts only the packed bytes; the code table and the padding
    # value returned alongside them are not included.
    raw_size = len(original_text.encode('utf-8'))
    print(f"Compressed size: {compressed_size} bytes")
    print(f"Compression ratio: {compressed_size / raw_size:.2f}")

    decompressed_text = decompress(compressed, codes, padding)
    texts_match = original_text == decompressed_text
    print(f"Decompressed text: {decompressed_text}")
    print(f"Original and decompressed texts match: {texts_match}")


if __name__ == "__main__":
    main()

Original text: this is an example of a huffman tree
Compressed size: 17 bytes
Compression ratio: 0.47
Decompressed text: this is an example of a huffman tree
Original and decompressed texts match: True


### File

In [9]:
import heapq
from collections import defaultdict
import pickle
import os

In [10]:
class Node:
    """One node of a Huffman tree over byte values.

    Leaves hold a symbol in ``char``; the internal nodes created by the tree
    builder hold ``None`` there. ``freq`` is the weight used for ordering.
    """

    def __init__(self, char, freq):
        # Children start out empty and are attached after construction.
        self.left = self.right = None
        self.char, self.freq = char, freq

    def __lt__(self, other):
        """Compare by frequency only, which is all ``heapq`` needs for a min-heap."""
        lighter = self.freq < other.freq
        return lighter

In [11]:
def build_frequency_dict(filename):
    """Count how often each byte value occurs in the file at ``filename``.

    The file is opened in binary mode and read 8192 bytes at a time, so it
    never has to fit in memory as a whole.

    Args:
        filename: Path of the file to scan.

    Returns:
        A ``defaultdict(int)`` mapping byte value (an ``int``) to its count.
    """
    counts = defaultdict(int)
    with open(filename, 'rb') as source:
        # iter() with a sentinel keeps calling read() until it returns b"",
        # which is what a binary file yields at end of file.
        for block in iter(lambda: source.read(8192), b""):
            for value in block:
                counts[value] += 1
    return counts

In [12]:
def build_huffman_tree(frequency):
    """Build a Huffman tree from a byte -> count mapping and return its root.

    The two lowest-frequency nodes are repeatedly popped from a min-heap and
    merged under a new internal node (``char`` is ``None``) whose frequency is
    their sum, until a single node remains.

    Args:
        frequency: Mapping of symbol to occurrence count.

    Returns:
        The root ``Node``. With exactly one distinct symbol the root is that
        lone leaf; with an empty mapping (an empty input file) the result is
        ``None``.
    """
    heap = [Node(char, freq) for char, freq in frequency.items()]
    if not heap:
        # Nothing to encode. generate_codes() treats a None root as a no-op,
        # so callers get an empty code table instead of an IndexError.
        return None
    heapq.heapify(heap)

    while len(heap) > 1:
        left = heapq.heappop(heap)
        right = heapq.heappop(heap)

        internal = Node(None, left.freq + right.freq)
        internal.left = left
        internal.right = right

        heapq.heappush(heap, internal)

    return heap[0]

In [13]:
def generate_codes(root, current_code, codes):
    """Fill ``codes`` with the bit string of every leaf reachable from ``root``.

    The tree is walked depth-first; going left appends "0" and going right
    appends "1" to the path collected so far.

    Args:
        root: Node to start from. ``None`` is ignored.
        current_code: Bit string accumulated on the way down to ``root``.
        codes: Dict updated in place, mapping symbol -> bit string.
    """
    if root is None:
        return

    if root.char is not None:
        # When the whole tree is a single leaf (a file made of one repeated
        # byte) the path is empty. An empty code would write no bits at all,
        # so give that lone symbol the one-bit code "0".
        codes[root.char] = current_code or "0"
        return

    generate_codes(root.left, current_code + "0", codes)
    generate_codes(root.right, current_code + "1", codes)

In [14]:
def compress(input_file, output_file):
    """Huffman-compress ``input_file`` into ``output_file``.

    The output file is laid out as: the pickled frequency table, one byte
    holding the number of padding bits, then the packed code bits. The
    frequency table (not the code table) is stored so that the reader can
    rebuild the tree itself.

    Args:
        input_file: Path of the file to read.
        output_file: Path of the archive to write.
    """
    frequency = build_frequency_dict(input_file)
    codes = {}
    generate_codes(build_huffman_tree(frequency), "", codes)

    packed = bytearray()
    pending = ""  # bits that do not yet fill a whole byte
    with open(input_file, 'rb') as infile, open(output_file, 'wb') as outfile:
        pickle.dump(frequency, outfile)

        for chunk in iter(lambda: infile.read(8192), b""):
            for byte in chunk:
                pending += codes[byte]
                while len(pending) >= 8:
                    packed.append(int(pending[:8], 2))
                    pending = pending[8:]

        # Zero-fill whatever is left so it forms one final byte.
        padding = 8 - len(pending) if pending else 0
        if pending:
            packed.append(int(pending + "0" * padding, 2))

        # The padding count goes first, as a single byte, then the payload.
        outfile.write(bytes([padding]))
        outfile.write(packed)

In [15]:
def decompress(input_file, output_file):
    """Restore the file that ``compress`` packed into ``input_file``.

    The archive is read as: pickled frequency table, one padding-count byte,
    then the packed code bits. The Huffman tree is rebuilt from the frequency
    table and the bits are decoded with it.

    Args:
        input_file: Path of the archive to read.
        output_file: Path of the restored file to write.

    Raises:
        IndexError: If the padding byte is missing.
        ValueError: If the table has several symbols but no payload follows.
    """
    with open(input_file, 'rb') as infile, open(output_file, 'wb') as outfile:
        # SECURITY: pickle.load can run arbitrary code embedded in a crafted
        # file. Only open archives that come from a trusted compress() run.
        frequency = pickle.load(infile)

        if not frequency:
            # Archive of an empty file: there is nothing to decode.
            return
        if len(frequency) == 1:
            # One distinct byte: the table alone describes the whole file.
            # This also covers archives whose lone symbol was given an empty
            # code, which carry no payload bytes at all.
            (symbol, count), = frequency.items()
            outfile.write(bytes([symbol]) * count)
            return

        # Rebuild the Huffman tree and invert the code table.
        root = build_huffman_tree(frequency)
        codes = {}
        generate_codes(root, "", codes)
        reversed_codes = {code: bytes([char]) for char, code in codes.items()}

        # Read padding (as a single byte), then the payload.
        padding = infile.read(1)[0]
        compressed_data = infile.read()
        if not compressed_data:
            raise ValueError("compressed payload is missing")

        bits = "".join(format(byte, '08b') for byte in compressed_data)
        if padding:
            # Drop the filler bits that completed the final byte.
            bits = bits[:-padding]

        decoded = bytearray()
        current_code = ""
        for bit in bits:
            current_code += bit
            symbol = reversed_codes.get(current_code)
            if symbol is not None:
                decoded += symbol
                current_code = ""
        outfile.write(decoded)

In [16]:
import os

def main():
    """Compress ``test_proj.txt``, decompress it again, and print each stage.

    Prints the original, compressed and decompressed contents, the two file
    sizes, and whether the decompressed text equals the original text. Stops
    early with an error message if a file cannot be read.
    """
    input_file = "test_proj.txt"
    compressed_file = "compressed.bin"
    decompressed_file = "decompressed.txt"

    def show(label, path, mode, encoding=None):
        """Read ``path``, print it between two rules, and return what was read."""
        with open(path, mode, encoding=encoding) as handle:
            content = handle.read()
        print(f"\nContents of the {label} file:")
        print("------------------------------------")
        print(content)
        print("------------------------------------\n")
        return content

    # Read and print the original file
    try:
        original_content = show("original", input_file, 'r', 'utf-8')
    except FileNotFoundError:
        print(f"Error: The file {input_file} was not found.")
        return
    except IOError:
        print(f"Error: There was an issue reading the file {input_file}.")
        return

    # Compress the file, then read and print the raw compressed bytes
    compress(input_file, compressed_file)
    try:
        show("compressed", compressed_file, 'rb')
    except IOError:
        print(f"Error: There was an issue reading the compressed file {compressed_file}.")
        return

    # Report the on-disk sizes before and after compression
    original_size = os.path.getsize(input_file)
    compressed_size = os.path.getsize(compressed_file)
    print(f"Original file size: {original_size} bytes")
    print(f"Compressed file size: {compressed_size} bytes")

    # Decompress the file, then read and print the result
    decompress(compressed_file, decompressed_file)
    try:
        decompressed_content = show("decompressed", decompressed_file, 'r', 'utf-8')
    except IOError:
        print(f"Error: There was an issue reading the decompressed file {decompressed_file}.")
        return

    # Verify the decompressed text matches the original
    print(f"Decompression successful: {original_content == decompressed_content}")


if __name__ == "__main__":
    main()


Contents of the original file:
------------------------------------
Embark on a thrilling journey to revolutionize exoplanet education! The discovery of exoplanets has redefined our understanding of planetary systems, expanding what we know about our place in the universe. From scorching gas giants to potentially habitable rocky worlds, these distant worlds offer a glimpse into the remarkable diversity of planetary configurations. Traditional educational materials about this topic may not be accessible to everyone, particularly those from underserved communities or with limited access to resources. Your challenge is to develop engaging and accessible learning materials that leverage creativity to enlighten students about the wonders of exoplanets.
------------------------------------


Contents of the compressed file:
------------------------------------
b'\x80\x04\x95\xbc\x00\x00\x00\x00\x00\x00\x00\x8c\x0bcollections\x94\x8c\x0bdefaultdict\x94\x93\x94\x8c\x08builtins\x94\x8c\x03int\

## GUI

In [17]:
# !pip install gradio

# Huffman Compression Tool Analysis

## Overview
This code implements a Huffman compression tool with a Gradio-based user interface. The tool allows users to compress a file using Huffman coding, decompress it, and analyze the results.

## Key Components

### 1. Data Structures
- `Node` class: Represents a node in the Huffman tree, containing character, frequency, and left/right child information.
- Frequency dictionary: Stores the frequency of each byte in the input file.
- Huffman tree: Built using the frequency information to generate optimal codes.

### 2. Compression Process
1. Build frequency dictionary from input file.
2. Construct Huffman tree based on frequencies.
3. Generate Huffman codes for each byte.
4. Compress input file using generated codes.
5. Store the frequency table, the padding-bit count, and the compressed data in the output file.

### 3. Decompression Process
1. Read frequency information from compressed file.
2. Reconstruct Huffman tree.
3. Decode compressed data using the tree.
4. Write decompressed data to output file.

### 4. User Interface
- Utilizes Gradio to create a web-based interface.
- Allows users to upload a file for compression.
- Displays compression results and offers the decompressed file for download.

## Key Functions

1. `build_frequency_dict(filename)`: Counts byte frequencies in the input file.
2. `build_huffman_tree(frequency)`: Constructs the Huffman tree using a min-heap.
3. `generate_codes(root, current_code, codes)`: Creates Huffman codes for each byte.
4. `compress(input_file, output_file)`: Performs the compression process.
5. `decompress(input_file, output_file)`: Performs the decompression process.
6. `huffman_compression(input_file)`: Main function that handles compression, decompression, and result analysis.

## Results Analysis

The `huffman_compression` function provides the following results:

1. Original file size (in bytes)
2. Compressed file size (in bytes)
3. Compression ratio (compressed size as a percentage of the original size; lower is better)
4. Decompression success status (boolean)

These results allow users to evaluate the effectiveness of the compression for their specific files.

## Potential Improvements

1. Error handling: Add robust error handling for file operations and user inputs.
2. Progress tracking: Implement progress bars for compression and decompression processes.
3. Multi-file support: Allow compression of multiple files or entire directories.
4. Adaptive Huffman coding: Implement adaptive Huffman coding for better compression of dynamic data.
5. Compression options: Provide user-configurable options like block size or compression level.

## Conclusion

This Huffman compression tool provides a user-friendly interface for file compression using Huffman coding. It effectively demonstrates the compression algorithm's workings and provides useful metrics for analysis. With some enhancements, it could be developed into a more robust and feature-rich compression utility.

In [18]:
import gradio as gr
import heapq
from collections import defaultdict
import pickle
import os

class Node:
    """One node of a Huffman tree over byte values.

    Leaves hold a symbol in ``char``; the internal nodes created by the tree
    builder hold ``None`` there. ``freq`` is the weight used for ordering.
    """

    def __init__(self, char, freq):
        # Children start out empty and are attached after construction.
        self.left = self.right = None
        self.char, self.freq = char, freq

    def __lt__(self, other):
        """Compare by frequency only, which is all ``heapq`` needs for a min-heap."""
        lighter = self.freq < other.freq
        return lighter

def build_frequency_dict(filename):
    """Count how often each byte value occurs in the file at ``filename``.

    The file is opened in binary mode and read 8192 bytes at a time.

    Args:
        filename: Path of the file to scan.

    Returns:
        A ``defaultdict(int)`` mapping byte value (an ``int``) to its count.
    """
    counts = defaultdict(int)
    with open(filename, 'rb') as source:
        # iter() with a sentinel keeps calling read() until it returns b"",
        # which is what a binary file yields at end of file.
        for block in iter(lambda: source.read(8192), b""):
            for value in block:
                counts[value] += 1
    return counts

def build_huffman_tree(frequency):
    """Build a Huffman tree from a byte -> count mapping and return its root.

    The two lowest-frequency nodes are repeatedly popped from a min-heap and
    merged under a new internal node (``char`` is ``None``) whose frequency is
    their sum, until a single node remains.

    Args:
        frequency: Mapping of symbol to occurrence count.

    Returns:
        The root ``Node``. With exactly one distinct symbol the root is that
        lone leaf; with an empty mapping (an empty upload) the result is
        ``None``.
    """
    heap = [Node(char, freq) for char, freq in frequency.items()]
    if not heap:
        # Nothing to encode. generate_codes() treats a None root as a no-op,
        # so callers get an empty code table instead of an IndexError.
        return None
    heapq.heapify(heap)
    while len(heap) > 1:
        left = heapq.heappop(heap)
        right = heapq.heappop(heap)
        internal = Node(None, left.freq + right.freq)
        internal.left = left
        internal.right = right
        heapq.heappush(heap, internal)
    return heap[0]

def generate_codes(root, current_code, codes):
    """Fill ``codes`` with the bit string of every leaf reachable from ``root``.

    The tree is walked depth-first; going left appends "0" and going right
    appends "1" to the path collected so far.

    Args:
        root: Node to start from. ``None`` is ignored.
        current_code: Bit string accumulated on the way down to ``root``.
        codes: Dict updated in place, mapping symbol -> bit string.
    """
    if root is None:
        return
    if root.char is not None:
        # When the whole tree is a single leaf (a file made of one repeated
        # byte) the path is empty. An empty code would write no bits at all,
        # so give that lone symbol the one-bit code "0".
        codes[root.char] = current_code or "0"
        return
    generate_codes(root.left, current_code + "0", codes)
    generate_codes(root.right, current_code + "1", codes)

def compress(input_file, output_file):
    """Huffman-compress ``input_file`` into ``output_file``.

    The output file is laid out as: the pickled frequency table, one byte
    holding the number of padding bits, then the packed code bits.

    Args:
        input_file: Path of the file to read.
        output_file: Path of the archive to write.
    """
    frequency = build_frequency_dict(input_file)
    codes = {}
    generate_codes(build_huffman_tree(frequency), "", codes)

    packed = bytearray()
    pending = ""  # bits that do not yet fill a whole byte
    with open(input_file, 'rb') as infile, open(output_file, 'wb') as outfile:
        pickle.dump(frequency, outfile)
        for chunk in iter(lambda: infile.read(8192), b""):
            for byte in chunk:
                pending += codes[byte]
                while len(pending) >= 8:
                    packed.append(int(pending[:8], 2))
                    pending = pending[8:]
        # Zero-fill whatever is left so it forms one final byte.
        padding = 8 - len(pending) if pending else 0
        if pending:
            packed.append(int(pending + "0" * padding, 2))
        outfile.write(bytes([padding]))
        outfile.write(packed)

def decompress(input_file, output_file):
    """Restore the file that ``compress`` packed into ``input_file``.

    The archive is read as: pickled frequency table, one padding-count byte,
    then the packed code bits. The Huffman tree is rebuilt from the frequency
    table and the bits are decoded with it.

    Args:
        input_file: Path of the archive to read.
        output_file: Path of the restored file to write.

    Raises:
        IndexError: If the padding byte is missing.
        ValueError: If the table has several symbols but no payload follows.
    """
    with open(input_file, 'rb') as infile, open(output_file, 'wb') as outfile:
        # SECURITY: pickle.load can run arbitrary code embedded in a crafted
        # file. Only open archives that come from a trusted compress() run.
        frequency = pickle.load(infile)

        if not frequency:
            # Archive of an empty file: there is nothing to decode.
            return
        if len(frequency) == 1:
            # One distinct byte: the table alone describes the whole file.
            # This also covers archives whose lone symbol was given an empty
            # code, which carry no payload bytes at all.
            (symbol, count), = frequency.items()
            outfile.write(bytes([symbol]) * count)
            return

        root = build_huffman_tree(frequency)
        codes = {}
        generate_codes(root, "", codes)
        reversed_codes = {code: bytes([char]) for char, code in codes.items()}

        padding = infile.read(1)[0]
        compressed_data = infile.read()
        if not compressed_data:
            raise ValueError("compressed payload is missing")

        bits = "".join(format(byte, '08b') for byte in compressed_data)
        if padding:
            # Drop the filler bits that completed the final byte.
            bits = bits[:-padding]

        decoded = bytearray()
        current_code = ""
        for bit in bits:
            current_code += bit
            symbol = reversed_codes.get(current_code)
            if symbol is not None:
                decoded += symbol
                current_code = ""
        outfile.write(decoded)

def huffman_compression(input_file):
    """Compress the given file, decompress it again, and summarise the round trip.

    Args:
        input_file: Path of the file to compress, or an object that exposes
            that path as ``.name``.
            NOTE(review): some Gradio versions appear to hand the callback a
            temp-file wrapper instead of a path string. Confirm against the
            installed version.

    Returns:
        Tuple ``(result, decompressed_file)``: a multi-line report (sizes,
        compressed size as a percentage of the original, and whether the
        round trip reproduced the input) and the path of the decompressed
        file.
    """
    if isinstance(input_file, (str, bytes, os.PathLike)):
        input_path = input_file
    else:
        input_path = input_file.name

    original_size = os.path.getsize(input_path)
    compressed_file = "compressed.bin"
    decompressed_file = "decompressed.txt"

    compress(input_path, compressed_file)
    compressed_size = os.path.getsize(compressed_file)

    decompress(compressed_file, decompressed_file)

    # Compare raw bytes. The codec works on bytes, and decoding as UTF-8
    # would raise UnicodeDecodeError for any upload that is not UTF-8 text.
    with open(input_path, 'rb') as f:
        original_content = f.read()
    with open(decompressed_file, 'rb') as f:
        decompressed_content = f.read()

    # An empty upload has no meaningful ratio; report 0 instead of dividing by zero.
    compression_ratio = (compressed_size / original_size) * 100 if original_size else 0.0
    success = original_content == decompressed_content

    result = f"""Original file size: {original_size} bytes
Compressed file size: {compressed_size} bytes
Compression ratio: {compression_ratio:.2f}%
Decompression successful: {success}"""

    return result, decompressed_file

# Single-page app: one file upload in, a text report and a downloadable file
# out. Both outputs come from huffman_compression's two return values, in order.
iface = gr.Interface(
    title="Huffman Compression Tool",
    description="Upload a file to compress using Huffman coding, then decompress it and download the result.",
    fn=huffman_compression,
    inputs=gr.File(label="Select a file to compress"),
    outputs=[
        gr.Textbox(label="Compression Results"),
        gr.File(label="Download Decompressed File"),
    ],
)

# Start the server only when the cell or script runs as the main program.
if __name__ == "__main__":
    iface.launch()

Running on local URL:  http://127.0.0.1:7860

To create a public link, set `share=True` in `launch()`.
