# Universal Data Compression Project

## Compare RLE, Huffman, and LZW algorithms across multiple data types

This notebook demonstrates compression of:
- 📝 **Text** files and strings
- 🖼️ **Images** (PNG, JPG, BMP)
- 🎥 **Videos** (MP4, AVI)
- 📄 **Documents** (TXT, PDF, DOCX)

**Algorithms Implemented:**
1. **RLE (Run Length Encoding)** - Best for data with long runs of repeated values
2. **Huffman Coding** - Best for data with skewed symbol frequencies, such as text
3. **LZW (Lempel-Ziv-Welch)** - Best for data with repeated multi-symbol patterns, which it collects in a dictionary

In [None]:
# Import required libraries
import time
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from collections import defaultdict
from heapq import heappush, heappop, heapify
from PIL import Image
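import os
from IPython.display import display  # explicit import so display() also resolves outside a notebook kernel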

# Display settings
pd.set_option('display.max_columns', None)
pd.set_option('display.width', None)
pd.set_option('display.max_colwidth', 50)

print("✅ Libraries loaded successfully!")

## 📚 Algorithm Implementations

In [None]:
# ========================================
# RLE (Run Length Encoding)
# ========================================

def rle_compress(data):
    """Compress data using Run Length Encoding."""
    if not data:
        return []
    
    compressed = []
    prev = data[0]
    count = 1
    
    for i in range(1, len(data)):
        if data[i] == prev:
            count += 1
        else:
            compressed.append((prev, count))
            prev = data[i]
            count = 1
    
    compressed.append((prev, count))
    return compressed


def rle_decompress(compressed_data):
    """Decompress RLE data."""
    decompressed = []
    for value, count in compressed_data:
        decompressed.extend([value] * count)
    return decompressed


print("✅ RLE algorithms ready")
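
As a quick illustration of the run-length idea, here is a minimal standalone sketch. The names `rle_encode` and `rle_decode` are hypothetical helpers introduced only for this example (they are built on `itertools.groupby` and are not the notebook's `rle_compress`/`rle_decompress`), and the two-units-per-pair cost is an assumption that mirrors the size estimate used later in the notebook.

```python
from itertools import groupby


def rle_encode(data):
    """Collapse each run of equal items into a (value, run_length) pair."""
    return [(value, sum(1 for _ in run)) for value, run in groupby(data)]


def rle_decode(pairs):
    """Expand (value, run_length) pairs back into a flat list."""
    return [value for value, count in pairs for _ in range(count)]


runs = rle_encode("AAAABBBCCD")
print(runs)  # [('A', 4), ('B', 3), ('C', 2), ('D', 1)]

# Round trip: decoding the pairs restores the original symbols.
assert "".join(rle_decode(runs)) == "AAAABBBCCD"

# Data without runs yields one pair per symbol, so at two units per pair
# (an assumed cost) the encoded form is twice as large as the input.
no_runs = rle_encode("ABCD")
print(len(no_runs) * 2, "units for", len("ABCD"), "input symbols")
```

The last two lines show why RLE is a poor fit for inputs that rarely repeat a value back to back.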

In [None]:
# ========================================
# Huffman Coding
# ========================================

class HuffmanNode:
    def __init__(self, char, freq):
        self.char = char
        self.freq = freq
        self.left = None
        self.right = None
    
    def __lt__(self, other):
        return self.freq < other.freq


def build_huffman_tree(data):
    """Build Huffman tree from data."""
    freq = defaultdict(int)
    for char in data:
        freq[char] += 1
    
    if len(freq) == 0:
        return None
    if len(freq) == 1:
        return HuffmanNode(list(freq.keys())[0], list(freq.values())[0])
    
    heap = [HuffmanNode(ch, fr) for ch, fr in freq.items()]
    heapify(heap)
    
    while len(heap) > 1:
        node1 = heappop(heap)
        node2 = heappop(heap)
        merged = HuffmanNode(None, node1.freq + node2.freq)
        merged.left = node1
        merged.right = node2
        heappush(heap, merged)
    
    return heap[0]


def build_codes(node, prefix="", code_map=None):
    """Build Huffman codes from tree."""
    if code_map is None:
        code_map = {}
    if node is None:
        return code_map
    if node.char is not None:
        code_map[node.char] = prefix if prefix else "0"
    build_codes(node.left, prefix + "0", code_map)
    build_codes(node.right, prefix + "1", code_map)
    return code_map


def huffman_compress(data):
    """Compress data using Huffman coding."""
    if not data:
        return ("", {})
    root = build_huffman_tree(data)
    codes = build_codes(root)
    encoded = ''.join([codes[item] for item in data])
    return (encoded, codes)


def huffman_decompress(encoded_data, codes):
    """Decompress Huffman encoded data."""
    if not encoded_data:
        return ""
    reverse_codes = {v: k for k, v in codes.items()}
    decoded = []
    buffer = ""
    for bit in encoded_data:
        buffer += bit
        if buffer in reverse_codes:
            decoded.append(reverse_codes[buffer])
            buffer = ""
    return decoded


print("✅ Huffman algorithms ready")
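
`huffman_compress` returns the encoded data as a Python string of `'0'` and `'1'` characters, which is convenient for inspection but occupies one character per bit. The sketch below shows one possible way to pack such a string into real bytes. `pack_bits` and `unpack_bits` are hypothetical helpers, not part of the notebook, and the padding scheme (zero bits appended to reach a byte boundary, with the pad count returned separately) is an assumption made for illustration.

```python
def pack_bits(bitstring):
    """Pack a string of '0'/'1' characters into bytes.

    Returns (payload, pad), where pad is the number of zero bits appended
    to reach a whole number of bytes.
    """
    pad = (-len(bitstring)) % 8
    padded = bitstring + "0" * pad
    if not padded:
        return b"", 0
    return int(padded, 2).to_bytes(len(padded) // 8, "big"), pad


def unpack_bits(payload, pad):
    """Inverse of pack_bits: recover the original bit string."""
    if not payload:
        return ""
    bits = bin(int.from_bytes(payload, "big"))[2:].zfill(len(payload) * 8)
    return bits[:len(bits) - pad] if pad else bits


payload, pad = pack_bits("10110")
print(payload, pad)  # b'\xb0' 3
assert unpack_bits(payload, pad) == "10110"
```

A decoder would also need the code table, so a complete file format would store the table and the pad count alongside the payload.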

In [None]:
# ========================================
# LZW (Lempel-Ziv-Welch)
# ========================================

def lzw_compress(data):
    """Compress data using LZW algorithm."""
    if not data:
        return []
    
    dict_size = 256
    dictionary = {chr(i): i for i in range(dict_size)}
    w = ""
    result = []
    
    for c in data:
        wc = w + c
        if wc in dictionary:
            w = wc
        else:
            result.append(dictionary[w])
            dictionary[wc] = dict_size
            dict_size += 1
            w = c
    
    if w:
        result.append(dictionary[w])
    return result


def lzw_decompress(compressed_data):
    """Decompress LZW compressed data."""
    if not compressed_data:
        return ""
    
    dict_size = 256
    dictionary = {i: chr(i) for i in range(dict_size)}
    compressed = list(compressed_data)
    
    w = chr(compressed.pop(0))
    result = w
    
    for k in compressed:
        if k in dictionary:
            entry = dictionary[k]
        elif k == dict_size:
            entry = w + w[0]
        else:
            raise ValueError(f"Bad compressed key: {k}")
        
        result += entry
        dictionary[dict_size] = w + entry[0]
        dict_size += 1
        w = entry
    
    return result


print("✅ LZW algorithms ready")
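
The `elif k == dict_size` branch in `lzw_decompress` handles a code that the decoder receives before it has finished defining it. A short trace on `"ABABABA"` makes this visible. The sketch below is a compact standalone re-implementation for illustration only; `lzw_encode` and `lzw_decode` are hypothetical names, and unlike the notebook's version this decoder does not validate unknown codes.

```python
def lzw_encode(text):
    """LZW over single-byte characters (code points 0..255)."""
    table = {chr(i): i for i in range(256)}
    w, out = "", []
    for c in text:
        if w + c in table:
            w += c
        else:
            out.append(table[w])
            table[w + c] = len(table)  # next free code
            w = c
    if w:
        out.append(table[w])
    return out


def lzw_decode(codes):
    """Inverse of lzw_encode; assumes a non-empty, well-formed code list."""
    table = {i: chr(i) for i in range(256)}
    w = table[codes[0]]
    out = [w]
    for k in codes[1:]:
        # A code equal to the next free slot is not in the table yet; its
        # string must be the previous string plus that string's first symbol.
        entry = table[k] if k in table else w + w[0]
        out.append(entry)
        table[len(table)] = w + entry[0]
        w = entry
    return "".join(out)


codes = lzw_encode("ABABABA")
print(codes)  # [65, 66, 256, 258]
assert lzw_decode(codes) == "ABABABA"
```

Code 258 stands for `"ABA"`, which the encoder added to its table immediately before emitting it; the decoder reaches it with only codes up to 257 defined, so it reconstructs the entry from the previous string.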

## 🔬 Performance Measurement Functions

In [None]:
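def test_algorithm(name, compress_func, decompress_func, data, is_huffman=False, is_rle=False):
    """Test a compression algorithm and measure performance.

    Sizes are estimates in bytes so the three algorithms are comparable;
    the Huffman code table and the LZW dictionary are not counted.
    """
    
    # Compression (perf_counter is a monotonic, high-resolution timer)
    start = time.perf_counter()
    compressed = compress_func(data)
    comp_time = time.perf_counter() - start
    
    # Decompression
    start = time.perf_counter()
    if is_huffman:
        encoded, codes = compressed
        decompressed = decompress_func(encoded, codes)
    else:
        decompressed = decompress_func(compressed)
    decomp_time = time.perf_counter() - start
    
    # Estimated compressed size in bytes
    if is_huffman:
        comp_size = (len(encoded) + 7) // 8  # each '0'/'1' character is one bit
    elif is_rle:
        comp_size = len(compressed) * 2  # assumes 1 byte per value and 1 byte per count
    else:
        # LZW: fixed-width codes, wide enough for the largest code emitted
        code_bits = max(max(compressed, default=0).bit_length(), 1)
        comp_size = (len(compressed) * code_bits + 7) // 8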
    
    # Calculate metrics
    orig_size = len(data)
    ratio = comp_size / orig_size if orig_size > 0 else 1
    space_saving = ((orig_size - comp_size) / orig_size * 100) if orig_size > 0 else 0
    
    # Verify correctness
    if isinstance(data, str):
        is_correct = (decompressed == data) or (''.join(decompressed) == data)
    else:
        is_correct = (list(decompressed) == list(data))
    
    return {
        'Algorithm': name,
        'Original Size': orig_size,
        'Compressed Size': comp_size,
        'Compression Ratio': round(ratio, 4),
        'Space Saving %': round(space_saving, 2),
        'Compression Time (s)': round(comp_time, 6),
        'Decompression Time (s)': round(decomp_time, 6),
        'Correct': '✓' if is_correct else '✗'
    }


def run_comparison(data, data_str, title="Compression Test"):
    """Run all three algorithms and compare results."""
    print(f"\n{'='*80}")
    print(f"  {title}")
    print(f"{'='*80}")
    
    results = []
    
    # Test RLE
    results.append(test_algorithm("RLE", rle_compress, rle_decompress, data, is_rle=True))
    
    # Test Huffman
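    # Pass the functions directly: with is_huffman=True, test_algorithm
    # unpacks the (encoded, codes) tuple and calls decompress_func(encoded, codes).
    results.append(test_algorithm("Huffman", huffman_compress, huffman_decompress,
                                 data_str, is_huffman=True))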
    
    # Test LZW
    results.append(test_algorithm("LZW", lzw_compress, lzw_decompress, data_str))
    
    # Create DataFrame
    df = pd.DataFrame(results)
    display(df)
    
    # Find best performers
    print(f"\n📊 Best Performers:")
    best_ratio = df.loc[df['Compression Ratio'].idxmin()]
    print(f"   🏆 Best Compression: {best_ratio['Algorithm']} (ratio: {best_ratio['Compression Ratio']})")
    
    best_speed = df.loc[df['Compression Time (s)'].idxmin()]
    print(f"   ⚡ Fastest Compression: {best_speed['Algorithm']} ({best_speed['Compression Time (s)']}s)")
    
    best_decomp = df.loc[df['Decompression Time (s)'].idxmin()]
    print(f"   🔄 Fastest Decompression: {best_decomp['Algorithm']} ({best_decomp['Decompression Time (s)']}s)")
    
    return df


def plot_comparison(df, title=""):
    """Create visualization of compression results."""
    fig, axes = plt.subplots(2, 2, figsize=(14, 10))
    fig.suptitle(f'Compression Analysis - {title}', fontsize=16, fontweight='bold')
    
    # Compression Ratio
    axes[0, 0].bar(df['Algorithm'], df['Compression Ratio'], color='steelblue', alpha=0.7)
    axes[0, 0].set_ylabel('Ratio (lower is better)')
    axes[0, 0].set_title('Compression Ratio')
    axes[0, 0].grid(axis='y', alpha=0.3)
    for i, v in enumerate(df['Compression Ratio']):
        axes[0, 0].text(i, v, f'{v:.3f}', ha='center', va='bottom')
    
    # Space Savings
    axes[0, 1].bar(df['Algorithm'], df['Space Saving %'], color='green', alpha=0.7)
    axes[0, 1].set_ylabel('Percentage')
    axes[0, 1].set_title('Space Savings %')
    axes[0, 1].grid(axis='y', alpha=0.3)
    for i, v in enumerate(df['Space Saving %']):
        axes[0, 1].text(i, v, f'{v:.1f}%', ha='center', va='bottom')
    
    # Compression Time
    axes[1, 0].bar(df['Algorithm'], df['Compression Time (s)'], color='coral', alpha=0.7)
    axes[1, 0].set_ylabel('Time (seconds)')
    axes[1, 0].set_title('Compression Time')
    axes[1, 0].grid(axis='y', alpha=0.3)
    for i, v in enumerate(df['Compression Time (s)']):
        axes[1, 0].text(i, v, f'{v:.6f}', ha='center', va='bottom')
    
    # Decompression Time
    axes[1, 1].bar(df['Algorithm'], df['Decompression Time (s)'], color='purple', alpha=0.7)
    axes[1, 1].set_ylabel('Time (seconds)')
    axes[1, 1].set_title('Decompression Time')
    axes[1, 1].grid(axis='y', alpha=0.3)
    for i, v in enumerate(df['Decompression Time (s)']):
        axes[1, 1].text(i, v, f'{v:.6f}', ha='center', va='bottom')
    
    plt.tight_layout()
    plt.show()


print("✅ Performance measurement functions ready")
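
A single timed run of a sub-millisecond operation is easily distorted by scheduler and cache effects. One common mitigation is to repeat the call and keep the best time. The sketch below is a possible helper along those lines; `best_of` and its `repeats` parameter are hypothetical and not used elsewhere in this notebook.

```python
import time


def best_of(func, *args, repeats=5):
    """Call func(*args) `repeats` times; return (last_result, best_seconds)."""
    best = float("inf")
    result = None
    for _ in range(repeats):
        start = time.perf_counter()
        result = func(*args)
        best = min(best, time.perf_counter() - start)
    return result, best


# Usage with any callable; sorted() stands in for a compression function here.
result, seconds = best_of(sorted, [3, 1, 2], repeats=10)
print(result, f"{seconds:.6f}s")
```

Taking the minimum rather than the mean reduces the influence of one-off slowdowns, at the cost of reporting a best case rather than a typical case.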

## 🧪 Test 1: Text String Compression

In [None]:
# Test with highly repetitive sample text
sample_text = "AAAABBBBBCCCCCDDDDDEEEEE" * 50

print(f"Sample text length: {len(sample_text)} characters")
print(f"Preview: {sample_text[:100]}...")

data = [ord(c) for c in sample_text]
df1 = run_comparison(data, sample_text, "Highly Repetitive Text")
plot_comparison(df1, "Highly Repetitive Text")

In [None]:
# Test with mixed content
mixed_text = "The quick brown fox jumps over the lazy dog. " * 100 + "1234567890!@#$%^&*()" * 50

print(f"Mixed text length: {len(mixed_text)} characters")

data = [ord(c) for c in mixed_text]
df2 = run_comparison(data, mixed_text, "Mixed Content Text")
plot_comparison(df2, "Mixed Content Text")

## 🖼️ Test 2: Image Compression

Upload an image file or use a sample image to test compression algorithms.

In [None]:
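# Create a sample image or load your own
# Uncomment and modify the path to use your own image (converted to 8-bit grayscale):
# image_path = "your_image.png"
# sample_image = np.array(Image.open(image_path).convert("L"))

# Create a sample image of uniform random noise.
# Noise has no runs or repeated patterns, so expect little or no compression here.
sample_image = np.random.randint(0, 256, (100, 100), dtype=np.uint8)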

# Display the image
plt.figure(figsize=(6, 6))
plt.imshow(sample_image, cmap='gray')
plt.title('Sample Image (100x100)')
plt.axis('off')
plt.show()

print(f"Image shape: {sample_image.shape}")
print(f"Image size: {sample_image.size} pixels")

In [None]:
# Compress the image
image_flat = sample_image.flatten()
data = image_flat.tolist()
data_str = ''.join([chr(min(255, int(d))) for d in data])

df3 = run_comparison(data, data_str, "Grayscale Image (100x100)")
plot_comparison(df3, "Grayscale Image")
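
The sample image is uniform random noise, and Shannon entropy gives a rough way to see why that is hard to compress: it is the minimum average number of bits per symbol that any code assigning bits to symbols independently (such as Huffman) can achieve. The sketch below computes the empirical entropy of a sequence; `shannon_entropy` is a hypothetical helper added for illustration, and the inputs are small stand-ins rather than the notebook's actual data.

```python
import math
from collections import Counter


def shannon_entropy(symbols):
    """Empirical entropy of a sequence, in bits per symbol."""
    counts = Counter(symbols)
    total = len(symbols)
    return -sum((n / total) * math.log2(n / total) for n in counts.values())


# Every byte value equally frequent: 8 bits per symbol, so nothing to save.
uniform = list(range(256)) * 4
print(shannon_entropy(uniform))  # 8.0

# Only five distinct symbols with similar frequencies: far fewer bits needed.
repetitive = "AAAABBBBBCCCCCDDDDDEEEEE"
print(round(shannon_entropy(repetitive), 3))
```

An entropy close to 8 bits for 8-bit pixels means a per-symbol coder has almost no redundancy to remove, which matches what the image test reports.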

## 📄 Test 3: Document/File Compression

Test compression on text files and documents.

In [None]:
# Create a sample document
sample_document = """
Data Compression Project Report

Introduction:
Data compression is the process of encoding information using fewer bits than the original representation.
Compression can be either lossy or lossless. Lossless compression reduces bits by identifying and eliminating 
statistical redundancy. No information is lost in lossless compression.

Algorithms:
1. Run Length Encoding (RLE) - Simple and effective for repetitive data
2. Huffman Coding - Variable-length coding based on frequency analysis  
3. Lempel-Ziv-Welch (LZW) - Dictionary-based compression algorithm

Applications:
- File compression (ZIP, GZIP)
- Image compression (PNG, GIF)
- Video compression (H.264, VP9)
- Audio compression (MP3, FLAC)

Conclusion:
Different compression algorithms excel at different types of data. The choice of algorithm depends on 
the data characteristics and requirements for compression ratio vs. speed.
""" * 10

print(f"Document length: {len(sample_document)} characters")
print(f"Preview:\n{sample_document[:300]}...")

In [None]:
# Compress the document
data = [ord(c) for c in sample_document]
df4 = run_comparison(data, sample_document, "Document Text")
plot_comparison(df4, "Document Text")

## 📊 Summary: Compare All Test Cases

In [None]:
# Combine all results
all_results = pd.concat([
    df1.assign(Dataset='Repetitive Text'),
    df2.assign(Dataset='Mixed Text'),
    df3.assign(Dataset='Image'),
    df4.assign(Dataset='Document')
], ignore_index=True)

# Display summary
print("\n" + "="*100)
print("COMPREHENSIVE COMPARISON - ALL DATASETS")
print("="*100)
display(all_results[['Dataset', 'Algorithm', 'Compression Ratio', 'Space Saving %', 
                     'Compression Time (s)', 'Decompression Time (s)']])

# Summary statistics by algorithm
print("\n📈 Average Performance by Algorithm:")
summary = all_results.groupby('Algorithm').agg({
    'Compression Ratio': 'mean',
    'Space Saving %': 'mean',
    'Compression Time (s)': 'mean',
    'Decompression Time (s)': 'mean'
}).round(4)
display(summary)

In [None]:
# Final visualization - Overall comparison
fig, axes = plt.subplots(2, 2, figsize=(16, 12))
fig.suptitle('Overall Algorithm Performance Across All Datasets', fontsize=18, fontweight='bold')

datasets = all_results['Dataset'].unique()
algorithms = all_results['Algorithm'].unique()
x = np.arange(len(algorithms))
width = 0.2

# Plot 1: Compression Ratio by Dataset
ax = axes[0, 0]
for i, dataset in enumerate(datasets):
    data = all_results[all_results['Dataset'] == dataset]['Compression Ratio']
    ax.bar(x + i*width, data, width, label=dataset, alpha=0.8)
ax.set_ylabel('Compression Ratio')
ax.set_title('Compression Ratio by Dataset')
ax.set_xticks(x + width * 1.5)
ax.set_xticklabels(algorithms)
ax.legend()
ax.grid(axis='y', alpha=0.3)

# Plot 2: Space Savings by Dataset
ax = axes[0, 1]
for i, dataset in enumerate(datasets):
    data = all_results[all_results['Dataset'] == dataset]['Space Saving %']
    ax.bar(x + i*width, data, width, label=dataset, alpha=0.8)
ax.set_ylabel('Space Saving %')
ax.set_title('Space Savings by Dataset')
ax.set_xticks(x + width * 1.5)
ax.set_xticklabels(algorithms)
ax.legend()
ax.grid(axis='y', alpha=0.3)

# Plot 3: Average Compression Time
ax = axes[1, 0]
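# groupby sorts its keys alphabetically, so realign to the order used in `algorithms`
avg_comp_time = all_results.groupby('Algorithm')['Compression Time (s)'].mean().reindex(algorithms)
ax.bar(algorithms, avg_comp_time, color='coral', alpha=0.7)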
ax.set_ylabel('Average Time (seconds)')
ax.set_title('Average Compression Time')
ax.grid(axis='y', alpha=0.3)
for i, v in enumerate(avg_comp_time):
    ax.text(i, v, f'{v:.6f}', ha='center', va='bottom')

# Plot 4: Average Decompression Time
ax = axes[1, 1]
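# groupby sorts its keys alphabetically, so realign to the order used in `algorithms`
avg_decomp_time = all_results.groupby('Algorithm')['Decompression Time (s)'].mean().reindex(algorithms)
ax.bar(algorithms, avg_decomp_time, color='purple', alpha=0.7)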
ax.set_ylabel('Average Time (seconds)')
ax.set_title('Average Decompression Time')
ax.grid(axis='y', alpha=0.3)
for i, v in enumerate(avg_decomp_time):
    ax.text(i, v, f'{v:.6f}', ha='center', va='bottom')

plt.tight_layout()
plt.show()

## 💡 Conclusions & Recommendations

### Algorithm Characteristics:

**RLE (Run Length Encoding)**
- ✅ **Best for:** Highly repetitive data, simple graphics, binary images
- ⚡ **Speed:** Very fast compression and decompression (a single linear pass each way)
- 📦 **Compression:** Excellent on long runs; on data without runs, the (value, count) pairs can make the output up to twice the input size
- 🎯 **Use when:** Data has long runs of identical values

**Huffman Coding**
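- ✅ **Best for:** Data with skewed symbol frequencies, such as natural-language text
- ⚡ **Speed:** Moderate (requires a frequency-counting pass and tree construction before encoding)
- 📦 **Compression:** Good for text; optimal among prefix codes that encode one symbol at a time, but it does not exploit repeated sequences
- 🎯 **Use when:** Symbol frequencies are uneven and the code table can be stored or sent with the data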

**LZW (Lempel-Ziv-Welch)**
- ✅ **Best for:** Text with repeating patterns, general-purpose compression
- ⚡ **Speed:** Good compression speed, excellent decompression
- 📦 **Compression:** Good all-around performance
- 🎯 **Use when:** Need adaptive dictionary-based compression

### Practical Applications:
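- **Web**: LZW (GIF), Huffman (the entropy-coding stage of baseline JPEG)
- **Archives**: Huffman + LZ77, combined as DEFLATE (ZIP, GZIP)
- **Images**: RLE (optional in BMP), Huffman (JPEG), DEFLATE applied after per-row filtering (PNG)
- **Documents**: LZW or Huffman depending on content; PDF streams, for example, support both LZW and DEFLATE filters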
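
For a point of reference against the hand-written algorithms, Python's standard `zlib` module exposes DEFLATE, the LZ77 plus Huffman combination used by ZIP and GZIP. The sketch below compresses the same repetitive string as Test 1. It is a baseline illustration only, and its byte counts are not directly comparable to the size estimates in the tables above, which use different accounting.

```python
import zlib

# Same highly repetitive sample as Test 1, encoded to bytes.
raw = ("AAAABBBBBCCCCCDDDDDEEEEE" * 50).encode("ascii")

packed = zlib.compress(raw, level=9)  # DEFLATE: LZ77 matching + Huffman coding
restored = zlib.decompress(packed)

assert restored == raw  # lossless round trip
ratio = len(packed) / len(raw)
print(f"original: {len(raw)} bytes, DEFLATE: {len(packed)} bytes, ratio: {ratio:.4f}")
```

Unlike the estimates elsewhere in this notebook, `len(packed)` includes the zlib header and checksum as well as the Huffman tables that DEFLATE stores inside the stream.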

## 🚀 Interactive Testing

Run the cell below to test compression on your own text input!

In [None]:
# Interactive testing
def test_custom_input():
    """Test compression on custom user input."""
    print("Enter your text to compress (or press Enter for default):")
    user_input = input()
    
    if not user_input:
        user_input = "Hello World! " * 50
        print(f"Using default text: '{user_input[:50]}...'")
    
    print(f"\nText length: {len(user_input)} characters\n")
    
    data = [ord(c) for c in user_input]
    df = run_comparison(data, user_input, "Custom Input")
    plot_comparison(df, "Custom Input")

# Uncomment to run interactive test
# test_custom_input()
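
One caveat for custom input: `lzw_compress` seeds its dictionary with `chr(0)` through `chr(255)` only, so text containing any character outside that range (an emoji, for instance) would raise a `KeyError`. A possible workaround, sketched below, is to convert the text to its UTF-8 bytes and represent each byte as one character. `to_byte_string` and `from_byte_string` are hypothetical helpers that are not wired into `test_custom_input`.

```python
def to_byte_string(text):
    """Return a str with one character (code point < 256) per UTF-8 byte of text."""
    return text.encode("utf-8").decode("latin-1")


def from_byte_string(byte_string):
    """Inverse of to_byte_string."""
    return byte_string.encode("latin-1").decode("utf-8")


sample = "naïve café ✓"
safe = to_byte_string(sample)

# Every character now fits the 256-entry starting dictionary.
assert all(ord(c) < 256 for c in safe)
assert from_byte_string(safe) == sample
print(len(sample), "characters ->", len(safe), "byte-characters")
```

The byte-string is what would be passed to the compressors, and `from_byte_string` would be applied to the decompressed output to recover the original text.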

## 📦 Export Results

Save your compression results and visualizations.

In [None]:
# Create output directory
output_dir = "compression_results"
os.makedirs(output_dir, exist_ok=True)

# Save all results to CSV
all_results.to_csv(f"{output_dir}/all_results.csv", index=False)
summary.to_csv(f"{output_dir}/summary_stats.csv")

print(f"✅ Results saved to '{output_dir}/' directory")
print(f"   - all_results.csv: Complete test results")
print(f"   - summary_stats.csv: Summary statistics")

---
## 🎓 Project Complete!

You've successfully explored data compression algorithms on multiple data types:
- ✅ Text strings and documents
- ✅ Images
- ✅ Performance analysis and comparison
- ✅ Visualizations and insights

**Next Steps:**
1. Try compressing your own files using the `compress.py` script
2. Experiment with different data types
3. Modify algorithms for specific use cases
4. Explore hybrid compression approaches

**Author:** Data Compression Project  
**Date:** 2025  
**License:** MIT